# RabbitMQ 配置方案 ## 概述 本文档描述 dataflow 应用的 RabbitMQ 消息队列架构设计和配置方法。 ### 业务背景 - **应用角色**:dataflow 是消息消费者,负责接收来自多个电商平台的数据并入库 - **数据来源**:多个电商平台(DouYin、JD、Tmall、RedBook、Amazon Japan、Naver、Rakuten、Shopify 等) - **数据类型**:订单(orders)、产品(products)、退款(refunds)、库存(inventory) - **隔离需求**:不同平台开发者之间的工作空间需要相互隔离,互不影响 ### 架构特点 - **VHost 隔离**:按应用划分 VHost,支持多业务复用 - **Exchange 隔离**:每个平台独立 Exchange,通过权限控制访问 - **单队列设计**:每种数据类型一个队列,所有平台共享(orders.queue/products.queue/refunds.queue/inventory.queue),保证严格 FIFO 顺序 - **独立消费者**:每种数据类型配备独立消费者,采用批处理 + prefetch + 适配器模式,提升吞吐与稳定性 - **智能重试**:DLX(死信交换机)+ 延迟重试队列 + 错误队列,优雅处理失败消息,不阻塞主队列 - **消息持久化**:RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API --- ## 设计方案总结 ### 核心设计理念 本方案采用 **Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式**,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。 ### 关键设计决策 #### 1. 为什么使用单队列? | 问题 | 传统多队列方案 | 本方案(单队列) | |------|--------------|----------------| | **队列数量** | 8平台 × 4数据类型 = 32个队列 | 4个数据类型队列(队列数量减少 87.5%)| | **FIFO 保证** | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 | | **运维复杂度** | 需要监控 32 个队列 | 只需监控 4 个主队列 | | **消费者部署** | 可能需要 32 个消费者实例 | 只需 4 个消费者实例 | **原因**:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。 #### 2. 为什么使用单消费者? **原因**:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。 **优势**: - ✅ 保证严格 FIFO 顺序 - ✅ 避免消息错投到错误的消费者 - ✅ 简化部署和运维 **如何实现多平台支持?** 通过消费者内部的**适配器模式**,根据消息的 `platform` 字段分发到不同业务逻辑。 #### 3. 为什么使用 Prefetch + 批处理? **原因**:RabbitMQ 是 **Push 模式**,不提供 Kafka 式的批量拉取 API。 **解决方案**: - 设置 `prefetch_count=100`:RabbitMQ 最多推送 100 条未确认消息 - 消费者自行缓存消息,达到 `batch_size=50` 或 `timeout=3s` 后批量处理 - 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性 **优势**: - ✅ 提升数据库写入效率(批量插入) - ✅ 控制并发和内存压力 - ✅ 失败消息不影响其他消息(单条 ACK) #### 4. 为什么使用 DLX + 延迟重试队列? **问题**:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。 **传统方案的问题**: - ❌ `nack(requeue=true)`:消息回队列头部,无限循环 - ❌ 直接丢弃:数据丢失 - ❌ 立即重试:打爆 API/DB **本方案(DLX + 延迟重试)**: ``` 消费失败 → nack(requeue=false) → 进入 DLX ↓ x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列 ↓ x-retries >= 3 → 错误队列 (人工处理) ``` **优势**: - ✅ 不会卡 FIFO - ✅ 不会无限重试 - ✅ 延迟重试避免打爆 API/DB - ✅ 错误消息自动隔离 - ✅ 无需额外 Redis 或手写 retry 逻辑 ### 方案优势总结 | 维度 | 优势 | |------|------| | **可靠性** | 消息持久化 + 单条 ACK + 智能重试,避免数据丢失 | | **性能** | Prefetch + 批处理 + 数据库批量写入,提升吞吐 | | **顺序性** | 单队列 FIFO 保证严格顺序 | | **扩展性** | 适配器模式,新增平台只需添加适配器类 | | **运维** | 队列数量少,监控简单,故障排查容易 | | **容错** | DLX 智能重试,失败消息不阻塞主队列 | ### 适用场景 ✅ **适合**: - 多平台数据同步(电商、物流、金融等) - 需要严格 FIFO 顺序 - 需要高可靠性和高吞吐 - 平台数量多(> 5个),数据类型固定(< 10个) ❌ **不适合**: - 实时性要求极高(< 100ms) - 需要按平台独立控制消费速率 - 平台数量少(< 3个),可以使用独立队列 --- ## 架构设计 ### 1. VHost 设计 VHost 按业务应用进行划分,便于资源隔离和权限管理。 ``` RabbitMQ 实例 ├── dataflow-app # Dataflow 应用专用 ├── analytics-app # 分析应用(预留) └── warehouse-app # 数据仓库应用(预留) ``` **当前需要创建的 VHost**: - `dataflow-app`:用于电商数据流转 --- ### 2. Exchange 设计 在 `dataflow-app` VHost 中,Exchange 分为两类: #### 业务 Exchange(用于数据投递) 每个平台一个独立的 Topic Exchange: | Exchange 名称 | 类型 | 持久化 | 用途 | |--------------|------|--------|------| | `douyin.exchange` | Topic | 是 | DouYin(抖音)平台数据入口 | | `jd.exchange` | Topic | 是 | JD(京东)平台数据入口 | | `tmall.exchange` | Topic | 是 | Tmall(天猫)平台数据入口 | | `redbook.exchange` | Topic | 是 | RedBook(小红书)平台数据入口 | | `amazon.exchange` | Topic | 是 | Amazon Japan 平台数据入口 | | `naver.exchange` | Topic | 是 | Naver 平台数据入口 | | `rakuten.exchange` | Topic | 是 | Rakuten(乐天)平台数据入口 | | `shopify.exchange` | Topic | 是 | Shopify 平台数据入口 | **Routing Key 规范**(采用 `{data_type}.{platform}` 格式): | 平台 | 订单 | 产品 | 退款 | 库存 | |------|------|------|------|------| | DouYin | `order.douyin` | `product.douyin` | `refund.douyin` | `inventory.douyin` | | JD | `order.jd` | `product.jd` | `refund.jd` | `inventory.jd` | | Tmall | `order.tmall` | `product.tmall` | `refund.tmall` | `inventory.tmall` | | RedBook | `order.redbook` | `product.redbook` | `refund.redbook` | `inventory.redbook` | | Amazon | `order.amazon` | `product.amazon` | `refund.amazon` | `inventory.amazon` | | Naver | `order.naver` | `product.naver` | `refund.naver` | `inventory.naver` | | Rakuten | `order.rakuten` | `product.rakuten` | `refund.rakuten` | `inventory.rakuten` | | Shopify | `order.shopify` | `product.shopify` | `refund.shopify` | `inventory.shopify` | **设计说明**: - 使用 Topic Exchange 的通配符特性,队列可通过 `order.#` 匹配所有平台的订单 - 保证单队列内所有平台消息的 FIFO 顺序 - 消费者通过解析 routing key 或消息体的 platform 字段进行业务分发 #### 错误 Exchange(用于错误通知) 每个平台一个独立的错误 Topic Exchange: | Exchange 名称 | 类型 | 持久化 | 用途 | |--------------|------|--------|------| | `douyin.errors.exchange` | Topic | 是 | DouYin 平台错误消息 | | `jd.errors.exchange` | Topic | 是 | JD 平台错误消息 | | `tmall.errors.exchange` | Topic | 是 | Tmall 平台错误消息 | | `redbook.errors.exchange` | Topic | 是 | RedBook 平台错误消息 | | `amazon.errors.exchange` | Topic | 是 | Amazon Japan 平台错误消息 | | `naver.errors.exchange` | Topic | 是 | Naver 平台错误消息 | | `rakuten.errors.exchange` | Topic | 是 | Rakuten 平台错误消息 | | `shopify.errors.exchange` | Topic | 是 | Shopify 平台错误消息 | **Routing Key 规范**: - `validation` - 数据格式验证错误 - `processing_failed` - 业务处理失败 - `system` - 系统错误 --- ### 3. Queue 设计 #### 业务队列(主队列) 物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据: | Queue 名称 | 持久化 | 自动删除 | 消息 TTL | DLX | 用途 | |-----------|--------|----------|----------|-----|------| | `orders.queue` | 是 | 否 | 24小时 | `dlx.orders` | 接收所有平台的订单数据,单消费者批处理 | | `products.queue` | 是 | 否 | 24小时 | `dlx.products` | 接收所有平台的产品数据,单消费者批处理 | | `refunds.queue` | 是 | 否 | 24小时 | `dlx.refunds` | 接收所有平台的退款数据,单消费者批处理 | | `inventory.queue` | 是 | 否 | 24小时 | `dlx.inventory` | 接收所有平台的库存数据,单消费者批处理 | **队列参数配置**: ```json { "x-message-ttl": 86400000, "x-dead-letter-exchange": "dlx.orders", "x-dead-letter-routing-key": "retry" } ``` #### 死信交换机(DLX) 每种数据类型配备一个 DLX,用于处理失败消息的路由: | Exchange 名称 | 类型 | 持久化 | 用途 | |--------------|------|--------|------| | `dlx.orders` | Topic | 是 | 订单失败消息路由 | | `dlx.products` | Topic | 是 | 产品失败消息路由 | | `dlx.refunds` | Topic | 是 | 退款失败消息路由 | | `dlx.inventory` | Topic | 是 | 库存失败消息路由 | #### 延迟重试队列 为每种数据类型提供延迟重试机制: | Queue 名称 | 持久化 | 消息 TTL | DLX | 用途 | |-----------|--------|----------|-----|------| | `orders.retry.queue` | 是 | 5秒 | `main.exchange` | 订单延迟重试(TTL后自动回到主队列)| | `products.retry.queue` | 是 | 5秒 | `main.exchange` | 产品延迟重试 | | `refunds.retry.queue` | 是 | 5秒 | `main.exchange` | 退款延迟重试 | | `inventory.retry.queue` | 是 | 5秒 | `main.exchange` | 库存延迟重试 | **重试队列参数配置**: ```json { "x-message-ttl": 5000, "x-dead-letter-exchange": "main.exchange", "x-dead-letter-routing-key": "order.{platform}" } ``` **重试机制说明**: 1. 消费失败时,消费者执行 `nack(requeue=false)`,消息进入 DLX 2. DLX 根据消息 header 中的 `x-retries` 判断路由: - `x-retries < 3`:路由到 `retry.queue`(延迟重试) - `x-retries >= 3`:路由到 `errors.queue`(永久失败) 3. 重试队列 TTL 到期后,消息自动死信回主 Exchange,重新进入主队列 4. 最大重试次数:**3次**,延迟时间:**5秒** #### 错误队列 统一的错误队列,接收所有超过重试次数的失败消息: | Queue 名称 | 持久化 | 自动删除 | 消息 TTL | 用途 | |-----------|--------|----------|----------|------| | `errors.queue` | 是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 | --- ### 4. Binding 配置 #### 主 Exchange (用于接收各平台数据) 需要创建一个统一的主 Exchange,用于重试队列的消息回流: | Exchange 名称 | 类型 | 持久化 | 用途 | |--------------|------|--------|------| | `main.exchange` | Topic | 是 | 统一主交换机,用于路由和重试消息回流 | #### 业务队列绑定(主队列) **方案1:直接绑定(推荐)** 每个平台的 Exchange 使用带平台标识的 routing key 绑定到对应数据类型队列: ``` douyin.exchange ├── [routing_key: order.douyin] → orders.queue ├── [routing_key: product.douyin] → products.queue ├── [routing_key: refund.douyin] → refunds.queue └── [routing_key: inventory.douyin] → inventory.queue tmall.exchange ├── [routing_key: order.tmall] → orders.queue ├── [routing_key: product.tmall] → products.queue ├── [routing_key: refund.tmall] → refunds.queue └── [routing_key: inventory.tmall] → inventory.queue ... (其他平台类似) ``` **方案2:通过主 Exchange(备选)** 平台 Exchange 先发到主 Exchange,再由主 Exchange 统一路由: ``` douyin.exchange → [routing_key: order.douyin] → main.exchange main.exchange ├── [routing_key: order.#] → orders.queue ├── [routing_key: product.#] → products.queue ├── [routing_key: refund.#] → refunds.queue └── [routing_key: inventory.#] → inventory.queue ``` **推荐使用方案1**,简化架构,减少消息跳转。 #### DLX 绑定(重试路由) 每个 DLX 根据消息 header 中的 `x-retries` 决定路由目标: ``` dlx.orders ├── [routing_key: retry, x-retries < 3] → orders.retry.queue └── [routing_key: error, x-retries >= 3] → errors.queue dlx.products ├── [routing_key: retry] → products.retry.queue └── [routing_key: error] → errors.queue dlx.refunds ├── [routing_key: retry] → refunds.retry.queue └── [routing_key: error] → errors.queue dlx.inventory ├── [routing_key: retry] → inventory.retry.queue └── [routing_key: error] → errors.queue ``` **注意**:RabbitMQ 原生不支持基于 header 的条件路由,需要: - **方案A**:消费者在 `nack` 时根据重试次数决定发送到 DLX 的 routing key(`retry` 或 `error`) - **方案B**:使用 RabbitMQ 插件(如 `rabbitmq_message_routing_by_headers`)实现条件路由 **推荐方案A**:消费者控制重试逻辑,更灵活可控。 #### 重试队列绑定(回流主队列) 重试队列 TTL 到期后,消息自动死信回主 Exchange: ``` orders.retry.queue └── [TTL=5s后 死信到 main.exchange,routing_key保持原始值:order.{platform}] main.exchange └── [routing_key: order.#] → orders.queue ``` 这样实现了"失败 → 延迟5秒 → 重新进入主队列"的循环。 #### 错误队列绑定 每个平台的错误 Exchange 和所有 DLX 都绑定到统一错误队列: ``` douyin.errors.exchange └── [routing_key: #] → errors.queue dlx.orders └── [routing_key: error] → errors.queue dlx.products └── [routing_key: error] → errors.queue ... (其他类型类似) ``` **说明**:使用 `#` 通配符接收所有错误类型。 --- ## 消费者设计 ### 1. 消费者模型 采用 **单消费者 + 批处理 + prefetch + 适配器模式**: | 数据类型 | 消费者 | Queue | 并发模式 | 处理模式 | |---------|--------|-------|----------|---------| | 订单 | Order Consumer | `orders.queue` | 单消费者 | 批处理 + 适配器 | | 产品 | Product Consumer | `products.queue` | 单消费者 | 批处理 + 适配器 | | 退款 | Refund Consumer | `refunds.queue` | 单消费者 | 批处理 + 适配器 | | 库存 | Inventory Consumer | `inventory.queue` | 单消费者 | 批处理 + 适配器 | **设计原则**: - ✅ **单消费者**:每个队列只有一个消费者实例,保证严格 FIFO 顺序 - ✅ **批处理**:积累一批消息后统一处理,提升数据库写入效率 - ✅ **单条 ACK**:每条消息独立确认,失败不影响其他消息 - ✅ **适配器模式**:消费者内部根据平台字段分发到不同业务逻辑 **消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 ** ### 2. Prefetch 配置 ```json { "prefetch_count": 100, "batch_size": 50, "batch_timeout": 3000 } ``` **参数说明**: - `prefetch_count=100`:RabbitMQ 最多推送 100 条未确认消息到消费者 - `batch_size=50`:消费者积累 50 条消息后触发批处理 - `batch_timeout=3000`:如果3秒内未达到 batch_size,也触发处理 **工作流程**: 1. RabbitMQ 按 Push 模式推送最多 100 条消息到消费者 2. 消费者缓存消息,达到 50 条或超时 3 秒后批量处理 3. 批量处理时,仍然对每条消息单独 ACK/NACK 4. 实现"伪批量拉取"效果,同时保留单条确认的灵活性 ### 3. 消费者处理逻辑(伪代码) ```python class OrderConsumer: def __init__(self): self.prefetch_count = 100 self.batch_size = 50 self.batch_timeout = 3 self.message_buffer = [] self.adapters = { 'douyin': DouyinAdapter(), 'tmall': TmallAdapter(), 'jd': JDAdapter(), # ... 其他平台适配器 } def on_message(self, channel, method, properties, body): """接收到消息时的回调""" message = json.loads(body) self.message_buffer.append({ 'channel': channel, 'method': method, 'properties': properties, 'body': message }) # 达到批量大小,触发处理 if len(self.message_buffer) >= self.batch_size: self.process_batch() def process_batch(self): """批量处理消息""" for msg_data in self.message_buffer: try: message = msg_data['body'] platform = message['platform'] # 使用适配器模式分发到对应平台逻辑 adapter = self.adapters.get(platform) if not adapter: raise ValueError(f"Unknown platform: {platform}") # 处理业务逻辑 adapter.process(message) # 成功:ACK msg_data['channel'].basic_ack(msg_data['method'].delivery_tag) except TemporaryError as e: # 临时错误:重试 retry_count = msg_data['properties'].headers.get('x-retries', 0) if retry_count < 3: # 更新重试次数,发送到 DLX 进行延迟重试 headers = {'x-retries': retry_count + 1} msg_data['channel'].basic_nack( msg_data['method'].delivery_tag, requeue=False ) else: # 超过重试次数,发送到错误队列 self.send_to_error_queue(msg_data) msg_data['channel'].basic_nack( msg_data['method'].delivery_tag, requeue=False ) except PermanentError as e: # 永久错误:直接进入错误队列 self.send_to_error_queue(msg_data) msg_data['channel'].basic_nack( msg_data['method'].delivery_tag, requeue=False ) # 清空缓冲区 self.message_buffer.clear() def send_to_error_queue(self, msg_data): """发送到错误队列""" error_message = { 'original_message': msg_data['body'], 'error': str(e), 'timestamp': datetime.now().isoformat() } # 发送到错误 Exchange self.channel.basic_publish( exchange='douyin.errors.exchange', routing_key='error', body=json.dumps(error_message) ) ``` ### 4. 重试机制工作流程 ``` ┌─────────────────┐ │ orders.queue │ ← 消费者从这里接收消息 └────────┬────────┘ │ ├─ 成功 → basic_ack() │ └─ 失败 → basic_nack(requeue=false) │ ▼ ┌──────────────────┐ │ dlx.orders │ ← 死信交换机 └────────┬─────────┘ │ ├─ x-retries < 3 → orders.retry.queue (TTL=5s) │ │ │ └─ TTL到期 → 自动死信回 main.exchange │ │ │ └─ order.# → orders.queue │ └─ x-retries >= 3 → errors.queue (人工处理) ``` ### 5. 为什么这样设计? | 特性 | 原因 | 优势 | |------|------|------| | **单队列 FIFO** | RabbitMQ 无法在队列内部按 topic 过滤 | 保证严格 FIFO,不拆分队列造成业务乱序 | | **单消费者** | RabbitMQ 同队列多消费者必然 Round-Robin 分配 | 避免消息错投到错误的消费者 | | **Prefetch + 批处理** | RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 | 实现"伪批量拉取",提升吞吐 | | **单条 ACK** | 保留细粒度控制 | 失败消息不影响其他消息 | | **DLX + 延迟重试** | 避免失败消息阻塞队列 | 不会卡 FIFO,不会无限重试 | | **适配器模式** | 多平台业务逻辑隔离 | 扩展性好,易维护 | --- ## 用户权限配置 ### 1. 平台开发者账号 每个平台分配一个专用账号,只能访问自己的 Exchange。 #### DouYin 平台账号 ``` 用户名: user_douyin 密码: <安全密码> VHost: dataflow-app 权限配置: - Configure: (留空) - 不允许创建/删除资源 - Write: ^douyin\.(exchange|errors\.exchange)$ - 只能写入 DouYin 的业务和错误 Exchange - Read: ^douyin\.errors\..*$ - 只能读取 DouYin 错误相关的队列 ``` #### Tmall 平台账号 ``` 用户名: user_tmall 密码: <安全密码> VHost: dataflow-app 权限配置: - Configure: (留空) - Write: ^tmall\.(exchange|errors\.exchange)$ - Read: ^tmall\.errors\..*$ ``` #### 其他平台账号 按相同模式创建: - JD: `user_jd` - RedBook: `user_redbook` - Amazon Japan: `user_amazon` - Naver: `user_naver` - Rakuten: `user_rakuten` - Shopify: `user_shopify` ### 2. Dataflow 消费者账号 Dataflow 应用的后端服务账号,负责消费业务队列并处理错误。 ``` 用户名: user_dataflow_consumer 密码: <安全密码> VHost: dataflow-app 权限配置: - Configure: ^(orders|products|refunds|inventory)\.queue$ - 可以管理业务队列 - Write: .*\.errors\.exchange$ - 可以向所有错误 Exchange 写入消息 - Read: ^(orders|products|refunds|inventory)\.queue$ - 可以读取所有业务队列 ``` ### 3. 运维监控账号 运维团队账号,用于监控所有错误消息。 ``` 用户名: user_ops 密码: <安全密码> VHost: dataflow-app 权限配置: - Configure: ^errors\..*$ - 可以管理错误相关资源 - Write: (留空) - 不需要写入权限 - Read: ^errors\.queue$ - 可以读取统一错误队列 ``` ### 4. 管理员账号 RabbitMQ 超级管理员,用于配置和维护。 ``` 用户名: admin 密码: <强密码> Tag: administrator 权限配置: - 对所有 VHost 具有完全权限 - 可以访问管理界面 ``` --- ## 消息格式规范 ### 1. message_id 设计 message_id 采用结构化格式,确保幂等性和可追溯性。 **格式**: ``` {prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id} ``` **字段说明**: - `prefix`:项目名称前缀(如 `wpic-project1`) - `app_id`:应用标识(固定为 `dataflow`) - `company_id`:公司 ID - `platform_id`:平台 ID - `store_id`:店铺 ID - `entity_type`:数据实体类型(order/product/refund/inventory) - `entity_id`:平台侧唯一标识(如订单号) **分隔符说明**: - 使用 `#` 作为分隔符(RabbitMQ message_id 字段支持任意字符) - `#` 在 routing key 中是通配符,但在 message_id 中是普通字符 **示例**: ``` wpic-project1#dataflow#100#20#200#order#DY123456789 解析: - 项目前缀: wpic-project1 - 应用: dataflow - 公司ID: 100 - 平台ID: 20 (DouYin) - 店铺ID: 200 - 实体类型: order - 平台订单号: DY123456789 ``` **特性**: - ✅ 幂等性:同一实体的 message_id 完全一致 - ✅ 可读性:直接看出数据来源 - ✅ 便于调试:可从 message_id 提取元数据 - ✅ 无冲突:全局唯一 ### 2. 业务消息格式 ```json { "message_id": "wpic-project1#dataflow#100#20#200#order#DY123456", "timestamp": "2025-01-14T10:30:00Z", "platform": "douyin", "data_type": "order", "metadata": { "platform_id": 20, "company_id": 100, "store_id": 200, "source_system": "douyin-open-api", "retry_count": 0, "data_version": 1736840000 }, "data": { "platform_unique_id": "DY123456", "raw_data": { // 平台原始完整数据 } } } ``` **字段说明**: | 字段 | 类型 | 必需 | 说明 | |------|------|------|------| | `message_id` | string | 是 | 结构化消息 ID,用于幂等性 | | `timestamp` | string | 是 | 消息生成时间(ISO 8601 格式) | | `platform` | string | 是 | 平台标识(douyin/tmall/jd...) | | `data_type` | string | 是 | 数据类型(order/product/refund/inventory) | | `metadata.platform_id` | int | 是 | 平台 ID(数据库主键) | | `metadata.company_id` | int | 是 | 公司 ID | | `metadata.store_id` | int | 是 | 店铺 ID | | `metadata.source_system` | string | 是 | 数据来源系统 | | `metadata.retry_count` | int | 是 | 重试次数(初始为 0) | | `metadata.data_version` | int | 是 | 数据版本(Unix 时间戳,用于防止乱序更新) | | `data.platform_unique_id` | string | 是 | 平台侧唯一标识 | | `data.raw_data` | object | 是 | 平台原始完整 JSON 数据 | **重要说明**: - **data_version**:必须使用平台数据的更新时间戳(Unix 时间戳),用于解决消息乱序消费问题 - 示例:平台订单的最后更新时间 `1736840000`(2025-01-14 10:00:00 UTC) - 消费端只有当 `data_version` 大于数据库中的值时才更新数据 - 如果平台未提供更新时间,使用消息生成时间戳 ### 3. 错误消息格式 错误消息由 dataflow 消费者生成,包含原始消息和错误详情。 ```json { "error_id": "err_1234567890abcdef", "original_message": { // 原始业务消息完整内容 }, "error": { "type": "validation", "message": "Missing required field: total_amount", "trace": "Exception stack trace...", "timestamp": "2025-01-14T10:31:00Z" }, "metadata": { "platform": "amazon", "platform_id": 1, "company_id": 100, "store_id": 200, "data_type": "order", "failed_at": "2025-01-14T10:31:00Z" } } ``` --- ## RabbitMQ 管理界面操作指引 ### 1. 访问管理界面 ``` URL: http://:15672 用户名: admin 密码: <管理员密码> ``` ### 2. 创建 VHost 1. 点击顶部导航 **Admin** → **Virtual Hosts** 2. 点击 **Add a new virtual host** 3. 输入 VHost 名称:`dataflow-app` 4. 点击 **Add virtual host** ### 3. 创建用户 1. 点击顶部导航 **Admin** → **Users** 2. 点击 **Add a user** 3. 填写用户信息: - **Username**: `user_amazon` - **Password**: `<安全密码>` - **Tags**: (留空,非管理员用户) 4. 点击 **Add user** ### 4. 配置用户权限 1. 在用户列表中,点击用户名(如 `user_amazon`) 2. 滚动到 **Permissions** 部分 3. 选择 VHost:`dataflow-app` 4. 配置权限正则表达式: - **Configure regexp**: (留空) - **Write regexp**: `^amazon\.(exchange|errors\.exchange)$` - **Read regexp**: `^amazon\.errors\..*$` 5. 点击 **Set permission** ### 5. 创建 Exchange 1. 点击顶部导航 **Exchanges** 2. 确保选择正确的 VHost:`dataflow-app` 3. 点击 **Add a new exchange** 4. 填写配置: - **Name**: `amazon.exchange` - **Type**: `topic` - **Durability**: `Durable` - **Auto delete**: `No` - **Internal**: `No` 5. 点击 **Add exchange** 重复以上步骤创建所有业务 Exchange 和错误 Exchange。 ### 6. 创建 Queue 1. 点击顶部导航 **Queues** 2. 确保选择正确的 VHost:`dataflow-app` 3. 点击 **Add a new queue** 4. 填写配置: - **Name**: `orders.queue` - **Durability**: `Durable` - **Auto delete**: `No` - **Arguments**: - `x-message-ttl`: `86400000` (24小时,单位:毫秒) 5. 点击 **Add queue** 重复以上步骤创建 `products.queue`、`refunds.queue` 和 `errors.queue`。 **注意**:`errors.queue` 的 TTL 设置为 `604800000`(7天)。 ### 7. 创建 Binding 1. 点击顶部导航 **Exchanges** 2. 点击需要绑定的 Exchange(如 `amazon.exchange`) 3. 滚动到 **Bindings** 部分 4. 在 **Add binding from this exchange** 区域填写: - **To queue**: `orders.queue` - **Routing key**: `order` 5. 点击 **Bind** 为每个 Exchange 创建 3 个绑定(order/product/refund)。 对于错误 Exchange,绑定配置: - **To queue**: `errors.queue` - **Routing key**: `#` --- ## 配置脚本 为简化配置过程,可以使用以下脚本自动创建资源。 ### 使用 rabbitmqadmin RabbitMQ 提供命令行工具 `rabbitmqadmin`,可用于批量配置。 #### 安装 rabbitmqadmin ```bash # 从 RabbitMQ 管理界面下载 wget http://:15672/cli/rabbitmqadmin chmod +x rabbitmqadmin sudo mv rabbitmqadmin /usr/local/bin/ ``` #### 配置脚本示例 ```bash #!/bin/bash # RabbitMQ 连接信息 RABBITMQ_HOST="localhost:15672" RABBITMQ_USER="admin" RABBITMQ_PASS="admin_password" VHOST="dataflow-app" # 平台列表 PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify") DATA_TYPES=("orders" "products" "refunds" "inventory") # 1. 创建 VHost rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare vhost name=$VHOST # 2. 创建主 Exchange(用于重试消息回流) rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare exchange name="main.exchange" vhost=$VHOST \ type=topic durable=true # 3. 创建 DLX (死信交换机) for dtype in "${DATA_TYPES[@]}"; do rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare exchange name="dlx.${dtype}" vhost=$VHOST \ type=topic durable=true done # 4. 创建主业务队列(带 DLX 配置) for dtype in "${DATA_TYPES[@]}"; do rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare queue name="${dtype}.queue" vhost=$VHOST durable=true \ arguments="{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}" done # 5. 创建重试队列(带 TTL 和 DLX 回流配置) for dtype in "${DATA_TYPES[@]}"; do rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare queue name="${dtype}.retry.queue" vhost=$VHOST durable=true \ arguments="{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}" done # 6. 创建错误队列 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare queue name="errors.queue" vhost=$VHOST durable=true \ arguments='{"x-message-ttl":604800000}' # 7. 绑定主 Exchange 到主队列(使用通配符) for dtype in "${DATA_TYPES[@]}"; do # 使用 order.#, product.# 等通配符匹配所有平台 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="main.exchange" destination="${dtype}.queue" \ vhost=$VHOST routing_key="${dtype%s}.#" done # 8. 绑定 DLX 到重试队列和错误队列 for dtype in "${DATA_TYPES[@]}"; do # DLX -> 重试队列 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="dlx.${dtype}" destination="${dtype}.retry.queue" \ vhost=$VHOST routing_key="retry" # DLX -> 错误队列 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="dlx.${dtype}" destination="errors.queue" \ vhost=$VHOST routing_key="error" done # 9. 为每个平台创建 Exchange 和 Binding for platform in "${PLATFORMS[@]}"; do # 创建平台业务 Exchange rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare exchange name="${platform}.exchange" vhost=$VHOST \ type=topic durable=true # 绑定到业务队列(使用新的 routing key 格式:data_type.platform) rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="${platform}.exchange" destination="orders.queue" \ vhost=$VHOST routing_key="order.${platform}" rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="${platform}.exchange" destination="products.queue" \ vhost=$VHOST routing_key="product.${platform}" rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="${platform}.exchange" destination="refunds.queue" \ vhost=$VHOST routing_key="refund.${platform}" rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="${platform}.exchange" destination="inventory.queue" \ vhost=$VHOST routing_key="inventory.${platform}" # 创建平台错误 Exchange rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare exchange name="${platform}.errors.exchange" vhost=$VHOST \ type=topic durable=true # 绑定到错误队列 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="${platform}.errors.exchange" destination="errors.queue" \ vhost=$VHOST routing_key="#" # 创建平台用户 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare user name="user_${platform}" password="change_me_${platform}" tags="" # 配置平台用户权限 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permission vhost=$VHOST user="user_${platform}" \ configure="" write="^${platform}\.(exchange|errors\.exchange)$" \ read="^${platform}\.errors\..*$" done # 10. 创建 dataflow 消费者用户 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare user name="user_dataflow_consumer" password="change_me_consumer" tags="" rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permission vhost=$VHOST user="user_dataflow_consumer" \ configure="^(orders|products|refunds|inventory).*\.queue$" \ write="(dlx\..*)|(.*\.errors\.exchange)$" \ read="^(orders|products|refunds|inventory).*\.queue$" # 11. 创建运维监控用户 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare user name="user_ops" password="change_me_ops" tags="" rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permission vhost=$VHOST user="user_ops" \ configure="^errors\..*$" write="" read="^errors\.queue$" echo "RabbitMQ 配置完成!" echo "" echo "已创建:" echo "- 1 个 VHost: dataflow-app" echo "- 1 个主 Exchange: main.exchange" echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue" echo "- 1 个错误队列: errors.queue" echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)" echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)" ``` #### 执行脚本 ```bash chmod +x setup_rabbitmq.sh ./setup_rabbitmq.sh ``` --- ## 监控和维护 ### 1. 监控指标 通过 RabbitMQ 管理界面查看以下关键指标: #### 队列健康状态 1. 点击 **Queues** → 选择队列 2. 关注以下指标: - **Ready**: 待消费消息数(正常应 < 1000) - **Unacked**: 未确认消息数(正常应较低) - **Total**: 总消息数 - **Incoming rate**: 消息入队速率 - **Deliver / Get rate**: 消息消费速率 **告警阈值**: - Ready > 10000:队列堆积严重 - Unacked > 5000:消费者处理缓慢 - Incoming rate >> Deliver rate:消费速度跟不上生产速度 #### Exchange 流量统计 1. 点击 **Exchanges** → 选择 Exchange 2. 查看 **Message rates**: - **Publish in**: 消息发布速率 - **Publish out**: 消息路由到队列的速率 **异常情况**: - Publish in > 0 但 Publish out = 0:可能绑定配置错误 ### 2. 常见问题排查 #### 消息未到达队列 **检查步骤**: 1. 确认 Exchange 存在且类型正确 2. 检查 Binding 配置,确保 routing key 匹配 3. 查看 Exchange 的 **Message rates**,确认消息已发布 4. 检查生产者是否使用了正确的 routing key #### 消费者无法消费 **检查步骤**: 1. 确认用户权限配置正确(Read 权限) 2. 检查队列中是否有消息(Ready > 0) 3. 查看消费者连接状态(Connections 页面) 4. 检查消费者代码是否正确 ACK 消息 #### 错误消息未送达平台开发者 **检查步骤**: 1. 确认错误 Exchange 绑定到 `errors.queue` 2. 检查平台开发者账号的 Read 权限 3. 确认平台开发者订阅了正确的 Exchange 4. 查看 `errors.queue` 中是否有消息 ### 3. 备份和恢复 #### 导出配置 ```bash # 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions) rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ export dataflow-backup.json ``` #### 导入配置 ```bash # 从备份恢复配置 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ import dataflow-backup.json ``` ### 4. 性能优化 #### 消费者并发配置 - 增加消费者实例数量 - 每个消费者配置多个 channel(如 5-10 个) - 使用 prefetch_count 限制未确认消息数量 #### 队列性能优化 - 启用 lazy queue(大量消息堆积时): ```bash rabbitmqadmin declare queue name=orders.queue vhost=dataflow-app \ durable=true arguments='{"x-queue-mode":"lazy"}' ``` - 定期清理过期消息(通过 TTL 自动实现) ### 5. 日志查看 ```bash # 查看 RabbitMQ 日志 tail -f /var/log/rabbitmq/rabbit@.log # 查看连接日志 tail -f /var/log/rabbitmq/rabbit@_connection.log ``` --- ## 安全建议 ### 1. 密码策略 - 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符) - 定期更换密码(建议 3-6 个月) - 不同环境(开发/测试/生产)使用不同密码 ### 2. 网络安全 - 生产环境启用 TLS/SSL 加密连接 - 限制管理界面访问(仅允许内网或 VPN 访问) - 配置防火墙规则: - 5672:AMQP 端口(仅允许应用服务器访问) - 15672:管理界面(仅允许管理员 IP 访问) ### 3. 权限最小化原则 - 平台开发者只能访问自己的 Exchange - 消费者只能读取必要的队列 - 避免赋予不必要的 Configure 权限 ### 4. 审计日志 - 启用 RabbitMQ 审计日志插件 - 定期审查用户操作记录 - 监控异常登录行为 --- ## 新增平台接入流程 当需要接入新平台(例如 Lazada)时,按以下步骤操作: ### 1. 创建 Exchange ```bash # 业务 Exchange rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare exchange name="lazada.exchange" vhost=dataflow-app \ type=topic durable=true # 错误 Exchange rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare exchange name="lazada.errors.exchange" vhost=dataflow-app \ type=topic durable=true ``` ### 2. 创建 Binding ```bash # 绑定到业务队列 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="lazada.exchange" destination="orders.queue" \ vhost=dataflow-app routing_key="order" rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="lazada.exchange" destination="products.queue" \ vhost=dataflow-app routing_key="product" rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="lazada.exchange" destination="refunds.queue" \ vhost=dataflow-app routing_key="refund" rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="lazada.exchange" destination="inventory.queue" \ vhost=dataflow-app routing_key="inventory" # 绑定到错误队列 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding source="lazada.errors.exchange" \ destination="errors.queue" vhost=dataflow-app routing_key="#" ``` ### 3. 创建用户并配置权限 ```bash # 创建用户 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare user name="user_lazada" password="" tags="" # 配置权限 rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permission vhost=dataflow-app user="user_lazada" \ configure="" write="^lazada\.(exchange|errors\.exchange)$" \ read="^lazada\.errors\..*$" ``` ### 4. 提供给平台开发者 将以下信息提供给新平台的开发团队: ``` RabbitMQ 连接信息: - Host: - Port: 5672 - VHost: dataflow-app - Username: user_lazada - Password: 发布配置: - Exchange: lazada.exchange - Routing Keys: - order: 发布订单数据 - product: 发布产品数据 - refund: 发布退款数据 - inventory: 发布库存数据 错误订阅(可选): - Exchange: lazada.errors.exchange - 接收本平台的错误消息 消息格式规范: - 参见「消息格式规范」章节 ``` --- ## 附录 ### A. 完整资源清单 #### VHost - **总数**: 1 个 - `dataflow-app` #### Exchanges **主 Exchange**: 1 个 - `main.exchange` (Topic) - 统一主交换机,用于路由和重试消息回流 **平台业务 Exchanges**: 8 个(每平台 1 个) - `douyin.exchange` - `jd.exchange` - `tmall.exchange` - `redbook.exchange` - `amazon.exchange` - `naver.exchange` - `rakuten.exchange` - `shopify.exchange` **平台错误 Exchanges**: 8 个(每平台 1 个) - `douyin.errors.exchange` - `jd.errors.exchange` - `tmall.errors.exchange` - `redbook.errors.exchange` - `amazon.errors.exchange` - `naver.errors.exchange` - `rakuten.errors.exchange` - `shopify.errors.exchange` **死信交换机 (DLX)**: 4 个(每数据类型 1 个) - `dlx.orders` - `dlx.products` - `dlx.refunds` - `dlx.inventory` **Exchange 总数**: **21 个** (1 主 + 8 平台业务 + 8 平台错误 + 4 DLX) #### Queues **主业务队列**: 4 个 - `orders.queue` (配置 DLX: dlx.orders) - `products.queue` (配置 DLX: dlx.products) - `refunds.queue` (配置 DLX: dlx.refunds) - `inventory.queue` (配置 DLX: dlx.inventory) **延迟重试队列**: 4 个 - `orders.retry.queue` (TTL: 5秒, DLX: main.exchange) - `products.retry.queue` (TTL: 5秒, DLX: main.exchange) - `refunds.retry.queue` (TTL: 5秒, DLX: main.exchange) - `inventory.retry.queue` (TTL: 5秒, DLX: main.exchange) **错误队列**: 1 个 - `errors.queue` (TTL: 7天) **Queue 总数**: **9 个** (4 主队列 + 4 重试队列 + 1 错误队列) #### Bindings **主 Exchange 绑定**: 4 个 - `main.exchange` → `orders.queue` (routing_key: `order.#`) - `main.exchange` → `products.queue` (routing_key: `product.#`) - `main.exchange` → `refunds.queue` (routing_key: `refund.#`) - `main.exchange` → `inventory.queue` (routing_key: `inventory.#`) **平台业务 Exchange 绑定**: 32 个 - 每个平台 Exchange → 4 个业务队列 (8 × 4 = 32) - 使用 routing key 格式:`{data_type}.{platform}` - 例如:`order.douyin`, `product.tmall`, `refund.jd` 等 **平台错误 Exchange 绑定**: 8 个 - 每个平台错误 Exchange → `errors.queue` (routing_key: `#`) **DLX 绑定**: 8 个 - 每个 DLX → 对应重试队列 (routing_key: `retry`) - 4 个 - 每个 DLX → `errors.queue` (routing_key: `error`) - 4 个 **Binding 总数**: **52 个** (4 主 + 32 平台业务 + 8 平台错误 + 8 DLX) #### Users **平台开发者账号**: 8 个 - `user_douyin`, `user_jd`, `user_tmall`, `user_redbook`, `user_amazon`, `user_naver`, `user_rakuten`, `user_shopify` - 权限:只能写入自己的业务和错误 Exchange,只能读取自己的错误队列 **消费者账号**: 1 个 - `user_dataflow_consumer` - 权限:可以管理所有业务队列和重试队列,可以写入 DLX 和错误 Exchange,可以读取所有业务队列 **运维监控账号**: 1 个 - `user_ops` - 权限:可以管理错误相关资源,可以读取错误队列 **管理员账号**: 1 个 - `admin` - 权限:完全权限,可以访问管理界面 **User 总数**: **11 个** (8 平台 + 1 消费者 + 1 运维 + 1 管理员) #### 架构统计 | 资源类型 | 数量 | 说明 | |---------|------|------| | VHost | 1 | dataflow-app | | Exchange | 21 | 1 主 + 8 平台业务 + 8 平台错误 + 4 DLX | | Queue | 9 | 4 主 + 4 重试 + 1 错误 | | Binding | 52 | 完整消息流转路径 | | User | 11 | 多角色权限隔离 | **消息流转路径**: ``` 生产者 → 平台 Exchange → 主队列 → 消费者 ↓ (失败) DLX → 重试队列 (5秒TTL) → 主 Exchange → 主队列 ↓ (超过3次) DLX → 错误队列 (人工处理) ``` ### B. 参考资料 - [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html) - [RabbitMQ Management Plugin](https://www.rabbitmq.com/management.html) - [Access Control (Authentication, Authorization)](https://www.rabbitmq.com/access-control.html) - [rabbitmqadmin 使用指南](https://www.rabbitmq.com/management-cli.html)