2026-02-27 16:45:16 +08:00
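# OrderConsumer Business Flow Diagrams
## Overview
OrderConsumer is the consumer for order messages. It receives messages from `orders.queue`, parses them, and persists the order data to the database. It retries automatically: a failed message is retried after a delay, and a message that still fails once the retry limit is reached is sent to the error queue for manual handling.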
---
## Complete Flowchart
```mermaid
flowchart TD
Start([Message arrives at orders.queue]) --> CheckDebug{Debug delay enabled?}
CheckDebug -->|Yes| Sleep[Sleep N seconds]
CheckDebug -->|No| GetRetry[Get retry count]
Sleep --> GetRetry
GetRetry --> ParseRetry{Check application_headers}
ParseRetry -->|Absent| FirstTime[First attempt, retryCount=0]
ParseRetry -->|Present| ReadXDeath[Read x-death count]
ReadXDeath --> RetryCount[retryCount = count]
FirstTime --> LogRetry[Log retry count]
RetryCount --> LogRetry
LogRetry --> TryBlock[Start processing message]
TryBlock --> CreateParser[Create Parser]
CreateParser --> ExtractMeta[Extract metadata]
ExtractMeta --> EntityMatch[Match entity object]
EntityMatch --> EntityMap[Map raw_data]
EntityMap --> BeginTx[Begin transaction]
BeginTx --> LoopSave[Save entities]
LoopSave --> Commit[Commit transaction]
Commit --> ReturnACK[Return ACK]
ReturnACK --> End([Processing complete])
TryBlock -->|Exception| CatchError[Catch exception]
CatchError --> Rollback[Roll back transaction]
Rollback --> LogError[Log error]
LogError --> CheckMaxRetry{Retry limit reached?}
CheckMaxRetry -->|Yes| ExceededLog[Retry limit exceeded]
CheckMaxRetry -->|No| NotExceededLog[Retry limit not exceeded]
ExceededLog --> SendError[Send to error queue]
SendError --> BuildErrorMsg[Build error message]
BuildErrorMsg --> ProduceError[Publish to errors.exchange]
ProduceError --> TrySend{Publish succeeded?}
TrySend -->|Yes| SuccessLog[Log success]
TrySend -->|No| FailLog[Log failure]
SuccessLog --> AckAfterError[Return ACK]
FailLog --> AckAfterError
AckAfterError --> EndError([Sent to error queue])
NotExceededLog --> ReturnNACK[Return NACK]
ReturnNACK --> ToDLX([Dead-lettered to DLX])
ToDLX --> ToRetryQueue[Enter retry queue]
ToRetryQueue --> WaitTTL[Wait 5 seconds]
WaitTTL --> BackToMain[TTL expires, route back]
BackToMain --> BackToQueue[Back to orders.queue]
BackToQueue --> Start
style Start fill:#e1f5e1
style End fill:#e1f5e1
style EndError fill:#ffe1e1
style ReturnACK fill:#90EE90
style AckAfterError fill:#FFB6C1
style ReturnNACK fill:#FFD700
style SendError fill:#FF6B6B
style Commit fill:#4CAF50
style Rollback fill:#F44336
```
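After a failure, the flowchart branches on a single decision. The sketch below isolates it in plain PHP. It assumes the retry count has already been read from `x-death` and that the limit comes from `AMQP_MAX_RETRIES`. `decideOutcome()` and the outcome strings are hypothetical names for illustration; they are not part of Hyperf or the consumer code.

```php
<?php
declare(strict_types=1);

// Hypothetical outcome labels for illustration; the real consumer returns a Hyperf Result.
const OUTCOME_ACK = 'ack';                  // processed successfully
const OUTCOME_NACK = 'nack';                // dead-letter to dlx.orders for a delayed retry
const OUTCOME_ERROR_THEN_ACK = 'error+ack'; // publish to errors.exchange, then ACK

/**
 * Hypothetical helper mirroring the flowchart: success => ACK,
 * failure below the limit => NACK, failure at or above the limit => error queue + ACK.
 */
function decideOutcome(bool $succeeded, int $retryCount, int $maxRetries): string
{
    if ($succeeded) {
        return OUTCOME_ACK;
    }
    return $retryCount >= $maxRetries ? OUTCOME_ERROR_THEN_ACK : OUTCOME_NACK;
}

foreach ([0, 1, 2, 3] as $count) {
    echo "count={$count}: " . decideOutcome(false, $count, 3) . PHP_EOL;
}
```

The check uses `>=` rather than `==`. A message whose count overshoots the limit, for example after `AMQP_MAX_RETRIES` is lowered, therefore still ends up in the error queue instead of looping.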
---
## Message State Diagram
```mermaid
stateDiagram-v2
[*] --> OrdersQueue: New message arrives
OrdersQueue --> Processing: Consumed by Consumer
Processing --> Success: Processed successfully
Processing --> FirstFailure: 1st failure, count=0
Processing --> SecondFailure: 2nd failure, count=1
Processing --> ThirdFailure: 3rd failure, count=2
Processing --> MaxRetryExceeded: 4th failure, count=3, retry limit reached
Success --> [*]: ACK
FirstFailure --> DLX: NACK requeue=false
SecondFailure --> DLX: NACK requeue=false
ThirdFailure --> DLX: NACK requeue=false
DLX --> RetryQueue: routing_key retry
RetryQueue --> RetryQueue: Wait for TTL, 5s
RetryQueue --> MainExchange: TTL expires, auto dead-lettered
MainExchange --> OrdersQueue: routing_key order.retry
MaxRetryExceeded --> ErrorsExchange: sendToErrorQueue
ErrorsExchange --> ErrorsQueue: routing_key error
ErrorsQueue --> [*]: Awaiting manual handling
```
---
## Retry Counting Mechanism
```mermaid
graph LR
M1[First attempt] --> H1{Has headers?}
H1 -->|No| C0[count = 0]
C0 -->|NACK| M2[1st retry]
M2 --> H2{Has headers?}
H2 -->|Yes| XD2[Read x-death]
XD2 --> C1[count = 1]
C1 -->|NACK| M3[2nd retry]
M3 --> H3{Has headers?}
H3 -->|Yes| XD3[Read x-death]
XD3 --> C2[count = 2]
C2 -->|NACK| M4[3rd retry]
M4 --> H4{Has headers?}
H4 -->|Yes| XD4[Read x-death]
XD4 --> C3[count = 3]
C3 -->|Fails again| ERROR[Send to error queue]
style C0 fill:#90EE90
style C1 fill:#FFD700
style C2 fill:#FFA500
style C3 fill:#FF6B6B
style ERROR fill:#DC143C,color:#fff
```
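Why does `count` climb 1, 2, 3? RabbitMQ's dead-lettering documentation describes the bookkeeping. The broker keeps one `x-death` entry per `{queue, reason}` pair. A new pair is prepended to the array. A repeated pair has its `count` incremented and is moved to the front. The toy model below replays the `orders.queue` → `orders.retry.queue` cycle in plain PHP with no broker. `recordDeath()` is a hypothetical helper that imitates the broker; it is not something the consumer calls.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical toy model of the broker's x-death update on each dead-lettering event:
 * one entry per (queue, reason) pair, and the most recently updated entry moves to index 0.
 */
function recordDeath(array $xDeath, string $queue, string $reason): array
{
    foreach ($xDeath as $i => $entry) {
        if ($entry['queue'] === $queue && $entry['reason'] === $reason) {
            $entry['count']++;
            unset($xDeath[$i]);
            return array_merge([$entry], array_values($xDeath));
        }
    }
    // First death for this (queue, reason) pair: prepend a new entry.
    return array_merge([['queue' => $queue, 'reason' => $reason, 'count' => 1]], $xDeath);
}

$xDeath = []; // first delivery: no x-death header at all
for ($cycle = 1; $cycle <= 3; $cycle++) {
    $xDeath = recordDeath($xDeath, 'orders.queue', 'rejected');      // NACK(requeue=false)
    $xDeath = recordDeath($xDeath, 'orders.retry.queue', 'expired'); // 5 s TTL runs out
    printf("delivery %d sees x-death[0] count=%d\n", $cycle + 1, $xDeath[0]['count']);
}
```

In this topology both entries advance in lockstep, which is why reading `x-death[0]['count']` works. On redelivery, index 0 is the retry queue's `expired` entry, not the `rejected` one.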
---
## Key Code Logic
### 1. getRetryCount() - Get the retry count
```php
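protected function getRetryCount(AMQPMessage $message): int
{
    // ⚠️ Key point: a first delivery has no application_headers, so check before reading
    if (!$message->has('application_headers')) {
        return 0; // first attempt
    }

    $headers = $message->get('application_headers');
    if (!$headers) {
        return 0;
    }

    $headerData = $headers->getNativeData();
    $xDeath = $headerData['x-death'] ?? [];
    if (empty($xDeath)) {
        return 0;
    }

    // Retry count maintained automatically by RabbitMQ.
    // x-death[0] is the most recently updated entry; in this topology its count
    // advances in lockstep with the number of rejections from orders.queue.
    return (int) ($xDeath[0]['count'] ?? 0);
}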
```
**Example x-death structure**:
```php
[
    'x-death' => [
        0 => [
            'count' => 2,                 // times dead-lettered (retry count)
            'reason' => 'rejected',       // dead-letter reason
            'queue' => 'orders.queue',    // queue the message was in
            'time' => 1701234567,         // timestamp
            'exchange' => 'dlx.orders',   // exchange recorded for this event
            'routing-keys' => ['retry'],
        ],
    ],
]
```
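`x-death` can hold more than one entry, one per `{queue, reason}` pair. A variant that looks up the `orders.queue`/`rejected` entry explicitly therefore does not depend on entry order. The sketch below works on the plain array returned by `getNativeData()`. `retryCountFromHeaders()` is a hypothetical helper, and the queue name `orders.queue` is an assumed default parameter. Inside `getRetryCount()` it could replace the last few lines as `return retryCountFromHeaders($headers->getNativeData());`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: reads the retry count from already-decoded application headers
 * (the array AMQPTable::getNativeData() returns). Returns 0 on the first delivery.
 */
function retryCountFromHeaders(?array $headerData, string $queue = 'orders.queue'): int
{
    foreach ($headerData['x-death'] ?? [] as $entry) {
        // Only count rejections from our own queue; skip the retry queue's 'expired' entry.
        if (($entry['queue'] ?? null) === $queue && ($entry['reason'] ?? null) === 'rejected') {
            return (int) ($entry['count'] ?? 0);
        }
    }
    return 0;
}

// Illustrative headers as they might look on the third delivery.
$headers = [
    'x-death' => [
        ['count' => 2, 'reason' => 'expired', 'queue' => 'orders.retry.queue'],
        ['count' => 2, 'reason' => 'rejected', 'queue' => 'orders.queue'],
    ],
];
echo retryCountFromHeaders($headers), PHP_EOL;
echo retryCountFromHeaders(null), PHP_EOL;
```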
### 2. sendToErrorQueue() - Send to the error queue
```php
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
{
    $originalData = json_decode($message->getBody(), true);

    // Build a detailed error message
    $errorMessage = [
        'error_id' => uniqid('err_', true),
        'original_message' => $originalData,
        'error' => [
            'type' => get_class($error),
            'message' => $error->getMessage(),
            'trace' => $error->getTraceAsString(),
            'timestamp' => date('c'),
        ],
        'metadata' => [
            'platform' => $originalData['platform'] ?? 'unknown',
            'platform_id' => $originalData['meta']['platform_id'] ?? null,
            'company_id' => $originalData['meta']['company_id'] ?? null,
            'store_id' => $originalData['meta']['store_id'] ?? null,
            'data_type' => $originalData['data_type'] ?? 'unknown',
            'failed_at' => date('c'),
            'retry_count' => $this->getRetryCount($message),
        ],
    ];

    // Publish via errors.exchange (not the default exchange)
    $producer = ApplicationContext::getContainer()->get(Producer::class);
    $producer->produce(
        new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage {
            protected string $exchange = 'errors.exchange'; // ✅ shared error exchange
            protected string|array $routingKey = 'error';
            protected string $poolName = 'default_consumer';
            protected array $properties = [
                'delivery_mode' => 2, // persistent
            ];

            public function __construct(array $data)
            {
                $this->payload = $data;
            }
        }
    );
}
```
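The payload construction can be pulled out into a pure function so it can be unit-tested without a broker. The sketch below does that. `buildErrorPayload()` is a hypothetical helper that takes the retry count as a parameter instead of reading headers itself. The extra `original_body` key, which keeps the raw body when JSON decoding fails, is a suggested addition and not a field the current code sends.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: builds the structure sendToErrorQueue() publishes, from plain inputs. */
function buildErrorPayload(string $body, Throwable $error, int $retryCount): array
{
    $original = json_decode($body, true);
    $original = is_array($original) ? $original : null; // invalid JSON or a scalar body

    return [
        'error_id' => uniqid('err_', true),
        'original_message' => $original,
        'original_body' => $original === null ? $body : null, // hypothetical: keep raw text
        'error' => [
            'type' => get_class($error),
            'message' => $error->getMessage(),
            'trace' => $error->getTraceAsString(),
            'timestamp' => date('c'),
        ],
        'metadata' => [
            'platform' => $original['platform'] ?? 'unknown',
            'platform_id' => $original['meta']['platform_id'] ?? null,
            'company_id' => $original['meta']['company_id'] ?? null,
            'store_id' => $original['meta']['store_id'] ?? null,
            'data_type' => $original['data_type'] ?? 'unknown',
            'failed_at' => date('c'),
            'retry_count' => $retryCount,
        ],
    ];
}

$payload = buildErrorPayload(
    '{"platform":"tmall","data_type":"order","meta":{"store_id":42}}',
    new RuntimeException("Cannot find parser for platform 'tmall' and entity 'order'"),
    3
);
echo json_encode($payload['metadata']), PHP_EOL;
```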
---
## Sequence Diagram: Full Retry Flow
```mermaid
sequenceDiagram
participant P as Producer
participant MQ as orders.queue
participant C as OrderConsumer
participant DLX as dlx.orders
participant RQ as orders.retry.queue
participant ME as main.exchange
participant EX as errors.exchange
participant EQ as errors.queue
participant DB as Database
Note over P,MQ: 1. First delivery
P->>MQ: Publish order message
MQ->>C: Deliver message (count=0)
C->>C: getRetryCount() → 0
C->>DB: Try to save data
DB-->>C: Failure (exception)
C->>C: count(0) < MAX(3)
C->>MQ: NACK(requeue=false)
MQ->>DLX: Dead-lettered to DLX
Note over DLX: x-death count=1
DLX->>RQ: routing_key: retry
Note over RQ: 2. 1st retry
RQ->>RQ: Wait for TTL (5s)
RQ->>ME: TTL expires, auto dead-lettered
ME->>MQ: routing_key: order.retry
MQ->>C: Deliver message (count=1)
C->>C: getRetryCount() → 1
C->>DB: Try to save data
DB-->>C: Failure (exception)
C->>C: count(1) < MAX(3)
C->>MQ: NACK(requeue=false)
MQ->>DLX: Dead-lettered to DLX
Note over DLX: x-death count=2
DLX->>RQ: routing_key: retry
Note over RQ: 3. 2nd retry
RQ->>RQ: Wait for TTL (5s)
RQ->>ME: TTL expires, auto dead-lettered
ME->>MQ: routing_key: order.retry
MQ->>C: Deliver message (count=2)
C->>C: getRetryCount() → 2
C->>DB: Try to save data
DB-->>C: Failure (exception)
C->>C: count(2) < MAX(3)
C->>MQ: NACK(requeue=false)
MQ->>DLX: Dead-lettered to DLX
Note over DLX: x-death count=3
DLX->>RQ: routing_key: retry
Note over RQ,EQ: 4. 3rd retry, retry limit reached
RQ->>RQ: Wait for TTL (5s)
RQ->>ME: TTL expires, auto dead-lettered
ME->>MQ: routing_key: order.retry
MQ->>C: Deliver message (count=3)
C->>C: getRetryCount() → 3
C->>DB: Try to save data
DB-->>C: Failure (exception)
C->>C: count(3) >= MAX(3) ✓
C->>EX: sendToErrorQueue()
EX->>EQ: Message enters error queue
C->>MQ: ACK (prevents further retries)
Note over EQ: Awaiting manual handling
```
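The timeline above can be replayed without RabbitMQ. The sketch assumes a handler that always fails, `AMQP_MAX_RETRIES=3`, and the 5-second retry TTL. It ignores processing time and the debug delay. `simulateDeliveries()` is a hypothetical helper for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical simulation of the orders.queue -> dlx.orders -> orders.retry.queue
 * -> main.exchange loop. $handler receives the retry count and returns true on success.
 */
function simulateDeliveries(callable $handler, int $maxRetries = 3, int $ttlSeconds = 5): array
{
    $timeline = [];
    $elapsed = 0;
    for ($count = 0; ; $count++) {
        if ($handler($count)) {
            $timeline[] = ['t' => $elapsed, 'count' => $count, 'result' => 'ACK'];
            break;
        }
        if ($count >= $maxRetries) {
            $timeline[] = ['t' => $elapsed, 'count' => $count, 'result' => 'errors.queue + ACK'];
            break;
        }
        $timeline[] = ['t' => $elapsed, 'count' => $count, 'result' => 'NACK'];
        $elapsed += $ttlSeconds; // waits in orders.retry.queue until the TTL expires
    }
    return $timeline;
}

foreach (simulateDeliveries(fn (int $count): bool => false) as $row) {
    printf("t=%2ds count=%d %s\n", $row['t'], $row['count'], $row['result']);
}
```

With the defaults, a permanently failing message is delivered 4 times (1 initial attempt plus 3 retries). It reaches `errors.queue` roughly 15 seconds after it was first consumed.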
---
## Configuration
### Environment Variables (.env)
```bash
# Maximum number of retries (default: 3)
AMQP_MAX_RETRIES=3
# Debug delay in seconds, so message flow can be observed in mq:status
# Set to 0 in production; 2 works well in development
AMQP_CONSUMER_DEBUG_DELAY=0
```
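The sketch below reads both variables with the documented defaults. It uses plain `getenv()` so it runs standalone; in the application these values would normally come through the framework's config layer. `amqpRetryConfig()` is a hypothetical name, and the fallback rules for invalid values are an assumption.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: reads retry settings from the environment with the documented defaults. */
function amqpRetryConfig(): array
{
    $max = getenv('AMQP_MAX_RETRIES');
    $delay = getenv('AMQP_CONSUMER_DEBUG_DELAY');

    return [
        // Default 3 retries; unset, negative, or non-numeric values fall back to the default.
        'max_retries' => is_numeric($max) && (int) $max >= 0 ? (int) $max : 3,
        // Default 0 seconds, i.e. no debug sleep (the production setting).
        'debug_delay' => is_numeric($delay) && (int) $delay > 0 ? (int) $delay : 0,
    ];
}

putenv('AMQP_MAX_RETRIES=5');
putenv('AMQP_CONSUMER_DEBUG_DELAY=2');
print_r(amqpRetryConfig());
```

The consumer would then pause only when `debug_delay` is positive, which matches the "Debug delay enabled?" branch of the flowchart.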
### Consumer Configuration
```php
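use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;

#[Consumer(
    exchange: "main.exchange",
    routingKey: "order.#",
    queue: "orders.queue",
    pool: "default_consumer",
    nums: 1,
    enable: true
)]
class OrderConsumer extends ConsumerMessage
{
    // ⚠️ Key point: must be false; otherwise NACKed messages are requeued to
    // orders.queue immediately and retried in an endless loop
    protected bool $requeue = false;

    protected ?array $qos = [
        'prefetch_size' => 0,
        'prefetch_count' => 1, // 1 recommended in development; 10-100 is reasonable in production
        'global' => false,
    ];
}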
```
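`$requeue = false` matters because of how RabbitMQ handles a negative acknowledgement. With requeue=true, the broker puts the message straight back on the same queue and adds no `x-death` entry. With requeue=false, it dead-letters the message to the queue's DLX, or discards it if no DLX is configured. The toy model below encodes that broker rule. It assumes Hyperf passes the consumer's `$requeue` flag through to the NACK it sends, which is what the advice above implies. `brokerOnNack()` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/** Hypothetical toy model: what the broker does with a NACKed message. */
function brokerOnNack(bool $requeue, ?string $deadLetterExchange): string
{
    if ($requeue) {
        // Redelivered from the same queue right away; no x-death entry is added,
        // so the retry count stays 0 and the loop never ends.
        return 'requeued to the same queue';
    }
    if ($deadLetterExchange !== null) {
        return "dead-lettered to {$deadLetterExchange}";
    }
    return 'discarded';
}

echo brokerOnNack(false, 'dlx.orders'), PHP_EOL;
echo brokerOnNack(true, 'dlx.orders'), PHP_EOL;
```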
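### Queue Configuration (rabbitmq.sh)
**orders.queue**:
- `x-dead-letter-exchange`: `dlx.orders` - DLX that rejected messages are sent to
- `x-dead-letter-routing-key`: `retry` - routing key used when dead-lettering to the DLX
- `x-message-ttl`: 86400000 (24 hours) - maximum time a message may stay in the queue
**orders.retry.queue**:
- `x-message-ttl`: 5000 (5 seconds) - retry delay
- `x-dead-letter-exchange`: `main.exchange` - exchange that messages flow back to when the TTL expires
- `x-dead-letter-routing-key`: `order.retry` - routing key used for the flow back
**errors.queue**:
- `x-message-ttl`: 604800000 (7 days) - how long error messages are retained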
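The queue arguments listed above can be kept in one place and rendered to a JSON string for a declare call. The sketch assumes rabbitmq.sh passes arguments as a JSON string, as in the Issue 4 snippet further down. `queueArguments()` is a hypothetical helper, and it covers only the three queues listed here.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: queue arguments from the list above, keyed by queue name. TTLs in ms. */
function queueArguments(): array
{
    return [
        'orders.queue' => [
            'x-dead-letter-exchange' => 'dlx.orders',
            'x-dead-letter-routing-key' => 'retry',
            'x-message-ttl' => 24 * 60 * 60 * 1000,     // 86400000 = 24 hours
        ],
        'orders.retry.queue' => [
            'x-message-ttl' => 5 * 1000,                // 5000 = 5-second retry delay
            'x-dead-letter-exchange' => 'main.exchange',
            'x-dead-letter-routing-key' => 'order.retry',
        ],
        'errors.queue' => [
            'x-message-ttl' => 7 * 24 * 60 * 60 * 1000, // 604800000 = 7 days
        ],
    ];
}

foreach (queueArguments() as $queue => $args) {
    echo $queue, ': ', json_encode($args), PHP_EOL;
}
```

For `orders.retry.queue`, this produces exactly the `--arguments` JSON shown under Issue 4.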
---
## Monitoring and Debugging
### Check Queue Status
```bash
php bin/hyperf.php app:mq:status
```
Sample output:
```
=== Business Queues ===
+-----------+---------+----------+---------+-----------+
| Metric | orders | products | refunds | inventory |
+-----------+---------+----------+---------+-----------+
| Messages | 0 | 0 | 0 | 0 |
| Consumers | 1 | 0 | 0 | 0 |
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
+-----------+---------+----------+---------+-----------+
=== Dead Letter Queues (Retry Queues) ===
+-----------+--------------+----------------+---------------+-----------------+
| Metric | orders.retry | products.retry | refunds.retry | inventory.retry |
+-----------+--------------+----------------+---------------+-----------------+
| Messages | 0 | 0 | 0 | 0 |
| Consumers | 0 | 0 | 0 | 0 |
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
+-----------+--------------+----------------+---------------+-----------------+
=== Shared Queues ===
+-----------+----------+
| Metric | errors |
+-----------+----------+
| Messages | 5 |
| Consumers | 0 |
| Status | ✓ Active |
+-----------+----------+
```
### Enable Debug Mode to Observe the Flow
```bash
# 1. Set the debug delay
echo "AMQP_CONSUMER_DEBUG_DELAY=2" >> .env
# 2. Start the consumer
php bin/hyperf.php start
# 3. Send a test message
php bin/hyperf.php app:mq-push:tmall
# 4. Watch the log output
tail -f runtime/logs/hyperf.log
```
You should see:
```
>>> No application_headers, first time processing
Retry count: 0/3
=== Error Caught ===
Error: Cannot find parser for platform 'tmall' and entity 'order'
>>> Retry not exceeded, sending to DLX (NACK)
[after the 5-second retry delay]
>>> Extracted count from x-death: 1
Retry count: 1/3
>>> Retry not exceeded, sending to DLX (NACK)
[after the 5-second retry delay]
>>> Extracted count from x-death: 2
Retry count: 2/3
>>> Retry not exceeded, sending to DLX (NACK)
[after the 5-second retry delay]
>>> Extracted count from x-death: 3
Retry count: 3/3
>>> MAX RETRIES EXCEEDED! Sending to error queue...
>>> Successfully sent to error queue!
>>> Returning ACK to prevent further retries
```
---
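## Troubleshooting
### Issue 1: Messages loop endlessly while the queue always shows as empty
**Cause**: `requeue` is not set to `false`. Every NACKed message goes straight back to `orders.queue` and is redelivered immediately, without passing through the DLX or gaining an `x-death` entry.
**Fix**: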
```php
protected bool $requeue = false; // ⚠️ required
```
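### Issue 2: OutOfBoundsException - No "application_headers"
**Cause**: a message on its first delivery has no `application_headers` property, so `$message->get('application_headers')` throws.
**Fix**: check with `has()` first: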
```php
if (!$message->has('application_headers')) {
return 0;
}
```
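### Issue 3: Messages never reach the error queue
**Cause**:
1. The consumer has no write permission on `errors.exchange`
2. The default exchange (empty string) is used instead of `errors.exchange`
**Fix**:
1. Make sure the consumer permissions in rabbitmq.sh include `errors.exchange`
2. Publish to `errors.exchange`, not the default (empty) exchange: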
```php
protected string $exchange = 'errors.exchange'; // ✅ correct
protected string $exchange = ''; // ❌ wrong
```
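### Issue 4: Retry count is always 0
**Cause**:
1. The retry queue does not have a correct `x-dead-letter-routing-key`
2. Messages never flow back to the main queue
**Fix**: make sure the retry queue in rabbitmq.sh is configured as follows: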
```bash
--arguments '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"order.retry"}'
```
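Issue 4 also depends on `order.retry` matching the `order.#` binding of `orders.queue`. `main.exchange` is assumed to be a topic exchange, since it is bound with `order.#`. The sketch below implements AMQP topic matching (`*` matches exactly one word, `#` matches zero or more words) so that a routing key can be checked against a binding offline. `topicMatches()` and `matchWords()` are hypothetical helpers.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: AMQP topic matching; '*' = exactly one word, '#' = zero or more words. */
function topicMatches(string $pattern, string $routingKey): bool
{
    return matchWords(explode('.', $pattern), explode('.', $routingKey));
}

/** Hypothetical recursive matcher over dot-separated words. */
function matchWords(array $pattern, array $key): bool
{
    if ($pattern === []) {
        return $key === [];
    }
    $head = $pattern[0];
    $rest = array_slice($pattern, 1);
    if ($head === '#') {
        // Let '#' swallow 0, 1, 2, ... words of the key.
        for ($i = 0; $i <= count($key); $i++) {
            if (matchWords($rest, array_slice($key, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($key === []) {
        return false;
    }
    return ($head === '*' || $head === $key[0]) && matchWords($rest, array_slice($key, 1));
}

var_dump(topicMatches('order.#', 'order.retry'));
var_dump(topicMatches('order.#', 'retry'));
```

The second call shows why the override matters. Without `x-dead-letter-routing-key` on the retry queue, the message would be dead-lettered with the `retry` key it already carries, and `main.exchange` would not route it back to `orders.queue`.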
---
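## Related Documents
- [RabbitMQ.md](RabbitMQ.md) - RabbitMQ architecture and configuration in detail
- [HyperfAmqp.md](HyperfAmqp.md) - Hyperf AMQP usage guide and retry mechanism in detail
- [EntityParseFactory Complete Usage Guide](EntityParseFactory完整使用指南.md) - Using the entity parse factory
## Notes
EntityParse is the core of data processing: its output affects the results of the downstream continuous-aggregate materialized views. Relevant files:
- `backend/app/Platform/[Platform]/EntityParse/Order.php`
- `backend/app/Platform/Tmall/EntityParse/Order.php`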