From 312abdc34bd317b876d9b27a32d7599f308b31f0 Mon Sep 17 00:00:00 2001 From: admin Date: Wed, 13 May 2026 14:07:12 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8Drabbitmq.sh=E8=84=9A?= =?UTF-8?q?=E6=9C=AC=E6=89=A7=E8=A1=8C=E5=90=8E=E6=B2=A1=E6=9C=89=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/bin/bak_rabbitmq.sh | 958 ++++++++++++++++++++++++++++++++++++ backend/bin/rabbitmq.sh | 466 ++++++++++++------ 2 files changed, 1286 insertions(+), 138 deletions(-) create mode 100644 backend/bin/bak_rabbitmq.sh diff --git a/backend/bin/bak_rabbitmq.sh b/backend/bin/bak_rabbitmq.sh new file mode 100644 index 0000000..1a2aab2 --- /dev/null +++ b/backend/bin/bak_rabbitmq.sh @@ -0,0 +1,958 @@ +#!/bin/bash + +# ============================================================================ +# RabbitMQ 配置管理脚本 +# 支持从数据库读取平台列表,自动配置 Exchange、Queue、Binding 和用户 +# ============================================================================ + +set -e + +# 脚本所在目录 +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BACKEND_DIR="$(dirname "$SCRIPT_DIR")" +ENV_FILE="$BACKEND_DIR/.env" +MQ_USER_CONFIG="$BACKEND_DIR/config/autoload/mq_user.php" + +# RabbitMQ 连接信息 +RABBITMQ_HOST="127.0.0.1" +RABBITMQ_PORT="15672" +RABBITMQ_USER="admin" +RABBITMQ_PASS="admin" +VHOST="datahub" + +# 数据类型列表 +DATA_TYPES=("orders" "products" "refunds" "inventory") + +# 颜色输出 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# ============================================================================ +# 帮助信息 +# ============================================================================ +show_help() { + # 使用 printf '%b' 解释 ANSI 转义序列 + printf '%b\n' "$(cat << EOF +${BLUE}RabbitMQ 配置管理脚本${NC} + +${YELLOW}用法:${NC} + $0 [options] + +${YELLOW}命令:${NC} + ${GREEN}init${NC} 从数据库读取所有启用的平台,完全重建 MQ 配置 + - 删除现有 VHost 及所有资源 + - 从 platforms 表读取 enabled=true 的平台 + - 创建所有 Exchange、Queue、Binding + - 生成随机密码并保存到 mq_user.php + + ${GREEN}add ${NC} 添加单个平台的 MQ 配置 + - 创建平台的 Exchange 和 Binding + - 创建平台用户并设置权限 + - 生成随机密码并更新 mq_user.php + 示例: $0 add shopee + + ${GREEN}remove ${NC} 移除单个平台的 MQ 配置 + - 删除平台的 Exchange 和 Binding + - 删除平台用户 + - 从 mq_user.php 中移除配置 + 示例: $0 remove shopee + + ${GREEN}list${NC} 列出当前 MQ 中已配置的平台 + + ${GREEN}version${NC} 显示 RabbitMQ 服务器版本信息 + + ${GREEN}reset-password${NC} 重置指定用户的密码 + --user 用户名称 (consumer/ops/平台名) + 示例: $0 reset-password --user consumer + 示例: $0 reset-password --user tmall + + ${GREEN}--help, -h${NC} 显示此帮助信息 + +${YELLOW}配置文件:${NC} + 环境配置: $ENV_FILE (包含 MQ_PASSWORD_* 密码变量) + 用户配置: $MQ_USER_CONFIG (动态从数据库读取平台列表) + +${YELLOW}平台名称转换规则:${NC} + - 空格转换为下划线 + - 全部转为小写 + - 示例: "Amazon Japan" -> "amazon_japan" + +${YELLOW}数据库表结构:${NC} + platforms 表需要包含以下字段: + - id: 平台 ID + - name: 平台名称 + - enabled: 是否启用 (boolean) + +${YELLOW}示例:${NC} + $0 init # 完全重建 MQ 配置 + $0 add lazada # 添加 lazada 平台 + $0 remove kaola # 移除 kaola 平台 + $0 list # 列出已配置的平台 + +EOF +)" +} + +# ============================================================================ +# 依赖检测 +# ============================================================================ + +# 检测是否在容器环境中运行 +is_container() { + # 检测 Docker + if [[ -f /.dockerenv ]]; then + return 0 + fi + + # 检测 Podman (通过 /run/.containerenv 文件) + if [[ -f /run/.containerenv ]]; then + return 0 + fi + + # 检测 cgroup (Docker/Kubernetes/Podman) + if grep -qE 'docker|kubepods|containerd|libpod' /proc/1/cgroup 2>/dev/null; then + return 0 + fi + + # 检测 Kubernetes + if [[ -n "${KUBERNETES_SERVICE_HOST:-}" ]]; then + return 0 + fi + + # 检测容器运行时环境变量 + if [[ -n "${container:-}" ]]; then + return 0 + fi + + return 1 +} + +check_dependencies() { + local missing=() + local in_container=false + + if is_container; then + in_container=true + fi + + # 检查 rabbitmqadmin + if ! command -v rabbitmqadmin &> /dev/null; then + missing+=("rabbitmqadmin") + fi + + # 检查 php 及必要扩展 + if ! command -v php &> /dev/null; then + missing+=("php") + else + # 检查 PHP 扩展 + if ! php -m 2>/dev/null | grep -qi "^pdo_pgsql$"; then + missing+=("php-pdo_pgsql") + fi + if ! php -m 2>/dev/null | grep -qi "^sodium$"; then + missing+=("php-sodium") + fi + fi + + if [[ ${#missing[@]} -gt 0 ]]; then + echo -e "${RED}错误: 缺少必要的依赖工具${NC}" + echo "" + + if $in_container; then + echo -e "${YELLOW}检测到容器环境 (Docker/Podman/Kubernetes)${NC}" + echo "建议在 Dockerfile/Containerfile 或镜像中预装以下工具:" + echo "" + fi + + for tool in "${missing[@]}"; do + case "$tool" in + rabbitmqadmin) + echo -e "${YELLOW}rabbitmqadmin${NC} - RabbitMQ 管理工具" + if $in_container; then + echo " Dockerfile/Containerfile 示例:" + echo " RUN wget -O /usr/local/bin/rabbitmqadmin \\" + echo " http://rabbitmq:15672/cli/rabbitmqadmin && \\" + echo " chmod +x /usr/local/bin/rabbitmqadmin" + echo " 或使用 pip:" + echo " RUN pip install rabbitmqadmin" + else + echo " 安装方式:" + echo " 1. 从 RabbitMQ 管理界面下载:" + echo " wget http://localhost:15672/cli/rabbitmqadmin" + echo " chmod +x rabbitmqadmin" + echo " sudo mv rabbitmqadmin /usr/local/bin/" + echo " 2. 或者使用 pip:" + echo " pip install rabbitmqadmin" + fi + echo "" + ;; + php) + echo -e "${YELLOW}php${NC} - PHP 命令行" + if $in_container; then + echo " 如果使用 PHP 基础镜像,php 应该已经存在" + echo " 否则参考: https://hub.docker.com/_/php" + else + echo " 安装方式:" + echo " Ubuntu/Debian: sudo apt install php-cli" + echo " CentOS/RHEL: sudo yum install php-cli" + echo " macOS: brew install php" + fi + echo "" + ;; + php-pdo_pgsql) + echo -e "${YELLOW}php-pdo_pgsql${NC} - PHP PostgreSQL PDO 扩展" + if $in_container; then + echo " Dockerfile/Containerfile 示例:" + echo " RUN docker-php-ext-install pdo_pgsql" + else + echo " 安装方式:" + echo " Ubuntu/Debian: sudo apt install php-pgsql" + echo " CentOS/RHEL: sudo yum install php-pgsql" + fi + echo "" + ;; + php-sodium) + echo -e "${YELLOW}php-sodium${NC} - PHP Sodium 扩展" + if $in_container; then + echo " Dockerfile/Containerfile 示例:" + echo " RUN docker-php-ext-install sodium" + else + echo " 安装方式:" + echo " Ubuntu/Debian: sudo apt install php-sodium" + echo " CentOS/RHEL: sudo yum install php-sodium" + echo " 注意: PHP 7.2+ 默认包含 sodium 扩展" + fi + echo "" + ;; + esac + done + exit 1 + fi +} + +# ============================================================================ +# 工具函数 +# ============================================================================ + +# 输出信息 +info() { + echo -e "${GREEN}✓${NC} $1" +} + +warn() { + echo -e "${YELLOW}⚠${NC} $1" +} + +error() { + echo -e "${RED}✗${NC} $1" + exit 1 +} + +# 从 .env 文件读取配置 +load_env() { + if [[ ! -f "$ENV_FILE" ]]; then + error "找不到 .env 文件: $ENV_FILE" + fi + + # 读取数据库配置 + DB_HOST=$(grep -E "^DB_HOST=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + DB_PORT=$(grep -E "^DB_PORT=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + DB_DATABASE=$(grep -E "^DB_DATABASE=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + DB_USERNAME=$(grep -E "^DB_USERNAME=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + DB_PASSWORD=$(grep -E "^DB_PASSWORD=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + + if [[ -z "$DB_HOST" || -z "$DB_DATABASE" ]]; then + error "无法从 .env 文件读取数据库配置" + fi +} + +# 平台名称标准化:空格转下划线,全部小写 +normalize_platform_name() { + echo "$1" | tr '[:upper:]' '[:lower:]' | tr ' ' '_' +} + +# 生成随机密码 (16位,使用 PHP sodium) +generate_password() { + php -r "echo bin2hex(sodium_crypto_secretbox_keygen());" | head -c 16 +} + +# 从数据库获取所有启用的平台 (使用 PHP PDO) +get_platforms_from_db() { + load_env + php << EOF + PDO::ERRMODE_EXCEPTION + ]); + \$stmt = \$pdo->query("SELECT name FROM platforms WHERE enabled = true ORDER BY id"); + while (\$row = \$stmt->fetch(PDO::FETCH_ASSOC)) { + echo \$row['name'] . "\n"; + } +} catch (PDOException \$e) { + fwrite(STDERR, "数据库连接失败: " . \$e->getMessage() . "\n"); + exit(1); +} +EOF +} + +# 检查平台是否在数据库中存在 (使用 PHP PDO) +platform_exists_in_db() { + local platform_name="$1" + load_env + local count=$(php << EOF + PDO::ERRMODE_EXCEPTION + ]); + \$stmt = \$pdo->prepare("SELECT COUNT(*) FROM platforms WHERE LOWER(REPLACE(name, ' ', '_')) = LOWER(:name)"); + \$stmt->execute(['name' => '$platform_name']); + echo \$stmt->fetchColumn(); +} catch (PDOException \$e) { + echo "0"; +} +EOF +) + [[ "$count" -gt 0 ]] +} + +# ============================================================================ +# .env 密码配置操作 +# ============================================================================ + +# 添加或更新 .env 中的密码变量 +# 用法: add_password_to_env "SHOPEE" "password123" +add_password_to_env() { + local platform_upper="$1" + local password="$2" + local env_key="MQ_PASSWORD_${platform_upper}" + + # 检查变量是否已存在 + if grep -q "^${env_key}=" "$ENV_FILE" 2>/dev/null; then + # 更新现有变量 + sed -i "s|^${env_key}=.*|${env_key}=\"${password}\"|" "$ENV_FILE" + else + # 检查是否需要添加分隔注释 + if ! grep -q "^# RabbitMQ" "$ENV_FILE" 2>/dev/null; then + echo "" >> "$ENV_FILE" + echo "# RabbitMQ 用户密码配置 (由 bin/rabbitmq.sh 自动生成)" >> "$ENV_FILE" + fi + # 追加新变量 + echo "${env_key}=\"${password}\"" >> "$ENV_FILE" + fi +} + +# 从 .env 中移除密码变量 +remove_password_from_env() { + local platform_upper="$1" + local env_key="MQ_PASSWORD_${platform_upper}" + + if grep -q "^${env_key}=" "$ENV_FILE" 2>/dev/null; then + sed -i "/^${env_key}=/d" "$ENV_FILE" + fi +} + +# 读取 .env 中的现有密码 +get_existing_password_from_env() { + local platform_upper="$1" + local env_key="MQ_PASSWORD_${platform_upper}" + + grep "^${env_key}=" "$ENV_FILE" 2>/dev/null | cut -d'=' -f2 | tr -d '"' | tr -d "'" +} + +# 批量写入所有密码到 .env (用于 init 模式) +write_all_passwords_to_env() { + local -n platforms_ref=$1 + local -n passwords_ref=$2 + local consumer_password="$3" + local ops_password="$4" + + # 先清理现有的 MQ_PASSWORD_ 变量 + sed -i '/^MQ_PASSWORD_/d' "$ENV_FILE" 2>/dev/null || true + sed -i '/^# RabbitMQ 用户密码配置/d' "$ENV_FILE" 2>/dev/null || true + + # 移除文件末尾的多余空行 + sed -i -e :a -e '/^\n*$/{$d;N;ba' -e '}' "$ENV_FILE" 2>/dev/null || true + + # 追加新配置 + { + echo "" + echo "# RabbitMQ 用户密码配置 (由 bin/rabbitmq.sh 自动生成,时间: $(date '+%Y-%m-%d %H:%M:%S'))" + echo "MQ_PASSWORD_CONSUMER=\"${consumer_password}\"" + echo "MQ_PASSWORD_OPS=\"${ops_password}\"" + for platform in "${platforms_ref[@]}"; do + local platform_upper=$(echo "$platform" | tr '[:lower:]' '[:upper:]') + echo "MQ_PASSWORD_${platform_upper}=\"${passwords_ref[$platform]}\"" + done + } >> "$ENV_FILE" +} + +# ============================================================================ +# RabbitMQ 操作函数 +# ============================================================================ + +# 创建单个平台的 MQ 配置 +create_platform_config() { + local platform="$1" + local password="$2" + + echo "" + echo -e "${BLUE}处理平台: ${platform}${NC}" + + # 创建平台业务 Exchange + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "${platform}.exchange" --vhost "$VHOST" \ + --type topic --durable true + info "创建业务 Exchange: ${platform}.exchange" + + # 绑定到业务队列 + for dtype in "${DATA_TYPES[@]}"; do + local dtype_singular="${dtype%s}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "${dtype}.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "${dtype_singular}.${platform}" + done + info "绑定到业务队列 (${#DATA_TYPES[@]}个)" + + # 创建平台错误 Exchange + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \ + --type topic --durable true + info "创建错误 Exchange: ${platform}.errors.exchange" + + # 绑定到错误队列 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "#" + info "绑定到错误队列" + + # 创建平台用户 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_${platform}" --password "$password" --tags "" + info "创建用户: user_${platform}" + + # 配置平台用户权限 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_${platform}" \ + --configure "^${platform}\\.(exchange|errors\\.exchange)$" \ + --write "^${platform}\\.(exchange|errors\\.exchange)$" \ + --read "^${platform}\\.errors\\..*$" + info "配置用户权限" +} + +# 删除单个平台的 MQ 配置 +remove_platform_config() { + local platform="$1" + + echo "" + echo -e "${BLUE}移除平台: ${platform}${NC}" + + # 删除用户 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete user --name "user_${platform}" 2>/dev/null || true + info "删除用户: user_${platform}" + + # 删除 Exchange (会自动删除相关的 Binding) + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete exchange --name "${platform}.exchange" --vhost "$VHOST" 2>/dev/null || true + info "删除 Exchange: ${platform}.exchange" + + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete exchange --name "${platform}.errors.exchange" --vhost "$VHOST" 2>/dev/null || true + info "删除 Exchange: ${platform}.errors.exchange" +} + +# 创建基础设施 (VHost, 主 Exchange, DLX, 队列等) +create_infrastructure() { + echo "" + echo "========================================" + echo "创建基础设施..." + echo "========================================" + + # 1. 创建 VHost + echo "" + echo "创建 VHost: $VHOST" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare vhost --name "$VHOST" + info "VHost 创建成功" + + # 2. 创建主 Exchange + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "main.exchange" --vhost "$VHOST" \ + --type topic --durable true + info "创建主 Exchange: main.exchange" + + # 3. 创建 DLX + echo "" + echo "创建死信交换机 (DLX)..." + for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \ + --type topic --durable true + info "创建 DLX: dlx.${dtype}" + done + + # 4. 创建主业务队列 + echo "" + echo "创建主业务队列..." + for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable true \ + --arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" + info "创建队列: ${dtype}.queue" + done + + # 5. 创建重试队列 + echo "" + echo "创建重试队列..." + for dtype in "${DATA_TYPES[@]}"; do + local dtype_singular="${dtype%s}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \ + --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}" + info "创建重试队列: ${dtype}.retry.queue" + done + + # 6. 创建错误 Exchange 和队列 + echo "" + echo "创建错误 Exchange 和队列..." + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "errors.exchange" --vhost "$VHOST" \ + --type topic --durable true + info "创建错误 Exchange: errors.exchange" + + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "errors.queue" --vhost "$VHOST" --durable true \ + --arguments '{"x-message-ttl":604800000}' + info "创建错误队列: errors.queue" + + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "#" + info "绑定: errors.exchange → errors.queue" + + # 7. 绑定主 Exchange 到主队列 + echo "" + echo "绑定主 Exchange 到主队列..." + for dtype in "${DATA_TYPES[@]}"; do + local dtype_singular="${dtype%s}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "main.exchange" --destination "${dtype}.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "${dtype_singular}.#" + info "绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype_singular}.#)" + done + + # 8. 绑定 DLX 到重试队列 + echo "" + echo "绑定 DLX 到重试队列..." + for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "retry" + info "绑定: dlx.${dtype} → ${dtype}.retry.queue" + done +} + +# 创建系统用户 (consumer 和 ops) +create_system_users() { + local consumer_password="$1" + local ops_password="$2" + + echo "" + echo "========================================" + echo "创建系统用户..." + echo "========================================" + + # 创建 datahub 消费者用户 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_datahub_consumer" --password "$consumer_password" --tags "" + info "创建用户: user_datahub_consumer" + + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_datahub_consumer" \ + --configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \ + --write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \ + --read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$" + info "配置 consumer 用户权限" + + # 创建运维监控用户 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_ops" --password "$ops_password" --tags "" + info "创建用户: user_ops" + + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_ops" \ + --configure "^errors\\..*$" --write "" --read "^errors\\.queue$" + info "配置 ops 用户权限" +} + +# 清理现有配置 +cleanup_existing() { + echo "========================================" + echo "开始清理现有配置..." + echo "========================================" + + # 检查 VHost 是否存在 + echo "" + echo "检查 VHost: $VHOST 是否存在..." + VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list vhosts | grep -w "$VHOST" || true) + + if [[ -n "$VHOST_EXISTS" ]]; then + warn "VHost '$VHOST' 已存在,将删除所有现有配置..." + + # 获取所有平台用户并删除 + local users=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list users name -f tsv 2>/dev/null | grep "^user_" || true) + + for user in $users; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete user --name "$user" 2>/dev/null || true + info "删除用户: $user" + done + + # 删除 VHost + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete vhost --name "$VHOST" + info "VHost '$VHOST' 及其所有资源已删除" + else + echo "VHost '$VHOST' 不存在" + fi +} + +# ============================================================================ +# 主命令函数 +# ============================================================================ + +# init 命令:完全重建 +cmd_init() { + echo -e "${BLUE}========================================" + echo "RabbitMQ 初始化模式" + echo "========================================${NC}" + + # 1. 从数据库获取平台列表 + echo "" + echo "从数据库读取平台列表..." + local db_platforms=$(get_platforms_from_db) + + if [[ -z "$db_platforms" ]]; then + error "无法从数据库获取平台列表,或没有启用的平台" + fi + + # 转换平台名称 + declare -a PLATFORMS=() + declare -A PLATFORM_PASSWORDS=() + + while IFS= read -r platform; do + local normalized=$(normalize_platform_name "$platform") + PLATFORMS+=("$normalized") + PLATFORM_PASSWORDS["$normalized"]=$(generate_password) + done <<< "$db_platforms" + + echo "找到 ${#PLATFORMS[@]} 个启用的平台:" + printf " - %s\n" "${PLATFORMS[@]}" + + # 2. 清理现有配置 + cleanup_existing + + # 3. 创建基础设施 + create_infrastructure + + # 4. 生成系统用户密码 + local consumer_password=$(generate_password) + local ops_password=$(generate_password) + + # 5. 创建系统用户 + create_system_users "$consumer_password" "$ops_password" + + # 6. 为每个平台创建配置 + echo "" + echo "========================================" + echo "创建平台配置..." + echo "========================================" + + for platform in "${PLATFORMS[@]}"; do + create_platform_config "$platform" "${PLATFORM_PASSWORDS[$platform]}" + done + + # 7. 写入密码到 .env + echo "" + echo "========================================" + echo "写入密码到 .env..." + echo "========================================" + write_all_passwords_to_env PLATFORMS PLATFORM_PASSWORDS "$consumer_password" "$ops_password" + info "密码已写入: $ENV_FILE" + + # 8. 输出摘要 + echo "" + echo -e "${GREEN}========================================" + echo "RabbitMQ 配置完成!" + echo "========================================${NC}" + echo "" + echo "已创建:" + echo "- 1 个 VHost: $VHOST" + echo "- 1 个主 Exchange: main.exchange" + echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" + echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" + echo "- 4 个重试队列: *.retry.queue" + echo "- 1 个错误队列: errors.queue" + echo "- ${#PLATFORMS[@]} 个平台配置" + echo "- $((${#PLATFORMS[@]} + 2)) 个用户" + echo "" + echo "密码配置: $ENV_FILE (MQ_PASSWORD_* 变量)" + echo "用户配置: $MQ_USER_CONFIG (动态从数据库读取)" +} + +# add 命令:添加单个平台 +cmd_add() { + local platform_input="$1" + + if [[ -z "$platform_input" ]]; then + error "请指定要添加的平台名称,例如: $0 add shopee" + fi + + local platform=$(normalize_platform_name "$platform_input") + + echo -e "${BLUE}========================================" + echo "添加平台: $platform" + echo "========================================${NC}" + + # 检查 VHost 是否存在 + VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list vhosts | grep -w "$VHOST" || true) + + if [[ -z "$VHOST_EXISTS" ]]; then + error "VHost '$VHOST' 不存在,请先运行 '$0 init' 初始化" + fi + + # 检查平台是否已存在 + local exchange_exists=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list exchanges --vhost "$VHOST" | grep -w "${platform}.exchange" || true) + + if [[ -n "$exchange_exists" ]]; then + error "平台 '$platform' 已存在" + fi + + # 生成密码 + local password=$(generate_password) + + # 创建平台配置 + create_platform_config "$platform" "$password" + + # 写入密码到 .env + local platform_upper=$(echo "$platform" | tr '[:lower:]' '[:upper:]') + add_password_to_env "$platform_upper" "$password" + + echo "" + echo -e "${GREEN}========================================" + echo "平台 $platform 添加成功!" + echo "========================================${NC}" + echo "" + echo "用户: user_${platform}" + echo "密码已写入 .env: MQ_PASSWORD_${platform_upper}" +} + +# remove 命令:移除单个平台 +cmd_remove() { + local platform_input="$1" + + if [[ -z "$platform_input" ]]; then + error "请指定要移除的平台名称,例如: $0 remove shopee" + fi + + local platform=$(normalize_platform_name "$platform_input") + + echo -e "${BLUE}========================================" + echo "移除平台: $platform" + echo "========================================${NC}" + + # 移除 MQ 配置 + remove_platform_config "$platform" + + # 从 .env 中移除密码 + local platform_upper=$(echo "$platform" | tr '[:lower:]' '[:upper:]') + remove_password_from_env "$platform_upper" + + echo "" + echo -e "${GREEN}========================================" + echo "平台 $platform 移除成功!" + echo "========================================${NC}" + echo "" + echo "已从 .env 移除: MQ_PASSWORD_${platform_upper}" +} + +# reset-password 命令:重置用户密码 +cmd_reset_password() { + local user_name="" + + # 解析参数 + while [[ $# -gt 0 ]]; do + case "$1" in + --user) + user_name="$2" + shift 2 + ;; + *) + error "未知参数: $1\n用法: $0 reset-password --user " + ;; + esac + done + + if [[ -z "$user_name" ]]; then + error "请指定用户名称\n用法: $0 reset-password --user \n示例: $0 reset-password --user consumer" + fi + + # 标准化用户名称 + local normalized_name=$(normalize_platform_name "$user_name") + local new_password=$(generate_password) + + echo -e "${BLUE}========================================" + echo "重置用户密码: $normalized_name" + echo "========================================${NC}" + + # 判断用户类型并处理 + case "$normalized_name" in + consumer) + local mq_user="user_datahub_consumer" + local env_key="CONSUMER" + ;; + ops) + local mq_user="user_ops" + local env_key="OPS" + ;; + *) + # 平台用户 + local mq_user="user_${normalized_name}" + local env_key=$(echo "$normalized_name" | tr '[:lower:]' '[:upper:]') + ;; + esac + + # 检查用户是否存在 + local user_exists=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list users 2>/dev/null | grep -w "${mq_user}" || true) + + if [[ -z "$user_exists" ]]; then + error "用户 '$mq_user' 不存在" + fi + + # 更新 RabbitMQ 用户密码 + echo "" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "$mq_user" --password "$new_password" --tags "" + info "RabbitMQ 用户密码已更新: $mq_user" + + # 更新 .env 文件 + add_password_to_env "$env_key" "$new_password" + info "密码已写入 .env: MQ_PASSWORD_${env_key}" + + echo "" + echo -e "${GREEN}========================================" + echo "密码重置成功!" + echo "========================================${NC}" + echo "" + echo "用户: $mq_user" + echo "新密码已保存到 .env: MQ_PASSWORD_${env_key}" + echo "" + echo -e "${YELLOW}注意: 请确保相关服务使用新密码重新连接${NC}" +} + +# list 命令:列出已配置的平台 +cmd_list() { + echo -e "${BLUE}========================================" + echo "已配置的平台列表" + echo "========================================${NC}" + + # 检查 VHost 是否存在 + VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list vhosts 2>/dev/null | grep -w "$VHOST" || true) + + if [[ -z "$VHOST_EXISTS" ]]; then + warn "VHost '$VHOST' 不存在" + return + fi + + # 获取所有平台 Exchange (排除系统 Exchange) + echo "" + echo "MQ 中的平台:" + local exchanges=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list exchanges --vhost "$VHOST" name -f tsv 2>/dev/null | grep -E "^[a-z0-9_]+\.exchange$" | grep -v "^main\." | grep -v "^errors\." | sed 's/\.exchange$//' | sort) + + if [[ -n "$exchanges" ]]; then + echo "$exchanges" | while read platform; do + echo " - $platform" + done + else + echo " (无)" + fi + + # 显示配置文件中的平台 + if [[ -f "$MQ_USER_CONFIG" ]]; then + echo "" + echo "配置文件中的平台:" + php -r " + \$config = include '$MQ_USER_CONFIG'; + foreach (\$config['platforms'] ?? [] as \$name => \$data) { + echo \" - \$name\n\"; + } + " 2>/dev/null || echo " (无法读取配置文件)" + fi +} + +# version 命令:显示 RabbitMQ 服务器版本信息 +cmd_version() { + echo -e "${BLUE}========================================" + echo -e "RabbitMQ 服务器版本信息" + echo -e "========================================${NC}" + echo "" + + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + show overview 2>/dev/null || error "无法连接到 RabbitMQ 服务器" +} + +# ============================================================================ +# 主入口 +# ============================================================================ + +main() { + local command="${1:-}" + + # 显示帮助不需要检测依赖 + if [[ "$command" != "--help" && "$command" != "-h" && "$command" != "help" && -n "$command" ]]; then + check_dependencies + fi + + case "$command" in + init) + cmd_init + ;; + add) + cmd_add "$2" + ;; + remove) + cmd_remove "$2" + ;; + list) + cmd_list + ;; + version) + cmd_version + ;; + reset-password) + shift + cmd_reset_password "$@" + ;; + --help|-h|help|"") + show_help + ;; + *) + error "未知命令: $command\n使用 '$0 --help' 查看帮助" + ;; + esac +} + +main "$@" diff --git a/backend/bin/rabbitmq.sh b/backend/bin/rabbitmq.sh index 1a2aab2..f9bcde3 100755 --- a/backend/bin/rabbitmq.sh +++ b/backend/bin/rabbitmq.sh @@ -1,25 +1,36 @@ #!/bin/bash # ============================================================================ -# RabbitMQ 配置管理脚本 +# RabbitMQ 配置管理脚本 (适配 datahub-backend 容器环境) # 支持从数据库读取平台列表,自动配置 Exchange、Queue、Binding 和用户 +# 通过 RabbitMQ Management HTTP API (curl) 操作,无需 rabbitmqadmin # ============================================================================ set -e -# 脚本所在目录 +# 脚本所在目录 (容器内路径: /opt/www) SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" BACKEND_DIR="$(dirname "$SCRIPT_DIR")" -ENV_FILE="$BACKEND_DIR/.env" MQ_USER_CONFIG="$BACKEND_DIR/config/autoload/mq_user.php" -# RabbitMQ 连接信息 -RABBITMQ_HOST="127.0.0.1" +# .env 密码文件输出路径 +# 容器内 /mnt/share 映射到宿主机 /var/container/share +# 执行完成后需手动合并到宿主机 /opt/datahub/backend/.env +ENV_FILE="/mnt/share/rabbitmq_passwords.env" + +# RabbitMQ 连接信息 (容器间通过容器名通信) +RABBITMQ_HOST="datahub-rabbitmq" RABBITMQ_PORT="15672" -RABBITMQ_USER="admin" -RABBITMQ_PASS="admin" +RABBITMQ_USER="user" +RABBITMQ_PASS="hub123456" VHOST="datahub" +# URL 编码的 VHost (用于 HTTP API 路径) +VHOST_ENCODED=$(php -r "echo rawurlencode('$VHOST');" 2>/dev/null || echo "$VHOST") + +# RabbitMQ API 基础 URL +MQ_API="http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api" + # 数据类型列表 DATA_TYPES=("orders" "products" "refunds" "inventory") @@ -36,7 +47,7 @@ NC='\033[0m' # No Color show_help() { # 使用 printf '%b' 解释 ANSI 转义序列 printf '%b\n' "$(cat << EOF -${BLUE}RabbitMQ 配置管理脚本${NC} +${BLUE}RabbitMQ 配置管理脚本 (容器版)${NC} ${YELLOW}用法:${NC} $0 [options] @@ -72,7 +83,7 @@ ${YELLOW}命令:${NC} ${GREEN}--help, -h${NC} 显示此帮助信息 ${YELLOW}配置文件:${NC} - 环境配置: $ENV_FILE (包含 MQ_PASSWORD_* 密码变量) + 密码输出: /mnt/share/rabbitmq_passwords.env (宿主机 /var/container/share/) 用户配置: $MQ_USER_CONFIG (动态从数据库读取平台列表) ${YELLOW}平台名称转换规则:${NC} @@ -138,9 +149,9 @@ check_dependencies() { in_container=true fi - # 检查 rabbitmqadmin - if ! command -v rabbitmqadmin &> /dev/null; then - missing+=("rabbitmqadmin") + # 检查 curl (用于调用 RabbitMQ HTTP API) + if ! command -v curl &> /dev/null; then + missing+=("curl") fi # 检查 php 及必要扩展 @@ -168,23 +179,15 @@ check_dependencies() { for tool in "${missing[@]}"; do case "$tool" in - rabbitmqadmin) - echo -e "${YELLOW}rabbitmqadmin${NC} - RabbitMQ 管理工具" + curl) + echo -e "${YELLOW}curl${NC} - HTTP 请求工具 (用于调用 RabbitMQ Management API)" if $in_container; then - echo " Dockerfile/Containerfile 示例:" - echo " RUN wget -O /usr/local/bin/rabbitmqadmin \\" - echo " http://rabbitmq:15672/cli/rabbitmqadmin && \\" - echo " chmod +x /usr/local/bin/rabbitmqadmin" - echo " 或使用 pip:" - echo " RUN pip install rabbitmqadmin" + echo " Alpine: apk add curl" + echo " Debian: apt-get install curl" else echo " 安装方式:" - echo " 1. 从 RabbitMQ 管理界面下载:" - echo " wget http://localhost:15672/cli/rabbitmqadmin" - echo " chmod +x rabbitmqadmin" - echo " sudo mv rabbitmqadmin /usr/local/bin/" - echo " 2. 或者使用 pip:" - echo " pip install rabbitmqadmin" + echo " Ubuntu/Debian: sudo apt install curl" + echo " CentOS/RHEL: sudo yum install curl" fi echo "" ;; @@ -222,7 +225,6 @@ check_dependencies() { echo " 安装方式:" echo " Ubuntu/Debian: sudo apt install php-sodium" echo " CentOS/RHEL: sudo yum install php-sodium" - echo " 注意: PHP 7.2+ 默认包含 sodium 扩展" fi echo "" ;; @@ -230,6 +232,160 @@ check_dependencies() { done exit 1 fi + + # 检查 RabbitMQ Management API 是否可达 + local http_code + http_code=$(curl -s -o /dev/null -w "%{http_code}" -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ + "${MQ_API}/overview" 2>/dev/null || echo "000") + + if [[ "$http_code" == "000" ]]; then + error "无法连接到 RabbitMQ Management API: ${MQ_API}\n请确认 datahub-rabbitmq 容器正在运行且 management 插件已启用" + elif [[ "$http_code" == "401" ]]; then + error "RabbitMQ 认证失败,请检查用户名和密码" + elif [[ "$http_code" != "200" ]]; then + error "RabbitMQ Management API 返回异常状态码: ${http_code}" + fi +} + +# ============================================================================ +# RabbitMQ HTTP API 封装函数 +# ============================================================================ + +# 通用 API 请求函数 +# 用法: mq_api_call [JSON_DATA] +# 写操作 (PUT/POST/DELETE) 自动丢弃响应体,GET 保留输出 +mq_api_call() { + local method="$1" + local path="$2" + local data="${3:-}" + + if [[ "$method" != "GET" ]]; then + if [[ -n "$data" ]]; then + curl -s -o /dev/null -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ + -H "content-type: application/json" \ + -X "$method" \ + -d "$data" \ + "${MQ_API}${path}" + else + curl -s -o /dev/null -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ + -H "content-type: application/json" \ + -X "$method" \ + "${MQ_API}${path}" + fi + else + if [[ -n "$data" ]]; then + curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ + -H "content-type: application/json" \ + -X "$method" \ + -d "$data" \ + "${MQ_API}${path}" + else + curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ + -H "content-type: application/json" \ + -X "$method" \ + "${MQ_API}${path}" + fi + fi +} + +# 声明 VHost +mq_declare_vhost() { + local name="$1" + local encoded + encoded=$(php -r "echo rawurlencode('$name');" 2>/dev/null || echo "$name") + mq_api_call PUT "/vhosts/${encoded}" '{}' +} + +# 删除 VHost +mq_delete_vhost() { + local name="$1" + local encoded + encoded=$(php -r "echo rawurlencode('$name');" 2>/dev/null || echo "$name") + mq_api_call DELETE "/vhosts/${encoded}" 2>/dev/null || true +} + +# 列出 VHosts +mq_list_vhosts() { + mq_api_call GET "/vhosts" 2>/dev/null +} + +# 声明 Exchange +mq_declare_exchange() { + local name="$1" + local type="${2:-topic}" + local durable="${3:-true}" + local body + body=$(printf '{"type":"%s","durable":%s}' "$type" "$durable") + mq_api_call PUT "/exchanges/${VHOST_ENCODED}/${name}" "$body" +} + +# 删除 Exchange +mq_delete_exchange() { + local name="$1" + mq_api_call DELETE "/exchanges/${VHOST_ENCODED}/${name}" 2>/dev/null || true +} + +# 声明 Queue +mq_declare_queue() { + local name="$1" + local arguments="$2" + if [[ -z "$arguments" ]]; then + arguments='{}' + fi + local body + body=$(printf '{"durable":true,"arguments":%s}' "$arguments") + mq_api_call PUT "/queues/${VHOST_ENCODED}/${name}" "$body" +} + +# 声明 Binding +mq_declare_binding() { + local source="$1" + local destination="$2" + local routing_key="$3" + local dest_type="${4:-queue}" + + local dest_char="q" + if [[ "$dest_type" == "exchange" ]]; then + dest_char="e" + fi + + mq_api_call POST "/bindings/${VHOST_ENCODED}/e/${source}/${dest_char}/${destination}" \ + "$(printf '{"routing_key":"%s"}' "$routing_key")" +} + +# 声明用户 +mq_declare_user() { + local name="$1" + local password="$2" + local tags="${3:-}" + mq_api_call PUT "/users/${name}" \ + "$(printf '{"password":"%s","tags":"%s"}' "$password" "$tags")" +} + +# 删除用户 +mq_delete_user() { + local name="$1" + mq_api_call DELETE "/users/${name}" 2>/dev/null || true +} + +# 列出用户 +mq_list_users() { + mq_api_call GET "/users" 2>/dev/null +} + +# 设置用户权限 +mq_set_permissions() { + local user="$1" + local configure="$2" + local write="$3" + local read="$4" + mq_api_call PUT "/permissions/${VHOST_ENCODED}/${user}" \ + "$(printf '{"configure":"%s","write":"%s","read":"%s"}' "$configure" "$write" "$read")" +} + +# 列出 Exchanges +mq_list_exchanges() { + mq_api_call GET "/exchanges/${VHOST_ENCODED}" 2>/dev/null } # ============================================================================ @@ -250,21 +406,17 @@ error() { exit 1 } -# 从 .env 文件读取配置 +# 从容器环境变量读取数据库配置 load_env() { - if [[ ! -f "$ENV_FILE" ]]; then - error "找不到 .env 文件: $ENV_FILE" - fi - - # 读取数据库配置 - DB_HOST=$(grep -E "^DB_HOST=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") - DB_PORT=$(grep -E "^DB_PORT=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") - DB_DATABASE=$(grep -E "^DB_DATABASE=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") - DB_USERNAME=$(grep -E "^DB_USERNAME=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") - DB_PASSWORD=$(grep -E "^DB_PASSWORD=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + # 容器内通过环境变量注入,直接读取 + DB_HOST="${DB_HOST:-}" + DB_PORT="${DB_PORT:-5432}" + DB_DATABASE="${DB_DATABASE:-}" + DB_USERNAME="${DB_USERNAME:-}" + DB_PASSWORD="${DB_PASSWORD:-}" if [[ -z "$DB_HOST" || -z "$DB_DATABASE" ]]; then - error "无法从 .env 文件读取数据库配置" + error "无法读取数据库配置,请确认容器环境变量 DB_HOST, DB_DATABASE 等已设置" fi } @@ -405,43 +557,33 @@ create_platform_config() { echo -e "${BLUE}处理平台: ${platform}${NC}" # 创建平台业务 Exchange - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "${platform}.exchange" --vhost "$VHOST" \ - --type topic --durable true + mq_declare_exchange "${platform}.exchange" "topic" "true" info "创建业务 Exchange: ${platform}.exchange" # 绑定到业务队列 for dtype in "${DATA_TYPES[@]}"; do local dtype_singular="${dtype%s}" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "${platform}.exchange" --destination "${dtype}.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "${dtype_singular}.${platform}" + mq_declare_binding "${platform}.exchange" "${dtype}.queue" "${dtype_singular}.${platform}" done info "绑定到业务队列 (${#DATA_TYPES[@]}个)" # 创建平台错误 Exchange - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \ - --type topic --durable true + mq_declare_exchange "${platform}.errors.exchange" "topic" "true" info "创建错误 Exchange: ${platform}.errors.exchange" # 绑定到错误队列 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "#" + mq_declare_binding "${platform}.errors.exchange" "errors.queue" "#" info "绑定到错误队列" # 创建平台用户 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user --name "user_${platform}" --password "$password" --tags "" + mq_declare_user "user_${platform}" "$password" "" info "创建用户: user_${platform}" # 配置平台用户权限 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permissions --vhost "$VHOST" --user "user_${platform}" \ - --configure "^${platform}\\.(exchange|errors\\.exchange)$" \ - --write "^${platform}\\.(exchange|errors\\.exchange)$" \ - --read "^${platform}\\.errors\\..*$" + mq_set_permissions "user_${platform}" \ + "^${platform}\\.(exchange|errors\\.exchange)$" \ + "^${platform}\\.(exchange|errors\\.exchange)$" \ + "^${platform}\\.errors\\..*$" info "配置用户权限" } @@ -453,17 +595,14 @@ remove_platform_config() { echo -e "${BLUE}移除平台: ${platform}${NC}" # 删除用户 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - delete user --name "user_${platform}" 2>/dev/null || true + mq_delete_user "user_${platform}" info "删除用户: user_${platform}" # 删除 Exchange (会自动删除相关的 Binding) - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - delete exchange --name "${platform}.exchange" --vhost "$VHOST" 2>/dev/null || true + mq_delete_exchange "${platform}.exchange" info "删除 Exchange: ${platform}.exchange" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - delete exchange --name "${platform}.errors.exchange" --vhost "$VHOST" 2>/dev/null || true + mq_delete_exchange "${platform}.errors.exchange" info "删除 Exchange: ${platform}.errors.exchange" } @@ -477,23 +616,18 @@ create_infrastructure() { # 1. 创建 VHost echo "" echo "创建 VHost: $VHOST" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare vhost --name "$VHOST" + mq_declare_vhost "$VHOST" info "VHost 创建成功" # 2. 创建主 Exchange - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "main.exchange" --vhost "$VHOST" \ - --type topic --durable true + mq_declare_exchange "main.exchange" "topic" "true" info "创建主 Exchange: main.exchange" # 3. 创建 DLX echo "" echo "创建死信交换机 (DLX)..." for dtype in "${DATA_TYPES[@]}"; do - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \ - --type topic --durable true + mq_declare_exchange "dlx.${dtype}" "topic" "true" info "创建 DLX: dlx.${dtype}" done @@ -501,9 +635,8 @@ create_infrastructure() { echo "" echo "创建主业务队列..." for dtype in "${DATA_TYPES[@]}"; do - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable true \ - --arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" + mq_declare_queue "${dtype}.queue" \ + '{"x-message-ttl":86400000,"x-dead-letter-exchange":"dlx.'"${dtype}"'","x-dead-letter-routing-key":"retry"}' info "创建队列: ${dtype}.queue" done @@ -512,28 +645,21 @@ create_infrastructure() { echo "创建重试队列..." for dtype in "${DATA_TYPES[@]}"; do local dtype_singular="${dtype%s}" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \ - --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}" + mq_declare_queue "${dtype}.retry.queue" \ + '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"'"${dtype_singular}"'.retry"}' info "创建重试队列: ${dtype}.retry.queue" done # 6. 创建错误 Exchange 和队列 echo "" echo "创建错误 Exchange 和队列..." - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "errors.exchange" --vhost "$VHOST" \ - --type topic --durable true + mq_declare_exchange "errors.exchange" "topic" "true" info "创建错误 Exchange: errors.exchange" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare queue --name "errors.queue" --vhost "$VHOST" --durable true \ - --arguments '{"x-message-ttl":604800000}' + mq_declare_queue "errors.queue" '{"x-message-ttl":604800000}' info "创建错误队列: errors.queue" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "errors.exchange" --destination "errors.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "#" + mq_declare_binding "errors.exchange" "errors.queue" "#" info "绑定: errors.exchange → errors.queue" # 7. 绑定主 Exchange 到主队列 @@ -541,9 +667,7 @@ create_infrastructure() { echo "绑定主 Exchange 到主队列..." for dtype in "${DATA_TYPES[@]}"; do local dtype_singular="${dtype%s}" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "main.exchange" --destination "${dtype}.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "${dtype_singular}.#" + mq_declare_binding "main.exchange" "${dtype}.queue" "${dtype_singular}.#" info "绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype_singular}.#)" done @@ -551,9 +675,7 @@ create_infrastructure() { echo "" echo "绑定 DLX 到重试队列..." for dtype in "${DATA_TYPES[@]}"; do - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "retry" + mq_declare_binding "dlx.${dtype}" "${dtype}.retry.queue" "retry" info "绑定: dlx.${dtype} → ${dtype}.retry.queue" done } @@ -569,25 +691,21 @@ create_system_users() { echo "========================================" # 创建 datahub 消费者用户 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user --name "user_datahub_consumer" --password "$consumer_password" --tags "" + mq_declare_user "user_datahub_consumer" "$consumer_password" "" info "创建用户: user_datahub_consumer" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permissions --vhost "$VHOST" --user "user_datahub_consumer" \ - --configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \ - --write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \ - --read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$" + mq_set_permissions "user_datahub_consumer" \ + "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \ + "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \ + "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$" info "配置 consumer 用户权限" # 创建运维监控用户 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user --name "user_ops" --password "$ops_password" --tags "" + mq_declare_user "user_ops" "$ops_password" "" info "创建用户: user_ops" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permissions --vhost "$VHOST" --user "user_ops" \ - --configure "^errors\\..*$" --write "" --read "^errors\\.queue$" + mq_set_permissions "user_ops" \ + "^errors\\..*$" "" "^errors\\.queue$" info "配置 ops 用户权限" } @@ -600,25 +718,41 @@ cleanup_existing() { # 检查 VHost 是否存在 echo "" echo "检查 VHost: $VHOST 是否存在..." - VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - list vhosts | grep -w "$VHOST" || true) + local vhost_list + vhost_list=$(mq_list_vhosts) + local vhost_exists + vhost_exists=$(echo "$vhost_list" | php -r ' + $data = json_decode(file_get_contents("php://stdin"), true); + foreach ($data ?? [] as $v) { + if ($v["name"] === "'"$VHOST"'") { echo "yes"; exit; } + } + ' 2>/dev/null) - if [[ -n "$VHOST_EXISTS" ]]; then + if [[ "$vhost_exists" == "yes" ]]; then warn "VHost '$VHOST' 已存在,将删除所有现有配置..." # 获取所有平台用户并删除 - local users=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - list users name -f tsv 2>/dev/null | grep "^user_" || true) + local users_json + users_json=$(mq_list_users) + local users + users=$(echo "$users_json" | php -r ' + $data = json_decode(file_get_contents("php://stdin"), true); + foreach ($data ?? [] as $u) { + if (strpos($u["name"], "user_") === 0) { + echo $u["name"] . "\n"; + } + } + ' 2>/dev/null) - for user in $users; do - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - delete user --name "$user" 2>/dev/null || true - info "删除用户: $user" - done + while IFS= read -r user; do + if [[ -n "$user" ]]; then + mq_delete_user "$user" + info "删除用户: $user" + fi + done <<< "$users" # 删除 VHost - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - delete vhost --name "$VHOST" + mq_delete_vhost "$VHOST" info "VHost '$VHOST' 及其所有资源已删除" else echo "VHost '$VHOST' 不存在" @@ -704,7 +838,9 @@ cmd_init() { echo "- ${#PLATFORMS[@]} 个平台配置" echo "- $((${#PLATFORMS[@]} + 2)) 个用户" echo "" - echo "密码配置: $ENV_FILE (MQ_PASSWORD_* 变量)" + echo "密码配置: $ENV_FILE" + echo " (宿主机路径: /var/container/share/rabbitmq_passwords.env)" + echo " 请手动将密码合并到宿主机 /opt/datahub/backend/.env" echo "用户配置: $MQ_USER_CONFIG (动态从数据库读取)" } @@ -723,18 +859,32 @@ cmd_add() { echo "========================================${NC}" # 检查 VHost 是否存在 - VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - list vhosts | grep -w "$VHOST" || true) + local vhost_list + vhost_list=$(mq_list_vhosts) + local vhost_exists + vhost_exists=$(echo "$vhost_list" | php -r ' + $data = json_decode(file_get_contents("php://stdin"), true); + foreach ($data ?? [] as $v) { + if ($v["name"] === "'"$VHOST"'") { echo "yes"; exit; } + } + ' 2>/dev/null) - if [[ -z "$VHOST_EXISTS" ]]; then + if [[ "$vhost_exists" != "yes" ]]; then error "VHost '$VHOST' 不存在,请先运行 '$0 init' 初始化" fi # 检查平台是否已存在 - local exchange_exists=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - list exchanges --vhost "$VHOST" | grep -w "${platform}.exchange" || true) + local exchanges_json + exchanges_json=$(mq_list_exchanges) + local exchange_exists + exchange_exists=$(echo "$exchanges_json" | php -r ' + $data = json_decode(file_get_contents("php://stdin"), true); + foreach ($data ?? [] as $e) { + if ($e["name"] === "'"${platform}.exchange"'") { echo "yes"; exit; } + } + ' 2>/dev/null) - if [[ -n "$exchange_exists" ]]; then + if [[ "$exchange_exists" == "yes" ]]; then error "平台 '$platform' 已存在" fi @@ -833,17 +983,23 @@ cmd_reset_password() { esac # 检查用户是否存在 - local user_exists=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - list users 2>/dev/null | grep -w "${mq_user}" || true) + local users_json + users_json=$(mq_list_users) + local user_exists + user_exists=$(echo "$users_json" | php -r ' + $data = json_decode(file_get_contents("php://stdin"), true); + foreach ($data ?? [] as $u) { + if ($u["name"] === "'"${mq_user}"'") { echo "yes"; exit; } + } + ' 2>/dev/null) - if [[ -z "$user_exists" ]]; then + if [[ "$user_exists" != "yes" ]]; then error "用户 '$mq_user' 不存在" fi # 更新 RabbitMQ 用户密码 echo "" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user --name "$mq_user" --password "$new_password" --tags "" + mq_declare_user "$mq_user" "$new_password" "" info "RabbitMQ 用户密码已更新: $mq_user" # 更新 .env 文件 @@ -868,10 +1024,17 @@ cmd_list() { echo "========================================${NC}" # 检查 VHost 是否存在 - VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - list vhosts 2>/dev/null | grep -w "$VHOST" || true) + local vhost_list + vhost_list=$(mq_list_vhosts) + local vhost_exists + vhost_exists=$(echo "$vhost_list" | php -r ' + $data = json_decode(file_get_contents("php://stdin"), true); + foreach ($data ?? [] as $v) { + if ($v["name"] === "'"$VHOST"'") { echo "yes"; exit; } + } + ' 2>/dev/null) - if [[ -z "$VHOST_EXISTS" ]]; then + if [[ "$vhost_exists" != "yes" ]]; then warn "VHost '$VHOST' 不存在" return fi @@ -879,11 +1042,26 @@ cmd_list() { # 获取所有平台 Exchange (排除系统 Exchange) echo "" echo "MQ 中的平台:" - local exchanges=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - list exchanges --vhost "$VHOST" name -f tsv 2>/dev/null | grep -E "^[a-z0-9_]+\.exchange$" | grep -v "^main\." | grep -v "^errors\." | sed 's/\.exchange$//' | sort) + local exchanges_json + exchanges_json=$(mq_list_exchanges) + local platforms + platforms=$(echo "$exchanges_json" | php -r ' + $data = json_decode(file_get_contents("php://stdin"), true); + foreach ($data ?? [] as $e) { + $name = $e["name"]; + // 匹配 xxx.exchange 但排除 main.exchange, errors.exchange, xxx.errors.exchange, dlx.xxx + if (preg_match("/^([a-z0-9_]+)\.exchange$/", $name, $m) + && $m[1] !== "main" + && $m[1] !== "errors" + && !str_contains($name, ".errors.exchange") + && !str_starts_with($name, "dlx.")) { + echo $m[1] . "\n"; + } + } + ' 2>/dev/null | sort) - if [[ -n "$exchanges" ]]; then - echo "$exchanges" | while read platform; do + if [[ -n "$platforms" ]]; then + echo "$platforms" | while read platform; do echo " - $platform" done else @@ -910,8 +1088,20 @@ cmd_version() { echo -e "========================================${NC}" echo "" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - show overview 2>/dev/null || error "无法连接到 RabbitMQ 服务器" + local overview + overview=$(mq_api_call GET "/overview" 2>/dev/null) || error "无法连接到 RabbitMQ 服务器" + + echo "$overview" | php -r ' + $data = json_decode(file_get_contents("php://stdin"), true); + if ($data) { + echo "RabbitMQ 版本: " . ($data["rabbitmq_version"] ?? "未知") . "\n"; + echo "Erlang 版本: " . ($data["erlang_version"] ?? "未知") . "\n"; + echo "Management 版本: " . ($data["management_version"] ?? "未知") . "\n"; + echo "集群名称: " . ($data["cluster_name"] ?? "未知") . "\n"; + } else { + echo "无法解析服务器信息\n"; + } + ' 2>/dev/null || error "无法解析服务器信息" } # ============================================================================