diff --git a/docs/OrderConsumer业务流程.md b/docs/OrderConsumer业务流程.md new file mode 100644 index 0000000..7e52f18 --- /dev/null +++ b/docs/OrderConsumer业务流程.md @@ -0,0 +1,511 @@ +# OrderConsumer 业务流程图 + +## 概览 + +OrderConsumer 是订单消息的消费者,负责从 `orders.queue` 接收消息,解析并持久化订单数据到数据库。它实现了自动重试机制,失败的消息会自动重试,超过重试次数的消息会被发送到错误队列等待人工处理。 + +--- + +## 完整流程图 + +```mermaid +flowchart TD + Start([消息到达 orders.queue]) --> CheckDebug{是否开启调试延迟?} + CheckDebug -->|是| Sleep[休眠 N 秒] + CheckDebug -->|否| GetRetry[获取重试次数] + Sleep --> GetRetry + + GetRetry --> ParseRetry{检查 application_headers} + + ParseRetry -->|不存在| FirstTime[首次处理 retryCount=0] + ParseRetry -->|存在| ReadXDeath[读取 x-death count] + ReadXDeath --> RetryCount[retryCount = count] + + FirstTime --> LogRetry[记录重试次数] + RetryCount --> LogRetry + + LogRetry --> TryBlock[开始处理消息] + + TryBlock --> CreateParser[创建 Parser] + CreateParser --> ExtractMeta[提取 metadata] + ExtractMeta --> EntityMatch[匹配实体对象] + EntityMatch --> EntityMap[转换 raw_data] + EntityMap --> BeginTx[开始事务] + + BeginTx --> LoopSave[保存实体] + LoopSave --> Commit[提交事务] + Commit --> ReturnACK[返回 ACK] + ReturnACK --> End([处理完成]) + + TryBlock -->|异常| CatchError[捕获异常] + CatchError --> Rollback[回滚事务] + Rollback --> LogError[记录错误] + LogError --> CheckMaxRetry{超过重试次数?} + + CheckMaxRetry -->|是| ExceededLog[超过重试次数] + CheckMaxRetry -->|否| NotExceededLog[未超过次数] + + ExceededLog --> SendError[发送到错误队列] + SendError --> BuildErrorMsg[构建错误消息] + BuildErrorMsg --> ProduceError[发送到 errors.exchange] + ProduceError --> TrySend{发送成功?} + + TrySend -->|成功| SuccessLog[记录成功日志] + TrySend -->|失败| FailLog[记录失败日志] + + SuccessLog --> AckAfterError[返回 ACK] + FailLog --> AckAfterError + AckAfterError --> EndError([已发送到错误队列]) + + NotExceededLog --> ReturnNACK[返回 NACK] + ReturnNACK --> ToDLX([进入 DLX]) + ToDLX --> ToRetryQueue[进入重试队列] + ToRetryQueue --> WaitTTL[等待 5 秒] + WaitTTL --> BackToMain[TTL到期回流] + BackToMain --> BackToQueue[回到 orders.queue] + BackToQueue --> Start + + style Start fill:#e1f5e1 + style End fill:#e1f5e1 + style EndError fill:#ffe1e1 + style ReturnACK fill:#90EE90 + style AckAfterError fill:#FFB6C1 + style ReturnNACK fill:#FFD700 + style SendError fill:#FF6B6B + style Commit fill:#4CAF50 + style Rollback fill:#F44336 +``` + +--- + +## 消息流转状态图 + +```mermaid +stateDiagram-v2 + [*] --> OrdersQueue: 新消息到达 + + OrdersQueue --> Processing: Consumer 消费 + + Processing --> Success: 处理成功 + Processing --> FirstFailure: 首次失败 count=0 + Processing --> SecondFailure: 第2次失败 count=1 + Processing --> ThirdFailure: 第3次失败 count=2 + Processing --> MaxRetryExceeded: 超过重试次数 + + Success --> [*]: ACK + + FirstFailure --> DLX: NACK requeue=false + SecondFailure --> DLX: NACK requeue=false + ThirdFailure --> DLX: NACK requeue=false + + DLX --> RetryQueue: routing_key retry + + RetryQueue --> RetryQueue: 等待 TTL 5秒 + RetryQueue --> MainExchange: TTL到期自动死信 + + MainExchange --> OrdersQueue: routing_key order.retry + + MaxRetryExceeded --> ErrorsExchange: sendToErrorQueue + ErrorsExchange --> ErrorsQueue: routing_key error + ErrorsQueue --> [*]: 等待人工处理 +``` + +--- + +## 重试计数机制 + +```mermaid +graph LR + M1[消息 首次处理] --> H1{有 headers?} + H1 -->|否| C0[count = 0] + + C0 -->|NACK| M2[消息 第1次重试] + M2 --> H2{有 headers?} + H2 -->|是| XD2[读取 x-death] + XD2 --> C1[count = 1] + + C1 -->|NACK| M3[消息 第2次重试] + M3 --> H3{有 headers?} + H3 -->|是| XD3[读取 x-death] + XD3 --> C2[count = 2] + + C2 -->|NACK| M4[消息 第3次重试] + M4 --> H4{有 headers?} + H4 -->|是| XD4[读取 x-death] + XD4 --> C3[count = 3] + + C3 --> ERROR[发送到错误队列] + + style C0 fill:#90EE90 + style C1 fill:#FFD700 + style C2 fill:#FFA500 + style C3 fill:#FF6B6B + style ERROR fill:#DC143C,color:#fff +``` + +--- + +## 关键代码逻辑 + +### 1. getRetryCount() - 获取重试次数 + +```php +protected function getRetryCount(AMQPMessage $message): int +{ + // ⚠️ 关键:首次消息没有 application_headers,必须先检查 + if (!$message->has('application_headers')) { + return 0; // 首次处理 + } + + $headers = $message->get('application_headers'); + if (!$headers) { + return 0; + } + + $headerData = $headers->getNativeData(); + $xDeath = $headerData['x-death'] ?? []; + + if (empty($xDeath)) { + return 0; + } + + // RabbitMQ 自动维护的重试次数 + return $xDeath[0]['count'] ?? 0; +} +``` + +**x-death 结构示例**: +```php +[ + 'x-death' => [ + 0 => [ + 'count' => 2, // 重试次数 + 'reason' => 'rejected', // 死信原因 + 'queue' => 'orders.queue', // 原队列 + 'time' => 1701234567, // 时间戳 + 'exchange' => 'dlx.orders', // DLX 名称 + 'routing-keys' => ['retry'] + ] + ] +] +``` + +### 2. sendToErrorQueue() - 发送到错误队列 + +```php +protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void +{ + $originalData = json_decode($message->getBody(), true); + + // 构建详细的错误消息 + $errorMessage = [ + 'error_id' => uniqid('err_', true), + 'original_message' => $originalData, + 'error' => [ + 'type' => get_class($error), + 'message' => $error->getMessage(), + 'trace' => $error->getTraceAsString(), + 'timestamp' => date('c'), + ], + 'metadata' => [ + 'platform' => $originalData['platform'] ?? 'unknown', + 'platform_id' => $originalData['meta']['platform_id'] ?? null, + 'company_id' => $originalData['meta']['company_id'] ?? null, + 'store_id' => $originalData['meta']['store_id'] ?? null, + 'data_type' => $originalData['data_type'] ?? 'unknown', + 'failed_at' => date('c'), + 'retry_count' => $this->getRetryCount($message), + ], + ]; + + // 使用 errors.exchange 发送(不是默认 exchange) + $producer = ApplicationContext::getContainer()->get(Producer::class); + $producer->produce( + new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage { + protected string $exchange = 'errors.exchange'; // ✅ 通用错误交换机 + protected string|array $routingKey = 'error'; + protected string $poolName = 'default_consumer'; + protected array $properties = [ + 'delivery_mode' => 2, // 持久化 + ]; + + public function __construct(array $data) { + $this->payload = $data; + } + } + ); +} +``` + +--- + +## 时序图:完整的重试流程 + +```mermaid +sequenceDiagram + participant P as Producer + participant MQ as orders.queue + participant C as OrderConsumer + participant DLX as dlx.orders + participant RQ as orders.retry.queue + participant ME as main.exchange + participant EX as errors.exchange + participant EQ as errors.queue + participant DB as Database + + Note over P,MQ: 1. 首次消息 + P->>MQ: 发送订单消息 + MQ->>C: 消费消息 (count=0) + C->>C: getRetryCount() → 0 + C->>DB: 尝试保存数据 + DB-->>C: 失败 (异常) + C->>C: count(0) < MAX(3) + C->>MQ: NACK(requeue=false) + MQ->>DLX: 死信到 DLX + Note over DLX: x-death count=1 + DLX->>RQ: routing_key: retry + + Note over RQ: 2. 第1次重试 + RQ->>RQ: 等待 TTL 5秒 + RQ->>ME: TTL到期,自动死信 + ME->>MQ: routing_key: order.retry + MQ->>C: 消费消息 (count=1) + C->>C: getRetryCount() → 1 + C->>DB: 尝试保存数据 + DB-->>C: 失败 (异常) + C->>C: count(1) < MAX(3) + C->>MQ: NACK(requeue=false) + MQ->>DLX: 死信到 DLX + Note over DLX: x-death count=2 + DLX->>RQ: routing_key: retry + + Note over RQ: 3. 第2次重试 + RQ->>RQ: 等待 TTL 5秒 + RQ->>ME: TTL到期,自动死信 + ME->>MQ: routing_key: order.retry + MQ->>C: 消费消息 (count=2) + C->>C: getRetryCount() → 2 + C->>DB: 尝试保存数据 + DB-->>C: 失败 (异常) + C->>C: count(2) < MAX(3) + C->>MQ: NACK(requeue=false) + MQ->>DLX: 死信到 DLX + Note over DLX: x-death count=3 + DLX->>RQ: routing_key: retry + + Note over RQ,EQ: 4. 超过重试次数 + RQ->>RQ: 等待 TTL 5秒 + RQ->>ME: TTL到期,自动死信 + ME->>MQ: routing_key: order.retry + MQ->>C: 消费消息 (count=3) + C->>C: getRetryCount() → 3 + C->>DB: 尝试保存数据 + DB-->>C: 失败 (异常) + C->>C: count(3) >= MAX(3) ✓ + C->>EX: sendToErrorQueue() + EX->>EQ: 消息进入错误队列 + C->>MQ: ACK (防止再次重试) + Note over EQ: 等待人工处理 +``` + +--- + +## 配置参数 + +### 环境变量 (.env) + +```bash +# 最大重试次数(默认3次) +AMQP_MAX_RETRIES=3 + +# 调试延迟(秒),便于在 mq:status 中观察消息流转 +# 生产环境设置为 0,开发环境可设置为 2 +AMQP_CONSUMER_DEBUG_DELAY=0 +``` + +### Consumer 配置 + +```php +#[Consumer( + exchange: "main.exchange", + routingKey: "order.#", + queue: "orders.queue", + pool: "default_consumer", + nums: 1, + enable: true +)] +class OrderConsumer extends ConsumerMessage +{ + // ⚠️ 关键:必须设置为 false,否则会无限循环重试 + protected bool $requeue = false; + + protected ?array $qos = [ + 'prefetch_size' => 0, + 'prefetch_count' => 1, // 开发环境建议设置为1,生产环境可设置为10-100 + 'global' => false, + ]; +} +``` + +### 队列配置 (rabbitmq.sh) + +**orders.queue**: +- `x-dead-letter-exchange`: `dlx.orders` - 失败时发送到的 DLX +- `x-dead-letter-routing-key`: `retry` - DLX 路由键 +- `x-message-ttl`: 86400000 (24小时) - 消息最长存活时间 + +**orders.retry.queue**: +- `x-message-ttl`: 5000 (5秒) - 重试延迟时间 +- `x-dead-letter-exchange`: `main.exchange` - TTL到期后回流的 exchange +- `x-dead-letter-routing-key`: `order.retry` - 回流时的 routing key + +**errors.queue**: +- `x-message-ttl`: 604800000 (7天) - 错误消息保留时间 + +--- + +## 监控和调试 + +### 查看队列状态 + +```bash +php bin/hyperf.php app:mq:status +``` + +输出示例: +``` +=== Business Queues === ++-----------+---------+----------+---------+-----------+ +| Metric | orders | products | refunds | inventory | ++-----------+---------+----------+---------+-----------+ +| Messages | 0 | 0 | 0 | 0 | +| Consumers | 1 | 0 | 0 | 0 | +| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty | ++-----------+---------+----------+---------+-----------+ + +=== Dead Letter Queues (Retry Queues) === ++-----------+--------------+----------------+---------------+-----------------+ +| Metric | orders.retry | products.retry | refunds.retry | inventory.retry | ++-----------+--------------+----------------+---------------+-----------------+ +| Messages | 0 | 0 | 0 | 0 | +| Consumers | 0 | 0 | 0 | 0 | +| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty | ++-----------+--------------+----------------+---------------+-----------------+ + +=== Shared Queues === ++-----------+----------+ +| Metric | errors | ++-----------+----------+ +| Messages | 5 | +| Consumers | 0 | +| Status | ✓ Active | ++-----------+----------+ +``` + +### 开启调试模式观察流程 + +```bash +# 1. 设置调试延迟 +echo "AMQP_CONSUMER_DEBUG_DELAY=2" >> .env + +# 2. 启动消费者 +php bin/hyperf.php start + +# 3. 发送测试消息 +php bin/hyperf.php app:mq-push:tmall + +# 4. 观察日志输出 +tail -f runtime/logs/hyperf.log +``` + +你将看到: +``` +>>> No application_headers, first time processing +Retry count: 0/3 +=== Error Caught === +Error: Cannot find parser for platform 'tmall' and entity 'order' +>>> Retry not exceeded, sending to DLX (NACK) + +[5秒后] +>>> Extracted count from x-death: 1 +Retry count: 1/3 +>>> Retry not exceeded, sending to DLX (NACK) + +[5秒后] +>>> Extracted count from x-death: 2 +Retry count: 2/3 +>>> Retry not exceeded, sending to DLX (NACK) + +[5秒后] +>>> Extracted count from x-death: 3 +Retry count: 3/3 +>>> MAX RETRIES EXCEEDED! Sending to error queue... +>>> Successfully sent to error queue! +>>> Returning ACK to prevent further retries +``` + +--- + +## 常见问题排查 + +### 问题1: 消息无限循环,队列始终显示空 + +**原因**: `requeue` 没有设置为 `false` + +**解决**: +```php +protected bool $requeue = false; // ⚠️ 必须设置 +``` + +### 问题2: OutOfBoundsException - No "application_headers" + +**原因**: 首次处理的消息没有 `application_headers` 属性 + +**解决**: 使用 `has()` 检查 +```php +if (!$message->has('application_headers')) { + return 0; +} +``` + +### 问题3: 消息不进入错误队列 + +**原因**: +1. Consumer 没有 `errors.exchange` 的写权限 +2. 使用了默认 exchange (空字符串) 而非 `errors.exchange` + +**解决**: +1. 确保 rabbitmq.sh 中 consumer 权限包含 `errors.exchange` +2. 使用 `errors.exchange` 而非空 exchange: +```php +protected string $exchange = 'errors.exchange'; // ✅ 正确 +protected string $exchange = ''; // ❌ 错误 +``` + +### 问题4: 重试次数始终是 0 + +**原因**: +1. 重试队列没有正确配置 `x-dead-letter-routing-key` +2. 消息没有回流到主队列 + +**解决**: 确保 rabbitmq.sh 中重试队列配置: +```bash +--arguments '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"order.retry"}' +``` + +--- + +## 相关文档 + +- [RabbitMQ.md](RabbitMQ.md) - RabbitMQ 架构和配置详解 +- [HyperfAmqp.md](HyperfAmqp.md) - Hyperf AMQP 使用指南和重试机制详解 +- [EntityParseFactory完整使用指南.md](EntityParseFactory完整使用指南.md) - 实体解析工厂使用 + + + +## 注意事项 + +EntityParse 为数据处理的核心业务,会影响后续连续聚合物化视图的结果 + +backend/app/Platform/[Platform]/EntityParse/Order.php +backend/app/Platform/Tmall/EntityParse/Order.php \ No newline at end of file