add mq test

This commit is contained in:
2025-11-17 16:51:51 +08:00
parent 6b8943f07f
commit b7a628e13c
9 changed files with 292 additions and 7 deletions
+18 -1
View File
@@ -11,7 +11,24 @@ DB_CHARSET=utf8mb4
DB_COLLATION=utf8mb4_unicode_ci
DB_PREFIX=
AMQP_HOST=127.0.0.1
AMQP_PORT=5672
AMQP_USER="user_tmall"
AMQP_PASSWORD="change_me_tmall"
AMQP_VHOST="dataflow"
REDIS_HOST=localhost
REDIS_AUTH=(null)
REDIS_PORT=6379
REDIS_DB=0
REDIS_DB=0
# JWT认证配置
SIMPLE_JWT_SECRET=your-secret-key-change-this-in-production
JWT_HEADER_NAME=Authorization
SIMPLE_JWT_TTL=7200
SIMPLE_JWT_REFRESH_TTL=2592000
SIMPLE_JWT_PREFIX=dataflow
TOOLS_HOST="https://store-api-v2.wpic-tools.com/"
TOOLS_TOKEN="123"
+87
View File
@@ -0,0 +1,87 @@
<?php
declare(strict_types=1);
namespace App\Command;
use App\Model\User;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\DbConnection\Db;
use Psr\Container\ContainerInterface;
use App\Platform\Tmall\Producer\OrderProducer as TmallOrderProducer;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Amqp\Producer;
#[Command]
class AppMqInit extends HyperfCommand
{
#[Inject]
private Producer $producerService;
public function __construct(protected ContainerInterface $container)
{
parent::__construct('app:mq:init');
}
public function configure()
{
parent::configure();
$this->setDescription('Insert mq test feeds');
}
public function handle()
{
$this->line('Insert MQ test feeds now...', 'info');
// shopee
// $company_id = 171;
// $platform_id = 25;
// $store_id = 255;
// Tmall
$company_id = 188;
$platform_id = 2;
$store_id = 292;
for ($i=0; $i < 2; $i++) {
$unique_id = \uniqid();
// 构建 $message_id 格式为 {prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{unique_id}
$data = Db::connection('raw')->table('wpic_taobao_order')
->select(['id', 'order_raw'])->orderBy('t_created', 'asc')
->limit(20)->offset($i * 20)
->get()->toArray();
$data = [
'platform_id' => $platform_id,
'company_id' => $company_id,
'store_id' => $store_id,
'unique_id' => $unique_id,
'raw_data' => $data,
];
$message = new TmallOrderProducer($data);
$result = $this->producerService->produce($message, true);
$log = \sprintf('%s %s %s %s %s',
$company_id,
$platform_id,
$store_id,
$unique_id,
$result
);
dump($log);
}
}
}
@@ -0,0 +1,99 @@
<?php
declare(strict_types=1);
namespace App\Platform\Tmall\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
#[Producer]
class OrderProducer extends ProducerMessage
{
/**
* Exchange 名称
*/
protected string $exchange = 'tmall.exchange';
/**
* Routing key
*/
protected string|array $routingKey = 'order.tmall';
/**
* VHost
*/
protected string $vhost = 'dataflow';
/**
* 消息持久化
*/
protected array $properties = [
'delivery_mode' => 2, // 持久化消息
];
/**
* 构造消息
*
* @param array $data 订单数据
* @return string
*/
public function __construct(array $data = [])
{
if (!empty($data)) {
$this->payload = $this->buildMessage($data);
}
}
/**
* 构建消息格式
*
* @param array $data 原始订单数据
* @return array
*/
protected function buildMessage(array $data): array
{
// 根据 RabbitMQ.md 中定义的消息格式规范
return [
'message_id' => $this->generateMessageId($data),
'timestamp' => date('c'), // ISO 8601 格式
'platform' => 'tmall',
'data_type' => 'order',
'metadata' => [
'platform_id' => $data['platform_id'] ?? null,
'company_id' => $data['company_id'] ?? null,
'store_id' => $data['store_id'] ?? null,
'source_system' => 'tmall-open-api',
'retry_count' => 0,
'data_version' => $data['data_version'] ?? time(),
],
'data' => $data['raw_data'],
];
}
/**
* 生成消息ID
*
* 格式: {prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id}
*
* @param array $data
* @return string
*/
protected function generateMessageId(array $data): string
{
$company_id = $data['company_id'];
$platform_id = $data['platform_id'];
$store_id = $data['store_id'];
$entity_type = 'order';
$unique_id = $data['unique_id'];
return sprintf(
'%s#%s#%s#%s#%s',
$company_id,
$platform_id,
$store_id,
$entity_type,
$unique_id
);
}
}
+2 -1
View File
@@ -191,7 +191,8 @@ for platform in "${PLATFORMS[@]}"; do
# 配置平台用户权限
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_${platform}" \
--configure "" --write "^${platform}\\.(exchange|errors\\.exchange)$" \
--configure "^${platform}\\.(exchange|errors\\.exchange)$" \
--write "^${platform}\\.(exchange|errors\\.exchange)$" \
--read "^${platform}\\.errors\\..*$"
echo "✓ 配置用户权限"
done
+1
View File
@@ -16,6 +16,7 @@
"composer": "*",
"96qbhy/hyperf-auth": "^3.1",
"casbin/casbin": "^4.0",
"hyperf/amqp": "^3.1",
"hyperf/cache": "~3.1.0",
"hyperf/command": "~3.1.0",
"hyperf/config": "~3.1.0",
+50
View File
@@ -0,0 +1,50 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
use Hyperf\Amqp\IO\IOFactory;
use function Hyperf\Support\env;
return [
'enable' => true,
'default' => [
'host' => env('AMQP_HOST', 'localhost'),
'port' => (int) env('AMQP_PORT', 5672),
'user' => env('AMQP_USER', 'guest'),
'password' => env('AMQP_PASSWORD', 'guest'),
'vhost' => env('AMQP_VHOST', '/'),
'open_ssl' => false,
'concurrent' => [
'limit' => 2,
],
'pool' => [
'connections' => 2,
],
'io' => IOFactory::class,
'params' => [
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'connection_timeout' => 3,
// Try to maintain twice value heartbeat as much as possible
'read_write_timeout' => 6,
'context' => null,
'keepalive' => true,
// Try to ensure that the consumption time of each message is less than the heartbeat time as much as possible
'heartbeat' => 3,
'channel_rpc_timeout' => 0.0,
'close_on_destruct' => false,
'max_idle_channels' => 10,
'connection_name' => null,
],
],
];
+19
View File
@@ -41,4 +41,23 @@ return [
],
],
],
'raw' => [
'driver' => 'mysql',
'host' => '127.0.0.1',
'port' => 3380,
'database' => 'wpic_task',
'username' => 'tools',
'password' => 'root',
'charset' => 'utf8',
'collation' => 'utf8_unicode_ci',
'prefix' => '',
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => (float)env('DB_MAX_IDLE_TIME', 60),
]
],
];
+14 -5
View File
@@ -30,6 +30,13 @@
### 关键设计决策
** 注意 **
1. 消息内部字段无严格要求,但可以在业务侧进行约束
2. 默认单条消息的最大 Payload 尺寸为 128MB
3. 消息队列为缓存系统,应该尽可能的将批量数据放入 payload\
4. MQ 存在的意义在于快速接受远程生产的 Message
#### 1. 为什么使用单队列?
| 问题 | 传统多队列方案 | 本方案(单队列) |
@@ -643,7 +650,7 @@ message_id 采用结构化格式,确保幂等性和可追溯性。
**格式**
```
{prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id}
{company_id}#{platform_id}#{store_id}#{entity_type}#{request_time_range/formated_suffix}
```
**字段说明**
@@ -653,7 +660,7 @@ message_id 采用结构化格式,确保幂等性和可追溯性。
- `platform_id`:平台 ID
- `store_id`:店铺 ID
- `entity_type`:数据实体类型(order/product/refund/inventory
- `entity_id`:平台侧唯一标识(如订单号)
- `request_time_range/formated_suffix`:数据请求的时间区间/格式化的后缀,业务侧决定即可,需要业务侧维护幂等性
**分隔符说明**
- 使用 `#` 作为分隔符(RabbitMQ message_id 字段支持任意字符)
@@ -661,7 +668,7 @@ message_id 采用结构化格式,确保幂等性和可追溯性。
**示例**
```
wpic-project1#dataflow#100#20#200#order#DY123456789
wpic-project1#dataflow#100#20#200#order#2025-11-10~2025-11-15
解析:
- 项目前缀: wpic-project1
@@ -1062,7 +1069,8 @@ for platform in "${PLATFORMS[@]}"; do
# 配置平台用户权限
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost "$VHOST" --user "user_${platform}" \
--configure "" --write "^${platform}\\.(exchange|errors\\.exchange)$" \
--configure "^${platform}\\.(exchange|errors\\.exchange)$" \
--write "^${platform}\\.(exchange|errors\\.exchange)$" \
--read "^${platform}\\.errors\\..*$"
echo "✓ 配置用户权限"
done
@@ -1310,7 +1318,8 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ
# 配置权限
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
declare permissions --vhost dataflow-app --user "user_lazada" \
--configure "" --write "^lazada\\.(exchange|errors\\.exchange)$" \
--configure "^lazada\\.(exchange|errors\\.exchange)$" \
--write "^lazada\\.(exchange|errors\\.exchange)$" \
--read "^lazada\\.errors\\..*$"
```
+2
View File
@@ -10,5 +10,7 @@
8. 与 Tools 对比业务差异
9. 测试实时数据分析功能
10. 实时订单的数据展示可行性?
11. 移动订单生产者到统一的生产者目录,使用接口进行约束
12. 消息推错进行清理 接口
---