Files
datahub/docs/RabbitMQ.md
T
2025-11-14 16:39:00 +08:00

44 KiB
Raw Blame History

RabbitMQ 配置方案

概述

本文档描述 dataflow 应用的 RabbitMQ 消息队列架构设计和配置方法。

业务背景

  • 应用角色:dataflow 是消息消费者,负责接收来自多个电商平台的数据并入库
  • 数据来源:多个电商平台(DouYin、JD、Tmall、RedBook、Amazon Japan、Naver、Rakuten、Shopify 等)
  • 数据类型:订单(orders)、产品(products)、退款(refunds)、库存(inventory
  • 隔离需求:不同平台开发者之间的工作空间需要相互隔离,互不影响

架构特点

  • VHost 隔离:按应用划分 VHost,支持多业务复用, 使用 Classic Vhost 模式
  • Exchange 隔离:每个平台独立 Exchange,通过权限控制访问
  • 单队列设计:每种数据类型一个队列,所有平台共享(orders.queue/products.queue/refunds.queue/inventory.queue),保证严格 FIFO 顺序
  • 独立消费者:每种数据类型配备独立消费者,采用批处理 + prefetch + 适配器模式,提升吞吐与稳定性
  • 智能重试:DLX(死信交换机)+ 延迟重试队列 + 错误队列,优雅处理失败消息,不阻塞主队列
  • 消息持久化RabbitMQ 充当"远程平台数据的本地缓存",避免重复调用远端 API

设计方案总结

核心设计理念

本方案采用 Topic Exchange + 单队列统一路由 + 单消费者批处理 + 单条 ACK + DLX 智能重试 + 适配器模式,实现跨平台数据同步的高可靠性、高吞吐与强一致性架构。

关键设计决策

1. 为什么使用单队列?

问题 传统多队列方案 本方案(单队列)
队列数量 8平台 × 4数据类型 = 32个队列 4个数据类型队列(队列数量减少 87.5%)
FIFO 保证 同类型数据可能分散在多个队列 严格 FIFO,同类型数据都在一个队列
运维复杂度 需要监控 32 个队列 只需监控 4 个主队列
消费者部署 可能需要 32 个消费者实例 只需 4 个消费者实例

原因:RabbitMQ 无法在队列内部按 topic 再过滤,拆分队列会造成队列爆炸和业务乱序。

2. 为什么使用单消费者?

原因:RabbitMQ 在同一个队列内使用多消费者时,必然采用 Round-Robin 均匀分配消息,无法按 topic 分发到不同消费者。

优势

  • 保证严格 FIFO 顺序
  • 避免消息错投到错误的消费者
  • 简化部署和运维

如何实现多平台支持? 通过消费者内部的适配器模式,根据消息的 platform 字段分发到不同业务逻辑。

3. 为什么使用 Prefetch + 批处理?

原因RabbitMQ 是 Push 模式,不提供 Kafka 式的批量拉取 API。

解决方案

  • 设置 prefetch_count=100RabbitMQ 最多推送 100 条未确认消息
  • 消费者自行缓存消息,达到 batch_size=50timeout=3s 后批量处理
  • 实现"伪批量拉取"效果,同时保留单条 ACK 的灵活性

优势

  • 提升数据库写入效率(批量插入)
  • 控制并发和内存压力
  • 失败消息不影响其他消息(单条 ACK)

4. 为什么使用 DLX + 延迟重试队列?

问题:单队列 FIFO 中,如果某条消息持续失败,会阻塞后续消息消费。

传统方案的问题

  • nack(requeue=true):消息回队列头部,无限循环
  • 直接丢弃:数据丢失
  • 立即重试:打爆 API/DB

本方案(DLX + 延迟重试)

消费失败 → nack(requeue=false) → 进入 DLX
           ↓
    x-retries < 3 → 重试队列 (TTL=5s) → 自动死信回主队列
           ↓
    x-retries >= 3 → 错误队列 (人工处理)

优势

  • 不会卡 FIFO
  • 不会无限重试
  • 延迟重试避免打爆 API/DB
  • 错误消息自动隔离
  • 无需额外 Redis 或手写 retry 逻辑

方案优势总结

维度 优势
可靠性 消息持久化 + 单条 ACK + 智能重试,避免数据丢失
性能 Prefetch + 批处理 + 数据库批量写入,提升吞吐
顺序性 单队列 FIFO 保证严格顺序
扩展性 适配器模式,新增平台只需添加适配器类
运维 队列数量少,监控简单,故障排查容易
容错 DLX 智能重试,失败消息不阻塞主队列

适用场景

适合

  • 多平台数据同步(电商、物流、金融等)
  • 需要严格 FIFO 顺序
  • 需要高可靠性和高吞吐
  • 平台数量多(> 5个),数据类型固定(< 10个)

不适合

  • 实时性要求极高(< 100ms
  • 需要按平台独立控制消费速率
  • 平台数量少(< 3个),可以使用独立队列

架构设计

1. VHost 设计

VHost 按业务应用进行划分,便于资源隔离和权限管理。

RabbitMQ 实例
├── dataflow-app          # Dataflow 应用专用
├── analytics-app         # 分析应用(预留)
└── warehouse-app         # 数据仓库应用(预留)

当前需要创建的 VHost

  • dataflow-app:用于电商数据流转

2. Exchange 设计

dataflow-app VHost 中,Exchange 分为两类:

业务 Exchange(用于数据投递)

每个平台一个独立的 Topic Exchange

Exchange 名称 类型 持久化 用途
douyin.exchange Topic DouYin(抖音)平台数据入口
jd.exchange Topic JD(京东)平台数据入口
tmall.exchange Topic Tmall(天猫)平台数据入口
redbook.exchange Topic RedBook(小红书)平台数据入口
amazon.exchange Topic Amazon Japan 平台数据入口
naver.exchange Topic Naver 平台数据入口
rakuten.exchange Topic Rakuten(乐天)平台数据入口
shopify.exchange Topic Shopify 平台数据入口

Routing Key 规范(采用 {data_type}.{platform} 格式):

平台 订单 产品 退款 库存
DouYin order.douyin product.douyin refund.douyin inventory.douyin
JD order.jd product.jd refund.jd inventory.jd
Tmall order.tmall product.tmall refund.tmall inventory.tmall
RedBook order.redbook product.redbook refund.redbook inventory.redbook
Amazon order.amazon product.amazon refund.amazon inventory.amazon
Naver order.naver product.naver refund.naver inventory.naver
Rakuten order.rakuten product.rakuten refund.rakuten inventory.rakuten
Shopify order.shopify product.shopify refund.shopify inventory.shopify

设计说明

  • 使用 Topic Exchange 的通配符特性,队列可通过 order.# 匹配所有平台的订单
  • 保证单队列内所有平台消息的 FIFO 顺序
  • 消费者通过解析 routing key 或消息体的 platform 字段进行业务分发

错误 Exchange(用于错误通知)

每个平台一个独立的错误 Topic Exchange

Exchange 名称 类型 持久化 用途
douyin.errors.exchange Topic DouYin 平台错误消息
jd.errors.exchange Topic JD 平台错误消息
tmall.errors.exchange Topic Tmall 平台错误消息
redbook.errors.exchange Topic RedBook 平台错误消息
amazon.errors.exchange Topic Amazon Japan 平台错误消息
naver.errors.exchange Topic Naver 平台错误消息
rakuten.errors.exchange Topic Rakuten 平台错误消息
shopify.errors.exchange Topic Shopify 平台错误消息

Routing Key 规范

  • validation - 数据格式验证错误
  • processing_failed - 业务处理失败
  • system - 系统错误

3. Queue 设计

业务队列(主队列)

物理队列数量固定为 4 个,按数据类型划分,每个队列接收所有平台该类型的数据:

Queue 名称 持久化 自动删除 消息 TTL DLX 用途
orders.queue 24小时 dlx.orders 接收所有平台的订单数据,单消费者批处理
products.queue 24小时 dlx.products 接收所有平台的产品数据,单消费者批处理
refunds.queue 24小时 dlx.refunds 接收所有平台的退款数据,单消费者批处理
inventory.queue 24小时 dlx.inventory 接收所有平台的库存数据,单消费者批处理

队列参数配置

{
  "x-message-ttl": 86400000,
  "x-dead-letter-exchange": "dlx.orders",
  "x-dead-letter-routing-key": "retry"
}

死信交换机(DLX

每种数据类型配备一个 DLX,用于处理失败消息的路由:

Exchange 名称 类型 持久化 用途
dlx.orders Topic 订单失败消息路由
dlx.products Topic 产品失败消息路由
dlx.refunds Topic 退款失败消息路由
dlx.inventory Topic 库存失败消息路由

延迟重试队列

为每种数据类型提供延迟重试机制:

Queue 名称 持久化 消息 TTL DLX 用途
orders.retry.queue 5秒 main.exchange 订单延迟重试(TTL后自动回到主队列)
products.retry.queue 5秒 main.exchange 产品延迟重试
refunds.retry.queue 5秒 main.exchange 退款延迟重试
inventory.retry.queue 5秒 main.exchange 库存延迟重试

重试队列参数配置

{
  "x-message-ttl": 5000,
  "x-dead-letter-exchange": "main.exchange",
  "x-dead-letter-routing-key": "order.{platform}"
}

重试机制说明

  1. 消费失败时,消费者执行 nack(requeue=false),消息进入 DLX
  2. DLX 根据消息 header 中的 x-retries 判断路由:
    • x-retries < 3:路由到 retry.queue(延迟重试)
    • x-retries >= 3:路由到 errors.queue(永久失败)
  3. 重试队列 TTL 到期后,消息自动死信回主 Exchange,重新进入主队列
  4. 最大重试次数:3次,延迟时间:5秒

错误队列

统一的错误队列,接收所有超过重试次数的失败消息:

Queue 名称 持久化 自动删除 消息 TTL 用途
errors.queue 7天 接收所有类型、所有平台的最终失败消息,供人工排查

4. Binding 配置

主 Exchange (用于接收各平台数据)

需要创建一个统一的主 Exchange,用于重试队列的消息回流:

Exchange 名称 类型 持久化 用途
main.exchange Topic 统一主交换机,用于路由和重试消息回流

业务队列绑定(主队列)

方案1:直接绑定(推荐) 每个平台的 Exchange 使用带平台标识的 routing key 绑定到对应数据类型队列:

douyin.exchange
├── [routing_key: order.douyin] → orders.queue
├── [routing_key: product.douyin] → products.queue
├── [routing_key: refund.douyin] → refunds.queue
└── [routing_key: inventory.douyin] → inventory.queue

tmall.exchange
├── [routing_key: order.tmall] → orders.queue
├── [routing_key: product.tmall] → products.queue
├── [routing_key: refund.tmall] → refunds.queue
└── [routing_key: inventory.tmall] → inventory.queue

... (其他平台类似)

方案2:通过主 Exchange(备选) 平台 Exchange 先发到主 Exchange,再由主 Exchange 统一路由:

douyin.exchange → [routing_key: order.douyin] → main.exchange

main.exchange
├── [routing_key: order.#] → orders.queue
├── [routing_key: product.#] → products.queue
├── [routing_key: refund.#] → refunds.queue
└── [routing_key: inventory.#] → inventory.queue

推荐使用方案1,简化架构,减少消息跳转。

DLX 绑定(重试路由)

每个 DLX 根据消息 header 中的 x-retries 决定路由目标:

dlx.orders
├── [routing_key: retry, x-retries < 3] → orders.retry.queue
└── [routing_key: error, x-retries >= 3] → errors.queue

dlx.products
├── [routing_key: retry] → products.retry.queue
└── [routing_key: error] → errors.queue

dlx.refunds
├── [routing_key: retry] → refunds.retry.queue
└── [routing_key: error] → errors.queue

dlx.inventory
├── [routing_key: retry] → inventory.retry.queue
└── [routing_key: error] → errors.queue

注意RabbitMQ 原生不支持基于 header 的条件路由,需要:

  • 方案A:消费者在 nack 时根据重试次数决定发送到 DLX 的 routing keyretryerror
  • 方案B:使用 RabbitMQ 插件(如 rabbitmq_message_routing_by_headers)实现条件路由

推荐方案A:消费者控制重试逻辑,更灵活可控。

重试队列绑定(回流主队列)

重试队列 TTL 到期后,消息自动死信回主 Exchange:

orders.retry.queue
└── [TTL=5s后 死信到 main.exchangerouting_key保持原始值:order.{platform}]

main.exchange
└── [routing_key: order.#] → orders.queue

这样实现了"失败 → 延迟5秒 → 重新进入主队列"的循环。

错误队列绑定

每个平台的错误 Exchange 和所有 DLX 都绑定到统一错误队列:

douyin.errors.exchange
└── [routing_key: #] → errors.queue

dlx.orders
└── [routing_key: error] → errors.queue

dlx.products
└── [routing_key: error] → errors.queue

... (其他类型类似)

说明:使用 # 通配符接收所有错误类型。


消费者设计

1. 消费者模型

采用 单消费者 + 批处理 + prefetch + 适配器模式

数据类型 消费者 Queue 并发模式 处理模式
订单 Order Consumer orders.queue 单消费者 批处理 + 适配器
产品 Product Consumer products.queue 单消费者 批处理 + 适配器
退款 Refund Consumer refunds.queue 单消费者 批处理 + 适配器
库存 Inventory Consumer inventory.queue 单消费者 批处理 + 适配器

设计原则

  • 单消费者:每个队列只有一个消费者实例,保证严格 FIFO 顺序
  • 批处理:积累一批消息后统一处理,提升数据库写入效率
  • 单条 ACK:每条消息独立确认,失败不影响其他消息
  • 适配器模式:消费者内部根据平台字段分发到不同业务逻辑

**消费侧业务可以根据业务负载的强度,灵活调整 prefetch 配置,或者更进一步使用 hyperf 多 worker 模式提高吞吐 **

2. Prefetch 配置

{
  "prefetch_count": 100,
  "batch_size": 50,
  "batch_timeout": 3000
}

参数说明

  • prefetch_count=100RabbitMQ 最多推送 100 条未确认消息到消费者
  • batch_size=50:消费者积累 50 条消息后触发批处理
  • batch_timeout=3000:如果3秒内未达到 batch_size,也触发处理

工作流程

  1. RabbitMQ 按 Push 模式推送最多 100 条消息到消费者
  2. 消费者缓存消息,达到 50 条或超时 3 秒后批量处理
  3. 批量处理时,仍然对每条消息单独 ACK/NACK
  4. 实现"伪批量拉取"效果,同时保留单条确认的灵活性

3. 消费者处理逻辑(伪代码)

class OrderConsumer:
    def __init__(self):
        self.prefetch_count = 100
        self.batch_size = 50
        self.batch_timeout = 3
        self.message_buffer = []
        self.adapters = {
            'douyin': DouyinAdapter(),
            'tmall': TmallAdapter(),
            'jd': JDAdapter(),
            # ... 其他平台适配器
        }

    def on_message(self, channel, method, properties, body):
        """接收到消息时的回调"""
        message = json.loads(body)
        self.message_buffer.append({
            'channel': channel,
            'method': method,
            'properties': properties,
            'body': message
        })

        # 达到批量大小,触发处理
        if len(self.message_buffer) >= self.batch_size:
            self.process_batch()

    def process_batch(self):
        """批量处理消息"""
        for msg_data in self.message_buffer:
            try:
                message = msg_data['body']
                platform = message['platform']

                # 使用适配器模式分发到对应平台逻辑
                adapter = self.adapters.get(platform)
                if not adapter:
                    raise ValueError(f"Unknown platform: {platform}")

                # 处理业务逻辑
                adapter.process(message)

                # 成功:ACK
                msg_data['channel'].basic_ack(msg_data['method'].delivery_tag)

            except TemporaryError as e:
                # 临时错误:重试
                retry_count = msg_data['properties'].headers.get('x-retries', 0)

                if retry_count < 3:
                    # 更新重试次数,发送到 DLX 进行延迟重试
                    headers = {'x-retries': retry_count + 1}
                    msg_data['channel'].basic_nack(
                        msg_data['method'].delivery_tag,
                        requeue=False
                    )
                else:
                    # 超过重试次数,发送到错误队列
                    self.send_to_error_queue(msg_data)
                    msg_data['channel'].basic_nack(
                        msg_data['method'].delivery_tag,
                        requeue=False
                    )

            except PermanentError as e:
                # 永久错误:直接进入错误队列
                self.send_to_error_queue(msg_data)
                msg_data['channel'].basic_nack(
                    msg_data['method'].delivery_tag,
                    requeue=False
                )

        # 清空缓冲区
        self.message_buffer.clear()

    def send_to_error_queue(self, msg_data):
        """发送到错误队列"""
        error_message = {
            'original_message': msg_data['body'],
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }
        # 发送到错误 Exchange
        self.channel.basic_publish(
            exchange='douyin.errors.exchange',
            routing_key='error',
            body=json.dumps(error_message)
        )

4. 重试机制工作流程

┌─────────────────┐
│  orders.queue   │  ← 消费者从这里接收消息
└────────┬────────┘
         │
         ├─ 成功 → basic_ack()
         │
         └─ 失败 → basic_nack(requeue=false)
                    │
                    ▼
         ┌──────────────────┐
         │   dlx.orders     │  ← 死信交换机
         └────────┬─────────┘
                  │
                  ├─ x-retries < 3 → orders.retry.queue (TTL=5s)
                  │                     │
                  │                     └─ TTL到期 → 自动死信回 main.exchange
                  │                                    │
                  │                                    └─ order.# → orders.queue
                  │
                  └─ x-retries >= 3 → errors.queue (人工处理)

5. 为什么这样设计?

特性 原因 优势
单队列 FIFO RabbitMQ 无法在队列内部按 topic 过滤 保证严格 FIFO,不拆分队列造成业务乱序
单消费者 RabbitMQ 同队列多消费者必然 Round-Robin 分配 避免消息错投到错误的消费者
Prefetch + 批处理 RabbitMQ 是 Push 模式,不支持 Kafka 式批量拉取 实现"伪批量拉取",提升吞吐
单条 ACK 保留细粒度控制 失败消息不影响其他消息
DLX + 延迟重试 避免失败消息阻塞队列 不会卡 FIFO,不会无限重试
适配器模式 多平台业务逻辑隔离 扩展性好,易维护

用户权限配置

1. 平台开发者账号

每个平台分配一个专用账号,只能访问自己的 Exchange。

DouYin 平台账号

用户名: user_douyin
密码: <安全密码>
VHost: dataflow-app

权限配置:
- Configure: (留空) - 不允许创建/删除资源
- Write: ^douyin\.(exchange|errors\.exchange)$ - 只能写入 DouYin 的业务和错误 Exchange
- Read: ^douyin\.errors\..*$ - 只能读取 DouYin 错误相关的队列

Tmall 平台账号

用户名: user_tmall
密码: <安全密码>
VHost: dataflow-app

权限配置:
- Configure: (留空)
- Write: ^tmall\.(exchange|errors\.exchange)$
- Read: ^tmall\.errors\..*$

其他平台账号

按相同模式创建:

  • JD: user_jd
  • RedBook: user_redbook
  • Amazon Japan: user_amazon
  • Naver: user_naver
  • Rakuten: user_rakuten
  • Shopify: user_shopify

2. Dataflow 消费者账号

Dataflow 应用的后端服务账号,负责消费业务队列并处理错误。

用户名: user_dataflow_consumer
密码: <安全密码>
VHost: dataflow-app

权限配置:
- Configure: ^(orders|products|refunds|inventory)\.queue$ - 可以管理业务队列
- Write: .*\.errors\.exchange$ - 可以向所有错误 Exchange 写入消息
- Read: ^(orders|products|refunds|inventory)\.queue$ - 可以读取所有业务队列

3. 运维监控账号

运维团队账号,用于监控所有错误消息。

用户名: user_ops
密码: <安全密码>
VHost: dataflow-app

权限配置:
- Configure: ^errors\..*$ - 可以管理错误相关资源
- Write: (留空) - 不需要写入权限
- Read: ^errors\.queue$ - 可以读取统一错误队列

4. 管理员账号

RabbitMQ 超级管理员,用于配置和维护。

用户名: admin
密码: <强密码>
Tag: administrator

权限配置:
- 对所有 VHost 具有完全权限
- 可以访问管理界面

消息格式规范

1. message_id 设计

message_id 采用结构化格式,确保幂等性和可追溯性。

格式

{prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id}

字段说明

  • prefix:项目名称前缀(如 wpic-project1
  • app_id:应用标识(固定为 dataflow
  • company_id:公司 ID
  • platform_id:平台 ID
  • store_id:店铺 ID
  • entity_type:数据实体类型(order/product/refund/inventory
  • entity_id:平台侧唯一标识(如订单号)

分隔符说明

  • 使用 # 作为分隔符(RabbitMQ message_id 字段支持任意字符)
  • # 在 routing key 中是通配符,但在 message_id 中是普通字符

示例

wpic-project1#dataflow#100#20#200#order#DY123456789

解析:
- 项目前缀: wpic-project1
- 应用: dataflow
- 公司ID: 100
- 平台ID: 20 (DouYin)
- 店铺ID: 200
- 实体类型: order
- 平台订单号: DY123456789

特性

  • 幂等性:同一实体的 message_id 完全一致
  • 可读性:直接看出数据来源
  • 便于调试:可从 message_id 提取元数据
  • 无冲突:全局唯一

2. 业务消息格式

{
  "message_id": "wpic-project1#dataflow#100#20#200#order#DY123456",
  "timestamp": "2025-01-14T10:30:00Z",
  "platform": "douyin",
  "data_type": "order",

  "metadata": {
    "platform_id": 20,
    "company_id": 100,
    "store_id": 200,
    "source_system": "douyin-open-api",
    "retry_count": 0,
    "data_version": 1736840000
  },

  "data": {
    "platform_unique_id": "DY123456",
    "raw_data": {
      // 平台原始完整数据
    }
  }
}

字段说明

字段 类型 必需 说明
message_id string 结构化消息 ID,用于幂等性
timestamp string 消息生成时间(ISO 8601 格式)
platform string 平台标识(douyin/tmall/jd...
data_type string 数据类型(order/product/refund/inventory
metadata.platform_id int 平台 ID(数据库主键)
metadata.company_id int 公司 ID
metadata.store_id int 店铺 ID
metadata.source_system string 数据来源系统
metadata.retry_count int 重试次数(初始为 0
metadata.data_version int 数据版本(Unix 时间戳,用于防止乱序更新)
data.platform_unique_id string 平台侧唯一标识
data.raw_data object 平台原始完整 JSON 数据

重要说明

  • data_version:必须使用平台数据的更新时间戳(Unix 时间戳),用于解决消息乱序消费问题
    • 示例:平台订单的最后更新时间 17368400002025-01-14 10:00:00 UTC
    • 消费端只有当 data_version 大于数据库中的值时才更新数据
    • 如果平台未提供更新时间,使用消息生成时间戳

3. 错误消息格式

错误消息由 dataflow 消费者生成,包含原始消息和错误详情。

{
  "error_id": "err_1234567890abcdef",
  "original_message": {
    // 原始业务消息完整内容
  },
  "error": {
    "type": "validation",
    "message": "Missing required field: total_amount",
    "trace": "Exception stack trace...",
    "timestamp": "2025-01-14T10:31:00Z"
  },
  "metadata": {
    "platform": "amazon",
    "platform_id": 1,
    "company_id": 100,
    "store_id": 200,
    "data_type": "order",
    "failed_at": "2025-01-14T10:31:00Z"
  }
}

RabbitMQ 管理界面操作指引

1. 访问管理界面

URL: http://<rabbitmq-host>:15672
用户名: admin
密码: <管理员密码>

2. 创建 VHost

  1. 点击顶部导航 AdminVirtual Hosts
  2. 点击 Add a new virtual host
  3. 输入 VHost 名称:dataflow-app
  4. 点击 Add virtual host

3. 创建用户

  1. 点击顶部导航 AdminUsers
  2. 点击 Add a user
  3. 填写用户信息:
    • Username: user_amazon
    • Password: <安全密码>
    • Tags: (留空,非管理员用户)
  4. 点击 Add user

4. 配置用户权限

  1. 在用户列表中,点击用户名(如 user_amazon
  2. 滚动到 Permissions 部分
  3. 选择 VHostdataflow-app
  4. 配置权限正则表达式:
    • Configure regexp: (留空)
    • Write regexp: ^amazon\.(exchange|errors\.exchange)$
    • Read regexp: ^amazon\.errors\..*$
  5. 点击 Set permission

5. 创建 Exchange

  1. 点击顶部导航 Exchanges
  2. 确保选择正确的 VHostdataflow-app
  3. 点击 Add a new exchange
  4. 填写配置:
    • Name: amazon.exchange
    • Type: topic
    • Durability: Durable
    • Auto delete: No
    • Internal: No
  5. 点击 Add exchange

重复以上步骤创建所有业务 Exchange 和错误 Exchange。

6. 创建 Queue

  1. 点击顶部导航 Queues
  2. 确保选择正确的 VHostdataflow-app
  3. 点击 Add a new queue
  4. 填写配置:
    • Name: orders.queue
    • Durability: Durable
    • Auto delete: No
    • Arguments:
      • x-message-ttl: 86400000 (24小时,单位:毫秒)
  5. 点击 Add queue

重复以上步骤创建 products.queuerefunds.queueerrors.queue

注意errors.queue 的 TTL 设置为 6048000007天)。

7. 创建 Binding

  1. 点击顶部导航 Exchanges
  2. 点击需要绑定的 Exchange(如 amazon.exchange
  3. 滚动到 Bindings 部分
  4. Add binding from this exchange 区域填写:
    • To queue: orders.queue
    • Routing key: order
  5. 点击 Bind

为每个 Exchange 创建 3 个绑定(order/product/refund)。

对于错误 Exchange,绑定配置:

  • To queue: errors.queue
  • Routing key: #

配置脚本

为简化配置过程,可以使用以下脚本自动创建资源。

使用 rabbitmqadmin

RabbitMQ 提供命令行工具 rabbitmqadmin,可用于批量配置。

安装 rabbitmqadmin

# 从 RabbitMQ 管理界面下载
wget http://<rabbitmq-host>:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/

配置脚本示例

#!/bin/bash

# RabbitMQ 连接信息
RABBITMQ_HOST="localhost:15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="admin_password"
VHOST="dataflow-app"

# 平台列表
PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify")
DATA_TYPES=("orders" "products" "refunds" "inventory")

# 1. 创建 VHost
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare vhost name=$VHOST

# 2. 创建主 Exchange(用于重试消息回流)
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare exchange name="main.exchange" vhost=$VHOST \
  type=topic durable=true

# 3. 创建 DLX (死信交换机)
for dtype in "${DATA_TYPES[@]}"; do
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare exchange name="dlx.${dtype}" vhost=$VHOST \
    type=topic durable=true
done

# 4. 创建主业务队列(带 DLX 配置)
for dtype in "${DATA_TYPES[@]}"; do
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare queue name="${dtype}.queue" vhost=$VHOST durable=true \
    arguments="{\"x-message-ttl\":86400000,\"x-dead-letter-exchange\":\"dlx.${dtype}\",\"x-dead-letter-routing-key\":\"retry\"}"
done

# 5. 创建重试队列(带 TTL 和 DLX 回流配置)
for dtype in "${DATA_TYPES[@]}"; do
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare queue name="${dtype}.retry.queue" vhost=$VHOST durable=true \
    arguments="{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}"
done

# 6. 创建错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare queue name="errors.queue" vhost=$VHOST durable=true \
  arguments='{"x-message-ttl":604800000}'

# 7. 绑定主 Exchange 到主队列(使用通配符)
for dtype in "${DATA_TYPES[@]}"; do
  # 使用 order.#, product.# 等通配符匹配所有平台
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare binding source="main.exchange" destination="${dtype}.queue" \
    vhost=$VHOST routing_key="${dtype%s}.#"
done

# 8. 绑定 DLX 到重试队列和错误队列
for dtype in "${DATA_TYPES[@]}"; do
  # DLX -> 重试队列
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare binding source="dlx.${dtype}" destination="${dtype}.retry.queue" \
    vhost=$VHOST routing_key="retry"

  # DLX -> 错误队列
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare binding source="dlx.${dtype}" destination="errors.queue" \
    vhost=$VHOST routing_key="error"
done

# 9. 为每个平台创建 Exchange 和 Binding
for platform in "${PLATFORMS[@]}"; do
  # 创建平台业务 Exchange
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare exchange name="${platform}.exchange" vhost=$VHOST \
    type=topic durable=true

  # 绑定到业务队列(使用新的 routing key 格式:data_type.platform
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare binding source="${platform}.exchange" destination="orders.queue" \
    vhost=$VHOST routing_key="order.${platform}"

  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare binding source="${platform}.exchange" destination="products.queue" \
    vhost=$VHOST routing_key="product.${platform}"

  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare binding source="${platform}.exchange" destination="refunds.queue" \
    vhost=$VHOST routing_key="refund.${platform}"

  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare binding source="${platform}.exchange" destination="inventory.queue" \
    vhost=$VHOST routing_key="inventory.${platform}"

  # 创建平台错误 Exchange
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare exchange name="${platform}.errors.exchange" vhost=$VHOST \
    type=topic durable=true

  # 绑定到错误队列
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare binding source="${platform}.errors.exchange" destination="errors.queue" \
    vhost=$VHOST routing_key="#"

  # 创建平台用户
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare user name="user_${platform}" password="change_me_${platform}" tags=""

  # 配置平台用户权限
  rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
    declare permission vhost=$VHOST user="user_${platform}" \
    configure="" write="^${platform}\.(exchange|errors\.exchange)$" \
    read="^${platform}\.errors\..*$"
done

# 10. 创建 dataflow 消费者用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare user name="user_dataflow_consumer" password="change_me_consumer" tags=""

rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare permission vhost=$VHOST user="user_dataflow_consumer" \
  configure="^(orders|products|refunds|inventory).*\.queue$" \
  write="(dlx\..*)|(.*\.errors\.exchange)$" \
  read="^(orders|products|refunds|inventory).*\.queue$"

# 11. 创建运维监控用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare user name="user_ops" password="change_me_ops" tags=""

rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare permission vhost=$VHOST user="user_ops" \
  configure="^errors\..*$" write="" read="^errors\.queue$"

echo "RabbitMQ 配置完成!"
echo ""
echo "已创建:"
echo "- 1 个 VHost: dataflow-app"
echo "- 1 个主 Exchange: main.exchange"
echo "- 4 个 DLX: dlx.orders, dlx.products, dlx.refunds, dlx.inventory"
echo "- 4 个主队列: orders.queue, products.queue, refunds.queue, inventory.queue"
echo "- 4 个重试队列: orders.retry.queue, products.retry.queue, refunds.retry.queue, inventory.retry.queue"
echo "- 1 个错误队列: errors.queue"
echo "- ${#PLATFORMS[@]} 个平台 Exchange (业务 + 错误)"
echo "- $((${#PLATFORMS[@]} + 2)) 个用户 (${#PLATFORMS[@]} 个平台用户 + 1 个消费者 + 1 个运维)"

执行脚本

chmod +x setup_rabbitmq.sh
./setup_rabbitmq.sh

监控和维护

1. 监控指标

通过 RabbitMQ 管理界面查看以下关键指标:

队列健康状态

  1. 点击 Queues → 选择队列
  2. 关注以下指标:
    • Ready: 待消费消息数(正常应 < 1000)
    • Unacked: 未确认消息数(正常应较低)
    • Total: 总消息数
    • Incoming rate: 消息入队速率
    • Deliver / Get rate: 消息消费速率

告警阈值

  • Ready > 10000:队列堆积严重
  • Unacked > 5000:消费者处理缓慢
  • Incoming rate >> Deliver rate:消费速度跟不上生产速度

Exchange 流量统计

  1. 点击 Exchanges → 选择 Exchange
  2. 查看 Message rates
    • Publish in: 消息发布速率
    • Publish out: 消息路由到队列的速率

异常情况

  • Publish in > 0 但 Publish out = 0:可能绑定配置错误

2. 常见问题排查

消息未到达队列

检查步骤

  1. 确认 Exchange 存在且类型正确
  2. 检查 Binding 配置,确保 routing key 匹配
  3. 查看 Exchange 的 Message rates,确认消息已发布
  4. 检查生产者是否使用了正确的 routing key

消费者无法消费

检查步骤

  1. 确认用户权限配置正确(Read 权限)
  2. 检查队列中是否有消息(Ready > 0
  3. 查看消费者连接状态(Connections 页面)
  4. 检查消费者代码是否正确 ACK 消息

错误消息未送达平台开发者

检查步骤

  1. 确认错误 Exchange 绑定到 errors.queue
  2. 检查平台开发者账号的 Read 权限
  3. 确认平台开发者订阅了正确的 Exchange
  4. 查看 errors.queue 中是否有消息

3. 备份和恢复

导出配置

# 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  export dataflow-backup.json

导入配置

# 从备份恢复配置
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  import dataflow-backup.json

4. 性能优化

消费者并发配置

  • 增加消费者实例数量
  • 每个消费者配置多个 channel(如 5-10 个)
  • 使用 prefetch_count 限制未确认消息数量

队列性能优化

  • 启用 lazy queue(大量消息堆积时):
    rabbitmqadmin declare queue name=orders.queue vhost=dataflow-app \
      durable=true arguments='{"x-queue-mode":"lazy"}'
    
  • 定期清理过期消息(通过 TTL 自动实现)

5. 日志查看

# 查看 RabbitMQ 日志
tail -f /var/log/rabbitmq/rabbit@<hostname>.log

# 查看连接日志
tail -f /var/log/rabbitmq/rabbit@<hostname>_connection.log

安全建议

1. 密码策略

  • 所有用户使用强密码(至少 16 位,包含大小写字母、数字、特殊字符)
  • 定期更换密码(建议 3-6 个月)
  • 不同环境(开发/测试/生产)使用不同密码

2. 网络安全

  • 生产环境启用 TLS/SSL 加密连接
  • 限制管理界面访问(仅允许内网或 VPN 访问)
  • 配置防火墙规则:
    • 5672:AMQP 端口(仅允许应用服务器访问)
    • 15672:管理界面(仅允许管理员 IP 访问)

3. 权限最小化原则

  • 平台开发者只能访问自己的 Exchange
  • 消费者只能读取必要的队列
  • 避免赋予不必要的 Configure 权限

4. 审计日志

  • 启用 RabbitMQ 审计日志插件
  • 定期审查用户操作记录
  • 监控异常登录行为

新增平台接入流程

当需要接入新平台(例如 Lazada)时,按以下步骤操作:

1. 创建 Exchange

# 业务 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare exchange name="lazada.exchange" vhost=dataflow-app \
  type=topic durable=true

# 错误 Exchange
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare exchange name="lazada.errors.exchange" vhost=dataflow-app \
  type=topic durable=true

2. 创建 Binding

# 绑定到业务队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare binding source="lazada.exchange" destination="orders.queue" \
  vhost=dataflow-app routing_key="order"

rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare binding source="lazada.exchange" destination="products.queue" \
  vhost=dataflow-app routing_key="product"

rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare binding source="lazada.exchange" destination="refunds.queue" \
  vhost=dataflow-app routing_key="refund"

rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare binding source="lazada.exchange" destination="inventory.queue" \
  vhost=dataflow-app routing_key="inventory"

# 绑定到错误队列
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare binding source="lazada.errors.exchange" \
  destination="errors.queue" vhost=dataflow-app routing_key="#"

3. 创建用户并配置权限

# 创建用户
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare user name="user_lazada" password="<secure_password>" tags=""

# 配置权限
rabbitmqadmin -H $RABBITMQ_HOST -u $RABBITMQ_USER -p $RABBITMQ_PASS \
  declare permission vhost=dataflow-app user="user_lazada" \
  configure="" write="^lazada\.(exchange|errors\.exchange)$" \
  read="^lazada\.errors\..*$"

4. 提供给平台开发者

将以下信息提供给新平台的开发团队:

RabbitMQ 连接信息:
- Host: <rabbitmq-host>
- Port: 5672
- VHost: dataflow-app
- Username: user_lazada
- Password: <secure_password>

发布配置:
- Exchange: lazada.exchange
- Routing Keys:
  - order: 发布订单数据
  - product: 发布产品数据
  - refund: 发布退款数据
  - inventory: 发布库存数据

错误订阅(可选):
- Exchange: lazada.errors.exchange
- 接收本平台的错误消息

消息格式规范:
- 参见「消息格式规范」章节

附录

A. 完整资源清单

VHost

  • 总数: 1 个
  • dataflow-app

Exchanges

主 Exchange: 1 个

  • main.exchange (Topic) - 统一主交换机,用于路由和重试消息回流

平台业务 Exchanges: 8 个(每平台 1 个)

  • douyin.exchange
  • jd.exchange
  • tmall.exchange
  • redbook.exchange
  • amazon.exchange
  • naver.exchange
  • rakuten.exchange
  • shopify.exchange

平台错误 Exchanges: 8 个(每平台 1 个)

  • douyin.errors.exchange
  • jd.errors.exchange
  • tmall.errors.exchange
  • redbook.errors.exchange
  • amazon.errors.exchange
  • naver.errors.exchange
  • rakuten.errors.exchange
  • shopify.errors.exchange

死信交换机 (DLX): 4 个(每数据类型 1 个)

  • dlx.orders
  • dlx.products
  • dlx.refunds
  • dlx.inventory

Exchange 总数: 21 个 (1 主 + 8 平台业务 + 8 平台错误 + 4 DLX)

Queues

主业务队列: 4 个

  • orders.queue (配置 DLX: dlx.orders)
  • products.queue (配置 DLX: dlx.products)
  • refunds.queue (配置 DLX: dlx.refunds)
  • inventory.queue (配置 DLX: dlx.inventory)

延迟重试队列: 4 个

  • orders.retry.queue (TTL: 5秒, DLX: main.exchange)
  • products.retry.queue (TTL: 5秒, DLX: main.exchange)
  • refunds.retry.queue (TTL: 5秒, DLX: main.exchange)
  • inventory.retry.queue (TTL: 5秒, DLX: main.exchange)

错误队列: 1 个

  • errors.queue (TTL: 7天)

Queue 总数: 9 个 (4 主队列 + 4 重试队列 + 1 错误队列)

Bindings

主 Exchange 绑定: 4 个

  • main.exchangeorders.queue (routing_key: order.#)
  • main.exchangeproducts.queue (routing_key: product.#)
  • main.exchangerefunds.queue (routing_key: refund.#)
  • main.exchangeinventory.queue (routing_key: inventory.#)

平台业务 Exchange 绑定: 32 个

  • 每个平台 Exchange → 4 个业务队列 (8 × 4 = 32)
  • 使用 routing key 格式:{data_type}.{platform}
    • 例如:order.douyin, product.tmall, refund.jd

平台错误 Exchange 绑定: 8 个

  • 每个平台错误 Exchange → errors.queue (routing_key: #)

DLX 绑定: 8 个

  • 每个 DLX → 对应重试队列 (routing_key: retry) - 4 个
  • 每个 DLX → errors.queue (routing_key: error) - 4 个

Binding 总数: 52 个 (4 主 + 32 平台业务 + 8 平台错误 + 8 DLX)

Users

平台开发者账号: 8 个

  • user_douyin, user_jd, user_tmall, user_redbook, user_amazon, user_naver, user_rakuten, user_shopify
  • 权限:只能写入自己的业务和错误 Exchange,只能读取自己的错误队列

消费者账号: 1 个

  • user_dataflow_consumer
  • 权限:可以管理所有业务队列和重试队列,可以写入 DLX 和错误 Exchange,可以读取所有业务队列

运维监控账号: 1 个

  • user_ops
  • 权限:可以管理错误相关资源,可以读取错误队列

管理员账号: 1 个

  • admin
  • 权限:完全权限,可以访问管理界面

User 总数: 11 个 (8 平台 + 1 消费者 + 1 运维 + 1 管理员)

架构统计

资源类型 数量 说明
VHost 1 dataflow-app
Exchange 21 1 主 + 8 平台业务 + 8 平台错误 + 4 DLX
Queue 9 4 主 + 4 重试 + 1 错误
Binding 52 完整消息流转路径
User 11 多角色权限隔离

消息流转路径

生产者 → 平台 Exchange → 主队列 → 消费者
                            ↓ (失败)
                          DLX → 重试队列 (5秒TTL) → 主 Exchange → 主队列
                            ↓ (超过3次)
                          DLX → 错误队列 (人工处理)

B. 参考资料