# OrderConsumer 业务流程图 ## 概览 OrderConsumer 是订单消息的消费者,负责从 `orders.queue` 接收消息,解析并持久化订单数据到数据库。它实现了自动重试机制,失败的消息会自动重试,超过重试次数的消息会被发送到错误队列等待人工处理。 --- ## 完整流程图 ```mermaid flowchart TD Start([消息到达 orders.queue]) --> CheckDebug{是否开启调试延迟?} CheckDebug -->|是| Sleep[休眠 N 秒] CheckDebug -->|否| GetRetry[获取重试次数] Sleep --> GetRetry GetRetry --> ParseRetry{检查 application_headers} ParseRetry -->|不存在| FirstTime[首次处理 retryCount=0] ParseRetry -->|存在| ReadXDeath[读取 x-death count] ReadXDeath --> RetryCount[retryCount = count] FirstTime --> LogRetry[记录重试次数] RetryCount --> LogRetry LogRetry --> TryBlock[开始处理消息] TryBlock --> CreateParser[创建 Parser] CreateParser --> ExtractMeta[提取 metadata] ExtractMeta --> EntityMatch[匹配实体对象] EntityMatch --> EntityMap[转换 raw_data] EntityMap --> BeginTx[开始事务] BeginTx --> LoopSave[保存实体] LoopSave --> Commit[提交事务] Commit --> ReturnACK[返回 ACK] ReturnACK --> End([处理完成]) TryBlock -->|异常| CatchError[捕获异常] CatchError --> Rollback[回滚事务] Rollback --> LogError[记录错误] LogError --> CheckMaxRetry{超过重试次数?} CheckMaxRetry -->|是| ExceededLog[超过重试次数] CheckMaxRetry -->|否| NotExceededLog[未超过次数] ExceededLog --> SendError[发送到错误队列] SendError --> BuildErrorMsg[构建错误消息] BuildErrorMsg --> ProduceError[发送到 errors.exchange] ProduceError --> TrySend{发送成功?} TrySend -->|成功| SuccessLog[记录成功日志] TrySend -->|失败| FailLog[记录失败日志] SuccessLog --> AckAfterError[返回 ACK] FailLog --> AckAfterError AckAfterError --> EndError([已发送到错误队列]) NotExceededLog --> ReturnNACK[返回 NACK] ReturnNACK --> ToDLX([进入 DLX]) ToDLX --> ToRetryQueue[进入重试队列] ToRetryQueue --> WaitTTL[等待 5 秒] WaitTTL --> BackToMain[TTL到期回流] BackToMain --> BackToQueue[回到 orders.queue] BackToQueue --> Start style Start fill:#e1f5e1 style End fill:#e1f5e1 style EndError fill:#ffe1e1 style ReturnACK fill:#90EE90 style AckAfterError fill:#FFB6C1 style ReturnNACK fill:#FFD700 style SendError fill:#FF6B6B style Commit fill:#4CAF50 style Rollback fill:#F44336 ``` --- ## 消息流转状态图 ```mermaid stateDiagram-v2 [*] --> OrdersQueue: 新消息到达 OrdersQueue --> Processing: Consumer 消费 Processing --> Success: 处理成功 Processing --> FirstFailure: 首次失败 count=0 Processing --> SecondFailure: 第2次失败 count=1 Processing --> ThirdFailure: 第3次失败 count=2 Processing --> MaxRetryExceeded: 超过重试次数 Success --> [*]: ACK FirstFailure --> DLX: NACK requeue=false SecondFailure --> DLX: NACK requeue=false ThirdFailure --> DLX: NACK requeue=false DLX --> RetryQueue: routing_key retry RetryQueue --> RetryQueue: 等待 TTL 5秒 RetryQueue --> MainExchange: TTL到期自动死信 MainExchange --> OrdersQueue: routing_key order.retry MaxRetryExceeded --> ErrorsExchange: sendToErrorQueue ErrorsExchange --> ErrorsQueue: routing_key error ErrorsQueue --> [*]: 等待人工处理 ``` --- ## 重试计数机制 ```mermaid graph LR M1[消息 首次处理] --> H1{有 headers?} H1 -->|否| C0[count = 0] C0 -->|NACK| M2[消息 第1次重试] M2 --> H2{有 headers?} H2 -->|是| XD2[读取 x-death] XD2 --> C1[count = 1] C1 -->|NACK| M3[消息 第2次重试] M3 --> H3{有 headers?} H3 -->|是| XD3[读取 x-death] XD3 --> C2[count = 2] C2 -->|NACK| M4[消息 第3次重试] M4 --> H4{有 headers?} H4 -->|是| XD4[读取 x-death] XD4 --> C3[count = 3] C3 --> ERROR[发送到错误队列] style C0 fill:#90EE90 style C1 fill:#FFD700 style C2 fill:#FFA500 style C3 fill:#FF6B6B style ERROR fill:#DC143C,color:#fff ``` --- ## 关键代码逻辑 ### 1. getRetryCount() - 获取重试次数 ```php protected function getRetryCount(AMQPMessage $message): int { // ⚠️ 关键:首次消息没有 application_headers,必须先检查 if (!$message->has('application_headers')) { return 0; // 首次处理 } $headers = $message->get('application_headers'); if (!$headers) { return 0; } $headerData = $headers->getNativeData(); $xDeath = $headerData['x-death'] ?? []; if (empty($xDeath)) { return 0; } // RabbitMQ 自动维护的重试次数 return $xDeath[0]['count'] ?? 0; } ``` **x-death 结构示例**: ```php [ 'x-death' => [ 0 => [ 'count' => 2, // 重试次数 'reason' => 'rejected', // 死信原因 'queue' => 'orders.queue', // 原队列 'time' => 1701234567, // 时间戳 'exchange' => 'dlx.orders', // DLX 名称 'routing-keys' => ['retry'] ] ] ] ``` ### 2. sendToErrorQueue() - 发送到错误队列 ```php protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void { $originalData = json_decode($message->getBody(), true); // 构建详细的错误消息 $errorMessage = [ 'error_id' => uniqid('err_', true), 'original_message' => $originalData, 'error' => [ 'type' => get_class($error), 'message' => $error->getMessage(), 'trace' => $error->getTraceAsString(), 'timestamp' => date('c'), ], 'metadata' => [ 'platform' => $originalData['platform'] ?? 'unknown', 'platform_id' => $originalData['meta']['platform_id'] ?? null, 'company_id' => $originalData['meta']['company_id'] ?? null, 'store_id' => $originalData['meta']['store_id'] ?? null, 'data_type' => $originalData['data_type'] ?? 'unknown', 'failed_at' => date('c'), 'retry_count' => $this->getRetryCount($message), ], ]; // 使用 errors.exchange 发送(不是默认 exchange) $producer = ApplicationContext::getContainer()->get(Producer::class); $producer->produce( new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage { protected string $exchange = 'errors.exchange'; // ✅ 通用错误交换机 protected string|array $routingKey = 'error'; protected string $poolName = 'default_consumer'; protected array $properties = [ 'delivery_mode' => 2, // 持久化 ]; public function __construct(array $data) { $this->payload = $data; } } ); } ``` --- ## 时序图:完整的重试流程 ```mermaid sequenceDiagram participant P as Producer participant MQ as orders.queue participant C as OrderConsumer participant DLX as dlx.orders participant RQ as orders.retry.queue participant ME as main.exchange participant EX as errors.exchange participant EQ as errors.queue participant DB as Database Note over P,MQ: 1. 首次消息 P->>MQ: 发送订单消息 MQ->>C: 消费消息 (count=0) C->>C: getRetryCount() → 0 C->>DB: 尝试保存数据 DB-->>C: 失败 (异常) C->>C: count(0) < MAX(3) C->>MQ: NACK(requeue=false) MQ->>DLX: 死信到 DLX Note over DLX: x-death count=1 DLX->>RQ: routing_key: retry Note over RQ: 2. 第1次重试 RQ->>RQ: 等待 TTL 5秒 RQ->>ME: TTL到期,自动死信 ME->>MQ: routing_key: order.retry MQ->>C: 消费消息 (count=1) C->>C: getRetryCount() → 1 C->>DB: 尝试保存数据 DB-->>C: 失败 (异常) C->>C: count(1) < MAX(3) C->>MQ: NACK(requeue=false) MQ->>DLX: 死信到 DLX Note over DLX: x-death count=2 DLX->>RQ: routing_key: retry Note over RQ: 3. 第2次重试 RQ->>RQ: 等待 TTL 5秒 RQ->>ME: TTL到期,自动死信 ME->>MQ: routing_key: order.retry MQ->>C: 消费消息 (count=2) C->>C: getRetryCount() → 2 C->>DB: 尝试保存数据 DB-->>C: 失败 (异常) C->>C: count(2) < MAX(3) C->>MQ: NACK(requeue=false) MQ->>DLX: 死信到 DLX Note over DLX: x-death count=3 DLX->>RQ: routing_key: retry Note over RQ,EQ: 4. 超过重试次数 RQ->>RQ: 等待 TTL 5秒 RQ->>ME: TTL到期,自动死信 ME->>MQ: routing_key: order.retry MQ->>C: 消费消息 (count=3) C->>C: getRetryCount() → 3 C->>DB: 尝试保存数据 DB-->>C: 失败 (异常) C->>C: count(3) >= MAX(3) ✓ C->>EX: sendToErrorQueue() EX->>EQ: 消息进入错误队列 C->>MQ: ACK (防止再次重试) Note over EQ: 等待人工处理 ``` --- ## 配置参数 ### 环境变量 (.env) ```bash # 最大重试次数(默认3次) AMQP_MAX_RETRIES=3 # 调试延迟(秒),便于在 mq:status 中观察消息流转 # 生产环境设置为 0,开发环境可设置为 2 AMQP_CONSUMER_DEBUG_DELAY=0 ``` ### Consumer 配置 ```php #[Consumer( exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", pool: "default_consumer", nums: 1, enable: true )] class OrderConsumer extends ConsumerMessage { // ⚠️ 关键:必须设置为 false,否则会无限循环重试 protected bool $requeue = false; protected ?array $qos = [ 'prefetch_size' => 0, 'prefetch_count' => 1, // 开发环境建议设置为1,生产环境可设置为10-100 'global' => false, ]; } ``` ### 队列配置 (rabbitmq.sh) **orders.queue**: - `x-dead-letter-exchange`: `dlx.orders` - 失败时发送到的 DLX - `x-dead-letter-routing-key`: `retry` - DLX 路由键 - `x-message-ttl`: 86400000 (24小时) - 消息最长存活时间 **orders.retry.queue**: - `x-message-ttl`: 5000 (5秒) - 重试延迟时间 - `x-dead-letter-exchange`: `main.exchange` - TTL到期后回流的 exchange - `x-dead-letter-routing-key`: `order.retry` - 回流时的 routing key **errors.queue**: - `x-message-ttl`: 604800000 (7天) - 错误消息保留时间 --- ## 监控和调试 ### 查看队列状态 ```bash php bin/hyperf.php app:mq:status ``` 输出示例: ``` === Business Queues === +-----------+---------+----------+---------+-----------+ | Metric | orders | products | refunds | inventory | +-----------+---------+----------+---------+-----------+ | Messages | 0 | 0 | 0 | 0 | | Consumers | 1 | 0 | 0 | 0 | | Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty | +-----------+---------+----------+---------+-----------+ === Dead Letter Queues (Retry Queues) === +-----------+--------------+----------------+---------------+-----------------+ | Metric | orders.retry | products.retry | refunds.retry | inventory.retry | +-----------+--------------+----------------+---------------+-----------------+ | Messages | 0 | 0 | 0 | 0 | | Consumers | 0 | 0 | 0 | 0 | | Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty | +-----------+--------------+----------------+---------------+-----------------+ === Shared Queues === +-----------+----------+ | Metric | errors | +-----------+----------+ | Messages | 5 | | Consumers | 0 | | Status | ✓ Active | +-----------+----------+ ``` ### 开启调试模式观察流程 ```bash # 1. 设置调试延迟 echo "AMQP_CONSUMER_DEBUG_DELAY=2" >> .env # 2. 启动消费者 php bin/hyperf.php start # 3. 发送测试消息 php bin/hyperf.php app:mq-push:tmall # 4. 观察日志输出 tail -f runtime/logs/hyperf.log ``` 你将看到: ``` >>> No application_headers, first time processing Retry count: 0/3 === Error Caught === Error: Cannot find parser for platform 'tmall' and entity 'order' >>> Retry not exceeded, sending to DLX (NACK) [5秒后] >>> Extracted count from x-death: 1 Retry count: 1/3 >>> Retry not exceeded, sending to DLX (NACK) [5秒后] >>> Extracted count from x-death: 2 Retry count: 2/3 >>> Retry not exceeded, sending to DLX (NACK) [5秒后] >>> Extracted count from x-death: 3 Retry count: 3/3 >>> MAX RETRIES EXCEEDED! Sending to error queue... >>> Successfully sent to error queue! >>> Returning ACK to prevent further retries ``` --- ## 常见问题排查 ### 问题1: 消息无限循环,队列始终显示空 **原因**: `requeue` 没有设置为 `false` **解决**: ```php protected bool $requeue = false; // ⚠️ 必须设置 ``` ### 问题2: OutOfBoundsException - No "application_headers" **原因**: 首次处理的消息没有 `application_headers` 属性 **解决**: 使用 `has()` 检查 ```php if (!$message->has('application_headers')) { return 0; } ``` ### 问题3: 消息不进入错误队列 **原因**: 1. Consumer 没有 `errors.exchange` 的写权限 2. 使用了默认 exchange (空字符串) 而非 `errors.exchange` **解决**: 1. 确保 rabbitmq.sh 中 consumer 权限包含 `errors.exchange` 2. 使用 `errors.exchange` 而非空 exchange: ```php protected string $exchange = 'errors.exchange'; // ✅ 正确 protected string $exchange = ''; // ❌ 错误 ``` ### 问题4: 重试次数始终是 0 **原因**: 1. 重试队列没有正确配置 `x-dead-letter-routing-key` 2. 消息没有回流到主队列 **解决**: 确保 rabbitmq.sh 中重试队列配置: ```bash --arguments '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"order.retry"}' ``` --- ## 相关文档 - [RabbitMQ.md](RabbitMQ.md) - RabbitMQ 架构和配置详解 - [HyperfAmqp.md](HyperfAmqp.md) - Hyperf AMQP 使用指南和重试机制详解 - [EntityParseFactory完整使用指南.md](EntityParseFactory完整使用指南.md) - 实体解析工厂使用 ## 注意事项 EntityParse 为数据处理的核心业务,会影响后续连续聚合物化视图的结果 backend/app/Platform/[Platform]/EntityParse/Order.php backend/app/Platform/Tmall/EntityParse/Order.php