511 lines
14 KiB
Markdown
511 lines
14 KiB
Markdown
|
|
# OrderConsumer 业务流程图
|
|||
|
|
|
|||
|
|
## 概览
|
|||
|
|
|
|||
|
|
OrderConsumer 是订单消息的消费者,负责从 `orders.queue` 接收消息,解析并持久化订单数据到数据库。它实现了自动重试机制,失败的消息会自动重试,超过重试次数的消息会被发送到错误队列等待人工处理。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 完整流程图
|
|||
|
|
|
|||
|
|
```mermaid
|
|||
|
|
flowchart TD
|
|||
|
|
Start([消息到达 orders.queue]) --> CheckDebug{是否开启调试延迟?}
|
|||
|
|
CheckDebug -->|是| Sleep[休眠 N 秒]
|
|||
|
|
CheckDebug -->|否| GetRetry[获取重试次数]
|
|||
|
|
Sleep --> GetRetry
|
|||
|
|
|
|||
|
|
GetRetry --> ParseRetry{检查 application_headers}
|
|||
|
|
|
|||
|
|
ParseRetry -->|不存在| FirstTime[首次处理 retryCount=0]
|
|||
|
|
ParseRetry -->|存在| ReadXDeath[读取 x-death count]
|
|||
|
|
ReadXDeath --> RetryCount[retryCount = count]
|
|||
|
|
|
|||
|
|
FirstTime --> LogRetry[记录重试次数]
|
|||
|
|
RetryCount --> LogRetry
|
|||
|
|
|
|||
|
|
LogRetry --> TryBlock[开始处理消息]
|
|||
|
|
|
|||
|
|
TryBlock --> CreateParser[创建 Parser]
|
|||
|
|
CreateParser --> ExtractMeta[提取 metadata]
|
|||
|
|
ExtractMeta --> EntityMatch[匹配实体对象]
|
|||
|
|
EntityMatch --> EntityMap[转换 raw_data]
|
|||
|
|
EntityMap --> BeginTx[开始事务]
|
|||
|
|
|
|||
|
|
BeginTx --> LoopSave[保存实体]
|
|||
|
|
LoopSave --> Commit[提交事务]
|
|||
|
|
Commit --> ReturnACK[返回 ACK]
|
|||
|
|
ReturnACK --> End([处理完成])
|
|||
|
|
|
|||
|
|
TryBlock -->|异常| CatchError[捕获异常]
|
|||
|
|
CatchError --> Rollback[回滚事务]
|
|||
|
|
Rollback --> LogError[记录错误]
|
|||
|
|
LogError --> CheckMaxRetry{超过重试次数?}
|
|||
|
|
|
|||
|
|
CheckMaxRetry -->|是| ExceededLog[超过重试次数]
|
|||
|
|
CheckMaxRetry -->|否| NotExceededLog[未超过次数]
|
|||
|
|
|
|||
|
|
ExceededLog --> SendError[发送到错误队列]
|
|||
|
|
SendError --> BuildErrorMsg[构建错误消息]
|
|||
|
|
BuildErrorMsg --> ProduceError[发送到 errors.exchange]
|
|||
|
|
ProduceError --> TrySend{发送成功?}
|
|||
|
|
|
|||
|
|
TrySend -->|成功| SuccessLog[记录成功日志]
|
|||
|
|
TrySend -->|失败| FailLog[记录失败日志]
|
|||
|
|
|
|||
|
|
SuccessLog --> AckAfterError[返回 ACK]
|
|||
|
|
FailLog --> AckAfterError
|
|||
|
|
AckAfterError --> EndError([已发送到错误队列])
|
|||
|
|
|
|||
|
|
NotExceededLog --> ReturnNACK[返回 NACK]
|
|||
|
|
ReturnNACK --> ToDLX([进入 DLX])
|
|||
|
|
ToDLX --> ToRetryQueue[进入重试队列]
|
|||
|
|
ToRetryQueue --> WaitTTL[等待 5 秒]
|
|||
|
|
WaitTTL --> BackToMain[TTL到期回流]
|
|||
|
|
BackToMain --> BackToQueue[回到 orders.queue]
|
|||
|
|
BackToQueue --> Start
|
|||
|
|
|
|||
|
|
style Start fill:#e1f5e1
|
|||
|
|
style End fill:#e1f5e1
|
|||
|
|
style EndError fill:#ffe1e1
|
|||
|
|
style ReturnACK fill:#90EE90
|
|||
|
|
style AckAfterError fill:#FFB6C1
|
|||
|
|
style ReturnNACK fill:#FFD700
|
|||
|
|
style SendError fill:#FF6B6B
|
|||
|
|
style Commit fill:#4CAF50
|
|||
|
|
style Rollback fill:#F44336
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 消息流转状态图
|
|||
|
|
|
|||
|
|
```mermaid
|
|||
|
|
stateDiagram-v2
|
|||
|
|
[*] --> OrdersQueue: 新消息到达
|
|||
|
|
|
|||
|
|
OrdersQueue --> Processing: Consumer 消费
|
|||
|
|
|
|||
|
|
Processing --> Success: 处理成功
|
|||
|
|
Processing --> FirstFailure: 首次失败 count=0
|
|||
|
|
Processing --> SecondFailure: 第2次失败 count=1
|
|||
|
|
Processing --> ThirdFailure: 第3次失败 count=2
|
|||
|
|
Processing --> MaxRetryExceeded: 超过重试次数
|
|||
|
|
|
|||
|
|
Success --> [*]: ACK
|
|||
|
|
|
|||
|
|
FirstFailure --> DLX: NACK requeue=false
|
|||
|
|
SecondFailure --> DLX: NACK requeue=false
|
|||
|
|
ThirdFailure --> DLX: NACK requeue=false
|
|||
|
|
|
|||
|
|
DLX --> RetryQueue: routing_key retry
|
|||
|
|
|
|||
|
|
RetryQueue --> RetryQueue: 等待 TTL 5秒
|
|||
|
|
RetryQueue --> MainExchange: TTL到期自动死信
|
|||
|
|
|
|||
|
|
MainExchange --> OrdersQueue: routing_key order.retry
|
|||
|
|
|
|||
|
|
MaxRetryExceeded --> ErrorsExchange: sendToErrorQueue
|
|||
|
|
ErrorsExchange --> ErrorsQueue: routing_key error
|
|||
|
|
ErrorsQueue --> [*]: 等待人工处理
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 重试计数机制
|
|||
|
|
|
|||
|
|
```mermaid
|
|||
|
|
graph LR
|
|||
|
|
M1[消息 首次处理] --> H1{有 headers?}
|
|||
|
|
H1 -->|否| C0[count = 0]
|
|||
|
|
|
|||
|
|
C0 -->|NACK| M2[消息 第1次重试]
|
|||
|
|
M2 --> H2{有 headers?}
|
|||
|
|
H2 -->|是| XD2[读取 x-death]
|
|||
|
|
XD2 --> C1[count = 1]
|
|||
|
|
|
|||
|
|
C1 -->|NACK| M3[消息 第2次重试]
|
|||
|
|
M3 --> H3{有 headers?}
|
|||
|
|
H3 -->|是| XD3[读取 x-death]
|
|||
|
|
XD3 --> C2[count = 2]
|
|||
|
|
|
|||
|
|
C2 -->|NACK| M4[消息 第3次重试]
|
|||
|
|
M4 --> H4{有 headers?}
|
|||
|
|
H4 -->|是| XD4[读取 x-death]
|
|||
|
|
XD4 --> C3[count = 3]
|
|||
|
|
|
|||
|
|
C3 --> ERROR[发送到错误队列]
|
|||
|
|
|
|||
|
|
style C0 fill:#90EE90
|
|||
|
|
style C1 fill:#FFD700
|
|||
|
|
style C2 fill:#FFA500
|
|||
|
|
style C3 fill:#FF6B6B
|
|||
|
|
style ERROR fill:#DC143C,color:#fff
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 关键代码逻辑
|
|||
|
|
|
|||
|
|
### 1. getRetryCount() - 获取重试次数
|
|||
|
|
|
|||
|
|
```php
|
|||
|
|
protected function getRetryCount(AMQPMessage $message): int
|
|||
|
|
{
|
|||
|
|
// ⚠️ 关键:首次消息没有 application_headers,必须先检查
|
|||
|
|
if (!$message->has('application_headers')) {
|
|||
|
|
return 0; // 首次处理
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
$headers = $message->get('application_headers');
|
|||
|
|
if (!$headers) {
|
|||
|
|
return 0;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
$headerData = $headers->getNativeData();
|
|||
|
|
$xDeath = $headerData['x-death'] ?? [];
|
|||
|
|
|
|||
|
|
if (empty($xDeath)) {
|
|||
|
|
return 0;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RabbitMQ 自动维护的重试次数
|
|||
|
|
return $xDeath[0]['count'] ?? 0;
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**x-death 结构示例**:
|
|||
|
|
```php
|
|||
|
|
[
|
|||
|
|
'x-death' => [
|
|||
|
|
0 => [
|
|||
|
|
'count' => 2, // 重试次数
|
|||
|
|
'reason' => 'rejected', // 死信原因
|
|||
|
|
'queue' => 'orders.queue', // 原队列
|
|||
|
|
'time' => 1701234567, // 时间戳
|
|||
|
|
'exchange' => 'dlx.orders', // DLX 名称
|
|||
|
|
'routing-keys' => ['retry']
|
|||
|
|
]
|
|||
|
|
]
|
|||
|
|
]
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 2. sendToErrorQueue() - 发送到错误队列
|
|||
|
|
|
|||
|
|
```php
|
|||
|
|
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
|
|||
|
|
{
|
|||
|
|
$originalData = json_decode($message->getBody(), true);
|
|||
|
|
|
|||
|
|
// 构建详细的错误消息
|
|||
|
|
$errorMessage = [
|
|||
|
|
'error_id' => uniqid('err_', true),
|
|||
|
|
'original_message' => $originalData,
|
|||
|
|
'error' => [
|
|||
|
|
'type' => get_class($error),
|
|||
|
|
'message' => $error->getMessage(),
|
|||
|
|
'trace' => $error->getTraceAsString(),
|
|||
|
|
'timestamp' => date('c'),
|
|||
|
|
],
|
|||
|
|
'metadata' => [
|
|||
|
|
'platform' => $originalData['platform'] ?? 'unknown',
|
|||
|
|
'platform_id' => $originalData['meta']['platform_id'] ?? null,
|
|||
|
|
'company_id' => $originalData['meta']['company_id'] ?? null,
|
|||
|
|
'store_id' => $originalData['meta']['store_id'] ?? null,
|
|||
|
|
'data_type' => $originalData['data_type'] ?? 'unknown',
|
|||
|
|
'failed_at' => date('c'),
|
|||
|
|
'retry_count' => $this->getRetryCount($message),
|
|||
|
|
],
|
|||
|
|
];
|
|||
|
|
|
|||
|
|
// 使用 errors.exchange 发送(不是默认 exchange)
|
|||
|
|
$producer = ApplicationContext::getContainer()->get(Producer::class);
|
|||
|
|
$producer->produce(
|
|||
|
|
new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage {
|
|||
|
|
protected string $exchange = 'errors.exchange'; // ✅ 通用错误交换机
|
|||
|
|
protected string|array $routingKey = 'error';
|
|||
|
|
protected string $poolName = 'default_consumer';
|
|||
|
|
protected array $properties = [
|
|||
|
|
'delivery_mode' => 2, // 持久化
|
|||
|
|
];
|
|||
|
|
|
|||
|
|
public function __construct(array $data) {
|
|||
|
|
$this->payload = $data;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
);
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 时序图:完整的重试流程
|
|||
|
|
|
|||
|
|
```mermaid
|
|||
|
|
sequenceDiagram
|
|||
|
|
participant P as Producer
|
|||
|
|
participant MQ as orders.queue
|
|||
|
|
participant C as OrderConsumer
|
|||
|
|
participant DLX as dlx.orders
|
|||
|
|
participant RQ as orders.retry.queue
|
|||
|
|
participant ME as main.exchange
|
|||
|
|
participant EX as errors.exchange
|
|||
|
|
participant EQ as errors.queue
|
|||
|
|
participant DB as Database
|
|||
|
|
|
|||
|
|
Note over P,MQ: 1. 首次消息
|
|||
|
|
P->>MQ: 发送订单消息
|
|||
|
|
MQ->>C: 消费消息 (count=0)
|
|||
|
|
C->>C: getRetryCount() → 0
|
|||
|
|
C->>DB: 尝试保存数据
|
|||
|
|
DB-->>C: 失败 (异常)
|
|||
|
|
C->>C: count(0) < MAX(3)
|
|||
|
|
C->>MQ: NACK(requeue=false)
|
|||
|
|
MQ->>DLX: 死信到 DLX
|
|||
|
|
Note over DLX: x-death count=1
|
|||
|
|
DLX->>RQ: routing_key: retry
|
|||
|
|
|
|||
|
|
Note over RQ: 2. 第1次重试
|
|||
|
|
RQ->>RQ: 等待 TTL 5秒
|
|||
|
|
RQ->>ME: TTL到期,自动死信
|
|||
|
|
ME->>MQ: routing_key: order.retry
|
|||
|
|
MQ->>C: 消费消息 (count=1)
|
|||
|
|
C->>C: getRetryCount() → 1
|
|||
|
|
C->>DB: 尝试保存数据
|
|||
|
|
DB-->>C: 失败 (异常)
|
|||
|
|
C->>C: count(1) < MAX(3)
|
|||
|
|
C->>MQ: NACK(requeue=false)
|
|||
|
|
MQ->>DLX: 死信到 DLX
|
|||
|
|
Note over DLX: x-death count=2
|
|||
|
|
DLX->>RQ: routing_key: retry
|
|||
|
|
|
|||
|
|
Note over RQ: 3. 第2次重试
|
|||
|
|
RQ->>RQ: 等待 TTL 5秒
|
|||
|
|
RQ->>ME: TTL到期,自动死信
|
|||
|
|
ME->>MQ: routing_key: order.retry
|
|||
|
|
MQ->>C: 消费消息 (count=2)
|
|||
|
|
C->>C: getRetryCount() → 2
|
|||
|
|
C->>DB: 尝试保存数据
|
|||
|
|
DB-->>C: 失败 (异常)
|
|||
|
|
C->>C: count(2) < MAX(3)
|
|||
|
|
C->>MQ: NACK(requeue=false)
|
|||
|
|
MQ->>DLX: 死信到 DLX
|
|||
|
|
Note over DLX: x-death count=3
|
|||
|
|
DLX->>RQ: routing_key: retry
|
|||
|
|
|
|||
|
|
Note over RQ,EQ: 4. 超过重试次数
|
|||
|
|
RQ->>RQ: 等待 TTL 5秒
|
|||
|
|
RQ->>ME: TTL到期,自动死信
|
|||
|
|
ME->>MQ: routing_key: order.retry
|
|||
|
|
MQ->>C: 消费消息 (count=3)
|
|||
|
|
C->>C: getRetryCount() → 3
|
|||
|
|
C->>DB: 尝试保存数据
|
|||
|
|
DB-->>C: 失败 (异常)
|
|||
|
|
C->>C: count(3) >= MAX(3) ✓
|
|||
|
|
C->>EX: sendToErrorQueue()
|
|||
|
|
EX->>EQ: 消息进入错误队列
|
|||
|
|
C->>MQ: ACK (防止再次重试)
|
|||
|
|
Note over EQ: 等待人工处理
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 配置参数
|
|||
|
|
|
|||
|
|
### 环境变量 (.env)
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
# 最大重试次数(默认3次)
|
|||
|
|
AMQP_MAX_RETRIES=3
|
|||
|
|
|
|||
|
|
# 调试延迟(秒),便于在 mq:status 中观察消息流转
|
|||
|
|
# 生产环境设置为 0,开发环境可设置为 2
|
|||
|
|
AMQP_CONSUMER_DEBUG_DELAY=0
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### Consumer 配置
|
|||
|
|
|
|||
|
|
```php
|
|||
|
|
#[Consumer(
|
|||
|
|
exchange: "main.exchange",
|
|||
|
|
routingKey: "order.#",
|
|||
|
|
queue: "orders.queue",
|
|||
|
|
pool: "default_consumer",
|
|||
|
|
nums: 1,
|
|||
|
|
enable: true
|
|||
|
|
)]
|
|||
|
|
class OrderConsumer extends ConsumerMessage
|
|||
|
|
{
|
|||
|
|
// ⚠️ 关键:必须设置为 false,否则会无限循环重试
|
|||
|
|
protected bool $requeue = false;
|
|||
|
|
|
|||
|
|
protected ?array $qos = [
|
|||
|
|
'prefetch_size' => 0,
|
|||
|
|
'prefetch_count' => 1, // 开发环境建议设置为1,生产环境可设置为10-100
|
|||
|
|
'global' => false,
|
|||
|
|
];
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 队列配置 (rabbitmq.sh)
|
|||
|
|
|
|||
|
|
**orders.queue**:
|
|||
|
|
- `x-dead-letter-exchange`: `dlx.orders` - 失败时发送到的 DLX
|
|||
|
|
- `x-dead-letter-routing-key`: `retry` - DLX 路由键
|
|||
|
|
- `x-message-ttl`: 86400000 (24小时) - 消息最长存活时间
|
|||
|
|
|
|||
|
|
**orders.retry.queue**:
|
|||
|
|
- `x-message-ttl`: 5000 (5秒) - 重试延迟时间
|
|||
|
|
- `x-dead-letter-exchange`: `main.exchange` - TTL到期后回流的 exchange
|
|||
|
|
- `x-dead-letter-routing-key`: `order.retry` - 回流时的 routing key
|
|||
|
|
|
|||
|
|
**errors.queue**:
|
|||
|
|
- `x-message-ttl`: 604800000 (7天) - 错误消息保留时间
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 监控和调试
|
|||
|
|
|
|||
|
|
### 查看队列状态
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
php bin/hyperf.php app:mq:status
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
输出示例:
|
|||
|
|
```
|
|||
|
|
=== Business Queues ===
|
|||
|
|
+-----------+---------+----------+---------+-----------+
|
|||
|
|
| Metric | orders | products | refunds | inventory |
|
|||
|
|
+-----------+---------+----------+---------+-----------+
|
|||
|
|
| Messages | 0 | 0 | 0 | 0 |
|
|||
|
|
| Consumers | 1 | 0 | 0 | 0 |
|
|||
|
|
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
|
|||
|
|
+-----------+---------+----------+---------+-----------+
|
|||
|
|
|
|||
|
|
=== Dead Letter Queues (Retry Queues) ===
|
|||
|
|
+-----------+--------------+----------------+---------------+-----------------+
|
|||
|
|
| Metric | orders.retry | products.retry | refunds.retry | inventory.retry |
|
|||
|
|
+-----------+--------------+----------------+---------------+-----------------+
|
|||
|
|
| Messages | 0 | 0 | 0 | 0 |
|
|||
|
|
| Consumers | 0 | 0 | 0 | 0 |
|
|||
|
|
| Status | ✓ Empty | ✓ Empty | ✓ Empty | ✓ Empty |
|
|||
|
|
+-----------+--------------+----------------+---------------+-----------------+
|
|||
|
|
|
|||
|
|
=== Shared Queues ===
|
|||
|
|
+-----------+----------+
|
|||
|
|
| Metric | errors |
|
|||
|
|
+-----------+----------+
|
|||
|
|
| Messages | 5 |
|
|||
|
|
| Consumers | 0 |
|
|||
|
|
| Status | ✓ Active |
|
|||
|
|
+-----------+----------+
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 开启调试模式观察流程
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
# 1. 设置调试延迟
|
|||
|
|
echo "AMQP_CONSUMER_DEBUG_DELAY=2" >> .env
|
|||
|
|
|
|||
|
|
# 2. 启动消费者
|
|||
|
|
php bin/hyperf.php start
|
|||
|
|
|
|||
|
|
# 3. 发送测试消息
|
|||
|
|
php bin/hyperf.php app:mq-push:tmall
|
|||
|
|
|
|||
|
|
# 4. 观察日志输出
|
|||
|
|
tail -f runtime/logs/hyperf.log
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
你将看到:
|
|||
|
|
```
|
|||
|
|
>>> No application_headers, first time processing
|
|||
|
|
Retry count: 0/3
|
|||
|
|
=== Error Caught ===
|
|||
|
|
Error: Cannot find parser for platform 'tmall' and entity 'order'
|
|||
|
|
>>> Retry not exceeded, sending to DLX (NACK)
|
|||
|
|
|
|||
|
|
[5秒后]
|
|||
|
|
>>> Extracted count from x-death: 1
|
|||
|
|
Retry count: 1/3
|
|||
|
|
>>> Retry not exceeded, sending to DLX (NACK)
|
|||
|
|
|
|||
|
|
[5秒后]
|
|||
|
|
>>> Extracted count from x-death: 2
|
|||
|
|
Retry count: 2/3
|
|||
|
|
>>> Retry not exceeded, sending to DLX (NACK)
|
|||
|
|
|
|||
|
|
[5秒后]
|
|||
|
|
>>> Extracted count from x-death: 3
|
|||
|
|
Retry count: 3/3
|
|||
|
|
>>> MAX RETRIES EXCEEDED! Sending to error queue...
|
|||
|
|
>>> Successfully sent to error queue!
|
|||
|
|
>>> Returning ACK to prevent further retries
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 常见问题排查
|
|||
|
|
|
|||
|
|
### 问题1: 消息无限循环,队列始终显示空
|
|||
|
|
|
|||
|
|
**原因**: `requeue` 没有设置为 `false`
|
|||
|
|
|
|||
|
|
**解决**:
|
|||
|
|
```php
|
|||
|
|
protected bool $requeue = false; // ⚠️ 必须设置
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 问题2: OutOfBoundsException - No "application_headers"
|
|||
|
|
|
|||
|
|
**原因**: 首次处理的消息没有 `application_headers` 属性
|
|||
|
|
|
|||
|
|
**解决**: 使用 `has()` 检查
|
|||
|
|
```php
|
|||
|
|
if (!$message->has('application_headers')) {
|
|||
|
|
return 0;
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 问题3: 消息不进入错误队列
|
|||
|
|
|
|||
|
|
**原因**:
|
|||
|
|
1. Consumer 没有 `errors.exchange` 的写权限
|
|||
|
|
2. 使用了默认 exchange (空字符串) 而非 `errors.exchange`
|
|||
|
|
|
|||
|
|
**解决**:
|
|||
|
|
1. 确保 rabbitmq.sh 中 consumer 权限包含 `errors.exchange`
|
|||
|
|
2. 使用 `errors.exchange` 而非空 exchange:
|
|||
|
|
```php
|
|||
|
|
protected string $exchange = 'errors.exchange'; // ✅ 正确
|
|||
|
|
protected string $exchange = ''; // ❌ 错误
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 问题4: 重试次数始终是 0
|
|||
|
|
|
|||
|
|
**原因**:
|
|||
|
|
1. 重试队列没有正确配置 `x-dead-letter-routing-key`
|
|||
|
|
2. 消息没有回流到主队列
|
|||
|
|
|
|||
|
|
**解决**: 确保 rabbitmq.sh 中重试队列配置:
|
|||
|
|
```bash
|
|||
|
|
--arguments '{"x-message-ttl":5000,"x-dead-letter-exchange":"main.exchange","x-dead-letter-routing-key":"order.retry"}'
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 相关文档
|
|||
|
|
|
|||
|
|
- [RabbitMQ.md](RabbitMQ.md) - RabbitMQ 架构和配置详解
|
|||
|
|
- [HyperfAmqp.md](HyperfAmqp.md) - Hyperf AMQP 使用指南和重试机制详解
|
|||
|
|
- [EntityParseFactory完整使用指南.md](EntityParseFactory完整使用指南.md) - 实体解析工厂使用
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
## 注意事项
|
|||
|
|
|
|||
|
|
EntityParse 为数据处理的核心业务,会影响后续连续聚合物化视图的结果
|
|||
|
|
|
|||
|
|
backend/app/Platform/[Platform]/EntityParse/Order.php
|
|||
|
|
backend/app/Platform/Tmall/EntityParse/Order.php
|