55 KiB
RabbitMQ 配置方案
概述
本文档描述 dataflow 应用的 RabbitMQ 消息队列架构设计和配置方法。
业务背景
- 应用角色:dataflow 是消息消费者,负责接收来自多个电商平台的数据并入库
- 数据来源:多个电商平台(DouYin、JD、Tmall、RedBook、Amazon Japan、Naver、Rakuten、Shopify 等)
- 数据类型:订单(orders)、产品(products)、退款(refunds)、库存(inventory)
- 隔离需求:不同平台开发者之间的工作空间需要相互隔离,互不影响
架构特点
- VHost 隔离:按应用划分 VHost,支持多业务复用, 使用
ClassicVhost 模式 - Exchange 隔离:每个平台独立 Exchange,通过权限控制访问
- 单队列设计:每种数据类型一个队列,所有平台共享(orders.queue/products.queue/refunds.queue/inventory.queue),保证严格 FIFO 顺序
- 独立消费者:每种数据类型配备独立消费者,采用批处理 + prefetch + 适配器模式,提升吞吐与稳定性
- 智能重试:DLX(死信交换机)+ 延迟重试队列 + 错误队列,优雅处理失败消息,不阻塞主队列
- 消息持久化:RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API
设计方案总结
核心设计理念
本方案采用 Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。
关键设计决策
** 注意 **
- 消息内部字段无严格要求,但可以在业务侧进行约束
- 默认单条消息的最大 Payload 尺寸为 128MB
- 消息队列为缓存系统,应该尽可能的将批量数据放入 payload\
- MQ 存在的意义在于快速接受远程生产的 Message
1. 为什么使用单队列?
| 问题 | 传统多队列方案 | 本方案(单队列) |
|---|---|---|
| 队列数量 | 8平台 × 4数据类型 = 32个队列 | 4个数据类型队列(队列数量减少 87.5%) |
| FIFO 保证 | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 |
| 运维复杂度 | 需要监控 32 个队列 | 只需监控 4 个主队列 |
| 消费者部署 | 可能需要 32 个消费者实例 | 只需 4 个消费者实例 |
原因:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。
2. 为什么使用单消费者?
原因:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。
优势:
- ✅ 保证严格 FIFO 顺序
- ✅ 避免消息错投到错误的消费者
- ✅ 简化部署和运维
如何实现多平台支持? 通过消费者内部的适配器模式,根据消息的 platform 字段分发到不同业务逻辑。
3. 为什么使用 Prefetch + 批处理?
原因:RabbitMQ 是 Push 模式,不提供 Kafka 式的批量拉取 API。
解决方案:
- 设置
prefetch_count=100:RabbitMQ 最多推送 100 条未确认消息 - 消费者自行缓存消息,达到
batch_size=50或timeout=3s后批量处理 - 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性
优势:
- ✅ 提升数据库写入效率(批量插入)
- ✅ 控制并发和内存压力
- ✅ 失败消息不影响其他消息(单条 ACK)
4. 为什么使用 DLX + 延迟重试队列?
问题:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。
传统方案的问题:
- ❌
nack(requeue=true):消息回队列头部,无限循环,永远不会进入 DLX - ❌ 直接丢弃:数据丢失
- ❌ 立即重试:打爆 API/DB
本方案(DLX + 延迟重试):
消费失败 → nack(requeue=false) → 进入 DLX
↓
Consumer检查 x-death count
↓
count < 3 → DLX路由到重试队列 (TTL=5s) → 自动死信回主队列
↓
count >= 3 → Consumer主动发送到错误队列 (人工处理)
重要说明:
- ⚠️ 必须设置
requeue=false:只有requeue=false才会触发 DLX 机制 - ⚠️
requeue=true会导致无限循环:消息直接回到原队列头部,永远不进入 DLX - ✅ retry 队列自动回流:TTL 到期后自动死信回主 Exchange,不需要额外消费者
- ✅ Consumer 控制重试次数:检查
x-deathheader 的 count 字段判断重试次数
优势:
- ✅ 不会卡 FIFO
- ✅ 不会无限重试(通过 max retries 限制)
- ✅ 延迟重试避免打爆 API/DB
- ✅ 错误消息自动隔离到 error 队列
- ✅ 无需额外 Redis 或手写 retry 逻辑
- ✅ retry 过程完全自动化(依赖 RabbitMQ 的 DLX + TTL 机制)
方案优势总结
| 维度 | 优势 |
|---|---|
| 可靠性 | 消息持久化 + 单条 ACK + 智能重试,避免数据丢失 |
| 性能 | Prefetch + 批处理 + 数据库批量写入,提升吞吐 |
| 顺序性 | 单队列 FIFO 保证严格顺序 |
| 扩展性 | 适配器模式,新增平台只需添加适配器类 |
| 运维 | 队列数量少,监控简单,故障排查容易 |
| 容错 | DLX 智能重试,失败消息不阻塞主队列 |
适用场景
✅ 适合:
- 多平台数据同步(电商、物流、金融等)
- 需要严格 FIFO 顺序
- 需要高可靠性和高吞吐
- 平台数量多(> 5个),数据类型固定(< 10个)
❌ 不适合:
- 实时性要求极高(< 100ms)
- 需要按平台独立控制消费速率
- 平台数量少(< 3个),可以使用独立队列
Requeue 机制详解
requeue 参数的作用
在 RabbitMQ 中,当消费者处理消息失败时,可以通过 basic_nack() 或 basic_reject() 拒绝消息,第三个参数 requeue 决定了消息的去向:
// requeue=true:消息重新回到原队列(队列头部)
$channel->basic_nack($deliveryTag, false, true);
// requeue=false:消息进入 DLX(如果配置了),否则丢弃
$channel->basic_nack($deliveryTag, false, false);
requeue=true 的问题
| 行为 | 后果 |
|---|---|
| 消息回到原队列头部 | 立即被同一个消费者再次取出 |
| 不触发 DLX | 永远不会进入重试队列或错误队列 |
| 无延迟 | 瞬间形成高速循环(毫秒级) |
| 无计数 | 无法统计重试次数,无法设置上限 |
结果:消息在 消费 → 失败 → NACK → 回队列 → 消费 之间无限循环,CPU 和网络资源被浪费,队列状态显示为空(消息循环太快无法观察)。
requeue=false 的正确用法
orders.queue (配置了 x-dead-letter-exchange)
↓ Consumer 消费失败
↓ basic_nack(requeue=false)
↓ 触发 DLX 机制
↓
dlx.orders (死信交换机)
↓ routing_key="retry"
↓
orders.retry.queue (TTL=5s)
↓ 5秒后 TTL 到期
↓ 自动死信回 main.exchange
↓
main.exchange
↓ routing_key="order.#"
↓
orders.queue (重新进入主队列,x-death count+1)
环境变量配置
在 .env 文件中可以配置以下参数:
# 最大重试次数(默认3次)
# 消息重试超过此次数后,会被发送到 errors.queue
AMQP_MAX_RETRIES=3
# 调试延迟(秒,默认0)
# 设置为 2 可以让每条消息处理延迟2秒,方便在 mq:status 中观察队列状态
# 生产环境应设置为 0
AMQP_CONSUMER_DEBUG_DELAY=0
使用示例:
# 开发环境:观察消息流转
AMQP_CONSUMER_DEBUG_DELAY=2 php bin/hyperf.php start
# 生产环境:正常运行
AMQP_CONSUMER_DEBUG_DELAY=0 php bin/hyperf.php start
架构设计
1. VHost 设计
VHost 按业务应用进行划分,便于资源隔离和权限管理。
RabbitMQ 实例
├── dataflow-app # Dataflow 应用专用
├── analytics-app # 分析应用(预留)
└── warehouse-app # 数据仓库应用(预留)
当前需要创建的 VHost:
dataflow-app:用于电商数据流转
2. Exchange 设计
在 dataflow-app VHost 中,Exchange 分为两类:
业务 Exchange(用于数据投递)
每个平台一个独立的 Topic Exchange:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
douyin.exchange |
Topic | 是 | DouYin(抖音)平台数据入口 |
jd.exchange |
Topic | 是 | JD(京东)平台数据入口 |
tmall.exchange |
Topic | 是 | Tmall(天猫)平台数据入口 |
redbook.exchange |
Topic | 是 | RedBook(小红书)平台数据入口 |
amazon.exchange |
Topic | 是 | Amazon Japan 平台数据入口 |
naver.exchange |
Topic | 是 | Naver 平台数据入口 |
rakuten.exchange |
Topic | 是 | Rakuten(乐天)平台数据入口 |
shopify.exchange |
Topic | 是 | Shopify 平台数据入口 |
Routing Key 规范(采用 {data_type}.{platform} 格式):
| 平台 | 订单 | 产品 | 退款 | 库存 |
|---|---|---|---|---|
| DouYin | order.douyin |
product.douyin |
refund.douyin |
inventory.douyin |
| JD | order.jd |
product.jd |
refund.jd |
inventory.jd |
| Tmall | order.tmall |
product.tmall |
refund.tmall |
inventory.tmall |
| RedBook | order.redbook |
product.redbook |
refund.redbook |
inventory.redbook |
| Amazon | order.amazon |
product.amazon |
refund.amazon |
inventory.amazon |
| Naver | order.naver |
product.naver |
refund.naver |
inventory.naver |
| Rakuten | order.rakuten |
product.rakuten |
refund.rakuten |
inventory.rakuten |
| Shopify | order.shopify |
product.shopify |
refund.shopify |
inventory.shopify |
设计说明:
- 使用 Topic Exchange 的通配符特性,队列可通过
order.#匹配所有平台的订单 - 保证单队列内所有平台消息的 FIFO 顺序
- 消费者通过解析 routing key 或消息体的 platform 字段进行业务分发
错误 Exchange(用于错误通知)
每个平台一个独立的错误 Topic Exchange:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
douyin.errors.exchange |
Topic | 是 | DouYin 平台错误消息 |
jd.errors.exchange |
Topic | 是 | JD 平台错误消息 |
tmall.errors.exchange |
Topic | 是 | Tmall 平台错误消息 |
redbook.errors.exchange |
Topic | 是 | RedBook 平台错误消息 |
amazon.errors.exchange |
Topic | 是 | Amazon Japan 平台错误消息 |
naver.errors.exchange |
Topic | 是 | Naver 平台错误消息 |
rakuten.errors.exchange |
Topic | 是 | Rakuten 平台错误消息 |
shopify.errors.exchange |
Topic | 是 | Shopify 平台错误消息 |
Routing Key 规范:
validation- 数据格式验证错误processing_failed- 业务处理失败system- 系统错误
3. Queue 设计
业务队列(主队列)
物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | DLX | 用途 |
|---|---|---|---|---|---|
orders.queue |
是 | 否 | 24小时 | dlx.orders |
接收所有平台的订单数据,单消费者批处理 |
products.queue |
是 | 否 | 24小时 | dlx.products |
接收所有平台的产品数据,单消费者批处理 |
refunds.queue |
是 | 否 | 24小时 | dlx.refunds |
接收所有平台的退款数据,单消费者批处理 |
inventory.queue |
是 | 否 | 24小时 | dlx.inventory |
接收所有平台的库存数据,单消费者批处理 |
队列参数配置:
{
"x-message-ttl": 86400000,
"x-dead-letter-exchange": "dlx.orders",
"x-dead-letter-routing-key": "retry"
}
死信交换机(DLX)
每种数据类型配备一个 DLX,用于处理失败消息的路由:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
dlx.orders |
Topic | 是 | 订单失败消息路由 |
dlx.products |
Topic | 是 | 产品失败消息路由 |
dlx.refunds |
Topic | 是 | 退款失败消息路由 |
dlx.inventory |
Topic | 是 | 库存失败消息路由 |
延迟重试队列
为每种数据类型提供延迟重试机制:
| Queue 名称 | 持久化 | 消息 TTL | DLX | 用途 |
|---|---|---|---|---|
orders.retry.queue |
是 | 5秒 | main.exchange |
订单延迟重试(TTL后自动回到主队列) |
products.retry.queue |
是 | 5秒 | main.exchange |
产品延迟重试 |
refunds.retry.queue |
是 | 5秒 | main.exchange |
退款延迟重试 |
inventory.retry.queue |
是 | 5秒 | main.exchange |
库存延迟重试 |
重试队列参数配置:
{
"x-message-ttl": 5000,
"x-dead-letter-exchange": "main.exchange",
"x-dead-letter-routing-key": "order.{platform}"
}
重试机制说明(方案B实现):
- 消费者处理失败时,检查消息的
x-deathheader 中的 count 字段获取重试次数 - 根据重试次数决定处理方式:
- count < 3:执行
nack(requeue=false),消息进入 DLX → 路由到retry.queue(延迟重试) - count >= 3:消费者主动发送到
errors.queue,然后返回 ACK(避免再次重试)
- count < 3:执行
- 重试队列 TTL(5秒)到期后,消息自动死信回主 Exchange,重新进入主队列
- 配置参数:
- 最大重试次数:3次(可通过
AMQP_MAX_RETRIES环境变量配置) - 延迟时间:5秒(retry 队列的
x-message-ttl) - 调试延迟:可通过
AMQP_CONSUMER_DEBUG_DELAY设置处理延迟,方便观察队列状态
- 最大重试次数:3次(可通过
错误队列
统一的错误队列,接收所有超过重试次数的失败消息:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | 用途 |
|---|---|---|---|---|
errors.queue |
是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 |
错误队列的作用和机制:
- ✅ 收集永久失败的消息:当消息重试次数达到上限(默认3次)后,消费者会主动将消息发送到此队列
- ✅ 避免消息丢失:即使处理失败多次,消息也会被保存在错误队列中,不会丢失
- ✅ 人工排查和修复:运维人员可以从错误队列中查看失败原因,修复问题后重新投递
- ✅ 统一管理:所有数据类型(orders/products/refunds/inventory)的错误消息都集中在一个队列
- ⚠️ 不会自动重试:错误队列中的消息需要人工介入,不会自动回流到主队列
错误消息格式:
{
"error_id": "err_675d8f3a2b1c4",
"original_message": { /* 原始消息内容 */ },
"error": {
"type": "InvalidArgumentException",
"message": "Missing required field: total_amount",
"trace": "Exception stack trace...",
"timestamp": "2025-01-15T10:31:00+00:00"
},
"metadata": {
"platform": "tmall",
"platform_id": 2,
"company_id": 188,
"store_id": 292,
"data_type": "order",
"failed_at": "2025-01-15T10:31:00+00:00",
"retry_count": 3
}
}
4. Binding 配置
主 Exchange (用于接收各平台数据)
需要创建一个统一的主 Exchange,用于重试队列的消息回流:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
main.exchange |
Topic | 是 | 统一主交换机,用于路由和重试消息回流 |
业务队列绑定(主队列)
方案1:直接绑定(推荐) 每个平台的 Exchange 使用带平台标识的 routing key 绑定到对应数据类型队列:
douyin.exchange
├── [routing_key: order.douyin] → orders.queue
├── [routing_key: product.douyin] → products.queue
├── [routing_key: refund.douyin] → refunds.queue
└── [routing_key: inventory.douyin] → inventory.queue
tmall.exchange
├── [routing_key: order.tmall] → orders.queue
├── [routing_key: product.tmall] → products.queue
├── [routing_key: refund.tmall] → refunds.queue
└── [routing_key: inventory.tmall] → inventory.queue
... (其他平台类似)
方案2:通过主 Exchange(备选) 平台 Exchange 先发到主 Exchange,再由主 Exchange 统一路由:
douyin.exchange → [routing_key: order.douyin] → main.exchange
main.exchange
├── [routing_key: order.#] → orders.queue
├── [routing_key: product.#] → products.queue
├── [routing_key: refund.#] → refunds.queue
└── [routing_key: inventory.#] → inventory.queue
推荐使用方案1,简化架构,减少消息跳转。
DLX 绑定(重试路由)
每个 DLX 根据消息 header 中的 x-retries 决定路由目标:
dlx.orders
├── [routing_key: retry, x-retries < 3] → orders.retry.queue
└── [routing_key: error, x-retries >= 3] → errors.queue
dlx.products
├── [routing_key: retry] → products.retry.queue
└── [routing_key: error] → errors.queue
dlx.refunds
├── [routing_key: retry] → refunds.retry.queue
└── [routing_key: error] → errors.queue
dlx.inventory
├── [routing_key: retry] → inventory.retry.queue
└── [routing_key: error] → errors.queue
注意:RabbitMQ 原生不支持基于 header 的条件路由,需要:
- 方案A:消费者在
nack时根据重试次数决定发送到 DLX 的 routing key(retry或error) - 方案B:使用 RabbitMQ 插件(如
rabbitmq_message_routing_by_headers)实现条件路由
推荐方案A:消费者控制重试逻辑,更灵活可控。
重试队列绑定(回流主队列)
重试队列 TTL 到期后,消息自动死信回主 Exchange:
orders.retry.queue
└── [TTL=5s后 死信到 main.exchange,routing_key保持原始值:order.{platform}]
main.exchange
└── [routing_key: order.#] → orders.queue
这样实现了"失败 → 延迟5秒 → 重新进入主队列"的循环。
错误队列绑定
每个平台的错误 Exchange 和所有 DLX 都绑定到统一错误队列:
douyin.errors.exchange
└── [routing_key: #] → errors.queue
dlx.orders
└── [routing_key: error] → errors.queue
dlx.products
└── [routing_key: error] → errors.queue
... (其他类型类似)
说明:使用 # 通配符接收所有错误类型。
消费者设计
1. 消费者模型
采用 单消费者 + 批处理 + prefetch + 适配器模式:
| 数据类型 | 消费者 | Queue | 并发模式 | 处理模式 |
|---|---|---|---|---|
| 订单 | Order Consumer | orders.queue |
单消费者 | 批处理 + 适配器 |
| 产品 | Product Consumer | products.queue |
单消费者 | 批处理 + 适配器 |
| 退款 | Refund Consumer | refunds.queue |
单消费者 | 批处理 + 适配器 |
| 库存 | Inventory Consumer | inventory.queue |
单消费者 | 批处理 + 适配器 |
设计原则:
- ✅ 单消费者:每个队列只有一个消费者实例,保证严格 FIFO 顺序
- ✅ 批处理:积累一批消息后统一处理,提升数据库写入效率
- ✅ 单条 ACK:每条消息独立确认,失败不影响其他消息
- ✅ 适配器模式:消费者内部根据平台字段分发到不同业务逻辑
**消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 **
2. Prefetch 配置
{
"prefetch_count": 100,
"batch_size": 50,
"batch_timeout": 3000
}
参数说明:
prefetch_count=100:RabbitMQ 最多推送 100 条未确认消息到消费者batch_size=50:消费者积累 50 条消息后触发批处理batch_timeout=3000:如果3秒内未达到 batch_size,也触发处理
工作流程:
- RabbitMQ 按 Push 模式推送最多 100 条消息到消费者
- 消费者缓存消息,达到 50 条或超时 3 秒后批量处理
- 批量处理时,仍然对每条消息单独 ACK/NACK
- 实现"伪批量拉取"效果,同时保留单条确认的灵活性
3. 消费者处理逻辑(伪代码)
class OrderConsumer:
def __init__(self):
self.prefetch_count = 100
self.batch_size = 50
self.batch_timeout = 3
self.message_buffer = []
self.adapters = {
'douyin': DouyinAdapter(),
'tmall': TmallAdapter(),
'jd': JDAdapter(),
# ... 其他平台适配器
}
def on_message(self, channel, method, properties, body):
"""接收到消息时的回调"""
message = json.loads(body)
self.message_buffer.append({
'channel': channel,
'method': method,
'properties': properties,
'body': message
})
# 达到批量大小,触发处理
if len(self.message_buffer) >= self.batch_size:
self.process_batch()
def process_batch(self):
"""批量处理消息"""
for msg_data in self.message_buffer:
try:
message = msg_data['body']
platform = message['platform']
# 使用适配器模式分发到对应平台逻辑
adapter = self.adapters.get(platform)
if not adapter:
raise ValueError(f"Unknown platform: {platform}")
# 处理业务逻辑
adapter.process(message)
# 成功:ACK
msg_data['channel'].basic_ack(msg_data['method'].delivery_tag)
except TemporaryError as e:
# 临时错误:重试
retry_count = msg_data['properties'].headers.get('x-retries', 0)
if retry_count < 3:
# 更新重试次数,发送到 DLX 进行延迟重试
headers = {'x-retries': retry_count + 1}
msg_data['channel'].basic_nack(
msg_data['method'].delivery_tag,
requeue=False
)
else:
# 超过重试次数,发送到错误队列
self.send_to_error_queue(msg_data)
msg_data['channel'].basic_nack(
msg_data['method'].delivery_tag,
requeue=False
)
except PermanentError as e:
# 永久错误:直接进入错误队列
self.send_to_error_queue(msg_data)
msg_data['channel'].basic_nack(
msg_data['method'].delivery_tag,
requeue=False
)
# 清空缓冲区
self.message_buffer.clear()
def send_to_error_queue(self, msg_data):
"""发送到错误队列"""
error_message = {
'original_message': msg_data['body'],
'error': str(e),
'timestamp': datetime.now().isoformat()
}
# 发送到错误 Exchange
self.channel.basic_publish(
exchange='douyin.errors.exchange',
routing_key='error',
body=json.dumps(error_message)
)
4. 重试机制工作流程(方案B实现)
┌─────────────────┐
│ orders.queue │ ← 消费者从这里接收消息
└────────┬────────┘
│
├─ 成功 → basic_ack()
│
└─ 失败 → Consumer检查 x-death count
│
├─ count < 3 (未超过重试次数)
│ │
│ └→ basic_nack(requeue=false)
│ │
│ ▼
│ ┌──────────────────┐
│ │ dlx.orders │ ← 死信交换机
│ └────────┬─────────┘
│ │
│ └→ routing_key="retry"
│ │
│ ▼
│ orders.retry.queue (TTL=5s)
│ │
│ └─ TTL到期 → 自动死信回 main.exchange
│ │
│ └─ order.# → orders.queue
│ (x-death count自动+1)
│
└─ count >= 3 (超过重试次数)
│
└→ Consumer主动发送到 errors.queue
│
└→ basic_ack() (避免再次重试)
│
▼
errors.queue (人工处理)
关键点:
- ✅ requeue=false:让失败消息进入 DLX,而不是回到原队列
- ✅ retry 队列自动回流:TTL 到期后自动死信回主队列,不需要额外消费者
- ✅ x-death count 自动累加:每次死信 RabbitMQ 会自动增加 count
- ✅ Consumer 控制路由:超过重试次数时,Consumer 主动发送到 error 队列并 ACK
- ⚠️ 不能使用动态 routing key:DLX 的 routing key 在队列配置中是固定的,无法基于 count 动态改变
5. 为什么这样设计?
| 特性 | 原因 | 优势 |
|---|---|---|
| 单队列 FIFO | RabbitMQ 无法在队列内部按 topic 过滤 | 保证严格 FIFO,不拆分队列造成业务乱序 |
| 单消费者 | RabbitMQ 同队列多消费者必然 Round-Robin 分配 | 避免消息错投到错误的消费者 |
| Prefetch + 批处理 | RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 | 实现"伪批量拉取",提升吞吐 |
| 单条 ACK | 保留细粒度控制 | 失败消息不影响其他消息 |
| DLX + 延迟重试 | 避免失败消息阻塞队列 | 不会卡 FIFO,不会无限重试 |
| 适配器模式 | 多平台业务逻辑隔离 | 扩展性好,易维护 |
用户权限配置
1. 平台开发者账号
每个平台分配一个专用账号,只能访问自己的 Exchange。
DouYin 平台账号
用户名: user_douyin
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: (留空) - 不允许创建/删除资源
- Write: ^douyin\.(exchange|errors\.exchange)$ - 只能写入 DouYin 的业务和错误 Exchange
- Read: ^douyin\.errors\..*$ - 只能读取 DouYin 错误相关的队列
Tmall 平台账号
用户名: user_tmall
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: (留空)
- Write: ^tmall\.(exchange|errors\.exchange)$
- Read: ^tmall\.errors\..*$
其他平台账号
按相同模式创建:
- JD:
user_jd - RedBook:
user_redbook - Amazon Japan:
user_amazon - Naver:
user_naver - Rakuten:
user_rakuten - Shopify:
user_shopify
2. Dataflow 消费者账号
Dataflow 应用的后端服务账号,负责消费业务队列并处理错误。
用户名: user_dataflow_consumer
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: ^(orders|products|refunds|inventory)\.queue$ - 可以管理业务队列
- Write: .*\.errors\.exchange$ - 可以向所有错误 Exchange 写入消息
- Read: ^(orders|products|refunds|inventory)\.queue$ - 可以读取所有业务队列
3. 运维监控账号
运维团队账号,用于监控所有错误消息。
用户名: user_ops
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: ^errors\..*$ - 可以管理错误相关资源
- Write: (留空) - 不需要写入权限
- Read: ^errors\.queue$ - 可以读取统一错误队列
4. 管理员账号
RabbitMQ 超级管理员,用于配置和维护。
用户名: admin
密码: <强密码>
Tag: administrator
权限配置:
- 对所有 VHost 具有完全权限
- 可以访问管理界面
消息格式规范
1. message_id 设计
message_id 采用结构化格式,确保幂等性和可追溯性。
格式:
{company_id}#{platform_id}#{store_id}#{entity_type}#{request_time_range/formated_suffix}
字段说明:
prefix:项目名称前缀(如wpic-project1)app_id:应用标识(固定为dataflow)company_id:公司 IDplatform_id:平台 IDstore_id:店铺 IDentity_type:数据实体类型(order/product/refund/inventory)request_time_range/formated_suffix:数据请求的时间区间/格式化的后缀,业务侧决定即可,需要业务侧维护幂等性
分隔符说明:
- 使用
#作为分隔符(RabbitMQ message_id 字段支持任意字符) #在 routing key 中是通配符,但在 message_id 中是普通字符
示例:
wpic-project1#dataflow#100#20#200#order#2025-11-10~2025-11-15
解析:
- 项目前缀: wpic-project1
- 应用: dataflow
- 公司ID: 100
- 平台ID: 20 (DouYin)
- 店铺ID: 200
- 实体类型: order
- 平台订单号: DY123456789
特性:
- ✅ 幂等性:同一实体的 message_id 完全一致
- ✅ 可读性:直接看出数据来源
- ✅ 便于调试:可从 message_id 提取元数据
- ✅ 无冲突:全局唯一
2. 业务消息格式
{
"message_id": "wpic-project1#dataflow#100#20#200#order#DY123456",
"timestamp": "2025-01-14T10:30:00Z",
"platform": "douyin",
"data_type": "order",
"metadata": {
"platform_id": 20,
"company_id": 100,
"store_id": 200,
"source_system": "douyin-open-api",
"retry_count": 0,
"data_version": 1736840000
},
"data": {
"platform_unique_id": "DY123456",
"raw_data": {
// 平台原始完整数据
}
}
}
字段说明:
| 字段 | 类型 | 必需 | 说明 |
|---|---|---|---|
message_id |
string | 是 | 结构化消息 ID,用于幂等性 |
timestamp |
string | 是 | 消息生成时间(ISO 8601 格式) |
platform |
string | 是 | 平台标识(douyin/tmall/jd...) |
data_type |
string | 是 | 数据类型(order/product/refund/inventory) |
metadata.platform_id |
int | 是 | 平台 ID(数据库主键) |
metadata.company_id |
int | 是 | 公司 ID |
metadata.store_id |
int | 是 | 店铺 ID |
metadata.source_system |
string | 是 | 数据来源系统 |
metadata.retry_count |
int | 是 | 重试次数(初始为 0) |
metadata.data_version |
int | 是 | 数据版本(Unix 时间戳,用于防止乱序更新) |
data.platform_unique_id |
string | 是 | 平台侧唯一标识 |
data.raw_data |
object | 是 | 平台原始完整 JSON 数据 |
重要说明:
- data_version:必须使用平台数据的更新时间戳(Unix 时间戳),用于解决消息乱序消费问题
- 示例:平台订单的最后更新时间
1736840000(2025-01-14 10:00:00 UTC) - 消费端只有当
data_version大于数据库中的值时才更新数据 - 如果平台未提供更新时间,使用消息生成时间戳
- 示例:平台订单的最后更新时间
3. 错误消息格式
错误消息由 dataflow 消费者生成,包含原始消息和错误详情。
{
"error_id": "err_1234567890abcdef",
"original_message": {
// 原始业务消息完整内容
},
"error": {
"type": "validation",
"message": "Missing required field: total_amount",
"trace": "Exception stack trace...",
"timestamp": "2025-01-14T10:31:00Z"
},
"metadata": {
"platform": "amazon",
"platform_id": 1,
"company_id": 100,
"store_id": 200,
"data_type": "order",
"failed_at": "2025-01-14T10:31:00Z"
}
}
RabbitMQ 管理界面操作指引
1. 访问管理界面
URL: http://<rabbitmq-host>:15672
用户名: admin
密码: <管理员密码>
2. 创建 VHost
- 点击顶部导航 Admin → Virtual Hosts
- 点击 Add a new virtual host
- 输入 VHost 名称:
dataflow-app - 点击 Add virtual host
3. 创建用户
- 点击顶部导航 Admin → Users
- 点击 Add a user
- 填写用户信息:
- Username:
user_amazon - Password:
<安全密码> - Tags: (留空,非管理员用户)
- Username:
- 点击 Add user
4. 配置用户权限
- 在用户列表中,点击用户名(如
user_amazon) - 滚动到 Permissions 部分
- 选择 VHost:
dataflow-app - 配置权限正则表达式:
- Configure regexp: (留空)
- Write regexp:
^amazon\.(exchange|errors\.exchange)$ - Read regexp:
^amazon\.errors\..*$
- 点击 Set permission
5. 创建 Exchange
- 点击顶部导航 Exchanges
- 确保选择正确的 VHost:
dataflow-app - 点击 Add a new exchange
- 填写配置:
- Name:
amazon.exchange - Type:
topic - Durability:
Durable - Auto delete:
No - Internal:
No
- Name:
- 点击 Add exchange
重复以上步骤创建所有业务 Exchange 和错误 Exchange。
6. 创建 Queue
- 点击顶部导航 Queues
- 确保选择正确的 VHost:
dataflow-app - 点击 Add a new queue
- 填写配置:
- Name:
orders.queue - Durability:
Durable - Auto delete:
No - Arguments:
x-message-ttl:86400000(24小时,单位:毫秒)
- Name:
- 点击 Add queue
重复以上步骤创建 products.queue、refunds.queue 和 errors.queue。
注意:errors.queue 的 TTL 设置为 604800000(7天)。
7. 创建 Binding
- 点击顶部导航 Exchanges
- 点击需要绑定的 Exchange(如
amazon.exchange) - 滚动到 Bindings 部分
- 在 Add binding from this exchange 区域填写:
- To queue:
orders.queue - Routing key:
order
- To queue:
- 点击 Bind
为每个 Exchange 创建 3 个绑定(order/product/refund)。
对于错误 Exchange,绑定配置:
- To queue:
errors.queue - Routing key:
#
配置脚本
为简化配置过程,可以使用以下脚本自动创建资源。
使用 rabbitmqadmin
RabbitMQ 提供命令行工具 rabbitmqadmin,可用于批量配置。
安装 rabbitmqadmin
# 从 RabbitMQ 管理界面下载
wget http://<rabbitmq-host>:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/
配置脚本示例
注意:本脚本适用于新版本 rabbitmqadmin(Rust 版本)。如果您使用的是旧版本(Python),请参考官方文档调整语法。
#!/bin/bash
# RabbitMQ 连接信息
RABBITMQ_HOST="127.0.0.1"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin"
VHOST="dataflow"
# 平台列表
PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify")
DATA_TYPES=("orders" "products" "refunds" "inventory")
# 0. 检测并清理现有 VHost 和用户(如果存在)
echo "========================================"
echo "开始清理现有配置..."
echo "========================================"
# 检查 VHost 是否存在
echo "检查 VHost: $VHOST 是否存在..."
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true)
if [ -n "$VHOST_EXISTS" ]; then
echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..."
# 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete vhost --name "$VHOST"
echo "✓ VHost '$VHOST' 及其所有资源已删除"
else
echo "VHost '$VHOST' 不存在"
fi
# 清理所有相关用户账号
echo ""
echo "清理用户账号..."
# 平台用户列表
ALL_USERS=()
for platform in "${PLATFORMS[@]}"; do
ALL_USERS+=("user_${platform}")
done
# 添加其他用户
ALL_USERS+=("user_dataflow_consumer")
ALL_USERS+=("user_ops")
# 删除用户(如果存在)
for user in "${ALL_USERS[@]}"; do
USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true)
if [ -n "$USER_EXISTS" ]; then
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete user --name "$user"
echo "✓ 删除用户: $user"
fi
done
echo ""
echo "========================================"
echo "清理完成,开始重新配置..."
echo "========================================"
echo ""
# 1. 创建 VHost
echo "创建 VHost: $VHOST"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare vhost --name "$VHOST"
# 2. 创建主 Exchange(用于重试消息回流)
echo "创建主 Exchange: main.exchange"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "main.exchange" --vhost "$VHOST" \
--type topic --durable true
# 3. 创建 DLX (死信交换机)
echo "创建死信交换机 (DLX)..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建 DLX: dlx.${dtype}"
done
# 4. 创建主业务队列(带 DLX 配置)
echo ""
echo "创建主业务队列..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable \
--arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
echo "✓ 创建队列: ${dtype}.queue"
done
# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
echo ""
echo "创建重试队列..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable \
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}"
echo "✓ 创建重试队列: ${dtype}.retry.queue"
done
# 6. 创建错误队列
echo ""
echo "创建错误队列..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "errors.queue" --vhost "$VHOST" --durable \
--arguments '{"x-message-ttl":604800000}'
echo "✓ 创建错误队列: errors.queue"
# 7. 绑定主 Exchange 到主队列(使用通配符)
echo ""
echo "绑定主 Exchange 到主队列..."
for dtype in "${DATA_TYPES[@]}"; do
# 使用 order.#, product.# 等通配符匹配所有平台
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "main.exchange" --destination "${dtype}.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#"
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
done
# 8. 绑定 DLX 到重试队列和错误队列
echo ""
echo "绑定 DLX 到重试队列和错误队列..."
for dtype in "${DATA_TYPES[@]}"; do
# DLX -> 重试队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "retry"
echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)"
# DLX -> 错误队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "dlx.${dtype}" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "error"
echo "✓ 绑定: dlx.${dtype} → errors.queue (routing_key: error)"
done
# 9. 为每个平台创建 Exchange 和 Binding
echo ""
echo "========================================"
echo "创建平台 Exchange 和 Binding..."
echo "========================================"
for platform in "${PLATFORMS[@]}"; do
echo ""
echo "处理平台: ${platform}"
# 创建平台业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建业务 Exchange: ${platform}.exchange"
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "orders.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "order.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "products.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "product.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "refunds.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "inventory.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}"
echo "✓ 绑定到业务队列 (4个)"
# 创建平台错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建错误 Exchange: ${platform}.errors.exchange"
# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "#"
echo "✓ 绑定到错误队列"
# 创建平台用户
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_${platform}" --password "change_me_${platform}" --tags ""
echo "✓ 创建用户: user_${platform}"
# 配置平台用户权限
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_${platform}" \
--configure "^${platform}\\.(exchange|errors\\.exchange)$" \
--write "^${platform}\\.(exchange|errors\\.exchange)$" \
--read "^${platform}\\.errors\\..*$"
echo "✓ 配置用户权限"
done
# 10. 创建 dataflow 消费者用户
echo ""
echo "========================================"
echo "创建 dataflow 消费者用户..."
echo "========================================"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags ""
echo "✓ 创建用户: user_dataflow_consumer"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \
--configure "^(orders|products|refunds|inventory).*\\.queue$" \
--write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \
--read "^(orders|products|refunds|inventory).*\\.queue$"
echo "✓ 配置用户权限"
# 11. 创建运维监控用户
echo ""
echo "创建运维监控用户..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_ops" --password "change_me_ops" --tags ""
echo "✓ 创建用户: user_ops"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_ops" \
--configure "^errors\\..*$" --write "" --read "^errors\\.queue$"
echo "✓ 配置用户权限"
echo ""
echo "========================================"
echo "RabbitMQ 配置完成!"
echo "========================================"
echo ""
echo "已创建:"
echo "- 1 个 VHost: $VHOST"
echo "- 1 个主 Exchange: main.exchange"
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
echo "- 1 个错误队列: errors.queue"
echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)"
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
echo ""
echo "提示: 请修改所有用户的默认密码 (change_me_*)"
执行脚本
chmod +x setup_rabbitmq.sh
./setup_rabbitmq.sh
监控和维护
1. 监控指标
通过 RabbitMQ 管理界面查看以下关键指标:
队列健康状态
- 点击 Queues → 选择队列
- 关注以下指标:
- Ready: 待消费消息数(正常应 < 1000)
- Unacked: 未确认消息数(正常应较低)
- Total: 总消息数
- Incoming rate: 消息入队速率
- Deliver / Get rate: 消息消费速率
告警阈值:
- Ready > 10000:队列堆积严重
- Unacked > 5000:消费者处理缓慢
- Incoming rate >> Deliver rate:消费速度跟不上生产速度
Exchange 流量统计
- 点击 Exchanges → 选择 Exchange
- 查看 Message rates:
- Publish in: 消息发布速率
- Publish out: 消息路由到队列的速率
异常情况:
- Publish in > 0 但 Publish out = 0:可能绑定配置错误
2. 常见问题排查
消息未到达队列
检查步骤:
- 确认 Exchange 存在且类型正确
- 检查 Binding 配置,确保 routing key 匹配
- 查看 Exchange 的 Message rates,确认消息已发布
- 检查生产者是否使用了正确的 routing key
消费者无法消费
检查步骤:
- 确认用户权限配置正确(Read 权限)
- 检查队列中是否有消息(Ready > 0)
- 查看消费者连接状态(Connections 页面)
- 检查消费者代码是否正确 ACK 消息
错误消息未送达平台开发者
检查步骤:
- 确认错误 Exchange 绑定到
errors.queue - 检查平台开发者账号的 Read 权限
- 确认平台开发者订阅了正确的 Exchange
- 查看
errors.queue中是否有消息
3. 备份和恢复
导出配置
# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
export dataflow-backup.json
导入配置
# 从备份恢复配置
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
import dataflow-backup.json
4. 性能优化
消费者并发配置
- 增加消费者实例数量
- 每个消费者配置多个 channel(如 5-10 个)
- 使用 prefetch_count 限制未确认消息数量
队列性能优化
- 启用 lazy queue(大量消息堆积时):
rabbitmqadmin declare queue name=orders.queue vhost=dataflow-app \ durable=true arguments='{"x-queue-mode":"lazy"}' - 定期清理过期消息(通过 TTL 自动实现)
5. 日志查看
# 查看 RabbitMQ 日志
tail -f /var/log/rabbitmq/rabbit@<hostname>.log
# 查看连接日志
tail -f /var/log/rabbitmq/rabbit@<hostname>_connection.log
安全建议
1. 密码策略
- 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符)
- 定期更换密码(建议 3-6 个月)
- 不同环境(开发/测试/生产)使用不同密码
2. 网络安全
- 生产环境启用 TLS/SSL 加密连接
- 限制管理界面访问(仅允许内网或 VPN 访问)
- 配置防火墙规则:
- 5672:AMQP 端口(仅允许应用服务器访问)
- 15672:管理界面(仅允许管理员 IP 访问)
3. 权限最小化原则
- 平台开发者只能访问自己的 Exchange
- 消费者只能读取必要的队列
- 避免赋予不必要的 Configure 权限
4. 审计日志
- 启用 RabbitMQ 审计日志插件
- 定期审查用户操作记录
- 监控异常登录行为
新增平台接入流程
当需要接入新平台(例如 Lazada)时,按以下步骤操作:
1. 创建 Exchange
# 业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "lazada.exchange" --vhost dataflow-app \
--type topic --durable true
# 错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "lazada.errors.exchange" --vhost dataflow-app \
--type topic --durable true
2. 创建 Binding
# 绑定到业务队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "lazada.exchange" --destination "orders.queue" \
--destination-type queue --vhost dataflow-app --routing-key "order.lazada"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "lazada.exchange" --destination "products.queue" \
--destination-type queue --vhost dataflow-app --routing-key "product.lazada"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "lazada.exchange" --destination "refunds.queue" \
--destination-type queue --vhost dataflow-app --routing-key "refund.lazada"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "lazada.exchange" --destination "inventory.queue" \
--destination-type queue --vhost dataflow-app --routing-key "inventory.lazada"
# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "lazada.errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost dataflow-app --routing-key "#"
3. 创建用户并配置权限
# 创建用户
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_lazada" --password "<secure_password>" --tags ""
# 配置权限
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost dataflow-app --user "user_lazada" \
--configure "^lazada\\.(exchange|errors\\.exchange)$" \
--write "^lazada\\.(exchange|errors\\.exchange)$" \
--read "^lazada\\.errors\\..*$"
4. 提供给平台开发者
将以下信息提供给新平台的开发团队:
RabbitMQ 连接信息:
- Host: <rabbitmq-host>
- Port: 5672
- VHost: dataflow-app
- Username: user_lazada
- Password: <secure_password>
发布配置:
- Exchange: lazada.exchange
- Routing Keys:
- order: 发布订单数据
- product: 发布产品数据
- refund: 发布退款数据
- inventory: 发布库存数据
错误订阅(可选):
- Exchange: lazada.errors.exchange
- 接收本平台的错误消息
消息格式规范:
- 参见「消息格式规范」章节
附录
A. 完整资源清单
VHost
- 总数: 1 个
dataflow-app
Exchanges
主 Exchange: 1 个
main.exchange(Topic) - 统一主交换机,用于路由和重试消息回流
平台业务 Exchanges: 8 个(每平台 1 个)
douyin.exchangejd.exchangetmall.exchangeredbook.exchangeamazon.exchangenaver.exchangerakuten.exchangeshopify.exchange
平台错误 Exchanges: 8 个(每平台 1 个)
douyin.errors.exchangejd.errors.exchangetmall.errors.exchangeredbook.errors.exchangeamazon.errors.exchangenaver.errors.exchangerakuten.errors.exchangeshopify.errors.exchange
死信交换机 (DLX): 4 个(每数据类型 1 个)
dlx.ordersdlx.productsdlx.refundsdlx.inventory
Exchange 总数: 21 个 (1 主 + 8 平台业务 + 8 平台错误 + 4 DLX)
Queues
主业务队列: 4 个
orders.queue(配置 DLX: dlx.orders)products.queue(配置 DLX: dlx.products)refunds.queue(配置 DLX: dlx.refunds)inventory.queue(配置 DLX: dlx.inventory)
延迟重试队列: 4 个
orders.retry.queue(TTL: 5秒, DLX: main.exchange)products.retry.queue(TTL: 5秒, DLX: main.exchange)refunds.retry.queue(TTL: 5秒, DLX: main.exchange)inventory.retry.queue(TTL: 5秒, DLX: main.exchange)
错误队列: 1 个
errors.queue(TTL: 7天)
Queue 总数: 9 个 (4 主队列 + 4 重试队列 + 1 错误队列)
Bindings
主 Exchange 绑定: 4 个
main.exchange→orders.queue(routing_key:order.#)main.exchange→products.queue(routing_key:product.#)main.exchange→refunds.queue(routing_key:refund.#)main.exchange→inventory.queue(routing_key:inventory.#)
平台业务 Exchange 绑定: 32 个
- 每个平台 Exchange → 4 个业务队列 (8 × 4 = 32)
- 使用 routing key 格式:
{data_type}.{platform}- 例如:
order.douyin,product.tmall,refund.jd等
- 例如:
平台错误 Exchange 绑定: 8 个
- 每个平台错误 Exchange →
errors.queue(routing_key:#)
DLX 绑定: 8 个
- 每个 DLX → 对应重试队列 (routing_key:
retry) - 4 个 - 每个 DLX →
errors.queue(routing_key:error) - 4 个
Binding 总数: 52 个 (4 主 + 32 平台业务 + 8 平台错误 + 8 DLX)
Users
平台开发者账号: 8 个
user_douyin,user_jd,user_tmall,user_redbook,user_amazon,user_naver,user_rakuten,user_shopify- 权限:只能写入自己的业务和错误 Exchange,只能读取自己的错误队列
消费者账号: 1 个
user_dataflow_consumer- 权限:可以管理所有业务队列和重试队列,可以写入 DLX 和错误 Exchange,可以读取所有业务队列
运维监控账号: 1 个
user_ops- 权限:可以管理错误相关资源,可以读取错误队列
管理员账号: 1 个
admin- 权限:完全权限,可以访问管理界面
User 总数: 11 个 (8 平台 + 1 消费者 + 1 运维 + 1 管理员)
架构统计
| 资源类型 | 数量 | 说明 |
|---|---|---|
| VHost | 1 | dataflow-app |
| Exchange | 21 | 1 主 + 8 平台业务 + 8 平台错误 + 4 DLX |
| Queue | 9 | 4 主 + 4 重试 + 1 错误 |
| Binding | 52 | 完整消息流转路径 |
| User | 11 | 多角色权限隔离 |
消息流转路径:
生产者 → 平台 Exchange → 主队列 → 消费者
↓ (失败)
DLX → 重试队列 (5秒TTL) → 主 Exchange → 主队列
↓ (超过3次)
DLX → 错误队列 (人工处理)