511 lines
14 KiB
Markdown
511 lines
14 KiB
Markdown
# OrderConsumer 业务流程图
|
||
|
||
## 概览
|
||
|
||
OrderConsumer 是订单消息的消费者,负责从 `orders.queue` 接收消息,解析并持久化订单数据到数据库。它实现了自动重试机制,失败的消息会自动重试,超过重试次数的消息会被发送到错误队列等待人工处理。
|
||
|
||
---
|
||
|
||
## 完整流程图
|
||
|
||
```mermaid
|
||
flowchart TD
|
||
Start([消息到达 orders.queue]) --> CheckDebug{是否开启调试延迟?}
|
||
CheckDebug -->|是| Sleep[休眠 N 秒]
|
||
CheckDebug -->|否| GetRetry[获取重试次数]
|
||
Sleep --> GetRetry
|
||
|
||
GetRetry --> ParseRetry{检查 application_headers}
|
||
|
||
ParseRetry -->|不存在| FirstTime[首次处理 retryCount=0]
|
||
ParseRetry -->|存在| ReadXDeath[读取 x-death count]
|
||
ReadXDeath --> RetryCount[retryCount = count]
|
||
|
||
FirstTime --> LogRetry[记录重试次数]
|
||
RetryCount --> LogRetry
|
||
|
||
LogRetry --> TryBlock[开始处理消息]
|
||
|
||
TryBlock --> CreateParser[创建 Parser]
|
||
CreateParser --> ExtractMeta[提取 metadata]
|
||
ExtractMeta --> EntityMatch[匹配实体对象]
|
||
EntityMatch --> EntityMap[转换 raw_data]
|
||
EntityMap --> BeginTx[开始事务]
|
||
|
||
BeginTx --> LoopSave[保存实体]
|
||
LoopSave --> Commit[提交事务]
|
||
Commit --> ReturnACK[返回 ACK]
|
||
ReturnACK --> End([处理完成])
|
||
|
||
TryBlock -->|异常| CatchError[捕获异常]
|
||
CatchError --> Rollback[回滚事务]
|
||
Rollback --> LogError[记录错误]
|
||
LogError --> CheckMaxRetry{超过重试次数?}
|
||
|
||
CheckMaxRetry -->|是| ExceededLog[超过重试次数]
|
||
CheckMaxRetry -->|否| NotExceededLog[未超过次数]
|
||
|
||
ExceededLog --> SendError[发送到错误队列]
|
||
SendError --> BuildErrorMsg[构建错误消息]
|
||
BuildErrorMsg --> ProduceError[发送到 errors.exchange]
|
||
ProduceError --> TrySend{发送成功?}
|
||
|
||
TrySend -->|成功| SuccessLog[记录成功日志]
|
||
TrySend -->|失败| FailLog[记录失败日志]
|
||
|
||
SuccessLog --> AckAfterError[返回 ACK]
|
||
FailLog --> AckAfterError
|
||
AckAfterError --> EndError([已发送到错误队列])
|
||
|
||
NotExceededLog --> ReturnNACK[返回 NACK]
|
||
ReturnNACK --> ToDLX([进入 DLX])
|
||
ToDLX --> ToRetryQueue[进入重试队列]
|
||
ToRetryQueue --> WaitTTL[等待 5 秒]
|
||
WaitTTL --> BackToMain[TTL到期回流]
|
||
BackToMain --> BackToQueue[回到 orders.queue]
|
||
BackToQueue --> Start
|
||
|
||
style Start fill:#e1f5e1
|
||
style End fill:#e1f5e1
|
||
style EndError fill:#ffe1e1
|
||
style ReturnACK fill:#90EE90
|
||
style AckAfterError fill:#FFB6C1
|
||
style ReturnNACK fill:#FFD700
|
||
style SendError fill:#FF6B6B
|
||
style Commit fill:#4CAF50
|
||
style Rollback fill:#F44336
|
||
```
|
||
|
||
---
|
||
|
||
## 消息流转状态图
|
||
|
||
```mermaid
|
||
stateDiagram-v2
|
||
[*] --> OrdersQueue: 新消息到达
|
||
|
||
OrdersQueue --> Processing: Consumer 消费
|
||
|
||
Processing --> Success: 处理成功
|
||
Processing --> FirstFailure: 首次失败 count=0
|
||
Processing --> SecondFailure: 第2次失败 count=1
|
||
Processing --> ThirdFailure: 第3次失败 count=2
|
||
Processing --> MaxRetryExceeded: 超过重试次数
|
||
|
||
Success --> [*]: ACK
|
||
|
||
FirstFailure --> DLX: NACK requeue=false
|
||
SecondFailure --> DLX: NACK requeue=false
|
||
ThirdFailure --> DLX: NACK requeue=false
|
||
|
||
DLX --> RetryQueue: routing_key retry
|
||
|
||
RetryQueue --> RetryQueue: 等待 TTL 5秒
|
||
RetryQueue --> MainExchange: TTL到期自动死信
|
||
|
||
MainExchange --> OrdersQueue: routing_key order.retry
|
||
|
||
MaxRetryExceeded --> ErrorsExchange: sendToErrorQueue
|
||
ErrorsExchange --> ErrorsQueue: routing_key error
|
||
ErrorsQueue --> [*]: 等待人工处理
|
||
```
|
||
|
||
---
|
||
|
||
## 重试计数机制
|
||
|
||
```mermaid
|
||
graph LR
|
||
M1[消息 首次处理] --> H1{有 headers?}
|
||
H1 -->|否| C0[count = 0]
|
||
|
||
C0 -->|NACK| M2[消息 第1次重试]
|
||
M2 --> H2{有 headers?}
|
||
H2 -->|是| XD2[读取 x-death]
|
||
XD2 --> C1[count = 1]
|
||
|
||
C1 -->|NACK| M3[消息 第2次重试]
|
||
M3 --> H3{有 headers?}
|
||
H3 -->|是| XD3[读取 x-death]
|
||
XD3 --> C2[count = 2]
|
||
|
||
C2 -->|NACK| M4[消息 第3次重试]
|
||
M4 --> H4{有 headers?}
|
||
H4 -->|是| XD4[读取 x-death]
|
||
XD4 --> C3[count = 3]
|
||
|
||
C3 --> ERROR[发送到错误队列]
|
||
|
||
style C0 fill:#90EE90
|
||
style C1 fill:#FFD700
|
||
style C2 fill:#FFA500
|
||
style C3 fill:#FF6B6B
|
||
style ERROR fill:#DC143C,color:#fff
|
||
```
|
||
|
||
---
|
||
|
||
## 关键代码逻辑
|
||
|
||
### 1. getRetryCount() - 获取重试次数
|
||
|
||
```php
|
||
protected function getRetryCount(AMQPMessage $message): int
|
||
{
|
||
// ⚠️ 关键:首次消息没有 application_headers,必须先检查
|
||
if (!$message->has('application_headers')) {
|
||
return 0; // 首次处理
|
||
}
|
||
|
||
$headers = $message->get('application_headers');
|
||
if (!$headers) {
|
||
return 0;
|
||
}
|
||
|
||
$headerData = $headers->getNativeData();
|
||
$xDeath = $headerData['x-death'] ?? [];
|
||
|
||
if (empty($xDeath)) {
|
||
return 0;
|
||
}
|
||
|
||
// RabbitMQ 自动维护的重试次数
|
||
return $xDeath[0]['count'] ?? 0;
|
||
}
|
||
```
|
||
|
||
**x-death 结构示例**:
|
||
```php
|
||
[
|
||
'x-death' => [
|
||
0 => [
|
||
'count' => 2, // 重试次数
|
||
'reason' => 'rejected', // 死信原因
|
||
'queue' => 'orders.queue', // 原队列
|
||
'time' => 1701234567, // 时间戳
|
||
'exchange' => 'dlx.orders', // DLX 名称
|
||
'routing-keys' => ['retry']
|
||
]
|
||
]
|
||
]
|
||
```
|
||
|
||
### 2. sendToErrorQueue() - 发送到错误队列
|
||
|
||
```php
|
||
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
|
||
{
|
||
$originalData = json_decode($message->getBody(), true);
|
||
|
||
// 构建详细的错误消息
|
||
$errorMessage = [
|
||
'error_id' => uniqid('err_', true),
|
||
'original_message' => $originalData,
|
||
'error' => [
|
||
'type' => get_class($error),
|
||
'message' => $error->getMessage(),
|
||
'trace' => $error->getTraceAsString(),
|
||
'timestamp' => date('c'),
|
||
],
|
||
'metadata' => [
|
||
'platform' => $originalData['platform'] ?? 'unknown',
|
||
'platform_id' => $originalData['meta']['platform_id'] ?? null,
|
||
'company_id' => $originalData['meta']['company_id'] ?? null,
|
||
'store_id' => $originalData['meta']['store_id'] ?? null,
|
||
'data_type' => $originalData['data_type'] ?? 'unknown',
|
||
'failed_at' => date('c'),
|
||
'retry_count' => $this->getRetryCount($message),
|
||
],
|
||
];
|
||
|
||
// 使用 errors.exchange 发送(不是默认 exchange)
|
||
$producer = ApplicationContext::getContainer()->get(Producer::class);
|
||
$producer->produce(
|
||
new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage {
|
||
protected string $exchange = 'errors.exchange'; // ✅ 通用错误交换机
|
||
protected string|array $routingKey = 'error';
|
||
protected string $poolName = 'default_consumer';
|
||
protected array $properties = [
|
||
'delivery_mode' => 2, // 持久化
|
||
];
|
||
|
||
public function __construct(array $data) {
|
||
$this->payload = $data;
|
||
}
|
||
}
|
||
);
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 时序图:完整的重试流程
|
||
|
||
```mermaid
|
||
sequenceDiagram
|
||
participant P as Producer
|
||
participant MQ as orders.queue
|
||
participant C as OrderConsumer
|
||
participant DLX as dlx.orders
|
||
participant RQ as orders.retry.queue
|
||
participant ME as main.exchange
|
||
participant EX as errors.exchange
|
||
participant EQ as errors.queue
|
||
participant DB as Database
|
||
|
||
Note over P,MQ: 1. 首次消息
|
||
P->>MQ: 发送订单消息
|
||
MQ->>C: 消费消息 (count=0)
|
||
C->>C: getRetryCount() → 0
|
||
C->>DB: 尝试保存数据
|
||
DB-->>C: 失败 (异常)
|
||
C->>C: count(0) < MAX(3)
|
||
C->>MQ: NACK(requeue=false)
|
||
MQ->>DLX: 死信到 DLX
|
||
Note over DLX: x-death count=1
|
||
DLX->>RQ: routing_key: retry
|
||
|
||
Note over RQ: 2. 第1次重试
|
||
RQ->>RQ: 等待 TTL 5秒
|
||
RQ->>ME: TTL到期,自动死信
|
||
ME->>MQ: routing_key: order.retry
|
||
MQ->>C: 消费消息 (count=1)
|
||
C->>C: getRetryCount() → 1
|
||
C->>DB: 尝试保存数据
|
||
DB-->>C: 失败 (异常)
|
||
C->>C: count(1) < MAX(3)
|
||
C->>MQ: NACK(requeue=false)
|
||
MQ->>DLX: 死信到 DLX
|
||
Note over DLX: x-death count=2
|
||
DLX->>RQ: routing_key: retry
|
||
|
||
Note over RQ: 3. 第2次重试
|
||
RQ->>RQ: 等待 TTL 5秒
|
||
RQ->>ME: TTL到期,自动死信
|
||
ME->>MQ: routing_key: order.retry
|
||
MQ->>C: 消费消息 (count=2)
|
||
C->>C: getRetryCount() → 2
|
||
C->>DB: 尝试保存数据
|
||
DB-->>C: 失败 (异常)
|
||
C->>C: count(2) < MAX(3)
|
||
C->>MQ: NACK(requeue=false)
|
||
MQ->>DLX: 死信到 DLX
|
||
Note over DLX: x-death count=3
|
||
DLX->>RQ: routing_key: retry
|
||
|
||
Note over RQ,EQ: 4. 超过重试次数
|
||
RQ->>RQ: 等待 TTL 5秒
|
||
RQ->>ME: TTL到期,自动死信
|
||
ME->>MQ: routing_key: order.retry
|
||
MQ->>C: 消费消息 (count=3)
|
||
C->>C: getRetryCount() → 3
|
||
C->>DB: 尝试保存数据
|
||
DB-->>C: 失败 (异常)
|
||
C->>C: count(3) >= MAX(3) ✓
|
||
C->>EX: sendToErrorQueue()
|
||
EX->>EQ: 消息进入错误队列
|
||
C->>MQ: ACK (防止再次重试)
|
||
Note over EQ: 等待人工处理
|
||
```
|
||
|
||
---
|
||
|
||
## 配置参数
|
||
|
||
### 环境变量 (.env)
|
||
|
||
```bash
|
||
# 最大重试次数(默认3次)
|
||
AMQP_MAX_RETRIES=3
|
||
|
||
# 调试延迟(秒),便于在 mq:status 中观察消息流转
|
||
# 生产环境设置为 0,开发环境可设置为 2
|
||
AMQP_CONSUMER_DEBUG_DELAY=0
|
||
```
|
||
|
||
### Consumer 配置
|
||
|
||
```php
|
||
#[Consumer(
|
||
exchange: "main.exchange",
|
||
routingKey: "order.#",
|
||
queue: "orders.queue",
|
||
pool: "default_consumer",
|
||
nums: 1,
|
||
enable: true
|
||
)]
|
||
class OrderConsumer extends ConsumerMessage
|
||
{
|
||
// ⚠️ 关键:必须设置为 false,否则会无限循环重试
|
||
protected bool $requeue = false;
|
||
|
||
protected ?array $qos = [
|
||
'prefetch_size' => 0,
|
||
'prefetch_count' => 1, // 开发环境建议设置为1,生产环境可设置为10-100
|
||
'global' => false,
|
||
];
|
||
}
|
||
```
|
||
|
||
### 队列配置 (rabbitmq.sh)
|
||
|
||
**orders.queue**:
|
||
- `x-dead-letter-exchange`: `dlx.orders` - 失败时发送到的 DLX
|
||
- `x-dead-letter-routing-key`: `retry` - DLX 路由键
|
||
- `x-message-ttl`: 86400000 (24小时) - 消息最长存活时间
|
||
|
||
**orders.retry.queue**:
|
||
- `x-message-ttl`: 5000 (5秒) - 重试延迟时间
|
||
- `x-dead-letter-exchange`: `main.exchange` - TTL到期后回流的 exchange
|
||
- `x-dead-letter-routing-key`: `order.retry` - 回流时的 routing key
|
||
|
||
**errors.queue**:
|
||
- `x-message-ttl`: 604800000 (7天) - 错误消息保留时间
|
||
|
||
---
|
||
|
||
## 监控和调试
|
||
|
||
### 查看队列状态
|
||
|
||
```bash
|
||
php bin/hyperf.php app:mq:status
|
||
```
|
||
|
||
输出示例:
|
||
```
|
||
=== Business Queues ===
|
||
+-----------+---------+----------+---------+-----------+
|
||
| Metric | orders | products | refunds | inventory |
|
||
+-----------+---------+----------+---------+-----------+
|
||
| Messages | 0 | 0 | 0 | 0 |
|
||
| Consumers | 1 | 0 | 0 | 0 |
|
||
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
|
||
+-----------+---------+----------+---------+-----------+
|
||
|
||
=== Dead Letter Queues (Retry Queues) ===
|
||
+-----------+--------------+----------------+---------------+-----------------+
|
||
| Metric | orders.retry | products.retry | refunds.retry | inventory.retry |
|
||
+-----------+--------------+----------------+---------------+-----------------+
|
||
| Messages | 0 | 0 | 0 | 0 |
|
||
| Consumers | 0 | 0 | 0 | 0 |
|
||
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
|
||
+-----------+--------------+----------------+---------------+-----------------+
|
||
|
||
=== Shared Queues ===
|
||
+-----------+----------+
|
||
| Metric | errors |
|
||
+-----------+----------+
|
||
| Messages | 5 |
|
||
| Consumers | 0 |
|
||
| Status | ✓ Active |
|
||
+-----------+----------+
|
||
```
|
||
|
||
### 开启调试模式观察流程
|
||
|
||
```bash
|
||
# 1. 设置调试延迟
|
||
echo "AMQP_CONSUMER_DEBUG_DELAY=2" >> .env
|
||
|
||
# 2. 启动消费者
|
||
php bin/hyperf.php start
|
||
|
||
# 3. 发送测试消息
|
||
php bin/hyperf.php app:mq-push:tmall
|
||
|
||
# 4. 观察日志输出
|
||
tail -f runtime/logs/hyperf.log
|
||
```
|
||
|
||
你将看到:
|
||
```
|
||
>>> No application_headers, first time processing
|
||
Retry count: 0/3
|
||
=== Error Caught ===
|
||
Error: Cannot find parser for platform 'tmall' and entity 'order'
|
||
>>> Retry not exceeded, sending to DLX (NACK)
|
||
|
||
[5秒后]
|
||
>>> Extracted count from x-death: 1
|
||
Retry count: 1/3
|
||
>>> Retry not exceeded, sending to DLX (NACK)
|
||
|
||
[5秒后]
|
||
>>> Extracted count from x-death: 2
|
||
Retry count: 2/3
|
||
>>> Retry not exceeded, sending to DLX (NACK)
|
||
|
||
[5秒后]
|
||
>>> Extracted count from x-death: 3
|
||
Retry count: 3/3
|
||
>>> MAX RETRIES EXCEEDED! Sending to error queue...
|
||
>>> Successfully sent to error queue!
|
||
>>> Returning ACK to prevent further retries
|
||
```
|
||
|
||
---
|
||
|
||
## 常见问题排查
|
||
|
||
### 问题1: 消息无限循环,队列始终显示空
|
||
|
||
**原因**: `requeue` 没有设置为 `false`
|
||
|
||
**解决**:
|
||
```php
|
||
protected bool $requeue = false; // ⚠️ 必须设置
|
||
```
|
||
|
||
### 问题2: OutOfBoundsException - No "application_headers"
|
||
|
||
**原因**: 首次处理的消息没有 `application_headers` 属性
|
||
|
||
**解决**: 使用 `has()` 检查
|
||
```php
|
||
if (!$message->has('application_headers')) {
|
||
return 0;
|
||
}
|
||
```
|
||
|
||
### 问题3: 消息不进入错误队列
|
||
|
||
**原因**:
|
||
1. Consumer 没有 `errors.exchange` 的写权限
|
||
2. 使用了默认 exchange (空字符串) 而非 `errors.exchange`
|
||
|
||
**解决**:
|
||
1. 确保 rabbitmq.sh 中 consumer 权限包含 `errors.exchange`
|
||
2. 使用 `errors.exchange` 而非空 exchange:
|
||
```php
|
||
protected string $exchange = 'errors.exchange'; // ✅ 正确
|
||
protected string $exchange = ''; // ❌ 错误
|
||
```
|
||
|
||
### 问题4: 重试次数始终是 0
|
||
|
||
**原因**:
|
||
1. 重试队列没有正确配置 `x-dead-letter-routing-key`
|
||
2. 消息没有回流到主队列
|
||
|
||
**解决**: 确保 rabbitmq.sh 中重试队列配置:
|
||
```bash
|
||
--arguments '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"order.retry"}'
|
||
```
|
||
|
||
---
|
||
|
||
## 相关文档
|
||
|
||
- [RabbitMQ.md](RabbitMQ.md) - RabbitMQ 架构和配置详解
|
||
- [HyperfAmqp.md](HyperfAmqp.md) - Hyperf AMQP 使用指南和重试机制详解
|
||
- [EntityParseFactory完整使用指南.md](EntityParseFactory完整使用指南.md) - 实体解析工厂使用
|
||
|
||
|
||
|
||
## 注意事项
|
||
|
||
EntityParse 为数据处理的核心业务,会影响后续连续聚合物化视图的结果
|
||
|
||
backend/app/Platform/[Platform]/EntityParse/Order.php
|
||
backend/app/Platform/Tmall/EntityParse/Order.php |