diff --git a/docs/RabbitMQ.md b/docs/RabbitMQ.md new file mode 100644 index 0000000..e1032ff --- /dev/null +++ b/docs/RabbitMQ.md @@ -0,0 +1,1369 @@ +# RabbitMQ 配置方案 + +## 概述 + +本文档描述 dataflow 应用的 RabbitMQ 消息队列架构设计和配置方法。 + +### 业务背景 + +- **应用角色**:dataflow 是消息消费者,负责接收来自多个电商平台的数据并入库 +- **数据来源**:多个电商平台(DouYin、JD、Tmall、RedBook、Amazon Japan、Naver、Rakuten、Shopify 等) +- **数据类型**:订单(orders)、产品(products)、退款(refunds)、库存(inventory) +- **隔离需求**:不同平台开发者之间的工作空间需要相互隔离,互不影响 + +### 架构特点 + +- **VHost 隔离**:按应用划分 VHost,支持多业务复用 +- **Exchange 隔离**:每个平台独立 Exchange,通过权限控制访问 +- **单队列设计**:每种数据类型一个队列,所有平台共享(orders.queue/products.queue/refunds.queue/inventory.queue),保证严格 FIFO 顺序 +- **独立消费者**:每种数据类型配备独立消费者,采用批处理 + prefetch + 适配器模式,提升吞吐与稳定性 +- **智能重试**:DLX(死信交换机)+ 延迟重试队列 + 错误队列,优雅处理失败消息,不阻塞主队列 +- **消息持久化**:RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API + +--- + +## 设计方案总结 + +### 核心设计理念 + +本方案采用 **Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式**,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。 + +### 关键设计决策 + +#### 1. 为什么使用单队列? + +| 问题 | 传统多队列方案 | 本方案(单队列) | +|------|--------------|----------------| +| **队列数量** | 8平台 × 4数据类型 = 32个队列 | 4个数据类型队列(队列数量减少 87.5%)| +| **FIFO 保证** | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 | +| **运维复杂度** | 需要监控 32 个队列 | 只需监控 4 个主队列 | +| **消费者部署** | 可能需要 32 个消费者实例 | 只需 4 个消费者实例 | + +**原因**:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。 + +#### 2. 为什么使用单消费者? + +**原因**:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。 + +**优势**: +- ✅ 保证严格 FIFO 顺序 +- ✅ 避免消息错投到错误的消费者 +- ✅ 简化部署和运维 + +**如何实现多平台支持?** 通过消费者内部的**适配器模式**,根据消息的 `platform` 字段分发到不同业务逻辑。 + +#### 3. 为什么使用 Prefetch + 批处理? + +**原因**:RabbitMQ 是 **Push 模式**,不提供 Kafka 式的批量拉取 API。 + +**解决方案**: +- 设置 `prefetch_count=100`:RabbitMQ 最多推送 100 条未确认消息 +- 消费者自行缓存消息,达到 `batch_size=50` 或 `timeout=3s` 后批量处理 +- 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性 + +**优势**: +- ✅ 提升数据库写入效率(批量插入) +- ✅ 控制并发和内存压力 +- ✅ 失败消息不影响其他消息(单条 ACK) + +#### 4. 为什么使用 DLX + 延迟重试队列? + +**问题**:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。 + +**传统方案的问题**: +- ❌ `nack(requeue=true)`:消息回队列头部,无限循环 +- ❌ 直接丢弃:数据丢失 +- ❌ 立即重试:打爆 API/DB + +**本方案(DLX + 延迟重试)**: +``` +消费失败 → nack(requeue=false) → 进入 DLX + ↓ + x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列 + ↓ + x-retries >= 3 → 错误队列 (人工处理) +``` + +**优势**: +- ✅ 不会卡 FIFO +- ✅ 不会无限重试 +- ✅ 延迟重试避免打爆 API/DB +- ✅ 错误消息自动隔离 +- ✅ 无需额外 Redis 或手写 retry 逻辑 + +### 方案优势总结 + +| 维度 | 优势 | +|------|------| +| **可靠性** | 消息持久化 + 单条 ACK + 智能重试,避免数据丢失 | +| **性能** | Prefetch + 批处理 + 数据库批量写入,提升吞吐 | +| **顺序性** | 单队列 FIFO 保证严格顺序 | +| **扩展性** | 适配器模式,新增平台只需添加适配器类 | +| **运维** | 队列数量少,监控简单,故障排查容易 | +| **容错** | DLX 智能重试,失败消息不阻塞主队列 | + +### 适用场景 + +✅ **适合**: +- 多平台数据同步(电商、物流、金融等) +- 需要严格 FIFO 顺序 +- 需要高可靠性和高吞吐 +- 平台数量多(> 5个),数据类型固定(< 10个) + +❌ **不适合**: +- 实时性要求极高(< 100ms) +- 需要按平台独立控制消费速率 +- 平台数量少(< 3个),可以使用独立队列 + +--- + +## 架构设计 + +### 1. VHost 设计 + +VHost 按业务应用进行划分,便于资源隔离和权限管理。 + +``` +RabbitMQ 实例 +├── dataflow-app # Dataflow 应用专用 +├── analytics-app # 分析应用(预留) +└── warehouse-app # 数据仓库应用(预留) +``` + +**当前需要创建的 VHost**: +- `dataflow-app`:用于电商数据流转 + +--- + +### 2. Exchange 设计 + +在 `dataflow-app` VHost 中,Exchange 分为两类: + +#### 业务 Exchange(用于数据投递) + +每个平台一个独立的 Topic Exchange: + +| Exchange 名称 | 类型 | 持久化 | 用途 | +|--------------|------|--------|------| +| `douyin.exchange` | Topic | 是 | DouYin(抖音)平台数据入口 | +| `jd.exchange` | Topic | 是 | JD(京东)平台数据入口 | +| `tmall.exchange` | Topic | 是 | Tmall(天猫)平台数据入口 | +| `redbook.exchange` | Topic | 是 | RedBook(小红书)平台数据入口 | +| `amazon.exchange` | Topic | 是 | Amazon Japan 平台数据入口 | +| `naver.exchange` | Topic | 是 | Naver 平台数据入口 | +| `rakuten.exchange` | Topic | 是 | Rakuten(乐天)平台数据入口 | +| `shopify.exchange` | Topic | 是 | Shopify 平台数据入口 | + +**Routing Key 规范**(采用 `{data_type}.{platform}` 格式): + +| 平台 | 订单 | 产品 | 退款 | 库存 | +|------|------|------|------|------| +| DouYin | `order.douyin` | `product.douyin` | `refund.douyin` | `inventory.douyin` | +| JD | `order.jd` | `product.jd` | `refund.jd` | `inventory.jd` | +| Tmall | `order.tmall` | `product.tmall` | `refund.tmall` | `inventory.tmall` | +| RedBook | `order.redbook` | `product.redbook` | `refund.redbook` | `inventory.redbook` | +| Amazon | `order.amazon` | `product.amazon` | `refund.amazon` | `inventory.amazon` | +| Naver | `order.naver` | `product.naver` | `refund.naver` | `inventory.naver` | +| Rakuten | `order.rakuten` | `product.rakuten` | `refund.rakuten` | `inventory.rakuten` | +| Shopify | `order.shopify` | `product.shopify` | `refund.shopify` | `inventory.shopify` | + +**设计说明**: +- 使用 Topic Exchange 的通配符特性,队列可通过 `order.#` 匹配所有平台的订单 +- 保证单队列内所有平台消息的 FIFO 顺序 +- 消费者通过解析 routing key 或消息体的 platform 字段进行业务分发 + +#### 错误 Exchange(用于错误通知) + +每个平台一个独立的错误 Topic Exchange: + +| Exchange 名称 | 类型 | 持久化 | 用途 | +|--------------|------|--------|------| +| `douyin.errors.exchange` | Topic | 是 | DouYin 平台错误消息 | +| `jd.errors.exchange` | Topic | 是 | JD 平台错误消息 | +| `tmall.errors.exchange` | Topic | 是 | Tmall 平台错误消息 | +| `redbook.errors.exchange` | Topic | 是 | RedBook 平台错误消息 | +| `amazon.errors.exchange` | Topic | 是 | Amazon Japan 平台错误消息 | +| `naver.errors.exchange` | Topic | 是 | Naver 平台错误消息 | +| `rakuten.errors.exchange` | Topic | 是 | Rakuten 平台错误消息 | +| `shopify.errors.exchange` | Topic | 是 | Shopify 平台错误消息 | + +**Routing Key 规范**: +- `validation` - 数据格式验证错误 +- `processing_failed` - 业务处理失败 +- `system` - 系统错误 + +--- + +### 3. Queue 设计 + +#### 业务队列(主队列) + +物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据: + +| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | DLX | 用途 | +|-----------|--------|----------|----------|-----|------| +| `orders.queue` | 是 | 否 | 24小时 | `dlx.orders` | 接收所有平台的订单数据,单消费者批处理 | +| `products.queue` | 是 | 否 | 24小时 | `dlx.products` | 接收所有平台的产品数据,单消费者批处理 | +| `refunds.queue` | 是 | 否 | 24小时 | `dlx.refunds` | 接收所有平台的退款数据,单消费者批处理 | +| `inventory.queue` | 是 | 否 | 24小时 | `dlx.inventory` | 接收所有平台的库存数据,单消费者批处理 | + +**队列参数配置**: +```json +{ + "x-message-ttl": 86400000, + "x-dead-letter-exchange": "dlx.orders", + "x-dead-letter-routing-key": "retry" +} +``` + +#### 死信交换机(DLX) + +每种数据类型配备一个 DLX,用于处理失败消息的路由: + +| Exchange 名称 | 类型 | 持久化 | 用途 | +|--------------|------|--------|------| +| `dlx.orders` | Topic | 是 | 订单失败消息路由 | +| `dlx.products` | Topic | 是 | 产品失败消息路由 | +| `dlx.refunds` | Topic | 是 | 退款失败消息路由 | +| `dlx.inventory` | Topic | 是 | 库存失败消息路由 | + +#### 延迟重试队列 + +为每种数据类型提供延迟重试机制: + +| Queue 名称 | 持久化 | 消息 TTL | DLX | 用途 | +|-----------|--------|----------|-----|------| +| `orders.retry.queue` | 是 | 5秒 | `main.exchange` | 订单延迟重试(TTL后自动回到主队列)| +| `products.retry.queue` | 是 | 5秒 | `main.exchange` | 产品延迟重试 | +| `refunds.retry.queue` | 是 | 5秒 | `main.exchange` | 退款延迟重试 | +| `inventory.retry.queue` | 是 | 5秒 | `main.exchange` | 库存延迟重试 | + +**重试队列参数配置**: +```json +{ + "x-message-ttl": 5000, + "x-dead-letter-exchange": "main.exchange", + "x-dead-letter-routing-key": "order.{platform}" +} +``` + +**重试机制说明**: +1. 消费失败时,消费者执行 `nack(requeue=false)`,消息进入 DLX +2. DLX 根据消息 header 中的 `x-retries` 判断路由: + - `x-retries < 3`:路由到 `retry.queue`(延迟重试) + - `x-retries >= 3`:路由到 `errors.queue`(永久失败) +3. 重试队列 TTL 到期后,消息自动死信回主 Exchange,重新进入主队列 +4. 最大重试次数:**3次**,延迟时间:**5秒** + +#### 错误队列 + +统一的错误队列,接收所有超过重试次数的失败消息: + +| Queue 名称 | 持久化 | 自动删除 | 消息 TTL | 用途 | +|-----------|--------|----------|----------|------| +| `errors.queue` | 是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 | + +--- + +### 4. Binding 配置 + +#### 主 Exchange (用于接收各平台数据) + +需要创建一个统一的主 Exchange,用于重试队列的消息回流: + +| Exchange 名称 | 类型 | 持久化 | 用途 | +|--------------|------|--------|------| +| `main.exchange` | Topic | 是 | 统一主交换机,用于路由和重试消息回流 | + +#### 业务队列绑定(主队列) + +**方案1:直接绑定(推荐)** +每个平台的 Exchange 使用带平台标识的 routing key 绑定到对应数据类型队列: + +``` +douyin.exchange +├── [routing_key: order.douyin] → orders.queue +├── [routing_key: product.douyin] → products.queue +├── [routing_key: refund.douyin] → refunds.queue +└── [routing_key: inventory.douyin] → inventory.queue + +tmall.exchange +├── [routing_key: order.tmall] → orders.queue +├── [routing_key: product.tmall] → products.queue +├── [routing_key: refund.tmall] → refunds.queue +└── [routing_key: inventory.tmall] → inventory.queue + +... (其他平台类似) +``` + +**方案2:通过主 Exchange(备选)** +平台 Exchange 先发到主 Exchange,再由主 Exchange 统一路由: + +``` +douyin.exchange → [routing_key: order.douyin] → main.exchange + +main.exchange +├── [routing_key: order.#] → orders.queue +├── [routing_key: product.#] → products.queue +├── [routing_key: refund.#] → refunds.queue +└── [routing_key: inventory.#] → inventory.queue +``` + +**推荐使用方案1**,简化架构,减少消息跳转。 + +#### DLX 绑定(重试路由) + +每个 DLX 根据消息 header 中的 `x-retries` 决定路由目标: + +``` +dlx.orders +├── [routing_key: retry, x-retries < 3] → orders.retry.queue +└── [routing_key: error, x-retries >= 3] → errors.queue + +dlx.products +├── [routing_key: retry] → products.retry.queue +└── [routing_key: error] → errors.queue + +dlx.refunds +├── [routing_key: retry] → refunds.retry.queue +└── [routing_key: error] → errors.queue + +dlx.inventory +├── [routing_key: retry] → inventory.retry.queue +└── [routing_key: error] → errors.queue +``` + +**注意**:RabbitMQ 原生不支持基于 header 的条件路由,需要: +- **方案A**:消费者在 `nack` 时根据重试次数决定发送到 DLX 的 routing key(`retry` 或 `error`) +- **方案B**:使用 RabbitMQ 插件(如 `rabbitmq_message_routing_by_headers`)实现条件路由 + +**推荐方案A**:消费者控制重试逻辑,更灵活可控。 + +#### 重试队列绑定(回流主队列) + +重试队列 TTL 到期后,消息自动死信回主 Exchange: + +``` +orders.retry.queue +└── [TTL=5s后 死信到 main.exchange,routing_key保持原始值:order.{platform}] + +main.exchange +└── [routing_key: order.#] → orders.queue +``` + +这样实现了"失败 → 延迟5秒 → 重新进入主队列"的循环。 + +#### 错误队列绑定 + +每个平台的错误 Exchange 和所有 DLX 都绑定到统一错误队列: + +``` +douyin.errors.exchange +└── [routing_key: #] → errors.queue + +dlx.orders +└── [routing_key: error] → errors.queue + +dlx.products +└── [routing_key: error] → errors.queue + +... (其他类型类似) +``` + +**说明**:使用 `#` 通配符接收所有错误类型。 + +--- + +## 消费者设计 + +### 1. 消费者模型 + +采用 **单消费者 + 批处理 + prefetch + 适配器模式**: + +| 数据类型 | 消费者 | Queue | 并发模式 | 处理模式 | +|---------|--------|-------|----------|---------| +| 订单 | Order Consumer | `orders.queue` | 单消费者 | 批处理 + 适配器 | +| 产品 | Product Consumer | `products.queue` | 单消费者 | 批处理 + 适配器 | +| 退款 | Refund Consumer | `refunds.queue` | 单消费者 | 批处理 + 适配器 | +| 库存 | Inventory Consumer | `inventory.queue` | 单消费者 | 批处理 + 适配器 | + +**设计原则**: +- ✅ **单消费者**:每个队列只有一个消费者实例,保证严格 FIFO 顺序 +- ✅ **批处理**:积累一批消息后统一处理,提升数据库写入效率 +- ✅ **单条 ACK**:每条消息独立确认,失败不影响其他消息 +- ✅ **适配器模式**:消费者内部根据平台字段分发到不同业务逻辑 + +### 2. Prefetch 配置 + +```json +{ + "prefetch_count": 100, + "batch_size": 50, + "batch_timeout": 3000 +} +``` + +**参数说明**: +- `prefetch_count=100`:RabbitMQ 最多推送 100 条未确认消息到消费者 +- `batch_size=50`:消费者积累 50 条消息后触发批处理 +- `batch_timeout=3000`:如果3秒内未达到 batch_size,也触发处理 + +**工作流程**: +1. RabbitMQ 按 Push 模式推送最多 100 条消息到消费者 +2. 消费者缓存消息,达到 50 条或超时 3 秒后批量处理 +3. 批量处理时,仍然对每条消息单独 ACK/NACK +4. 实现"伪批量拉取"效果,同时保留单条确认的灵活性 + +### 3. 消费者处理逻辑(伪代码) + +```python +class OrderConsumer: + def __init__(self): + self.prefetch_count = 100 + self.batch_size = 50 + self.batch_timeout = 3 + self.message_buffer = [] + self.adapters = { + 'douyin': DouyinAdapter(), + 'tmall': TmallAdapter(), + 'jd': JDAdapter(), + # ... 其他平台适配器 + } + + def on_message(self, channel, method, properties, body): + """接收到消息时的回调""" + message = json.loads(body) + self.message_buffer.append({ + 'channel': channel, + 'method': method, + 'properties': properties, + 'body': message + }) + + # 达到批量大小,触发处理 + if len(self.message_buffer) >= self.batch_size: + self.process_batch() + + def process_batch(self): + """批量处理消息""" + for msg_data in self.message_buffer: + try: + message = msg_data['body'] + platform = message['platform'] + + # 使用适配器模式分发到对应平台逻辑 + adapter = self.adapters.get(platform) + if not adapter: + raise ValueError(f"Unknown platform: {platform}") + + # 处理业务逻辑 + adapter.process(message) + + # 成功:ACK + msg_data['channel'].basic_ack(msg_data['method'].delivery_tag) + + except TemporaryError as e: + # 临时错误:重试 + retry_count = msg_data['properties'].headers.get('x-retries', 0) + + if retry_count < 3: + # 更新重试次数,发送到 DLX 进行延迟重试 + headers = {'x-retries': retry_count + 1} + msg_data['channel'].basic_nack( + msg_data['method'].delivery_tag, + requeue=False + ) + else: + # 超过重试次数,发送到错误队列 + self.send_to_error_queue(msg_data) + msg_data['channel'].basic_nack( + msg_data['method'].delivery_tag, + requeue=False + ) + + except PermanentError as e: + # 永久错误:直接进入错误队列 + self.send_to_error_queue(msg_data) + msg_data['channel'].basic_nack( + msg_data['method'].delivery_tag, + requeue=False + ) + + # 清空缓冲区 + self.message_buffer.clear() + + def send_to_error_queue(self, msg_data): + """发送到错误队列""" + error_message = { + 'original_message': msg_data['body'], + 'error': str(e), + 'timestamp': datetime.now().isoformat() + } + # 发送到错误 Exchange + self.channel.basic_publish( + exchange='douyin.errors.exchange', + routing_key='error', + body=json.dumps(error_message) + ) +``` + +### 4. 重试机制工作流程 + +``` +┌─────────────────┐ +│ orders.queue │ ← 消费者从这里接收消息 +└────────┬────────┘ + │ + ├─ 成功 → basic_ack() + │ + └─ 失败 → basic_nack(requeue=false) + │ + ▼ + ┌──────────────────┐ + │ dlx.orders │ ← 死信交换机 + └────────┬─────────┘ + │ + ├─ x-retries < 3 → orders.retry.queue (TTL=5s) + │ │ + │ └─ TTL到期 → 自动死信回 main.exchange + │ │ + │ └─ order.# → orders.queue + │ + └─ x-retries >= 3 → errors.queue (人工处理) +``` + +### 5. 为什么这样设计? + +| 特性 | 原因 | 优势 | +|------|------|------| +| **单队列 FIFO** | RabbitMQ 无法在队列内部按 topic 过滤 | 保证严格 FIFO,不拆分队列造成业务乱序 | +| **单消费者** | RabbitMQ 同队列多消费者必然 Round-Robin 分配 | 避免消息错投到错误的消费者 | +| **Prefetch + 批处理** | RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 | 实现"伪批量拉取",提升吞吐 | +| **单条 ACK** | 保留细粒度控制 | 失败消息不影响其他消息 | +| **DLX + 延迟重试** | 避免失败消息阻塞队列 | 不会卡 FIFO,不会无限重试 | +| **适配器模式** | 多平台业务逻辑隔离 | 扩展性好,易维护 | + +--- + +## 用户权限配置 + +### 1. 平台开发者账号 + +每个平台分配一个专用账号,只能访问自己的 Exchange。 + +#### DouYin 平台账号 + +``` +用户名: user_douyin +密码: <安全密码> +VHost: dataflow-app + +权限配置: +- Configure: (留空) - 不允许创建/删除资源 +- Write: ^douyin\.(exchange|errors\.exchange)$ - 只能写入 DouYin 的业务和错误 Exchange +- Read: ^douyin\.errors\..*$ - 只能读取 DouYin 错误相关的队列 +``` + +#### Tmall 平台账号 + +``` +用户名: user_tmall +密码: <安全密码> +VHost: dataflow-app + +权限配置: +- Configure: (留空) +- Write: ^tmall\.(exchange|errors\.exchange)$ +- Read: ^tmall\.errors\..*$ +``` + +#### 其他平台账号 + +按相同模式创建: +- JD: `user_jd` +- RedBook: `user_redbook` +- Amazon Japan: `user_amazon` +- Naver: `user_naver` +- Rakuten: `user_rakuten` +- Shopify: `user_shopify` + +### 2. Dataflow 消费者账号 + +Dataflow 应用的后端服务账号,负责消费业务队列并处理错误。 + +``` +用户名: user_dataflow_consumer +密码: <安全密码> +VHost: dataflow-app + +权限配置: +- Configure: ^(orders|products|refunds|inventory)\.queue$ - 可以管理业务队列 +- Write: .*\.errors\.exchange$ - 可以向所有错误 Exchange 写入消息 +- Read: ^(orders|products|refunds|inventory)\.queue$ - 可以读取所有业务队列 +``` + +### 3. 运维监控账号 + +运维团队账号,用于监控所有错误消息。 + +``` +用户名: user_ops +密码: <安全密码> +VHost: dataflow-app + +权限配置: +- Configure: ^errors\..*$ - 可以管理错误相关资源 +- Write: (留空) - 不需要写入权限 +- Read: ^errors\.queue$ - 可以读取统一错误队列 +``` + +### 4. 管理员账号 + +RabbitMQ 超级管理员,用于配置和维护。 + +``` +用户名: admin +密码: <强密码> +Tag: administrator + +权限配置: +- 对所有 VHost 具有完全权限 +- 可以访问管理界面 +``` + +--- + +## 消息格式规范 + +### 1. message_id 设计 + +message_id 采用结构化格式,确保幂等性和可追溯性。 + +**格式**: +``` +{prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id} +``` + +**字段说明**: +- `prefix`:项目名称前缀(如 `wpic-project1`) +- `app_id`:应用标识(固定为 `dataflow`) +- `company_id`:公司 ID +- `platform_id`:平台 ID +- `store_id`:店铺 ID +- `entity_type`:数据实体类型(order/product/refund/inventory) +- `entity_id`:平台侧唯一标识(如订单号) + +**分隔符说明**: +- 使用 `#` 作为分隔符(RabbitMQ message_id 字段支持任意字符) +- `#` 在 routing key 中是通配符,但在 message_id 中是普通字符 + +**示例**: +``` +wpic-project1#dataflow#100#20#200#order#DY123456789 + +解析: +- 项目前缀: wpic-project1 +- 应用: dataflow +- 公司ID: 100 +- 平台ID: 20 (DouYin) +- 店铺ID: 200 +- 实体类型: order +- 平台订单号: DY123456789 +``` + +**特性**: +- ✅ 幂等性:同一实体的 message_id 完全一致 +- ✅ 可读性:直接看出数据来源 +- ✅ 便于调试:可从 message_id 提取元数据 +- ✅ 无冲突:全局唯一 + +### 2. 业务消息格式 + +```json +{ + "message_id": "wpic-project1#dataflow#100#20#200#order#DY123456", + "timestamp": "2025-01-14T10:30:00Z", + "platform": "douyin", + "data_type": "order", + + "metadata": { + "platform_id": 20, + "company_id": 100, + "store_id": 200, + "source_system": "douyin-open-api", + "retry_count": 0, + "data_version": 1736840000 + }, + + "data": { + "platform_unique_id": "DY123456", + "raw_data": { + // 平台原始完整数据 + } + } +} +``` + +**字段说明**: + +| 字段 | 类型 | 必需 | 说明 | +|------|------|------|------| +| `message_id` | string | 是 | 结构化消息 ID,用于幂等性 | +| `timestamp` | string | 是 | 消息生成时间(ISO 8601 格式) | +| `platform` | string | 是 | 平台标识(douyin/tmall/jd...) | +| `data_type` | string | 是 | 数据类型(order/product/refund/inventory) | +| `metadata.platform_id` | int | 是 | 平台 ID(数据库主键) | +| `metadata.company_id` | int | 是 | 公司 ID | +| `metadata.store_id` | int | 是 | 店铺 ID | +| `metadata.source_system` | string | 是 | 数据来源系统 | +| `metadata.retry_count` | int | 是 | 重试次数(初始为 0) | +| `metadata.data_version` | int | 是 | 数据版本(Unix 时间戳,用于防止乱序更新) | +| `data.platform_unique_id` | string | 是 | 平台侧唯一标识 | +| `data.raw_data` | object | 是 | 平台原始完整 JSON 数据 | + +**重要说明**: + +- **data_version**:必须使用平台数据的更新时间戳(Unix 时间戳),用于解决消息乱序消费问题 + - 示例:平台订单的最后更新时间 `1736840000`(2025-01-14 10:00:00 UTC) + - 消费端只有当 `data_version` 大于数据库中的值时才更新数据 + - 如果平台未提供更新时间,使用消息生成时间戳 + +### 3. 错误消息格式 + +错误消息由 dataflow 消费者生成,包含原始消息和错误详情。 + +```json +{ + "error_id": "err_1234567890abcdef", + "original_message": { + // 原始业务消息完整内容 + }, + "error": { + "type": "validation", + "message": "Missing required field: total_amount", + "trace": "Exception stack trace...", + "timestamp": "2025-01-14T10:31:00Z" + }, + "metadata": { + "platform": "amazon", + "platform_id": 1, + "company_id": 100, + "store_id": 200, + "data_type": "order", + "failed_at": "2025-01-14T10:31:00Z" + } +} +``` + +--- + +## RabbitMQ 管理界面操作指引 + +### 1. 访问管理界面 + +``` +URL: http://:15672 +用户名: admin +密码: <管理员密码> +``` + +### 2. 创建 VHost + +1. 点击顶部导航 **Admin** → **Virtual Hosts** +2. 点击 **Add a new virtual host** +3. 输入 VHost 名称:`dataflow-app` +4. 点击 **Add virtual host** + +### 3. 创建用户 + +1. 点击顶部导航 **Admin** → **Users** +2. 点击 **Add a user** +3. 填写用户信息: + - **Username**: `user_amazon` + - **Password**: `<安全密码>` + - **Tags**: (留空,非管理员用户) +4. 点击 **Add user** + +### 4. 配置用户权限 + +1. 在用户列表中,点击用户名(如 `user_amazon`) +2. 滚动到 **Permissions** 部分 +3. 选择 VHost:`dataflow-app` +4. 配置权限正则表达式: + - **Configure regexp**: (留空) + - **Write regexp**: `^amazon\.(exchange|errors\.exchange)$` + - **Read regexp**: `^amazon\.errors\..*$` +5. 点击 **Set permission** + +### 5. 创建 Exchange + +1. 点击顶部导航 **Exchanges** +2. 确保选择正确的 VHost:`dataflow-app` +3. 点击 **Add a new exchange** +4. 填写配置: + - **Name**: `amazon.exchange` + - **Type**: `topic` + - **Durability**: `Durable` + - **Auto delete**: `No` + - **Internal**: `No` +5. 点击 **Add exchange** + +重复以上步骤创建所有业务 Exchange 和错误 Exchange。 + +### 6. 创建 Queue + +1. 点击顶部导航 **Queues** +2. 确保选择正确的 VHost:`dataflow-app` +3. 点击 **Add a new queue** +4. 填写配置: + - **Name**: `orders.queue` + - **Durability**: `Durable` + - **Auto delete**: `No` + - **Arguments**: + - `x-message-ttl`: `86400000` (24小时,单位:毫秒) +5. 点击 **Add queue** + +重复以上步骤创建 `products.queue`、`refunds.queue` 和 `errors.queue`。 + +**注意**:`errors.queue` 的 TTL 设置为 `604800000`(7天)。 + +### 7. 创建 Binding + +1. 点击顶部导航 **Exchanges** +2. 点击需要绑定的 Exchange(如 `amazon.exchange`) +3. 滚动到 **Bindings** 部分 +4. 在 **Add binding from this exchange** 区域填写: + - **To queue**: `orders.queue` + - **Routing key**: `order` +5. 点击 **Bind** + +为每个 Exchange 创建 3 个绑定(order/product/refund)。 + +对于错误 Exchange,绑定配置: +- **To queue**: `errors.queue` +- **Routing key**: `#` + +--- + +## 配置脚本 + +为简化配置过程,可以使用以下脚本自动创建资源。 + +### 使用 rabbitmqadmin + +RabbitMQ 提供命令行工具 `rabbitmqadmin`,可用于批量配置。 + +#### 安装 rabbitmqadmin + +```bash +# 从 RabbitMQ 管理界面下载 +wget http://:15672/cli/rabbitmqadmin +chmod +x rabbitmqadmin +sudo mv rabbitmqadmin /usr/local/bin/ +``` + +#### 配置脚本示例 + +```bash +#!/bin/bash + +# RabbitMQ 连接信息 +RABBITMQ_HOST="localhost:15672" +RABBITMQ_USER="admin" +RABBITMQ_PASS="admin_password" +VHOST="dataflow-app" + +# 平台列表 +PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify") +DATA_TYPES=("orders" "products" "refunds" "inventory") + +# 1. 创建 VHost +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare vhost name=$VHOST + +# 2. 创建主 Exchange(用于重试消息回流) +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange name="main.exchange" vhost=$VHOST \ + type=topic durable=true + +# 3. 创建 DLX (死信交换机) +for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange name="dlx.${dtype}" vhost=$VHOST \ + type=topic durable=true +done + +# 4. 创建主业务队列(带 DLX 配置) +for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue name="${dtype}.queue" vhost=$VHOST durable=true \ + arguments="{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" +done + +# 5. 创建重试队列(带 TTL 和 DLX 回流配置) +for dtype in "${DATA_TYPES[@]}"; do + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue name="${dtype}.retry.queue" vhost=$VHOST durable=true \ + arguments="{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}" +done + +# 6. 创建错误队列 +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare queue name="errors.queue" vhost=$VHOST durable=true \ + arguments='{"x-message-ttl":604800000}' + +# 7. 绑定主 Exchange 到主队列(使用通配符) +for dtype in "${DATA_TYPES[@]}"; do + # 使用 order.#, product.# 等通配符匹配所有平台 + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="main.exchange" destination="${dtype}.queue" \ + vhost=$VHOST routing_key="${dtype%s}.#" +done + +# 8. 绑定 DLX 到重试队列和错误队列 +for dtype in "${DATA_TYPES[@]}"; do + # DLX -> 重试队列 + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="dlx.${dtype}" destination="${dtype}.retry.queue" \ + vhost=$VHOST routing_key="retry" + + # DLX -> 错误队列 + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="dlx.${dtype}" destination="errors.queue" \ + vhost=$VHOST routing_key="error" +done + +# 9. 为每个平台创建 Exchange 和 Binding +for platform in "${PLATFORMS[@]}"; do + # 创建平台业务 Exchange + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange name="${platform}.exchange" vhost=$VHOST \ + type=topic durable=true + + # 绑定到业务队列(使用新的 routing key 格式:data_type.platform) + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="${platform}.exchange" destination="orders.queue" \ + vhost=$VHOST routing_key="order.${platform}" + + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="${platform}.exchange" destination="products.queue" \ + vhost=$VHOST routing_key="product.${platform}" + + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="${platform}.exchange" destination="refunds.queue" \ + vhost=$VHOST routing_key="refund.${platform}" + + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="${platform}.exchange" destination="inventory.queue" \ + vhost=$VHOST routing_key="inventory.${platform}" + + # 创建平台错误 Exchange + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange name="${platform}.errors.exchange" vhost=$VHOST \ + type=topic durable=true + + # 绑定到错误队列 + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="${platform}.errors.exchange" destination="errors.queue" \ + vhost=$VHOST routing_key="#" + + # 创建平台用户 + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user name="user_${platform}" password="change_me_${platform}" tags="" + + # 配置平台用户权限 + rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permission vhost=$VHOST user="user_${platform}" \ + configure="" write="^${platform}\.(exchange|errors\.exchange)$" \ + read="^${platform}\.errors\..*$" +done + +# 10. 创建 dataflow 消费者用户 +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user name="user_dataflow_consumer" password="change_me_consumer" tags="" + +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permission vhost=$VHOST user="user_dataflow_consumer" \ + configure="^(orders|products|refunds|inventory).*\.queue$" \ + write="(dlx\..*)|(.*\.errors\.exchange)$" \ + read="^(orders|products|refunds|inventory).*\.queue$" + +# 11. 创建运维监控用户 +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user name="user_ops" password="change_me_ops" tags="" + +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permission vhost=$VHOST user="user_ops" \ + configure="^errors\..*$" write="" read="^errors\.queue$" + +echo "RabbitMQ 配置完成!" +echo "" +echo "已创建:" +echo "- 1 个 VHost: dataflow-app" +echo "- 1 个主 Exchange: main.exchange" +echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" +echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" +echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue" +echo "- 1 个错误队列: errors.queue" +echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)" +echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)" +``` + +#### 执行脚本 + +```bash +chmod +x setup_rabbitmq.sh +./setup_rabbitmq.sh +``` + +--- + +## 监控和维护 + +### 1. 监控指标 + +通过 RabbitMQ 管理界面查看以下关键指标: + +#### 队列健康状态 + +1. 点击 **Queues** → 选择队列 +2. 关注以下指标: + - **Ready**: 待消费消息数(正常应 < 1000) + - **Unacked**: 未确认消息数(正常应较低) + - **Total**: 总消息数 + - **Incoming rate**: 消息入队速率 + - **Deliver / Get rate**: 消息消费速率 + +**告警阈值**: +- Ready > 10000:队列堆积严重 +- Unacked > 5000:消费者处理缓慢 +- Incoming rate >> Deliver rate:消费速度跟不上生产速度 + +#### Exchange 流量统计 + +1. 点击 **Exchanges** → 选择 Exchange +2. 查看 **Message rates**: + - **Publish in**: 消息发布速率 + - **Publish out**: 消息路由到队列的速率 + +**异常情况**: +- Publish in > 0 但 Publish out = 0:可能绑定配置错误 + +### 2. 常见问题排查 + +#### 消息未到达队列 + +**检查步骤**: +1. 确认 Exchange 存在且类型正确 +2. 检查 Binding 配置,确保 routing key 匹配 +3. 查看 Exchange 的 **Message rates**,确认消息已发布 +4. 检查生产者是否使用了正确的 routing key + +#### 消费者无法消费 + +**检查步骤**: +1. 确认用户权限配置正确(Read 权限) +2. 检查队列中是否有消息(Ready > 0) +3. 查看消费者连接状态(Connections 页面) +4. 检查消费者代码是否正确 ACK 消息 + +#### 错误消息未送达平台开发者 + +**检查步骤**: +1. 确认错误 Exchange 绑定到 `errors.queue` +2. 检查平台开发者账号的 Read 权限 +3. 确认平台开发者订阅了正确的 Exchange +4. 查看 `errors.queue` 中是否有消息 + +### 3. 备份和恢复 + +#### 导出配置 + +```bash +# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions) +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + export dataflow-backup.json +``` + +#### 导入配置 + +```bash +# 从备份恢复配置 +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + import dataflow-backup.json +``` + +### 4. 性能优化 + +#### 消费者并发配置 + +- 增加消费者实例数量 +- 每个消费者配置多个 channel(如 5-10 个) +- 使用 prefetch_count 限制未确认消息数量 + +#### 队列性能优化 + +- 启用 lazy queue(大量消息堆积时): + ```bash + rabbitmqadmin declare queue name=orders.queue vhost=dataflow-app \ + durable=true arguments='{"x-queue-mode":"lazy"}' + ``` +- 定期清理过期消息(通过 TTL 自动实现) + +### 5. 日志查看 + +```bash +# 查看 RabbitMQ 日志 +tail -f /var/log/rabbitmq/rabbit@.log + +# 查看连接日志 +tail -f /var/log/rabbitmq/rabbit@_connection.log +``` + +--- + +## 安全建议 + +### 1. 密码策略 + +- 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符) +- 定期更换密码(建议 3-6 个月) +- 不同环境(开发/测试/生产)使用不同密码 + +### 2. 网络安全 + +- 生产环境启用 TLS/SSL 加密连接 +- 限制管理界面访问(仅允许内网或 VPN 访问) +- 配置防火墙规则: + - 5672:AMQP 端口(仅允许应用服务器访问) + - 15672:管理界面(仅允许管理员 IP 访问) + +### 3. 权限最小化原则 + +- 平台开发者只能访问自己的 Exchange +- 消费者只能读取必要的队列 +- 避免赋予不必要的 Configure 权限 + +### 4. 审计日志 + +- 启用 RabbitMQ 审计日志插件 +- 定期审查用户操作记录 +- 监控异常登录行为 + +--- + +## 新增平台接入流程 + +当需要接入新平台(例如 Lazada)时,按以下步骤操作: + +### 1. 创建 Exchange + +```bash +# 业务 Exchange +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange name="lazada.exchange" vhost=dataflow-app \ + type=topic durable=true + +# 错误 Exchange +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange name="lazada.errors.exchange" vhost=dataflow-app \ + type=topic durable=true +``` + +### 2. 创建 Binding + +```bash +# 绑定到业务队列 +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="lazada.exchange" destination="orders.queue" \ + vhost=dataflow-app routing_key="order" + +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="lazada.exchange" destination="products.queue" \ + vhost=dataflow-app routing_key="product" + +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="lazada.exchange" destination="refunds.queue" \ + vhost=dataflow-app routing_key="refund" + +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="lazada.exchange" destination="inventory.queue" \ + vhost=dataflow-app routing_key="inventory" + +# 绑定到错误队列 +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding source="lazada.errors.exchange" \ + destination="errors.queue" vhost=dataflow-app routing_key="#" +``` + +### 3. 创建用户并配置权限 + +```bash +# 创建用户 +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare user name="user_lazada" password="" tags="" + +# 配置权限 +rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare permission vhost=dataflow-app user="user_lazada" \ + configure="" write="^lazada\.(exchange|errors\.exchange)$" \ + read="^lazada\.errors\..*$" +``` + +### 4. 提供给平台开发者 + +将以下信息提供给新平台的开发团队: + +``` +RabbitMQ 连接信息: +- Host: +- Port: 5672 +- VHost: dataflow-app +- Username: user_lazada +- Password: + +发布配置: +- Exchange: lazada.exchange +- Routing Keys: + - order: 发布订单数据 + - product: 发布产品数据 + - refund: 发布退款数据 + - inventory: 发布库存数据 + +错误订阅(可选): +- Exchange: lazada.errors.exchange +- 接收本平台的错误消息 + +消息格式规范: +- 参见「消息格式规范」章节 +``` + +--- + +## 附录 + +### A. 完整资源清单 + +#### VHost +- **总数**: 1 个 +- `dataflow-app` + +#### Exchanges + +**主 Exchange**: 1 个 +- `main.exchange` (Topic) - 统一主交换机,用于路由和重试消息回流 + +**平台业务 Exchanges**: 8 个(每平台 1 个) +- `douyin.exchange` +- `jd.exchange` +- `tmall.exchange` +- `redbook.exchange` +- `amazon.exchange` +- `naver.exchange` +- `rakuten.exchange` +- `shopify.exchange` + +**平台错误 Exchanges**: 8 个(每平台 1 个) +- `douyin.errors.exchange` +- `jd.errors.exchange` +- `tmall.errors.exchange` +- `redbook.errors.exchange` +- `amazon.errors.exchange` +- `naver.errors.exchange` +- `rakuten.errors.exchange` +- `shopify.errors.exchange` + +**死信交换机 (DLX)**: 4 个(每数据类型 1 个) +- `dlx.orders` +- `dlx.products` +- `dlx.refunds` +- `dlx.inventory` + +**Exchange 总数**: **21 个** (1 主 + 8 平台业务 + 8 平台错误 + 4 DLX) + +#### Queues + +**主业务队列**: 4 个 +- `orders.queue` (配置 DLX: dlx.orders) +- `products.queue` (配置 DLX: dlx.products) +- `refunds.queue` (配置 DLX: dlx.refunds) +- `inventory.queue` (配置 DLX: dlx.inventory) + +**延迟重试队列**: 4 个 +- `orders.retry.queue` (TTL: 5秒, DLX: main.exchange) +- `products.retry.queue` (TTL: 5秒, DLX: main.exchange) +- `refunds.retry.queue` (TTL: 5秒, DLX: main.exchange) +- `inventory.retry.queue` (TTL: 5秒, DLX: main.exchange) + +**错误队列**: 1 个 +- `errors.queue` (TTL: 7天) + +**Queue 总数**: **9 个** (4 主队列 + 4 重试队列 + 1 错误队列) + +#### Bindings + +**主 Exchange 绑定**: 4 个 +- `main.exchange` → `orders.queue` (routing_key: `order.#`) +- `main.exchange` → `products.queue` (routing_key: `product.#`) +- `main.exchange` → `refunds.queue` (routing_key: `refund.#`) +- `main.exchange` → `inventory.queue` (routing_key: `inventory.#`) + +**平台业务 Exchange 绑定**: 32 个 +- 每个平台 Exchange → 4 个业务队列 (8 × 4 = 32) +- 使用 routing key 格式:`{data_type}.{platform}` + - 例如:`order.douyin`, `product.tmall`, `refund.jd` 等 + +**平台错误 Exchange 绑定**: 8 个 +- 每个平台错误 Exchange → `errors.queue` (routing_key: `#`) + +**DLX 绑定**: 8 个 +- 每个 DLX → 对应重试队列 (routing_key: `retry`) - 4 个 +- 每个 DLX → `errors.queue` (routing_key: `error`) - 4 个 + +**Binding 总数**: **52 个** (4 主 + 32 平台业务 + 8 平台错误 + 8 DLX) + +#### Users + +**平台开发者账号**: 8 个 +- `user_douyin`, `user_jd`, `user_tmall`, `user_redbook`, `user_amazon`, `user_naver`, `user_rakuten`, `user_shopify` +- 权限:只能写入自己的业务和错误 Exchange,只能读取自己的错误队列 + +**消费者账号**: 1 个 +- `user_dataflow_consumer` +- 权限:可以管理所有业务队列和重试队列,可以写入 DLX 和错误 Exchange,可以读取所有业务队列 + +**运维监控账号**: 1 个 +- `user_ops` +- 权限:可以管理错误相关资源,可以读取错误队列 + +**管理员账号**: 1 个 +- `admin` +- 权限:完全权限,可以访问管理界面 + +**User 总数**: **11 个** (8 平台 + 1 消费者 + 1 运维 + 1 管理员) + +#### 架构统计 + +| 资源类型 | 数量 | 说明 | +|---------|------|------| +| VHost | 1 | dataflow-app | +| Exchange | 21 | 1 主 + 8 平台业务 + 8 平台错误 + 4 DLX | +| Queue | 9 | 4 主 + 4 重试 + 1 错误 | +| Binding | 52 | 完整消息流转路径 | +| User | 11 | 多角色权限隔离 | + +**消息流转路径**: +``` +生产者 → 平台 Exchange → 主队列 → 消费者 + ↓ (失败) + DLX → 重试队列 (5秒TTL) → 主 Exchange → 主队列 + ↓ (超过3次) + DLX → 错误队列 (人工处理) +``` + +### B. 参考资料 + +- [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html) +- [RabbitMQ Management Plugin](https://www.rabbitmq.com/management.html) +- [Access Control (Authentication, Authorization)](https://www.rabbitmq.com/access-control.html) +- [rabbitmqadmin 使用指南](https://www.rabbitmq.com/management-cli.html)