diff --git a/backend/app/Command/AppMessageQueuePullTmall.php b/backend/app/Command/AppMessageQueuePullTmall.php new file mode 100644 index 0000000..c6448a0 --- /dev/null +++ b/backend/app/Command/AppMessageQueuePullTmall.php @@ -0,0 +1,92 @@ +setDescription('Manually pull and consume one message from orders.queue'); + } + + public function handle() + { + $connectionFactory = $this->container->get(ConnectionFactory::class); + $connection = $connectionFactory->getConnection('default_consumer'); + $channel = $connection->getChannel(); + + $queueName = 'orders.queue'; + + $this->info("Pulling one message from queue: {$queueName}"); + + // 使用 basic_get 手动拉取一条消息(非阻塞) + $message = $channel->basic_get($queueName, false); + + if ($message === null) { + $this->warn('No messages available in queue'); + return 0; + } + + try { + // 解析消息体 + $data = json_decode($message->body, true); + + $this->info('Received message:'); + $this->line(json_encode($data, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE)); + + // 创建消费者实例并处理消息 + $result = $this->orderConsumer->consumeMessage($data, $message); + + // 根据消费结果进行 ACK/NACK + switch ($result) { + case Result::ACK: + $channel->basic_ack($message->getDeliveryTag()); + $this->info('Message consumed successfully (ACK)'); + break; + case Result::NACK: + $channel->basic_nack($message->getDeliveryTag(), false, true); + $this->warn('Message consumption failed (NACK - requeue)'); + break; + case Result::DROP: + $channel->basic_nack($message->getDeliveryTag(), false, false); + $this->warn('Message dropped (NACK - no requeue)'); + break; + case Result::REQUEUE: + $channel->basic_nack($message->getDeliveryTag(), false, true); + $this->warn('Message requeued'); + break; + } + + return 0; + } catch (\Throwable $e) { + $this->error('Error consuming message: ' . $e->getMessage()); + $this->error($e->getTraceAsString()); + + // 发生异常时,NACK 并重新入队 + $channel->basic_nack($message->getDeliveryTag(), false, true); + return 1; + } + } +} diff --git a/backend/app/Command/AppMessageQueuePushTmall.php b/backend/app/Command/AppMessageQueuePushTmall.php new file mode 100644 index 0000000..515e09d --- /dev/null +++ b/backend/app/Command/AppMessageQueuePushTmall.php @@ -0,0 +1,84 @@ +setDescription('Test push message with Tmall km data'); + } + + public function handle() + { + try { + // 从 raw 数据库连接获取数据 + $orders = Db::connection('raw') + ->table('wpic_taobao_order') + ->orderBy('id', 'desc') + ->limit(10) + ->get() + ->toArray(); + + if (empty($orders)) { + $this->warn('No orders found in wpic_taobao_order table'); + return 0; + } + + $this->info(sprintf('Found %d orders, processing...', count($orders))); + + // 获取 Producer 实例 + $producer = $this->container->get(Producer::class); + + // 每 2 条记录组成一条消息 + $chunks = array_chunk($orders, 2); + $messageCount = 0; + + foreach ($chunks as $index => $chunk) { + // 构造消息数据(根据实际表结构调整字段映射) + $messageData = [ + 'company_id' => $chunk[0]->company_id ?? 'default_company', + 'platform_id' => $chunk[0]->platform_id ?? 'tmall', + 'store_id' => $chunk[0]->store_id ?? 'default_store', + 'unique_id' => implode('_', array_column($chunk, 'id')), + 'raw_data' => $chunk, // 包含 2 条原始记录 + ]; + + // 创建并发送消息 + $message = new TmallOrderProducer($messageData); + $producer->produce($message); + + $messageCount++; + $this->line(sprintf('Sent message %d with order IDs: %s', + $messageCount, + $messageData['unique_id'] + )); + } + + $this->info(sprintf('Successfully sent %d messages to RabbitMQ', $messageCount)); + return 0; + + } catch (Exception $e) { + $this->error('Error pushing messages: ' . $e->getMessage()); + $this->error($e->getTraceAsString()); + return 1; + } + } +} diff --git a/backend/app/Command/AppQueuePushKm.php b/backend/app/Command/AppQueuePushKm.php deleted file mode 100644 index cbaf649..0000000 --- a/backend/app/Command/AppQueuePushKm.php +++ /dev/null @@ -1,34 +0,0 @@ -setDescription('Test push message with KM data'); - } - - public function handle() - { - - } -} diff --git a/backend/app/Platform/AdapterInterface.php b/backend/app/Platform/AdapterInterface.php new file mode 100644 index 0000000..09cef91 --- /dev/null +++ b/backend/app/Platform/AdapterInterface.php @@ -0,0 +1,10 @@ + 2, // 持久化消息 - ]; - - /** - * 构造消息 - * - * @param array $data 订单数据 - * @return string - */ - public function __construct(array $data = []) - { - if (!empty($data)) { - $this->payload = $this->buildMessage($data); - } - } - - /** - * 构建消息格式 - * - * @param array $data 原始订单数据 - * @return array - */ - protected function buildMessage(array $data): array - { - // 根据 RabbitMQ.md 中定义的消息格式规范 - return [ - 'message_id' => $this->generateMessageId($data), - 'timestamp' => date('c'), // ISO 8601 格式 - 'platform' => 'tmall', - 'data_type' => 'order', - 'metadata' => [ - 'platform_id' => $data['platform_id'] ?? null, - 'company_id' => $data['company_id'] ?? null, - 'store_id' => $data['store_id'] ?? null, - 'source_system' => 'tmall-open-api', - 'retry_count' => 0, - 'data_version' => $data['data_version'] ?? time(), - ], - 'data' => $data['raw_data'], - ]; - } - - /** - * 生成消息ID - * - * 格式: {prefix}#{app_id}#{company_id}#{platform_id}#{store_id}#{entity_type}#{entity_id} - * - * @param array $data - * @return string - */ - protected function generateMessageId(array $data): string - { - $company_id = $data['company_id']; - $platform_id = $data['platform_id']; - $store_id = $data['store_id']; - $entity_type = 'order'; - $unique_id = $data['unique_id']; - - return sprintf( - '%s#%s#%s#%s#%s', - $company_id, - $platform_id, - $store_id, - $entity_type, - $unique_id - ); - } -} diff --git a/backend/app/Platform/Tmall/Producer/TmallOrderProducer.php b/backend/app/Platform/Tmall/Producer/TmallOrderProducer.php new file mode 100644 index 0000000..991d77b --- /dev/null +++ b/backend/app/Platform/Tmall/Producer/TmallOrderProducer.php @@ -0,0 +1,16 @@ +