fix:修复rabbitmq.sh脚本执行后没有队列的问题
This commit is contained in:
+328
-138
@@ -1,25 +1,36 @@
|
||||
#!/bin/bash
|
||||
|
||||
# ============================================================================
|
||||
# RabbitMQ 配置管理脚本
|
||||
# RabbitMQ 配置管理脚本 (适配 datahub-backend 容器环境)
|
||||
# 支持从数据库读取平台列表,自动配置 Exchange、Queue、Binding 和用户
|
||||
# 通过 RabbitMQ Management HTTP API (curl) 操作,无需 rabbitmqadmin
|
||||
# ============================================================================
|
||||
|
||||
set -e
|
||||
|
||||
# 脚本所在目录
|
||||
# 脚本所在目录 (容器内路径: /opt/www)
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
BACKEND_DIR="$(dirname "$SCRIPT_DIR")"
|
||||
ENV_FILE="$BACKEND_DIR/.env"
|
||||
MQ_USER_CONFIG="$BACKEND_DIR/config/autoload/mq_user.php"
|
||||
|
||||
# RabbitMQ 连接信息
|
||||
RABBITMQ_HOST="127.0.0.1"
|
||||
# .env 密码文件输出路径
|
||||
# 容器内 /mnt/share 映射到宿主机 /var/container/share
|
||||
# 执行完成后需手动合并到宿主机 /opt/datahub/backend/.env
|
||||
ENV_FILE="/mnt/share/rabbitmq_passwords.env"
|
||||
|
||||
# RabbitMQ 连接信息 (容器间通过容器名通信)
|
||||
RABBITMQ_HOST="datahub-rabbitmq"
|
||||
RABBITMQ_PORT="15672"
|
||||
RABBITMQ_USER="admin"
|
||||
RABBITMQ_PASS="admin"
|
||||
RABBITMQ_USER="user"
|
||||
RABBITMQ_PASS="hub123456"
|
||||
VHOST="datahub"
|
||||
|
||||
# URL 编码的 VHost (用于 HTTP API 路径)
|
||||
VHOST_ENCODED=$(php -r "echo rawurlencode('$VHOST');" 2>/dev/null || echo "$VHOST")
|
||||
|
||||
# RabbitMQ API 基础 URL
|
||||
MQ_API="http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api"
|
||||
|
||||
# 数据类型列表
|
||||
DATA_TYPES=("orders" "products" "refunds" "inventory")
|
||||
|
||||
@@ -36,7 +47,7 @@ NC='\033[0m' # No Color
|
||||
show_help() {
|
||||
# 使用 printf '%b' 解释 ANSI 转义序列
|
||||
printf '%b\n' "$(cat << EOF
|
||||
${BLUE}RabbitMQ 配置管理脚本${NC}
|
||||
${BLUE}RabbitMQ 配置管理脚本 (容器版)${NC}
|
||||
|
||||
${YELLOW}用法:${NC}
|
||||
$0 <command> [options]
|
||||
@@ -72,7 +83,7 @@ ${YELLOW}命令:${NC}
|
||||
${GREEN}--help, -h${NC} 显示此帮助信息
|
||||
|
||||
${YELLOW}配置文件:${NC}
|
||||
环境配置: $ENV_FILE (包含 MQ_PASSWORD_* 密码变量)
|
||||
密码输出: /mnt/share/rabbitmq_passwords.env (宿主机 /var/container/share/)
|
||||
用户配置: $MQ_USER_CONFIG (动态从数据库读取平台列表)
|
||||
|
||||
${YELLOW}平台名称转换规则:${NC}
|
||||
@@ -138,9 +149,9 @@ check_dependencies() {
|
||||
in_container=true
|
||||
fi
|
||||
|
||||
# 检查 rabbitmqadmin
|
||||
if ! command -v rabbitmqadmin &> /dev/null; then
|
||||
missing+=("rabbitmqadmin")
|
||||
# 检查 curl (用于调用 RabbitMQ HTTP API)
|
||||
if ! command -v curl &> /dev/null; then
|
||||
missing+=("curl")
|
||||
fi
|
||||
|
||||
# 检查 php 及必要扩展
|
||||
@@ -168,23 +179,15 @@ check_dependencies() {
|
||||
|
||||
for tool in "${missing[@]}"; do
|
||||
case "$tool" in
|
||||
rabbitmqadmin)
|
||||
echo -e "${YELLOW}rabbitmqadmin${NC} - RabbitMQ 管理工具"
|
||||
curl)
|
||||
echo -e "${YELLOW}curl${NC} - HTTP 请求工具 (用于调用 RabbitMQ Management API)"
|
||||
if $in_container; then
|
||||
echo " Dockerfile/Containerfile 示例:"
|
||||
echo " RUN wget -O /usr/local/bin/rabbitmqadmin \\"
|
||||
echo " http://rabbitmq:15672/cli/rabbitmqadmin && \\"
|
||||
echo " chmod +x /usr/local/bin/rabbitmqadmin"
|
||||
echo " 或使用 pip:"
|
||||
echo " RUN pip install rabbitmqadmin"
|
||||
echo " Alpine: apk add curl"
|
||||
echo " Debian: apt-get install curl"
|
||||
else
|
||||
echo " 安装方式:"
|
||||
echo " 1. 从 RabbitMQ 管理界面下载:"
|
||||
echo " wget http://localhost:15672/cli/rabbitmqadmin"
|
||||
echo " chmod +x rabbitmqadmin"
|
||||
echo " sudo mv rabbitmqadmin /usr/local/bin/"
|
||||
echo " 2. 或者使用 pip:"
|
||||
echo " pip install rabbitmqadmin"
|
||||
echo " Ubuntu/Debian: sudo apt install curl"
|
||||
echo " CentOS/RHEL: sudo yum install curl"
|
||||
fi
|
||||
echo ""
|
||||
;;
|
||||
@@ -222,7 +225,6 @@ check_dependencies() {
|
||||
echo " 安装方式:"
|
||||
echo " Ubuntu/Debian: sudo apt install php-sodium"
|
||||
echo " CentOS/RHEL: sudo yum install php-sodium"
|
||||
echo " 注意: PHP 7.2+ 默认包含 sodium 扩展"
|
||||
fi
|
||||
echo ""
|
||||
;;
|
||||
@@ -230,6 +232,160 @@ check_dependencies() {
|
||||
done
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# 检查 RabbitMQ Management API 是否可达
|
||||
local http_code
|
||||
http_code=$(curl -s -o /dev/null -w "%{http_code}" -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \
|
||||
"${MQ_API}/overview" 2>/dev/null || echo "000")
|
||||
|
||||
if [[ "$http_code" == "000" ]]; then
|
||||
error "无法连接到 RabbitMQ Management API: ${MQ_API}\n请确认 datahub-rabbitmq 容器正在运行且 management 插件已启用"
|
||||
elif [[ "$http_code" == "401" ]]; then
|
||||
error "RabbitMQ 认证失败,请检查用户名和密码"
|
||||
elif [[ "$http_code" != "200" ]]; then
|
||||
error "RabbitMQ Management API 返回异常状态码: ${http_code}"
|
||||
fi
|
||||
}
|
||||
|
||||
# ============================================================================
|
||||
# RabbitMQ HTTP API 封装函数
|
||||
# ============================================================================
|
||||
|
||||
# 通用 API 请求函数
|
||||
# 用法: mq_api_call <METHOD> <PATH> [JSON_DATA]
|
||||
# 写操作 (PUT/POST/DELETE) 自动丢弃响应体,GET 保留输出
|
||||
mq_api_call() {
|
||||
local method="$1"
|
||||
local path="$2"
|
||||
local data="${3:-}"
|
||||
|
||||
if [[ "$method" != "GET" ]]; then
|
||||
if [[ -n "$data" ]]; then
|
||||
curl -s -o /dev/null -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \
|
||||
-H "content-type: application/json" \
|
||||
-X "$method" \
|
||||
-d "$data" \
|
||||
"${MQ_API}${path}"
|
||||
else
|
||||
curl -s -o /dev/null -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \
|
||||
-H "content-type: application/json" \
|
||||
-X "$method" \
|
||||
"${MQ_API}${path}"
|
||||
fi
|
||||
else
|
||||
if [[ -n "$data" ]]; then
|
||||
curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \
|
||||
-H "content-type: application/json" \
|
||||
-X "$method" \
|
||||
-d "$data" \
|
||||
"${MQ_API}${path}"
|
||||
else
|
||||
curl -s -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \
|
||||
-H "content-type: application/json" \
|
||||
-X "$method" \
|
||||
"${MQ_API}${path}"
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
# 声明 VHost
|
||||
mq_declare_vhost() {
|
||||
local name="$1"
|
||||
local encoded
|
||||
encoded=$(php -r "echo rawurlencode('$name');" 2>/dev/null || echo "$name")
|
||||
mq_api_call PUT "/vhosts/${encoded}" '{}'
|
||||
}
|
||||
|
||||
# 删除 VHost
|
||||
mq_delete_vhost() {
|
||||
local name="$1"
|
||||
local encoded
|
||||
encoded=$(php -r "echo rawurlencode('$name');" 2>/dev/null || echo "$name")
|
||||
mq_api_call DELETE "/vhosts/${encoded}" 2>/dev/null || true
|
||||
}
|
||||
|
||||
# 列出 VHosts
|
||||
mq_list_vhosts() {
|
||||
mq_api_call GET "/vhosts" 2>/dev/null
|
||||
}
|
||||
|
||||
# 声明 Exchange
|
||||
mq_declare_exchange() {
|
||||
local name="$1"
|
||||
local type="${2:-topic}"
|
||||
local durable="${3:-true}"
|
||||
local body
|
||||
body=$(printf '{"type":"%s","durable":%s}' "$type" "$durable")
|
||||
mq_api_call PUT "/exchanges/${VHOST_ENCODED}/${name}" "$body"
|
||||
}
|
||||
|
||||
# 删除 Exchange
|
||||
mq_delete_exchange() {
|
||||
local name="$1"
|
||||
mq_api_call DELETE "/exchanges/${VHOST_ENCODED}/${name}" 2>/dev/null || true
|
||||
}
|
||||
|
||||
# 声明 Queue
|
||||
mq_declare_queue() {
|
||||
local name="$1"
|
||||
local arguments="$2"
|
||||
if [[ -z "$arguments" ]]; then
|
||||
arguments='{}'
|
||||
fi
|
||||
local body
|
||||
body=$(printf '{"durable":true,"arguments":%s}' "$arguments")
|
||||
mq_api_call PUT "/queues/${VHOST_ENCODED}/${name}" "$body"
|
||||
}
|
||||
|
||||
# 声明 Binding
|
||||
mq_declare_binding() {
|
||||
local source="$1"
|
||||
local destination="$2"
|
||||
local routing_key="$3"
|
||||
local dest_type="${4:-queue}"
|
||||
|
||||
local dest_char="q"
|
||||
if [[ "$dest_type" == "exchange" ]]; then
|
||||
dest_char="e"
|
||||
fi
|
||||
|
||||
mq_api_call POST "/bindings/${VHOST_ENCODED}/e/${source}/${dest_char}/${destination}" \
|
||||
"$(printf '{"routing_key":"%s"}' "$routing_key")"
|
||||
}
|
||||
|
||||
# 声明用户
|
||||
mq_declare_user() {
|
||||
local name="$1"
|
||||
local password="$2"
|
||||
local tags="${3:-}"
|
||||
mq_api_call PUT "/users/${name}" \
|
||||
"$(printf '{"password":"%s","tags":"%s"}' "$password" "$tags")"
|
||||
}
|
||||
|
||||
# 删除用户
|
||||
mq_delete_user() {
|
||||
local name="$1"
|
||||
mq_api_call DELETE "/users/${name}" 2>/dev/null || true
|
||||
}
|
||||
|
||||
# 列出用户
|
||||
mq_list_users() {
|
||||
mq_api_call GET "/users" 2>/dev/null
|
||||
}
|
||||
|
||||
# 设置用户权限
|
||||
mq_set_permissions() {
|
||||
local user="$1"
|
||||
local configure="$2"
|
||||
local write="$3"
|
||||
local read="$4"
|
||||
mq_api_call PUT "/permissions/${VHOST_ENCODED}/${user}" \
|
||||
"$(printf '{"configure":"%s","write":"%s","read":"%s"}' "$configure" "$write" "$read")"
|
||||
}
|
||||
|
||||
# 列出 Exchanges
|
||||
mq_list_exchanges() {
|
||||
mq_api_call GET "/exchanges/${VHOST_ENCODED}" 2>/dev/null
|
||||
}
|
||||
|
||||
# ============================================================================
|
||||
@@ -250,21 +406,17 @@ error() {
|
||||
exit 1
|
||||
}
|
||||
|
||||
# 从 .env 文件读取配置
|
||||
# 从容器环境变量读取数据库配置
|
||||
load_env() {
|
||||
if [[ ! -f "$ENV_FILE" ]]; then
|
||||
error "找不到 .env 文件: $ENV_FILE"
|
||||
fi
|
||||
|
||||
# 读取数据库配置
|
||||
DB_HOST=$(grep -E "^DB_HOST=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'")
|
||||
DB_PORT=$(grep -E "^DB_PORT=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'")
|
||||
DB_DATABASE=$(grep -E "^DB_DATABASE=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'")
|
||||
DB_USERNAME=$(grep -E "^DB_USERNAME=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'")
|
||||
DB_PASSWORD=$(grep -E "^DB_PASSWORD=" "$ENV_FILE" | cut -d'=' -f2 | tr -d '"' | tr -d "'")
|
||||
# 容器内通过环境变量注入,直接读取
|
||||
DB_HOST="${DB_HOST:-}"
|
||||
DB_PORT="${DB_PORT:-5432}"
|
||||
DB_DATABASE="${DB_DATABASE:-}"
|
||||
DB_USERNAME="${DB_USERNAME:-}"
|
||||
DB_PASSWORD="${DB_PASSWORD:-}"
|
||||
|
||||
if [[ -z "$DB_HOST" || -z "$DB_DATABASE" ]]; then
|
||||
error "无法从 .env 文件读取数据库配置"
|
||||
error "无法读取数据库配置,请确认容器环境变量 DB_HOST, DB_DATABASE 等已设置"
|
||||
fi
|
||||
}
|
||||
|
||||
@@ -405,43 +557,33 @@ create_platform_config() {
|
||||
echo -e "${BLUE}处理平台: ${platform}${NC}"
|
||||
|
||||
# 创建平台业务 Exchange
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare exchange --name "${platform}.exchange" --vhost "$VHOST" \
|
||||
--type topic --durable true
|
||||
mq_declare_exchange "${platform}.exchange" "topic" "true"
|
||||
info "创建业务 Exchange: ${platform}.exchange"
|
||||
|
||||
# 绑定到业务队列
|
||||
for dtype in "${DATA_TYPES[@]}"; do
|
||||
local dtype_singular="${dtype%s}"
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare binding --source "${platform}.exchange" --destination "${dtype}.queue" \
|
||||
--destination-type queue --vhost "$VHOST" --routing-key "${dtype_singular}.${platform}"
|
||||
mq_declare_binding "${platform}.exchange" "${dtype}.queue" "${dtype_singular}.${platform}"
|
||||
done
|
||||
info "绑定到业务队列 (${#DATA_TYPES[@]}个)"
|
||||
|
||||
# 创建平台错误 Exchange
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \
|
||||
--type topic --durable true
|
||||
mq_declare_exchange "${platform}.errors.exchange" "topic" "true"
|
||||
info "创建错误 Exchange: ${platform}.errors.exchange"
|
||||
|
||||
# 绑定到错误队列
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \
|
||||
--destination-type queue --vhost "$VHOST" --routing-key "#"
|
||||
mq_declare_binding "${platform}.errors.exchange" "errors.queue" "#"
|
||||
info "绑定到错误队列"
|
||||
|
||||
# 创建平台用户
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare user --name "user_${platform}" --password "$password" --tags ""
|
||||
mq_declare_user "user_${platform}" "$password" ""
|
||||
info "创建用户: user_${platform}"
|
||||
|
||||
# 配置平台用户权限
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare permissions --vhost "$VHOST" --user "user_${platform}" \
|
||||
--configure "^${platform}\\.(exchange|errors\\.exchange)$" \
|
||||
--write "^${platform}\\.(exchange|errors\\.exchange)$" \
|
||||
--read "^${platform}\\.errors\\..*$"
|
||||
mq_set_permissions "user_${platform}" \
|
||||
"^${platform}\\.(exchange|errors\\.exchange)$" \
|
||||
"^${platform}\\.(exchange|errors\\.exchange)$" \
|
||||
"^${platform}\\.errors\\..*$"
|
||||
info "配置用户权限"
|
||||
}
|
||||
|
||||
@@ -453,17 +595,14 @@ remove_platform_config() {
|
||||
echo -e "${BLUE}移除平台: ${platform}${NC}"
|
||||
|
||||
# 删除用户
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
delete user --name "user_${platform}" 2>/dev/null || true
|
||||
mq_delete_user "user_${platform}"
|
||||
info "删除用户: user_${platform}"
|
||||
|
||||
# 删除 Exchange (会自动删除相关的 Binding)
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
delete exchange --name "${platform}.exchange" --vhost "$VHOST" 2>/dev/null || true
|
||||
mq_delete_exchange "${platform}.exchange"
|
||||
info "删除 Exchange: ${platform}.exchange"
|
||||
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
delete exchange --name "${platform}.errors.exchange" --vhost "$VHOST" 2>/dev/null || true
|
||||
mq_delete_exchange "${platform}.errors.exchange"
|
||||
info "删除 Exchange: ${platform}.errors.exchange"
|
||||
}
|
||||
|
||||
@@ -477,23 +616,18 @@ create_infrastructure() {
|
||||
# 1. 创建 VHost
|
||||
echo ""
|
||||
echo "创建 VHost: $VHOST"
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare vhost --name "$VHOST"
|
||||
mq_declare_vhost "$VHOST"
|
||||
info "VHost 创建成功"
|
||||
|
||||
# 2. 创建主 Exchange
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare exchange --name "main.exchange" --vhost "$VHOST" \
|
||||
--type topic --durable true
|
||||
mq_declare_exchange "main.exchange" "topic" "true"
|
||||
info "创建主 Exchange: main.exchange"
|
||||
|
||||
# 3. 创建 DLX
|
||||
echo ""
|
||||
echo "创建死信交换机 (DLX)..."
|
||||
for dtype in "${DATA_TYPES[@]}"; do
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \
|
||||
--type topic --durable true
|
||||
mq_declare_exchange "dlx.${dtype}" "topic" "true"
|
||||
info "创建 DLX: dlx.${dtype}"
|
||||
done
|
||||
|
||||
@@ -501,9 +635,8 @@ create_infrastructure() {
|
||||
echo ""
|
||||
echo "创建主业务队列..."
|
||||
for dtype in "${DATA_TYPES[@]}"; do
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable true \
|
||||
--arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
|
||||
mq_declare_queue "${dtype}.queue" \
|
||||
'{"x-message-ttl":86400000,"x-dead-letter-exchange":"dlx.'"${dtype}"'","x-dead-letter-routing-key":"retry"}'
|
||||
info "创建队列: ${dtype}.queue"
|
||||
done
|
||||
|
||||
@@ -512,28 +645,21 @@ create_infrastructure() {
|
||||
echo "创建重试队列..."
|
||||
for dtype in "${DATA_TYPES[@]}"; do
|
||||
local dtype_singular="${dtype%s}"
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \
|
||||
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}"
|
||||
mq_declare_queue "${dtype}.retry.queue" \
|
||||
'{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"'"${dtype_singular}"'.retry"}'
|
||||
info "创建重试队列: ${dtype}.retry.queue"
|
||||
done
|
||||
|
||||
# 6. 创建错误 Exchange 和队列
|
||||
echo ""
|
||||
echo "创建错误 Exchange 和队列..."
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare exchange --name "errors.exchange" --vhost "$VHOST" \
|
||||
--type topic --durable true
|
||||
mq_declare_exchange "errors.exchange" "topic" "true"
|
||||
info "创建错误 Exchange: errors.exchange"
|
||||
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare queue --name "errors.queue" --vhost "$VHOST" --durable true \
|
||||
--arguments '{"x-message-ttl":604800000}'
|
||||
mq_declare_queue "errors.queue" '{"x-message-ttl":604800000}'
|
||||
info "创建错误队列: errors.queue"
|
||||
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare binding --source "errors.exchange" --destination "errors.queue" \
|
||||
--destination-type queue --vhost "$VHOST" --routing-key "#"
|
||||
mq_declare_binding "errors.exchange" "errors.queue" "#"
|
||||
info "绑定: errors.exchange → errors.queue"
|
||||
|
||||
# 7. 绑定主 Exchange 到主队列
|
||||
@@ -541,9 +667,7 @@ create_infrastructure() {
|
||||
echo "绑定主 Exchange 到主队列..."
|
||||
for dtype in "${DATA_TYPES[@]}"; do
|
||||
local dtype_singular="${dtype%s}"
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare binding --source "main.exchange" --destination "${dtype}.queue" \
|
||||
--destination-type queue --vhost "$VHOST" --routing-key "${dtype_singular}.#"
|
||||
mq_declare_binding "main.exchange" "${dtype}.queue" "${dtype_singular}.#"
|
||||
info "绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype_singular}.#)"
|
||||
done
|
||||
|
||||
@@ -551,9 +675,7 @@ create_infrastructure() {
|
||||
echo ""
|
||||
echo "绑定 DLX 到重试队列..."
|
||||
for dtype in "${DATA_TYPES[@]}"; do
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
|
||||
--destination-type queue --vhost "$VHOST" --routing-key "retry"
|
||||
mq_declare_binding "dlx.${dtype}" "${dtype}.retry.queue" "retry"
|
||||
info "绑定: dlx.${dtype} → ${dtype}.retry.queue"
|
||||
done
|
||||
}
|
||||
@@ -569,25 +691,21 @@ create_system_users() {
|
||||
echo "========================================"
|
||||
|
||||
# 创建 datahub 消费者用户
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare user --name "user_datahub_consumer" --password "$consumer_password" --tags ""
|
||||
mq_declare_user "user_datahub_consumer" "$consumer_password" ""
|
||||
info "创建用户: user_datahub_consumer"
|
||||
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare permissions --vhost "$VHOST" --user "user_datahub_consumer" \
|
||||
--configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \
|
||||
--write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \
|
||||
--read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$"
|
||||
mq_set_permissions "user_datahub_consumer" \
|
||||
"^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \
|
||||
"^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \
|
||||
"^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$"
|
||||
info "配置 consumer 用户权限"
|
||||
|
||||
# 创建运维监控用户
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare user --name "user_ops" --password "$ops_password" --tags ""
|
||||
mq_declare_user "user_ops" "$ops_password" ""
|
||||
info "创建用户: user_ops"
|
||||
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare permissions --vhost "$VHOST" --user "user_ops" \
|
||||
--configure "^errors\\..*$" --write "" --read "^errors\\.queue$"
|
||||
mq_set_permissions "user_ops" \
|
||||
"^errors\\..*$" "" "^errors\\.queue$"
|
||||
info "配置 ops 用户权限"
|
||||
}
|
||||
|
||||
@@ -600,25 +718,41 @@ cleanup_existing() {
|
||||
# 检查 VHost 是否存在
|
||||
echo ""
|
||||
echo "检查 VHost: $VHOST 是否存在..."
|
||||
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
list vhosts | grep -w "$VHOST" || true)
|
||||
local vhost_list
|
||||
vhost_list=$(mq_list_vhosts)
|
||||
local vhost_exists
|
||||
vhost_exists=$(echo "$vhost_list" | php -r '
|
||||
$data = json_decode(file_get_contents("php://stdin"), true);
|
||||
foreach ($data ?? [] as $v) {
|
||||
if ($v["name"] === "'"$VHOST"'") { echo "yes"; exit; }
|
||||
}
|
||||
' 2>/dev/null)
|
||||
|
||||
if [[ -n "$VHOST_EXISTS" ]]; then
|
||||
if [[ "$vhost_exists" == "yes" ]]; then
|
||||
warn "VHost '$VHOST' 已存在,将删除所有现有配置..."
|
||||
|
||||
# 获取所有平台用户并删除
|
||||
local users=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
list users name -f tsv 2>/dev/null | grep "^user_" || true)
|
||||
local users_json
|
||||
users_json=$(mq_list_users)
|
||||
local users
|
||||
users=$(echo "$users_json" | php -r '
|
||||
$data = json_decode(file_get_contents("php://stdin"), true);
|
||||
foreach ($data ?? [] as $u) {
|
||||
if (strpos($u["name"], "user_") === 0) {
|
||||
echo $u["name"] . "\n";
|
||||
}
|
||||
}
|
||||
' 2>/dev/null)
|
||||
|
||||
for user in $users; do
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
delete user --name "$user" 2>/dev/null || true
|
||||
info "删除用户: $user"
|
||||
done
|
||||
while IFS= read -r user; do
|
||||
if [[ -n "$user" ]]; then
|
||||
mq_delete_user "$user"
|
||||
info "删除用户: $user"
|
||||
fi
|
||||
done <<< "$users"
|
||||
|
||||
# 删除 VHost
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
delete vhost --name "$VHOST"
|
||||
mq_delete_vhost "$VHOST"
|
||||
info "VHost '$VHOST' 及其所有资源已删除"
|
||||
else
|
||||
echo "VHost '$VHOST' 不存在"
|
||||
@@ -704,7 +838,9 @@ cmd_init() {
|
||||
echo "- ${#PLATFORMS[@]} 个平台配置"
|
||||
echo "- $((${#PLATFORMS[@]} + 2)) 个用户"
|
||||
echo ""
|
||||
echo "密码配置: $ENV_FILE (MQ_PASSWORD_* 变量)"
|
||||
echo "密码配置: $ENV_FILE"
|
||||
echo " (宿主机路径: /var/container/share/rabbitmq_passwords.env)"
|
||||
echo " 请手动将密码合并到宿主机 /opt/datahub/backend/.env"
|
||||
echo "用户配置: $MQ_USER_CONFIG (动态从数据库读取)"
|
||||
}
|
||||
|
||||
@@ -723,18 +859,32 @@ cmd_add() {
|
||||
echo "========================================${NC}"
|
||||
|
||||
# 检查 VHost 是否存在
|
||||
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
list vhosts | grep -w "$VHOST" || true)
|
||||
local vhost_list
|
||||
vhost_list=$(mq_list_vhosts)
|
||||
local vhost_exists
|
||||
vhost_exists=$(echo "$vhost_list" | php -r '
|
||||
$data = json_decode(file_get_contents("php://stdin"), true);
|
||||
foreach ($data ?? [] as $v) {
|
||||
if ($v["name"] === "'"$VHOST"'") { echo "yes"; exit; }
|
||||
}
|
||||
' 2>/dev/null)
|
||||
|
||||
if [[ -z "$VHOST_EXISTS" ]]; then
|
||||
if [[ "$vhost_exists" != "yes" ]]; then
|
||||
error "VHost '$VHOST' 不存在,请先运行 '$0 init' 初始化"
|
||||
fi
|
||||
|
||||
# 检查平台是否已存在
|
||||
local exchange_exists=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
list exchanges --vhost "$VHOST" | grep -w "${platform}.exchange" || true)
|
||||
local exchanges_json
|
||||
exchanges_json=$(mq_list_exchanges)
|
||||
local exchange_exists
|
||||
exchange_exists=$(echo "$exchanges_json" | php -r '
|
||||
$data = json_decode(file_get_contents("php://stdin"), true);
|
||||
foreach ($data ?? [] as $e) {
|
||||
if ($e["name"] === "'"${platform}.exchange"'") { echo "yes"; exit; }
|
||||
}
|
||||
' 2>/dev/null)
|
||||
|
||||
if [[ -n "$exchange_exists" ]]; then
|
||||
if [[ "$exchange_exists" == "yes" ]]; then
|
||||
error "平台 '$platform' 已存在"
|
||||
fi
|
||||
|
||||
@@ -833,17 +983,23 @@ cmd_reset_password() {
|
||||
esac
|
||||
|
||||
# 检查用户是否存在
|
||||
local user_exists=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
list users 2>/dev/null | grep -w "${mq_user}" || true)
|
||||
local users_json
|
||||
users_json=$(mq_list_users)
|
||||
local user_exists
|
||||
user_exists=$(echo "$users_json" | php -r '
|
||||
$data = json_decode(file_get_contents("php://stdin"), true);
|
||||
foreach ($data ?? [] as $u) {
|
||||
if ($u["name"] === "'"${mq_user}"'") { echo "yes"; exit; }
|
||||
}
|
||||
' 2>/dev/null)
|
||||
|
||||
if [[ -z "$user_exists" ]]; then
|
||||
if [[ "$user_exists" != "yes" ]]; then
|
||||
error "用户 '$mq_user' 不存在"
|
||||
fi
|
||||
|
||||
# 更新 RabbitMQ 用户密码
|
||||
echo ""
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
declare user --name "$mq_user" --password "$new_password" --tags ""
|
||||
mq_declare_user "$mq_user" "$new_password" ""
|
||||
info "RabbitMQ 用户密码已更新: $mq_user"
|
||||
|
||||
# 更新 .env 文件
|
||||
@@ -868,10 +1024,17 @@ cmd_list() {
|
||||
echo "========================================${NC}"
|
||||
|
||||
# 检查 VHost 是否存在
|
||||
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
list vhosts 2>/dev/null | grep -w "$VHOST" || true)
|
||||
local vhost_list
|
||||
vhost_list=$(mq_list_vhosts)
|
||||
local vhost_exists
|
||||
vhost_exists=$(echo "$vhost_list" | php -r '
|
||||
$data = json_decode(file_get_contents("php://stdin"), true);
|
||||
foreach ($data ?? [] as $v) {
|
||||
if ($v["name"] === "'"$VHOST"'") { echo "yes"; exit; }
|
||||
}
|
||||
' 2>/dev/null)
|
||||
|
||||
if [[ -z "$VHOST_EXISTS" ]]; then
|
||||
if [[ "$vhost_exists" != "yes" ]]; then
|
||||
warn "VHost '$VHOST' 不存在"
|
||||
return
|
||||
fi
|
||||
@@ -879,11 +1042,26 @@ cmd_list() {
|
||||
# 获取所有平台 Exchange (排除系统 Exchange)
|
||||
echo ""
|
||||
echo "MQ 中的平台:"
|
||||
local exchanges=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
list exchanges --vhost "$VHOST" name -f tsv 2>/dev/null | grep -E "^[a-z0-9_]+\.exchange$" | grep -v "^main\." | grep -v "^errors\." | sed 's/\.exchange$//' | sort)
|
||||
local exchanges_json
|
||||
exchanges_json=$(mq_list_exchanges)
|
||||
local platforms
|
||||
platforms=$(echo "$exchanges_json" | php -r '
|
||||
$data = json_decode(file_get_contents("php://stdin"), true);
|
||||
foreach ($data ?? [] as $e) {
|
||||
$name = $e["name"];
|
||||
// 匹配 xxx.exchange 但排除 main.exchange, errors.exchange, xxx.errors.exchange, dlx.xxx
|
||||
if (preg_match("/^([a-z0-9_]+)\.exchange$/", $name, $m)
|
||||
&& $m[1] !== "main"
|
||||
&& $m[1] !== "errors"
|
||||
&& !str_contains($name, ".errors.exchange")
|
||||
&& !str_starts_with($name, "dlx.")) {
|
||||
echo $m[1] . "\n";
|
||||
}
|
||||
}
|
||||
' 2>/dev/null | sort)
|
||||
|
||||
if [[ -n "$exchanges" ]]; then
|
||||
echo "$exchanges" | while read platform; do
|
||||
if [[ -n "$platforms" ]]; then
|
||||
echo "$platforms" | while read platform; do
|
||||
echo " - $platform"
|
||||
done
|
||||
else
|
||||
@@ -910,8 +1088,20 @@ cmd_version() {
|
||||
echo -e "========================================${NC}"
|
||||
echo ""
|
||||
|
||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||
show overview 2>/dev/null || error "无法连接到 RabbitMQ 服务器"
|
||||
local overview
|
||||
overview=$(mq_api_call GET "/overview" 2>/dev/null) || error "无法连接到 RabbitMQ 服务器"
|
||||
|
||||
echo "$overview" | php -r '
|
||||
$data = json_decode(file_get_contents("php://stdin"), true);
|
||||
if ($data) {
|
||||
echo "RabbitMQ 版本: " . ($data["rabbitmq_version"] ?? "未知") . "\n";
|
||||
echo "Erlang 版本: " . ($data["erlang_version"] ?? "未知") . "\n";
|
||||
echo "Management 版本: " . ($data["management_version"] ?? "未知") . "\n";
|
||||
echo "集群名称: " . ($data["cluster_name"] ?? "未知") . "\n";
|
||||
} else {
|
||||
echo "无法解析服务器信息\n";
|
||||
}
|
||||
' 2>/dev/null || error "无法解析服务器信息"
|
||||
}
|
||||
|
||||
# ============================================================================
|
||||
|
||||
Reference in New Issue
Block a user