Files
datahub/docs/RabbitMQ.md
T
2026-05-15 08:48:24 +08:00

1613 lines
66 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# RabbitMQ 配置方案
> **权威源声明**:本文档已与现网代码对齐(2026-05-14)。如本文与代码冲突,**以代码为准**:
> - 拓扑创建脚本:[`backend/bin/rabbitmq.sh`](../backend/bin/rabbitmq.sh)
> - 用户/凭据配置:[`backend/config/autoload/mq_user.php`](../backend/config/autoload/mq_user.php) + [`backend/config/autoload/amqp.php`](../backend/config/autoload/amqp.php)
> - 业务消费者(注解 declare):`backend/app/Platform/{Order,Product,Refund}Consumer.php`
> - 错误兜底(双写):[`backend/app/Platform/Traits/FailedMessageTrait.php`](../backend/app/Platform/Traits/FailedMessageTrait.php) + [`backend/app/Platform/ErrorProducer.php`](../backend/app/Platform/ErrorProducer.php) + [`backend/app/Model/FailedMessage.php`](../backend/app/Model/FailedMessage.php)
> - 运维状态检查:`php ./bin/hyperf.php app:mq:status [--interval=N] [--watch]`[`backend/app/Command/AppMqStatus.php`](../backend/app/Command/AppMqStatus.php) + [`backend/app/Service/MqStatusService.php`](../backend/app/Service/MqStatusService.php)
**配置层级说明**`mq_user.php` 从数据库 `platforms` 表动态读取启用的平台名,按 `strtolower(str_replace(' ', '_', $name))` 标准化生成 `user_{platform}` 用户名与 `MQ_PASSWORD_{PLATFORM_UPPER}` env 变量名;`amqp.php``default_consumer` 用户硬编码为 `user_datahub_consumer`,密码由 env `MQ_PASSWORD_CONSUMER` 注入。
## 概述
本文档描述 datahub 应用的 RabbitMQ 消息队列架构设计和配置方法。
### 业务背景
- **应用角色**:datahub 是消息消费者,负责接收来自多个电商平台的数据并入库
- **数据来源**19 个电商/线下平台(amazon_japan、coupang、dewu、douyin、goofish、jd、kaola、lazada、naver、offline、pdd、rakuten、redbook、shopee、shopify、tmall、wechat、wechat_video、youzan)— 平台列表由 `platforms` 表动态管理,名称按 `strtolower(str_replace(' ', '_', $name))` 标准化
- **数据类型**:订单(orders)、产品(products)、退款(refunds)、库存(inventory
- **隔离需求**:不同平台开发者之间的工作空间需要相互隔离,互不影响
### 架构特点
- **VHost 隔离**:按应用划分 VHost,使用 `Classic` 队列类型
- **Exchange 隔离**:每个平台独立的业务 Exchange + 错误 Exchange,通过权限正则控制访问(用户只能写自己的 platform.exchange / platform.errors.exchange
- **单队列设计**:每种数据类型一个队列,所有平台共享(`orders.queue` / `products.queue` / `refunds.queue` / `inventory.queue`),保证同 data_type 内严格 FIFO
- **独立消费者**:每种数据类型一个 Hyperf Consumer 实例,使用 prefetch + 单条 ACK 模式
- **智能重试**:DLX(死信交换机)+ 延迟重试队列(TTL=5s 自动回流 main.exchange+ 应用层 ErrorProducer 兜底
- **双层错误持久化**:超过重试上限的消息 **同时**写入 `errors.queue` (MQ, TTL=7d) + DB `failed_messages` 表(人工捞回与统计分析)
- **消息持久化**RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API
---
## 设计方案总结
### 核心设计理念
本方案采用 **Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式**,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。
### 关键设计决策
** 注意 **
1. 消息内部字段无严格要求,但可以在业务侧进行约束
2. 默认单条消息的最大 Payload 尺寸为 128MB
3. 消息队列为缓存系统,应该尽可能的将批量数据放入 payload\
4. MQ 存在的意义在于快速接受远程生产的 Message
#### 1. 为什么使用单队列?
| 问题 | 传统多队列方案 | 本方案(单队列) |
|------|--------------|----------------|
| **队列数量** | 19 平台 × 4 数据类型 = 76 个队列 | 4 个数据类型队列(数量减少 ~95%,且与平台数量解耦)|
| **FIFO 保证** | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 |
| **运维复杂度** | 需要监控 76 个队列 | 只需监控 4 个主队列 + 4 个 retry 队列 + 1 个 errors 队列 = 9 个 |
| **消费者部署** | 需要 N 个消费者实例 | 只需 4 个消费者实例 |
| **新增平台成本** | 新建 4 队列 + 4 binding + 用户/权限 | 仅新建 1 业务 exchange + 1 错误 exchange + 5 binding + 1 用户/权限(队列不动)|
**原因**:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。
#### 2. 为什么使用单消费者?
**原因**:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。
**优势**
- ✅ 保证严格 FIFO 顺序
- ✅ 避免消息错投到错误的消费者
- ✅ 简化部署和运维
**如何实现多平台支持?** 通过消费者内部的**适配器模式**,根据消息的 `platform` 字段分发到不同业务逻辑。
#### 3. 为什么使用 Prefetch + 批处理?
**原因**RabbitMQ 是 **Push 模式**,不提供 Kafka 式的批量拉取 API。
**解决方案**
- 设置 `prefetch_count=100`RabbitMQ 最多推送 100 条未确认消息
- 消费者自行缓存消息,达到 `batch_size=50``timeout=3s` 后批量处理
- 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性
**优势**
- ✅ 提升数据库写入效率(批量插入)
- ✅ 控制并发和内存压力
- ✅ 失败消息不影响其他消息(单条 ACK)
#### 4. 为什么使用 DLX + 延迟重试队列?
**问题**:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。
**传统方案的问题**
-`nack(requeue=true)`:消息回队列头部,无限循环,**永远不会进入 DLX**
- ❌ 直接丢弃:数据丢失
- ❌ 立即重试:打爆 API/DB
**本方案(DLX + 延迟重试 + 双写错误持久化)**
```
消费失败 → Consumer 检查 x-death count
count < AMQP_MAX_RETRIES(默认 3)
Consumer 返回 NACK(requeue=false)
源 queue 的 x-dead-letter-exchange = dlx.{dtype}, dl-rk = "retry"
dlx.{dtype} (topic) — binding rk="retry" → {dtype}.retry.queue (TTL=5s)
retry.queue 的 x-dead-letter-exchange = main.exchange, dl-rk = "{entity}.retry"
↓ 5s 后死信回流
main.exchange (topic) — binding "{entity}.#" → {dtype}.queue (重新消费, x-death count+1)
count >= AMQP_MAX_RETRIES
Consumer 调用 FailedMessageTrait::sendToErrorQueue
↓ ┌─ ErrorProducer.publish → errors.exchange (rk="error") → errors.queue (TTL=7d) ← MQ 兜底
└─ FailedMessage::create → DB `failed_messages` 表 ← DB 兜底
Consumer 返回 ACK(避免主队列继续重试)
```
**重要说明**
- ⚠️ **必须设置 `requeue=false`**:只有 `requeue=false` 才会触发 DLX 机制(已由 `protected bool $requeue = false` 在所有 Consumer 中固定)
- ⚠️ **`requeue=true` 会导致无限循环**:消息直接回到原队列头部,永远不进入 DLX
-**retry 队列自动回流**TTL 到期后由 RabbitMQ 引擎自动死信回 main.exchange,不需要额外消费者
-**Consumer 控制重试次数**:通过 `FailedMessageTrait::getRetryCount``x-death[0].count`
-**错误兜底双写**:超过重试上限时,errors.queue + failed_messages 表**同步双写**,互为备份;监控/告警/捞回需同时覆盖两个数据源
-**DLX 不直接绑定 errors.queue**:错误进入 errors.queue 完全由应用层 `ErrorProducer` 主动 publish 到 `errors.exchange` 实现,**不**走 DLX 路由分流
**优势**
- ✅ 不会卡 FIFO
- ✅ 不会无限重试(通过 max retries 限制)
- ✅ 延迟重试避免打爆 API/DB
- ✅ 错误消息自动隔离到 error 队列
- ✅ 无需额外 Redis 或手写 retry 逻辑
- ✅ retry 过程完全自动化(依赖 RabbitMQ 的 DLX + TTL 机制)
### 方案优势总结
| 维度 | 优势 |
|------|------|
| **可靠性** | 消息持久化 + 单条 ACK + 智能重试,避免数据丢失 |
| **性能** | Prefetch + 批处理 + 数据库批量写入,提升吞吐 |
| **顺序性** | 单队列 FIFO 保证严格顺序 |
| **扩展性** | 适配器模式,新增平台只需添加适配器类 |
| **运维** | 队列数量少,监控简单,故障排查容易 |
| **容错** | DLX 智能重试,失败消息不阻塞主队列 |
### 适用场景
**适合**
- 多平台数据同步(电商、物流、金融等)
- 需要严格 FIFO 顺序
- 需要高可靠性和高吞吐
- 平台数量多(> 5个),数据类型固定(< 10个)
**不适合**
- 实时性要求极高(< 100ms
- 需要按平台独立控制消费速率
- 平台数量少(< 3个),可以使用独立队列
---
## Requeue 机制详解
### requeue 参数的作用
在 RabbitMQ 中,当消费者处理消息失败时,可以通过 `basic_nack()``basic_reject()` 拒绝消息,第三个参数 `requeue` 决定了消息的去向:
```php
// requeue=true:消息重新回到原队列(队列头部)
$channel->basic_nack($deliveryTag, false, true);
// requeue=false:消息进入 DLX(如果配置了),否则丢弃
$channel->basic_nack($deliveryTag, false, false);
```
### requeue=true 的问题
| 行为 | 后果 |
|------|------|
| 消息回到原队列头部 | 立即被同一个消费者再次取出 |
| 不触发 DLX | 永远不会进入重试队列或错误队列 |
| 无延迟 | 瞬间形成高速循环(毫秒级) |
| 无计数 | 无法统计重试次数,无法设置上限 |
**结果**:消息在 `消费 → 失败 → NACK → 回队列 → 消费` 之间无限循环,CPU 和网络资源被浪费,队列状态显示为空(消息循环太快无法观察)。
### requeue=false 的正确用法
```
orders.queue
arguments:
x-dead-letter-exchange = dlx.orders
x-dead-letter-routing-key = retry
x-message-ttl = 86400000 (24h)
↓ Consumer 消费失败 (count < max_retries)
↓ basic_nack(requeue=false)
↓ 触发 DLX 机制
dlx.orders (topic)
↓ binding key = "retry"
orders.retry.queue
arguments:
x-message-ttl = 5000 (5s)
x-dead-letter-exchange = main.exchange
x-dead-letter-routing-key = order.retry
↓ 5s 后 TTL 到期,自动死信回 main.exchange
main.exchange (topic)
↓ binding key = "order.#" 匹配 "order.retry"
orders.queue (重新进入主队列,x-death count+1)
```
> 注意:retry queue 的 `x-dead-letter-routing-key` 是 **`{entity_singular}.retry`**(如 `order.retry` / `product.retry`),不保留原 routing key。由于 main.exchange 用通配符 `order.#` 绑定 orders.queue`order.retry` 能正常匹配回流;但失去了"消息原平台来源"信息(platform 字段已存在 message 体内,不依赖 routing key 追溯)。
### 环境变量配置
`.env` 文件中可以配置以下参数:
```bash
# 最大重试次数(默认3次)
# 消息重试超过此次数后,会被发送到 errors.queue
AMQP_MAX_RETRIES=3
# 调试延迟(秒,默认0
# 设置为 2 可以让每条消息处理延迟2秒,方便在 mq:status 中观察队列状态
# 生产环境应设置为 0
AMQP_CONSUMER_DEBUG_DELAY=0
```
**使用示例**
```bash
# 开发环境:观察消息流转
AMQP_CONSUMER_DEBUG_DELAY=2 php bin/hyperf.php start
# 生产环境:正常运行
AMQP_CONSUMER_DEBUG_DELAY=0 php bin/hyperf.php start
```
---
## 架构设计
### 1. VHost 设计
VHost 按业务应用进行划分,便于资源隔离和权限管理。
**当前 VHost**
- `datahub` — 电商数据流转专用(注:早期文档误写为 `datahub`**现网实际为 `datahub`**
---
### 2. Exchange 设计
`datahub` VHost 中,Exchange 分为三类:
#### 主路由 Exchange
| Exchange 名称 | 类型 | 持久化 | 用途 |
|--------------|------|--------|------|
| `main.exchange` | Topic | 是 | **仅作为 retry 回流入口** + Consumer Annotation binding 声明源;**不**作为 producer 入口 |
> ⚠ producer 直推 `{platform}.exchange`**不**经过 main.exchange。main.exchange 的两个用途:① retry queue 的 `x-dead-letter-exchange` 指向它,retry 消息通过它回流业务队列;② Hyperf `#[Consumer(exchange: "main.exchange", routingKey: "{entity}.#", queue: "{dtype}.queue")]` 注解声明 binding 时用它。
#### 平台业务 Exchangeproducer 入口)
每个平台一个独立的 Topic Exchange,共 19 个(与 `platforms` 表 enabled 平台对齐):
| 平台 (`name` 标准化后) | Exchange |
|---|---|
| amazon_japan | `amazon_japan.exchange` |
| coupang | `coupang.exchange` |
| dewu | `dewu.exchange` |
| douyin | `douyin.exchange` |
| goofish | `goofish.exchange` |
| jd | `jd.exchange` |
| kaola | `kaola.exchange` |
| lazada | `lazada.exchange` |
| naver | `naver.exchange` |
| offline | `offline.exchange` |
| pdd | `pdd.exchange` |
| rakuten | `rakuten.exchange` |
| redbook | `redbook.exchange` |
| shopee | `shopee.exchange` |
| shopify | `shopify.exchange` |
| tmall | `tmall.exchange` |
| wechat | `wechat.exchange` |
| wechat_video | `wechat_video.exchange` |
| youzan | `youzan.exchange` |
全部 `type=topic, durable=true`
**Routing Key 规范**`{entity_singular}.{platform}` 格式,注意 entity 单数 / 平台名小写下划线):
| 数据类型 | 示例 routing key |
|---|---|
| 订单 (order) | `order.tmall` / `order.amazon_japan` / `order.wechat_video` |
| 产品 (product) | `product.tmall` / `product.amazon_japan` / `product.wechat_video` |
| 退款 (refund) | `refund.tmall` / `refund.amazon_japan` / `refund.wechat_video` |
| 库存 (inventory) | `inventory.tmall` / `inventory.amazon_japan` / `inventory.wechat_video` |
**设计说明**
- producer 直推 `{platform}.exchange`,使用 routing key `{entity_singular}.{platform}`
- 业务 binding 把 `{platform}.exchange` 通过 routing key 直绑到 4 个 `{dtype}.queue`(不经过 main.exchange
- 消费者通过消息体 `platform` / `data_type` 字段进行业务分发(**不**依赖 routing key 解析)
#### 平台错误 Exchange(应用层错误推送 + 平台错误通知)
每个平台一个独立的错误 Topic Exchange,共 19 个,命名 `{platform}.errors.exchange`
| 来源平台 | Errors Exchange |
|---|---|
| 19 个平台 | `{platform}.errors.exchange`amazon_japan / coupang / dewu / douyin / goofish / jd / kaola / lazada / naver / offline / pdd / rakuten / redbook / shopee / shopify / tmall / wechat / wechat_video / youzan|
每个 `{platform}.errors.exchange` 通过 binding rk=`#` 直连 `errors.queue`
#### 全局错误 Exchange
| Exchange 名称 | 类型 | 持久化 | 用途 |
|---|---|---|---|
| `errors.exchange` | Topic | 是 | 应用层 `ErrorProducer` 推送目标,binding rk=`#` 直连 `errors.queue` |
**Routing Key 规范(应用层 publish 时)**
- 应用层 `ErrorProducer.php` 固定使用 routing key = `error`(见代码)
- 平台 producer 在自检/校验失败时可向 `{platform}.errors.exchange` 自定义 routing key(业务侧决定,由 binding `#` 兜底)
---
### 3. Queue 设计
#### 业务队列(主队列)
物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | DLX | 用途 |
|-----------|--------|----------|----------|-----|------|
| `orders.queue` | 是 | 否 | 24小时 | `dlx.orders` | 接收所有平台的订单数据,单消费者批处理 |
| `products.queue` | 是 | 否 | 24小时 | `dlx.products` | 接收所有平台的产品数据,单消费者批处理 |
| `refunds.queue` | 是 | 否 | 24小时 | `dlx.refunds` | 接收所有平台的退款数据,单消费者批处理 |
| `inventory.queue` | 是 | 否 | 24小时 | `dlx.inventory` | 接收所有平台的库存数据,单消费者批处理 |
**队列参数配置**
```json
{
"x-message-ttl": 86400000,
"x-dead-letter-exchange": "dlx.orders",
"x-dead-letter-routing-key": "retry"
}
```
#### 死信交换机(DLX
每种数据类型配备一个 DLX,用于处理失败消息的路由:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|--------------|------|--------|------|
| `dlx.orders` | Topic | 是 | 订单失败消息路由 |
| `dlx.products` | Topic | 是 | 产品失败消息路由 |
| `dlx.refunds` | Topic | 是 | 退款失败消息路由 |
| `dlx.inventory` | Topic | 是 | 库存失败消息路由 |
#### 延迟重试队列
为每种数据类型提供延迟重试机制:
| Queue 名称 | 持久化 | 消息 TTL | DLX | 用途 |
|-----------|--------|----------|-----|------|
| `orders.retry.queue` | 是 | 5秒 | `main.exchange` | 订单延迟重试(TTL后自动回到主队列)|
| `products.retry.queue` | 是 | 5秒 | `main.exchange` | 产品延迟重试 |
| `refunds.retry.queue` | 是 | 5秒 | `main.exchange` | 退款延迟重试 |
| `inventory.retry.queue` | 是 | 5秒 | `main.exchange` | 库存延迟重试 |
**重试队列参数配置**(按 data_type 实例化,`{entity_singular}` ∈ {`order`, `product`, `refund`, `inventory`}):
```json
{
"x-message-ttl": 5000,
"x-dead-letter-exchange": "main.exchange",
"x-dead-letter-routing-key": "{entity_singular}.retry"
}
```
例:`orders.retry.queue` 的 dl-rk = `order.retry`,回流时由 main.exchange 的 `order.#` binding 匹配进入 `orders.queue`
**重试机制说明(现网实现)**
1. 消费者处理失败时,调用 `FailedMessageTrait::getRetryCount``x-death[0].count`
2. 根据重试次数决定处理方式:
- **count < `AMQP_MAX_RETRIES`(默认 3**`return Result::NACK`Hyperf 自动按 `$requeue=false` 触发 DLX)→ 进入 `dlx.{dtype}` → 路由到 `{dtype}.retry.queue`5s 延迟)→ TTL 到期回流 `main.exchange` → 重新进入主队列
- **count >= max_retries**Consumer 调用 `FailedMessageTrait::sendToErrorQueue`**同时**
- `ErrorProducer.publish``errors.exchange` (rk=`error`) → `errors.queue` (TTL 7d)
- `FailedMessage::create` → DB `failed_messages`
- 然后 `return Result::ACK`,避免主队列继续重试
3. 配置参数:
- **最大重试次数**`AMQP_MAX_RETRIES`(默认 3
- **延迟时间**5 秒(retry 队列 `x-message-ttl=5000`
- **调试延迟**`AMQP_CONSUMER_DEBUG_DELAY`(生产环境设 0
#### 错误队列
统一的错误队列,接收所有超过重试次数的失败消息:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | 用途 |
|-----------|--------|----------|----------|------|
| `errors.queue` | 是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 |
**错误队列的作用和机制**
-**收集永久失败的消息**:当消息重试次数达到上限(默认3次)后,消费者会主动将消息发送到此队列
-**避免消息丢失**:即使处理失败多次,消息也会被保存在错误队列中,不会丢失
-**人工排查和修复**:运维人员可以从错误队列中查看失败原因,修复问题后重新投递
-**统一管理**:所有数据类型(orders/products/refunds/inventory)的错误消息都集中在一个队列
- ⚠️ **不会自动重试**:错误队列中的消息需要人工介入,不会自动回流到主队列
**错误消息格式**
```json
{
"error_id": "err_675d8f3a2b1c4",
"original_message": { /* */ },
"error": {
"type": "InvalidArgumentException",
"message": "Missing required field: total_amount",
"trace": "Exception stack trace...",
"timestamp": "2025-01-15T10:31:00+00:00"
},
"metadata": {
"platform": "tmall",
"platform_id": 2,
"company_id": 188,
"store_id": 292,
"data_type": "order",
"failed_at": "2025-01-15T10:31:00+00:00",
"retry_count": 3
}
}
```
---
### 4. Binding 配置(现网实际)
本节按"消息流向"组织,每个 binding 都来自本地 MQ 实测(`curl mgmt-api/bindings/datahub`+ `backend/bin/rabbitmq.sh` 创建逻辑。
#### 4.1 平台业务 Exchange → 业务队列(producer 主路径)
每个 `{platform}.exchange` 通过 4 条 binding 直接绑定到 4 个业务队列:
```
{platform}.exchange (topic)
├── rk = "order.{platform}" → orders.queue
├── rk = "product.{platform}" → products.queue
├── rk = "refund.{platform}" → refunds.queue
└── rk = "inventory.{platform}" → inventory.queue
```
19 平台 × 4 = **76 条 binding**
#### 4.2 main.exchange → 业务队列(retry 回流路径)
```
main.exchange (topic)
├── rk = "order.#" → orders.queue
├── rk = "product.#" → products.queue
├── rk = "refund.#" → refunds.queue
└── rk = "inventory.#" → inventory.queue
```
**4 条 binding**。这是 retry queue 死信回流的唯一入口;producer **不**通过 main.exchange 投递消息。
#### 4.3 DLX → retry queue(仅一种绑定)
```
dlx.orders (topic) — rk = "retry" → orders.retry.queue
dlx.products (topic) — rk = "retry" → products.retry.queue
dlx.refunds (topic) — rk = "retry" → refunds.retry.queue
dlx.inventory (topic) — rk = "retry" → inventory.retry.queue
```
**4 条 binding**
> ⚠ **DLX 不绑定到 errors.queue**。错误进入 errors.queue 由应用层 `ErrorProducer` 主动 publish 实现(见 §4.5),**不**通过 DLX 路由分流。这一点与早期文档草稿("dlx.{dtype} → errors.queue (rk=error)" 双绑模式)不符,**以现网实现为准**。
#### 4.4 retry queue → main.exchangeTTL 死信回流)
retry queue 不需要显式 binding;其 `x-dead-letter-exchange = main.exchange``x-dead-letter-routing-key = {entity_singular}.retry` 定义在队列 arguments 中,TTL 到期由 RabbitMQ 引擎自动死信投递到 main.exchange。
```
orders.retry.queue —(TTL 5s, dl-rk=order.retry)→ main.exchange →[order.#]→ orders.queue
products.retry.queue —(TTL 5s, dl-rk=product.retry)→ main.exchange →[product.#]→ products.queue
refunds.retry.queue —(TTL 5s, dl-rk=refund.retry)→ main.exchange →[refund.#]→ refunds.queue
inventory.retry.queue —(TTL 5s, dl-rk=inventory.retry)→ main.exchange →[inventory.#]→ inventory.queue
```
#### 4.5 errors exchange → errors.queue(双入口扇入)
```
errors.exchange (topic) — rk = "#" → errors.queue (全局兜底入口,应用层 ErrorProducer 用)
{platform}.errors.exchange (topic) — rk = "#" → errors.queue × 19 (平台自定义错误投递入口)
```
**20 条 binding**19 平台 + 1 全局)。
#### 4.6 binding 总数核对
| 类别 | 数量 |
|---|---|
| 平台业务 exchange → 4 业务队列 | 19 × 4 = 76 |
| main.exchange → 4 业务队列 | 4 |
| dlx.{dtype} → retry queue | 4 |
| errors exchange → errors.queue19 平台 + 1 全局) | 20 |
| **总计** | **104** |
(不含 default exchange `(AMQP default)` 自动给每个 queue 创建的同名 binding
---
## 消费者设计
### 1. 消费者模型
采用 **单消费者 + 批处理 + prefetch + 适配器模式**
| 数据类型 | 消费者 | Queue | 并发模式 | 处理模式 |
|---------|--------|-------|----------|---------|
| 订单 | Order Consumer | `orders.queue` | 单消费者 | 批处理 + 适配器 |
| 产品 | Product Consumer | `products.queue` | 单消费者 | 批处理 + 适配器 |
| 退款 | Refund Consumer | `refunds.queue` | 单消费者 | 批处理 + 适配器 |
| 库存 | Inventory Consumer | `inventory.queue` | 单消费者 | 批处理 + 适配器 |
**设计原则**
-**单消费者**:每个队列只有一个消费者实例,保证严格 FIFO 顺序
-**批处理**:积累一批消息后统一处理,提升数据库写入效率
-**单条 ACK**:每条消息独立确认,失败不影响其他消息
-**适配器模式**:消费者内部根据平台字段分发到不同业务逻辑
**消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 **
### 2. Prefetch 配置
```json
{
"prefetch_count": 100,
"batch_size": 50,
"batch_timeout": 3000
}
```
**参数说明**
- `prefetch_count=100`RabbitMQ 最多推送 100 条未确认消息到消费者
- `batch_size=50`:消费者积累 50 条消息后触发批处理
- `batch_timeout=3000`:如果3秒内未达到 batch_size,也触发处理
**工作流程**
1. RabbitMQ 按 Push 模式推送最多 100 条消息到消费者
2. 消费者缓存消息,达到 50 条或超时 3 秒后批量处理
3. 批量处理时,仍然对每条消息单独 ACK/NACK
4. 实现"伪批量拉取"效果,同时保留单条确认的灵活性
### 3. 消费者处理逻辑(现网实现简化版)
完整实现见 [`backend/app/Platform/OrderConsumer.php`](../backend/app/Platform/OrderConsumer.php) 与 [`FailedMessageTrait.php`](../backend/app/Platform/Traits/FailedMessageTrait.php)。简化骨架:
```php
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue",
pool: "default_consumer", nums: 1, enable: true)]
class OrderConsumer extends ConsumerMessage
{
use FailedMessageTrait;
protected ?array $qos = ['prefetch_size' => 0, 'prefetch_count' => 1, 'global' => false];
protected bool $requeue = false; // 关键:让失败消息进 DLX 而不是 requeue
public function getQueueBuilder(): QueueBuilder
{
return (new QueueBuilder())
->setQueue($this->getQueue())
->setArguments([
'x-dead-letter-exchange' => ['S', 'dlx.orders'],
'x-dead-letter-routing-key' => ['S', 'retry'],
'x-message-ttl' => ['I', 86400000], // 24h
'x-queue-type' => ['S', 'classic'],
]);
}
public function consumeMessage($data, AMQPMessage $message): Result
{
$retry_count = $this->getRetryCount($message); // 读 x-death[0].count
$max_retries = (int) env('AMQP_MAX_RETRIES', 3);
try {
// 1. 创建 EntityParse、entityMatch、entityMap、upsert
$parse = EntityParseFactory::createFromMessage($message);
$entity = $parse->entityMatch($data['meta']);
$rows = $parse->entityMap($data['data'])->all();
Db::beginTransaction();
$entity->newQuery()->upsert($rows, $parse->getUniqueBy(), $parse->getUpdateFields());
// 2. 子项处理、聚合补刷等业务逻辑 ...
Db::commit();
return Result::ACK;
} catch (Throwable $error) {
Db::rollBack();
Log::get()->error('Consumer processing failed', [...]);
if ($retry_count >= $max_retries) {
// 双写兜底:MQ errors.queue + DB failed_messages 表
$this->sendToErrorQueue($message, $error);
return Result::ACK; // 防止主队列继续重试
}
return Result::NACK; // requeue=false → 进 DLX → retry queue
}
}
}
```
`FailedMessageTrait::sendToErrorQueue` 内部:
```php
$producer = ApplicationContext::getContainer()->get(Producer::class);
$producer->produce(new ErrorProducer($message, $error, $retry_count));
// ↓ ErrorProducer 注解 #[Producer(exchange: 'errors.exchange', routingKey: 'error')]
// → errors.exchange → errors.queue (MQ 兜底, 7d TTL)
FailedMessage::query()->create([...]); // → DB failed_messages 表 (DB 兜底)
```
### 4. 重试机制工作流程(现网实现)
```
┌─────────────────┐
│ orders.queue │ ← 消费者从这里接收消息
└────────┬────────┘
├─ 成功 → return Result::ACK
└─ 失败 → FailedMessageTrait::getRetryCount($message)
(读 x-death[0].count)
├─ count < AMQP_MAX_RETRIES (默认 3)
│ │
│ └→ return Result::NACK (Hyperf 自动按 $requeue=false 处理)
│ │
│ ▼ 触发 DLX
│ dlx.orders (topic) ──rk="retry"──► orders.retry.queue
│ │ TTL=5s
│ ▼
│ main.exchange (rk="order.retry")
│ │
│ ▼ 命中 binding "order.#"
│ orders.queue (重新消费,x-death.count+1)
└─ count >= AMQP_MAX_RETRIES
└→ FailedMessageTrait::sendToErrorQueue($message, $error)
├──► ErrorProducer.publish → errors.exchange (rk="error")
│ → errors.queue (TTL 7d, user_ops 捞回)
└──► FailedMessage::query()->create(...)
→ DB failed_messages 表
return Result::ACK (避免主队列继续重试)
```
**关键点**
-**`$requeue=false` 由 Consumer 类属性固定**:让失败消息进入 DLX 而不是 requeue
-**retry 队列自动回流**RabbitMQ 引擎按 retry queue 的 `x-message-ttl=5000` 自动死信回 main.exchange,不需要额外消费者
-**`x-death.count` 自动累加**:每次死信 RabbitMQ 自动增加 countConsumer 通过 `FailedMessageTrait::getRetryCount` 读取
-**错误兜底双写**:超过重试上限时 errors.queue + failed_messages 表**同步双写**,互为备份
- ⚠️ **DLX 不绑定 errors.queue**:进入 errors.queue 完全由应用层 `ErrorProducer` 主动 publish 实现,**不**走 DLX 路由分流
### 5. 为什么这样设计?
| 特性 | 原因 | 优势 |
|------|------|------|
| **单队列 FIFO** | RabbitMQ 无法在队列内部按 topic 过滤 | 保证严格 FIFO,不拆分队列造成业务乱序 |
| **单消费者** | RabbitMQ 同队列多消费者必然 Round-Robin 分配 | 避免消息错投到错误的消费者 |
| **Prefetch + 批处理** | RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 | 实现"伪批量拉取",提升吞吐 |
| **单条 ACK** | 保留细粒度控制 | 失败消息不影响其他消息 |
| **DLX + 延迟重试** | 避免失败消息阻塞队列 | 不会卡 FIFO,不会无限重试 |
| **适配器模式** | 多平台业务逻辑隔离 | 扩展性好,易维护 |
---
## 用户权限配置
> **VHost 名称**:现网为 `datahub`(早期草稿误写为 `datahub-app`)。下述权限正则均来自本地 MQ 实测(`curl mgmt-api/permissions`+ `backend/bin/rabbitmq.sh`,是**现网真实使用**的正则,不要随意改动。
### 1. 平台开发者账号(19 个)
用户名规范:`user_{platform}`,其中 `{platform}``platforms``name` 字段经 `strtolower(str_replace(' ', '_', $name))` 标准化后的结果。
完整列表:`user_amazon_japan` / `user_coupang` / `user_dewu` / `user_douyin` / `user_goofish` / `user_jd` / `user_kaola` / `user_lazada` / `user_naver` / `user_offline` / `user_pdd` / `user_rakuten` / `user_redbook` / `user_shopee` / `user_shopify` / `user_tmall` / `user_wechat` / `user_wechat_video` / `user_youzan`
**权限正则模板**(按平台名展开):
```
configure = ^{platform}\.(exchange|errors\.exchange)$
write = ^{platform}\.(exchange|errors\.exchange)$
read = ^{platform}\.errors\..*$
```
示例(tmall):
```
configure = ^tmall\.(exchange|errors\.exchange)$
write = ^tmall\.(exchange|errors\.exchange)$
read = ^tmall\.errors\..*$
```
**密码 env 命名**`MQ_PASSWORD_{PLATFORM_UPPER}`,其中 `PLATFORM_UPPER` = 平台标准化名转大写。例:`MQ_PASSWORD_TMALL``MQ_PASSWORD_AMAZON_JAPAN``MQ_PASSWORD_WECHAT_VIDEO`。密码由部署期 render-rabbitmq.sh 生成;由 `mq_user.php` 动态从 `platforms` 表读取启用平台后注入。
### 2. Datahub 消费者账号
```
用户名: user_datahub_consumer
密码: env MQ_PASSWORD_CONSUMER
VHost: datahub
configure = ^(main\.exchange|errors\.exchange|dlx\..*)|(.*\.queue)$
write = ^(orders|products|refunds|inventory).*\.queue$|(dlx\..*)|(errors\.exchange)|(.*\.errors\.exchange)$
read = ^(main\.exchange|(orders|products|refunds|inventory).*\.queue|dlx\..*)$
```
> ⚠ 此权限正则比早期文档("只读业务队列")宽松:消费者需要 **configure** main.exchange / errors.exchange / dlx.* / 所有 queue(因为 Hyperf Consumer Annotation 在 worker 启动时 declare 这些资源),需要 **write** errors.exchange + platform.errors.exchange(因为 `ErrorProducer` 走这条),需要 **read** main.exchange + 业务队列 + dlx.*。
### 3. 运维监控账号
```
用户名: user_ops
密码: env MQ_PASSWORD_OPS
VHost: datahub
configure = ^errors\..*$
write = (留空)
read = ^errors\.queue$
```
仅能读 `errors.queue` 做人工捞回;与 DB `failed_messages` 表(应用层提供 Admin UI / API 查询)互为冗余。
### 4. 管理员账号
```
用户名: admin
密码: <强密码,仅 ops/dev 持有>
Tag: administrator
management API: AMQP_ADMIN_USER / AMQP_ADMIN_PASSWORD
(见 backend/config/autoload/amqp.php "management" 段)
```
完整权限,可访问管理界面。**生产环境**仅用于运维操作 / `rabbitmqadmin` CLI / mgmt API;不暴露给应用代码。
---
## 消息格式规范
### 1. message_id 设计
message_id 采用结构化 5 段格式(见 [`OrderProducer::generateMessageId`](../backend/app/Platform/OrderProducer.php)):
**格式**
```
{company_id}#{platform_id}#{store_id}#{entity_type}#{unique_id}
```
**字段说明**
| 段 | 含义 | 来源 |
|---|---|---|
| `company_id` | 公司 ID | producer 输入参数 |
| `platform_id` | 平台 ID(数据库主键) | producer 输入参数 |
| `store_id` | 店铺 ID | producer 输入参数 |
| `entity_type` | 数据实体类型(order/product/refund/inventory),**单数** | 由 `routingKey` 前缀自动派生(`explode('.', $routingKey)[0]`|
| `unique_id` | 业务侧定义的唯一标识 / 时间区间 / batch id 等,**业务侧维护幂等性** | producer 输入参数 |
**示例**
```
188#2#292#order#68a3f2c4e9b1f
```
解析:company=188, platform=2 (tmall), store=292, entity=order, unique_id=`68a3f2c4e9b1f`uniqid()
**分隔符**`#`RabbitMQ message_id 字段允许任意字符;不与 routing key 通配符冲突)。
**特性**
- ✅ 幂等性:业务侧通过 `unique_id` 保证同一实体的 message_id 一致(消费端 upsert 唯一键由 EntityParse 配置)
- ✅ 可读性:直接从 message_id 看出数据来源 4 段元数据
- ✅ 便于调试:日志 / 监控可直接根据前 4 段过滤
### 2. 业务消息格式
由 [`OrderProducer::buildMessage`](../backend/app/Platform/OrderProducer.php) 构造(基类,所有 platform Producer 共用):
```json
{
"message_id": "188#2#292#order#68a3f2c4e9b1f",
"timestamp": "2026-05-14T13:43:59+08:00",
"platform": "tmall",
"data_type": "order",
"meta": {
"platform_id": 2,
"company_id": 188,
"store_id": 292,
"platform_store_id": null,
"unique_id": "68a3f2c4e9b1f",
"source_system": "tmall_api",
"retry_count": 0,
"data_version": 1747203839
},
"data": [
{ /* raw_data 1 */ },
{ /* raw_data 2 */ }
]
}
```
**字段说明**
| 字段 | 类型 | 必需 | 说明 |
|---|---|---|---|
| `message_id` | string | 是 | 5 段结构化 ID(见 §1)|
| `timestamp` | string | 是 | ISO 8601 格式(`date('c')`|
| `platform` | string | 是 | 平台标识(标准化后的小写下划线名);基类默认 `'platform_unkonw'`,子类 `buildMessage` 中覆盖(如 `TmallProductProducer` 覆盖为 `'tmall'`|
| `data_type` | string | 是 | 数据类型(order/product/refund/inventory**单数**);基类默认 `'order'`,子类覆盖 |
| `meta.platform_id` | int | 是 | 平台 ID(数据库主键,来自 `platforms` 表)|
| `meta.company_id` | int | 是 | 公司 ID |
| `meta.store_id` | int | 是 | 店铺 ID |
| `meta.platform_store_id` | string\|null | 否 | 平台侧店铺标识(如有)|
| `meta.unique_id` | string | 是 | 业务唯一标识(与 message_id 末段一致)|
| `meta.source_system` | string | 是 | 数据来源系统(基类默认 `'origin_unknow'`,子类覆盖如 `'tmall_api'`|
| `meta.retry_count` | int | 是 | 重试次数(初始 0;消费端通过 AMQP `x-death[0].count` 读取真实值,不读 meta|
| `meta.data_version` | int | 是 | 数据版本(Unix 时间戳,用于乱序消费保护)|
| `data` | array | 是 | 平台原始数据数组(基类直接传 `$data['raw_data']`|
> ⚠ **字段名是 `meta` 不是 `metadata`**(与早期文档草稿不同)。消费端 `OrderConsumer.php:86` 读 `$data['meta']`。
**关于 `data_version`**:建议使用平台数据的更新时间戳,消费端可据此防止旧版本覆盖新版本;如平台未提供则 `OrderProducer::buildMessage` fallback 到 `time()`
### 3. 错误消息格式
由 [`ErrorProducer::buildErrorMessage`](../backend/app/Platform/ErrorProducer.php) 构造(投递到 `errors.exchange` rk=`error`):
```json
{
"error_id": "err_68f3d2c4abc1f.93214785",
"original_message": {
/* meta / data*/
},
"error": {
"type": "InvalidArgumentException",
"message": "Company with ID 188 not found",
"code": 0,
"file": "/var/www/app/Platform/Tmall/EntityParse/Product.php",
"line": 42,
"trace": "Exception stack trace...",
"timestamp": "2026-05-14T13:31:00+08:00"
},
"metadata": {
"platform": "tmall",
"platform_id": 2,
"company_id": 188,
"store_id": 292,
"data_type": "order",
"message_id": "188#2#292#order#68a3f2c4e9b1f",
"failed_at": "2026-05-14T13:31:00+08:00",
"retry_count": 3
}
}
```
> 注意:错误消息的字段名是 `metadata`(与业务消息的 `meta` 不一致;保留以兼容现有应用代码 `FailedMessageTrait::persistFailedMessage` 的解析)。
**双写到 DB**:错误消息同步写入 [`failed_messages`](../backend/app/Model/FailedMessage.php) 表(model 字段:error_id / data_type / platform / platform_id / company_id / store_id / error_type / error_message / error_code / error_trace / original_message(json) / retry_count / message_id / failed_at / created_at)。Admin 可通过 [`FailedMessageController`](../backend/app/Controller/Api/V1/FailedMessageController.php) 查询。
---
## RabbitMQ 管理界面操作指引
### 1. 访问管理界面
```
URL: http://<rabbitmq-host>:15672
用户名: admin(开发期默认 admin/admin;生产期通过 podman secret / env 注入)
密码: <管理员密码>
```
**推荐使用 Hyperf 命令**(避开浏览器,看核心 9 个队列状态):
```bash
php ./bin/hyperf.php app:mq:status # 单次输出
php ./bin/hyperf.php app:mq:status --watch --interval=5 # 每 5 秒刷新
```
输出包含三组:Business Queues4 业务)/ Dead Letter Queues (Retry Queues)4 retry/ Shared Queueserrors),分别展示 Messages / Consumers / Status。
### 2. 创建 VHost
1. 点击顶部导航 **Admin****Virtual Hosts**
2. 点击 **Add a new virtual host**
3. 输入 VHost 名称:`datahub`
4. 点击 **Add virtual host**
### 3. 创建用户
1. 点击顶部导航 **Admin****Users**
2. 点击 **Add a user**
3. 填写用户信息:
- **Username**: `user_tmall`(按 `user_{platform}` 规范)
- **Password**: `<安全密码>`
- **Tags**: (留空,非管理员用户)
4. 点击 **Add user**
### 4. 配置用户权限
1. 在用户列表中,点击用户名(如 `user_tmall`
2. 滚动到 **Permissions** 部分
3. 选择 VHost`datahub`
4. 配置权限正则表达式(与现网模板一致):
- **Configure regexp**: `^tmall\.(exchange|errors\.exchange)$`
- **Write regexp**: `^tmall\.(exchange|errors\.exchange)$`
- **Read regexp**: `^tmall\.errors\..*$`
5. 点击 **Set permission**
### 5. 创建 Exchange
1. 点击顶部导航 **Exchanges**
2. 确保选择正确的 VHost`datahub`
3. 点击 **Add a new exchange**
4. 填写配置:
- **Name**: `tmall.exchange`
- **Type**: `topic`
- **Durability**: `Durable`
- **Auto delete**: `No`
- **Internal**: `No`
5. 点击 **Add exchange**
重复以上步骤创建:1 个 `main.exchange` + 4 个 `dlx.{dtype}` + 1 个 `errors.exchange` + 每平台 2 个(`{platform}.exchange``{platform}.errors.exchange`)。
### 6. 创建 Queue
1. 点击顶部导航 **Queues**
2. 确保选择正确的 VHost`datahub`
3. 点击 **Add a new queue**
4. 填写配置(以 `orders.queue` 为例):
- **Name**: `orders.queue`
- **Durability**: `Durable`
- **Auto delete**: `No`
- **Arguments**:
- `x-message-ttl` = `86400000`24h
- `x-dead-letter-exchange` = `dlx.orders`
- `x-dead-letter-routing-key` = `retry`
- `x-queue-type` = `classic`
5. 点击 **Add queue**
四个业务队列同模式;retry 队列 arguments
```
x-message-ttl = 5000 # 5s
x-dead-letter-exchange = main.exchange
x-dead-letter-routing-key = {entity_singular}.retry # 如 order.retry
x-queue-type = classic
```
`errors.queue` arguments
```
x-message-ttl = 604800000 # 7d
x-queue-type = classic
```
### 7. 创建 Binding
1. 点击顶部导航 **Exchanges**
2. 点击需要绑定的 Exchange(如 `tmall.exchange`
3. 滚动到 **Bindings** 部分
4.**Add binding from this exchange** 区域填写:
- **To queue**: `orders.queue`
- **Routing key**: `order.tmall`
5. 点击 **Bind**
为每个平台 Exchange 创建 **4 个 binding**order/product/refund/inventory)。
`main.exchange` 创建 4 个 bindingrk: `order.#` / `product.#` / `refund.#` / `inventory.#`)。
每个 `{platform}.errors.exchange` + 全局 `errors.exchange` 各创建 1 个绑定到 `errors.queue`rk=`#`)。
DLX 各创建 1 个绑定到对应 retry queuerk=`retry`)。**不**绑定 errors.queue。
---
## 配置脚本
> **生产环境推荐**:使用 `backend/bin/rabbitmq.sh`(与现网完全一致的脚本,从 `platforms` 表动态读取平台列表)。下方独立脚本仅供**临时手工部署 / 故障排查** 使用。
### 使用 rabbitmqadmin
RabbitMQ 提供命令行工具 `rabbitmqadmin`,可用于批量配置。
#### 安装 rabbitmqadmin
```bash
# 从 RabbitMQ 管理界面下载
wget http://<rabbitmq-host>:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/
```
#### 配置脚本示例
**注意**:本脚本适用于**新版本 rabbitmqadmin**Rust 版本)。如果您使用的是旧版本(Python),请参考官方文档调整语法。
```bash
#!/bin/bash
# RabbitMQ 连接信息
RABBITMQ_HOST="127.0.0.1"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin"
VHOST="datahub"
# 平台列表(与现网 19 个对齐;新增平台时追加即可)
PLATFORMS=(
"amazon_japan" "coupang" "dewu" "douyin" "goofish"
"jd" "kaola" "lazada" "naver" "offline"
"pdd" "rakuten" "redbook" "shopee" "shopify"
"tmall" "wechat" "wechat_video" "youzan"
)
DATA_TYPES=("orders" "products" "refunds" "inventory")
# 0. 检测并清理现有 VHost 和用户(如果存在)
echo "========================================"
echo "开始清理现有配置..."
echo "========================================"
# 检查 VHost 是否存在
echo "检查 VHost: $VHOST 是否存在..."
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true)
if [ -n "$VHOST_EXISTS" ]; then
echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..."
# 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete vhost --name "$VHOST"
echo "✓ VHost '$VHOST' 及其所有资源已删除"
else
echo "VHost '$VHOST' 不存在"
fi
# 清理所有相关用户账号
echo ""
echo "清理用户账号..."
# 平台用户列表
ALL_USERS=()
for platform in "${PLATFORMS[@]}"; do
ALL_USERS+=("user_${platform}")
done
# 添加其他用户
ALL_USERS+=("user_datahub_consumer")
ALL_USERS+=("user_ops")
# 删除用户(如果存在)
for user in "${ALL_USERS[@]}"; do
USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true)
if [ -n "$USER_EXISTS" ]; then
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete user --name "$user"
echo "✓ 删除用户: $user"
fi
done
echo ""
echo "========================================"
echo "清理完成,开始重新配置..."
echo "========================================"
echo ""
# 1. 创建 VHost
echo "创建 VHost: $VHOST"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare vhost --name "$VHOST"
# 2. 创建主 Exchange(用于重试消息回流)
echo "创建主 Exchange: main.exchange"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "main.exchange" --vhost "$VHOST" \
--type topic --durable true
# 3. 创建 DLX (死信交换机)
echo "创建死信交换机 (DLX)..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建 DLX: dlx.${dtype}"
done
# 4. 创建主业务队列(带 DLX 配置)
echo ""
echo "创建主业务队列..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable \
--arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
echo "✓ 创建队列: ${dtype}.queue"
done
# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
echo ""
echo "创建重试队列..."
for dtype in "${DATA_TYPES[@]}"; do
dtype_singular="${dtype%s}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable \
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}"
echo "✓ 创建重试队列: ${dtype}.retry.queue (dl-rk: ${dtype_singular}.retry)"
done
# 6. 创建错误队列
echo ""
echo "创建错误队列..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "errors.queue" --vhost "$VHOST" --durable \
--arguments '{"x-message-ttl":604800000}'
echo "✓ 创建错误队列: errors.queue"
# 7. 绑定主 Exchange 到主队列(使用通配符)
echo ""
echo "绑定主 Exchange 到主队列..."
for dtype in "${DATA_TYPES[@]}"; do
# 使用 order.#, product.# 等通配符匹配所有平台
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "main.exchange" --destination "${dtype}.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#"
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
done
# 8. 绑定 DLX 到重试队列(注意:现网 DLX 不绑定 errors.queue,错误由应用层 ErrorProducer 直接推 errors.exchange
echo ""
echo "绑定 DLX 到重试队列..."
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "retry"
echo "✓ 绑定: dlx.${dtype}${dtype}.retry.queue (routing_key: retry)"
done
# 8b. 创建全局 errors.exchange 并绑定到 errors.queue
echo ""
echo "创建全局错误 Exchange..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "errors.exchange" --vhost "$VHOST" --type topic --durable true
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "#"
echo "✓ 绑定: errors.exchange → errors.queue (routing_key: #)"
# 9. 为每个平台创建 Exchange 和 Binding
echo ""
echo "========================================"
echo "创建平台 Exchange 和 Binding..."
echo "========================================"
for platform in "${PLATFORMS[@]}"; do
echo ""
echo "处理平台: ${platform}"
# 创建平台业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建业务 Exchange: ${platform}.exchange"
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "orders.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "order.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "products.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "product.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "refunds.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "inventory.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}"
echo "✓ 绑定到业务队列 (4个)"
# 创建平台错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建错误 Exchange: ${platform}.errors.exchange"
# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "#"
echo "✓ 绑定到错误队列"
# 创建平台用户
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_${platform}" --password "change_me_${platform}" --tags ""
echo "✓ 创建用户: user_${platform}"
# 配置平台用户权限
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_${platform}" \
--configure "^${platform}\\.(exchange|errors\\.exchange)$" \
--write "^${platform}\\.(exchange|errors\\.exchange)$" \
--read "^${platform}\\.errors\\..*$"
echo "✓ 配置用户权限"
done
# 10. 创建 datahub 消费者用户
echo ""
echo "========================================"
echo "创建 datahub 消费者用户..."
echo "========================================"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_datahub_consumer" --password "change_me_consumer" --tags ""
echo "✓ 创建用户: user_datahub_consumer"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_datahub_consumer" \
--configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \
--write "^(orders|products|refunds|inventory).*\\.queue\$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)\$" \
--read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)\$"
echo "✓ 配置用户权限"
# 11. 创建运维监控用户
echo ""
echo "创建运维监控用户..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_ops" --password "change_me_ops" --tags ""
echo "✓ 创建用户: user_ops"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_ops" \
--configure "^errors\\..*$" --write "" --read "^errors\\.queue$"
echo "✓ 配置用户权限"
echo ""
echo "========================================"
echo "RabbitMQ 配置完成!"
echo "========================================"
echo ""
echo "已创建:"
echo "- 1 个 VHost: $VHOST"
echo "- 1 个主 Exchange: main.exchange"
echo "- 1 个全局错误 Exchange: errors.exchange"
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
echo "- 1 个错误队列: errors.queue"
echo "- $((${#PLATFORMS[@]} * 2)) 个平台 Exchange (${#PLATFORMS[@]} 业务 + ${#PLATFORMS[@]} 错误)"
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
echo ""
echo "提示: 请修改所有用户的默认密码 (change_me_*)"
echo "提示: 验证拓扑:php ./bin/hyperf.php app:mq:status"
```
#### 执行脚本
```bash
chmod +x setup_rabbitmq.sh
./setup_rabbitmq.sh
```
---
## 监控和维护
### 1. 监控指标
通过 RabbitMQ 管理界面查看以下关键指标:
#### 队列健康状态
1. 点击 **Queues** → 选择队列
2. 关注以下指标:
- **Ready**: 待消费消息数(正常应 < 1000)
- **Unacked**: 未确认消息数(正常应较低)
- **Total**: 总消息数
- **Incoming rate**: 消息入队速率
- **Deliver / Get rate**: 消息消费速率
**告警阈值**
- Ready > 10000:队列堆积严重
- Unacked > 5000:消费者处理缓慢
- Incoming rate >> Deliver rate:消费速度跟不上生产速度
#### Exchange 流量统计
1. 点击 **Exchanges** → 选择 Exchange
2. 查看 **Message rates**
- **Publish in**: 消息发布速率
- **Publish out**: 消息路由到队列的速率
**异常情况**
- Publish in > 0 但 Publish out = 0:可能绑定配置错误
### 2. 常见问题排查
#### 消息未到达队列
**检查步骤**
1. 确认 Exchange 存在且类型正确
2. 检查 Binding 配置,确保 routing key 匹配
3. 查看 Exchange 的 **Message rates**,确认消息已发布
4. 检查生产者是否使用了正确的 routing key
#### 消费者无法消费
**检查步骤**
1. 确认用户权限配置正确(Read 权限)
2. 检查队列中是否有消息(Ready > 0
3. 查看消费者连接状态(Connections 页面)
4. 检查消费者代码是否正确 ACK 消息
#### 错误消息未送达平台开发者
**检查步骤**
1. 确认错误 Exchange 绑定到 `errors.queue`
2. 检查平台开发者账号的 Read 权限
3. 确认平台开发者订阅了正确的 Exchange
4. 查看 `errors.queue` 中是否有消息
### 3. 备份和恢复
#### 导出配置
```bash
# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
export datahub-backup.json
```
#### 导入配置
```bash
# 从备份恢复配置
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
import datahub-backup.json
```
### 4. 性能优化
#### 消费者并发配置
- 增加消费者实例数量
- 每个消费者配置多个 channel(如 5-10 个)
- 使用 prefetch_count 限制未确认消息数量
#### 队列性能优化
- 启用 lazy queue(大量消息堆积时):
```bash
rabbitmqadmin declare queue name=orders.queue vhost=datahub \
durable=true arguments='{"x-queue-mode":"lazy"}'
```
- 定期清理过期消息(通过 TTL 自动实现)
### 5. 日志查看
```bash
# 查看 RabbitMQ 日志
tail -f /var/log/rabbitmq/rabbit@<hostname>.log
# 查看连接日志
tail -f /var/log/rabbitmq/rabbit@<hostname>_connection.log
```
---
## 安全建议
### 1. 密码策略
- 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符)
- 定期更换密码(建议 3-6 个月)
- 不同环境(开发/测试/生产)使用不同密码
### 2. 网络安全
- 生产环境启用 TLS/SSL 加密连接
- 限制管理界面访问(仅允许内网或 VPN 访问)
- 配置防火墙规则:
- 5672:AMQP 端口(仅允许应用服务器访问)
- 15672:管理界面(仅允许管理员 IP 访问)
### 3. 权限最小化原则
- 平台开发者只能访问自己的 Exchange
- 消费者只能读取必要的队列
- 避免赋予不必要的 Configure 权限
### 4. 审计日志
- 启用 RabbitMQ 审计日志插件
- 定期审查用户操作记录
- 监控异常登录行为
---
## 新增平台接入流程
> **首选路径**:使用 `backend/bin/rabbitmq.sh add <platform>`(自动化、与现网一致);下方手工脚本仅作参考。
新增平台(示例 `<platform>` = 标准化后的小写下划线名):
### 1. 创建 Exchange(业务 + 错误)
```bash
P="<platform>" # 例如 "tiktok_shop"
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare exchange --name "${P}.exchange" --vhost datahub --type topic --durable true
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare exchange --name "${P}.errors.exchange" --vhost datahub --type topic --durable true
```
### 2. 创建 5 条 Binding4 业务 + 1 错误)
```bash
for DTYPE in orders products refunds inventory; do
DTYPE_SINGULAR="${DTYPE%s}"
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare binding --source "${P}.exchange" --destination "${DTYPE}.queue" \
--destination-type queue --vhost datahub \
--routing-key "${DTYPE_SINGULAR}.${P}"
done
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare binding --source "${P}.errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost datahub --routing-key "#"
```
### 3. 创建用户并配置权限
```bash
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare user --name "user_${P}" --password "<secure_password>" --tags ""
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare permissions --vhost datahub --user "user_${P}" \
--configure "^${P}\\.(exchange|errors\\.exchange)\$" \
--write "^${P}\\.(exchange|errors\\.exchange)\$" \
--read "^${P}\\.errors\\..*\$"
```
### 4. 同步到 backend
1. **DB**`INSERT INTO platforms (name, enabled, ...) VALUES ('<Platform Display Name>', true, ...)`,确保 `mq_user.php` 动态读取时能匹配到
2. **env**:在 backend 的 `.env`(或 host 端 `datahub-backend.env`)追加 `MQ_PASSWORD_<PLATFORM_UPPER>=<secure_password>`
3. **重启 backend**`systemctl --user restart datahub-backend`,让 Hyperf worker 重新加载 mq_user.php 与 env
### 5. 提供给平台开发者
```
RabbitMQ 连接信息:
- Host: <rabbitmq-host>
- Port: 5672
- VHost: datahub
- Username: user_<platform>
- Password: <secure_password>
发布配置:
- Exchange: <platform>.exchange
- Routing Keys:
- order.<platform> → 发布订单数据
- product.<platform> → 发布产品数据
- refund.<platform> → 发布退款数据
- inventory.<platform> → 发布库存数据
错误投递(可选):
- Exchange: <platform>.errors.exchange
- Routing Key: 任意(由 binding `#` 兜底)
- 用途:平台侧主动上报无法处理的消息 / 校验失败的数据
消息格式规范:
- 参见「消息格式规范」章节(5 段 message_idmeta 字段单数;data_version 时间戳防乱序)
```
---
## 附录
### A. 完整资源清单(与本地 MQ 实测对齐 — 2026-05-14
#### VHost
- **总数**1 个
- `datahub`
#### Exchanges
| 类别 | 数量 | 名称 |
|---|---|---|
| 主路由 | 1 | `main.exchange`topic|
| 全局错误 | 1 | `errors.exchange`topic|
| DLX | 4 | `dlx.orders` / `dlx.products` / `dlx.refunds` / `dlx.inventory`(全 topic|
| 平台业务 | 19 | `{platform}.exchange` × 19(全 topic|
| 平台错误 | 19 | `{platform}.errors.exchange` × 19(全 topic|
**Exchange 业务总数****44 个**(不含 `amq.*` 系统 exchange 7 个)
#### Queues
| 类别 | 数量 | 名称 + 关键 arguments |
|---|---|---|
| 业务队列 | 4 | `{dtype}.queue`ttl=24h, dl-exchange=`dlx.{dtype}`, dl-rk=`retry`, type=classic |
| 重试队列 | 4 | `{dtype}.retry.queue`ttl=5s, dl-exchange=`main.exchange`, dl-rk=`{entity_singular}.retry`, type=classic |
| 错误队列 | 1 | `errors.queue`ttl=7d, 无 DLX, type=classic |
**Queue 总数****9 个**(与 `php ./bin/hyperf.php app:mq:status` 输出一致)
#### Bindings
| 类别 | 数量 | 路径 |
|---|---|---|
| 平台业务 → 业务队列 | 76 | `{platform}.exchange` —rk=`{entity}.{platform}`→ `{dtype}.queue`19 × 4|
| main → 业务队列 | 4 | `main.exchange` —rk=`{entity}.#`→ `{dtype}.queue` |
| DLX → 重试队列 | 4 | `dlx.{dtype}` —rk=`retry`→ `{dtype}.retry.queue`**仅此一种****不**绑定 errors.queue|
| 错误 exchange → errors.queue | 20 | `errors.exchange` 1 条 + `{platform}.errors.exchange` × 19 条,全部 rk=`#` |
**Binding 业务总数****104 条**(不含 default exchange 自动同名 binding 9 条)
> 死信回流路径(retry → main)不是 binding,而是 retry queue 的 `x-dead-letter-exchange` arguments 字段,RabbitMQ 引擎在 TTL 到期时自动投递。
#### Usersvhost `datahub`
| 类别 | 数量 | 命名 | 权限说明 |
|---|---|---|---|
| 平台开发者 | 19 | `user_{platform}` | 仅写自己的 `{platform}.exchange` / `{platform}.errors.exchange`,读自己的 errors(最小权限)|
| Datahub 消费者 | 1 | `user_datahub_consumer` | configure/write/read 见 §"消费者账号"现网正则 |
| 运维监控 | 1 | `user_ops` | 仅读 `errors.queue` |
| 管理员 | 1 | `admin` | tag=administrator,全权限 |
**User 总数****22 个**
完整平台用户清单:
```
user_amazon_japan, user_coupang, user_dewu, user_douyin, user_goofish,
user_jd, user_kaola, user_lazada, user_naver, user_offline,
user_pdd, user_rakuten, user_redbook, user_shopee, user_shopify,
user_tmall, user_wechat, user_wechat_video, user_youzan
```
#### 架构统计
| 资源类型 | 数量 | 说明 |
|---------|------|------|
| VHost | 1 | `datahub` |
| Exchange | 44 | 1 main + 1 全局 errors + 4 DLX + 19 平台业务 + 19 平台错误 |
| Queue | 9 | 4 业务 + 4 retry + 1 errors |
| Binding | 104 | 见上表(不含 default exchange 自动 binding|
| User | 22 | 19 平台 + 1 消费者 + 1 运维 + 1 管理员 |
**消息流转路径(双层兜底)**
```
生产者 → {platform}.exchange ──rk={entity}.{platform}──► {dtype}.queue ──► Consumer
消费失败 (count < max) ↓
◄──── NACK
dlx.{dtype} ──rk=retry──► {dtype}.retry.queue
│ TTL=5s
main.exchange (rk={entity}.retry)
{dtype}.queue (重新消费, count+1)
消费失败 (count >= max)
FailedMessageTrait::sendToErrorQueue
┌───────────────────────┼───────────────────────┐
▼ ▼
ErrorProducer ──► errors.exchange ──► errors.queue FailedMessage::create
(MQ 兜底, TTL 7d, user_ops 捞回) (DB 兜底, failed_messages 表)
Consumer 返回 ACK
(避免主队列继续重试)
```
### B. 参考资料
- [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html)
- [RabbitMQ Management Plugin](https://www.rabbitmq.com/management.html)
- [Access Control (Authentication, Authorization)](https://www.rabbitmq.com/access-control.html)
- [rabbitmqadmin 使用指南](https://www.rabbitmq.com/management-cli.html)