update rabbit scripts

This commit is contained in:
2025-11-17 14:28:05 +08:00
parent 46073d666c
commit 6b8943f07f
2 changed files with 442 additions and 98 deletions
+242
View File
@@ -0,0 +1,242 @@
#!/bin/bash
# RabbitMQ 连接信息
RABBITMQ_HOST="127.0.0.1"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin"
VHOST="dataflow"
# 平台列表
PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify")
DATA_TYPES=("orders" "products" "refunds" "inventory")
# 0. 检测并清理现有 VHost 和用户(如果存在)
echo "========================================"
echo "开始清理现有配置..."
echo "========================================"
# 检查 VHost 是否存在
echo "检查 VHost: $VHOST 是否存在..."
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true)
if [ -n "$VHOST_EXISTS" ]; then
echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..."
# 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete vhost --name "$VHOST"
echo "✓ VHost '$VHOST' 及其所有资源已删除"
else
echo "VHost '$VHOST' 不存在"
fi
# 清理所有相关用户账号
echo ""
echo "清理用户账号..."
# 平台用户列表
ALL_USERS=()
for platform in "${PLATFORMS[@]}"; do
ALL_USERS+=("user_${platform}")
done
# 添加其他用户
ALL_USERS+=("user_dataflow_consumer")
ALL_USERS+=("user_ops")
# 删除用户(如果存在)
for user in "${ALL_USERS[@]}"; do
USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true)
if [ -n "$USER_EXISTS" ]; then
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete user --name "$user"
echo "✓ 删除用户: $user"
fi
done
echo ""
echo "========================================"
echo "清理完成,开始重新配置..."
echo "========================================"
echo ""
# 1. 创建 VHost
echo "创建 VHost: $VHOST"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare vhost --name "$VHOST"
# 2. 创建主 Exchange(用于重试消息回流)
echo "创建主 Exchange: main.exchange"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "main.exchange" --vhost "$VHOST" \
--type topic --durable true
# 3. 创建 DLX (死信交换机)
echo "创建死信交换机 (DLX)..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建 DLX: dlx.${dtype}"
done
# 4. 创建主业务队列(带 DLX 配置)
echo ""
echo "创建主业务队列..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable true \
--arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
echo "✓ 创建队列: ${dtype}.queue"
done
# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
echo ""
echo "创建重试队列..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}"
echo "✓ 创建重试队列: ${dtype}.retry.queue"
done
# 6. 创建错误队列
echo ""
echo "创建错误队列..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "errors.queue" --vhost "$VHOST" --durable true \
--arguments '{"x-message-ttl":604800000}'
echo "✓ 创建错误队列: errors.queue"
# 7. 绑定主 Exchange 到主队列(使用通配符)
echo ""
echo "绑定主 Exchange 到主队列..."
for dtype in "${DATA_TYPES[@]}"; do
# 使用 order.#, product.# 等通配符匹配所有平台
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "main.exchange" --destination "${dtype}.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#"
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
done
# 8. 绑定 DLX 到重试队列和错误队列
echo ""
echo "绑定 DLX 到重试队列和错误队列..."
for dtype in "${DATA_TYPES[@]}"; do
# DLX -> 重试队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "retry"
echo "✓ 绑定: dlx.${dtype}${dtype}.retry.queue (routing_key: retry)"
# DLX -> 错误队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "dlx.${dtype}" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "error"
echo "✓ 绑定: dlx.${dtype} → errors.queue (routing_key: error)"
done
# 9. 为每个平台创建 Exchange 和 Binding
echo ""
echo "========================================"
echo "创建平台 Exchange 和 Binding..."
echo "========================================"
for platform in "${PLATFORMS[@]}"; do
echo ""
echo "处理平台: ${platform}"
# 创建平台业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建业务 Exchange: ${platform}.exchange"
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "orders.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "order.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "products.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "product.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "refunds.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "inventory.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}"
echo "✓ 绑定到业务队列 (4个)"
# 创建平台错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建错误 Exchange: ${platform}.errors.exchange"
# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "#"
echo "✓ 绑定到错误队列"
# 创建平台用户
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_${platform}" --password "change_me_${platform}" --tags ""
echo "✓ 创建用户: user_${platform}"
# 配置平台用户权限
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_${platform}" \
--configure "" --write "^${platform}\\.(exchange|errors\\.exchange)$" \
--read "^${platform}\\.errors\\..*$"
echo "✓ 配置用户权限"
done
# 10. 创建 dataflow 消费者用户
echo ""
echo "========================================"
echo "创建 dataflow 消费者用户..."
echo "========================================"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags ""
echo "✓ 创建用户: user_dataflow_consumer"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \
--configure "^(orders|products|refunds|inventory).*\\.queue$" \
--write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \
--read "^(orders|products|refunds|inventory).*\\.queue$"
echo "✓ 配置用户权限"
# 11. 创建运维监控用户
echo ""
echo "创建运维监控用户..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_ops" --password "change_me_ops" --tags ""
echo "✓ 创建用户: user_ops"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_ops" \
--configure "^errors\\..*$" --write "" --read "^errors\\.queue$"
echo "✓ 配置用户权限"
echo ""
echo "========================================"
echo "RabbitMQ 配置完成!"
echo "========================================"
echo ""
echo "已创建:"
echo "- 1 个 VHost: $VHOST"
echo "- 1 个主 Exchange: main.exchange"
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
echo "- 1 个错误队列: errors.queue"
echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)"
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
echo ""
echo "提示: 请修改所有用户的默认密码 (change_me_*)"
+200 -98
View File
@@ -866,142 +866,242 @@ sudo mv rabbitmqadmin /usr/local/bin/
#### 配置脚本示例 #### 配置脚本示例
**注意**:本脚本适用于**新版本 rabbitmqadmin**Rust 版本)。如果您使用的是旧版本(Python),请参考官方文档调整语法。
```bash ```bash
#!/bin/bash #!/bin/bash
# RabbitMQ 连接信息 # RabbitMQ 连接信息
RABBITMQ_HOST="localhost:15672" RABBITMQ_HOST="127.0.0.1"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin" RABBITMQ_USER="admin"
RABBITMQ_PASS="admin_password" RABBITMQ_PASS="admin"
VHOST="dataflow-app" VHOST="dataflow"
# 平台列表 # 平台列表
PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify") PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify")
DATA_TYPES=("orders" "products" "refunds" "inventory") DATA_TYPES=("orders" "products" "refunds" "inventory")
# 0. 检测并清理现有 VHost 和用户(如果存在)
echo "========================================"
echo "开始清理现有配置..."
echo "========================================"
# 检查 VHost 是否存在
echo "检查 VHost: $VHOST 是否存在..."
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true)
if [ -n "$VHOST_EXISTS" ]; then
echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..."
# 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete vhost --name "$VHOST"
echo "✓ VHost '$VHOST' 及其所有资源已删除"
else
echo "VHost '$VHOST' 不存在"
fi
# 清理所有相关用户账号
echo ""
echo "清理用户账号..."
# 平台用户列表
ALL_USERS=()
for platform in "${PLATFORMS[@]}"; do
ALL_USERS+=("user_${platform}")
done
# 添加其他用户
ALL_USERS+=("user_dataflow_consumer")
ALL_USERS+=("user_ops")
# 删除用户(如果存在)
for user in "${ALL_USERS[@]}"; do
USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true)
if [ -n "$USER_EXISTS" ]; then
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete user --name "$user"
echo "✓ 删除用户: $user"
fi
done
echo ""
echo "========================================"
echo "清理完成,开始重新配置..."
echo "========================================"
echo ""
# 1. 创建 VHost # 1. 创建 VHost
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ echo "创建 VHost: $VHOST"
declare vhost name=$VHOST rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare vhost --name "$VHOST"
# 2. 创建主 Exchange(用于重试消息回流) # 2. 创建主 Exchange(用于重试消息回流)
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ echo "创建主 Exchange: main.exchange"
declare exchange name="main.exchange" vhost=$VHOST \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
type=topic durable=true declare exchange --name "main.exchange" --vhost "$VHOST" \
--type topic --durable true
# 3. 创建 DLX (死信交换机) # 3. 创建 DLX (死信交换机)
echo "创建死信交换机 (DLX)..."
for dtype in "${DATA_TYPES[@]}"; do for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="dlx.${dtype}" vhost=$VHOST \ declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \
type=topic durable=true --type topic --durable true
echo "✓ 创建 DLX: dlx.${dtype}"
done done
# 4. 创建主业务队列(带 DLX 配置) # 4. 创建主业务队列(带 DLX 配置)
echo ""
echo "创建主业务队列..."
for dtype in "${DATA_TYPES[@]}"; do for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue name="${dtype}.queue" vhost=$VHOST durable=true \ declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable \
arguments="{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" --arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
echo "✓ 创建队列: ${dtype}.queue"
done done
# 5. 创建重试队列(带 TTL 和 DLX 回流配置) # 5. 创建重试队列(带 TTL 和 DLX 回流配置)
echo ""
echo "创建重试队列..."
for dtype in "${DATA_TYPES[@]}"; do for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue name="${dtype}.retry.queue" vhost=$VHOST durable=true \ declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable \
arguments="{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}" --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}"
echo "✓ 创建重试队列: ${dtype}.retry.queue"
done done
# 6. 创建错误队列 # 6. 创建错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ echo ""
declare queue name="errors.queue" vhost=$VHOST durable=true \ echo "创建错误队列..."
arguments='{"x-message-ttl":604800000}' rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "errors.queue" --vhost "$VHOST" --durable \
--arguments '{"x-message-ttl":604800000}'
echo "✓ 创建错误队列: errors.queue"
# 7. 绑定主 Exchange 到主队列(使用通配符) # 7. 绑定主 Exchange 到主队列(使用通配符)
echo ""
echo "绑定主 Exchange 到主队列..."
for dtype in "${DATA_TYPES[@]}"; do for dtype in "${DATA_TYPES[@]}"; do
# 使用 order.#, product.# 等通配符匹配所有平台 # 使用 order.#, product.# 等通配符匹配所有平台
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="main.exchange" destination="${dtype}.queue" \ declare binding --source "main.exchange" --destination "${dtype}.queue" \
vhost=$VHOST routing_key="${dtype%s}.#" --destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#"
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
done done
# 8. 绑定 DLX 到重试队列和错误队列 # 8. 绑定 DLX 到重试队列和错误队列
echo ""
echo "绑定 DLX 到重试队列和错误队列..."
for dtype in "${DATA_TYPES[@]}"; do for dtype in "${DATA_TYPES[@]}"; do
# DLX -> 重试队列 # DLX -> 重试队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="dlx.${dtype}" destination="${dtype}.retry.queue" \ declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
vhost=$VHOST routing_key="retry" --destination-type queue --vhost "$VHOST" --routing-key "retry"
echo "✓ 绑定: dlx.${dtype}${dtype}.retry.queue (routing_key: retry)"
# DLX -> 错误队列 # DLX -> 错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="dlx.${dtype}" destination="errors.queue" \ declare binding --source "dlx.${dtype}" --destination "errors.queue" \
vhost=$VHOST routing_key="error" --destination-type queue --vhost "$VHOST" --routing-key "error"
echo "✓ 绑定: dlx.${dtype} → errors.queue (routing_key: error)"
done done
# 9. 为每个平台创建 Exchange 和 Binding # 9. 为每个平台创建 Exchange 和 Binding
echo ""
echo "========================================"
echo "创建平台 Exchange 和 Binding..."
echo "========================================"
for platform in "${PLATFORMS[@]}"; do for platform in "${PLATFORMS[@]}"; do
echo ""
echo "处理平台: ${platform}"
# 创建平台业务 Exchange # 创建平台业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="${platform}.exchange" vhost=$VHOST \ declare exchange --name "${platform}.exchange" --vhost "$VHOST" \
type=topic durable=true --type topic --durable true
echo "✓ 创建业务 Exchange: ${platform}.exchange"
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform # 绑定到业务队列(使用新的 routing key 格式:data_type.platform
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="orders.queue" \ declare binding --source "${platform}.exchange" --destination "orders.queue" \
vhost=$VHOST routing_key="order.${platform}" --destination-type queue --vhost "$VHOST" --routing-key "order.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="products.queue" \ declare binding --source "${platform}.exchange" --destination "products.queue" \
vhost=$VHOST routing_key="product.${platform}" --destination-type queue --vhost "$VHOST" --routing-key "product.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="refunds.queue" \ declare binding --source "${platform}.exchange" --destination "refunds.queue" \
vhost=$VHOST routing_key="refund.${platform}" --destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="inventory.queue" \ declare binding --source "${platform}.exchange" --destination "inventory.queue" \
vhost=$VHOST routing_key="inventory.${platform}" --destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}"
echo "✓ 绑定到业务队列 (4个)"
# 创建平台错误 Exchange # 创建平台错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="${platform}.errors.exchange" vhost=$VHOST \ declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \
type=topic durable=true --type topic --durable true
echo "✓ 创建错误 Exchange: ${platform}.errors.exchange"
# 绑定到错误队列 # 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.errors.exchange" destination="errors.queue" \ declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \
vhost=$VHOST routing_key="#" --destination-type queue --vhost "$VHOST" --routing-key "#"
echo "✓ 绑定到错误队列"
# 创建平台用户 # 创建平台用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_${platform}" password="change_me_${platform}" tags="" declare user --name "user_${platform}" --password "change_me_${platform}" --tags ""
echo "✓ 创建用户: user_${platform}"
# 配置平台用户权限 # 配置平台用户权限
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=$VHOST user="user_${platform}" \ declare permissions --vhost "$VHOST" --user "user_${platform}" \
configure="" write="^${platform}\.(exchange|errors\.exchange)$" \ --configure "" --write "^${platform}\\.(exchange|errors\\.exchange)$" \
read="^${platform}\.errors\..*$" --read "^${platform}\\.errors\\..*$"
echo "✓ 配置用户权限"
done done
# 10. 创建 dataflow 消费者用户 # 10. 创建 dataflow 消费者用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ echo ""
declare user name="user_dataflow_consumer" password="change_me_consumer" tags="" echo "========================================"
echo "创建 dataflow 消费者用户..."
echo "========================================"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags ""
echo "✓ 创建用户: user_dataflow_consumer"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=$VHOST user="user_dataflow_consumer" \ declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \
configure="^(orders|products|refunds|inventory).*\.queue$" \ --configure "^(orders|products|refunds|inventory).*\\.queue$" \
write="(dlx\..*)|(.*\.errors\.exchange)$" \ --write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \
read="^(orders|products|refunds|inventory).*\.queue$" --read "^(orders|products|refunds|inventory).*\\.queue$"
echo "✓ 配置用户权限"
# 11. 创建运维监控用户 # 11. 创建运维监控用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ echo ""
declare user name="user_ops" password="change_me_ops" tags="" echo "创建运维监控用户..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_ops" --password "change_me_ops" --tags ""
echo "✓ 创建用户: user_ops"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=$VHOST user="user_ops" \ declare permissions --vhost "$VHOST" --user "user_ops" \
configure="^errors\..*$" write="" read="^errors\.queue$" --configure "^errors\\..*$" --write "" --read "^errors\\.queue$"
echo "✓ 配置用户权限"
echo ""
echo "========================================"
echo "RabbitMQ 配置完成!" echo "RabbitMQ 配置完成!"
echo "========================================"
echo "" echo ""
echo "已创建:" echo "已创建:"
echo "- 1 个 VHost: dataflow-app" echo "- 1 个 VHost: $VHOST"
echo "- 1 个主 Exchange: main.exchange" echo "- 1 个主 Exchange: main.exchange"
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
@@ -1009,6 +1109,8 @@ echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.ret
echo "- 1 个错误队列: errors.queue" echo "- 1 个错误队列: errors.queue"
echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)" echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)"
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)" echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
echo ""
echo "提示: 请修改所有用户的默认密码 (change_me_*)"
``` ```
#### 执行脚本 #### 执行脚本
@@ -1083,7 +1185,7 @@ chmod +x setup_rabbitmq.sh
```bash ```bash
# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions # 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
export dataflow-backup.json export dataflow-backup.json
``` ```
@@ -1091,7 +1193,7 @@ rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
```bash ```bash
# 从备份恢复配置 # 从备份恢复配置
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
import dataflow-backup.json import dataflow-backup.json
``` ```
@@ -1162,54 +1264,54 @@ tail -f /var/log/rabbitmq/rabbit@<hostname>_connection.log
```bash ```bash
# 业务 Exchange # 业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="lazada.exchange" vhost=dataflow-app \ declare exchange --name "lazada.exchange" --vhost dataflow-app \
type=topic durable=true --type topic --durable true
# 错误 Exchange # 错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="lazada.errors.exchange" vhost=dataflow-app \ declare exchange --name "lazada.errors.exchange" --vhost dataflow-app \
type=topic durable=true --type topic --durable true
``` ```
### 2. 创建 Binding ### 2. 创建 Binding
```bash ```bash
# 绑定到业务队列 # 绑定到业务队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="orders.queue" \ declare binding --source "lazada.exchange" --destination "orders.queue" \
vhost=dataflow-app routing_key="order" --destination-type queue --vhost dataflow-app --routing-key "order.lazada"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="products.queue" \ declare binding --source "lazada.exchange" --destination "products.queue" \
vhost=dataflow-app routing_key="product" --destination-type queue --vhost dataflow-app --routing-key "product.lazada"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="refunds.queue" \ declare binding --source "lazada.exchange" --destination "refunds.queue" \
vhost=dataflow-app routing_key="refund" --destination-type queue --vhost dataflow-app --routing-key "refund.lazada"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="inventory.queue" \ declare binding --source "lazada.exchange" --destination "inventory.queue" \
vhost=dataflow-app routing_key="inventory" --destination-type queue --vhost dataflow-app --routing-key "inventory.lazada"
# 绑定到错误队列 # 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.errors.exchange" \ declare binding --source "lazada.errors.exchange" --destination "errors.queue" \
destination="errors.queue" vhost=dataflow-app routing_key="#" --destination-type queue --vhost dataflow-app --routing-key "#"
``` ```
### 3. 创建用户并配置权限 ### 3. 创建用户并配置权限
```bash ```bash
# 创建用户 # 创建用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_lazada" password="<secure_password>" tags="" declare user --name "user_lazada" --password "<secure_password>" --tags ""
# 配置权限 # 配置权限
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=dataflow-app user="user_lazada" \ declare permissions --vhost dataflow-app --user "user_lazada" \
configure="" write="^lazada\.(exchange|errors\.exchange)$" \ --configure "" --write "^lazada\\.(exchange|errors\\.exchange)$" \
read="^lazada\.errors\..*$" --read "^lazada\\.errors\\..*$"
``` ```
### 4. 提供给平台开发者 ### 4. 提供给平台开发者