1620 lines
55 KiB
Markdown
1620 lines
55 KiB
Markdown
# RabbitMQ 配置方案
|
||
|
||
## 概述
|
||
|
||
本文档描述 dataflow 应用的 RabbitMQ 消息队列架构设计和配置方法。
|
||
|
||
### 业务背景
|
||
|
||
- **应用角色**:dataflow 是消息消费者,负责接收来自多个电商平台的数据并入库
|
||
- **数据来源**:多个电商平台(DouYin、JD、Tmall、RedBook、Amazon Japan、Naver、Rakuten、Shopify 等)
|
||
- **数据类型**:订单(orders)、产品(products)、退款(refunds)、库存(inventory)
|
||
- **隔离需求**:不同平台开发者之间的工作空间需要相互隔离,互不影响
|
||
|
||
### 架构特点
|
||
|
||
- **VHost 隔离**:按应用划分 VHost,支持多业务复用, 使用 `Classic` Vhost 模式
|
||
- **Exchange 隔离**:每个平台独立 Exchange,通过权限控制访问
|
||
- **单队列设计**:每种数据类型一个队列,所有平台共享(orders.queue/products.queue/refunds.queue/inventory.queue),保证严格 FIFO 顺序
|
||
- **独立消费者**:每种数据类型配备独立消费者,采用批处理 + prefetch + 适配器模式,提升吞吐与稳定性
|
||
- **智能重试**:DLX(死信交换机)+ 延迟重试队列 + 错误队列,优雅处理失败消息,不阻塞主队列
|
||
- **消息持久化**:RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API
|
||
|
||
---
|
||
|
||
## 设计方案总结
|
||
|
||
### 核心设计理念
|
||
|
||
本方案采用 **Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式**,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。
|
||
|
||
### 关键设计决策
|
||
|
||
** 注意 **
|
||
1. 消息内部字段无严格要求,但可以在业务侧进行约束
|
||
2. 默认单条消息的最大 Payload 尺寸为 128MB
|
||
3. 消息队列为缓存系统,应该尽可能的将批量数据放入 payload\
|
||
4. MQ 存在的意义在于快速接受远程生产的 Message
|
||
|
||
|
||
#### 1. 为什么使用单队列?
|
||
|
||
| 问题 | 传统多队列方案 | 本方案(单队列) |
|
||
|------|--------------|----------------|
|
||
| **队列数量** | 8平台 × 4数据类型 = 32个队列 | 4个数据类型队列(队列数量减少 87.5%)|
|
||
| **FIFO 保证** | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 |
|
||
| **运维复杂度** | 需要监控 32 个队列 | 只需监控 4 个主队列 |
|
||
| **消费者部署** | 可能需要 32 个消费者实例 | 只需 4 个消费者实例 |
|
||
|
||
**原因**:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。
|
||
|
||
#### 2. 为什么使用单消费者?
|
||
|
||
**原因**:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。
|
||
|
||
**优势**:
|
||
- ✅ 保证严格 FIFO 顺序
|
||
- ✅ 避免消息错投到错误的消费者
|
||
- ✅ 简化部署和运维
|
||
|
||
**如何实现多平台支持?** 通过消费者内部的**适配器模式**,根据消息的 `platform` 字段分发到不同业务逻辑。
|
||
|
||
#### 3. 为什么使用 Prefetch + 批处理?
|
||
|
||
**原因**:RabbitMQ 是 **Push 模式**,不提供 Kafka 式的批量拉取 API。
|
||
|
||
**解决方案**:
|
||
- 设置 `prefetch_count=100`:RabbitMQ 最多推送 100 条未确认消息
|
||
- 消费者自行缓存消息,达到 `batch_size=50` 或 `timeout=3s` 后批量处理
|
||
- 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性
|
||
|
||
**优势**:
|
||
- ✅ 提升数据库写入效率(批量插入)
|
||
- ✅ 控制并发和内存压力
|
||
- ✅ 失败消息不影响其他消息(单条 ACK)
|
||
|
||
#### 4. 为什么使用 DLX + 延迟重试队列?
|
||
|
||
**问题**:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。
|
||
|
||
**传统方案的问题**:
|
||
- ❌ `nack(requeue=true)`:消息回队列头部,无限循环,**永远不会进入 DLX**
|
||
- ❌ 直接丢弃:数据丢失
|
||
- ❌ 立即重试:打爆 API/DB
|
||
|
||
**本方案(DLX + 延迟重试)**:
|
||
```
|
||
消费失败 → nack(requeue=false) → 进入 DLX
|
||
↓
|
||
Consumer检查 x-death count
|
||
↓
|
||
count < 3 → DLX路由到重试队列 (TTL=5s) → 自动死信回主队列
|
||
↓
|
||
count >= 3 → Consumer主动发送到错误队列 (人工处理)
|
||
```
|
||
|
||
**重要说明**:
|
||
- ⚠️ **必须设置 `requeue=false`**:只有 `requeue=false` 才会触发 DLX 机制
|
||
- ⚠️ **`requeue=true` 会导致无限循环**:消息直接回到原队列头部,永远不进入 DLX
|
||
- ✅ **retry 队列自动回流**:TTL 到期后自动死信回主 Exchange,不需要额外消费者
|
||
- ✅ **Consumer 控制重试次数**:检查 `x-death` header 的 count 字段判断重试次数
|
||
|
||
**优势**:
|
||
- ✅ 不会卡 FIFO
|
||
- ✅ 不会无限重试(通过 max retries 限制)
|
||
- ✅ 延迟重试避免打爆 API/DB
|
||
- ✅ 错误消息自动隔离到 error 队列
|
||
- ✅ 无需额外 Redis 或手写 retry 逻辑
|
||
- ✅ retry 过程完全自动化(依赖 RabbitMQ 的 DLX + TTL 机制)
|
||
|
||
### 方案优势总结
|
||
|
||
| 维度 | 优势 |
|
||
|------|------|
|
||
| **可靠性** | 消息持久化 + 单条 ACK + 智能重试,避免数据丢失 |
|
||
| **性能** | Prefetch + 批处理 + 数据库批量写入,提升吞吐 |
|
||
| **顺序性** | 单队列 FIFO 保证严格顺序 |
|
||
| **扩展性** | 适配器模式,新增平台只需添加适配器类 |
|
||
| **运维** | 队列数量少,监控简单,故障排查容易 |
|
||
| **容错** | DLX 智能重试,失败消息不阻塞主队列 |
|
||
|
||
### 适用场景
|
||
|
||
✅ **适合**:
|
||
- 多平台数据同步(电商、物流、金融等)
|
||
- 需要严格 FIFO 顺序
|
||
- 需要高可靠性和高吞吐
|
||
- 平台数量多(> 5个),数据类型固定(< 10个)
|
||
|
||
❌ **不适合**:
|
||
- 实时性要求极高(< 100ms)
|
||
- 需要按平台独立控制消费速率
|
||
- 平台数量少(< 3个),可以使用独立队列
|
||
|
||
---
|
||
|
||
## Requeue 机制详解
|
||
|
||
### requeue 参数的作用
|
||
|
||
在 RabbitMQ 中,当消费者处理消息失败时,可以通过 `basic_nack()` 或 `basic_reject()` 拒绝消息,第三个参数 `requeue` 决定了消息的去向:
|
||
|
||
```php
|
||
// requeue=true:消息重新回到原队列(队列头部)
|
||
$channel->basic_nack($deliveryTag, false, true);
|
||
|
||
// requeue=false:消息进入 DLX(如果配置了),否则丢弃
|
||
$channel->basic_nack($deliveryTag, false, false);
|
||
```
|
||
|
||
### requeue=true 的问题
|
||
|
||
| 行为 | 后果 |
|
||
|------|------|
|
||
| 消息回到原队列头部 | 立即被同一个消费者再次取出 |
|
||
| 不触发 DLX | 永远不会进入重试队列或错误队列 |
|
||
| 无延迟 | 瞬间形成高速循环(毫秒级) |
|
||
| 无计数 | 无法统计重试次数,无法设置上限 |
|
||
|
||
**结果**:消息在 `消费 → 失败 → NACK → 回队列 → 消费` 之间无限循环,CPU 和网络资源被浪费,队列状态显示为空(消息循环太快无法观察)。
|
||
|
||
### requeue=false 的正确用法
|
||
|
||
```
|
||
orders.queue (配置了 x-dead-letter-exchange)
|
||
↓ Consumer 消费失败
|
||
↓ basic_nack(requeue=false)
|
||
↓ 触发 DLX 机制
|
||
↓
|
||
dlx.orders (死信交换机)
|
||
↓ routing_key="retry"
|
||
↓
|
||
orders.retry.queue (TTL=5s)
|
||
↓ 5秒后 TTL 到期
|
||
↓ 自动死信回 main.exchange
|
||
↓
|
||
main.exchange
|
||
↓ routing_key="order.#"
|
||
↓
|
||
orders.queue (重新进入主队列,x-death count+1)
|
||
```
|
||
|
||
### 环境变量配置
|
||
|
||
在 `.env` 文件中可以配置以下参数:
|
||
|
||
```bash
|
||
# 最大重试次数(默认3次)
|
||
# 消息重试超过此次数后,会被发送到 errors.queue
|
||
AMQP_MAX_RETRIES=3
|
||
|
||
# 调试延迟(秒,默认0)
|
||
# 设置为 2 可以让每条消息处理延迟2秒,方便在 mq:status 中观察队列状态
|
||
# 生产环境应设置为 0
|
||
AMQP_CONSUMER_DEBUG_DELAY=0
|
||
```
|
||
|
||
**使用示例**:
|
||
|
||
```bash
|
||
# 开发环境:观察消息流转
|
||
AMQP_CONSUMER_DEBUG_DELAY=2 php bin/hyperf.php start
|
||
|
||
# 生产环境:正常运行
|
||
AMQP_CONSUMER_DEBUG_DELAY=0 php bin/hyperf.php start
|
||
```
|
||
|
||
---
|
||
|
||
## 架构设计
|
||
|
||
### 1. VHost 设计
|
||
|
||
VHost 按业务应用进行划分,便于资源隔离和权限管理。
|
||
|
||
```
|
||
RabbitMQ 实例
|
||
├── dataflow-app # Dataflow 应用专用
|
||
├── analytics-app # 分析应用(预留)
|
||
└── warehouse-app # 数据仓库应用(预留)
|
||
```
|
||
|
||
**当前需要创建的 VHost**:
|
||
- `dataflow-app`:用于电商数据流转
|
||
|
||
---
|
||
|
||
### 2. Exchange 设计
|
||
|
||
在 `dataflow-app` VHost 中,Exchange 分为两类:
|
||
|
||
#### 业务 Exchange(用于数据投递)
|
||
|
||
每个平台一个独立的 Topic Exchange:
|
||
|
||
| Exchange 名称 | 类型 | 持久化 | 用途 |
|
||
|--------------|------|--------|------|
|
||
| `douyin.exchange` | Topic | 是 | DouYin(抖音)平台数据入口 |
|
||
| `jd.exchange` | Topic | 是 | JD(京东)平台数据入口 |
|
||
| `tmall.exchange` | Topic | 是 | Tmall(天猫)平台数据入口 |
|
||
| `redbook.exchange` | Topic | 是 | RedBook(小红书)平台数据入口 |
|
||
| `amazon.exchange` | Topic | 是 | Amazon Japan 平台数据入口 |
|
||
| `naver.exchange` | Topic | 是 | Naver 平台数据入口 |
|
||
| `rakuten.exchange` | Topic | 是 | Rakuten(乐天)平台数据入口 |
|
||
| `shopify.exchange` | Topic | 是 | Shopify 平台数据入口 |
|
||
|
||
**Routing Key 规范**(采用 `{data_type}.{platform}` 格式):
|
||
|
||
| 平台 | 订单 | 产品 | 退款 | 库存 |
|
||
|------|------|------|------|------|
|
||
| DouYin | `order.douyin` | `product.douyin` | `refund.douyin` | `inventory.douyin` |
|
||
| JD | `order.jd` | `product.jd` | `refund.jd` | `inventory.jd` |
|
||
| Tmall | `order.tmall` | `product.tmall` | `refund.tmall` | `inventory.tmall` |
|
||
| RedBook | `order.redbook` | `product.redbook` | `refund.redbook` | `inventory.redbook` |
|
||
| Amazon | `order.amazon` | `product.amazon` | `refund.amazon` | `inventory.amazon` |
|
||
| Naver | `order.naver` | `product.naver` | `refund.naver` | `inventory.naver` |
|
||
| Rakuten | `order.rakuten` | `product.rakuten` | `refund.rakuten` | `inventory.rakuten` |
|
||
| Shopify | `order.shopify` | `product.shopify` | `refund.shopify` | `inventory.shopify` |
|
||
|
||
**设计说明**:
|
||
- 使用 Topic Exchange 的通配符特性,队列可通过 `order.#` 匹配所有平台的订单
|
||
- 保证单队列内所有平台消息的 FIFO 顺序
|
||
- 消费者通过解析 routing key 或消息体的 platform 字段进行业务分发
|
||
|
||
#### 错误 Exchange(用于错误通知)
|
||
|
||
每个平台一个独立的错误 Topic Exchange:
|
||
|
||
| Exchange 名称 | 类型 | 持久化 | 用途 |
|
||
|--------------|------|--------|------|
|
||
| `douyin.errors.exchange` | Topic | 是 | DouYin 平台错误消息 |
|
||
| `jd.errors.exchange` | Topic | 是 | JD 平台错误消息 |
|
||
| `tmall.errors.exchange` | Topic | 是 | Tmall 平台错误消息 |
|
||
| `redbook.errors.exchange` | Topic | 是 | RedBook 平台错误消息 |
|
||
| `amazon.errors.exchange` | Topic | 是 | Amazon Japan 平台错误消息 |
|
||
| `naver.errors.exchange` | Topic | 是 | Naver 平台错误消息 |
|
||
| `rakuten.errors.exchange` | Topic | 是 | Rakuten 平台错误消息 |
|
||
| `shopify.errors.exchange` | Topic | 是 | Shopify 平台错误消息 |
|
||
|
||
**Routing Key 规范**:
|
||
- `validation` - 数据格式验证错误
|
||
- `processing_failed` - 业务处理失败
|
||
- `system` - 系统错误
|
||
|
||
---
|
||
|
||
### 3. Queue 设计
|
||
|
||
#### 业务队列(主队列)
|
||
|
||
物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据:
|
||
|
||
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | DLX | 用途 |
|
||
|-----------|--------|----------|----------|-----|------|
|
||
| `orders.queue` | 是 | 否 | 24小时 | `dlx.orders` | 接收所有平台的订单数据,单消费者批处理 |
|
||
| `products.queue` | 是 | 否 | 24小时 | `dlx.products` | 接收所有平台的产品数据,单消费者批处理 |
|
||
| `refunds.queue` | 是 | 否 | 24小时 | `dlx.refunds` | 接收所有平台的退款数据,单消费者批处理 |
|
||
| `inventory.queue` | 是 | 否 | 24小时 | `dlx.inventory` | 接收所有平台的库存数据,单消费者批处理 |
|
||
|
||
**队列参数配置**:
|
||
```json
|
||
{
|
||
"x-message-ttl": 86400000,
|
||
"x-dead-letter-exchange": "dlx.orders",
|
||
"x-dead-letter-routing-key": "retry"
|
||
}
|
||
```
|
||
|
||
#### 死信交换机(DLX)
|
||
|
||
每种数据类型配备一个 DLX,用于处理失败消息的路由:
|
||
|
||
| Exchange 名称 | 类型 | 持久化 | 用途 |
|
||
|--------------|------|--------|------|
|
||
| `dlx.orders` | Topic | 是 | 订单失败消息路由 |
|
||
| `dlx.products` | Topic | 是 | 产品失败消息路由 |
|
||
| `dlx.refunds` | Topic | 是 | 退款失败消息路由 |
|
||
| `dlx.inventory` | Topic | 是 | 库存失败消息路由 |
|
||
|
||
#### 延迟重试队列
|
||
|
||
为每种数据类型提供延迟重试机制:
|
||
|
||
| Queue 名称 | 持久化 | 消息 TTL | DLX | 用途 |
|
||
|-----------|--------|----------|-----|------|
|
||
| `orders.retry.queue` | 是 | 5秒 | `main.exchange` | 订单延迟重试(TTL后自动回到主队列)|
|
||
| `products.retry.queue` | 是 | 5秒 | `main.exchange` | 产品延迟重试 |
|
||
| `refunds.retry.queue` | 是 | 5秒 | `main.exchange` | 退款延迟重试 |
|
||
| `inventory.retry.queue` | 是 | 5秒 | `main.exchange` | 库存延迟重试 |
|
||
|
||
**重试队列参数配置**:
|
||
```json
|
||
{
|
||
"x-message-ttl": 5000,
|
||
"x-dead-letter-exchange": "main.exchange",
|
||
"x-dead-letter-routing-key": "order.{platform}"
|
||
}
|
||
```
|
||
|
||
**重试机制说明(方案B实现)**:
|
||
1. 消费者处理失败时,检查消息的 `x-death` header 中的 count 字段获取重试次数
|
||
2. 根据重试次数决定处理方式:
|
||
- **count < 3**:执行 `nack(requeue=false)`,消息进入 DLX → 路由到 `retry.queue`(延迟重试)
|
||
- **count >= 3**:消费者主动发送到 `errors.queue`,然后返回 ACK(避免再次重试)
|
||
3. 重试队列 TTL(5秒)到期后,消息自动死信回主 Exchange,重新进入主队列
|
||
4. 配置参数:
|
||
- **最大重试次数**:3次(可通过 `AMQP_MAX_RETRIES` 环境变量配置)
|
||
- **延迟时间**:5秒(retry 队列的 `x-message-ttl`)
|
||
- **调试延迟**:可通过 `AMQP_CONSUMER_DEBUG_DELAY` 设置处理延迟,方便观察队列状态
|
||
|
||
#### 错误队列
|
||
|
||
统一的错误队列,接收所有超过重试次数的失败消息:
|
||
|
||
| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | 用途 |
|
||
|-----------|--------|----------|----------|------|
|
||
| `errors.queue` | 是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 |
|
||
|
||
**错误队列的作用和机制**:
|
||
- ✅ **收集永久失败的消息**:当消息重试次数达到上限(默认3次)后,消费者会主动将消息发送到此队列
|
||
- ✅ **避免消息丢失**:即使处理失败多次,消息也会被保存在错误队列中,不会丢失
|
||
- ✅ **人工排查和修复**:运维人员可以从错误队列中查看失败原因,修复问题后重新投递
|
||
- ✅ **统一管理**:所有数据类型(orders/products/refunds/inventory)的错误消息都集中在一个队列
|
||
- ⚠️ **不会自动重试**:错误队列中的消息需要人工介入,不会自动回流到主队列
|
||
|
||
**错误消息格式**:
|
||
```json
|
||
{
|
||
"error_id": "err_675d8f3a2b1c4",
|
||
"original_message": { /* 原始消息内容 */ },
|
||
"error": {
|
||
"type": "InvalidArgumentException",
|
||
"message": "Missing required field: total_amount",
|
||
"trace": "Exception stack trace...",
|
||
"timestamp": "2025-01-15T10:31:00+00:00"
|
||
},
|
||
"metadata": {
|
||
"platform": "tmall",
|
||
"platform_id": 2,
|
||
"company_id": 188,
|
||
"store_id": 292,
|
||
"data_type": "order",
|
||
"failed_at": "2025-01-15T10:31:00+00:00",
|
||
"retry_count": 3
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
### 4. Binding 配置
|
||
|
||
#### 主 Exchange (用于接收各平台数据)
|
||
|
||
需要创建一个统一的主 Exchange,用于重试队列的消息回流:
|
||
|
||
| Exchange 名称 | 类型 | 持久化 | 用途 |
|
||
|--------------|------|--------|------|
|
||
| `main.exchange` | Topic | 是 | 统一主交换机,用于路由和重试消息回流 |
|
||
|
||
#### 业务队列绑定(主队列)
|
||
|
||
**方案1:直接绑定(推荐)**
|
||
每个平台的 Exchange 使用带平台标识的 routing key 绑定到对应数据类型队列:
|
||
|
||
```
|
||
douyin.exchange
|
||
├── [routing_key: order.douyin] → orders.queue
|
||
├── [routing_key: product.douyin] → products.queue
|
||
├── [routing_key: refund.douyin] → refunds.queue
|
||
└── [routing_key: inventory.douyin] → inventory.queue
|
||
|
||
tmall.exchange
|
||
├── [routing_key: order.tmall] → orders.queue
|
||
├── [routing_key: product.tmall] → products.queue
|
||
├── [routing_key: refund.tmall] → refunds.queue
|
||
└── [routing_key: inventory.tmall] → inventory.queue
|
||
|
||
... (其他平台类似)
|
||
```
|
||
|
||
**方案2:通过主 Exchange(备选)**
|
||
平台 Exchange 先发到主 Exchange,再由主 Exchange 统一路由:
|
||
|
||
```
|
||
douyin.exchange → [routing_key: order.douyin] → main.exchange
|
||
|
||
main.exchange
|
||
├── [routing_key: order.#] → orders.queue
|
||
├── [routing_key: product.#] → products.queue
|
||
├── [routing_key: refund.#] → refunds.queue
|
||
└── [routing_key: inventory.#] → inventory.queue
|
||
```
|
||
|
||
**推荐使用方案1**,简化架构,减少消息跳转。
|
||
|
||
#### DLX 绑定(重试路由)
|
||
|
||
每个 DLX 根据消息 header 中的 `x-retries` 决定路由目标:
|
||
|
||
```
|
||
dlx.orders
|
||
├── [routing_key: retry, x-retries < 3] → orders.retry.queue
|
||
└── [routing_key: error, x-retries >= 3] → errors.queue
|
||
|
||
dlx.products
|
||
├── [routing_key: retry] → products.retry.queue
|
||
└── [routing_key: error] → errors.queue
|
||
|
||
dlx.refunds
|
||
├── [routing_key: retry] → refunds.retry.queue
|
||
└── [routing_key: error] → errors.queue
|
||
|
||
dlx.inventory
|
||
├── [routing_key: retry] → inventory.retry.queue
|
||
└── [routing_key: error] → errors.queue
|
||
```
|
||
|
||
**注意**:RabbitMQ 原生不支持基于 header 的条件路由,需要:
|
||
- **方案A**:消费者在 `nack` 时根据重试次数决定发送到 DLX 的 routing key(`retry` 或 `error`)
|
||
- **方案B**:使用 RabbitMQ 插件(如 `rabbitmq_message_routing_by_headers`)实现条件路由
|
||
|
||
**推荐方案A**:消费者控制重试逻辑,更灵活可控。
|
||
|
||
#### 重试队列绑定(回流主队列)
|
||
|
||
重试队列 TTL 到期后,消息自动死信回主 Exchange:
|
||
|
||
```
|
||
orders.retry.queue
|
||
└── [TTL=5s后 死信到 main.exchange,routing_key保持原始值:order.{platform}]
|
||
|
||
main.exchange
|
||
└── [routing_key: order.#] → orders.queue
|
||
```
|
||
|
||
这样实现了"失败 → 延迟5秒 → 重新进入主队列"的循环。
|
||
|
||
#### 错误队列绑定
|
||
|
||
每个平台的错误 Exchange 和所有 DLX 都绑定到统一错误队列:
|
||
|
||
```
|
||
douyin.errors.exchange
|
||
└── [routing_key: #] → errors.queue
|
||
|
||
dlx.orders
|
||
└── [routing_key: error] → errors.queue
|
||
|
||
dlx.products
|
||
└── [routing_key: error] → errors.queue
|
||
|
||
... (其他类型类似)
|
||
```
|
||
|
||
**说明**:使用 `#` 通配符接收所有错误类型。
|
||
|
||
---
|
||
|
||
## 消费者设计
|
||
|
||
### 1. 消费者模型
|
||
|
||
采用 **单消费者 + 批处理 + prefetch + 适配器模式**:
|
||
|
||
| 数据类型 | 消费者 | Queue | 并发模式 | 处理模式 |
|
||
|---------|--------|-------|----------|---------|
|
||
| 订单 | Order Consumer | `orders.queue` | 单消费者 | 批处理 + 适配器 |
|
||
| 产品 | Product Consumer | `products.queue` | 单消费者 | 批处理 + 适配器 |
|
||
| 退款 | Refund Consumer | `refunds.queue` | 单消费者 | 批处理 + 适配器 |
|
||
| 库存 | Inventory Consumer | `inventory.queue` | 单消费者 | 批处理 + 适配器 |
|
||
|
||
**设计原则**:
|
||
- ✅ **单消费者**:每个队列只有一个消费者实例,保证严格 FIFO 顺序
|
||
- ✅ **批处理**:积累一批消息后统一处理,提升数据库写入效率
|
||
- ✅ **单条 ACK**:每条消息独立确认,失败不影响其他消息
|
||
- ✅ **适配器模式**:消费者内部根据平台字段分发到不同业务逻辑
|
||
|
||
|
||
**消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 **
|
||
### 2. Prefetch 配置
|
||
|
||
```json
|
||
{
|
||
"prefetch_count": 100,
|
||
"batch_size": 50,
|
||
"batch_timeout": 3000
|
||
}
|
||
```
|
||
|
||
**参数说明**:
|
||
- `prefetch_count=100`:RabbitMQ 最多推送 100 条未确认消息到消费者
|
||
- `batch_size=50`:消费者积累 50 条消息后触发批处理
|
||
- `batch_timeout=3000`:如果3秒内未达到 batch_size,也触发处理
|
||
|
||
**工作流程**:
|
||
1. RabbitMQ 按 Push 模式推送最多 100 条消息到消费者
|
||
2. 消费者缓存消息,达到 50 条或超时 3 秒后批量处理
|
||
3. 批量处理时,仍然对每条消息单独 ACK/NACK
|
||
4. 实现"伪批量拉取"效果,同时保留单条确认的灵活性
|
||
|
||
### 3. 消费者处理逻辑(伪代码)
|
||
|
||
```python
|
||
class OrderConsumer:
|
||
def __init__(self):
|
||
self.prefetch_count = 100
|
||
self.batch_size = 50
|
||
self.batch_timeout = 3
|
||
self.message_buffer = []
|
||
self.adapters = {
|
||
'douyin': DouyinAdapter(),
|
||
'tmall': TmallAdapter(),
|
||
'jd': JDAdapter(),
|
||
# ... 其他平台适配器
|
||
}
|
||
|
||
def on_message(self, channel, method, properties, body):
|
||
"""接收到消息时的回调"""
|
||
message = json.loads(body)
|
||
self.message_buffer.append({
|
||
'channel': channel,
|
||
'method': method,
|
||
'properties': properties,
|
||
'body': message
|
||
})
|
||
|
||
# 达到批量大小,触发处理
|
||
if len(self.message_buffer) >= self.batch_size:
|
||
self.process_batch()
|
||
|
||
def process_batch(self):
|
||
"""批量处理消息"""
|
||
for msg_data in self.message_buffer:
|
||
try:
|
||
message = msg_data['body']
|
||
platform = message['platform']
|
||
|
||
# 使用适配器模式分发到对应平台逻辑
|
||
adapter = self.adapters.get(platform)
|
||
if not adapter:
|
||
raise ValueError(f"Unknown platform: {platform}")
|
||
|
||
# 处理业务逻辑
|
||
adapter.process(message)
|
||
|
||
# 成功:ACK
|
||
msg_data['channel'].basic_ack(msg_data['method'].delivery_tag)
|
||
|
||
except TemporaryError as e:
|
||
# 临时错误:重试
|
||
retry_count = msg_data['properties'].headers.get('x-retries', 0)
|
||
|
||
if retry_count < 3:
|
||
# 更新重试次数,发送到 DLX 进行延迟重试
|
||
headers = {'x-retries': retry_count + 1}
|
||
msg_data['channel'].basic_nack(
|
||
msg_data['method'].delivery_tag,
|
||
requeue=False
|
||
)
|
||
else:
|
||
# 超过重试次数,发送到错误队列
|
||
self.send_to_error_queue(msg_data)
|
||
msg_data['channel'].basic_nack(
|
||
msg_data['method'].delivery_tag,
|
||
requeue=False
|
||
)
|
||
|
||
except PermanentError as e:
|
||
# 永久错误:直接进入错误队列
|
||
self.send_to_error_queue(msg_data)
|
||
msg_data['channel'].basic_nack(
|
||
msg_data['method'].delivery_tag,
|
||
requeue=False
|
||
)
|
||
|
||
# 清空缓冲区
|
||
self.message_buffer.clear()
|
||
|
||
def send_to_error_queue(self, msg_data):
|
||
"""发送到错误队列"""
|
||
error_message = {
|
||
'original_message': msg_data['body'],
|
||
'error': str(e),
|
||
'timestamp': datetime.now().isoformat()
|
||
}
|
||
# 发送到错误 Exchange
|
||
self.channel.basic_publish(
|
||
exchange='douyin.errors.exchange',
|
||
routing_key='error',
|
||
body=json.dumps(error_message)
|
||
)
|
||
```
|
||
|
||
### 4. 重试机制工作流程(方案B实现)
|
||
|
||
```
|
||
┌─────────────────┐
|
||
│ orders.queue │ ← 消费者从这里接收消息
|
||
└────────┬────────┘
|
||
│
|
||
├─ 成功 → basic_ack()
|
||
│
|
||
└─ 失败 → Consumer检查 x-death count
|
||
│
|
||
├─ count < 3 (未超过重试次数)
|
||
│ │
|
||
│ └→ basic_nack(requeue=false)
|
||
│ │
|
||
│ ▼
|
||
│ ┌──────────────────┐
|
||
│ │ dlx.orders │ ← 死信交换机
|
||
│ └────────┬─────────┘
|
||
│ │
|
||
│ └→ routing_key="retry"
|
||
│ │
|
||
│ ▼
|
||
│ orders.retry.queue (TTL=5s)
|
||
│ │
|
||
│ └─ TTL到期 → 自动死信回 main.exchange
|
||
│ │
|
||
│ └─ order.# → orders.queue
|
||
│ (x-death count自动+1)
|
||
│
|
||
└─ count >= 3 (超过重试次数)
|
||
│
|
||
└→ Consumer主动发送到 errors.queue
|
||
│
|
||
└→ basic_ack() (避免再次重试)
|
||
│
|
||
▼
|
||
errors.queue (人工处理)
|
||
```
|
||
|
||
**关键点**:
|
||
- ✅ **requeue=false**:让失败消息进入 DLX,而不是回到原队列
|
||
- ✅ **retry 队列自动回流**:TTL 到期后自动死信回主队列,不需要额外消费者
|
||
- ✅ **x-death count 自动累加**:每次死信 RabbitMQ 会自动增加 count
|
||
- ✅ **Consumer 控制路由**:超过重试次数时,Consumer 主动发送到 error 队列并 ACK
|
||
- ⚠️ **不能使用动态 routing key**:DLX 的 routing key 在队列配置中是固定的,无法基于 count 动态改变
|
||
|
||
### 5. 为什么这样设计?
|
||
|
||
| 特性 | 原因 | 优势 |
|
||
|------|------|------|
|
||
| **单队列 FIFO** | RabbitMQ 无法在队列内部按 topic 过滤 | 保证严格 FIFO,不拆分队列造成业务乱序 |
|
||
| **单消费者** | RabbitMQ 同队列多消费者必然 Round-Robin 分配 | 避免消息错投到错误的消费者 |
|
||
| **Prefetch + 批处理** | RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 | 实现"伪批量拉取",提升吞吐 |
|
||
| **单条 ACK** | 保留细粒度控制 | 失败消息不影响其他消息 |
|
||
| **DLX + 延迟重试** | 避免失败消息阻塞队列 | 不会卡 FIFO,不会无限重试 |
|
||
| **适配器模式** | 多平台业务逻辑隔离 | 扩展性好,易维护 |
|
||
|
||
---
|
||
|
||
## 用户权限配置
|
||
|
||
### 1. 平台开发者账号
|
||
|
||
每个平台分配一个专用账号,只能访问自己的 Exchange。
|
||
|
||
#### DouYin 平台账号
|
||
|
||
```
|
||
用户名: user_douyin
|
||
密码: <安全密码>
|
||
VHost: dataflow-app
|
||
|
||
权限配置:
|
||
- Configure: (留空) - 不允许创建/删除资源
|
||
- Write: ^douyin\.(exchange|errors\.exchange)$ - 只能写入 DouYin 的业务和错误 Exchange
|
||
- Read: ^douyin\.errors\..*$ - 只能读取 DouYin 错误相关的队列
|
||
```
|
||
|
||
#### Tmall 平台账号
|
||
|
||
```
|
||
用户名: user_tmall
|
||
密码: <安全密码>
|
||
VHost: dataflow-app
|
||
|
||
权限配置:
|
||
- Configure: (留空)
|
||
- Write: ^tmall\.(exchange|errors\.exchange)$
|
||
- Read: ^tmall\.errors\..*$
|
||
```
|
||
|
||
#### 其他平台账号
|
||
|
||
按相同模式创建:
|
||
- JD: `user_jd`
|
||
- RedBook: `user_redbook`
|
||
- Amazon Japan: `user_amazon`
|
||
- Naver: `user_naver`
|
||
- Rakuten: `user_rakuten`
|
||
- Shopify: `user_shopify`
|
||
|
||
### 2. Dataflow 消费者账号
|
||
|
||
Dataflow 应用的后端服务账号,负责消费业务队列并处理错误。
|
||
|
||
```
|
||
用户名: user_dataflow_consumer
|
||
密码: <安全密码>
|
||
VHost: dataflow-app
|
||
|
||
权限配置:
|
||
- Configure: ^(orders|products|refunds|inventory)\.queue$ - 可以管理业务队列
|
||
- Write: .*\.errors\.exchange$ - 可以向所有错误 Exchange 写入消息
|
||
- Read: ^(orders|products|refunds|inventory)\.queue$ - 可以读取所有业务队列
|
||
```
|
||
|
||
### 3. 运维监控账号
|
||
|
||
运维团队账号,用于监控所有错误消息。
|
||
|
||
```
|
||
用户名: user_ops
|
||
密码: <安全密码>
|
||
VHost: dataflow-app
|
||
|
||
权限配置:
|
||
- Configure: ^errors\..*$ - 可以管理错误相关资源
|
||
- Write: (留空) - 不需要写入权限
|
||
- Read: ^errors\.queue$ - 可以读取统一错误队列
|
||
```
|
||
|
||
### 4. 管理员账号
|
||
|
||
RabbitMQ 超级管理员,用于配置和维护。
|
||
|
||
```
|
||
用户名: admin
|
||
密码: <强密码>
|
||
Tag: administrator
|
||
|
||
权限配置:
|
||
- 对所有 VHost 具有完全权限
|
||
- 可以访问管理界面
|
||
```
|
||
|
||
---
|
||
|
||
## 消息格式规范
|
||
|
||
### 1. message_id 设计
|
||
|
||
message_id 采用结构化格式,确保幂等性和可追溯性。
|
||
|
||
**格式**:
|
||
```
|
||
{company_id}#{platform_id}#{store_id}#{entity_type}#{request_time_range/formated_suffix}
|
||
```
|
||
|
||
**字段说明**:
|
||
- `prefix`:项目名称前缀(如 `wpic-project1`)
|
||
- `app_id`:应用标识(固定为 `dataflow`)
|
||
- `company_id`:公司 ID
|
||
- `platform_id`:平台 ID
|
||
- `store_id`:店铺 ID
|
||
- `entity_type`:数据实体类型(order/product/refund/inventory)
|
||
- `request_time_range/formated_suffix`:数据请求的时间区间/格式化的后缀,业务侧决定即可,需要业务侧维护幂等性
|
||
|
||
**分隔符说明**:
|
||
- 使用 `#` 作为分隔符(RabbitMQ message_id 字段支持任意字符)
|
||
- `#` 在 routing key 中是通配符,但在 message_id 中是普通字符
|
||
|
||
**示例**:
|
||
```
|
||
wpic-project1#dataflow#100#20#200#order#2025-11-10~2025-11-15
|
||
|
||
解析:
|
||
- 项目前缀: wpic-project1
|
||
- 应用: dataflow
|
||
- 公司ID: 100
|
||
- 平台ID: 20 (DouYin)
|
||
- 店铺ID: 200
|
||
- 实体类型: order
|
||
- 平台订单号: DY123456789
|
||
```
|
||
|
||
**特性**:
|
||
- ✅ 幂等性:同一实体的 message_id 完全一致
|
||
- ✅ 可读性:直接看出数据来源
|
||
- ✅ 便于调试:可从 message_id 提取元数据
|
||
- ✅ 无冲突:全局唯一
|
||
|
||
### 2. 业务消息格式
|
||
|
||
```json
|
||
{
|
||
"message_id": "wpic-project1#dataflow#100#20#200#order#DY123456",
|
||
"timestamp": "2025-01-14T10:30:00Z",
|
||
"platform": "douyin",
|
||
"data_type": "order",
|
||
|
||
"metadata": {
|
||
"platform_id": 20,
|
||
"company_id": 100,
|
||
"store_id": 200,
|
||
"source_system": "douyin-open-api",
|
||
"retry_count": 0,
|
||
"data_version": 1736840000
|
||
},
|
||
|
||
"data": {
|
||
"platform_unique_id": "DY123456",
|
||
"raw_data": {
|
||
// 平台原始完整数据
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
**字段说明**:
|
||
|
||
| 字段 | 类型 | 必需 | 说明 |
|
||
|------|------|------|------|
|
||
| `message_id` | string | 是 | 结构化消息 ID,用于幂等性 |
|
||
| `timestamp` | string | 是 | 消息生成时间(ISO 8601 格式) |
|
||
| `platform` | string | 是 | 平台标识(douyin/tmall/jd...) |
|
||
| `data_type` | string | 是 | 数据类型(order/product/refund/inventory) |
|
||
| `metadata.platform_id` | int | 是 | 平台 ID(数据库主键) |
|
||
| `metadata.company_id` | int | 是 | 公司 ID |
|
||
| `metadata.store_id` | int | 是 | 店铺 ID |
|
||
| `metadata.source_system` | string | 是 | 数据来源系统 |
|
||
| `metadata.retry_count` | int | 是 | 重试次数(初始为 0) |
|
||
| `metadata.data_version` | int | 是 | 数据版本(Unix 时间戳,用于防止乱序更新) |
|
||
| `data.platform_unique_id` | string | 是 | 平台侧唯一标识 |
|
||
| `data.raw_data` | object | 是 | 平台原始完整 JSON 数据 |
|
||
|
||
**重要说明**:
|
||
|
||
- **data_version**:必须使用平台数据的更新时间戳(Unix 时间戳),用于解决消息乱序消费问题
|
||
- 示例:平台订单的最后更新时间 `1736840000`(2025-01-14 10:00:00 UTC)
|
||
- 消费端只有当 `data_version` 大于数据库中的值时才更新数据
|
||
- 如果平台未提供更新时间,使用消息生成时间戳
|
||
|
||
### 3. 错误消息格式
|
||
|
||
错误消息由 dataflow 消费者生成,包含原始消息和错误详情。
|
||
|
||
```json
|
||
{
|
||
"error_id": "err_1234567890abcdef",
|
||
"original_message": {
|
||
// 原始业务消息完整内容
|
||
},
|
||
"error": {
|
||
"type": "validation",
|
||
"message": "Missing required field: total_amount",
|
||
"trace": "Exception stack trace...",
|
||
"timestamp": "2025-01-14T10:31:00Z"
|
||
},
|
||
"metadata": {
|
||
"platform": "amazon",
|
||
"platform_id": 1,
|
||
"company_id": 100,
|
||
"store_id": 200,
|
||
"data_type": "order",
|
||
"failed_at": "2025-01-14T10:31:00Z"
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## RabbitMQ 管理界面操作指引
|
||
|
||
### 1. 访问管理界面
|
||
|
||
```
|
||
URL: http://<rabbitmq-host>:15672
|
||
用户名: admin
|
||
密码: <管理员密码>
|
||
```
|
||
|
||
### 2. 创建 VHost
|
||
|
||
1. 点击顶部导航 **Admin** → **Virtual Hosts**
|
||
2. 点击 **Add a new virtual host**
|
||
3. 输入 VHost 名称:`dataflow-app`
|
||
4. 点击 **Add virtual host**
|
||
|
||
### 3. 创建用户
|
||
|
||
1. 点击顶部导航 **Admin** → **Users**
|
||
2. 点击 **Add a user**
|
||
3. 填写用户信息:
|
||
- **Username**: `user_amazon`
|
||
- **Password**: `<安全密码>`
|
||
- **Tags**: (留空,非管理员用户)
|
||
4. 点击 **Add user**
|
||
|
||
### 4. 配置用户权限
|
||
|
||
1. 在用户列表中,点击用户名(如 `user_amazon`)
|
||
2. 滚动到 **Permissions** 部分
|
||
3. 选择 VHost:`dataflow-app`
|
||
4. 配置权限正则表达式:
|
||
- **Configure regexp**: (留空)
|
||
- **Write regexp**: `^amazon\.(exchange|errors\.exchange)$`
|
||
- **Read regexp**: `^amazon\.errors\..*$`
|
||
5. 点击 **Set permission**
|
||
|
||
### 5. 创建 Exchange
|
||
|
||
1. 点击顶部导航 **Exchanges**
|
||
2. 确保选择正确的 VHost:`dataflow-app`
|
||
3. 点击 **Add a new exchange**
|
||
4. 填写配置:
|
||
- **Name**: `amazon.exchange`
|
||
- **Type**: `topic`
|
||
- **Durability**: `Durable`
|
||
- **Auto delete**: `No`
|
||
- **Internal**: `No`
|
||
5. 点击 **Add exchange**
|
||
|
||
重复以上步骤创建所有业务 Exchange 和错误 Exchange。
|
||
|
||
### 6. 创建 Queue
|
||
|
||
1. 点击顶部导航 **Queues**
|
||
2. 确保选择正确的 VHost:`dataflow-app`
|
||
3. 点击 **Add a new queue**
|
||
4. 填写配置:
|
||
- **Name**: `orders.queue`
|
||
- **Durability**: `Durable`
|
||
- **Auto delete**: `No`
|
||
- **Arguments**:
|
||
- `x-message-ttl`: `86400000` (24小时,单位:毫秒)
|
||
5. 点击 **Add queue**
|
||
|
||
重复以上步骤创建 `products.queue`、`refunds.queue` 和 `errors.queue`。
|
||
|
||
**注意**:`errors.queue` 的 TTL 设置为 `604800000`(7天)。
|
||
|
||
### 7. 创建 Binding
|
||
|
||
1. 点击顶部导航 **Exchanges**
|
||
2. 点击需要绑定的 Exchange(如 `amazon.exchange`)
|
||
3. 滚动到 **Bindings** 部分
|
||
4. 在 **Add binding from this exchange** 区域填写:
|
||
- **To queue**: `orders.queue`
|
||
- **Routing key**: `order`
|
||
5. 点击 **Bind**
|
||
|
||
为每个 Exchange 创建 3 个绑定(order/product/refund)。
|
||
|
||
对于错误 Exchange,绑定配置:
|
||
- **To queue**: `errors.queue`
|
||
- **Routing key**: `#`
|
||
|
||
---
|
||
|
||
## 配置脚本
|
||
|
||
为简化配置过程,可以使用以下脚本自动创建资源。
|
||
|
||
### 使用 rabbitmqadmin
|
||
|
||
RabbitMQ 提供命令行工具 `rabbitmqadmin`,可用于批量配置。
|
||
|
||
#### 安装 rabbitmqadmin
|
||
|
||
```bash
|
||
# 从 RabbitMQ 管理界面下载
|
||
wget http://<rabbitmq-host>:15672/cli/rabbitmqadmin
|
||
chmod +x rabbitmqadmin
|
||
sudo mv rabbitmqadmin /usr/local/bin/
|
||
```
|
||
|
||
#### 配置脚本示例
|
||
|
||
**注意**:本脚本适用于**新版本 rabbitmqadmin**(Rust 版本)。如果您使用的是旧版本(Python),请参考官方文档调整语法。
|
||
|
||
```bash
|
||
#!/bin/bash
|
||
|
||
# RabbitMQ 连接信息
|
||
RABBITMQ_HOST="127.0.0.1"
|
||
RABBITMQ_PORT="15672"
|
||
RABBITMQ_USER="admin"
|
||
RABBITMQ_PASS="admin"
|
||
VHOST="dataflow"
|
||
|
||
# 平台列表
|
||
PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify")
|
||
DATA_TYPES=("orders" "products" "refunds" "inventory")
|
||
|
||
# 0. 检测并清理现有 VHost 和用户(如果存在)
|
||
echo "========================================"
|
||
echo "开始清理现有配置..."
|
||
echo "========================================"
|
||
|
||
# 检查 VHost 是否存在
|
||
echo "检查 VHost: $VHOST 是否存在..."
|
||
VHOST_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list vhosts | grep -w "$VHOST" || true)
|
||
|
||
if [ -n "$VHOST_EXISTS" ]; then
|
||
echo "警告: VHost '$VHOST' 已存在,将删除所有现有配置..."
|
||
|
||
# 删除 VHost(会自动删除该 VHost 下的所有 Exchanges、Queues、Bindings、权限)
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
delete vhost --name "$VHOST"
|
||
|
||
echo "✓ VHost '$VHOST' 及其所有资源已删除"
|
||
else
|
||
echo "VHost '$VHOST' 不存在"
|
||
fi
|
||
|
||
# 清理所有相关用户账号
|
||
echo ""
|
||
echo "清理用户账号..."
|
||
|
||
# 平台用户列表
|
||
ALL_USERS=()
|
||
for platform in "${PLATFORMS[@]}"; do
|
||
ALL_USERS+=("user_${platform}")
|
||
done
|
||
|
||
# 添加其他用户
|
||
ALL_USERS+=("user_dataflow_consumer")
|
||
ALL_USERS+=("user_ops")
|
||
|
||
# 删除用户(如果存在)
|
||
for user in "${ALL_USERS[@]}"; do
|
||
USER_EXISTS=$(rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS list users | grep -w "$user" || true)
|
||
if [ -n "$USER_EXISTS" ]; then
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
delete user --name "$user"
|
||
echo "✓ 删除用户: $user"
|
||
fi
|
||
done
|
||
|
||
echo ""
|
||
echo "========================================"
|
||
echo "清理完成,开始重新配置..."
|
||
echo "========================================"
|
||
echo ""
|
||
|
||
# 1. 创建 VHost
|
||
echo "创建 VHost: $VHOST"
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare vhost --name "$VHOST"
|
||
|
||
# 2. 创建主 Exchange(用于重试消息回流)
|
||
echo "创建主 Exchange: main.exchange"
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare exchange --name "main.exchange" --vhost "$VHOST" \
|
||
--type topic --durable true
|
||
|
||
# 3. 创建 DLX (死信交换机)
|
||
echo "创建死信交换机 (DLX)..."
|
||
for dtype in "${DATA_TYPES[@]}"; do
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare exchange --name "dlx.${dtype}" --vhost "$VHOST" \
|
||
--type topic --durable true
|
||
echo "✓ 创建 DLX: dlx.${dtype}"
|
||
done
|
||
|
||
# 4. 创建主业务队列(带 DLX 配置)
|
||
echo ""
|
||
echo "创建主业务队列..."
|
||
for dtype in "${DATA_TYPES[@]}"; do
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare queue --name "${dtype}.queue" --vhost "$VHOST" --durable \
|
||
--arguments "{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
|
||
echo "✓ 创建队列: ${dtype}.queue"
|
||
done
|
||
|
||
# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
|
||
echo ""
|
||
echo "创建重试队列..."
|
||
for dtype in "${DATA_TYPES[@]}"; do
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable \
|
||
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}"
|
||
echo "✓ 创建重试队列: ${dtype}.retry.queue"
|
||
done
|
||
|
||
# 6. 创建错误队列
|
||
echo ""
|
||
echo "创建错误队列..."
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare queue --name "errors.queue" --vhost "$VHOST" --durable \
|
||
--arguments '{"x-message-ttl":604800000}'
|
||
echo "✓ 创建错误队列: errors.queue"
|
||
|
||
# 7. 绑定主 Exchange 到主队列(使用通配符)
|
||
echo ""
|
||
echo "绑定主 Exchange 到主队列..."
|
||
for dtype in "${DATA_TYPES[@]}"; do
|
||
# 使用 order.#, product.# 等通配符匹配所有平台
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "main.exchange" --destination "${dtype}.queue" \
|
||
--destination-type queue --vhost "$VHOST" --routing-key "${dtype%s}.#"
|
||
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
|
||
done
|
||
|
||
# 8. 绑定 DLX 到重试队列和错误队列
|
||
echo ""
|
||
echo "绑定 DLX 到重试队列和错误队列..."
|
||
for dtype in "${DATA_TYPES[@]}"; do
|
||
# DLX -> 重试队列
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
|
||
--destination-type queue --vhost "$VHOST" --routing-key "retry"
|
||
echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)"
|
||
|
||
# DLX -> 错误队列
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "dlx.${dtype}" --destination "errors.queue" \
|
||
--destination-type queue --vhost "$VHOST" --routing-key "error"
|
||
echo "✓ 绑定: dlx.${dtype} → errors.queue (routing_key: error)"
|
||
done
|
||
|
||
# 9. 为每个平台创建 Exchange 和 Binding
|
||
echo ""
|
||
echo "========================================"
|
||
echo "创建平台 Exchange 和 Binding..."
|
||
echo "========================================"
|
||
for platform in "${PLATFORMS[@]}"; do
|
||
echo ""
|
||
echo "处理平台: ${platform}"
|
||
|
||
# 创建平台业务 Exchange
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare exchange --name "${platform}.exchange" --vhost "$VHOST" \
|
||
--type topic --durable true
|
||
echo "✓ 创建业务 Exchange: ${platform}.exchange"
|
||
|
||
# 绑定到业务队列(使用新的 routing key 格式:data_type.platform)
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "${platform}.exchange" --destination "orders.queue" \
|
||
--destination-type queue --vhost "$VHOST" --routing-key "order.${platform}"
|
||
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "${platform}.exchange" --destination "products.queue" \
|
||
--destination-type queue --vhost "$VHOST" --routing-key "product.${platform}"
|
||
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "${platform}.exchange" --destination "refunds.queue" \
|
||
--destination-type queue --vhost "$VHOST" --routing-key "refund.${platform}"
|
||
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "${platform}.exchange" --destination "inventory.queue" \
|
||
--destination-type queue --vhost "$VHOST" --routing-key "inventory.${platform}"
|
||
echo "✓ 绑定到业务队列 (4个)"
|
||
|
||
# 创建平台错误 Exchange
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare exchange --name "${platform}.errors.exchange" --vhost "$VHOST" \
|
||
--type topic --durable true
|
||
echo "✓ 创建错误 Exchange: ${platform}.errors.exchange"
|
||
|
||
# 绑定到错误队列
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "${platform}.errors.exchange" --destination "errors.queue" \
|
||
--destination-type queue --vhost "$VHOST" --routing-key "#"
|
||
echo "✓ 绑定到错误队列"
|
||
|
||
# 创建平台用户
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare user --name "user_${platform}" --password "change_me_${platform}" --tags ""
|
||
echo "✓ 创建用户: user_${platform}"
|
||
|
||
# 配置平台用户权限
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare permissions --vhost "$VHOST" --user "user_${platform}" \
|
||
--configure "^${platform}\\.(exchange|errors\\.exchange)$" \
|
||
--write "^${platform}\\.(exchange|errors\\.exchange)$" \
|
||
--read "^${platform}\\.errors\\..*$"
|
||
echo "✓ 配置用户权限"
|
||
done
|
||
|
||
# 10. 创建 dataflow 消费者用户
|
||
echo ""
|
||
echo "========================================"
|
||
echo "创建 dataflow 消费者用户..."
|
||
echo "========================================"
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags ""
|
||
echo "✓ 创建用户: user_dataflow_consumer"
|
||
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \
|
||
--configure "^(orders|products|refunds|inventory).*\\.queue$" \
|
||
--write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \
|
||
--read "^(orders|products|refunds|inventory).*\\.queue$"
|
||
echo "✓ 配置用户权限"
|
||
|
||
# 11. 创建运维监控用户
|
||
echo ""
|
||
echo "创建运维监控用户..."
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare user --name "user_ops" --password "change_me_ops" --tags ""
|
||
echo "✓ 创建用户: user_ops"
|
||
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare permissions --vhost "$VHOST" --user "user_ops" \
|
||
--configure "^errors\\..*$" --write "" --read "^errors\\.queue$"
|
||
echo "✓ 配置用户权限"
|
||
|
||
echo ""
|
||
echo "========================================"
|
||
echo "RabbitMQ 配置完成!"
|
||
echo "========================================"
|
||
echo ""
|
||
echo "已创建:"
|
||
echo "- 1 个 VHost: $VHOST"
|
||
echo "- 1 个主 Exchange: main.exchange"
|
||
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
|
||
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
|
||
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
|
||
echo "- 1 个错误队列: errors.queue"
|
||
echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)"
|
||
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"
|
||
echo ""
|
||
echo "提示: 请修改所有用户的默认密码 (change_me_*)"
|
||
```
|
||
|
||
#### 执行脚本
|
||
|
||
```bash
|
||
chmod +x setup_rabbitmq.sh
|
||
./setup_rabbitmq.sh
|
||
```
|
||
|
||
---
|
||
|
||
## 监控和维护
|
||
|
||
### 1. 监控指标
|
||
|
||
通过 RabbitMQ 管理界面查看以下关键指标:
|
||
|
||
#### 队列健康状态
|
||
|
||
1. 点击 **Queues** → 选择队列
|
||
2. 关注以下指标:
|
||
- **Ready**: 待消费消息数(正常应 < 1000)
|
||
- **Unacked**: 未确认消息数(正常应较低)
|
||
- **Total**: 总消息数
|
||
- **Incoming rate**: 消息入队速率
|
||
- **Deliver / Get rate**: 消息消费速率
|
||
|
||
**告警阈值**:
|
||
- Ready > 10000:队列堆积严重
|
||
- Unacked > 5000:消费者处理缓慢
|
||
- Incoming rate >> Deliver rate:消费速度跟不上生产速度
|
||
|
||
#### Exchange 流量统计
|
||
|
||
1. 点击 **Exchanges** → 选择 Exchange
|
||
2. 查看 **Message rates**:
|
||
- **Publish in**: 消息发布速率
|
||
- **Publish out**: 消息路由到队列的速率
|
||
|
||
**异常情况**:
|
||
- Publish in > 0 但 Publish out = 0:可能绑定配置错误
|
||
|
||
### 2. 常见问题排查
|
||
|
||
#### 消息未到达队列
|
||
|
||
**检查步骤**:
|
||
1. 确认 Exchange 存在且类型正确
|
||
2. 检查 Binding 配置,确保 routing key 匹配
|
||
3. 查看 Exchange 的 **Message rates**,确认消息已发布
|
||
4. 检查生产者是否使用了正确的 routing key
|
||
|
||
#### 消费者无法消费
|
||
|
||
**检查步骤**:
|
||
1. 确认用户权限配置正确(Read 权限)
|
||
2. 检查队列中是否有消息(Ready > 0)
|
||
3. 查看消费者连接状态(Connections 页面)
|
||
4. 检查消费者代码是否正确 ACK 消息
|
||
|
||
#### 错误消息未送达平台开发者
|
||
|
||
**检查步骤**:
|
||
1. 确认错误 Exchange 绑定到 `errors.queue`
|
||
2. 检查平台开发者账号的 Read 权限
|
||
3. 确认平台开发者订阅了正确的 Exchange
|
||
4. 查看 `errors.queue` 中是否有消息
|
||
|
||
### 3. 备份和恢复
|
||
|
||
#### 导出配置
|
||
|
||
```bash
|
||
# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions)
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
export dataflow-backup.json
|
||
```
|
||
|
||
#### 导入配置
|
||
|
||
```bash
|
||
# 从备份恢复配置
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
import dataflow-backup.json
|
||
```
|
||
|
||
### 4. 性能优化
|
||
|
||
#### 消费者并发配置
|
||
|
||
- 增加消费者实例数量
|
||
- 每个消费者配置多个 channel(如 5-10 个)
|
||
- 使用 prefetch_count 限制未确认消息数量
|
||
|
||
#### 队列性能优化
|
||
|
||
- 启用 lazy queue(大量消息堆积时):
|
||
```bash
|
||
rabbitmqadmin declare queue name=orders.queue vhost=dataflow-app \
|
||
durable=true arguments='{"x-queue-mode":"lazy"}'
|
||
```
|
||
- 定期清理过期消息(通过 TTL 自动实现)
|
||
|
||
### 5. 日志查看
|
||
|
||
```bash
|
||
# 查看 RabbitMQ 日志
|
||
tail -f /var/log/rabbitmq/rabbit@<hostname>.log
|
||
|
||
# 查看连接日志
|
||
tail -f /var/log/rabbitmq/rabbit@<hostname>_connection.log
|
||
```
|
||
|
||
---
|
||
|
||
## 安全建议
|
||
|
||
### 1. 密码策略
|
||
|
||
- 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符)
|
||
- 定期更换密码(建议 3-6 个月)
|
||
- 不同环境(开发/测试/生产)使用不同密码
|
||
|
||
### 2. 网络安全
|
||
|
||
- 生产环境启用 TLS/SSL 加密连接
|
||
- 限制管理界面访问(仅允许内网或 VPN 访问)
|
||
- 配置防火墙规则:
|
||
- 5672:AMQP 端口(仅允许应用服务器访问)
|
||
- 15672:管理界面(仅允许管理员 IP 访问)
|
||
|
||
### 3. 权限最小化原则
|
||
|
||
- 平台开发者只能访问自己的 Exchange
|
||
- 消费者只能读取必要的队列
|
||
- 避免赋予不必要的 Configure 权限
|
||
|
||
### 4. 审计日志
|
||
|
||
- 启用 RabbitMQ 审计日志插件
|
||
- 定期审查用户操作记录
|
||
- 监控异常登录行为
|
||
|
||
---
|
||
|
||
## 新增平台接入流程
|
||
|
||
当需要接入新平台(例如 Lazada)时,按以下步骤操作:
|
||
|
||
### 1. 创建 Exchange
|
||
|
||
```bash
|
||
# 业务 Exchange
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare exchange --name "lazada.exchange" --vhost dataflow-app \
|
||
--type topic --durable true
|
||
|
||
# 错误 Exchange
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare exchange --name "lazada.errors.exchange" --vhost dataflow-app \
|
||
--type topic --durable true
|
||
```
|
||
|
||
### 2. 创建 Binding
|
||
|
||
```bash
|
||
# 绑定到业务队列
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "lazada.exchange" --destination "orders.queue" \
|
||
--destination-type queue --vhost dataflow-app --routing-key "order.lazada"
|
||
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "lazada.exchange" --destination "products.queue" \
|
||
--destination-type queue --vhost dataflow-app --routing-key "product.lazada"
|
||
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "lazada.exchange" --destination "refunds.queue" \
|
||
--destination-type queue --vhost dataflow-app --routing-key "refund.lazada"
|
||
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "lazada.exchange" --destination "inventory.queue" \
|
||
--destination-type queue --vhost dataflow-app --routing-key "inventory.lazada"
|
||
|
||
# 绑定到错误队列
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare binding --source "lazada.errors.exchange" --destination "errors.queue" \
|
||
--destination-type queue --vhost dataflow-app --routing-key "#"
|
||
```
|
||
|
||
### 3. 创建用户并配置权限
|
||
|
||
```bash
|
||
# 创建用户
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare user --name "user_lazada" --password "<secure_password>" --tags ""
|
||
|
||
# 配置权限
|
||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||
declare permissions --vhost dataflow-app --user "user_lazada" \
|
||
--configure "^lazada\\.(exchange|errors\\.exchange)$" \
|
||
--write "^lazada\\.(exchange|errors\\.exchange)$" \
|
||
--read "^lazada\\.errors\\..*$"
|
||
```
|
||
|
||
### 4. 提供给平台开发者
|
||
|
||
将以下信息提供给新平台的开发团队:
|
||
|
||
```
|
||
RabbitMQ 连接信息:
|
||
- Host: <rabbitmq-host>
|
||
- Port: 5672
|
||
- VHost: dataflow-app
|
||
- Username: user_lazada
|
||
- Password: <secure_password>
|
||
|
||
发布配置:
|
||
- Exchange: lazada.exchange
|
||
- Routing Keys:
|
||
- order: 发布订单数据
|
||
- product: 发布产品数据
|
||
- refund: 发布退款数据
|
||
- inventory: 发布库存数据
|
||
|
||
错误订阅(可选):
|
||
- Exchange: lazada.errors.exchange
|
||
- 接收本平台的错误消息
|
||
|
||
消息格式规范:
|
||
- 参见「消息格式规范」章节
|
||
```
|
||
|
||
---
|
||
|
||
## 附录
|
||
|
||
### A. 完整资源清单
|
||
|
||
#### VHost
|
||
- **总数**: 1 个
|
||
- `dataflow-app`
|
||
|
||
#### Exchanges
|
||
|
||
**主 Exchange**: 1 个
|
||
- `main.exchange` (Topic) - 统一主交换机,用于路由和重试消息回流
|
||
|
||
**平台业务 Exchanges**: 8 个(每平台 1 个)
|
||
- `douyin.exchange`
|
||
- `jd.exchange`
|
||
- `tmall.exchange`
|
||
- `redbook.exchange`
|
||
- `amazon.exchange`
|
||
- `naver.exchange`
|
||
- `rakuten.exchange`
|
||
- `shopify.exchange`
|
||
|
||
**平台错误 Exchanges**: 8 个(每平台 1 个)
|
||
- `douyin.errors.exchange`
|
||
- `jd.errors.exchange`
|
||
- `tmall.errors.exchange`
|
||
- `redbook.errors.exchange`
|
||
- `amazon.errors.exchange`
|
||
- `naver.errors.exchange`
|
||
- `rakuten.errors.exchange`
|
||
- `shopify.errors.exchange`
|
||
|
||
**死信交换机 (DLX)**: 4 个(每数据类型 1 个)
|
||
- `dlx.orders`
|
||
- `dlx.products`
|
||
- `dlx.refunds`
|
||
- `dlx.inventory`
|
||
|
||
**Exchange 总数**: **21 个** (1 主 + 8 平台业务 + 8 平台错误 + 4 DLX)
|
||
|
||
#### Queues
|
||
|
||
**主业务队列**: 4 个
|
||
- `orders.queue` (配置 DLX: dlx.orders)
|
||
- `products.queue` (配置 DLX: dlx.products)
|
||
- `refunds.queue` (配置 DLX: dlx.refunds)
|
||
- `inventory.queue` (配置 DLX: dlx.inventory)
|
||
|
||
**延迟重试队列**: 4 个
|
||
- `orders.retry.queue` (TTL: 5秒, DLX: main.exchange)
|
||
- `products.retry.queue` (TTL: 5秒, DLX: main.exchange)
|
||
- `refunds.retry.queue` (TTL: 5秒, DLX: main.exchange)
|
||
- `inventory.retry.queue` (TTL: 5秒, DLX: main.exchange)
|
||
|
||
**错误队列**: 1 个
|
||
- `errors.queue` (TTL: 7天)
|
||
|
||
**Queue 总数**: **9 个** (4 主队列 + 4 重试队列 + 1 错误队列)
|
||
|
||
#### Bindings
|
||
|
||
**主 Exchange 绑定**: 4 个
|
||
- `main.exchange` → `orders.queue` (routing_key: `order.#`)
|
||
- `main.exchange` → `products.queue` (routing_key: `product.#`)
|
||
- `main.exchange` → `refunds.queue` (routing_key: `refund.#`)
|
||
- `main.exchange` → `inventory.queue` (routing_key: `inventory.#`)
|
||
|
||
**平台业务 Exchange 绑定**: 32 个
|
||
- 每个平台 Exchange → 4 个业务队列 (8 × 4 = 32)
|
||
- 使用 routing key 格式:`{data_type}.{platform}`
|
||
- 例如:`order.douyin`, `product.tmall`, `refund.jd` 等
|
||
|
||
**平台错误 Exchange 绑定**: 8 个
|
||
- 每个平台错误 Exchange → `errors.queue` (routing_key: `#`)
|
||
|
||
**DLX 绑定**: 8 个
|
||
- 每个 DLX → 对应重试队列 (routing_key: `retry`) - 4 个
|
||
- 每个 DLX → `errors.queue` (routing_key: `error`) - 4 个
|
||
|
||
**Binding 总数**: **52 个** (4 主 + 32 平台业务 + 8 平台错误 + 8 DLX)
|
||
|
||
#### Users
|
||
|
||
**平台开发者账号**: 8 个
|
||
- `user_douyin`, `user_jd`, `user_tmall`, `user_redbook`, `user_amazon`, `user_naver`, `user_rakuten`, `user_shopify`
|
||
- 权限:只能写入自己的业务和错误 Exchange,只能读取自己的错误队列
|
||
|
||
**消费者账号**: 1 个
|
||
- `user_dataflow_consumer`
|
||
- 权限:可以管理所有业务队列和重试队列,可以写入 DLX 和错误 Exchange,可以读取所有业务队列
|
||
|
||
**运维监控账号**: 1 个
|
||
- `user_ops`
|
||
- 权限:可以管理错误相关资源,可以读取错误队列
|
||
|
||
**管理员账号**: 1 个
|
||
- `admin`
|
||
- 权限:完全权限,可以访问管理界面
|
||
|
||
**User 总数**: **11 个** (8 平台 + 1 消费者 + 1 运维 + 1 管理员)
|
||
|
||
#### 架构统计
|
||
|
||
| 资源类型 | 数量 | 说明 |
|
||
|---------|------|------|
|
||
| VHost | 1 | dataflow-app |
|
||
| Exchange | 21 | 1 主 + 8 平台业务 + 8 平台错误 + 4 DLX |
|
||
| Queue | 9 | 4 主 + 4 重试 + 1 错误 |
|
||
| Binding | 52 | 完整消息流转路径 |
|
||
| User | 11 | 多角色权限隔离 |
|
||
|
||
**消息流转路径**:
|
||
```
|
||
生产者 → 平台 Exchange → 主队列 → 消费者
|
||
↓ (失败)
|
||
DLX → 重试队列 (5秒TTL) → 主 Exchange → 主队列
|
||
↓ (超过3次)
|
||
DLX → 错误队列 (人工处理)
|
||
```
|
||
|
||
### B. 参考资料
|
||
|
||
- [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html)
|
||
- [RabbitMQ Management Plugin](https://www.rabbitmq.com/management.html)
|
||
- [Access Control (Authentication, Authorization)](https://www.rabbitmq.com/access-control.html)
|
||
- [rabbitmqadmin 使用指南](https://www.rabbitmq.com/management-cli.html)
|