From 11e2ea11819e6b3957bc98cf5b37bc14544386e4 Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Wed, 26 Nov 2025 09:44:55 +0800 Subject: [PATCH] update consumer and producer --- backend/app/Platform/OrderConsumer.php | 38 +++++++++ backend/app/Platform/OrderProducer.php | 114 +++++++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 backend/app/Platform/OrderConsumer.php create mode 100644 backend/app/Platform/OrderProducer.php diff --git a/backend/app/Platform/OrderConsumer.php b/backend/app/Platform/OrderConsumer.php new file mode 100644 index 0000000..7abb1da --- /dev/null +++ b/backend/app/Platform/OrderConsumer.php @@ -0,0 +1,38 @@ + 0, + // 同一个消费者,最高同时可以处理的消息数。 + 'prefetch_count' => 100, + // 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。 + 'global' => false, + ]; + + protected $entityType = 'order'; + + public function consumeMessage($data, AMQPMessage $message): Result + { + dump($data); + return Result::NACK; + } + + public function isEnable(): bool + { + return parent::isEnable(); + } +} diff --git a/backend/app/Platform/OrderProducer.php b/backend/app/Platform/OrderProducer.php new file mode 100644 index 0000000..afe5a6d --- /dev/null +++ b/backend/app/Platform/OrderProducer.php @@ -0,0 +1,114 @@ + 2, // 持久化消息 + ]; + + + protected string $exchange = ''; + protected string $entityType = 'order'; + protected string|array $routingKey = ''; + + + /** + * 构造消息 + * + * @param array $data 订单数据 + * @return string + */ + public function __construct(array $data = []) + { + if(empty($this->exchange)){ + throw new Exception('exchange is not set!'); + } + + if(empty($this->routingKey)){ + throw new Exception('routingKey is not set!'); + } + + if(!empty($this->routingKey)){ + $this->entityType = \explode( '.', $this->routingKey)[0]; + } + + if (!empty($data)) { + $this->payload = $this->buildMessage($data); + } + } + + /** + * 构建消息格式 + * + * @param array $data 原始订单数据 + * @return array + */ + protected function buildMessage(array $data): array + { + // 根据 RabbitMQ.md 中定义的消息格式规范 + return [ + 'message_id' => $this->generateMessageId($data), + 'timestamp' => date('c'), // ISO 8601 格式 + 'platform' => 'tmall', + 'data_type' => 'order', + 'metadata' => [ + 'platform_id' => $data['platform_id'] ?? null, + 'company_id' => $data['company_id'] ?? null, + 'store_id' => $data['store_id'] ?? null, + 'source_system' => 'tmall-open-api', + 'retry_count' => 0, + 'data_version' => $data['data_version'] ?? time(), + ], + 'data' => $data['raw_data'], + ]; + } + + /** + * 生成消息ID + * + * 格式: {prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id} + * + * @param array $data + * @return string + */ + protected function generateMessageId(array $data): string + { + $company_id = $data['company_id']; + $platform_id = $data['platform_id']; + $store_id = $data['store_id']; + + $unique_id = $data['unique_id']; + + return sprintf( + '%s#%s#%s#%s#%s', + $company_id, + $platform_id, + $store_id, + $this->entityType, + $unique_id + ); + } +}