setDescription('Test push message with Tmall km data');
        $this->addOption(
            'queue-type',
            't',
            InputOption::VALUE_REQUIRED,
            'Queue type: product, order, refund, inventory'
        );
    }

    /**
     * Get the configuration for a queue type.
     *
     * Each entry names the source table, the column holding the raw JSON
     * payload, and the producer class used to wrap the outgoing message.
     *
     * @param string $type Queue type key: 'order', 'product' or 'refund'.
     *
     * @return array{table: string, column: string, producer: class-string}
     *
     * @throws Exception When no configuration exists for the given type.
     */
    protected function getQueueTypeConfig(string $type): array
    {
        $configs = [
            'order' => [
                'table' => 'wpic_taobao_order',
                'column' => 'order_raw',
                'producer' => TmallOrderProducer::class,
            ],
            'product' => [
                'table' => 'wpic_taobao_item',
                'column' => 'item_raw',
                'producer' => TmallProductProducer::class,
            ],
            'refund' => [
                'table' => 'wpic_taobao_return_item',
                'column' => 'refund_raw',
                'producer' => TmallRefundProducer::class,
            ],
        ];

        return $configs[$type] ?? throw new Exception("Invalid queue type: {$type}");
    }

    /**
     * Read the latest raw records for the requested queue type and push them
     * to the message producer in chunks.
     *
     * Takes the 4 most recent rows (by id, descending) from the configured
     * table on the 'raw' connection, decodes the configured column as JSON,
     * and sends one message per chunk of up to 2 records. Any Exception is
     * reported through the command output together with its stack trace.
     */
    public function handle(): void
    {
        try {
            $queueType = $this->input->getOption('queue-type');
            $validTypes = ['product', 'order', 'refund', 'inventory'];

            // Strict comparison: the option must be exactly one of the listed strings.
            if (empty($queueType) || !in_array($queueType, $validTypes, true)) {
                $this->error(sprintf('--queue-type is required. Must be one of: %s', implode(', ', $validTypes)));
                return;
            }

            // NOTE(review): 'inventory' passes the validation above but has no entry in
            // getQueueTypeConfig(), so it currently ends up in the catch block below with
            // "Invalid queue type: inventory". Either add its config or drop it from the
            // accepted list once the intended table/producer is known.
            $config = $this->getQueueTypeConfig($queueType);
            $this->info(sprintf('Processing queue type: %s, table: %s', $queueType, $config['table']));

            // Fetch data from the 'raw' database connection.
            $records = Db::connection('raw')
                ->table($config['table'])
                ->orderBy('id', 'desc')
                ->limit(4)
                ->get($config['column'])
                ->lazy();

            if ($records->isEmpty()) {
                $this->warn(sprintf('No records found in %s table', $config['table']));
                return;
            }

            $this->info(sprintf('Found %d records, processing...', $records->count()));

            // Get the Producer instance.
            $producer = $this->container->get(Producer::class);
            $producerClass = $config['producer'];
            $messageCount = 0;

            $records->chunk(2)->each(function (LazyCollection $collection) use ($producer, $producerClass, $config, &$messageCount) {
                $rawData = $collection
                    ->pluck($config['column'])
                    ->map(function ($item) {
                        // A non-string value (e.g. a NULL column) cannot be decoded; keep the
                        // null placeholder the payload would otherwise get, but say so.
                        if (!is_string($item)) {
                            $this->warn('Raw record is not a string; sending null in its place');
                            return null;
                        }

                        $decoded = json_decode($item, true);
                        // json_decode() returns null both for the JSON literal "null" and on
                        // failure; json_last_error() tells the two apart.
                        if ($decoded === null && json_last_error() !== JSON_ERROR_NONE) {
                            $this->warn(sprintf('Failed to decode raw record as JSON: %s', json_last_error_msg()));
                        }

                        return $decoded;
                    })
                    ->toArray();

                // @ATTENTION Be careful in production: KM is temporarily used for testing.
                $messageData = [
                    'company_id' => 188,
                    'platform_id' => 2,
                    'store_id' => 292,
                    'unique_id' => time() . '_' . uniqid(),
                    'raw_data' => $rawData, // up to 2 raw records (one chunk)
                ];

                // Create and send the message.
                $message = new $producerClass($messageData);
                $producer->produce($message);
                $messageCount++;

                $this->line(sprintf(
                    'Sent message %d with unique ID: %s',
                    $messageCount,
                    $messageData['unique_id'],
                ));
            });

            $this->info(sprintf('Successfully sent %d messages to RabbitMQ', $messageCount));
            return;
        } catch (Exception $e) {
            $this->error('Error pushing messages: ' . $e->getMessage());
            $this->error($e->getTraceAsString());
            return;
        }
    }
}