From b7a628e13cf3184b516ffc2643061a3ff14cd4e1 Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Mon, 17 Nov 2025 16:51:51 +0800 Subject: [PATCH] add mq test --- backend/.env.example | 19 +++- backend/app/Command/AppMqInit.php | 87 ++++++++++++++++ .../Platform/Tmall/Producer/OrderProducer.php | 99 +++++++++++++++++++ backend/bin/rabbitmq.sh | 3 +- backend/composer.json | 1 + backend/config/autoload/amqp.php | 50 ++++++++++ backend/config/autoload/databases.php | 19 ++++ docs/RabbitMQ.md | 19 +++- docs/todo.md | 2 + 9 files changed, 292 insertions(+), 7 deletions(-) create mode 100644 backend/app/Command/AppMqInit.php create mode 100644 backend/app/Platform/Tmall/Producer/OrderProducer.php create mode 100644 backend/config/autoload/amqp.php diff --git a/backend/.env.example b/backend/.env.example index 6879583..6c32b2f 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -11,7 +11,24 @@ DB_CHARSET=utf8mb4 DB_COLLATION=utf8mb4_unicode_ci DB_PREFIX= +AMQP_HOST=127.0.0.1 +AMQP_PORT=5672 +AMQP_USER="user_tmall" +AMQP_PASSWORD="change_me_tmall" +AMQP_VHOST="dataflow" + REDIS_HOST=localhost REDIS_AUTH=(null) REDIS_PORT=6379 -REDIS_DB=0 \ No newline at end of file +REDIS_DB=0 + +# JWT认证配置 +SIMPLE_JWT_SECRET=your-secret-key-change-this-in-production +JWT_HEADER_NAME=Authorization +SIMPLE_JWT_TTL=7200 +SIMPLE_JWT_REFRESH_TTL=2592000 +SIMPLE_JWT_PREFIX=dataflow + +TOOLS_HOST="https://store-api-v2.wpic-tools.com/" +TOOLS_TOKEN="123" + diff --git a/backend/app/Command/AppMqInit.php b/backend/app/Command/AppMqInit.php new file mode 100644 index 0000000..c92102e --- /dev/null +++ b/backend/app/Command/AppMqInit.php @@ -0,0 +1,87 @@ +setDescription('Insert mq test feeds'); + } + + public function handle() + { + $this->line('Insert MQ test feeds now...', 'info'); + + // shopee + // $company_id = 171; + // $platform_id = 25; + // $store_id = 255; + + // Tmall + $company_id = 188; + $platform_id = 2; + $store_id = 292; + + for ($i=0; $i < 2; $i++) { + + + $unique_id = \uniqid(); + + // 构建 $message_id 格式为 {prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{unique_id} + + $data = Db::connection('raw')->table('wpic_taobao_order') + ->select(['id', 'order_raw'])->orderBy('t_created', 'asc') + ->limit(20)->offset($i * 20) + ->get()->toArray(); + + $data = [ + 'platform_id' => $platform_id, + 'company_id' => $company_id, + 'store_id' => $store_id, + 'unique_id' => $unique_id, + 'raw_data' => $data, + ]; + + + $message = new TmallOrderProducer($data); + $result = $this->producerService->produce($message, true); + + + $log = \sprintf('%s %s %s %s %s', + $company_id, + $platform_id, + $store_id, + $unique_id, + $result + ); + + dump($log); + + } + + + + } +} diff --git a/backend/app/Platform/Tmall/Producer/OrderProducer.php b/backend/app/Platform/Tmall/Producer/OrderProducer.php new file mode 100644 index 0000000..623eafe --- /dev/null +++ b/backend/app/Platform/Tmall/Producer/OrderProducer.php @@ -0,0 +1,99 @@ + 2, // 持久化消息 + ]; + + /** + * 构造消息 + * + * @param array $data 订单数据 + * @return string + */ + public function __construct(array $data = []) + { + if (!empty($data)) { + $this->payload = $this->buildMessage($data); + } + } + + /** + * 构建消息格式 + * + * @param array $data 原始订单数据 + * @return array + */ + protected function buildMessage(array $data): array + { + // 根据 RabbitMQ.md 中定义的消息格式规范 + return [ + 'message_id' => $this->generateMessageId($data), + 'timestamp' => date('c'), // ISO 8601 格式 + 'platform' => 'tmall', + 'data_type' => 'order', + 'metadata' => [ + 'platform_id' => $data['platform_id'] ?? null, + 'company_id' => $data['company_id'] ?? null, + 'store_id' => $data['store_id'] ?? null, + 'source_system' => 'tmall-open-api', + 'retry_count' => 0, + 'data_version' => $data['data_version'] ?? time(), + ], + 'data' => $data['raw_data'], + ]; + } + + /** + * 生成消息ID + * + * 格式: {prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id} + * + * @param array $data + * @return string + */ + protected function generateMessageId(array $data): string + { + $company_id = $data['company_id']; + $platform_id = $data['platform_id']; + $store_id = $data['store_id']; + $entity_type = 'order'; + $unique_id = $data['unique_id']; + + return sprintf( + '%s#%s#%s#%s#%s', + $company_id, + $platform_id, + $store_id, + $entity_type, + $unique_id + ); + } +} diff --git a/backend/bin/rabbitmq.sh b/backend/bin/rabbitmq.sh index da5fae7..3b979f9 100755 --- a/backend/bin/rabbitmq.sh +++ b/backend/bin/rabbitmq.sh @@ -191,7 +191,8 @@ for platform in "${PLATFORMS[@]}"; do # 配置平台用户权限 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permissions --vhost "$VHOST" --user "user_${platform}" \ - --configure "" --write "^${platform}\\.(exchange|errors\\.exchange)$" \ + --configure "^${platform}\\.(exchange|errors\\.exchange)$" \ + --write "^${platform}\\.(exchange|errors\\.exchange)$" \ --read "^${platform}\\.errors\\..*$" echo "✓ 配置用户权限" done diff --git a/backend/composer.json b/backend/composer.json index 9e5cb7c..e5c4933 100644 --- a/backend/composer.json +++ b/backend/composer.json @@ -16,6 +16,7 @@ "composer": "*", "96qbhy/hyperf-auth": "^3.1", "casbin/casbin": "^4.0", + "hyperf/amqp": "^3.1", "hyperf/cache": "~3.1.0", "hyperf/command": "~3.1.0", "hyperf/config": "~3.1.0", diff --git a/backend/config/autoload/amqp.php b/backend/config/autoload/amqp.php new file mode 100644 index 0000000..2cac2c9 --- /dev/null +++ b/backend/config/autoload/amqp.php @@ -0,0 +1,50 @@ + true, + 'default' => [ + 'host' => env('AMQP_HOST', 'localhost'), + 'port' => (int) env('AMQP_PORT', 5672), + 'user' => env('AMQP_USER', 'guest'), + 'password' => env('AMQP_PASSWORD', 'guest'), + 'vhost' => env('AMQP_VHOST', '/'), + 'open_ssl' => false, + 'concurrent' => [ + 'limit' => 2, + ], + 'pool' => [ + 'connections' => 2, + ], + 'io' => IOFactory::class, + 'params' => [ + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'connection_timeout' => 3, + // Try to maintain twice value heartbeat as much as possible + 'read_write_timeout' => 6, + 'context' => null, + 'keepalive' => true, + // Try to ensure that the consumption time of each message is less than the heartbeat time as much as possible + 'heartbeat' => 3, + 'channel_rpc_timeout' => 0.0, + 'close_on_destruct' => false, + 'max_idle_channels' => 10, + 'connection_name' => null, + ], + ], +]; diff --git a/backend/config/autoload/databases.php b/backend/config/autoload/databases.php index 5cb588c..a1e1abb 100644 --- a/backend/config/autoload/databases.php +++ b/backend/config/autoload/databases.php @@ -41,4 +41,23 @@ return [ ], ], ], + 'raw' => [ + 'driver' => 'mysql', + 'host' => '127.0.0.1', + 'port' => 3380, + 'database' => 'wpic_task', + 'username' => 'tools', + 'password' => 'root', + 'charset' => 'utf8', + 'collation' => 'utf8_unicode_ci', + 'prefix' => '', + 'pool' => [ + 'min_connections' => 1, + 'max_connections' => 10, + 'connect_timeout' => 10.0, + 'wait_timeout' => 3.0, + 'heartbeat' => -1, + 'max_idle_time' => (float)env('DB_MAX_IDLE_TIME', 60), + ] + ], ]; diff --git a/docs/RabbitMQ.md b/docs/RabbitMQ.md index 48d60c7..4b5929d 100644 --- a/docs/RabbitMQ.md +++ b/docs/RabbitMQ.md @@ -30,6 +30,13 @@ ### 关键设计决策 +** 注意 ** +1. 消息内部字段无严格要求,但可以在业务侧进行约束 +2. 默认单条消息的最大 Payload 尺寸为 128MB +3. 消息队列为缓存系统,应该尽可能的将批量数据放入 payload\ +4. MQ 存在的意义在于快速接受远程生产的 Message + + #### 1. 为什么使用单队列? | 问题 | 传统多队列方案 | 本方案(单队列) | @@ -643,7 +650,7 @@ message_id 采用结构化格式,确保幂等性和可追溯性。 **格式**: ``` -{prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id} +{company_id}#{platform_id}#{store_id}#{entity_type}#{request_time_range/formated_suffix} ``` **字段说明**: @@ -653,7 +660,7 @@ message_id 采用结构化格式,确保幂等性和可追溯性。 - `platform_id`:平台 ID - `store_id`:店铺 ID - `entity_type`:数据实体类型(order/product/refund/inventory) -- `entity_id`:平台侧唯一标识(如订单号) +- `request_time_range/formated_suffix`:数据请求的时间区间/格式化的后缀,业务侧决定即可,需要业务侧维护幂等性 **分隔符说明**: - 使用 `#` 作为分隔符(RabbitMQ message_id 字段支持任意字符) @@ -661,7 +668,7 @@ message_id 采用结构化格式,确保幂等性和可追溯性。 **示例**: ``` -wpic-project1#dataflow#100#20#200#order#DY123456789 +wpic-project1#dataflow#100#20#200#order#2025-11-10~2025-11-15 解析: - 项目前缀: wpic-project1 @@ -1062,7 +1069,8 @@ for platform in "${PLATFORMS[@]}"; do # 配置平台用户权限 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permissions --vhost "$VHOST" --user "user_${platform}" \ - --configure "" --write "^${platform}\\.(exchange|errors\\.exchange)$" \ + --configure "^${platform}\\.(exchange|errors\\.exchange)$" \ + --write "^${platform}\\.(exchange|errors\\.exchange)$" \ --read "^${platform}\\.errors\\..*$" echo "✓ 配置用户权限" done @@ -1310,7 +1318,8 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ # 配置权限 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permissions --vhost dataflow-app --user "user_lazada" \ - --configure "" --write "^lazada\\.(exchange|errors\\.exchange)$" \ + --configure "^lazada\\.(exchange|errors\\.exchange)$" \ + --write "^lazada\\.(exchange|errors\\.exchange)$" \ --read "^lazada\\.errors\\..*$" ``` diff --git a/docs/todo.md b/docs/todo.md index f274f17..cf8a533 100644 --- a/docs/todo.md +++ b/docs/todo.md @@ -10,5 +10,7 @@ 8. 与 Tools 对比业务差异 9. 测试实时数据分析功能 10. 实时订单的数据展示可行性? +11. 移动订单生产者到统一的生产者目录,使用接口进行约束 +12. 消息推错进行清理 接口 --- \ No newline at end of file