diff --git a/docs/RabbitMQ.md b/docs/RabbitMQ.md index 4b5929d..5fa48da 100644 --- a/docs/RabbitMQ.md +++ b/docs/RabbitMQ.md @@ -78,7 +78,7 @@ **问题**:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。 **传统方案的问题**: -- ❌ `nack(requeue=true)`:消息回队列头部,无限循环 +- ❌ `nack(requeue=true)`:消息回队列头部,无限循环,**永远不会进入 DLX** - ❌ 直接丢弃:数据丢失 - ❌ 立即重试:打爆 API/DB @@ -86,17 +86,26 @@ ``` 消费失败 → nack(requeue=false) → 进入 DLX ↓ - x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列 + Consumer检查 x-death count ↓ - x-retries >= 3 → 错误队列 (人工处理) + count < 3 → DLX路由到重试队列 (TTL=5s) → 自动死信回主队列 + ↓ + count >= 3 → Consumer主动发送到错误队列 (人工处理) ``` +**重要说明**: +- ⚠️ **必须设置 `requeue=false`**:只有 `requeue=false` 才会触发 DLX 机制 +- ⚠️ **`requeue=true` 会导致无限循环**:消息直接回到原队列头部,永远不进入 DLX +- ✅ **retry 队列自动回流**:TTL 到期后自动死信回主 Exchange,不需要额外消费者 +- ✅ **Consumer 控制重试次数**:检查 `x-death` header 的 count 字段判断重试次数 + **优势**: - ✅ 不会卡 FIFO -- ✅ 不会无限重试 +- ✅ 不会无限重试(通过 max retries 限制) - ✅ 延迟重试避免打爆 API/DB -- ✅ 错误消息自动隔离 +- ✅ 错误消息自动隔离到 error 队列 - ✅ 无需额外 Redis 或手写 retry 逻辑 +- ✅ retry 过程完全自动化(依赖 RabbitMQ 的 DLX + TTL 机制) ### 方案优势总结 @@ -124,6 +133,79 @@ --- +## Requeue 机制详解 + +### requeue 参数的作用 + +在 RabbitMQ 中,当消费者处理消息失败时,可以通过 `basic_nack()` 或 `basic_reject()` 拒绝消息,第三个参数 `requeue` 决定了消息的去向: + +```php +// requeue=true:消息重新回到原队列(队列头部) +$channel->basic_nack($deliveryTag, false, true); + +// requeue=false:消息进入 DLX(如果配置了),否则丢弃 +$channel->basic_nack($deliveryTag, false, false); +``` + +### requeue=true 的问题 + +| 行为 | 后果 | +|------|------| +| 消息回到原队列头部 | 立即被同一个消费者再次取出 | +| 不触发 DLX | 永远不会进入重试队列或错误队列 | +| 无延迟 | 瞬间形成高速循环(毫秒级) | +| 无计数 | 无法统计重试次数,无法设置上限 | + +**结果**:消息在 `消费 → 失败 → NACK → 回队列 → 消费` 之间无限循环,CPU 和网络资源被浪费,队列状态显示为空(消息循环太快无法观察)。 + +### requeue=false 的正确用法 + +``` +orders.queue (配置了 x-dead-letter-exchange) + ↓ Consumer 消费失败 + ↓ basic_nack(requeue=false) + ↓ 触发 DLX 机制 + ↓ +dlx.orders (死信交换机) + ↓ routing_key="retry" + ↓ +orders.retry.queue (TTL=5s) + ↓ 5秒后 TTL 到期 + ↓ 自动死信回 main.exchange + ↓ +main.exchange + ↓ routing_key="order.#" + ↓ +orders.queue (重新进入主队列,x-death count+1) +``` + +### 环境变量配置 + +在 `.env` 文件中可以配置以下参数: + +```bash +# 最大重试次数(默认3次) +# 消息重试超过此次数后,会被发送到 errors.queue +AMQP_MAX_RETRIES=3 + +# 调试延迟(秒,默认0) +# 设置为 2 可以让每条消息处理延迟2秒,方便在 mq:status 中观察队列状态 +# 生产环境应设置为 0 +AMQP_CONSUMER_DEBUG_DELAY=0 +``` + +**使用示例**: + +```bash +# 开发环境:观察消息流转 +AMQP_CONSUMER_DEBUG_DELAY=2 php bin/hyperf.php start + +# 生产环境:正常运行 +AMQP_CONSUMER_DEBUG_DELAY=0 php bin/hyperf.php start +``` + +--- + ## 架构设计 ### 1. VHost 设计 @@ -254,13 +336,16 @@ RabbitMQ 实例 } ``` -**重试机制说明**: -1. 消费失败时,消费者执行 `nack(requeue=false)`,消息进入 DLX -2. DLX 根据消息 header 中的 `x-retries` 判断路由: - - `x-retries < 3`:路由到 `retry.queue`(延迟重试) - - `x-retries >= 3`:路由到 `errors.queue`(永久失败) -3. 重试队列 TTL 到期后,消息自动死信回主 Exchange,重新进入主队列 -4. 最大重试次数:**3次**,延迟时间:**5秒** +**重试机制说明(方案B实现)**: +1. 消费者处理失败时,检查消息的 `x-death` header 中的 count 字段获取重试次数 +2. 根据重试次数决定处理方式: + - **count < 3**:执行 `nack(requeue=false)`,消息进入 DLX → 路由到 `retry.queue`(延迟重试) + - **count >= 3**:消费者主动发送到 `errors.queue`,然后返回 ACK(避免再次重试) +3. 重试队列 TTL(5秒)到期后,消息自动死信回主 Exchange,重新进入主队列 +4. 配置参数: + - **最大重试次数**:3次(可通过 `AMQP_MAX_RETRIES` 环境变量配置) + - **延迟时间**:5秒(retry 队列的 `x-message-ttl`) + - **调试延迟**:可通过 `AMQP_CONSUMER_DEBUG_DELAY` 设置处理延迟,方便观察队列状态 #### 错误队列 @@ -270,6 +355,36 @@ RabbitMQ 实例 |-----------|--------|----------|----------|------| | `errors.queue` | 是 | 否 | 7天 | 接收所有类型、所有平台的最终失败消息,供人工排查 | +**错误队列的作用和机制**: +- ✅ **收集永久失败的消息**:当消息重试次数达到上限(默认3次)后,消费者会主动将消息发送到此队列 +- ✅ **避免消息丢失**:即使处理失败多次,消息也会被保存在错误队列中,不会丢失 +- ✅ **人工排查和修复**:运维人员可以从错误队列中查看失败原因,修复问题后重新投递 +- ✅ **统一管理**:所有数据类型(orders/products/refunds/inventory)的错误消息都集中在一个队列 +- ⚠️ **不会自动重试**:错误队列中的消息需要人工介入,不会自动回流到主队列 + +**错误消息格式**: +```json +{ + "error_id": "err_675d8f3a2b1c4", + "original_message": { /* 原始消息内容 */ }, + "error": { + "type": "InvalidArgumentException", + "message": "Missing required field: total_amount", + "trace": "Exception stack trace...", + "timestamp": "2025-01-15T10:31:00+00:00" + }, + "metadata": { + "platform": "tmall", + "platform_id": 2, + "company_id": 188, + "store_id": 292, + "data_type": "order", + "failed_at": "2025-01-15T10:31:00+00:00", + "retry_count": 3 + } +} +``` + --- ### 4. Binding 配置 @@ -516,7 +631,7 @@ class OrderConsumer: ) ``` -### 4. 重试机制工作流程 +### 4. 重试机制工作流程(方案B实现) ``` ┌─────────────────┐ @@ -525,22 +640,44 @@ class OrderConsumer: │ ├─ 成功 → basic_ack() │ - └─ 失败 → basic_nack(requeue=false) + └─ 失败 → Consumer检查 x-death count │ - ▼ - ┌──────────────────┐ - │ dlx.orders │ ← 死信交换机 - └────────┬─────────┘ - │ - ├─ x-retries < 3 → orders.retry.queue (TTL=5s) - │ │ - │ └─ TTL到期 → 自动死信回 main.exchange - │ │ - │ └─ order.# → orders.queue - │ - └─ x-retries >= 3 → errors.queue (人工处理) + ├─ count < 3 (未超过重试次数) + │ │ + │ └→ basic_nack(requeue=false) + │ │ + │ ▼ + │ ┌──────────────────┐ + │ │ dlx.orders │ ← 死信交换机 + │ └────────┬─────────┘ + │ │ + │ └→ routing_key="retry" + │ │ + │ ▼ + │ orders.retry.queue (TTL=5s) + │ │ + │ └─ TTL到期 → 自动死信回 main.exchange + │ │ + │ └─ order.# → orders.queue + │ (x-death count自动+1) + │ + └─ count >= 3 (超过重试次数) + │ + └→ Consumer主动发送到 errors.queue + │ + └→ basic_ack() (避免再次重试) + │ + ▼ + errors.queue (人工处理) ``` +**关键点**: +- ✅ **requeue=false**:让失败消息进入 DLX,而不是回到原队列 +- ✅ **retry 队列自动回流**:TTL 到期后自动死信回主队列,不需要额外消费者 +- ✅ **x-death count 自动累加**:每次死信 RabbitMQ 会自动增加 count +- ✅ **Consumer 控制路由**:超过重试次数时,Consumer 主动发送到 error 队列并 ACK +- ⚠️ **不能使用动态 routing key**:DLX 的 routing key 在队列配置中是固定的,无法基于 count 动态改变 + ### 5. 为什么这样设计? | 特性 | 原因 | 优势 |