# 数据流转架构 ## 概述 本文档描述电商数据从平台采集到入库的完整流程,包括各系统角色、职责和操作说明。 ### 系统角色 | 角色 | 职责 | 系统 | |------|------|------| | **数据生产者** | 采集电商平台数据并推送到 MQ | 各平台采集服务 | | **消息队列** | 数据路由、缓冲、隔离 | RabbitMQ | | **数据消费者** | 消费 MQ 消息并入库 | Dataflow App | | **存储层** | 持久化存储和统计分析 | PostgreSQL + TimescaleDB | --- ## 整体架构 ```mermaid graph TB subgraph "数据生产者层" P1[DouYin 采集服务] P2[Tmall 采集服务] P3[RedBook 采集服务] P4[其他平台采集服务...] end subgraph "RabbitMQ 消息队列" subgraph "VHost: dataflow-app" subgraph "业务 Exchanges" E1[douyin.exchange] E2[tmall.exchange] E3[redbook.exchange] E4[其他.exchange...] end subgraph "业务 Queues" Q1[orders.queue
DLX: dlx.orders] Q2[products.queue
DLX: dlx.products] Q3[refunds.queue
DLX: dlx.refunds] Q4[inventory.queue
DLX: dlx.inventory] end subgraph "DLX 死信交换机" DLX1[dlx.orders] DLX2[dlx.products] DLX3[dlx.refunds] DLX4[dlx.inventory] end subgraph "重试队列" RQ1[orders.retry.queue
TTL: 5s] RQ2[products.retry.queue
TTL: 5s] RQ3[refunds.retry.queue
TTL: 5s] RQ4[inventory.retry.queue
TTL: 5s] end subgraph "错误处理" EE1[douyin.errors.exchange] EE2[tmall.errors.exchange] EE3[redbook.errors.exchange] EQ[errors.queue] end end end subgraph "Dataflow 消费者" C1[OrderConsumer] C2[ProductConsumer] C3[RefundConsumer] C4[InventoryConsumer] C5[ErrorConsumer] end subgraph "存储层" DB1[(orders 表
TimescaleDB)] DB2[(products 表)] DB3[(refunds 表)] DB4[(inventory 表)] DB5[(failed_messages 表)] end subgraph "聚合视图" AGG1[orders_daily_by_created] AGG2[orders_daily_by_paid] end P1 -->|推送订单/产品/退款/库存| E1 P2 -->|推送订单/产品/退款/库存| E2 P3 -->|推送订单/产品/退款/库存| E3 P4 -->|推送订单/产品/退款/库存| E4 E1 & E2 & E3 & E4 -->|routing_key: order.platform| Q1 E1 & E2 & E3 & E4 -->|routing_key: product.platform| Q2 E1 & E2 & E3 & E4 -->|routing_key: refund.platform| Q3 E1 & E2 & E3 & E4 -->|routing_key: inventory.platform| Q4 Q1 --> C1 Q2 --> C2 Q3 --> C3 Q4 --> C4 C1 -->|成功| DB1 C2 -->|成功| DB2 C3 -->|成功| DB3 C4 -->|成功| DB4 C1 -.->|失败 nack| DLX1 C2 -.->|失败 nack| DLX2 C3 -.->|失败 nack| DLX3 C4 -.->|失败 nack| DLX4 DLX1 -->|x-retries<3| RQ1 DLX2 -->|x-retries<3| RQ2 DLX3 -->|x-retries<3| RQ3 DLX4 -->|x-retries<3| RQ4 RQ1 -.->|TTL 到期
回到主队列| Q1 RQ2 -.->|TTL 到期
回到主队列| Q2 RQ3 -.->|TTL 到期
回到主队列| Q3 RQ4 -.->|TTL 到期
回到主队列| Q4 DLX1 & DLX2 & DLX3 & DLX4 -->|x-retries>=3| EQ C1 & C2 & C3 & C4 -.->|永久失败| EE1 & EE2 & EE3 EE1 & EE2 & EE3 --> EQ EQ --> C5 C5 --> DB5 DB1 --> AGG1 DB1 --> AGG2 style P1 fill:#e1f5dd style P2 fill:#e1f5dd style P3 fill:#e1f5dd style P4 fill:#e1f5dd style E1 fill:#fff4e6 style E2 fill:#fff4e6 style E3 fill:#fff4e6 style E4 fill:#fff4e6 style Q1 fill:#e3f2fd style Q2 fill:#e3f2fd style Q3 fill:#e3f2fd style Q4 fill:#e3f2fd style DLX1 fill:#ffe0b2 style DLX2 fill:#ffe0b2 style DLX3 fill:#ffe0b2 style DLX4 fill:#ffe0b2 style RQ1 fill:#fff9c4 style RQ2 fill:#fff9c4 style RQ3 fill:#fff9c4 style RQ4 fill:#fff9c4 style C1 fill:#f3e5f5 style C2 fill:#f3e5f5 style C3 fill:#f3e5f5 style C4 fill:#f3e5f5 style DB1 fill:#fce4ec style AGG1 fill:#ffebee style AGG2 fill:#ffebee ``` --- ## RabbitMQ 内部架构 ### 业务数据流转 ```mermaid graph LR subgraph "生产者权限隔离" PD[DouYin 开发者
user_douyin] PT[Tmall 开发者
user_tmall] PR[RedBook 开发者
user_redbook] end subgraph "平台 Exchanges (Topic)" ED[douyin.exchange] ET[tmall.exchange] ER[redbook.exchange] end subgraph "数据类型 Queues" QO[orders.queue
TTL: 24h] QP[products.queue
TTL: 24h] QR[refunds.queue
TTL: 24h] QI[inventory.queue
TTL: 24h] end subgraph "消费者" CO[OrderConsumer
user_dataflow_consumer] end PD -->|write only| ED PT -->|write only| ET PR -->|write only| ER ED -->|order.douyin| QO ED -->|product.douyin| QP ED -->|refund.douyin| QR ED -->|inventory.douyin| QI ET -->|order.tmall| QO ET -->|product.tmall| QP ET -->|refund.tmall| QR ET -->|inventory.tmall| QI ER -->|order.redbook| QO ER -->|product.redbook| QP ER -->|refund.redbook| QR ER -->|inventory.redbook| QI QO -->|read| CO QP -->|read| CO QR -->|read| CO QI -->|read| CO style PD fill:#c8e6c9 style PT fill:#c8e6c9 style PR fill:#c8e6c9 style ED fill:#fff9c4 style ET fill:#fff9c4 style ER fill:#fff9c4 style QO fill:#bbdefb style QP fill:#bbdefb style QR fill:#bbdefb style QI fill:#bbdefb style CO fill:#d1c4e9 ``` ### 错误处理架构 ```mermaid graph TB subgraph "消费者错误产生" C1[OrderConsumer] C2[ProductConsumer] C3[RefundConsumer] C4[InventoryConsumer] end subgraph "平台错误 Exchanges" ED[douyin.errors.exchange] ET[tmall.errors.exchange] ER[redbook.errors.exchange] end subgraph "统一错误队列" EQ[errors.queue
TTL: 7 days] end subgraph "错误消息消费者" EC[ErrorConsumer
user_ops] DB[(failed_messages 表)] end subgraph "平台开发者监控" PD[DouYin 开发者
订阅 douyin.errors.exchange] PT[Tmall 开发者
订阅 tmall.errors.exchange] end C1 -->|判断平台| ED C1 -->|判断平台| ET C1 -->|判断平台| ER C2 -->|判断平台| ED C2 -->|判断平台| ET C2 -->|判断平台| ER C3 -->|判断平台| ED C3 -->|判断平台| ET C3 -->|判断平台| ER C4 -->|判断平台| ED C4 -->|判断平台| ET C4 -->|判断平台| ER ED -->|routing_key: #| EQ ET -->|routing_key: #| EQ ER -->|routing_key: #| EQ EQ --> EC EC --> DB ED -.->|read only| PD ET -.->|read only| PT style C1 fill:#ffccbc style C2 fill:#ffccbc style C3 fill:#ffccbc style C4 fill:#ffccbc style ED fill:#ffe0b2 style ET fill:#ffe0b2 style ER fill:#ffe0b2 style EQ fill:#ffcdd2 style EC fill:#f8bbd0 style PD fill:#c5e1a5 style PT fill:#c5e1a5 ``` --- ## 关键技术方案 ### 1. 乱序消费与数据一致性 #### 为什么必须支持乱序消费? 系统设计上**必须支持乱序消费**,原因如下: **批处理内部并发**: - 单消费者使用批处理模式(prefetch_count=100,batch_size=50) - 批次内的消息可能并发处理,无法保证严格顺序 - 同一批次中不同平台的消息会并发入库 **公平性需求**: - 多个平台的消息进入同一队列(orders.queue) - 批处理时优先处理简单消息,避免复杂消息阻塞整个批次 - 示例:DouYin 的大量消息和 Tmall 的消息混合在同一批次中并发处理 **重试消息的乱序**: - 失败消息通过 DLX 延迟重试后重新进入队列 - 重试消息可能与新消息乱序 - 必须防止旧版本数据覆盖新版本 #### 乱序消费带来的挑战 **核心问题**:同一订单的多次更新消息可能乱序到达 ``` 时间 T1: 订单创建,金额 100 元 → message_1: {platform_order_id: "DY123", amount: 100, data_version: 1736840000} 时间 T2: 订单更新,金额 200 元 → message_2: {platform_order_id: "DY123", amount: 200, data_version: 1736840100} 乱序消费(正常现象): Consumer A 先消费 message_2 → 数据库中金额为 200 ✅ Consumer B 后消费 message_1 → 如果没有版本控制,会覆盖为 100 ❌ 错误! ``` **注意**: - ✅ 不同订单的消息乱序消费 → 完全没问题 - ✅ 不同平台的消息乱序消费 → 完全没问题 - ❌ 同一订单的旧版本覆盖新版本 → 需要防止 **重要说明**: 虽然采用单消费者模型,但以下场景仍会导致乱序: 1. **批处理内部**:批次中的 50 条消息可能并发验证和入库 2. **重试消息**:失败消息经过 DLX 延迟 5 秒后回到队列,会与新消息混合 3. **数据库层面**:即使消费顺序正确,数据库事务提交顺序也可能不同 因此,**`data_version` 机制是必需的**,用于在数据库层面保证数据一致性。 #### 解决方案:数据版本控制 **方案**:在消息中增加 `data_version` 字段(Unix 时间戳),确保只有更新的数据才能写入数据库 ```sql -- 数据库表增加 data_version 字段 ALTER TABLE orders ADD COLUMN data_version BIGINT NOT NULL DEFAULT 0; -- 消费时只有更新的数据才写入 INSERT INTO orders ( platform_id, platform_order_id, total_amount, data_version, ... ) VALUES (20, 'DY123', 200, 1736840100, ...) ON CONFLICT (platform_id, platform_order_id) DO UPDATE SET total_amount = EXCLUDED.total_amount, data_version = EXCLUDED.data_version, updated_at = NOW() WHERE orders.data_version < EXCLUDED.data_version; -- 关键:只有版本更新才更新 ``` **工作流程**: ``` message_2 先到达 (data_version: 1736840100): INSERT → 数据库: amount=200, data_version=1736840100 ✅ message_1 后到达 (data_version: 1736840000): ON CONFLICT DO UPDATE WHERE orders.data_version < 1736840000 → 条件不满足,不更新 ✅ 数据库仍然是: amount=200, data_version=1736840100 ``` **消息格式要求**: ```json { "message_id": "order#dataflow#100#20#200#order#DY123", "metadata": { "data_version": 1736840100 // 必需:平台数据的最后更新时间戳 }, "data": { "platform_unique_id": "DY123", "raw_data": { "update_time": 1736840100 // 从平台原始数据中提取 } } } ``` --- ### 2. 幂等性保证(无 Redis 依赖) #### 方案:数据库表记录已处理消息 创建 `processed_messages` 表记录所有已处理的 message_id: ```sql CREATE TABLE processed_messages ( message_id VARCHAR(200) PRIMARY KEY, platform_id INT NOT NULL, data_type VARCHAR(50) NOT NULL, processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); -- 索引:加速查询 CREATE INDEX idx_processed_messages_created ON processed_messages (created_at); -- 自动清理 30 天前的记录(可选) CREATE INDEX idx_processed_messages_cleanup ON processed_messages (created_at) WHERE created_at < NOW() - INTERVAL '30 days'; ``` #### 消费流程 ``` 1. 收到消息,提取 message_id 2. 检查是否已处理: SELECT 1 FROM processed_messages WHERE message_id = ? 如果存在 → ACK 消息,跳过处理(幂等性) 3. 开启事务: BEGIN; 3.1 插入 processed_messages: INSERT INTO processed_messages (message_id, platform_id, data_type) VALUES (?, ?, ?) ON CONFLICT (message_id) DO NOTHING; -- 防止并发插入 3.2 插入/更新业务数据: INSERT INTO orders (...) VALUES (...) ON CONFLICT (platform_id, platform_order_id) DO UPDATE SET ... WHERE orders.data_version < EXCLUDED.data_version; COMMIT; 4. ACK 消息 ``` #### 性能优化 **问题**:每次消费都查询 `processed_messages` 可能影响性能 **优化方案**: ```sql -- 1. 使用 PostgreSQL 的 ON CONFLICT 避免先查询 INSERT INTO processed_messages (message_id, platform_id, data_type) VALUES (?, ?, ?) ON CONFLICT (message_id) DO NOTHING RETURNING message_id; -- 如果 RETURNING 无结果 → 说明已处理,直接返回 -- 如果 RETURNING 有结果 → 继续处理业务数据 -- 2. 定期清理历史记录(减少表大小) DELETE FROM processed_messages WHERE created_at < NOW() - INTERVAL '30 days'; ``` #### 与数据版本控制结合 ``` 完整的幂等性 + 乱序处理流程: BEGIN; -- 1. 记录消息已处理(幂等性) INSERT INTO processed_messages (message_id, platform_id, data_type) VALUES ('order#dataflow#100#20#200#order#DY123', 20, 'order') ON CONFLICT (message_id) DO NOTHING RETURNING message_id; -- 如果返回空,说明重复消息,直接 ROLLBACK 并 ACK -- 如果返回值,继续处理 -- 2. 更新业务数据(带版本控制,防止乱序) INSERT INTO orders ( platform_id, platform_order_id, total_amount, data_version, created_at, updated_at ) VALUES (20, 'DY123', 200, 1736840100, NOW(), NOW()) ON CONFLICT (platform_id, platform_order_id) DO UPDATE SET total_amount = EXCLUDED.total_amount, data_version = EXCLUDED.data_version, updated_at = NOW() WHERE orders.data_version < EXCLUDED.data_version; COMMIT; -- 3. ACK 消息 ``` --- ## 数据流转流程 ### 1. 订单数据完整流程 ```mermaid sequenceDiagram participant P as DouYin 采集服务 participant E as douyin.exchange participant Q as orders.queue participant C as OrderConsumer participant DB as PostgreSQL participant PM as processed_messages 表 participant AGG as 连续聚合视图 Note over P: 1. 数据采集阶段 P->>P: 从 DouYin API 获取订单数据 P->>P: 生成 message_id
order#dataflow#100#20#200#order#DY123
提取 data_version Note over P,E: 2. 消息发布阶段 P->>E: 发布消息
routing_key: order
持久化消息 E->>Q: 路由到订单队列 Note over Q,C: 3. 消息消费阶段 Q->>C: 推送消息 (prefetch 10) C->>C: 验证消息格式 C->>C: 验证 message_id 结构 Note over C,DB: 4. 幂等性检查 + 数据入库 (事务) C->>DB: BEGIN TRANSACTION C->>PM: INSERT INTO processed_messages
ON CONFLICT DO NOTHING
RETURNING message_id alt 返回空 (重复消息) PM-->>C: 无返回值 C->>DB: ROLLBACK C->>Q: ACK (跳过处理) else 返回值 (新消息) PM-->>C: 返回 message_id Note over C,DB: 5. 更新业务数据 (带版本控制) C->>DB: INSERT INTO orders
ON CONFLICT DO UPDATE
WHERE data_version < new_version DB-->>C: 更新成功 C->>DB: COMMIT C->>Q: ACK (确认消息) Note over DB,AGG: 6. 自动聚合 (后台) DB->>AGG: TimescaleDB 后台任务
每小时刷新聚合视图 end ``` ### 2. 错误处理流程 ```mermaid sequenceDiagram participant P as Tmall 采集服务 participant E as tmall.exchange participant Q as orders.queue participant C as OrderConsumer participant EE as tmall.errors.exchange participant EQ as errors.queue participant EC as ErrorConsumer participant DB as failed_messages 表 participant Alert as 告警系统 P->>E: 发布订单消息 E->>Q: 路由到订单队列 Q->>C: 推送消息 Note over C: 消费处理 C->>C: 验证消息格式 alt 数据格式错误 (validation) C->>C: 捕获 ValidationException C->>EE: 发布错误消息
routing_key: validation EE->>EQ: 路由到错误队列 C->>Q: ACK (从业务队列移除) Note over EQ,EC: 错误消息处理 EQ->>EC: 推送错误消息 EC->>DB: 记录错误日志 EC->>Alert: 判断是否需要告警 alt 满足告警条件 Alert->>Alert: 发送 Slack/Email 通知 end EC->>EQ: ACK else 处理成功 C->>C: 入库成功 C->>Q: ACK end ``` ### 3. 消息重试流程(DLX + 延迟重试队列) ```mermaid sequenceDiagram participant Q as orders.queue participant C as OrderConsumer participant DB as PostgreSQL participant DLX as dlx.orders participant RQ as orders.retry.queue participant EQ as errors.queue Q->>C: 推送消息 (x-retries: 0) Note over C,DB: 第 1 次处理 C->>DB: 尝试入库 DB-->>C: 数据库连接失败 C->>C: 更新 header: x-retries = 1 C->>C: nack(requeue=false) C-->>DLX: 消息进入 DLX Note over DLX: 根据 x-retries 路由 DLX->>RQ: x-retries < 3
routing_key: retry Note over RQ: TTL = 5 秒 Note over RQ: 5 秒后自动死信 RQ-->>Q: TTL 到期
回到主队列 Note over Q,C: 第 2 次处理 Q->>C: 推送消息 (x-retries: 1) C->>DB: 尝试入库 DB-->>C: 数据库连接失败 C->>C: 更新 header: x-retries = 2 C->>C: nack(requeue=false) C-->>DLX: 消息进入 DLX DLX->>RQ: x-retries < 3 Note over RQ: 5 秒后自动死信 RQ-->>Q: 回到主队列 Note over Q,C: 第 3 次处理 Q->>C: 推送消息 (x-retries: 2) C->>DB: 尝试入库 DB-->>C: 数据库连接失败 C->>C: 更新 header: x-retries = 3 C->>C: nack(requeue=false) C-->>DLX: 消息进入 DLX Note over DLX: x-retries >= 3
超过重试次数 DLX->>EQ: routing_key: error
进入错误队列 ``` --- ## 各角色职责和操作 ### 1. 平台开发者(数据生产者) #### 职责 - 从电商平台 API 采集数据(订单/产品/退款) - 将数据转换为标准消息格式 - 推送消息到 RabbitMQ - 监控自己平台的错误消息 #### 权限 | 操作 | 权限 | 说明 | |------|------|------| | 连接到 VHost | `dataflow-app` | 只能访问此 VHost | | 发布消息 | `{platform}.exchange` | 只能写入自己平台的 Exchange | | 订阅错误 | `{platform}.errors.exchange` | 可选,接收本平台错误通知 | #### 操作流程 ##### 步骤 1: 生成 message_id ``` 格式: {entity_type}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{platform_unique_id} 示例: - order#dataflow#100#20#200#order#DY123456789 - product#dataflow#100#2#201#product#TM-PROD789 - refund#dataflow#100#20#200#refund#DY-REF456 - inventory#dataflow#100#2#201#inventory#TM-SKU123 ``` **生成逻辑**: ``` # 伪代码 message_id = f"{data_type}#dataflow#{company_id}#{platform_id}#{store_id}#{data_type}#{platform_order_id}" # 实际示例(DouYin 订单) message_id = f"order#dataflow#100#20#200#order#{order_data['order_id']}" ``` ##### 步骤 2: 提取 data_version **重要**:必须从平台原始数据中提取更新时间戳作为 `data_version` ```python # 示例:从 DouYin API 响应提取 douyin_order = { "order_id": "DY202501140001", "create_time": 1736839200, "update_time": 1736840000, # ← 使用这个作为 data_version ... } data_version = douyin_order.get('update_time') or douyin_order.get('create_time') # 如果平台未提供更新时间,使用当前时间 if not data_version: data_version = int(time.time()) ``` ##### 步骤 3: 构建消息体 ```json { "message_id": "order#dataflow#100#20#200#order#DY123456", "timestamp": "2025-01-14T10:30:00Z", "platform": "douyin", "data_type": "order", "metadata": { "platform_id": 20, "company_id": 100, "store_id": 200, "source_system": "douyin-open-api", "retry_count": 0, "data_version": 1736840000 }, "data": { "platform_unique_id": "DY123456", "raw_data": { // 平台原始完整 JSON 数据(必须包含更新时间) "order_id": "DY123456", "update_time": 1736840000, ... } } } ``` ##### 步骤 4: 发布消息 **连接参数**: - Host: `` - Port: `5672` - VHost: `dataflow-app` - Username: `user_{platform}` - Password: `` **发布配置**: - Exchange: `{platform}.exchange` - Routing Key: `{data_type}.{platform}` 格式 - 订单: `order.{platform}` (如 `order.douyin`, `order.tmall`) - 产品: `product.{platform}` (如 `product.douyin`, `product.tmall`) - 退款: `refund.{platform}` (如 `refund.douyin`, `refund.tmall`) - 库存: `inventory.{platform}` (如 `inventory.douyin`, `inventory.tmall`) - Delivery Mode: `2` (持久化) ##### 步骤 4: 监控错误(可选) **订阅配置**: - Exchange: `{platform}.errors.exchange` - Routing Key: `#` (接收所有错误类型) - 创建临时队列或持久队列用于接收错误消息 **错误消息示例**: ```json { "error_id": "err_1234567890", "original_message": { /* 原始消息 */ }, "error": { "type": "validation", "message": "Missing required field: total_amount", "timestamp": "2025-01-14T10:31:00Z" }, "metadata": { "platform": "amazon", "platform_id": 1, "store_id": 200, "failed_at": "2025-01-14T10:31:00Z" } } ``` --- ### 2. Dataflow 应用(数据消费者) #### 职责 - 消费 RabbitMQ 业务队列中的消息 - 使用批处理模式提升吞吐量(prefetch=100, batch_size=50) - 通过适配器模式支持多平台数据处理 - 验证消息格式和数据完整性 - 将数据入库到 PostgreSQL - 处理失败消息,通过 DLX 延迟重试或进入错误队列 - 实现幂等性,避免重复处理 #### 消费者架构模型 **单消费者 + 批处理 + 适配器模式**: ``` OrderConsumer (单实例) ├── RabbitMQ Channel │ └── prefetch_count: 100 (最多缓存 100 条未确认消息) │ ├── 消息缓冲区 (Message Buffer) │ ├── 接收 RabbitMQ 推送的消息 │ ├── 达到 batch_size=50 或 timeout=3s 触发处理 │ └── 缓冲区最大容量: 100 条 │ ├── 批处理引擎 (Batch Processor) │ ├── 遍历缓冲区中的消息 │ ├── 对每条消息调用对应的适配器 │ └── 批量入库 + 单条 ACK │ └── 平台适配器 (Platform Adapters) ├── DouyinAdapter: 处理抖音订单数据 ├── TmallAdapter: 处理天猫订单数据 ├── JDAdapter: 处理京东订单数据 └── ... 其他平台适配器 工作流程: 1. RabbitMQ 推送消息到消费者 (最多 100 条未确认) 2. 消费者缓存消息到缓冲区 3. 达到 batch_size 或 timeout 后触发批处理 4. 批处理引擎遍历消息,根据 platform 字段调用对应适配器 5. 适配器验证和转换数据 6. 批量插入数据库 7. 逐条 ACK/NACK 消息 ``` #### 权限 | 操作 | 权限 | 说明 | |------|------|------| | 连接到 VHost | `dataflow-app` | 访问业务 VHost | | 读取队列 | `orders.queue`, `products.queue`, `refunds.queue` | 消费业务消息 | | 写入错误 Exchange | `*.errors.exchange` | 发送错误消息到对应平台的错误 Exchange | | 管理队列 | `orders.queue`, `products.queue`, `refunds.queue` | 配置消费者参数 | #### 核心操作 ##### 消息消费配置 **推荐参数**: - **prefetch_count**: `100` - RabbitMQ 最多推送 100 条未确认消息 - **batch_size**: `50` - 消费者积累 50 条消息后触发批处理 - **batch_timeout**: `3000` ms - 如果 3 秒内未达到 batch_size,也触发处理 - **消费者模型**: 单消费者 + 批处理 + 适配器模式(每个队列一个消费者实例) - **ACK 模式**: 手动 ACK,单条确认(处理成功后才确认) - **重连机制**: 自动重连,指数退避 ##### 幂等性保证(数据库方案) **方案**:使用 `processed_messages` 表记录已处理的 message_id ```sql -- 1. 创建幂等性记录表 CREATE TABLE processed_messages ( message_id VARCHAR(200) PRIMARY KEY, platform_id INT NOT NULL, data_type VARCHAR(50) NOT NULL, processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX idx_processed_messages_created ON processed_messages (created_at); -- 2. 定期清理历史记录(可选,保留 30 天) DELETE FROM processed_messages WHERE created_at < NOW() - INTERVAL '30 days'; ``` **消费流程**: ```sql -- 在事务中执行 BEGIN; -- 步骤 1: 尝试插入 processed_messages(幂等性检查) INSERT INTO processed_messages (message_id, platform_id, data_type) VALUES ('order#dataflow#100#20#200#order#DY123', 20, 'order') ON CONFLICT (message_id) DO NOTHING RETURNING message_id; -- 如果返回空 → 说明是重复消息,ROLLBACK 并 ACK -- 如果返回值 → 继续处理 -- 步骤 2: 插入/更新业务数据(带版本控制) INSERT INTO orders ( platform_id, platform_order_id, company_id, store_id, total_amount, data_version, raw, created_at, updated_at ) VALUES (20, 'DY123', 100, 200, 9999, 1736840000, '...'::jsonb, NOW(), NOW()) ON CONFLICT (platform_id, platform_order_id) DO UPDATE SET total_amount = EXCLUDED.total_amount, data_version = EXCLUDED.data_version, raw = EXCLUDED.raw, updated_at = NOW() WHERE orders.data_version < EXCLUDED.data_version; -- 只有更新的版本才更新 COMMIT; ``` **关键点**: - ✅ 无需 Redis 依赖 - ✅ 事务保证原子性 - ✅ `ON CONFLICT DO NOTHING` 实现幂等性 - ✅ `WHERE data_version < EXCLUDED.data_version` 防止乱序更新 ##### 错误处理逻辑 ``` 处理流程: 1. 验证消息格式 ├─ 格式正确 → 继续 └─ 格式错误 → 发送到错误队列 (validation 类型), ACK 2. 幂等性检查 ├─ 已处理 → 直接 ACK └─ 未处理 → 继续 3. 业务处理 ├─ 成功 → 标记已处理, ACK └─ 失败 (异常) ├─ 数据库错误 / 网络错误 │ ├─ retry_count < 3 → 重新发布 (延迟重试), ACK │ └─ retry_count >= 3 → 发送到错误队列 (processing_failed), ACK └─ 业务逻辑错误 → 发送到错误队列 (validation), ACK ``` ##### 延迟重试实现(DLX + 重试队列方案) **重试策略**: - 重试延迟: 固定 5 秒(通过 TTL 实现) - 最大重试次数: 3 次 - 超过 3 次: 进入错误队列 **实现方式**(无需额外插件): ``` 消费失败 → nack(requeue=false) → 进入 DLX ↓ x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列 ↓ x-retries >= 3 → 错误队列 (人工处理) ``` **工作流程**: 1. 消费失败时,消费者执行 `nack(requeue=false)`,消息进入 DLX(如 dlx.orders) 2. DLX 根据消息 header 中的 `x-retries` 判断路由: - `x-retries < 3`:路由到 `orders.retry.queue`(延迟重试) - `x-retries >= 3`:路由到 `errors.queue`(永久失败) 3. 重试队列 TTL 到期后,消息自动死信回主队列,重新消费 4. 消费者需要在 nack 前更新消息的 `x-retries` header --- ### 3. 运维团队 #### 职责 - RabbitMQ 集群部署和维护 - 用户和权限管理 - 监控队列健康状态 - 处理系统级错误 - 性能优化和故障排查 #### 权限 | 操作 | 权限 | 说明 | |------|------|------| | 管理员账号 | `admin` tag | 访问管理界面,完全权限 | | 监控错误队列 | `errors.queue` (read) | 查看所有平台的错误消息 | | 配置资源 | 所有 Exchange/Queue | 创建、修改、删除资源 | #### 日常操作 ##### 监控关键指标 **队列堆积监控**: ```bash # 查看所有队列状态 rabbitmqctl list_queues -p dataflow-app name messages_ready messages_unacknowledged # 告警阈值 # - messages_ready > 10000: 严重堆积 # - messages_unacknowledged > 5000: 消费者处理缓慢 ``` **消费者连接监控**: ```bash # 查看所有消费者连接 rabbitmqctl list_consumers -p dataflow-app # 检查项: # - 消费者数量是否正常 # - prefetch_count 配置是否合理 # - 是否有消费者断开 ``` **错误率监控**: ```sql -- 查询最近 1 小时各平台错误数量 SELECT platform, error_type, COUNT(*) as error_count FROM failed_messages WHERE created_at > NOW() - INTERVAL '1 hour' GROUP BY platform, error_type ORDER BY error_count DESC; ``` ##### 故障排查 **队列堆积问题**: ``` 排查步骤: 1. 检查消费者是否在线 rabbitmqctl list_consumers -p dataflow-app 2. 查看消费速率 管理界面 → Queues → 选择队列 → 查看 Deliver/Get rate 3. 检查消费者日志 - 是否有异常报错 - 处理时间是否过长 解决方案: - 增加消费者实例数量 - 增加 prefetch_count - 优化业务处理逻辑 ``` **消息丢失问题**: ``` 排查步骤: 1. 确认消息是否发布成功 管理界面 → Exchanges → 查看 Message rates (Publish in) 2. 确认 Binding 配置 管理界面 → Exchanges → 选择 Exchange → 查看 Bindings 3. 检查消息是否持久化 - Exchange durable: true - Queue durable: true - Message delivery_mode: 2 4. 查看消费者 ACK 行为 - 是否正确 ACK - 是否有 NACK/REJECT ``` ##### 性能优化 **优化消费速度**: ``` 措施: 1. 调整批处理参数 - batch_size: 当前 50,可根据消息大小调整到 30-100 - batch_timeout: 当前 3s,可根据实时性要求调整到 1-5s 2. 调整 prefetch_count - 当前: 100 - 可根据内存和消息处理速度调整到 50-200 - prefetch_count 应该是 batch_size 的 2-3 倍 3. 优化批处理逻辑 - 使用数据库批量插入(INSERT ... VALUES (...), (...), ...) - 批次内消息可以并发验证(适配器模式) - 但必须单条 ACK 保证可靠性 4. 优化业务逻辑 - 减少数据库查询 - 使用连接池 - 异步处理非关键操作 5. 水平扩展(谨慎) - 单队列单消费者模型,不建议同一队列多消费者 - 可考虑分片:不同数据类型独立扩展(orders/products 分别扩展) ``` **队列性能优化**: ```bash # 启用 lazy queue (大量消息堆积时) rabbitmqctl set_policy lazy-queue "^(orders|products|refunds)\.queue$" \ '{"queue-mode":"lazy"}' --vhost dataflow-app # 效果: # - 消息存储在磁盘,减少内存占用 # - 适用于消息堆积严重的场景 ``` --- ### 4. RabbitMQ 管理员 #### 职责 - 初始化 RabbitMQ 配置 - 创建 VHost、Exchange、Queue - 管理用户和权限 - 新平台接入配置 #### 核心操作 ##### 新平台接入 当需要接入新平台(如 `aliexpress`)时,执行以下步骤: **步骤 1: 创建 Exchange** ```bash # 业务 Exchange rabbitmqadmin -u admin -p declare exchange \ name="aliexpress.exchange" vhost=dataflow-app \ type=topic durable=true # 错误 Exchange rabbitmqadmin -u admin -p declare exchange \ name="aliexpress.errors.exchange" vhost=dataflow-app \ type=topic durable=true ``` **步骤 2: 创建 Binding** ```bash # 绑定到业务队列(使用 {data_type}.{platform} 格式) rabbitmqadmin -u admin -p declare binding \ source="aliexpress.exchange" destination="orders.queue" \ vhost=dataflow-app routing_key="order.aliexpress" rabbitmqadmin -u admin -p declare binding \ source="aliexpress.exchange" destination="products.queue" \ vhost=dataflow-app routing_key="product.aliexpress" rabbitmqadmin -u admin -p declare binding \ source="aliexpress.exchange" destination="refunds.queue" \ vhost=dataflow-app routing_key="refund.aliexpress" rabbitmqadmin -u admin -p declare binding \ source="aliexpress.exchange" destination="inventory.queue" \ vhost=dataflow-app routing_key="inventory.aliexpress" # 绑定到错误队列 rabbitmqadmin -u admin -p declare binding \ source="aliexpress.errors.exchange" destination="errors.queue" \ vhost=dataflow-app routing_key="#" ``` **步骤 3: 创建用户** ```bash # 创建用户 rabbitmqadmin -u admin -p declare user \ name="user_aliexpress" password="" tags="" # 配置权限 rabbitmqadmin -u admin -p declare permission \ vhost=dataflow-app user="user_aliexpress" \ configure="" write="^aliexpress\.(exchange|errors\.exchange)$" \ read="^aliexpress\.errors\..*$" ``` **步骤 4: 提供给开发者** ``` 连接信息: - Host: - Port: 5672 - VHost: dataflow-app - Username: user_aliexpress - Password: Exchange: aliexpress.exchange Routing Keys: - 订单: order.aliexpress - 产品: product.aliexpress - 退款: refund.aliexpress - 库存: inventory.aliexpress 错误订阅 (可选): aliexpress.errors.exchange ``` --- ## 监控和告警 ### 1. 关键监控指标 ```mermaid graph TB subgraph "队列监控" M1[messages_ready
待消费消息数] M2[messages_unacknowledged
未确认消息数] M3[message_stats.publish_in_rate
消息发布速率] M4[message_stats.deliver_rate
消息消费速率] end subgraph "消费者监控" C1[consumer_count
消费者数量] C2[prefetch_count
预取数量] C3[connection_state
连接状态] end subgraph "错误监控" E1[errors.queue 消息数
错误队列堆积] E2[failed_messages 表记录数
失败消息总数] E3[每小时错误增长率
错误频率] end subgraph "系统监控" S1[disk_free
磁盘剩余空间] S2[mem_used
内存使用率] S3[fd_used
文件描述符使用] end style M1 fill:#ffcdd2 style M2 fill:#ffcdd2 style E1 fill:#ffcdd2 style E2 fill:#ffcdd2 ``` ### 2. 告警规则 | 指标 | 告警阈值 | 级别 | 处理建议 | |------|---------|------|---------| | `messages_ready` | > 10,000 | 警告 | 增加消费者 | | `messages_ready` | > 50,000 | 严重 | 紧急扩容 | | `messages_unacknowledged` | > 5,000 | 警告 | 检查消费者处理速度 | | `consumer_count` | = 0 | 严重 | 消费者服务挂掉 | | `publish_rate - deliver_rate` | > 1000/s | 警告 | 消费速度跟不上生产速度 | | `errors.queue` 消息数 | > 1,000 | 警告 | 大量消息处理失败 | | 单平台小时错误数 | > 100 | 警告 | 平台数据质量问题 | | `disk_free` | < 10 GB | 严重 | 清理磁盘或扩容 | | `mem_used` | > 80% | 警告 | 优化或增加内存 | ### 3. 监控实现 **Prometheus + Grafana**: ```yaml # prometheus.yml scrape_configs: - job_name: 'rabbitmq' static_configs: - targets: ['rabbitmq-exporter:9419'] metrics_path: /metrics ``` **告警规则示例**: ```yaml # alert_rules.yml groups: - name: rabbitmq_alerts rules: - alert: QueueBacklog expr: rabbitmq_queue_messages_ready{queue=~"orders|products|refunds"} > 10000 for: 5m labels: severity: warning annotations: summary: "队列 {{ $labels.queue }} 堆积严重" description: "当前待消费消息数: {{ $value }}" - alert: NoConsumers expr: rabbitmq_queue_consumers{queue=~"orders|products|refunds"} == 0 for: 1m labels: severity: critical annotations: summary: "队列 {{ $labels.queue }} 无消费者" description: "消费者服务可能已挂掉" ``` --- ## 故障恢复 ### 1. 消费者服务挂掉 ```mermaid graph LR A[检测到消费者断开] --> B[消息堆积在队列中] B --> C[告警通知运维] C --> D[重启消费者服务] D --> E[消费者自动重连] E --> F[继续消费堆积消息] F --> G{消息是否过期?} G -->|未过期| H[正常处理] G -->|已过期| I[消息被 TTL 删除] style A fill:#ffcdd2 style C fill:#fff9c4 style H fill:#c8e6c9 style I fill:#ffab91 ``` **恢复时间**: - 自动重连: 秒级 - 手动重启: 分钟级 - 数据不会丢失(消息持久化) ### 2. RabbitMQ 服务重启 ```mermaid graph TB A[RabbitMQ 重启] --> B[持久化资源自动恢复] B --> C[Exchanges 恢复] B --> D[Queues 恢复] B --> E[Bindings 恢复] B --> F[持久化消息恢复] A --> G[生产者重连] A --> H[消费者重连] G --> I[继续发布消息] H --> J[继续消费消息] style A fill:#ffcdd2 style C fill:#c8e6c9 style D fill:#c8e6c9 style E fill:#c8e6c9 style F fill:#c8e6c9 ``` **数据安全性**: - ✅ 持久化 Exchange 自动恢复 - ✅ 持久化 Queue 自动恢复 - ✅ 持久化消息自动恢复 - ❌ 未 ACK 的消息会重新投递(可能重复消费,需幂等性保证) ### 3. 数据库连接失败 ```mermaid graph TB A[数据库连接失败] --> B{retry_count < 3?} B -->|是| C[延迟重试
60s / 120s / 240s] C --> D[重新入队] D --> E[等待下次消费] E --> B B -->|否| F[超过重试次数] F --> G[发送到错误队列] G --> H[人工介入处理] style A fill:#ffcdd2 style C fill:#fff9c4 style G fill:#ffab91 style H fill:#ce93d8 ``` --- ## 最佳实践 ### 1. 生产者最佳实践 - ✅ **使用结构化 message_id**:格式 `{type}#{app}#{company}#{platform}#{store}#{type}#{unique_id}` - ✅ **必须提供 data_version**:从平台数据中提取更新时间戳,用于防止乱序更新 - ✅ **消息持久化**:`delivery_mode=2` - ✅ **包含完整 raw_data**:保留平台原始完整数据,便于后续数据补全和问题排查 - ✅ **使用正确的 routing_key**:`order`/`product`/`refund`/`inventory` - ✅ **实现发布确认**:等待 RabbitMQ 确认消息已接收 - ✅ **实现重连机制**:网络故障时自动重连 - ❌ **不要发送超大消息**(> 1 MB):考虑分片或使用对象存储 + 引用 ### 2. 消费者最佳实践 - ✅ **单消费者模型**:每个队列一个消费者实例,避免消息分散 - ✅ **批处理机制**:使用 prefetch + 批处理提升吞吐(batch_size=50, prefetch_count=100) - ✅ **适配器模式**:消费者内部根据平台字段分发到不同业务逻辑 - ✅ **手动 ACK**:单条确认,失败消息不影响其他消息 - ✅ **幂等性保证**:使用 `processed_messages` 表记录已处理的 message_id - ✅ **版本控制**:使用 `data_version` 字段防止乱序更新(批处理内部可能并发) - ✅ **事务处理**:在一个事务中完成幂等性检查 + 业务数据更新 - ✅ **DLX 重试机制**:失败消息通过 nack(requeue=false) 进入 DLX,延迟 5 秒后自动重试 - ✅ **错误分类处理**:区分可重试错误(临时)和业务错误(永久) - ✅ **限制重试次数**:最多 3 次,通过消息 header `x-retries` 控制 - ✅ **批量数据库操作**:批次内使用批量插入提升性能 - ✅ **记录详细日志**:便于故障排查 - ✅ **定期清理 processed_messages**:保留 30 天历史记录即可 - ❌ **不要同队列多消费者**:会导致消息 Round-Robin 分配,破坏设计 ### 3. 运维最佳实践 - ✅ **定期备份配置**:使用 `rabbitmqadmin export` - ✅ **监控告警**:配置 Prometheus + Grafana - ✅ **日志归档**:定期清理 `failed_messages` 表 - ✅ **压力测试**:定期测试消息吞吐能力 - ✅ **灾难恢复演练**:定期演练故障恢复流程 - ✅ **文档更新**:及时更新配置文档 --- ## 附录 ### A. 消息示例 #### 订单消息 ```json { "message_id": "order#dataflow#100#20#200#order#DY202501140001", "timestamp": "2025-01-14T10:30:00Z", "platform": "douyin", "data_type": "order", "metadata": { "platform_id": 20, "company_id": 100, "store_id": 200, "source_system": "douyin-open-api", "retry_count": 0, "data_version": 1736840000 }, "data": { "platform_unique_id": "DY202501140001", "raw_data": { "order_id": "DY202501140001", "create_time": 1736839200, "pay_time": 1736840000, "update_time": 1736840000, "order_amount": 9999, "pay_amount": 9999, "order_status": 2, "receiver_info": { "province": "北京市", "city": "北京市", "district": "朝阳区", "detail": "某某街道123号" } } } } ``` #### 产品消息 ```json { "message_id": "product#dataflow#100#2#201#product#TM-PROD-789", "timestamp": "2025-01-14T11:00:00Z", "platform": "tmall", "data_type": "product", "metadata": { "platform_id": 2, "company_id": 100, "store_id": 201, "source_system": "tmall-top-api", "retry_count": 0, "data_version": 1736841600 }, "data": { "platform_unique_id": "TM-PROD-789", "raw_data": { "num_iid": 789123456789, "title": "示例产品", "price": "499.00", "type": "fixed", "modified": "2025-01-14 11:00:00", "sku_list": [ { "sku_id": 987654321, "price": "499.00", "quantity": 100 } ] } } } ``` ### B. 参考文档 - [RabbitMQ 配置方案](./RabbitMQ.md) - [TimescaleDB 数据查询方案](./data_query.md) - [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html) - [AMQP 0-9-1 协议](https://www.rabbitmq.com/amqp-0-9-1-reference.html)