update mq doc
This commit is contained in:
+158
-21
@@ -78,7 +78,7 @@
|
||||
**问题**:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。
|
||||
|
||||
**传统方案的问题**:
|
||||
- ❌ `nack(requeue=true)`:消息回队列头部,无限循环
|
||||
- ❌ `nack(requeue=true)`:消息回队列头部,无限循环,**永远不会进入 DLX**
|
||||
- ❌ 直接丢弃:数据丢失
|
||||
- ❌ 立即重试:打爆 API/DB
|
||||
|
||||
@@ -86,17 +86,26 @@
|
||||
```
|
||||
消费失败 → nack(requeue=false) → 进入 DLX
|
||||
↓
|
||||
x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列
|
||||
Consumer检查 x-death count
|
||||
↓
|
||||
x-retries >= 3 → 错误队列 (人工处理)
|
||||
count < 3 → DLX路由到重试队列 (TTL=5s) → 自动死信回主队列
|
||||
↓
|
||||
count >= 3 → Consumer主动发送到错误队列 (人工处理)
|
||||
```
|
||||
|
||||
**重要说明**:
|
||||
- ⚠️ **必须设置 `requeue=false`**:只有 `requeue=false` 才会触发 DLX 机制
|
||||
- ⚠️ **`requeue=true` 会导致无限循环**:消息直接回到原队列头部,永远不进入 DLX
|
||||
- ✅ **retry 队列自动回流**:TTL 到期后自动死信回主 Exchange,不需要额外消费者
|
||||
- ✅ **Consumer 控制重试次数**:检查 `x-death` header 的 count 字段判断重试次数
|
||||
|
||||
**优势**:
|
||||
- ✅ 不会卡 FIFO
|
||||
- ✅ 不会无限重试
|
||||
- ✅ 不会无限重试(通过 max retries 限制)
|
||||
- ✅ 延迟重试避免打爆 API/DB
|
||||
- ✅ 错误消息自动隔离
|
||||
- ✅ 错误消息自动隔离到 error 队列
|
||||
- ✅ 无需额外 Redis 或手写 retry 逻辑
|
||||
- ✅ retry 过程完全自动化(依赖 RabbitMQ 的 DLX + TTL 机制)
|
||||
|
||||
### 方案优势总结
|
||||
|
||||
@@ -124,6 +133,79 @@
|
||||
|
||||
---
|
||||
|
||||
## Requeue 机制详解
|
||||
|
||||
### requeue 参数的作用
|
||||
|
||||
在 RabbitMQ 中,当消费者处理消息失败时,可以通过 `basic_nack()` 或 `basic_reject()` 拒绝消息,第三个参数 `requeue` 决定了消息的去向:
|
||||
|
||||
```php
|
||||
// requeue=true:消息重新回到原队列(队列头部)
|
||||
$channel->basic_nack($deliveryTag, false, true);
|
||||
|
||||
// requeue=false:消息进入 DLX(如果配置了),否则丢弃
|
||||
$channel->basic_nack($deliveryTag, false, false);
|
||||
```
|
||||
|
||||
### requeue=true 的问题
|
||||
|
||||
| 行为 | 后果 |
|
||||
|------|------|
|
||||
| 消息回到原队列头部 | 立即被同一个消费者再次取出 |
|
||||
| 不触发 DLX | 永远不会进入重试队列或错误队列 |
|
||||
| 无延迟 | 瞬间形成高速循环(毫秒级) |
|
||||
| 无计数 | 无法统计重试次数,无法设置上限 |
|
||||
|
||||
**结果**:消息在 `消费 → 失败 → NACK → 回队列 → 消费` 之间无限循环,CPU 和网络资源被浪费,队列状态显示为空(消息循环太快无法观察)。
|
||||
|
||||
### requeue=false 的正确用法
|
||||
|
||||
```
|
||||
orders.queue (配置了 x-dead-letter-exchange)
|
||||
↓ Consumer 消费失败
|
||||
↓ basic_nack(requeue=false)
|
||||
↓ 触发 DLX 机制
|
||||
↓
|
||||
dlx.orders (死信交换机)
|
||||
↓ routing_key="retry"
|
||||
↓
|
||||
orders.retry.queue (TTL=5s)
|
||||
↓ 5秒后 TTL 到期
|
||||
↓ 自动死信回 main.exchange
|
||||
↓
|
||||
main.exchange
|
||||
↓ routing_key="order.#"
|
||||
↓
|
||||
orders.queue (重新进入主队列,x-death count+1)
|
||||
```
|
||||
|
||||
### 环境变量配置
|
||||
|
||||
在 `.env` 文件中可以配置以下参数:
|
||||
|
||||
```bash
|
||||
# 最大重试次数(默认3次)
|
||||
# 消息重试超过此次数后,会被发送到 errors.queue
|
||||
AMQP_MAX_RETRIES=3
|
||||
|
||||
# 调试延迟(秒,默认0)
|
||||
# 设置为 2 可以让每条消息处理延迟2秒,方便在 mq:status 中观察队列状态
|
||||
# 生产环境应设置为 0
|
||||
AMQP_CONSUMER_DEBUG_DELAY=0
|
||||
```
|
||||
|
||||
**使用示例**:
|
||||
|
||||
```bash
|
||||
# 开发环境:观察消息流转
|
||||
AMQP_CONSUMER_DEBUG_DELAY=2 php bin/hyperf.php start
|
||||
|
||||
# 生产环境:正常运行
|
||||
AMQP_CONSUMER_DEBUG_DELAY=0 php bin/hyperf.php start
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 架构设计
|
||||
|
||||
### 1. VHost 设计
|
||||
@@ -254,13 +336,16 @@ RabbitMQ 实例
|
||||
}
|
||||
```
|
||||
|
||||
**重试机制说明**:
|
||||
1. 消费失败时,消费者执行 `nack(requeue=false)`,消息进入 DLX
|
||||
2. DLX 根据消息 header 中的 `x-retries` 判断路由:
|
||||
- `x-retries < 3`:路由到 `retry.queue`(延迟重试)
|
||||
- `x-retries >= 3`:路由到 `errors.queue`(永久失败)
|
||||
3. 重试队列 TTL 到期后,消息自动死信回主 Exchange,重新进入主队列
|
||||
4. 最大重试次数:**3次**,延迟时间:**5秒**
|
||||
**重试机制说明(方案B实现)**:
|
||||
1. 消费者处理失败时,检查消息的 `x-death` header 中的 count 字段获取重试次数
|
||||
2. 根据重试次数决定处理方式:
|
||||
- **count < 3**:执行 `nack(requeue=false)`,消息进入 DLX → 路由到 `retry.queue`(延迟重试)
|
||||
- **count >= 3**:消费者主动发送到 `errors.queue`,然后返回 ACK(避免再次重试)
|
||||
3. 重试队列 TTL(5秒)到期后,消息自动死信回主 Exchange,重新进入主队列
|
||||
4. 配置参数:
|
||||
- **最大重试次数**:3次(可通过 `AMQP_MAX_RETRIES` 环境变量配置)
|
||||
- **延迟时间**:5秒(retry 队列的 `x-message-ttl`)
|
||||
- **调试延迟**:可通过 `AMQP_CONSUMER_DEBUG_DELAY` 设置处理延迟,方便观察队列状态
|
||||
|
||||
#### 错误队列
|
||||
|
||||
@@ -270,6 +355,36 @@ RabbitMQ 实例
|
||||
|-----------|--------|----------|----------|------|
|
||||
| `errors.queue` | 是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 |
|
||||
|
||||
**错误队列的作用和机制**:
|
||||
- ✅ **收集永久失败的消息**:当消息重试次数达到上限(默认3次)后,消费者会主动将消息发送到此队列
|
||||
- ✅ **避免消息丢失**:即使处理失败多次,消息也会被保存在错误队列中,不会丢失
|
||||
- ✅ **人工排查和修复**:运维人员可以从错误队列中查看失败原因,修复问题后重新投递
|
||||
- ✅ **统一管理**:所有数据类型(orders/products/refunds/inventory)的错误消息都集中在一个队列
|
||||
- ⚠️ **不会自动重试**:错误队列中的消息需要人工介入,不会自动回流到主队列
|
||||
|
||||
**错误消息格式**:
|
||||
```json
|
||||
{
|
||||
"error_id": "err_675d8f3a2b1c4",
|
||||
"original_message": { /* 原始消息内容 */ },
|
||||
"error": {
|
||||
"type": "InvalidArgumentException",
|
||||
"message": "Missing required field: total_amount",
|
||||
"trace": "Exception stack trace...",
|
||||
"timestamp": "2025-01-15T10:31:00+00:00"
|
||||
},
|
||||
"metadata": {
|
||||
"platform": "tmall",
|
||||
"platform_id": 2,
|
||||
"company_id": 188,
|
||||
"store_id": 292,
|
||||
"data_type": "order",
|
||||
"failed_at": "2025-01-15T10:31:00+00:00",
|
||||
"retry_count": 3
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 4. Binding 配置
|
||||
@@ -516,7 +631,7 @@ class OrderConsumer:
|
||||
)
|
||||
```
|
||||
|
||||
### 4. 重试机制工作流程
|
||||
### 4. 重试机制工作流程(方案B实现)
|
||||
|
||||
```
|
||||
┌─────────────────┐
|
||||
@@ -525,22 +640,44 @@ class OrderConsumer:
|
||||
│
|
||||
├─ 成功 → basic_ack()
|
||||
│
|
||||
└─ 失败 → basic_nack(requeue=false)
|
||||
└─ 失败 → Consumer检查 x-death count
|
||||
│
|
||||
▼
|
||||
┌──────────────────┐
|
||||
│ dlx.orders │ ← 死信交换机
|
||||
└────────┬─────────┘
|
||||
│
|
||||
├─ x-retries < 3 → orders.retry.queue (TTL=5s)
|
||||
├─ count < 3 (未超过重试次数)
|
||||
│ │
|
||||
│ └→ basic_nack(requeue=false)
|
||||
│ │
|
||||
│ ▼
|
||||
│ ┌──────────────────┐
|
||||
│ │ dlx.orders │ ← 死信交换机
|
||||
│ └────────┬─────────┘
|
||||
│ │
|
||||
│ └→ routing_key="retry"
|
||||
│ │
|
||||
│ ▼
|
||||
│ orders.retry.queue (TTL=5s)
|
||||
│ │
|
||||
│ └─ TTL到期 → 自动死信回 main.exchange
|
||||
│ │
|
||||
│ └─ order.# → orders.queue
|
||||
│ (x-death count自动+1)
|
||||
│
|
||||
└─ x-retries >= 3 → errors.queue (人工处理)
|
||||
└─ count >= 3 (超过重试次数)
|
||||
│
|
||||
└→ Consumer主动发送到 errors.queue
|
||||
│
|
||||
└→ basic_ack() (避免再次重试)
|
||||
│
|
||||
▼
|
||||
errors.queue (人工处理)
|
||||
```
|
||||
|
||||
**关键点**:
|
||||
- ✅ **requeue=false**:让失败消息进入 DLX,而不是回到原队列
|
||||
- ✅ **retry 队列自动回流**:TTL 到期后自动死信回主队列,不需要额外消费者
|
||||
- ✅ **x-death count 自动累加**:每次死信 RabbitMQ 会自动增加 count
|
||||
- ✅ **Consumer 控制路由**:超过重试次数时,Consumer 主动发送到 error 队列并 ACK
|
||||
- ⚠️ **不能使用动态 routing key**:DLX 的 routing key 在队列配置中是固定的,无法基于 count 动态改变
|
||||
|
||||
### 5. 为什么这样设计?
|
||||
|
||||
| 特性 | 原因 | 优势 |
|
||||
|
||||
Reference in New Issue
Block a user