From 6b8943f07fe959f8a4cf4f7844f5bc7d536d8c3b Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Mon, 17 Nov 2025 14:28:05 +0800 Subject: [PATCH] update rabbit scripts --- backend/bin/rabbitmq.sh | 242 ++++++++++++++++++++++++++++++++ docs/RabbitMQ.md | 298 +++++++++++++++++++++++++++------------- 2 files changed, 442 insertions(+), 98 deletions(-) create mode 100755 backend/bin/rabbitmq.sh diff --git a/backend/bin/rabbitmq.sh b/backend/bin/rabbitmq.sh new file mode 100755 index 0000000..da5fae7 --- /dev/null +++ b/backend/bin/rabbitmq.sh @@ -0,0 +1,242 @@ +#!/bin/bash + +# RabbitMQ 连接信息 +RABBITMQ_HOST="127.0.0.1" +RABBITMQ_PORT="15672" +RABBITMQ_USER="admin" +RABBITMQ_PASS="admin" +VHOST="dataflow" + +# 平台列表 +PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify") +DATA_TYPES=("orders" "products" "refunds" "inventory") + +# 0. 检测并清理现有 VHost 和用户(如果存在) +echo "========================================" +echo "开始清理现有配置..." +echo "========================================" + +# 检查 VHost 是否存在 +echo "检查 VHost: $VHOST 是否存在..." +VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true) + +if [ -n "$VHOST_EXISTS" ]; then + echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..." + + # 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限) + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete vhost --name "$VHOST" + + echo "✓ VHost '$VHOST' 及其所有资源已删除" +else + echo "VHost '$VHOST' 不存在" +fi + +# 清理所有相关用户账号 +echo "" +echo "清理用户账号..." + +# 平台用户列表 +ALL_USERS=() +for platform in "${PLATFORMS[@]}"; do + ALL_USERS+=("user_${platform}") +done + +# 添加其他用户 +ALL_USERS+=("user_dataflow_consumer") +ALL_USERS+=("user_ops") + +# 删除用户(如果存在) +for user in "${ALL_USERS[@]}"; do + USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true) + if [ -n "$USER_EXISTS" ]; then + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete user --name "$user" + echo "✓ 删除用户: $user" + fi +done + +echo "" +echo "========================================" +echo "清理完成,开始重新配置..." +echo "========================================" +echo "" + +# 1. 创建 VHost +echo "创建 VHost: $VHOST" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare vhost --name "$VHOST" + +# 2. 创建主 Exchange(用于重试消息回流) +echo "创建主 Exchange: main.exchange" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "main.exchange" --vhost "$VHOST" \ + --type topic --durable true + +# 3. 创建 DLX (死信交换机) +echo "创建死信交换机 (DLX)..." +for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \ + --type topic --durable true + echo "✓ 创建 DLX: dlx.${dtype}" +done + +# 4. 创建主业务队列(带 DLX 配置) +echo "" +echo "创建主业务队列..." +for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable true \ + --arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" + echo "✓ 创建队列: ${dtype}.queue" +done + +# 5. 创建重试队列(带 TTL 和 DLX 回流配置) +echo "" +echo "创建重试队列..." +for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \ + --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}" + echo "✓ 创建重试队列: ${dtype}.retry.queue" +done + +# 6. 创建错误队列 +echo "" +echo "创建错误队列..." +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "errors.queue" --vhost "$VHOST" --durable true \ + --arguments '{"x-message-ttl":604800000}' +echo "✓ 创建错误队列: errors.queue" + +# 7. 绑定主 Exchange 到主队列(使用通配符) +echo "" +echo "绑定主 Exchange 到主队列..." +for dtype in "${DATA_TYPES[@]}"; do + # 使用 order.#, product.# 等通配符匹配所有平台 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "main.exchange" --destination "${dtype}.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#" + echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)" +done + +# 8. 绑定 DLX 到重试队列和错误队列 +echo "" +echo "绑定 DLX 到重试队列和错误队列..." +for dtype in "${DATA_TYPES[@]}"; do + # DLX -> 重试队列 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "retry" + echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)" + + # DLX -> 错误队列 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "dlx.${dtype}" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "error" + echo "✓ 绑定: dlx.${dtype} → errors.queue (routing_key: error)" +done + +# 9. 为每个平台创建 Exchange 和 Binding +echo "" +echo "========================================" +echo "创建平台 Exchange 和 Binding..." +echo "========================================" +for platform in "${PLATFORMS[@]}"; do + echo "" + echo "处理平台: ${platform}" + + # 创建平台业务 Exchange + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "${platform}.exchange" --vhost "$VHOST" \ + --type topic --durable true + echo "✓ 创建业务 Exchange: ${platform}.exchange" + + # 绑定到业务队列(使用新的 routing key 格式:data_type.platform) + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "orders.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "order.${platform}" + + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "products.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "product.${platform}" + + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "refunds.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}" + + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "inventory.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}" + echo "✓ 绑定到业务队列 (4个)" + + # 创建平台错误 Exchange + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \ + --type topic --durable true + echo "✓ 创建错误 Exchange: ${platform}.errors.exchange" + + # 绑定到错误队列 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "#" + echo "✓ 绑定到错误队列" + + # 创建平台用户 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_${platform}" --password "change_me_${platform}" --tags "" + echo "✓ 创建用户: user_${platform}" + + # 配置平台用户权限 + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_${platform}" \ + --configure "" --write "^${platform}\\.(exchange|errors\\.exchange)$" \ + --read "^${platform}\\.errors\\..*$" + echo "✓ 配置用户权限" +done + +# 10. 创建 dataflow 消费者用户 +echo "" +echo "========================================" +echo "创建 dataflow 消费者用户..." +echo "========================================" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags "" +echo "✓ 创建用户: user_dataflow_consumer" + +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \ + --configure "^(orders|products|refunds|inventory).*\\.queue$" \ + --write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \ + --read "^(orders|products|refunds|inventory).*\\.queue$" +echo "✓ 配置用户权限" + +# 11. 创建运维监控用户 +echo "" +echo "创建运维监控用户..." +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_ops" --password "change_me_ops" --tags "" +echo "✓ 创建用户: user_ops" + +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_ops" \ + --configure "^errors\\..*$" --write "" --read "^errors\\.queue$" +echo "✓ 配置用户权限" + +echo "" +echo "========================================" +echo "RabbitMQ 配置完成!" +echo "========================================" +echo "" +echo "已创建:" +echo "- 1 个 VHost: $VHOST" +echo "- 1 个主 Exchange: main.exchange" +echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" +echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" +echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue" +echo "- 1 个错误队列: errors.queue" +echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)" +echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)" +echo "" +echo "提示: 请修改所有用户的默认密码 (change_me_*)" diff --git a/docs/RabbitMQ.md b/docs/RabbitMQ.md index b6f9f04..48d60c7 100644 --- a/docs/RabbitMQ.md +++ b/docs/RabbitMQ.md @@ -866,142 +866,242 @@ sudo mv rabbitmqadmin /usr/local/bin/ #### 配置脚本示例 +**注意**:本脚本适用于**新版本 rabbitmqadmin**(Rust 版本)。如果您使用的是旧版本(Python),请参考官方文档调整语法。 + ```bash #!/bin/bash # RabbitMQ 连接信息 -RABBITMQ_HOST="localhost:15672" +RABBITMQ_HOST="127.0.0.1" +RABBITMQ_PORT="15672" RABBITMQ_USER="admin" -RABBITMQ_PASS="admin_password" -VHOST="dataflow-app" +RABBITMQ_PASS="admin" +VHOST="dataflow" # 平台列表 PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify") DATA_TYPES=("orders" "products" "refunds" "inventory") +# 0. 检测并清理现有 VHost 和用户(如果存在) +echo "========================================" +echo "开始清理现有配置..." +echo "========================================" + +# 检查 VHost 是否存在 +echo "检查 VHost: $VHOST 是否存在..." +VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true) + +if [ -n "$VHOST_EXISTS" ]; then + echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..." + + # 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限) + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete vhost --name "$VHOST" + + echo "✓ VHost '$VHOST' 及其所有资源已删除" +else + echo "VHost '$VHOST' 不存在" +fi + +# 清理所有相关用户账号 +echo "" +echo "清理用户账号..." + +# 平台用户列表 +ALL_USERS=() +for platform in "${PLATFORMS[@]}"; do + ALL_USERS+=("user_${platform}") +done + +# 添加其他用户 +ALL_USERS+=("user_dataflow_consumer") +ALL_USERS+=("user_ops") + +# 删除用户(如果存在) +for user in "${ALL_USERS[@]}"; do + USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true) + if [ -n "$USER_EXISTS" ]; then + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + delete user --name "$user" + echo "✓ 删除用户: $user" + fi +done + +echo "" +echo "========================================" +echo "清理完成,开始重新配置..." +echo "========================================" +echo "" + # 1. 创建 VHost -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare vhost name=$VHOST +echo "创建 VHost: $VHOST" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare vhost --name "$VHOST" # 2. 创建主 Exchange(用于重试消息回流) -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange name="main.exchange" vhost=$VHOST \ - type=topic durable=true +echo "创建主 Exchange: main.exchange" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "main.exchange" --vhost "$VHOST" \ + --type topic --durable true # 3. 创建 DLX (死信交换机) +echo "创建死信交换机 (DLX)..." for dtype in "${DATA_TYPES[@]}"; do - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange name="dlx.${dtype}" vhost=$VHOST \ - type=topic durable=true + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \ + --type topic --durable true + echo "✓ 创建 DLX: dlx.${dtype}" done # 4. 创建主业务队列(带 DLX 配置) +echo "" +echo "创建主业务队列..." for dtype in "${DATA_TYPES[@]}"; do - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare queue name="${dtype}.queue" vhost=$VHOST durable=true \ - arguments="{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable \ + --arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" + echo "✓ 创建队列: ${dtype}.queue" done # 5. 创建重试队列(带 TTL 和 DLX 回流配置) +echo "" +echo "创建重试队列..." for dtype in "${DATA_TYPES[@]}"; do - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare queue name="${dtype}.retry.queue" vhost=$VHOST durable=true \ - arguments="{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable \ + --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}" + echo "✓ 创建重试队列: ${dtype}.retry.queue" done # 6. 创建错误队列 -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare queue name="errors.queue" vhost=$VHOST durable=true \ - arguments='{"x-message-ttl":604800000}' +echo "" +echo "创建错误队列..." +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue --name "errors.queue" --vhost "$VHOST" --durable \ + --arguments '{"x-message-ttl":604800000}' +echo "✓ 创建错误队列: errors.queue" # 7. 绑定主 Exchange 到主队列(使用通配符) +echo "" +echo "绑定主 Exchange 到主队列..." for dtype in "${DATA_TYPES[@]}"; do # 使用 order.#, product.# 等通配符匹配所有平台 - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="main.exchange" destination="${dtype}.queue" \ - vhost=$VHOST routing_key="${dtype%s}.#" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "main.exchange" --destination "${dtype}.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#" + echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)" done # 8. 绑定 DLX 到重试队列和错误队列 +echo "" +echo "绑定 DLX 到重试队列和错误队列..." for dtype in "${DATA_TYPES[@]}"; do # DLX -> 重试队列 - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="dlx.${dtype}" destination="${dtype}.retry.queue" \ - vhost=$VHOST routing_key="retry" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "retry" + echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)" # DLX -> 错误队列 - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="dlx.${dtype}" destination="errors.queue" \ - vhost=$VHOST routing_key="error" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "dlx.${dtype}" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "error" + echo "✓ 绑定: dlx.${dtype} → errors.queue (routing_key: error)" done # 9. 为每个平台创建 Exchange 和 Binding +echo "" +echo "========================================" +echo "创建平台 Exchange 和 Binding..." +echo "========================================" for platform in "${PLATFORMS[@]}"; do + echo "" + echo "处理平台: ${platform}" + # 创建平台业务 Exchange - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange name="${platform}.exchange" vhost=$VHOST \ - type=topic durable=true + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "${platform}.exchange" --vhost "$VHOST" \ + --type topic --durable true + echo "✓ 创建业务 Exchange: ${platform}.exchange" # 绑定到业务队列(使用新的 routing key 格式:data_type.platform) - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="${platform}.exchange" destination="orders.queue" \ - vhost=$VHOST routing_key="order.${platform}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "orders.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "order.${platform}" - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="${platform}.exchange" destination="products.queue" \ - vhost=$VHOST routing_key="product.${platform}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "products.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "product.${platform}" - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="${platform}.exchange" destination="refunds.queue" \ - vhost=$VHOST routing_key="refund.${platform}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "refunds.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}" - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="${platform}.exchange" destination="inventory.queue" \ - vhost=$VHOST routing_key="inventory.${platform}" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.exchange" --destination "inventory.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}" + echo "✓ 绑定到业务队列 (4个)" # 创建平台错误 Exchange - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange name="${platform}.errors.exchange" vhost=$VHOST \ - type=topic durable=true + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \ + --type topic --durable true + echo "✓ 创建错误 Exchange: ${platform}.errors.exchange" # 绑定到错误队列 - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="${platform}.errors.exchange" destination="errors.queue" \ - vhost=$VHOST routing_key="#" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "#" + echo "✓ 绑定到错误队列" # 创建平台用户 - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user name="user_${platform}" password="change_me_${platform}" tags="" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_${platform}" --password "change_me_${platform}" --tags "" + echo "✓ 创建用户: user_${platform}" # 配置平台用户权限 - rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permission vhost=$VHOST user="user_${platform}" \ - configure="" write="^${platform}\.(exchange|errors\.exchange)$" \ - read="^${platform}\.errors\..*$" + rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_${platform}" \ + --configure "" --write "^${platform}\\.(exchange|errors\\.exchange)$" \ + --read "^${platform}\\.errors\\..*$" + echo "✓ 配置用户权限" done # 10. 创建 dataflow 消费者用户 -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user name="user_dataflow_consumer" password="change_me_consumer" tags="" +echo "" +echo "========================================" +echo "创建 dataflow 消费者用户..." +echo "========================================" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags "" +echo "✓ 创建用户: user_dataflow_consumer" -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permission vhost=$VHOST user="user_dataflow_consumer" \ - configure="^(orders|products|refunds|inventory).*\.queue$" \ - write="(dlx\..*)|(.*\.errors\.exchange)$" \ - read="^(orders|products|refunds|inventory).*\.queue$" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \ + --configure "^(orders|products|refunds|inventory).*\\.queue$" \ + --write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \ + --read "^(orders|products|refunds|inventory).*\\.queue$" +echo "✓ 配置用户权限" # 11. 创建运维监控用户 -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user name="user_ops" password="change_me_ops" tags="" +echo "" +echo "创建运维监控用户..." +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_ops" --password "change_me_ops" --tags "" +echo "✓ 创建用户: user_ops" -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permission vhost=$VHOST user="user_ops" \ - configure="^errors\..*$" write="" read="^errors\.queue$" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost "$VHOST" --user "user_ops" \ + --configure "^errors\\..*$" --write "" --read "^errors\\.queue$" +echo "✓ 配置用户权限" +echo "" +echo "========================================" echo "RabbitMQ 配置完成!" +echo "========================================" echo "" echo "已创建:" -echo "- 1 个 VHost: dataflow-app" +echo "- 1 个 VHost: $VHOST" echo "- 1 个主 Exchange: main.exchange" echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" @@ -1009,6 +1109,8 @@ echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.ret echo "- 1 个错误队列: errors.queue" echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)" echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)" +echo "" +echo "提示: 请修改所有用户的默认密码 (change_me_*)" ``` #### 执行脚本 @@ -1083,7 +1185,7 @@ chmod +x setup_rabbitmq.sh ```bash # 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions) -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ export dataflow-backup.json ``` @@ -1091,7 +1193,7 @@ rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ ```bash # 从备份恢复配置 -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ import dataflow-backup.json ``` @@ -1162,54 +1264,54 @@ tail -f /var/log/rabbitmq/rabbit@_connection.log ```bash # 业务 Exchange -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange name="lazada.exchange" vhost=dataflow-app \ - type=topic durable=true +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "lazada.exchange" --vhost dataflow-app \ + --type topic --durable true # 错误 Exchange -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange name="lazada.errors.exchange" vhost=dataflow-app \ - type=topic durable=true +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "lazada.errors.exchange" --vhost dataflow-app \ + --type topic --durable true ``` ### 2. 创建 Binding ```bash # 绑定到业务队列 -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="lazada.exchange" destination="orders.queue" \ - vhost=dataflow-app routing_key="order" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "lazada.exchange" --destination "orders.queue" \ + --destination-type queue --vhost dataflow-app --routing-key "order.lazada" -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="lazada.exchange" destination="products.queue" \ - vhost=dataflow-app routing_key="product" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "lazada.exchange" --destination "products.queue" \ + --destination-type queue --vhost dataflow-app --routing-key "product.lazada" -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="lazada.exchange" destination="refunds.queue" \ - vhost=dataflow-app routing_key="refund" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "lazada.exchange" --destination "refunds.queue" \ + --destination-type queue --vhost dataflow-app --routing-key "refund.lazada" -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="lazada.exchange" destination="inventory.queue" \ - vhost=dataflow-app routing_key="inventory" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "lazada.exchange" --destination "inventory.queue" \ + --destination-type queue --vhost dataflow-app --routing-key "inventory.lazada" # 绑定到错误队列 -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding source="lazada.errors.exchange" \ - destination="errors.queue" vhost=dataflow-app routing_key="#" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "lazada.errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost dataflow-app --routing-key "#" ``` ### 3. 创建用户并配置权限 ```bash # 创建用户 -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user name="user_lazada" password="" tags="" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user --name "user_lazada" --password "" --tags "" # 配置权限 -rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permission vhost=dataflow-app user="user_lazada" \ - configure="" write="^lazada\.(exchange|errors\.exchange)$" \ - read="^lazada\.errors\..*$" +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permissions --vhost dataflow-app --user "user_lazada" \ + --configure "" --write "^lazada\\.(exchange|errors\\.exchange)$" \ + --read "^lazada\\.errors\\..*$" ``` ### 4. 提供给平台开发者