Files
datahub/docs/data_flow.md
T
2026-02-28 10:38:33 +08:00

41 KiB
Raw Blame History

数据流转架构

概述

本文档描述电商数据从平台采集到入库的完整流程,包括各系统角色、职责和操作说明。

系统角色

角色 职责 系统
数据生产者 采集电商平台数据并推送到 MQ 各平台采集服务
消息队列 数据路由、缓冲、隔离 RabbitMQ
数据消费者 消费 MQ 消息并入库 Datahub App
存储层 持久化存储和统计分析 PostgreSQL + TimescaleDB

整体架构

graph TB
    subgraph "数据生产者层"
        P1[DouYin 采集服务]
        P2[Tmall 采集服务]
        P3[RedBook 采集服务]
        P4[其他平台采集服务...]
    end

    subgraph "RabbitMQ 消息队列"
        subgraph "VHost: datahub-app"
            subgraph "业务 Exchanges"
                E1[douyin.exchange]
                E2[tmall.exchange]
                E3[redbook.exchange]
                E4[其他.exchange...]
            end

            subgraph "业务 Queues"
                Q1[orders.queue<br/>DLX: dlx.orders]
                Q2[products.queue<br/>DLX: dlx.products]
                Q3[refunds.queue<br/>DLX: dlx.refunds]
                Q4[inventory.queue<br/>DLX: dlx.inventory]
            end

            subgraph "DLX 死信交换机"
                DLX1[dlx.orders]
                DLX2[dlx.products]
                DLX3[dlx.refunds]
                DLX4[dlx.inventory]
            end

            subgraph "重试队列"
                RQ1[orders.retry.queue<br/>TTL: 5s]
                RQ2[products.retry.queue<br/>TTL: 5s]
                RQ3[refunds.retry.queue<br/>TTL: 5s]
                RQ4[inventory.retry.queue<br/>TTL: 5s]
            end

            subgraph "错误处理"
                EE1[douyin.errors.exchange]
                EE2[tmall.errors.exchange]
                EE3[redbook.errors.exchange]
                EQ[errors.queue]
            end
        end
    end

    subgraph "Datahub 消费者"
        C1[OrderConsumer]
        C2[ProductConsumer]
        C3[RefundConsumer]
        C4[InventoryConsumer]
        C5[ErrorConsumer]
    end

    subgraph "存储层"
        DB1[(orders 表<br/>TimescaleDB)]
        DB2[(products 表)]
        DB3[(refunds 表)]
        DB4[(inventory 表)]
        DB5[(failed_messages 表)]
    end

    subgraph "聚合视图"
        AGG1[orders_daily_by_created]
        AGG2[orders_daily_by_paid]
    end

    P1 -->|推送订单/产品/退款/库存| E1
    P2 -->|推送订单/产品/退款/库存| E2
    P3 -->|推送订单/产品/退款/库存| E3
    P4 -->|推送订单/产品/退款/库存| E4

    E1 & E2 & E3 & E4 -->|routing_key: order.platform| Q1
    E1 & E2 & E3 & E4 -->|routing_key: product.platform| Q2
    E1 & E2 & E3 & E4 -->|routing_key: refund.platform| Q3
    E1 & E2 & E3 & E4 -->|routing_key: inventory.platform| Q4

    Q1 --> C1
    Q2 --> C2
    Q3 --> C3
    Q4 --> C4

    C1 -->|成功| DB1
    C2 -->|成功| DB2
    C3 -->|成功| DB3
    C4 -->|成功| DB4

    C1 -.->|失败 nack| DLX1
    C2 -.->|失败 nack| DLX2
    C3 -.->|失败 nack| DLX3
    C4 -.->|失败 nack| DLX4

    DLX1 -->|x-retries<3| RQ1
    DLX2 -->|x-retries<3| RQ2
    DLX3 -->|x-retries<3| RQ3
    DLX4 -->|x-retries<3| RQ4

    RQ1 -.->|TTL 到期<br/>回到主队列| Q1
    RQ2 -.->|TTL 到期<br/>回到主队列| Q2
    RQ3 -.->|TTL 到期<br/>回到主队列| Q3
    RQ4 -.->|TTL 到期<br/>回到主队列| Q4

    DLX1 & DLX2 & DLX3 & DLX4 -->|x-retries>=3| EQ
    C1 & C2 & C3 & C4 -.->|永久失败| EE1 & EE2 & EE3
    EE1 & EE2 & EE3 --> EQ
    EQ --> C5
    C5 --> DB5

    DB1 --> AGG1
    DB1 --> AGG2

    style P1 fill:#e1f5dd
    style P2 fill:#e1f5dd
    style P3 fill:#e1f5dd
    style P4 fill:#e1f5dd
    style E1 fill:#fff4e6
    style E2 fill:#fff4e6
    style E3 fill:#fff4e6
    style E4 fill:#fff4e6
    style Q1 fill:#e3f2fd
    style Q2 fill:#e3f2fd
    style Q3 fill:#e3f2fd
    style Q4 fill:#e3f2fd
    style DLX1 fill:#ffe0b2
    style DLX2 fill:#ffe0b2
    style DLX3 fill:#ffe0b2
    style DLX4 fill:#ffe0b2
    style RQ1 fill:#fff9c4
    style RQ2 fill:#fff9c4
    style RQ3 fill:#fff9c4
    style RQ4 fill:#fff9c4
    style C1 fill:#f3e5f5
    style C2 fill:#f3e5f5
    style C3 fill:#f3e5f5
    style C4 fill:#f3e5f5
    style DB1 fill:#fce4ec
    style AGG1 fill:#ffebee
    style AGG2 fill:#ffebee

RabbitMQ 内部架构

业务数据流转

graph LR
    subgraph "生产者权限隔离"
        PD[DouYin 开发者<br/>user_douyin]
        PT[Tmall 开发者<br/>user_tmall]
        PR[RedBook 开发者<br/>user_redbook]
    end

    subgraph "平台 Exchanges (Topic)"
        ED[douyin.exchange]
        ET[tmall.exchange]
        ER[redbook.exchange]
    end

    subgraph "数据类型 Queues"
        QO[orders.queue<br/>TTL: 24h]
        QP[products.queue<br/>TTL: 24h]
        QR[refunds.queue<br/>TTL: 24h]
        QI[inventory.queue<br/>TTL: 24h]
    end

    subgraph "消费者"
        CO[OrderConsumer<br/>user_datahub_consumer]
    end

    PD -->|write only| ED
    PT -->|write only| ET
    PR -->|write only| ER

    ED -->|order.douyin| QO
    ED -->|product.douyin| QP
    ED -->|refund.douyin| QR
    ED -->|inventory.douyin| QI

    ET -->|order.tmall| QO
    ET -->|product.tmall| QP
    ET -->|refund.tmall| QR
    ET -->|inventory.tmall| QI

    ER -->|order.redbook| QO
    ER -->|product.redbook| QP
    ER -->|refund.redbook| QR
    ER -->|inventory.redbook| QI

    QO -->|read| CO
    QP -->|read| CO
    QR -->|read| CO
    QI -->|read| CO

    style PD fill:#c8e6c9
    style PT fill:#c8e6c9
    style PR fill:#c8e6c9
    style ED fill:#fff9c4
    style ET fill:#fff9c4
    style ER fill:#fff9c4
    style QO fill:#bbdefb
    style QP fill:#bbdefb
    style QR fill:#bbdefb
    style QI fill:#bbdefb
    style CO fill:#d1c4e9

错误处理架构

graph TB
    subgraph "消费者错误产生"
        C1[OrderConsumer]
        C2[ProductConsumer]
        C3[RefundConsumer]
        C4[InventoryConsumer]
    end

    subgraph "平台错误 Exchanges"
        ED[douyin.errors.exchange]
        ET[tmall.errors.exchange]
        ER[redbook.errors.exchange]
    end

    subgraph "统一错误队列"
        EQ[errors.queue<br/>TTL: 7 days]
    end

    subgraph "错误消息消费者"
        EC[ErrorConsumer<br/>user_ops]
        DB[(failed_messages 表)]
    end

    subgraph "平台开发者监控"
        PD[DouYin 开发者<br/>订阅 douyin.errors.exchange]
        PT[Tmall 开发者<br/>订阅 tmall.errors.exchange]
    end

    C1 -->|判断平台| ED
    C1 -->|判断平台| ET
    C1 -->|判断平台| ER

    C2 -->|判断平台| ED
    C2 -->|判断平台| ET
    C2 -->|判断平台| ER

    C3 -->|判断平台| ED
    C3 -->|判断平台| ET
    C3 -->|判断平台| ER

    C4 -->|判断平台| ED
    C4 -->|判断平台| ET
    C4 -->|判断平台| ER

    ED -->|routing_key: #| EQ
    ET -->|routing_key: #| EQ
    ER -->|routing_key: #| EQ

    EQ --> EC
    EC --> DB

    ED -.->|read only| PD
    ET -.->|read only| PT

    style C1 fill:#ffccbc
    style C2 fill:#ffccbc
    style C3 fill:#ffccbc
    style C4 fill:#ffccbc
    style ED fill:#ffe0b2
    style ET fill:#ffe0b2
    style ER fill:#ffe0b2
    style EQ fill:#ffcdd2
    style EC fill:#f8bbd0
    style PD fill:#c5e1a5
    style PT fill:#c5e1a5

关键技术方案

1. 乱序消费与数据一致性

为什么必须支持乱序消费?

系统设计上必须支持乱序消费,原因如下:

批处理内部并发

  • 单消费者使用批处理模式(prefetch_count=100batch_size=50
  • 批次内的消息可能并发处理,无法保证严格顺序
  • 同一批次中不同平台的消息会并发入库

公平性需求

  • 多个平台的消息进入同一队列(orders.queue
  • 批处理时优先处理简单消息,避免复杂消息阻塞整个批次
  • 示例:DouYin 的大量消息和 Tmall 的消息混合在同一批次中并发处理

重试消息的乱序

  • 失败消息通过 DLX 延迟重试后重新进入队列
  • 重试消息可能与新消息乱序
  • 必须防止旧版本数据覆盖新版本

乱序消费带来的挑战

核心问题:同一订单的多次更新消息可能乱序到达

时间 T1: 订单创建,金额 100 元
  → message_1: {platform_order_id: "DY123", amount: 100, data_version: 1736840000}

时间 T2: 订单更新,金额 200 元
  → message_2: {platform_order_id: "DY123", amount: 200, data_version: 1736840100}

乱序消费(正常现象):
  Consumer A 先消费 message_2 → 数据库中金额为 200 ✅
  Consumer B 后消费 message_1 → 如果没有版本控制,会覆盖为 100 ❌ 错误!

注意

  • 不同订单的消息乱序消费 → 完全没问题
  • 不同平台的消息乱序消费 → 完全没问题
  • 同一订单的旧版本覆盖新版本 → 需要防止

重要说明: 虽然采用单消费者模型,但以下场景仍会导致乱序:

  1. 批处理内部:批次中的 50 条消息可能并发验证和入库
  2. 重试消息:失败消息经过 DLX 延迟 5 秒后回到队列,会与新消息混合
  3. 数据库层面:即使消费顺序正确,数据库事务提交顺序也可能不同

因此,data_version 机制是必需的,用于在数据库层面保证数据一致性。

解决方案:数据版本控制

方案:在消息中增加 data_version 字段(Unix 时间戳),确保只有更新的数据才能写入数据库

-- 数据库表增加 data_version 字段
ALTER TABLE orders ADD COLUMN data_version BIGINT NOT NULL DEFAULT 0;

-- 消费时只有更新的数据才写入
INSERT INTO orders (
  platform_id, platform_order_id, total_amount, data_version, ...
) VALUES (20, 'DY123', 200, 1736840100, ...)
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET
  total_amount = EXCLUDED.total_amount,
  data_version = EXCLUDED.data_version,
  updated_at = NOW()
WHERE orders.data_version < EXCLUDED.data_version;  -- 关键:只有版本更新才更新

工作流程

message_2 先到达 (data_version: 1736840100):
  INSERT → 数据库: amount=200, data_version=1736840100 ✅

message_1 后到达 (data_version: 1736840000):
  ON CONFLICT DO UPDATE
  WHERE orders.data_version < 1736840000  → 条件不满足,不更新 ✅
  数据库仍然是: amount=200, data_version=1736840100

消息格式要求

{
  "message_id": "order#datahub#100#20#200#order#DY123",
  "metadata": {
    "data_version": 1736840100  // 必需:平台数据的最后更新时间戳
  },
  "data": {
    "platform_unique_id": "DY123",
    "raw_data": {
      "update_time": 1736840100  // 从平台原始数据中提取
    }
  }
}

2. 幂等性保证(无 Redis 依赖)

方案:数据库表记录已处理消息

创建 processed_messages 表记录所有已处理的 message_id:

CREATE TABLE processed_messages (
  message_id VARCHAR(200) PRIMARY KEY,
  platform_id INT NOT NULL,
  data_type VARCHAR(50) NOT NULL,
  processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 索引:加速查询
CREATE INDEX idx_processed_messages_created ON processed_messages (created_at);

-- 自动清理 30 天前的记录(可选)
CREATE INDEX idx_processed_messages_cleanup ON processed_messages (created_at)
  WHERE created_at < NOW() - INTERVAL '30 days';

消费流程

1. 收到消息,提取 message_id

2. 检查是否已处理:
   SELECT 1 FROM processed_messages WHERE message_id = ?

   如果存在 → ACK 消息,跳过处理(幂等性)

3. 开启事务:
   BEGIN;

   3.1 插入 processed_messages:
       INSERT INTO processed_messages (message_id, platform_id, data_type)
       VALUES (?, ?, ?)
       ON CONFLICT (message_id) DO NOTHING;  -- 防止并发插入

   3.2 插入/更新业务数据:
       INSERT INTO orders (...) VALUES (...)
       ON CONFLICT (platform_id, platform_order_id)
       DO UPDATE SET ...
       WHERE orders.data_version < EXCLUDED.data_version;

   COMMIT;

4. ACK 消息

性能优化

问题:每次消费都查询 processed_messages 可能影响性能

优化方案

-- 1. 使用 PostgreSQL 的 ON CONFLICT 避免先查询
INSERT INTO processed_messages (message_id, platform_id, data_type)
VALUES (?, ?, ?)
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id;

-- 如果 RETURNING 无结果 → 说明已处理,直接返回
-- 如果 RETURNING 有结果 → 继续处理业务数据

-- 2. 定期清理历史记录(减少表大小)
DELETE FROM processed_messages
WHERE created_at < NOW() - INTERVAL '30 days';

与数据版本控制结合

完整的幂等性 + 乱序处理流程:

BEGIN;

-- 1. 记录消息已处理(幂等性)
INSERT INTO processed_messages (message_id, platform_id, data_type)
VALUES ('order#datahub#100#20#200#order#DY123', 20, 'order')
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id;

-- 如果返回空,说明重复消息,直接 ROLLBACK 并 ACK
-- 如果返回值,继续处理

-- 2. 更新业务数据(带版本控制,防止乱序)
INSERT INTO orders (
  platform_id, platform_order_id, total_amount, data_version, created_at, updated_at
) VALUES (20, 'DY123', 200, 1736840100, NOW(), NOW())
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET
  total_amount = EXCLUDED.total_amount,
  data_version = EXCLUDED.data_version,
  updated_at = NOW()
WHERE orders.data_version < EXCLUDED.data_version;

COMMIT;

-- 3. ACK 消息

数据流转流程

1. 订单数据完整流程

sequenceDiagram
    participant P as DouYin 采集服务
    participant E as douyin.exchange
    participant Q as orders.queue
    participant C as OrderConsumer
    participant DB as PostgreSQL
    participant PM as processed_messages 表
    participant AGG as 连续聚合视图

    Note over P: 1. 数据采集阶段
    P->>P: 从 DouYin API 获取订单数据
    P->>P: 生成 message_id<br/>order#datahub#100#20#200#order#DY123<br/>提取 data_version

    Note over P,E: 2. 消息发布阶段
    P->>E: 发布消息<br/>routing_key: order<br/>持久化消息
    E->>Q: 路由到订单队列

    Note over Q,C: 3. 消息消费阶段
    Q->>C: 推送消息 (prefetch 10)
    C->>C: 验证消息格式
    C->>C: 验证 message_id 结构

    Note over C,DB: 4. 幂等性检查 + 数据入库 (事务)
    C->>DB: BEGIN TRANSACTION

    C->>PM: INSERT INTO processed_messages<br/>ON CONFLICT DO NOTHING<br/>RETURNING message_id

    alt 返回空 (重复消息)
        PM-->>C: 无返回值
        C->>DB: ROLLBACK
        C->>Q: ACK (跳过处理)
    else 返回值 (新消息)
        PM-->>C: 返回 message_id

        Note over C,DB: 5. 更新业务数据 (带版本控制)
        C->>DB: INSERT INTO orders<br/>ON CONFLICT DO UPDATE<br/>WHERE data_version < new_version
        DB-->>C: 更新成功

        C->>DB: COMMIT
        C->>Q: ACK (确认消息)

        Note over DB,AGG: 6. 自动聚合 (后台)
        DB->>AGG: TimescaleDB 后台任务<br/>每小时刷新聚合视图
    end

2. 错误处理流程

sequenceDiagram
    participant P as Tmall 采集服务
    participant E as tmall.exchange
    participant Q as orders.queue
    participant C as OrderConsumer
    participant EE as tmall.errors.exchange
    participant EQ as errors.queue
    participant EC as ErrorConsumer
    participant DB as failed_messages 表
    participant Alert as 告警系统

    P->>E: 发布订单消息
    E->>Q: 路由到订单队列
    Q->>C: 推送消息

    Note over C: 消费处理
    C->>C: 验证消息格式
    alt 数据格式错误 (validation)
        C->>C: 捕获 ValidationException
        C->>EE: 发布错误消息<br/>routing_key: validation
        EE->>EQ: 路由到错误队列
        C->>Q: ACK (从业务队列移除)

        Note over EQ,EC: 错误消息处理
        EQ->>EC: 推送错误消息
        EC->>DB: 记录错误日志
        EC->>Alert: 判断是否需要告警
        alt 满足告警条件
            Alert->>Alert: 发送 Slack/Email 通知
        end
        EC->>EQ: ACK

    else 处理成功
        C->>C: 入库成功
        C->>Q: ACK
    end

3. 消息重试流程(DLX + 延迟重试队列)

sequenceDiagram
    participant Q as orders.queue
    participant C as OrderConsumer
    participant DB as PostgreSQL
    participant DLX as dlx.orders
    participant RQ as orders.retry.queue
    participant EQ as errors.queue

    Q->>C: 推送消息 (x-retries: 0)

    Note over C,DB: 第 1 次处理
    C->>DB: 尝试入库
    DB-->>C: 数据库连接失败

    C->>C: 更新 header: x-retries = 1
    C->>C: nack(requeue=false)
    C-->>DLX: 消息进入 DLX

    Note over DLX: 根据 x-retries 路由
    DLX->>RQ: x-retries < 3<br/>routing_key: retry

    Note over RQ: TTL = 5 秒
    Note over RQ: 5 秒后自动死信

    RQ-->>Q: TTL 到期<br/>回到主队列

    Note over Q,C: 第 2 次处理
    Q->>C: 推送消息 (x-retries: 1)
    C->>DB: 尝试入库
    DB-->>C: 数据库连接失败

    C->>C: 更新 header: x-retries = 2
    C->>C: nack(requeue=false)
    C-->>DLX: 消息进入 DLX
    DLX->>RQ: x-retries < 3

    Note over RQ: 5 秒后自动死信
    RQ-->>Q: 回到主队列

    Note over Q,C: 第 3 次处理
    Q->>C: 推送消息 (x-retries: 2)
    C->>DB: 尝试入库
    DB-->>C: 数据库连接失败

    C->>C: 更新 header: x-retries = 3
    C->>C: nack(requeue=false)
    C-->>DLX: 消息进入 DLX

    Note over DLX: x-retries >= 3<br/>超过重试次数
    DLX->>EQ: routing_key: error<br/>进入错误队列

各角色职责和操作

1. 平台开发者(数据生产者)

职责

  • 从电商平台 API 采集数据(订单/产品/退款)
  • 将数据转换为标准消息格式
  • 推送消息到 RabbitMQ
  • 监控自己平台的错误消息

权限

操作 权限 说明
连接到 VHost datahub-app 只能访问此 VHost
发布消息 {platform}.exchange 只能写入自己平台的 Exchange
订阅错误 {platform}.errors.exchange 可选,接收本平台错误通知

操作流程

步骤 1: 生成 message_id
格式: {entity_type}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{platform_unique_id}

示例:
- order#datahub#100#20#200#order#DY123456789
- product#datahub#100#2#201#product#TM-PROD789
- refund#datahub#100#20#200#refund#DY-REF456
- inventory#datahub#100#2#201#inventory#TM-SKU123

生成逻辑

# 伪代码
message_id = f"{data_type}#datahub#{company_id}#{platform_id}#{store_id}#{data_type}#{platform_order_id}"

# 实际示例(DouYin 订单)
message_id = f"order#datahub#100#20#200#order#{order_data['order_id']}"
步骤 2: 提取 data_version

重要:必须从平台原始数据中提取更新时间戳作为 data_version

# 示例:从 DouYin API 响应提取
douyin_order = {
    "order_id": "DY202501140001",
    "create_time": 1736839200,
    "update_time": 1736840000,  # ← 使用这个作为 data_version
    ...
}

data_version = douyin_order.get('update_time') or douyin_order.get('create_time')

# 如果平台未提供更新时间,使用当前时间
if not data_version:
    data_version = int(time.time())
步骤 3: 构建消息体
{
  "message_id": "order#datahub#100#20#200#order#DY123456",
  "timestamp": "2025-01-14T10:30:00Z",
  "platform": "douyin",
  "data_type": "order",
  "metadata": {
    "platform_id": 20,
    "company_id": 100,
    "store_id": 200,
    "source_system": "douyin-open-api",
    "retry_count": 0,
    "data_version": 1736840000
  },
  "data": {
    "platform_unique_id": "DY123456",
    "raw_data": {
      // 平台原始完整 JSON 数据(必须包含更新时间)
      "order_id": "DY123456",
      "update_time": 1736840000,
      ...
    }
  }
}
步骤 4: 发布消息

连接参数

  • Host: <rabbitmq-host>
  • Port: 5672
  • VHost: datahub-app
  • Username: user_{platform}
  • Password: <provided_password>

发布配置

  • Exchange: {platform}.exchange
  • Routing Key: {data_type}.{platform} 格式
    • 订单: order.{platform} (如 order.douyin, order.tmall)
    • 产品: product.{platform} (如 product.douyin, product.tmall)
    • 退款: refund.{platform} (如 refund.douyin, refund.tmall)
    • 库存: inventory.{platform} (如 inventory.douyin, inventory.tmall)
  • Delivery Mode: 2 (持久化)
步骤 4: 监控错误(可选)

订阅配置

  • Exchange: {platform}.errors.exchange
  • Routing Key: # (接收所有错误类型)
  • 创建临时队列或持久队列用于接收错误消息

错误消息示例

{
  "error_id": "err_1234567890",
  "original_message": { /* 原始消息 */ },
  "error": {
    "type": "validation",
    "message": "Missing required field: total_amount",
    "timestamp": "2025-01-14T10:31:00Z"
  },
  "metadata": {
    "platform": "amazon",
    "platform_id": 1,
    "store_id": 200,
    "failed_at": "2025-01-14T10:31:00Z"
  }
}

2. Datahub 应用(数据消费者)

职责

  • 消费 RabbitMQ 业务队列中的消息
  • 使用批处理模式提升吞吐量(prefetch=100, batch_size=50
  • 通过适配器模式支持多平台数据处理
  • 验证消息格式和数据完整性
  • 将数据入库到 PostgreSQL
  • 处理失败消息,通过 DLX 延迟重试或进入错误队列
  • 实现幂等性,避免重复处理

消费者架构模型

单消费者 + 批处理 + 适配器模式

OrderConsumer (单实例)
├── RabbitMQ Channel
│   └── prefetch_count: 100 (最多缓存 100 条未确认消息)
│
├── 消息缓冲区 (Message Buffer)
│   ├── 接收 RabbitMQ 推送的消息
│   ├── 达到 batch_size=50 或 timeout=3s 触发处理
│   └── 缓冲区最大容量: 100 条
│
├── 批处理引擎 (Batch Processor)
│   ├── 遍历缓冲区中的消息
│   ├── 对每条消息调用对应的适配器
│   └── 批量入库 + 单条 ACK
│
└── 平台适配器 (Platform Adapters)
    ├── DouyinAdapter: 处理抖音订单数据
    ├── TmallAdapter: 处理天猫订单数据
    ├── JDAdapter: 处理京东订单数据
    └── ... 其他平台适配器

工作流程:
1. RabbitMQ 推送消息到消费者 (最多 100 条未确认)
2. 消费者缓存消息到缓冲区
3. 达到 batch_size 或 timeout 后触发批处理
4. 批处理引擎遍历消息,根据 platform 字段调用对应适配器
5. 适配器验证和转换数据
6. 批量插入数据库
7. 逐条 ACK/NACK 消息

** 消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 **

权限

操作 权限 说明
连接到 VHost datahub-app 访问业务 VHost
读取队列 orders.queue, products.queue, refunds.queue 消费业务消息
写入错误 Exchange *.errors.exchange 发送错误消息到对应平台的错误 Exchange
管理队列 orders.queue, products.queue, refunds.queue 配置消费者参数

核心操作

消息消费配置

推荐参数

  • prefetch_count: 100 - RabbitMQ 最多推送 100 条未确认消息
  • batch_size: 50 - 消费者积累 50 条消息后触发批处理
  • batch_timeout: 3000 ms - 如果 3 秒内未达到 batch_size,也触发处理
  • 消费者模型: 单消费者 + 批处理 + 适配器模式(每个队列一个消费者实例)
  • ACK 模式: 手动 ACK,单条确认(处理成功后才确认)
  • 重连机制: 自动重连,指数退避
幂等性保证(数据库方案)

方案:使用 processed_messages 表记录已处理的 message_id

-- 1. 创建幂等性记录表
CREATE TABLE processed_messages (
  message_id VARCHAR(200) PRIMARY KEY,
  platform_id INT NOT NULL,
  data_type VARCHAR(50) NOT NULL,
  processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_processed_messages_created ON processed_messages (created_at);

-- 2. 定期清理历史记录(可选,保留 30 天)
DELETE FROM processed_messages WHERE created_at < NOW() - INTERVAL '30 days';

消费流程

-- 在事务中执行
BEGIN;

-- 步骤 1: 尝试插入 processed_messages(幂等性检查)
INSERT INTO processed_messages (message_id, platform_id, data_type)
VALUES ('order#datahub#100#20#200#order#DY123', 20, 'order')
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id;

-- 如果返回空 → 说明是重复消息,ROLLBACK 并 ACK
-- 如果返回值 → 继续处理

-- 步骤 2: 插入/更新业务数据(带版本控制)
INSERT INTO orders (
  platform_id, platform_order_id, company_id, store_id,
  total_amount, data_version, raw, created_at, updated_at
) VALUES (20, 'DY123', 100, 200, 9999, 1736840000, '...'::jsonb, NOW(), NOW())
ON CONFLICT (platform_id, platform_order_id)
DO UPDATE SET
  total_amount = EXCLUDED.total_amount,
  data_version = EXCLUDED.data_version,
  raw = EXCLUDED.raw,
  updated_at = NOW()
WHERE orders.data_version < EXCLUDED.data_version;  -- 只有更新的版本才更新

COMMIT;

关键点

  • 无需 Redis 依赖
  • 事务保证原子性
  • ON CONFLICT DO NOTHING 实现幂等性
  • WHERE data_version < EXCLUDED.data_version 防止乱序更新
错误处理逻辑
处理流程:
1. 验证消息格式
   ├─ 格式正确 → 继续
   └─ 格式错误 → 发送到错误队列 (validation 类型), ACK

2. 幂等性检查
   ├─ 已处理 → 直接 ACK
   └─ 未处理 → 继续

3. 业务处理
   ├─ 成功 → 标记已处理, ACK
   └─ 失败 (异常)
       ├─ 数据库错误 / 网络错误
       │   ├─ retry_count < 3 → 重新发布 (延迟重试), ACK
       │   └─ retry_count >= 3 → 发送到错误队列 (processing_failed), ACK
       └─ 业务逻辑错误 → 发送到错误队列 (validation), ACK
延迟重试实现(DLX + 重试队列方案)

重试策略

  • 重试延迟: 固定 5 秒(通过 TTL 实现)
  • 最大重试次数: 3 次
  • 超过 3 次: 进入错误队列

实现方式(无需额外插件):

消费失败 → nack(requeue=false) → 进入 DLX
           ↓
    x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列
           ↓
    x-retries >= 3 → 错误队列 (人工处理)

工作流程

  1. 消费失败时,消费者执行 nack(requeue=false),消息进入 DLX(如 dlx.orders
  2. DLX 根据消息 header 中的 x-retries 判断路由:
    • x-retries < 3:路由到 orders.retry.queue(延迟重试)
    • x-retries >= 3:路由到 errors.queue(永久失败)
  3. 重试队列 TTL 到期后,消息自动死信回主队列,重新消费
  4. 消费者需要在 nack 前更新消息的 x-retries header

3. 运维团队

职责

  • RabbitMQ 集群部署和维护
  • 用户和权限管理
  • 监控队列健康状态
  • 处理系统级错误
  • 性能优化和故障排查

权限

操作 权限 说明
管理员账号 admin tag 访问管理界面,完全权限
监控错误队列 errors.queue (read) 查看所有平台的错误消息
配置资源 所有 Exchange/Queue 创建、修改、删除资源

日常操作

监控关键指标

队列堆积监控

# 查看所有队列状态
rabbitmqctl list_queues -p datahub-app name messages_ready messages_unacknowledged

# 告警阈值
# - messages_ready > 10000: 严重堆积
# - messages_unacknowledged > 5000: 消费者处理缓慢

消费者连接监控

# 查看所有消费者连接
rabbitmqctl list_consumers -p datahub-app

# 检查项:
# - 消费者数量是否正常
# - prefetch_count 配置是否合理
# - 是否有消费者断开

错误率监控

-- 查询最近 1 小时各平台错误数量
SELECT
  platform,
  error_type,
  COUNT(*) as error_count
FROM failed_messages
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY platform, error_type
ORDER BY error_count DESC;
故障排查

队列堆积问题

排查步骤:
1. 检查消费者是否在线
   rabbitmqctl list_consumers -p datahub-app

2. 查看消费速率
   管理界面 → Queues → 选择队列 → 查看 Deliver/Get rate

3. 检查消费者日志
   - 是否有异常报错
   - 处理时间是否过长

解决方案:
- 增加消费者实例数量
- 增加 prefetch_count
- 优化业务处理逻辑

消息丢失问题

排查步骤:
1. 确认消息是否发布成功
   管理界面 → Exchanges → 查看 Message rates (Publish in)

2. 确认 Binding 配置
   管理界面 → Exchanges → 选择 Exchange → 查看 Bindings

3. 检查消息是否持久化
   - Exchange durable: true
   - Queue durable: true
   - Message delivery_mode: 2

4. 查看消费者 ACK 行为
   - 是否正确 ACK
   - 是否有 NACK/REJECT
性能优化

优化消费速度

措施:
1. 调整批处理参数
   - batch_size: 当前 50,可根据消息大小调整到 30-100
   - batch_timeout: 当前 3s,可根据实时性要求调整到 1-5s

2. 调整 prefetch_count
   - 当前: 100
   - 可根据内存和消息处理速度调整到 50-200
   - prefetch_count 应该是 batch_size 的 2-3 倍

3. 优化批处理逻辑
   - 使用数据库批量插入(INSERT ... VALUES (...), (...), ...
   - 批次内消息可以并发验证(适配器模式)
   - 但必须单条 ACK 保证可靠性

4. 优化业务逻辑
   - 减少数据库查询
   - 使用连接池
   - 异步处理非关键操作

5. 水平扩展(谨慎)
   - 单队列单消费者模型,不建议同一队列多消费者
   - 可考虑分片:不同数据类型独立扩展(orders/products 分别扩展)

队列性能优化

# 启用 lazy queue (大量消息堆积时)
rabbitmqctl set_policy lazy-queue "^(orders|products|refunds)\.queue$" \
  '{"queue-mode":"lazy"}' --vhost datahub-app

# 效果:
# - 消息存储在磁盘,减少内存占用
# - 适用于消息堆积严重的场景

4. RabbitMQ 管理员

职责

  • 初始化 RabbitMQ 配置
  • 创建 VHost、Exchange、Queue
  • 管理用户和权限
  • 新平台接入配置

核心操作

新平台接入

当需要接入新平台(如 aliexpress)时,执行以下步骤:

步骤 1: 创建 Exchange

# 业务 Exchange
rabbitmqadmin -u admin -p <password> declare exchange \
  name="aliexpress.exchange" vhost=datahub-app \
  type=topic durable=true

# 错误 Exchange
rabbitmqadmin -u admin -p <password> declare exchange \
  name="aliexpress.errors.exchange" vhost=datahub-app \
  type=topic durable=true

步骤 2: 创建 Binding

# 绑定到业务队列(使用 {data_type}.{platform} 格式)
rabbitmqadmin -u admin -p <password> declare binding \
  source="aliexpress.exchange" destination="orders.queue" \
  vhost=datahub-app routing_key="order.aliexpress"

rabbitmqadmin -u admin -p <password> declare binding \
  source="aliexpress.exchange" destination="products.queue" \
  vhost=datahub-app routing_key="product.aliexpress"

rabbitmqadmin -u admin -p <password> declare binding \
  source="aliexpress.exchange" destination="refunds.queue" \
  vhost=datahub-app routing_key="refund.aliexpress"

rabbitmqadmin -u admin -p <password> declare binding \
  source="aliexpress.exchange" destination="inventory.queue" \
  vhost=datahub-app routing_key="inventory.aliexpress"

# 绑定到错误队列
rabbitmqadmin -u admin -p <password> declare binding \
  source="aliexpress.errors.exchange" destination="errors.queue" \
  vhost=datahub-app routing_key="#"

步骤 3: 创建用户

# 创建用户
rabbitmqadmin -u admin -p <password> declare user \
  name="user_aliexpress" password="<strong_password>" tags=""

# 配置权限
rabbitmqadmin -u admin -p <password> declare permission \
  vhost=datahub-app user="user_aliexpress" \
  configure="" write="^aliexpress\.(exchange|errors\.exchange)$" \
  read="^aliexpress\.errors\..*$"

步骤 4: 提供给开发者

连接信息:
- Host: <rabbitmq-host>
- Port: 5672
- VHost: datahub-app
- Username: user_aliexpress
- Password: <strong_password>

Exchange: aliexpress.exchange
Routing Keys:
  - 订单: order.aliexpress
  - 产品: product.aliexpress
  - 退款: refund.aliexpress
  - 库存: inventory.aliexpress

错误订阅 (可选): aliexpress.errors.exchange

监控和告警

1. 关键监控指标

graph TB
    subgraph "队列监控"
        M1[messages_ready<br/>待消费消息数]
        M2[messages_unacknowledged<br/>未确认消息数]
        M3[message_stats.publish_in_rate<br/>消息发布速率]
        M4[message_stats.deliver_rate<br/>消息消费速率]
    end

    subgraph "消费者监控"
        C1[consumer_count<br/>消费者数量]
        C2[prefetch_count<br/>预取数量]
        C3[connection_state<br/>连接状态]
    end

    subgraph "错误监控"
        E1[errors.queue 消息数<br/>错误队列堆积]
        E2[failed_messages 表记录数<br/>失败消息总数]
        E3[每小时错误增长率<br/>错误频率]
    end

    subgraph "系统监控"
        S1[disk_free<br/>磁盘剩余空间]
        S2[mem_used<br/>内存使用率]
        S3[fd_used<br/>文件描述符使用]
    end

    style M1 fill:#ffcdd2
    style M2 fill:#ffcdd2
    style E1 fill:#ffcdd2
    style E2 fill:#ffcdd2

2. 告警规则

指标 告警阈值 级别 处理建议
messages_ready > 10,000 警告 增加消费者
messages_ready > 50,000 严重 紧急扩容
messages_unacknowledged > 5,000 警告 检查消费者处理速度
consumer_count = 0 严重 消费者服务挂掉
publish_rate - deliver_rate > 1000/s 警告 消费速度跟不上生产速度
errors.queue 消息数 > 1,000 警告 大量消息处理失败
单平台小时错误数 > 100 警告 平台数据质量问题
disk_free < 10 GB 严重 清理磁盘或扩容
mem_used > 80% 警告 优化或增加内存

3. 监控实现

Prometheus + Grafana

# prometheus.yml
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['rabbitmq-exporter:9419']
    metrics_path: /metrics

告警规则示例

# alert_rules.yml
groups:
  - name: rabbitmq_alerts
    rules:
      - alert: QueueBacklog
        expr: rabbitmq_queue_messages_ready{queue=~"orders|products|refunds"} > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "队列 {{ $labels.queue }} 堆积严重"
          description: "当前待消费消息数: {{ $value }}"

      - alert: NoConsumers
        expr: rabbitmq_queue_consumers{queue=~"orders|products|refunds"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "队列 {{ $labels.queue }} 无消费者"
          description: "消费者服务可能已挂掉"

故障恢复

1. 消费者服务挂掉

graph LR
    A[检测到消费者断开] --> B[消息堆积在队列中]
    B --> C[告警通知运维]
    C --> D[重启消费者服务]
    D --> E[消费者自动重连]
    E --> F[继续消费堆积消息]
    F --> G{消息是否过期?}
    G -->|未过期| H[正常处理]
    G -->|已过期| I[消息被 TTL 删除]

    style A fill:#ffcdd2
    style C fill:#fff9c4
    style H fill:#c8e6c9
    style I fill:#ffab91

恢复时间

  • 自动重连: 秒级
  • 手动重启: 分钟级
  • 数据不会丢失(消息持久化)

2. RabbitMQ 服务重启

graph TB
    A[RabbitMQ 重启] --> B[持久化资源自动恢复]
    B --> C[Exchanges 恢复]
    B --> D[Queues 恢复]
    B --> E[Bindings 恢复]
    B --> F[持久化消息恢复]

    A --> G[生产者重连]
    A --> H[消费者重连]

    G --> I[继续发布消息]
    H --> J[继续消费消息]

    style A fill:#ffcdd2
    style C fill:#c8e6c9
    style D fill:#c8e6c9
    style E fill:#c8e6c9
    style F fill:#c8e6c9

数据安全性

  • 持久化 Exchange 自动恢复
  • 持久化 Queue 自动恢复
  • 持久化消息自动恢复
  • 未 ACK 的消息会重新投递(可能重复消费,需幂等性保证)

3. 数据库连接失败

graph TB
    A[数据库连接失败] --> B{retry_count < 3?}
    B -->|是| C[延迟重试<br/>60s / 120s / 240s]
    C --> D[重新入队]
    D --> E[等待下次消费]
    E --> B

    B -->|否| F[超过重试次数]
    F --> G[发送到错误队列]
    G --> H[人工介入处理]

    style A fill:#ffcdd2
    style C fill:#fff9c4
    style G fill:#ffab91
    style H fill:#ce93d8

最佳实践

1. 生产者最佳实践

  • 使用结构化 message_id:格式 {type}#{app}#{company}#{platform}#{store}#{type}#{unique_id}
  • 必须提供 data_version:从平台数据中提取更新时间戳,用于防止乱序更新
  • 消息持久化delivery_mode=2
  • 包含完整 raw_data:保留平台原始完整数据,便于后续数据补全和问题排查
  • 使用正确的 routing_keyorder/product/refund/inventory
  • 实现发布确认:等待 RabbitMQ 确认消息已接收
  • 实现重连机制:网络故障时自动重连
  • 不要发送超大消息(> 1 MB):考虑分片或使用对象存储 + 引用

2. 消费者最佳实践

  • 单消费者模型:每个队列一个消费者实例,避免消息分散
  • 批处理机制:使用 prefetch + 批处理提升吞吐(batch_size=50, prefetch_count=100
  • 适配器模式:消费者内部根据平台字段分发到不同业务逻辑
  • 手动 ACK:单条确认,失败消息不影响其他消息
  • 幂等性保证:使用 processed_messages 表记录已处理的 message_id
  • 版本控制:使用 data_version 字段防止乱序更新(批处理内部可能并发)
  • 事务处理:在一个事务中完成幂等性检查 + 业务数据更新
  • DLX 重试机制:失败消息通过 nack(requeue=false) 进入 DLX,延迟 5 秒后自动重试
  • 错误分类处理:区分可重试错误(临时)和业务错误(永久)
  • 限制重试次数:最多 3 次,通过消息 header x-retries 控制
  • 批量数据库操作:批次内使用批量插入提升性能
  • 记录详细日志:便于故障排查
  • 定期清理 processed_messages:保留 30 天历史记录即可
  • 不要同队列多消费者:会导致消息 Round-Robin 分配,破坏设计

3. 运维最佳实践

  • 定期备份配置:使用 rabbitmqadmin export
  • 监控告警:配置 Prometheus + Grafana
  • 日志归档:定期清理 failed_messages
  • 压力测试:定期测试消息吞吐能力
  • 灾难恢复演练:定期演练故障恢复流程
  • 文档更新:及时更新配置文档

附录

A. 消息示例

订单消息

{
  "message_id": "order#datahub#100#20#200#order#DY202501140001",
  "timestamp": "2025-01-14T10:30:00Z",
  "platform": "douyin",
  "data_type": "order",
  "metadata": {
    "platform_id": 20,
    "company_id": 100,
    "store_id": 200,
    "source_system": "douyin-open-api",
    "retry_count": 0,
    "data_version": 1736840000
  },
  "data": {
    "platform_unique_id": "DY202501140001",
    "raw_data": {
      "order_id": "DY202501140001",
      "create_time": 1736839200,
      "pay_time": 1736840000,
      "update_time": 1736840000,
      "order_amount": 9999,
      "pay_amount": 9999,
      "order_status": 2,
      "receiver_info": {
        "province": "北京市",
        "city": "北京市",
        "district": "朝阳区",
        "detail": "某某街道123号"
      }
    }
  }
}

产品消息

{
  "message_id": "product#datahub#100#2#201#product#TM-PROD-789",
  "timestamp": "2025-01-14T11:00:00Z",
  "platform": "tmall",
  "data_type": "product",
  "metadata": {
    "platform_id": 2,
    "company_id": 100,
    "store_id": 201,
    "source_system": "tmall-top-api",
    "retry_count": 0,
    "data_version": 1736841600
  },
  "data": {
    "platform_unique_id": "TM-PROD-789",
    "raw_data": {
      "num_iid": 789123456789,
      "title": "示例产品",
      "price": "499.00",
      "type": "fixed",
      "modified": "2025-01-14 11:00:00",
      "sku_list": [
        {
          "sku_id": 987654321,
          "price": "499.00",
          "quantity": 100
        }
      ]
    }
  }
}

B. 参考文档