setDescription('Test push message with Shopee Loop data'); $this->addOption( 'queue-type', 't', InputOption::VALUE_REQUIRED, 'Queue type: product, order, refund, inventory' ); } /** * 获取队列类型配置 */ protected function getQueueTypeConfig(string $type): array { $configs = [ 'order' => [ 'table' => 'wpic_shopee_order', 'column' => 'raw', 'producer' => ShopeeOrderProducer::class, ], 'product' => [ 'table' => 'wpic_shopee_item', 'column' => 'raw', 'producer' => ShopeeProductProducer::class, ], 'refund' => [ 'table' => 'wpic_shopee_return', 'column' => 'raw', 'producer' => ShopeeRefundProducer::class, ], 'inventory' => [ 'table' => 'wpic_shopee_inventory', 'column' => 'raw', 'producer' => ShopeeInventoryProducer::class, ], ]; return $configs[$type] ?? throw new Exception("Invalid queue type: {$type}"); } public function handle(): void { try { $queueType = $this->input->getOption('queue-type'); $validTypes = ['product', 'order', 'refund', 'inventory']; if (empty($queueType) || !in_array($queueType, $validTypes)) { $this->error(sprintf('--queue-type is required. Must be one of: %s', implode(', ', $validTypes))); return; } $config = $this->getQueueTypeConfig($queueType); $this->info(sprintf('Processing queue type: %s, table: %s', $queueType, $config['table'])); // 尝试更新 store 信息 $store = Store::where('platform_id', '=', 25) ->where('platform_store_id', '=', 1669343109 ) ->first(); if(!$store){ $this->error('店铺不存在!'); return; } $_platform_meta = [ 'partner_id' => 2006675, 'app_id' => '', 'merchant_id' => 3726273, 'name' => 'owala.sg', 'zone' => 'SG', ]; $platform_meta = !$store->platform_meta ? $_platform_meta : array_merge($store->platform_meta, $_platform_meta); $store->platform_meta = $platform_meta; $store->save(); // 从 raw 数据库连接获取数据 $records = Db::connection('raw') ->table($config['table']) ->where('store_id', '=', 1669343109) ->orderBy('id', 'desc') ->limit(4)->get($config['column'])->lazy(); if ($records->isEmpty()) { $this->warn(sprintf('No records found in %s table', $config['table'])); return; } $this->info(sprintf('Found %d records, processing...', $records->count())); // 获取 Producer 实例 $producer = $this->container->get(Producer::class); $producerClass = $config['producer']; // 每 2 条记录组成一条消息 - 实际生产环境需要增大这个值 $messageCount = 0; $records->chunk(2)->each(function (LazyCollection $collection) use ($producer, $producerClass, $config, &$messageCount) { $raw_data = $collection->pluck($config['column'])->map(function ($item) { return json_decode($item, true); })->toArray(); //@ATTENTION 生产环境需要注意, 暂时使用 Shopee Loop SG 进行测试 $messageData = [ 'company_id' => 171, 'platform_id' => 25, 'store_id' => 255, 'platform_store_id' => 1669343109, 'unique_id' => time() . '_' . uniqid(), 'raw_data' => $raw_data, // 包含 2 条原始记录 ]; // 创建并发送消息 $message = new $producerClass($messageData); $producer->produce($message); $messageCount++; $this->line(sprintf('Sent message %d with order IDs: %s', $messageCount, $messageData['unique_id'], )); }); $this->info(sprintf('Successfully sent %d messages to RabbitMQ', $messageCount)); return; } catch (Exception $e) { $this->error('Error pushing messages: ' . $e->getMessage()); $this->error($e->getTraceAsString()); return; } } }