datahub/docs/OrderConsumer业务流程.md
2026-02-27 16:45:16 +08:00

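# OrderConsumer Business Flow
## Overview
OrderConsumer is the consumer for order messages. It receives messages from `orders.queue`, parses them, and persists the order data to the database. It has automatic retries: a failed message is retried automatically, and once it exceeds the retry limit it is sent to the error queue to await manual handling.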
---
## Complete Flowchart
```mermaid
flowchart TD
Start([Message arrives at orders.queue]) --> CheckDebug{Debug delay enabled?}
CheckDebug -->|Yes| Sleep[Sleep N seconds]
CheckDebug -->|No| GetRetry[Get retry count]
Sleep --> GetRetry
GetRetry --> ParseRetry{Check application_headers}
ParseRetry -->|Absent| FirstTime[First attempt, retryCount=0]
ParseRetry -->|Present| ReadXDeath[Read x-death count]
ReadXDeath --> RetryCount[retryCount = count]
FirstTime --> LogRetry[Log retry count]
RetryCount --> LogRetry
LogRetry --> TryBlock[Start processing message]
TryBlock --> CreateParser[Create Parser]
CreateParser --> ExtractMeta[Extract metadata]
ExtractMeta --> EntityMatch[Match entity object]
EntityMatch --> EntityMap[Map raw_data]
EntityMap --> BeginTx[Begin transaction]
BeginTx --> LoopSave[Save entities]
LoopSave --> Commit[Commit transaction]
Commit --> ReturnACK[Return ACK]
ReturnACK --> End([Done])
TryBlock -->|Exception| CatchError[Catch exception]
CatchError --> Rollback[Roll back transaction]
Rollback --> LogError[Log error]
LogError --> CheckMaxRetry{Retry limit reached?}
CheckMaxRetry -->|Yes| ExceededLog[Log: limit exceeded]
CheckMaxRetry -->|No| NotExceededLog[Log: limit not exceeded]
ExceededLog --> SendError[Send to error queue]
SendError --> BuildErrorMsg[Build error message]
BuildErrorMsg --> ProduceError[Publish to errors.exchange]
ProduceError --> TrySend{Send succeeded?}
TrySend -->|Yes| SuccessLog[Log success]
TrySend -->|No| FailLog[Log failure]
SuccessLog --> AckAfterError[Return ACK]
FailLog --> AckAfterError
AckAfterError --> EndError([Sent to error queue])
NotExceededLog --> ReturnNACK[Return NACK]
ReturnNACK --> ToDLX([Dead-lettered to DLX])
ToDLX --> ToRetryQueue[Enter retry queue]
ToRetryQueue --> WaitTTL[Wait 5 seconds]
WaitTTL --> BackToMain[TTL expires, message flows back]
BackToMain --> BackToQueue[Back to orders.queue]
BackToQueue --> Start
style Start fill:#e1f5e1
style End fill:#e1f5e1
style EndError fill:#ffe1e1
style ReturnACK fill:#90EE90
style AckAfterError fill:#FFB6C1
style ReturnNACK fill:#FFD700
style SendError fill:#FF6B6B
style Commit fill:#4CAF50
style Rollback fill:#F44336
```
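The decision part of the flowchart (everything after the try block) can be condensed into one function. The sketch below is plain PHP with no Hyperf or php-amqplib dependency: processing and error-queue publishing are passed in as callables, and `handleOrder`, `$process` and `$sendToErrorQueue` are hypothetical names, not the real OrderConsumer API. It returns the verdict the consumer would hand back to the broker.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch of the flowchart's decision logic.
 * Returns 'ACK' or 'NACK'; NACK is assumed to use requeue=false,
 * so the broker dead-letters the message to dlx.orders.
 */
function handleOrder(
    callable $process,           // parses and saves the message; throws on failure
    callable $sendToErrorQueue,  // publishes to errors.exchange; may throw
    int $retryCount,
    int $maxRetries,
    array &$log = []
): string {
    try {
        $process();
        return 'ACK'; // transaction committed
    } catch (Throwable $e) {
        // The real consumer also rolls back the DB transaction at this point.
        $log[] = 'error: ' . $e->getMessage();
        if ($retryCount >= $maxRetries) {
            try {
                $sendToErrorQueue($e);
                $log[] = 'sent to error queue';
            } catch (Throwable $sendError) {
                // Per the flowchart, a failed send is only logged.
                $log[] = 'failed to send to error queue: ' . $sendError->getMessage();
            }
            return 'ACK'; // ACK either way so the message is not retried again
        }
        return 'NACK'; // let the DLX and retry queue schedule another attempt
    }
}

$fail = function (): void { throw new RuntimeException('db down'); };
$sent = [];
$send = function (Throwable $e) use (&$sent): void { $sent[] = $e->getMessage(); };

echo handleOrder(fn () => null, $send, 0, 3), PHP_EOL; // ACK
echo handleOrder($fail, $send, 1, 3), PHP_EOL;         // NACK
echo handleOrder($fail, $send, 3, 3), PHP_EOL;         // ACK, after publishing to the error queue
```

One consequence of the flow is visible here: if publishing to `errors.exchange` fails, the message is still ACKed and is therefore lost. Returning `NACK` in that branch instead would keep it in the retry loop until the send succeeds, at the cost of extra retries.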
---
## Message State Diagram
```mermaid
stateDiagram-v2
[*] --> OrdersQueue: new message arrives
OrdersQueue --> Processing: consumed by Consumer
Processing --> Success: processed successfully
Processing --> FirstFailure: 1st failure, count=0
Processing --> SecondFailure: 2nd failure, count=1
Processing --> ThirdFailure: 3rd failure, count=2
Processing --> MaxRetryExceeded: retry limit exceeded, count=3
Success --> [*]: ACK
FirstFailure --> DLX: NACK requeue=false
SecondFailure --> DLX: NACK requeue=false
ThirdFailure --> DLX: NACK requeue=false
DLX --> RetryQueue: routing_key retry
RetryQueue --> RetryQueue: wait for TTL, 5s
RetryQueue --> MainExchange: TTL expires, dead-lettered automatically
MainExchange --> OrdersQueue: routing_key order.retry
MaxRetryExceeded --> ErrorsExchange: sendToErrorQueue
ErrorsExchange --> ErrorsQueue: routing_key error
ErrorsQueue --> [*]: awaiting manual handling
```
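The state diagram implies a fixed worst case: with `AMQP_MAX_RETRIES=3` and a 5-second retry TTL, a message that always fails is processed 4 times and spends about 15 seconds in the retry queue before it lands in `errors.queue`. The small simulation below makes that arithmetic explicit. It assumes processing time is negligible and that each retry costs exactly one TTL; `simulateLifecycle` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical simulation of the retry loop in the state diagram.
 * $failuresBeforeSuccess: number of attempts that fail before one succeeds
 * (PHP_INT_MAX models a message that never succeeds).
 */
function simulateLifecycle(int $failuresBeforeSuccess, int $maxRetries, int $ttlMs): array
{
    $xDeathCount = 0; // what getRetryCount() would return on the next delivery
    $attempts = 0;
    $delayMs = 0;     // total time spent waiting in orders.retry.queue

    while (true) {
        $attempts++;
        if ($attempts > $failuresBeforeSuccess) {
            return ['outcome' => 'ACK', 'attempts' => $attempts, 'delay_ms' => $delayMs];
        }
        if ($xDeathCount >= $maxRetries) {
            return ['outcome' => 'errors.queue', 'attempts' => $attempts, 'delay_ms' => $delayMs];
        }
        // NACK(requeue=false) -> dlx.orders -> orders.retry.queue -> TTL -> orders.queue
        $xDeathCount++;
        $delayMs += $ttlMs;
    }
}

var_export(simulateLifecycle(PHP_INT_MAX, 3, 5000));
// ['outcome' => 'errors.queue', 'attempts' => 4, 'delay_ms' => 15000]
var_export(simulateLifecycle(2, 3, 5000));
// ['outcome' => 'ACK', 'attempts' => 3, 'delay_ms' => 10000]
```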
---
## Retry Counting Mechanism
```mermaid
graph LR
M1[Message, first attempt] --> H1{Has headers?}
H1 -->|No| C0[count = 0]
C0 -->|NACK| M2[Message, 1st retry]
M2 --> H2{Has headers?}
H2 -->|Yes| XD2[Read x-death]
XD2 --> C1[count = 1]
C1 -->|NACK| M3[Message, 2nd retry]
M3 --> H3{Has headers?}
H3 -->|Yes| XD3[Read x-death]
XD3 --> C2[count = 2]
C2 -->|NACK| M4[Message, 3rd retry]
M4 --> H4{Has headers?}
H4 -->|Yes| XD4[Read x-death]
XD4 --> C3[count = 3]
C3 --> ERROR[Send to error queue]
style C0 fill:#90EE90
style C1 fill:#FFD700
style C2 fill:#FFA500
style C3 fill:#FF6B6B
style ERROR fill:#DC143C,color:#fff
```
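Where does `count` come from? RabbitMQ keeps one `x-death` entry per (queue, reason) pair, increments that entry's `count` each time the message is dead-lettered again for the same pair, and keeps the most recent entry first. The plain-PHP model below is a simplification of that broker behaviour (it tracks only `queue`, `reason` and `count`, and `deadLetter` is a hypothetical helper), applied to the retry cycle used here: rejected from `orders.queue`, then expired from `orders.retry.queue`.

```php
<?php
declare(strict_types=1);

/**
 * Simplified model of how the broker updates x-death:
 * one entry per (queue, reason), most recent entry first.
 */
function deadLetter(array $xDeath, string $queue, string $reason): array
{
    foreach ($xDeath as $i => $entry) {
        if ($entry['queue'] === $queue && $entry['reason'] === $reason) {
            $entry['count']++;
            unset($xDeath[$i]);
            return array_merge([$entry], $xDeath); // move the updated entry to the front
        }
    }
    return array_merge([['queue' => $queue, 'reason' => $reason, 'count' => 1]], $xDeath);
}

$xDeath = []; // first delivery: no x-death header at all
for ($cycle = 1; $cycle <= 3; $cycle++) {
    $xDeath = deadLetter($xDeath, 'orders.queue', 'rejected');      // NACK, requeue=false
    $xDeath = deadLetter($xDeath, 'orders.retry.queue', 'expired'); // retry TTL elapsed
    echo "after cycle {$cycle}: x-death[0] = {$xDeath[0]['queue']}/{$xDeath[0]['reason']}, count {$xDeath[0]['count']}", PHP_EOL;
}
// last line: after cycle 3: x-death[0] = orders.retry.queue/expired, count 3
```

Both entries advance once per cycle, which is why reading `x-death[0]['count']` yields the number of completed retries in this topology.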
---
## Key Code Logic
### 1. getRetryCount() - Get the retry count
```php
protected function getRetryCount(AMQPMessage $message): int
{
    // ⚠️ Important: a first-time message has no application_headers, so check before reading
    if (!$message->has('application_headers')) {
        return 0; // first attempt
    }
    $headers = $message->get('application_headers');
    if (!$headers) {
        return 0;
    }
    $headerData = $headers->getNativeData();
    $xDeath = $headerData['x-death'] ?? [];
    if (empty($xDeath)) {
        return 0;
    }
    // Retry count maintained by RabbitMQ; x-death[0] is the most recent dead-letter event
    return (int) ($xDeath[0]['count'] ?? 0);
}
```
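**Example x-death structure** (as seen by the consumer after two retry cycles):
```php
[
    'x-death' => [
        0 => [
            'count' => 2,                      // times dead-lettered from this queue for this reason
            'reason' => 'expired',             // dead-letter reason (retry TTL elapsed)
            'queue' => 'orders.retry.queue',   // queue the message was dead-lettered from
            'time' => 1701234567,              // timestamp
            'exchange' => 'dlx.orders',        // exchange the message was published to before that
            'routing-keys' => ['retry'],
        ],
        // 1 => ['count' => 2, 'reason' => 'rejected', 'queue' => 'orders.queue', ...]
    ],
]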
```
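`getRetryCount()` reads `x-death[0]`, which works here because the `orders.queue` and `orders.retry.queue` entries advance together once per cycle. If the topology ever changes (for example, another queue starts dead-lettering the same message), looking up the entry for a specific queue and reason is more robust. A hedged sketch over the decoded header array; `retryCountFor` is a hypothetical helper, and in the consumer the array would come from `$headers->getNativeData()`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: the x-death count for one (queue, reason) pair,
 * or 0 if the message was never dead-lettered from that queue for that reason.
 */
function retryCountFor(array $headerData, string $queue, string $reason = 'rejected'): int
{
    foreach ($headerData['x-death'] ?? [] as $entry) {
        if (($entry['queue'] ?? null) === $queue && ($entry['reason'] ?? null) === $reason) {
            return (int) ($entry['count'] ?? 0);
        }
    }
    return 0;
}

$headers = [
    'x-death' => [
        ['count' => 2, 'reason' => 'expired', 'queue' => 'orders.retry.queue'],
        ['count' => 2, 'reason' => 'rejected', 'queue' => 'orders.queue'],
    ],
];
echo retryCountFor($headers, 'orders.queue'), PHP_EOL; // 2
echo retryCountFor([], 'orders.queue'), PHP_EOL;       // 0
```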
### 2. sendToErrorQueue() - Send to the error queue
```php
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
{
    $originalData = json_decode($message->getBody(), true);
    // Build a detailed error message
    $errorMessage = [
        'error_id' => uniqid('err_', true),
        'original_message' => $originalData,
        'error' => [
            'type' => get_class($error),
            'message' => $error->getMessage(),
            'trace' => $error->getTraceAsString(),
            'timestamp' => date('c'),
        ],
        'metadata' => [
            'platform' => $originalData['platform'] ?? 'unknown',
            'platform_id' => $originalData['meta']['platform_id'] ?? null,
            'company_id' => $originalData['meta']['company_id'] ?? null,
            'store_id' => $originalData['meta']['store_id'] ?? null,
            'data_type' => $originalData['data_type'] ?? 'unknown',
            'failed_at' => date('c'),
            'retry_count' => $this->getRetryCount($message),
        ],
    ];
    // Publish via errors.exchange (not the default exchange)
    $producer = ApplicationContext::getContainer()->get(Producer::class);
    $producer->produce(
        new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage {
            protected string $exchange = 'errors.exchange'; // ✅ shared error exchange
            protected string|array $routingKey = 'error';
            protected string $poolName = 'default_consumer';
            protected array $properties = [
                'delivery_mode' => 2, // persistent
            ];

            public function __construct(array $data)
            {
                $this->payload = $data;
            }
        }
    );
}
```
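The payload-building half of `sendToErrorQueue()` has no broker dependency, so it can be pulled out into a pure function and unit-tested. The sketch below assumes the same field layout as above; `buildErrorPayload` is a hypothetical name. One deliberate difference: when the body is not valid JSON, the original passes `null` as `original_message`, while this version keeps the raw body so the failing message can still be inspected.

```php
<?php
declare(strict_types=1);

/** Hypothetical pure version of the payload built in sendToErrorQueue(). */
function buildErrorPayload(string $body, Throwable $error, int $retryCount): array
{
    $decoded = json_decode($body, true);
    $original = is_array($decoded) ? $decoded : []; // defaults apply if the body is not a JSON object
    $now = date('c');

    return [
        'error_id' => uniqid('err_', true),
        'original_message' => is_array($decoded) ? $decoded : $body, // keep the raw body if not JSON
        'error' => [
            'type' => get_class($error),
            'message' => $error->getMessage(),
            'trace' => $error->getTraceAsString(),
            'timestamp' => $now,
        ],
        'metadata' => [
            'platform' => $original['platform'] ?? 'unknown',
            'platform_id' => $original['meta']['platform_id'] ?? null,
            'company_id' => $original['meta']['company_id'] ?? null,
            'store_id' => $original['meta']['store_id'] ?? null,
            'data_type' => $original['data_type'] ?? 'unknown',
            'failed_at' => $now,
            'retry_count' => $retryCount,
        ],
    ];
}

$payload = buildErrorPayload(
    '{"platform":"tmall","data_type":"order","meta":{"store_id":42}}',
    new RuntimeException("Cannot find parser for platform 'tmall' and entity 'order'"),
    3
);
echo json_encode($payload['metadata']), PHP_EOL;
```

The publishing half can then stay a thin wrapper around `Producer::produce()`.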
---
## Sequence Diagram: Full Retry Flow
```mermaid
sequenceDiagram
participant P as Producer
participant MQ as orders.queue
participant C as OrderConsumer
participant DLX as dlx.orders
participant RQ as orders.retry.queue
participant ME as main.exchange
participant EX as errors.exchange
participant EQ as errors.queue
participant DB as Database
Note over P,MQ: 1. First delivery
P->>MQ: Publish order message
MQ->>C: Deliver message (count=0)
C->>C: getRetryCount() → 0
C->>DB: Try to save data
DB-->>C: Fails (exception)
C->>C: count(0) < MAX(3)
C->>MQ: NACK(requeue=false)
MQ->>DLX: Dead-letter to DLX
Note over DLX: x-death count=1
DLX->>RQ: routing_key: retry
Note over RQ: 2. 1st retry
RQ->>RQ: Wait TTL 5s
RQ->>ME: TTL expires, dead-lettered automatically
ME->>MQ: routing_key: order.retry
MQ->>C: Deliver message (count=1)
C->>C: getRetryCount() → 1
C->>DB: Try to save data
DB-->>C: Fails (exception)
C->>C: count(1) < MAX(3)
C->>MQ: NACK(requeue=false)
MQ->>DLX: Dead-letter to DLX
Note over DLX: x-death count=2
DLX->>RQ: routing_key: retry
Note over RQ: 3. 2nd retry
RQ->>RQ: Wait TTL 5s
RQ->>ME: TTL expires, dead-lettered automatically
ME->>MQ: routing_key: order.retry
MQ->>C: Deliver message (count=2)
C->>C: getRetryCount() → 2
C->>DB: Try to save data
DB-->>C: Fails (exception)
C->>C: count(2) < MAX(3)
C->>MQ: NACK(requeue=false)
MQ->>DLX: Dead-letter to DLX
Note over DLX: x-death count=3
DLX->>RQ: routing_key: retry
Note over RQ,EQ: 4. Retry limit exceeded
RQ->>RQ: Wait TTL 5s
RQ->>ME: TTL expires, dead-lettered automatically
ME->>MQ: routing_key: order.retry
MQ->>C: Deliver message (count=3)
C->>C: getRetryCount() → 3
C->>DB: Try to save data
DB-->>C: Fails (exception)
C->>C: count(3) >= MAX(3) ✓
C->>EX: sendToErrorQueue()
EX->>EQ: Message enters error queue
C->>MQ: ACK (prevents further retries)
Note over EQ: Awaiting manual handling
```
---
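## Configuration
### Environment variables (.env)
```bash
# Maximum number of retries (default: 3)
AMQP_MAX_RETRIES=3
# Debug delay in seconds, so you can watch messages move between queues with mq:status
# Set to 0 in production; 2 works well in development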
AMQP_CONSUMER_DEBUG_DELAY=0
```
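This document does not show where these variables are read. In a Hyperf project that is typically done with the `env()` helper; the framework-free sketch below does the same with PHP's built-in `getenv()`, assuming that a missing, empty, negative or non-numeric value should fall back to the documented default. `intFromEnv` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: read a non-negative integer env var, falling back to a default. */
function intFromEnv(string $name, int $default): int
{
    $raw = getenv($name);
    if ($raw === false || preg_match('/^\d+$/', trim($raw)) !== 1) {
        return $default; // unset, empty, negative or non-numeric
    }
    return (int) trim($raw);
}

$maxRetries = intFromEnv('AMQP_MAX_RETRIES', 3);
$debugDelay = intFromEnv('AMQP_CONSUMER_DEBUG_DELAY', 0);
printf("max retries: %d, debug delay: %ds\n", $maxRetries, $debugDelay);
```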
### Consumer configuration
```php
#[Consumer(
    exchange: "main.exchange",
    routingKey: "order.#",
    queue: "orders.queue",
    pool: "default_consumer",
    nums: 1,
    enable: true
)]
class OrderConsumer extends ConsumerMessage
{
    // ⚠️ Important: must be false. With true, a NACKed message goes straight back
    // to orders.queue (bypassing the DLX) and is retried in an endless loop.
    protected bool $requeue = false;

    protected ?array $qos = [
        'prefetch_size' => 0,
        'prefetch_count' => 1, // 1 is recommended in development; 10-100 in production
        'global' => false,
    ];
}
```
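Why must `$requeue` be `false`? When a consumer rejects a message, the broker either puts it straight back on the same queue (requeue=true) or dead-letters it to the queue's `x-dead-letter-exchange` (requeue=false). Only the second path reaches `dlx.orders`, the retry queue and the `x-death` counter. The sketch below is a simplified model of that broker rule, not Hyperf's code; `brokerAction` is a hypothetical name.

```php
<?php
declare(strict_types=1);

/** Simplified model: what the broker does with a delivery after the consumer's verdict. */
function brokerAction(string $verdict, bool $requeue, ?string $deadLetterExchange): string
{
    if ($verdict === 'ACK') {
        return 'removed from queue';
    }
    // NACK or reject
    if ($requeue) {
        return 'requeued to same queue (no x-death, redelivered immediately)';
    }
    return $deadLetterExchange !== null
        ? "dead-lettered to {$deadLetterExchange} (x-death updated)"
        : 'discarded';
}

echo brokerAction('NACK', false, 'dlx.orders'), PHP_EOL; // dead-lettered to dlx.orders (x-death updated)
echo brokerAction('NACK', true, 'dlx.orders'), PHP_EOL;  // requeued to same queue (no x-death, redelivered immediately)
```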
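### Queue configuration (rabbitmq.sh)
**orders.queue**:
- `x-dead-letter-exchange`: `dlx.orders` - the DLX that NACKed messages are sent to
- `x-dead-letter-routing-key`: `retry` - routing key used when dead-lettering to the DLX
- `x-message-ttl`: 86400000 (24 hours) - maximum message lifetime (a message that expires here is also dead-lettered to `dlx.orders`)
**orders.retry.queue**:
- `x-message-ttl`: 5000 (5 seconds) - retry delay
- `x-dead-letter-exchange`: `main.exchange` - exchange messages flow back to when the TTL expires
- `x-dead-letter-routing-key`: `order.retry` - routing key used for the return trip
**errors.queue**:
- `x-message-ttl`: 604800000 (7 days) - retention time for error messages; messages not handled within this window expire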
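The return trip relies on topic matching: `orders.retry.queue` dead-letters with routing key `order.retry`, and `orders.queue` is bound to `main.exchange` with `order.#` (see the Consumer annotation), so returning messages land back in `orders.queue`. Below is a minimal PHP sketch of AMQP topic matching, where `*` matches exactly one word and `#` matches zero or more words. It assumes `main.exchange` is a topic exchange; `topicMatches` and `matchWords` are hypothetical helpers.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: does a topic binding pattern match a routing key? */
function topicMatches(string $pattern, string $routingKey): bool
{
    return matchWords(explode('.', $pattern), explode('.', $routingKey));
}

function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        return $words === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // '#' either matches zero words, or swallows one word and tries again
        return matchWords($pattern, $words)
            || ($words !== [] && matchWords(array_merge(['#'], $pattern), array_slice($words, 1)));
    }
    if ($words === []) {
        return false;
    }
    $word = array_shift($words);
    return ($head === '*' || $head === $word) && matchWords($pattern, $words);
}

var_dump(topicMatches('order.#', 'order.retry'));    // bool(true)
var_dump(topicMatches('order.#', 'products.retry')); // bool(false)
```

This also explains Problem 4 below: a message that comes back with routing key `retry` instead of `order.retry` matches no binding and never returns.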
---
## Monitoring and Debugging
### Checking queue status
```bash
php bin/hyperf.php app:mq:status
```
Example output:
```
=== Business Queues ===
+-----------+---------+----------+---------+-----------+
| Metric | orders | products | refunds | inventory |
+-----------+---------+----------+---------+-----------+
| Messages | 0 | 0 | 0 | 0 |
| Consumers | 1 | 0 | 0 | 0 |
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
+-----------+---------+----------+---------+-----------+
=== Dead Letter Queues (Retry Queues) ===
+-----------+--------------+----------------+---------------+-----------------+
| Metric | orders.retry | products.retry | refunds.retry | inventory.retry |
+-----------+--------------+----------------+---------------+-----------------+
| Messages | 0 | 0 | 0 | 0 |
| Consumers | 0 | 0 | 0 | 0 |
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
+-----------+--------------+----------------+---------------+-----------------+
=== Shared Queues ===
+-----------+----------+
| Metric | errors |
+-----------+----------+
| Messages | 5 |
| Consumers | 0 |
| Status | ✓ Active |
+-----------+----------+
```
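### Enabling debug mode to watch the flow
```bash
# 1. Set the debug delay (if .env already defines AMQP_CONSUMER_DEBUG_DELAY, edit that line instead of appending a duplicate)
echo "AMQP_CONSUMER_DEBUG_DELAY=2" >> .env
# 2. Start the consumer
php bin/hyperf.php start
# 3. Publish a test message
php bin/hyperf.php app:mq-push:tmall
# 4. Watch the log output
tail -f runtime/logs/hyperf.log
```
You should see: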
```
>>> No application_headers, first time processing
Retry count: 0/3
=== Error Caught ===
Error: Cannot find parser for platform 'tmall' and entity 'order'
>>> Retry not exceeded, sending to DLX (NACK)
[5 seconds later]
>>> Extracted count from x-death: 1
Retry count: 1/3
>>> Retry not exceeded, sending to DLX (NACK)
[5 seconds later]
>>> Extracted count from x-death: 2
Retry count: 2/3
>>> Retry not exceeded, sending to DLX (NACK)
[5 seconds later]
>>> Extracted count from x-death: 3
Retry count: 3/3
>>> MAX RETRIES EXCEEDED! Sending to error queue...
>>> Successfully sent to error queue!
>>> Returning ACK to prevent further retries
```
---
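## Troubleshooting
### Problem 1: Messages loop forever while the queue always looks empty
**Cause**: `requeue` is not set to `false`, so every NACK puts the message straight back on `orders.queue`, where it is redelivered at once instead of going through the DLX and the retry queue.
**Fix**: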
```php
protected bool $requeue = false; // ⚠️ Required
```
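### Problem 2: OutOfBoundsException - No "application_headers"
**Cause**: a message processed for the first time has no `application_headers` property, and `$message->get('application_headers')` throws when the property is missing.
**Fix**: check with `has()` first: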
```php
if (!$message->has('application_headers')) {
return 0;
}
```
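### Problem 3: Messages never reach the error queue
**Causes**:
1. The consumer's RabbitMQ user has no write permission on `errors.exchange`.
2. The message is published to the default exchange (empty string) instead of `errors.exchange`. The default exchange routes by queue name, so routing key `error` would only reach a queue literally named `error`, not `errors.queue`, and the message is dropped.
**Fix**:
1. Make sure the consumer permissions in rabbitmq.sh include `errors.exchange`.
2. Publish to `errors.exchange` rather than the empty exchange: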
```php
protected string $exchange = 'errors.exchange'; // ✅ correct
protected string $exchange = ''; // ❌ wrong
```
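### Problem 4: Retry count is always 0
**Causes**:
1. The retry queue has no (or a wrong) `x-dead-letter-routing-key`. The expired message then keeps the routing key `retry`, which does not match the `order.#` binding on `main.exchange`, so it never returns.
2. More generally, messages are not flowing back to the main queue.
**Fix**: make sure the retry queue is declared in rabbitmq.sh with: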
```bash
--arguments '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"order.retry"}'
```
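A quick way to catch this misconfiguration is to check the retry queue's arguments before deploying. The sketch below assumes the arguments are available as a decoded array (for example, the JSON passed to `--arguments` above) and compares them with the values listed under Queue configuration; `missingRetryQueueArgs` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/** Hypothetical check: list problems in an orders.retry.queue argument set. */
function missingRetryQueueArgs(array $args): array
{
    $expected = [
        'x-message-ttl' => 5000,
        'x-dead-letter-exchange' => 'main.exchange',
        'x-dead-letter-routing-key' => 'order.retry',
    ];
    $problems = [];
    foreach ($expected as $key => $value) {
        if (!array_key_exists($key, $args)) {
            $problems[] = "missing {$key}";
        } elseif ($args[$key] !== $value) {
            $problems[] = "{$key} is " . var_export($args[$key], true) . ', expected ' . var_export($value, true);
        }
    }
    return $problems;
}

$args = json_decode('{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange"}', true);
print_r(missingRetryQueueArgs($args)); // one problem: "missing x-dead-letter-routing-key"
```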
---
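## Related Documents
- [RabbitMQ.md](RabbitMQ.md) - RabbitMQ architecture and configuration in detail
- [HyperfAmqp.md](HyperfAmqp.md) - Hyperf AMQP usage guide and retry mechanism in detail
- [EntityParseFactory完整使用指南.md](EntityParseFactory完整使用指南.md) - using the entity parsing factory
---
## Notes
EntityParse is the core of data processing: its output determines the results of the downstream continuous-aggregate materialized views. Order parsing code:
- `backend/app/Platform/[Platform]/EntityParse/Order.php`
- `backend/app/Platform/Tmall/EntityParse/Order.php`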