From e9ee0dc1f0a079e2d345288b10039ac0d5eea880 Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Fri, 14 Nov 2025 16:04:26 +0800 Subject: [PATCH] add dataflow design --- docs/data_flow.md | 1497 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1497 insertions(+) create mode 100644 docs/data_flow.md diff --git a/docs/data_flow.md b/docs/data_flow.md new file mode 100644 index 0000000..9fe5c6c --- /dev/null +++ b/docs/data_flow.md @@ -0,0 +1,1497 @@ +# 数据流转架构 + +## 概述 + +本文档描述电商数据从平台采集到入库的完整流程,包括各系统角色、职责和操作说明。 + +### 系统角色 + +| 角色 | 职责 | 系统 | +|------|------|------| +| **数据生产者** | 采集电商平台数据并推送到 MQ | 各平台采集服务 | +| **消息队列** | 数据路由、缓冲、隔离 | RabbitMQ | +| **数据消费者** | 消费 MQ 消息并入库 | Dataflow App | +| **存储层** | 持久化存储和统计分析 | PostgreSQL + TimescaleDB | + +--- + +## 整体架构 + +```mermaid +graph TB + subgraph "数据生产者层" + P1[DouYin 采集服务] + P2[Tmall 采集服务] + P3[RedBook 采集服务] + P4[其他平台采集服务...] + end + + subgraph "RabbitMQ 消息队列" + subgraph "VHost: dataflow-app" + subgraph "业务 Exchanges" + E1[douyin.exchange] + E2[tmall.exchange] + E3[redbook.exchange] + E4[其他.exchange...] + end + + subgraph "业务 Queues" + Q1[orders.queue
DLX: dlx.orders] + Q2[products.queue
DLX: dlx.products] + Q3[refunds.queue
DLX: dlx.refunds] + Q4[inventory.queue
DLX: dlx.inventory] + end + + subgraph "DLX 死信交换机" + DLX1[dlx.orders] + DLX2[dlx.products] + DLX3[dlx.refunds] + DLX4[dlx.inventory] + end + + subgraph "重试队列" + RQ1[orders.retry.queue
TTL: 5s] + RQ2[products.retry.queue
TTL: 5s] + RQ3[refunds.retry.queue
TTL: 5s] + RQ4[inventory.retry.queue
TTL: 5s] + end + + subgraph "错误处理" + EE1[douyin.errors.exchange] + EE2[tmall.errors.exchange] + EE3[redbook.errors.exchange] + EQ[errors.queue] + end + end + end + + subgraph "Dataflow 消费者" + C1[OrderConsumer] + C2[ProductConsumer] + C3[RefundConsumer] + C4[InventoryConsumer] + C5[ErrorConsumer] + end + + subgraph "存储层" + DB1[(orders 表
TimescaleDB)] + DB2[(products 表)] + DB3[(refunds 表)] + DB4[(inventory 表)] + DB5[(failed_messages 表)] + end + + subgraph "聚合视图" + AGG1[orders_daily_by_created] + AGG2[orders_daily_by_paid] + end + + P1 -->|推送订单/产品/退款/库存| E1 + P2 -->|推送订单/产品/退款/库存| E2 + P3 -->|推送订单/产品/退款/库存| E3 + P4 -->|推送订单/产品/退款/库存| E4 + + E1 & E2 & E3 & E4 -->|routing_key: order.platform| Q1 + E1 & E2 & E3 & E4 -->|routing_key: product.platform| Q2 + E1 & E2 & E3 & E4 -->|routing_key: refund.platform| Q3 + E1 & E2 & E3 & E4 -->|routing_key: inventory.platform| Q4 + + Q1 --> C1 + Q2 --> C2 + Q3 --> C3 + Q4 --> C4 + + C1 -->|成功| DB1 + C2 -->|成功| DB2 + C3 -->|成功| DB3 + C4 -->|成功| DB4 + + C1 -.->|失败 nack| DLX1 + C2 -.->|失败 nack| DLX2 + C3 -.->|失败 nack| DLX3 + C4 -.->|失败 nack| DLX4 + + DLX1 -->|x-retries<3| RQ1 + DLX2 -->|x-retries<3| RQ2 + DLX3 -->|x-retries<3| RQ3 + DLX4 -->|x-retries<3| RQ4 + + RQ1 -.->|TTL 到期
回到主队列| Q1 + RQ2 -.->|TTL 到期
回到主队列| Q2 + RQ3 -.->|TTL 到期
回到主队列| Q3 + RQ4 -.->|TTL 到期
回到主队列| Q4 + + DLX1 & DLX2 & DLX3 & DLX4 -->|x-retries>=3| EQ + C1 & C2 & C3 & C4 -.->|永久失败| EE1 & EE2 & EE3 + EE1 & EE2 & EE3 --> EQ + EQ --> C5 + C5 --> DB5 + + DB1 --> AGG1 + DB1 --> AGG2 + + style P1 fill:#e1f5dd + style P2 fill:#e1f5dd + style P3 fill:#e1f5dd + style P4 fill:#e1f5dd + style E1 fill:#fff4e6 + style E2 fill:#fff4e6 + style E3 fill:#fff4e6 + style E4 fill:#fff4e6 + style Q1 fill:#e3f2fd + style Q2 fill:#e3f2fd + style Q3 fill:#e3f2fd + style Q4 fill:#e3f2fd + style DLX1 fill:#ffe0b2 + style DLX2 fill:#ffe0b2 + style DLX3 fill:#ffe0b2 + style DLX4 fill:#ffe0b2 + style RQ1 fill:#fff9c4 + style RQ2 fill:#fff9c4 + style RQ3 fill:#fff9c4 + style RQ4 fill:#fff9c4 + style C1 fill:#f3e5f5 + style C2 fill:#f3e5f5 + style C3 fill:#f3e5f5 + style C4 fill:#f3e5f5 + style DB1 fill:#fce4ec + style AGG1 fill:#ffebee + style AGG2 fill:#ffebee +``` + +--- + +## RabbitMQ 内部架构 + +### 业务数据流转 + +```mermaid +graph LR + subgraph "生产者权限隔离" + PD[DouYin 开发者
user_douyin] + PT[Tmall 开发者
user_tmall] + PR[RedBook 开发者
user_redbook] + end + + subgraph "平台 Exchanges (Topic)" + ED[douyin.exchange] + ET[tmall.exchange] + ER[redbook.exchange] + end + + subgraph "数据类型 Queues" + QO[orders.queue
TTL: 24h] + QP[products.queue
TTL: 24h] + QR[refunds.queue
TTL: 24h] + QI[inventory.queue
TTL: 24h] + end + + subgraph "消费者" + CO[OrderConsumer
user_dataflow_consumer] + end + + PD -->|write only| ED + PT -->|write only| ET + PR -->|write only| ER + + ED -->|order.douyin| QO + ED -->|product.douyin| QP + ED -->|refund.douyin| QR + ED -->|inventory.douyin| QI + + ET -->|order.tmall| QO + ET -->|product.tmall| QP + ET -->|refund.tmall| QR + ET -->|inventory.tmall| QI + + ER -->|order.redbook| QO + ER -->|product.redbook| QP + ER -->|refund.redbook| QR + ER -->|inventory.redbook| QI + + QO -->|read| CO + QP -->|read| CO + QR -->|read| CO + QI -->|read| CO + + style PD fill:#c8e6c9 + style PT fill:#c8e6c9 + style PR fill:#c8e6c9 + style ED fill:#fff9c4 + style ET fill:#fff9c4 + style ER fill:#fff9c4 + style QO fill:#bbdefb + style QP fill:#bbdefb + style QR fill:#bbdefb + style QI fill:#bbdefb + style CO fill:#d1c4e9 +``` + +### 错误处理架构 + +```mermaid +graph TB + subgraph "消费者错误产生" + C1[OrderConsumer] + C2[ProductConsumer] + C3[RefundConsumer] + C4[InventoryConsumer] + end + + subgraph "平台错误 Exchanges" + ED[douyin.errors.exchange] + ET[tmall.errors.exchange] + ER[redbook.errors.exchange] + end + + subgraph "统一错误队列" + EQ[errors.queue
TTL: 7 days] + end + + subgraph "错误消息消费者" + EC[ErrorConsumer
user_ops] + DB[(failed_messages 表)] + end + + subgraph "平台开发者监控" + PD[DouYin 开发者
订阅 douyin.errors.exchange] + PT[Tmall 开发者
订阅 tmall.errors.exchange] + end + + C1 -->|判断平台| ED + C1 -->|判断平台| ET + C1 -->|判断平台| ER + + C2 -->|判断平台| ED + C2 -->|判断平台| ET + C2 -->|判断平台| ER + + C3 -->|判断平台| ED + C3 -->|判断平台| ET + C3 -->|判断平台| ER + + C4 -->|判断平台| ED + C4 -->|判断平台| ET + C4 -->|判断平台| ER + + ED -->|routing_key: #| EQ + ET -->|routing_key: #| EQ + ER -->|routing_key: #| EQ + + EQ --> EC + EC --> DB + + ED -.->|read only| PD + ET -.->|read only| PT + + style C1 fill:#ffccbc + style C2 fill:#ffccbc + style C3 fill:#ffccbc + style C4 fill:#ffccbc + style ED fill:#ffe0b2 + style ET fill:#ffe0b2 + style ER fill:#ffe0b2 + style EQ fill:#ffcdd2 + style EC fill:#f8bbd0 + style PD fill:#c5e1a5 + style PT fill:#c5e1a5 +``` + +--- + +## 关键技术方案 + +### 1. 乱序消费与数据一致性 + +#### 为什么必须支持乱序消费? + +系统设计上**必须支持乱序消费**,原因如下: + +**批处理内部并发**: +- 单消费者使用批处理模式(prefetch_count=100,batch_size=50) +- 批次内的消息可能并发处理,无法保证严格顺序 +- 同一批次中不同平台的消息会并发入库 + +**公平性需求**: +- 多个平台的消息进入同一队列(orders.queue) +- 批处理时优先处理简单消息,避免复杂消息阻塞整个批次 +- 示例:DouYin 的大量消息和 Tmall 的消息混合在同一批次中并发处理 + +**重试消息的乱序**: +- 失败消息通过 DLX 延迟重试后重新进入队列 +- 重试消息可能与新消息乱序 +- 必须防止旧版本数据覆盖新版本 + +#### 乱序消费带来的挑战 + +**核心问题**:同一订单的多次更新消息可能乱序到达 + +``` +时间 T1: 订单创建,金额 100 元 + → message_1: {platform_order_id: "DY123", amount: 100, data_version: 1736840000} + +时间 T2: 订单更新,金额 200 元 + → message_2: {platform_order_id: "DY123", amount: 200, data_version: 1736840100} + +乱序消费(正常现象): + Consumer A 先消费 message_2 → 数据库中金额为 200 ✅ + Consumer B 后消费 message_1 → 如果没有版本控制,会覆盖为 100 ❌ 错误! +``` + +**注意**: +- ✅ 不同订单的消息乱序消费 → 完全没问题 +- ✅ 不同平台的消息乱序消费 → 完全没问题 +- ❌ 同一订单的旧版本覆盖新版本 → 需要防止 + +**重要说明**: +虽然采用单消费者模型,但以下场景仍会导致乱序: +1. **批处理内部**:批次中的 50 条消息可能并发验证和入库 +2. **重试消息**:失败消息经过 DLX 延迟 5 秒后回到队列,会与新消息混合 +3. **数据库层面**:即使消费顺序正确,数据库事务提交顺序也可能不同 + +因此,**`data_version` 机制是必需的**,用于在数据库层面保证数据一致性。 + +#### 解决方案:数据版本控制 + +**方案**:在消息中增加 `data_version` 字段(Unix 时间戳),确保只有更新的数据才能写入数据库 + +```sql +-- 数据库表增加 data_version 字段 +ALTER TABLE orders ADD COLUMN data_version BIGINT NOT NULL DEFAULT 0; + +-- 消费时只有更新的数据才写入 +INSERT INTO orders ( + platform_id, platform_order_id, total_amount, data_version, ... +) VALUES (20, 'DY123', 200, 1736840100, ...) +ON CONFLICT (platform_id, platform_order_id) +DO UPDATE SET + total_amount = EXCLUDED.total_amount, + data_version = EXCLUDED.data_version, + updated_at = NOW() +WHERE orders.data_version < EXCLUDED.data_version; -- 关键:只有版本更新才更新 +``` + +**工作流程**: + +``` +message_2 先到达 (data_version: 1736840100): + INSERT → 数据库: amount=200, data_version=1736840100 ✅ + +message_1 后到达 (data_version: 1736840000): + ON CONFLICT DO UPDATE + WHERE orders.data_version < 1736840000 → 条件不满足,不更新 ✅ + 数据库仍然是: amount=200, data_version=1736840100 +``` + +**消息格式要求**: + +```json +{ + "message_id": "order#dataflow#100#20#200#order#DY123", + "metadata": { + "data_version": 1736840100 // 必需:平台数据的最后更新时间戳 + }, + "data": { + "platform_unique_id": "DY123", + "raw_data": { + "update_time": 1736840100 // 从平台原始数据中提取 + } + } +} +``` + +--- + +### 2. 幂等性保证(无 Redis 依赖) + +#### 方案:数据库表记录已处理消息 + +创建 `processed_messages` 表记录所有已处理的 message_id: + +```sql +CREATE TABLE processed_messages ( + message_id VARCHAR(200) PRIMARY KEY, + platform_id INT NOT NULL, + data_type VARCHAR(50) NOT NULL, + processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- 索引:加速查询 +CREATE INDEX idx_processed_messages_created ON processed_messages (created_at); + +-- 自动清理 30 天前的记录(可选) +CREATE INDEX idx_processed_messages_cleanup ON processed_messages (created_at) + WHERE created_at < NOW() - INTERVAL '30 days'; +``` + +#### 消费流程 + +``` +1. 收到消息,提取 message_id + +2. 检查是否已处理: + SELECT 1 FROM processed_messages WHERE message_id = ? + + 如果存在 → ACK 消息,跳过处理(幂等性) + +3. 开启事务: + BEGIN; + + 3.1 插入 processed_messages: + INSERT INTO processed_messages (message_id, platform_id, data_type) + VALUES (?, ?, ?) + ON CONFLICT (message_id) DO NOTHING; -- 防止并发插入 + + 3.2 插入/更新业务数据: + INSERT INTO orders (...) VALUES (...) + ON CONFLICT (platform_id, platform_order_id) + DO UPDATE SET ... + WHERE orders.data_version < EXCLUDED.data_version; + + COMMIT; + +4. ACK 消息 +``` + +#### 性能优化 + +**问题**:每次消费都查询 `processed_messages` 可能影响性能 + +**优化方案**: + +```sql +-- 1. 使用 PostgreSQL 的 ON CONFLICT 避免先查询 +INSERT INTO processed_messages (message_id, platform_id, data_type) +VALUES (?, ?, ?) +ON CONFLICT (message_id) DO NOTHING +RETURNING message_id; + +-- 如果 RETURNING 无结果 → 说明已处理,直接返回 +-- 如果 RETURNING 有结果 → 继续处理业务数据 + +-- 2. 定期清理历史记录(减少表大小) +DELETE FROM processed_messages +WHERE created_at < NOW() - INTERVAL '30 days'; +``` + +#### 与数据版本控制结合 + +``` +完整的幂等性 + 乱序处理流程: + +BEGIN; + +-- 1. 记录消息已处理(幂等性) +INSERT INTO processed_messages (message_id, platform_id, data_type) +VALUES ('order#dataflow#100#20#200#order#DY123', 20, 'order') +ON CONFLICT (message_id) DO NOTHING +RETURNING message_id; + +-- 如果返回空,说明重复消息,直接 ROLLBACK 并 ACK +-- 如果返回值,继续处理 + +-- 2. 更新业务数据(带版本控制,防止乱序) +INSERT INTO orders ( + platform_id, platform_order_id, total_amount, data_version, created_at, updated_at +) VALUES (20, 'DY123', 200, 1736840100, NOW(), NOW()) +ON CONFLICT (platform_id, platform_order_id) +DO UPDATE SET + total_amount = EXCLUDED.total_amount, + data_version = EXCLUDED.data_version, + updated_at = NOW() +WHERE orders.data_version < EXCLUDED.data_version; + +COMMIT; + +-- 3. ACK 消息 +``` + +--- + +## 数据流转流程 + +### 1. 订单数据完整流程 + +```mermaid +sequenceDiagram + participant P as DouYin 采集服务 + participant E as douyin.exchange + participant Q as orders.queue + participant C as OrderConsumer + participant DB as PostgreSQL + participant PM as processed_messages 表 + participant AGG as 连续聚合视图 + + Note over P: 1. 数据采集阶段 + P->>P: 从 DouYin API 获取订单数据 + P->>P: 生成 message_id
order#dataflow#100#20#200#order#DY123
提取 data_version + + Note over P,E: 2. 消息发布阶段 + P->>E: 发布消息
routing_key: order
持久化消息 + E->>Q: 路由到订单队列 + + Note over Q,C: 3. 消息消费阶段 + Q->>C: 推送消息 (prefetch 10) + C->>C: 验证消息格式 + C->>C: 验证 message_id 结构 + + Note over C,DB: 4. 幂等性检查 + 数据入库 (事务) + C->>DB: BEGIN TRANSACTION + + C->>PM: INSERT INTO processed_messages
ON CONFLICT DO NOTHING
RETURNING message_id + + alt 返回空 (重复消息) + PM-->>C: 无返回值 + C->>DB: ROLLBACK + C->>Q: ACK (跳过处理) + else 返回值 (新消息) + PM-->>C: 返回 message_id + + Note over C,DB: 5. 更新业务数据 (带版本控制) + C->>DB: INSERT INTO orders
ON CONFLICT DO UPDATE
WHERE data_version < new_version + DB-->>C: 更新成功 + + C->>DB: COMMIT + C->>Q: ACK (确认消息) + + Note over DB,AGG: 6. 自动聚合 (后台) + DB->>AGG: TimescaleDB 后台任务
每小时刷新聚合视图 + end +``` + +### 2. 错误处理流程 + +```mermaid +sequenceDiagram + participant P as Tmall 采集服务 + participant E as tmall.exchange + participant Q as orders.queue + participant C as OrderConsumer + participant EE as tmall.errors.exchange + participant EQ as errors.queue + participant EC as ErrorConsumer + participant DB as failed_messages 表 + participant Alert as 告警系统 + + P->>E: 发布订单消息 + E->>Q: 路由到订单队列 + Q->>C: 推送消息 + + Note over C: 消费处理 + C->>C: 验证消息格式 + alt 数据格式错误 (validation) + C->>C: 捕获 ValidationException + C->>EE: 发布错误消息
routing_key: validation + EE->>EQ: 路由到错误队列 + C->>Q: ACK (从业务队列移除) + + Note over EQ,EC: 错误消息处理 + EQ->>EC: 推送错误消息 + EC->>DB: 记录错误日志 + EC->>Alert: 判断是否需要告警 + alt 满足告警条件 + Alert->>Alert: 发送 Slack/Email 通知 + end + EC->>EQ: ACK + + else 处理成功 + C->>C: 入库成功 + C->>Q: ACK + end +``` + +### 3. 消息重试流程(DLX + 延迟重试队列) + +```mermaid +sequenceDiagram + participant Q as orders.queue + participant C as OrderConsumer + participant DB as PostgreSQL + participant DLX as dlx.orders + participant RQ as orders.retry.queue + participant EQ as errors.queue + + Q->>C: 推送消息 (x-retries: 0) + + Note over C,DB: 第 1 次处理 + C->>DB: 尝试入库 + DB-->>C: 数据库连接失败 + + C->>C: 更新 header: x-retries = 1 + C->>C: nack(requeue=false) + C-->>DLX: 消息进入 DLX + + Note over DLX: 根据 x-retries 路由 + DLX->>RQ: x-retries < 3
routing_key: retry + + Note over RQ: TTL = 5 秒 + Note over RQ: 5 秒后自动死信 + + RQ-->>Q: TTL 到期
回到主队列 + + Note over Q,C: 第 2 次处理 + Q->>C: 推送消息 (x-retries: 1) + C->>DB: 尝试入库 + DB-->>C: 数据库连接失败 + + C->>C: 更新 header: x-retries = 2 + C->>C: nack(requeue=false) + C-->>DLX: 消息进入 DLX + DLX->>RQ: x-retries < 3 + + Note over RQ: 5 秒后自动死信 + RQ-->>Q: 回到主队列 + + Note over Q,C: 第 3 次处理 + Q->>C: 推送消息 (x-retries: 2) + C->>DB: 尝试入库 + DB-->>C: 数据库连接失败 + + C->>C: 更新 header: x-retries = 3 + C->>C: nack(requeue=false) + C-->>DLX: 消息进入 DLX + + Note over DLX: x-retries >= 3
超过重试次数 + DLX->>EQ: routing_key: error
进入错误队列 +``` + +--- + +## 各角色职责和操作 + +### 1. 平台开发者(数据生产者) + +#### 职责 + +- 从电商平台 API 采集数据(订单/产品/退款) +- 将数据转换为标准消息格式 +- 推送消息到 RabbitMQ +- 监控自己平台的错误消息 + +#### 权限 + +| 操作 | 权限 | 说明 | +|------|------|------| +| 连接到 VHost | `dataflow-app` | 只能访问此 VHost | +| 发布消息 | `{platform}.exchange` | 只能写入自己平台的 Exchange | +| 订阅错误 | `{platform}.errors.exchange` | 可选,接收本平台错误通知 | + +#### 操作流程 + +##### 步骤 1: 生成 message_id + +``` +格式: {entity_type}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{platform_unique_id} + +示例: +- order#dataflow#100#20#200#order#DY123456789 +- product#dataflow#100#2#201#product#TM-PROD789 +- refund#dataflow#100#20#200#refund#DY-REF456 +- inventory#dataflow#100#2#201#inventory#TM-SKU123 +``` + +**生成逻辑**: +``` +# 伪代码 +message_id = f"{data_type}#dataflow#{company_id}#{platform_id}#{store_id}#{data_type}#{platform_order_id}" + +# 实际示例(DouYin 订单) +message_id = f"order#dataflow#100#20#200#order#{order_data['order_id']}" +``` + +##### 步骤 2: 提取 data_version + +**重要**:必须从平台原始数据中提取更新时间戳作为 `data_version` + +```python +# 示例:从 DouYin API 响应提取 +douyin_order = { + "order_id": "DY202501140001", + "create_time": 1736839200, + "update_time": 1736840000, # ← 使用这个作为 data_version + ... +} + +data_version = douyin_order.get('update_time') or douyin_order.get('create_time') + +# 如果平台未提供更新时间,使用当前时间 +if not data_version: + data_version = int(time.time()) +``` + +##### 步骤 3: 构建消息体 + +```json +{ + "message_id": "order#dataflow#100#20#200#order#DY123456", + "timestamp": "2025-01-14T10:30:00Z", + "platform": "douyin", + "data_type": "order", + "metadata": { + "platform_id": 20, + "company_id": 100, + "store_id": 200, + "source_system": "douyin-open-api", + "retry_count": 0, + "data_version": 1736840000 + }, + "data": { + "platform_unique_id": "DY123456", + "raw_data": { + // 平台原始完整 JSON 数据(必须包含更新时间) + "order_id": "DY123456", + "update_time": 1736840000, + ... + } + } +} +``` + +##### 步骤 4: 发布消息 + +**连接参数**: +- Host: `` +- Port: `5672` +- VHost: `dataflow-app` +- Username: `user_{platform}` +- Password: `` + +**发布配置**: +- Exchange: `{platform}.exchange` +- Routing Key: `{data_type}.{platform}` 格式 + - 订单: `order.{platform}` (如 `order.douyin`, `order.tmall`) + - 产品: `product.{platform}` (如 `product.douyin`, `product.tmall`) + - 退款: `refund.{platform}` (如 `refund.douyin`, `refund.tmall`) + - 库存: `inventory.{platform}` (如 `inventory.douyin`, `inventory.tmall`) +- Delivery Mode: `2` (持久化) + +##### 步骤 4: 监控错误(可选) + +**订阅配置**: +- Exchange: `{platform}.errors.exchange` +- Routing Key: `#` (接收所有错误类型) +- 创建临时队列或持久队列用于接收错误消息 + +**错误消息示例**: +```json +{ + "error_id": "err_1234567890", + "original_message": { /* 原始消息 */ }, + "error": { + "type": "validation", + "message": "Missing required field: total_amount", + "timestamp": "2025-01-14T10:31:00Z" + }, + "metadata": { + "platform": "amazon", + "platform_id": 1, + "store_id": 200, + "failed_at": "2025-01-14T10:31:00Z" + } +} +``` + +--- + +### 2. Dataflow 应用(数据消费者) + +#### 职责 + +- 消费 RabbitMQ 业务队列中的消息 +- 使用批处理模式提升吞吐量(prefetch=100, batch_size=50) +- 通过适配器模式支持多平台数据处理 +- 验证消息格式和数据完整性 +- 将数据入库到 PostgreSQL +- 处理失败消息,通过 DLX 延迟重试或进入错误队列 +- 实现幂等性,避免重复处理 + +#### 消费者架构模型 + +**单消费者 + 批处理 + 适配器模式**: + +``` +OrderConsumer (单实例) +├── RabbitMQ Channel +│ └── prefetch_count: 100 (最多缓存 100 条未确认消息) +│ +├── 消息缓冲区 (Message Buffer) +│ ├── 接收 RabbitMQ 推送的消息 +│ ├── 达到 batch_size=50 或 timeout=3s 触发处理 +│ └── 缓冲区最大容量: 100 条 +│ +├── 批处理引擎 (Batch Processor) +│ ├── 遍历缓冲区中的消息 +│ ├── 对每条消息调用对应的适配器 +│ └── 批量入库 + 单条 ACK +│ +└── 平台适配器 (Platform Adapters) + ├── DouyinAdapter: 处理抖音订单数据 + ├── TmallAdapter: 处理天猫订单数据 + ├── JDAdapter: 处理京东订单数据 + └── ... 其他平台适配器 + +工作流程: +1. RabbitMQ 推送消息到消费者 (最多 100 条未确认) +2. 消费者缓存消息到缓冲区 +3. 达到 batch_size 或 timeout 后触发批处理 +4. 批处理引擎遍历消息,根据 platform 字段调用对应适配器 +5. 适配器验证和转换数据 +6. 批量插入数据库 +7. 逐条 ACK/NACK 消息 +``` + +#### 权限 + +| 操作 | 权限 | 说明 | +|------|------|------| +| 连接到 VHost | `dataflow-app` | 访问业务 VHost | +| 读取队列 | `orders.queue`, `products.queue`, `refunds.queue` | 消费业务消息 | +| 写入错误 Exchange | `*.errors.exchange` | 发送错误消息到对应平台的错误 Exchange | +| 管理队列 | `orders.queue`, `products.queue`, `refunds.queue` | 配置消费者参数 | + +#### 核心操作 + +##### 消息消费配置 + +**推荐参数**: +- **prefetch_count**: `100` - RabbitMQ 最多推送 100 条未确认消息 +- **batch_size**: `50` - 消费者积累 50 条消息后触发批处理 +- **batch_timeout**: `3000` ms - 如果 3 秒内未达到 batch_size,也触发处理 +- **消费者模型**: 单消费者 + 批处理 + 适配器模式(每个队列一个消费者实例) +- **ACK 模式**: 手动 ACK,单条确认(处理成功后才确认) +- **重连机制**: 自动重连,指数退避 + +##### 幂等性保证(数据库方案) + +**方案**:使用 `processed_messages` 表记录已处理的 message_id + +```sql +-- 1. 创建幂等性记录表 +CREATE TABLE processed_messages ( + message_id VARCHAR(200) PRIMARY KEY, + platform_id INT NOT NULL, + data_type VARCHAR(50) NOT NULL, + processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_processed_messages_created ON processed_messages (created_at); + +-- 2. 定期清理历史记录(可选,保留 30 天) +DELETE FROM processed_messages WHERE created_at < NOW() - INTERVAL '30 days'; +``` + +**消费流程**: + +```sql +-- 在事务中执行 +BEGIN; + +-- 步骤 1: 尝试插入 processed_messages(幂等性检查) +INSERT INTO processed_messages (message_id, platform_id, data_type) +VALUES ('order#dataflow#100#20#200#order#DY123', 20, 'order') +ON CONFLICT (message_id) DO NOTHING +RETURNING message_id; + +-- 如果返回空 → 说明是重复消息,ROLLBACK 并 ACK +-- 如果返回值 → 继续处理 + +-- 步骤 2: 插入/更新业务数据(带版本控制) +INSERT INTO orders ( + platform_id, platform_order_id, company_id, store_id, + total_amount, data_version, raw, created_at, updated_at +) VALUES (20, 'DY123', 100, 200, 9999, 1736840000, '...'::jsonb, NOW(), NOW()) +ON CONFLICT (platform_id, platform_order_id) +DO UPDATE SET + total_amount = EXCLUDED.total_amount, + data_version = EXCLUDED.data_version, + raw = EXCLUDED.raw, + updated_at = NOW() +WHERE orders.data_version < EXCLUDED.data_version; -- 只有更新的版本才更新 + +COMMIT; +``` + +**关键点**: +- ✅ 无需 Redis 依赖 +- ✅ 事务保证原子性 +- ✅ `ON CONFLICT DO NOTHING` 实现幂等性 +- ✅ `WHERE data_version < EXCLUDED.data_version` 防止乱序更新 + +##### 错误处理逻辑 + +``` +处理流程: +1. 验证消息格式 + ├─ 格式正确 → 继续 + └─ 格式错误 → 发送到错误队列 (validation 类型), ACK + +2. 幂等性检查 + ├─ 已处理 → 直接 ACK + └─ 未处理 → 继续 + +3. 业务处理 + ├─ 成功 → 标记已处理, ACK + └─ 失败 (异常) + ├─ 数据库错误 / 网络错误 + │ ├─ retry_count < 3 → 重新发布 (延迟重试), ACK + │ └─ retry_count >= 3 → 发送到错误队列 (processing_failed), ACK + └─ 业务逻辑错误 → 发送到错误队列 (validation), ACK +``` + +##### 延迟重试实现(DLX + 重试队列方案) + +**重试策略**: +- 重试延迟: 固定 5 秒(通过 TTL 实现) +- 最大重试次数: 3 次 +- 超过 3 次: 进入错误队列 + +**实现方式**(无需额外插件): +``` +消费失败 → nack(requeue=false) → 进入 DLX + ↓ + x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列 + ↓ + x-retries >= 3 → 错误队列 (人工处理) +``` + +**工作流程**: +1. 消费失败时,消费者执行 `nack(requeue=false)`,消息进入 DLX(如 dlx.orders) +2. DLX 根据消息 header 中的 `x-retries` 判断路由: + - `x-retries < 3`:路由到 `orders.retry.queue`(延迟重试) + - `x-retries >= 3`:路由到 `errors.queue`(永久失败) +3. 重试队列 TTL 到期后,消息自动死信回主队列,重新消费 +4. 消费者需要在 nack 前更新消息的 `x-retries` header + +--- + +### 3. 运维团队 + +#### 职责 + +- RabbitMQ 集群部署和维护 +- 用户和权限管理 +- 监控队列健康状态 +- 处理系统级错误 +- 性能优化和故障排查 + +#### 权限 + +| 操作 | 权限 | 说明 | +|------|------|------| +| 管理员账号 | `admin` tag | 访问管理界面,完全权限 | +| 监控错误队列 | `errors.queue` (read) | 查看所有平台的错误消息 | +| 配置资源 | 所有 Exchange/Queue | 创建、修改、删除资源 | + +#### 日常操作 + +##### 监控关键指标 + +**队列堆积监控**: + +```bash +# 查看所有队列状态 +rabbitmqctl list_queues -p dataflow-app name messages_ready messages_unacknowledged + +# 告警阈值 +# - messages_ready > 10000: 严重堆积 +# - messages_unacknowledged > 5000: 消费者处理缓慢 +``` + +**消费者连接监控**: + +```bash +# 查看所有消费者连接 +rabbitmqctl list_consumers -p dataflow-app + +# 检查项: +# - 消费者数量是否正常 +# - prefetch_count 配置是否合理 +# - 是否有消费者断开 +``` + +**错误率监控**: + +```sql +-- 查询最近 1 小时各平台错误数量 +SELECT + platform, + error_type, + COUNT(*) as error_count +FROM failed_messages +WHERE created_at > NOW() - INTERVAL '1 hour' +GROUP BY platform, error_type +ORDER BY error_count DESC; +``` + +##### 故障排查 + +**队列堆积问题**: + +``` +排查步骤: +1. 检查消费者是否在线 + rabbitmqctl list_consumers -p dataflow-app + +2. 查看消费速率 + 管理界面 → Queues → 选择队列 → 查看 Deliver/Get rate + +3. 检查消费者日志 + - 是否有异常报错 + - 处理时间是否过长 + +解决方案: +- 增加消费者实例数量 +- 增加 prefetch_count +- 优化业务处理逻辑 +``` + +**消息丢失问题**: + +``` +排查步骤: +1. 确认消息是否发布成功 + 管理界面 → Exchanges → 查看 Message rates (Publish in) + +2. 确认 Binding 配置 + 管理界面 → Exchanges → 选择 Exchange → 查看 Bindings + +3. 检查消息是否持久化 + - Exchange durable: true + - Queue durable: true + - Message delivery_mode: 2 + +4. 查看消费者 ACK 行为 + - 是否正确 ACK + - 是否有 NACK/REJECT +``` + +##### 性能优化 + +**优化消费速度**: + +``` +措施: +1. 调整批处理参数 + - batch_size: 当前 50,可根据消息大小调整到 30-100 + - batch_timeout: 当前 3s,可根据实时性要求调整到 1-5s + +2. 调整 prefetch_count + - 当前: 100 + - 可根据内存和消息处理速度调整到 50-200 + - prefetch_count 应该是 batch_size 的 2-3 倍 + +3. 优化批处理逻辑 + - 使用数据库批量插入(INSERT ... VALUES (...), (...), ...) + - 批次内消息可以并发验证(适配器模式) + - 但必须单条 ACK 保证可靠性 + +4. 优化业务逻辑 + - 减少数据库查询 + - 使用连接池 + - 异步处理非关键操作 + +5. 水平扩展(谨慎) + - 单队列单消费者模型,不建议同一队列多消费者 + - 可考虑分片:不同数据类型独立扩展(orders/products 分别扩展) +``` + +**队列性能优化**: + +```bash +# 启用 lazy queue (大量消息堆积时) +rabbitmqctl set_policy lazy-queue "^(orders|products|refunds)\.queue$" \ + '{"queue-mode":"lazy"}' --vhost dataflow-app + +# 效果: +# - 消息存储在磁盘,减少内存占用 +# - 适用于消息堆积严重的场景 +``` + +--- + +### 4. RabbitMQ 管理员 + +#### 职责 + +- 初始化 RabbitMQ 配置 +- 创建 VHost、Exchange、Queue +- 管理用户和权限 +- 新平台接入配置 + +#### 核心操作 + +##### 新平台接入 + +当需要接入新平台(如 `aliexpress`)时,执行以下步骤: + +**步骤 1: 创建 Exchange** + +```bash +# 业务 Exchange +rabbitmqadmin -u admin -p declare exchange \ + name="aliexpress.exchange" vhost=dataflow-app \ + type=topic durable=true + +# 错误 Exchange +rabbitmqadmin -u admin -p declare exchange \ + name="aliexpress.errors.exchange" vhost=dataflow-app \ + type=topic durable=true +``` + +**步骤 2: 创建 Binding** + +```bash +# 绑定到业务队列(使用 {data_type}.{platform} 格式) +rabbitmqadmin -u admin -p declare binding \ + source="aliexpress.exchange" destination="orders.queue" \ + vhost=dataflow-app routing_key="order.aliexpress" + +rabbitmqadmin -u admin -p declare binding \ + source="aliexpress.exchange" destination="products.queue" \ + vhost=dataflow-app routing_key="product.aliexpress" + +rabbitmqadmin -u admin -p declare binding \ + source="aliexpress.exchange" destination="refunds.queue" \ + vhost=dataflow-app routing_key="refund.aliexpress" + +rabbitmqadmin -u admin -p declare binding \ + source="aliexpress.exchange" destination="inventory.queue" \ + vhost=dataflow-app routing_key="inventory.aliexpress" + +# 绑定到错误队列 +rabbitmqadmin -u admin -p declare binding \ + source="aliexpress.errors.exchange" destination="errors.queue" \ + vhost=dataflow-app routing_key="#" +``` + +**步骤 3: 创建用户** + +```bash +# 创建用户 +rabbitmqadmin -u admin -p declare user \ + name="user_aliexpress" password="" tags="" + +# 配置权限 +rabbitmqadmin -u admin -p declare permission \ + vhost=dataflow-app user="user_aliexpress" \ + configure="" write="^aliexpress\.(exchange|errors\.exchange)$" \ + read="^aliexpress\.errors\..*$" +``` + +**步骤 4: 提供给开发者** + +``` +连接信息: +- Host: +- Port: 5672 +- VHost: dataflow-app +- Username: user_aliexpress +- Password: + +Exchange: aliexpress.exchange +Routing Keys: + - 订单: order.aliexpress + - 产品: product.aliexpress + - 退款: refund.aliexpress + - 库存: inventory.aliexpress + +错误订阅 (可选): aliexpress.errors.exchange +``` + +--- + +## 监控和告警 + +### 1. 关键监控指标 + +```mermaid +graph TB + subgraph "队列监控" + M1[messages_ready
待消费消息数] + M2[messages_unacknowledged
未确认消息数] + M3[message_stats.publish_in_rate
消息发布速率] + M4[message_stats.deliver_rate
消息消费速率] + end + + subgraph "消费者监控" + C1[consumer_count
消费者数量] + C2[prefetch_count
预取数量] + C3[connection_state
连接状态] + end + + subgraph "错误监控" + E1[errors.queue 消息数
错误队列堆积] + E2[failed_messages 表记录数
失败消息总数] + E3[每小时错误增长率
错误频率] + end + + subgraph "系统监控" + S1[disk_free
磁盘剩余空间] + S2[mem_used
内存使用率] + S3[fd_used
文件描述符使用] + end + + style M1 fill:#ffcdd2 + style M2 fill:#ffcdd2 + style E1 fill:#ffcdd2 + style E2 fill:#ffcdd2 +``` + +### 2. 告警规则 + +| 指标 | 告警阈值 | 级别 | 处理建议 | +|------|---------|------|---------| +| `messages_ready` | > 10,000 | 警告 | 增加消费者 | +| `messages_ready` | > 50,000 | 严重 | 紧急扩容 | +| `messages_unacknowledged` | > 5,000 | 警告 | 检查消费者处理速度 | +| `consumer_count` | = 0 | 严重 | 消费者服务挂掉 | +| `publish_rate - deliver_rate` | > 1000/s | 警告 | 消费速度跟不上生产速度 | +| `errors.queue` 消息数 | > 1,000 | 警告 | 大量消息处理失败 | +| 单平台小时错误数 | > 100 | 警告 | 平台数据质量问题 | +| `disk_free` | < 10 GB | 严重 | 清理磁盘或扩容 | +| `mem_used` | > 80% | 警告 | 优化或增加内存 | + +### 3. 监控实现 + +**Prometheus + Grafana**: + +```yaml +# prometheus.yml +scrape_configs: + - job_name: 'rabbitmq' + static_configs: + - targets: ['rabbitmq-exporter:9419'] + metrics_path: /metrics +``` + +**告警规则示例**: + +```yaml +# alert_rules.yml +groups: + - name: rabbitmq_alerts + rules: + - alert: QueueBacklog + expr: rabbitmq_queue_messages_ready{queue=~"orders|products|refunds"} > 10000 + for: 5m + labels: + severity: warning + annotations: + summary: "队列 {{ $labels.queue }} 堆积严重" + description: "当前待消费消息数: {{ $value }}" + + - alert: NoConsumers + expr: rabbitmq_queue_consumers{queue=~"orders|products|refunds"} == 0 + for: 1m + labels: + severity: critical + annotations: + summary: "队列 {{ $labels.queue }} 无消费者" + description: "消费者服务可能已挂掉" +``` + +--- + +## 故障恢复 + +### 1. 消费者服务挂掉 + +```mermaid +graph LR + A[检测到消费者断开] --> B[消息堆积在队列中] + B --> C[告警通知运维] + C --> D[重启消费者服务] + D --> E[消费者自动重连] + E --> F[继续消费堆积消息] + F --> G{消息是否过期?} + G -->|未过期| H[正常处理] + G -->|已过期| I[消息被 TTL 删除] + + style A fill:#ffcdd2 + style C fill:#fff9c4 + style H fill:#c8e6c9 + style I fill:#ffab91 +``` + +**恢复时间**: +- 自动重连: 秒级 +- 手动重启: 分钟级 +- 数据不会丢失(消息持久化) + +### 2. RabbitMQ 服务重启 + +```mermaid +graph TB + A[RabbitMQ 重启] --> B[持久化资源自动恢复] + B --> C[Exchanges 恢复] + B --> D[Queues 恢复] + B --> E[Bindings 恢复] + B --> F[持久化消息恢复] + + A --> G[生产者重连] + A --> H[消费者重连] + + G --> I[继续发布消息] + H --> J[继续消费消息] + + style A fill:#ffcdd2 + style C fill:#c8e6c9 + style D fill:#c8e6c9 + style E fill:#c8e6c9 + style F fill:#c8e6c9 +``` + +**数据安全性**: +- ✅ 持久化 Exchange 自动恢复 +- ✅ 持久化 Queue 自动恢复 +- ✅ 持久化消息自动恢复 +- ❌ 未 ACK 的消息会重新投递(可能重复消费,需幂等性保证) + +### 3. 数据库连接失败 + +```mermaid +graph TB + A[数据库连接失败] --> B{retry_count < 3?} + B -->|是| C[延迟重试
60s / 120s / 240s] + C --> D[重新入队] + D --> E[等待下次消费] + E --> B + + B -->|否| F[超过重试次数] + F --> G[发送到错误队列] + G --> H[人工介入处理] + + style A fill:#ffcdd2 + style C fill:#fff9c4 + style G fill:#ffab91 + style H fill:#ce93d8 +``` + +--- + +## 最佳实践 + +### 1. 生产者最佳实践 + +- ✅ **使用结构化 message_id**:格式 `{type}#{app}#{company}#{platform}#{store}#{type}#{unique_id}` +- ✅ **必须提供 data_version**:从平台数据中提取更新时间戳,用于防止乱序更新 +- ✅ **消息持久化**:`delivery_mode=2` +- ✅ **包含完整 raw_data**:保留平台原始完整数据,便于后续数据补全和问题排查 +- ✅ **使用正确的 routing_key**:`order`/`product`/`refund`/`inventory` +- ✅ **实现发布确认**:等待 RabbitMQ 确认消息已接收 +- ✅ **实现重连机制**:网络故障时自动重连 +- ❌ **不要发送超大消息**(> 1 MB):考虑分片或使用对象存储 + 引用 + +### 2. 消费者最佳实践 + +- ✅ **单消费者模型**:每个队列一个消费者实例,避免消息分散 +- ✅ **批处理机制**:使用 prefetch + 批处理提升吞吐(batch_size=50, prefetch_count=100) +- ✅ **适配器模式**:消费者内部根据平台字段分发到不同业务逻辑 +- ✅ **手动 ACK**:单条确认,失败消息不影响其他消息 +- ✅ **幂等性保证**:使用 `processed_messages` 表记录已处理的 message_id +- ✅ **版本控制**:使用 `data_version` 字段防止乱序更新(批处理内部可能并发) +- ✅ **事务处理**:在一个事务中完成幂等性检查 + 业务数据更新 +- ✅ **DLX 重试机制**:失败消息通过 nack(requeue=false) 进入 DLX,延迟 5 秒后自动重试 +- ✅ **错误分类处理**:区分可重试错误(临时)和业务错误(永久) +- ✅ **限制重试次数**:最多 3 次,通过消息 header `x-retries` 控制 +- ✅ **批量数据库操作**:批次内使用批量插入提升性能 +- ✅ **记录详细日志**:便于故障排查 +- ✅ **定期清理 processed_messages**:保留 30 天历史记录即可 +- ❌ **不要同队列多消费者**:会导致消息 Round-Robin 分配,破坏设计 + +### 3. 运维最佳实践 + +- ✅ **定期备份配置**:使用 `rabbitmqadmin export` +- ✅ **监控告警**:配置 Prometheus + Grafana +- ✅ **日志归档**:定期清理 `failed_messages` 表 +- ✅ **压力测试**:定期测试消息吞吐能力 +- ✅ **灾难恢复演练**:定期演练故障恢复流程 +- ✅ **文档更新**:及时更新配置文档 + +--- + +## 附录 + +### A. 消息示例 + +#### 订单消息 + +```json +{ + "message_id": "order#dataflow#100#20#200#order#DY202501140001", + "timestamp": "2025-01-14T10:30:00Z", + "platform": "douyin", + "data_type": "order", + "metadata": { + "platform_id": 20, + "company_id": 100, + "store_id": 200, + "source_system": "douyin-open-api", + "retry_count": 0, + "data_version": 1736840000 + }, + "data": { + "platform_unique_id": "DY202501140001", + "raw_data": { + "order_id": "DY202501140001", + "create_time": 1736839200, + "pay_time": 1736840000, + "update_time": 1736840000, + "order_amount": 9999, + "pay_amount": 9999, + "order_status": 2, + "receiver_info": { + "province": "北京市", + "city": "北京市", + "district": "朝阳区", + "detail": "某某街道123号" + } + } + } +} +``` + +#### 产品消息 + +```json +{ + "message_id": "product#dataflow#100#2#201#product#TM-PROD-789", + "timestamp": "2025-01-14T11:00:00Z", + "platform": "tmall", + "data_type": "product", + "metadata": { + "platform_id": 2, + "company_id": 100, + "store_id": 201, + "source_system": "tmall-top-api", + "retry_count": 0, + "data_version": 1736841600 + }, + "data": { + "platform_unique_id": "TM-PROD-789", + "raw_data": { + "num_iid": 789123456789, + "title": "示例产品", + "price": "499.00", + "type": "fixed", + "modified": "2025-01-14 11:00:00", + "sku_list": [ + { + "sku_id": 987654321, + "price": "499.00", + "quantity": 100 + } + ] + } + } +} +``` + +### B. 参考文档 + +- [RabbitMQ 配置方案](./RabbitMQ.md) +- [TimescaleDB 数据查询方案](./data_query.md) +- [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html) +- [AMQP 0-9-1 协议](https://www.rabbitmq.com/amqp-0-9-1-reference.html)