add order consumer business flow
This commit is contained in:
@@ -0,0 +1,511 @@
|
||||
# OrderConsumer 业务流程图
|
||||
|
||||
## 概览
|
||||
|
||||
OrderConsumer 是订单消息的消费者,负责从 `orders.queue` 接收消息,解析并持久化订单数据到数据库。它实现了自动重试机制,失败的消息会自动重试,超过重试次数的消息会被发送到错误队列等待人工处理。
|
||||
|
||||
---
|
||||
|
||||
## 完整流程图
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
Start([消息到达 orders.queue]) --> CheckDebug{是否开启调试延迟?}
|
||||
CheckDebug -->|是| Sleep[休眠 N 秒]
|
||||
CheckDebug -->|否| GetRetry[获取重试次数]
|
||||
Sleep --> GetRetry
|
||||
|
||||
GetRetry --> ParseRetry{检查 application_headers}
|
||||
|
||||
ParseRetry -->|不存在| FirstTime[首次处理 retryCount=0]
|
||||
ParseRetry -->|存在| ReadXDeath[读取 x-death count]
|
||||
ReadXDeath --> RetryCount[retryCount = count]
|
||||
|
||||
FirstTime --> LogRetry[记录重试次数]
|
||||
RetryCount --> LogRetry
|
||||
|
||||
LogRetry --> TryBlock[开始处理消息]
|
||||
|
||||
TryBlock --> CreateParser[创建 Parser]
|
||||
CreateParser --> ExtractMeta[提取 metadata]
|
||||
ExtractMeta --> EntityMatch[匹配实体对象]
|
||||
EntityMatch --> EntityMap[转换 raw_data]
|
||||
EntityMap --> BeginTx[开始事务]
|
||||
|
||||
BeginTx --> LoopSave[保存实体]
|
||||
LoopSave --> Commit[提交事务]
|
||||
Commit --> ReturnACK[返回 ACK]
|
||||
ReturnACK --> End([处理完成])
|
||||
|
||||
TryBlock -->|异常| CatchError[捕获异常]
|
||||
CatchError --> Rollback[回滚事务]
|
||||
Rollback --> LogError[记录错误]
|
||||
LogError --> CheckMaxRetry{超过重试次数?}
|
||||
|
||||
CheckMaxRetry -->|是| ExceededLog[超过重试次数]
|
||||
CheckMaxRetry -->|否| NotExceededLog[未超过次数]
|
||||
|
||||
ExceededLog --> SendError[发送到错误队列]
|
||||
SendError --> BuildErrorMsg[构建错误消息]
|
||||
BuildErrorMsg --> ProduceError[发送到 errors.exchange]
|
||||
ProduceError --> TrySend{发送成功?}
|
||||
|
||||
TrySend -->|成功| SuccessLog[记录成功日志]
|
||||
TrySend -->|失败| FailLog[记录失败日志]
|
||||
|
||||
SuccessLog --> AckAfterError[返回 ACK]
|
||||
FailLog --> AckAfterError
|
||||
AckAfterError --> EndError([已发送到错误队列])
|
||||
|
||||
NotExceededLog --> ReturnNACK[返回 NACK]
|
||||
ReturnNACK --> ToDLX([进入 DLX])
|
||||
ToDLX --> ToRetryQueue[进入重试队列]
|
||||
ToRetryQueue --> WaitTTL[等待 5 秒]
|
||||
WaitTTL --> BackToMain[TTL到期回流]
|
||||
BackToMain --> BackToQueue[回到 orders.queue]
|
||||
BackToQueue --> Start
|
||||
|
||||
style Start fill:#e1f5e1
|
||||
style End fill:#e1f5e1
|
||||
style EndError fill:#ffe1e1
|
||||
style ReturnACK fill:#90EE90
|
||||
style AckAfterError fill:#FFB6C1
|
||||
style ReturnNACK fill:#FFD700
|
||||
style SendError fill:#FF6B6B
|
||||
style Commit fill:#4CAF50
|
||||
style Rollback fill:#F44336
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 消息流转状态图
|
||||
|
||||
```mermaid
|
||||
stateDiagram-v2
|
||||
[*] --> OrdersQueue: 新消息到达
|
||||
|
||||
OrdersQueue --> Processing: Consumer 消费
|
||||
|
||||
Processing --> Success: 处理成功
|
||||
Processing --> FirstFailure: 首次失败 count=0
|
||||
Processing --> SecondFailure: 第2次失败 count=1
|
||||
Processing --> ThirdFailure: 第3次失败 count=2
|
||||
Processing --> MaxRetryExceeded: 超过重试次数
|
||||
|
||||
Success --> [*]: ACK
|
||||
|
||||
FirstFailure --> DLX: NACK requeue=false
|
||||
SecondFailure --> DLX: NACK requeue=false
|
||||
ThirdFailure --> DLX: NACK requeue=false
|
||||
|
||||
DLX --> RetryQueue: routing_key retry
|
||||
|
||||
RetryQueue --> RetryQueue: 等待 TTL 5秒
|
||||
RetryQueue --> MainExchange: TTL到期自动死信
|
||||
|
||||
MainExchange --> OrdersQueue: routing_key order.retry
|
||||
|
||||
MaxRetryExceeded --> ErrorsExchange: sendToErrorQueue
|
||||
ErrorsExchange --> ErrorsQueue: routing_key error
|
||||
ErrorsQueue --> [*]: 等待人工处理
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 重试计数机制
|
||||
|
||||
```mermaid
|
||||
graph LR
|
||||
M1[消息 首次处理] --> H1{有 headers?}
|
||||
H1 -->|否| C0[count = 0]
|
||||
|
||||
C0 -->|NACK| M2[消息 第1次重试]
|
||||
M2 --> H2{有 headers?}
|
||||
H2 -->|是| XD2[读取 x-death]
|
||||
XD2 --> C1[count = 1]
|
||||
|
||||
C1 -->|NACK| M3[消息 第2次重试]
|
||||
M3 --> H3{有 headers?}
|
||||
H3 -->|是| XD3[读取 x-death]
|
||||
XD3 --> C2[count = 2]
|
||||
|
||||
C2 -->|NACK| M4[消息 第3次重试]
|
||||
M4 --> H4{有 headers?}
|
||||
H4 -->|是| XD4[读取 x-death]
|
||||
XD4 --> C3[count = 3]
|
||||
|
||||
C3 --> ERROR[发送到错误队列]
|
||||
|
||||
style C0 fill:#90EE90
|
||||
style C1 fill:#FFD700
|
||||
style C2 fill:#FFA500
|
||||
style C3 fill:#FF6B6B
|
||||
style ERROR fill:#DC143C,color:#fff
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 关键代码逻辑
|
||||
|
||||
### 1. getRetryCount() - 获取重试次数
|
||||
|
||||
```php
|
||||
protected function getRetryCount(AMQPMessage $message): int
|
||||
{
|
||||
// ⚠️ 关键:首次消息没有 application_headers,必须先检查
|
||||
if (!$message->has('application_headers')) {
|
||||
return 0; // 首次处理
|
||||
}
|
||||
|
||||
$headers = $message->get('application_headers');
|
||||
if (!$headers) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
$headerData = $headers->getNativeData();
|
||||
$xDeath = $headerData['x-death'] ?? [];
|
||||
|
||||
if (empty($xDeath)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// RabbitMQ 自动维护的重试次数
|
||||
return $xDeath[0]['count'] ?? 0;
|
||||
}
|
||||
```
|
||||
|
||||
**x-death 结构示例**:
|
||||
```php
|
||||
[
|
||||
'x-death' => [
|
||||
0 => [
|
||||
'count' => 2, // 重试次数
|
||||
'reason' => 'rejected', // 死信原因
|
||||
'queue' => 'orders.queue', // 原队列
|
||||
'time' => 1701234567, // 时间戳
|
||||
'exchange' => 'dlx.orders', // DLX 名称
|
||||
'routing-keys' => ['retry']
|
||||
]
|
||||
]
|
||||
]
|
||||
```
|
||||
|
||||
### 2. sendToErrorQueue() - 发送到错误队列
|
||||
|
||||
```php
|
||||
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
|
||||
{
|
||||
$originalData = json_decode($message->getBody(), true);
|
||||
|
||||
// 构建详细的错误消息
|
||||
$errorMessage = [
|
||||
'error_id' => uniqid('err_', true),
|
||||
'original_message' => $originalData,
|
||||
'error' => [
|
||||
'type' => get_class($error),
|
||||
'message' => $error->getMessage(),
|
||||
'trace' => $error->getTraceAsString(),
|
||||
'timestamp' => date('c'),
|
||||
],
|
||||
'metadata' => [
|
||||
'platform' => $originalData['platform'] ?? 'unknown',
|
||||
'platform_id' => $originalData['meta']['platform_id'] ?? null,
|
||||
'company_id' => $originalData['meta']['company_id'] ?? null,
|
||||
'store_id' => $originalData['meta']['store_id'] ?? null,
|
||||
'data_type' => $originalData['data_type'] ?? 'unknown',
|
||||
'failed_at' => date('c'),
|
||||
'retry_count' => $this->getRetryCount($message),
|
||||
],
|
||||
];
|
||||
|
||||
// 使用 errors.exchange 发送(不是默认 exchange)
|
||||
$producer = ApplicationContext::getContainer()->get(Producer::class);
|
||||
$producer->produce(
|
||||
new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage {
|
||||
protected string $exchange = 'errors.exchange'; // ✅ 通用错误交换机
|
||||
protected string|array $routingKey = 'error';
|
||||
protected string $poolName = 'default_consumer';
|
||||
protected array $properties = [
|
||||
'delivery_mode' => 2, // 持久化
|
||||
];
|
||||
|
||||
public function __construct(array $data) {
|
||||
$this->payload = $data;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 时序图:完整的重试流程
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant P as Producer
|
||||
participant MQ as orders.queue
|
||||
participant C as OrderConsumer
|
||||
participant DLX as dlx.orders
|
||||
participant RQ as orders.retry.queue
|
||||
participant ME as main.exchange
|
||||
participant EX as errors.exchange
|
||||
participant EQ as errors.queue
|
||||
participant DB as Database
|
||||
|
||||
Note over P,MQ: 1. 首次消息
|
||||
P->>MQ: 发送订单消息
|
||||
MQ->>C: 消费消息 (count=0)
|
||||
C->>C: getRetryCount() → 0
|
||||
C->>DB: 尝试保存数据
|
||||
DB-->>C: 失败 (异常)
|
||||
C->>C: count(0) < MAX(3)
|
||||
C->>MQ: NACK(requeue=false)
|
||||
MQ->>DLX: 死信到 DLX
|
||||
Note over DLX: x-death count=1
|
||||
DLX->>RQ: routing_key: retry
|
||||
|
||||
Note over RQ: 2. 第1次重试
|
||||
RQ->>RQ: 等待 TTL 5秒
|
||||
RQ->>ME: TTL到期,自动死信
|
||||
ME->>MQ: routing_key: order.retry
|
||||
MQ->>C: 消费消息 (count=1)
|
||||
C->>C: getRetryCount() → 1
|
||||
C->>DB: 尝试保存数据
|
||||
DB-->>C: 失败 (异常)
|
||||
C->>C: count(1) < MAX(3)
|
||||
C->>MQ: NACK(requeue=false)
|
||||
MQ->>DLX: 死信到 DLX
|
||||
Note over DLX: x-death count=2
|
||||
DLX->>RQ: routing_key: retry
|
||||
|
||||
Note over RQ: 3. 第2次重试
|
||||
RQ->>RQ: 等待 TTL 5秒
|
||||
RQ->>ME: TTL到期,自动死信
|
||||
ME->>MQ: routing_key: order.retry
|
||||
MQ->>C: 消费消息 (count=2)
|
||||
C->>C: getRetryCount() → 2
|
||||
C->>DB: 尝试保存数据
|
||||
DB-->>C: 失败 (异常)
|
||||
C->>C: count(2) < MAX(3)
|
||||
C->>MQ: NACK(requeue=false)
|
||||
MQ->>DLX: 死信到 DLX
|
||||
Note over DLX: x-death count=3
|
||||
DLX->>RQ: routing_key: retry
|
||||
|
||||
Note over RQ,EQ: 4. 超过重试次数
|
||||
RQ->>RQ: 等待 TTL 5秒
|
||||
RQ->>ME: TTL到期,自动死信
|
||||
ME->>MQ: routing_key: order.retry
|
||||
MQ->>C: 消费消息 (count=3)
|
||||
C->>C: getRetryCount() → 3
|
||||
C->>DB: 尝试保存数据
|
||||
DB-->>C: 失败 (异常)
|
||||
C->>C: count(3) >= MAX(3) ✓
|
||||
C->>EX: sendToErrorQueue()
|
||||
EX->>EQ: 消息进入错误队列
|
||||
C->>MQ: ACK (防止再次重试)
|
||||
Note over EQ: 等待人工处理
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 配置参数
|
||||
|
||||
### 环境变量 (.env)
|
||||
|
||||
```bash
|
||||
# 最大重试次数(默认3次)
|
||||
AMQP_MAX_RETRIES=3
|
||||
|
||||
# 调试延迟(秒),便于在 mq:status 中观察消息流转
|
||||
# 生产环境设置为 0,开发环境可设置为 2
|
||||
AMQP_CONSUMER_DEBUG_DELAY=0
|
||||
```
|
||||
|
||||
### Consumer 配置
|
||||
|
||||
```php
|
||||
#[Consumer(
|
||||
exchange: "main.exchange",
|
||||
routingKey: "order.#",
|
||||
queue: "orders.queue",
|
||||
pool: "default_consumer",
|
||||
nums: 1,
|
||||
enable: true
|
||||
)]
|
||||
class OrderConsumer extends ConsumerMessage
|
||||
{
|
||||
// ⚠️ 关键:必须设置为 false,否则会无限循环重试
|
||||
protected bool $requeue = false;
|
||||
|
||||
protected ?array $qos = [
|
||||
'prefetch_size' => 0,
|
||||
'prefetch_count' => 1, // 开发环境建议设置为1,生产环境可设置为10-100
|
||||
'global' => false,
|
||||
];
|
||||
}
|
||||
```
|
||||
|
||||
### 队列配置 (rabbitmq.sh)
|
||||
|
||||
**orders.queue**:
|
||||
- `x-dead-letter-exchange`: `dlx.orders` - 失败时发送到的 DLX
|
||||
- `x-dead-letter-routing-key`: `retry` - DLX 路由键
|
||||
- `x-message-ttl`: 86400000 (24小时) - 消息最长存活时间
|
||||
|
||||
**orders.retry.queue**:
|
||||
- `x-message-ttl`: 5000 (5秒) - 重试延迟时间
|
||||
- `x-dead-letter-exchange`: `main.exchange` - TTL到期后回流的 exchange
|
||||
- `x-dead-letter-routing-key`: `order.retry` - 回流时的 routing key
|
||||
|
||||
**errors.queue**:
|
||||
- `x-message-ttl`: 604800000 (7天) - 错误消息保留时间
|
||||
|
||||
---
|
||||
|
||||
## 监控和调试
|
||||
|
||||
### 查看队列状态
|
||||
|
||||
```bash
|
||||
php bin/hyperf.php app:mq:status
|
||||
```
|
||||
|
||||
输出示例:
|
||||
```
|
||||
=== Business Queues ===
|
||||
+-----------+---------+----------+---------+-----------+
|
||||
| Metric | orders | products | refunds | inventory |
|
||||
+-----------+---------+----------+---------+-----------+
|
||||
| Messages | 0 | 0 | 0 | 0 |
|
||||
| Consumers | 1 | 0 | 0 | 0 |
|
||||
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
|
||||
+-----------+---------+----------+---------+-----------+
|
||||
|
||||
=== Dead Letter Queues (Retry Queues) ===
|
||||
+-----------+--------------+----------------+---------------+-----------------+
|
||||
| Metric | orders.retry | products.retry | refunds.retry | inventory.retry |
|
||||
+-----------+--------------+----------------+---------------+-----------------+
|
||||
| Messages | 0 | 0 | 0 | 0 |
|
||||
| Consumers | 0 | 0 | 0 | 0 |
|
||||
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
|
||||
+-----------+--------------+----------------+---------------+-----------------+
|
||||
|
||||
=== Shared Queues ===
|
||||
+-----------+----------+
|
||||
| Metric | errors |
|
||||
+-----------+----------+
|
||||
| Messages | 5 |
|
||||
| Consumers | 0 |
|
||||
| Status | ✓ Active |
|
||||
+-----------+----------+
|
||||
```
|
||||
|
||||
### 开启调试模式观察流程
|
||||
|
||||
```bash
|
||||
# 1. 设置调试延迟
|
||||
echo "AMQP_CONSUMER_DEBUG_DELAY=2" >> .env
|
||||
|
||||
# 2. 启动消费者
|
||||
php bin/hyperf.php start
|
||||
|
||||
# 3. 发送测试消息
|
||||
php bin/hyperf.php app:mq-push:tmall
|
||||
|
||||
# 4. 观察日志输出
|
||||
tail -f runtime/logs/hyperf.log
|
||||
```
|
||||
|
||||
你将看到:
|
||||
```
|
||||
>>> No application_headers, first time processing
|
||||
Retry count: 0/3
|
||||
=== Error Caught ===
|
||||
Error: Cannot find parser for platform 'tmall' and entity 'order'
|
||||
>>> Retry not exceeded, sending to DLX (NACK)
|
||||
|
||||
[5秒后]
|
||||
>>> Extracted count from x-death: 1
|
||||
Retry count: 1/3
|
||||
>>> Retry not exceeded, sending to DLX (NACK)
|
||||
|
||||
[5秒后]
|
||||
>>> Extracted count from x-death: 2
|
||||
Retry count: 2/3
|
||||
>>> Retry not exceeded, sending to DLX (NACK)
|
||||
|
||||
[5秒后]
|
||||
>>> Extracted count from x-death: 3
|
||||
Retry count: 3/3
|
||||
>>> MAX RETRIES EXCEEDED! Sending to error queue...
|
||||
>>> Successfully sent to error queue!
|
||||
>>> Returning ACK to prevent further retries
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 常见问题排查
|
||||
|
||||
### 问题1: 消息无限循环,队列始终显示空
|
||||
|
||||
**原因**: `requeue` 没有设置为 `false`
|
||||
|
||||
**解决**:
|
||||
```php
|
||||
protected bool $requeue = false; // ⚠️ 必须设置
|
||||
```
|
||||
|
||||
### 问题2: OutOfBoundsException - No "application_headers"
|
||||
|
||||
**原因**: 首次处理的消息没有 `application_headers` 属性
|
||||
|
||||
**解决**: 使用 `has()` 检查
|
||||
```php
|
||||
if (!$message->has('application_headers')) {
|
||||
return 0;
|
||||
}
|
||||
```
|
||||
|
||||
### 问题3: 消息不进入错误队列
|
||||
|
||||
**原因**:
|
||||
1. Consumer 没有 `errors.exchange` 的写权限
|
||||
2. 使用了默认 exchange (空字符串) 而非 `errors.exchange`
|
||||
|
||||
**解决**:
|
||||
1. 确保 rabbitmq.sh 中 consumer 权限包含 `errors.exchange`
|
||||
2. 使用 `errors.exchange` 而非空 exchange:
|
||||
```php
|
||||
protected string $exchange = 'errors.exchange'; // ✅ 正确
|
||||
protected string $exchange = ''; // ❌ 错误
|
||||
```
|
||||
|
||||
### 问题4: 重试次数始终是 0
|
||||
|
||||
**原因**:
|
||||
1. 重试队列没有正确配置 `x-dead-letter-routing-key`
|
||||
2. 消息没有回流到主队列
|
||||
|
||||
**解决**: 确保 rabbitmq.sh 中重试队列配置:
|
||||
```bash
|
||||
--arguments '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"order.retry"}'
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 相关文档
|
||||
|
||||
- [RabbitMQ.md](RabbitMQ.md) - RabbitMQ 架构和配置详解
|
||||
- [HyperfAmqp.md](HyperfAmqp.md) - Hyperf AMQP 使用指南和重试机制详解
|
||||
- [EntityParseFactory完整使用指南.md](EntityParseFactory完整使用指南.md) - 实体解析工厂使用
|
||||
|
||||
|
||||
|
||||
## 注意事项
|
||||
|
||||
EntityParse 为数据处理的核心业务,会影响后续连续聚合物化视图的结果
|
||||
|
||||
backend/app/Platform/[Platform]/EntityParse/Order.php
|
||||
backend/app/Platform/Tmall/EntityParse/Order.php
|
||||
Reference in New Issue
Block a user