diff --git a/backend/bin/rabbitmq.sh b/backend/bin/rabbitmq.sh index f9bcde3..1be247b 100755 --- a/backend/bin/rabbitmq.sh +++ b/backend/bin/rabbitmq.sh @@ -53,7 +53,7 @@ ${YELLOW}用法:${NC} $0 [options] ${YELLOW}命令:${NC} - ${GREEN}init${NC} 从数据库读取所有启用的平台,完全重建 MQ 配置 + ${GREEN}init${NC} 从数据库读取所有启用的平台,完全重建 MQ 配置 - 删除现有 VHost 及所有资源 - 从 platforms 表读取 enabled=true 的平台 - 创建所有 Exchange、Queue、Binding @@ -71,9 +71,9 @@ ${YELLOW}命令:${NC} - 从 mq_user.php 中移除配置 示例: $0 remove shopee - ${GREEN}list${NC} 列出当前 MQ 中已配置的平台 + ${GREEN}list${NC} 列出当前 MQ 中已配置的平台 - ${GREEN}version${NC} 显示 RabbitMQ 服务器版本信息 + ${GREEN}version${NC} 显示 RabbitMQ 服务器版本信息 ${GREEN}reset-password${NC} 重置指定用户的密码 --user 用户名称 (consumer/ops/平台名) diff --git a/docs/RabbitMQ.md b/docs/RabbitMQ.md index d31d6ed..a4f25c5 100644 --- a/docs/RabbitMQ.md +++ b/docs/RabbitMQ.md @@ -1,5 +1,14 @@ # RabbitMQ 配置方案 +> **权威源声明**:本文档已与现网代码对齐(2026-05-14)。如本文与代码冲突,**以代码为准**: +> - 拓扑创建脚本:[`backend/bin/rabbitmq.sh`](../backend/bin/rabbitmq.sh) +> - 用户/凭据配置:[`backend/config/autoload/mq_user.php`](../backend/config/autoload/mq_user.php) + [`backend/config/autoload/amqp.php`](../backend/config/autoload/amqp.php) +> - 业务消费者(注解 declare):`backend/app/Platform/{Order,Product,Refund}Consumer.php` +> - 错误兜底(双写):[`backend/app/Platform/Traits/FailedMessageTrait.php`](../backend/app/Platform/Traits/FailedMessageTrait.php) + [`backend/app/Platform/ErrorProducer.php`](../backend/app/Platform/ErrorProducer.php) + [`backend/app/Model/FailedMessage.php`](../backend/app/Model/FailedMessage.php) +> - 运维状态检查:`php ./bin/hyperf.php app:mq:status [--interval=N] [--watch]`([`backend/app/Command/AppMqStatus.php`](../backend/app/Command/AppMqStatus.php) + [`backend/app/Service/MqStatusService.php`](../backend/app/Service/MqStatusService.php)) + +**配置层级说明**:`mq_user.php` 从数据库 `platforms` 表动态读取启用的平台名,按 `strtolower(str_replace(' ', '_', $name))` 标准化生成 `user_{platform}` 用户名与 `MQ_PASSWORD_{PLATFORM_UPPER}` env 变量名;`amqp.php` 中 `default_consumer` 用户硬编码为 `user_datahub_consumer`,密码由 env `MQ_PASSWORD_CONSUMER` 注入。 + ## 概述 本文档描述 datahub 应用的 RabbitMQ 消息队列架构设计和配置方法。 @@ -7,17 +16,18 @@ ### 业务背景 - **应用角色**:datahub 是消息消费者,负责接收来自多个电商平台的数据并入库 -- **数据来源**:多个电商平台(DouYin、JD、Tmall、RedBook、Amazon Japan、Naver、Rakuten、Shopify 等) +- **数据来源**:19 个电商/线下平台(amazon_japan、coupang、dewu、douyin、goofish、jd、kaola、lazada、naver、offline、pdd、rakuten、redbook、shopee、shopify、tmall、wechat、wechat_video、youzan)— 平台列表由 `platforms` 表动态管理,名称按 `strtolower(str_replace(' ', '_', $name))` 标准化 - **数据类型**:订单(orders)、产品(products)、退款(refunds)、库存(inventory) - **隔离需求**:不同平台开发者之间的工作空间需要相互隔离,互不影响 ### 架构特点 -- **VHost 隔离**:按应用划分 VHost,支持多业务复用, 使用 `Classic` Vhost 模式 -- **Exchange 隔离**:每个平台独立 Exchange,通过权限控制访问 -- **单队列设计**:每种数据类型一个队列,所有平台共享(orders.queue/products.queue/refunds.queue/inventory.queue),保证严格 FIFO 顺序 -- **独立消费者**:每种数据类型配备独立消费者,采用批处理 + prefetch + 适配器模式,提升吞吐与稳定性 -- **智能重试**:DLX(死信交换机)+ 延迟重试队列 + 错误队列,优雅处理失败消息,不阻塞主队列 +- **VHost 隔离**:按应用划分 VHost,使用 `Classic` 队列类型 +- **Exchange 隔离**:每个平台独立的业务 Exchange + 错误 Exchange,通过权限正则控制访问(用户只能写自己的 platform.exchange / platform.errors.exchange) +- **单队列设计**:每种数据类型一个队列,所有平台共享(`orders.queue` / `products.queue` / `refunds.queue` / `inventory.queue`),保证同 data_type 内严格 FIFO +- **独立消费者**:每种数据类型一个 Hyperf Consumer 实例,使用 prefetch + 单条 ACK 模式 +- **智能重试**:DLX(死信交换机)+ 延迟重试队列(TTL=5s 自动回流 main.exchange)+ 应用层 ErrorProducer 兜底 +- **双层错误持久化**:超过重试上限的消息 **同时**写入 `errors.queue` (MQ, TTL=7d) + DB `failed_messages` 表(人工捞回与统计分析) - **消息持久化**:RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API --- @@ -41,10 +51,11 @@ | 问题 | 传统多队列方案 | 本方案(单队列) | |------|--------------|----------------| -| **队列数量** | 8平台 × 4数据类型 = 32个队列 | 4个数据类型队列(队列数量减少 87.5%)| +| **队列数量** | 19 平台 × 4 数据类型 = 76 个队列 | 4 个数据类型队列(数量减少 ~95%,且与平台数量解耦)| | **FIFO 保证** | 同类型数据可能分散在多个队列 | 严格 FIFO,同类型数据都在一个队列 | -| **运维复杂度** | 需要监控 32 个队列 | 只需监控 4 个主队列 | -| **消费者部署** | 可能需要 32 个消费者实例 | 只需 4 个消费者实例 | +| **运维复杂度** | 需要监控 76 个队列 | 只需监控 4 个主队列 + 4 个 retry 队列 + 1 个 errors 队列 = 9 个 | +| **消费者部署** | 需要 N 个消费者实例 | 只需 4 个消费者实例 | +| **新增平台成本** | 新建 4 队列 + 4 binding + 用户/权限 | 仅新建 1 业务 exchange + 1 错误 exchange + 5 binding + 1 用户/权限(队列不动)| **原因**:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。 @@ -82,22 +93,39 @@ - ❌ 直接丢弃:数据丢失 - ❌ 立即重试:打爆 API/DB -**本方案(DLX + 延迟重试)**: +**本方案(DLX + 延迟重试 + 双写错误持久化)**: ``` -消费失败 → nack(requeue=false) → 进入 DLX +消费失败 → Consumer 检查 x-death count ↓ - Consumer检查 x-death count + count < AMQP_MAX_RETRIES(默认 3) ↓ - count < 3 → DLX路由到重试队列 (TTL=5s) → 自动死信回主队列 + Consumer 返回 NACK(requeue=false) ↓ - count >= 3 → Consumer主动发送到错误队列 (人工处理) + 源 queue 的 x-dead-letter-exchange = dlx.{dtype}, dl-rk = "retry" + ↓ + dlx.{dtype} (topic) — binding rk="retry" → {dtype}.retry.queue (TTL=5s) + ↓ + retry.queue 的 x-dead-letter-exchange = main.exchange, dl-rk = "{entity}.retry" + ↓ 5s 后死信回流 + main.exchange (topic) — binding "{entity}.#" → {dtype}.queue (重新消费, x-death count+1) + + + count >= AMQP_MAX_RETRIES + ↓ + Consumer 调用 FailedMessageTrait::sendToErrorQueue + ↓ ┌─ ErrorProducer.publish → errors.exchange (rk="error") → errors.queue (TTL=7d) ← MQ 兜底 + └─ FailedMessage::create → DB `failed_messages` 表 ← DB 兜底 + ↓ + Consumer 返回 ACK(避免主队列继续重试) ``` **重要说明**: -- ⚠️ **必须设置 `requeue=false`**:只有 `requeue=false` 才会触发 DLX 机制 +- ⚠️ **必须设置 `requeue=false`**:只有 `requeue=false` 才会触发 DLX 机制(已由 `protected bool $requeue = false` 在所有 Consumer 中固定) - ⚠️ **`requeue=true` 会导致无限循环**:消息直接回到原队列头部,永远不进入 DLX -- ✅ **retry 队列自动回流**:TTL 到期后自动死信回主 Exchange,不需要额外消费者 -- ✅ **Consumer 控制重试次数**:检查 `x-death` header 的 count 字段判断重试次数 +- ✅ **retry 队列自动回流**:TTL 到期后由 RabbitMQ 引擎自动死信回 main.exchange,不需要额外消费者 +- ✅ **Consumer 控制重试次数**:通过 `FailedMessageTrait::getRetryCount` 读 `x-death[0].count` +- ✅ **错误兜底双写**:超过重试上限时,errors.queue + failed_messages 表**同步双写**,互为备份;监控/告警/捞回需同时覆盖两个数据源 +- ❌ **DLX 不直接绑定 errors.queue**:错误进入 errors.queue 完全由应用层 `ErrorProducer` 主动 publish 到 `errors.exchange` 实现,**不**走 DLX 路由分流 **优势**: - ✅ 不会卡 FIFO @@ -161,24 +189,33 @@ $channel->basic_nack($deliveryTag, false, false); ### requeue=false 的正确用法 ``` -orders.queue (配置了 x-dead-letter-exchange) - ↓ Consumer 消费失败 +orders.queue + arguments: + x-dead-letter-exchange = dlx.orders + x-dead-letter-routing-key = retry + x-message-ttl = 86400000 (24h) + ↓ Consumer 消费失败 (count < max_retries) ↓ basic_nack(requeue=false) ↓ 触发 DLX 机制 ↓ -dlx.orders (死信交换机) - ↓ routing_key="retry" +dlx.orders (topic) + ↓ binding key = "retry" ↓ -orders.retry.queue (TTL=5s) - ↓ 5秒后 TTL 到期 - ↓ 自动死信回 main.exchange +orders.retry.queue + arguments: + x-message-ttl = 5000 (5s) + x-dead-letter-exchange = main.exchange + x-dead-letter-routing-key = order.retry + ↓ 5s 后 TTL 到期,自动死信回 main.exchange ↓ -main.exchange - ↓ routing_key="order.#" +main.exchange (topic) + ↓ binding key = "order.#" 匹配 "order.retry" ↓ orders.queue (重新进入主队列,x-death count+1) ``` +> 注意:retry queue 的 `x-dead-letter-routing-key` 是 **`{entity_singular}.retry`**(如 `order.retry` / `product.retry`),不保留原 routing key。由于 main.exchange 用通配符 `order.#` 绑定 orders.queue,`order.retry` 能正常匹配回流;但失去了"消息原平台来源"信息(platform 字段已存在 message 体内,不依赖 routing key 追溯)。 + ### 环境变量配置 在 `.env` 文件中可以配置以下参数: @@ -212,74 +249,84 @@ AMQP_CONSUMER_DEBUG_DELAY=0 php bin/hyperf.php start VHost 按业务应用进行划分,便于资源隔离和权限管理。 -``` -RabbitMQ 实例 -├── datahub-app # Datahub 应用专用 -├── analytics-app # 分析应用(预留) -└── warehouse-app # 数据仓库应用(预留) -``` - -**当前需要创建的 VHost**: -- `datahub-app`:用于电商数据流转 +**当前 VHost**: +- `datahub` — 电商数据流转专用(注:早期文档误写为 `datahub`,**现网实际为 `datahub`**) --- ### 2. Exchange 设计 -在 `datahub-app` VHost 中,Exchange 分为两类: +在 `datahub` VHost 中,Exchange 分为三类: -#### 业务 Exchange(用于数据投递) - -每个平台一个独立的 Topic Exchange: +#### 主路由 Exchange | Exchange 名称 | 类型 | 持久化 | 用途 | |--------------|------|--------|------| -| `douyin.exchange` | Topic | 是 | DouYin(抖音)平台数据入口 | -| `jd.exchange` | Topic | 是 | JD(京东)平台数据入口 | -| `tmall.exchange` | Topic | 是 | Tmall(天猫)平台数据入口 | -| `redbook.exchange` | Topic | 是 | RedBook(小红书)平台数据入口 | -| `amazon.exchange` | Topic | 是 | Amazon Japan 平台数据入口 | -| `naver.exchange` | Topic | 是 | Naver 平台数据入口 | -| `rakuten.exchange` | Topic | 是 | Rakuten(乐天)平台数据入口 | -| `shopify.exchange` | Topic | 是 | Shopify 平台数据入口 | +| `main.exchange` | Topic | 是 | **仅作为 retry 回流入口** + Consumer Annotation binding 声明源;**不**作为 producer 入口 | -**Routing Key 规范**(采用 `{data_type}.{platform}` 格式): +> ⚠ producer 直推 `{platform}.exchange`,**不**经过 main.exchange。main.exchange 的两个用途:① retry queue 的 `x-dead-letter-exchange` 指向它,retry 消息通过它回流业务队列;② Hyperf `#[Consumer(exchange: "main.exchange", routingKey: "{entity}.#", queue: "{dtype}.queue")]` 注解声明 binding 时用它。 -| 平台 | 订单 | 产品 | 退款 | 库存 | -|------|------|------|------|------| -| DouYin | `order.douyin` | `product.douyin` | `refund.douyin` | `inventory.douyin` | -| JD | `order.jd` | `product.jd` | `refund.jd` | `inventory.jd` | -| Tmall | `order.tmall` | `product.tmall` | `refund.tmall` | `inventory.tmall` | -| RedBook | `order.redbook` | `product.redbook` | `refund.redbook` | `inventory.redbook` | -| Amazon | `order.amazon` | `product.amazon` | `refund.amazon` | `inventory.amazon` | -| Naver | `order.naver` | `product.naver` | `refund.naver` | `inventory.naver` | -| Rakuten | `order.rakuten` | `product.rakuten` | `refund.rakuten` | `inventory.rakuten` | -| Shopify | `order.shopify` | `product.shopify` | `refund.shopify` | `inventory.shopify` | +#### 平台业务 Exchange(producer 入口) + +每个平台一个独立的 Topic Exchange,共 19 个(与 `platforms` 表 enabled 平台对齐): + +| 平台 (`name` 标准化后) | Exchange | +|---|---| +| amazon_japan | `amazon_japan.exchange` | +| coupang | `coupang.exchange` | +| dewu | `dewu.exchange` | +| douyin | `douyin.exchange` | +| goofish | `goofish.exchange` | +| jd | `jd.exchange` | +| kaola | `kaola.exchange` | +| lazada | `lazada.exchange` | +| naver | `naver.exchange` | +| offline | `offline.exchange` | +| pdd | `pdd.exchange` | +| rakuten | `rakuten.exchange` | +| redbook | `redbook.exchange` | +| shopee | `shopee.exchange` | +| shopify | `shopify.exchange` | +| tmall | `tmall.exchange` | +| wechat | `wechat.exchange` | +| wechat_video | `wechat_video.exchange` | +| youzan | `youzan.exchange` | + +全部 `type=topic, durable=true`。 + +**Routing Key 规范**(`{entity_singular}.{platform}` 格式,注意 entity 单数 / 平台名小写下划线): + +| 数据类型 | 示例 routing key | +|---|---| +| 订单 (order) | `order.tmall` / `order.amazon_japan` / `order.wechat_video` | +| 产品 (product) | `product.tmall` / `product.amazon_japan` / `product.wechat_video` | +| 退款 (refund) | `refund.tmall` / `refund.amazon_japan` / `refund.wechat_video` | +| 库存 (inventory) | `inventory.tmall` / `inventory.amazon_japan` / `inventory.wechat_video` | **设计说明**: -- 使用 Topic Exchange 的通配符特性,队列可通过 `order.#` 匹配所有平台的订单 -- 保证单队列内所有平台消息的 FIFO 顺序 -- 消费者通过解析 routing key 或消息体的 platform 字段进行业务分发 +- producer 直推 `{platform}.exchange`,使用 routing key `{entity_singular}.{platform}` +- 业务 binding 把 `{platform}.exchange` 通过 routing key 直绑到 4 个 `{dtype}.queue`(不经过 main.exchange) +- 消费者通过消息体 `platform` / `data_type` 字段进行业务分发(**不**依赖 routing key 解析) -#### 错误 Exchange(用于错误通知) +#### 平台错误 Exchange(应用层错误推送 + 平台错误通知) -每个平台一个独立的错误 Topic Exchange: +每个平台一个独立的错误 Topic Exchange,共 19 个,命名 `{platform}.errors.exchange`: + +| 来源平台 | Errors Exchange | +|---|---| +| 19 个平台 | `{platform}.errors.exchange`(amazon_japan / coupang / dewu / douyin / goofish / jd / kaola / lazada / naver / offline / pdd / rakuten / redbook / shopee / shopify / tmall / wechat / wechat_video / youzan)| + +每个 `{platform}.errors.exchange` 通过 binding rk=`#` 直连 `errors.queue`。 + +#### 全局错误 Exchange | Exchange 名称 | 类型 | 持久化 | 用途 | -|--------------|------|--------|------| -| `douyin.errors.exchange` | Topic | 是 | DouYin 平台错误消息 | -| `jd.errors.exchange` | Topic | 是 | JD 平台错误消息 | -| `tmall.errors.exchange` | Topic | 是 | Tmall 平台错误消息 | -| `redbook.errors.exchange` | Topic | 是 | RedBook 平台错误消息 | -| `amazon.errors.exchange` | Topic | 是 | Amazon Japan 平台错误消息 | -| `naver.errors.exchange` | Topic | 是 | Naver 平台错误消息 | -| `rakuten.errors.exchange` | Topic | 是 | Rakuten 平台错误消息 | -| `shopify.errors.exchange` | Topic | 是 | Shopify 平台错误消息 | +|---|---|---|---| +| `errors.exchange` | Topic | 是 | 应用层 `ErrorProducer` 推送目标,binding rk=`#` 直连 `errors.queue` | -**Routing Key 规范**: -- `validation` - 数据格式验证错误 -- `processing_failed` - 业务处理失败 -- `system` - 系统错误 +**Routing Key 规范(应用层 publish 时)**: +- 应用层 `ErrorProducer.php` 固定使用 routing key = `error`(见代码) +- 平台 producer 在自检/校验失败时可向 `{platform}.errors.exchange` 自定义 routing key(业务侧决定,由 binding `#` 兜底) --- @@ -327,25 +374,29 @@ RabbitMQ 实例 | `refunds.retry.queue` | 是 | 5秒 | `main.exchange` | 退款延迟重试 | | `inventory.retry.queue` | 是 | 5秒 | `main.exchange` | 库存延迟重试 | -**重试队列参数配置**: +**重试队列参数配置**(按 data_type 实例化,`{entity_singular}` ∈ {`order`, `product`, `refund`, `inventory`}): ```json { "x-message-ttl": 5000, "x-dead-letter-exchange": "main.exchange", - "x-dead-letter-routing-key": "order.{platform}" + "x-dead-letter-routing-key": "{entity_singular}.retry" } ``` -**重试机制说明(方案B实现)**: -1. 消费者处理失败时,检查消息的 `x-death` header 中的 count 字段获取重试次数 +例:`orders.retry.queue` 的 dl-rk = `order.retry`,回流时由 main.exchange 的 `order.#` binding 匹配进入 `orders.queue`。 + +**重试机制说明(现网实现)**: +1. 消费者处理失败时,调用 `FailedMessageTrait::getRetryCount` 读 `x-death[0].count` 2. 根据重试次数决定处理方式: - - **count < 3**:执行 `nack(requeue=false)`,消息进入 DLX → 路由到 `retry.queue`(延迟重试) - - **count >= 3**:消费者主动发送到 `errors.queue`,然后返回 ACK(避免再次重试) -3. 重试队列 TTL(5秒)到期后,消息自动死信回主 Exchange,重新进入主队列 -4. 配置参数: - - **最大重试次数**:3次(可通过 `AMQP_MAX_RETRIES` 环境变量配置) - - **延迟时间**:5秒(retry 队列的 `x-message-ttl`) - - **调试延迟**:可通过 `AMQP_CONSUMER_DEBUG_DELAY` 设置处理延迟,方便观察队列状态 + - **count < `AMQP_MAX_RETRIES`(默认 3)**:`return Result::NACK`(Hyperf 自动按 `$requeue=false` 触发 DLX)→ 进入 `dlx.{dtype}` → 路由到 `{dtype}.retry.queue`(5s 延迟)→ TTL 到期回流 `main.exchange` → 重新进入主队列 + - **count >= max_retries**:Consumer 调用 `FailedMessageTrait::sendToErrorQueue`,**同时**: + - `ErrorProducer.publish` → `errors.exchange` (rk=`error`) → `errors.queue` (TTL 7d) + - `FailedMessage::create` → DB `failed_messages` 表 + - 然后 `return Result::ACK`,避免主队列继续重试 +3. 配置参数: + - **最大重试次数**:`AMQP_MAX_RETRIES`(默认 3) + - **延迟时间**:5 秒(retry 队列 `x-message-ttl=5000`) + - **调试延迟**:`AMQP_CONSUMER_DEBUG_DELAY`(生产环境设 0) #### 错误队列 @@ -387,112 +438,80 @@ RabbitMQ 实例 --- -### 4. Binding 配置 +### 4. Binding 配置(现网实际) -#### 主 Exchange (用于接收各平台数据) +本节按"消息流向"组织,每个 binding 都来自本地 MQ 实测(`curl mgmt-api/bindings/datahub`)+ `backend/bin/rabbitmq.sh` 创建逻辑。 -需要创建一个统一的主 Exchange,用于重试队列的消息回流: +#### 4.1 平台业务 Exchange → 业务队列(producer 主路径) -| Exchange 名称 | 类型 | 持久化 | 用途 | -|--------------|------|--------|------| -| `main.exchange` | Topic | 是 | 统一主交换机,用于路由和重试消息回流 | - -#### 业务队列绑定(主队列) - -**方案1:直接绑定(推荐)** -每个平台的 Exchange 使用带平台标识的 routing key 绑定到对应数据类型队列: +每个 `{platform}.exchange` 通过 4 条 binding 直接绑定到 4 个业务队列: ``` -douyin.exchange -├── [routing_key: order.douyin] → orders.queue -├── [routing_key: product.douyin] → products.queue -├── [routing_key: refund.douyin] → refunds.queue -└── [routing_key: inventory.douyin] → inventory.queue - -tmall.exchange -├── [routing_key: order.tmall] → orders.queue -├── [routing_key: product.tmall] → products.queue -├── [routing_key: refund.tmall] → refunds.queue -└── [routing_key: inventory.tmall] → inventory.queue - -... (其他平台类似) +{platform}.exchange (topic) +├── rk = "order.{platform}" → orders.queue +├── rk = "product.{platform}" → products.queue +├── rk = "refund.{platform}" → refunds.queue +└── rk = "inventory.{platform}" → inventory.queue ``` -**方案2:通过主 Exchange(备选)** -平台 Exchange 先发到主 Exchange,再由主 Exchange 统一路由: +19 平台 × 4 = **76 条 binding**。 + +#### 4.2 main.exchange → 业务队列(retry 回流路径) ``` -douyin.exchange → [routing_key: order.douyin] → main.exchange - -main.exchange -├── [routing_key: order.#] → orders.queue -├── [routing_key: product.#] → products.queue -├── [routing_key: refund.#] → refunds.queue -└── [routing_key: inventory.#] → inventory.queue +main.exchange (topic) +├── rk = "order.#" → orders.queue +├── rk = "product.#" → products.queue +├── rk = "refund.#" → refunds.queue +└── rk = "inventory.#" → inventory.queue ``` -**推荐使用方案1**,简化架构,减少消息跳转。 +**4 条 binding**。这是 retry queue 死信回流的唯一入口;producer **不**通过 main.exchange 投递消息。 -#### DLX 绑定(重试路由) - -每个 DLX 根据消息 header 中的 `x-retries` 决定路由目标: +#### 4.3 DLX → retry queue(仅一种绑定) ``` -dlx.orders -├── [routing_key: retry, x-retries < 3] → orders.retry.queue -└── [routing_key: error, x-retries >= 3] → errors.queue - -dlx.products -├── [routing_key: retry] → products.retry.queue -└── [routing_key: error] → errors.queue - -dlx.refunds -├── [routing_key: retry] → refunds.retry.queue -└── [routing_key: error] → errors.queue - -dlx.inventory -├── [routing_key: retry] → inventory.retry.queue -└── [routing_key: error] → errors.queue +dlx.orders (topic) — rk = "retry" → orders.retry.queue +dlx.products (topic) — rk = "retry" → products.retry.queue +dlx.refunds (topic) — rk = "retry" → refunds.retry.queue +dlx.inventory (topic) — rk = "retry" → inventory.retry.queue ``` -**注意**:RabbitMQ 原生不支持基于 header 的条件路由,需要: -- **方案A**:消费者在 `nack` 时根据重试次数决定发送到 DLX 的 routing key(`retry` 或 `error`) -- **方案B**:使用 RabbitMQ 插件(如 `rabbitmq_message_routing_by_headers`)实现条件路由 +**4 条 binding**。 -**推荐方案A**:消费者控制重试逻辑,更灵活可控。 +> ⚠ **DLX 不绑定到 errors.queue**。错误进入 errors.queue 由应用层 `ErrorProducer` 主动 publish 实现(见 §4.5),**不**通过 DLX 路由分流。这一点与早期文档草稿("dlx.{dtype} → errors.queue (rk=error)" 双绑模式)不符,**以现网实现为准**。 -#### 重试队列绑定(回流主队列) +#### 4.4 retry queue → main.exchange(TTL 死信回流) -重试队列 TTL 到期后,消息自动死信回主 Exchange: +retry queue 不需要显式 binding;其 `x-dead-letter-exchange = main.exchange` 与 `x-dead-letter-routing-key = {entity_singular}.retry` 定义在队列 arguments 中,TTL 到期由 RabbitMQ 引擎自动死信投递到 main.exchange。 ``` -orders.retry.queue -└── [TTL=5s后 死信到 main.exchange,routing_key保持原始值:order.{platform}] - -main.exchange -└── [routing_key: order.#] → orders.queue +orders.retry.queue —(TTL 5s, dl-rk=order.retry)→ main.exchange →[order.#]→ orders.queue +products.retry.queue —(TTL 5s, dl-rk=product.retry)→ main.exchange →[product.#]→ products.queue +refunds.retry.queue —(TTL 5s, dl-rk=refund.retry)→ main.exchange →[refund.#]→ refunds.queue +inventory.retry.queue —(TTL 5s, dl-rk=inventory.retry)→ main.exchange →[inventory.#]→ inventory.queue ``` -这样实现了"失败 → 延迟5秒 → 重新进入主队列"的循环。 - -#### 错误队列绑定 - -每个平台的错误 Exchange 和所有 DLX 都绑定到统一错误队列: +#### 4.5 errors exchange → errors.queue(双入口扇入) ``` -douyin.errors.exchange -└── [routing_key: #] → errors.queue - -dlx.orders -└── [routing_key: error] → errors.queue - -dlx.products -└── [routing_key: error] → errors.queue - -... (其他类型类似) +errors.exchange (topic) — rk = "#" → errors.queue (全局兜底入口,应用层 ErrorProducer 用) +{platform}.errors.exchange (topic) — rk = "#" → errors.queue × 19 (平台自定义错误投递入口) ``` -**说明**:使用 `#` 通配符接收所有错误类型。 +**20 条 binding**(19 平台 + 1 全局)。 + +#### 4.6 binding 总数核对 + +| 类别 | 数量 | +|---|---| +| 平台业务 exchange → 4 业务队列 | 19 × 4 = 76 | +| main.exchange → 4 业务队列 | 4 | +| dlx.{dtype} → retry queue | 4 | +| errors exchange → errors.queue(19 平台 + 1 全局) | 20 | +| **总计** | **104** | + +(不含 default exchange `(AMQP default)` 自动给每个 queue 创建的同名 binding) --- @@ -538,145 +557,121 @@ dlx.products 3. 批量处理时,仍然对每条消息单独 ACK/NACK 4. 实现"伪批量拉取"效果,同时保留单条确认的灵活性 -### 3. 消费者处理逻辑(伪代码) +### 3. 消费者处理逻辑(现网实现简化版) -```python -class OrderConsumer: - def __init__(self): - self.prefetch_count = 100 - self.batch_size = 50 - self.batch_timeout = 3 - self.message_buffer = [] - self.adapters = { - 'douyin': DouyinAdapter(), - 'tmall': TmallAdapter(), - 'jd': JDAdapter(), - # ... 其他平台适配器 +完整实现见 [`backend/app/Platform/OrderConsumer.php`](../backend/app/Platform/OrderConsumer.php) 与 [`FailedMessageTrait.php`](../backend/app/Platform/Traits/FailedMessageTrait.php)。简化骨架: + +```php +#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", + pool: "default_consumer", nums: 1, enable: true)] +class OrderConsumer extends ConsumerMessage +{ + use FailedMessageTrait; + + protected ?array $qos = ['prefetch_size' => 0, 'prefetch_count' => 1, 'global' => false]; + protected bool $requeue = false; // 关键:让失败消息进 DLX 而不是 requeue + + public function getQueueBuilder(): QueueBuilder + { + return (new QueueBuilder()) + ->setQueue($this->getQueue()) + ->setArguments([ + 'x-dead-letter-exchange' => ['S', 'dlx.orders'], + 'x-dead-letter-routing-key' => ['S', 'retry'], + 'x-message-ttl' => ['I', 86400000], // 24h + 'x-queue-type' => ['S', 'classic'], + ]); + } + + public function consumeMessage($data, AMQPMessage $message): Result + { + $retry_count = $this->getRetryCount($message); // 读 x-death[0].count + $max_retries = (int) env('AMQP_MAX_RETRIES', 3); + + try { + // 1. 创建 EntityParse、entityMatch、entityMap、upsert + $parse = EntityParseFactory::createFromMessage($message); + $entity = $parse->entityMatch($data['meta']); + $rows = $parse->entityMap($data['data'])->all(); + + Db::beginTransaction(); + $entity->newQuery()->upsert($rows, $parse->getUniqueBy(), $parse->getUpdateFields()); + // 2. 子项处理、聚合补刷等业务逻辑 ... + Db::commit(); + + return Result::ACK; + } catch (Throwable $error) { + Db::rollBack(); + Log::get()->error('Consumer processing failed', [...]); + + if ($retry_count >= $max_retries) { + // 双写兜底:MQ errors.queue + DB failed_messages 表 + $this->sendToErrorQueue($message, $error); + return Result::ACK; // 防止主队列继续重试 + } + + return Result::NACK; // requeue=false → 进 DLX → retry queue } - - def on_message(self, channel, method, properties, body): - """接收到消息时的回调""" - message = json.loads(body) - self.message_buffer.append({ - 'channel': channel, - 'method': method, - 'properties': properties, - 'body': message - }) - - # 达到批量大小,触发处理 - if len(self.message_buffer) >= self.batch_size: - self.process_batch() - - def process_batch(self): - """批量处理消息""" - for msg_data in self.message_buffer: - try: - message = msg_data['body'] - platform = message['platform'] - - # 使用适配器模式分发到对应平台逻辑 - adapter = self.adapters.get(platform) - if not adapter: - raise ValueError(f"Unknown platform: {platform}") - - # 处理业务逻辑 - adapter.process(message) - - # 成功:ACK - msg_data['channel'].basic_ack(msg_data['method'].delivery_tag) - - except TemporaryError as e: - # 临时错误:重试 - retry_count = msg_data['properties'].headers.get('x-retries', 0) - - if retry_count < 3: - # 更新重试次数,发送到 DLX 进行延迟重试 - headers = {'x-retries': retry_count + 1} - msg_data['channel'].basic_nack( - msg_data['method'].delivery_tag, - requeue=False - ) - else: - # 超过重试次数,发送到错误队列 - self.send_to_error_queue(msg_data) - msg_data['channel'].basic_nack( - msg_data['method'].delivery_tag, - requeue=False - ) - - except PermanentError as e: - # 永久错误:直接进入错误队列 - self.send_to_error_queue(msg_data) - msg_data['channel'].basic_nack( - msg_data['method'].delivery_tag, - requeue=False - ) - - # 清空缓冲区 - self.message_buffer.clear() - - def send_to_error_queue(self, msg_data): - """发送到错误队列""" - error_message = { - 'original_message': msg_data['body'], - 'error': str(e), - 'timestamp': datetime.now().isoformat() - } - # 发送到错误 Exchange - self.channel.basic_publish( - exchange='douyin.errors.exchange', - routing_key='error', - body=json.dumps(error_message) - ) + } +} ``` -### 4. 重试机制工作流程(方案B实现) +`FailedMessageTrait::sendToErrorQueue` 内部: + +```php +$producer = ApplicationContext::getContainer()->get(Producer::class); +$producer->produce(new ErrorProducer($message, $error, $retry_count)); +// ↓ ErrorProducer 注解 #[Producer(exchange: 'errors.exchange', routingKey: 'error')] +// → errors.exchange → errors.queue (MQ 兜底, 7d TTL) + +FailedMessage::query()->create([...]); // → DB failed_messages 表 (DB 兜底) +``` + +### 4. 重试机制工作流程(现网实现) ``` ┌─────────────────┐ │ orders.queue │ ← 消费者从这里接收消息 └────────┬────────┘ │ - ├─ 成功 → basic_ack() + ├─ 成功 → return Result::ACK │ - └─ 失败 → Consumer检查 x-death count + └─ 失败 → FailedMessageTrait::getRetryCount($message) + (读 x-death[0].count) │ - ├─ count < 3 (未超过重试次数) + ├─ count < AMQP_MAX_RETRIES (默认 3) │ │ - │ └→ basic_nack(requeue=false) + │ └→ return Result::NACK (Hyperf 自动按 $requeue=false 处理) │ │ - │ ▼ - │ ┌──────────────────┐ - │ │ dlx.orders │ ← 死信交换机 - │ └────────┬─────────┘ - │ │ - │ └→ routing_key="retry" - │ │ - │ ▼ - │ orders.retry.queue (TTL=5s) - │ │ - │ └─ TTL到期 → 自动死信回 main.exchange - │ │ - │ └─ order.# → orders.queue - │ (x-death count自动+1) + │ ▼ 触发 DLX + │ dlx.orders (topic) ──rk="retry"──► orders.retry.queue + │ │ TTL=5s + │ ▼ + │ main.exchange (rk="order.retry") + │ │ + │ ▼ 命中 binding "order.#" + │ orders.queue (重新消费,x-death.count+1) │ - └─ count >= 3 (超过重试次数) + └─ count >= AMQP_MAX_RETRIES │ - └→ Consumer主动发送到 errors.queue + └→ FailedMessageTrait::sendToErrorQueue($message, $error) │ - └→ basic_ack() (避免再次重试) - │ - ▼ - errors.queue (人工处理) + ├──► ErrorProducer.publish → errors.exchange (rk="error") + │ → errors.queue (TTL 7d, user_ops 捞回) + │ + └──► FailedMessage::query()->create(...) + → DB failed_messages 表 + │ + ▼ + return Result::ACK (避免主队列继续重试) ``` **关键点**: -- ✅ **requeue=false**:让失败消息进入 DLX,而不是回到原队列 -- ✅ **retry 队列自动回流**:TTL 到期后自动死信回主队列,不需要额外消费者 -- ✅ **x-death count 自动累加**:每次死信 RabbitMQ 会自动增加 count -- ✅ **Consumer 控制路由**:超过重试次数时,Consumer 主动发送到 error 队列并 ACK -- ⚠️ **不能使用动态 routing key**:DLX 的 routing key 在队列配置中是固定的,无法基于 count 动态改变 +- ✅ **`$requeue=false` 由 Consumer 类属性固定**:让失败消息进入 DLX 而不是 requeue +- ✅ **retry 队列自动回流**:RabbitMQ 引擎按 retry queue 的 `x-message-ttl=5000` 自动死信回 main.exchange,不需要额外消费者 +- ✅ **`x-death.count` 自动累加**:每次死信 RabbitMQ 自动增加 count;Consumer 通过 `FailedMessageTrait::getRetryCount` 读取 +- ✅ **错误兜底双写**:超过重试上限时 errors.queue + failed_messages 表**同步双写**,互为备份 +- ⚠️ **DLX 不绑定 errors.queue**:进入 errors.queue 完全由应用层 `ErrorProducer` 主动 publish 实现,**不**走 DLX 路由分流 ### 5. 为什么这样设计? @@ -693,214 +688,199 @@ class OrderConsumer: ## 用户权限配置 -### 1. 平台开发者账号 +> **VHost 名称**:现网为 `datahub`(早期草稿误写为 `datahub-app`)。下述权限正则均来自本地 MQ 实测(`curl mgmt-api/permissions`)+ `backend/bin/rabbitmq.sh`,是**现网真实使用**的正则,不要随意改动。 -每个平台分配一个专用账号,只能访问自己的 Exchange。 +### 1. 平台开发者账号(19 个) -#### DouYin 平台账号 +用户名规范:`user_{platform}`,其中 `{platform}` 是 `platforms` 表 `name` 字段经 `strtolower(str_replace(' ', '_', $name))` 标准化后的结果。 + +完整列表:`user_amazon_japan` / `user_coupang` / `user_dewu` / `user_douyin` / `user_goofish` / `user_jd` / `user_kaola` / `user_lazada` / `user_naver` / `user_offline` / `user_pdd` / `user_rakuten` / `user_redbook` / `user_shopee` / `user_shopify` / `user_tmall` / `user_wechat` / `user_wechat_video` / `user_youzan` + +**权限正则模板**(按平台名展开): ``` -用户名: user_douyin -密码: <安全密码> -VHost: datahub-app - -权限配置: -- Configure: (留空) - 不允许创建/删除资源 -- Write: ^douyin\.(exchange|errors\.exchange)$ - 只能写入 DouYin 的业务和错误 Exchange -- Read: ^douyin\.errors\..*$ - 只能读取 DouYin 错误相关的队列 +configure = ^{platform}\.(exchange|errors\.exchange)$ +write = ^{platform}\.(exchange|errors\.exchange)$ +read = ^{platform}\.errors\..*$ ``` -#### Tmall 平台账号 +示例(tmall): ``` -用户名: user_tmall -密码: <安全密码> -VHost: datahub-app - -权限配置: -- Configure: (留空) -- Write: ^tmall\.(exchange|errors\.exchange)$ -- Read: ^tmall\.errors\..*$ +configure = ^tmall\.(exchange|errors\.exchange)$ +write = ^tmall\.(exchange|errors\.exchange)$ +read = ^tmall\.errors\..*$ ``` -#### 其他平台账号 - -按相同模式创建: -- JD: `user_jd` -- RedBook: `user_redbook` -- Amazon Japan: `user_amazon` -- Naver: `user_naver` -- Rakuten: `user_rakuten` -- Shopify: `user_shopify` +**密码 env 命名**:`MQ_PASSWORD_{PLATFORM_UPPER}`,其中 `PLATFORM_UPPER` = 平台标准化名转大写。例:`MQ_PASSWORD_TMALL`、`MQ_PASSWORD_AMAZON_JAPAN`、`MQ_PASSWORD_WECHAT_VIDEO`。密码由部署期 render-rabbitmq.sh 生成;由 `mq_user.php` 动态从 `platforms` 表读取启用平台后注入。 ### 2. Datahub 消费者账号 -Datahub 应用的后端服务账号,负责消费业务队列并处理错误。 - ``` 用户名: user_datahub_consumer -密码: <安全密码> -VHost: datahub-app +密码: env MQ_PASSWORD_CONSUMER +VHost: datahub -权限配置: -- Configure: ^(orders|products|refunds|inventory)\.queue$ - 可以管理业务队列 -- Write: .*\.errors\.exchange$ - 可以向所有错误 Exchange 写入消息 -- Read: ^(orders|products|refunds|inventory)\.queue$ - 可以读取所有业务队列 +configure = ^(main\.exchange|errors\.exchange|dlx\..*)|(.*\.queue)$ +write = ^(orders|products|refunds|inventory).*\.queue$|(dlx\..*)|(errors\.exchange)|(.*\.errors\.exchange)$ +read = ^(main\.exchange|(orders|products|refunds|inventory).*\.queue|dlx\..*)$ ``` +> ⚠ 此权限正则比早期文档("只读业务队列")宽松:消费者需要 **configure** main.exchange / errors.exchange / dlx.* / 所有 queue(因为 Hyperf Consumer Annotation 在 worker 启动时 declare 这些资源),需要 **write** errors.exchange + platform.errors.exchange(因为 `ErrorProducer` 走这条),需要 **read** main.exchange + 业务队列 + dlx.*。 + ### 3. 运维监控账号 -运维团队账号,用于监控所有错误消息。 - ``` 用户名: user_ops -密码: <安全密码> -VHost: datahub-app +密码: env MQ_PASSWORD_OPS +VHost: datahub -权限配置: -- Configure: ^errors\..*$ - 可以管理错误相关资源 -- Write: (留空) - 不需要写入权限 -- Read: ^errors\.queue$ - 可以读取统一错误队列 +configure = ^errors\..*$ +write = (留空) +read = ^errors\.queue$ ``` +仅能读 `errors.queue` 做人工捞回;与 DB `failed_messages` 表(应用层提供 Admin UI / API 查询)互为冗余。 + ### 4. 管理员账号 -RabbitMQ 超级管理员,用于配置和维护。 - ``` 用户名: admin -密码: <强密码> -Tag: administrator +密码: <强密码,仅 ops/dev 持有> +Tag: administrator -权限配置: -- 对所有 VHost 具有完全权限 -- 可以访问管理界面 +management API: AMQP_ADMIN_USER / AMQP_ADMIN_PASSWORD +(见 backend/config/autoload/amqp.php "management" 段) ``` +完整权限,可访问管理界面。**生产环境**仅用于运维操作 / `rabbitmqadmin` CLI / mgmt API;不暴露给应用代码。 + --- ## 消息格式规范 ### 1. message_id 设计 -message_id 采用结构化格式,确保幂等性和可追溯性。 +message_id 采用结构化 5 段格式(见 [`OrderProducer::generateMessageId`](../backend/app/Platform/OrderProducer.php)): **格式**: + ``` -{company_id}#{platform_id}#{store_id}#{entity_type}#{request_time_range/formated_suffix} +{company_id}#{platform_id}#{store_id}#{entity_type}#{unique_id} ``` **字段说明**: -- `prefix`:项目名称前缀(如 `wpic-project1`) -- `app_id`:应用标识(固定为 `datahub`) -- `company_id`:公司 ID -- `platform_id`:平台 ID -- `store_id`:店铺 ID -- `entity_type`:数据实体类型(order/product/refund/inventory) -- `request_time_range/formated_suffix`:数据请求的时间区间/格式化的后缀,业务侧决定即可,需要业务侧维护幂等性 -**分隔符说明**: -- 使用 `#` 作为分隔符(RabbitMQ message_id 字段支持任意字符) -- `#` 在 routing key 中是通配符,但在 message_id 中是普通字符 +| 段 | 含义 | 来源 | +|---|---|---| +| `company_id` | 公司 ID | producer 输入参数 | +| `platform_id` | 平台 ID(数据库主键) | producer 输入参数 | +| `store_id` | 店铺 ID | producer 输入参数 | +| `entity_type` | 数据实体类型(order/product/refund/inventory),**单数** | 由 `routingKey` 前缀自动派生(`explode('.', $routingKey)[0]`)| +| `unique_id` | 业务侧定义的唯一标识 / 时间区间 / batch id 等,**业务侧维护幂等性** | producer 输入参数 | **示例**: -``` -wpic-project1#datahub#100#20#200#order#2025-11-10~2025-11-15 -解析: -- 项目前缀: wpic-project1 -- 应用: datahub -- 公司ID: 100 -- 平台ID: 20 (DouYin) -- 店铺ID: 200 -- 实体类型: order -- 平台订单号: DY123456789 ``` +188#2#292#order#68a3f2c4e9b1f +``` + +解析:company=188, platform=2 (tmall), store=292, entity=order, unique_id=`68a3f2c4e9b1f`(uniqid()) + +**分隔符**:`#`(RabbitMQ message_id 字段允许任意字符;不与 routing key 通配符冲突)。 **特性**: -- ✅ 幂等性:同一实体的 message_id 完全一致 -- ✅ 可读性:直接看出数据来源 -- ✅ 便于调试:可从 message_id 提取元数据 -- ✅ 无冲突:全局唯一 +- ✅ 幂等性:业务侧通过 `unique_id` 保证同一实体的 message_id 一致(消费端 upsert 唯一键由 EntityParse 配置) +- ✅ 可读性:直接从 message_id 看出数据来源 4 段元数据 +- ✅ 便于调试:日志 / 监控可直接根据前 4 段过滤 ### 2. 业务消息格式 +由 [`OrderProducer::buildMessage`](../backend/app/Platform/OrderProducer.php) 构造(基类,所有 platform Producer 共用): + ```json { - "message_id": "wpic-project1#datahub#100#20#200#order#DY123456", - "timestamp": "2025-01-14T10:30:00Z", - "platform": "douyin", + "message_id": "188#2#292#order#68a3f2c4e9b1f", + "timestamp": "2026-05-14T13:43:59+08:00", + "platform": "tmall", "data_type": "order", - "metadata": { - "platform_id": 20, - "company_id": 100, - "store_id": 200, - "source_system": "douyin-open-api", + "meta": { + "platform_id": 2, + "company_id": 188, + "store_id": 292, + "platform_store_id": null, + "unique_id": "68a3f2c4e9b1f", + "source_system": "tmall_api", "retry_count": 0, - "data_version": 1736840000 + "data_version": 1747203839 }, - "data": { - "platform_unique_id": "DY123456", - "raw_data": { - // 平台原始完整数据 - } - } + "data": [ + { /* 平台原始 raw_data 元素 1 */ }, + { /* 平台原始 raw_data 元素 2 */ } + ] } ``` **字段说明**: | 字段 | 类型 | 必需 | 说明 | -|------|------|------|------| -| `message_id` | string | 是 | 结构化消息 ID,用于幂等性 | -| `timestamp` | string | 是 | 消息生成时间(ISO 8601 格式) | -| `platform` | string | 是 | 平台标识(douyin/tmall/jd...) | -| `data_type` | string | 是 | 数据类型(order/product/refund/inventory) | -| `metadata.platform_id` | int | 是 | 平台 ID(数据库主键) | -| `metadata.company_id` | int | 是 | 公司 ID | -| `metadata.store_id` | int | 是 | 店铺 ID | -| `metadata.source_system` | string | 是 | 数据来源系统 | -| `metadata.retry_count` | int | 是 | 重试次数(初始为 0) | -| `metadata.data_version` | int | 是 | 数据版本(Unix 时间戳,用于防止乱序更新) | -| `data.platform_unique_id` | string | 是 | 平台侧唯一标识 | -| `data.raw_data` | object | 是 | 平台原始完整 JSON 数据 | +|---|---|---|---| +| `message_id` | string | 是 | 5 段结构化 ID(见 §1)| +| `timestamp` | string | 是 | ISO 8601 格式(`date('c')`)| +| `platform` | string | 是 | 平台标识(标准化后的小写下划线名);基类默认 `'platform_unkonw'`,子类 `buildMessage` 中覆盖(如 `TmallProductProducer` 覆盖为 `'tmall'`)| +| `data_type` | string | 是 | 数据类型(order/product/refund/inventory,**单数**);基类默认 `'order'`,子类覆盖 | +| `meta.platform_id` | int | 是 | 平台 ID(数据库主键,来自 `platforms` 表)| +| `meta.company_id` | int | 是 | 公司 ID | +| `meta.store_id` | int | 是 | 店铺 ID | +| `meta.platform_store_id` | string\|null | 否 | 平台侧店铺标识(如有)| +| `meta.unique_id` | string | 是 | 业务唯一标识(与 message_id 末段一致)| +| `meta.source_system` | string | 是 | 数据来源系统(基类默认 `'origin_unknow'`,子类覆盖如 `'tmall_api'`)| +| `meta.retry_count` | int | 是 | 重试次数(初始 0;消费端通过 AMQP `x-death[0].count` 读取真实值,不读 meta)| +| `meta.data_version` | int | 是 | 数据版本(Unix 时间戳,用于乱序消费保护)| +| `data` | array | 是 | 平台原始数据数组(基类直接传 `$data['raw_data']`)| -**重要说明**: +> ⚠ **字段名是 `meta` 不是 `metadata`**(与早期文档草稿不同)。消费端 `OrderConsumer.php:86` 读 `$data['meta']`。 -- **data_version**:必须使用平台数据的更新时间戳(Unix 时间戳),用于解决消息乱序消费问题 - - 示例:平台订单的最后更新时间 `1736840000`(2025-01-14 10:00:00 UTC) - - 消费端只有当 `data_version` 大于数据库中的值时才更新数据 - - 如果平台未提供更新时间,使用消息生成时间戳 +**关于 `data_version`**:建议使用平台数据的更新时间戳,消费端可据此防止旧版本覆盖新版本;如平台未提供则 `OrderProducer::buildMessage` fallback 到 `time()`。 ### 3. 错误消息格式 -错误消息由 datahub 消费者生成,包含原始消息和错误详情。 +由 [`ErrorProducer::buildErrorMessage`](../backend/app/Platform/ErrorProducer.php) 构造(投递到 `errors.exchange` rk=`error`): ```json { - "error_id": "err_1234567890abcdef", + "error_id": "err_68f3d2c4abc1f.93214785", "original_message": { - // 原始业务消息完整内容 + /* 完整原始业务消息(含 meta / data)*/ }, "error": { - "type": "validation", - "message": "Missing required field: total_amount", + "type": "InvalidArgumentException", + "message": "Company with ID 188 not found", + "code": 0, + "file": "/var/www/app/Platform/Tmall/EntityParse/Product.php", + "line": 42, "trace": "Exception stack trace...", - "timestamp": "2025-01-14T10:31:00Z" + "timestamp": "2026-05-14T13:31:00+08:00" }, "metadata": { - "platform": "amazon", - "platform_id": 1, - "company_id": 100, - "store_id": 200, + "platform": "tmall", + "platform_id": 2, + "company_id": 188, + "store_id": 292, "data_type": "order", - "failed_at": "2025-01-14T10:31:00Z" + "message_id": "188#2#292#order#68a3f2c4e9b1f", + "failed_at": "2026-05-14T13:31:00+08:00", + "retry_count": 3 } } ``` +> 注意:错误消息的字段名是 `metadata`(与业务消息的 `meta` 不一致;保留以兼容现有应用代码 `FailedMessageTrait::persistFailedMessage` 的解析)。 + +**双写到 DB**:错误消息同步写入 [`failed_messages`](../backend/app/Model/FailedMessage.php) 表(model 字段:error_id / data_type / platform / platform_id / company_id / store_id / error_type / error_message / error_code / error_trace / original_message(json) / retry_count / message_id / failed_at / created_at)。Admin 可通过 [`FailedMessageController`](../backend/app/Controller/Api/V1/FailedMessageController.php) 查询。 + --- ## RabbitMQ 管理界面操作指引 @@ -909,15 +889,24 @@ wpic-project1#datahub#100#20#200#order#2025-11-10~2025-11-15 ``` URL: http://:15672 -用户名: admin +用户名: admin(开发期默认 admin/admin;生产期通过 podman secret / env 注入) 密码: <管理员密码> ``` +**推荐使用 Hyperf 命令**(避开浏览器,看核心 9 个队列状态): + +```bash +php ./bin/hyperf.php app:mq:status # 单次输出 +php ./bin/hyperf.php app:mq:status --watch --interval=5 # 每 5 秒刷新 +``` + +输出包含三组:Business Queues(4 业务)/ Dead Letter Queues (Retry Queues)(4 retry)/ Shared Queues(errors),分别展示 Messages / Consumers / Status。 + ### 2. 创建 VHost 1. 点击顶部导航 **Admin** → **Virtual Hosts** 2. 点击 **Add a new virtual host** -3. 输入 VHost 名称:`datahub-app` +3. 输入 VHost 名称:`datahub` 4. 点击 **Add virtual host** ### 3. 创建用户 @@ -925,75 +914,92 @@ URL: http://:15672 1. 点击顶部导航 **Admin** → **Users** 2. 点击 **Add a user** 3. 填写用户信息: - - **Username**: `user_amazon` + - **Username**: `user_tmall`(按 `user_{platform}` 规范) - **Password**: `<安全密码>` - **Tags**: (留空,非管理员用户) 4. 点击 **Add user** ### 4. 配置用户权限 -1. 在用户列表中,点击用户名(如 `user_amazon`) +1. 在用户列表中,点击用户名(如 `user_tmall`) 2. 滚动到 **Permissions** 部分 -3. 选择 VHost:`datahub-app` -4. 配置权限正则表达式: - - **Configure regexp**: (留空) - - **Write regexp**: `^amazon\.(exchange|errors\.exchange)$` - - **Read regexp**: `^amazon\.errors\..*$` +3. 选择 VHost:`datahub` +4. 配置权限正则表达式(与现网模板一致): + - **Configure regexp**: `^tmall\.(exchange|errors\.exchange)$` + - **Write regexp**: `^tmall\.(exchange|errors\.exchange)$` + - **Read regexp**: `^tmall\.errors\..*$` 5. 点击 **Set permission** ### 5. 创建 Exchange 1. 点击顶部导航 **Exchanges** -2. 确保选择正确的 VHost:`datahub-app` +2. 确保选择正确的 VHost:`datahub` 3. 点击 **Add a new exchange** 4. 填写配置: - - **Name**: `amazon.exchange` + - **Name**: `tmall.exchange` - **Type**: `topic` - **Durability**: `Durable` - **Auto delete**: `No` - **Internal**: `No` 5. 点击 **Add exchange** -重复以上步骤创建所有业务 Exchange 和错误 Exchange。 +重复以上步骤创建:1 个 `main.exchange` + 4 个 `dlx.{dtype}` + 1 个 `errors.exchange` + 每平台 2 个(`{platform}.exchange` 与 `{platform}.errors.exchange`)。 ### 6. 创建 Queue 1. 点击顶部导航 **Queues** -2. 确保选择正确的 VHost:`datahub-app` +2. 确保选择正确的 VHost:`datahub` 3. 点击 **Add a new queue** -4. 填写配置: +4. 填写配置(以 `orders.queue` 为例): - **Name**: `orders.queue` - **Durability**: `Durable` - **Auto delete**: `No` - **Arguments**: - - `x-message-ttl`: `86400000` (24小时,单位:毫秒) + - `x-message-ttl` = `86400000`(24h) + - `x-dead-letter-exchange` = `dlx.orders` + - `x-dead-letter-routing-key` = `retry` + - `x-queue-type` = `classic` 5. 点击 **Add queue** -重复以上步骤创建 `products.queue`、`refunds.queue` 和 `errors.queue`。 +四个业务队列同模式;retry 队列 arguments: -**注意**:`errors.queue` 的 TTL 设置为 `604800000`(7天)。 +``` +x-message-ttl = 5000 # 5s +x-dead-letter-exchange = main.exchange +x-dead-letter-routing-key = {entity_singular}.retry # 如 order.retry +x-queue-type = classic +``` + +`errors.queue` arguments: + +``` +x-message-ttl = 604800000 # 7d +x-queue-type = classic +``` ### 7. 创建 Binding 1. 点击顶部导航 **Exchanges** -2. 点击需要绑定的 Exchange(如 `amazon.exchange`) +2. 点击需要绑定的 Exchange(如 `tmall.exchange`) 3. 滚动到 **Bindings** 部分 4. 在 **Add binding from this exchange** 区域填写: - **To queue**: `orders.queue` - - **Routing key**: `order` + - **Routing key**: `order.tmall` 5. 点击 **Bind** -为每个 Exchange 创建 3 个绑定(order/product/refund)。 +为每个平台 Exchange 创建 **4 个 binding**(order/product/refund/inventory)。 -对于错误 Exchange,绑定配置: -- **To queue**: `errors.queue` -- **Routing key**: `#` +`main.exchange` 创建 4 个 binding(rk: `order.#` / `product.#` / `refund.#` / `inventory.#`)。 + +每个 `{platform}.errors.exchange` + 全局 `errors.exchange` 各创建 1 个绑定到 `errors.queue`(rk=`#`)。 + +DLX 各创建 1 个绑定到对应 retry queue(rk=`retry`)。**不**绑定 errors.queue。 --- ## 配置脚本 -为简化配置过程,可以使用以下脚本自动创建资源。 +> **生产环境推荐**:使用 `backend/bin/rabbitmq.sh`(与现网完全一致的脚本,从 `platforms` 表动态读取平台列表)。下方独立脚本仅供**临时手工部署 / 故障排查** 使用。 ### 使用 rabbitmqadmin @@ -1022,8 +1028,13 @@ RABBITMQ_USER="admin" RABBITMQ_PASS="admin" VHOST="datahub" -# 平台列表 -PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify") +# 平台列表(与现网 19 个对齐;新增平台时追加即可) +PLATFORMS=( + "amazon_japan" "coupang" "dewu" "douyin" "goofish" + "jd" "kaola" "lazada" "naver" "offline" + "pdd" "rakuten" "redbook" "shopee" "shopify" + "tmall" "wechat" "wechat_video" "youzan" +) DATA_TYPES=("orders" "products" "refunds" "inventory") # 0. 检测并清理现有 VHost 和用户(如果存在) @@ -1111,10 +1122,11 @@ done echo "" echo "创建重试队列..." for dtype in "${DATA_TYPES[@]}"; do + dtype_singular="${dtype%s}" rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable \ - --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}" - echo "✓ 创建重试队列: ${dtype}.retry.queue" + --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}" + echo "✓ 创建重试队列: ${dtype}.retry.queue (dl-rk: ${dtype_singular}.retry)" done # 6. 创建错误队列 @@ -1136,23 +1148,26 @@ for dtype in "${DATA_TYPES[@]}"; do echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)" done -# 8. 绑定 DLX 到重试队列和错误队列 +# 8. 绑定 DLX 到重试队列(注意:现网 DLX 不绑定 errors.queue,错误由应用层 ErrorProducer 直接推 errors.exchange) echo "" -echo "绑定 DLX 到重试队列和错误队列..." +echo "绑定 DLX 到重试队列..." for dtype in "${DATA_TYPES[@]}"; do - # DLX -> 重试队列 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \ --destination-type queue --vhost "$VHOST" --routing-key "retry" echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)" - - # DLX -> 错误队列 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "dlx.${dtype}" --destination "errors.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "error" - echo "✓ 绑定: dlx.${dtype} → errors.queue (routing_key: error)" done +# 8b. 创建全局 errors.exchange 并绑定到 errors.queue +echo "" +echo "创建全局错误 Exchange..." +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "errors.exchange" --vhost "$VHOST" --type topic --durable true +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "#" +echo "✓ 绑定: errors.exchange → errors.queue (routing_key: #)" + # 9. 为每个平台创建 Exchange 和 Binding echo "" echo "========================================" @@ -1223,9 +1238,9 @@ echo "✓ 创建用户: user_datahub_consumer" rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permissions --vhost "$VHOST" --user "user_datahub_consumer" \ - --configure "^(orders|products|refunds|inventory).*\\.queue$" \ - --write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \ - --read "^(orders|products|refunds|inventory).*\\.queue$" + --configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \ + --write "^(orders|products|refunds|inventory).*\\.queue\$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)\$" \ + --read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)\$" echo "✓ 配置用户权限" # 11. 创建运维监控用户 @@ -1248,14 +1263,16 @@ echo "" echo "已创建:" echo "- 1 个 VHost: $VHOST" echo "- 1 个主 Exchange: main.exchange" +echo "- 1 个全局错误 Exchange: errors.exchange" echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory" echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue" echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue" echo "- 1 个错误队列: errors.queue" -echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)" +echo "- $((${#PLATFORMS[@]} * 2)) 个平台 Exchange (${#PLATFORMS[@]} 业务 + ${#PLATFORMS[@]} 错误)" echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)" echo "" echo "提示: 请修改所有用户的默认密码 (change_me_*)" +echo "提示: 验证拓扑:php ./bin/hyperf.php app:mq:status" ``` #### 执行脚本 @@ -1354,7 +1371,7 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ - 启用 lazy queue(大量消息堆积时): ```bash - rabbitmqadmin declare queue name=orders.queue vhost=datahub-app \ + rabbitmqadmin declare queue name=orders.queue vhost=datahub \ durable=true arguments='{"x-queue-mode":"lazy"}' ``` - 定期清理过期消息(通过 TTL 自动实现) @@ -1403,212 +1420,188 @@ tail -f /var/log/rabbitmq/rabbit@_connection.log ## 新增平台接入流程 -当需要接入新平台(例如 Lazada)时,按以下步骤操作: +> **首选路径**:使用 `backend/bin/rabbitmq.sh add `(自动化、与现网一致);下方手工脚本仅作参考。 -### 1. 创建 Exchange +新增平台(示例 `` = 标准化后的小写下划线名): + +### 1. 创建 Exchange(业务 + 错误) ```bash -# 业务 Exchange -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "lazada.exchange" --vhost datahub-app \ - --type topic --durable true +P="" # 例如 "tiktok_shop" -# 错误 Exchange -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "lazada.errors.exchange" --vhost datahub-app \ - --type topic --durable true +rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \ + declare exchange --name "${P}.exchange" --vhost datahub --type topic --durable true + +rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \ + declare exchange --name "${P}.errors.exchange" --vhost datahub --type topic --durable true ``` -### 2. 创建 Binding +### 2. 创建 5 条 Binding(4 业务 + 1 错误) ```bash -# 绑定到业务队列 -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "lazada.exchange" --destination "orders.queue" \ - --destination-type queue --vhost datahub-app --routing-key "order.lazada" +for DTYPE in orders products refunds inventory; do + DTYPE_SINGULAR="${DTYPE%s}" + rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \ + declare binding --source "${P}.exchange" --destination "${DTYPE}.queue" \ + --destination-type queue --vhost datahub \ + --routing-key "${DTYPE_SINGULAR}.${P}" +done -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "lazada.exchange" --destination "products.queue" \ - --destination-type queue --vhost datahub-app --routing-key "product.lazada" - -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "lazada.exchange" --destination "refunds.queue" \ - --destination-type queue --vhost datahub-app --routing-key "refund.lazada" - -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "lazada.exchange" --destination "inventory.queue" \ - --destination-type queue --vhost datahub-app --routing-key "inventory.lazada" - -# 绑定到错误队列 -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "lazada.errors.exchange" --destination "errors.queue" \ - --destination-type queue --vhost datahub-app --routing-key "#" +rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \ + declare binding --source "${P}.errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost datahub --routing-key "#" ``` ### 3. 创建用户并配置权限 ```bash -# 创建用户 -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user --name "user_lazada" --password "" --tags "" +rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \ + declare user --name "user_${P}" --password "" --tags "" -# 配置权限 -rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permissions --vhost datahub-app --user "user_lazada" \ - --configure "^lazada\\.(exchange|errors\\.exchange)$" \ - --write "^lazada\\.(exchange|errors\\.exchange)$" \ - --read "^lazada\\.errors\\..*$" +rabbitmqadmin -H 127.0.0.1 -P 15672 -u admin -p admin \ + declare permissions --vhost datahub --user "user_${P}" \ + --configure "^${P}\\.(exchange|errors\\.exchange)\$" \ + --write "^${P}\\.(exchange|errors\\.exchange)\$" \ + --read "^${P}\\.errors\\..*\$" ``` -### 4. 提供给平台开发者 +### 4. 同步到 backend -将以下信息提供给新平台的开发团队: +1. **DB**:`INSERT INTO platforms (name, enabled, ...) VALUES ('', true, ...)`,确保 `mq_user.php` 动态读取时能匹配到 +2. **env**:在 backend 的 `.env`(或 host 端 `datahub-backend.env`)追加 `MQ_PASSWORD_=` +3. **重启 backend**:`systemctl --user restart datahub-backend`,让 Hyperf worker 重新加载 mq_user.php 与 env + +### 5. 提供给平台开发者 ``` RabbitMQ 连接信息: - Host: - Port: 5672 -- VHost: datahub-app -- Username: user_lazada +- VHost: datahub +- Username: user_ - Password: 发布配置: -- Exchange: lazada.exchange +- Exchange: .exchange - Routing Keys: - - order: 发布订单数据 - - product: 发布产品数据 - - refund: 发布退款数据 - - inventory: 发布库存数据 + - order. → 发布订单数据 + - product. → 发布产品数据 + - refund. → 发布退款数据 + - inventory. → 发布库存数据 -错误订阅(可选): -- Exchange: lazada.errors.exchange -- 接收本平台的错误消息 +错误投递(可选): +- Exchange: .errors.exchange +- Routing Key: 任意(由 binding `#` 兜底) +- 用途:平台侧主动上报无法处理的消息 / 校验失败的数据 消息格式规范: -- 参见「消息格式规范」章节 +- 参见「消息格式规范」章节(5 段 message_id;meta 字段单数;data_version 时间戳防乱序) ``` --- ## 附录 -### A. 完整资源清单 +### A. 完整资源清单(与本地 MQ 实测对齐 — 2026-05-14) #### VHost -- **总数**: 1 个 -- `datahub-app` + +- **总数**:1 个 +- `datahub` #### Exchanges -**主 Exchange**: 1 个 -- `main.exchange` (Topic) - 统一主交换机,用于路由和重试消息回流 +| 类别 | 数量 | 名称 | +|---|---|---| +| 主路由 | 1 | `main.exchange`(topic)| +| 全局错误 | 1 | `errors.exchange`(topic)| +| DLX | 4 | `dlx.orders` / `dlx.products` / `dlx.refunds` / `dlx.inventory`(全 topic)| +| 平台业务 | 19 | `{platform}.exchange` × 19(全 topic)| +| 平台错误 | 19 | `{platform}.errors.exchange` × 19(全 topic)| -**平台业务 Exchanges**: 8 个(每平台 1 个) -- `douyin.exchange` -- `jd.exchange` -- `tmall.exchange` -- `redbook.exchange` -- `amazon.exchange` -- `naver.exchange` -- `rakuten.exchange` -- `shopify.exchange` - -**平台错误 Exchanges**: 8 个(每平台 1 个) -- `douyin.errors.exchange` -- `jd.errors.exchange` -- `tmall.errors.exchange` -- `redbook.errors.exchange` -- `amazon.errors.exchange` -- `naver.errors.exchange` -- `rakuten.errors.exchange` -- `shopify.errors.exchange` - -**死信交换机 (DLX)**: 4 个(每数据类型 1 个) -- `dlx.orders` -- `dlx.products` -- `dlx.refunds` -- `dlx.inventory` - -**Exchange 总数**: **21 个** (1 主 + 8 平台业务 + 8 平台错误 + 4 DLX) +**Exchange 业务总数**:**44 个**(不含 `amq.*` 系统 exchange 7 个) #### Queues -**主业务队列**: 4 个 -- `orders.queue` (配置 DLX: dlx.orders) -- `products.queue` (配置 DLX: dlx.products) -- `refunds.queue` (配置 DLX: dlx.refunds) -- `inventory.queue` (配置 DLX: dlx.inventory) +| 类别 | 数量 | 名称 + 关键 arguments | +|---|---|---| +| 业务队列 | 4 | `{dtype}.queue`:ttl=24h, dl-exchange=`dlx.{dtype}`, dl-rk=`retry`, type=classic | +| 重试队列 | 4 | `{dtype}.retry.queue`:ttl=5s, dl-exchange=`main.exchange`, dl-rk=`{entity_singular}.retry`, type=classic | +| 错误队列 | 1 | `errors.queue`:ttl=7d, 无 DLX, type=classic | -**延迟重试队列**: 4 个 -- `orders.retry.queue` (TTL: 5秒, DLX: main.exchange) -- `products.retry.queue` (TTL: 5秒, DLX: main.exchange) -- `refunds.retry.queue` (TTL: 5秒, DLX: main.exchange) -- `inventory.retry.queue` (TTL: 5秒, DLX: main.exchange) - -**错误队列**: 1 个 -- `errors.queue` (TTL: 7天) - -**Queue 总数**: **9 个** (4 主队列 + 4 重试队列 + 1 错误队列) +**Queue 总数**:**9 个**(与 `php ./bin/hyperf.php app:mq:status` 输出一致) #### Bindings -**主 Exchange 绑定**: 4 个 -- `main.exchange` → `orders.queue` (routing_key: `order.#`) -- `main.exchange` → `products.queue` (routing_key: `product.#`) -- `main.exchange` → `refunds.queue` (routing_key: `refund.#`) -- `main.exchange` → `inventory.queue` (routing_key: `inventory.#`) +| 类别 | 数量 | 路径 | +|---|---|---| +| 平台业务 → 业务队列 | 76 | `{platform}.exchange` —rk=`{entity}.{platform}`→ `{dtype}.queue`(19 × 4)| +| main → 业务队列 | 4 | `main.exchange` —rk=`{entity}.#`→ `{dtype}.queue` | +| DLX → 重试队列 | 4 | `dlx.{dtype}` —rk=`retry`→ `{dtype}.retry.queue`(**仅此一种**,**不**绑定 errors.queue)| +| 错误 exchange → errors.queue | 20 | `errors.exchange` 1 条 + `{platform}.errors.exchange` × 19 条,全部 rk=`#` | -**平台业务 Exchange 绑定**: 32 个 -- 每个平台 Exchange → 4 个业务队列 (8 × 4 = 32) -- 使用 routing key 格式:`{data_type}.{platform}` - - 例如:`order.douyin`, `product.tmall`, `refund.jd` 等 +**Binding 业务总数**:**104 条**(不含 default exchange 自动同名 binding 9 条) -**平台错误 Exchange 绑定**: 8 个 -- 每个平台错误 Exchange → `errors.queue` (routing_key: `#`) +> 死信回流路径(retry → main)不是 binding,而是 retry queue 的 `x-dead-letter-exchange` arguments 字段,RabbitMQ 引擎在 TTL 到期时自动投递。 -**DLX 绑定**: 8 个 -- 每个 DLX → 对应重试队列 (routing_key: `retry`) - 4 个 -- 每个 DLX → `errors.queue` (routing_key: `error`) - 4 个 +#### Users(vhost `datahub`) -**Binding 总数**: **52 个** (4 主 + 32 平台业务 + 8 平台错误 + 8 DLX) +| 类别 | 数量 | 命名 | 权限说明 | +|---|---|---|---| +| 平台开发者 | 19 | `user_{platform}` | 仅写自己的 `{platform}.exchange` / `{platform}.errors.exchange`,读自己的 errors(最小权限)| +| Datahub 消费者 | 1 | `user_datahub_consumer` | configure/write/read 见 §"消费者账号"现网正则 | +| 运维监控 | 1 | `user_ops` | 仅读 `errors.queue` | +| 管理员 | 1 | `admin` | tag=administrator,全权限 | -#### Users +**User 总数**:**22 个** -**平台开发者账号**: 8 个 -- `user_douyin`, `user_jd`, `user_tmall`, `user_redbook`, `user_amazon`, `user_naver`, `user_rakuten`, `user_shopify` -- 权限:只能写入自己的业务和错误 Exchange,只能读取自己的错误队列 - -**消费者账号**: 1 个 -- `user_datahub_consumer` -- 权限:可以管理所有业务队列和重试队列,可以写入 DLX 和错误 Exchange,可以读取所有业务队列 - -**运维监控账号**: 1 个 -- `user_ops` -- 权限:可以管理错误相关资源,可以读取错误队列 - -**管理员账号**: 1 个 -- `admin` -- 权限:完全权限,可以访问管理界面 - -**User 总数**: **11 个** (8 平台 + 1 消费者 + 1 运维 + 1 管理员) +完整平台用户清单: +``` +user_amazon_japan, user_coupang, user_dewu, user_douyin, user_goofish, +user_jd, user_kaola, user_lazada, user_naver, user_offline, +user_pdd, user_rakuten, user_redbook, user_shopee, user_shopify, +user_tmall, user_wechat, user_wechat_video, user_youzan +``` #### 架构统计 | 资源类型 | 数量 | 说明 | |---------|------|------| -| VHost | 1 | datahub-app | -| Exchange | 21 | 1 主 + 8 平台业务 + 8 平台错误 + 4 DLX | -| Queue | 9 | 4 主 + 4 重试 + 1 错误 | -| Binding | 52 | 完整消息流转路径 | -| User | 11 | 多角色权限隔离 | +| VHost | 1 | `datahub` | +| Exchange | 44 | 1 main + 1 全局 errors + 4 DLX + 19 平台业务 + 19 平台错误 | +| Queue | 9 | 4 业务 + 4 retry + 1 errors | +| Binding | 104 | 见上表(不含 default exchange 自动 binding)| +| User | 22 | 19 平台 + 1 消费者 + 1 运维 + 1 管理员 | + +**消息流转路径(双层兜底)**: -**消息流转路径**: ``` -生产者 → 平台 Exchange → 主队列 → 消费者 - ↓ (失败) - DLX → 重试队列 (5秒TTL) → 主 Exchange → 主队列 - ↓ (超过3次) - DLX → 错误队列 (人工处理) +生产者 → {platform}.exchange ──rk={entity}.{platform}──► {dtype}.queue ──► Consumer + │ + 消费失败 (count < max) ↓ + ◄──── NACK + │ + ▼ + dlx.{dtype} ──rk=retry──► {dtype}.retry.queue + │ TTL=5s + ▼ + main.exchange (rk={entity}.retry) + │ + ▼ + {dtype}.queue (重新消费, count+1) + │ + 消费失败 (count >= max) + │ + FailedMessageTrait::sendToErrorQueue + │ + ┌───────────────────────┼───────────────────────┐ + ▼ ▼ + ErrorProducer ──► errors.exchange ──► errors.queue FailedMessage::create + (MQ 兜底, TTL 7d, user_ops 捞回) (DB 兜底, failed_messages 表) + │ + ▼ + Consumer 返回 ACK + (避免主队列继续重试) ``` ### B. 参考资料