# Data Flow Architecture

## Overview

This document describes the end-to-end flow of e-commerce data from platform collection to database storage, including the role, responsibilities, and operating procedures of each system.

### System Roles

| Role | Responsibility | System |
|------|------|------|
| **Data producer** | Collects e-commerce platform data and publishes it to MQ | Per-platform collection services |
| **Message queue** | Routing, buffering, and isolation of data | RabbitMQ |
| **Data consumer** | Consumes MQ messages and writes them to the database | Datahub App |
| **Storage layer** | Persistent storage and statistical analysis | PostgreSQL + TimescaleDB |

---

## Overall Architecture

```mermaid
graph TB
    subgraph "Producer Layer"
        P1[DouYin collector]
        P2[Tmall collector]
        P3[RedBook collector]
        P4[Other platform collectors...]
    end

    subgraph "RabbitMQ Message Queue"
        subgraph "VHost: datahub-app"
            subgraph "Business Exchanges"
                E1[douyin.exchange]
                E2[tmall.exchange]
                E3[redbook.exchange]
                E4[other.exchange...]
            end

            subgraph "Business Queues"
                Q1[orders.queue<br/>DLX: dlx.orders]
                Q2[products.queue<br/>DLX: dlx.products]
                Q3[refunds.queue<br/>DLX: dlx.refunds]
                Q4[inventory.queue<br/>DLX: dlx.inventory]
            end

            subgraph "Dead-Letter Exchanges (DLX)"
                DLX1[dlx.orders]
                DLX2[dlx.products]
                DLX3[dlx.refunds]
                DLX4[dlx.inventory]
            end

            subgraph "Retry Queues"
                RQ1[orders.retry.queue<br/>TTL: 5s]
                RQ2[products.retry.queue<br/>TTL: 5s]
                RQ3[refunds.retry.queue<br/>TTL: 5s]
                RQ4[inventory.retry.queue<br/>TTL: 5s]
            end

            subgraph "Error Handling"
                EE1[douyin.errors.exchange]
                EE2[tmall.errors.exchange]
                EE3[redbook.errors.exchange]
                EQ[errors.queue]
            end
        end
    end

    subgraph "Datahub Consumers"
        C1[OrderConsumer]
        C2[ProductConsumer]
        C3[RefundConsumer]
        C4[InventoryConsumer]
        C5[ErrorConsumer]
    end

    subgraph "Storage Layer"
        DB1[(orders table<br/>TimescaleDB)]
        DB2[(products table)]
        DB3[(refunds table)]
        DB4[(inventory table)]
        DB5[(failed_messages table)]
    end

    subgraph "Aggregate Views"
        AGG1[orders_daily_by_created]
        AGG2[orders_daily_by_paid]
    end

    P1 -->|publish orders/products/refunds/inventory| E1
    P2 -->|publish orders/products/refunds/inventory| E2
    P3 -->|publish orders/products/refunds/inventory| E3
    P4 -->|publish orders/products/refunds/inventory| E4

    E1 & E2 & E3 & E4 -->|routing_key: order.platform| Q1
    E1 & E2 & E3 & E4 -->|routing_key: product.platform| Q2
    E1 & E2 & E3 & E4 -->|routing_key: refund.platform| Q3
    E1 & E2 & E3 & E4 -->|routing_key: inventory.platform| Q4

    Q1 --> C1
    Q2 --> C2
    Q3 --> C3
    Q4 --> C4

    C1 -->|success| DB1
    C2 -->|success| DB2
    C3 -->|success| DB3
    C4 -->|success| DB4

    C1 -.->|failure below retry limit: nack| DLX1
    C2 -.->|failure below retry limit: nack| DLX2
    C3 -.->|failure below retry limit: nack| DLX3
    C4 -.->|failure below retry limit: nack| DLX4

    DLX1 --> RQ1
    DLX2 --> RQ2
    DLX3 --> RQ3
    DLX4 --> RQ4

    RQ1 -.->|TTL expires<br/>back to main queue| Q1
    RQ2 -.->|TTL expires<br/>back to main queue| Q2
    RQ3 -.->|TTL expires<br/>back to main queue| Q3
    RQ4 -.->|TTL expires<br/>back to main queue| Q4

    C1 & C2 & C3 & C4 -.->|permanent failure or<br/>retry limit reached| EE1 & EE2 & EE3
    EE1 & EE2 & EE3 --> EQ
    EQ --> C5
    C5 --> DB5

    DB1 --> AGG1
    DB1 --> AGG2

    style P1 fill:#e1f5dd
    style P2 fill:#e1f5dd
    style P3 fill:#e1f5dd
    style P4 fill:#e1f5dd
    style E1 fill:#fff4e6
    style E2 fill:#fff4e6
    style E3 fill:#fff4e6
    style E4 fill:#fff4e6
    style Q1 fill:#e3f2fd
    style Q2 fill:#e3f2fd
    style Q3 fill:#e3f2fd
    style Q4 fill:#e3f2fd
    style DLX1 fill:#ffe0b2
    style DLX2 fill:#ffe0b2
    style DLX3 fill:#ffe0b2
    style DLX4 fill:#ffe0b2
    style RQ1 fill:#fff9c4
    style RQ2 fill:#fff9c4
    style RQ3 fill:#fff9c4
    style RQ4 fill:#fff9c4
    style C1 fill:#f3e5f5
    style C2 fill:#f3e5f5
    style C3 fill:#f3e5f5
    style C4 fill:#f3e5f5
    style DB1 fill:#fce4ec
    style AGG1 fill:#ffebee
    style AGG2 fill:#ffebee
```

---

## RabbitMQ Internals

### Business Data Flow

```mermaid
graph LR
    subgraph "Producer Permission Isolation"
        PD[DouYin developer<br/>user_douyin]
        PT[Tmall developer<br/>user_tmall]
        PR[RedBook developer<br/>user_redbook]
    end

    subgraph "Platform Exchanges (Topic)"
        ED[douyin.exchange]
        ET[tmall.exchange]
        ER[redbook.exchange]
    end

    subgraph "Data-Type Queues"
        QO[orders.queue<br/>TTL: 24h]
        QP[products.queue<br/>TTL: 24h]
        QR[refunds.queue<br/>TTL: 24h]
        QI[inventory.queue<br/>TTL: 24h]
    end

    subgraph "Consumers"
        CO[Datahub consumers<br/>Order/Product/Refund/InventoryConsumer<br/>user_datahub_consumer]
    end

    PD -->|write only| ED
    PT -->|write only| ET
    PR -->|write only| ER

    ED -->|order.douyin| QO
    ED -->|product.douyin| QP
    ED -->|refund.douyin| QR
    ED -->|inventory.douyin| QI

    ET -->|order.tmall| QO
    ET -->|product.tmall| QP
    ET -->|refund.tmall| QR
    ET -->|inventory.tmall| QI

    ER -->|order.redbook| QO
    ER -->|product.redbook| QP
    ER -->|refund.redbook| QR
    ER -->|inventory.redbook| QI

    QO -->|read| CO
    QP -->|read| CO
    QR -->|read| CO
    QI -->|read| CO

    style PD fill:#c8e6c9
    style PT fill:#c8e6c9
    style PR fill:#c8e6c9
    style ED fill:#fff9c4
    style ET fill:#fff9c4
    style ER fill:#fff9c4
    style QO fill:#bbdefb
    style QP fill:#bbdefb
    style QR fill:#bbdefb
    style QI fill:#bbdefb
    style CO fill:#d1c4e9
```

### Error Handling Architecture

```mermaid
graph TB
    subgraph "Consumers (Error Sources)"
        C1[OrderConsumer]
        C2[ProductConsumer]
        C3[RefundConsumer]
        C4[InventoryConsumer]
    end

    subgraph "Platform Error Exchanges"
        ED[douyin.errors.exchange]
        ET[tmall.errors.exchange]
        ER[redbook.errors.exchange]
    end

    subgraph "Unified Error Queue"
        EQ[errors.queue<br/>TTL: 7 days]
    end

    subgraph "Error Message Consumer"
        EC[ErrorConsumer<br/>user_ops]
        DB[(failed_messages table)]
    end

    subgraph "Platform Developer Monitoring"
        PD[DouYin developer<br/>subscribes to douyin.errors.exchange]
        PT[Tmall developer<br/>subscribes to tmall.errors.exchange]
    end

    C1 & C2 & C3 & C4 -->|chosen by platform| ED & ET & ER

    ED -->|binding key: #| EQ
    ET -->|binding key: #| EQ
    ER -->|binding key: #| EQ

    EQ --> EC
    EC --> DB

    ED -.->|read only| PD
    ET -.->|read only| PT

    style C1 fill:#ffccbc
    style C2 fill:#ffccbc
    style C3 fill:#ffccbc
    style C4 fill:#ffccbc
    style ED fill:#ffe0b2
    style ET fill:#ffe0b2
    style ER fill:#ffe0b2
    style EQ fill:#ffcdd2
    style EC fill:#f8bbd0
    style PD fill:#c5e1a5
    style PT fill:#c5e1a5
```

---

## Key Technical Design

### 1. Out-of-Order Consumption and Data Consistency

#### Why must out-of-order consumption be supported?

The system **must tolerate out-of-order consumption**, for the following reasons:

**Concurrency within a batch**:

- A single consumer works in batch mode (prefetch_count=100, batch_size=50)
- Messages within a batch may be processed concurrently, so strict ordering cannot be guaranteed
- Messages from different platforms in the same batch are written to the database concurrently

**Fairness**:

- Messages from multiple platforms share the same queue (orders.queue)
- During batch processing, simple messages are handled first so that complex messages do not block the whole batch
- Example: a large volume of DouYin messages is mixed with Tmall messages in the same batch and processed concurrently

**Out-of-order retries**:

- Failed messages re-enter the queue after a delayed retry via the DLX
- Retried messages may interleave with newer messages
- An older version of the data must never overwrite a newer one

#### Challenges of Out-of-Order Consumption

**Core problem**: several update messages for the same order may arrive out of order.

```
T1: order created, amount 100
  → message_1: {platform_order_id: "DY123", amount: 100, data_version: 1736840000}

T2: order updated, amount 200
  → message_2: {platform_order_id: "DY123", amount: 200, data_version: 1736840100}

Out-of-order consumption (expected behavior):
  Consumer A consumes message_2 first → amount in the DB is 200 ✅
  Consumer B consumes message_1 later → without version control it is overwritten with 100 ❌ Wrong!
```

**Note**:

- ✅ Out-of-order consumption across different orders → no problem at all
- ✅ Out-of-order consumption across different platforms → no problem at all
- ❌ An older version of the same order overwriting a newer one → must be prevented

**Important**:

Even with the single-consumer model, the following still cause out-of-order processing:

1. **Within a batch**: the 50 messages of a batch may be validated and written concurrently
2. **Retried messages**: failed messages return to the queue after a 5-second delay via the DLX and interleave with new messages
3. **At the database level**: even when messages are consumed in order, transactions may commit in a different order

Therefore **the `data_version` mechanism is required** to guarantee consistency at the database level.

#### Solution: Data Version Control

**Approach**: add a `data_version` field (Unix timestamp) to every message so that only newer data can be written to the database.

```sql
-- Add a data_version column to the table
ALTER TABLE orders ADD COLUMN data_version BIGINT NOT NULL DEFAULT 0;

-- On consumption, write only if the incoming data is newer
INSERT INTO orders (
    platform_id, platform_order_id, total_amount, data_version, ...
) VALUES (20, 'DY123', 200, 1736840100, ...)
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET
    total_amount = EXCLUDED.total_amount,
    data_version = EXCLUDED.data_version,
    updated_at = NOW()
WHERE orders.data_version < EXCLUDED.data_version;  -- key point: update only when the version is newer
```

`ON CONFLICT (platform_id, platform_order_id)` requires a unique index or constraint on exactly these columns. If `orders` is a TimescaleDB hypertable, note that TimescaleDB requires every unique index on a hypertable to include the partitioning (time) column, so the conflict target must match such an index.

**Workflow**:

```
message_2 arrives first (data_version: 1736840100):
  INSERT → DB: amount=200, data_version=1736840100 ✅

message_1 arrives later (data_version: 1736840000):
  ON CONFLICT DO UPDATE
  WHERE orders.data_version (1736840100) < 1736840000 → false, row not updated ✅
  DB still holds: amount=200, data_version=1736840100
```
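
To see the version guard work without a PostgreSQL instance, the sketch below replays the two messages from the example in reverse order against an in-memory SQLite database. It assumes SQLite 3.24 or later, which supports the same `ON CONFLICT ... DO UPDATE ... WHERE` upsert form; the trimmed-down `orders` table and the `apply_order` helper are illustrative, not the production schema. Because the guard uses a strict `<`, replaying a message with the same version is also a no-op.

```python
import sqlite3

# Trimmed-down orders table; the unique key mirrors (platform_id, platform_order_id)
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE orders (
        platform_id       INTEGER NOT NULL,
        platform_order_id TEXT    NOT NULL,
        total_amount      INTEGER NOT NULL,
        data_version      INTEGER NOT NULL DEFAULT 0,
        UNIQUE (platform_id, platform_order_id)
    )
""")

UPSERT = """
    INSERT INTO orders (platform_id, platform_order_id, total_amount, data_version)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (platform_id, platform_order_id) DO UPDATE SET
        total_amount = excluded.total_amount,
        data_version = excluded.data_version
    WHERE orders.data_version < excluded.data_version
"""


def apply_order(msg: dict) -> None:
    """Apply one order message with the version guard (hypothetical helper)."""
    conn.execute(UPSERT, (msg["platform_id"], msg["platform_order_id"],
                          msg["amount"], msg["data_version"]))
    conn.commit()


message_1 = {"platform_id": 20, "platform_order_id": "DY123", "amount": 100, "data_version": 1736840000}
message_2 = {"platform_id": 20, "platform_order_id": "DY123", "amount": 200, "data_version": 1736840100}

# Out-of-order delivery: the newer message is consumed first
apply_order(message_2)
apply_order(message_1)  # older version: the WHERE clause blocks the update
print(conn.execute("SELECT total_amount, data_version FROM orders").fetchone())  # (200, 1736840100)
```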

**Message format requirements**:

```jsonc
{
  "message_id": "order#datahub#100#20#200#order#DY123",
  "metadata": {
    "data_version": 1736840100  // required: last-update timestamp of the platform data
  },
  "data": {
    "platform_unique_id": "DY123",
    "raw_data": {
      "update_time": 1736840100  // extracted from the platform's raw data
    }
  }
}
```
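
A consumer-side check of these requirements might look like the sketch below. The function name `validate_message` and the rule that `metadata.data_version` must be a positive integer are assumptions for illustration; the required field paths come from the format above.

```python
from typing import List


def validate_message(msg: dict) -> List[str]:
    """Return a list of problems; an empty list means the message is acceptable.

    Hypothetical helper: checks only the fields required by the format above.
    """
    errors: List[str] = []
    if not isinstance(msg.get("message_id"), str) or not msg["message_id"]:
        errors.append("message_id is missing")

    version = msg.get("metadata", {}).get("data_version")
    # bool is a subclass of int in Python, so reject it explicitly
    if not isinstance(version, int) or isinstance(version, bool) or version <= 0:
        errors.append("metadata.data_version must be a positive integer timestamp")

    data = msg.get("data", {})
    if not data.get("platform_unique_id"):
        errors.append("data.platform_unique_id is missing")
    if not isinstance(data.get("raw_data"), dict):
        errors.append("data.raw_data must be an object")
    return errors


message = {
    "message_id": "order#datahub#100#20#200#order#DY123",
    "metadata": {"data_version": 1736840100},
    "data": {"platform_unique_id": "DY123", "raw_data": {"update_time": 1736840100}},
}
print(validate_message(message))           # []
print(len(validate_message({"metadata": {}})))  # 4
```

A message that fails these checks would take the validation-error path (publish to the platform's error exchange, then ACK) rather than the retry path, since retrying cannot fix a malformed payload.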

---

### 2. Idempotency (No Redis Dependency)

#### Approach: Record Processed Messages in a Database Table

Create a `processed_messages` table that records every processed message_id:

```sql
CREATE TABLE processed_messages (
    message_id   VARCHAR(200) PRIMARY KEY,
    platform_id  INT NOT NULL,
    data_type    VARCHAR(50) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index to speed up time-range queries and cleanup
CREATE INDEX idx_processed_messages_created ON processed_messages (created_at);

-- Clean up records older than 30 days (optional).
-- An index cannot delete rows, and PostgreSQL rejects NOW() in a partial-index predicate
-- (index predicates may only use IMMUTABLE functions), so run a periodic job instead:
DELETE FROM processed_messages
WHERE created_at < NOW() - INTERVAL '30 days';
```

#### Consumption Flow

```
1. Receive the message and extract message_id

2. Check whether it has already been processed:
   SELECT 1 FROM processed_messages WHERE message_id = ?
   If it exists → ACK the message and skip processing (idempotency)

3. Start a transaction:
   BEGIN;

   3.1 Insert into processed_messages:
       INSERT INTO processed_messages (message_id, platform_id, data_type)
       VALUES (?, ?, ?)
       ON CONFLICT (message_id) DO NOTHING;  -- guards against concurrent inserts

   3.2 Insert/update the business data:
       INSERT INTO orders (...) VALUES (...)
       ON CONFLICT (platform_id, platform_order_id)
       DO UPDATE SET ...
       WHERE orders.data_version < EXCLUDED.data_version;

   COMMIT;

4. ACK the message
```
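
The flow can be sketched end to end with Python's built-in `sqlite3` module as a stand-in for PostgreSQL. This is a sketch under assumptions: SQLite has no `RETURNING` before 3.35, so it checks `cursor.rowcount` after `INSERT ... ON CONFLICT DO NOTHING` to tell a new message (1 row) from a duplicate (0 rows), which also makes the separate `SELECT` in step 2 unnecessary, as the performance section below suggests. Tables are trimmed and `consume` is a hypothetical helper; the caller would ACK in both outcomes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE processed_messages (
        message_id  TEXT PRIMARY KEY,
        platform_id INTEGER NOT NULL,
        data_type   TEXT NOT NULL
    );
    CREATE TABLE orders (
        platform_id       INTEGER NOT NULL,
        platform_order_id TEXT    NOT NULL,
        total_amount      INTEGER NOT NULL,
        data_version      INTEGER NOT NULL,
        UNIQUE (platform_id, platform_order_id)
    );
""")


def consume(msg: dict) -> str:
    """Process one message in a single transaction; returns 'processed' or 'duplicate'."""
    with conn:  # commits on success, rolls back if an exception escapes
        cur = conn.execute(
            "INSERT INTO processed_messages (message_id, platform_id, data_type) "
            "VALUES (?, ?, ?) ON CONFLICT (message_id) DO NOTHING",
            (msg["message_id"], msg["platform_id"], "order"),
        )
        if cur.rowcount == 0:  # already recorded: duplicate delivery, nothing was written
            return "duplicate"
        conn.execute(
            "INSERT INTO orders (platform_id, platform_order_id, total_amount, data_version) "
            "VALUES (?, ?, ?, ?) "
            "ON CONFLICT (platform_id, platform_order_id) DO UPDATE SET "
            "total_amount = excluded.total_amount, data_version = excluded.data_version "
            "WHERE orders.data_version < excluded.data_version",
            (msg["platform_id"], msg["order_id"], msg["amount"], msg["data_version"]),
        )
        return "processed"


msg = {"message_id": "order#datahub#100#20#200#order#DY123", "platform_id": 20,
       "order_id": "DY123", "amount": 200, "data_version": 1736840100}
print(consume(msg))  # processed
print(consume(msg))  # duplicate (redelivery is skipped)
```

If the business write raises, the `processed_messages` row is rolled back with it, so the message can be retried instead of being silently marked as done.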

#### Performance Optimization

**Problem**: querying `processed_messages` for every message may hurt performance.

**Optimization**:

```sql
-- 1. Use PostgreSQL's ON CONFLICT instead of a separate SELECT
INSERT INTO processed_messages (message_id, platform_id, data_type)
VALUES (?, ?, ?)
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id;

-- No row returned → already processed, return immediately
-- A row returned  → continue with the business data

-- 2. Periodically purge old records (keeps the table small)
DELETE FROM processed_messages
WHERE created_at < NOW() - INTERVAL '30 days';
```

#### Combining with Data Version Control

```sql
-- Complete idempotency + out-of-order handling flow:

BEGIN;

-- 1. Record the message as processed (idempotency)
INSERT INTO processed_messages (message_id, platform_id, data_type)
VALUES ('order#datahub#100#20#200#order#DY123', 20, 'order')
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id;

-- Nothing returned: duplicate message → ROLLBACK and ACK
-- A value returned: continue

-- 2. Update the business data (version-guarded against out-of-order delivery)
INSERT INTO orders (
    platform_id, platform_order_id, total_amount, data_version, created_at, updated_at
) VALUES (20, 'DY123', 200, 1736840100, NOW(), NOW())
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET
    total_amount = EXCLUDED.total_amount,
    data_version = EXCLUDED.data_version,
    updated_at = NOW()
WHERE orders.data_version < EXCLUDED.data_version;

COMMIT;

-- 3. ACK the message
```
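
One caveat when combining the two mechanisms: with the message_id format used in this document (`{entity_type}#...#{platform_unique_id}`), every update of the same order produces the same message_id. If `processed_messages` is keyed on message_id alone, a later update is discarded as a duplicate before the version check ever runs. The in-memory simulation below reproduces this and shows one possible mitigation, keying the idempotency record on `(message_id, data_version)`; that mitigation is a suggestion, not part of the original design, and `run` is a hypothetical helper.

```python
def run(messages, key_fn):
    """Dedup on key_fn(msg), then apply a version-guarded upsert; returns the stored amount."""
    processed = set()
    row = None  # (amount, data_version) stored for order DY123
    for msg in messages:
        key = key_fn(msg)
        if key in processed:  # treated as a duplicate: ACK and skip
            continue
        processed.add(key)
        # Equivalent of: WHERE orders.data_version < EXCLUDED.data_version
        if row is None or row[1] < msg["data_version"]:
            row = (msg["amount"], msg["data_version"])
    return row[0]


mid = "order#datahub#100#20#200#order#DY123"  # identical for both updates
messages = [
    {"message_id": mid, "amount": 100, "data_version": 1736840000},
    {"message_id": mid, "amount": 200, "data_version": 1736840100},
]

print(run(messages, lambda m: m["message_id"]))                       # 100: the update was lost
print(run(messages, lambda m: (m["message_id"], m["data_version"])))  # 200
```

With the composite key, true redeliveries (same message, same version) are still skipped, while genuine updates reach the version-guarded upsert.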

---

## Data Flow Processes

### 1. End-to-End Order Flow

```mermaid
sequenceDiagram
    participant P as DouYin collector
    participant E as douyin.exchange
    participant Q as orders.queue
    participant C as OrderConsumer
    participant DB as PostgreSQL
    participant PM as processed_messages table
    participant AGG as Continuous aggregates

    Note over P: 1. Collection
    P->>P: Fetch order data from the DouYin API
    P->>P: Build message_id<br/>order#datahub#100#20#200#order#DY123<br/>Extract data_version

    Note over P,E: 2. Publishing
    P->>E: Publish message<br/>routing_key: order.douyin<br/>persistent message
    E->>Q: Route to the orders queue

    Note over Q,C: 3. Consumption
    Q->>C: Deliver message (prefetch 100)
    C->>C: Validate message format
    C->>C: Validate message_id structure

    Note over C,DB: 4. Idempotency check + write (one transaction)
    C->>DB: BEGIN TRANSACTION
    C->>PM: INSERT INTO processed_messages<br/>ON CONFLICT DO NOTHING<br/>RETURNING message_id

    alt Nothing returned (duplicate)
        PM-->>C: No row
        C->>DB: ROLLBACK
        C->>Q: ACK (skip processing)
    else Row returned (new message)
        PM-->>C: message_id returned
        Note over C,DB: 5. Update business data (version-guarded)
        C->>DB: INSERT INTO orders<br/>ON CONFLICT DO UPDATE<br/>WHERE data_version < new_version
        DB-->>C: Done (0 rows updated if the version is older)
        C->>DB: COMMIT
        C->>Q: ACK (confirm message)
    end

    Note over DB,AGG: 6. Automatic aggregation (background)
    DB->>AGG: TimescaleDB background job<br/>refreshes the aggregate views hourly
```

### 2. Error Handling Flow

```mermaid
sequenceDiagram
    participant P as Tmall collector
    participant E as tmall.exchange
    participant Q as orders.queue
    participant C as OrderConsumer
    participant EE as tmall.errors.exchange
    participant EQ as errors.queue
    participant EC as ErrorConsumer
    participant DB as failed_messages table
    participant Alert as Alerting system

    P->>E: Publish order message
    E->>Q: Route to the orders queue
    Q->>C: Deliver message

    Note over C: Processing
    C->>C: Validate message format
    alt Invalid data format (validation)
        C->>C: Catch ValidationException
        C->>EE: Publish error message<br/>routing_key: validation
        EE->>EQ: Route to the error queue
        C->>Q: ACK (remove from the business queue)

        Note over EQ,EC: Error message handling
        EQ->>EC: Deliver error message
        EC->>DB: Record error log
        EC->>Alert: Decide whether an alert is needed
        alt Alert condition met
            Alert->>Alert: Send Slack/email notification
        end
        EC->>EQ: ACK
    else Processed successfully
        C->>C: Written to the database
        C->>Q: ACK
    end
```

### 3. Message Retry Flow (DLX + Delayed Retry Queue)

```mermaid
sequenceDiagram
    participant Q as orders.queue
    participant C as OrderConsumer
    participant DB as PostgreSQL
    participant DLX as dlx.orders
    participant RQ as orders.retry.queue
    participant EQ as errors.queue

    Q->>C: Deliver message (no x-death entry: 0 prior failures)

    Note over C,DB: Attempt 1
    C->>DB: Try to write
    DB-->>C: Database connection failed
    C->>C: Failure 1 of 3: below the limit
    C->>C: nack(requeue=false)
    C-->>DLX: RabbitMQ dead-letters the message<br/>and records it in x-death (count 1)
    DLX->>RQ: Route to the retry queue

    Note over RQ: TTL = 5 seconds
    RQ-->>Q: TTL expires<br/>dead-lettered back to the main queue

    Note over Q,C: Attempt 2
    Q->>C: Deliver message (x-death count: 1)
    C->>DB: Try to write
    DB-->>C: Database connection failed
    C->>C: Failure 2 of 3: below the limit
    C->>C: nack(requeue=false)
    C-->>DLX: Dead-lettered (x-death count: 2)
    DLX->>RQ: Route to the retry queue

    Note over RQ: Dead-lettered after 5 seconds
    RQ-->>Q: Back to the main queue

    Note over Q,C: Attempt 3
    Q->>C: Deliver message (x-death count: 2)
    C->>DB: Try to write
    DB-->>C: Database connection failed
    C->>C: Failure 3 of 3: limit reached
    C->>EQ: Publish error message via the<br/>platform's errors exchange
    C->>Q: ACK (remove from the business queue)
```

---

## Roles: Responsibilities and Operations

### 1. Platform Developers (Data Producers)

#### Responsibilities

- Collect data from e-commerce platform APIs (orders/products/refunds/inventory)
- Transform the data into the standard message format
- Publish messages to RabbitMQ
- Monitor error messages for their own platform

#### Permissions

| Operation | Permission | Notes |
|------|------|------|
| Connect to VHost | `datahub-app` | May access this VHost only |
| Publish messages | `{platform}.exchange` | May write only to its own platform's exchange |
| Subscribe to errors | `{platform}.errors.exchange` | Optional; receives its own platform's error notifications |

#### Procedure

##### Step 1: Generate the message_id

```
Format: {entity_type}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{platform_unique_id}

Examples:
- order#datahub#100#20#200#order#DY123456789
- product#datahub#100#2#201#product#TM-PROD789
- refund#datahub#100#20#200#refund#DY-REF456
- inventory#datahub#100#2#201#inventory#TM-SKU123
```

**Generation logic**:

```python
# Build the message_id (values taken from the DouYin order example)
data_type = "order"
company_id, platform_id, store_id = 100, 20, 200
order_data = {"order_id": "DY123456789"}  # order as returned by the platform API

message_id = (
    f"{data_type}#datahub#{company_id}#{platform_id}#{store_id}"
    f"#{data_type}#{order_data['order_id']}"
)
# message_id == "order#datahub#100#20#200#order#DY123456789"
```
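
Since the consumer also validates the message_id structure (step 3 of the order flow), it can help to keep generation and parsing in one place. The sketch assumes exactly the seven `#`-separated fields shown above, integer company/platform/store IDs, and that the first and sixth fields must match; `build_message_id`, `parse_message_id`, and `MessageId` are hypothetical helpers. Because `#` is the separator, it must not appear inside any field value.

```python
from typing import NamedTuple


class MessageId(NamedTuple):
    entity_type: str
    app_id: str
    company_id: int
    platform_id: int
    store_id: int
    platform_unique_id: str


def build_message_id(entity_type, company_id, platform_id, store_id,
                     platform_unique_id, app_id="datahub") -> str:
    parts = [entity_type, app_id, str(company_id), str(platform_id),
             str(store_id), entity_type, platform_unique_id]
    if any("#" in p for p in parts):
        raise ValueError("fields must not contain '#'")
    return "#".join(parts)


def parse_message_id(message_id: str) -> MessageId:
    parts = message_id.split("#")
    if len(parts) != 7:
        raise ValueError(f"expected 7 fields, got {len(parts)}")
    entity_type, app_id, company, platform, store, entity_again, unique_id = parts
    if entity_type != entity_again:
        raise ValueError("entity_type appears twice and both copies must match")
    # int() raises ValueError for non-numeric IDs
    return MessageId(entity_type, app_id, int(company), int(platform), int(store), unique_id)


mid = build_message_id("order", 100, 20, 200, "DY123456789")
print(mid)                                # order#datahub#100#20#200#order#DY123456789
print(parse_message_id(mid).platform_id)  # 20
```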

##### Step 2: Extract data_version

**Important**: the update timestamp must be taken from the platform's raw data and used as `data_version`.

```python
import time

# Example: extracting from a DouYin API response
douyin_order = {
    "order_id": "DY202501140001",
    "create_time": 1736839200,
    "update_time": 1736840000,  # ← use this as data_version
    # ... other fields
}

data_version = douyin_order.get("update_time") or douyin_order.get("create_time")

# Fall back to the current time if the platform provides no timestamp
if not data_version:
    data_version = int(time.time())
```

##### Step 3: Build the Message Body

```jsonc
{
  "message_id": "order#datahub#100#20#200#order#DY123456",
  "timestamp": "2025-01-14T10:30:00Z",
  "platform": "douyin",
  "data_type": "order",
  "metadata": {
    "platform_id": 20,
    "company_id": 100,
    "store_id": 200,
    "source_system": "douyin-open-api",
    "retry_count": 0,
    "data_version": 1736840000
  },
  "data": {
    "platform_unique_id": "DY123456",
    "raw_data": {
      // Complete raw JSON from the platform (must include the update time)
      "order_id": "DY123456",
      "update_time": 1736840000,
      ...
    }
  }
}
```
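
A producer-side helper that assembles this body might look like the following sketch. `build_order_message` is a hypothetical name, the `timestamp` format mirrors the example (UTC with a `Z` suffix), the `content_type` property is an assumption, and the actual AMQP publish call is deliberately left out. The routing key and `delivery_mode=2` follow the publishing configuration in step 4.

```python
import json
import time
from datetime import datetime, timezone


def build_order_message(platform, platform_id, company_id, store_id, raw_order, source_system):
    """Return (routing_key, body_bytes, properties) for one order message."""
    order_id = raw_order["order_id"]
    # Same fallback chain as step 2: update_time, then create_time, then "now"
    data_version = raw_order.get("update_time") or raw_order.get("create_time") or int(time.time())
    body = {
        "message_id": f"order#datahub#{company_id}#{platform_id}#{store_id}#order#{order_id}",
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "platform": platform,
        "data_type": "order",
        "metadata": {
            "platform_id": platform_id,
            "company_id": company_id,
            "store_id": store_id,
            "source_system": source_system,
            "retry_count": 0,
            "data_version": data_version,
        },
        "data": {"platform_unique_id": order_id, "raw_data": raw_order},
    }
    properties = {
        "delivery_mode": 2,                  # persistent message
        "content_type": "application/json",  # assumption: not specified by this document
    }
    routing_key = f"order.{platform}"
    return routing_key, json.dumps(body, ensure_ascii=False).encode("utf-8"), properties


rk, payload, props = build_order_message(
    "douyin", 20, 100, 200, {"order_id": "DY123456", "update_time": 1736840000}, "douyin-open-api")
print(rk)                                               # order.douyin
print(json.loads(payload)["metadata"]["data_version"])  # 1736840000
```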

##### Step 4: Publish the Message

**Connection parameters**:

- Host: `<rabbitmq-host>`
- Port: `5672`
- VHost: `datahub-app`
- Username: `user_{platform}`
- Password: `<provided_password>`

**Publishing configuration**:

- Exchange: `{platform}.exchange`
- Routing key: format `{data_type}.{platform}`
  - Orders: `order.{platform}` (e.g. `order.douyin`, `order.tmall`)
  - Products: `product.{platform}` (e.g. `product.douyin`, `product.tmall`)
  - Refunds: `refund.{platform}` (e.g. `refund.douyin`, `refund.tmall`)
  - Inventory: `inventory.{platform}` (e.g. `inventory.douyin`, `inventory.tmall`)
- Delivery mode: `2` (persistent)
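
RabbitMQ topic exchanges match a routing key against binding keys word by word: words are separated by `.`, `*` matches exactly one word, and `#` matches zero or more words. The sketch below re-implements those rules to check which queue a routing key would reach. The binding keys in `BINDINGS` are assumptions: the diagrams only show concrete keys such as `order.douyin`, and a wildcard binding like `order.*` is just one possible way to bind each data-type queue.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Topic matching: '*' matches exactly one word, '#' matches zero or more words."""
    def match(b, r):
        if not b:
            return not r
        if b[0] == "#":
            # '#' either matches nothing more, or swallows one more word and stays active
            return match(b[1:], r) or (bool(r) and match(b, r[1:]))
        if not r:
            return False
        return (b[0] == "*" or b[0] == r[0]) and match(b[1:], r[1:])
    return match(binding_key.split("."), routing_key.split("."))


# Hypothetical bindings of one platform exchange (e.g. douyin.exchange)
BINDINGS = {
    "order.*": "orders.queue",
    "product.*": "products.queue",
    "refund.*": "refunds.queue",
    "inventory.*": "inventory.queue",
}


def route(routing_key: str) -> list:
    return [queue for key, queue in BINDINGS.items() if topic_matches(key, routing_key)]


print(route("order.douyin"))   # ['orders.queue']
print(route("orders.douyin"))  # [] (typo in the data type: unroutable)
```

A message that matches no binding is dropped by the exchange unless the publisher sets the `mandatory` flag or an alternate exchange is configured, so a typo in the routing key silently loses data.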

##### Step 5: Monitor Errors (Optional)

**Subscription configuration**:

- Exchange: `{platform}.errors.exchange`
- Binding key: `#` (receive all error types)
- Create a temporary or durable queue to receive the error messages

**Example error message**:

```jsonc
{
  "error_id": "err_1234567890",
  "original_message": { /* original message */ },
  "error": {
    "type": "validation",
    "message": "Missing required field: total_amount",
    "timestamp": "2025-01-14T10:31:00Z"
  },
  "metadata": {
    "platform": "amazon",
    "platform_id": 1,
    "store_id": 200,
    "failed_at": "2025-01-14T10:31:00Z"
  }
}
```

---

### 2. Datahub App (Data Consumer)

#### Responsibilities

- Consume messages from the RabbitMQ business queues
- Use batch processing to increase throughput (prefetch=100, batch_size=50)
- Support multiple platforms through the adapter pattern
- Validate message format and data integrity
- Write data to PostgreSQL
- Handle failed messages: delayed retry via the DLX, or route to the error queue
- Implement idempotency to avoid duplicate processing

#### Consumer Architecture

**Single consumer + batching + adapter pattern**:

```
OrderConsumer (single instance)
├── RabbitMQ channel
│   └── prefetch_count: 100 (at most 100 unacknowledged messages in flight)
│
├── Message buffer
│   ├── Receives messages pushed by RabbitMQ
│   ├── Processing is triggered at batch_size=50 or timeout=3s
│   └── Maximum buffer size: 100 messages
│
├── Batch processor
│   ├── Iterates over the buffered messages
│   ├── Calls the matching adapter for each message
│   └── Batch insert + per-message ACK
│
└── Platform adapters
    ├── DouyinAdapter: handles DouYin order data
    ├── TmallAdapter: handles Tmall order data
    ├── JDAdapter: handles JD order data
    └── ... other platform adapters

Workflow:
1. RabbitMQ pushes messages to the consumer (at most 100 unacknowledged)
2. The consumer buffers the messages
3. Batch processing starts when batch_size or the timeout is reached
4. The batch processor iterates over the messages and calls the adapter selected by the platform field
5. The adapter validates and transforms the data
6. Batch insert into the database
7. ACK/NACK each message individually
```
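
The buffering rule (flush at `batch_size` messages or after `batch_timeout`, whichever comes first) can be isolated from RabbitMQ, as in the sketch below. `BatchBuffer` is a hypothetical class; it assumes the timeout is measured from the oldest buffered message, and the clock is injected so the logic can be tested without sleeping. In a real consumer, `add` would be called from the delivery callback and `poll` from a timer, with each flushed message ACKed or NACKed afterwards.

```python
import time
from typing import Callable, List, Optional


class BatchBuffer:
    """Collects deliveries and releases them in batches of batch_size, or earlier after timeout_s."""

    def __init__(self, batch_size: int = 50, timeout_s: float = 3.0,
                 clock: Callable[[], float] = time.monotonic):
        self.batch_size = batch_size
        self.timeout_s = timeout_s
        self.clock = clock
        self.items: List[object] = []
        self.first_at: Optional[float] = None  # arrival time of the oldest buffered item

    def add(self, item) -> Optional[List[object]]:
        """Buffer one delivery; returns a full batch once batch_size is reached."""
        if not self.items:
            self.first_at = self.clock()
        self.items.append(item)
        if len(self.items) >= self.batch_size:
            return self._flush()
        return None

    def poll(self) -> Optional[List[object]]:
        """Call periodically; flushes a partial batch once the oldest item is timeout_s old."""
        if self.items and self.clock() - self.first_at >= self.timeout_s:
            return self._flush()
        return None

    def _flush(self) -> List[object]:
        batch, self.items, self.first_at = self.items, [], None
        return batch


now = [0.0]  # fake clock for the example
buf = BatchBuffer(batch_size=50, timeout_s=3.0, clock=lambda: now[0])
full = None
for i in range(50):
    full = buf.add(i)
print(len(full))   # 50
buf.add("late")
now[0] = 2.0
print(buf.poll())  # None (oldest item is only 2 s old)
now[0] = 3.5
print(buf.poll())  # ['late']
```

`prefetch_count` should stay at or above `batch_size`: with fewer unacknowledged messages allowed than the batch needs, the broker stops delivering before a batch fills and every batch waits for the timeout.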

**Depending on the load, the consumer side can tune `prefetch`, or go further and run Hyperf's multi-worker mode to raise throughput.**

#### Permissions

| Operation | Permission | Notes |
|------|------|------|
| Connect to VHost | `datahub-app` | Access to the business VHost |
| Read queues | `orders.queue`, `products.queue`, `refunds.queue`, `inventory.queue` | Consume business messages |
| Write to error exchanges | `*.errors.exchange` | Send error messages to the matching platform's error exchange |
| Configure queues | `orders.queue`, `products.queue`, `refunds.queue`, `inventory.queue` | Declare the queues and their arguments (configure permission) |

#### Core Operations

##### Message Consumption Configuration

**Recommended parameters**:

- **prefetch_count**: `100` - RabbitMQ delivers at most 100 unacknowledged messages
- **batch_size**: `50` - the consumer starts batch processing after accumulating 50 messages
- **batch_timeout**: `3000` ms - processing also starts if batch_size is not reached within 3 seconds
- **Consumer model**: single consumer + batching + adapter pattern (one consumer instance per queue)
- **ACK mode**: manual ACK, one message at a time (acknowledged only after successful processing)
- **Reconnection**: automatic reconnection with exponential backoff
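
The reconnection policy above does not specify its constants. The sketch assumes a 1 s base delay that doubles per attempt up to a 60 s cap, with optional "full jitter" (a uniform draw from `[0, delay]`) so that many consumers do not reconnect in lockstep. `backoff_delays` and `reconnect` are hypothetical helpers, and `connect` stands in for whatever function opens the RabbitMQ connection.

```python
import random
import time
from itertools import islice
from typing import Callable, Iterator, Optional, TypeVar

T = TypeVar("T")


def backoff_delays(base: float = 1.0, cap: float = 60.0,
                   rng: Optional[random.Random] = None) -> Iterator[float]:
    """Yield base, 2*base, 4*base, ... capped at cap; with rng, draw each delay from [0, d]."""
    attempt = 0
    while True:
        d = min(cap, base * 2 ** min(attempt, 30))  # bound the exponent to avoid huge numbers
        yield rng.uniform(0, d) if rng else d
        attempt += 1


def reconnect(connect: Callable[[], T], max_attempts: int = 10,
              sleep: Callable[[float], None] = time.sleep) -> T:
    """Call connect() until it succeeds, sleeping between failures; re-raises after max_attempts."""
    delays = backoff_delays()
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise
            sleep(next(delays))
    raise ValueError("max_attempts must be at least 1")


print(list(islice(backoff_delays(), 8)))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```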

##### Idempotency (Database Approach)

**Approach**: record processed message_ids in the `processed_messages` table.

```sql
-- 1. Create the idempotency table
CREATE TABLE processed_messages (
    message_id   VARCHAR(200) PRIMARY KEY,
    platform_id  INT NOT NULL,
    data_type    VARCHAR(50) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_processed_messages_created ON processed_messages (created_at);

-- 2. Periodically purge old records (optional, keep 30 days)
DELETE FROM processed_messages WHERE created_at < NOW() - INTERVAL '30 days';
```

**Consumption flow**:

```sql
-- Run inside one transaction
BEGIN;

-- Step 1: try to insert into processed_messages (idempotency check)
INSERT INTO processed_messages (message_id, platform_id, data_type)
VALUES ('order#datahub#100#20#200#order#DY123', 20, 'order')
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id;

-- Nothing returned → duplicate message: ROLLBACK and ACK
-- A value returned → continue

-- Step 2: insert/update the business data (version-guarded)
INSERT INTO orders (
    platform_id, platform_order_id, company_id, store_id,
    total_amount, data_version, raw, created_at, updated_at
) VALUES (20, 'DY123', 100, 200, 9999, 1736840000, '...'::jsonb, NOW(), NOW())
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET
    total_amount = EXCLUDED.total_amount,
    data_version = EXCLUDED.data_version,
    raw = EXCLUDED.raw,
    updated_at = NOW()
WHERE orders.data_version < EXCLUDED.data_version;  -- update only for a newer version

COMMIT;
```

**Key points**:

- ✅ No Redis dependency
- ✅ The transaction guarantees atomicity
- ✅ `ON CONFLICT DO NOTHING` provides idempotency
- ✅ `WHERE data_version < EXCLUDED.data_version` prevents out-of-order overwrites

##### Error Handling Logic

```
Processing flow:
1. Validate the message format
   ├─ Valid → continue
   └─ Invalid → publish to the error queue (type: validation), ACK

2. Idempotency check
   ├─ Already processed → ACK immediately
   └─ Not processed → continue

3. Business processing
   ├─ Success → mark as processed, ACK
   └─ Failure (exception)
      ├─ Database error / network error
      │  ├─ failure count (including this one) < 3 → nack(requeue=false) → DLX → delayed retry
      │  └─ failure count (including this one) >= 3 → publish to the error queue (processing_failed), ACK
      └─ Business-logic error → publish to the error queue (validation), ACK
```
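
The decision tree above maps onto a small function. In this sketch the exception types are assumptions: `ValidationError` is a hypothetical class for malformed messages, and the built-in `ConnectionError`/`TimeoutError` stand in for database and network errors; any other exception is treated as a business-logic error, as in the tree. `failures` counts failed attempts including the current one, and `Action`/`decide` are hypothetical names.

```python
from enum import Enum
from typing import Optional


class Action(Enum):
    ACK = "ack"                             # success (or duplicate): just ACK
    RETRY = "nack_to_dlx"                   # nack(requeue=false) → DLX → retry queue
    ERROR_PROCESSING = "processing_failed"  # publish to the errors exchange, then ACK
    ERROR_VALIDATION = "validation"         # publish to the errors exchange, then ACK


class ValidationError(Exception):
    """Hypothetical: raised for malformed messages."""


TRANSIENT = (ConnectionError, TimeoutError)  # stand-ins for database / network errors
MAX_FAILURES = 3


def decide(exc: Optional[BaseException], failures: int) -> Action:
    """Map the outcome of one processing attempt to an action; exc is None on success."""
    if exc is None:
        return Action.ACK
    if isinstance(exc, ValidationError):
        return Action.ERROR_VALIDATION
    if isinstance(exc, TRANSIENT):
        return Action.RETRY if failures < MAX_FAILURES else Action.ERROR_PROCESSING
    # Anything else is treated as a business-logic error: retrying will not help
    return Action.ERROR_VALIDATION


print(decide(ConnectionError("db down"), failures=1))  # Action.RETRY
print(decide(TimeoutError(), failures=3))              # Action.ERROR_PROCESSING
```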
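
##### Delayed Retry Implementation (DLX + Retry Queue)

**Retry policy**:

- Retry delay: fixed 5 seconds (implemented with the retry queue's TTL)
- Maximum failures: 3
- After the 3rd failure: the message goes to the error queue

**Implementation** (no extra plugins required):

```
Consumption fails
  ├─ failure count < 3  → nack(requeue=false) → DLX → retry queue (TTL=5s) → dead-lettered back to the main queue
  └─ failure count >= 3 → publish to {platform}.errors.exchange → errors.queue (manual handling), then ACK
```

**Workflow**:

1. On failure, the consumer determines how often the message has already failed from the `x-death` header that RabbitMQ maintains on dead-lettered messages: the `count` of the entry whose `queue` is the main queue (e.g. `orders.queue`) and whose `reason` is `rejected`. A message without that entry has not failed before.
2. If the failure count, including the current failure, is below 3, the consumer calls `nack(requeue=false)`, and the main queue's DLX (e.g. `dlx.orders`) routes the message to `orders.retry.queue`.
3. When the retry queue's TTL expires, the message is dead-lettered back to the main queue and consumed again.
4. Once the limit is reached, the consumer publishes the message to `{platform}.errors.exchange` (bound to `errors.queue`) and ACKs the original. This decision has to be made in the consumer: a nack cannot modify message headers, so a custom `x-retries` header cannot be incremented that way, and an exchange cannot compare header values numerically.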
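
Reading the failure count from `x-death` could look like the sketch below. It assumes the header as AMQP 0-9-1 clients such as pika expose it: a list of tables with `queue`, `reason`, and `count` keys, where RabbitMQ increments `count` each time the message is dead-lettered from the same queue for the same reason. Queue names and the limit follow this document; `failures_so_far` and `on_failure` are hypothetical helpers. Dead-lettering details have changed across RabbitMQ releases, so verify the header layout against the broker version in use.

```python
from typing import Any, Dict, Optional

MAX_FAILURES = 3


def failures_so_far(headers: Optional[Dict[str, Any]], queue: str = "orders.queue") -> int:
    """How many times this message was already rejected from `queue`, according to x-death."""
    for entry in (headers or {}).get("x-death", []):
        if entry.get("queue") == queue and entry.get("reason") == "rejected":
            return int(entry.get("count", 0))
    return 0  # no matching x-death entry: this is the first failure


def on_failure(headers: Optional[Dict[str, Any]], queue: str = "orders.queue") -> str:
    """Decide what to do after a failed attempt: 'nack' (delayed retry) or 'error' (give up)."""
    failures = failures_so_far(headers, queue) + 1  # include the current failure
    if failures < MAX_FAILURES:
        return "nack"  # e.g. channel.basic_nack(delivery_tag, requeue=False) → dlx.orders → retry queue
    return "error"     # publish to {platform}.errors.exchange, then ACK the original


# Headers as seen on the 3rd delivery (after two rejections and two TTL expiries)
third = {"x-death": [
    {"queue": "orders.retry.queue", "reason": "expired", "count": 2},
    {"queue": "orders.queue", "reason": "rejected", "count": 2},
]}
print(on_failure(None))   # nack
print(on_failure(third))  # error
```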
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
### 3. 运维团队
|
|
|
|
|
|
|
|
|
|
|
|
#### 职责
|
|
|
|
|
|
|
|
|
|
|
|
- RabbitMQ 集群部署和维护
|
|
|
|
|
|
- 用户和权限管理
|
|
|
|
|
|
- 监控队列健康状态
|
|
|
|
|
|
- 处理系统级错误
|
|
|
|
|
|
- 性能优化和故障排查
|
|
|
|
|
|
|
|
|
|
|
|
#### 权限
|
|
|
|
|
|
|
|
|
|
|
|
| 操作 | 权限 | 说明 |
|
|
|
|
|
|
|------|------|------|
|
|
|
|
|
|
| 管理员账号 | `admin` tag | 访问管理界面,完全权限 |
|
|
|
|
|
|
| 监控错误队列 | `errors.queue` (read) | 查看所有平台的错误消息 |
|
|
|
|
|
|
| 配置资源 | 所有 Exchange/Queue | 创建、修改、删除资源 |

#### Daily operations

##### Monitoring key metrics

**Queue backlog monitoring**:

```bash
# Show the status of all queues
rabbitmqctl list_queues -p datahub-app name messages_ready messages_unacknowledged

# Alert thresholds
# - messages_ready > 10000: severe backlog
# - messages_unacknowledged > 5000: consumers are processing slowly
```
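
A cron job or monitoring script could apply the thresholds above to the command's output. A sketch, assuming the output consists of tab-separated `name`, `messages_ready`, `messages_unacknowledged` columns as requested in the command; banner and header lines are skipped because their counts do not parse as integers. The function name is hypothetical.

```python
from typing import List

READY_LIMIT = 10_000   # messages_ready above this: severe backlog
UNACKED_LIMIT = 5_000  # messages_unacknowledged above this: slow consumers


def check_queues(output: str) -> List[str]:
    """Return one alert string per threshold a queue exceeds."""
    alerts = []
    for line in output.splitlines():
        parts = line.split("\t")
        if len(parts) != 3:
            continue
        name, ready, unacked = parts
        try:
            ready_n, unacked_n = int(ready), int(unacked)
        except ValueError:
            continue  # header or banner line
        if ready_n > READY_LIMIT:
            alerts.append(f"{name}: severe backlog ({ready_n} ready)")
        if unacked_n > UNACKED_LIMIT:
            alerts.append(f"{name}: slow consumers ({unacked_n} unacked)")
    return alerts
```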

**Consumer connection monitoring**:

```bash
# List all consumers
rabbitmqctl list_consumers -p datahub-app

# Check:
# - whether the number of consumers is as expected
# - whether prefetch_count is set sensibly
# - whether any consumer has disconnected
```
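
The checklist above can be partly automated against two rules stated elsewhere in this document: one consumer per queue, and `prefetch_count` at 2-3 times `batch_size`. A sketch, assuming consumer rows have already been parsed into `(queue_name, prefetch_count)` pairs (for example from `rabbitmqctl list_consumers -p datahub-app queue_name prefetch_count`); `EXPECTED_QUEUES`, `BATCH_SIZE`, and the function name are assumptions.

```python
from collections import Counter
from typing import Iterable, List, Tuple

EXPECTED_QUEUES = ["orders.queue", "products.queue", "refunds.queue", "inventory.queue"]
BATCH_SIZE = 50


def check_consumers(rows: Iterable[Tuple[str, int]]) -> List[str]:
    """Flag queues that break the single-consumer model or the prefetch rule."""
    rows = list(rows)
    counts = Counter(queue for queue, _ in rows)
    problems = []
    for queue in EXPECTED_QUEUES:
        n = counts.get(queue, 0)
        if n == 0:
            problems.append(f"{queue}: no consumer (disconnected?)")
        elif n > 1:
            problems.append(f"{queue}: {n} consumers, expected 1")
    low, high = 2 * BATCH_SIZE, 3 * BATCH_SIZE
    for queue, prefetch in rows:
        if not low <= prefetch <= high:
            problems.append(f"{queue}: prefetch_count {prefetch} outside {low}-{high}")
    return problems
```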

**Error rate monitoring**:

```sql
-- Number of errors per platform over the last hour
SELECT
    platform,
    error_type,
    COUNT(*) AS error_count
FROM failed_messages
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY platform, error_type
ORDER BY error_count DESC;
```

##### Troubleshooting

**Queue backlog**:

```
Steps:
1. Check whether the consumers are online
   rabbitmqctl list_consumers -p datahub-app

2. Check the consumption rate
   Management UI → Queues → select the queue → check the Deliver/Get rate

3. Check the consumer logs
   - Are there exceptions?
   - Is processing taking too long?

Fixes:
- Add consumer capacity per data type (the design is one consumer per queue; do not attach several consumers to the same queue)
- Increase prefetch_count
- Optimize the business processing logic
```

**Message loss**:

```
Steps:
1. Confirm that messages were published successfully
   Management UI → Exchanges → check Message rates (Publish in)

2. Confirm the binding configuration
   Management UI → Exchanges → select the exchange → check Bindings

3. Check that messages are persistent
   - Exchange durable: true
   - Queue durable: true
   - Message delivery_mode: 2

4. Check the consumers' ACK behavior
   - Are messages ACKed correctly?
   - Are there any NACKs/REJECTs?
```

##### Performance tuning

**Improving consumption speed**:

```
Measures:
1. Tune the batching parameters
   - batch_size: currently 50; adjust within 30-100 depending on message size
   - batch_timeout: currently 3 s; adjust within 1-5 s depending on latency requirements

2. Tune prefetch_count
   - Currently: 100
   - Adjust within 50-200 depending on memory and processing speed
   - prefetch_count should be 2-3 times batch_size

3. Optimize the batch logic
   - Use bulk database inserts (INSERT ... VALUES (...), (...), ...)
   - Messages within a batch can be validated concurrently (via the per-platform adapters)
   - Each message must still be ACKed individually to guarantee reliability

4. Optimize the business logic
   - Reduce database queries
   - Use a connection pool
   - Handle non-critical work asynchronously

5. Horizontal scaling (with care)
   - The design is one consumer per queue; multiple consumers on the same queue are not recommended
   - Consider sharding instead: scale each data type independently (orders and products separately)
```
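
Measures 1 and 3 can be combined in a small batching loop: collect up to `batch_size` messages or wait at most `batch_timeout`, write them with one multi-row `INSERT ... VALUES (...), (...)`, and only then ACK each message individually. A minimal sketch using Python's built-in `sqlite3` as a stand-in for PostgreSQL; the `orders` table, the `ack` callback, and the `Batcher` class are hypothetical.

```python
import sqlite3
import time
from typing import Callable, List, Tuple

# (delivery_tag, platform_unique_id, raw_json)
Message = Tuple[int, str, str]


class Batcher:
    """Collects messages and writes each batch with one multi-row INSERT."""

    def __init__(self, conn: sqlite3.Connection, ack: Callable[[int], None],
                 batch_size: int = 50, batch_timeout: float = 3.0) -> None:
        self.conn = conn
        self.ack = ack
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.pending: List[Message] = []
        self.first_at = 0.0

    def add(self, msg: Message) -> None:
        if not self.pending:
            self.first_at = time.monotonic()
        self.pending.append(msg)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def tick(self) -> None:
        """Call periodically: flushes a partial batch once batch_timeout has elapsed."""
        if self.pending and time.monotonic() - self.first_at >= self.batch_timeout:
            self.flush()

    def flush(self) -> None:
        if not self.pending:
            return
        batch = self.pending
        placeholders = ", ".join(["(?, ?)"] * len(batch))
        params = [value for _, uid, raw in batch for value in (uid, raw)]
        # One transaction: commits on success, rolls back and re-raises on error,
        # in which case nothing is ACKed and the batch stays pending for the caller.
        with self.conn:
            self.conn.execute(
                f"INSERT INTO orders (platform_unique_id, raw) VALUES {placeholders}",
                params)
        self.pending = []
        for tag, _, _ in batch:  # ACK each message individually, after the commit
            self.ack(tag)
```

With pika, `ack` could be `lambda tag: channel.basic_ack(delivery_tag=tag)`. Because the ACKs happen after the commit, a crash between the two causes redelivery rather than loss, which the idempotency check then absorbs.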

**Queue performance tuning**:

```bash
# Enable lazy mode (for large message backlogs)
rabbitmqctl set_policy -p datahub-app --apply-to queues lazy-queue \
  "^(orders|products|refunds)\.queue$" '{"queue-mode":"lazy"}'

# Effect:
# - Messages are stored on disk, reducing memory usage
# - Suited to scenarios with severe message backlogs
# Note: since RabbitMQ 3.12, classic queues behave like lazy queues by default
# and the queue-mode setting is ignored, so this policy only matters on older versions
```

---

### 4. RabbitMQ administrator

#### Responsibilities

- Initialize the RabbitMQ configuration
- Create vhosts, exchanges, and queues
- Manage users and permissions
- Configure onboarding of new platforms

#### Core operations

##### Onboarding a new platform

To onboard a new platform (e.g. `aliexpress`), follow these steps:

**Step 1: Create the exchanges**

```bash
# Business exchange
rabbitmqadmin -u admin -p <password> -V datahub-app declare exchange \
  name="aliexpress.exchange" type=topic durable=true

# Error exchange
rabbitmqadmin -u admin -p <password> -V datahub-app declare exchange \
  name="aliexpress.errors.exchange" type=topic durable=true
```

**Step 2: Create the bindings**

```bash
# Bind to the business queues (routing key format: {data_type}.{platform})
rabbitmqadmin -u admin -p <password> -V datahub-app declare binding \
  source="aliexpress.exchange" destination="orders.queue" routing_key="order.aliexpress"

rabbitmqadmin -u admin -p <password> -V datahub-app declare binding \
  source="aliexpress.exchange" destination="products.queue" routing_key="product.aliexpress"

rabbitmqadmin -u admin -p <password> -V datahub-app declare binding \
  source="aliexpress.exchange" destination="refunds.queue" routing_key="refund.aliexpress"

rabbitmqadmin -u admin -p <password> -V datahub-app declare binding \
  source="aliexpress.exchange" destination="inventory.queue" routing_key="inventory.aliexpress"

# Bind to the error queue
rabbitmqadmin -u admin -p <password> -V datahub-app declare binding \
  source="aliexpress.errors.exchange" destination="errors.queue" routing_key="#"
```
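
Since steps 1 and 2 differ between platforms only in the platform name, the commands can be generated instead of typed by hand. A sketch that builds them for one platform, following the `{data_type}.{platform}` routing-key convention and the four business queues used in this document; it selects the vhost with rabbitmqadmin's `-V` option, and the function name is hypothetical. It only produces command strings; review them before running.

```python
import shlex
from typing import List

VHOST = "datahub-app"
# data_type -> business queue, as listed in this document
QUEUES = {"order": "orders.queue", "product": "products.queue",
          "refund": "refunds.queue", "inventory": "inventory.queue"}


def onboarding_commands(platform: str) -> List[str]:
    """Return the exchange and binding declarations for a new platform."""
    base = ["rabbitmqadmin", "-u", "admin", "-p", "<password>", "-V", VHOST]
    cmds = []
    for exchange in (f"{platform}.exchange", f"{platform}.errors.exchange"):
        cmds.append(base + ["declare", "exchange", f"name={exchange}",
                            "type=topic", "durable=true"])
    for data_type, queue in QUEUES.items():
        cmds.append(base + ["declare", "binding", f"source={platform}.exchange",
                            f"destination={queue}",
                            f"routing_key={data_type}.{platform}"])
    cmds.append(base + ["declare", "binding", f"source={platform}.errors.exchange",
                        "destination=errors.queue", "routing_key=#"])
    # shlex.join quotes shell-sensitive arguments such as <password> and '#'
    return [shlex.join(c) for c in cmds]
```

For example, `print("\n".join(onboarding_commands("aliexpress")))` prints the seven commands, with the `<password>` placeholder still to be filled in.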

**Step 3: Create the user**

```bash
# Create the user
rabbitmqadmin -u admin -p <password> declare user \
  name="user_aliexpress" password="<strong_password>" tags=""

# Grant permissions
rabbitmqadmin -u admin -p <password> declare permission \
  vhost=datahub-app user="user_aliexpress" \
  configure="" write="^aliexpress\.(exchange|errors\.exchange)$" \
  read="^aliexpress\.errors\..*$"
```
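
It is worth checking what the three permission patterns actually grant. RabbitMQ matches each pattern against the resource name and treats an empty pattern as `^$`; the sketch below approximates that with Python's `re.search`. It shows that `configure=""` grants no queue, so `user_aliexpress` cannot declare a queue of its own: the optional error subscription in step 4 would need an administrator to pre-create and bind a queue whose name matches the read pattern (for example a hypothetical `aliexpress.errors.queue`).

```python
import re

# The patterns granted to user_aliexpress in step 3
PERMS = {
    "configure": "",
    "write": r"^aliexpress\.(exchange|errors\.exchange)$",
    "read": r"^aliexpress\.errors\..*$",
}


def allowed(kind: str, resource: str) -> bool:
    """Approximate RabbitMQ's check of one permission against a resource name."""
    pattern = PERMS[kind] or "^$"  # RabbitMQ treats "" as a synonym for ^$
    return re.search(pattern, resource) is not None
```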

**Step 4: Hand over to the developers**

```
Connection details:
- Host: <rabbitmq-host>
- Port: 5672
- VHost: datahub-app
- Username: user_aliexpress
- Password: <strong_password>

Exchange: aliexpress.exchange
Routing keys:
- Orders: order.aliexpress
- Products: product.aliexpress
- Refunds: refund.aliexpress
- Inventory: inventory.aliexpress

Error subscription (optional): aliexpress.errors.exchange
```

---

## Monitoring and alerting

### 1. Key metrics

```mermaid
graph TB
    subgraph "Queue monitoring"
        M1[messages_ready<br/>Messages awaiting consumption]
        M2[messages_unacknowledged<br/>Unacknowledged messages]
        M3[message_stats.publish_in_rate<br/>Publish rate]
        M4[message_stats.deliver_rate<br/>Consume rate]
    end

    subgraph "Consumer monitoring"
        C1[consumer_count<br/>Number of consumers]
        C2[prefetch_count<br/>Prefetch count]
        C3[connection_state<br/>Connection state]
    end

    subgraph "Error monitoring"
        E1[errors.queue message count<br/>Error queue backlog]
        E2[failed_messages row count<br/>Total failed messages]
        E3[Hourly error growth rate<br/>Error frequency]
    end

    subgraph "System monitoring"
        S1[disk_free<br/>Free disk space]
        S2[mem_used<br/>Memory usage]
        S3[fd_used<br/>File descriptors in use]
    end

    style M1 fill:#ffcdd2
    style M2 fill:#ffcdd2
    style E1 fill:#ffcdd2
    style E2 fill:#ffcdd2
```

### 2. Alert rules

| Metric | Alert threshold | Level | Recommended action |
|------|---------|------|---------|
| `messages_ready` | > 10,000 | Warning | Add consumer capacity |
| `messages_ready` | > 50,000 | Critical | Emergency scale-out |
| `messages_unacknowledged` | > 5,000 | Warning | Check consumer processing speed |
| `consumer_count` | = 0 | Critical | Consumer service is down |
| `publish_rate - deliver_rate` | > 1000/s | Warning | Consumers cannot keep up with producers |
| `errors.queue` message count | > 1,000 | Warning | Large number of failed messages |
| Errors per platform per hour | > 100 | Warning | Platform data quality issue |
| `disk_free` | < 10 GB | Critical | Clean up the disk or expand capacity |
| `mem_used` | > 80% | Warning | Optimize or add memory |
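
Encoding the table as data lets a custom check script stay in sync with it. A minimal sketch: the rules mirror the table rows one to one, while `RULES`, the snapshot dict, and metric keys such as `disk_free_gb` and `platform_errors_per_hour` are hypothetical names.

```python
import operator
from typing import Dict, List, Tuple

# (metric, comparison, threshold, level, action): one entry per table row
RULES: List[Tuple[str, str, float, str, str]] = [
    ("messages_ready", ">", 10_000, "warning", "add consumer capacity"),
    ("messages_ready", ">", 50_000, "critical", "emergency scale-out"),
    ("messages_unacknowledged", ">", 5_000, "warning", "check consumer speed"),
    ("consumer_count", "==", 0, "critical", "consumer service is down"),
    ("publish_minus_deliver_rate", ">", 1_000, "warning", "consumers lag producers"),
    ("errors_queue_messages", ">", 1_000, "warning", "many failed messages"),
    ("platform_errors_per_hour", ">", 100, "warning", "platform data quality issue"),
    ("disk_free_gb", "<", 10, "critical", "clean up disk or expand"),
    ("mem_used_pct", ">", 80, "warning", "optimize or add memory"),
]
OPS = {">": operator.gt, "<": operator.lt, "==": operator.eq}


def evaluate(snapshot: Dict[str, float]) -> List[Tuple[str, str, str]]:
    """Return (metric, level, action) for every rule the snapshot violates."""
    fired = []
    for metric, op, threshold, level, action in RULES:
        if metric in snapshot and OPS[op](snapshot[metric], threshold):
            fired.append((metric, level, action))
    return fired
```

Above 50,000 ready messages both `messages_ready` rows fire; a notifier would usually keep only the most severe level per metric.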

### 3. Monitoring implementation

**Prometheus + Grafana**:

```yaml
# prometheus.yml
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['rabbitmq-exporter:9419']
    metrics_path: /metrics
```

**Example alert rules**:

```yaml
# alert_rules.yml
groups:
  - name: rabbitmq_alerts
    rules:
      # Prometheus regex matchers are fully anchored, so the pattern must match the whole queue name
      - alert: QueueBacklog
        expr: rabbitmq_queue_messages_ready{queue=~"(orders|products|refunds)\\.queue"} > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue {{ $labels.queue }} has a severe backlog"
          description: "Messages awaiting consumption: {{ $value }}"

      - alert: NoConsumers
        expr: rabbitmq_queue_consumers{queue=~"(orders|products|refunds)\\.queue"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Queue {{ $labels.queue }} has no consumers"
          description: "The consumer service may be down"
```

---

## Failure recovery

### 1. Consumer service goes down

```mermaid
graph LR
    A[Consumer disconnect detected] --> B[Messages pile up in the queue]
    B --> C[Alert notifies operations]
    C --> D[Restart the consumer service]
    D --> E[Consumer reconnects automatically]
    E --> F[Resume consuming the backlog]
    F --> G{Message expired?}
    G -->|No| H[Processed normally]
    G -->|Yes| I[Message removed by TTL]

    style A fill:#ffcdd2
    style C fill:#fff9c4
    style H fill:#c8e6c9
    style I fill:#ffab91
```

**Recovery time**:
- Automatic reconnect: seconds
- Manual restart: minutes
- No data is lost, because messages are persistent (unless they expire via TTL before being consumed)

### 2. RabbitMQ service restart

```mermaid
graph TB
    A[RabbitMQ restarts] --> B[Durable resources restored automatically]
    B --> C[Exchanges restored]
    B --> D[Queues restored]
    B --> E[Bindings restored]
    B --> F[Persistent messages restored]

    A --> G[Producers reconnect]
    A --> H[Consumers reconnect]

    G --> I[Resume publishing]
    H --> J[Resume consuming]

    style A fill:#ffcdd2
    style C fill:#c8e6c9
    style D fill:#c8e6c9
    style E fill:#c8e6c9
    style F fill:#c8e6c9
```
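
**Data safety**:
- ✅ Durable exchanges are restored automatically
- ✅ Durable queues are restored automatically
- ✅ Persistent messages in durable queues are restored automatically
- ❌ Unacknowledged messages are redelivered (possible duplicate consumption; idempotency is required)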

### 3. Database connection failure

```mermaid
graph TB
    A[Database connection fails] --> B{retry_count < 3?}
    B -->|Yes| C[Delayed retry<br/>fixed 5 s, retry queue TTL]
    C --> D[nack to the DLX, parked in the retry queue]
    D --> E[Wait for redelivery]
    E --> B

    B -->|No| F[Retry limit exceeded]
    F --> G[Send to the error queue]
    G --> H[Manual intervention]

    style A fill:#ffcdd2
    style C fill:#fff9c4
    style G fill:#ffab91
    style H fill:#ce93d8
```
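
---

## Best practices

### 1. Producer best practices

- ✅ **Use a structured message_id**: format `{type}#{app}#{company}#{platform}#{store}#{type}#{unique_id}`
- ✅ **Always provide data_version**: extract the update timestamp from the platform data to prevent out-of-order updates
- ✅ **Persist messages**: `delivery_mode=2`
- ✅ **Include the complete raw_data**: keep the full original platform data for later enrichment and troubleshooting
- ✅ **Use the correct routing_key**: `{data_type}.{platform}`, e.g. `order.aliexpress`, where `{data_type}` is one of `order`/`product`/`refund`/`inventory`
- ✅ **Use publisher confirms**: wait for RabbitMQ to confirm that each message was received
- ✅ **Implement reconnection**: reconnect automatically after network failures
- ❌ **Do not send very large messages** (> 1 MB): split them, or store the payload in object storage and send a reference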
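
The producer rules above can be sketched as one function that assembles the envelope, derives the routing key, and rejects oversized payloads. Field names follow the examples in the appendix, where `{platform}` in the message_id is the numeric `platform_id`; the function name, the `datahub` app segment, and applying the 1 MB limit to the serialized JSON are assumptions.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Tuple

MAX_BYTES = 1024 * 1024  # avoid messages larger than 1 MB
APP = "datahub"


def build_message(platform: str, data_type: str, company_id: int,
                  platform_id: int, store_id: int, unique_id: str,
                  raw_data: Dict[str, Any], data_version: int,
                  source_system: str) -> Tuple[str, bytes]:
    """Return (routing_key, body) ready to publish with delivery_mode=2."""
    message_id = "#".join([data_type, APP, str(company_id), str(platform_id),
                           str(store_id), data_type, unique_id])
    envelope = {
        "message_id": message_id,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "platform": platform,
        "data_type": data_type,
        "metadata": {
            "platform_id": platform_id,
            "company_id": company_id,
            "store_id": store_id,
            "source_system": source_system,
            "retry_count": 0,
            "data_version": data_version,  # platform update time, for ordering
        },
        "data": {"platform_unique_id": unique_id, "raw_data": raw_data},
    }
    body = json.dumps(envelope, ensure_ascii=False).encode("utf-8")
    if len(body) > MAX_BYTES:
        raise ValueError(f"{message_id}: {len(body)} bytes, use object storage + reference")
    return f"{data_type}.{platform}", body
```

With pika, the body would be sent after `channel.confirm_delivery()` via `channel.basic_publish(exchange=..., routing_key=rk, body=body, properties=pika.BasicProperties(delivery_mode=2, message_id=...))`, so persistence and publisher confirms are both in place.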
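
### 2. Consumer best practices

- ✅ **Single-consumer model**: one consumer instance per queue, so messages are not spread across instances
- ✅ **Batching**: combine prefetch and batch processing for throughput (batch_size=50, prefetch_count=100)
- ✅ **Adapter pattern**: inside the consumer, dispatch to platform-specific logic based on the platform field
- ✅ **Manual ACK**: acknowledge messages individually so one failure does not affect the others
- ✅ **Idempotency**: record processed message_ids in the `processed_messages` table
- ✅ **Version control**: use the `data_version` field to prevent out-of-order updates (messages within a batch may be processed concurrently)
- ✅ **Transactions**: perform the idempotency check and the business data update in a single transaction
- ✅ **DLX retries**: failed messages go to the DLX via nack(requeue=false) and are retried automatically after 5 seconds
- ✅ **Classify errors**: distinguish retryable (transient) errors from business (permanent) errors
- ✅ **Limit retries**: at most 3, counted from the `x-death` header RabbitMQ adds on each dead-lettering (headers cannot be modified on nack)
- ✅ **Bulk database operations**: use bulk inserts within a batch
- ✅ **Log in detail**: to ease troubleshooting
- ✅ **Clean up processed_messages regularly**: 30 days of history is enough
- ❌ **Do not attach multiple consumers to one queue**: messages would be distributed round-robin, which breaks the design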
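
The idempotency, version-control, and transaction rules fit together in a single transaction. A sketch using Python's `sqlite3` as a stand-in for PostgreSQL (both support `INSERT ... ON CONFLICT ... DO UPDATE ... WHERE`); the `orders` table layout and `apply_message` are hypothetical, while `processed_messages`, `message_id`, and `data_version` come from this document.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS processed_messages (message_id TEXT PRIMARY KEY);
CREATE TABLE IF NOT EXISTS orders (
    platform_unique_id TEXT PRIMARY KEY,
    raw TEXT NOT NULL,
    data_version INTEGER NOT NULL
);
"""


def apply_message(conn: sqlite3.Connection, message_id: str, unique_id: str,
                  raw: str, data_version: int) -> str:
    """Return 'applied', 'stale' (older data_version) or 'duplicate'."""
    with conn:  # one transaction: rolled back if anything below raises
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_messages (message_id) VALUES (?)",
            (message_id,))
        if cur.rowcount == 0:
            return "duplicate"  # already processed: just ACK
        # Upsert that only overwrites when the incoming version is newer
        cur = conn.execute(
            """INSERT INTO orders (platform_unique_id, raw, data_version)
               VALUES (?, ?, ?)
               ON CONFLICT (platform_unique_id) DO UPDATE
               SET raw = excluded.raw, data_version = excluded.data_version
               WHERE excluded.data_version > orders.data_version""",
            (unique_id, raw, data_version))
        return "applied" if cur.rowcount == 1 else "stale"
```

In PostgreSQL, `INSERT ... ON CONFLICT (message_id) DO NOTHING` replaces SQLite's `INSERT OR IGNORE`. A stale message is still recorded as processed, so a later redelivery is treated as a duplicate. The 30-day cleanup of `processed_messages` would additionally need a timestamp column, which this sketch omits.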

### 3. Operations best practices

- ✅ **Back up the configuration regularly**: use `rabbitmqadmin export <file>`
- ✅ **Monitoring and alerting**: set up Prometheus + Grafana
- ✅ **Log archiving**: regularly clean up the `failed_messages` table
- ✅ **Load testing**: regularly test message throughput
- ✅ **Disaster recovery drills**: regularly rehearse the failure recovery procedures
- ✅ **Keep documentation current**: update the configuration docs promptly

---

## Appendix

### A. Message examples

#### Order message

```json
{
  "message_id": "order#datahub#100#20#200#order#DY202501140001",
  "timestamp": "2025-01-14T10:30:00Z",
  "platform": "douyin",
  "data_type": "order",
  "metadata": {
    "platform_id": 20,
    "company_id": 100,
    "store_id": 200,
    "source_system": "douyin-open-api",
    "retry_count": 0,
    "data_version": 1736840000
  },
  "data": {
    "platform_unique_id": "DY202501140001",
    "raw_data": {
      "order_id": "DY202501140001",
      "create_time": 1736839200,
      "pay_time": 1736840000,
      "update_time": 1736840000,
      "order_amount": 9999,
      "pay_amount": 9999,
      "order_status": 2,
      "receiver_info": {
        "province": "北京市",
        "city": "北京市",
        "district": "朝阳区",
        "detail": "No. 123, Example Street"
      }
    }
  }
}
```

#### Product message

```json
{
  "message_id": "product#datahub#100#2#201#product#TM-PROD-789",
  "timestamp": "2025-01-14T11:00:00Z",
  "platform": "tmall",
  "data_type": "product",
  "metadata": {
    "platform_id": 2,
    "company_id": 100,
    "store_id": 201,
    "source_system": "tmall-top-api",
    "retry_count": 0,
    "data_version": 1736841600
  },
  "data": {
    "platform_unique_id": "TM-PROD-789",
    "raw_data": {
      "num_iid": 789123456789,
      "title": "Sample product",
      "price": "499.00",
      "type": "fixed",
      "modified": "2025-01-14 11:00:00",
      "sku_list": [
        {
          "sku_id": 987654321,
          "price": "499.00",
          "quantity": 100
        }
      ]
    }
  }
}
```
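
The format-validation step can cross-check the structured message_id against the envelope, since in both examples above the id segments repeat `data_type`, `company_id`, `platform_id`, `store_id`, and `platform_unique_id`. A sketch; the required-field list and the function name are assumptions, and invalid JSON simply raises, which the consumer would treat as a format error.

```python
import json
from typing import List

REQUIRED = ["message_id", "timestamp", "platform", "data_type", "metadata", "data"]


def validate(body: str) -> List[str]:
    """Return a list of problems; an empty list means the message looks valid."""
    msg = json.loads(body)
    problems = [f"missing field: {f}" for f in REQUIRED if f not in msg]
    if problems:
        return problems
    parts = msg["message_id"].split("#", 6)  # unique_id may itself contain '#'
    if len(parts) != 7:
        return ["message_id must have 7 '#'-separated parts"]
    type1, _app, company, platform, store, type2, unique_id = parts
    meta, data = msg["metadata"], msg["data"]
    checks = {
        "data_type": type1 == type2 == msg["data_type"],
        "company_id": company == str(meta.get("company_id")),
        "platform_id": platform == str(meta.get("platform_id")),
        "store_id": store == str(meta.get("store_id")),
        "platform_unique_id": unique_id == data.get("platform_unique_id"),
        "data_version": isinstance(meta.get("data_version"), int),
    }
    return [f"inconsistent or missing: {k}" for k, ok in checks.items() if not ok]
```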

### B. References

- [RabbitMQ configuration plan](./RabbitMQ.md)
- [TimescaleDB data query plan](./data_query.md)
- [RabbitMQ official documentation](https://www.rabbitmq.com/documentation.html)
- [AMQP 0-9-1 protocol](https://www.rabbitmq.com/amqp-0-9-1-reference.html)