2025-11-17 14:28:05 +08:00
|
|
|
|
#!/bin/bash
|
|
|
|
|
|
|
|
|
|
|
|
# RabbitMQ 连接信息
|
|
|
|
|
|
RABBITMQ_HOST="127.0.0.1"
|
|
|
|
|
|
RABBITMQ_PORT="15672"
|
|
|
|
|
|
RABBITMQ_USER="admin"
|
|
|
|
|
|
RABBITMQ_PASS="admin"
|
|
|
|
|
|
VHOST="dataflow"
|
|
|
|
|
|
|
|
|
|
|
|
# 平台列表
|
|
|
|
|
|
PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify")
|
|
|
|
|
|
DATA_TYPES=("orders" "products" "refunds" "inventory")
|
|
|
|
|
|
|
|
|
|
|
|
# 0. 检测并清理现有 VHost 和用户(如果存在)
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
echo "开始清理现有配置..."
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
|
|
|
|
|
|
# 检查 VHost 是否存在
|
|
|
|
|
|
echo "检查 VHost: $VHOST 是否存在..."
|
|
|
|
|
|
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true)
|
|
|
|
|
|
|
|
|
|
|
|
if [ -n "$VHOST_EXISTS" ]; then
|
|
|
|
|
|
echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..."
|
|
|
|
|
|
|
|
|
|
|
|
# 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限)
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
delete vhost --name "$VHOST"
|
|
|
|
|
|
|
|
|
|
|
|
echo "✓ VHost '$VHOST' 及其所有资源已删除"
|
|
|
|
|
|
else
|
|
|
|
|
|
echo "VHost '$VHOST' 不存在"
|
|
|
|
|
|
fi
|
|
|
|
|
|
|
|
|
|
|
|
# 清理所有相关用户账号
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "清理用户账号..."
|
|
|
|
|
|
|
|
|
|
|
|
# 平台用户列表
|
|
|
|
|
|
ALL_USERS=()
|
|
|
|
|
|
for platform in "${PLATFORMS[@]}"; do
|
|
|
|
|
|
ALL_USERS+=("user_${platform}")
|
|
|
|
|
|
done
|
|
|
|
|
|
|
|
|
|
|
|
# 添加其他用户
|
|
|
|
|
|
ALL_USERS+=("user_dataflow_consumer")
|
|
|
|
|
|
ALL_USERS+=("user_ops")
|
|
|
|
|
|
|
|
|
|
|
|
# 删除用户(如果存在)
|
|
|
|
|
|
for user in "${ALL_USERS[@]}"; do
|
|
|
|
|
|
USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true)
|
|
|
|
|
|
if [ -n "$USER_EXISTS" ]; then
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
delete user --name "$user"
|
|
|
|
|
|
echo "✓ 删除用户: $user"
|
|
|
|
|
|
fi
|
|
|
|
|
|
done
|
|
|
|
|
|
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
echo "清理完成,开始重新配置..."
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
|
|
|
|
|
|
# 1. 创建 VHost
|
|
|
|
|
|
echo "创建 VHost: $VHOST"
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare vhost --name "$VHOST"
|
|
|
|
|
|
|
|
|
|
|
|
# 2. 创建主 Exchange(用于重试消息回流)
|
|
|
|
|
|
echo "创建主 Exchange: main.exchange"
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare exchange --name "main.exchange" --vhost "$VHOST" \
|
|
|
|
|
|
--type topic --durable true
|
|
|
|
|
|
|
|
|
|
|
|
# 3. 创建 DLX (死信交换机)
|
|
|
|
|
|
echo "创建死信交换机 (DLX)..."
|
|
|
|
|
|
for dtype in "${DATA_TYPES[@]}"; do
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \
|
|
|
|
|
|
--type topic --durable true
|
|
|
|
|
|
echo "✓ 创建 DLX: dlx.${dtype}"
|
|
|
|
|
|
done
|
|
|
|
|
|
|
|
|
|
|
|
# 4. 创建主业务队列(带 DLX 配置)
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "创建主业务队列..."
|
|
|
|
|
|
for dtype in "${DATA_TYPES[@]}"; do
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable true \
|
|
|
|
|
|
--arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
|
|
|
|
|
|
echo "✓ 创建队列: ${dtype}.queue"
|
|
|
|
|
|
done
|
|
|
|
|
|
|
|
|
|
|
|
# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "创建重试队列..."
|
|
|
|
|
|
for dtype in "${DATA_TYPES[@]}"; do
|
2025-12-01 16:31:13 +08:00
|
|
|
|
# 去掉末尾的 's' 得到单数形式(orders -> order)
|
|
|
|
|
|
dtype_singular="${dtype%s}"
|
|
|
|
|
|
|
|
|
|
|
|
# 配置回流时的 routing key 为 {type}.retry,可以被 {type}.# 匹配
|
2025-11-17 14:28:05 +08:00
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \
|
2025-12-01 16:31:13 +08:00
|
|
|
|
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}"
|
|
|
|
|
|
echo "✓ 创建重试队列: ${dtype}.retry.queue (回流 routing key: ${dtype_singular}.retry)"
|
2025-11-17 14:28:05 +08:00
|
|
|
|
done
|
|
|
|
|
|
|
2025-12-01 16:31:13 +08:00
|
|
|
|
# 6. 创建错误 Exchange 和错误队列
|
2025-11-17 14:28:05 +08:00
|
|
|
|
echo ""
|
2025-12-01 16:31:13 +08:00
|
|
|
|
echo "创建错误 Exchange 和错误队列..."
|
|
|
|
|
|
# 创建通用错误 Exchange(供 Consumer 发送错误消息使用)
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare exchange --name "errors.exchange" --vhost "$VHOST" \
|
|
|
|
|
|
--type topic --durable true
|
|
|
|
|
|
echo "✓ 创建错误 Exchange: errors.exchange"
|
|
|
|
|
|
|
|
|
|
|
|
# 创建错误队列
|
2025-11-17 14:28:05 +08:00
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare queue --name "errors.queue" --vhost "$VHOST" --durable true \
|
|
|
|
|
|
--arguments '{"x-message-ttl":604800000}'
|
|
|
|
|
|
echo "✓ 创建错误队列: errors.queue"
|
|
|
|
|
|
|
2025-12-01 16:31:13 +08:00
|
|
|
|
# 绑定通用错误 Exchange 到错误队列
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare binding --source "errors.exchange" --destination "errors.queue" \
|
|
|
|
|
|
--destination-type queue --vhost "$VHOST" --routing-key "#"
|
|
|
|
|
|
echo "✓ 绑定: errors.exchange → errors.queue (routing_key: #)"
|
|
|
|
|
|
|
2025-11-17 14:28:05 +08:00
|
|
|
|
# 7. 绑定主 Exchange 到主队列(使用通配符)
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "绑定主 Exchange 到主队列..."
|
|
|
|
|
|
for dtype in "${DATA_TYPES[@]}"; do
|
|
|
|
|
|
# 使用 order.#, product.# 等通配符匹配所有平台
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare binding --source "main.exchange" --destination "${dtype}.queue" \
|
|
|
|
|
|
--destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#"
|
|
|
|
|
|
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
|
|
|
|
|
|
done
|
|
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
# 8. 绑定 DLX 到重试队列
|
2025-11-17 14:28:05 +08:00
|
|
|
|
echo ""
|
2025-12-01 11:13:42 +08:00
|
|
|
|
echo "绑定 DLX 到重试队列..."
|
2025-11-17 14:28:05 +08:00
|
|
|
|
for dtype in "${DATA_TYPES[@]}"; do
|
|
|
|
|
|
# DLX -> 重试队列
|
2025-12-01 11:13:42 +08:00
|
|
|
|
# 注意:方案B中,超过重试次数的消息由 Consumer 直接发送到 errors.queue
|
|
|
|
|
|
# 不使用 DLX 的 routing_key="error" 路由,因此不需要绑定到 errors.queue
|
2025-11-17 14:28:05 +08:00
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
|
|
|
|
|
|
--destination-type queue --vhost "$VHOST" --routing-key "retry"
|
|
|
|
|
|
echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)"
|
|
|
|
|
|
done
|
|
|
|
|
|
|
|
|
|
|
|
# 9. 为每个平台创建 Exchange 和 Binding
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
echo "创建平台 Exchange 和 Binding..."
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
for platform in "${PLATFORMS[@]}"; do
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "处理平台: ${platform}"
|
|
|
|
|
|
|
|
|
|
|
|
# 创建平台业务 Exchange
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare exchange --name "${platform}.exchange" --vhost "$VHOST" \
|
|
|
|
|
|
--type topic --durable true
|
|
|
|
|
|
echo "✓ 创建业务 Exchange: ${platform}.exchange"
|
|
|
|
|
|
|
|
|
|
|
|
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform)
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare binding --source "${platform}.exchange" --destination "orders.queue" \
|
|
|
|
|
|
--destination-type queue --vhost "$VHOST" --routing-key "order.${platform}"
|
|
|
|
|
|
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare binding --source "${platform}.exchange" --destination "products.queue" \
|
|
|
|
|
|
--destination-type queue --vhost "$VHOST" --routing-key "product.${platform}"
|
|
|
|
|
|
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare binding --source "${platform}.exchange" --destination "refunds.queue" \
|
|
|
|
|
|
--destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}"
|
|
|
|
|
|
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare binding --source "${platform}.exchange" --destination "inventory.queue" \
|
|
|
|
|
|
--destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}"
|
|
|
|
|
|
echo "✓ 绑定到业务队列 (4个)"
|
|
|
|
|
|
|
|
|
|
|
|
# 创建平台错误 Exchange
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \
|
|
|
|
|
|
--type topic --durable true
|
|
|
|
|
|
echo "✓ 创建错误 Exchange: ${platform}.errors.exchange"
|
|
|
|
|
|
|
|
|
|
|
|
# 绑定到错误队列
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \
|
|
|
|
|
|
--destination-type queue --vhost "$VHOST" --routing-key "#"
|
|
|
|
|
|
echo "✓ 绑定到错误队列"
|
|
|
|
|
|
|
|
|
|
|
|
# 创建平台用户
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare user --name "user_${platform}" --password "change_me_${platform}" --tags ""
|
|
|
|
|
|
echo "✓ 创建用户: user_${platform}"
|
|
|
|
|
|
|
|
|
|
|
|
# 配置平台用户权限
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare permissions --vhost "$VHOST" --user "user_${platform}" \
|
2025-11-17 16:51:51 +08:00
|
|
|
|
--configure "^${platform}\\.(exchange|errors\\.exchange)$" \
|
|
|
|
|
|
--write "^${platform}\\.(exchange|errors\\.exchange)$" \
|
2025-11-17 14:28:05 +08:00
|
|
|
|
--read "^${platform}\\.errors\\..*$"
|
|
|
|
|
|
echo "✓ 配置用户权限"
|
|
|
|
|
|
done
|
|
|
|
|
|
|
|
|
|
|
|
# 10. 创建 dataflow 消费者用户
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
echo "创建 dataflow 消费者用户..."
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags ""
|
|
|
|
|
|
echo "✓ 创建用户: user_dataflow_consumer"
|
|
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
# Consumer 需要完整的权限来声明和消费队列
|
2025-12-01 16:31:13 +08:00
|
|
|
|
# - configure: 允许声明 main.exchange、DLX exchanges、通用 errors.exchange 和所有队列
|
|
|
|
|
|
# - write: 允许向业务队列写入(ACK/NACK)、DLX、通用 errors.exchange 和平台 errors exchanges
|
2025-12-01 11:13:42 +08:00
|
|
|
|
# - read: 允许读取 main.exchange、业务队列和 DLX exchanges
|
2025-11-17 14:28:05 +08:00
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \
|
2025-12-01 16:31:13 +08:00
|
|
|
|
--configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \
|
|
|
|
|
|
--write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \
|
2025-12-01 11:13:42 +08:00
|
|
|
|
--read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$"
|
2025-12-01 16:31:13 +08:00
|
|
|
|
echo "✓ 配置用户权限 (configure: main.exchange+errors.exchange+DLX+queues, write: business queues+DLX+errors.exchange, read: main.exchange+business queues+DLX)"
|
2025-11-17 14:28:05 +08:00
|
|
|
|
|
|
|
|
|
|
# 11. 创建运维监控用户
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "创建运维监控用户..."
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare user --name "user_ops" --password "change_me_ops" --tags ""
|
|
|
|
|
|
echo "✓ 创建用户: user_ops"
|
|
|
|
|
|
|
|
|
|
|
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
|
|
|
|
declare permissions --vhost "$VHOST" --user "user_ops" \
|
|
|
|
|
|
--configure "^errors\\..*$" --write "" --read "^errors\\.queue$"
|
|
|
|
|
|
echo "✓ 配置用户权限"
|
|
|
|
|
|
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
echo "RabbitMQ 配置完成!"
|
|
|
|
|
|
echo "========================================"
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "已创建:"
|
|
|
|
|
|
echo "- 1 个 VHost: $VHOST"
|
|
|
|
|
|
echo "- 1 个主 Exchange: main.exchange"
|
|
|
|
|
|
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
|
|
|
|
|
|
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
|
|
|
|
|
|
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
|
|
|
|
|
|
echo "- 1 个错误队列: errors.queue"
|
|
|
|
|
|
echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)"
|
|
|
|
|
|
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
|
|
|
|
|
|
echo ""
|
|
|
|
|
|
echo "提示: 请修改所有用户的默认密码 (change_me_*)"
|