66 KiB
RabbitMQ 配置方案
权威源声明:本文档已与现网代码对齐(2026-05-14)。如本文与代码冲突,以代码为准:
- 拓扑创建脚本:
backend/bin/rabbitmq.sh- 用户/凭据配置:
backend/config/autoload/mq_user.php+backend/config/autoload/amqp.php- 业务消费者(注解 declare):
backend/app/Platform/{Order,Product,Refund}Consumer.php- 错误兜底(双写):
backend/app/Platform/Traits/FailedMessageTrait.php+backend/app/Platform/ErrorProducer.php+backend/app/Model/FailedMessage.php- 运维状态检查:
php ./bin/hyperf.php app:mq:status [--interval=N] [--watch](backend/app/Command/AppMqStatus.php+backend/app/Service/MqStatusService.php)
配置层级说明:mq_user.php 从数据库 platforms 表动态读取启用的平台名,按 strtolower(str_replace(' ', '_', $name)) 标准化生成 user_{platform} 用户名与 MQ_PASSWORD_{PLATFORM_UPPER} env 变量名;amqp.php 中 default_consumer 用户硬编码为 user_datahub_consumer,密码由 env MQ_PASSWORD_CONSUMER 注入。
概述
本文档描述 datahub 应用的 RabbitMQ 消息队列架构设计和配置方法。
业务背景
- 应用角色:datahub 是消息消费者,负责接收来自多个电商平台的数据并入库
- 数据来源:19 个电商/线下平台(amazon_japan、coupang、dewu、douyin、goofish、jd、kaola、lazada、naver、offline、pdd、rakuten、redbook、shopee、shopify、tmall、wechat、wechat_video、youzan)— 平台列表由
platforms表动态管理,名称按strtolower(str_replace(' ', '_', $name))标准化 - 数据类型:订单(orders)、产品(products)、退款(refunds)、库存(inventory)
- 隔离需求:不同平台开发者之间的工作空间需要相互隔离,互不影响
架构特点
- VHost 隔离:按应用划分 VHost,使用
Classic队列类型 - Exchange 隔离:每个平台独立的业务 Exchange + 错误 Exchange,通过权限正则控制访问(用户只能写自己的 platform.exchange / platform.errors.exchange)
- 单队列设计:每种数据类型一个队列,所有平台共享(
orders.queue/products.queue/refunds.queue/inventory.queue),保证同 data_type 内严格 FIFO - 独立消费者:每种数据类型一个 Hyperf Consumer 实例,使用 prefetch + 单条 ACK 模式
- 智能重试:DLX(死信交换机)+ 延迟重试队列(TTL=5s 自动回流 main.exchange)+ 应用层 ErrorProducer 兜底
- 双层错误持久化:超过重试上限的消息 同时写入
errors.queue(MQ, TTL=7d) + DBfailed_messages表(人工捞回与统计分析) - 消息持久化:RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API
设计方案总结
核心设计理念
本方案采用 Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。
关键设计决策
** 注意 **
- 消息内部字段无严格要求,但可以在业务侧进行约束
- 默认单条消息的最大 Payload 尺寸为 128MB
- 消息队列为缓存系统,应该尽可能的将批量数据放入 payload\
- MQ 存在的意义在于快速接受远程生产的 Message
1. 为什么使用单队列?
| 问题 | 传统多队列方案 | 本方案(单队列) |
|---|---|---|
| 队列数量 | 19 平台 × 4 数据类型 = 76 个队列 | 4 个数据类型队列(数量减少 ~95%,且与平台数量解耦) |
| FIFO 保证 | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 |
| 运维复杂度 | 需要监控 76 个队列 | 只需监控 4 个主队列 + 4 个 retry 队列 + 1 个 errors 队列 = 9 个 |
| 消费者部署 | 需要 N 个消费者实例 | 只需 4 个消费者实例 |
| 新增平台成本 | 新建 4 队列 + 4 binding + 用户/权限 | 仅新建 1 业务 exchange + 1 错误 exchange + 5 binding + 1 用户/权限(队列不动) |
原因:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。
2. 为什么使用单消费者?
原因:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。
优势:
- ✅ 保证严格 FIFO 顺序
- ✅ 避免消息错投到错误的消费者
- ✅ 简化部署和运维
如何实现多平台支持? 通过消费者内部的适配器模式,根据消息的 platform 字段分发到不同业务逻辑。
3. 为什么使用 Prefetch + 批处理?
原因:RabbitMQ 是 Push 模式,不提供 Kafka 式的批量拉取 API。
解决方案:
- 设置
prefetch_count=100:RabbitMQ 最多推送 100 条未确认消息 - 消费者自行缓存消息,达到
batch_size=50或timeout=3s后批量处理 - 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性
优势:
- ✅ 提升数据库写入效率(批量插入)
- ✅ 控制并发和内存压力
- ✅ 失败消息不影响其他消息(单条 ACK)
4. 为什么使用 DLX + 延迟重试队列?
问题:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。
传统方案的问题:
- ❌
nack(requeue=true):消息回队列头部,无限循环,永远不会进入 DLX - ❌ 直接丢弃:数据丢失
- ❌ 立即重试:打爆 API/DB
本方案(DLX + 延迟重试 + 双写错误持久化):
消费失败 → Consumer 检查 x-death count
↓
count < AMQP_MAX_RETRIES(默认 3)
↓
Consumer 返回 NACK(requeue=false)
↓
源 queue 的 x-dead-letter-exchange = dlx.{dtype}, dl-rk = "retry"
↓
dlx.{dtype} (topic) — binding rk="retry" → {dtype}.retry.queue (TTL=5s)
↓
retry.queue 的 x-dead-letter-exchange = main.exchange, dl-rk = "{entity}.retry"
↓ 5s 后死信回流
main.exchange (topic) — binding "{entity}.#" → {dtype}.queue (重新消费, x-death count+1)
count >= AMQP_MAX_RETRIES
↓
Consumer 调用 FailedMessageTrait::sendToErrorQueue
↓ ┌─ ErrorProducer.publish → errors.exchange (rk="error") → errors.queue (TTL=7d) ← MQ 兜底
└─ FailedMessage::create → DB `failed_messages` 表 ← DB 兜底
↓
Consumer 返回 ACK(避免主队列继续重试)
重要说明:
- ⚠️ 必须设置
requeue=false:只有requeue=false才会触发 DLX 机制(已由protected bool $requeue = false在所有 Consumer 中固定) - ⚠️
requeue=true会导致无限循环:消息直接回到原队列头部,永远不进入 DLX - ✅ retry 队列自动回流:TTL 到期后由 RabbitMQ 引擎自动死信回 main.exchange,不需要额外消费者
- ✅ Consumer 控制重试次数:通过
FailedMessageTrait::getRetryCount读x-death[0].count - ✅ 错误兜底双写:超过重试上限时,errors.queue + failed_messages 表同步双写,互为备份;监控/告警/捞回需同时覆盖两个数据源
- ❌ DLX 不直接绑定 errors.queue:错误进入 errors.queue 完全由应用层
ErrorProducer主动 publish 到errors.exchange实现,不走 DLX 路由分流
优势:
- ✅ 不会卡 FIFO
- ✅ 不会无限重试(通过 max retries 限制)
- ✅ 延迟重试避免打爆 API/DB
- ✅ 错误消息自动隔离到 error 队列
- ✅ 无需额外 Redis 或手写 retry 逻辑
- ✅ retry 过程完全自动化(依赖 RabbitMQ 的 DLX + TTL 机制)
方案优势总结
| 维度 | 优势 |
|---|---|
| 可靠性 | 消息持久化 + 单条 ACK + 智能重试,避免数据丢失 |
| 性能 | Prefetch + 批处理 + 数据库批量写入,提升吞吐 |
| 顺序性 | 单队列 FIFO 保证严格顺序 |
| 扩展性 | 适配器模式,新增平台只需添加适配器类 |
| 运维 | 队列数量少,监控简单,故障排查容易 |
| 容错 | DLX 智能重试,失败消息不阻塞主队列 |
适用场景
✅ 适合:
- 多平台数据同步(电商、物流、金融等)
- 需要严格 FIFO 顺序
- 需要高可靠性和高吞吐
- 平台数量多(> 5个),数据类型固定(< 10个)
❌ 不适合:
- 实时性要求极高(< 100ms)
- 需要按平台独立控制消费速率
- 平台数量少(< 3个),可以使用独立队列
Requeue 机制详解
requeue 参数的作用
在 RabbitMQ 中,当消费者处理消息失败时,可以通过 basic_nack() 或 basic_reject() 拒绝消息,第三个参数 requeue 决定了消息的去向:
// requeue=true:消息重新回到原队列(队列头部)
$channel->basic_nack($deliveryTag, false, true);
// requeue=false:消息进入 DLX(如果配置了),否则丢弃
$channel->basic_nack($deliveryTag, false, false);
requeue=true 的问题
| 行为 | 后果 |
|---|---|
| 消息回到原队列头部 | 立即被同一个消费者再次取出 |
| 不触发 DLX | 永远不会进入重试队列或错误队列 |
| 无延迟 | 瞬间形成高速循环(毫秒级) |
| 无计数 | 无法统计重试次数,无法设置上限 |
结果:消息在 消费 → 失败 → NACK → 回队列 → 消费 之间无限循环,CPU 和网络资源被浪费,队列状态显示为空(消息循环太快无法观察)。
requeue=false 的正确用法
orders.queue
arguments:
x-dead-letter-exchange = dlx.orders
x-dead-letter-routing-key = retry
x-message-ttl = 86400000 (24h)
↓ Consumer 消费失败 (count < max_retries)
↓ basic_nack(requeue=false)
↓ 触发 DLX 机制
↓
dlx.orders (topic)
↓ binding key = "retry"
↓
orders.retry.queue
arguments:
x-message-ttl = 5000 (5s)
x-dead-letter-exchange = main.exchange
x-dead-letter-routing-key = order.retry
↓ 5s 后 TTL 到期,自动死信回 main.exchange
↓
main.exchange (topic)
↓ binding key = "order.#" 匹配 "order.retry"
↓
orders.queue (重新进入主队列,x-death count+1)
注意:retry queue 的
x-dead-letter-routing-key是{entity_singular}.retry(如order.retry/product.retry),不保留原 routing key。由于 main.exchange 用通配符order.#绑定 orders.queue,order.retry能正常匹配回流;但失去了"消息原平台来源"信息(platform 字段已存在 message 体内,不依赖 routing key 追溯)。
环境变量配置
在 .env 文件中可以配置以下参数:
# 最大重试次数(默认3次)
# 消息重试超过此次数后,会被发送到 errors.queue
AMQP_MAX_RETRIES=3
# 调试延迟(秒,默认0)
# 设置为 2 可以让每条消息处理延迟2秒,方便在 mq:status 中观察队列状态
# 生产环境应设置为 0
AMQP_CONSUMER_DEBUG_DELAY=0
使用示例:
# 开发环境:观察消息流转
AMQP_CONSUMER_DEBUG_DELAY=2 php bin/hyperf.php start
# 生产环境:正常运行
AMQP_CONSUMER_DEBUG_DELAY=0 php bin/hyperf.php start
架构设计
1. VHost 设计
VHost 按业务应用进行划分,便于资源隔离和权限管理。
当前 VHost:
datahub— 电商数据流转专用(注:早期文档误写为datahub,现网实际为datahub)
2. Exchange 设计
在 datahub VHost 中,Exchange 分为三类:
主路由 Exchange
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
main.exchange |
Topic | 是 | 仅作为 retry 回流入口 + Consumer Annotation binding 声明源;不作为 producer 入口 |
⚠ producer 直推
{platform}.exchange,不经过 main.exchange。main.exchange 的两个用途:① retry queue 的x-dead-letter-exchange指向它,retry 消息通过它回流业务队列;② Hyperf#[Consumer(exchange: "main.exchange", routingKey: "{entity}.#", queue: "{dtype}.queue")]注解声明 binding 时用它。
平台业务 Exchange(producer 入口)
每个平台一个独立的 Topic Exchange,共 19 个(与 platforms 表 enabled 平台对齐):
平台 (name 标准化后) |
Exchange |
|---|---|
| amazon_japan | amazon_japan.exchange |
| coupang | coupang.exchange |
| dewu | dewu.exchange |
| douyin | douyin.exchange |
| goofish | goofish.exchange |
| jd | jd.exchange |
| kaola | kaola.exchange |
| lazada | lazada.exchange |
| naver | naver.exchange |
| offline | offline.exchange |
| pdd | pdd.exchange |
| rakuten | rakuten.exchange |
| redbook | redbook.exchange |
| shopee | shopee.exchange |
| shopify | shopify.exchange |
| tmall | tmall.exchange |
wechat.exchange |
|
| wechat_video | wechat_video.exchange |
| youzan | youzan.exchange |
全部 type=topic, durable=true。
Routing Key 规范({entity_singular}.{platform} 格式,注意 entity 单数 / 平台名小写下划线):
| 数据类型 | 示例 routing key |
|---|---|
| 订单 (order) | order.tmall / order.amazon_japan / order.wechat_video |
| 产品 (product) | product.tmall / product.amazon_japan / product.wechat_video |
| 退款 (refund) | refund.tmall / refund.amazon_japan / refund.wechat_video |
| 库存 (inventory) | inventory.tmall / inventory.amazon_japan / inventory.wechat_video |
设计说明:
- producer 直推
{platform}.exchange,使用 routing key{entity_singular}.{platform} - 业务 binding 把
{platform}.exchange通过 routing key 直绑到 4 个{dtype}.queue(不经过 main.exchange) - 消费者通过消息体
platform/data_type字段进行业务分发(不依赖 routing key 解析)
平台错误 Exchange(应用层错误推送 + 平台错误通知)
每个平台一个独立的错误 Topic Exchange,共 19 个,命名 {platform}.errors.exchange:
| 来源平台 | Errors Exchange |
|---|---|
| 19 个平台 | {platform}.errors.exchange(amazon_japan / coupang / dewu / douyin / goofish / jd / kaola / lazada / naver / offline / pdd / rakuten / redbook / shopee / shopify / tmall / wechat / wechat_video / youzan) |
每个 {platform}.errors.exchange 通过 binding rk=# 直连 errors.queue。
全局错误 Exchange
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
errors.exchange |
Topic | 是 | 应用层 ErrorProducer 推送目标,binding rk=# 直连 errors.queue |
Routing Key 规范(应用层 publish 时):
- 应用层
ErrorProducer.php固定使用 routing key =error(见代码) - 平台 producer 在自检/校验失败时可向
{platform}.errors.exchange自定义 routing key(业务侧决定,由 binding#兜底)
3. Queue 设计
业务队列(主队列)
物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | DLX | 用途 |
|---|---|---|---|---|---|
orders.queue |
是 | 否 | 24小时 | dlx.orders |
接收所有平台的订单数据,单消费者批处理 |
products.queue |
是 | 否 | 24小时 | dlx.products |
接收所有平台的产品数据,单消费者批处理 |
refunds.queue |
是 | 否 | 24小时 | dlx.refunds |
接收所有平台的退款数据,单消费者批处理 |
inventory.queue |
是 | 否 | 24小时 | dlx.inventory |
接收所有平台的库存数据,单消费者批处理 |
队列参数配置:
{
"x-message-ttl": 86400000,
"x-dead-letter-exchange": "dlx.orders",
"x-dead-letter-routing-key": "retry"
}
死信交换机(DLX)
每种数据类型配备一个 DLX,用于处理失败消息的路由:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
dlx.orders |
Topic | 是 | 订单失败消息路由 |
dlx.products |
Topic | 是 | 产品失败消息路由 |
dlx.refunds |
Topic | 是 | 退款失败消息路由 |
dlx.inventory |
Topic | 是 | 库存失败消息路由 |
延迟重试队列
为每种数据类型提供延迟重试机制:
| Queue 名称 | 持久化 | 消息 TTL | DLX | 用途 |
|---|---|---|---|---|
orders.retry.queue |
是 | 5秒 | main.exchange |
订单延迟重试(TTL后自动回到主队列) |
products.retry.queue |
是 | 5秒 | main.exchange |
产品延迟重试 |
refunds.retry.queue |
是 | 5秒 | main.exchange |
退款延迟重试 |
inventory.retry.queue |
是 | 5秒 | main.exchange |
库存延迟重试 |
重试队列参数配置(按 data_type 实例化,{entity_singular} ∈ {order, product, refund, inventory}):
{
"x-message-ttl": 5000,
"x-dead-letter-exchange": "main.exchange",
"x-dead-letter-routing-key": "{entity_singular}.retry"
}
例:orders.retry.queue 的 dl-rk = order.retry,回流时由 main.exchange 的 order.# binding 匹配进入 orders.queue。
重试机制说明(现网实现):
- 消费者处理失败时,调用
FailedMessageTrait::getRetryCount读x-death[0].count - 根据重试次数决定处理方式:
- count <
AMQP_MAX_RETRIES(默认 3):return Result::NACK(Hyperf 自动按$requeue=false触发 DLX)→ 进入dlx.{dtype}→ 路由到{dtype}.retry.queue(5s 延迟)→ TTL 到期回流main.exchange→ 重新进入主队列 - count >= max_retries:Consumer 调用
FailedMessageTrait::sendToErrorQueue,同时:ErrorProducer.publish→errors.exchange(rk=error) →errors.queue(TTL 7d)FailedMessage::create→ DBfailed_messages表
- 然后
return Result::ACK,避免主队列继续重试
- count <
- 配置参数:
- 最大重试次数:
AMQP_MAX_RETRIES(默认 3) - 延迟时间:5 秒(retry 队列
x-message-ttl=5000) - 调试延迟:
AMQP_CONSUMER_DEBUG_DELAY(生产环境设 0)
- 最大重试次数:
错误队列
统一的错误队列,接收所有超过重试次数的失败消息:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | 用途 |
|---|---|---|---|---|
errors.queue |
是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 |
错误队列的作用和机制:
- ✅ 收集永久失败的消息:当消息重试次数达到上限(默认3次)后,消费者会主动将消息发送到此队列
- ✅ 避免消息丢失:即使处理失败多次,消息也会被保存在错误队列中,不会丢失
- ✅ 人工排查和修复:运维人员可以从错误队列中查看失败原因,修复问题后重新投递
- ✅ 统一管理:所有数据类型(orders/products/refunds/inventory)的错误消息都集中在一个队列
- ⚠️ 不会自动重试:错误队列中的消息需要人工介入,不会自动回流到主队列
错误消息格式:
{
"error_id": "err_675d8f3a2b1c4",
"original_message": { /* 原始消息内容 */ },
"error": {
"type": "InvalidArgumentException",
"message": "Missing required field: total_amount",
"trace": "Exception stack trace...",
"timestamp": "2025-01-15T10:31:00+00:00"
},
"metadata": {
"platform": "tmall",
"platform_id": 2,
"company_id": 188,
"store_id": 292,
"data_type": "order",
"failed_at": "2025-01-15T10:31:00+00:00",
"retry_count": 3
}
}
4. Binding 配置(现网实际)
本节按"消息流向"组织,每个 binding 都来自本地 MQ 实测(curl mgmt-api/bindings/datahub)+ backend/bin/rabbitmq.sh 创建逻辑。
4.1 平台业务 Exchange → 业务队列(producer 主路径)
每个 {platform}.exchange 通过 4 条 binding 直接绑定到 4 个业务队列:
{platform}.exchange (topic)
├── rk = "order.{platform}" → orders.queue
├── rk = "product.{platform}" → products.queue
├── rk = "refund.{platform}" → refunds.queue
└── rk = "inventory.{platform}" → inventory.queue
19 平台 × 4 = 76 条 binding。
4.2 main.exchange → 业务队列(retry 回流路径)
main.exchange (topic)
├── rk = "order.#" → orders.queue
├── rk = "product.#" → products.queue
├── rk = "refund.#" → refunds.queue
└── rk = "inventory.#" → inventory.queue
4 条 binding。这是 retry queue 死信回流的唯一入口;producer 不通过 main.exchange 投递消息。
4.3 DLX → retry queue(仅一种绑定)
dlx.orders (topic) — rk = "retry" → orders.retry.queue
dlx.products (topic) — rk = "retry" → products.retry.queue
dlx.refunds (topic) — rk = "retry" → refunds.retry.queue
dlx.inventory (topic) — rk = "retry" → inventory.retry.queue
4 条 binding。
⚠ DLX 不绑定到 errors.queue。错误进入 errors.queue 由应用层
ErrorProducer主动 publish 实现(见 §4.5),不通过 DLX 路由分流。这一点与早期文档草稿("dlx.{dtype} → errors.queue (rk=error)" 双绑模式)不符,以现网实现为准。
4.4 retry queue → main.exchange(TTL 死信回流)
retry queue 不需要显式 binding;其 x-dead-letter-exchange = main.exchange 与 x-dead-letter-routing-key = {entity_singular}.retry 定义在队列 arguments 中,TTL 到期由 RabbitMQ 引擎自动死信投递到 main.exchange。
orders.retry.queue —(TTL 5s, dl-rk=order.retry)→ main.exchange →[order.#]→ orders.queue
products.retry.queue —(TTL 5s, dl-rk=product.retry)→ main.exchange →[product.#]→ products.queue
refunds.retry.queue —(TTL 5s, dl-rk=refund.retry)→ main.exchange →[refund.#]→ refunds.queue
inventory.retry.queue —(TTL 5s, dl-rk=inventory.retry)→ main.exchange →[inventory.#]→ inventory.queue
4.5 errors exchange → errors.queue(双入口扇入)
errors.exchange (topic) — rk = "#" → errors.queue (全局兜底入口,应用层 ErrorProducer 用)
{platform}.errors.exchange (topic) — rk = "#" → errors.queue × 19 (平台自定义错误投递入口)
20 条 binding(19 平台 + 1 全局)。
4.6 binding 总数核对
| 类别 | 数量 |
|---|---|
| 平台业务 exchange → 4 业务队列 | 19 × 4 = 76 |
| main.exchange → 4 业务队列 | 4 |
| dlx.{dtype} → retry queue | 4 |
| errors exchange → errors.queue(19 平台 + 1 全局) | 20 |
| 总计 | 104 |
(不含 default exchange (AMQP default) 自动给每个 queue 创建的同名 binding)
消费者设计
1. 消费者模型
采用 单消费者 + 批处理 + prefetch + 适配器模式:
| 数据类型 | 消费者 | Queue | 并发模式 | 处理模式 |
|---|---|---|---|---|
| 订单 | Order Consumer | orders.queue |
单消费者 | 批处理 + 适配器 |
| 产品 | Product Consumer | products.queue |
单消费者 | 批处理 + 适配器 |
| 退款 | Refund Consumer | refunds.queue |
单消费者 | 批处理 + 适配器 |
| 库存 | Inventory Consumer | inventory.queue |
单消费者 | 批处理 + 适配器 |
设计原则:
- ✅ 单消费者:每个队列只有一个消费者实例,保证严格 FIFO 顺序
- ✅ 批处理:积累一批消息后统一处理,提升数据库写入效率
- ✅ 单条 ACK:每条消息独立确认,失败不影响其他消息
- ✅ 适配器模式:消费者内部根据平台字段分发到不同业务逻辑
**消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 **
2. Prefetch 配置
{
"prefetch_count": 100,
"batch_size": 50,
"batch_timeout": 3000
}
参数说明:
prefetch_count=100:RabbitMQ 最多推送 100 条未确认消息到消费者batch_size=50:消费者积累 50 条消息后触发批处理batch_timeout=3000:如果3秒内未达到 batch_size,也触发处理
工作流程:
- RabbitMQ 按 Push 模式推送最多 100 条消息到消费者
- 消费者缓存消息,达到 50 条或超时 3 秒后批量处理
- 批量处理时,仍然对每条消息单独 ACK/NACK
- 实现"伪批量拉取"效果,同时保留单条确认的灵活性
3. 消费者处理逻辑(现网实现简化版)
完整实现见 backend/app/Platform/OrderConsumer.php 与 FailedMessageTrait.php。简化骨架:
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue",
pool: "default_consumer", nums: 1, enable: true)]
class OrderConsumer extends ConsumerMessage
{
use FailedMessageTrait;
protected ?array $qos = ['prefetch_size' => 0, 'prefetch_count' => 1, 'global' => false];
protected bool $requeue = false; // 关键:让失败消息进 DLX 而不是 requeue
public function getQueueBuilder(): QueueBuilder
{
return (new QueueBuilder())
->setQueue($this->getQueue())
->setArguments([
'x-dead-letter-exchange' => ['S', 'dlx.orders'],
'x-dead-letter-routing-key' => ['S', 'retry'],
'x-message-ttl' => ['I', 86400000], // 24h
'x-queue-type' => ['S', 'classic'],
]);
}
public function consumeMessage($data, AMQPMessage $message): Result
{
$retry_count = $this->getRetryCount($message); // 读 x-death[0].count
$max_retries = (int) env('AMQP_MAX_RETRIES', 3);
try {
// 1. 创建 EntityParse、entityMatch、entityMap、upsert
$parse = EntityParseFactory::createFromMessage($message);
$entity = $parse->entityMatch($data['meta']);
$rows = $parse->entityMap($data['data'])->all();
Db::beginTransaction();
$entity->newQuery()->upsert($rows, $parse->getUniqueBy(), $parse->getUpdateFields());
// 2. 子项处理、聚合补刷等业务逻辑 ...
Db::commit();
return Result::ACK;
} catch (Throwable $error) {
Db::rollBack();
Log::get()->error('Consumer processing failed', [...]);
if ($retry_count >= $max_retries) {
// 双写兜底:MQ errors.queue + DB failed_messages 表
$this->sendToErrorQueue($message, $error);
return Result::ACK; // 防止主队列继续重试
}
return Result::NACK; // requeue=false → 进 DLX → retry queue
}
}
}
FailedMessageTrait::sendToErrorQueue 内部:
$producer = ApplicationContext::getContainer()->get(Producer::class);
$producer->produce(new ErrorProducer($message, $error, $retry_count));
// ↓ ErrorProducer 注解 #[Producer(exchange: 'errors.exchange', routingKey: 'error')]
// → errors.exchange → errors.queue (MQ 兜底, 7d TTL)
FailedMessage::query()->create([...]); // → DB failed_messages 表 (DB 兜底)
4. 重试机制工作流程(现网实现)
┌─────────────────┐
│ orders.queue │ ← 消费者从这里接收消息
└────────┬────────┘
│
├─ 成功 → return Result::ACK
│
└─ 失败 → FailedMessageTrait::getRetryCount($message)
(读 x-death[0].count)
│
├─ count < AMQP_MAX_RETRIES (默认 3)
│ │
│ └→ return Result::NACK (Hyperf 自动按 $requeue=false 处理)
│ │
│ ▼ 触发 DLX
│ dlx.orders (topic) ──rk="retry"──► orders.retry.queue
│ │ TTL=5s
│ ▼
│ main.exchange (rk="order.retry")
│ │
│ ▼ 命中 binding "order.#"
│ orders.queue (重新消费,x-death.count+1)
│
└─ count >= AMQP_MAX_RETRIES
│
└→ FailedMessageTrait::sendToErrorQueue($message, $error)
│
├──► ErrorProducer.publish → errors.exchange (rk="error")
│ → errors.queue (TTL 7d, user_ops 捞回)
│
└──► FailedMessage::query()->create(...)
→ DB failed_messages 表
│
▼
return Result::ACK (避免主队列继续重试)
关键点:
- ✅
$requeue=false由 Consumer 类属性固定:让失败消息进入 DLX 而不是 requeue - ✅ retry 队列自动回流:RabbitMQ 引擎按 retry queue 的
x-message-ttl=5000自动死信回 main.exchange,不需要额外消费者 - ✅
x-death.count自动累加:每次死信 RabbitMQ 自动增加 count;Consumer 通过FailedMessageTrait::getRetryCount读取 - ✅ 错误兜底双写:超过重试上限时 errors.queue + failed_messages 表同步双写,互为备份
- ⚠️ DLX 不绑定 errors.queue:进入 errors.queue 完全由应用层
ErrorProducer主动 publish 实现,不走 DLX 路由分流
5. 为什么这样设计?
| 特性 | 原因 | 优势 |
|---|---|---|
| 单队列 FIFO | RabbitMQ 无法在队列内部按 topic 过滤 | 保证严格 FIFO,不拆分队列造成业务乱序 |
| 单消费者 | RabbitMQ 同队列多消费者必然 Round-Robin 分配 | 避免消息错投到错误的消费者 |
| Prefetch + 批处理 | RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 | 实现"伪批量拉取",提升吞吐 |
| 单条 ACK | 保留细粒度控制 | 失败消息不影响其他消息 |
| DLX + 延迟重试 | 避免失败消息阻塞队列 | 不会卡 FIFO,不会无限重试 |
| 适配器模式 | 多平台业务逻辑隔离 | 扩展性好,易维护 |
用户权限配置
VHost 名称:现网为
datahub(早期草稿误写为datahub-app)。下述权限正则均来自本地 MQ 实测(curl mgmt-api/permissions)+backend/bin/rabbitmq.sh,是现网真实使用的正则,不要随意改动。
1. 平台开发者账号(19 个)
用户名规范:user_{platform},其中 {platform} 是 platforms 表 name 字段经 strtolower(str_replace(' ', '_', $name)) 标准化后的结果。
完整列表:user_amazon_japan / user_coupang / user_dewu / user_douyin / user_goofish / user_jd / user_kaola / user_lazada / user_naver / user_offline / user_pdd / user_rakuten / user_redbook / user_shopee / user_shopify / user_tmall / user_wechat / user_wechat_video / user_youzan
权限正则模板(按平台名展开):
configure = ^{platform}\.(exchange|errors\.exchange)$
write = ^{platform}\.(exchange|errors\.exchange)$
read = ^{platform}\.errors\..*$
示例(tmall):
configure = ^tmall\.(exchange|errors\.exchange)$
write = ^tmall\.(exchange|errors\.exchange)$
read = ^tmall\.errors\..*$
密码 env 命名:MQ_PASSWORD_{PLATFORM_UPPER},其中 PLATFORM_UPPER = 平台标准化名转大写。例:MQ_PASSWORD_TMALL、MQ_PASSWORD_AMAZON_JAPAN、MQ_PASSWORD_WECHAT_VIDEO。密码由部署期 render-rabbitmq.sh 生成;由 mq_user.php 动态从 platforms 表读取启用平台后注入。
2. Datahub 消费者账号
用户名: user_datahub_consumer
密码: env MQ_PASSWORD_CONSUMER
VHost: datahub
configure = ^(main\.exchange|errors\.exchange|dlx\..*)|(.*\.queue)$
write = ^(orders|products|refunds|inventory).*\.queue$|(dlx\..*)|(errors\.exchange)|(.*\.errors\.exchange)$
read = ^(main\.exchange|(orders|products|refunds|inventory).*\.queue|dlx\..*)$
⚠ 此权限正则比早期文档("只读业务队列")宽松:消费者需要 configure main.exchange / errors.exchange / dlx.* / 所有 queue(因为 Hyperf Consumer Annotation 在 worker 启动时 declare 这些资源),需要 write errors.exchange + platform.errors.exchange(因为
ErrorProducer走这条),需要 read main.exchange + 业务队列 + dlx.*。
3. 运维监控账号
用户名: user_ops
密码: env MQ_PASSWORD_OPS
VHost: datahub
configure = ^errors\..*$
write = (留空)
read = ^errors\.queue$
仅能读 errors.queue 做人工捞回;与 DB failed_messages 表(应用层提供 Admin UI / API 查询)互为冗余。
4. 管理员账号
用户名: admin
密码: <强密码,仅 ops/dev 持有>
Tag: administrator
management API: AMQP_ADMIN_USER / AMQP_ADMIN_PASSWORD
(见 backend/config/autoload/amqp.php "management" 段)
完整权限,可访问管理界面。生产环境仅用于运维操作 / rabbitmqadmin CLI / mgmt API;不暴露给应用代码。
消息格式规范
1. message_id 设计
message_id 采用结构化 5 段格式(见 OrderProducer::generateMessageId):
格式:
{company_id}#{platform_id}#{store_id}#{entity_type}#{unique_id}
字段说明:
| 段 | 含义 | 来源 |
|---|---|---|
company_id |
公司 ID | producer 输入参数 |
platform_id |
平台 ID(数据库主键) | producer 输入参数 |
store_id |
店铺 ID | producer 输入参数 |
entity_type |
数据实体类型(order/product/refund/inventory),单数 | 由 routingKey 前缀自动派生(explode('.', $routingKey)[0]) |
unique_id |
业务侧定义的唯一标识 / 时间区间 / batch id 等,业务侧维护幂等性 | producer 输入参数 |
示例:
188#2#292#order#68a3f2c4e9b1f
解析:company=188, platform=2 (tmall), store=292, entity=order, unique_id=68a3f2c4e9b1f(uniqid())
分隔符:#(RabbitMQ message_id 字段允许任意字符;不与 routing key 通配符冲突)。
特性:
- ✅ 幂等性:业务侧通过
unique_id保证同一实体的 message_id 一致(消费端 upsert 唯一键由 EntityParse 配置) - ✅ 可读性:直接从 message_id 看出数据来源 4 段元数据
- ✅ 便于调试:日志 / 监控可直接根据前 4 段过滤
2. 业务消息格式
由 OrderProducer::buildMessage 构造(基类,所有 platform Producer 共用):
{
"message_id": "188#2#292#order#68a3f2c4e9b1f",
"timestamp": "2026-05-14T13:43:59+08:00",
"platform": "tmall",
"data_type": "order",
"meta": {
"platform_id": 2,
"company_id": 188,
"store_id": 292,
"platform_store_id": null,
"unique_id": "68a3f2c4e9b1f",
"source_system": "tmall_api",
"retry_count": 0,
"data_version": 1747203839
},
"data": [
{ /* 平台原始 raw_data 元素 1 */ },
{ /* 平台原始 raw_data 元素 2 */ }
]
}
字段说明:
| 字段 | 类型 | 必需 | 说明 |
|---|---|---|---|
message_id |
string | 是 | 5 段结构化 ID(见 §1) |
timestamp |
string | 是 | ISO 8601 格式(date('c')) |
platform |
string | 是 | 平台标识(标准化后的小写下划线名);基类默认 'platform_unkonw',子类 buildMessage 中覆盖(如 TmallProductProducer 覆盖为 'tmall') |
data_type |
string | 是 | 数据类型(order/product/refund/inventory,单数);基类默认 'order',子类覆盖 |
meta.platform_id |
int | 是 | 平台 ID(数据库主键,来自 platforms 表) |
meta.company_id |
int | 是 | 公司 ID |
meta.store_id |
int | 是 | 店铺 ID |
meta.platform_store_id |
string|null | 否 | 平台侧店铺标识(如有) |
meta.unique_id |
string | 是 | 业务唯一标识(与 message_id 末段一致) |
meta.source_system |
string | 是 | 数据来源系统(基类默认 'origin_unknow',子类覆盖如 'tmall_api') |
meta.retry_count |
int | 是 | 重试次数(初始 0;消费端通过 AMQP x-death[0].count 读取真实值,不读 meta) |
meta.data_version |
int | 是 | 数据版本(Unix 时间戳,用于乱序消费保护) |
data |
array | 是 | 平台原始数据数组(基类直接传 $data['raw_data']) |
⚠ 字段名是
meta不是metadata(与早期文档草稿不同)。消费端OrderConsumer.php:86读$data['meta']。
关于 data_version:建议使用平台数据的更新时间戳,消费端可据此防止旧版本覆盖新版本;如平台未提供则 OrderProducer::buildMessage fallback 到 time()。
3. 错误消息格式
由 ErrorProducer::buildErrorMessage 构造(投递到 errors.exchange rk=error):
{
"error_id": "err_68f3d2c4abc1f.93214785",
"original_message": {
/* 完整原始业务消息(含 meta / data)*/
},
"error": {
"type": "InvalidArgumentException",
"message": "Company with ID 188 not found",
"code": 0,
"file": "/var/www/app/Platform/Tmall/EntityParse/Product.php",
"line": 42,
"trace": "Exception stack trace...",
"timestamp": "2026-05-14T13:31:00+08:00"
},
"metadata": {
"platform": "tmall",
"platform_id": 2,
"company_id": 188,
"store_id": 292,
"data_type": "order",
"message_id": "188#2#292#order#68a3f2c4e9b1f",
"failed_at": "2026-05-14T13:31:00+08:00",
"retry_count": 3
}
}
注意:错误消息的字段名是
metadata(与业务消息的meta不一致;保留以兼容现有应用代码FailedMessageTrait::persistFailedMessage的解析)。
双写到 DB:错误消息同步写入 failed_messages 表(model 字段:error_id / data_type / platform / platform_id / company_id / store_id / error_type / error_message / error_code / error_trace / original_message(json) / retry_count / message_id / failed_at / created_at)。Admin 可通过 FailedMessageController 查询。
RabbitMQ 管理界面操作指引
1. 访问管理界面
URL: http://<rabbitmq-host>:15672
用户名: admin(开发期默认 admin/admin;生产期通过 podman secret / env 注入)
密码: <管理员密码>
推荐使用 Hyperf 命令(避开浏览器,看核心 9 个队列状态):
php ./bin/hyperf.php app:mq:status # 单次输出
php ./bin/hyperf.php app:mq:status --watch --interval=5 # 每 5 秒刷新
输出包含三组:Business Queues(4 业务)/ Dead Letter Queues (Retry Queues)(4 retry)/ Shared Queues(errors),分别展示 Messages / Consumers / Status。
2. 创建 VHost
- 点击顶部导航 Admin → Virtual Hosts
- 点击 Add a new virtual host
- 输入 VHost 名称:
datahub - 点击 Add virtual host
3. 创建用户
- 点击顶部导航 Admin → Users
- 点击 Add a user
- 填写用户信息:
- Username:
user_tmall(按user_{platform}规范) - Password:
<安全密码> - Tags: (留空,非管理员用户)
- Username:
- 点击 Add user
4. 配置用户权限
- 在用户列表中,点击用户名(如
user_tmall) - 滚动到 Permissions 部分
- 选择 VHost:
datahub - 配置权限正则表达式(与现网模板一致):
- Configure regexp:
^tmall\.(exchange|errors\.exchange)$ - Write regexp:
^tmall\.(exchange|errors\.exchange)$ - Read regexp:
^tmall\.errors\..*$
- Configure regexp:
- 点击 Set permission
5. 创建 Exchange
- 点击顶部导航 Exchanges
- 确保选择正确的 VHost:
datahub - 点击 Add a new exchange
- 填写配置:
- Name:
tmall.exchange - Type:
topic - Durability:
Durable - Auto delete:
No - Internal:
No
- Name:
- 点击 Add exchange
重复以上步骤创建:1 个 main.exchange + 4 个 dlx.{dtype} + 1 个 errors.exchange + 每平台 2 个({platform}.exchange 与 {platform}.errors.exchange)。
6. 创建 Queue
- 点击顶部导航 Queues
- 确保选择正确的 VHost:
datahub - 点击 Add a new queue
- 填写配置(以
orders.queue为例):- Name:
orders.queue - Durability:
Durable - Auto delete:
No - Arguments:
x-message-ttl=86400000(24h)x-dead-letter-exchange=dlx.ordersx-dead-letter-routing-key=retryx-queue-type=classic
- Name:
- 点击 Add queue
四个业务队列同模式;retry 队列 arguments:
x-message-ttl = 5000 # 5s
x-dead-letter-exchange = main.exchange
x-dead-letter-routing-key = {entity_singular}.retry # 如 order.retry
x-queue-type = classic
errors.queue arguments:
x-message-ttl = 604800000 # 7d
x-queue-type = classic
7. 创建 Binding
- 点击顶部导航 Exchanges
- 点击需要绑定的 Exchange(如
tmall.exchange) - 滚动到 Bindings 部分
- 在 Add binding from this exchange 区域填写:
- To queue:
orders.queue - Routing key:
order.tmall
- To queue:
- 点击 Bind
为每个平台 Exchange 创建 4 个 binding(order/product/refund/inventory)。
main.exchange 创建 4 个 binding(rk: order.# / product.# / refund.# / inventory.#)。
每个 {platform}.errors.exchange + 全局 errors.exchange 各创建 1 个绑定到 errors.queue(rk=#)。
DLX 各创建 1 个绑定到对应 retry queue(rk=retry)。不绑定 errors.queue。
配置脚本
生产环境推荐:使用
backend/bin/rabbitmq.sh(与现网完全一致的脚本,从platforms表动态读取平台列表)。下方独立脚本仅供临时手工部署 / 故障排查 使用。
使用 rabbitmqadmin
RabbitMQ 提供命令行工具 rabbitmqadmin,可用于批量配置。
安装 rabbitmqadmin
# 从 RabbitMQ 管理界面下载
wget http://<rabbitmq-host>:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/
配置脚本示例
注意:本脚本适用于新版本 rabbitmqadmin(Rust 版本)。如果您使用的是旧版本(Python),请参考官方文档调整语法。
#!/bin/bash
# RabbitMQ 连接信息
RABBITMQ_HOST="127.0.0.1"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin"
VHOST="datahub"
# 平台列表(与现网 19 个对齐;新增平台时追加即可)
PLATFORMS=(
"amazon_japan" "coupang" "dewu" "douyin" "goofish"
"jd" "kaola" "lazada" "naver" "offline"
"pdd" "rakuten" "redbook" "shopee" "shopify"
"tmall" "wechat" "wechat_video" "youzan"
)
DATA_TYPES=("orders" "products" "refunds" "inventory")
# 0. 检测并清理现有 VHost 和用户(如果存在)
echo "========================================"
echo "开始清理现有配置..."
echo "========================================"
# 检查 VHost 是否存在
echo "检查 VHost: $VHOST 是否存在..."
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true)
if [ -n "$VHOST_EXISTS" ]; then
echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..."
# 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete vhost --name "$VHOST"
echo "✓ VHost '$VHOST' 及其所有资源已删除"
else
echo "VHost '$VHOST' 不存在"
fi
# 清理所有相关用户账号
echo ""
echo "清理用户账号..."
# 平台用户列表
ALL_USERS=()
for platform in "${PLATFORMS[@]}"; do
ALL_USERS+=("user_${platform}")
done
# 添加其他用户
ALL_USERS+=("user_datahub_consumer")
ALL_USERS+=("user_ops")
# 删除用户(如果存在)
for user in "${ALL_USERS[@]}"; do
USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true)
if [ -n "$USER_EXISTS" ]; then
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete user --name "$user"
echo "✓ 删除用户: $user"
fi
done
echo ""
echo "========================================"
echo "清理完成,开始重新配置..."
echo "========================================"
echo ""
# 1. 创建 VHost
echo "创建 VHost: $VHOST"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare vhost --name "$VHOST"
# 2. 创建主 Exchange(用于重试消息回流)
echo "创建主 Exchange: main.exchange"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "main.exchange" --vhost "$VHOST" \
--type topic --durable true
# 3. 创建 DLX (死信交换机)
echo "创建死信交换机 (DLX)..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建 DLX: dlx.${dtype}"
done
# 4. 创建主业务队列(带 DLX 配置)
echo ""
echo "创建主业务队列..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable \
--arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
echo "✓ 创建队列: ${dtype}.queue"
done
# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
echo ""
echo "创建重试队列..."
for dtype in "${DATA_TYPES[@]}"; do
dtype_singular="${dtype%s}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable \
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}"
echo "✓ 创建重试队列: ${dtype}.retry.queue (dl-rk: ${dtype_singular}.retry)"
done
# 6. 创建错误队列
echo ""
echo "创建错误队列..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "errors.queue" --vhost "$VHOST" --durable \
--arguments '{"x-message-ttl":604800000}'
echo "✓ 创建错误队列: errors.queue"
# 7. 绑定主 Exchange 到主队列(使用通配符)
echo ""
echo "绑定主 Exchange 到主队列..."
for dtype in "${DATA_TYPES[@]}"; do
# 使用 order.#, product.# 等通配符匹配所有平台
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "main.exchange" --destination "${dtype}.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#"
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
done
# 8. 绑定 DLX 到重试队列(注意:现网 DLX 不绑定 errors.queue,错误由应用层 ErrorProducer 直接推 errors.exchange)
echo ""
echo "绑定 DLX 到重试队列..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "retry"
echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)"
done
# 8b. 创建全局 errors.exchange 并绑定到 errors.queue
echo ""
echo "创建全局错误 Exchange..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "errors.exchange" --vhost "$VHOST" --type topic --durable true
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "#"
echo "✓ 绑定: errors.exchange → errors.queue (routing_key: #)"
# 9. 为每个平台创建 Exchange 和 Binding
echo ""
echo "========================================"
echo "创建平台 Exchange 和 Binding..."
echo "========================================"
for platform in "${PLATFORMS[@]}"; do
echo ""
echo "处理平台: ${platform}"
# 创建平台业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建业务 Exchange: ${platform}.exchange"
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "orders.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "order.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "products.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "product.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "refunds.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "inventory.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}"
echo "✓ 绑定到业务队列 (4个)"
# 创建平台错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建错误 Exchange: ${platform}.errors.exchange"
# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "#"
echo "✓ 绑定到错误队列"
# 创建平台用户
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_${platform}" --password "change_me_${platform}" --tags ""
echo "✓ 创建用户: user_${platform}"
# 配置平台用户权限
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_${platform}" \
--configure "^${platform}\\.(exchange|errors\\.exchange)$" \
--write "^${platform}\\.(exchange|errors\\.exchange)$" \
--read "^${platform}\\.errors\\..*$"
echo "✓ 配置用户权限"
done
# 10. 创建 datahub 消费者用户
echo ""
echo "========================================"
echo "创建 datahub 消费者用户..."
echo "========================================"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_datahub_consumer" --password "change_me_consumer" --tags ""
echo "✓ 创建用户: user_datahub_consumer"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_datahub_consumer" \
--configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \
--write "^(orders|products|refunds|inventory).*\\.queue\$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)\$" \
--read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)\$"
echo "✓ 配置用户权限"
# 11. 创建运维监控用户
echo ""
echo "创建运维监控用户..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_ops" --password "change_me_ops" --tags ""
echo "✓ 创建用户: user_ops"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_ops" \
--configure "^errors\\..*$" --write "" --read "^errors\\.queue$"
echo "✓ 配置用户权限"
echo ""
echo "========================================"
echo "RabbitMQ 配置完成!"
echo "========================================"
echo ""
echo "已创建:"
echo "- 1 个 VHost: $VHOST"
echo "- 1 个主 Exchange: main.exchange"
echo "- 1 个全局错误 Exchange: errors.exchange"
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
echo "- 1 个错误队列: errors.queue"
echo "- $((${#PLATFORMS[@]} * 2)) 个平台 Exchange (${#PLATFORMS[@]} 业务 + ${#PLATFORMS[@]} 错误)"
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
echo ""
echo "提示: 请修改所有用户的默认密码 (change_me_*)"
echo "提示: 验证拓扑:php ./bin/hyperf.php app:mq:status"
执行脚本
chmod +x setup_rabbitmq.sh
./setup_rabbitmq.sh
监控和维护
1. 监控指标
通过 RabbitMQ 管理界面查看以下关键指标:
队列健康状态
- 点击 Queues → 选择队列
- 关注以下指标:
- Ready: 待消费消息数(正常应 < 1000)
- Unacked: 未确认消息数(正常应较低)
- Total: 总消息数
- Incoming rate: 消息入队速率
- Deliver / Get rate: 消息消费速率
告警阈值:
- Ready > 10000:队列堆积严重
- Unacked > 5000:消费者处理缓慢
- Incoming rate >> Deliver rate:消费速度跟不上生产速度
Exchange 流量统计
- 点击 Exchanges → 选择 Exchange
- 查看 Message rates:
- Publish in: 消息发布速率
- Publish out: 消息路由到队列的速率
异常情况:
- Publish in > 0 但 Publish out = 0:可能绑定配置错误
2. 常见问题排查
消息未到达队列
检查步骤:
- 确认 Exchange 存在且类型正确
- 检查 Binding 配置,确保 routing key 匹配
- 查看 Exchange 的 Message rates,确认消息已发布
- 检查生产者是否使用了正确的 routing key
消费者无法消费
检查步骤:
- 确认用户权限配置正确(Read 权限)
- 检查队列中是否有消息(Ready > 0)
- 查看消费者连接状态(Connections 页面)
- 检查消费者代码是否正确 ACK 消息
错误消息未送达平台开发者
检查步骤:
- 确认错误 Exchange 绑定到
errors.queue - 检查平台开发者账号的 Read 权限
- 确认平台开发者订阅了正确的 Exchange
- 查看
errors.queue中是否有消息
3. 备份和恢复
导出配置
# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
export datahub-backup.json
导入配置
# 从备份恢复配置
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
import datahub-backup.json
4. 性能优化
消费者并发配置
- 增加消费者实例数量
- 每个消费者配置多个 channel(如 5-10 个)
- 使用 prefetch_count 限制未确认消息数量
队列性能优化
- 启用 lazy queue(大量消息堆积时):
rabbitmqadmin declare queue name=orders.queue vhost=datahub \ durable=true arguments='{"x-queue-mode":"lazy"}' - 定期清理过期消息(通过 TTL 自动实现)
5. 日志查看
# 查看 RabbitMQ 日志
tail -f /var/log/rabbitmq/rabbit@<hostname>.log
# 查看连接日志
tail -f /var/log/rabbitmq/rabbit@<hostname>_connection.log
安全建议
1. 密码策略
- 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符)
- 定期更换密码(建议 3-6 个月)
- 不同环境(开发/测试/生产)使用不同密码
2. 网络安全
- 生产环境启用 TLS/SSL 加密连接
- 限制管理界面访问(仅允许内网或 VPN 访问)
- 配置防火墙规则:
- 5672:AMQP 端口(仅允许应用服务器访问)
- 15672:管理界面(仅允许管理员 IP 访问)
3. 权限最小化原则
- 平台开发者只能访问自己的 Exchange
- 消费者只能读取必要的队列
- 避免赋予不必要的 Configure 权限
4. 审计日志
- 启用 RabbitMQ 审计日志插件
- 定期审查用户操作记录
- 监控异常登录行为
新增平台接入流程
首选路径:使用
backend/bin/rabbitmq.sh add <platform>(自动化、与现网一致);下方手工脚本仅作参考。
新增平台(示例 <platform> = 标准化后的小写下划线名):
1. 创建 Exchange(业务 + 错误)
P="<platform>" # 例如 "tiktok_shop"
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare exchange --name "${P}.exchange" --vhost datahub --type topic --durable true
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare exchange --name "${P}.errors.exchange" --vhost datahub --type topic --durable true
2. 创建 5 条 Binding(4 业务 + 1 错误)
for DTYPE in orders products refunds inventory; do
DTYPE_SINGULAR="${DTYPE%s}"
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare binding --source "${P}.exchange" --destination "${DTYPE}.queue" \
--destination-type queue --vhost datahub \
--routing-key "${DTYPE_SINGULAR}.${P}"
done
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare binding --source "${P}.errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost datahub --routing-key "#"
3. 创建用户并配置权限
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare user --name "user_${P}" --password "<secure_password>" --tags ""
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare permissions --vhost datahub --user "user_${P}" \
--configure "^${P}\\.(exchange|errors\\.exchange)\$" \
--write "^${P}\\.(exchange|errors\\.exchange)\$" \
--read "^${P}\\.errors\\..*\$"
4. 同步到 backend
- DB:
INSERT INTO platforms (name, enabled, ...) VALUES ('<Platform Display Name>', true, ...),确保mq_user.php动态读取时能匹配到 - env:在 backend 的
.env(或 host 端datahub-backend.env)追加MQ_PASSWORD_<PLATFORM_UPPER>=<secure_password> - 重启 backend:
systemctl --user restart datahub-backend,让 Hyperf worker 重新加载 mq_user.php 与 env
5. 提供给平台开发者
RabbitMQ 连接信息:
- Host: <rabbitmq-host>
- Port: 5672
- VHost: datahub
- Username: user_<platform>
- Password: <secure_password>
发布配置:
- Exchange: <platform>.exchange
- Routing Keys:
- order.<platform> → 发布订单数据
- product.<platform> → 发布产品数据
- refund.<platform> → 发布退款数据
- inventory.<platform> → 发布库存数据
错误投递(可选):
- Exchange: <platform>.errors.exchange
- Routing Key: 任意(由 binding `#` 兜底)
- 用途:平台侧主动上报无法处理的消息 / 校验失败的数据
消息格式规范:
- 参见「消息格式规范」章节(5 段 message_id;meta 字段单数;data_version 时间戳防乱序)
附录
A. 完整资源清单(与本地 MQ 实测对齐 — 2026-05-14)
VHost
- 总数:1 个
datahub
Exchanges
| 类别 | 数量 | 名称 |
|---|---|---|
| 主路由 | 1 | main.exchange(topic) |
| 全局错误 | 1 | errors.exchange(topic) |
| DLX | 4 | dlx.orders / dlx.products / dlx.refunds / dlx.inventory(全 topic) |
| 平台业务 | 19 | {platform}.exchange × 19(全 topic) |
| 平台错误 | 19 | {platform}.errors.exchange × 19(全 topic) |
Exchange 业务总数:44 个(不含 amq.* 系统 exchange 7 个)
Queues
| 类别 | 数量 | 名称 + 关键 arguments |
|---|---|---|
| 业务队列 | 4 | {dtype}.queue:ttl=24h, dl-exchange=dlx.{dtype}, dl-rk=retry, type=classic |
| 重试队列 | 4 | {dtype}.retry.queue:ttl=5s, dl-exchange=main.exchange, dl-rk={entity_singular}.retry, type=classic |
| 错误队列 | 1 | errors.queue:ttl=7d, 无 DLX, type=classic |
Queue 总数:9 个(与 php ./bin/hyperf.php app:mq:status 输出一致)
Bindings
| 类别 | 数量 | 路径 |
|---|---|---|
| 平台业务 → 业务队列 | 76 | {platform}.exchange —rk={entity}.{platform}→ {dtype}.queue(19 × 4) |
| main → 业务队列 | 4 | main.exchange —rk={entity}.#→ {dtype}.queue |
| DLX → 重试队列 | 4 | dlx.{dtype} —rk=retry→ {dtype}.retry.queue(仅此一种,不绑定 errors.queue) |
| 错误 exchange → errors.queue | 20 | errors.exchange 1 条 + {platform}.errors.exchange × 19 条,全部 rk=# |
Binding 业务总数:104 条(不含 default exchange 自动同名 binding 9 条)
死信回流路径(retry → main)不是 binding,而是 retry queue 的
x-dead-letter-exchangearguments 字段,RabbitMQ 引擎在 TTL 到期时自动投递。
Users(vhost datahub)
| 类别 | 数量 | 命名 | 权限说明 |
|---|---|---|---|
| 平台开发者 | 19 | user_{platform} |
仅写自己的 {platform}.exchange / {platform}.errors.exchange,读自己的 errors(最小权限) |
| Datahub 消费者 | 1 | user_datahub_consumer |
configure/write/read 见 §"消费者账号"现网正则 |
| 运维监控 | 1 | user_ops |
仅读 errors.queue |
| 管理员 | 1 | admin |
tag=administrator,全权限 |
User 总数:22 个
完整平台用户清单:
user_amazon_japan, user_coupang, user_dewu, user_douyin, user_goofish,
user_jd, user_kaola, user_lazada, user_naver, user_offline,
user_pdd, user_rakuten, user_redbook, user_shopee, user_shopify,
user_tmall, user_wechat, user_wechat_video, user_youzan
架构统计
| 资源类型 | 数量 | 说明 |
|---|---|---|
| VHost | 1 | datahub |
| Exchange | 44 | 1 main + 1 全局 errors + 4 DLX + 19 平台业务 + 19 平台错误 |
| Queue | 9 | 4 业务 + 4 retry + 1 errors |
| Binding | 104 | 见上表(不含 default exchange 自动 binding) |
| User | 22 | 19 平台 + 1 消费者 + 1 运维 + 1 管理员 |
消息流转路径(双层兜底):
生产者 → {platform}.exchange ──rk={entity}.{platform}──► {dtype}.queue ──► Consumer
│
消费失败 (count < max) ↓
◄──── NACK
│
▼
dlx.{dtype} ──rk=retry──► {dtype}.retry.queue
│ TTL=5s
▼
main.exchange (rk={entity}.retry)
│
▼
{dtype}.queue (重新消费, count+1)
│
消费失败 (count >= max)
│
FailedMessageTrait::sendToErrorQueue
│
┌───────────────────────┼───────────────────────┐
▼ ▼
ErrorProducer ──► errors.exchange ──► errors.queue FailedMessage::create
(MQ 兜底, TTL 7d, user_ops 捞回) (DB 兜底, failed_messages 表)
│
▼
Consumer 返回 ACK
(避免主队列继续重试)