diff --git a/docs/data_flow.md b/docs/data_flow.md
new file mode 100644
index 0000000..9fe5c6c
--- /dev/null
+++ b/docs/data_flow.md
@@ -0,0 +1,1497 @@
+# 数据流转架构
+
+## 概述
+
+本文档描述电商数据从平台采集到入库的完整流程,包括各系统角色、职责和操作说明。
+
+### 系统角色
+
+| 角色 | 职责 | 系统 |
+|------|------|------|
+| **数据生产者** | 采集电商平台数据并推送到 MQ | 各平台采集服务 |
+| **消息队列** | 数据路由、缓冲、隔离 | RabbitMQ |
+| **数据消费者** | 消费 MQ 消息并入库 | Dataflow App |
+| **存储层** | 持久化存储和统计分析 | PostgreSQL + TimescaleDB |
+
+---
+
+## 整体架构
+
+```mermaid
+graph TB
+ subgraph "数据生产者层"
+ P1[DouYin 采集服务]
+ P2[Tmall 采集服务]
+ P3[RedBook 采集服务]
+ P4[其他平台采集服务...]
+ end
+
+ subgraph "RabbitMQ 消息队列"
+ subgraph "VHost: dataflow-app"
+ subgraph "业务 Exchanges"
+ E1[douyin.exchange]
+ E2[tmall.exchange]
+ E3[redbook.exchange]
+ E4[其他.exchange...]
+ end
+
+ subgraph "业务 Queues"
+ Q1[orders.queue
DLX: dlx.orders]
+ Q2[products.queue
DLX: dlx.products]
+ Q3[refunds.queue
DLX: dlx.refunds]
+ Q4[inventory.queue
DLX: dlx.inventory]
+ end
+
+ subgraph "DLX 死信交换机"
+ DLX1[dlx.orders]
+ DLX2[dlx.products]
+ DLX3[dlx.refunds]
+ DLX4[dlx.inventory]
+ end
+
+ subgraph "重试队列"
+ RQ1[orders.retry.queue
TTL: 5s]
+ RQ2[products.retry.queue
TTL: 5s]
+ RQ3[refunds.retry.queue
TTL: 5s]
+ RQ4[inventory.retry.queue
TTL: 5s]
+ end
+
+ subgraph "错误处理"
+ EE1[douyin.errors.exchange]
+ EE2[tmall.errors.exchange]
+ EE3[redbook.errors.exchange]
+ EQ[errors.queue]
+ end
+ end
+ end
+
+ subgraph "Dataflow 消费者"
+ C1[OrderConsumer]
+ C2[ProductConsumer]
+ C3[RefundConsumer]
+ C4[InventoryConsumer]
+ C5[ErrorConsumer]
+ end
+
+ subgraph "存储层"
+ DB1[(orders 表
TimescaleDB)]
+ DB2[(products 表)]
+ DB3[(refunds 表)]
+ DB4[(inventory 表)]
+ DB5[(failed_messages 表)]
+ end
+
+ subgraph "聚合视图"
+ AGG1[orders_daily_by_created]
+ AGG2[orders_daily_by_paid]
+ end
+
+ P1 -->|推送订单/产品/退款/库存| E1
+ P2 -->|推送订单/产品/退款/库存| E2
+ P3 -->|推送订单/产品/退款/库存| E3
+ P4 -->|推送订单/产品/退款/库存| E4
+
+ E1 & E2 & E3 & E4 -->|routing_key: order.platform| Q1
+ E1 & E2 & E3 & E4 -->|routing_key: product.platform| Q2
+ E1 & E2 & E3 & E4 -->|routing_key: refund.platform| Q3
+ E1 & E2 & E3 & E4 -->|routing_key: inventory.platform| Q4
+
+ Q1 --> C1
+ Q2 --> C2
+ Q3 --> C3
+ Q4 --> C4
+
+ C1 -->|成功| DB1
+ C2 -->|成功| DB2
+ C3 -->|成功| DB3
+ C4 -->|成功| DB4
+
+ C1 -.->|失败 nack| DLX1
+ C2 -.->|失败 nack| DLX2
+ C3 -.->|失败 nack| DLX3
+ C4 -.->|失败 nack| DLX4
+
+ DLX1 -->|x-retries<3| RQ1
+ DLX2 -->|x-retries<3| RQ2
+ DLX3 -->|x-retries<3| RQ3
+ DLX4 -->|x-retries<3| RQ4
+
+ RQ1 -.->|TTL 到期
回到主队列| Q1
+ RQ2 -.->|TTL 到期
回到主队列| Q2
+ RQ3 -.->|TTL 到期
回到主队列| Q3
+ RQ4 -.->|TTL 到期
回到主队列| Q4
+
+ DLX1 & DLX2 & DLX3 & DLX4 -->|x-retries>=3| EQ
+ C1 & C2 & C3 & C4 -.->|永久失败| EE1 & EE2 & EE3
+ EE1 & EE2 & EE3 --> EQ
+ EQ --> C5
+ C5 --> DB5
+
+ DB1 --> AGG1
+ DB1 --> AGG2
+
+ style P1 fill:#e1f5dd
+ style P2 fill:#e1f5dd
+ style P3 fill:#e1f5dd
+ style P4 fill:#e1f5dd
+ style E1 fill:#fff4e6
+ style E2 fill:#fff4e6
+ style E3 fill:#fff4e6
+ style E4 fill:#fff4e6
+ style Q1 fill:#e3f2fd
+ style Q2 fill:#e3f2fd
+ style Q3 fill:#e3f2fd
+ style Q4 fill:#e3f2fd
+ style DLX1 fill:#ffe0b2
+ style DLX2 fill:#ffe0b2
+ style DLX3 fill:#ffe0b2
+ style DLX4 fill:#ffe0b2
+ style RQ1 fill:#fff9c4
+ style RQ2 fill:#fff9c4
+ style RQ3 fill:#fff9c4
+ style RQ4 fill:#fff9c4
+ style C1 fill:#f3e5f5
+ style C2 fill:#f3e5f5
+ style C3 fill:#f3e5f5
+ style C4 fill:#f3e5f5
+ style DB1 fill:#fce4ec
+ style AGG1 fill:#ffebee
+ style AGG2 fill:#ffebee
+```
+
+---
+
+## RabbitMQ 内部架构
+
+### 业务数据流转
+
+```mermaid
+graph LR
+ subgraph "生产者权限隔离"
+ PD[DouYin 开发者
user_douyin]
+ PT[Tmall 开发者
user_tmall]
+ PR[RedBook 开发者
user_redbook]
+ end
+
+ subgraph "平台 Exchanges (Topic)"
+ ED[douyin.exchange]
+ ET[tmall.exchange]
+ ER[redbook.exchange]
+ end
+
+ subgraph "数据类型 Queues"
+ QO[orders.queue
TTL: 24h]
+ QP[products.queue
TTL: 24h]
+ QR[refunds.queue
TTL: 24h]
+ QI[inventory.queue
TTL: 24h]
+ end
+
+ subgraph "消费者"
+ CO[OrderConsumer
user_dataflow_consumer]
+ end
+
+ PD -->|write only| ED
+ PT -->|write only| ET
+ PR -->|write only| ER
+
+ ED -->|order.douyin| QO
+ ED -->|product.douyin| QP
+ ED -->|refund.douyin| QR
+ ED -->|inventory.douyin| QI
+
+ ET -->|order.tmall| QO
+ ET -->|product.tmall| QP
+ ET -->|refund.tmall| QR
+ ET -->|inventory.tmall| QI
+
+ ER -->|order.redbook| QO
+ ER -->|product.redbook| QP
+ ER -->|refund.redbook| QR
+ ER -->|inventory.redbook| QI
+
+ QO -->|read| CO
+ QP -->|read| CO
+ QR -->|read| CO
+ QI -->|read| CO
+
+ style PD fill:#c8e6c9
+ style PT fill:#c8e6c9
+ style PR fill:#c8e6c9
+ style ED fill:#fff9c4
+ style ET fill:#fff9c4
+ style ER fill:#fff9c4
+ style QO fill:#bbdefb
+ style QP fill:#bbdefb
+ style QR fill:#bbdefb
+ style QI fill:#bbdefb
+ style CO fill:#d1c4e9
+```
+
+### 错误处理架构
+
+```mermaid
+graph TB
+ subgraph "消费者错误产生"
+ C1[OrderConsumer]
+ C2[ProductConsumer]
+ C3[RefundConsumer]
+ C4[InventoryConsumer]
+ end
+
+ subgraph "平台错误 Exchanges"
+ ED[douyin.errors.exchange]
+ ET[tmall.errors.exchange]
+ ER[redbook.errors.exchange]
+ end
+
+ subgraph "统一错误队列"
+ EQ[errors.queue
TTL: 7 days]
+ end
+
+ subgraph "错误消息消费者"
+ EC[ErrorConsumer
user_ops]
+ DB[(failed_messages 表)]
+ end
+
+ subgraph "平台开发者监控"
+ PD[DouYin 开发者
订阅 douyin.errors.exchange]
+ PT[Tmall 开发者
订阅 tmall.errors.exchange]
+ end
+
+ C1 -->|判断平台| ED
+ C1 -->|判断平台| ET
+ C1 -->|判断平台| ER
+
+ C2 -->|判断平台| ED
+ C2 -->|判断平台| ET
+ C2 -->|判断平台| ER
+
+ C3 -->|判断平台| ED
+ C3 -->|判断平台| ET
+ C3 -->|判断平台| ER
+
+ C4 -->|判断平台| ED
+ C4 -->|判断平台| ET
+ C4 -->|判断平台| ER
+
+ ED -->|routing_key: #| EQ
+ ET -->|routing_key: #| EQ
+ ER -->|routing_key: #| EQ
+
+ EQ --> EC
+ EC --> DB
+
+ ED -.->|read only| PD
+ ET -.->|read only| PT
+
+ style C1 fill:#ffccbc
+ style C2 fill:#ffccbc
+ style C3 fill:#ffccbc
+ style C4 fill:#ffccbc
+ style ED fill:#ffe0b2
+ style ET fill:#ffe0b2
+ style ER fill:#ffe0b2
+ style EQ fill:#ffcdd2
+ style EC fill:#f8bbd0
+ style PD fill:#c5e1a5
+ style PT fill:#c5e1a5
+```
+
+---
+
+## 关键技术方案
+
+### 1. 乱序消费与数据一致性
+
+#### 为什么必须支持乱序消费?
+
+系统设计上**必须支持乱序消费**,原因如下:
+
+**批处理内部并发**:
+- 单消费者使用批处理模式(prefetch_count=100,batch_size=50)
+- 批次内的消息可能并发处理,无法保证严格顺序
+- 同一批次中不同平台的消息会并发入库
+
+**公平性需求**:
+- 多个平台的消息进入同一队列(orders.queue)
+- 批处理时优先处理简单消息,避免复杂消息阻塞整个批次
+- 示例:DouYin 的大量消息和 Tmall 的消息混合在同一批次中并发处理
+
+**重试消息的乱序**:
+- 失败消息通过 DLX 延迟重试后重新进入队列
+- 重试消息可能与新消息乱序
+- 必须防止旧版本数据覆盖新版本
+
+#### 乱序消费带来的挑战
+
+**核心问题**:同一订单的多次更新消息可能乱序到达
+
+```
+时间 T1: 订单创建,金额 100 元
+ → message_1: {platform_order_id: "DY123", amount: 100, data_version: 1736840000}
+
+时间 T2: 订单更新,金额 200 元
+ → message_2: {platform_order_id: "DY123", amount: 200, data_version: 1736840100}
+
+乱序消费(正常现象):
+ Consumer A 先消费 message_2 → 数据库中金额为 200 ✅
+ Consumer B 后消费 message_1 → 如果没有版本控制,会覆盖为 100 ❌ 错误!
+```
+
+**注意**:
+- ✅ 不同订单的消息乱序消费 → 完全没问题
+- ✅ 不同平台的消息乱序消费 → 完全没问题
+- ❌ 同一订单的旧版本覆盖新版本 → 需要防止
+
+**重要说明**:
+虽然采用单消费者模型,但以下场景仍会导致乱序:
+1. **批处理内部**:批次中的 50 条消息可能并发验证和入库
+2. **重试消息**:失败消息经过 DLX 延迟 5 秒后回到队列,会与新消息混合
+3. **数据库层面**:即使消费顺序正确,数据库事务提交顺序也可能不同
+
+因此,**`data_version` 机制是必需的**,用于在数据库层面保证数据一致性。
+
+#### 解决方案:数据版本控制
+
+**方案**:在消息中增加 `data_version` 字段(Unix 时间戳),确保只有更新的数据才能写入数据库
+
+```sql
+-- 数据库表增加 data_version 字段
+ALTER TABLE orders ADD COLUMN data_version BIGINT NOT NULL DEFAULT 0;
+
+-- 消费时只有更新的数据才写入
+INSERT INTO orders (
+ platform_id, platform_order_id, total_amount, data_version, ...
+) VALUES (20, 'DY123', 200, 1736840100, ...)
+ON CONFLICT (platform_id, platform_order_id)
+DO UPDATE SET
+ total_amount = EXCLUDED.total_amount,
+ data_version = EXCLUDED.data_version,
+ updated_at = NOW()
+WHERE orders.data_version < EXCLUDED.data_version; -- 关键:只有版本更新才更新
+```
+
+**工作流程**:
+
+```
+message_2 先到达 (data_version: 1736840100):
+ INSERT → 数据库: amount=200, data_version=1736840100 ✅
+
+message_1 后到达 (data_version: 1736840000):
+ ON CONFLICT DO UPDATE
+ WHERE orders.data_version < 1736840000 → 条件不满足,不更新 ✅
+ 数据库仍然是: amount=200, data_version=1736840100
+```
+
+**消息格式要求**:
+
+```json
+{
+ "message_id": "order#dataflow#100#20#200#order#DY123",
+ "metadata": {
+ "data_version": 1736840100 // 必需:平台数据的最后更新时间戳
+ },
+ "data": {
+ "platform_unique_id": "DY123",
+ "raw_data": {
+ "update_time": 1736840100 // 从平台原始数据中提取
+ }
+ }
+}
+```
+
+---
+
+### 2. 幂等性保证(无 Redis 依赖)
+
+#### 方案:数据库表记录已处理消息
+
+创建 `processed_messages` 表记录所有已处理的 message_id:
+
+```sql
+CREATE TABLE processed_messages (
+ message_id VARCHAR(200) PRIMARY KEY,
+ platform_id INT NOT NULL,
+ data_type VARCHAR(50) NOT NULL,
+ processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+);
+
+-- 索引:加速查询
+CREATE INDEX idx_processed_messages_created ON processed_messages (created_at);
+
+-- 自动清理 30 天前的记录(可选)
+CREATE INDEX idx_processed_messages_cleanup ON processed_messages (created_at)
+ WHERE created_at < NOW() - INTERVAL '30 days';
+```
+
+#### 消费流程
+
+```
+1. 收到消息,提取 message_id
+
+2. 检查是否已处理:
+ SELECT 1 FROM processed_messages WHERE message_id = ?
+
+ 如果存在 → ACK 消息,跳过处理(幂等性)
+
+3. 开启事务:
+ BEGIN;
+
+ 3.1 插入 processed_messages:
+ INSERT INTO processed_messages (message_id, platform_id, data_type)
+ VALUES (?, ?, ?)
+ ON CONFLICT (message_id) DO NOTHING; -- 防止并发插入
+
+ 3.2 插入/更新业务数据:
+ INSERT INTO orders (...) VALUES (...)
+ ON CONFLICT (platform_id, platform_order_id)
+ DO UPDATE SET ...
+ WHERE orders.data_version < EXCLUDED.data_version;
+
+ COMMIT;
+
+4. ACK 消息
+```
+
+#### 性能优化
+
+**问题**:每次消费都查询 `processed_messages` 可能影响性能
+
+**优化方案**:
+
+```sql
+-- 1. 使用 PostgreSQL 的 ON CONFLICT 避免先查询
+INSERT INTO processed_messages (message_id, platform_id, data_type)
+VALUES (?, ?, ?)
+ON CONFLICT (message_id) DO NOTHING
+RETURNING message_id;
+
+-- 如果 RETURNING 无结果 → 说明已处理,直接返回
+-- 如果 RETURNING 有结果 → 继续处理业务数据
+
+-- 2. 定期清理历史记录(减少表大小)
+DELETE FROM processed_messages
+WHERE created_at < NOW() - INTERVAL '30 days';
+```
+
+#### 与数据版本控制结合
+
+```
+完整的幂等性 + 乱序处理流程:
+
+BEGIN;
+
+-- 1. 记录消息已处理(幂等性)
+INSERT INTO processed_messages (message_id, platform_id, data_type)
+VALUES ('order#dataflow#100#20#200#order#DY123', 20, 'order')
+ON CONFLICT (message_id) DO NOTHING
+RETURNING message_id;
+
+-- 如果返回空,说明重复消息,直接 ROLLBACK 并 ACK
+-- 如果返回值,继续处理
+
+-- 2. 更新业务数据(带版本控制,防止乱序)
+INSERT INTO orders (
+ platform_id, platform_order_id, total_amount, data_version, created_at, updated_at
+) VALUES (20, 'DY123', 200, 1736840100, NOW(), NOW())
+ON CONFLICT (platform_id, platform_order_id)
+DO UPDATE SET
+ total_amount = EXCLUDED.total_amount,
+ data_version = EXCLUDED.data_version,
+ updated_at = NOW()
+WHERE orders.data_version < EXCLUDED.data_version;
+
+COMMIT;
+
+-- 3. ACK 消息
+```
+
+---
+
+## 数据流转流程
+
+### 1. 订单数据完整流程
+
+```mermaid
+sequenceDiagram
+ participant P as DouYin 采集服务
+ participant E as douyin.exchange
+ participant Q as orders.queue
+ participant C as OrderConsumer
+ participant DB as PostgreSQL
+ participant PM as processed_messages 表
+ participant AGG as 连续聚合视图
+
+ Note over P: 1. 数据采集阶段
+ P->>P: 从 DouYin API 获取订单数据
+ P->>P: 生成 message_id
order#dataflow#100#20#200#order#DY123
提取 data_version
+
+ Note over P,E: 2. 消息发布阶段
+ P->>E: 发布消息
routing_key: order
持久化消息
+ E->>Q: 路由到订单队列
+
+ Note over Q,C: 3. 消息消费阶段
+ Q->>C: 推送消息 (prefetch 10)
+ C->>C: 验证消息格式
+ C->>C: 验证 message_id 结构
+
+ Note over C,DB: 4. 幂等性检查 + 数据入库 (事务)
+ C->>DB: BEGIN TRANSACTION
+
+ C->>PM: INSERT INTO processed_messages
ON CONFLICT DO NOTHING
RETURNING message_id
+
+ alt 返回空 (重复消息)
+ PM-->>C: 无返回值
+ C->>DB: ROLLBACK
+ C->>Q: ACK (跳过处理)
+ else 返回值 (新消息)
+ PM-->>C: 返回 message_id
+
+ Note over C,DB: 5. 更新业务数据 (带版本控制)
+ C->>DB: INSERT INTO orders
ON CONFLICT DO UPDATE
WHERE data_version < new_version
+ DB-->>C: 更新成功
+
+ C->>DB: COMMIT
+ C->>Q: ACK (确认消息)
+
+ Note over DB,AGG: 6. 自动聚合 (后台)
+ DB->>AGG: TimescaleDB 后台任务
每小时刷新聚合视图
+ end
+```
+
+### 2. 错误处理流程
+
+```mermaid
+sequenceDiagram
+ participant P as Tmall 采集服务
+ participant E as tmall.exchange
+ participant Q as orders.queue
+ participant C as OrderConsumer
+ participant EE as tmall.errors.exchange
+ participant EQ as errors.queue
+ participant EC as ErrorConsumer
+ participant DB as failed_messages 表
+ participant Alert as 告警系统
+
+ P->>E: 发布订单消息
+ E->>Q: 路由到订单队列
+ Q->>C: 推送消息
+
+ Note over C: 消费处理
+ C->>C: 验证消息格式
+ alt 数据格式错误 (validation)
+ C->>C: 捕获 ValidationException
+ C->>EE: 发布错误消息
routing_key: validation
+ EE->>EQ: 路由到错误队列
+ C->>Q: ACK (从业务队列移除)
+
+ Note over EQ,EC: 错误消息处理
+ EQ->>EC: 推送错误消息
+ EC->>DB: 记录错误日志
+ EC->>Alert: 判断是否需要告警
+ alt 满足告警条件
+ Alert->>Alert: 发送 Slack/Email 通知
+ end
+ EC->>EQ: ACK
+
+ else 处理成功
+ C->>C: 入库成功
+ C->>Q: ACK
+ end
+```
+
+### 3. 消息重试流程(DLX + 延迟重试队列)
+
+```mermaid
+sequenceDiagram
+ participant Q as orders.queue
+ participant C as OrderConsumer
+ participant DB as PostgreSQL
+ participant DLX as dlx.orders
+ participant RQ as orders.retry.queue
+ participant EQ as errors.queue
+
+ Q->>C: 推送消息 (x-retries: 0)
+
+ Note over C,DB: 第 1 次处理
+ C->>DB: 尝试入库
+ DB-->>C: 数据库连接失败
+
+ C->>C: 更新 header: x-retries = 1
+ C->>C: nack(requeue=false)
+ C-->>DLX: 消息进入 DLX
+
+ Note over DLX: 根据 x-retries 路由
+ DLX->>RQ: x-retries < 3
routing_key: retry
+
+ Note over RQ: TTL = 5 秒
+ Note over RQ: 5 秒后自动死信
+
+ RQ-->>Q: TTL 到期
回到主队列
+
+ Note over Q,C: 第 2 次处理
+ Q->>C: 推送消息 (x-retries: 1)
+ C->>DB: 尝试入库
+ DB-->>C: 数据库连接失败
+
+ C->>C: 更新 header: x-retries = 2
+ C->>C: nack(requeue=false)
+ C-->>DLX: 消息进入 DLX
+ DLX->>RQ: x-retries < 3
+
+ Note over RQ: 5 秒后自动死信
+ RQ-->>Q: 回到主队列
+
+ Note over Q,C: 第 3 次处理
+ Q->>C: 推送消息 (x-retries: 2)
+ C->>DB: 尝试入库
+ DB-->>C: 数据库连接失败
+
+ C->>C: 更新 header: x-retries = 3
+ C->>C: nack(requeue=false)
+ C-->>DLX: 消息进入 DLX
+
+ Note over DLX: x-retries >= 3
超过重试次数
+ DLX->>EQ: routing_key: error
进入错误队列
+```
+
+---
+
+## 各角色职责和操作
+
+### 1. 平台开发者(数据生产者)
+
+#### 职责
+
+- 从电商平台 API 采集数据(订单/产品/退款)
+- 将数据转换为标准消息格式
+- 推送消息到 RabbitMQ
+- 监控自己平台的错误消息
+
+#### 权限
+
+| 操作 | 权限 | 说明 |
+|------|------|------|
+| 连接到 VHost | `dataflow-app` | 只能访问此 VHost |
+| 发布消息 | `{platform}.exchange` | 只能写入自己平台的 Exchange |
+| 订阅错误 | `{platform}.errors.exchange` | 可选,接收本平台错误通知 |
+
+#### 操作流程
+
+##### 步骤 1: 生成 message_id
+
+```
+格式: {entity_type}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{platform_unique_id}
+
+示例:
+- order#dataflow#100#20#200#order#DY123456789
+- product#dataflow#100#2#201#product#TM-PROD789
+- refund#dataflow#100#20#200#refund#DY-REF456
+- inventory#dataflow#100#2#201#inventory#TM-SKU123
+```
+
+**生成逻辑**:
+```
+# 伪代码
+message_id = f"{data_type}#dataflow#{company_id}#{platform_id}#{store_id}#{data_type}#{platform_order_id}"
+
+# 实际示例(DouYin 订单)
+message_id = f"order#dataflow#100#20#200#order#{order_data['order_id']}"
+```
+
+##### 步骤 2: 提取 data_version
+
+**重要**:必须从平台原始数据中提取更新时间戳作为 `data_version`
+
+```python
+# 示例:从 DouYin API 响应提取
+douyin_order = {
+ "order_id": "DY202501140001",
+ "create_time": 1736839200,
+ "update_time": 1736840000, # ← 使用这个作为 data_version
+ ...
+}
+
+data_version = douyin_order.get('update_time') or douyin_order.get('create_time')
+
+# 如果平台未提供更新时间,使用当前时间
+if not data_version:
+ data_version = int(time.time())
+```
+
+##### 步骤 3: 构建消息体
+
+```json
+{
+ "message_id": "order#dataflow#100#20#200#order#DY123456",
+ "timestamp": "2025-01-14T10:30:00Z",
+ "platform": "douyin",
+ "data_type": "order",
+ "metadata": {
+ "platform_id": 20,
+ "company_id": 100,
+ "store_id": 200,
+ "source_system": "douyin-open-api",
+ "retry_count": 0,
+ "data_version": 1736840000
+ },
+ "data": {
+ "platform_unique_id": "DY123456",
+ "raw_data": {
+ // 平台原始完整 JSON 数据(必须包含更新时间)
+ "order_id": "DY123456",
+ "update_time": 1736840000,
+ ...
+ }
+ }
+}
+```
+
+##### 步骤 4: 发布消息
+
+**连接参数**:
+- Host: ``
+- Port: `5672`
+- VHost: `dataflow-app`
+- Username: `user_{platform}`
+- Password: ``
+
+**发布配置**:
+- Exchange: `{platform}.exchange`
+- Routing Key: `{data_type}.{platform}` 格式
+ - 订单: `order.{platform}` (如 `order.douyin`, `order.tmall`)
+ - 产品: `product.{platform}` (如 `product.douyin`, `product.tmall`)
+ - 退款: `refund.{platform}` (如 `refund.douyin`, `refund.tmall`)
+ - 库存: `inventory.{platform}` (如 `inventory.douyin`, `inventory.tmall`)
+- Delivery Mode: `2` (持久化)
+
+##### 步骤 4: 监控错误(可选)
+
+**订阅配置**:
+- Exchange: `{platform}.errors.exchange`
+- Routing Key: `#` (接收所有错误类型)
+- 创建临时队列或持久队列用于接收错误消息
+
+**错误消息示例**:
+```json
+{
+ "error_id": "err_1234567890",
+ "original_message": { /* 原始消息 */ },
+ "error": {
+ "type": "validation",
+ "message": "Missing required field: total_amount",
+ "timestamp": "2025-01-14T10:31:00Z"
+ },
+ "metadata": {
+ "platform": "amazon",
+ "platform_id": 1,
+ "store_id": 200,
+ "failed_at": "2025-01-14T10:31:00Z"
+ }
+}
+```
+
+---
+
+### 2. Dataflow 应用(数据消费者)
+
+#### 职责
+
+- 消费 RabbitMQ 业务队列中的消息
+- 使用批处理模式提升吞吐量(prefetch=100, batch_size=50)
+- 通过适配器模式支持多平台数据处理
+- 验证消息格式和数据完整性
+- 将数据入库到 PostgreSQL
+- 处理失败消息,通过 DLX 延迟重试或进入错误队列
+- 实现幂等性,避免重复处理
+
+#### 消费者架构模型
+
+**单消费者 + 批处理 + 适配器模式**:
+
+```
+OrderConsumer (单实例)
+├── RabbitMQ Channel
+│ └── prefetch_count: 100 (最多缓存 100 条未确认消息)
+│
+├── 消息缓冲区 (Message Buffer)
+│ ├── 接收 RabbitMQ 推送的消息
+│ ├── 达到 batch_size=50 或 timeout=3s 触发处理
+│ └── 缓冲区最大容量: 100 条
+│
+├── 批处理引擎 (Batch Processor)
+│ ├── 遍历缓冲区中的消息
+│ ├── 对每条消息调用对应的适配器
+│ └── 批量入库 + 单条 ACK
+│
+└── 平台适配器 (Platform Adapters)
+ ├── DouyinAdapter: 处理抖音订单数据
+ ├── TmallAdapter: 处理天猫订单数据
+ ├── JDAdapter: 处理京东订单数据
+ └── ... 其他平台适配器
+
+工作流程:
+1. RabbitMQ 推送消息到消费者 (最多 100 条未确认)
+2. 消费者缓存消息到缓冲区
+3. 达到 batch_size 或 timeout 后触发批处理
+4. 批处理引擎遍历消息,根据 platform 字段调用对应适配器
+5. 适配器验证和转换数据
+6. 批量插入数据库
+7. 逐条 ACK/NACK 消息
+```
+
+#### 权限
+
+| 操作 | 权限 | 说明 |
+|------|------|------|
+| 连接到 VHost | `dataflow-app` | 访问业务 VHost |
+| 读取队列 | `orders.queue`, `products.queue`, `refunds.queue` | 消费业务消息 |
+| 写入错误 Exchange | `*.errors.exchange` | 发送错误消息到对应平台的错误 Exchange |
+| 管理队列 | `orders.queue`, `products.queue`, `refunds.queue` | 配置消费者参数 |
+
+#### 核心操作
+
+##### 消息消费配置
+
+**推荐参数**:
+- **prefetch_count**: `100` - RabbitMQ 最多推送 100 条未确认消息
+- **batch_size**: `50` - 消费者积累 50 条消息后触发批处理
+- **batch_timeout**: `3000` ms - 如果 3 秒内未达到 batch_size,也触发处理
+- **消费者模型**: 单消费者 + 批处理 + 适配器模式(每个队列一个消费者实例)
+- **ACK 模式**: 手动 ACK,单条确认(处理成功后才确认)
+- **重连机制**: 自动重连,指数退避
+
+##### 幂等性保证(数据库方案)
+
+**方案**:使用 `processed_messages` 表记录已处理的 message_id
+
+```sql
+-- 1. 创建幂等性记录表
+CREATE TABLE processed_messages (
+ message_id VARCHAR(200) PRIMARY KEY,
+ platform_id INT NOT NULL,
+ data_type VARCHAR(50) NOT NULL,
+ processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+);
+
+CREATE INDEX idx_processed_messages_created ON processed_messages (created_at);
+
+-- 2. 定期清理历史记录(可选,保留 30 天)
+DELETE FROM processed_messages WHERE created_at < NOW() - INTERVAL '30 days';
+```
+
+**消费流程**:
+
+```sql
+-- 在事务中执行
+BEGIN;
+
+-- 步骤 1: 尝试插入 processed_messages(幂等性检查)
+INSERT INTO processed_messages (message_id, platform_id, data_type)
+VALUES ('order#dataflow#100#20#200#order#DY123', 20, 'order')
+ON CONFLICT (message_id) DO NOTHING
+RETURNING message_id;
+
+-- 如果返回空 → 说明是重复消息,ROLLBACK 并 ACK
+-- 如果返回值 → 继续处理
+
+-- 步骤 2: 插入/更新业务数据(带版本控制)
+INSERT INTO orders (
+ platform_id, platform_order_id, company_id, store_id,
+ total_amount, data_version, raw, created_at, updated_at
+) VALUES (20, 'DY123', 100, 200, 9999, 1736840000, '...'::jsonb, NOW(), NOW())
+ON CONFLICT (platform_id, platform_order_id)
+DO UPDATE SET
+ total_amount = EXCLUDED.total_amount,
+ data_version = EXCLUDED.data_version,
+ raw = EXCLUDED.raw,
+ updated_at = NOW()
+WHERE orders.data_version < EXCLUDED.data_version; -- 只有更新的版本才更新
+
+COMMIT;
+```
+
+**关键点**:
+- ✅ 无需 Redis 依赖
+- ✅ 事务保证原子性
+- ✅ `ON CONFLICT DO NOTHING` 实现幂等性
+- ✅ `WHERE data_version < EXCLUDED.data_version` 防止乱序更新
+
+##### 错误处理逻辑
+
+```
+处理流程:
+1. 验证消息格式
+ ├─ 格式正确 → 继续
+ └─ 格式错误 → 发送到错误队列 (validation 类型), ACK
+
+2. 幂等性检查
+ ├─ 已处理 → 直接 ACK
+ └─ 未处理 → 继续
+
+3. 业务处理
+ ├─ 成功 → 标记已处理, ACK
+ └─ 失败 (异常)
+ ├─ 数据库错误 / 网络错误
+ │ ├─ retry_count < 3 → 重新发布 (延迟重试), ACK
+ │ └─ retry_count >= 3 → 发送到错误队列 (processing_failed), ACK
+ └─ 业务逻辑错误 → 发送到错误队列 (validation), ACK
+```
+
+##### 延迟重试实现(DLX + 重试队列方案)
+
+**重试策略**:
+- 重试延迟: 固定 5 秒(通过 TTL 实现)
+- 最大重试次数: 3 次
+- 超过 3 次: 进入错误队列
+
+**实现方式**(无需额外插件):
+```
+消费失败 → nack(requeue=false) → 进入 DLX
+ ↓
+ x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列
+ ↓
+ x-retries >= 3 → 错误队列 (人工处理)
+```
+
+**工作流程**:
+1. 消费失败时,消费者执行 `nack(requeue=false)`,消息进入 DLX(如 dlx.orders)
+2. DLX 根据消息 header 中的 `x-retries` 判断路由:
+ - `x-retries < 3`:路由到 `orders.retry.queue`(延迟重试)
+ - `x-retries >= 3`:路由到 `errors.queue`(永久失败)
+3. 重试队列 TTL 到期后,消息自动死信回主队列,重新消费
+4. 消费者需要在 nack 前更新消息的 `x-retries` header
+
+---
+
+### 3. 运维团队
+
+#### 职责
+
+- RabbitMQ 集群部署和维护
+- 用户和权限管理
+- 监控队列健康状态
+- 处理系统级错误
+- 性能优化和故障排查
+
+#### 权限
+
+| 操作 | 权限 | 说明 |
+|------|------|------|
+| 管理员账号 | `admin` tag | 访问管理界面,完全权限 |
+| 监控错误队列 | `errors.queue` (read) | 查看所有平台的错误消息 |
+| 配置资源 | 所有 Exchange/Queue | 创建、修改、删除资源 |
+
+#### 日常操作
+
+##### 监控关键指标
+
+**队列堆积监控**:
+
+```bash
+# 查看所有队列状态
+rabbitmqctl list_queues -p dataflow-app name messages_ready messages_unacknowledged
+
+# 告警阈值
+# - messages_ready > 10000: 严重堆积
+# - messages_unacknowledged > 5000: 消费者处理缓慢
+```
+
+**消费者连接监控**:
+
+```bash
+# 查看所有消费者连接
+rabbitmqctl list_consumers -p dataflow-app
+
+# 检查项:
+# - 消费者数量是否正常
+# - prefetch_count 配置是否合理
+# - 是否有消费者断开
+```
+
+**错误率监控**:
+
+```sql
+-- 查询最近 1 小时各平台错误数量
+SELECT
+ platform,
+ error_type,
+ COUNT(*) as error_count
+FROM failed_messages
+WHERE created_at > NOW() - INTERVAL '1 hour'
+GROUP BY platform, error_type
+ORDER BY error_count DESC;
+```
+
+##### 故障排查
+
+**队列堆积问题**:
+
+```
+排查步骤:
+1. 检查消费者是否在线
+ rabbitmqctl list_consumers -p dataflow-app
+
+2. 查看消费速率
+ 管理界面 → Queues → 选择队列 → 查看 Deliver/Get rate
+
+3. 检查消费者日志
+ - 是否有异常报错
+ - 处理时间是否过长
+
+解决方案:
+- 增加消费者实例数量
+- 增加 prefetch_count
+- 优化业务处理逻辑
+```
+
+**消息丢失问题**:
+
+```
+排查步骤:
+1. 确认消息是否发布成功
+ 管理界面 → Exchanges → 查看 Message rates (Publish in)
+
+2. 确认 Binding 配置
+ 管理界面 → Exchanges → 选择 Exchange → 查看 Bindings
+
+3. 检查消息是否持久化
+ - Exchange durable: true
+ - Queue durable: true
+ - Message delivery_mode: 2
+
+4. 查看消费者 ACK 行为
+ - 是否正确 ACK
+ - 是否有 NACK/REJECT
+```
+
+##### 性能优化
+
+**优化消费速度**:
+
+```
+措施:
+1. 调整批处理参数
+ - batch_size: 当前 50,可根据消息大小调整到 30-100
+ - batch_timeout: 当前 3s,可根据实时性要求调整到 1-5s
+
+2. 调整 prefetch_count
+ - 当前: 100
+ - 可根据内存和消息处理速度调整到 50-200
+ - prefetch_count 应该是 batch_size 的 2-3 倍
+
+3. 优化批处理逻辑
+ - 使用数据库批量插入(INSERT ... VALUES (...), (...), ...)
+ - 批次内消息可以并发验证(适配器模式)
+ - 但必须单条 ACK 保证可靠性
+
+4. 优化业务逻辑
+ - 减少数据库查询
+ - 使用连接池
+ - 异步处理非关键操作
+
+5. 水平扩展(谨慎)
+ - 单队列单消费者模型,不建议同一队列多消费者
+ - 可考虑分片:不同数据类型独立扩展(orders/products 分别扩展)
+```
+
+**队列性能优化**:
+
+```bash
+# 启用 lazy queue (大量消息堆积时)
+rabbitmqctl set_policy lazy-queue "^(orders|products|refunds)\.queue$" \
+ '{"queue-mode":"lazy"}' --vhost dataflow-app
+
+# 效果:
+# - 消息存储在磁盘,减少内存占用
+# - 适用于消息堆积严重的场景
+```
+
+---
+
+### 4. RabbitMQ 管理员
+
+#### 职责
+
+- 初始化 RabbitMQ 配置
+- 创建 VHost、Exchange、Queue
+- 管理用户和权限
+- 新平台接入配置
+
+#### 核心操作
+
+##### 新平台接入
+
+当需要接入新平台(如 `aliexpress`)时,执行以下步骤:
+
+**步骤 1: 创建 Exchange**
+
+```bash
+# 业务 Exchange
+rabbitmqadmin -u admin -p declare exchange \
+ name="aliexpress.exchange" vhost=dataflow-app \
+ type=topic durable=true
+
+# 错误 Exchange
+rabbitmqadmin -u admin -p declare exchange \
+ name="aliexpress.errors.exchange" vhost=dataflow-app \
+ type=topic durable=true
+```
+
+**步骤 2: 创建 Binding**
+
+```bash
+# 绑定到业务队列(使用 {data_type}.{platform} 格式)
+rabbitmqadmin -u admin -p declare binding \
+ source="aliexpress.exchange" destination="orders.queue" \
+ vhost=dataflow-app routing_key="order.aliexpress"
+
+rabbitmqadmin -u admin -p declare binding \
+ source="aliexpress.exchange" destination="products.queue" \
+ vhost=dataflow-app routing_key="product.aliexpress"
+
+rabbitmqadmin -u admin -p declare binding \
+ source="aliexpress.exchange" destination="refunds.queue" \
+ vhost=dataflow-app routing_key="refund.aliexpress"
+
+rabbitmqadmin -u admin -p declare binding \
+ source="aliexpress.exchange" destination="inventory.queue" \
+ vhost=dataflow-app routing_key="inventory.aliexpress"
+
+# 绑定到错误队列
+rabbitmqadmin -u admin -p declare binding \
+ source="aliexpress.errors.exchange" destination="errors.queue" \
+ vhost=dataflow-app routing_key="#"
+```
+
+**步骤 3: 创建用户**
+
+```bash
+# 创建用户
+rabbitmqadmin -u admin -p declare user \
+ name="user_aliexpress" password="" tags=""
+
+# 配置权限
+rabbitmqadmin -u admin -p declare permission \
+ vhost=dataflow-app user="user_aliexpress" \
+ configure="" write="^aliexpress\.(exchange|errors\.exchange)$" \
+ read="^aliexpress\.errors\..*$"
+```
+
+**步骤 4: 提供给开发者**
+
+```
+连接信息:
+- Host:
+- Port: 5672
+- VHost: dataflow-app
+- Username: user_aliexpress
+- Password:
+
+Exchange: aliexpress.exchange
+Routing Keys:
+ - 订单: order.aliexpress
+ - 产品: product.aliexpress
+ - 退款: refund.aliexpress
+ - 库存: inventory.aliexpress
+
+错误订阅 (可选): aliexpress.errors.exchange
+```
+
+---
+
+## 监控和告警
+
+### 1. 关键监控指标
+
+```mermaid
+graph TB
+ subgraph "队列监控"
+ M1[messages_ready
待消费消息数]
+ M2[messages_unacknowledged
未确认消息数]
+ M3[message_stats.publish_in_rate
消息发布速率]
+ M4[message_stats.deliver_rate
消息消费速率]
+ end
+
+ subgraph "消费者监控"
+ C1[consumer_count
消费者数量]
+ C2[prefetch_count
预取数量]
+ C3[connection_state
连接状态]
+ end
+
+ subgraph "错误监控"
+ E1[errors.queue 消息数
错误队列堆积]
+ E2[failed_messages 表记录数
失败消息总数]
+ E3[每小时错误增长率
错误频率]
+ end
+
+ subgraph "系统监控"
+ S1[disk_free
磁盘剩余空间]
+ S2[mem_used
内存使用率]
+ S3[fd_used
文件描述符使用]
+ end
+
+ style M1 fill:#ffcdd2
+ style M2 fill:#ffcdd2
+ style E1 fill:#ffcdd2
+ style E2 fill:#ffcdd2
+```
+
+### 2. 告警规则
+
+| 指标 | 告警阈值 | 级别 | 处理建议 |
+|------|---------|------|---------|
+| `messages_ready` | > 10,000 | 警告 | 增加消费者 |
+| `messages_ready` | > 50,000 | 严重 | 紧急扩容 |
+| `messages_unacknowledged` | > 5,000 | 警告 | 检查消费者处理速度 |
+| `consumer_count` | = 0 | 严重 | 消费者服务挂掉 |
+| `publish_rate - deliver_rate` | > 1000/s | 警告 | 消费速度跟不上生产速度 |
+| `errors.queue` 消息数 | > 1,000 | 警告 | 大量消息处理失败 |
+| 单平台小时错误数 | > 100 | 警告 | 平台数据质量问题 |
+| `disk_free` | < 10 GB | 严重 | 清理磁盘或扩容 |
+| `mem_used` | > 80% | 警告 | 优化或增加内存 |
+
+### 3. 监控实现
+
+**Prometheus + Grafana**:
+
+```yaml
+# prometheus.yml
+scrape_configs:
+ - job_name: 'rabbitmq'
+ static_configs:
+ - targets: ['rabbitmq-exporter:9419']
+ metrics_path: /metrics
+```
+
+**告警规则示例**:
+
+```yaml
+# alert_rules.yml
+groups:
+ - name: rabbitmq_alerts
+ rules:
+ - alert: QueueBacklog
+ expr: rabbitmq_queue_messages_ready{queue=~"orders|products|refunds"} > 10000
+ for: 5m
+ labels:
+ severity: warning
+ annotations:
+ summary: "队列 {{ $labels.queue }} 堆积严重"
+ description: "当前待消费消息数: {{ $value }}"
+
+ - alert: NoConsumers
+ expr: rabbitmq_queue_consumers{queue=~"orders|products|refunds"} == 0
+ for: 1m
+ labels:
+ severity: critical
+ annotations:
+ summary: "队列 {{ $labels.queue }} 无消费者"
+ description: "消费者服务可能已挂掉"
+```
+
+---
+
+## 故障恢复
+
+### 1. 消费者服务挂掉
+
+```mermaid
+graph LR
+ A[检测到消费者断开] --> B[消息堆积在队列中]
+ B --> C[告警通知运维]
+ C --> D[重启消费者服务]
+ D --> E[消费者自动重连]
+ E --> F[继续消费堆积消息]
+ F --> G{消息是否过期?}
+ G -->|未过期| H[正常处理]
+ G -->|已过期| I[消息被 TTL 删除]
+
+ style A fill:#ffcdd2
+ style C fill:#fff9c4
+ style H fill:#c8e6c9
+ style I fill:#ffab91
+```
+
+**恢复时间**:
+- 自动重连: 秒级
+- 手动重启: 分钟级
+- 数据不会丢失(消息持久化)
+
+### 2. RabbitMQ 服务重启
+
+```mermaid
+graph TB
+ A[RabbitMQ 重启] --> B[持久化资源自动恢复]
+ B --> C[Exchanges 恢复]
+ B --> D[Queues 恢复]
+ B --> E[Bindings 恢复]
+ B --> F[持久化消息恢复]
+
+ A --> G[生产者重连]
+ A --> H[消费者重连]
+
+ G --> I[继续发布消息]
+ H --> J[继续消费消息]
+
+ style A fill:#ffcdd2
+ style C fill:#c8e6c9
+ style D fill:#c8e6c9
+ style E fill:#c8e6c9
+ style F fill:#c8e6c9
+```
+
+**数据安全性**:
+- ✅ 持久化 Exchange 自动恢复
+- ✅ 持久化 Queue 自动恢复
+- ✅ 持久化消息自动恢复
+- ❌ 未 ACK 的消息会重新投递(可能重复消费,需幂等性保证)
+
+### 3. 数据库连接失败
+
+```mermaid
+graph TB
+ A[数据库连接失败] --> B{retry_count < 3?}
+ B -->|是| C[延迟重试
60s / 120s / 240s]
+ C --> D[重新入队]
+ D --> E[等待下次消费]
+ E --> B
+
+ B -->|否| F[超过重试次数]
+ F --> G[发送到错误队列]
+ G --> H[人工介入处理]
+
+ style A fill:#ffcdd2
+ style C fill:#fff9c4
+ style G fill:#ffab91
+ style H fill:#ce93d8
+```
+
+---
+
+## 最佳实践
+
+### 1. 生产者最佳实践
+
+- ✅ **使用结构化 message_id**:格式 `{type}#{app}#{company}#{platform}#{store}#{type}#{unique_id}`
+- ✅ **必须提供 data_version**:从平台数据中提取更新时间戳,用于防止乱序更新
+- ✅ **消息持久化**:`delivery_mode=2`
+- ✅ **包含完整 raw_data**:保留平台原始完整数据,便于后续数据补全和问题排查
+- ✅ **使用正确的 routing_key**:`order`/`product`/`refund`/`inventory`
+- ✅ **实现发布确认**:等待 RabbitMQ 确认消息已接收
+- ✅ **实现重连机制**:网络故障时自动重连
+- ❌ **不要发送超大消息**(> 1 MB):考虑分片或使用对象存储 + 引用
+
+### 2. 消费者最佳实践
+
+- ✅ **单消费者模型**:每个队列一个消费者实例,避免消息分散
+- ✅ **批处理机制**:使用 prefetch + 批处理提升吞吐(batch_size=50, prefetch_count=100)
+- ✅ **适配器模式**:消费者内部根据平台字段分发到不同业务逻辑
+- ✅ **手动 ACK**:单条确认,失败消息不影响其他消息
+- ✅ **幂等性保证**:使用 `processed_messages` 表记录已处理的 message_id
+- ✅ **版本控制**:使用 `data_version` 字段防止乱序更新(批处理内部可能并发)
+- ✅ **事务处理**:在一个事务中完成幂等性检查 + 业务数据更新
+- ✅ **DLX 重试机制**:失败消息通过 nack(requeue=false) 进入 DLX,延迟 5 秒后自动重试
+- ✅ **错误分类处理**:区分可重试错误(临时)和业务错误(永久)
+- ✅ **限制重试次数**:最多 3 次,通过消息 header `x-retries` 控制
+- ✅ **批量数据库操作**:批次内使用批量插入提升性能
+- ✅ **记录详细日志**:便于故障排查
+- ✅ **定期清理 processed_messages**:保留 30 天历史记录即可
+- ❌ **不要同队列多消费者**:会导致消息 Round-Robin 分配,破坏设计
+
+### 3. 运维最佳实践
+
+- ✅ **定期备份配置**:使用 `rabbitmqadmin export`
+- ✅ **监控告警**:配置 Prometheus + Grafana
+- ✅ **日志归档**:定期清理 `failed_messages` 表
+- ✅ **压力测试**:定期测试消息吞吐能力
+- ✅ **灾难恢复演练**:定期演练故障恢复流程
+- ✅ **文档更新**:及时更新配置文档
+
+---
+
+## 附录
+
+### A. 消息示例
+
+#### 订单消息
+
+```json
+{
+ "message_id": "order#dataflow#100#20#200#order#DY202501140001",
+ "timestamp": "2025-01-14T10:30:00Z",
+ "platform": "douyin",
+ "data_type": "order",
+ "metadata": {
+ "platform_id": 20,
+ "company_id": 100,
+ "store_id": 200,
+ "source_system": "douyin-open-api",
+ "retry_count": 0,
+ "data_version": 1736840000
+ },
+ "data": {
+ "platform_unique_id": "DY202501140001",
+ "raw_data": {
+ "order_id": "DY202501140001",
+ "create_time": 1736839200,
+ "pay_time": 1736840000,
+ "update_time": 1736840000,
+ "order_amount": 9999,
+ "pay_amount": 9999,
+ "order_status": 2,
+ "receiver_info": {
+ "province": "北京市",
+ "city": "北京市",
+ "district": "朝阳区",
+ "detail": "某某街道123号"
+ }
+ }
+ }
+}
+```
+
+#### 产品消息
+
+```json
+{
+ "message_id": "product#dataflow#100#2#201#product#TM-PROD-789",
+ "timestamp": "2025-01-14T11:00:00Z",
+ "platform": "tmall",
+ "data_type": "product",
+ "metadata": {
+ "platform_id": 2,
+ "company_id": 100,
+ "store_id": 201,
+ "source_system": "tmall-top-api",
+ "retry_count": 0,
+ "data_version": 1736841600
+ },
+ "data": {
+ "platform_unique_id": "TM-PROD-789",
+ "raw_data": {
+ "num_iid": 789123456789,
+ "title": "示例产品",
+ "price": "499.00",
+ "type": "fixed",
+ "modified": "2025-01-14 11:00:00",
+ "sku_list": [
+ {
+ "sku_id": 987654321,
+ "price": "499.00",
+ "quantity": 100
+ }
+ ]
+ }
+ }
+}
+```
+
+### B. 参考文档
+
+- [RabbitMQ 配置方案](./RabbitMQ.md)
+- [TimescaleDB 数据查询方案](./data_query.md)
+- [RabbitMQ 官方文档](https://www.rabbitmq.com/documentation.html)
+- [AMQP 0-9-1 协议](https://www.rabbitmq.com/amqp-0-9-1-reference.html)