From e9068ac73d72e77e7c0ab0703d39bbd7538c477c Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Thu, 27 Nov 2025 15:03:25 +0800 Subject: [PATCH] update entiity parse --- .../app/Command/AppMessageQueuePushTmall.php | 69 ++++--- backend/app/Command/AppMqClear.php | 125 ++++++++++++ backend/app/Command/AppMqStatus.php | 189 ++++++++++++++++++ backend/app/Entity/Parse/EntityParse.php | 105 ++++++---- .../app/Entity/Parse/EntityParseFactory.php | 97 +++++---- .../app/Entity/Parse/EntityParseInterface.php | 49 +++-- backend/app/Platform/OrderConsumer.php | 57 +++++- 7 files changed, 569 insertions(+), 122 deletions(-) create mode 100644 backend/app/Command/AppMqClear.php create mode 100644 backend/app/Command/AppMqStatus.php diff --git a/backend/app/Command/AppMessageQueuePushTmall.php b/backend/app/Command/AppMessageQueuePushTmall.php index 515e09d..cfc4910 100644 --- a/backend/app/Command/AppMessageQueuePushTmall.php +++ b/backend/app/Command/AppMessageQueuePushTmall.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace App\Command; +use Hyperf\Collection\LazyCollection; use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Annotation\Command; use Psr\Container\ContainerInterface; @@ -19,46 +20,52 @@ class AppMessageQueuePushTmall extends HyperfCommand { parent::__construct('app:mq-push:tmall'); } - + public function configure() { parent::configure(); $this->setDescription('Test push message with Tmall km data'); } - - public function handle() + + public function handle(): void { try { // 从 raw 数据库连接获取数据 $orders = Db::connection('raw') - ->table('wpic_taobao_order') - ->orderBy('id', 'desc') - ->limit(10) - ->get() - ->toArray(); - - if (empty($orders)) { + ->table('wpic_taobao_order')->orderBy('id', 'desc') + ->limit(10)->get('order_raw')->lazy(); + + // dump($orders->first()); + // return; + + if ($orders->isEmpty()) { $this->warn('No orders found in wpic_taobao_order table'); - return 0; + return; } - - $this->info(sprintf('Found %d orders, processing...', count($orders))); - + + $this->info(sprintf('Found %d orders, processing...', $orders->count())); + // 获取 Producer 实例 $producer = $this->container->get(Producer::class); - - // 每 2 条记录组成一条消息 - $chunks = array_chunk($orders, 2); + + // 每 2 条记录组成一条消息 - 实际生产环境需要增大这个值 + // $orders->chunk(2)->each(function($collection) use ($producer) { + $messageCount = 0; + + $orders->chunk(2)->each(function (LazyCollection $collection) use ($producer, &$messageCount) { - foreach ($chunks as $index => $chunk) { - // 构造消息数据(根据实际表结构调整字段映射) + $order_data = $collection->pluck('order_raw')->map(function ($item) { + return json_decode($item, true); + })->toArray(); + + //@ATTENTION 生产环境需要注意, 暂时使用 KM 进行测试 $messageData = [ - 'company_id' => $chunk[0]->company_id ?? 'default_company', - 'platform_id' => $chunk[0]->platform_id ?? 'tmall', - 'store_id' => $chunk[0]->store_id ?? 'default_store', - 'unique_id' => implode('_', array_column($chunk, 'id')), - 'raw_data' => $chunk, // 包含 2 条原始记录 + 'company_id' => 188, + 'platform_id' => 2, + 'store_id' => 292, + 'unique_id' => uniqid() . '_' . time(), + 'raw_data' => $order_data, // 包含 2 条原始记录 ]; // 创建并发送消息 @@ -66,19 +73,21 @@ class AppMessageQueuePushTmall extends HyperfCommand $producer->produce($message); $messageCount++; + $this->line(sprintf('Sent message %d with order IDs: %s', $messageCount, - $messageData['unique_id'] + $messageData['unique_id'], )); - } - + }); + + $this->info(sprintf('Successfully sent %d messages to RabbitMQ', $messageCount)); - return 0; - + return; + } catch (Exception $e) { $this->error('Error pushing messages: ' . $e->getMessage()); $this->error($e->getTraceAsString()); - return 1; + return; } } } diff --git a/backend/app/Command/AppMqClear.php b/backend/app/Command/AppMqClear.php new file mode 100644 index 0000000..273bec2 --- /dev/null +++ b/backend/app/Command/AppMqClear.php @@ -0,0 +1,125 @@ +setDescription('Clear all messages from a specified queue (development mode only)'); + $this->addArgument('queue', InputArgument::REQUIRED, 'The queue name to clear'); + $this->addOption('force', 'f', null, 'Force clear without confirmation'); + } + + public function handle() + { + $queueName = $this->input->getArgument('queue'); + $force = $this->input->getOption('force'); + + $this->warn("You are about to clear all messages from queue: {$queueName}"); + + // 如果不是强制模式,需要确认 + if (!$force) { + $confirm = $this->confirm('Are you sure you want to continue?', false); + if (!$confirm) { + $this->info('Operation cancelled.'); + return 0; + } + } + + try { + $config = $this->container->get(ConfigInterface::class); + $consumerConfig = $config->get('amqp.default_consumer'); + + // 创建连接 + $connection = new AMQPStreamConnection( + $consumerConfig['host'], + $consumerConfig['port'], + $consumerConfig['user'], + $consumerConfig['password'], + $consumerConfig['vhost'], + false, + 'AMQPLAIN', + null, + 'en_US', + $consumerConfig['params']['connection_timeout'] ?? 3.0, + $consumerConfig['params']['read_write_timeout'] ?? 3.0, + null, + $consumerConfig['params']['keepalive'] ?? false, + $consumerConfig['params']['heartbeat'] ?? 0 + ); + + $channel = $connection->channel(); + + // 先检查队列是否存在并获取当前消息数 + try { + [$queue, $messageCount, $consumerCount] = $channel->queue_declare( + $queueName, + true, // passive - 只检查,不创建 + true, // durable + false, // exclusive + false // auto_delete + ); + + $this->line("Queue '{$queueName}' has {$messageCount} messages before clearing.", 'info'); + + if ($messageCount === 0) { + $this->info('Queue is already empty. Nothing to clear.'); + $channel->close(); + $connection->close(); + return 0; + } + + // 清除队列中的所有消息 + $channel->queue_purge($queueName); + + // 再次检查确认已清除 + [$queue, $remainingCount, $consumerCount] = $channel->queue_declare( + $queueName, + true, // passive + true, // durable + false, // exclusive + false // auto_delete + ); + + $this->line(''); + $this->info("Successfully cleared {$messageCount} messages from queue '{$queueName}'."); + $this->line("Remaining messages: {$remainingCount}", 'comment'); + + } catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) { + $this->error("Queue '{$queueName}' does not exist."); + $this->line('Available queues can be found using: php bin/hyperf.php app:mq:status', 'comment'); + $channel->close(); + $connection->close(); + return 1; + } + + // 关闭连接 + $channel->close(); + $connection->close(); + + return 0; + + } catch (\Exception $e) { + $this->error('Failed to clear queue: ' . $e->getMessage()); + $this->line('Trace: ' . $e->getTraceAsString(), 'comment'); + return 1; + } + } +} diff --git a/backend/app/Command/AppMqStatus.php b/backend/app/Command/AppMqStatus.php new file mode 100644 index 0000000..4aa3010 --- /dev/null +++ b/backend/app/Command/AppMqStatus.php @@ -0,0 +1,189 @@ +setDescription('Display message counts for all accessible queues'); + $this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds (default: 3)'); + $this->addOption('interval', 'i', \Symfony\Component\Console\Input\InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3'); + } + + public function handle() + { + $watchMode = $this->input->getOption('watch'); + $interval = (int) $this->input->getOption('interval'); + + if ($watchMode) { + $this->info("Watch mode enabled. Refreshing every {$interval} seconds. Press Ctrl+C to exit."); + $this->line(''); + + while (true) { + // 清屏(对于支持 ANSI 的终端) + $this->output->write("\033[2J\033[H"); + $this->displayQueueStatus(); + sleep($interval); + } + } else { + return $this->displayQueueStatus(); + } + } + + private function displayQueueStatus(): int + { + $this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info'); + + try { + $config = $this->container->get(ConfigInterface::class); + $consumerConfig = $config->get('amqp.default_consumer'); + + // 创建连接 + $connection = new AMQPStreamConnection( + $consumerConfig['host'], + $consumerConfig['port'], + $consumerConfig['user'], + $consumerConfig['password'], + $consumerConfig['vhost'], + false, + 'AMQPLAIN', + null, + 'en_US', + $consumerConfig['params']['connection_timeout'] ?? 3.0, + $consumerConfig['params']['read_write_timeout'] ?? 3.0, + null, + $consumerConfig['params']['keepalive'] ?? false, + $consumerConfig['params']['heartbeat'] ?? 0 + ); + + $channel = $connection->channel(); + + // 获取所有队列信息 + $queues = $this->getQueuesFromAnnotations(); + + $tableData = []; + $totalMessages = 0; + + foreach ($queues as $queueName) { + try { + // 使用 passive=true 来获取队列信息而不创建队列 + // queue_declare 返回 [queue_name, message_count, consumer_count] + [$queue, $messageCount, $consumerCount] = $channel->queue_declare( + $queueName, + true, // passive + true, // durable + false, // exclusive + false // auto_delete + ); + + $tableData[] = [ + 'queue' => $queueName, + 'messages' => $messageCount, + 'consumers' => $consumerCount, + 'status' => $messageCount > 0 ? 'Has Messages' : 'Empty', + ]; + + $totalMessages += $messageCount; + } catch (\Exception $e) { + $tableData[] = [ + 'queue' => $queueName, + 'messages' => 'N/A', + 'consumers' => 'N/A', + 'status' => 'Error: ' . $e->getMessage() . '', + ]; + } + } + + // 关闭连接 + $channel->close(); + $connection->close(); + + // 显示表格 + if (empty($tableData)) { + $this->warn('No queues found.'); + return 0; + } + + $this->table( + ['Queue Name', 'Messages', 'Consumers', 'Status'], + array_map(function($row) { + return [$row['queue'], $row['messages'], $row['consumers'], $row['status']]; + }, $tableData) + ); + + $this->line(''); + $this->info("Total messages across all queues: {$totalMessages}"); + + return 0; + } catch (\Exception $e) { + $this->error('Failed to fetch queue status: ' . $e->getMessage()); + $this->line('Trace: ' . $e->getTraceAsString(), 'comment'); + return 1; + } + } + + /** + * 扫描所有使用 Consumer 注解的类,获取队列名称 + */ + private function getQueuesFromAnnotations(): array + { + $queues = []; + + // 扫描 Platform 目录下的所有 Consumer + $platformPath = BASE_PATH . '/app/Platform'; + + if (is_dir($platformPath)) { + $this->scanDirectory($platformPath, $queues); + } + + // 如果没有找到任何队列,返回一些默认的队列名称 + if (empty($queues)) { + $queues = ['orders.queue']; + } + + return array_unique($queues); + } + + /** + * 递归扫描目录,查找包含 Consumer 注解的类 + */ + private function scanDirectory(string $path, array &$queues): void + { + $files = scandir($path); + + foreach ($files as $file) { + if ($file === '.' || $file === '..') { + continue; + } + + $filePath = $path . '/' . $file; + + if (is_dir($filePath)) { + $this->scanDirectory($filePath, $queues); + } elseif (is_file($filePath) && pathinfo($filePath, PATHINFO_EXTENSION) === 'php') { + $content = file_get_contents($filePath); + + // 查找 Consumer 注解中的 queue 参数 + if (preg_match('/#\[Consumer\([^)]*queue:\s*["\']([^"\']+)["\']/', $content, $matches)) { + $queues[] = $matches[1]; + } + } + } + } +} diff --git a/backend/app/Entity/Parse/EntityParse.php b/backend/app/Entity/Parse/EntityParse.php index 6920e1e..6a46287 100644 --- a/backend/app/Entity/Parse/EntityParse.php +++ b/backend/app/Entity/Parse/EntityParse.php @@ -9,7 +9,7 @@ use App\Model\Company; use App\Model\Platform; use App\Model\Store; use App\Entity\Parse\Traits\EntityParseHelper; -use Hyperf\Amqp\Message\ConsumerMessageInterface; +use Hyperf\Collection\LazyCollection; use InvalidArgumentException; /** @@ -18,13 +18,13 @@ use InvalidArgumentException; * 使用工厂方法模式 + 延迟初始化 * 提供消息解析的通用框架和默认实现 * - * @method static static create(ConsumerMessageInterface $message) + * @method static static create(array $data) */ abstract class EntityParse implements EntityParseInterface { use EntityParseHelper; - protected ConsumerMessageInterface $message; + protected array $data; protected ?Platform $platform = null; protected ?Company $company = null; protected ?Store $store = null; @@ -41,30 +41,67 @@ abstract class EntityParse implements EntityParseInterface /** * 工厂方法:创建解析器实例 * - * @param ConsumerMessageInterface $message + * @param array $data * @return static * @throws InvalidArgumentException */ - public static function create(ConsumerMessageInterface $message): static + public static function create(array $data): static { $instance = new static(); - $instance->message = $message; + $instance->data = $data; + + // 在初始化前先验证数据 + if (!$instance->messageValidate($data)) { + throw new InvalidArgumentException('Message validation failed: required fields missing'); + } + $instance->initialize(); return $instance; } + /** + * 消息数据验证 + * + * 默认实现:验证必需字段 + * 子类可以覆写以添加自定义验证逻辑 + * + * @param array $data + * @return bool + */ + public function messageValidate(array $data): bool + { + // 验证必需字段 + $requiredFields = ['company_id', 'platform_id', 'store_id', 'unique_id', 'raw_data']; + + foreach ($requiredFields as $field) { + if (!isset($data[$field])) { + return false; + } + } + + return true; + } + /** * 延迟初始化 - * 在 message 设置后执行作用域匹配和验证 + * 在 data 设置后执行作用域匹配和验证 * * @return void * @throws InvalidArgumentException */ protected function initialize(): void { - $this->platform = $this->platformScopeMatch($this->message); - $this->company = $this->companyScopeMatch($this->message); - $this->store = $this->storeScopeMatch($this->message); + // 提取 metadata + $metadata = [ + 'company_id' => $this->data['company_id'], + 'platform_id' => $this->data['platform_id'], + 'store_id' => $this->data['store_id'], + 'unique_id' => $this->data['unique_id'], + ]; + + $this->platform = $this->platformScopeMatch($metadata); + $this->company = $this->companyScopeMatch($metadata); + $this->store = $this->storeScopeMatch($metadata); $this->validateScope(); } @@ -93,47 +130,46 @@ abstract class EntityParse implements EntityParseInterface /** * 平台作用域匹配 - 提供默认实现 * - * 从 exchange 名称中提取平台信息 + * 从 metadata 中提取平台信息 * 子类可以覆盖此方法以实现自定义逻辑 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return Platform * @throws InvalidArgumentException */ - public function platformScopeMatch(ConsumerMessageInterface $message): Platform + public function platformScopeMatch(array $metadata): Platform { - return $this->extractPlatformFromExchange($message); + return $this->extractPlatformFromMetadata($metadata); } /** * 实体匹配 - 提供默认实现 * - * 从 routing key 中提取实体类型 + * 从 metadata 中提取实体类型 * 子类可以覆盖此方法以实现自定义逻辑 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return Entity * @throws InvalidArgumentException */ - public function entityMatch(ConsumerMessageInterface $message): Entity + public function entityMatch(array $metadata): Entity { - return $this->extractEntityFromRoutingKey($message); + return $this->extractEntityFromMetadata($metadata); } /** * 唯一标识符提取 - 提供默认实现 * - * 默认从消息体中提取 id 或 unique_id + * 默认从 metadata 中提取 unique_id * 子类可以覆盖此方法以实现自定义逻辑 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return string|int * @throws InvalidArgumentException */ - public function entityUniqueIdentifierExtract(ConsumerMessageInterface $message): string|int + public function entityUniqueIdentifierExtract(array $metadata): string|int { - $data = $this->extractMessageData($message); - return $this->extractUniqueIdentifier($data); + return $metadata['unique_id'] ?? throw new InvalidArgumentException('unique_id not found in metadata'); } /** @@ -141,42 +177,41 @@ abstract class EntityParse implements EntityParseInterface * * 必须由子类实现,因为不同平台的公司识别逻辑不同 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return Company */ - abstract public function companyScopeMatch(ConsumerMessageInterface $message): Company; + abstract public function companyScopeMatch(array $metadata): Company; /** * 店铺作用域匹配 - 抽象方法 * * 必须由子类实现,因为不同平台的店铺识别逻辑不同 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return Store */ - abstract public function storeScopeMatch(ConsumerMessageInterface $message): Store; + abstract public function storeScopeMatch(array $metadata): Store; /** * 实体数据映射 - 抽象方法 * * 必须由子类实现,因为不同平台的数据结构不同 * - * @param array $data - * @param Entity $entity - * @return Entity + * @param array $rawData + * @return LazyCollection */ - abstract public function entityMap(array $data, Entity $entity): Entity; + abstract public function entityMap(array $rawData): LazyCollection; // ==================== Getter 方法 ==================== /** - * 获取消息对象 + * 获取消息数据 * - * @return ConsumerMessageInterface + * @return array */ - public function getMessage(): ConsumerMessageInterface + public function getData(): array { - return $this->message; + return $this->data; } /** diff --git a/backend/app/Entity/Parse/EntityParseFactory.php b/backend/app/Entity/Parse/EntityParseFactory.php index 5db817f..dd225c7 100644 --- a/backend/app/Entity/Parse/EntityParseFactory.php +++ b/backend/app/Entity/Parse/EntityParseFactory.php @@ -9,6 +9,7 @@ use Hyperf\Contract\ConfigInterface; use Hyperf\Stringable\Str; use InvalidArgumentException; use Psr\Container\ContainerInterface; +use PhpAmqpLib\Message\AMQPMessage; /** * EntityParseFactory 工厂类 @@ -102,66 +103,90 @@ class EntityParseFactory /** * 根据消息自动创建 Parser 实例(静态方法) * - * @param ConsumerMessageInterface $message - * @param string|null $entityType 实体类型(可选,如果不指定则从 routing key 中提取) + * @param AMQPMessage $message + * @param string|null $entityType 实体类型(可选,如果不指定则从 payload 中提取) * @return EntityParseInterface * @throws InvalidArgumentException */ public static function createFromMessage( - ConsumerMessageInterface $message, + AMQPMessage $message, ?string $entityType = null ): EntityParseInterface { - // 1. 从 exchange 中提取平台名称 - $platformName = self::extractPlatformName($message); + // 1. 从消息体中解析数据 + $data = json_decode($message->getBody(), true); - // 2. 如果未指定实体类型,从 routing key 中提取 - if ($entityType === null) { - $entityType = self::extractEntityType($message); + if (!is_array($data)) { + throw new InvalidArgumentException('Invalid message body: expected JSON array'); } - // 3. 获取对应的 Parser 类 + // 2. 从 data 中提取平台名称 + $platformName = self::extractPlatformNameFromData($data); + + // 3. 如果未指定实体类型,从 data 或 routing key 中提取 + if ($entityType === null) { + $entityType = self::extractEntityTypeFromData($data, $message); + } + + // 4. 获取对应的 Parser 类 $parserClass = self::resolveParserClass($platformName, $entityType); - // 4. 创建并返回 Parser 实例 - return $parserClass::create($message); + // 5. 创建并返回 Parser 实例,传递解析后的数据 + return $parserClass::create($data); } /** - * 从 exchange 中提取平台名称 + * 从数据中提取平台名称 * - * 规则:exchange 格式为 "platform.exchange" - * 例如:tmall.exchange -> tmall - * - * @param ConsumerMessageInterface $message + * @param array $data * @return string */ - private static function extractPlatformName(ConsumerMessageInterface $message): string + private static function extractPlatformNameFromData(array $data): string { - $exchange = $message->getExchange(); - - $platformName = Str::of($exchange) - ->before('.') - ->lower() - ->toString(); - - if (empty($platformName)) { - throw new InvalidArgumentException("Cannot extract platform name from exchange: {$exchange}"); + // 优先从 platform_id 获取,如果没有则尝试从其他字段获取 + if (isset($data['platform_id'])) { + // 这里可以根据 platform_id 映射到平台名称 + // 简化处理:假设需要从数据库或配置中查找 + // 当前先直接使用 platform_id 作为平台名称 + return self::resolvePlatformName($data['platform_id']); } - return $platformName; + throw new InvalidArgumentException("Cannot extract platform name from data: platform_id missing"); } /** - * 从 routing key 中提取实体类型 + * 根据 platform_id 解析平台名称 * - * 规则:routing key 格式为 "entity.action" - * 例如:order.create -> order - * - * @param ConsumerMessageInterface $message + * @param mixed $platformId * @return string */ - private static function extractEntityType(ConsumerMessageInterface $message): string + private static function resolvePlatformName($platformId): string { + // TODO: 从配置或数据库中根据 platform_id 查找平台名称 + // 临时方案:使用简单的映射 + $platformMap = [ + 1 => 'taobao', + 2 => 'tmall', + 3 => 'jd', + // ... 其他平台 + ]; + + if (isset($platformMap[$platformId])) { + return $platformMap[$platformId]; + } + + throw new InvalidArgumentException("Unknown platform_id: {$platformId}"); + } + + /** + * 从数据或路由键中提取实体类型 + * + * @param array $data + * @param AMQPMessage $message + * @return string + */ + private static function extractEntityTypeFromData(array $data, AMQPMessage $message): string + { + // 优先从 routing key 中提取 $routingKey = $message->getRoutingKey(); $entityType = Str::of($routingKey) @@ -169,11 +194,11 @@ class EntityParseFactory ->lower() ->toString(); - if (empty($entityType)) { - throw new InvalidArgumentException("Cannot extract entity type from routing key: {$routingKey}"); + if (!empty($entityType)) { + return $entityType; } - return $entityType; + throw new InvalidArgumentException("Cannot extract entity type from routing key: {$routingKey}"); } /** diff --git a/backend/app/Entity/Parse/EntityParseInterface.php b/backend/app/Entity/Parse/EntityParseInterface.php index 5f9bbbd..1f2452c 100644 --- a/backend/app/Entity/Parse/EntityParseInterface.php +++ b/backend/app/Entity/Parse/EntityParseInterface.php @@ -4,11 +4,11 @@ declare(strict_types=1); namespace App\Entity\Parse; -use Hyperf\Amqp\Message\ConsumerMessageInterface; use App\Model\Company; use App\Model\Platform; use App\Model\Store; use App\Model\Model as Entity; +use Hyperf\Collection\LazyCollection; /** * EntityParseInterface 接口 @@ -17,61 +17,74 @@ use App\Model\Model as Entity; */ interface EntityParseInterface { + /** + * 消息数据验证 + * + * 验证消息数据是否包含必需字段 + * + * @param array $data + * @return bool + */ + public function messageValidate(array $data): bool; + /** * 公司作用域匹配 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return Company */ - public function companyScopeMatch(ConsumerMessageInterface $message): Company; + public function companyScopeMatch(array $metadata): Company; /** * 平台作用域匹配 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return Platform */ - public function platformScopeMatch(ConsumerMessageInterface $message): Platform; + public function platformScopeMatch(array $metadata): Platform; /** * 店铺作用域匹配 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return Store */ - public function storeScopeMatch(ConsumerMessageInterface $message): Store; + public function storeScopeMatch(array $metadata): Store; /** * 实体类型匹配 * - * @param ConsumerMessageInterface $message + * 根据 metadata 返回实体模板实例 + * + * @param array $metadata * @return Entity */ - public function entityMatch(ConsumerMessageInterface $message): Entity; + public function entityMatch(array $metadata): Entity; /** * 实体数据映射 * - * @param array $data - * @param Entity $entity - * @return Entity + * 将原始数据转换为可填充到 Model 的数据集合 + * + * @param array $rawData + * @return LazyCollection */ - public function entityMap(array $data, Entity $entity): Entity; + public function entityMap(array $rawData): LazyCollection; /** * 提取实体唯一标识符 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return string|int */ - public function entityUniqueIdentifierExtract(ConsumerMessageInterface $message): string|int; + public function entityUniqueIdentifierExtract(array $metadata): string|int; /** - * 获取消息对象 + * 获取消息数据 * - * @return ConsumerMessageInterface + * @return array */ - public function getMessage(): ConsumerMessageInterface; + public function getData(): array; /** * 获取平台对象 diff --git a/backend/app/Platform/OrderConsumer.php b/backend/app/Platform/OrderConsumer.php index 7abb1da..ccac5b2 100644 --- a/backend/app/Platform/OrderConsumer.php +++ b/backend/app/Platform/OrderConsumer.php @@ -4,11 +4,14 @@ declare(strict_types=1); namespace App\Platform; -use Exception; +use App\Entity\Parse\EntityParseFactory; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Result; use PhpAmqpLib\Message\AMQPMessage; +use Hyperf\Di\Annotation\Inject; +use Hyperf\DbConnection\Db; +use Throwable; #[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", nums: 1, enable: true)] class OrderConsumer extends ConsumerMessage @@ -24,11 +27,59 @@ class OrderConsumer extends ConsumerMessage ]; protected $entityType = 'order'; + + #[Inject] + protected EntityParseFactory $entityParseFactory; public function consumeMessage($data, AMQPMessage $message): Result { - dump($data); - return Result::NACK; + + // dump('---data'); + // dump($data); + // dump('---'); + + dump('---message'); + dump( json_decode($message->getBody(), true)); + dump('---'); + + + $parse = $this->entityParseFactory->createFromMessage($message); + + // 提取 metadata + $metadata = [ + 'company_id' => $data['company_id'] ?? null, + 'platform_id' => $data['platform_id'] ?? null, + 'store_id' => $data['store_id'] ?? null, + 'unique_id' => $data['unique_id'] ?? null, + ]; + + // entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息。 + $entity = $parse->entityMatch($metadata); + + // message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。 + $entityMapResult = $parse->entityMap($data['raw_data'] ?? []); + + + // $entityMapResult 应该是一个内部元素为 Model 的集合 + Db::beginTransaction(); + + try { + // 假设 $entityMapResult 为集合 @Collection 对象 + $entityMapResult->each(function($el) use ($entity) { + $clone = clone $entity; + $clone->fill($el); + $clone->save(); + }); + + Db::commit(); + + // 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。 + return Result::ACK; + } catch (Throwable $error) { + dump($error->getMessage()); + Db::rollBack(); + return Result::NACK; + } } public function isEnable(): bool