Files
datahub/docs/RabbitMQ.md
T

1613 lines
66 KiB
Markdown
Raw Normal View History

2025-11-14 16:04:05 +08:00
# RabbitMQ 配置方案
2026-05-15 08:48:24 +08:00
> **权威源声明**:本文档已与现网代码对齐(2026-05-14)。如本文与代码冲突,**以代码为准**:
> - 拓扑创建脚本:[`backend/bin/rabbitmq.sh`](../backend/bin/rabbitmq.sh)
> - 用户/凭据配置:[`backend/config/autoload/mq_user.php`](../backend/config/autoload/mq_user.php) + [`backend/config/autoload/amqp.php`](../backend/config/autoload/amqp.php)
> - 业务消费者(注解 declare):`backend/app/Platform/{Order,Product,Refund}Consumer.php`
> - 错误兜底(双写):[`backend/app/Platform/Traits/FailedMessageTrait.php`](../backend/app/Platform/Traits/FailedMessageTrait.php) + [`backend/app/Platform/ErrorProducer.php`](../backend/app/Platform/ErrorProducer.php) + [`backend/app/Model/FailedMessage.php`](../backend/app/Model/FailedMessage.php)
> - 运维状态检查:`php ./bin/hyperf.php app:mq:status [--interval=N] [--watch]`[`backend/app/Command/AppMqStatus.php`](../backend/app/Command/AppMqStatus.php) + [`backend/app/Service/MqStatusService.php`](../backend/app/Service/MqStatusService.php)
**配置层级说明**`mq_user.php` 从数据库 `platforms` 表动态读取启用的平台名,按 `strtolower(str_replace(' ', '_', $name))` 标准化生成 `user_{platform}` 用户名与 `MQ_PASSWORD_{PLATFORM_UPPER}` env 变量名;`amqp.php``default_consumer` 用户硬编码为 `user_datahub_consumer`,密码由 env `MQ_PASSWORD_CONSUMER` 注入。
2025-11-14 16:04:05 +08:00
## 概述
2026-02-28 10:38:33 +08:00
本文档描述 datahub 应用的 RabbitMQ 消息队列架构设计和配置方法。
2025-11-14 16:04:05 +08:00
### 业务背景
2026-02-28 10:38:33 +08:00
- **应用角色**:datahub 是消息消费者,负责接收来自多个电商平台的数据并入库
2026-05-15 08:48:24 +08:00
- **数据来源**19 个电商/线下平台(amazon_japan、coupang、dewu、douyin、goofish、jd、kaola、lazada、naver、offline、pdd、rakuten、redbook、shopee、shopify、tmall、wechat、wechat_video、youzan)— 平台列表由 `platforms` 表动态管理,名称按 `strtolower(str_replace(' ', '_', $name))` 标准化
2025-11-14 16:04:05 +08:00
- **数据类型**:订单(orders)、产品(products)、退款(refunds)、库存(inventory
- **隔离需求**:不同平台开发者之间的工作空间需要相互隔离,互不影响
### 架构特点
2026-05-15 08:48:24 +08:00
- **VHost 隔离**:按应用划分 VHost,使用 `Classic` 队列类型
- **Exchange 隔离**:每个平台独立的业务 Exchange + 错误 Exchange,通过权限正则控制访问(用户只能写自己的 platform.exchange / platform.errors.exchange
- **单队列设计**:每种数据类型一个队列,所有平台共享(`orders.queue` / `products.queue` / `refunds.queue` / `inventory.queue`),保证同 data_type 内严格 FIFO
- **独立消费者**:每种数据类型一个 Hyperf Consumer 实例,使用 prefetch + 单条 ACK 模式
- **智能重试**:DLX(死信交换机)+ 延迟重试队列(TTL=5s 自动回流 main.exchange+ 应用层 ErrorProducer 兜底
- **双层错误持久化**:超过重试上限的消息 **同时**写入 `errors.queue` (MQ, TTL=7d) + DB `failed_messages` 表(人工捞回与统计分析)
2025-11-14 16:04:05 +08:00
- **消息持久化**RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API
---
## 设计方案总结
### 核心设计理念
本方案采用 **Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式**,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。
### 关键设计决策
2025-11-17 16:51:51 +08:00
** 注意 **
1. 消息内部字段无严格要求,但可以在业务侧进行约束
2. 默认单条消息的最大 Payload 尺寸为 128MB
3. 消息队列为缓存系统,应该尽可能的将批量数据放入 payload\
4. MQ 存在的意义在于快速接受远程生产的 Message
2025-11-14 16:04:05 +08:00
#### 1. 为什么使用单队列?
| 问题 | 传统多队列方案 | 本方案(单队列) |
|------|--------------|----------------|
2026-05-15 08:48:24 +08:00
| **队列数量** | 19 平台 × 4 数据类型 = 76 个队列 | 4 个数据类型队列(数量减少 ~95%,且与平台数量解耦)|
2025-11-14 16:04:05 +08:00
| **FIFO 保证** | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 |
2026-05-15 08:48:24 +08:00
| **运维复杂度** | 需要监控 76 个队列 | 只需监控 4 个主队列 + 4 个 retry 队列 + 1 个 errors 队列 = 9 个 |
| **消费者部署** | 需要 N 个消费者实例 | 只需 4 个消费者实例 |
| **新增平台成本** | 新建 4 队列 + 4 binding + 用户/权限 | 仅新建 1 业务 exchange + 1 错误 exchange + 5 binding + 1 用户/权限(队列不动)|
2025-11-14 16:04:05 +08:00
**原因**:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。
#### 2. 为什么使用单消费者?
**原因**:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。
**优势**
- ✅ 保证严格 FIFO 顺序
- ✅ 避免消息错投到错误的消费者
- ✅ 简化部署和运维
**如何实现多平台支持?** 通过消费者内部的**适配器模式**,根据消息的 `platform` 字段分发到不同业务逻辑。
#### 3. 为什么使用 Prefetch + 批处理?
**原因**RabbitMQ 是 **Push 模式**,不提供 Kafka 式的批量拉取 API。
**解决方案**
- 设置 `prefetch_count=100`RabbitMQ 最多推送 100 条未确认消息
- 消费者自行缓存消息,达到 `batch_size=50``timeout=3s` 后批量处理
- 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性
**优势**
- ✅ 提升数据库写入效率(批量插入)
- ✅ 控制并发和内存压力
- ✅ 失败消息不影响其他消息(单条 ACK)
#### 4. 为什么使用 DLX + 延迟重试队列?
**问题**:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。
**传统方案的问题**
2026-01-30 15:38:11 +08:00
-`nack(requeue=true)`:消息回队列头部,无限循环,**永远不会进入 DLX**
2025-11-14 16:04:05 +08:00
- ❌ 直接丢弃:数据丢失
- ❌ 立即重试:打爆 API/DB
2026-05-15 08:48:24 +08:00
**本方案(DLX + 延迟重试 + 双写错误持久化)**
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
消费失败 → Consumer 检查 x-death count
count < AMQP_MAX_RETRIES(默认 3)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
Consumer 返回 NACK(requeue=false)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
源 queue 的 x-dead-letter-exchange = dlx.{dtype}, dl-rk = "retry"
2026-01-30 15:38:11 +08:00
2026-05-15 08:48:24 +08:00
dlx.{dtype} (topic) — binding rk="retry" → {dtype}.retry.queue (TTL=5s)
retry.queue 的 x-dead-letter-exchange = main.exchange, dl-rk = "{entity}.retry"
↓ 5s 后死信回流
main.exchange (topic) — binding "{entity}.#" → {dtype}.queue (重新消费, x-death count+1)
count >= AMQP_MAX_RETRIES
Consumer 调用 FailedMessageTrait::sendToErrorQueue
↓ ┌─ ErrorProducer.publish → errors.exchange (rk="error") → errors.queue (TTL=7d) ← MQ 兜底
└─ FailedMessage::create → DB `failed_messages` 表 ← DB 兜底
Consumer 返回 ACK(避免主队列继续重试)
2025-11-14 16:04:05 +08:00
```
2026-01-30 15:38:11 +08:00
**重要说明**
2026-05-15 08:48:24 +08:00
- ⚠️ **必须设置 `requeue=false`**:只有 `requeue=false` 才会触发 DLX 机制(已由 `protected bool $requeue = false` 在所有 Consumer 中固定)
2026-01-30 15:38:11 +08:00
- ⚠️ **`requeue=true` 会导致无限循环**:消息直接回到原队列头部,永远不进入 DLX
2026-05-15 08:48:24 +08:00
-**retry 队列自动回流**TTL 到期后由 RabbitMQ 引擎自动死信回 main.exchange,不需要额外消费者
-**Consumer 控制重试次数**:通过 `FailedMessageTrait::getRetryCount``x-death[0].count`
-**错误兜底双写**:超过重试上限时,errors.queue + failed_messages 表**同步双写**,互为备份;监控/告警/捞回需同时覆盖两个数据源
-**DLX 不直接绑定 errors.queue**:错误进入 errors.queue 完全由应用层 `ErrorProducer` 主动 publish 到 `errors.exchange` 实现,**不**走 DLX 路由分流
2026-01-30 15:38:11 +08:00
2025-11-14 16:04:05 +08:00
**优势**
- ✅ 不会卡 FIFO
2026-01-30 15:38:11 +08:00
- ✅ 不会无限重试(通过 max retries 限制)
2025-11-14 16:04:05 +08:00
- ✅ 延迟重试避免打爆 API/DB
2026-01-30 15:38:11 +08:00
- ✅ 错误消息自动隔离到 error 队列
2025-11-14 16:04:05 +08:00
- ✅ 无需额外 Redis 或手写 retry 逻辑
2026-01-30 15:38:11 +08:00
- ✅ retry 过程完全自动化(依赖 RabbitMQ 的 DLX + TTL 机制)
2025-11-14 16:04:05 +08:00
### 方案优势总结
| 维度 | 优势 |
|------|------|
| **可靠性** | 消息持久化 + 单条 ACK + 智能重试,避免数据丢失 |
| **性能** | Prefetch + 批处理 + 数据库批量写入,提升吞吐 |
| **顺序性** | 单队列 FIFO 保证严格顺序 |
| **扩展性** | 适配器模式,新增平台只需添加适配器类 |
| **运维** | 队列数量少,监控简单,故障排查容易 |
| **容错** | DLX 智能重试,失败消息不阻塞主队列 |
### 适用场景
**适合**
- 多平台数据同步(电商、物流、金融等)
- 需要严格 FIFO 顺序
- 需要高可靠性和高吞吐
- 平台数量多(> 5个),数据类型固定(< 10个)
**不适合**
- 实时性要求极高(< 100ms
- 需要按平台独立控制消费速率
- 平台数量少(< 3个),可以使用独立队列
---
2026-01-30 15:38:11 +08:00
## Requeue 机制详解
### requeue 参数的作用
在 RabbitMQ 中,当消费者处理消息失败时,可以通过 `basic_nack()``basic_reject()` 拒绝消息,第三个参数 `requeue` 决定了消息的去向:
```php
// requeue=true:消息重新回到原队列(队列头部)
$channel->basic_nack($deliveryTag, false, true);
// requeue=false:消息进入 DLX(如果配置了),否则丢弃
$channel->basic_nack($deliveryTag, false, false);
```
### requeue=true 的问题
| 行为 | 后果 |
|------|------|
| 消息回到原队列头部 | 立即被同一个消费者再次取出 |
| 不触发 DLX | 永远不会进入重试队列或错误队列 |
| 无延迟 | 瞬间形成高速循环(毫秒级) |
| 无计数 | 无法统计重试次数,无法设置上限 |
**结果**:消息在 `消费 → 失败 → NACK → 回队列 → 消费` 之间无限循环,CPU 和网络资源被浪费,队列状态显示为空(消息循环太快无法观察)。
### requeue=false 的正确用法
```
2026-05-15 08:48:24 +08:00
orders.queue
arguments:
x-dead-letter-exchange = dlx.orders
x-dead-letter-routing-key = retry
x-message-ttl = 86400000 (24h)
↓ Consumer 消费失败 (count < max_retries)
2026-01-30 15:38:11 +08:00
↓ basic_nack(requeue=false)
↓ 触发 DLX 机制
2026-05-15 08:48:24 +08:00
dlx.orders (topic)
↓ binding key = "retry"
2026-01-30 15:38:11 +08:00
2026-05-15 08:48:24 +08:00
orders.retry.queue
arguments:
x-message-ttl = 5000 (5s)
x-dead-letter-exchange = main.exchange
x-dead-letter-routing-key = order.retry
↓ 5s 后 TTL 到期,自动死信回 main.exchange
2026-01-30 15:38:11 +08:00
2026-05-15 08:48:24 +08:00
main.exchange (topic)
↓ binding key = "order.#" 匹配 "order.retry"
2026-01-30 15:38:11 +08:00
orders.queue (重新进入主队列,x-death count+1)
```
2026-05-15 08:48:24 +08:00
> 注意:retry queue 的 `x-dead-letter-routing-key` 是 **`{entity_singular}.retry`**(如 `order.retry` / `product.retry`),不保留原 routing key。由于 main.exchange 用通配符 `order.#` 绑定 orders.queue`order.retry` 能正常匹配回流;但失去了"消息原平台来源"信息(platform 字段已存在 message 体内,不依赖 routing key 追溯)。
2026-01-30 15:38:11 +08:00
### 环境变量配置
`.env` 文件中可以配置以下参数:
```bash
# 最大重试次数(默认3次)
# 消息重试超过此次数后,会被发送到 errors.queue
AMQP_MAX_RETRIES=3
# 调试延迟(秒,默认0
# 设置为 2 可以让每条消息处理延迟2秒,方便在 mq:status 中观察队列状态
# 生产环境应设置为 0
AMQP_CONSUMER_DEBUG_DELAY=0
```
**使用示例**
```bash
# 开发环境:观察消息流转
AMQP_CONSUMER_DEBUG_DELAY=2 php bin/hyperf.php start
# 生产环境:正常运行
AMQP_CONSUMER_DEBUG_DELAY=0 php bin/hyperf.php start
```
---
2025-11-14 16:04:05 +08:00
## 架构设计
### 1. VHost 设计
VHost 按业务应用进行划分,便于资源隔离和权限管理。
2026-05-15 08:48:24 +08:00
**当前 VHost**
- `datahub` — 电商数据流转专用(注:早期文档误写为 `datahub`**现网实际为 `datahub`**
2025-11-14 16:04:05 +08:00
---
### 2. Exchange 设计
2026-05-15 08:48:24 +08:00
`datahub` VHost 中,Exchange 分为三类:
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### 主路由 Exchange
2025-11-14 16:04:05 +08:00
| Exchange 名称 | 类型 | 持久化 | 用途 |
|--------------|------|--------|------|
2026-05-15 08:48:24 +08:00
| `main.exchange` | Topic | 是 | **仅作为 retry 回流入口** + Consumer Annotation binding 声明源;**不**作为 producer 入口 |
> ⚠ producer 直推 `{platform}.exchange`**不**经过 main.exchange。main.exchange 的两个用途:① retry queue 的 `x-dead-letter-exchange` 指向它,retry 消息通过它回流业务队列;② Hyperf `#[Consumer(exchange: "main.exchange", routingKey: "{entity}.#", queue: "{dtype}.queue")]` 注解声明 binding 时用它。
#### 平台业务 Exchangeproducer 入口)
每个平台一个独立的 Topic Exchange,共 19 个(与 `platforms` 表 enabled 平台对齐):
| 平台 (`name` 标准化后) | Exchange |
|---|---|
| amazon_japan | `amazon_japan.exchange` |
| coupang | `coupang.exchange` |
| dewu | `dewu.exchange` |
| douyin | `douyin.exchange` |
| goofish | `goofish.exchange` |
| jd | `jd.exchange` |
| kaola | `kaola.exchange` |
| lazada | `lazada.exchange` |
| naver | `naver.exchange` |
| offline | `offline.exchange` |
| pdd | `pdd.exchange` |
| rakuten | `rakuten.exchange` |
| redbook | `redbook.exchange` |
| shopee | `shopee.exchange` |
| shopify | `shopify.exchange` |
| tmall | `tmall.exchange` |
| wechat | `wechat.exchange` |
| wechat_video | `wechat_video.exchange` |
| youzan | `youzan.exchange` |
全部 `type=topic, durable=true`
**Routing Key 规范**`{entity_singular}.{platform}` 格式,注意 entity 单数 / 平台名小写下划线):
| 数据类型 | 示例 routing key |
|---|---|
| 订单 (order) | `order.tmall` / `order.amazon_japan` / `order.wechat_video` |
| 产品 (product) | `product.tmall` / `product.amazon_japan` / `product.wechat_video` |
| 退款 (refund) | `refund.tmall` / `refund.amazon_japan` / `refund.wechat_video` |
| 库存 (inventory) | `inventory.tmall` / `inventory.amazon_japan` / `inventory.wechat_video` |
2025-11-14 16:04:05 +08:00
**设计说明**
2026-05-15 08:48:24 +08:00
- producer 直推 `{platform}.exchange`,使用 routing key `{entity_singular}.{platform}`
- 业务 binding 把 `{platform}.exchange` 通过 routing key 直绑到 4 个 `{dtype}.queue`(不经过 main.exchange
- 消费者通过消息体 `platform` / `data_type` 字段进行业务分发(**不**依赖 routing key 解析)
#### 平台错误 Exchange(应用层错误推送 + 平台错误通知)
每个平台一个独立的错误 Topic Exchange,共 19 个,命名 `{platform}.errors.exchange`
| 来源平台 | Errors Exchange |
|---|---|
| 19 个平台 | `{platform}.errors.exchange`amazon_japan / coupang / dewu / douyin / goofish / jd / kaola / lazada / naver / offline / pdd / rakuten / redbook / shopee / shopify / tmall / wechat / wechat_video / youzan|
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
每个 `{platform}.errors.exchange` 通过 binding rk=`#` 直连 `errors.queue`
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### 全局错误 Exchange
2025-11-14 16:04:05 +08:00
| Exchange 名称 | 类型 | 持久化 | 用途 |
2026-05-15 08:48:24 +08:00
|---|---|---|---|
| `errors.exchange` | Topic | 是 | 应用层 `ErrorProducer` 推送目标,binding rk=`#` 直连 `errors.queue` |
**Routing Key 规范(应用层 publish 时)**
- 应用层 `ErrorProducer.php` 固定使用 routing key = `error`(见代码)
- 平台 producer 在自检/校验失败时可向 `{platform}.errors.exchange` 自定义 routing key(业务侧决定,由 binding `#` 兜底)
2025-11-14 16:04:05 +08:00
---
### 3. Queue 设计
#### 业务队列(主队列)
物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | DLX | 用途 |
|-----------|--------|----------|----------|-----|------|
| `orders.queue` | 是 | 否 | 24小时 | `dlx.orders` | 接收所有平台的订单数据,单消费者批处理 |
| `products.queue` | 是 | 否 | 24小时 | `dlx.products` | 接收所有平台的产品数据,单消费者批处理 |
| `refunds.queue` | 是 | 否 | 24小时 | `dlx.refunds` | 接收所有平台的退款数据,单消费者批处理 |
| `inventory.queue` | 是 | 否 | 24小时 | `dlx.inventory` | 接收所有平台的库存数据,单消费者批处理 |
**队列参数配置**
```json
{
"x-message-ttl": 86400000,
"x-dead-letter-exchange": "dlx.orders",
"x-dead-letter-routing-key": "retry"
}
```
#### 死信交换机(DLX
每种数据类型配备一个 DLX,用于处理失败消息的路由:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|--------------|------|--------|------|
| `dlx.orders` | Topic | 是 | 订单失败消息路由 |
| `dlx.products` | Topic | 是 | 产品失败消息路由 |
| `dlx.refunds` | Topic | 是 | 退款失败消息路由 |
| `dlx.inventory` | Topic | 是 | 库存失败消息路由 |
#### 延迟重试队列
为每种数据类型提供延迟重试机制:
| Queue 名称 | 持久化 | 消息 TTL | DLX | 用途 |
|-----------|--------|----------|-----|------|
| `orders.retry.queue` | 是 | 5秒 | `main.exchange` | 订单延迟重试(TTL后自动回到主队列)|
| `products.retry.queue` | 是 | 5秒 | `main.exchange` | 产品延迟重试 |
| `refunds.retry.queue` | 是 | 5秒 | `main.exchange` | 退款延迟重试 |
| `inventory.retry.queue` | 是 | 5秒 | `main.exchange` | 库存延迟重试 |
2026-05-15 08:48:24 +08:00
**重试队列参数配置**(按 data_type 实例化,`{entity_singular}` ∈ {`order`, `product`, `refund`, `inventory`}):
2025-11-14 16:04:05 +08:00
```json
{
"x-message-ttl": 5000,
"x-dead-letter-exchange": "main.exchange",
2026-05-15 08:48:24 +08:00
"x-dead-letter-routing-key": "{entity_singular}.retry"
2025-11-14 16:04:05 +08:00
}
```
2026-05-15 08:48:24 +08:00
例:`orders.retry.queue` 的 dl-rk = `order.retry`,回流时由 main.exchange 的 `order.#` binding 匹配进入 `orders.queue`
**重试机制说明(现网实现)**
1. 消费者处理失败时,调用 `FailedMessageTrait::getRetryCount``x-death[0].count`
2026-01-30 15:38:11 +08:00
2. 根据重试次数决定处理方式:
2026-05-15 08:48:24 +08:00
- **count < `AMQP_MAX_RETRIES`(默认 3**`return Result::NACK`Hyperf 自动按 `$requeue=false` 触发 DLX)→ 进入 `dlx.{dtype}` → 路由到 `{dtype}.retry.queue`5s 延迟)→ TTL 到期回流 `main.exchange` → 重新进入主队列
- **count >= max_retries**Consumer 调用 `FailedMessageTrait::sendToErrorQueue`**同时**
- `ErrorProducer.publish``errors.exchange` (rk=`error`) → `errors.queue` (TTL 7d)
- `FailedMessage::create` → DB `failed_messages`
- 然后 `return Result::ACK`,避免主队列继续重试
3. 配置参数:
- **最大重试次数**`AMQP_MAX_RETRIES`(默认 3
- **延迟时间**5 秒(retry 队列 `x-message-ttl=5000`
- **调试延迟**`AMQP_CONSUMER_DEBUG_DELAY`(生产环境设 0
2025-11-14 16:04:05 +08:00
#### 错误队列
统一的错误队列,接收所有超过重试次数的失败消息:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | 用途 |
|-----------|--------|----------|----------|------|
| `errors.queue` | 是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 |
2026-01-30 15:38:11 +08:00
**错误队列的作用和机制**
-**收集永久失败的消息**:当消息重试次数达到上限(默认3次)后,消费者会主动将消息发送到此队列
-**避免消息丢失**:即使处理失败多次,消息也会被保存在错误队列中,不会丢失
-**人工排查和修复**:运维人员可以从错误队列中查看失败原因,修复问题后重新投递
-**统一管理**:所有数据类型(orders/products/refunds/inventory)的错误消息都集中在一个队列
- ⚠️ **不会自动重试**:错误队列中的消息需要人工介入,不会自动回流到主队列
**错误消息格式**
```json
{
"error_id": "err_675d8f3a2b1c4",
"original_message": { /* */ },
"error": {
"type": "InvalidArgumentException",
"message": "Missing required field: total_amount",
"trace": "Exception stack trace...",
"timestamp": "2025-01-15T10:31:00+00:00"
},
"metadata": {
"platform": "tmall",
"platform_id": 2,
"company_id": 188,
"store_id": 292,
"data_type": "order",
"failed_at": "2025-01-15T10:31:00+00:00",
"retry_count": 3
}
}
```
2025-11-14 16:04:05 +08:00
---
2026-05-15 08:48:24 +08:00
### 4. Binding 配置(现网实际)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
本节按"消息流向"组织,每个 binding 都来自本地 MQ 实测(`curl mgmt-api/bindings/datahub`+ `backend/bin/rabbitmq.sh` 创建逻辑。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### 4.1 平台业务 Exchange → 业务队列(producer 主路径)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
每个 `{platform}.exchange` 通过 4 条 binding 直接绑定到 4 个业务队列:
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
{platform}.exchange (topic)
├── rk = "order.{platform}" → orders.queue
├── rk = "product.{platform}" → products.queue
├── rk = "refund.{platform}" → refunds.queue
└── rk = "inventory.{platform}" → inventory.queue
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
19 平台 × 4 = **76 条 binding**
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### 4.2 main.exchange → 业务队列(retry 回流路径)
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
main.exchange (topic)
├── rk = "order.#" → orders.queue
├── rk = "product.#" → products.queue
├── rk = "refund.#" → refunds.queue
└── rk = "inventory.#" → inventory.queue
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
**4 条 binding**。这是 retry queue 死信回流的唯一入口;producer **不**通过 main.exchange 投递消息。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### 4.3 DLX → retry queue(仅一种绑定)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
```
dlx.orders (topic) — rk = "retry" → orders.retry.queue
dlx.products (topic) — rk = "retry" → products.retry.queue
dlx.refunds (topic) — rk = "retry" → refunds.retry.queue
dlx.inventory (topic) — rk = "retry" → inventory.retry.queue
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
**4 条 binding**
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
> ⚠ **DLX 不绑定到 errors.queue**。错误进入 errors.queue 由应用层 `ErrorProducer` 主动 publish 实现(见 §4.5),**不**通过 DLX 路由分流。这一点与早期文档草稿("dlx.{dtype} → errors.queue (rk=error)" 双绑模式)不符,**以现网实现为准**。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### 4.4 retry queue → main.exchangeTTL 死信回流)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
retry queue 不需要显式 binding;其 `x-dead-letter-exchange = main.exchange``x-dead-letter-routing-key = {entity_singular}.retry` 定义在队列 arguments 中,TTL 到期由 RabbitMQ 引擎自动死信投递到 main.exchange。
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
orders.retry.queue —(TTL 5s, dl-rk=order.retry)→ main.exchange →[order.#]→ orders.queue
products.retry.queue —(TTL 5s, dl-rk=product.retry)→ main.exchange →[product.#]→ products.queue
refunds.retry.queue —(TTL 5s, dl-rk=refund.retry)→ main.exchange →[refund.#]→ refunds.queue
inventory.retry.queue —(TTL 5s, dl-rk=inventory.retry)→ main.exchange →[inventory.#]→ inventory.queue
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
#### 4.5 errors exchange → errors.queue(双入口扇入)
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
errors.exchange (topic) — rk = "#" → errors.queue (全局兜底入口,应用层 ErrorProducer 用)
{platform}.errors.exchange (topic) — rk = "#" → errors.queue × 19 (平台自定义错误投递入口)
```
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
**20 条 binding**19 平台 + 1 全局)。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### 4.6 binding 总数核对
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
| 类别 | 数量 |
|---|---|
| 平台业务 exchange → 4 业务队列 | 19 × 4 = 76 |
| main.exchange → 4 业务队列 | 4 |
| dlx.{dtype} → retry queue | 4 |
| errors exchange → errors.queue19 平台 + 1 全局) | 20 |
| **总计** | **104** |
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
(不含 default exchange `(AMQP default)` 自动给每个 queue 创建的同名 binding
2025-11-14 16:04:05 +08:00
---
## 消费者设计
### 1. 消费者模型
采用 **单消费者 + 批处理 + prefetch + 适配器模式**
| 数据类型 | 消费者 | Queue | 并发模式 | 处理模式 |
|---------|--------|-------|----------|---------|
| 订单 | Order Consumer | `orders.queue` | 单消费者 | 批处理 + 适配器 |
| 产品 | Product Consumer | `products.queue` | 单消费者 | 批处理 + 适配器 |
| 退款 | Refund Consumer | `refunds.queue` | 单消费者 | 批处理 + 适配器 |
| 库存 | Inventory Consumer | `inventory.queue` | 单消费者 | 批处理 + 适配器 |
**设计原则**
-**单消费者**:每个队列只有一个消费者实例,保证严格 FIFO 顺序
-**批处理**:积累一批消息后统一处理,提升数据库写入效率
-**单条 ACK**:每条消息独立确认,失败不影响其他消息
-**适配器模式**:消费者内部根据平台字段分发到不同业务逻辑
2025-11-14 16:07:35 +08:00
**消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 **
2025-11-14 16:04:05 +08:00
### 2. Prefetch 配置
```json
{
"prefetch_count": 100,
"batch_size": 50,
"batch_timeout": 3000
}
```
**参数说明**
- `prefetch_count=100`RabbitMQ 最多推送 100 条未确认消息到消费者
- `batch_size=50`:消费者积累 50 条消息后触发批处理
- `batch_timeout=3000`:如果3秒内未达到 batch_size,也触发处理
**工作流程**
1. RabbitMQ 按 Push 模式推送最多 100 条消息到消费者
2. 消费者缓存消息,达到 50 条或超时 3 秒后批量处理
3. 批量处理时,仍然对每条消息单独 ACK/NACK
4. 实现"伪批量拉取"效果,同时保留单条确认的灵活性
2026-05-15 08:48:24 +08:00
### 3. 消费者处理逻辑(现网实现简化版)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
完整实现见 [`backend/app/Platform/OrderConsumer.php`](../backend/app/Platform/OrderConsumer.php) 与 [`FailedMessageTrait.php`](../backend/app/Platform/Traits/FailedMessageTrait.php)。简化骨架:
```php
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue",
pool: "default_consumer", nums: 1, enable: true)]
class OrderConsumer extends ConsumerMessage
{
use FailedMessageTrait;
protected ?array $qos = ['prefetch_size' => 0, 'prefetch_count' => 1, 'global' => false];
protected bool $requeue = false; // 关键:让失败消息进 DLX 而不是 requeue
public function getQueueBuilder(): QueueBuilder
{
return (new QueueBuilder())
->setQueue($this->getQueue())
->setArguments([
'x-dead-letter-exchange' => ['S', 'dlx.orders'],
'x-dead-letter-routing-key' => ['S', 'retry'],
'x-message-ttl' => ['I', 86400000], // 24h
'x-queue-type' => ['S', 'classic'],
]);
}
public function consumeMessage($data, AMQPMessage $message): Result
{
$retry_count = $this->getRetryCount($message); // 读 x-death[0].count
$max_retries = (int) env('AMQP_MAX_RETRIES', 3);
try {
// 1. 创建 EntityParse、entityMatch、entityMap、upsert
$parse = EntityParseFactory::createFromMessage($message);
$entity = $parse->entityMatch($data['meta']);
$rows = $parse->entityMap($data['data'])->all();
Db::beginTransaction();
$entity->newQuery()->upsert($rows, $parse->getUniqueBy(), $parse->getUpdateFields());
// 2. 子项处理、聚合补刷等业务逻辑 ...
Db::commit();
return Result::ACK;
} catch (Throwable $error) {
Db::rollBack();
Log::get()->error('Consumer processing failed', [...]);
if ($retry_count >= $max_retries) {
// 双写兜底:MQ errors.queue + DB failed_messages 表
$this->sendToErrorQueue($message, $error);
return Result::ACK; // 防止主队列继续重试
}
return Result::NACK; // requeue=false → 进 DLX → retry queue
2025-11-14 16:04:05 +08:00
}
2026-05-15 08:48:24 +08:00
}
}
```
`FailedMessageTrait::sendToErrorQueue` 内部:
```php
$producer = ApplicationContext::getContainer()->get(Producer::class);
$producer->produce(new ErrorProducer($message, $error, $retry_count));
// ↓ ErrorProducer 注解 #[Producer(exchange: 'errors.exchange', routingKey: 'error')]
// → errors.exchange → errors.queue (MQ 兜底, 7d TTL)
FailedMessage::query()->create([...]); // → DB failed_messages 表 (DB 兜底)
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
### 4. 重试机制工作流程(现网实现)
2025-11-14 16:04:05 +08:00
```
┌─────────────────┐
│ orders.queue │ ← 消费者从这里接收消息
└────────┬────────┘
2026-05-15 08:48:24 +08:00
├─ 成功 → return Result::ACK
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
└─ 失败 → FailedMessageTrait::getRetryCount($message)
(读 x-death[0].count)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
├─ count < AMQP_MAX_RETRIES (默认 3)
2026-01-30 15:38:11 +08:00
│ │
2026-05-15 08:48:24 +08:00
│ └→ return Result::NACK (Hyperf 自动按 $requeue=false 处理)
2026-01-30 15:38:11 +08:00
│ │
2026-05-15 08:48:24 +08:00
│ ▼ 触发 DLX
│ dlx.orders (topic) ──rk="retry"──► orders.retry.queue
│ │ TTL=5s
│ ▼
│ main.exchange (rk="order.retry")
│ │
│ ▼ 命中 binding "order.#"
│ orders.queue (重新消费,x-death.count+1)
2026-01-30 15:38:11 +08:00
2026-05-15 08:48:24 +08:00
└─ count >= AMQP_MAX_RETRIES
2026-01-30 15:38:11 +08:00
2026-05-15 08:48:24 +08:00
└→ FailedMessageTrait::sendToErrorQueue($message, $error)
├──► ErrorProducer.publish → errors.exchange (rk="error")
│ → errors.queue (TTL 7d, user_ops 捞回)
└──► FailedMessage::query()->create(...)
→ DB failed_messages 表
2026-01-30 15:38:11 +08:00
2026-05-15 08:48:24 +08:00
return Result::ACK (避免主队列继续重试)
2025-11-14 16:04:05 +08:00
```
2026-01-30 15:38:11 +08:00
**关键点**
2026-05-15 08:48:24 +08:00
-**`$requeue=false` 由 Consumer 类属性固定**:让失败消息进入 DLX 而不是 requeue
-**retry 队列自动回流**RabbitMQ 引擎按 retry queue 的 `x-message-ttl=5000` 自动死信回 main.exchange,不需要额外消费者
-**`x-death.count` 自动累加**:每次死信 RabbitMQ 自动增加 countConsumer 通过 `FailedMessageTrait::getRetryCount` 读取
-**错误兜底双写**:超过重试上限时 errors.queue + failed_messages 表**同步双写**,互为备份
- ⚠️ **DLX 不绑定 errors.queue**:进入 errors.queue 完全由应用层 `ErrorProducer` 主动 publish 实现,**不**走 DLX 路由分流
2026-01-30 15:38:11 +08:00
2025-11-14 16:04:05 +08:00
### 5. 为什么这样设计?
| 特性 | 原因 | 优势 |
|------|------|------|
| **单队列 FIFO** | RabbitMQ 无法在队列内部按 topic 过滤 | 保证严格 FIFO,不拆分队列造成业务乱序 |
| **单消费者** | RabbitMQ 同队列多消费者必然 Round-Robin 分配 | 避免消息错投到错误的消费者 |
| **Prefetch + 批处理** | RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 | 实现"伪批量拉取",提升吞吐 |
| **单条 ACK** | 保留细粒度控制 | 失败消息不影响其他消息 |
| **DLX + 延迟重试** | 避免失败消息阻塞队列 | 不会卡 FIFO,不会无限重试 |
| **适配器模式** | 多平台业务逻辑隔离 | 扩展性好,易维护 |
---
## 用户权限配置
2026-05-15 08:48:24 +08:00
> **VHost 名称**:现网为 `datahub`(早期草稿误写为 `datahub-app`)。下述权限正则均来自本地 MQ 实测(`curl mgmt-api/permissions`+ `backend/bin/rabbitmq.sh`,是**现网真实使用**的正则,不要随意改动。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
### 1. 平台开发者账号(19 个)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
用户名规范:`user_{platform}`,其中 `{platform}``platforms``name` 字段经 `strtolower(str_replace(' ', '_', $name))` 标准化后的结果。
完整列表:`user_amazon_japan` / `user_coupang` / `user_dewu` / `user_douyin` / `user_goofish` / `user_jd` / `user_kaola` / `user_lazada` / `user_naver` / `user_offline` / `user_pdd` / `user_rakuten` / `user_redbook` / `user_shopee` / `user_shopify` / `user_tmall` / `user_wechat` / `user_wechat_video` / `user_youzan`
**权限正则模板**(按平台名展开):
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
configure = ^{platform}\.(exchange|errors\.exchange)$
write = ^{platform}\.(exchange|errors\.exchange)$
read = ^{platform}\.errors\..*$
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
示例(tmall):
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
configure = ^tmall\.(exchange|errors\.exchange)$
write = ^tmall\.(exchange|errors\.exchange)$
read = ^tmall\.errors\..*$
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
**密码 env 命名**`MQ_PASSWORD_{PLATFORM_UPPER}`,其中 `PLATFORM_UPPER` = 平台标准化名转大写。例:`MQ_PASSWORD_TMALL``MQ_PASSWORD_AMAZON_JAPAN``MQ_PASSWORD_WECHAT_VIDEO`。密码由部署期 render-rabbitmq.sh 生成;由 `mq_user.php` 动态从 `platforms` 表读取启用平台后注入。
2025-11-14 16:04:05 +08:00
2026-02-28 10:38:33 +08:00
### 2. Datahub 消费者账号
2025-11-14 16:04:05 +08:00
```
2026-02-28 10:38:33 +08:00
用户名: user_datahub_consumer
2026-05-15 08:48:24 +08:00
密码: env MQ_PASSWORD_CONSUMER
VHost: datahub
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
configure = ^(main\.exchange|errors\.exchange|dlx\..*)|(.*\.queue)$
write = ^(orders|products|refunds|inventory).*\.queue$|(dlx\..*)|(errors\.exchange)|(.*\.errors\.exchange)$
read = ^(main\.exchange|(orders|products|refunds|inventory).*\.queue|dlx\..*)$
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
> ⚠ 此权限正则比早期文档("只读业务队列")宽松:消费者需要 **configure** main.exchange / errors.exchange / dlx.* / 所有 queue(因为 Hyperf Consumer Annotation 在 worker 启动时 declare 这些资源),需要 **write** errors.exchange + platform.errors.exchange(因为 `ErrorProducer` 走这条),需要 **read** main.exchange + 业务队列 + dlx.*。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
### 3. 运维监控账号
2025-11-14 16:04:05 +08:00
```
用户名: user_ops
2026-05-15 08:48:24 +08:00
密码: env MQ_PASSWORD_OPS
VHost: datahub
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
configure = ^errors\..*$
write = (留空)
read = ^errors\.queue$
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
仅能读 `errors.queue` 做人工捞回;与 DB `failed_messages` 表(应用层提供 Admin UI / API 查询)互为冗余。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
### 4. 管理员账号
2025-11-14 16:04:05 +08:00
```
用户名: admin
2026-05-15 08:48:24 +08:00
密码: <强密码,仅 ops/dev 持有>
Tag: administrator
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
management API: AMQP_ADMIN_USER / AMQP_ADMIN_PASSWORD
(见 backend/config/autoload/amqp.php "management" 段)
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
完整权限,可访问管理界面。**生产环境**仅用于运维操作 / `rabbitmqadmin` CLI / mgmt API;不暴露给应用代码。
2025-11-14 16:04:05 +08:00
---
## 消息格式规范
### 1. message_id 设计
2026-05-15 08:48:24 +08:00
message_id 采用结构化 5 段格式(见 [`OrderProducer::generateMessageId`](../backend/app/Platform/OrderProducer.php)):
2025-11-14 16:04:05 +08:00
**格式**
2026-05-15 08:48:24 +08:00
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
{company_id}#{platform_id}#{store_id}#{entity_type}#{unique_id}
2025-11-14 16:04:05 +08:00
```
**字段说明**
2026-05-15 08:48:24 +08:00
| 段 | 含义 | 来源 |
|---|---|---|
| `company_id` | 公司 ID | producer 输入参数 |
| `platform_id` | 平台 ID(数据库主键) | producer 输入参数 |
| `store_id` | 店铺 ID | producer 输入参数 |
| `entity_type` | 数据实体类型(order/product/refund/inventory),**单数** | 由 `routingKey` 前缀自动派生(`explode('.', $routingKey)[0]`|
| `unique_id` | 业务侧定义的唯一标识 / 时间区间 / batch id 等,**业务侧维护幂等性** | producer 输入参数 |
2025-11-14 16:04:05 +08:00
**示例**
2026-05-15 08:48:24 +08:00
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
188#2#292#order#68a3f2c4e9b1f
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
解析:company=188, platform=2 (tmall), store=292, entity=order, unique_id=`68a3f2c4e9b1f`uniqid()
**分隔符**`#`RabbitMQ message_id 字段允许任意字符;不与 routing key 通配符冲突)。
2025-11-14 16:04:05 +08:00
**特性**
2026-05-15 08:48:24 +08:00
- ✅ 幂等性:业务侧通过 `unique_id` 保证同一实体的 message_id 一致(消费端 upsert 唯一键由 EntityParse 配置)
- ✅ 可读性:直接从 message_id 看出数据来源 4 段元数据
- ✅ 便于调试:日志 / 监控可直接根据前 4 段过滤
2025-11-14 16:04:05 +08:00
### 2. 业务消息格式
2026-05-15 08:48:24 +08:00
由 [`OrderProducer::buildMessage`](../backend/app/Platform/OrderProducer.php) 构造(基类,所有 platform Producer 共用):
2025-11-14 16:04:05 +08:00
```json
{
2026-05-15 08:48:24 +08:00
"message_id": "188#2#292#order#68a3f2c4e9b1f",
"timestamp": "2026-05-14T13:43:59+08:00",
"platform": "tmall",
2025-11-14 16:04:05 +08:00
"data_type": "order",
2026-05-15 08:48:24 +08:00
"meta": {
"platform_id": 2,
"company_id": 188,
"store_id": 292,
"platform_store_id": null,
"unique_id": "68a3f2c4e9b1f",
"source_system": "tmall_api",
2025-11-14 16:04:05 +08:00
"retry_count": 0,
2026-05-15 08:48:24 +08:00
"data_version": 1747203839
2025-11-14 16:04:05 +08:00
},
2026-05-15 08:48:24 +08:00
"data": [
{ /* raw_data 1 */ },
{ /* raw_data 2 */ }
]
2025-11-14 16:04:05 +08:00
}
```
**字段说明**
| 字段 | 类型 | 必需 | 说明 |
2026-05-15 08:48:24 +08:00
|---|---|---|---|
| `message_id` | string | 是 | 5 段结构化 ID(见 §1)|
| `timestamp` | string | 是 | ISO 8601 格式(`date('c')`|
| `platform` | string | 是 | 平台标识(标准化后的小写下划线名);基类默认 `'platform_unkonw'`,子类 `buildMessage` 中覆盖(如 `TmallProductProducer` 覆盖为 `'tmall'`|
| `data_type` | string | 是 | 数据类型(order/product/refund/inventory**单数**);基类默认 `'order'`,子类覆盖 |
| `meta.platform_id` | int | 是 | 平台 ID(数据库主键,来自 `platforms` 表)|
| `meta.company_id` | int | 是 | 公司 ID |
| `meta.store_id` | int | 是 | 店铺 ID |
| `meta.platform_store_id` | string\|null | 否 | 平台侧店铺标识(如有)|
| `meta.unique_id` | string | 是 | 业务唯一标识(与 message_id 末段一致)|
| `meta.source_system` | string | 是 | 数据来源系统(基类默认 `'origin_unknow'`,子类覆盖如 `'tmall_api'`|
| `meta.retry_count` | int | 是 | 重试次数(初始 0;消费端通过 AMQP `x-death[0].count` 读取真实值,不读 meta|
| `meta.data_version` | int | 是 | 数据版本(Unix 时间戳,用于乱序消费保护)|
| `data` | array | 是 | 平台原始数据数组(基类直接传 `$data['raw_data']`|
> ⚠ **字段名是 `meta` 不是 `metadata`**(与早期文档草稿不同)。消费端 `OrderConsumer.php:86` 读 `$data['meta']`。
**关于 `data_version`**:建议使用平台数据的更新时间戳,消费端可据此防止旧版本覆盖新版本;如平台未提供则 `OrderProducer::buildMessage` fallback 到 `time()`
2025-11-14 16:04:05 +08:00
### 3. 错误消息格式
2026-05-15 08:48:24 +08:00
由 [`ErrorProducer::buildErrorMessage`](../backend/app/Platform/ErrorProducer.php) 构造(投递到 `errors.exchange` rk=`error`):
2025-11-14 16:04:05 +08:00
```json
{
2026-05-15 08:48:24 +08:00
"error_id": "err_68f3d2c4abc1f.93214785",
2025-11-14 16:04:05 +08:00
"original_message": {
2026-05-15 08:48:24 +08:00
/* meta / data*/
2025-11-14 16:04:05 +08:00
},
"error": {
2026-05-15 08:48:24 +08:00
"type": "InvalidArgumentException",
"message": "Company with ID 188 not found",
"code": 0,
"file": "/var/www/app/Platform/Tmall/EntityParse/Product.php",
"line": 42,
2025-11-14 16:04:05 +08:00
"trace": "Exception stack trace...",
2026-05-15 08:48:24 +08:00
"timestamp": "2026-05-14T13:31:00+08:00"
2025-11-14 16:04:05 +08:00
},
"metadata": {
2026-05-15 08:48:24 +08:00
"platform": "tmall",
"platform_id": 2,
"company_id": 188,
"store_id": 292,
2025-11-14 16:04:05 +08:00
"data_type": "order",
2026-05-15 08:48:24 +08:00
"message_id": "188#2#292#order#68a3f2c4e9b1f",
"failed_at": "2026-05-14T13:31:00+08:00",
"retry_count": 3
2025-11-14 16:04:05 +08:00
}
}
```
2026-05-15 08:48:24 +08:00
> 注意:错误消息的字段名是 `metadata`(与业务消息的 `meta` 不一致;保留以兼容现有应用代码 `FailedMessageTrait::persistFailedMessage` 的解析)。
**双写到 DB**:错误消息同步写入 [`failed_messages`](../backend/app/Model/FailedMessage.php) 表(model 字段:error_id / data_type / platform / platform_id / company_id / store_id / error_type / error_message / error_code / error_trace / original_message(json) / retry_count / message_id / failed_at / created_at)。Admin 可通过 [`FailedMessageController`](../backend/app/Controller/Api/V1/FailedMessageController.php) 查询。
2025-11-14 16:04:05 +08:00
---
## RabbitMQ 管理界面操作指引
### 1. 访问管理界面
```
URL: http://<rabbitmq-host>:15672
2026-05-15 08:48:24 +08:00
用户名: admin(开发期默认 admin/admin;生产期通过 podman secret / env 注入)
2025-11-14 16:04:05 +08:00
密码: <管理员密码>
```
2026-05-15 08:48:24 +08:00
**推荐使用 Hyperf 命令**(避开浏览器,看核心 9 个队列状态):
```bash
php ./bin/hyperf.php app:mq:status # 单次输出
php ./bin/hyperf.php app:mq:status --watch --interval=5 # 每 5 秒刷新
```
输出包含三组:Business Queues4 业务)/ Dead Letter Queues (Retry Queues)4 retry/ Shared Queueserrors),分别展示 Messages / Consumers / Status。
2025-11-14 16:04:05 +08:00
### 2. 创建 VHost
1. 点击顶部导航 **Admin****Virtual Hosts**
2. 点击 **Add a new virtual host**
2026-05-15 08:48:24 +08:00
3. 输入 VHost 名称:`datahub`
2025-11-14 16:04:05 +08:00
4. 点击 **Add virtual host**
### 3. 创建用户
1. 点击顶部导航 **Admin****Users**
2. 点击 **Add a user**
3. 填写用户信息:
2026-05-15 08:48:24 +08:00
- **Username**: `user_tmall`(按 `user_{platform}` 规范)
2025-11-14 16:04:05 +08:00
- **Password**: `<安全密码>`
- **Tags**: (留空,非管理员用户)
4. 点击 **Add user**
### 4. 配置用户权限
2026-05-15 08:48:24 +08:00
1. 在用户列表中,点击用户名(如 `user_tmall`
2025-11-14 16:04:05 +08:00
2. 滚动到 **Permissions** 部分
2026-05-15 08:48:24 +08:00
3. 选择 VHost`datahub`
4. 配置权限正则表达式(与现网模板一致):
- **Configure regexp**: `^tmall\.(exchange|errors\.exchange)$`
- **Write regexp**: `^tmall\.(exchange|errors\.exchange)$`
- **Read regexp**: `^tmall\.errors\..*$`
2025-11-14 16:04:05 +08:00
5. 点击 **Set permission**
### 5. 创建 Exchange
1. 点击顶部导航 **Exchanges**
2026-05-15 08:48:24 +08:00
2. 确保选择正确的 VHost`datahub`
2025-11-14 16:04:05 +08:00
3. 点击 **Add a new exchange**
4. 填写配置:
2026-05-15 08:48:24 +08:00
- **Name**: `tmall.exchange`
2025-11-14 16:04:05 +08:00
- **Type**: `topic`
- **Durability**: `Durable`
- **Auto delete**: `No`
- **Internal**: `No`
5. 点击 **Add exchange**
2026-05-15 08:48:24 +08:00
重复以上步骤创建:1 个 `main.exchange` + 4 个 `dlx.{dtype}` + 1 个 `errors.exchange` + 每平台 2 个(`{platform}.exchange``{platform}.errors.exchange`)。
2025-11-14 16:04:05 +08:00
### 6. 创建 Queue
1. 点击顶部导航 **Queues**
2026-05-15 08:48:24 +08:00
2. 确保选择正确的 VHost`datahub`
2025-11-14 16:04:05 +08:00
3. 点击 **Add a new queue**
2026-05-15 08:48:24 +08:00
4. 填写配置(以 `orders.queue` 为例):
2025-11-14 16:04:05 +08:00
- **Name**: `orders.queue`
- **Durability**: `Durable`
- **Auto delete**: `No`
- **Arguments**:
2026-05-15 08:48:24 +08:00
- `x-message-ttl` = `86400000`24h
- `x-dead-letter-exchange` = `dlx.orders`
- `x-dead-letter-routing-key` = `retry`
- `x-queue-type` = `classic`
2025-11-14 16:04:05 +08:00
5. 点击 **Add queue**
2026-05-15 08:48:24 +08:00
四个业务队列同模式;retry 队列 arguments
```
x-message-ttl = 5000 # 5s
x-dead-letter-exchange = main.exchange
x-dead-letter-routing-key = {entity_singular}.retry # 如 order.retry
x-queue-type = classic
```
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
`errors.queue` arguments
```
x-message-ttl = 604800000 # 7d
x-queue-type = classic
```
2025-11-14 16:04:05 +08:00
### 7. 创建 Binding
1. 点击顶部导航 **Exchanges**
2026-05-15 08:48:24 +08:00
2. 点击需要绑定的 Exchange(如 `tmall.exchange`
2025-11-14 16:04:05 +08:00
3. 滚动到 **Bindings** 部分
4.**Add binding from this exchange** 区域填写:
- **To queue**: `orders.queue`
2026-05-15 08:48:24 +08:00
- **Routing key**: `order.tmall`
2025-11-14 16:04:05 +08:00
5. 点击 **Bind**
2026-05-15 08:48:24 +08:00
为每个平台 Exchange 创建 **4 个 binding**order/product/refund/inventory)。
`main.exchange` 创建 4 个 bindingrk: `order.#` / `product.#` / `refund.#` / `inventory.#`)。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
每个 `{platform}.errors.exchange` + 全局 `errors.exchange` 各创建 1 个绑定到 `errors.queue`rk=`#`)。
DLX 各创建 1 个绑定到对应 retry queuerk=`retry`)。**不**绑定 errors.queue。
2025-11-14 16:04:05 +08:00
---
## 配置脚本
2026-05-15 08:48:24 +08:00
> **生产环境推荐**:使用 `backend/bin/rabbitmq.sh`(与现网完全一致的脚本,从 `platforms` 表动态读取平台列表)。下方独立脚本仅供**临时手工部署 / 故障排查** 使用。
2025-11-14 16:04:05 +08:00
### 使用 rabbitmqadmin
RabbitMQ 提供命令行工具 `rabbitmqadmin`,可用于批量配置。
#### 安装 rabbitmqadmin
```bash
# 从 RabbitMQ 管理界面下载
wget http://<rabbitmq-host>:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/
```
#### 配置脚本示例
2025-11-17 14:28:05 +08:00
**注意**:本脚本适用于**新版本 rabbitmqadmin**Rust 版本)。如果您使用的是旧版本(Python),请参考官方文档调整语法。
2025-11-14 16:04:05 +08:00
```bash
#!/bin/bash
# RabbitMQ 连接信息
2025-11-17 14:28:05 +08:00
RABBITMQ_HOST="127.0.0.1"
RABBITMQ_PORT="15672"
2025-11-14 16:04:05 +08:00
RABBITMQ_USER="admin"
2025-11-17 14:28:05 +08:00
RABBITMQ_PASS="admin"
2026-02-28 10:38:33 +08:00
VHOST="datahub"
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
# 平台列表(与现网 19 个对齐;新增平台时追加即可)
PLATFORMS=(
"amazon_japan" "coupang" "dewu" "douyin" "goofish"
"jd" "kaola" "lazada" "naver" "offline"
"pdd" "rakuten" "redbook" "shopee" "shopify"
"tmall" "wechat" "wechat_video" "youzan"
)
2025-11-14 16:04:05 +08:00
DATA_TYPES=("orders" "products" "refunds" "inventory")
2025-11-17 14:28:05 +08:00
# 0. 检测并清理现有 VHost 和用户(如果存在)
echo "========================================"
echo "开始清理现有配置..."
echo "========================================"
# 检查 VHost 是否存在
echo "检查 VHost: $VHOST 是否存在..."
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true)
if [ -n "$VHOST_EXISTS" ]; then
echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..."
# 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限)
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete vhost --name "$VHOST"
echo "✓ VHost '$VHOST' 及其所有资源已删除"
else
echo "VHost '$VHOST' 不存在"
fi
# 清理所有相关用户账号
echo ""
echo "清理用户账号..."
# 平台用户列表
ALL_USERS=()
for platform in "${PLATFORMS[@]}"; do
ALL_USERS+=("user_${platform}")
done
# 添加其他用户
2026-02-28 10:38:33 +08:00
ALL_USERS+=("user_datahub_consumer")
2025-11-17 14:28:05 +08:00
ALL_USERS+=("user_ops")
# 删除用户(如果存在)
for user in "${ALL_USERS[@]}"; do
USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true)
if [ -n "$USER_EXISTS" ]; then
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
delete user --name "$user"
echo "✓ 删除用户: $user"
fi
done
echo ""
echo "========================================"
echo "清理完成,开始重新配置..."
echo "========================================"
echo ""
2025-11-14 16:04:05 +08:00
# 1. 创建 VHost
2025-11-17 14:28:05 +08:00
echo "创建 VHost: $VHOST"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare vhost --name "$VHOST"
2025-11-14 16:04:05 +08:00
# 2. 创建主 Exchange(用于重试消息回流)
2025-11-17 14:28:05 +08:00
echo "创建主 Exchange: main.exchange"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "main.exchange" --vhost "$VHOST" \
--type topic --durable true
2025-11-14 16:04:05 +08:00
# 3. 创建 DLX (死信交换机)
2025-11-17 14:28:05 +08:00
echo "创建死信交换机 (DLX)..."
2025-11-14 16:04:05 +08:00
for dtype in "${DATA_TYPES[@]}"; do
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建 DLX: dlx.${dtype}"
2025-11-14 16:04:05 +08:00
done
# 4. 创建主业务队列(带 DLX 配置)
2025-11-17 14:28:05 +08:00
echo ""
echo "创建主业务队列..."
2025-11-14 16:04:05 +08:00
for dtype in "${DATA_TYPES[@]}"; do
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable \
--arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
echo "✓ 创建队列: ${dtype}.queue"
2025-11-14 16:04:05 +08:00
done
# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
2025-11-17 14:28:05 +08:00
echo ""
echo "创建重试队列..."
2025-11-14 16:04:05 +08:00
for dtype in "${DATA_TYPES[@]}"; do
2026-05-15 08:48:24 +08:00
dtype_singular="${dtype%s}"
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable \
2026-05-15 08:48:24 +08:00
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}"
echo "✓ 创建重试队列: ${dtype}.retry.queue (dl-rk: ${dtype_singular}.retry)"
2025-11-14 16:04:05 +08:00
done
# 6. 创建错误队列
2025-11-17 14:28:05 +08:00
echo ""
echo "创建错误队列..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue --name "errors.queue" --vhost "$VHOST" --durable \
--arguments '{"x-message-ttl":604800000}'
echo "✓ 创建错误队列: errors.queue"
2025-11-14 16:04:05 +08:00
# 7. 绑定主 Exchange 到主队列(使用通配符)
2025-11-17 14:28:05 +08:00
echo ""
echo "绑定主 Exchange 到主队列..."
2025-11-14 16:04:05 +08:00
for dtype in "${DATA_TYPES[@]}"; do
# 使用 order.#, product.# 等通配符匹配所有平台
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "main.exchange" --destination "${dtype}.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#"
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
2025-11-14 16:04:05 +08:00
done
2026-05-15 08:48:24 +08:00
# 8. 绑定 DLX 到重试队列(注意:现网 DLX 不绑定 errors.queue,错误由应用层 ErrorProducer 直接推 errors.exchange
2025-11-17 14:28:05 +08:00
echo ""
2026-05-15 08:48:24 +08:00
echo "绑定 DLX 到重试队列..."
2025-11-14 16:04:05 +08:00
for dtype in "${DATA_TYPES[@]}"; do
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "retry"
echo "✓ 绑定: dlx.${dtype}${dtype}.retry.queue (routing_key: retry)"
2025-11-14 16:04:05 +08:00
done
2026-05-15 08:48:24 +08:00
# 8b. 创建全局 errors.exchange 并绑定到 errors.queue
echo ""
echo "创建全局错误 Exchange..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "errors.exchange" --vhost "$VHOST" --type topic --durable true
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "#"
echo "✓ 绑定: errors.exchange → errors.queue (routing_key: #)"
2025-11-14 16:04:05 +08:00
# 9. 为每个平台创建 Exchange 和 Binding
2025-11-17 14:28:05 +08:00
echo ""
echo "========================================"
echo "创建平台 Exchange 和 Binding..."
echo "========================================"
2025-11-14 16:04:05 +08:00
for platform in "${PLATFORMS[@]}"; do
2025-11-17 14:28:05 +08:00
echo ""
echo "处理平台: ${platform}"
2025-11-14 16:04:05 +08:00
# 创建平台业务 Exchange
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建业务 Exchange: ${platform}.exchange"
2025-11-14 16:04:05 +08:00
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "orders.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "order.${platform}"
2025-11-14 16:04:05 +08:00
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "products.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "product.${platform}"
2025-11-14 16:04:05 +08:00
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "refunds.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}"
2025-11-14 16:04:05 +08:00
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.exchange" --destination "inventory.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}"
echo "✓ 绑定到业务队列 (4个)"
2025-11-14 16:04:05 +08:00
# 创建平台错误 Exchange
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \
--type topic --durable true
echo "✓ 创建错误 Exchange: ${platform}.errors.exchange"
2025-11-14 16:04:05 +08:00
# 绑定到错误队列
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost "$VHOST" --routing-key "#"
echo "✓ 绑定到错误队列"
2025-11-14 16:04:05 +08:00
# 创建平台用户
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_${platform}" --password "change_me_${platform}" --tags ""
echo "✓ 创建用户: user_${platform}"
2025-11-14 16:04:05 +08:00
# 配置平台用户权限
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_${platform}" \
2025-11-17 16:51:51 +08:00
--configure "^${platform}\\.(exchange|errors\\.exchange)$" \
--write "^${platform}\\.(exchange|errors\\.exchange)$" \
2025-11-17 14:28:05 +08:00
--read "^${platform}\\.errors\\..*$"
echo "✓ 配置用户权限"
2025-11-14 16:04:05 +08:00
done
2026-02-28 10:38:33 +08:00
# 10. 创建 datahub 消费者用户
2025-11-17 14:28:05 +08:00
echo ""
echo "========================================"
2026-02-28 10:38:33 +08:00
echo "创建 datahub 消费者用户..."
2025-11-17 14:28:05 +08:00
echo "========================================"
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
2026-02-28 10:38:33 +08:00
declare user --name "user_datahub_consumer" --password "change_me_consumer" --tags ""
echo "✓ 创建用户: user_datahub_consumer"
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
2026-02-28 10:38:33 +08:00
declare permissions --vhost "$VHOST" --user "user_datahub_consumer" \
2026-05-15 08:48:24 +08:00
--configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \
--write "^(orders|products|refunds|inventory).*\\.queue\$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)\$" \
--read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)\$"
2025-11-17 14:28:05 +08:00
echo "✓ 配置用户权限"
2025-11-14 16:04:05 +08:00
# 11. 创建运维监控用户
2025-11-17 14:28:05 +08:00
echo ""
echo "创建运维监控用户..."
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user --name "user_ops" --password "change_me_ops" --tags ""
echo "✓ 创建用户: user_ops"
2025-11-14 16:04:05 +08:00
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_ops" \
--configure "^errors\\..*$" --write "" --read "^errors\\.queue$"
echo "✓ 配置用户权限"
2025-11-14 16:04:05 +08:00
2025-11-17 14:28:05 +08:00
echo ""
echo "========================================"
2025-11-14 16:04:05 +08:00
echo "RabbitMQ 配置完成!"
2025-11-17 14:28:05 +08:00
echo "========================================"
2025-11-14 16:04:05 +08:00
echo ""
echo "已创建:"
2025-11-17 14:28:05 +08:00
echo "- 1 个 VHost: $VHOST"
2025-11-14 16:04:05 +08:00
echo "- 1 个主 Exchange: main.exchange"
2026-05-15 08:48:24 +08:00
echo "- 1 个全局错误 Exchange: errors.exchange"
2025-11-14 16:04:05 +08:00
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
echo "- 1 个错误队列: errors.queue"
2026-05-15 08:48:24 +08:00
echo "- $((${#PLATFORMS[@]} * 2)) 个平台 Exchange (${#PLATFORMS[@]} 业务 + ${#PLATFORMS[@]} 错误)"
2025-11-14 16:04:05 +08:00
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
2025-11-17 14:28:05 +08:00
echo ""
echo "提示: 请修改所有用户的默认密码 (change_me_*)"
2026-05-15 08:48:24 +08:00
echo "提示: 验证拓扑:php ./bin/hyperf.php app:mq:status"
2025-11-14 16:04:05 +08:00
```
#### 执行脚本
```bash
chmod +x setup_rabbitmq.sh
./setup_rabbitmq.sh
```
---
## 监控和维护
### 1. 监控指标
通过 RabbitMQ 管理界面查看以下关键指标:
#### 队列健康状态
1. 点击 **Queues** → 选择队列
2. 关注以下指标:
- **Ready**: 待消费消息数(正常应 < 1000)
- **Unacked**: 未确认消息数(正常应较低)
- **Total**: 总消息数
- **Incoming rate**: 消息入队速率
- **Deliver / Get rate**: 消息消费速率
**告警阈值**
- Ready > 10000:队列堆积严重
- Unacked > 5000:消费者处理缓慢
- Incoming rate >> Deliver rate:消费速度跟不上生产速度
#### Exchange 流量统计
1. 点击 **Exchanges** → 选择 Exchange
2. 查看 **Message rates**
- **Publish in**: 消息发布速率
- **Publish out**: 消息路由到队列的速率
**异常情况**
- Publish in > 0 但 Publish out = 0:可能绑定配置错误
### 2. 常见问题排查
#### 消息未到达队列
**检查步骤**
1. 确认 Exchange 存在且类型正确
2. 检查 Binding 配置,确保 routing key 匹配
3. 查看 Exchange 的 **Message rates**,确认消息已发布
4. 检查生产者是否使用了正确的 routing key
#### 消费者无法消费
**检查步骤**
1. 确认用户权限配置正确(Read 权限)
2. 检查队列中是否有消息(Ready > 0
3. 查看消费者连接状态(Connections 页面)
4. 检查消费者代码是否正确 ACK 消息
#### 错误消息未送达平台开发者
**检查步骤**
1. 确认错误 Exchange 绑定到 `errors.queue`
2. 检查平台开发者账号的 Read 权限
3. 确认平台开发者订阅了正确的 Exchange
4. 查看 `errors.queue` 中是否有消息
### 3. 备份和恢复
#### 导出配置
```bash
# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
2026-02-28 10:38:33 +08:00
export datahub-backup.json
2025-11-14 16:04:05 +08:00
```
#### 导入配置
```bash
# 从备份恢复配置
2025-11-17 14:28:05 +08:00
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
2026-02-28 10:38:33 +08:00
import datahub-backup.json
2025-11-14 16:04:05 +08:00
```
### 4. 性能优化
#### 消费者并发配置
- 增加消费者实例数量
- 每个消费者配置多个 channel(如 5-10 个)
- 使用 prefetch_count 限制未确认消息数量
#### 队列性能优化
- 启用 lazy queue(大量消息堆积时):
```bash
2026-05-15 08:48:24 +08:00
rabbitmqadmin declare queue name=orders.queue vhost=datahub \
2025-11-14 16:04:05 +08:00
durable=true arguments='{"x-queue-mode":"lazy"}'
```
- 定期清理过期消息(通过 TTL 自动实现)
### 5. 日志查看
```bash
# 查看 RabbitMQ 日志
tail -f /var/log/rabbitmq/rabbit@<hostname>.log
# 查看连接日志
tail -f /var/log/rabbitmq/rabbit@<hostname>_connection.log
```
---
## 安全建议
### 1. 密码策略
- 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符)
- 定期更换密码(建议 3-6 个月)
- 不同环境(开发/测试/生产)使用不同密码
### 2. 网络安全
- 生产环境启用 TLS/SSL 加密连接
- 限制管理界面访问(仅允许内网或 VPN 访问)
- 配置防火墙规则:
- 5672:AMQP 端口(仅允许应用服务器访问)
- 15672:管理界面(仅允许管理员 IP 访问)
### 3. 权限最小化原则
- 平台开发者只能访问自己的 Exchange
- 消费者只能读取必要的队列
- 避免赋予不必要的 Configure 权限
### 4. 审计日志
- 启用 RabbitMQ 审计日志插件
- 定期审查用户操作记录
- 监控异常登录行为
---
## 新增平台接入流程
2026-05-15 08:48:24 +08:00
> **首选路径**:使用 `backend/bin/rabbitmq.sh add <platform>`(自动化、与现网一致);下方手工脚本仅作参考。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
新增平台(示例 `<platform>` = 标准化后的小写下划线名):
### 1. 创建 Exchange(业务 + 错误)
2025-11-14 16:04:05 +08:00
```bash
2026-05-15 08:48:24 +08:00
P="<platform>" # 例如 "tiktok_shop"
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare exchange --name "${P}.exchange" --vhost datahub --type topic --durable true
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare exchange --name "${P}.errors.exchange" --vhost datahub --type topic --durable true
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
### 2. 创建 5 条 Binding4 业务 + 1 错误)
2025-11-14 16:04:05 +08:00
```bash
2026-05-15 08:48:24 +08:00
for DTYPE in orders products refunds inventory; do
DTYPE_SINGULAR="${DTYPE%s}"
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare binding --source "${P}.exchange" --destination "${DTYPE}.queue" \
--destination-type queue --vhost datahub \
--routing-key "${DTYPE_SINGULAR}.${P}"
done
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare binding --source "${P}.errors.exchange" --destination "errors.queue" \
--destination-type queue --vhost datahub --routing-key "#"
2025-11-14 16:04:05 +08:00
```
### 3. 创建用户并配置权限
```bash
2026-05-15 08:48:24 +08:00
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare user --name "user_${P}" --password "<secure_password>" --tags ""
rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \
declare permissions --vhost datahub --user "user_${P}" \
--configure "^${P}\\.(exchange|errors\\.exchange)\$" \
--write "^${P}\\.(exchange|errors\\.exchange)\$" \
--read "^${P}\\.errors\\..*\$"
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
### 4. 同步到 backend
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
1. **DB**`INSERT INTO platforms (name, enabled, ...) VALUES ('<Platform Display Name>', true, ...)`,确保 `mq_user.php` 动态读取时能匹配到
2. **env**:在 backend 的 `.env`(或 host 端 `datahub-backend.env`)追加 `MQ_PASSWORD_<PLATFORM_UPPER>=<secure_password>`
3. **重启 backend**`systemctl --user restart datahub-backend`,让 Hyperf worker 重新加载 mq_user.php 与 env
### 5. 提供给平台开发者
2025-11-14 16:04:05 +08:00
```
RabbitMQ 连接信息:
- Host: <rabbitmq-host>
- Port: 5672
2026-05-15 08:48:24 +08:00
- VHost: datahub
- Username: user_<platform>
2025-11-14 16:04:05 +08:00
- Password: <secure_password>
发布配置:
2026-05-15 08:48:24 +08:00
- Exchange: <platform>.exchange
2025-11-14 16:04:05 +08:00
- Routing Keys:
2026-05-15 08:48:24 +08:00
- order.<platform> → 发布订单数据
- product.<platform> → 发布产品数据
- refund.<platform> → 发布退款数据
- inventory.<platform> → 发布库存数据
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
错误投递(可选):
- Exchange: <platform>.errors.exchange
- Routing Key: 任意(由 binding `#` 兜底)
- 用途:平台侧主动上报无法处理的消息 / 校验失败的数据
2025-11-14 16:04:05 +08:00
消息格式规范:
2026-05-15 08:48:24 +08:00
- 参见「消息格式规范」章节(5 段 message_idmeta 字段单数;data_version 时间戳防乱序)
2025-11-14 16:04:05 +08:00
```
---
## 附录
2026-05-15 08:48:24 +08:00
### A. 完整资源清单(与本地 MQ 实测对齐 — 2026-05-14
2025-11-14 16:04:05 +08:00
#### VHost
2026-05-15 08:48:24 +08:00
- **总数**1 个
- `datahub`
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### Exchanges
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
| 类别 | 数量 | 名称 |
|---|---|---|
| 主路由 | 1 | `main.exchange`topic|
| 全局错误 | 1 | `errors.exchange`topic|
| DLX | 4 | `dlx.orders` / `dlx.products` / `dlx.refunds` / `dlx.inventory`(全 topic|
| 平台业务 | 19 | `{platform}.exchange` × 19(全 topic|
| 平台错误 | 19 | `{platform}.errors.exchange` × 19(全 topic|
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
**Exchange 业务总数****44 个**(不含 `amq.*` 系统 exchange 7 个)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### Queues
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
| 类别 | 数量 | 名称 + 关键 arguments |
|---|---|---|
| 业务队列 | 4 | `{dtype}.queue`ttl=24h, dl-exchange=`dlx.{dtype}`, dl-rk=`retry`, type=classic |
| 重试队列 | 4 | `{dtype}.retry.queue`ttl=5s, dl-exchange=`main.exchange`, dl-rk=`{entity_singular}.retry`, type=classic |
| 错误队列 | 1 | `errors.queue`ttl=7d, 无 DLX, type=classic |
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
**Queue 总数****9 个**(与 `php ./bin/hyperf.php app:mq:status` 输出一致)
2025-11-14 16:04:05 +08:00
#### Bindings
2026-05-15 08:48:24 +08:00
| 类别 | 数量 | 路径 |
|---|---|---|
| 平台业务 → 业务队列 | 76 | `{platform}.exchange` —rk=`{entity}.{platform}`→ `{dtype}.queue`19 × 4|
| main → 业务队列 | 4 | `main.exchange` —rk=`{entity}.#`→ `{dtype}.queue` |
| DLX → 重试队列 | 4 | `dlx.{dtype}` —rk=`retry`→ `{dtype}.retry.queue`**仅此一种****不**绑定 errors.queue|
| 错误 exchange → errors.queue | 20 | `errors.exchange` 1 条 + `{platform}.errors.exchange` × 19 条,全部 rk=`#` |
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
**Binding 业务总数****104 条**(不含 default exchange 自动同名 binding 9 条)
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
> 死信回流路径(retry → main)不是 binding,而是 retry queue 的 `x-dead-letter-exchange` arguments 字段,RabbitMQ 引擎在 TTL 到期时自动投递。
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
#### Usersvhost `datahub`
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
| 类别 | 数量 | 命名 | 权限说明 |
|---|---|---|---|
| 平台开发者 | 19 | `user_{platform}` | 仅写自己的 `{platform}.exchange` / `{platform}.errors.exchange`,读自己的 errors(最小权限)|
| Datahub 消费者 | 1 | `user_datahub_consumer` | configure/write/read 见 §"消费者账号"现网正则 |
| 运维监控 | 1 | `user_ops` | 仅读 `errors.queue` |
| 管理员 | 1 | `admin` | tag=administrator,全权限 |
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
**User 总数****22 个**
2025-11-14 16:04:05 +08:00
2026-05-15 08:48:24 +08:00
完整平台用户清单:
```
user_amazon_japan, user_coupang, user_dewu, user_douyin, user_goofish,
user_jd, user_kaola, user_lazada, user_naver, user_offline,
user_pdd, user_rakuten, user_redbook, user_shopee, user_shopify,
user_tmall, user_wechat, user_wechat_video, user_youzan
```
2025-11-14 16:04:05 +08:00
#### 架构统计
| 资源类型 | 数量 | 说明 |
|---------|------|------|
2026-05-15 08:48:24 +08:00
| VHost | 1 | `datahub` |
| Exchange | 44 | 1 main + 1 全局 errors + 4 DLX + 19 平台业务 + 19 平台错误 |
| Queue | 9 | 4 业务 + 4 retry + 1 errors |
| Binding | 104 | 见上表(不含 default exchange 自动 binding|
| User | 22 | 19 平台 + 1 消费者 + 1 运维 + 1 管理员 |
**消息流转路径(双层兜底)**
2025-11-14 16:04:05 +08:00
```
2026-05-15 08:48:24 +08:00
生产者 → {platform}.exchange ──rk={entity}.{platform}──► {dtype}.queue ──► Consumer
消费失败 (count < max) ↓
◄──── NACK
dlx.{dtype} ──rk=retry──► {dtype}.retry.queue
│ TTL=5s
main.exchange (rk={entity}.retry)
{dtype}.queue (重新消费, count+1)
消费失败 (count >= max)
FailedMessageTrait::sendToErrorQueue
┌───────────────────────┼───────────────────────┐
▼ ▼
ErrorProducer ──► errors.exchange ──► errors.queue FailedMessage::create
(MQ 兜底, TTL 7d, user_ops 捞回) (DB 兜底, failed_messages 表)
Consumer 返回 ACK
(避免主队列继续重试)
2025-11-14 16:04:05 +08:00
```
### B. 参考资料
- [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html)
- [RabbitMQ Management Plugin](https://www.rabbitmq.com/management.html)
- [Access Control (Authentication, Authorization)](https://www.rabbitmq.com/access-control.html)
- [rabbitmqadmin 使用指南](https://www.rabbitmq.com/management-cli.html)