44 KiB
RabbitMQ 配置方案
概述
本文档描述 dataflow 应用的 RabbitMQ 消息队列架构设计和配置方法。
业务背景
- 应用角色:dataflow 是消息消费者,负责接收来自多个电商平台的数据并入库
- 数据来源:多个电商平台(DouYin、JD、Tmall、RedBook、Amazon Japan、Naver、Rakuten、Shopify 等)
- 数据类型:订单(orders)、产品(products)、退款(refunds)、库存(inventory)
- 隔离需求:不同平台开发者之间的工作空间需要相互隔离,互不影响
架构特点
- VHost 隔离:按应用划分 VHost,支持多业务复用
- Exchange 隔离:每个平台独立 Exchange,通过权限控制访问
- 单队列设计:每种数据类型一个队列,所有平台共享(orders.queue/products.queue/refunds.queue/inventory.queue),保证严格 FIFO 顺序
- 独立消费者:每种数据类型配备独立消费者,采用批处理 + prefetch + 适配器模式,提升吞吐与稳定性
- 智能重试:DLX(死信交换机)+ 延迟重试队列 + 错误队列,优雅处理失败消息,不阻塞主队列
- 消息持久化:RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API
设计方案总结
核心设计理念
本方案采用 Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。
关键设计决策
1. 为什么使用单队列?
| 问题 | 传统多队列方案 | 本方案(单队列) |
|---|---|---|
| 队列数量 | 8平台 × 4数据类型 = 32个队列 | 4个数据类型队列(队列数量减少 87.5%) |
| FIFO 保证 | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 |
| 运维复杂度 | 需要监控 32 个队列 | 只需监控 4 个主队列 |
| 消费者部署 | 可能需要 32 个消费者实例 | 只需 4 个消费者实例 |
原因:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。
2. 为什么使用单消费者?
原因:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。
优势:
- ✅ 保证严格 FIFO 顺序
- ✅ 避免消息错投到错误的消费者
- ✅ 简化部署和运维
如何实现多平台支持? 通过消费者内部的适配器模式,根据消息的 platform 字段分发到不同业务逻辑。
3. 为什么使用 Prefetch + 批处理?
原因:RabbitMQ 是 Push 模式,不提供 Kafka 式的批量拉取 API。
解决方案:
- 设置
prefetch_count=100:RabbitMQ 最多推送 100 条未确认消息 - 消费者自行缓存消息,达到
batch_size=50或timeout=3s后批量处理 - 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性
优势:
- ✅ 提升数据库写入效率(批量插入)
- ✅ 控制并发和内存压力
- ✅ 失败消息不影响其他消息(单条 ACK)
4. 为什么使用 DLX + 延迟重试队列?
问题:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。
传统方案的问题:
- ❌
nack(requeue=true):消息回队列头部,无限循环 - ❌ 直接丢弃:数据丢失
- ❌ 立即重试:打爆 API/DB
本方案(DLX + 延迟重试):
消费失败 → nack(requeue=false) → 进入 DLX
↓
x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列
↓
x-retries >= 3 → 错误队列 (人工处理)
优势:
- ✅ 不会卡 FIFO
- ✅ 不会无限重试
- ✅ 延迟重试避免打爆 API/DB
- ✅ 错误消息自动隔离
- ✅ 无需额外 Redis 或手写 retry 逻辑
方案优势总结
| 维度 | 优势 |
|---|---|
| 可靠性 | 消息持久化 + 单条 ACK + 智能重试,避免数据丢失 |
| 性能 | Prefetch + 批处理 + 数据库批量写入,提升吞吐 |
| 顺序性 | 单队列 FIFO 保证严格顺序 |
| 扩展性 | 适配器模式,新增平台只需添加适配器类 |
| 运维 | 队列数量少,监控简单,故障排查容易 |
| 容错 | DLX 智能重试,失败消息不阻塞主队列 |
适用场景
✅ 适合:
- 多平台数据同步(电商、物流、金融等)
- 需要严格 FIFO 顺序
- 需要高可靠性和高吞吐
- 平台数量多(> 5个),数据类型固定(< 10个)
❌ 不适合:
- 实时性要求极高(< 100ms)
- 需要按平台独立控制消费速率
- 平台数量少(< 3个),可以使用独立队列
架构设计
1. VHost 设计
VHost 按业务应用进行划分,便于资源隔离和权限管理。
RabbitMQ 实例
├── dataflow-app # Dataflow 应用专用
├── analytics-app # 分析应用(预留)
└── warehouse-app # 数据仓库应用(预留)
当前需要创建的 VHost:
dataflow-app:用于电商数据流转
2. Exchange 设计
在 dataflow-app VHost 中,Exchange 分为两类:
业务 Exchange(用于数据投递)
每个平台一个独立的 Topic Exchange:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
douyin.exchange |
Topic | 是 | DouYin(抖音)平台数据入口 |
jd.exchange |
Topic | 是 | JD(京东)平台数据入口 |
tmall.exchange |
Topic | 是 | Tmall(天猫)平台数据入口 |
redbook.exchange |
Topic | 是 | RedBook(小红书)平台数据入口 |
amazon.exchange |
Topic | 是 | Amazon Japan 平台数据入口 |
naver.exchange |
Topic | 是 | Naver 平台数据入口 |
rakuten.exchange |
Topic | 是 | Rakuten(乐天)平台数据入口 |
shopify.exchange |
Topic | 是 | Shopify 平台数据入口 |
Routing Key 规范(采用 {data_type}.{platform} 格式):
| 平台 | 订单 | 产品 | 退款 | 库存 |
|---|---|---|---|---|
| DouYin | order.douyin |
product.douyin |
refund.douyin |
inventory.douyin |
| JD | order.jd |
product.jd |
refund.jd |
inventory.jd |
| Tmall | order.tmall |
product.tmall |
refund.tmall |
inventory.tmall |
| RedBook | order.redbook |
product.redbook |
refund.redbook |
inventory.redbook |
| Amazon | order.amazon |
product.amazon |
refund.amazon |
inventory.amazon |
| Naver | order.naver |
product.naver |
refund.naver |
inventory.naver |
| Rakuten | order.rakuten |
product.rakuten |
refund.rakuten |
inventory.rakuten |
| Shopify | order.shopify |
product.shopify |
refund.shopify |
inventory.shopify |
设计说明:
- 使用 Topic Exchange 的通配符特性,队列可通过
order.#匹配所有平台的订单 - 保证单队列内所有平台消息的 FIFO 顺序
- 消费者通过解析 routing key 或消息体的 platform 字段进行业务分发
错误 Exchange(用于错误通知)
每个平台一个独立的错误 Topic Exchange:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
douyin.errors.exchange |
Topic | 是 | DouYin 平台错误消息 |
jd.errors.exchange |
Topic | 是 | JD 平台错误消息 |
tmall.errors.exchange |
Topic | 是 | Tmall 平台错误消息 |
redbook.errors.exchange |
Topic | 是 | RedBook 平台错误消息 |
amazon.errors.exchange |
Topic | 是 | Amazon Japan 平台错误消息 |
naver.errors.exchange |
Topic | 是 | Naver 平台错误消息 |
rakuten.errors.exchange |
Topic | 是 | Rakuten 平台错误消息 |
shopify.errors.exchange |
Topic | 是 | Shopify 平台错误消息 |
Routing Key 规范:
validation- 数据格式验证错误processing_failed- 业务处理失败system- 系统错误
3. Queue 设计
业务队列(主队列)
物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | DLX | 用途 |
|---|---|---|---|---|---|
orders.queue |
是 | 否 | 24小时 | dlx.orders |
接收所有平台的订单数据,单消费者批处理 |
products.queue |
是 | 否 | 24小时 | dlx.products |
接收所有平台的产品数据,单消费者批处理 |
refunds.queue |
是 | 否 | 24小时 | dlx.refunds |
接收所有平台的退款数据,单消费者批处理 |
inventory.queue |
是 | 否 | 24小时 | dlx.inventory |
接收所有平台的库存数据,单消费者批处理 |
队列参数配置:
{
"x-message-ttl": 86400000,
"x-dead-letter-exchange": "dlx.orders",
"x-dead-letter-routing-key": "retry"
}
死信交换机(DLX)
每种数据类型配备一个 DLX,用于处理失败消息的路由:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
dlx.orders |
Topic | 是 | 订单失败消息路由 |
dlx.products |
Topic | 是 | 产品失败消息路由 |
dlx.refunds |
Topic | 是 | 退款失败消息路由 |
dlx.inventory |
Topic | 是 | 库存失败消息路由 |
延迟重试队列
为每种数据类型提供延迟重试机制:
| Queue 名称 | 持久化 | 消息 TTL | DLX | 用途 |
|---|---|---|---|---|
orders.retry.queue |
是 | 5秒 | main.exchange |
订单延迟重试(TTL后自动回到主队列) |
products.retry.queue |
是 | 5秒 | main.exchange |
产品延迟重试 |
refunds.retry.queue |
是 | 5秒 | main.exchange |
退款延迟重试 |
inventory.retry.queue |
是 | 5秒 | main.exchange |
库存延迟重试 |
重试队列参数配置:
{
"x-message-ttl": 5000,
"x-dead-letter-exchange": "main.exchange",
"x-dead-letter-routing-key": "order.{platform}"
}
重试机制说明:
- 消费失败时,消费者执行
nack(requeue=false),消息进入 DLX - DLX 根据消息 header 中的
x-retries判断路由:x-retries < 3:路由到retry.queue(延迟重试)x-retries >= 3:路由到errors.queue(永久失败)
- 重试队列 TTL 到期后,消息自动死信回主 Exchange,重新进入主队列
- 最大重试次数:3次,延迟时间:5秒
错误队列
统一的错误队列,接收所有超过重试次数的失败消息:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | 用途 |
|---|---|---|---|---|
errors.queue |
是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 |
4. Binding 配置
主 Exchange (用于接收各平台数据)
需要创建一个统一的主 Exchange,用于重试队列的消息回流:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
main.exchange |
Topic | 是 | 统一主交换机,用于路由和重试消息回流 |
业务队列绑定(主队列)
方案1:直接绑定(推荐) 每个平台的 Exchange 使用带平台标识的 routing key 绑定到对应数据类型队列:
douyin.exchange
├── [routing_key: order.douyin] → orders.queue
├── [routing_key: product.douyin] → products.queue
├── [routing_key: refund.douyin] → refunds.queue
└── [routing_key: inventory.douyin] → inventory.queue
tmall.exchange
├── [routing_key: order.tmall] → orders.queue
├── [routing_key: product.tmall] → products.queue
├── [routing_key: refund.tmall] → refunds.queue
└── [routing_key: inventory.tmall] → inventory.queue
... (其他平台类似)
方案2:通过主 Exchange(备选) 平台 Exchange 先发到主 Exchange,再由主 Exchange 统一路由:
douyin.exchange → [routing_key: order.douyin] → main.exchange
main.exchange
├── [routing_key: order.#] → orders.queue
├── [routing_key: product.#] → products.queue
├── [routing_key: refund.#] → refunds.queue
└── [routing_key: inventory.#] → inventory.queue
推荐使用方案1,简化架构,减少消息跳转。
DLX 绑定(重试路由)
每个 DLX 根据消息 header 中的 x-retries 决定路由目标:
dlx.orders
├── [routing_key: retry, x-retries < 3] → orders.retry.queue
└── [routing_key: error, x-retries >= 3] → errors.queue
dlx.products
├── [routing_key: retry] → products.retry.queue
└── [routing_key: error] → errors.queue
dlx.refunds
├── [routing_key: retry] → refunds.retry.queue
└── [routing_key: error] → errors.queue
dlx.inventory
├── [routing_key: retry] → inventory.retry.queue
└── [routing_key: error] → errors.queue
注意:RabbitMQ 原生不支持基于 header 的条件路由,需要:
- 方案A:消费者在
nack时根据重试次数决定发送到 DLX 的 routing key(retry或error) - 方案B:使用 RabbitMQ 插件(如
rabbitmq_message_routing_by_headers)实现条件路由
推荐方案A:消费者控制重试逻辑,更灵活可控。
重试队列绑定(回流主队列)
重试队列 TTL 到期后,消息自动死信回主 Exchange:
orders.retry.queue
└── [TTL=5s后 死信到 main.exchange,routing_key保持原始值:order.{platform}]
main.exchange
└── [routing_key: order.#] → orders.queue
这样实现了"失败 → 延迟5秒 → 重新进入主队列"的循环。
错误队列绑定
每个平台的错误 Exchange 和所有 DLX 都绑定到统一错误队列:
douyin.errors.exchange
└── [routing_key: #] → errors.queue
dlx.orders
└── [routing_key: error] → errors.queue
dlx.products
└── [routing_key: error] → errors.queue
... (其他类型类似)
说明:使用 # 通配符接收所有错误类型。
消费者设计
1. 消费者模型
采用 单消费者 + 批处理 + prefetch + 适配器模式:
| 数据类型 | 消费者 | Queue | 并发模式 | 处理模式 |
|---|---|---|---|---|
| 订单 | Order Consumer | orders.queue |
单消费者 | 批处理 + 适配器 |
| 产品 | Product Consumer | products.queue |
单消费者 | 批处理 + 适配器 |
| 退款 | Refund Consumer | refunds.queue |
单消费者 | 批处理 + 适配器 |
| 库存 | Inventory Consumer | inventory.queue |
单消费者 | 批处理 + 适配器 |
设计原则:
- ✅ 单消费者:每个队列只有一个消费者实例,保证严格 FIFO 顺序
- ✅ 批处理:积累一批消息后统一处理,提升数据库写入效率
- ✅ 单条 ACK:每条消息独立确认,失败不影响其他消息
- ✅ 适配器模式:消费者内部根据平台字段分发到不同业务逻辑
**消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 **
2. Prefetch 配置
{
"prefetch_count": 100,
"batch_size": 50,
"batch_timeout": 3000
}
参数说明:
prefetch_count=100:RabbitMQ 最多推送 100 条未确认消息到消费者batch_size=50:消费者积累 50 条消息后触发批处理batch_timeout=3000:如果3秒内未达到 batch_size,也触发处理
工作流程:
- RabbitMQ 按 Push 模式推送最多 100 条消息到消费者
- 消费者缓存消息,达到 50 条或超时 3 秒后批量处理
- 批量处理时,仍然对每条消息单独 ACK/NACK
- 实现"伪批量拉取"效果,同时保留单条确认的灵活性
3. 消费者处理逻辑(伪代码)
class OrderConsumer:
def __init__(self):
self.prefetch_count = 100
self.batch_size = 50
self.batch_timeout = 3
self.message_buffer = []
self.adapters = {
'douyin': DouyinAdapter(),
'tmall': TmallAdapter(),
'jd': JDAdapter(),
# ... 其他平台适配器
}
def on_message(self, channel, method, properties, body):
"""接收到消息时的回调"""
message = json.loads(body)
self.message_buffer.append({
'channel': channel,
'method': method,
'properties': properties,
'body': message
})
# 达到批量大小,触发处理
if len(self.message_buffer) >= self.batch_size:
self.process_batch()
def process_batch(self):
"""批量处理消息"""
for msg_data in self.message_buffer:
try:
message = msg_data['body']
platform = message['platform']
# 使用适配器模式分发到对应平台逻辑
adapter = self.adapters.get(platform)
if not adapter:
raise ValueError(f"Unknown platform: {platform}")
# 处理业务逻辑
adapter.process(message)
# 成功:ACK
msg_data['channel'].basic_ack(msg_data['method'].delivery_tag)
except TemporaryError as e:
# 临时错误:重试
retry_count = msg_data['properties'].headers.get('x-retries', 0)
if retry_count < 3:
# 更新重试次数,发送到 DLX 进行延迟重试
headers = {'x-retries': retry_count + 1}
msg_data['channel'].basic_nack(
msg_data['method'].delivery_tag,
requeue=False
)
else:
# 超过重试次数,发送到错误队列
self.send_to_error_queue(msg_data)
msg_data['channel'].basic_nack(
msg_data['method'].delivery_tag,
requeue=False
)
except PermanentError as e:
# 永久错误:直接进入错误队列
self.send_to_error_queue(msg_data)
msg_data['channel'].basic_nack(
msg_data['method'].delivery_tag,
requeue=False
)
# 清空缓冲区
self.message_buffer.clear()
def send_to_error_queue(self, msg_data):
"""发送到错误队列"""
error_message = {
'original_message': msg_data['body'],
'error': str(e),
'timestamp': datetime.now().isoformat()
}
# 发送到错误 Exchange
self.channel.basic_publish(
exchange='douyin.errors.exchange',
routing_key='error',
body=json.dumps(error_message)
)
4. 重试机制工作流程
┌─────────────────┐
│ orders.queue │ ← 消费者从这里接收消息
└────────┬────────┘
│
├─ 成功 → basic_ack()
│
└─ 失败 → basic_nack(requeue=false)
│
▼
┌──────────────────┐
│ dlx.orders │ ← 死信交换机
└────────┬─────────┘
│
├─ x-retries < 3 → orders.retry.queue (TTL=5s)
│ │
│ └─ TTL到期 → 自动死信回 main.exchange
│ │
│ └─ order.# → orders.queue
│
└─ x-retries >= 3 → errors.queue (人工处理)
5. 为什么这样设计?
| 特性 | 原因 | 优势 |
|---|---|---|
| 单队列 FIFO | RabbitMQ 无法在队列内部按 topic 过滤 | 保证严格 FIFO,不拆分队列造成业务乱序 |
| 单消费者 | RabbitMQ 同队列多消费者必然 Round-Robin 分配 | 避免消息错投到错误的消费者 |
| Prefetch + 批处理 | RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 | 实现"伪批量拉取",提升吞吐 |
| 单条 ACK | 保留细粒度控制 | 失败消息不影响其他消息 |
| DLX + 延迟重试 | 避免失败消息阻塞队列 | 不会卡 FIFO,不会无限重试 |
| 适配器模式 | 多平台业务逻辑隔离 | 扩展性好,易维护 |
用户权限配置
1. 平台开发者账号
每个平台分配一个专用账号,只能访问自己的 Exchange。
DouYin 平台账号
用户名: user_douyin
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: (留空) - 不允许创建/删除资源
- Write: ^douyin\.(exchange|errors\.exchange)$ - 只能写入 DouYin 的业务和错误 Exchange
- Read: ^douyin\.errors\..*$ - 只能读取 DouYin 错误相关的队列
Tmall 平台账号
用户名: user_tmall
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: (留空)
- Write: ^tmall\.(exchange|errors\.exchange)$
- Read: ^tmall\.errors\..*$
其他平台账号
按相同模式创建:
- JD:
user_jd - RedBook:
user_redbook - Amazon Japan:
user_amazon - Naver:
user_naver - Rakuten:
user_rakuten - Shopify:
user_shopify
2. Dataflow 消费者账号
Dataflow 应用的后端服务账号,负责消费业务队列并处理错误。
用户名: user_dataflow_consumer
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: ^(orders|products|refunds|inventory)\.queue$ - 可以管理业务队列
- Write: .*\.errors\.exchange$ - 可以向所有错误 Exchange 写入消息
- Read: ^(orders|products|refunds|inventory)\.queue$ - 可以读取所有业务队列
3. 运维监控账号
运维团队账号,用于监控所有错误消息。
用户名: user_ops
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: ^errors\..*$ - 可以管理错误相关资源
- Write: (留空) - 不需要写入权限
- Read: ^errors\.queue$ - 可以读取统一错误队列
4. 管理员账号
RabbitMQ 超级管理员,用于配置和维护。
用户名: admin
密码: <强密码>
Tag: administrator
权限配置:
- 对所有 VHost 具有完全权限
- 可以访问管理界面
消息格式规范
1. message_id 设计
message_id 采用结构化格式,确保幂等性和可追溯性。
格式:
{prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id}
字段说明:
prefix:项目名称前缀(如wpic-project1)app_id:应用标识(固定为dataflow)company_id:公司 IDplatform_id:平台 IDstore_id:店铺 IDentity_type:数据实体类型(order/product/refund/inventory)entity_id:平台侧唯一标识(如订单号)
分隔符说明:
- 使用
#作为分隔符(RabbitMQ message_id 字段支持任意字符) #在 routing key 中是通配符,但在 message_id 中是普通字符
示例:
wpic-project1#dataflow#100#20#200#order#DY123456789
解析:
- 项目前缀: wpic-project1
- 应用: dataflow
- 公司ID: 100
- 平台ID: 20 (DouYin)
- 店铺ID: 200
- 实体类型: order
- 平台订单号: DY123456789
特性:
- ✅ 幂等性:同一实体的 message_id 完全一致
- ✅ 可读性:直接看出数据来源
- ✅ 便于调试:可从 message_id 提取元数据
- ✅ 无冲突:全局唯一
2. 业务消息格式
{
"message_id": "wpic-project1#dataflow#100#20#200#order#DY123456",
"timestamp": "2025-01-14T10:30:00Z",
"platform": "douyin",
"data_type": "order",
"metadata": {
"platform_id": 20,
"company_id": 100,
"store_id": 200,
"source_system": "douyin-open-api",
"retry_count": 0,
"data_version": 1736840000
},
"data": {
"platform_unique_id": "DY123456",
"raw_data": {
// 平台原始完整数据
}
}
}
字段说明:
| 字段 | 类型 | 必需 | 说明 |
|---|---|---|---|
message_id |
string | 是 | 结构化消息 ID,用于幂等性 |
timestamp |
string | 是 | 消息生成时间(ISO 8601 格式) |
platform |
string | 是 | 平台标识(douyin/tmall/jd...) |
data_type |
string | 是 | 数据类型(order/product/refund/inventory) |
metadata.platform_id |
int | 是 | 平台 ID(数据库主键) |
metadata.company_id |
int | 是 | 公司 ID |
metadata.store_id |
int | 是 | 店铺 ID |
metadata.source_system |
string | 是 | 数据来源系统 |
metadata.retry_count |
int | 是 | 重试次数(初始为 0) |
metadata.data_version |
int | 是 | 数据版本(Unix 时间戳,用于防止乱序更新) |
data.platform_unique_id |
string | 是 | 平台侧唯一标识 |
data.raw_data |
object | 是 | 平台原始完整 JSON 数据 |
重要说明:
- data_version:必须使用平台数据的更新时间戳(Unix 时间戳),用于解决消息乱序消费问题
- 示例:平台订单的最后更新时间
1736840000(2025-01-14 10:00:00 UTC) - 消费端只有当
data_version大于数据库中的值时才更新数据 - 如果平台未提供更新时间,使用消息生成时间戳
- 示例:平台订单的最后更新时间
3. 错误消息格式
错误消息由 dataflow 消费者生成,包含原始消息和错误详情。
{
"error_id": "err_1234567890abcdef",
"original_message": {
// 原始业务消息完整内容
},
"error": {
"type": "validation",
"message": "Missing required field: total_amount",
"trace": "Exception stack trace...",
"timestamp": "2025-01-14T10:31:00Z"
},
"metadata": {
"platform": "amazon",
"platform_id": 1,
"company_id": 100,
"store_id": 200,
"data_type": "order",
"failed_at": "2025-01-14T10:31:00Z"
}
}
RabbitMQ 管理界面操作指引
1. 访问管理界面
URL: http://<rabbitmq-host>:15672
用户名: admin
密码: <管理员密码>
2. 创建 VHost
- 点击顶部导航 Admin → Virtual Hosts
- 点击 Add a new virtual host
- 输入 VHost 名称:
dataflow-app - 点击 Add virtual host
3. 创建用户
- 点击顶部导航 Admin → Users
- 点击 Add a user
- 填写用户信息:
- Username:
user_amazon - Password:
<安全密码> - Tags: (留空,非管理员用户)
- Username:
- 点击 Add user
4. 配置用户权限
- 在用户列表中,点击用户名(如
user_amazon) - 滚动到 Permissions 部分
- 选择 VHost:
dataflow-app - 配置权限正则表达式:
- Configure regexp: (留空)
- Write regexp:
^amazon\.(exchange|errors\.exchange)$ - Read regexp:
^amazon\.errors\..*$
- 点击 Set permission
5. 创建 Exchange
- 点击顶部导航 Exchanges
- 确保选择正确的 VHost:
dataflow-app - 点击 Add a new exchange
- 填写配置:
- Name:
amazon.exchange - Type:
topic - Durability:
Durable - Auto delete:
No - Internal:
No
- Name:
- 点击 Add exchange
重复以上步骤创建所有业务 Exchange 和错误 Exchange。
6. 创建 Queue
- 点击顶部导航 Queues
- 确保选择正确的 VHost:
dataflow-app - 点击 Add a new queue
- 填写配置:
- Name:
orders.queue - Durability:
Durable - Auto delete:
No - Arguments:
x-message-ttl:86400000(24小时,单位:毫秒)
- Name:
- 点击 Add queue
重复以上步骤创建 products.queue、refunds.queue 和 errors.queue。
注意:errors.queue 的 TTL 设置为 604800000(7天)。
7. 创建 Binding
- 点击顶部导航 Exchanges
- 点击需要绑定的 Exchange(如
amazon.exchange) - 滚动到 Bindings 部分
- 在 Add binding from this exchange 区域填写:
- To queue:
orders.queue - Routing key:
order
- To queue:
- 点击 Bind
为每个 Exchange 创建 3 个绑定(order/product/refund)。
对于错误 Exchange,绑定配置:
- To queue:
errors.queue - Routing key:
#
配置脚本
为简化配置过程,可以使用以下脚本自动创建资源。
使用 rabbitmqadmin
RabbitMQ 提供命令行工具 rabbitmqadmin,可用于批量配置。
安装 rabbitmqadmin
# 从 RabbitMQ 管理界面下载
wget http://<rabbitmq-host>:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/
配置脚本示例
#!/bin/bash
# RabbitMQ 连接信息
RABBITMQ_HOST="localhost:15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin_password"
VHOST="dataflow-app"
# 平台列表
PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify")
DATA_TYPES=("orders" "products" "refunds" "inventory")
# 1. 创建 VHost
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare vhost name=$VHOST
# 2. 创建主 Exchange(用于重试消息回流)
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="main.exchange" vhost=$VHOST \
type=topic durable=true
# 3. 创建 DLX (死信交换机)
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="dlx.${dtype}" vhost=$VHOST \
type=topic durable=true
done
# 4. 创建主业务队列(带 DLX 配置)
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue name="${dtype}.queue" vhost=$VHOST durable=true \
arguments="{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
done
# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue name="${dtype}.retry.queue" vhost=$VHOST durable=true \
arguments="{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}"
done
# 6. 创建错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue name="errors.queue" vhost=$VHOST durable=true \
arguments='{"x-message-ttl":604800000}'
# 7. 绑定主 Exchange 到主队列(使用通配符)
for dtype in "${DATA_TYPES[@]}"; do
# 使用 order.#, product.# 等通配符匹配所有平台
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="main.exchange" destination="${dtype}.queue" \
vhost=$VHOST routing_key="${dtype%s}.#"
done
# 8. 绑定 DLX 到重试队列和错误队列
for dtype in "${DATA_TYPES[@]}"; do
# DLX -> 重试队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="dlx.${dtype}" destination="${dtype}.retry.queue" \
vhost=$VHOST routing_key="retry"
# DLX -> 错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="dlx.${dtype}" destination="errors.queue" \
vhost=$VHOST routing_key="error"
done
# 9. 为每个平台创建 Exchange 和 Binding
for platform in "${PLATFORMS[@]}"; do
# 创建平台业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="${platform}.exchange" vhost=$VHOST \
type=topic durable=true
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform)
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="orders.queue" \
vhost=$VHOST routing_key="order.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="products.queue" \
vhost=$VHOST routing_key="product.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="refunds.queue" \
vhost=$VHOST routing_key="refund.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="inventory.queue" \
vhost=$VHOST routing_key="inventory.${platform}"
# 创建平台错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="${platform}.errors.exchange" vhost=$VHOST \
type=topic durable=true
# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.errors.exchange" destination="errors.queue" \
vhost=$VHOST routing_key="#"
# 创建平台用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_${platform}" password="change_me_${platform}" tags=""
# 配置平台用户权限
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=$VHOST user="user_${platform}" \
configure="" write="^${platform}\.(exchange|errors\.exchange)$" \
read="^${platform}\.errors\..*$"
done
# 10. 创建 dataflow 消费者用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_dataflow_consumer" password="change_me_consumer" tags=""
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=$VHOST user="user_dataflow_consumer" \
configure="^(orders|products|refunds|inventory).*\.queue$" \
write="(dlx\..*)|(.*\.errors\.exchange)$" \
read="^(orders|products|refunds|inventory).*\.queue$"
# 11. 创建运维监控用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_ops" password="change_me_ops" tags=""
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=$VHOST user="user_ops" \
configure="^errors\..*$" write="" read="^errors\.queue$"
echo "RabbitMQ 配置完成!"
echo ""
echo "已创建:"
echo "- 1 个 VHost: dataflow-app"
echo "- 1 个主 Exchange: main.exchange"
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
echo "- 1 个错误队列: errors.queue"
echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)"
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
执行脚本
chmod +x setup_rabbitmq.sh
./setup_rabbitmq.sh
监控和维护
1. 监控指标
通过 RabbitMQ 管理界面查看以下关键指标:
队列健康状态
- 点击 Queues → 选择队列
- 关注以下指标:
- Ready: 待消费消息数(正常应 < 1000)
- Unacked: 未确认消息数(正常应较低)
- Total: 总消息数
- Incoming rate: 消息入队速率
- Deliver / Get rate: 消息消费速率
告警阈值:
- Ready > 10000:队列堆积严重
- Unacked > 5000:消费者处理缓慢
- Incoming rate >> Deliver rate:消费速度跟不上生产速度
Exchange 流量统计
- 点击 Exchanges → 选择 Exchange
- 查看 Message rates:
- Publish in: 消息发布速率
- Publish out: 消息路由到队列的速率
异常情况:
- Publish in > 0 但 Publish out = 0:可能绑定配置错误
2. 常见问题排查
消息未到达队列
检查步骤:
- 确认 Exchange 存在且类型正确
- 检查 Binding 配置,确保 routing key 匹配
- 查看 Exchange 的 Message rates,确认消息已发布
- 检查生产者是否使用了正确的 routing key
消费者无法消费
检查步骤:
- 确认用户权限配置正确(Read 权限)
- 检查队列中是否有消息(Ready > 0)
- 查看消费者连接状态(Connections 页面)
- 检查消费者代码是否正确 ACK 消息
错误消息未送达平台开发者
检查步骤:
- 确认错误 Exchange 绑定到
errors.queue - 检查平台开发者账号的 Read 权限
- 确认平台开发者订阅了正确的 Exchange
- 查看
errors.queue中是否有消息
3. 备份和恢复
导出配置
# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions)
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
export dataflow-backup.json
导入配置
# 从备份恢复配置
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
import dataflow-backup.json
4. 性能优化
消费者并发配置
- 增加消费者实例数量
- 每个消费者配置多个 channel(如 5-10 个)
- 使用 prefetch_count 限制未确认消息数量
队列性能优化
- 启用 lazy queue(大量消息堆积时):
rabbitmqadmin declare queue name=orders.queue vhost=dataflow-app \ durable=true arguments='{"x-queue-mode":"lazy"}' - 定期清理过期消息(通过 TTL 自动实现)
5. 日志查看
# 查看 RabbitMQ 日志
tail -f /var/log/rabbitmq/rabbit@<hostname>.log
# 查看连接日志
tail -f /var/log/rabbitmq/rabbit@<hostname>_connection.log
安全建议
1. 密码策略
- 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符)
- 定期更换密码(建议 3-6 个月)
- 不同环境(开发/测试/生产)使用不同密码
2. 网络安全
- 生产环境启用 TLS/SSL 加密连接
- 限制管理界面访问(仅允许内网或 VPN 访问)
- 配置防火墙规则:
- 5672:AMQP 端口(仅允许应用服务器访问)
- 15672:管理界面(仅允许管理员 IP 访问)
3. 权限最小化原则
- 平台开发者只能访问自己的 Exchange
- 消费者只能读取必要的队列
- 避免赋予不必要的 Configure 权限
4. 审计日志
- 启用 RabbitMQ 审计日志插件
- 定期审查用户操作记录
- 监控异常登录行为
新增平台接入流程
当需要接入新平台(例如 Lazada)时,按以下步骤操作:
1. 创建 Exchange
# 业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="lazada.exchange" vhost=dataflow-app \
type=topic durable=true
# 错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="lazada.errors.exchange" vhost=dataflow-app \
type=topic durable=true
2. 创建 Binding
# 绑定到业务队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="orders.queue" \
vhost=dataflow-app routing_key="order"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="products.queue" \
vhost=dataflow-app routing_key="product"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="refunds.queue" \
vhost=dataflow-app routing_key="refund"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="inventory.queue" \
vhost=dataflow-app routing_key="inventory"
# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.errors.exchange" \
destination="errors.queue" vhost=dataflow-app routing_key="#"
3. 创建用户并配置权限
# 创建用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_lazada" password="<secure_password>" tags=""
# 配置权限
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=dataflow-app user="user_lazada" \
configure="" write="^lazada\.(exchange|errors\.exchange)$" \
read="^lazada\.errors\..*$"
4. 提供给平台开发者
将以下信息提供给新平台的开发团队:
RabbitMQ 连接信息:
- Host: <rabbitmq-host>
- Port: 5672
- VHost: dataflow-app
- Username: user_lazada
- Password: <secure_password>
发布配置:
- Exchange: lazada.exchange
- Routing Keys:
- order: 发布订单数据
- product: 发布产品数据
- refund: 发布退款数据
- inventory: 发布库存数据
错误订阅(可选):
- Exchange: lazada.errors.exchange
- 接收本平台的错误消息
消息格式规范:
- 参见「消息格式规范」章节
附录
A. 完整资源清单
VHost
- 总数: 1 个
dataflow-app
Exchanges
主 Exchange: 1 个
main.exchange(Topic) - 统一主交换机,用于路由和重试消息回流
平台业务 Exchanges: 8 个(每平台 1 个)
douyin.exchangejd.exchangetmall.exchangeredbook.exchangeamazon.exchangenaver.exchangerakuten.exchangeshopify.exchange
平台错误 Exchanges: 8 个(每平台 1 个)
douyin.errors.exchangejd.errors.exchangetmall.errors.exchangeredbook.errors.exchangeamazon.errors.exchangenaver.errors.exchangerakuten.errors.exchangeshopify.errors.exchange
死信交换机 (DLX): 4 个(每数据类型 1 个)
dlx.ordersdlx.productsdlx.refundsdlx.inventory
Exchange 总数: 21 个 (1 主 + 8 平台业务 + 8 平台错误 + 4 DLX)
Queues
主业务队列: 4 个
orders.queue(配置 DLX: dlx.orders)products.queue(配置 DLX: dlx.products)refunds.queue(配置 DLX: dlx.refunds)inventory.queue(配置 DLX: dlx.inventory)
延迟重试队列: 4 个
orders.retry.queue(TTL: 5秒, DLX: main.exchange)products.retry.queue(TTL: 5秒, DLX: main.exchange)refunds.retry.queue(TTL: 5秒, DLX: main.exchange)inventory.retry.queue(TTL: 5秒, DLX: main.exchange)
错误队列: 1 个
errors.queue(TTL: 7天)
Queue 总数: 9 个 (4 主队列 + 4 重试队列 + 1 错误队列)
Bindings
主 Exchange 绑定: 4 个
main.exchange→orders.queue(routing_key:order.#)main.exchange→products.queue(routing_key:product.#)main.exchange→refunds.queue(routing_key:refund.#)main.exchange→inventory.queue(routing_key:inventory.#)
平台业务 Exchange 绑定: 32 个
- 每个平台 Exchange → 4 个业务队列 (8 × 4 = 32)
- 使用 routing key 格式:
{data_type}.{platform}- 例如:
order.douyin,product.tmall,refund.jd等
- 例如:
平台错误 Exchange 绑定: 8 个
- 每个平台错误 Exchange →
errors.queue(routing_key:#)
DLX 绑定: 8 个
- 每个 DLX → 对应重试队列 (routing_key:
retry) - 4 个 - 每个 DLX →
errors.queue(routing_key:error) - 4 个
Binding 总数: 52 个 (4 主 + 32 平台业务 + 8 平台错误 + 8 DLX)
Users
平台开发者账号: 8 个
user_douyin,user_jd,user_tmall,user_redbook,user_amazon,user_naver,user_rakuten,user_shopify- 权限:只能写入自己的业务和错误 Exchange,只能读取自己的错误队列
消费者账号: 1 个
user_dataflow_consumer- 权限:可以管理所有业务队列和重试队列,可以写入 DLX 和错误 Exchange,可以读取所有业务队列
运维监控账号: 1 个
user_ops- 权限:可以管理错误相关资源,可以读取错误队列
管理员账号: 1 个
admin- 权限:完全权限,可以访问管理界面
User 总数: 11 个 (8 平台 + 1 消费者 + 1 运维 + 1 管理员)
架构统计
| 资源类型 | 数量 | 说明 |
|---|---|---|
| VHost | 1 | dataflow-app |
| Exchange | 21 | 1 主 + 8 平台业务 + 8 平台错误 + 4 DLX |
| Queue | 9 | 4 主 + 4 重试 + 1 错误 |
| Binding | 52 | 完整消息流转路径 |
| User | 11 | 多角色权限隔离 |
消息流转路径:
生产者 → 平台 Exchange → 主队列 → 消费者
↓ (失败)
DLX → 重试队列 (5秒TTL) → 主 Exchange → 主队列
↓ (超过3次)
DLX → 错误队列 (人工处理)