Files
datahub/docs/data_flow.md
T

1500 lines
41 KiB
Markdown
Raw Normal View History

2025-11-14 16:04:26 +08:00
# 数据流转架构
## 概述
本文档描述电商数据从平台采集到入库的完整流程,包括各系统角色、职责和操作说明。
### 系统角色
| 角色 | 职责 | 系统 |
|------|------|------|
| **数据生产者** | 采集电商平台数据并推送到 MQ | 各平台采集服务 |
| **消息队列** | 数据路由、缓冲、隔离 | RabbitMQ |
2026-02-28 10:38:33 +08:00
| **数据消费者** | 消费 MQ 消息并入库 | Datahub App |
2025-11-14 16:04:26 +08:00
| **存储层** | 持久化存储和统计分析 | PostgreSQL + TimescaleDB |
---
## 整体架构
```mermaid
graph TB
subgraph "数据生产者层"
P1[DouYin 采集服务]
P2[Tmall 采集服务]
P3[RedBook 采集服务]
P4[其他平台采集服务...]
end
subgraph "RabbitMQ 消息队列"
2026-02-28 10:38:33 +08:00
subgraph "VHost: datahub-app"
2025-11-14 16:04:26 +08:00
subgraph "业务 Exchanges"
E1[douyin.exchange]
E2[tmall.exchange]
E3[redbook.exchange]
E4[其他.exchange...]
end
subgraph "业务 Queues"
Q1[orders.queue<br/>DLX: dlx.orders]
Q2[products.queue<br/>DLX: dlx.products]
Q3[refunds.queue<br/>DLX: dlx.refunds]
Q4[inventory.queue<br/>DLX: dlx.inventory]
end
subgraph "DLX 死信交换机"
DLX1[dlx.orders]
DLX2[dlx.products]
DLX3[dlx.refunds]
DLX4[dlx.inventory]
end
subgraph "重试队列"
RQ1[orders.retry.queue<br/>TTL: 5s]
RQ2[products.retry.queue<br/>TTL: 5s]
RQ3[refunds.retry.queue<br/>TTL: 5s]
RQ4[inventory.retry.queue<br/>TTL: 5s]
end
subgraph "错误处理"
EE1[douyin.errors.exchange]
EE2[tmall.errors.exchange]
EE3[redbook.errors.exchange]
EQ[errors.queue]
end
end
end
2026-02-28 10:38:33 +08:00
subgraph "Datahub 消费者"
2025-11-14 16:04:26 +08:00
C1[OrderConsumer]
C2[ProductConsumer]
C3[RefundConsumer]
C4[InventoryConsumer]
C5[ErrorConsumer]
end
subgraph "存储层"
DB1[(orders 表<br/>TimescaleDB)]
DB2[(products 表)]
DB3[(refunds 表)]
DB4[(inventory 表)]
DB5[(failed_messages 表)]
end
subgraph "聚合视图"
AGG1[orders_daily_by_created]
AGG2[orders_daily_by_paid]
end
P1 -->|推送订单/产品/退款/库存| E1
P2 -->|推送订单/产品/退款/库存| E2
P3 -->|推送订单/产品/退款/库存| E3
P4 -->|推送订单/产品/退款/库存| E4
E1 & E2 & E3 & E4 -->|routing_key: order.platform| Q1
E1 & E2 & E3 & E4 -->|routing_key: product.platform| Q2
E1 & E2 & E3 & E4 -->|routing_key: refund.platform| Q3
E1 & E2 & E3 & E4 -->|routing_key: inventory.platform| Q4
Q1 --> C1
Q2 --> C2
Q3 --> C3
Q4 --> C4
C1 -->|成功| DB1
C2 -->|成功| DB2
C3 -->|成功| DB3
C4 -->|成功| DB4
C1 -.->|失败 nack| DLX1
C2 -.->|失败 nack| DLX2
C3 -.->|失败 nack| DLX3
C4 -.->|失败 nack| DLX4
DLX1 -->|x-retries<3| RQ1
DLX2 -->|x-retries<3| RQ2
DLX3 -->|x-retries<3| RQ3
DLX4 -->|x-retries<3| RQ4
RQ1 -.->|TTL 到期<br/>回到主队列| Q1
RQ2 -.->|TTL 到期<br/>回到主队列| Q2
RQ3 -.->|TTL 到期<br/>回到主队列| Q3
RQ4 -.->|TTL 到期<br/>回到主队列| Q4
DLX1 & DLX2 & DLX3 & DLX4 -->|x-retries>=3| EQ
C1 & C2 & C3 & C4 -.->|永久失败| EE1 & EE2 & EE3
EE1 & EE2 & EE3 --> EQ
EQ --> C5
C5 --> DB5
DB1 --> AGG1
DB1 --> AGG2
style P1 fill:#e1f5dd
style P2 fill:#e1f5dd
style P3 fill:#e1f5dd
style P4 fill:#e1f5dd
style E1 fill:#fff4e6
style E2 fill:#fff4e6
style E3 fill:#fff4e6
style E4 fill:#fff4e6
style Q1 fill:#e3f2fd
style Q2 fill:#e3f2fd
style Q3 fill:#e3f2fd
style Q4 fill:#e3f2fd
style DLX1 fill:#ffe0b2
style DLX2 fill:#ffe0b2
style DLX3 fill:#ffe0b2
style DLX4 fill:#ffe0b2
style RQ1 fill:#fff9c4
style RQ2 fill:#fff9c4
style RQ3 fill:#fff9c4
style RQ4 fill:#fff9c4
style C1 fill:#f3e5f5
style C2 fill:#f3e5f5
style C3 fill:#f3e5f5
style C4 fill:#f3e5f5
style DB1 fill:#fce4ec
style AGG1 fill:#ffebee
style AGG2 fill:#ffebee
```
---
## RabbitMQ 内部架构
### 业务数据流转
```mermaid
graph LR
subgraph "生产者权限隔离"
PD[DouYin 开发者<br/>user_douyin]
PT[Tmall 开发者<br/>user_tmall]
PR[RedBook 开发者<br/>user_redbook]
end
subgraph "平台 Exchanges (Topic)"
ED[douyin.exchange]
ET[tmall.exchange]
ER[redbook.exchange]
end
subgraph "数据类型 Queues"
QO[orders.queue<br/>TTL: 24h]
QP[products.queue<br/>TTL: 24h]
QR[refunds.queue<br/>TTL: 24h]
QI[inventory.queue<br/>TTL: 24h]
end
subgraph "消费者"
2026-02-28 10:38:33 +08:00
CO[OrderConsumer<br/>user_datahub_consumer]
2025-11-14 16:04:26 +08:00
end
PD -->|write only| ED
PT -->|write only| ET
PR -->|write only| ER
ED -->|order.douyin| QO
ED -->|product.douyin| QP
ED -->|refund.douyin| QR
ED -->|inventory.douyin| QI
ET -->|order.tmall| QO
ET -->|product.tmall| QP
ET -->|refund.tmall| QR
ET -->|inventory.tmall| QI
ER -->|order.redbook| QO
ER -->|product.redbook| QP
ER -->|refund.redbook| QR
ER -->|inventory.redbook| QI
QO -->|read| CO
QP -->|read| CO
QR -->|read| CO
QI -->|read| CO
style PD fill:#c8e6c9
style PT fill:#c8e6c9
style PR fill:#c8e6c9
style ED fill:#fff9c4
style ET fill:#fff9c4
style ER fill:#fff9c4
style QO fill:#bbdefb
style QP fill:#bbdefb
style QR fill:#bbdefb
style QI fill:#bbdefb
style CO fill:#d1c4e9
```
### 错误处理架构
```mermaid
graph TB
subgraph "消费者错误产生"
C1[OrderConsumer]
C2[ProductConsumer]
C3[RefundConsumer]
C4[InventoryConsumer]
end
subgraph "平台错误 Exchanges"
ED[douyin.errors.exchange]
ET[tmall.errors.exchange]
ER[redbook.errors.exchange]
end
subgraph "统一错误队列"
EQ[errors.queue<br/>TTL: 7 days]
end
subgraph "错误消息消费者"
EC[ErrorConsumer<br/>user_ops]
DB[(failed_messages 表)]
end
subgraph "平台开发者监控"
PD[DouYin 开发者<br/>订阅 douyin.errors.exchange]
PT[Tmall 开发者<br/>订阅 tmall.errors.exchange]
end
C1 -->|判断平台| ED
C1 -->|判断平台| ET
C1 -->|判断平台| ER
C2 -->|判断平台| ED
C2 -->|判断平台| ET
C2 -->|判断平台| ER
C3 -->|判断平台| ED
C3 -->|判断平台| ET
C3 -->|判断平台| ER
C4 -->|判断平台| ED
C4 -->|判断平台| ET
C4 -->|判断平台| ER
ED -->|routing_key: #| EQ
ET -->|routing_key: #| EQ
ER -->|routing_key: #| EQ
EQ --> EC
EC --> DB
ED -.->|read only| PD
ET -.->|read only| PT
style C1 fill:#ffccbc
style C2 fill:#ffccbc
style C3 fill:#ffccbc
style C4 fill:#ffccbc
style ED fill:#ffe0b2
style ET fill:#ffe0b2
style ER fill:#ffe0b2
style EQ fill:#ffcdd2
style EC fill:#f8bbd0
style PD fill:#c5e1a5
style PT fill:#c5e1a5
```
---
## 关键技术方案
### 1. 乱序消费与数据一致性
#### 为什么必须支持乱序消费?
系统设计上**必须支持乱序消费**,原因如下:
**批处理内部并发**
- 单消费者使用批处理模式(prefetch_count=100batch_size=50
- 批次内的消息可能并发处理,无法保证严格顺序
- 同一批次中不同平台的消息会并发入库
**公平性需求**
- 多个平台的消息进入同一队列(orders.queue
- 批处理时优先处理简单消息,避免复杂消息阻塞整个批次
- 示例:DouYin 的大量消息和 Tmall 的消息混合在同一批次中并发处理
**重试消息的乱序**
- 失败消息通过 DLX 延迟重试后重新进入队列
- 重试消息可能与新消息乱序
- 必须防止旧版本数据覆盖新版本
#### 乱序消费带来的挑战
**核心问题**:同一订单的多次更新消息可能乱序到达
```
时间 T1: 订单创建,金额 100 元
→ message_1: {platform_order_id: "DY123", amount: 100, data_version: 1736840000}
时间 T2: 订单更新,金额 200 元
→ message_2: {platform_order_id: "DY123", amount: 200, data_version: 1736840100}
乱序消费(正常现象):
Consumer A 先消费 message_2 → 数据库中金额为 200 ✅
Consumer B 后消费 message_1 → 如果没有版本控制,会覆盖为 100 ❌ 错误!
```
**注意**
- ✅ 不同订单的消息乱序消费 → 完全没问题
- ✅ 不同平台的消息乱序消费 → 完全没问题
- ❌ 同一订单的旧版本覆盖新版本 → 需要防止
**重要说明**
虽然采用单消费者模型,但以下场景仍会导致乱序:
1. **批处理内部**:批次中的 50 条消息可能并发验证和入库
2. **重试消息**:失败消息经过 DLX 延迟 5 秒后回到队列,会与新消息混合
3. **数据库层面**:即使消费顺序正确,数据库事务提交顺序也可能不同
因此,**`data_version` 机制是必需的**,用于在数据库层面保证数据一致性。
#### 解决方案:数据版本控制
**方案**:在消息中增加 `data_version` 字段(Unix 时间戳),确保只有更新的数据才能写入数据库
```sql
-- 数据库表增加 data_version 字段
ALTER TABLE orders ADD COLUMN data_version BIGINT NOT NULL DEFAULT 0;
-- 消费时只有更新的数据才写入
INSERT INTO orders (
platform_id, platform_order_id, total_amount, data_version, ...
) VALUES (20, 'DY123', 200, 1736840100, ...)
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET
total_amount = EXCLUDED.total_amount,
data_version = EXCLUDED.data_version,
updated_at = NOW()
WHERE orders.data_version < EXCLUDED.data_version; -- 关键:只有版本更新才更新
```
**工作流程**
```
message_2 先到达 (data_version: 1736840100):
INSERT → 数据库: amount=200, data_version=1736840100 ✅
message_1 后到达 (data_version: 1736840000):
ON CONFLICT DO UPDATE
WHERE orders.data_version < 1736840000 → 条件不满足,不更新 ✅
数据库仍然是: amount=200, data_version=1736840100
```
**消息格式要求**
```json
{
2026-02-28 10:38:33 +08:00
"message_id": "order#datahub#100#20#200#order#DY123",
2025-11-14 16:04:26 +08:00
"metadata": {
"data_version": 1736840100 // 必需:平台数据的最后更新时间戳
},
"data": {
"platform_unique_id": "DY123",
"raw_data": {
"update_time": 1736840100 // 从平台原始数据中提取
}
}
}
```
---
### 2. 幂等性保证(无 Redis 依赖)
#### 方案:数据库表记录已处理消息
创建 `processed_messages` 表记录所有已处理的 message_id
```sql
CREATE TABLE processed_messages (
message_id VARCHAR(200) PRIMARY KEY,
platform_id INT NOT NULL,
data_type VARCHAR(50) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 索引:加速查询
CREATE INDEX idx_processed_messages_created ON processed_messages (created_at);
-- 自动清理 30 天前的记录(可选)
CREATE INDEX idx_processed_messages_cleanup ON processed_messages (created_at)
WHERE created_at < NOW() - INTERVAL '30 days';
```
#### 消费流程
```
1. 收到消息,提取 message_id
2. 检查是否已处理:
SELECT 1 FROM processed_messages WHERE message_id = ?
如果存在 → ACK 消息,跳过处理(幂等性)
3. 开启事务:
BEGIN;
3.1 插入 processed_messages:
INSERT INTO processed_messages (message_id, platform_id, data_type)
VALUES (?, ?, ?)
ON CONFLICT (message_id) DO NOTHING; -- 防止并发插入
3.2 插入/更新业务数据:
INSERT INTO orders (...) VALUES (...)
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET ...
WHERE orders.data_version < EXCLUDED.data_version;
COMMIT;
4. ACK 消息
```
#### 性能优化
**问题**:每次消费都查询 `processed_messages` 可能影响性能
**优化方案**
```sql
-- 1. 使用 PostgreSQL 的 ON CONFLICT 避免先查询
INSERT INTO processed_messages (message_id, platform_id, data_type)
VALUES (?, ?, ?)
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id;
-- 如果 RETURNING 无结果 → 说明已处理,直接返回
-- 如果 RETURNING 有结果 → 继续处理业务数据
-- 2. 定期清理历史记录(减少表大小)
DELETE FROM processed_messages
WHERE created_at < NOW() - INTERVAL '30 days';
```
#### 与数据版本控制结合
```
完整的幂等性 + 乱序处理流程:
BEGIN;
-- 1. 记录消息已处理(幂等性)
INSERT INTO processed_messages (message_id, platform_id, data_type)
2026-02-28 10:38:33 +08:00
VALUES ('order#datahub#100#20#200#order#DY123', 20, 'order')
2025-11-14 16:04:26 +08:00
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id;
-- 如果返回空,说明重复消息,直接 ROLLBACK 并 ACK
-- 如果返回值,继续处理
-- 2. 更新业务数据(带版本控制,防止乱序)
INSERT INTO orders (
platform_id, platform_order_id, total_amount, data_version, created_at, updated_at
) VALUES (20, 'DY123', 200, 1736840100, NOW(), NOW())
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET
total_amount = EXCLUDED.total_amount,
data_version = EXCLUDED.data_version,
updated_at = NOW()
WHERE orders.data_version < EXCLUDED.data_version;
COMMIT;
-- 3. ACK 消息
```
---
## 数据流转流程
### 1. 订单数据完整流程
```mermaid
sequenceDiagram
participant P as DouYin 采集服务
participant E as douyin.exchange
participant Q as orders.queue
participant C as OrderConsumer
participant DB as PostgreSQL
participant PM as processed_messages 表
participant AGG as 连续聚合视图
Note over P: 1. 数据采集阶段
P->>P: 从 DouYin API 获取订单数据
2026-02-28 10:38:33 +08:00
P->>P: 生成 message_id<br/>order#datahub#100#20#200#order#DY123<br/>提取 data_version
2025-11-14 16:04:26 +08:00
Note over P,E: 2. 消息发布阶段
P->>E: 发布消息<br/>routing_key: order<br/>持久化消息
E->>Q: 路由到订单队列
Note over Q,C: 3. 消息消费阶段
Q->>C: 推送消息 (prefetch 10)
C->>C: 验证消息格式
C->>C: 验证 message_id 结构
Note over C,DB: 4. 幂等性检查 + 数据入库 (事务)
C->>DB: BEGIN TRANSACTION
C->>PM: INSERT INTO processed_messages<br/>ON CONFLICT DO NOTHING<br/>RETURNING message_id
alt 返回空 (重复消息)
PM-->>C: 无返回值
C->>DB: ROLLBACK
C->>Q: ACK (跳过处理)
else 返回值 (新消息)
PM-->>C: 返回 message_id
Note over C,DB: 5. 更新业务数据 (带版本控制)
C->>DB: INSERT INTO orders<br/>ON CONFLICT DO UPDATE<br/>WHERE data_version < new_version
DB-->>C: 更新成功
C->>DB: COMMIT
C->>Q: ACK (确认消息)
Note over DB,AGG: 6. 自动聚合 (后台)
DB->>AGG: TimescaleDB 后台任务<br/>每小时刷新聚合视图
end
```
### 2. 错误处理流程
```mermaid
sequenceDiagram
participant P as Tmall 采集服务
participant E as tmall.exchange
participant Q as orders.queue
participant C as OrderConsumer
participant EE as tmall.errors.exchange
participant EQ as errors.queue
participant EC as ErrorConsumer
participant DB as failed_messages 表
participant Alert as 告警系统
P->>E: 发布订单消息
E->>Q: 路由到订单队列
Q->>C: 推送消息
Note over C: 消费处理
C->>C: 验证消息格式
alt 数据格式错误 (validation)
C->>C: 捕获 ValidationException
C->>EE: 发布错误消息<br/>routing_key: validation
EE->>EQ: 路由到错误队列
C->>Q: ACK (从业务队列移除)
Note over EQ,EC: 错误消息处理
EQ->>EC: 推送错误消息
EC->>DB: 记录错误日志
EC->>Alert: 判断是否需要告警
alt 满足告警条件
Alert->>Alert: 发送 Slack/Email 通知
end
EC->>EQ: ACK
else 处理成功
C->>C: 入库成功
C->>Q: ACK
end
```
### 3. 消息重试流程(DLX + 延迟重试队列)
```mermaid
sequenceDiagram
participant Q as orders.queue
participant C as OrderConsumer
participant DB as PostgreSQL
participant DLX as dlx.orders
participant RQ as orders.retry.queue
participant EQ as errors.queue
Q->>C: 推送消息 (x-retries: 0)
Note over C,DB: 第 1 次处理
C->>DB: 尝试入库
DB-->>C: 数据库连接失败
C->>C: 更新 header: x-retries = 1
C->>C: nack(requeue=false)
C-->>DLX: 消息进入 DLX
Note over DLX: 根据 x-retries 路由
DLX->>RQ: x-retries < 3<br/>routing_key: retry
Note over RQ: TTL = 5 秒
Note over RQ: 5 秒后自动死信
RQ-->>Q: TTL 到期<br/>回到主队列
Note over Q,C: 第 2 次处理
Q->>C: 推送消息 (x-retries: 1)
C->>DB: 尝试入库
DB-->>C: 数据库连接失败
C->>C: 更新 header: x-retries = 2
C->>C: nack(requeue=false)
C-->>DLX: 消息进入 DLX
DLX->>RQ: x-retries < 3
Note over RQ: 5 秒后自动死信
RQ-->>Q: 回到主队列
Note over Q,C: 第 3 次处理
Q->>C: 推送消息 (x-retries: 2)
C->>DB: 尝试入库
DB-->>C: 数据库连接失败
C->>C: 更新 header: x-retries = 3
C->>C: nack(requeue=false)
C-->>DLX: 消息进入 DLX
Note over DLX: x-retries >= 3<br/>超过重试次数
DLX->>EQ: routing_key: error<br/>进入错误队列
```
---
## 各角色职责和操作
### 1. 平台开发者(数据生产者)
#### 职责
- 从电商平台 API 采集数据(订单/产品/退款)
- 将数据转换为标准消息格式
- 推送消息到 RabbitMQ
- 监控自己平台的错误消息
#### 权限
| 操作 | 权限 | 说明 |
|------|------|------|
2026-02-28 10:38:33 +08:00
| 连接到 VHost | `datahub-app` | 只能访问此 VHost |
2025-11-14 16:04:26 +08:00
| 发布消息 | `{platform}.exchange` | 只能写入自己平台的 Exchange |
| 订阅错误 | `{platform}.errors.exchange` | 可选,接收本平台错误通知 |
#### 操作流程
##### 步骤 1: 生成 message_id
```
格式: {entity_type}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{platform_unique_id}
示例:
2026-02-28 10:38:33 +08:00
- order#datahub#100#20#200#order#DY123456789
- product#datahub#100#2#201#product#TM-PROD789
- refund#datahub#100#20#200#refund#DY-REF456
- inventory#datahub#100#2#201#inventory#TM-SKU123
2025-11-14 16:04:26 +08:00
```
**生成逻辑**
```
# 伪代码
2026-02-28 10:38:33 +08:00
message_id = f"{data_type}#datahub#{company_id}#{platform_id}#{store_id}#{data_type}#{platform_order_id}"
2025-11-14 16:04:26 +08:00
# 实际示例(DouYin 订单)
2026-02-28 10:38:33 +08:00
message_id = f"order#datahub#100#20#200#order#{order_data['order_id']}"
2025-11-14 16:04:26 +08:00
```
##### 步骤 2: 提取 data_version
**重要**:必须从平台原始数据中提取更新时间戳作为 `data_version`
```python
# 示例:从 DouYin API 响应提取
douyin_order = {
"order_id": "DY202501140001",
"create_time": 1736839200,
"update_time": 1736840000, # ← 使用这个作为 data_version
...
}
data_version = douyin_order.get('update_time') or douyin_order.get('create_time')
# 如果平台未提供更新时间,使用当前时间
if not data_version:
data_version = int(time.time())
```
##### 步骤 3: 构建消息体
```json
{
2026-02-28 10:38:33 +08:00
"message_id": "order#datahub#100#20#200#order#DY123456",
2025-11-14 16:04:26 +08:00
"timestamp": "2025-01-14T10:30:00Z",
"platform": "douyin",
"data_type": "order",
"metadata": {
"platform_id": 20,
"company_id": 100,
"store_id": 200,
"source_system": "douyin-open-api",
"retry_count": 0,
"data_version": 1736840000
},
"data": {
"platform_unique_id": "DY123456",
"raw_data": {
// 平台原始完整 JSON 数据(必须包含更新时间)
"order_id": "DY123456",
"update_time": 1736840000,
...
}
}
}
```
##### 步骤 4: 发布消息
**连接参数**
- Host: `<rabbitmq-host>`
- Port: `5672`
2026-02-28 10:38:33 +08:00
- VHost: `datahub-app`
2025-11-14 16:04:26 +08:00
- Username: `user_{platform}`
- Password: `<provided_password>`
**发布配置**
- Exchange: `{platform}.exchange`
- Routing Key: `{data_type}.{platform}` 格式
- 订单: `order.{platform}` (如 `order.douyin`, `order.tmall`)
- 产品: `product.{platform}` (如 `product.douyin`, `product.tmall`)
- 退款: `refund.{platform}` (如 `refund.douyin`, `refund.tmall`)
- 库存: `inventory.{platform}` (如 `inventory.douyin`, `inventory.tmall`)
- Delivery Mode: `2` (持久化)
##### 步骤 4: 监控错误(可选)
**订阅配置**
- Exchange: `{platform}.errors.exchange`
- Routing Key: `#` (接收所有错误类型)
- 创建临时队列或持久队列用于接收错误消息
**错误消息示例**
```json
{
"error_id": "err_1234567890",
"original_message": { /* */ },
"error": {
"type": "validation",
"message": "Missing required field: total_amount",
"timestamp": "2025-01-14T10:31:00Z"
},
"metadata": {
"platform": "amazon",
"platform_id": 1,
"store_id": 200,
"failed_at": "2025-01-14T10:31:00Z"
}
}
```
---
2026-02-28 10:38:33 +08:00
### 2. Datahub 应用(数据消费者)
2025-11-14 16:04:26 +08:00
#### 职责
- 消费 RabbitMQ 业务队列中的消息
- 使用批处理模式提升吞吐量(prefetch=100, batch_size=50
- 通过适配器模式支持多平台数据处理
- 验证消息格式和数据完整性
- 将数据入库到 PostgreSQL
- 处理失败消息,通过 DLX 延迟重试或进入错误队列
- 实现幂等性,避免重复处理
#### 消费者架构模型
**单消费者 + 批处理 + 适配器模式**
```
OrderConsumer (单实例)
├── RabbitMQ Channel
│ └── prefetch_count: 100 (最多缓存 100 条未确认消息)
├── 消息缓冲区 (Message Buffer)
│ ├── 接收 RabbitMQ 推送的消息
│ ├── 达到 batch_size=50 或 timeout=3s 触发处理
│ └── 缓冲区最大容量: 100 条
├── 批处理引擎 (Batch Processor)
│ ├── 遍历缓冲区中的消息
│ ├── 对每条消息调用对应的适配器
│ └── 批量入库 + 单条 ACK
└── 平台适配器 (Platform Adapters)
├── DouyinAdapter: 处理抖音订单数据
├── TmallAdapter: 处理天猫订单数据
├── JDAdapter: 处理京东订单数据
└── ... 其他平台适配器
工作流程:
1. RabbitMQ 推送消息到消费者 (最多 100 条未确认)
2. 消费者缓存消息到缓冲区
3. 达到 batch_size 或 timeout 后触发批处理
4. 批处理引擎遍历消息,根据 platform 字段调用对应适配器
5. 适配器验证和转换数据
6. 批量插入数据库
7. 逐条 ACK/NACK 消息
```
2025-11-14 16:07:35 +08:00
** 消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 **
2025-11-14 16:04:26 +08:00
#### 权限
| 操作 | 权限 | 说明 |
|------|------|------|
2026-02-28 10:38:33 +08:00
| 连接到 VHost | `datahub-app` | 访问业务 VHost |
2025-11-14 16:04:26 +08:00
| 读取队列 | `orders.queue`, `products.queue`, `refunds.queue` | 消费业务消息 |
| 写入错误 Exchange | `*.errors.exchange` | 发送错误消息到对应平台的错误 Exchange |
| 管理队列 | `orders.queue`, `products.queue`, `refunds.queue` | 配置消费者参数 |
#### 核心操作
##### 消息消费配置
**推荐参数**
- **prefetch_count**: `100` - RabbitMQ 最多推送 100 条未确认消息
- **batch_size**: `50` - 消费者积累 50 条消息后触发批处理
- **batch_timeout**: `3000` ms - 如果 3 秒内未达到 batch_size,也触发处理
- **消费者模型**: 单消费者 + 批处理 + 适配器模式(每个队列一个消费者实例)
- **ACK 模式**: 手动 ACK,单条确认(处理成功后才确认)
- **重连机制**: 自动重连,指数退避
##### 幂等性保证(数据库方案)
**方案**:使用 `processed_messages` 表记录已处理的 message_id
```sql
-- 1. 创建幂等性记录表
CREATE TABLE processed_messages (
message_id VARCHAR(200) PRIMARY KEY,
platform_id INT NOT NULL,
data_type VARCHAR(50) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_processed_messages_created ON processed_messages (created_at);
-- 2. 定期清理历史记录(可选,保留 30 天)
DELETE FROM processed_messages WHERE created_at < NOW() - INTERVAL '30 days';
```
**消费流程**
```sql
-- 在事务中执行
BEGIN;
-- 步骤 1: 尝试插入 processed_messages(幂等性检查)
INSERT INTO processed_messages (message_id, platform_id, data_type)
2026-02-28 10:38:33 +08:00
VALUES ('order#datahub#100#20#200#order#DY123', 20, 'order')
2025-11-14 16:04:26 +08:00
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id;
-- 如果返回空 → 说明是重复消息,ROLLBACK 并 ACK
-- 如果返回值 → 继续处理
-- 步骤 2: 插入/更新业务数据(带版本控制)
INSERT INTO orders (
platform_id, platform_order_id, company_id, store_id,
total_amount, data_version, raw, created_at, updated_at
) VALUES (20, 'DY123', 100, 200, 9999, 1736840000, '...'::jsonb, NOW(), NOW())
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET
total_amount = EXCLUDED.total_amount,
data_version = EXCLUDED.data_version,
raw = EXCLUDED.raw,
updated_at = NOW()
WHERE orders.data_version < EXCLUDED.data_version; -- 只有更新的版本才更新
COMMIT;
```
**关键点**
- ✅ 无需 Redis 依赖
- ✅ 事务保证原子性
-`ON CONFLICT DO NOTHING` 实现幂等性
-`WHERE data_version < EXCLUDED.data_version` 防止乱序更新
##### 错误处理逻辑
```
处理流程:
1. 验证消息格式
├─ 格式正确 → 继续
└─ 格式错误 → 发送到错误队列 (validation 类型), ACK
2. 幂等性检查
├─ 已处理 → 直接 ACK
└─ 未处理 → 继续
3. 业务处理
├─ 成功 → 标记已处理, ACK
└─ 失败 (异常)
├─ 数据库错误 / 网络错误
│ ├─ retry_count < 3 → 重新发布 (延迟重试), ACK
│ └─ retry_count >= 3 → 发送到错误队列 (processing_failed), ACK
└─ 业务逻辑错误 → 发送到错误队列 (validation), ACK
```
##### 延迟重试实现(DLX + 重试队列方案)
**重试策略**
- 重试延迟: 固定 5 秒(通过 TTL 实现)
- 最大重试次数: 3 次
- 超过 3 次: 进入错误队列
**实现方式**(无需额外插件):
```
消费失败 → nack(requeue=false) → 进入 DLX
x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列
x-retries >= 3 → 错误队列 (人工处理)
```
**工作流程**
1. 消费失败时,消费者执行 `nack(requeue=false)`,消息进入 DLX(如 dlx.orders
2. DLX 根据消息 header 中的 `x-retries` 判断路由:
- `x-retries < 3`:路由到 `orders.retry.queue`(延迟重试)
- `x-retries >= 3`:路由到 `errors.queue`(永久失败)
3. 重试队列 TTL 到期后,消息自动死信回主队列,重新消费
4. 消费者需要在 nack 前更新消息的 `x-retries` header
---
### 3. 运维团队
#### 职责
- RabbitMQ 集群部署和维护
- 用户和权限管理
- 监控队列健康状态
- 处理系统级错误
- 性能优化和故障排查
#### 权限
| 操作 | 权限 | 说明 |
|------|------|------|
| 管理员账号 | `admin` tag | 访问管理界面,完全权限 |
| 监控错误队列 | `errors.queue` (read) | 查看所有平台的错误消息 |
| 配置资源 | 所有 Exchange/Queue | 创建、修改、删除资源 |
#### 日常操作
##### 监控关键指标
**队列堆积监控**
```bash
# 查看所有队列状态
2026-02-28 10:38:33 +08:00
rabbitmqctl list_queues -p datahub-app name messages_ready messages_unacknowledged
2025-11-14 16:04:26 +08:00
# 告警阈值
# - messages_ready > 10000: 严重堆积
# - messages_unacknowledged > 5000: 消费者处理缓慢
```
**消费者连接监控**
```bash
# 查看所有消费者连接
2026-02-28 10:38:33 +08:00
rabbitmqctl list_consumers -p datahub-app
2025-11-14 16:04:26 +08:00
# 检查项:
# - 消费者数量是否正常
# - prefetch_count 配置是否合理
# - 是否有消费者断开
```
**错误率监控**
```sql
-- 查询最近 1 小时各平台错误数量
SELECT
platform,
error_type,
COUNT(*) as error_count
FROM failed_messages
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY platform, error_type
ORDER BY error_count DESC;
```
##### 故障排查
**队列堆积问题**
```
排查步骤:
1. 检查消费者是否在线
2026-02-28 10:38:33 +08:00
rabbitmqctl list_consumers -p datahub-app
2025-11-14 16:04:26 +08:00
2. 查看消费速率
管理界面 → Queues → 选择队列 → 查看 Deliver/Get rate
3. 检查消费者日志
- 是否有异常报错
- 处理时间是否过长
解决方案:
- 增加消费者实例数量
- 增加 prefetch_count
- 优化业务处理逻辑
```
**消息丢失问题**
```
排查步骤:
1. 确认消息是否发布成功
管理界面 → Exchanges → 查看 Message rates (Publish in)
2. 确认 Binding 配置
管理界面 → Exchanges → 选择 Exchange → 查看 Bindings
3. 检查消息是否持久化
- Exchange durable: true
- Queue durable: true
- Message delivery_mode: 2
4. 查看消费者 ACK 行为
- 是否正确 ACK
- 是否有 NACK/REJECT
```
##### 性能优化
**优化消费速度**
```
措施:
1. 调整批处理参数
- batch_size: 当前 50,可根据消息大小调整到 30-100
- batch_timeout: 当前 3s,可根据实时性要求调整到 1-5s
2. 调整 prefetch_count
- 当前: 100
- 可根据内存和消息处理速度调整到 50-200
- prefetch_count 应该是 batch_size 的 2-3 倍
3. 优化批处理逻辑
- 使用数据库批量插入(INSERT ... VALUES (...), (...), ...
- 批次内消息可以并发验证(适配器模式)
- 但必须单条 ACK 保证可靠性
4. 优化业务逻辑
- 减少数据库查询
- 使用连接池
- 异步处理非关键操作
5. 水平扩展(谨慎)
- 单队列单消费者模型,不建议同一队列多消费者
- 可考虑分片:不同数据类型独立扩展(orders/products 分别扩展)
```
**队列性能优化**
```bash
# 启用 lazy queue (大量消息堆积时)
rabbitmqctl set_policy lazy-queue "^(orders|products|refunds)\.queue$" \
2026-02-28 10:38:33 +08:00
'{"queue-mode":"lazy"}' --vhost datahub-app
2025-11-14 16:04:26 +08:00
# 效果:
# - 消息存储在磁盘,减少内存占用
# - 适用于消息堆积严重的场景
```
---
### 4. RabbitMQ 管理员
#### 职责
- 初始化 RabbitMQ 配置
- 创建 VHost、Exchange、Queue
- 管理用户和权限
- 新平台接入配置
#### 核心操作
##### 新平台接入
当需要接入新平台(如 `aliexpress`)时,执行以下步骤:
**步骤 1: 创建 Exchange**
```bash
# 业务 Exchange
rabbitmqadmin -u admin -p <password> declare exchange \
2026-02-28 10:38:33 +08:00
name="aliexpress.exchange" vhost=datahub-app \
2025-11-14 16:04:26 +08:00
type=topic durable=true
# 错误 Exchange
rabbitmqadmin -u admin -p <password> declare exchange \
2026-02-28 10:38:33 +08:00
name="aliexpress.errors.exchange" vhost=datahub-app \
2025-11-14 16:04:26 +08:00
type=topic durable=true
```
**步骤 2: 创建 Binding**
```bash
# 绑定到业务队列(使用 {data_type}.{platform} 格式)
rabbitmqadmin -u admin -p <password> declare binding \
source="aliexpress.exchange" destination="orders.queue" \
2026-02-28 10:38:33 +08:00
vhost=datahub-app routing_key="order.aliexpress"
2025-11-14 16:04:26 +08:00
rabbitmqadmin -u admin -p <password> declare binding \
source="aliexpress.exchange" destination="products.queue" \
2026-02-28 10:38:33 +08:00
vhost=datahub-app routing_key="product.aliexpress"
2025-11-14 16:04:26 +08:00
rabbitmqadmin -u admin -p <password> declare binding \
source="aliexpress.exchange" destination="refunds.queue" \
2026-02-28 10:38:33 +08:00
vhost=datahub-app routing_key="refund.aliexpress"
2025-11-14 16:04:26 +08:00
rabbitmqadmin -u admin -p <password> declare binding \
source="aliexpress.exchange" destination="inventory.queue" \
2026-02-28 10:38:33 +08:00
vhost=datahub-app routing_key="inventory.aliexpress"
2025-11-14 16:04:26 +08:00
# 绑定到错误队列
rabbitmqadmin -u admin -p <password> declare binding \
source="aliexpress.errors.exchange" destination="errors.queue" \
2026-02-28 10:38:33 +08:00
vhost=datahub-app routing_key="#"
2025-11-14 16:04:26 +08:00
```
**步骤 3: 创建用户**
```bash
# 创建用户
rabbitmqadmin -u admin -p <password> declare user \
name="user_aliexpress" password="<strong_password>" tags=""
# 配置权限
rabbitmqadmin -u admin -p <password> declare permission \
2026-02-28 10:38:33 +08:00
vhost=datahub-app user="user_aliexpress" \
2025-11-14 16:04:26 +08:00
configure="" write="^aliexpress\.(exchange|errors\.exchange)$" \
read="^aliexpress\.errors\..*$"
```
**步骤 4: 提供给开发者**
```
连接信息:
- Host: <rabbitmq-host>
- Port: 5672
2026-02-28 10:38:33 +08:00
- VHost: datahub-app
2025-11-14 16:04:26 +08:00
- Username: user_aliexpress
- Password: <strong_password>
Exchange: aliexpress.exchange
Routing Keys:
- 订单: order.aliexpress
- 产品: product.aliexpress
- 退款: refund.aliexpress
- 库存: inventory.aliexpress
错误订阅 (可选): aliexpress.errors.exchange
```
---
## 监控和告警
### 1. 关键监控指标
```mermaid
graph TB
subgraph "队列监控"
M1[messages_ready<br/>待消费消息数]
M2[messages_unacknowledged<br/>未确认消息数]
M3[message_stats.publish_in_rate<br/>消息发布速率]
M4[message_stats.deliver_rate<br/>消息消费速率]
end
subgraph "消费者监控"
C1[consumer_count<br/>消费者数量]
C2[prefetch_count<br/>预取数量]
C3[connection_state<br/>连接状态]
end
subgraph "错误监控"
E1[errors.queue 消息数<br/>错误队列堆积]
E2[failed_messages 表记录数<br/>失败消息总数]
E3[每小时错误增长率<br/>错误频率]
end
subgraph "系统监控"
S1[disk_free<br/>磁盘剩余空间]
S2[mem_used<br/>内存使用率]
S3[fd_used<br/>文件描述符使用]
end
style M1 fill:#ffcdd2
style M2 fill:#ffcdd2
style E1 fill:#ffcdd2
style E2 fill:#ffcdd2
```
### 2. 告警规则
| 指标 | 告警阈值 | 级别 | 处理建议 |
|------|---------|------|---------|
| `messages_ready` | > 10,000 | 警告 | 增加消费者 |
| `messages_ready` | > 50,000 | 严重 | 紧急扩容 |
| `messages_unacknowledged` | > 5,000 | 警告 | 检查消费者处理速度 |
| `consumer_count` | = 0 | 严重 | 消费者服务挂掉 |
| `publish_rate - deliver_rate` | > 1000/s | 警告 | 消费速度跟不上生产速度 |
| `errors.queue` 消息数 | > 1,000 | 警告 | 大量消息处理失败 |
| 单平台小时错误数 | > 100 | 警告 | 平台数据质量问题 |
| `disk_free` | < 10 GB | 严重 | 清理磁盘或扩容 |
| `mem_used` | > 80% | 警告 | 优化或增加内存 |
### 3. 监控实现
**Prometheus + Grafana**
```yaml
# prometheus.yml
scrape_configs:
- job_name: 'rabbitmq'
static_configs:
- targets: ['rabbitmq-exporter:9419']
metrics_path: /metrics
```
**告警规则示例**
```yaml
# alert_rules.yml
groups:
- name: rabbitmq_alerts
rules:
- alert: QueueBacklog
expr: rabbitmq_queue_messages_ready{queue=~"orders|products|refunds"} > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "队列 {{ $labels.queue }} 堆积严重"
description: "当前待消费消息数: {{ $value }}"
- alert: NoConsumers
expr: rabbitmq_queue_consumers{queue=~"orders|products|refunds"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "队列 {{ $labels.queue }} 无消费者"
description: "消费者服务可能已挂掉"
```
---
## 故障恢复
### 1. 消费者服务挂掉
```mermaid
graph LR
A[检测到消费者断开] --> B[消息堆积在队列中]
B --> C[告警通知运维]
C --> D[重启消费者服务]
D --> E[消费者自动重连]
E --> F[继续消费堆积消息]
F --> G{消息是否过期?}
G -->|未过期| H[正常处理]
G -->|已过期| I[消息被 TTL 删除]
style A fill:#ffcdd2
style C fill:#fff9c4
style H fill:#c8e6c9
style I fill:#ffab91
```
**恢复时间**
- 自动重连: 秒级
- 手动重启: 分钟级
- 数据不会丢失(消息持久化)
### 2. RabbitMQ 服务重启
```mermaid
graph TB
A[RabbitMQ 重启] --> B[持久化资源自动恢复]
B --> C[Exchanges 恢复]
B --> D[Queues 恢复]
B --> E[Bindings 恢复]
B --> F[持久化消息恢复]
A --> G[生产者重连]
A --> H[消费者重连]
G --> I[继续发布消息]
H --> J[继续消费消息]
style A fill:#ffcdd2
style C fill:#c8e6c9
style D fill:#c8e6c9
style E fill:#c8e6c9
style F fill:#c8e6c9
```
**数据安全性**
- ✅ 持久化 Exchange 自动恢复
- ✅ 持久化 Queue 自动恢复
- ✅ 持久化消息自动恢复
- ❌ 未 ACK 的消息会重新投递(可能重复消费,需幂等性保证)
### 3. 数据库连接失败
```mermaid
graph TB
A[数据库连接失败] --> B{retry_count < 3?}
B -->|是| C[延迟重试<br/>60s / 120s / 240s]
C --> D[重新入队]
D --> E[等待下次消费]
E --> B
B -->|否| F[超过重试次数]
F --> G[发送到错误队列]
G --> H[人工介入处理]
style A fill:#ffcdd2
style C fill:#fff9c4
style G fill:#ffab91
style H fill:#ce93d8
```
---
## 最佳实践
### 1. 生产者最佳实践
-**使用结构化 message_id**:格式 `{type}#{app}#{company}#{platform}#{store}#{type}#{unique_id}`
-**必须提供 data_version**:从平台数据中提取更新时间戳,用于防止乱序更新
-**消息持久化**`delivery_mode=2`
-**包含完整 raw_data**:保留平台原始完整数据,便于后续数据补全和问题排查
-**使用正确的 routing_key**`order`/`product`/`refund`/`inventory`
-**实现发布确认**:等待 RabbitMQ 确认消息已接收
-**实现重连机制**:网络故障时自动重连
-**不要发送超大消息**(> 1 MB):考虑分片或使用对象存储 + 引用
### 2. 消费者最佳实践
-**单消费者模型**:每个队列一个消费者实例,避免消息分散
-**批处理机制**:使用 prefetch + 批处理提升吞吐(batch_size=50, prefetch_count=100
-**适配器模式**:消费者内部根据平台字段分发到不同业务逻辑
-**手动 ACK**:单条确认,失败消息不影响其他消息
-**幂等性保证**:使用 `processed_messages` 表记录已处理的 message_id
-**版本控制**:使用 `data_version` 字段防止乱序更新(批处理内部可能并发)
-**事务处理**:在一个事务中完成幂等性检查 + 业务数据更新
-**DLX 重试机制**:失败消息通过 nack(requeue=false) 进入 DLX,延迟 5 秒后自动重试
-**错误分类处理**:区分可重试错误(临时)和业务错误(永久)
-**限制重试次数**:最多 3 次,通过消息 header `x-retries` 控制
-**批量数据库操作**:批次内使用批量插入提升性能
-**记录详细日志**:便于故障排查
-**定期清理 processed_messages**:保留 30 天历史记录即可
-**不要同队列多消费者**:会导致消息 Round-Robin 分配,破坏设计
### 3. 运维最佳实践
-**定期备份配置**:使用 `rabbitmqadmin export`
-**监控告警**:配置 Prometheus + Grafana
-**日志归档**:定期清理 `failed_messages`
-**压力测试**:定期测试消息吞吐能力
-**灾难恢复演练**:定期演练故障恢复流程
-**文档更新**:及时更新配置文档
---
## 附录
### A. 消息示例
#### 订单消息
```json
{
2026-02-28 10:38:33 +08:00
"message_id": "order#datahub#100#20#200#order#DY202501140001",
2025-11-14 16:04:26 +08:00
"timestamp": "2025-01-14T10:30:00Z",
"platform": "douyin",
"data_type": "order",
"metadata": {
"platform_id": 20,
"company_id": 100,
"store_id": 200,
"source_system": "douyin-open-api",
"retry_count": 0,
"data_version": 1736840000
},
"data": {
"platform_unique_id": "DY202501140001",
"raw_data": {
"order_id": "DY202501140001",
"create_time": 1736839200,
"pay_time": 1736840000,
"update_time": 1736840000,
"order_amount": 9999,
"pay_amount": 9999,
"order_status": 2,
"receiver_info": {
"province": "北京市",
"city": "北京市",
"district": "朝阳区",
"detail": "某某街道123号"
}
}
}
}
```
#### 产品消息
```json
{
2026-02-28 10:38:33 +08:00
"message_id": "product#datahub#100#2#201#product#TM-PROD-789",
2025-11-14 16:04:26 +08:00
"timestamp": "2025-01-14T11:00:00Z",
"platform": "tmall",
"data_type": "product",
"metadata": {
"platform_id": 2,
"company_id": 100,
"store_id": 201,
"source_system": "tmall-top-api",
"retry_count": 0,
"data_version": 1736841600
},
"data": {
"platform_unique_id": "TM-PROD-789",
"raw_data": {
"num_iid": 789123456789,
"title": "示例产品",
"price": "499.00",
"type": "fixed",
"modified": "2025-01-14 11:00:00",
"sku_list": [
{
"sku_id": 987654321,
"price": "499.00",
"quantity": 100
}
]
}
}
}
```
### B. 参考文档
- [RabbitMQ 配置方案](./RabbitMQ.md)
- [TimescaleDB 数据查询方案](./data_query.md)
- [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html)
- [AMQP 0-9-1 协议](https://www.rabbitmq.com/amqp-0-9-1-reference.html)