diff --git a/backend/app/Command/AppMessageQueuePushShopee.php b/backend/app/Command/AppMessageQueuePushShopee.php index b4ebb39..a214fb8 100644 --- a/backend/app/Command/AppMessageQueuePushShopee.php +++ b/backend/app/Command/AppMessageQueuePushShopee.php @@ -36,6 +36,12 @@ class AppMessageQueuePushShopee extends HyperfCommand InputOption::VALUE_REQUIRED, 'Queue type: product, order, refund, inventory' ); + $this->addOption( + 'ids', + null, + InputOption::VALUE_REQUIRED, + 'Comma-separated primary key IDs to fetch specific records' + ); } /** @@ -48,21 +54,25 @@ class AppMessageQueuePushShopee extends HyperfCommand 'table' => 'wpic_shopee_order', 'column' => 'raw', 'producer' => ShopeeOrderProducer::class, + 'primary' => 'order_sn', ], 'product' => [ 'table' => 'wpic_shopee_item', 'column' => 'raw', 'producer' => ShopeeProductProducer::class, + 'primary' => 'item_id', ], 'refund' => [ 'table' => 'wpic_shopee_return', 'column' => 'raw', 'producer' => ShopeeRefundProducer::class, + 'primary' => 'return_sn', ], 'inventory' => [ 'table' => 'wpic_shopee_inventory', 'column' => 'raw', 'producer' => ShopeeInventoryProducer::class, + 'primary' => 'item_id', ], ]; @@ -86,7 +96,7 @@ class AppMessageQueuePushShopee extends HyperfCommand // 尝试更新 store 信息 $store = Store::where('platform_id', '=', 25) - ->where('platform_store_id', '=', 1669343109 ) + ->where('platform_store_id', '=', 1402800321 ) ->first(); if(!$store){ @@ -98,7 +108,7 @@ class AppMessageQueuePushShopee extends HyperfCommand 'partner_id' => 2006675, 'app_id' => '', 'merchant_id' => 3726273, - 'name' => 'owala.sg', + 'name' => 'nanit.sg', 'zone' => 'SG', ]; @@ -107,11 +117,21 @@ class AppMessageQueuePushShopee extends HyperfCommand $store->save(); // 从 raw 数据库连接获取数据 - $records = Db::connection('raw') + $query = Db::connection('raw') ->table($config['table']) - ->where('store_id', '=', 1669343109) - ->orderBy('id', 'desc') - ->limit(4)->get($config['column'])->lazy(); + ->where('store_id', '=', 1402800321); + + $idsOption = $this->input->getOption('ids'); + if (!empty($idsOption)) { + $ids = array_map('trim', explode(',', $idsOption)); + $ids = array_unique(array_filter($ids, fn($id) => $id !== '')); + $query->whereIn($config['primary'], $ids); + $this->info(sprintf('Fetching records by IDs: %s', implode(', ', $ids))); + } else { + $query->orderBy('id', 'desc')->limit(4); + } + + $records = $query->get([$config['column']])->lazy(); if ($records->isEmpty()) { $this->warn(sprintf('No records found in %s table', $config['table'])); @@ -136,10 +156,10 @@ class AppMessageQueuePushShopee extends HyperfCommand //@ATTENTION 生产环境需要注意, 暂时使用 Shopee Loop SG 进行测试 $messageData = [ - 'company_id' => 171, + 'company_id' => 198, 'platform_id' => 25, - 'store_id' => 255, - 'platform_store_id' => 1669343109, + 'store_id' => 340, + 'platform_store_id' => 1402800321, 'unique_id' => time() . '_' . uniqid(), 'raw_data' => $raw_data, // 包含 2 条原始记录 ]; @@ -167,3 +187,4 @@ class AppMessageQueuePushShopee extends HyperfCommand } } } + diff --git a/backend/app/Controller/IndexController.php b/backend/app/Controller/IndexController.php index d39403c..3b2145e 100644 --- a/backend/app/Controller/IndexController.php +++ b/backend/app/Controller/IndexController.php @@ -25,7 +25,7 @@ class IndexController extends AbstractController return [ 'method' => $method, - 'message' => "Dataflow App", + 'message' => "Datahub App", ]; } diff --git a/backend/app/Platform/ErrorProducer.php b/backend/app/Platform/ErrorProducer.php index 8de9b05..55048aa 100644 --- a/backend/app/Platform/ErrorProducer.php +++ b/backend/app/Platform/ErrorProducer.php @@ -26,7 +26,7 @@ class ErrorProducer extends ProducerMessage /** * VHost */ - protected string $vhost = 'dataflow'; + protected string $vhost = 'datahub'; /** * 消息持久化 diff --git a/backend/app/Platform/OrderProducer.php b/backend/app/Platform/OrderProducer.php index 96f7b41..4f64b55 100644 --- a/backend/app/Platform/OrderProducer.php +++ b/backend/app/Platform/OrderProducer.php @@ -20,7 +20,7 @@ class OrderProducer extends ProducerMessage /** * VHost */ - protected string $vhost = 'dataflow'; + protected string $vhost = 'datahub'; /** * 消息持久化 diff --git a/backend/config/autoload/auth.php b/backend/config/autoload/auth.php index 912a7cd..0dc1c3e 100644 --- a/backend/config/autoload/auth.php +++ b/backend/config/autoload/auth.php @@ -167,7 +167,7 @@ return [ * 可选配置 * 缓存前缀 */ - 'prefix' => env('SIMPLE_JWT_PREFIX', 'dataflow'), + 'prefix' => env('SIMPLE_JWT_PREFIX', 'datahub'), ], 'session' => [ 'driver' => Qbhy\HyperfAuth\Guard\SessionGuard::class, diff --git a/backend/config/autoload/databases.php b/backend/config/autoload/databases.php index 9b7e85b..c1cc5bf 100644 --- a/backend/config/autoload/databases.php +++ b/backend/config/autoload/databases.php @@ -15,10 +15,10 @@ return [ 'default' => [ 'driver' => env('DB_DRIVER', 'pgsql'), 'host' => env('DB_HOST', '127.0.0.1'), - 'database' => env('DB_DATABASE', 'dataflow'), + 'database' => env('DB_DATABASE', 'datahub'), 'port' => env('DB_PORT', 5416), - 'username' => env('DB_USERNAME', 'dataflow'), - 'password' => env('DB_PASSWORD', 'dataflow'), + 'username' => env('DB_USERNAME', 'datahub'), + 'password' => env('DB_PASSWORD', 'datahub'), 'charset' => env('DB_CHARSET', 'utf8'), 'collation' => env('DB_COLLATION', 'utf8_unicode_ci'), 'prefix' => env('DB_PREFIX', ''), @@ -43,16 +43,16 @@ return [ ], 'raw' => [ 'driver' => 'mysql', - // 'host' => '127.0.0.1', - // 'port' => 3380, - // 'database' => 'wpic_task', - // 'username' => 'tools', - // 'password' => 'root', - 'host' => '120.27.136.208', - 'port' => 3306, - 'database' => 'wpic', - 'username' => 'wpic', - 'password' => 'wpic#jsKk223', + 'host' => '127.0.0.1', + 'port' => 3380, + 'database' => 'wpic_task', + 'username' => 'tools', + 'password' => 'root', + // 'host' => '120.27.136.208', + // 'port' => 3306, + // 'database' => 'wpic', + // 'username' => 'wpic', + // 'password' => 'wpic#jsKk223', 'charset' => 'utf8', 'collation' => 'utf8_unicode_ci', 'prefix' => '', diff --git a/docs/AUTH_USAGE.md b/docs/AUTH_USAGE.md index e0054fe..814fce5 100644 --- a/docs/AUTH_USAGE.md +++ b/docs/AUTH_USAGE.md @@ -22,16 +22,16 @@ DB_DRIVER=pgsql DB_HOST=127.0.0.1 DB_PORT=5416 -DB_DATABASE=dataflow -DB_USERNAME=dataflow -DB_PASSWORD=dataflow +DB_DATABASE=datahub +DB_USERNAME=datahub +DB_PASSWORD=datahub # JWT 配置 SIMPLE_JWT_SECRET=your-secret-key-change-this-in-production # 生产环境请修改 JWT_HEADER_NAME=Authorization SIMPLE_JWT_TTL=7200 # Access Token 有效期:2小时 SIMPLE_JWT_REFRESH_TTL=2592000 # Refresh Token 有效期:30天 -SIMPLE_JWT_PREFIX=dataflow +SIMPLE_JWT_PREFIX=datahub ``` ### Token 设计 diff --git a/docs/RabbitMQ.md b/docs/RabbitMQ.md index 5fa48da..d31d6ed 100644 --- a/docs/RabbitMQ.md +++ b/docs/RabbitMQ.md @@ -2,11 +2,11 @@ ## 概述 -本文档描述 dataflow 应用的 RabbitMQ 消息队列架构设计和配置方法。 +本文档描述 datahub 应用的 RabbitMQ 消息队列架构设计和配置方法。 ### 业务背景 -- **应用角色**:dataflow 是消息消费者,负责接收来自多个电商平台的数据并入库 +- **应用角色**:datahub 是消息消费者,负责接收来自多个电商平台的数据并入库 - **数据来源**:多个电商平台(DouYin、JD、Tmall、RedBook、Amazon Japan、Naver、Rakuten、Shopify 等) - **数据类型**:订单(orders)、产品(products)、退款(refunds)、库存(inventory) - **隔离需求**:不同平台开发者之间的工作空间需要相互隔离,互不影响 @@ -214,19 +214,19 @@ VHost 按业务应用进行划分,便于资源隔离和权限管理。 ``` RabbitMQ 实例 -├── dataflow-app # Dataflow 应用专用 +├── datahub-app # Datahub 应用专用 ├── analytics-app # 分析应用(预留) └── warehouse-app # 数据仓库应用(预留) ``` **当前需要创建的 VHost**: -- `dataflow-app`:用于电商数据流转 +- `datahub-app`:用于电商数据流转 --- ### 2. Exchange 设计 -在 `dataflow-app` VHost 中,Exchange 分为两类: +在 `datahub-app` VHost 中,Exchange 分为两类: #### 业务 Exchange(用于数据投递) @@ -702,7 +702,7 @@ class OrderConsumer: ``` 用户名: user_douyin 密码: <安全密码> -VHost: dataflow-app +VHost: datahub-app 权限配置: - Configure: (留空) - 不允许创建/删除资源 @@ -715,7 +715,7 @@ VHost: dataflow-app ``` 用户名: user_tmall 密码: <安全密码> -VHost: dataflow-app +VHost: datahub-app 权限配置: - Configure: (留空) @@ -733,14 +733,14 @@ VHost: dataflow-app - Rakuten: `user_rakuten` - Shopify: `user_shopify` -### 2. Dataflow 消费者账号 +### 2. Datahub 消费者账号 -Dataflow 应用的后端服务账号,负责消费业务队列并处理错误。 +Datahub 应用的后端服务账号,负责消费业务队列并处理错误。 ``` -用户名: user_dataflow_consumer +用户名: user_datahub_consumer 密码: <安全密码> -VHost: dataflow-app +VHost: datahub-app 权限配置: - Configure: ^(orders|products|refunds|inventory)\.queue$ - 可以管理业务队列 @@ -755,7 +755,7 @@ VHost: dataflow-app ``` 用户名: user_ops 密码: <安全密码> -VHost: dataflow-app +VHost: datahub-app 权限配置: - Configure: ^errors\..*$ - 可以管理错误相关资源 @@ -792,7 +792,7 @@ message_id 采用结构化格式,确保幂等性和可追溯性。 **字段说明**: - `prefix`:项目名称前缀(如 `wpic-project1`) -- `app_id`:应用标识(固定为 `dataflow`) +- `app_id`:应用标识(固定为 `datahub`) - `company_id`:公司 ID - `platform_id`:平台 ID - `store_id`:店铺 ID @@ -805,11 +805,11 @@ message_id 采用结构化格式,确保幂等性和可追溯性。 **示例**: ``` -wpic-project1#dataflow#100#20#200#order#2025-11-10~2025-11-15 +wpic-project1#datahub#100#20#200#order#2025-11-10~2025-11-15 解析: - 项目前缀: wpic-project1 -- 应用: dataflow +- 应用: datahub - 公司ID: 100 - 平台ID: 20 (DouYin) - 店铺ID: 200 @@ -827,7 +827,7 @@ wpic-project1#dataflow#100#20#200#order#2025-11-10~2025-11-15 ```json { - "message_id": "wpic-project1#dataflow#100#20#200#order#DY123456", + "message_id": "wpic-project1#datahub#100#20#200#order#DY123456", "timestamp": "2025-01-14T10:30:00Z", "platform": "douyin", "data_type": "order", @@ -876,7 +876,7 @@ wpic-project1#dataflow#100#20#200#order#2025-11-10~2025-11-15 ### 3. 错误消息格式 -错误消息由 dataflow 消费者生成,包含原始消息和错误详情。 +错误消息由 datahub 消费者生成,包含原始消息和错误详情。 ```json { @@ -917,7 +917,7 @@ URL: http://:15672 1. 点击顶部导航 **Admin** → **Virtual Hosts** 2. 点击 **Add a new virtual host** -3. 输入 VHost 名称:`dataflow-app` +3. 输入 VHost 名称:`datahub-app` 4. 点击 **Add virtual host** ### 3. 创建用户 @@ -934,7 +934,7 @@ URL: http://:15672 1. 在用户列表中,点击用户名(如 `user_amazon`) 2. 滚动到 **Permissions** 部分 -3. 选择 VHost:`dataflow-app` +3. 选择 VHost:`datahub-app` 4. 配置权限正则表达式: - **Configure regexp**: (留空) - **Write regexp**: `^amazon\.(exchange|errors\.exchange)$` @@ -944,7 +944,7 @@ URL: http://:15672 ### 5. 创建 Exchange 1. 点击顶部导航 **Exchanges** -2. 确保选择正确的 VHost:`dataflow-app` +2. 确保选择正确的 VHost:`datahub-app` 3. 点击 **Add a new exchange** 4. 填写配置: - **Name**: `amazon.exchange` @@ -959,7 +959,7 @@ URL: http://:15672 ### 6. 创建 Queue 1. 点击顶部导航 **Queues** -2. 确保选择正确的 VHost:`dataflow-app` +2. 确保选择正确的 VHost:`datahub-app` 3. 点击 **Add a new queue** 4. 填写配置: - **Name**: `orders.queue` @@ -1020,7 +1020,7 @@ RABBITMQ_HOST="127.0.0.1" RABBITMQ_PORT="15672" RABBITMQ_USER="admin" RABBITMQ_PASS="admin" -VHOST="dataflow" +VHOST="datahub" # 平台列表 PLATFORMS=("douyin" "jd" "tmall" "redbook" "amazon" "naver" "rakuten" "shopify") @@ -1058,7 +1058,7 @@ for platform in "${PLATFORMS[@]}"; do done # 添加其他用户 -ALL_USERS+=("user_dataflow_consumer") +ALL_USERS+=("user_datahub_consumer") ALL_USERS+=("user_ops") # 删除用户(如果存在) @@ -1212,17 +1212,17 @@ for platform in "${PLATFORMS[@]}"; do echo "✓ 配置用户权限" done -# 10. 创建 dataflow 消费者用户 +# 10. 创建 datahub 消费者用户 echo "" echo "========================================" -echo "创建 dataflow 消费者用户..." +echo "创建 datahub 消费者用户..." echo "========================================" rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags "" -echo "✓ 创建用户: user_dataflow_consumer" + declare user --name "user_datahub_consumer" --password "change_me_consumer" --tags "" +echo "✓ 创建用户: user_datahub_consumer" rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \ + declare permissions --vhost "$VHOST" --user "user_datahub_consumer" \ --configure "^(orders|products|refunds|inventory).*\\.queue$" \ --write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \ --read "^(orders|products|refunds|inventory).*\\.queue$" @@ -1331,7 +1331,7 @@ chmod +x setup_rabbitmq.sh ```bash # 导出所有定义(Exchanges, Queues, Bindings, Users, Permissions) rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - export dataflow-backup.json + export datahub-backup.json ``` #### 导入配置 @@ -1339,7 +1339,7 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ ```bash # 从备份恢复配置 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - import dataflow-backup.json + import datahub-backup.json ``` ### 4. 性能优化 @@ -1354,7 +1354,7 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ - 启用 lazy queue(大量消息堆积时): ```bash - rabbitmqadmin declare queue name=orders.queue vhost=dataflow-app \ + rabbitmqadmin declare queue name=orders.queue vhost=datahub-app \ durable=true arguments='{"x-queue-mode":"lazy"}' ``` - 定期清理过期消息(通过 TTL 自动实现) @@ -1410,12 +1410,12 @@ tail -f /var/log/rabbitmq/rabbit@_connection.log ```bash # 业务 Exchange rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "lazada.exchange" --vhost dataflow-app \ + declare exchange --name "lazada.exchange" --vhost datahub-app \ --type topic --durable true # 错误 Exchange rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare exchange --name "lazada.errors.exchange" --vhost dataflow-app \ + declare exchange --name "lazada.errors.exchange" --vhost datahub-app \ --type topic --durable true ``` @@ -1425,24 +1425,24 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ # 绑定到业务队列 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding --source "lazada.exchange" --destination "orders.queue" \ - --destination-type queue --vhost dataflow-app --routing-key "order.lazada" + --destination-type queue --vhost datahub-app --routing-key "order.lazada" rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding --source "lazada.exchange" --destination "products.queue" \ - --destination-type queue --vhost dataflow-app --routing-key "product.lazada" + --destination-type queue --vhost datahub-app --routing-key "product.lazada" rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding --source "lazada.exchange" --destination "refunds.queue" \ - --destination-type queue --vhost dataflow-app --routing-key "refund.lazada" + --destination-type queue --vhost datahub-app --routing-key "refund.lazada" rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding --source "lazada.exchange" --destination "inventory.queue" \ - --destination-type queue --vhost dataflow-app --routing-key "inventory.lazada" + --destination-type queue --vhost datahub-app --routing-key "inventory.lazada" # 绑定到错误队列 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding --source "lazada.errors.exchange" --destination "errors.queue" \ - --destination-type queue --vhost dataflow-app --routing-key "#" + --destination-type queue --vhost datahub-app --routing-key "#" ``` ### 3. 创建用户并配置权限 @@ -1454,7 +1454,7 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ # 配置权限 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare permissions --vhost dataflow-app --user "user_lazada" \ + declare permissions --vhost datahub-app --user "user_lazada" \ --configure "^lazada\\.(exchange|errors\\.exchange)$" \ --write "^lazada\\.(exchange|errors\\.exchange)$" \ --read "^lazada\\.errors\\..*$" @@ -1468,7 +1468,7 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ RabbitMQ 连接信息: - Host: - Port: 5672 -- VHost: dataflow-app +- VHost: datahub-app - Username: user_lazada - Password: @@ -1496,7 +1496,7 @@ RabbitMQ 连接信息: #### VHost - **总数**: 1 个 -- `dataflow-app` +- `datahub-app` #### Exchanges @@ -1579,7 +1579,7 @@ RabbitMQ 连接信息: - 权限:只能写入自己的业务和错误 Exchange,只能读取自己的错误队列 **消费者账号**: 1 个 -- `user_dataflow_consumer` +- `user_datahub_consumer` - 权限:可以管理所有业务队列和重试队列,可以写入 DLX 和错误 Exchange,可以读取所有业务队列 **运维监控账号**: 1 个 @@ -1596,7 +1596,7 @@ RabbitMQ 连接信息: | 资源类型 | 数量 | 说明 | |---------|------|------| -| VHost | 1 | dataflow-app | +| VHost | 1 | datahub-app | | Exchange | 21 | 1 主 + 8 平台业务 + 8 平台错误 + 4 DLX | | Queue | 9 | 4 主 + 4 重试 + 1 错误 | | Binding | 52 | 完整消息流转路径 | diff --git a/docs/data_flow.md b/docs/data_flow.md index fa6a995..1e45645 100644 --- a/docs/data_flow.md +++ b/docs/data_flow.md @@ -10,7 +10,7 @@ |------|------|------| | **数据生产者** | 采集电商平台数据并推送到 MQ | 各平台采集服务 | | **消息队列** | 数据路由、缓冲、隔离 | RabbitMQ | -| **数据消费者** | 消费 MQ 消息并入库 | Dataflow App | +| **数据消费者** | 消费 MQ 消息并入库 | Datahub App | | **存储层** | 持久化存储和统计分析 | PostgreSQL + TimescaleDB | --- @@ -27,7 +27,7 @@ graph TB end subgraph "RabbitMQ 消息队列" - subgraph "VHost: dataflow-app" + subgraph "VHost: datahub-app" subgraph "业务 Exchanges" E1[douyin.exchange] E2[tmall.exchange] @@ -65,7 +65,7 @@ graph TB end end - subgraph "Dataflow 消费者" + subgraph "Datahub 消费者" C1[OrderConsumer] C2[ProductConsumer] C3[RefundConsumer] @@ -187,7 +187,7 @@ graph LR end subgraph "消费者" - CO[OrderConsumer
user_dataflow_consumer] + CO[OrderConsumer
user_datahub_consumer] end PD -->|write only| ED @@ -387,7 +387,7 @@ message_1 后到达 (data_version: 1736840000): ```json { - "message_id": "order#dataflow#100#20#200#order#DY123", + "message_id": "order#datahub#100#20#200#order#DY123", "metadata": { "data_version": 1736840100 // 必需:平台数据的最后更新时间戳 }, @@ -484,7 +484,7 @@ BEGIN; -- 1. 记录消息已处理(幂等性) INSERT INTO processed_messages (message_id, platform_id, data_type) -VALUES ('order#dataflow#100#20#200#order#DY123', 20, 'order') +VALUES ('order#datahub#100#20#200#order#DY123', 20, 'order') ON CONFLICT (message_id) DO NOTHING RETURNING message_id; @@ -525,7 +525,7 @@ sequenceDiagram Note over P: 1. 数据采集阶段 P->>P: 从 DouYin API 获取订单数据 - P->>P: 生成 message_id
order#dataflow#100#20#200#order#DY123
提取 data_version + P->>P: 生成 message_id
order#datahub#100#20#200#order#DY123
提取 data_version Note over P,E: 2. 消息发布阶段 P->>E: 发布消息
routing_key: order
持久化消息 @@ -673,7 +673,7 @@ sequenceDiagram | 操作 | 权限 | 说明 | |------|------|------| -| 连接到 VHost | `dataflow-app` | 只能访问此 VHost | +| 连接到 VHost | `datahub-app` | 只能访问此 VHost | | 发布消息 | `{platform}.exchange` | 只能写入自己平台的 Exchange | | 订阅错误 | `{platform}.errors.exchange` | 可选,接收本平台错误通知 | @@ -685,19 +685,19 @@ sequenceDiagram 格式: {entity_type}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{platform_unique_id} 示例: -- order#dataflow#100#20#200#order#DY123456789 -- product#dataflow#100#2#201#product#TM-PROD789 -- refund#dataflow#100#20#200#refund#DY-REF456 -- inventory#dataflow#100#2#201#inventory#TM-SKU123 +- order#datahub#100#20#200#order#DY123456789 +- product#datahub#100#2#201#product#TM-PROD789 +- refund#datahub#100#20#200#refund#DY-REF456 +- inventory#datahub#100#2#201#inventory#TM-SKU123 ``` **生成逻辑**: ``` # 伪代码 -message_id = f"{data_type}#dataflow#{company_id}#{platform_id}#{store_id}#{data_type}#{platform_order_id}" +message_id = f"{data_type}#datahub#{company_id}#{platform_id}#{store_id}#{data_type}#{platform_order_id}" # 实际示例(DouYin 订单) -message_id = f"order#dataflow#100#20#200#order#{order_data['order_id']}" +message_id = f"order#datahub#100#20#200#order#{order_data['order_id']}" ``` ##### 步骤 2: 提取 data_version @@ -724,7 +724,7 @@ if not data_version: ```json { - "message_id": "order#dataflow#100#20#200#order#DY123456", + "message_id": "order#datahub#100#20#200#order#DY123456", "timestamp": "2025-01-14T10:30:00Z", "platform": "douyin", "data_type": "order", @@ -753,7 +753,7 @@ if not data_version: **连接参数**: - Host: `` - Port: `5672` -- VHost: `dataflow-app` +- VHost: `datahub-app` - Username: `user_{platform}` - Password: `` @@ -794,7 +794,7 @@ if not data_version: --- -### 2. Dataflow 应用(数据消费者) +### 2. Datahub 应用(数据消费者) #### 职责 @@ -847,7 +847,7 @@ OrderConsumer (单实例) | 操作 | 权限 | 说明 | |------|------|------| -| 连接到 VHost | `dataflow-app` | 访问业务 VHost | +| 连接到 VHost | `datahub-app` | 访问业务 VHost | | 读取队列 | `orders.queue`, `products.queue`, `refunds.queue` | 消费业务消息 | | 写入错误 Exchange | `*.errors.exchange` | 发送错误消息到对应平台的错误 Exchange | | 管理队列 | `orders.queue`, `products.queue`, `refunds.queue` | 配置消费者参数 | @@ -892,7 +892,7 @@ BEGIN; -- 步骤 1: 尝试插入 processed_messages(幂等性检查) INSERT INTO processed_messages (message_id, platform_id, data_type) -VALUES ('order#dataflow#100#20#200#order#DY123', 20, 'order') +VALUES ('order#datahub#100#20#200#order#DY123', 20, 'order') ON CONFLICT (message_id) DO NOTHING RETURNING message_id; @@ -994,7 +994,7 @@ COMMIT; ```bash # 查看所有队列状态 -rabbitmqctl list_queues -p dataflow-app name messages_ready messages_unacknowledged +rabbitmqctl list_queues -p datahub-app name messages_ready messages_unacknowledged # 告警阈值 # - messages_ready > 10000: 严重堆积 @@ -1005,7 +1005,7 @@ rabbitmqctl list_queues -p dataflow-app name messages_ready messages_unacknowled ```bash # 查看所有消费者连接 -rabbitmqctl list_consumers -p dataflow-app +rabbitmqctl list_consumers -p datahub-app # 检查项: # - 消费者数量是否正常 @@ -1034,7 +1034,7 @@ ORDER BY error_count DESC; ``` 排查步骤: 1. 检查消费者是否在线 - rabbitmqctl list_consumers -p dataflow-app + rabbitmqctl list_consumers -p datahub-app 2. 查看消费速率 管理界面 → Queues → 选择队列 → 查看 Deliver/Get rate @@ -1104,7 +1104,7 @@ ORDER BY error_count DESC; ```bash # 启用 lazy queue (大量消息堆积时) rabbitmqctl set_policy lazy-queue "^(orders|products|refunds)\.queue$" \ - '{"queue-mode":"lazy"}' --vhost dataflow-app + '{"queue-mode":"lazy"}' --vhost datahub-app # 效果: # - 消息存储在磁盘,减少内存占用 @@ -1133,12 +1133,12 @@ rabbitmqctl set_policy lazy-queue "^(orders|products|refunds)\.queue$" \ ```bash # 业务 Exchange rabbitmqadmin -u admin -p declare exchange \ - name="aliexpress.exchange" vhost=dataflow-app \ + name="aliexpress.exchange" vhost=datahub-app \ type=topic durable=true # 错误 Exchange rabbitmqadmin -u admin -p declare exchange \ - name="aliexpress.errors.exchange" vhost=dataflow-app \ + name="aliexpress.errors.exchange" vhost=datahub-app \ type=topic durable=true ``` @@ -1148,24 +1148,24 @@ rabbitmqadmin -u admin -p declare exchange \ # 绑定到业务队列(使用 {data_type}.{platform} 格式) rabbitmqadmin -u admin -p declare binding \ source="aliexpress.exchange" destination="orders.queue" \ - vhost=dataflow-app routing_key="order.aliexpress" + vhost=datahub-app routing_key="order.aliexpress" rabbitmqadmin -u admin -p declare binding \ source="aliexpress.exchange" destination="products.queue" \ - vhost=dataflow-app routing_key="product.aliexpress" + vhost=datahub-app routing_key="product.aliexpress" rabbitmqadmin -u admin -p declare binding \ source="aliexpress.exchange" destination="refunds.queue" \ - vhost=dataflow-app routing_key="refund.aliexpress" + vhost=datahub-app routing_key="refund.aliexpress" rabbitmqadmin -u admin -p declare binding \ source="aliexpress.exchange" destination="inventory.queue" \ - vhost=dataflow-app routing_key="inventory.aliexpress" + vhost=datahub-app routing_key="inventory.aliexpress" # 绑定到错误队列 rabbitmqadmin -u admin -p declare binding \ source="aliexpress.errors.exchange" destination="errors.queue" \ - vhost=dataflow-app routing_key="#" + vhost=datahub-app routing_key="#" ``` **步骤 3: 创建用户** @@ -1177,7 +1177,7 @@ rabbitmqadmin -u admin -p declare user \ # 配置权限 rabbitmqadmin -u admin -p declare permission \ - vhost=dataflow-app user="user_aliexpress" \ + vhost=datahub-app user="user_aliexpress" \ configure="" write="^aliexpress\.(exchange|errors\.exchange)$" \ read="^aliexpress\.errors\..*$" ``` @@ -1188,7 +1188,7 @@ rabbitmqadmin -u admin -p declare permission \ 连接信息: - Host: - Port: 5672 -- VHost: dataflow-app +- VHost: datahub-app - Username: user_aliexpress - Password: @@ -1422,7 +1422,7 @@ graph TB ```json { - "message_id": "order#dataflow#100#20#200#order#DY202501140001", + "message_id": "order#datahub#100#20#200#order#DY202501140001", "timestamp": "2025-01-14T10:30:00Z", "platform": "douyin", "data_type": "order", @@ -1459,7 +1459,7 @@ graph TB ```json { - "message_id": "product#dataflow#100#2#201#product#TM-PROD-789", + "message_id": "product#datahub#100#2#201#product#TM-PROD-789", "timestamp": "2025-01-14T11:00:00Z", "platform": "tmall", "data_type": "product", diff --git a/frontend/package.json b/frontend/package.json index 7a0459a..9e6b469 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,5 +1,5 @@ { - "name": "dataflow-frontend", + "name": "datahub-frontend", "version": "0.0.0", "private": true, "type": "module", diff --git a/frontend/src/components/Brand.vue b/frontend/src/components/Brand.vue index 54dbf8f..14677cc 100644 --- a/frontend/src/components/Brand.vue +++ b/frontend/src/components/Brand.vue @@ -37,7 +37,7 @@ const sloganSizeClasses = {