datahub/docs/OrderConsumer业务流程.md
2026-02-27 16:45:16 +08:00


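OrderConsumer Business Flow Diagrams

Overview

OrderConsumer consumes order messages. It receives messages from orders.queue, parses them, and persists the order data to the database. Failed messages are retried automatically; once a message reaches the maximum retry count, it is sent to the error queue for manual handling.


Complete Flowchart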

flowchart TD
    Start([Message arrives in orders.queue]) --> CheckDebug{Debug delay enabled?}
    CheckDebug -->|Yes| Sleep[Sleep N seconds]
    CheckDebug -->|No| GetRetry[Get retry count]
    Sleep --> GetRetry

    GetRetry --> ParseRetry{Check application_headers}

    ParseRetry -->|Missing| FirstTime[First attempt retryCount=0]
    ParseRetry -->|Present| ReadXDeath[Read x-death count]
    ReadXDeath --> RetryCount[retryCount = count]

    FirstTime --> LogRetry[Log retry count]
    RetryCount --> LogRetry

    LogRetry --> TryBlock[Start processing message]

    TryBlock --> CreateParser[Create Parser]
    CreateParser --> ExtractMeta[Extract metadata]
    ExtractMeta --> EntityMatch[Match entity objects]
    EntityMatch --> EntityMap[Transform raw_data]
    EntityMap --> BeginTx[Begin transaction]

    BeginTx --> LoopSave[Save entities]
    LoopSave --> Commit[Commit transaction]
    Commit --> ReturnACK[Return ACK]
    ReturnACK --> End([Processing complete])

    TryBlock -->|Exception| CatchError[Catch exception]
    CatchError --> Rollback[Roll back transaction]
    Rollback --> LogError[Log error]
    LogError --> CheckMaxRetry{Retry limit reached?}

    CheckMaxRetry -->|Yes| ExceededLog[Retry limit reached]
    CheckMaxRetry -->|No| NotExceededLog[Retry limit not reached]

    ExceededLog --> SendError[Send to error queue]
    SendError --> BuildErrorMsg[Build error message]
    BuildErrorMsg --> ProduceError[Publish to errors.exchange]
    ProduceError --> TrySend{Publish succeeded?}

    TrySend -->|Success| SuccessLog[Log success]
    TrySend -->|Failure| FailLog[Log failure]

    SuccessLog --> AckAfterError[Return ACK]
    FailLog --> AckAfterError
    AckAfterError --> EndError([Sent to error queue])

    NotExceededLog --> ReturnNACK[Return NACK]
    ReturnNACK --> ToDLX([Enters DLX])
    ToDLX --> ToRetryQueue[Enters retry queue]
    ToRetryQueue --> WaitTTL[Wait 5 seconds]
    WaitTTL --> BackToMain[Returns when TTL expires]
    BackToMain --> BackToQueue[Back in orders.queue]
    BackToQueue --> Start

    style Start fill:#e1f5e1
    style End fill:#e1f5e1
    style EndError fill:#ffe1e1
    style ReturnACK fill:#90EE90
    style AckAfterError fill:#FFB6C1
    style ReturnNACK fill:#FFD700
    style SendError fill:#FF6B6B
    style Commit fill:#4CAF50
    style Rollback fill:#F44336

Message Flow State Diagram

stateDiagram-v2
    [*] --> OrdersQueue: New message arrives

    OrdersQueue --> Processing: Consumed by consumer

    Processing --> Success: Processed successfully
    Processing --> FirstFailure: 1st failure count=0
    Processing --> SecondFailure: 2nd failure count=1
    Processing --> ThirdFailure: 3rd failure count=2
    Processing --> MaxRetryExceeded: Retry limit reached

    Success --> [*]: ACK

    FirstFailure --> DLX: NACK requeue=false
    SecondFailure --> DLX: NACK requeue=false
    ThirdFailure --> DLX: NACK requeue=false

    DLX --> RetryQueue: routing_key retry

    RetryQueue --> RetryQueue: Wait for 5-second TTL
    RetryQueue --> MainExchange: Dead-lettered automatically on TTL expiry

    MainExchange --> OrdersQueue: routing_key order.retry

    MaxRetryExceeded --> ErrorsExchange: sendToErrorQueue
    ErrorsExchange --> ErrorsQueue: routing_key error
    ErrorsQueue --> [*]: Awaiting manual handling

Retry Counting Mechanism

graph LR
    M1[Message first attempt] --> H1{Has headers?}
    H1 -->|No| C0[count = 0]

    C0 -->|NACK| M2[Message retry 1]
    M2 --> H2{Has headers?}
    H2 -->|Yes| XD2[Read x-death]
    XD2 --> C1[count = 1]

    C1 -->|NACK| M3[Message retry 2]
    M3 --> H3{Has headers?}
    H3 -->|Yes| XD3[Read x-death]
    XD3 --> C2[count = 2]

    C2 -->|NACK| M4[Message retry 3]
    M4 --> H4{Has headers?}
    H4 -->|Yes| XD4[Read x-death]
    XD4 --> C3[count = 3]

    C3 --> ERROR[Send to error queue]

    style C0 fill:#90EE90
    style C1 fill:#FFD700
    style C2 fill:#FFA500
    style C3 fill:#FF6B6B
    style ERROR fill:#DC143C,color:#fff
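
The counting scheme in the diagram can be sketched in plain PHP. This is a minimal illustration, not the consumer's actual code: `decideAction()` and `simulateFailingMessage()` are hypothetical helpers, and the sketch assumes each NACK raises the x-death count by exactly one.

```php
<?php
/**
 * Decide what to do with a failed message.
 * Hypothetical helper mirroring the "count >= max" check in the diagrams.
 */
function decideAction(int $retryCount, int $maxRetries): string
{
    return $retryCount >= $maxRetries ? 'error_queue' : 'nack';
}

/**
 * Simulate a message that fails on every delivery.
 * Assumption: each NACK dead-letters the message once, so the count grows by one.
 *
 * @return string[] one log line per delivery
 */
function simulateFailingMessage(int $maxRetries): array
{
    $log = [];
    $count = 0; // the first delivery carries no x-death header
    while (true) {
        $action = decideAction($count, $maxRetries);
        $log[] = sprintf('count=%d -> %s', $count, $action);
        if ($action === 'error_queue') {
            return $log;
        }
        $count++;
    }
}

foreach (simulateFailingMessage(3) as $line) {
    echo $line, PHP_EOL;
}
```

With a limit of 3, the simulation yields three NACKs (count 0, 1, 2) followed by one hand-off to the error queue at count 3, which matches the four message boxes in the diagram.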

Key Code Logic

1. getRetryCount() - Get the retry count

protected function getRetryCount(AMQPMessage $message): int
{
    // ⚠️ Important: a first-time message has no application_headers, so check for them first
    if (!$message->has('application_headers')) {
        return 0;  // first attempt
    }

    $headers = $message->get('application_headers');
    if (!$headers) {
        return 0;
    }

    $headerData = $headers->getNativeData();
    $xDeath = $headerData['x-death'] ?? [];

    if (empty($xDeath)) {
        return 0;
    }

    // Count maintained automatically by RabbitMQ, used here as the retry count
    return $xDeath[0]['count'] ?? 0;
}

Example x-death structure:

[
    'x-death' => [
        0 => [
            'count' => 2,                    // retry count
            'reason' => 'rejected',          // dead-letter reason
            'queue' => 'orders.queue',       // original queue
            'time' => 1701234567,            // timestamp
            'exchange' => 'dlx.orders',      // exchange name (the DLX here)
            'routing-keys' => ['retry']
        ]
    ]
]
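
RabbitMQ can keep more than one entry in x-death (one per queue and reason pair), so in this topology a returning message may carry an entry for orders.retry.queue as well as one for orders.queue. The sketch below picks the entry by queue name and reason instead of relying on index 0. `retryCountFromHeaders()` is a hypothetical helper that works on the native array shape shown above; it is not part of the consumer.

```php
<?php
/**
 * Read the retry count from a native headers array.
 * Hypothetical helper; expects the array shape shown in the example above.
 */
function retryCountFromHeaders(array $headers, string $queue = 'orders.queue'): int
{
    $xDeath = $headers['x-death'] ?? [];
    if (!is_array($xDeath)) {
        return 0;
    }

    foreach ($xDeath as $entry) {
        // Only count rejections from the given queue, not expirations elsewhere.
        if (is_array($entry)
            && ($entry['queue'] ?? null) === $queue
            && ($entry['reason'] ?? null) === 'rejected') {
            return (int) ($entry['count'] ?? 0);
        }
    }

    return 0; // no matching entry: treat as a first attempt
}

$headers = [
    'x-death' => [
        ['count' => 2, 'reason' => 'expired', 'queue' => 'orders.retry.queue'],
        ['count' => 2, 'reason' => 'rejected', 'queue' => 'orders.queue'],
    ],
];

echo retryCountFromHeaders($headers), PHP_EOL; // 2
echo retryCountFromHeaders([]), PHP_EOL;       // 0
```

Matching on queue and reason keeps the count stable even if the order of x-death entries differs between broker versions.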

2. sendToErrorQueue() - Send to the error queue

protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
{
    $originalData = json_decode($message->getBody(), true);

    // Build a detailed error message
    $errorMessage = [
        'error_id' => uniqid('err_', true),
        'original_message' => $originalData,
        'error' => [
            'type' => get_class($error),
            'message' => $error->getMessage(),
            'trace' => $error->getTraceAsString(),
            'timestamp' => date('c'),
        ],
        'metadata' => [
            'platform' => $originalData['platform'] ?? 'unknown',
            'platform_id' => $originalData['meta']['platform_id'] ?? null,
            'company_id' => $originalData['meta']['company_id'] ?? null,
            'store_id' => $originalData['meta']['store_id'] ?? null,
            'data_type' => $originalData['data_type'] ?? 'unknown',
            'failed_at' => date('c'),
            'retry_count' => $this->getRetryCount($message),
        ],
    ];

    // Publish via errors.exchange (not the default exchange)
    $producer = ApplicationContext::getContainer()->get(Producer::class);
    $producer->produce(
        new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage {
            protected string $exchange = 'errors.exchange';      // ✅ shared error exchange
            protected string|array $routingKey = 'error';
            protected string $poolName = 'default_consumer';
            protected array $properties = [
                'delivery_mode' => 2,  // persistent
            ];

            public function __construct(array $data) {
                $this->payload = $data;
            }
        }
    );
}
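
One edge case in building the error message is a body that is not valid JSON: `json_decode()` then returns null and the original content would not appear in the error record. The sketch below shows one way to keep the raw body in that case. `buildErrorPayload()` and the `raw_body` key are hypothetical; the other field names follow sendToErrorQueue() above, with some fields omitted for brevity.

```php
<?php
/**
 * Build an error-queue payload from a raw message body.
 * Hypothetical helper; the 'raw_body' fallback key is an assumption.
 */
function buildErrorPayload(string $body, Throwable $error, int $retryCount): array
{
    $decoded = json_decode($body, true);
    // Keep the raw body when it does not decode to an array, so nothing is lost.
    $original = is_array($decoded) ? $decoded : ['raw_body' => $body];

    return [
        'error_id' => uniqid('err_', true),
        'original_message' => $original,
        'error' => [
            'type' => get_class($error),
            'message' => $error->getMessage(),
            'timestamp' => date('c'),
        ],
        'metadata' => [
            'platform' => $original['platform'] ?? 'unknown',
            'data_type' => $original['data_type'] ?? 'unknown',
            'retry_count' => $retryCount,
        ],
    ];
}

$payload = buildErrorPayload('not json', new RuntimeException('parse failed'), 3);
echo json_encode($payload['metadata']), PHP_EOL;
```

Separating payload construction from publishing also makes the structure easy to unit test without a broker connection.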

Sequence Diagram: Complete Retry Flow

sequenceDiagram
    participant P as Producer
    participant MQ as orders.queue
    participant C as OrderConsumer
    participant DLX as dlx.orders
    participant RQ as orders.retry.queue
    participant ME as main.exchange
    participant EX as errors.exchange
    participant EQ as errors.queue
    participant DB as Database

    Note over P,MQ: 1. First delivery
    P->>MQ: Publish order message
    MQ->>C: Deliver message (count=0)
    C->>C: getRetryCount() → 0
    C->>DB: Try to save data
    DB-->>C: Failure (exception)
    C->>C: count(0) < MAX(3)
    C->>MQ: NACK(requeue=false)
    MQ->>DLX: Dead-letter to DLX
    Note over DLX: x-death count=1
    DLX->>RQ: routing_key: retry

    Note over RQ: 2. Retry 1
    RQ->>RQ: Wait for 5-second TTL
    RQ->>ME: TTL expired, dead-lettered automatically
    ME->>MQ: routing_key: order.retry
    MQ->>C: Deliver message (count=1)
    C->>C: getRetryCount() → 1
    C->>DB: Try to save data
    DB-->>C: Failure (exception)
    C->>C: count(1) < MAX(3)
    C->>MQ: NACK(requeue=false)
    MQ->>DLX: Dead-letter to DLX
    Note over DLX: x-death count=2
    DLX->>RQ: routing_key: retry

    Note over RQ: 3. Retry 2
    RQ->>RQ: Wait for 5-second TTL
    RQ->>ME: TTL expired, dead-lettered automatically
    ME->>MQ: routing_key: order.retry
    MQ->>C: Deliver message (count=2)
    C->>C: getRetryCount() → 2
    C->>DB: Try to save data
    DB-->>C: Failure (exception)
    C->>C: count(2) < MAX(3)
    C->>MQ: NACK(requeue=false)
    MQ->>DLX: Dead-letter to DLX
    Note over DLX: x-death count=3
    DLX->>RQ: routing_key: retry

    Note over RQ,EQ: 4. Retry limit reached
    RQ->>RQ: Wait for 5-second TTL
    RQ->>ME: TTL expired, dead-lettered automatically
    ME->>MQ: routing_key: order.retry
    MQ->>C: Deliver message (count=3)
    C->>C: getRetryCount() → 3
    C->>DB: Try to save data
    DB-->>C: Failure (exception)
    C->>C: count(3) >= MAX(3) ✓
    C->>EX: sendToErrorQueue()
    EX->>EQ: Message enters error queue
    C->>MQ: ACK (prevents further retries)
    Note over EQ: Awaiting manual handling
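
The sequence above implies a simple budget: a message that always fails is delivered once plus once per retry, and waits one TTL period before each retry. A rough sketch of that arithmetic follows; `retryBudget()` is a hypothetical helper, and processing time, the debug delay, and broker latency are ignored.

```php
<?php
/**
 * Estimate deliveries and minimum waiting time before a message that
 * always fails reaches the error queue.
 * Hypothetical helper; ignores processing time and broker latency.
 */
function retryBudget(int $maxRetries, int $retryTtlMs): array
{
    return [
        'deliveries' => $maxRetries + 1,             // first attempt plus retries
        'min_delay_ms' => $maxRetries * $retryTtlMs, // one TTL wait per retry
    ];
}

$budget = retryBudget(3, 5000);
printf(
    "%d deliveries, at least %d ms spent in the retry queue\n",
    $budget['deliveries'],
    $budget['min_delay_ms']
);
```

With the values used in this document (3 retries, 5000 ms TTL), that is 4 deliveries and at least 15 seconds before the message lands in the error queue.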

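Configuration Parameters

Environment variables (.env)

# Maximum number of retries (default: 3)
AMQP_MAX_RETRIES=3

# Debug delay in seconds, which makes message flow easier to observe in mq:status
# Set to 0 in production; 2 is a reasonable value in development
AMQP_CONSUMER_DEBUG_DELAY=0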
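
How the consumer reads these values is not shown in this document. As a sketch under that caveat, the helper below reads a non-negative integer with plain `getenv()` and falls back to a default when the variable is unset or malformed. `envInt()` is hypothetical; a Hyperf project would more likely go through its own configuration layer.

```php
<?php
/**
 * Read a non-negative integer setting from the environment.
 * Hypothetical helper; returns $default when the variable is unset or invalid.
 */
function envInt(string $name, int $default): int
{
    $raw = getenv($name);
    if ($raw === false || preg_match('/^\d+$/', $raw) !== 1) {
        return $default;
    }
    return (int) $raw;
}

putenv('AMQP_MAX_RETRIES=5');
putenv('AMQP_CONSUMER_DEBUG_DELAY=abc'); // invalid on purpose

echo envInt('AMQP_MAX_RETRIES', 3), PHP_EOL;          // 5
echo envInt('AMQP_CONSUMER_DEBUG_DELAY', 0), PHP_EOL; // 0 (fallback)
```

Falling back on invalid input avoids a misconfigured delay turning into an unexpected sleep or a retry limit of zero.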

Consumer configuration

#[Consumer(
    exchange: "main.exchange",
    routingKey: "order.#",
    queue: "orders.queue",
    pool: "default_consumer",
    nums: 1,
    enable: true
)]
class OrderConsumer extends ConsumerMessage
{
    // ⚠️ Important: must be false, otherwise failed messages are retried in an endless loop
    protected bool $requeue = false;

    protected ?array $qos = [
        'prefetch_size' => 0,
        'prefetch_count' => 1,  // 1 is recommended in development; 10-100 is reasonable in production
        'global' => false,
    ];
}
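
The binding key `order.#` is what lets a message returning with routing key `order.retry` land in orders.queue again. The sketch below implements AMQP topic matching rules in plain PHP to make that concrete. It assumes main.exchange is a topic exchange, which the routing key pattern suggests but this document does not state; `topicMatches()` is a hypothetical helper.

```php
<?php
/**
 * Match an AMQP topic binding pattern against a routing key.
 * "*" matches exactly one word, "#" matches zero or more words.
 * Hypothetical helper for illustration only.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    $match = function (array $p, array $k) use (&$match): bool {
        if ($p === []) {
            return $k === [];
        }
        $head = array_shift($p);
        if ($head === '#') {
            // Let "#" consume 0..n words of the remaining key.
            for ($i = 0; $i <= count($k); $i++) {
                if ($match($p, array_slice($k, $i))) {
                    return true;
                }
            }
            return false;
        }
        if ($k === []) {
            return false;
        }
        $word = array_shift($k);
        return ($head === '*' || $head === $word) && $match($p, $k);
    };

    return $match(explode('.', $pattern), explode('.', $routingKey));
}

var_dump(topicMatches('order.#', 'order.retry'));  // bool(true)
var_dump(topicMatches('order.#', 'refund.retry')); // bool(false)
```

If the exchange type were direct instead, the binding would have to match `order.retry` literally for the return path to work.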

Queue configuration (rabbitmq.sh)

orders.queue:

  • x-dead-letter-exchange: dlx.orders - DLX that failed messages are sent to
  • x-dead-letter-routing-key: retry - routing key used on the DLX
  • x-message-ttl: 86400000 (24 hours) - maximum message lifetime

orders.retry.queue:

  • x-message-ttl: 5000 (5 seconds) - retry delay
  • x-dead-letter-exchange: main.exchange - exchange the message returns to after the TTL expires
  • x-dead-letter-routing-key: order.retry - routing key used on return

errors.queue:

  • x-message-ttl: 604800000 (7 days) - retention time for error messages
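
The queue arguments above can be written as PHP arrays and serialized to the JSON form that the `--arguments` flag in the Issue 4 fix expects. The values come from the lists above; the array layout and the idea of generating the JSON from PHP are illustrative assumptions, not how rabbitmq.sh is actually produced.

```php
<?php
// Queue arguments as listed above; the array layout itself is illustrative.
$queueArguments = [
    'orders.queue' => [
        'x-dead-letter-exchange' => 'dlx.orders',
        'x-dead-letter-routing-key' => 'retry',
        'x-message-ttl' => 24 * 60 * 60 * 1000, // 24 hours in milliseconds
    ],
    'orders.retry.queue' => [
        'x-message-ttl' => 5000, // 5 seconds
        'x-dead-letter-exchange' => 'main.exchange',
        'x-dead-letter-routing-key' => 'order.retry',
    ],
    'errors.queue' => [
        'x-message-ttl' => 7 * 24 * 60 * 60 * 1000, // 7 days in milliseconds
    ],
];

// Print one JSON argument string per queue.
foreach ($queueArguments as $queue => $arguments) {
    echo $queue, ' ', json_encode($arguments), PHP_EOL;
}
```

Writing the TTLs as products makes the millisecond values easy to check against the durations given in parentheses.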

Monitoring and Debugging

Checking queue status

php bin/hyperf.php app:mq:status

Sample output:

=== Business Queues ===
+-----------+---------+----------+---------+-----------+
| Metric    | orders  | products | refunds | inventory |
+-----------+---------+----------+---------+-----------+
| Messages  | 0       | 0        | 0       | 0         |
| Consumers | 1       | 0        | 0       | 0         |
| Status    | ✓ Empty | ✓ Empty  | ✓ Empty | ✓ Empty   |
+-----------+---------+----------+---------+-----------+

=== Dead Letter Queues (Retry Queues) ===
+-----------+--------------+----------------+---------------+-----------------+
| Metric    | orders.retry | products.retry | refunds.retry | inventory.retry |
+-----------+--------------+----------------+---------------+-----------------+
| Messages  | 0            | 0              | 0             | 0               |
| Consumers | 0            | 0              | 0             | 0               |
| Status    | ✓ Empty      | ✓ Empty        | ✓ Empty       | ✓ Empty         |
+-----------+--------------+----------------+---------------+-----------------+

=== Shared Queues ===
+-----------+----------+
| Metric    | errors   |
+-----------+----------+
| Messages  | 5        |
| Consumers | 0        |
| Status    | ✓ Active |
+-----------+----------+

Observing the flow in debug mode

# 1. Set the debug delay
echo "AMQP_CONSUMER_DEBUG_DELAY=2" >> .env

# 2. Start the consumer
php bin/hyperf.php start

# 3. Send a test message
php bin/hyperf.php app:mq-push:tmall

# 4. Watch the log output
tail -f runtime/logs/hyperf.log

You will see:

>>> No application_headers, first time processing
Retry count: 0/3
=== Error Caught ===
Error: Cannot find parser for platform 'tmall' and entity 'order'
>>> Retry not exceeded, sending to DLX (NACK)

[5 seconds later]
>>> Extracted count from x-death: 1
Retry count: 1/3
>>> Retry not exceeded, sending to DLX (NACK)

[5 seconds later]
>>> Extracted count from x-death: 2
Retry count: 2/3
>>> Retry not exceeded, sending to DLX (NACK)

[5 seconds later]
>>> Extracted count from x-death: 3
Retry count: 3/3
>>> MAX RETRIES EXCEEDED! Sending to error queue...
>>> Successfully sent to error queue!
>>> Returning ACK to prevent further retries

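Troubleshooting Common Issues

Issue 1: Messages loop indefinitely and the queue always appears empty

Cause: requeue is not set to false

Fix:

protected bool $requeue = false;  // ⚠️ required

Issue 2: OutOfBoundsException - No "application_headers"

Cause: a message being processed for the first time has no application_headers property

Fix: check with has()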

if (!$message->has('application_headers')) {
    return 0;
}

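Issue 3: Messages do not reach the error queue

Cause:

  1. The consumer has no write permission on errors.exchange
  2. The default exchange (empty string) was used instead of errors.exchange

Fix:

  1. Make sure the consumer permissions in rabbitmq.sh include errors.exchange
  2. Use errors.exchange rather than the empty exchange:
protected string $exchange = 'errors.exchange';  // ✅ correct
protected string $exchange = '';                 // ❌ wrong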

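Issue 4: Retry count is always 0

Cause:

  1. The retry queue's x-dead-letter-routing-key is not configured correctly
  2. Messages are not flowing back to the main queue

Fix: make sure the retry queue in rabbitmq.sh is configured with: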

--arguments '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"order.retry"}'

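Related Documentation

Notes

EntityParse is the core business logic for data processing and affects the results of the downstream continuous-aggregate materialized views.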

backend/app/Platform/[Platform]/EntityParse/Order.php backend/app/Platform/Tmall/EntityParse/Order.php