setDescription('Test push message with Tmall km data'); } public function handle() { try { // 从 raw 数据库连接获取数据 $orders = Db::connection('raw') ->table('wpic_taobao_order') ->orderBy('id', 'desc') ->limit(10) ->get() ->toArray(); if (empty($orders)) { $this->warn('No orders found in wpic_taobao_order table'); return 0; } $this->info(sprintf('Found %d orders, processing...', count($orders))); // 获取 Producer 实例 $producer = $this->container->get(Producer::class); // 每 2 条记录组成一条消息 $chunks = array_chunk($orders, 2); $messageCount = 0; foreach ($chunks as $index => $chunk) { // 构造消息数据(根据实际表结构调整字段映射) $messageData = [ 'company_id' => $chunk[0]->company_id ?? 'default_company', 'platform_id' => $chunk[0]->platform_id ?? 'tmall', 'store_id' => $chunk[0]->store_id ?? 'default_store', 'unique_id' => implode('_', array_column($chunk, 'id')), 'raw_data' => $chunk, // 包含 2 条原始记录 ]; // 创建并发送消息 $message = new TmallOrderProducer($messageData); $producer->produce($message); $messageCount++; $this->line(sprintf('Sent message %d with order IDs: %s', $messageCount, $messageData['unique_id'] )); } $this->info(sprintf('Successfully sent %d messages to RabbitMQ', $messageCount)); return 0; } catch (Exception $e) { $this->error('Error pushing messages: ' . $e->getMessage()); $this->error($e->getTraceAsString()); return 1; } } }