setDescription('Test push message with Tmall km data'); } public function handle(): void { try { // 从 raw 数据库连接获取数据 $orders = Db::connection('raw') ->table('wpic_taobao_order')->orderBy('id', 'desc') ->limit(4)->get('order_raw')->lazy(); // dump($orders->first()); // return; if ($orders->isEmpty()) { $this->warn('No orders found in wpic_taobao_order table'); return; } $this->info(sprintf('Found %d orders, processing...', $orders->count())); // 获取 Producer 实例 $producer = $this->container->get(Producer::class); // 每 2 条记录组成一条消息 - 实际生产环境需要增大这个值 // $orders->chunk(2)->each(function($collection) use ($producer) { $messageCount = 0; $orders->chunk(2)->each(function (LazyCollection $collection) use ($producer, &$messageCount) { $order_data = $collection->pluck('order_raw')->map(function ($item) { return json_decode($item, true); })->toArray(); //@ATTENTION 生产环境需要注意, 暂时使用 KM 进行测试 $messageData = [ 'company_id' => 188, 'platform_id' => 2, 'store_id' => 292, 'unique_id' => uniqid() . '_' . time(), 'raw_data' => $order_data, // 包含 2 条原始记录 ]; // 创建并发送消息 $message = new TmallOrderProducer($messageData); $producer->produce($message); $messageCount++; $this->line(sprintf('Sent message %d with order IDs: %s', $messageCount, $messageData['unique_id'], )); }); $this->info(sprintf('Successfully sent %d messages to RabbitMQ', $messageCount)); return; } catch (Exception $e) { $this->error('Error pushing messages: ' . $e->getMessage()); $this->error($e->getTraceAsString()); return; } } }