Files
datahub/docs/RabbitMQ.md
T

1370 lines
44 KiB
Markdown
Raw Normal View History

2025-11-14 16:04:05 +08:00
# RabbitMQ 配置方案
## 概述
本文档描述 dataflow 应用的 RabbitMQ 消息队列架构设计和配置方法。
### 业务背景
- **应用角色**:dataflow 是消息消费者,负责接收来自多个电商平台的数据并入库
- **数据来源**:多个电商平台(DouYin、JD、Tmall、RedBook、Amazon Japan、Naver、Rakuten、Shopify 等)
- **数据类型**:订单(orders)、产品(products)、退款(refunds)、库存(inventory
- **隔离需求**:不同平台开发者之间的工作空间需要相互隔离,互不影响
### 架构特点
- **VHost 隔离**:按应用划分 VHost,支持多业务复用
- **Exchange 隔离**:每个平台独立 Exchange,通过权限控制访问
- **单队列设计**:每种数据类型一个队列,所有平台共享(orders.queue/products.queue/refunds.queue/inventory.queue),保证严格 FIFO 顺序
- **独立消费者**:每种数据类型配备独立消费者,采用批处理 + prefetch + 适配器模式,提升吞吐与稳定性
- **智能重试**:DLX(死信交换机)+ 延迟重试队列 + 错误队列,优雅处理失败消息,不阻塞主队列
- **消息持久化**RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API
---
## 设计方案总结
### 核心设计理念
本方案采用 **Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式**,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。
### 关键设计决策
#### 1. 为什么使用单队列?
| 问题 | 传统多队列方案 | 本方案(单队列) |
|------|--------------|----------------|
| **队列数量** | 8平台 × 4数据类型 = 32个队列 | 4个数据类型队列(队列数量减少 87.5%)|
| **FIFO 保证** | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 |
| **运维复杂度** | 需要监控 32 个队列 | 只需监控 4 个主队列 |
| **消费者部署** | 可能需要 32 个消费者实例 | 只需 4 个消费者实例 |
**原因**:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。
#### 2. 为什么使用单消费者?
**原因**:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。
**优势**
- ✅ 保证严格 FIFO 顺序
- ✅ 避免消息错投到错误的消费者
- ✅ 简化部署和运维
**如何实现多平台支持?** 通过消费者内部的**适配器模式**,根据消息的 `platform` 字段分发到不同业务逻辑。
#### 3. 为什么使用 Prefetch + 批处理?
**原因**RabbitMQ 是 **Push 模式**,不提供 Kafka 式的批量拉取 API。
**解决方案**
- 设置 `prefetch_count=100`RabbitMQ 最多推送 100 条未确认消息
- 消费者自行缓存消息,达到 `batch_size=50``timeout=3s` 后批量处理
- 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性
**优势**
- ✅ 提升数据库写入效率(批量插入)
- ✅ 控制并发和内存压力
- ✅ 失败消息不影响其他消息(单条 ACK)
#### 4. 为什么使用 DLX + 延迟重试队列?
**问题**:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。
**传统方案的问题**
-`nack(requeue=true)`:消息回队列头部,无限循环
- ❌ 直接丢弃:数据丢失
- ❌ 立即重试:打爆 API/DB
**本方案(DLX + 延迟重试)**
```
消费失败 → nack(requeue=false) → 进入 DLX
x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列
x-retries >= 3 → 错误队列 (人工处理)
```
**优势**
- ✅ 不会卡 FIFO
- ✅ 不会无限重试
- ✅ 延迟重试避免打爆 API/DB
- ✅ 错误消息自动隔离
- ✅ 无需额外 Redis 或手写 retry 逻辑
### 方案优势总结
| 维度 | 优势 |
|------|------|
| **可靠性** | 消息持久化 + 单条 ACK + 智能重试,避免数据丢失 |
| **性能** | Prefetch + 批处理 + 数据库批量写入,提升吞吐 |
| **顺序性** | 单队列 FIFO 保证严格顺序 |
| **扩展性** | 适配器模式,新增平台只需添加适配器类 |
| **运维** | 队列数量少,监控简单,故障排查容易 |
| **容错** | DLX 智能重试,失败消息不阻塞主队列 |
### 适用场景
**适合**
- 多平台数据同步(电商、物流、金融等)
- 需要严格 FIFO 顺序
- 需要高可靠性和高吞吐
- 平台数量多(> 5个),数据类型固定(< 10个)
**不适合**
- 实时性要求极高(< 100ms
- 需要按平台独立控制消费速率
- 平台数量少(< 3个),可以使用独立队列
---
## 架构设计
### 1. VHost 设计
VHost 按业务应用进行划分,便于资源隔离和权限管理。
```
RabbitMQ 实例
├── dataflow-app # Dataflow 应用专用
├── analytics-app # 分析应用(预留)
└── warehouse-app # 数据仓库应用(预留)
```
**当前需要创建的 VHost**
- `dataflow-app`:用于电商数据流转
---
### 2. Exchange 设计
`dataflow-app` VHost 中,Exchange 分为两类:
#### 业务 Exchange(用于数据投递)
每个平台一个独立的 Topic Exchange
| Exchange 名称 | 类型 | 持久化 | 用途 |
|--------------|------|--------|------|
| `douyin.exchange` | Topic | 是 | DouYin(抖音)平台数据入口 |
| `jd.exchange` | Topic | 是 | JD(京东)平台数据入口 |
| `tmall.exchange` | Topic | 是 | Tmall(天猫)平台数据入口 |
| `redbook.exchange` | Topic | 是 | RedBook(小红书)平台数据入口 |
| `amazon.exchange` | Topic | 是 | Amazon Japan 平台数据入口 |
| `naver.exchange` | Topic | 是 | Naver 平台数据入口 |
| `rakuten.exchange` | Topic | 是 | Rakuten(乐天)平台数据入口 |
| `shopify.exchange` | Topic | 是 | Shopify 平台数据入口 |
**Routing Key 规范**(采用 `{data_type}.{platform}` 格式):
| 平台 | 订单 | 产品 | 退款 | 库存 |
|------|------|------|------|------|
| DouYin | `order.douyin` | `product.douyin` | `refund.douyin` | `inventory.douyin` |
| JD | `order.jd` | `product.jd` | `refund.jd` | `inventory.jd` |
| Tmall | `order.tmall` | `product.tmall` | `refund.tmall` | `inventory.tmall` |
| RedBook | `order.redbook` | `product.redbook` | `refund.redbook` | `inventory.redbook` |
| Amazon | `order.amazon` | `product.amazon` | `refund.amazon` | `inventory.amazon` |
| Naver | `order.naver` | `product.naver` | `refund.naver` | `inventory.naver` |
| Rakuten | `order.rakuten` | `product.rakuten` | `refund.rakuten` | `inventory.rakuten` |
| Shopify | `order.shopify` | `product.shopify` | `refund.shopify` | `inventory.shopify` |
**设计说明**
- 使用 Topic Exchange 的通配符特性,队列可通过 `order.#` 匹配所有平台的订单
- 保证单队列内所有平台消息的 FIFO 顺序
- 消费者通过解析 routing key 或消息体的 platform 字段进行业务分发
#### 错误 Exchange(用于错误通知)
每个平台一个独立的错误 Topic Exchange
| Exchange 名称 | 类型 | 持久化 | 用途 |
|--------------|------|--------|------|
| `douyin.errors.exchange` | Topic | 是 | DouYin 平台错误消息 |
| `jd.errors.exchange` | Topic | 是 | JD 平台错误消息 |
| `tmall.errors.exchange` | Topic | 是 | Tmall 平台错误消息 |
| `redbook.errors.exchange` | Topic | 是 | RedBook 平台错误消息 |
| `amazon.errors.exchange` | Topic | 是 | Amazon Japan 平台错误消息 |
| `naver.errors.exchange` | Topic | 是 | Naver 平台错误消息 |
| `rakuten.errors.exchange` | Topic | 是 | Rakuten 平台错误消息 |
| `shopify.errors.exchange` | Topic | 是 | Shopify 平台错误消息 |
**Routing Key 规范**
- `validation` - 数据格式验证错误
- `processing_failed` - 业务处理失败
- `system` - 系统错误
---
### 3. Queue 设计
#### 业务队列(主队列)
物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | DLX | 用途 |
|-----------|--------|----------|----------|-----|------|
| `orders.queue` | 是 | 否 | 24小时 | `dlx.orders` | 接收所有平台的订单数据,单消费者批处理 |
| `products.queue` | 是 | 否 | 24小时 | `dlx.products` | 接收所有平台的产品数据,单消费者批处理 |
| `refunds.queue` | 是 | 否 | 24小时 | `dlx.refunds` | 接收所有平台的退款数据,单消费者批处理 |
| `inventory.queue` | 是 | 否 | 24小时 | `dlx.inventory` | 接收所有平台的库存数据,单消费者批处理 |
**队列参数配置**
```json
{
"x-message-ttl": 86400000,
"x-dead-letter-exchange": "dlx.orders",
"x-dead-letter-routing-key": "retry"
}
```
#### 死信交换机(DLX
每种数据类型配备一个 DLX,用于处理失败消息的路由:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|--------------|------|--------|------|
| `dlx.orders` | Topic | 是 | 订单失败消息路由 |
| `dlx.products` | Topic | 是 | 产品失败消息路由 |
| `dlx.refunds` | Topic | 是 | 退款失败消息路由 |
| `dlx.inventory` | Topic | 是 | 库存失败消息路由 |
#### 延迟重试队列
为每种数据类型提供延迟重试机制:
| Queue 名称 | 持久化 | 消息 TTL | DLX | 用途 |
|-----------|--------|----------|-----|------|
| `orders.retry.queue` | 是 | 5秒 | `main.exchange` | 订单延迟重试(TTL后自动回到主队列)|
| `products.retry.queue` | 是 | 5秒 | `main.exchange` | 产品延迟重试 |
| `refunds.retry.queue` | 是 | 5秒 | `main.exchange` | 退款延迟重试 |
| `inventory.retry.queue` | 是 | 5秒 | `main.exchange` | 库存延迟重试 |
**重试队列参数配置**
```json
{
"x-message-ttl": 5000,
"x-dead-letter-exchange": "main.exchange",
"x-dead-letter-routing-key": "order.{platform}"
}
```
**重试机制说明**
1. 消费失败时,消费者执行 `nack(requeue=false)`,消息进入 DLX
2. DLX 根据消息 header 中的 `x-retries` 判断路由:
- `x-retries < 3`:路由到 `retry.queue`(延迟重试)
- `x-retries >= 3`:路由到 `errors.queue`(永久失败)
3. 重试队列 TTL 到期后,消息自动死信回主 Exchange,重新进入主队列
4. 最大重试次数:**3次**,延迟时间:**5秒**
#### 错误队列
统一的错误队列,接收所有超过重试次数的失败消息:
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | 用途 |
|-----------|--------|----------|----------|------|
| `errors.queue` | 是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 |
---
### 4. Binding 配置
#### 主 Exchange (用于接收各平台数据)
需要创建一个统一的主 Exchange,用于重试队列的消息回流:
| Exchange 名称 | 类型 | 持久化 | 用途 |
|--------------|------|--------|------|
| `main.exchange` | Topic | 是 | 统一主交换机,用于路由和重试消息回流 |
#### 业务队列绑定(主队列)
**方案1:直接绑定(推荐)**
每个平台的 Exchange 使用带平台标识的 routing key 绑定到对应数据类型队列:
```
douyin.exchange
├── [routing_key: order.douyin] → orders.queue
├── [routing_key: product.douyin] → products.queue
├── [routing_key: refund.douyin] → refunds.queue
└── [routing_key: inventory.douyin] → inventory.queue
tmall.exchange
├── [routing_key: order.tmall] → orders.queue
├── [routing_key: product.tmall] → products.queue
├── [routing_key: refund.tmall] → refunds.queue
└── [routing_key: inventory.tmall] → inventory.queue
... (其他平台类似)
```
**方案2:通过主 Exchange(备选)**
平台 Exchange 先发到主 Exchange,再由主 Exchange 统一路由:
```
douyin.exchange → [routing_key: order.douyin] → main.exchange
main.exchange
├── [routing_key: order.#] → orders.queue
├── [routing_key: product.#] → products.queue
├── [routing_key: refund.#] → refunds.queue
└── [routing_key: inventory.#] → inventory.queue
```
**推荐使用方案1**,简化架构,减少消息跳转。
#### DLX 绑定(重试路由)
每个 DLX 根据消息 header 中的 `x-retries` 决定路由目标:
```
dlx.orders
├── [routing_key: retry, x-retries < 3] → orders.retry.queue
└── [routing_key: error, x-retries >= 3] → errors.queue
dlx.products
├── [routing_key: retry] → products.retry.queue
└── [routing_key: error] → errors.queue
dlx.refunds
├── [routing_key: retry] → refunds.retry.queue
└── [routing_key: error] → errors.queue
dlx.inventory
├── [routing_key: retry] → inventory.retry.queue
└── [routing_key: error] → errors.queue
```
**注意**RabbitMQ 原生不支持基于 header 的条件路由,需要:
- **方案A**:消费者在 `nack` 时根据重试次数决定发送到 DLX 的 routing key`retry``error`
- **方案B**:使用 RabbitMQ 插件(如 `rabbitmq_message_routing_by_headers`)实现条件路由
**推荐方案A**:消费者控制重试逻辑,更灵活可控。
#### 重试队列绑定(回流主队列)
重试队列 TTL 到期后,消息自动死信回主 Exchange:
```
orders.retry.queue
└── [TTL=5s后 死信到 main.exchangerouting_key保持原始值:order.{platform}]
main.exchange
└── [routing_key: order.#] → orders.queue
```
这样实现了"失败 → 延迟5秒 → 重新进入主队列"的循环。
#### 错误队列绑定
每个平台的错误 Exchange 和所有 DLX 都绑定到统一错误队列:
```
douyin.errors.exchange
└── [routing_key: #] → errors.queue
dlx.orders
└── [routing_key: error] → errors.queue
dlx.products
└── [routing_key: error] → errors.queue
... (其他类型类似)
```
**说明**:使用 `#` 通配符接收所有错误类型。
---
## 消费者设计
### 1. 消费者模型
采用 **单消费者 + 批处理 + prefetch + 适配器模式**
| 数据类型 | 消费者 | Queue | 并发模式 | 处理模式 |
|---------|--------|-------|----------|---------|
| 订单 | Order Consumer | `orders.queue` | 单消费者 | 批处理 + 适配器 |
| 产品 | Product Consumer | `products.queue` | 单消费者 | 批处理 + 适配器 |
| 退款 | Refund Consumer | `refunds.queue` | 单消费者 | 批处理 + 适配器 |
| 库存 | Inventory Consumer | `inventory.queue` | 单消费者 | 批处理 + 适配器 |
**设计原则**
-**单消费者**:每个队列只有一个消费者实例,保证严格 FIFO 顺序
-**批处理**:积累一批消息后统一处理,提升数据库写入效率
-**单条 ACK**:每条消息独立确认,失败不影响其他消息
-**适配器模式**:消费者内部根据平台字段分发到不同业务逻辑
### 2. Prefetch 配置
```json
{
"prefetch_count": 100,
"batch_size": 50,
"batch_timeout": 3000
}
```
**参数说明**
- `prefetch_count=100`RabbitMQ 最多推送 100 条未确认消息到消费者
- `batch_size=50`:消费者积累 50 条消息后触发批处理
- `batch_timeout=3000`:如果3秒内未达到 batch_size,也触发处理
**工作流程**
1. RabbitMQ 按 Push 模式推送最多 100 条消息到消费者
2. 消费者缓存消息,达到 50 条或超时 3 秒后批量处理
3. 批量处理时,仍然对每条消息单独 ACK/NACK
4. 实现"伪批量拉取"效果,同时保留单条确认的灵活性
### 3. 消费者处理逻辑(伪代码)
```python
class OrderConsumer:
def __init__(self):
self.prefetch_count = 100
self.batch_size = 50
self.batch_timeout = 3
self.message_buffer = []
self.adapters = {
'douyin': DouyinAdapter(),
'tmall': TmallAdapter(),
'jd': JDAdapter(),
# ... 其他平台适配器
}
def on_message(self, channel, method, properties, body):
"""接收到消息时的回调"""
message = json.loads(body)
self.message_buffer.append({
'channel': channel,
'method': method,
'properties': properties,
'body': message
})
# 达到批量大小,触发处理
if len(self.message_buffer) >= self.batch_size:
self.process_batch()
def process_batch(self):
"""批量处理消息"""
for msg_data in self.message_buffer:
try:
message = msg_data['body']
platform = message['platform']
# 使用适配器模式分发到对应平台逻辑
adapter = self.adapters.get(platform)
if not adapter:
raise ValueError(f"Unknown platform: {platform}")
# 处理业务逻辑
adapter.process(message)
# 成功:ACK
msg_data['channel'].basic_ack(msg_data['method'].delivery_tag)
except TemporaryError as e:
# 临时错误:重试
retry_count = msg_data['properties'].headers.get('x-retries', 0)
if retry_count < 3:
# 更新重试次数,发送到 DLX 进行延迟重试
headers = {'x-retries': retry_count + 1}
msg_data['channel'].basic_nack(
msg_data['method'].delivery_tag,
requeue=False
)
else:
# 超过重试次数,发送到错误队列
self.send_to_error_queue(msg_data)
msg_data['channel'].basic_nack(
msg_data['method'].delivery_tag,
requeue=False
)
except PermanentError as e:
# 永久错误:直接进入错误队列
self.send_to_error_queue(msg_data)
msg_data['channel'].basic_nack(
msg_data['method'].delivery_tag,
requeue=False
)
# 清空缓冲区
self.message_buffer.clear()
def send_to_error_queue(self, msg_data):
"""发送到错误队列"""
error_message = {
'original_message': msg_data['body'],
'error': str(e),
'timestamp': datetime.now().isoformat()
}
# 发送到错误 Exchange
self.channel.basic_publish(
exchange='douyin.errors.exchange',
routing_key='error',
body=json.dumps(error_message)
)
```
### 4. 重试机制工作流程
```
┌─────────────────┐
│ orders.queue │ ← 消费者从这里接收消息
└────────┬────────┘
├─ 成功 → basic_ack()
└─ 失败 → basic_nack(requeue=false)
┌──────────────────┐
│ dlx.orders │ ← 死信交换机
└────────┬─────────┘
├─ x-retries < 3 → orders.retry.queue (TTL=5s)
│ │
│ └─ TTL到期 → 自动死信回 main.exchange
│ │
│ └─ order.# → orders.queue
└─ x-retries >= 3 → errors.queue (人工处理)
```
### 5. 为什么这样设计?
| 特性 | 原因 | 优势 |
|------|------|------|
| **单队列 FIFO** | RabbitMQ 无法在队列内部按 topic 过滤 | 保证严格 FIFO,不拆分队列造成业务乱序 |
| **单消费者** | RabbitMQ 同队列多消费者必然 Round-Robin 分配 | 避免消息错投到错误的消费者 |
| **Prefetch + 批处理** | RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 | 实现"伪批量拉取",提升吞吐 |
| **单条 ACK** | 保留细粒度控制 | 失败消息不影响其他消息 |
| **DLX + 延迟重试** | 避免失败消息阻塞队列 | 不会卡 FIFO,不会无限重试 |
| **适配器模式** | 多平台业务逻辑隔离 | 扩展性好,易维护 |
---
## 用户权限配置
### 1. 平台开发者账号
每个平台分配一个专用账号,只能访问自己的 Exchange。
#### DouYin 平台账号
```
用户名: user_douyin
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: (留空) - 不允许创建/删除资源
- Write: ^douyin\.(exchange|errors\.exchange)$ - 只能写入 DouYin 的业务和错误 Exchange
- Read: ^douyin\.errors\..*$ - 只能读取 DouYin 错误相关的队列
```
#### Tmall 平台账号
```
用户名: user_tmall
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: (留空)
- Write: ^tmall\.(exchange|errors\.exchange)$
- Read: ^tmall\.errors\..*$
```
#### 其他平台账号
按相同模式创建:
- JD: `user_jd`
- RedBook: `user_redbook`
- Amazon Japan: `user_amazon`
- Naver: `user_naver`
- Rakuten: `user_rakuten`
- Shopify: `user_shopify`
### 2. Dataflow 消费者账号
Dataflow 应用的后端服务账号,负责消费业务队列并处理错误。
```
用户名: user_dataflow_consumer
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: ^(orders|products|refunds|inventory)\.queue$ - 可以管理业务队列
- Write: .*\.errors\.exchange$ - 可以向所有错误 Exchange 写入消息
- Read: ^(orders|products|refunds|inventory)\.queue$ - 可以读取所有业务队列
```
### 3. 运维监控账号
运维团队账号,用于监控所有错误消息。
```
用户名: user_ops
密码: <安全密码>
VHost: dataflow-app
权限配置:
- Configure: ^errors\..*$ - 可以管理错误相关资源
- Write: (留空) - 不需要写入权限
- Read: ^errors\.queue$ - 可以读取统一错误队列
```
### 4. 管理员账号
RabbitMQ 超级管理员,用于配置和维护。
```
用户名: admin
密码: <强密码>
Tag: administrator
权限配置:
- 对所有 VHost 具有完全权限
- 可以访问管理界面
```
---
## 消息格式规范
### 1. message_id 设计
message_id 采用结构化格式,确保幂等性和可追溯性。
**格式**
```
{prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id}
```
**字段说明**
- `prefix`:项目名称前缀(如 `wpic-project1`
- `app_id`:应用标识(固定为 `dataflow`
- `company_id`:公司 ID
- `platform_id`:平台 ID
- `store_id`:店铺 ID
- `entity_type`:数据实体类型(order/product/refund/inventory
- `entity_id`:平台侧唯一标识(如订单号)
**分隔符说明**
- 使用 `#` 作为分隔符(RabbitMQ message_id 字段支持任意字符)
- `#` 在 routing key 中是通配符,但在 message_id 中是普通字符
**示例**
```
wpic-project1#dataflow#100#20#200#order#DY123456789
解析:
- 项目前缀: wpic-project1
- 应用: dataflow
- 公司ID: 100
- 平台ID: 20 (DouYin)
- 店铺ID: 200
- 实体类型: order
- 平台订单号: DY123456789
```
**特性**
- ✅ 幂等性:同一实体的 message_id 完全一致
- ✅ 可读性:直接看出数据来源
- ✅ 便于调试:可从 message_id 提取元数据
- ✅ 无冲突:全局唯一
### 2. 业务消息格式
```json
{
"message_id": "wpic-project1#dataflow#100#20#200#order#DY123456",
"timestamp": "2025-01-14T10:30:00Z",
"platform": "douyin",
"data_type": "order",
"metadata": {
"platform_id": 20,
"company_id": 100,
"store_id": 200,
"source_system": "douyin-open-api",
"retry_count": 0,
"data_version": 1736840000
},
"data": {
"platform_unique_id": "DY123456",
"raw_data": {
// 平台原始完整数据
}
}
}
```
**字段说明**
| 字段 | 类型 | 必需 | 说明 |
|------|------|------|------|
| `message_id` | string | 是 | 结构化消息 ID,用于幂等性 |
| `timestamp` | string | 是 | 消息生成时间(ISO 8601 格式) |
| `platform` | string | 是 | 平台标识(douyin/tmall/jd... |
| `data_type` | string | 是 | 数据类型(order/product/refund/inventory |
| `metadata.platform_id` | int | 是 | 平台 ID(数据库主键) |
| `metadata.company_id` | int | 是 | 公司 ID |
| `metadata.store_id` | int | 是 | 店铺 ID |
| `metadata.source_system` | string | 是 | 数据来源系统 |
| `metadata.retry_count` | int | 是 | 重试次数(初始为 0) |
| `metadata.data_version` | int | 是 | 数据版本(Unix 时间戳,用于防止乱序更新) |
| `data.platform_unique_id` | string | 是 | 平台侧唯一标识 |
| `data.raw_data` | object | 是 | 平台原始完整 JSON 数据 |
**重要说明**
- **data_version**:必须使用平台数据的更新时间戳(Unix 时间戳),用于解决消息乱序消费问题
- 示例:平台订单的最后更新时间 `1736840000`2025-01-14 10:00:00 UTC
- 消费端只有当 `data_version` 大于数据库中的值时才更新数据
- 如果平台未提供更新时间,使用消息生成时间戳
### 3. 错误消息格式
错误消息由 dataflow 消费者生成,包含原始消息和错误详情。
```json
{
"error_id": "err_1234567890abcdef",
"original_message": {
// 原始业务消息完整内容
},
"error": {
"type": "validation",
"message": "Missing required field: total_amount",
"trace": "Exception stack trace...",
"timestamp": "2025-01-14T10:31:00Z"
},
"metadata": {
"platform": "amazon",
"platform_id": 1,
"company_id": 100,
"store_id": 200,
"data_type": "order",
"failed_at": "2025-01-14T10:31:00Z"
}
}
```
---
## RabbitMQ 管理界面操作指引
### 1. 访问管理界面
```
URL: http://<rabbitmq-host>:15672
用户名: admin
密码: <管理员密码>
```
### 2. 创建 VHost
1. 点击顶部导航 **Admin****Virtual Hosts**
2. 点击 **Add a new virtual host**
3. 输入 VHost 名称:`dataflow-app`
4. 点击 **Add virtual host**
### 3. 创建用户
1. 点击顶部导航 **Admin****Users**
2. 点击 **Add a user**
3. 填写用户信息:
- **Username**: `user_amazon`
- **Password**: `<安全密码>`
- **Tags**: (留空,非管理员用户)
4. 点击 **Add user**
### 4. 配置用户权限
1. 在用户列表中,点击用户名(如 `user_amazon`
2. 滚动到 **Permissions** 部分
3. 选择 VHost`dataflow-app`
4. 配置权限正则表达式:
- **Configure regexp**: (留空)
- **Write regexp**: `^amazon\.(exchange|errors\.exchange)$`
- **Read regexp**: `^amazon\.errors\..*$`
5. 点击 **Set permission**
### 5. 创建 Exchange
1. 点击顶部导航 **Exchanges**
2. 确保选择正确的 VHost`dataflow-app`
3. 点击 **Add a new exchange**
4. 填写配置:
- **Name**: `amazon.exchange`
- **Type**: `topic`
- **Durability**: `Durable`
- **Auto delete**: `No`
- **Internal**: `No`
5. 点击 **Add exchange**
重复以上步骤创建所有业务 Exchange 和错误 Exchange。
### 6. 创建 Queue
1. 点击顶部导航 **Queues**
2. 确保选择正确的 VHost`dataflow-app`
3. 点击 **Add a new queue**
4. 填写配置:
- **Name**: `orders.queue`
- **Durability**: `Durable`
- **Auto delete**: `No`
- **Arguments**:
- `x-message-ttl`: `86400000` (24小时,单位:毫秒)
5. 点击 **Add queue**
重复以上步骤创建 `products.queue``refunds.queue``errors.queue`
**注意**`errors.queue` 的 TTL 设置为 `604800000`7天)。
### 7. 创建 Binding
1. 点击顶部导航 **Exchanges**
2. 点击需要绑定的 Exchange(如 `amazon.exchange`
3. 滚动到 **Bindings** 部分
4.**Add binding from this exchange** 区域填写:
- **To queue**: `orders.queue`
- **Routing key**: `order`
5. 点击 **Bind**
为每个 Exchange 创建 3 个绑定(order/product/refund)。
对于错误 Exchange,绑定配置:
- **To queue**: `errors.queue`
- **Routing key**: `#`
---
## 配置脚本
为简化配置过程,可以使用以下脚本自动创建资源。
### 使用 rabbitmqadmin
RabbitMQ 提供命令行工具 `rabbitmqadmin`,可用于批量配置。
#### 安装 rabbitmqadmin
```bash
# 从 RabbitMQ 管理界面下载
wget http://<rabbitmq-host>:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/
```
#### 配置脚本示例
```bash
#!/bin/bash
# RabbitMQ 连接信息
RABBITMQ_HOST="localhost:15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin_password"
VHOST="dataflow-app"
# 平台列表
PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify")
DATA_TYPES=("orders" "products" "refunds" "inventory")
# 1. 创建 VHost
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare vhost name=$VHOST
# 2. 创建主 Exchange(用于重试消息回流)
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="main.exchange" vhost=$VHOST \
type=topic durable=true
# 3. 创建 DLX (死信交换机)
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="dlx.${dtype}" vhost=$VHOST \
type=topic durable=true
done
# 4. 创建主业务队列(带 DLX 配置)
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue name="${dtype}.queue" vhost=$VHOST durable=true \
arguments="{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
done
# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
for dtype in "${DATA_TYPES[@]}"; do
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue name="${dtype}.retry.queue" vhost=$VHOST durable=true \
arguments="{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}"
done
# 6. 创建错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare queue name="errors.queue" vhost=$VHOST durable=true \
arguments='{"x-message-ttl":604800000}'
# 7. 绑定主 Exchange 到主队列(使用通配符)
for dtype in "${DATA_TYPES[@]}"; do
# 使用 order.#, product.# 等通配符匹配所有平台
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="main.exchange" destination="${dtype}.queue" \
vhost=$VHOST routing_key="${dtype%s}.#"
done
# 8. 绑定 DLX 到重试队列和错误队列
for dtype in "${DATA_TYPES[@]}"; do
# DLX -> 重试队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="dlx.${dtype}" destination="${dtype}.retry.queue" \
vhost=$VHOST routing_key="retry"
# DLX -> 错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="dlx.${dtype}" destination="errors.queue" \
vhost=$VHOST routing_key="error"
done
# 9. 为每个平台创建 Exchange 和 Binding
for platform in "${PLATFORMS[@]}"; do
# 创建平台业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="${platform}.exchange" vhost=$VHOST \
type=topic durable=true
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="orders.queue" \
vhost=$VHOST routing_key="order.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="products.queue" \
vhost=$VHOST routing_key="product.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="refunds.queue" \
vhost=$VHOST routing_key="refund.${platform}"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.exchange" destination="inventory.queue" \
vhost=$VHOST routing_key="inventory.${platform}"
# 创建平台错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="${platform}.errors.exchange" vhost=$VHOST \
type=topic durable=true
# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="${platform}.errors.exchange" destination="errors.queue" \
vhost=$VHOST routing_key="#"
# 创建平台用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_${platform}" password="change_me_${platform}" tags=""
# 配置平台用户权限
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=$VHOST user="user_${platform}" \
configure="" write="^${platform}\.(exchange|errors\.exchange)$" \
read="^${platform}\.errors\..*$"
done
# 10. 创建 dataflow 消费者用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_dataflow_consumer" password="change_me_consumer" tags=""
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=$VHOST user="user_dataflow_consumer" \
configure="^(orders|products|refunds|inventory).*\.queue$" \
write="(dlx\..*)|(.*\.errors\.exchange)$" \
read="^(orders|products|refunds|inventory).*\.queue$"
# 11. 创建运维监控用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_ops" password="change_me_ops" tags=""
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=$VHOST user="user_ops" \
configure="^errors\..*$" write="" read="^errors\.queue$"
echo "RabbitMQ 配置完成!"
echo ""
echo "已创建:"
echo "- 1 个 VHost: dataflow-app"
echo "- 1 个主 Exchange: main.exchange"
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
echo "- 1 个错误队列: errors.queue"
echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)"
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
```
#### 执行脚本
```bash
chmod +x setup_rabbitmq.sh
./setup_rabbitmq.sh
```
---
## 监控和维护
### 1. 监控指标
通过 RabbitMQ 管理界面查看以下关键指标:
#### 队列健康状态
1. 点击 **Queues** → 选择队列
2. 关注以下指标:
- **Ready**: 待消费消息数(正常应 < 1000)
- **Unacked**: 未确认消息数(正常应较低)
- **Total**: 总消息数
- **Incoming rate**: 消息入队速率
- **Deliver / Get rate**: 消息消费速率
**告警阈值**
- Ready > 10000:队列堆积严重
- Unacked > 5000:消费者处理缓慢
- Incoming rate >> Deliver rate:消费速度跟不上生产速度
#### Exchange 流量统计
1. 点击 **Exchanges** → 选择 Exchange
2. 查看 **Message rates**
- **Publish in**: 消息发布速率
- **Publish out**: 消息路由到队列的速率
**异常情况**
- Publish in > 0 但 Publish out = 0:可能绑定配置错误
### 2. 常见问题排查
#### 消息未到达队列
**检查步骤**
1. 确认 Exchange 存在且类型正确
2. 检查 Binding 配置,确保 routing key 匹配
3. 查看 Exchange 的 **Message rates**,确认消息已发布
4. 检查生产者是否使用了正确的 routing key
#### 消费者无法消费
**检查步骤**
1. 确认用户权限配置正确(Read 权限)
2. 检查队列中是否有消息(Ready > 0
3. 查看消费者连接状态(Connections 页面)
4. 检查消费者代码是否正确 ACK 消息
#### 错误消息未送达平台开发者
**检查步骤**
1. 确认错误 Exchange 绑定到 `errors.queue`
2. 检查平台开发者账号的 Read 权限
3. 确认平台开发者订阅了正确的 Exchange
4. 查看 `errors.queue` 中是否有消息
### 3. 备份和恢复
#### 导出配置
```bash
# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
export dataflow-backup.json
```
#### 导入配置
```bash
# 从备份恢复配置
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
import dataflow-backup.json
```
### 4. 性能优化
#### 消费者并发配置
- 增加消费者实例数量
- 每个消费者配置多个 channel(如 5-10 个)
- 使用 prefetch_count 限制未确认消息数量
#### 队列性能优化
- 启用 lazy queue(大量消息堆积时):
```bash
rabbitmqadmin declare queue name=orders.queue vhost=dataflow-app \
durable=true arguments='{"x-queue-mode":"lazy"}'
```
- 定期清理过期消息(通过 TTL 自动实现)
### 5. 日志查看
```bash
# 查看 RabbitMQ 日志
tail -f /var/log/rabbitmq/rabbit@<hostname>.log
# 查看连接日志
tail -f /var/log/rabbitmq/rabbit@<hostname>_connection.log
```
---
## 安全建议
### 1. 密码策略
- 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符)
- 定期更换密码(建议 3-6 个月)
- 不同环境(开发/测试/生产)使用不同密码
### 2. 网络安全
- 生产环境启用 TLS/SSL 加密连接
- 限制管理界面访问(仅允许内网或 VPN 访问)
- 配置防火墙规则:
- 5672:AMQP 端口(仅允许应用服务器访问)
- 15672:管理界面(仅允许管理员 IP 访问)
### 3. 权限最小化原则
- 平台开发者只能访问自己的 Exchange
- 消费者只能读取必要的队列
- 避免赋予不必要的 Configure 权限
### 4. 审计日志
- 启用 RabbitMQ 审计日志插件
- 定期审查用户操作记录
- 监控异常登录行为
---
## 新增平台接入流程
当需要接入新平台(例如 Lazada)时,按以下步骤操作:
### 1. 创建 Exchange
```bash
# 业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="lazada.exchange" vhost=dataflow-app \
type=topic durable=true
# 错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare exchange name="lazada.errors.exchange" vhost=dataflow-app \
type=topic durable=true
```
### 2. 创建 Binding
```bash
# 绑定到业务队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="orders.queue" \
vhost=dataflow-app routing_key="order"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="products.queue" \
vhost=dataflow-app routing_key="product"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="refunds.queue" \
vhost=dataflow-app routing_key="refund"
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.exchange" destination="inventory.queue" \
vhost=dataflow-app routing_key="inventory"
# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare binding source="lazada.errors.exchange" \
destination="errors.queue" vhost=dataflow-app routing_key="#"
```
### 3. 创建用户并配置权限
```bash
# 创建用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare user name="user_lazada" password="<secure_password>" tags=""
# 配置权限
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permission vhost=dataflow-app user="user_lazada" \
configure="" write="^lazada\.(exchange|errors\.exchange)$" \
read="^lazada\.errors\..*$"
```
### 4. 提供给平台开发者
将以下信息提供给新平台的开发团队:
```
RabbitMQ 连接信息:
- Host: <rabbitmq-host>
- Port: 5672
- VHost: dataflow-app
- Username: user_lazada
- Password: <secure_password>
发布配置:
- Exchange: lazada.exchange
- Routing Keys:
- order: 发布订单数据
- product: 发布产品数据
- refund: 发布退款数据
- inventory: 发布库存数据
错误订阅(可选):
- Exchange: lazada.errors.exchange
- 接收本平台的错误消息
消息格式规范:
- 参见「消息格式规范」章节
```
---
## 附录
### A. 完整资源清单
#### VHost
- **总数**: 1 个
- `dataflow-app`
#### Exchanges
**主 Exchange**: 1 个
- `main.exchange` (Topic) - 统一主交换机,用于路由和重试消息回流
**平台业务 Exchanges**: 8 个(每平台 1 个)
- `douyin.exchange`
- `jd.exchange`
- `tmall.exchange`
- `redbook.exchange`
- `amazon.exchange`
- `naver.exchange`
- `rakuten.exchange`
- `shopify.exchange`
**平台错误 Exchanges**: 8 个(每平台 1 个)
- `douyin.errors.exchange`
- `jd.errors.exchange`
- `tmall.errors.exchange`
- `redbook.errors.exchange`
- `amazon.errors.exchange`
- `naver.errors.exchange`
- `rakuten.errors.exchange`
- `shopify.errors.exchange`
**死信交换机 (DLX)**: 4 个(每数据类型 1 个)
- `dlx.orders`
- `dlx.products`
- `dlx.refunds`
- `dlx.inventory`
**Exchange 总数**: **21 个** (1 主 + 8 平台业务 + 8 平台错误 + 4 DLX)
#### Queues
**主业务队列**: 4 个
- `orders.queue` (配置 DLX: dlx.orders)
- `products.queue` (配置 DLX: dlx.products)
- `refunds.queue` (配置 DLX: dlx.refunds)
- `inventory.queue` (配置 DLX: dlx.inventory)
**延迟重试队列**: 4 个
- `orders.retry.queue` (TTL: 5秒, DLX: main.exchange)
- `products.retry.queue` (TTL: 5秒, DLX: main.exchange)
- `refunds.retry.queue` (TTL: 5秒, DLX: main.exchange)
- `inventory.retry.queue` (TTL: 5秒, DLX: main.exchange)
**错误队列**: 1 个
- `errors.queue` (TTL: 7天)
**Queue 总数**: **9 个** (4 主队列 + 4 重试队列 + 1 错误队列)
#### Bindings
**主 Exchange 绑定**: 4 个
- `main.exchange` → `orders.queue` (routing_key: `order.#`)
- `main.exchange` → `products.queue` (routing_key: `product.#`)
- `main.exchange` → `refunds.queue` (routing_key: `refund.#`)
- `main.exchange` → `inventory.queue` (routing_key: `inventory.#`)
**平台业务 Exchange 绑定**: 32 个
- 每个平台 Exchange → 4 个业务队列 (8 × 4 = 32)
- 使用 routing key 格式:`{data_type}.{platform}`
- 例如:`order.douyin`, `product.tmall`, `refund.jd` 等
**平台错误 Exchange 绑定**: 8 个
- 每个平台错误 Exchange → `errors.queue` (routing_key: `#`)
**DLX 绑定**: 8 个
- 每个 DLX → 对应重试队列 (routing_key: `retry`) - 4 个
- 每个 DLX → `errors.queue` (routing_key: `error`) - 4 个
**Binding 总数**: **52 个** (4 主 + 32 平台业务 + 8 平台错误 + 8 DLX)
#### Users
**平台开发者账号**: 8 个
- `user_douyin`, `user_jd`, `user_tmall`, `user_redbook`, `user_amazon`, `user_naver`, `user_rakuten`, `user_shopify`
- 权限:只能写入自己的业务和错误 Exchange,只能读取自己的错误队列
**消费者账号**: 1 个
- `user_dataflow_consumer`
- 权限:可以管理所有业务队列和重试队列,可以写入 DLX 和错误 Exchange,可以读取所有业务队列
**运维监控账号**: 1 个
- `user_ops`
- 权限:可以管理错误相关资源,可以读取错误队列
**管理员账号**: 1 个
- `admin`
- 权限:完全权限,可以访问管理界面
**User 总数**: **11 个** (8 平台 + 1 消费者 + 1 运维 + 1 管理员)
#### 架构统计
| 资源类型 | 数量 | 说明 |
|---------|------|------|
| VHost | 1 | dataflow-app |
| Exchange | 21 | 1 主 + 8 平台业务 + 8 平台错误 + 4 DLX |
| Queue | 9 | 4 主 + 4 重试 + 1 错误 |
| Binding | 52 | 完整消息流转路径 |
| User | 11 | 多角色权限隔离 |
**消息流转路径**
```
生产者 → 平台 Exchange → 主队列 → 消费者
↓ (失败)
DLX → 重试队列 (5秒TTL) → 主 Exchange → 主队列
↓ (超过3次)
DLX → 错误队列 (人工处理)
```
### B. 参考资料
- [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html)
- [RabbitMQ Management Plugin](https://www.rabbitmq.com/management.html)
- [Access Control (Authentication, Authorization)](https://www.rabbitmq.com/access-control.html)
- [rabbitmqadmin 使用指南](https://www.rabbitmq.com/management-cli.html)