diff --git a/backend/bin/rabbitmq.sh b/backend/bin/rabbitmq.sh index a5e5a92..7bc47e0 100755 --- a/backend/bin/rabbitmq.sh +++ b/backend/bin/rabbitmq.sh @@ -1,5 +1,18 @@ #!/bin/bash +# ============================================================================ +# RabbitMQ 配置管理脚本 +# 支持从数据库读取平台列表,自动配置 Exchange、Queue、Binding 和用户 +# ============================================================================ + +set -e + +# 脚本所在目录 +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BACKEND_DIR="$(dirname "$SCRIPT_DIR")" +ENV_FILE="$BACKEND_DIR/.env" +MQ_USER_CONFIG="$BACKEND_DIR/config/autoload/mq_user.php" + # RabbitMQ 连接信息 RABBITMQ_HOST="127.0.0.1" RABBITMQ_PORT="15672" @@ -7,254 +20,837 @@ RABBITMQ_USER="admin" RABBITMQ_PASS="admin" VHOST="dataflow" -# 平台列表 -PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify") +# 数据类型列表 DATA_TYPES=("orders" "products" "refunds" "inventory") -# 0. 检测并清理现有 VHost 和用户(如果存在) -echo "========================================" -echo "开始清理现有配置..." -echo "========================================" +# 颜色输出 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color -# 检查 VHost 是否存在 -echo "检查 VHost: $VHOST 是否存在..." -VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true) +# ============================================================================ +# 帮助信息 +# ============================================================================ +show_help() { + cat << EOF +${BLUE}RabbitMQ 配置管理脚本${NC} -if [ -n "$VHOST_EXISTS" ]; then - echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..." +${YELLOW}用法:${NC} + $0 [options] - # 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限) - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - delete vhost --name "$VHOST" +${YELLOW}命令:${NC} + ${GREEN}init${NC} 从数据库读取所有启用的平台,完全重建 MQ 配置 + - 删除现有 VHost 及所有资源 + - 从 platforms 表读取 enabled=true 的平台 + - 创建所有 Exchange、Queue、Binding + - 生成随机密码并保存到 mq_user.php - echo "✓ VHost '$VHOST' 及其所有资源已删除" -else - echo "VHost '$VHOST' 不存在" -fi + ${GREEN}add ${NC} 添加单个平台的 MQ 配置 + - 创建平台的 Exchange 和 Binding + - 创建平台用户并设置权限 + - 生成随机密码并更新 mq_user.php + 示例: $0 add shopee -# 清理所有相关用户账号 -echo "" -echo "清理用户账号..." + ${GREEN}remove ${NC} 移除单个平台的 MQ 配置 + - 删除平台的 Exchange 和 Binding + - 删除平台用户 + - 从 mq_user.php 中移除配置 + 示例: $0 remove shopee -# 平台用户列表 -ALL_USERS=() -for platform in "${PLATFORMS[@]}"; do - ALL_USERS+=("user_${platform}") -done + ${GREEN}list${NC} 列出当前 MQ 中已配置的平台 -# 添加其他用户 -ALL_USERS+=("user_dataflow_consumer") -ALL_USERS+=("user_ops") + ${GREEN}--help, -h${NC} 显示此帮助信息 -# 删除用户(如果存在) -for user in "${ALL_USERS[@]}"; do - USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true) - if [ -n "$USER_EXISTS" ]; then +${YELLOW}配置文件:${NC} + 环境配置: $ENV_FILE (包含 MQ_PASSWORD_* 密码变量) + 用户配置: $MQ_USER_CONFIG (动态从数据库读取平台列表) + +${YELLOW}平台名称转换规则:${NC} + - 空格转换为下划线 + - 全部转为小写 + - 示例: "Amazon Japan" -> "amazon_japan" + +${YELLOW}数据库表结构:${NC} + platforms 表需要包含以下字段: + - id: 平台 ID + - name: 平台名称 + - enabled: 是否启用 (boolean) + +${YELLOW}示例:${NC} + $0 init # 完全重建 MQ 配置 + $0 add lazada # 添加 lazada 平台 + $0 remove kaola # 移除 kaola 平台 + $0 list # 列出已配置的平台 + +EOF +} + +# ============================================================================ +# 依赖检测 +# ============================================================================ + +# 检测是否在容器环境中运行 +is_container() { + # 检测 Docker + if [[ -f /.dockerenv ]]; then + return 0 + fi + + # 检测 Podman (通过 /run/.containerenv 文件) + if [[ -f /run/.containerenv ]]; then + return 0 + fi + + # 检测 cgroup (Docker/Kubernetes/Podman) + if grep -qE 'docker|kubepods|containerd|libpod' /proc/1/cgroup 2>/dev/null; then + return 0 + fi + + # 检测 Kubernetes + if [[ -n "${KUBERNETES_SERVICE_HOST:-}" ]]; then + return 0 + fi + + # 检测容器运行时环境变量 + if [[ -n "${container:-}" ]]; then + return 0 + fi + + return 1 +} + +check_dependencies() { + local missing=() + local in_container=false + + if is_container; then + in_container=true + fi + + # 检查 rabbitmqadmin + if ! command -v rabbitmqadmin &> /dev/null; then + missing+=("rabbitmqadmin") + fi + + # 检查 php 及必要扩展 + if ! command -v php &> /dev/null; then + missing+=("php") + else + # 检查 PHP 扩展 + if ! php -m 2>/dev/null | grep -qi "^pdo_pgsql$"; then + missing+=("php-pdo_pgsql") + fi + if ! php -m 2>/dev/null | grep -qi "^sodium$"; then + missing+=("php-sodium") + fi + fi + + if [[ ${#missing[@]} -gt 0 ]]; then + echo -e "${RED}错误: 缺少必要的依赖工具${NC}" + echo "" + + if $in_container; then + echo -e "${YELLOW}检测到容器环境 (Docker/Podman/Kubernetes)${NC}" + echo "建议在 Dockerfile/Containerfile 或镜像中预装以下工具:" + echo "" + fi + + for tool in "${missing[@]}"; do + case "$tool" in + rabbitmqadmin) + echo -e "${YELLOW}rabbitmqadmin${NC} - RabbitMQ 管理工具" + if $in_container; then + echo " Dockerfile/Containerfile 示例:" + echo " RUN wget -O /usr/local/bin/rabbitmqadmin \\" + echo " http://rabbitmq:15672/cli/rabbitmqadmin && \\" + echo " chmod +x /usr/local/bin/rabbitmqadmin" + echo " 或使用 pip:" + echo " RUN pip install rabbitmqadmin" + else + echo " 安装方式:" + echo " 1. 从 RabbitMQ 管理界面下载:" + echo " wget http://localhost:15672/cli/rabbitmqadmin" + echo " chmod +x rabbitmqadmin" + echo " sudo mv rabbitmqadmin /usr/local/bin/" + echo " 2. 或者使用 pip:" + echo " pip install rabbitmqadmin" + fi + echo "" + ;; + php) + echo -e "${YELLOW}php${NC} - PHP 命令行" + if $in_container; then + echo " 如果使用 PHP 基础镜像,php 应该已经存在" + echo " 否则参考: https://hub.docker.com/_/php" + else + echo " 安装方式:" + echo " Ubuntu/Debian: sudo apt install php-cli" + echo " CentOS/RHEL: sudo yum install php-cli" + echo " macOS: brew install php" + fi + echo "" + ;; + php-pdo_pgsql) + echo -e "${YELLOW}php-pdo_pgsql${NC} - PHP PostgreSQL PDO 扩展" + if $in_container; then + echo " Dockerfile/Containerfile 示例:" + echo " RUN docker-php-ext-install pdo_pgsql" + else + echo " 安装方式:" + echo " Ubuntu/Debian: sudo apt install php-pgsql" + echo " CentOS/RHEL: sudo yum install php-pgsql" + fi + echo "" + ;; + php-sodium) + echo -e "${YELLOW}php-sodium${NC} - PHP Sodium 扩展" + if $in_container; then + echo " Dockerfile/Containerfile 示例:" + echo " RUN docker-php-ext-install sodium" + else + echo " 安装方式:" + echo " Ubuntu/Debian: sudo apt install php-sodium" + echo " CentOS/RHEL: sudo yum install php-sodium" + echo " 注意: PHP 7.2+ 默认包含 sodium 扩展" + fi + echo "" + ;; + esac + done + exit 1 + fi +} + +# ============================================================================ +# 工具函数 +# ============================================================================ + +# 输出信息 +info() { + echo -e "${GREEN}✓${NC} $1" +} + +warn() { + echo -e "${YELLOW}⚠${NC} $1" +} + +error() { + echo -e "${RED}✗${NC} $1" + exit 1 +} + +# 从 .env 文件读取配置 +load_env() { + if [[ ! -f "$ENV_FILE" ]]; then + error "找不到 .env 文件: $ENV_FILE" + fi + + # 读取数据库配置 + DB_HOST=$(grep -E "^DB_HOST=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + DB_PORT=$(grep -E "^DB_PORT=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + DB_DATABASE=$(grep -E "^DB_DATABASE=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + DB_USERNAME=$(grep -E "^DB_USERNAME=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + DB_PASSWORD=$(grep -E "^DB_PASSWORD=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'") + + if [[ -z "$DB_HOST" || -z "$DB_DATABASE" ]]; then + error "无法从 .env 文件读取数据库配置" + fi +} + +# 平台名称标准化:空格转下划线,全部小写 +normalize_platform_name() { + echo "$1" | tr '[:upper:]' '[:lower:]' | tr ' ' '_' +} + +# 生成随机密码 (16位,使用 PHP sodium) +generate_password() { + php -r "echo bin2hex(sodium_crypto_secretbox_keygen());" | head -c 16 +} + +# 从数据库获取所有启用的平台 (使用 PHP PDO) +get_platforms_from_db() { + load_env + php << EOF + PDO::ERRMODE_EXCEPTION + ]); + \$stmt = \$pdo->query("SELECT name FROM platforms WHERE enabled = true ORDER BY id"); + while (\$row = \$stmt->fetch(PDO::FETCH_ASSOC)) { + echo \$row['name'] . "\n"; + } +} catch (PDOException \$e) { + fwrite(STDERR, "数据库连接失败: " . \$e->getMessage() . "\n"); + exit(1); +} +EOF +} + +# 检查平台是否在数据库中存在 (使用 PHP PDO) +platform_exists_in_db() { + local platform_name="$1" + load_env + local count=$(php << EOF + PDO::ERRMODE_EXCEPTION + ]); + \$stmt = \$pdo->prepare("SELECT COUNT(*) FROM platforms WHERE LOWER(REPLACE(name, ' ', '_')) = LOWER(:name)"); + \$stmt->execute(['name' => '$platform_name']); + echo \$stmt->fetchColumn(); +} catch (PDOException \$e) { + echo "0"; +} +EOF +) + [[ "$count" -gt 0 ]] +} + +# ============================================================================ +# .env 密码配置操作 +# ============================================================================ + +# 添加或更新 .env 中的密码变量 +# 用法: add_password_to_env "SHOPEE" "password123" +add_password_to_env() { + local platform_upper="$1" + local password="$2" + local env_key="MQ_PASSWORD_${platform_upper}" + + # 检查变量是否已存在 + if grep -q "^${env_key}=" "$ENV_FILE" 2>/dev/null; then + # 更新现有变量 + sed -i "s|^${env_key}=.*|${env_key}=\"${password}\"|" "$ENV_FILE" + else + # 检查是否需要添加分隔注释 + if ! grep -q "^# RabbitMQ" "$ENV_FILE" 2>/dev/null; then + echo "" >> "$ENV_FILE" + echo "# RabbitMQ 用户密码配置 (由 bin/rabbitmq.sh 自动生成)" >> "$ENV_FILE" + fi + # 追加新变量 + echo "${env_key}=\"${password}\"" >> "$ENV_FILE" + fi +} + +# 从 .env 中移除密码变量 +remove_password_from_env() { + local platform_upper="$1" + local env_key="MQ_PASSWORD_${platform_upper}" + + if grep -q "^${env_key}=" "$ENV_FILE" 2>/dev/null; then + sed -i "/^${env_key}=/d" "$ENV_FILE" + fi +} + +# 读取 .env 中的现有密码 +get_existing_password_from_env() { + local platform_upper="$1" + local env_key="MQ_PASSWORD_${platform_upper}" + + grep "^${env_key}=" "$ENV_FILE" 2>/dev/null | cut -d'=' -f2 | tr -d '"' | tr -d "'" +} + +# 批量写入所有密码到 .env (用于 init 模式) +write_all_passwords_to_env() { + local -n platforms_ref=$1 + local -n passwords_ref=$2 + local consumer_password="$3" + local ops_password="$4" + + # 先清理现有的 MQ_PASSWORD_ 变量 + sed -i '/^MQ_PASSWORD_/d' "$ENV_FILE" 2>/dev/null || true + sed -i '/^# RabbitMQ 用户密码配置/d' "$ENV_FILE" 2>/dev/null || true + + # 移除文件末尾的多余空行 + sed -i -e :a -e '/^\n*$/{$d;N;ba' -e '}' "$ENV_FILE" 2>/dev/null || true + + # 追加新配置 + { + echo "" + echo "# RabbitMQ 用户密码配置 (由 bin/rabbitmq.sh 自动生成,时间: $(date '+%Y-%m-%d %H:%M:%S'))" + echo "MQ_PASSWORD_CONSUMER=\"${consumer_password}\"" + echo "MQ_PASSWORD_OPS=\"${ops_password}\"" + for platform in "${platforms_ref[@]}"; do + local platform_upper=$(echo "$platform" | tr '[:lower:]' '[:upper:]') + echo "MQ_PASSWORD_${platform_upper}=\"${passwords_ref[$platform]}\"" + done + } >> "$ENV_FILE" +} + +# ============================================================================ +# RabbitMQ 操作函数 +# ============================================================================ + +# 创建单个平台的 MQ 配置 +create_platform_config() { + local platform="$1" + local password="$2" + + echo "" + echo -e "${BLUE}处理平台: ${platform}${NC}" + + # 创建平台业务 Exchange rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - delete user --name "$user" - echo "✓ 删除用户: $user" - fi -done + declare exchange --name "${platform}.exchange" --vhost "$VHOST" \ + --type topic --durable true + info "创建业务 Exchange: ${platform}.exchange" -echo "" -echo "========================================" -echo "清理完成,开始重新配置..." -echo "========================================" -echo "" + # 绑定到业务队列 + for dtype in "${DATA_TYPES[@]}"; do + local dtype_singular="${dtype%s}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "${dtype}.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "${dtype_singular}.${platform}" + done + info "绑定到业务队列 (${#DATA_TYPES[@]}个)" -# 1. 创建 VHost -echo "创建 VHost: $VHOST" -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare vhost --name "$VHOST" + # 创建平台错误 Exchange + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \ + --type topic --durable true + info "创建错误 Exchange: ${platform}.errors.exchange" -# 2. 创建主 Exchange(用于重试消息回流) -echo "创建主 Exchange: main.exchange" -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "main.exchange" --vhost "$VHOST" \ - --type topic --durable true + # 绑定到错误队列 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "#" + info "绑定到错误队列" -# 3. 创建 DLX (死信交换机) -echo "创建死信交换机 (DLX)..." -for dtype in "${DATA_TYPES[@]}"; do - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \ - --type topic --durable true - echo "✓ 创建 DLX: dlx.${dtype}" -done + # 创建平台用户 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_${platform}" --password "$password" --tags "" + info "创建用户: user_${platform}" -# 4. 创建主业务队列(带 DLX 配置) -echo "" -echo "创建主业务队列..." -for dtype in "${DATA_TYPES[@]}"; do - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable true \ - --arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" - echo "✓ 创建队列: ${dtype}.queue" -done + # 配置平台用户权限 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_${platform}" \ + --configure "^${platform}\\.(exchange|errors\\.exchange)$" \ + --write "^${platform}\\.(exchange|errors\\.exchange)$" \ + --read "^${platform}\\.errors\\..*$" + info "配置用户权限" +} -# 5. 创建重试队列(带 TTL 和 DLX 回流配置) -echo "" -echo "创建重试队列..." -for dtype in "${DATA_TYPES[@]}"; do - # 去掉末尾的 's' 得到单数形式(orders -> order) - dtype_singular="${dtype%s}" +# 删除单个平台的 MQ 配置 +remove_platform_config() { + local platform="$1" - # 配置回流时的 routing key 为 {type}.retry,可以被 {type}.# 匹配 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \ - --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}" - echo "✓ 创建重试队列: ${dtype}.retry.queue (回流 routing key: ${dtype_singular}.retry)" -done + echo "" + echo -e "${BLUE}移除平台: ${platform}${NC}" -# 6. 创建错误 Exchange 和错误队列 -echo "" -echo "创建错误 Exchange 和错误队列..." -# 创建通用错误 Exchange(供 Consumer 发送错误消息使用) -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "errors.exchange" --vhost "$VHOST" \ - --type topic --durable true -echo "✓ 创建错误 Exchange: errors.exchange" + # 删除用户 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete user --name "user_${platform}" 2>/dev/null || true + info "删除用户: user_${platform}" -# 创建错误队列 -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare queue --name "errors.queue" --vhost "$VHOST" --durable true \ - --arguments '{"x-message-ttl":604800000}' -echo "✓ 创建错误队列: errors.queue" + # 删除 Exchange (会自动删除相关的 Binding) + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete exchange --name "${platform}.exchange" --vhost "$VHOST" 2>/dev/null || true + info "删除 Exchange: ${platform}.exchange" -# 绑定通用错误 Exchange 到错误队列 -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "errors.exchange" --destination "errors.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "#" -echo "✓ 绑定: errors.exchange → errors.queue (routing_key: #)" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete exchange --name "${platform}.errors.exchange" --vhost "$VHOST" 2>/dev/null || true + info "删除 Exchange: ${platform}.errors.exchange" +} -# 7. 绑定主 Exchange 到主队列(使用通配符) -echo "" -echo "绑定主 Exchange 到主队列..." -for dtype in "${DATA_TYPES[@]}"; do - # 使用 order.#, product.# 等通配符匹配所有平台 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "main.exchange" --destination "${dtype}.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#" - echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)" -done +# 创建基础设施 (VHost, 主 Exchange, DLX, 队列等) +create_infrastructure() { + echo "" + echo "========================================" + echo "创建基础设施..." + echo "========================================" -# 8. 绑定 DLX 到重试队列 -echo "" -echo "绑定 DLX 到重试队列..." -for dtype in "${DATA_TYPES[@]}"; do - # DLX -> 重试队列 - # 注意:方案B中,超过重试次数的消息由 Consumer 直接发送到 errors.queue - # 不使用 DLX 的 routing_key="error" 路由,因此不需要绑定到 errors.queue - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "retry" - echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)" -done + # 1. 创建 VHost + echo "" + echo "创建 VHost: $VHOST" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare vhost --name "$VHOST" + info "VHost 创建成功" -# 9. 为每个平台创建 Exchange 和 Binding -echo "" -echo "========================================" -echo "创建平台 Exchange 和 Binding..." -echo "========================================" -for platform in "${PLATFORMS[@]}"; do - echo "" - echo "处理平台: ${platform}" + # 2. 创建主 Exchange + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "main.exchange" --vhost "$VHOST" \ + --type topic --durable true + info "创建主 Exchange: main.exchange" - # 创建平台业务 Exchange - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "${platform}.exchange" --vhost "$VHOST" \ - --type topic --durable true - echo "✓ 创建业务 Exchange: ${platform}.exchange" + # 3. 创建 DLX + echo "" + echo "创建死信交换机 (DLX)..." + for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \ + --type topic --durable true + info "创建 DLX: dlx.${dtype}" + done - # 绑定到业务队列(使用新的 routing key 格式:data_type.platform) - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "${platform}.exchange" --destination "orders.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "order.${platform}" + # 4. 创建主业务队列 + echo "" + echo "创建主业务队列..." + for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable true \ + --arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" + info "创建队列: ${dtype}.queue" + done - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "${platform}.exchange" --destination "products.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "product.${platform}" + # 5. 创建重试队列 + echo "" + echo "创建重试队列..." + for dtype in "${DATA_TYPES[@]}"; do + local dtype_singular="${dtype%s}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \ + --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}" + info "创建重试队列: ${dtype}.retry.queue" + done - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "${platform}.exchange" --destination "refunds.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}" + # 6. 创建错误 Exchange 和队列 + echo "" + echo "创建错误 Exchange 和队列..." + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "errors.exchange" --vhost "$VHOST" \ + --type topic --durable true + info "创建错误 Exchange: errors.exchange" - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "${platform}.exchange" --destination "inventory.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}" - echo "✓ 绑定到业务队列 (4个)" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "errors.queue" --vhost "$VHOST" --durable true \ + --arguments '{"x-message-ttl":604800000}' + info "创建错误队列: errors.queue" - # 创建平台错误 Exchange - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \ - --type topic --durable true - echo "✓ 创建错误 Exchange: ${platform}.errors.exchange" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "#" + info "绑定: errors.exchange → errors.queue" - # 绑定到错误队列 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "#" - echo "✓ 绑定到错误队列" + # 7. 绑定主 Exchange 到主队列 + echo "" + echo "绑定主 Exchange 到主队列..." + for dtype in "${DATA_TYPES[@]}"; do + local dtype_singular="${dtype%s}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "main.exchange" --destination "${dtype}.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "${dtype_singular}.#" + info "绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype_singular}.#)" + done - # 创建平台用户 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user --name "user_${platform}" --password "change_me_${platform}" --tags "" - echo "✓ 创建用户: user_${platform}" + # 8. 绑定 DLX 到重试队列 + echo "" + echo "绑定 DLX 到重试队列..." + for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "retry" + info "绑定: dlx.${dtype} → ${dtype}.retry.queue" + done +} - # 配置平台用户权限 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permissions --vhost "$VHOST" --user "user_${platform}" \ - --configure "^${platform}\\.(exchange|errors\\.exchange)$" \ - --write "^${platform}\\.(exchange|errors\\.exchange)$" \ - --read "^${platform}\\.errors\\..*$" - echo "✓ 配置用户权限" -done +# 创建系统用户 (consumer 和 ops) +create_system_users() { + local consumer_password="$1" + local ops_password="$2" -# 10. 创建 dataflow 消费者用户 -echo "" -echo "========================================" -echo "创建 dataflow 消费者用户..." -echo "========================================" -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags "" -echo "✓ 创建用户: user_dataflow_consumer" + echo "" + echo "========================================" + echo "创建系统用户..." + echo "========================================" -# Consumer 需要完整的权限来声明和消费队列 -# - configure: 允许声明 main.exchange、DLX exchanges、通用 errors.exchange 和所有队列 -# - write: 允许向业务队列写入(ACK/NACK)、DLX、通用 errors.exchange 和平台 errors exchanges -# - read: 允许读取 main.exchange、业务队列和 DLX exchanges -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \ - --configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \ - --write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \ - --read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$" -echo "✓ 配置用户权限 (configure: main.exchange+errors.exchange+DLX+queues, write: business queues+DLX+errors.exchange, read: main.exchange+business queues+DLX)" + # 创建 dataflow 消费者用户 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_dataflow_consumer" --password "$consumer_password" --tags "" + info "创建用户: user_dataflow_consumer" -# 11. 创建运维监控用户 -echo "" -echo "创建运维监控用户..." -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user --name "user_ops" --password "change_me_ops" --tags "" -echo "✓ 创建用户: user_ops" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \ + --configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \ + --write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \ + --read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$" + info "配置 consumer 用户权限" -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permissions --vhost "$VHOST" --user "user_ops" \ - --configure "^errors\\..*$" --write "" --read "^errors\\.queue$" -echo "✓ 配置用户权限" + # 创建运维监控用户 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_ops" --password "$ops_password" --tags "" + info "创建用户: user_ops" -echo "" -echo "========================================" -echo "RabbitMQ 配置完成!" -echo "========================================" -echo "" -echo "已创建:" -echo "- 1 个 VHost: $VHOST" -echo "- 1 个主 Exchange: main.exchange" -echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" -echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" -echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue" -echo "- 1 个错误队列: errors.queue" -echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)" -echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)" -echo "" -echo "提示: 请修改所有用户的默认密码 (change_me_*)" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_ops" \ + --configure "^errors\\..*$" --write "" --read "^errors\\.queue$" + info "配置 ops 用户权限" +} + +# 清理现有配置 +cleanup_existing() { + echo "========================================" + echo "开始清理现有配置..." + echo "========================================" + + # 检查 VHost 是否存在 + echo "" + echo "检查 VHost: $VHOST 是否存在..." + VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list vhosts | grep -w "$VHOST" || true) + + if [[ -n "$VHOST_EXISTS" ]]; then + warn "VHost '$VHOST' 已存在,将删除所有现有配置..." + + # 获取所有平台用户并删除 + local users=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list users name -f tsv 2>/dev/null | grep "^user_" || true) + + for user in $users; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete user --name "$user" 2>/dev/null || true + info "删除用户: $user" + done + + # 删除 VHost + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete vhost --name "$VHOST" + info "VHost '$VHOST' 及其所有资源已删除" + else + echo "VHost '$VHOST' 不存在" + fi +} + +# ============================================================================ +# 主命令函数 +# ============================================================================ + +# init 命令:完全重建 +cmd_init() { + echo -e "${BLUE}========================================" + echo "RabbitMQ 初始化模式" + echo "========================================${NC}" + + # 1. 从数据库获取平台列表 + echo "" + echo "从数据库读取平台列表..." + local db_platforms=$(get_platforms_from_db) + + if [[ -z "$db_platforms" ]]; then + error "无法从数据库获取平台列表,或没有启用的平台" + fi + + # 转换平台名称 + declare -a PLATFORMS=() + declare -A PLATFORM_PASSWORDS=() + + while IFS= read -r platform; do + local normalized=$(normalize_platform_name "$platform") + PLATFORMS+=("$normalized") + PLATFORM_PASSWORDS["$normalized"]=$(generate_password) + done <<< "$db_platforms" + + echo "找到 ${#PLATFORMS[@]} 个启用的平台:" + printf " - %s\n" "${PLATFORMS[@]}" + + # 2. 清理现有配置 + cleanup_existing + + # 3. 创建基础设施 + create_infrastructure + + # 4. 生成系统用户密码 + local consumer_password=$(generate_password) + local ops_password=$(generate_password) + + # 5. 创建系统用户 + create_system_users "$consumer_password" "$ops_password" + + # 6. 为每个平台创建配置 + echo "" + echo "========================================" + echo "创建平台配置..." + echo "========================================" + + for platform in "${PLATFORMS[@]}"; do + create_platform_config "$platform" "${PLATFORM_PASSWORDS[$platform]}" + done + + # 7. 写入密码到 .env + echo "" + echo "========================================" + echo "写入密码到 .env..." + echo "========================================" + write_all_passwords_to_env PLATFORMS PLATFORM_PASSWORDS "$consumer_password" "$ops_password" + info "密码已写入: $ENV_FILE" + + # 8. 输出摘要 + echo "" + echo -e "${GREEN}========================================" + echo "RabbitMQ 配置完成!" + echo "========================================${NC}" + echo "" + echo "已创建:" + echo "- 1 个 VHost: $VHOST" + echo "- 1 个主 Exchange: main.exchange" + echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" + echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" + echo "- 4 个重试队列: *.retry.queue" + echo "- 1 个错误队列: errors.queue" + echo "- ${#PLATFORMS[@]} 个平台配置" + echo "- $((${#PLATFORMS[@]} + 2)) 个用户" + echo "" + echo "密码配置: $ENV_FILE (MQ_PASSWORD_* 变量)" + echo "用户配置: $MQ_USER_CONFIG (动态从数据库读取)" +} + +# add 命令:添加单个平台 +cmd_add() { + local platform_input="$1" + + if [[ -z "$platform_input" ]]; then + error "请指定要添加的平台名称,例如: $0 add shopee" + fi + + local platform=$(normalize_platform_name "$platform_input") + + echo -e "${BLUE}========================================" + echo "添加平台: $platform" + echo "========================================${NC}" + + # 检查 VHost 是否存在 + VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list vhosts | grep -w "$VHOST" || true) + + if [[ -z "$VHOST_EXISTS" ]]; then + error "VHost '$VHOST' 不存在,请先运行 '$0 init' 初始化" + fi + + # 检查平台是否已存在 + local exchange_exists=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list exchanges --vhost "$VHOST" | grep -w "${platform}.exchange" || true) + + if [[ -n "$exchange_exists" ]]; then + error "平台 '$platform' 已存在" + fi + + # 生成密码 + local password=$(generate_password) + + # 创建平台配置 + create_platform_config "$platform" "$password" + + # 写入密码到 .env + local platform_upper=$(echo "$platform" | tr '[:lower:]' '[:upper:]') + add_password_to_env "$platform_upper" "$password" + + echo "" + echo -e "${GREEN}========================================" + echo "平台 $platform 添加成功!" + echo "========================================${NC}" + echo "" + echo "用户: user_${platform}" + echo "密码已写入 .env: MQ_PASSWORD_${platform_upper}" +} + +# remove 命令:移除单个平台 +cmd_remove() { + local platform_input="$1" + + if [[ -z "$platform_input" ]]; then + error "请指定要移除的平台名称,例如: $0 remove shopee" + fi + + local platform=$(normalize_platform_name "$platform_input") + + echo -e "${BLUE}========================================" + echo "移除平台: $platform" + echo "========================================${NC}" + + # 移除 MQ 配置 + remove_platform_config "$platform" + + # 从 .env 中移除密码 + local platform_upper=$(echo "$platform" | tr '[:lower:]' '[:upper:]') + remove_password_from_env "$platform_upper" + + echo "" + echo -e "${GREEN}========================================" + echo "平台 $platform 移除成功!" + echo "========================================${NC}" + echo "" + echo "已从 .env 移除: MQ_PASSWORD_${platform_upper}" +} + +# list 命令:列出已配置的平台 +cmd_list() { + echo -e "${BLUE}========================================" + echo "已配置的平台列表" + echo "========================================${NC}" + + # 检查 VHost 是否存在 + VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list vhosts 2>/dev/null | grep -w "$VHOST" || true) + + if [[ -z "$VHOST_EXISTS" ]]; then + warn "VHost '$VHOST' 不存在" + return + fi + + # 获取所有平台 Exchange (排除系统 Exchange) + echo "" + echo "MQ 中的平台:" + local exchanges=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + list exchanges --vhost "$VHOST" name -f tsv 2>/dev/null | grep -E "^[a-z0-9_]+\.exchange$" | grep -v "^main\." | grep -v "^errors\." | sed 's/\.exchange$//' | sort) + + if [[ -n "$exchanges" ]]; then + echo "$exchanges" | while read platform; do + echo " - $platform" + done + else + echo " (无)" + fi + + # 显示配置文件中的平台 + if [[ -f "$MQ_USER_CONFIG" ]]; then + echo "" + echo "配置文件中的平台:" + php -r " + \$config = include '$MQ_USER_CONFIG'; + foreach (\$config['platforms'] ?? [] as \$name => \$data) { + echo \" - \$name\n\"; + } + " 2>/dev/null || echo " (无法读取配置文件)" + fi +} + +# ============================================================================ +# 主入口 +# ============================================================================ + +main() { + local command="${1:-}" + + # 显示帮助不需要检测依赖 + if [[ "$command" != "--help" && "$command" != "-h" && "$command" != "help" && -n "$command" ]]; then + check_dependencies + fi + + case "$command" in + init) + cmd_init + ;; + add) + cmd_add "$2" + ;; + remove) + cmd_remove "$2" + ;; + list) + cmd_list + ;; + --help|-h|help|"") + show_help + ;; + *) + error "未知命令: $command\n使用 '$0 --help' 查看帮助" + ;; + esac +} + +main "$@"