From ec9b3ca500325b3555c8a237b0ec7d5fae513297 Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Tue, 24 Feb 2026 15:52:30 +0800 Subject: [PATCH] update tmall --- .../app/Command/AppMessageQueuePushTmall.php | 103 +++++++++++++----- 1 file changed, 73 insertions(+), 30 deletions(-) diff --git a/backend/app/Command/AppMessageQueuePushTmall.php b/backend/app/Command/AppMessageQueuePushTmall.php index 3d23a1f..2f0da19 100644 --- a/backend/app/Command/AppMessageQueuePushTmall.php +++ b/backend/app/Command/AppMessageQueuePushTmall.php @@ -10,7 +10,11 @@ use Hyperf\Command\Annotation\Command; use Psr\Container\ContainerInterface; use Hyperf\DbConnection\Db; use App\Platform\Tmall\Producer\TmallOrderProducer; +use App\Platform\Tmall\Producer\TmallProductProducer; +use App\Platform\Tmall\Producer\TmallRefundProducer; +use App\Platform\Tmall\Producer\TmallInventoryProducer; use Hyperf\Amqp\Producer; +use Symfony\Component\Console\Input\InputOption; use Exception; #[Command] @@ -20,42 +24,81 @@ class AppMessageQueuePushTmall extends HyperfCommand { parent::__construct('app:mq-push:tmall'); } - + public function configure() { parent::configure(); $this->setDescription('Test push message with Tmall km data'); + $this->addOption( + 'queue-type', + 't', + InputOption::VALUE_REQUIRED, + 'Queue type: product, order, refund, inventory' + ); } - + + /** + * 获取队列类型配置 + */ + protected function getQueueTypeConfig(string $type): array + { + $configs = [ + 'order' => [ + 'table' => 'wpic_taobao_order', + 'column' => 'order_raw', + 'producer' => TmallOrderProducer::class, + ], + 'product' => [ + 'table' => 'wpic_taobao_item', + 'column' => 'item_raw', + 'producer' => TmallProductProducer::class, + ], + 'refund' => [ + 'table' => 'wpic_taobao_return_item', + 'column' => 'refund_raw', + 'producer' => TmallRefundProducer::class, + ] + ]; + + return $configs[$type] ?? throw new Exception("Invalid queue type: {$type}"); + } + public function handle(): void { try { - // 从 raw 数据库连接获取数据 - $orders = Db::connection('raw') - ->table('wpic_taobao_order')->orderBy('id', 'desc') - ->limit(4)->get('order_raw')->lazy(); - - // dump($orders->first()); - // return; - - if ($orders->isEmpty()) { - $this->warn('No orders found in wpic_taobao_order table'); + $queueType = $this->input->getOption('queue-type'); + $validTypes = ['product', 'order', 'refund', 'inventory']; + + if (empty($queueType) || !in_array($queueType, $validTypes)) { + $this->error(sprintf('--queue-type is required. Must be one of: %s', implode(', ', $validTypes))); return; } - - $this->info(sprintf('Found %d orders, processing...', $orders->count())); - + + $config = $this->getQueueTypeConfig($queueType); + $this->info(sprintf('Processing queue type: %s, table: %s', $queueType, $config['table'])); + + // 从 raw 数据库连接获取数据 + $records = Db::connection('raw') + ->table($config['table']) + ->orderBy('id', 'desc') + ->limit(4)->get($config['column'])->lazy(); + + if ($records->isEmpty()) { + $this->warn(sprintf('No records found in %s table', $config['table'])); + return; + } + + $this->info(sprintf('Found %d records, processing...', $records->count())); + // 获取 Producer 实例 $producer = $this->container->get(Producer::class); - - // 每 2 条记录组成一条消息 - 实际生产环境需要增大这个值 - // $orders->chunk(2)->each(function($collection) use ($producer) { - - $messageCount = 0; - - $orders->chunk(2)->each(function (LazyCollection $collection) use ($producer, &$messageCount) { + $producerClass = $config['producer']; - $order_data = $collection->pluck('order_raw')->map(function ($item) { + $messageCount = 0; + + $records->chunk(2)->each(function (LazyCollection $collection) use ($producer, $producerClass, $config, &$messageCount) { + + $raw_data = $collection->pluck($config['column'])->map(function ($item) { return json_decode($item, true); })->toArray(); @@ -64,26 +107,26 @@ class AppMessageQueuePushTmall extends HyperfCommand 'company_id' => 188, 'platform_id' => 2, 'store_id' => 292, - 'unique_id' => uniqid() . '_' . time(), - 'raw_data' => $order_data, // 包含 2 条原始记录 + 'unique_id' => time() . '_' . uniqid(), + 'raw_data' => $raw_data, // 包含 2 条原始记录 ]; // 创建并发送消息 - $message = new TmallOrderProducer($messageData); + $message = new $producerClass($messageData); $producer->produce($message); $messageCount++; - $this->line(sprintf('Sent message %d with order IDs: %s', + $this->line(sprintf('Sent message %d with unique ID: %s', $messageCount, $messageData['unique_id'], )); }); - - + + $this->info(sprintf('Successfully sent %d messages to RabbitMQ', $messageCount)); return; - + } catch (Exception $e) { $this->error('Error pushing messages: ' . $e->getMessage()); $this->error($e->getTraceAsString());