#!/bin/bash # ============================================================================ # RabbitMQ 配置管理脚本 (适配 datahub-backend 容器环境) # 支持从数据库读取平台列表,自动配置 Exchange、Queue、Binding 和用户 # 通过 RabbitMQ Management HTTP API (curl) 操作,无需 rabbitmqadmin # ============================================================================ set -e # 脚本所在目录 (容器内路径: /opt/www) SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" BACKEND_DIR="$(dirname "$SCRIPT_DIR")" MQ_USER_CONFIG="$BACKEND_DIR/config/autoload/mq_user.php" # .env 密码文件输出路径 # 容器内 /mnt/share 映射到宿主机 /var/container/share # 执行完成后需手动合并到宿主机 /opt/datahub/backend/.env ENV_FILE="/mnt/share/rabbitmq_passwords.env" # RabbitMQ 连接信息 (容器间通过容器名通信) RABBITMQ_HOST="datahub-rabbitmq" RABBITMQ_PORT="15672" RABBITMQ_USER="user" RABBITMQ_PASS="hub123456" VHOST="datahub" # URL 编码的 VHost (用于 HTTP API 路径) VHOST_ENCODED=$(php -r "echo rawurlencode('$VHOST');" 2>/dev/null || echo "$VHOST") # RabbitMQ API 基础 URL MQ_API="http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api" # 数据类型列表 DATA_TYPES=("orders" "products" "refunds" "inventory") # 颜色输出 RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' BLUE='\033[0;34m' NC='\033[0m' # No Color # ============================================================================ # 帮助信息 # ============================================================================ show_help() { # 使用 printf '%b' 解释 ANSI 转义序列 printf '%b\n' "$(cat << EOF ${BLUE}RabbitMQ 配置管理脚本 (容器版)${NC} ${YELLOW}用法:${NC} $0 [options] ${YELLOW}命令:${NC} ${GREEN}init${NC} 从数据库读取所有启用的平台,完全重建 MQ 配置 - 删除现有 VHost 及所有资源 - 从 platforms 表读取 enabled=true 的平台 - 创建所有 Exchange、Queue、Binding - 生成随机密码并保存到 mq_user.php ${GREEN}add ${NC} 添加单个平台的 MQ 配置 - 创建平台的 Exchange 和 Binding - 创建平台用户并设置权限 - 生成随机密码并更新 mq_user.php 示例: $0 add shopee ${GREEN}remove ${NC} 移除单个平台的 MQ 配置 - 删除平台的 Exchange 和 Binding - 删除平台用户 - 从 mq_user.php 中移除配置 示例: $0 remove shopee ${GREEN}list${NC} 列出当前 MQ 中已配置的平台 ${GREEN}version${NC} 显示 RabbitMQ 服务器版本信息 ${GREEN}reset-password${NC} 重置指定用户的密码 --user 用户名称 (consumer/ops/平台名) 示例: $0 reset-password --user consumer 示例: $0 reset-password --user tmall ${GREEN}--help, -h${NC} 显示此帮助信息 ${YELLOW}配置文件:${NC} 密码输出: /mnt/share/rabbitmq_passwords.env (宿主机 /var/container/share/) 用户配置: $MQ_USER_CONFIG (动态从数据库读取平台列表) ${YELLOW}平台名称转换规则:${NC} - 空格转换为下划线 - 全部转为小写 - 示例: "Amazon Japan" -> "amazon_japan" ${YELLOW}数据库表结构:${NC} platforms 表需要包含以下字段: - id: 平台 ID - name: 平台名称 - enabled: 是否启用 (boolean) ${YELLOW}示例:${NC} $0 init # 完全重建 MQ 配置 $0 add lazada # 添加 lazada 平台 $0 remove kaola # 移除 kaola 平台 $0 list # 列出已配置的平台 EOF )" } # ============================================================================ # 依赖检测 # ============================================================================ # 检测是否在容器环境中运行 is_container() { # 检测 Docker if [[ -f /.dockerenv ]]; then return 0 fi # 检测 Podman (通过 /run/.containerenv 文件) if [[ -f /run/.containerenv ]]; then return 0 fi # 检测 cgroup (Docker/Kubernetes/Podman) if grep -qE 'docker|kubepods|containerd|libpod' /proc/1/cgroup 2>/dev/null; then return 0 fi # 检测 Kubernetes if [[ -n "${KUBERNETES_SERVICE_HOST:-}" ]]; then return 0 fi # 检测容器运行时环境变量 if [[ -n "${container:-}" ]]; then return 0 fi return 1 } check_dependencies() { local missing=() local in_container=false if is_container; then in_container=true fi # 检查 curl (用于调用 RabbitMQ HTTP API) if ! command -v curl &> /dev/null; then missing+=("curl") fi # 检查 php 及必要扩展 if ! command -v php &> /dev/null; then missing+=("php") else # 检查 PHP 扩展 if ! php -m 2>/dev/null | grep -qi "^pdo_pgsql$"; then missing+=("php-pdo_pgsql") fi if ! php -m 2>/dev/null | grep -qi "^sodium$"; then missing+=("php-sodium") fi fi if [[ ${#missing[@]} -gt 0 ]]; then echo -e "${RED}错误: 缺少必要的依赖工具${NC}" echo "" if $in_container; then echo -e "${YELLOW}检测到容器环境 (Docker/Podman/Kubernetes)${NC}" echo "建议在 Dockerfile/Containerfile 或镜像中预装以下工具:" echo "" fi for tool in "${missing[@]}"; do case "$tool" in curl) echo -e "${YELLOW}curl${NC} - HTTP 请求工具 (用于调用 RabbitMQ Management API)" if $in_container; then echo " Alpine: apk add curl" echo " Debian: apt-get install curl" else echo " 安装方式:" echo " Ubuntu/Debian: sudo apt install curl" echo " CentOS/RHEL: sudo yum install curl" fi echo "" ;; php) echo -e "${YELLOW}php${NC} - PHP 命令行" if $in_container; then echo " 如果使用 PHP 基础镜像,php 应该已经存在" echo " 否则参考: https://hub.docker.com/_/php" else echo " 安装方式:" echo " Ubuntu/Debian: sudo apt install php-cli" echo " CentOS/RHEL: sudo yum install php-cli" echo " macOS: brew install php" fi echo "" ;; php-pdo_pgsql) echo -e "${YELLOW}php-pdo_pgsql${NC} - PHP PostgreSQL PDO 扩展" if $in_container; then echo " Dockerfile/Containerfile 示例:" echo " RUN docker-php-ext-install pdo_pgsql" else echo " 安装方式:" echo " Ubuntu/Debian: sudo apt install php-pgsql" echo " CentOS/RHEL: sudo yum install php-pgsql" fi echo "" ;; php-sodium) echo -e "${YELLOW}php-sodium${NC} - PHP Sodium 扩展" if $in_container; then echo " Dockerfile/Containerfile 示例:" echo " RUN docker-php-ext-install sodium" else echo " 安装方式:" echo " Ubuntu/Debian: sudo apt install php-sodium" echo " CentOS/RHEL: sudo yum install php-sodium" fi echo "" ;; esac done exit 1 fi # 检查 RabbitMQ Management API 是否可达 local http_code http_code=$(curl -s -o /dev/null -w "%{http_code}" -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ "${MQ_API}/overview" 2>/dev/null || echo "000") if [[ "$http_code" == "000" ]]; then error "无法连接到 RabbitMQ Management API: ${MQ_API}\n请确认 datahub-rabbitmq 容器正在运行且 management 插件已启用" elif [[ "$http_code" == "401" ]]; then error "RabbitMQ 认证失败,请检查用户名和密码" elif [[ "$http_code" != "200" ]]; then error "RabbitMQ Management API 返回异常状态码: ${http_code}" fi } # ============================================================================ # RabbitMQ HTTP API 封装函数 # ============================================================================ # 通用 API 请求函数 # 用法: mq_api_call [JSON_DATA] # 写操作 (PUT/POST/DELETE) 自动丢弃响应体,GET 保留输出 mq_api_call() { local method="$1" local path="$2" local data="${3:-}" if [[ "$method" != "GET" ]]; then if [[ -n "$data" ]]; then curl -s -o /dev/null -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ -H "content-type: application/json" \ -X "$method" \ -d "$data" \ "${MQ_API}${path}" else curl -s -o /dev/null -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ -H "content-type: application/json" \ -X "$method" \ "${MQ_API}${path}" fi else if [[ -n "$data" ]]; then curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ -H "content-type: application/json" \ -X "$method" \ -d "$data" \ "${MQ_API}${path}" else curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \ -H "content-type: application/json" \ -X "$method" \ "${MQ_API}${path}" fi fi } # 声明 VHost mq_declare_vhost() { local name="$1" local encoded encoded=$(php -r "echo rawurlencode('$name');" 2>/dev/null || echo "$name") mq_api_call PUT "/vhosts/${encoded}" '{}' } # 删除 VHost mq_delete_vhost() { local name="$1" local encoded encoded=$(php -r "echo rawurlencode('$name');" 2>/dev/null || echo "$name") mq_api_call DELETE "/vhosts/${encoded}" 2>/dev/null || true } # 列出 VHosts mq_list_vhosts() { mq_api_call GET "/vhosts" 2>/dev/null } # 声明 Exchange mq_declare_exchange() { local name="$1" local type="${2:-topic}" local durable="${3:-true}" local body body=$(printf '{"type":"%s","durable":%s}' "$type" "$durable") mq_api_call PUT "/exchanges/${VHOST_ENCODED}/${name}" "$body" } # 删除 Exchange mq_delete_exchange() { local name="$1" mq_api_call DELETE "/exchanges/${VHOST_ENCODED}/${name}" 2>/dev/null || true } # 声明 Queue mq_declare_queue() { local name="$1" local arguments="$2" if [[ -z "$arguments" ]]; then arguments='{}' fi local body body=$(printf '{"durable":true,"arguments":%s}' "$arguments") mq_api_call PUT "/queues/${VHOST_ENCODED}/${name}" "$body" } # 声明 Binding mq_declare_binding() { local source="$1" local destination="$2" local routing_key="$3" local dest_type="${4:-queue}" local dest_char="q" if [[ "$dest_type" == "exchange" ]]; then dest_char="e" fi mq_api_call POST "/bindings/${VHOST_ENCODED}/e/${source}/${dest_char}/${destination}" \ "$(printf '{"routing_key":"%s"}' "$routing_key")" } # 声明用户 mq_declare_user() { local name="$1" local password="$2" local tags="${3:-}" mq_api_call PUT "/users/${name}" \ "$(printf '{"password":"%s","tags":"%s"}' "$password" "$tags")" } # 删除用户 mq_delete_user() { local name="$1" mq_api_call DELETE "/users/${name}" 2>/dev/null || true } # 列出用户 mq_list_users() { mq_api_call GET "/users" 2>/dev/null } # 设置用户权限 mq_set_permissions() { local user="$1" local configure="$2" local write="$3" local read="$4" mq_api_call PUT "/permissions/${VHOST_ENCODED}/${user}" \ "$(printf '{"configure":"%s","write":"%s","read":"%s"}' "$configure" "$write" "$read")" } # 列出 Exchanges mq_list_exchanges() { mq_api_call GET "/exchanges/${VHOST_ENCODED}" 2>/dev/null } # ============================================================================ # 工具函数 # ============================================================================ # 输出信息 info() { echo -e "${GREEN}✓${NC} $1" } warn() { echo -e "${YELLOW}⚠${NC} $1" } error() { echo -e "${RED}✗${NC} $1" exit 1 } # 从容器环境变量读取数据库配置 load_env() { # 容器内通过环境变量注入,直接读取 DB_HOST="${DB_HOST:-}" DB_PORT="${DB_PORT:-5432}" DB_DATABASE="${DB_DATABASE:-}" DB_USERNAME="${DB_USERNAME:-}" DB_PASSWORD="${DB_PASSWORD:-}" if [[ -z "$DB_HOST" || -z "$DB_DATABASE" ]]; then error "无法读取数据库配置,请确认容器环境变量 DB_HOST, DB_DATABASE 等已设置" fi } # 平台名称标准化:空格转下划线,全部小写 normalize_platform_name() { echo "$1" | tr '[:upper:]' '[:lower:]' | tr ' ' '_' } # 生成随机密码 (16位,使用 PHP sodium) generate_password() { php -r "echo bin2hex(sodium_crypto_secretbox_keygen());" | head -c 16 } # 从数据库获取所有启用的平台 (使用 PHP PDO) get_platforms_from_db() { load_env php << EOF PDO::ERRMODE_EXCEPTION ]); \$stmt = \$pdo->query("SELECT name FROM platforms WHERE enabled = true ORDER BY id"); while (\$row = \$stmt->fetch(PDO::FETCH_ASSOC)) { echo \$row['name'] . "\n"; } } catch (PDOException \$e) { fwrite(STDERR, "数据库连接失败: " . \$e->getMessage() . "\n"); exit(1); } EOF } # 检查平台是否在数据库中存在 (使用 PHP PDO) platform_exists_in_db() { local platform_name="$1" load_env local count=$(php << EOF PDO::ERRMODE_EXCEPTION ]); \$stmt = \$pdo->prepare("SELECT COUNT(*) FROM platforms WHERE LOWER(REPLACE(name, ' ', '_')) = LOWER(:name)"); \$stmt->execute(['name' => '$platform_name']); echo \$stmt->fetchColumn(); } catch (PDOException \$e) { echo "0"; } EOF ) [[ "$count" -gt 0 ]] } # ============================================================================ # .env 密码配置操作 # ============================================================================ # 添加或更新 .env 中的密码变量 # 用法: add_password_to_env "SHOPEE" "password123" add_password_to_env() { local platform_upper="$1" local password="$2" local env_key="MQ_PASSWORD_${platform_upper}" # 检查变量是否已存在 if grep -q "^${env_key}=" "$ENV_FILE" 2>/dev/null; then # 更新现有变量 sed -i "s|^${env_key}=.*|${env_key}=\"${password}\"|" "$ENV_FILE" else # 检查是否需要添加分隔注释 if ! grep -q "^# RabbitMQ" "$ENV_FILE" 2>/dev/null; then echo "" >> "$ENV_FILE" echo "# RabbitMQ 用户密码配置 (由 bin/rabbitmq.sh 自动生成)" >> "$ENV_FILE" fi # 追加新变量 echo "${env_key}=\"${password}\"" >> "$ENV_FILE" fi } # 从 .env 中移除密码变量 remove_password_from_env() { local platform_upper="$1" local env_key="MQ_PASSWORD_${platform_upper}" if grep -q "^${env_key}=" "$ENV_FILE" 2>/dev/null; then sed -i "/^${env_key}=/d" "$ENV_FILE" fi } # 读取 .env 中的现有密码 get_existing_password_from_env() { local platform_upper="$1" local env_key="MQ_PASSWORD_${platform_upper}" grep "^${env_key}=" "$ENV_FILE" 2>/dev/null | cut -d'=' -f2 | tr -d '"' | tr -d "'" } # 批量写入所有密码到 .env (用于 init 模式) write_all_passwords_to_env() { local -n platforms_ref=$1 local -n passwords_ref=$2 local consumer_password="$3" local ops_password="$4" # 先清理现有的 MQ_PASSWORD_ 变量 sed -i '/^MQ_PASSWORD_/d' "$ENV_FILE" 2>/dev/null || true sed -i '/^# RabbitMQ 用户密码配置/d' "$ENV_FILE" 2>/dev/null || true # 移除文件末尾的多余空行 sed -i -e :a -e '/^\n*$/{$d;N;ba' -e '}' "$ENV_FILE" 2>/dev/null || true # 追加新配置 { echo "" echo "# RabbitMQ 用户密码配置 (由 bin/rabbitmq.sh 自动生成,时间: $(date '+%Y-%m-%d %H:%M:%S'))" echo "MQ_PASSWORD_CONSUMER=\"${consumer_password}\"" echo "MQ_PASSWORD_OPS=\"${ops_password}\"" for platform in "${platforms_ref[@]}"; do local platform_upper=$(echo "$platform" | tr '[:lower:]' '[:upper:]') echo "MQ_PASSWORD_${platform_upper}=\"${passwords_ref[$platform]}\"" done } >> "$ENV_FILE" } # ============================================================================ # RabbitMQ 操作函数 # ============================================================================ # 创建单个平台的 MQ 配置 create_platform_config() { local platform="$1" local password="$2" echo "" echo -e "${BLUE}处理平台: ${platform}${NC}" # 创建平台业务 Exchange mq_declare_exchange "${platform}.exchange" "topic" "true" info "创建业务 Exchange: ${platform}.exchange" # 绑定到业务队列 for dtype in "${DATA_TYPES[@]}"; do local dtype_singular="${dtype%s}" mq_declare_binding "${platform}.exchange" "${dtype}.queue" "${dtype_singular}.${platform}" done info "绑定到业务队列 (${#DATA_TYPES[@]}个)" # 创建平台错误 Exchange mq_declare_exchange "${platform}.errors.exchange" "topic" "true" info "创建错误 Exchange: ${platform}.errors.exchange" # 绑定到错误队列 mq_declare_binding "${platform}.errors.exchange" "errors.queue" "#" info "绑定到错误队列" # 创建平台用户 mq_declare_user "user_${platform}" "$password" "" info "创建用户: user_${platform}" # 配置平台用户权限 mq_set_permissions "user_${platform}" \ "^${platform}\\.(exchange|errors\\.exchange)$" \ "^${platform}\\.(exchange|errors\\.exchange)$" \ "^${platform}\\.errors\\..*$" info "配置用户权限" } # 删除单个平台的 MQ 配置 remove_platform_config() { local platform="$1" echo "" echo -e "${BLUE}移除平台: ${platform}${NC}" # 删除用户 mq_delete_user "user_${platform}" info "删除用户: user_${platform}" # 删除 Exchange (会自动删除相关的 Binding) mq_delete_exchange "${platform}.exchange" info "删除 Exchange: ${platform}.exchange" mq_delete_exchange "${platform}.errors.exchange" info "删除 Exchange: ${platform}.errors.exchange" } # 创建基础设施 (VHost, 主 Exchange, DLX, 队列等) create_infrastructure() { echo "" echo "========================================" echo "创建基础设施..." echo "========================================" # 1. 创建 VHost echo "" echo "创建 VHost: $VHOST" mq_declare_vhost "$VHOST" info "VHost 创建成功" # 2. 创建主 Exchange mq_declare_exchange "main.exchange" "topic" "true" info "创建主 Exchange: main.exchange" # 3. 创建 DLX echo "" echo "创建死信交换机 (DLX)..." for dtype in "${DATA_TYPES[@]}"; do mq_declare_exchange "dlx.${dtype}" "topic" "true" info "创建 DLX: dlx.${dtype}" done # 4. 创建主业务队列 echo "" echo "创建主业务队列..." for dtype in "${DATA_TYPES[@]}"; do mq_declare_queue "${dtype}.queue" \ '{"x-message-ttl":86400000,"x-dead-letter-exchange":"dlx.'"${dtype}"'","x-dead-letter-routing-key":"retry"}' info "创建队列: ${dtype}.queue" done # 5. 创建重试队列 echo "" echo "创建重试队列..." for dtype in "${DATA_TYPES[@]}"; do local dtype_singular="${dtype%s}" mq_declare_queue "${dtype}.retry.queue" \ '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"'"${dtype_singular}"'.retry"}' info "创建重试队列: ${dtype}.retry.queue" done # 6. 创建错误 Exchange 和队列 echo "" echo "创建错误 Exchange 和队列..." mq_declare_exchange "errors.exchange" "topic" "true" info "创建错误 Exchange: errors.exchange" mq_declare_queue "errors.queue" '{"x-message-ttl":604800000}' info "创建错误队列: errors.queue" mq_declare_binding "errors.exchange" "errors.queue" "#" info "绑定: errors.exchange → errors.queue" # 7. 绑定主 Exchange 到主队列 echo "" echo "绑定主 Exchange 到主队列..." for dtype in "${DATA_TYPES[@]}"; do local dtype_singular="${dtype%s}" mq_declare_binding "main.exchange" "${dtype}.queue" "${dtype_singular}.#" info "绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype_singular}.#)" done # 8. 绑定 DLX 到重试队列 echo "" echo "绑定 DLX 到重试队列..." for dtype in "${DATA_TYPES[@]}"; do mq_declare_binding "dlx.${dtype}" "${dtype}.retry.queue" "retry" info "绑定: dlx.${dtype} → ${dtype}.retry.queue" done } # 创建系统用户 (consumer 和 ops) create_system_users() { local consumer_password="$1" local ops_password="$2" echo "" echo "========================================" echo "创建系统用户..." echo "========================================" # 创建 datahub 消费者用户 mq_declare_user "user_datahub_consumer" "$consumer_password" "" info "创建用户: user_datahub_consumer" mq_set_permissions "user_datahub_consumer" \ "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \ "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \ "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$" info "配置 consumer 用户权限" # 创建运维监控用户 mq_declare_user "user_ops" "$ops_password" "" info "创建用户: user_ops" mq_set_permissions "user_ops" \ "^errors\\..*$" "" "^errors\\.queue$" info "配置 ops 用户权限" } # 清理现有配置 cleanup_existing() { echo "========================================" echo "开始清理现有配置..." echo "========================================" # 检查 VHost 是否存在 echo "" echo "检查 VHost: $VHOST 是否存在..." local vhost_list vhost_list=$(mq_list_vhosts) local vhost_exists vhost_exists=$(echo "$vhost_list" | php -r ' $data = json_decode(file_get_contents("php://stdin"), true); foreach ($data ?? [] as $v) { if ($v["name"] === "'"$VHOST"'") { echo "yes"; exit; } } ' 2>/dev/null) if [[ "$vhost_exists" == "yes" ]]; then warn "VHost '$VHOST' 已存在,将删除所有现有配置..." # 获取所有平台用户并删除 local users_json users_json=$(mq_list_users) local users users=$(echo "$users_json" | php -r ' $data = json_decode(file_get_contents("php://stdin"), true); foreach ($data ?? [] as $u) { if (strpos($u["name"], "user_") === 0) { echo $u["name"] . "\n"; } } ' 2>/dev/null) while IFS= read -r user; do if [[ -n "$user" ]]; then mq_delete_user "$user" info "删除用户: $user" fi done <<< "$users" # 删除 VHost mq_delete_vhost "$VHOST" info "VHost '$VHOST' 及其所有资源已删除" else echo "VHost '$VHOST' 不存在" fi } # ============================================================================ # 主命令函数 # ============================================================================ # init 命令:完全重建 cmd_init() { echo -e "${BLUE}========================================" echo "RabbitMQ 初始化模式" echo "========================================${NC}" # 1. 从数据库获取平台列表 echo "" echo "从数据库读取平台列表..." local db_platforms=$(get_platforms_from_db) if [[ -z "$db_platforms" ]]; then error "无法从数据库获取平台列表,或没有启用的平台" fi # 转换平台名称 declare -a PLATFORMS=() declare -A PLATFORM_PASSWORDS=() while IFS= read -r platform; do local normalized=$(normalize_platform_name "$platform") PLATFORMS+=("$normalized") PLATFORM_PASSWORDS["$normalized"]=$(generate_password) done <<< "$db_platforms" echo "找到 ${#PLATFORMS[@]} 个启用的平台:" printf " - %s\n" "${PLATFORMS[@]}" # 2. 清理现有配置 cleanup_existing # 3. 创建基础设施 create_infrastructure # 4. 生成系统用户密码 local consumer_password=$(generate_password) local ops_password=$(generate_password) # 5. 创建系统用户 create_system_users "$consumer_password" "$ops_password" # 6. 为每个平台创建配置 echo "" echo "========================================" echo "创建平台配置..." echo "========================================" for platform in "${PLATFORMS[@]}"; do create_platform_config "$platform" "${PLATFORM_PASSWORDS[$platform]}" done # 7. 写入密码到 .env echo "" echo "========================================" echo "写入密码到 .env..." echo "========================================" write_all_passwords_to_env PLATFORMS PLATFORM_PASSWORDS "$consumer_password" "$ops_password" info "密码已写入: $ENV_FILE" # 8. 输出摘要 echo "" echo -e "${GREEN}========================================" echo "RabbitMQ 配置完成!" echo "========================================${NC}" echo "" echo "已创建:" echo "- 1 个 VHost: $VHOST" echo "- 1 个主 Exchange: main.exchange" echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" echo "- 4 个重试队列: *.retry.queue" echo "- 1 个错误队列: errors.queue" echo "- ${#PLATFORMS[@]} 个平台配置" echo "- $((${#PLATFORMS[@]} + 2)) 个用户" echo "" echo "密码配置: $ENV_FILE" echo " (宿主机路径: /var/container/share/rabbitmq_passwords.env)" echo " 请手动将密码合并到宿主机 /opt/datahub/backend/.env" echo "用户配置: $MQ_USER_CONFIG (动态从数据库读取)" } # add 命令:添加单个平台 cmd_add() { local platform_input="$1" if [[ -z "$platform_input" ]]; then error "请指定要添加的平台名称,例如: $0 add shopee" fi local platform=$(normalize_platform_name "$platform_input") echo -e "${BLUE}========================================" echo "添加平台: $platform" echo "========================================${NC}" # 检查 VHost 是否存在 local vhost_list vhost_list=$(mq_list_vhosts) local vhost_exists vhost_exists=$(echo "$vhost_list" | php -r ' $data = json_decode(file_get_contents("php://stdin"), true); foreach ($data ?? [] as $v) { if ($v["name"] === "'"$VHOST"'") { echo "yes"; exit; } } ' 2>/dev/null) if [[ "$vhost_exists" != "yes" ]]; then error "VHost '$VHOST' 不存在,请先运行 '$0 init' 初始化" fi # 检查平台是否已存在 local exchanges_json exchanges_json=$(mq_list_exchanges) local exchange_exists exchange_exists=$(echo "$exchanges_json" | php -r ' $data = json_decode(file_get_contents("php://stdin"), true); foreach ($data ?? [] as $e) { if ($e["name"] === "'"${platform}.exchange"'") { echo "yes"; exit; } } ' 2>/dev/null) if [[ "$exchange_exists" == "yes" ]]; then error "平台 '$platform' 已存在" fi # 生成密码 local password=$(generate_password) # 创建平台配置 create_platform_config "$platform" "$password" # 写入密码到 .env local platform_upper=$(echo "$platform" | tr '[:lower:]' '[:upper:]') add_password_to_env "$platform_upper" "$password" echo "" echo -e "${GREEN}========================================" echo "平台 $platform 添加成功!" echo "========================================${NC}" echo "" echo "用户: user_${platform}" echo "密码已写入 .env: MQ_PASSWORD_${platform_upper}" } # remove 命令:移除单个平台 cmd_remove() { local platform_input="$1" if [[ -z "$platform_input" ]]; then error "请指定要移除的平台名称,例如: $0 remove shopee" fi local platform=$(normalize_platform_name "$platform_input") echo -e "${BLUE}========================================" echo "移除平台: $platform" echo "========================================${NC}" # 移除 MQ 配置 remove_platform_config "$platform" # 从 .env 中移除密码 local platform_upper=$(echo "$platform" | tr '[:lower:]' '[:upper:]') remove_password_from_env "$platform_upper" echo "" echo -e "${GREEN}========================================" echo "平台 $platform 移除成功!" echo "========================================${NC}" echo "" echo "已从 .env 移除: MQ_PASSWORD_${platform_upper}" } # reset-password 命令:重置用户密码 cmd_reset_password() { local user_name="" # 解析参数 while [[ $# -gt 0 ]]; do case "$1" in --user) user_name="$2" shift 2 ;; *) error "未知参数: $1\n用法: $0 reset-password --user " ;; esac done if [[ -z "$user_name" ]]; then error "请指定用户名称\n用法: $0 reset-password --user \n示例: $0 reset-password --user consumer" fi # 标准化用户名称 local normalized_name=$(normalize_platform_name "$user_name") local new_password=$(generate_password) echo -e "${BLUE}========================================" echo "重置用户密码: $normalized_name" echo "========================================${NC}" # 判断用户类型并处理 case "$normalized_name" in consumer) local mq_user="user_datahub_consumer" local env_key="CONSUMER" ;; ops) local mq_user="user_ops" local env_key="OPS" ;; *) # 平台用户 local mq_user="user_${normalized_name}" local env_key=$(echo "$normalized_name" | tr '[:lower:]' '[:upper:]') ;; esac # 检查用户是否存在 local users_json users_json=$(mq_list_users) local user_exists user_exists=$(echo "$users_json" | php -r ' $data = json_decode(file_get_contents("php://stdin"), true); foreach ($data ?? [] as $u) { if ($u["name"] === "'"${mq_user}"'") { echo "yes"; exit; } } ' 2>/dev/null) if [[ "$user_exists" != "yes" ]]; then error "用户 '$mq_user' 不存在" fi # 更新 RabbitMQ 用户密码 echo "" mq_declare_user "$mq_user" "$new_password" "" info "RabbitMQ 用户密码已更新: $mq_user" # 更新 .env 文件 add_password_to_env "$env_key" "$new_password" info "密码已写入 .env: MQ_PASSWORD_${env_key}" echo "" echo -e "${GREEN}========================================" echo "密码重置成功!" echo "========================================${NC}" echo "" echo "用户: $mq_user" echo "新密码已保存到 .env: MQ_PASSWORD_${env_key}" echo "" echo -e "${YELLOW}注意: 请确保相关服务使用新密码重新连接${NC}" } # list 命令:列出已配置的平台 cmd_list() { echo -e "${BLUE}========================================" echo "已配置的平台列表" echo "========================================${NC}" # 检查 VHost 是否存在 local vhost_list vhost_list=$(mq_list_vhosts) local vhost_exists vhost_exists=$(echo "$vhost_list" | php -r ' $data = json_decode(file_get_contents("php://stdin"), true); foreach ($data ?? [] as $v) { if ($v["name"] === "'"$VHOST"'") { echo "yes"; exit; } } ' 2>/dev/null) if [[ "$vhost_exists" != "yes" ]]; then warn "VHost '$VHOST' 不存在" return fi # 获取所有平台 Exchange (排除系统 Exchange) echo "" echo "MQ 中的平台:" local exchanges_json exchanges_json=$(mq_list_exchanges) local platforms platforms=$(echo "$exchanges_json" | php -r ' $data = json_decode(file_get_contents("php://stdin"), true); foreach ($data ?? [] as $e) { $name = $e["name"]; // 匹配 xxx.exchange 但排除 main.exchange, errors.exchange, xxx.errors.exchange, dlx.xxx if (preg_match("/^([a-z0-9_]+)\.exchange$/", $name, $m) && $m[1] !== "main" && $m[1] !== "errors" && !str_contains($name, ".errors.exchange") && !str_starts_with($name, "dlx.")) { echo $m[1] . "\n"; } } ' 2>/dev/null | sort) if [[ -n "$platforms" ]]; then echo "$platforms" | while read platform; do echo " - $platform" done else echo " (无)" fi # 显示配置文件中的平台 if [[ -f "$MQ_USER_CONFIG" ]]; then echo "" echo "配置文件中的平台:" php -r " \$config = include '$MQ_USER_CONFIG'; foreach (\$config['platforms'] ?? [] as \$name => \$data) { echo \" - \$name\n\"; } " 2>/dev/null || echo " (无法读取配置文件)" fi } # version 命令:显示 RabbitMQ 服务器版本信息 cmd_version() { echo -e "${BLUE}========================================" echo -e "RabbitMQ 服务器版本信息" echo -e "========================================${NC}" echo "" local overview overview=$(mq_api_call GET "/overview" 2>/dev/null) || error "无法连接到 RabbitMQ 服务器" echo "$overview" | php -r ' $data = json_decode(file_get_contents("php://stdin"), true); if ($data) { echo "RabbitMQ 版本: " . ($data["rabbitmq_version"] ?? "未知") . "\n"; echo "Erlang 版本: " . ($data["erlang_version"] ?? "未知") . "\n"; echo "Management 版本: " . ($data["management_version"] ?? "未知") . "\n"; echo "集群名称: " . ($data["cluster_name"] ?? "未知") . "\n"; } else { echo "无法解析服务器信息\n"; } ' 2>/dev/null || error "无法解析服务器信息" } # ============================================================================ # 主入口 # ============================================================================ main() { local command="${1:-}" # 显示帮助不需要检测依赖 if [[ "$command" != "--help" && "$command" != "-h" && "$command" != "help" && -n "$command" ]]; then check_dependencies fi case "$command" in init) cmd_init ;; add) cmd_add "$2" ;; remove) cmd_remove "$2" ;; list) cmd_list ;; version) cmd_version ;; reset-password) shift cmd_reset_password "$@" ;; --help|-h|help|"") show_help ;; *) error "未知命令: $command\n使用 '$0 --help' 查看帮助" ;; esac } main "$@"