diff --git a/backend/app/Command/AppMessageQueuePushShopee.php b/backend/app/Command/AppMessageQueuePushShopee.php index f0bac0c..b4ebb39 100644 --- a/backend/app/Command/AppMessageQueuePushShopee.php +++ b/backend/app/Command/AppMessageQueuePushShopee.php @@ -9,9 +9,13 @@ use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Annotation\Command; use Psr\Container\ContainerInterface; use Hyperf\DbConnection\Db; -use App\Platform\Tmall\Producer\TmallOrderProducer; +use App\Platform\Shopee\Producer\ShopeeOrderProducer; +use App\Platform\Shopee\Producer\ShopeeProductProducer; +use App\Platform\Shopee\Producer\ShopeeRefundProducer; +use App\Platform\Shopee\Producer\ShopeeInventoryProducer; use Hyperf\Amqp\Producer; use App\Model\Store; +use Symfony\Component\Console\Input\InputOption; use Exception; #[Command] @@ -26,16 +30,63 @@ class AppMessageQueuePushShopee extends HyperfCommand { parent::configure(); $this->setDescription('Test push message with Shopee Loop data'); + $this->addOption( + 'queue-type', + 't', + InputOption::VALUE_REQUIRED, + 'Queue type: product, order, refund, inventory' + ); + } + + /** + * 获取队列类型配置 + */ + protected function getQueueTypeConfig(string $type): array + { + $configs = [ + 'order' => [ + 'table' => 'wpic_shopee_order', + 'column' => 'raw', + 'producer' => ShopeeOrderProducer::class, + ], + 'product' => [ + 'table' => 'wpic_shopee_item', + 'column' => 'raw', + 'producer' => ShopeeProductProducer::class, + ], + 'refund' => [ + 'table' => 'wpic_shopee_return', + 'column' => 'raw', + 'producer' => ShopeeRefundProducer::class, + ], + 'inventory' => [ + 'table' => 'wpic_shopee_inventory', + 'column' => 'raw', + 'producer' => ShopeeInventoryProducer::class, + ], + ]; + + return $configs[$type] ?? throw new Exception("Invalid queue type: {$type}"); } public function handle(): void { try { + $queueType = $this->input->getOption('queue-type'); + $validTypes = ['product', 'order', 'refund', 'inventory']; + + if (empty($queueType) || !in_array($queueType, $validTypes)) { + $this->error(sprintf('--queue-type is required. Must be one of: %s', implode(', ', $validTypes))); + return; + } + + $config = $this->getQueueTypeConfig($queueType); + $this->info(sprintf('Processing queue type: %s, table: %s', $queueType, $config['table'])); // 尝试更新 store 信息 $store = Store::where('platform_id', '=', 25) - ->where('platform_store_id', '=', 931327381 ) + ->where('platform_store_id', '=', 1669343109 ) ->first(); if(!$store){ @@ -47,7 +98,7 @@ class AppMessageQueuePushShopee extends HyperfCommand 'partner_id' => 2006675, 'app_id' => '', 'merchant_id' => 3726273, - 'name' => 'Loop Earplugs.SG', + 'name' => 'owala.sg', 'zone' => 'SG', ]; @@ -55,36 +106,31 @@ class AppMessageQueuePushShopee extends HyperfCommand $store->platform_meta = $platform_meta; $store->save(); - return; - // 从 raw 数据库连接获取数据 - $orders = Db::connection('raw') - ->table('wpic_shopee_order') - ->where('store_id', '=', 931327381) + $records = Db::connection('raw') + ->table($config['table']) + ->where('store_id', '=', 1669343109) ->orderBy('id', 'desc') - ->limit(4)->get('order_raw')->lazy(); - - // dump($orders->first()); - // return; - - if ($orders->isEmpty()) { - $this->warn('No orders found in wpic_shopee_order table'); + ->limit(4)->get($config['column'])->lazy(); + + if ($records->isEmpty()) { + $this->warn(sprintf('No records found in %s table', $config['table'])); return; } - - $this->info(sprintf('Found %d orders, processing...', $orders->count())); - + + $this->info(sprintf('Found %d records, processing...', $records->count())); + // 获取 Producer 实例 $producer = $this->container->get(Producer::class); - - // 每 2 条记录组成一条消息 - 实际生产环境需要增大这个值 - // $orders->chunk(2)->each(function($collection) use ($producer) { - - $messageCount = 0; - - $orders->chunk(2)->each(function (LazyCollection $collection) use ($producer, &$messageCount) { + $producerClass = $config['producer']; - $order_data = $collection->pluck('order_raw')->map(function ($item) { + // 每 2 条记录组成一条消息 - 实际生产环境需要增大这个值 + + $messageCount = 0; + + $records->chunk(2)->each(function (LazyCollection $collection) use ($producer, $producerClass, $config, &$messageCount) { + + $raw_data = $collection->pluck($config['column'])->map(function ($item) { return json_decode($item, true); })->toArray(); @@ -93,13 +139,13 @@ class AppMessageQueuePushShopee extends HyperfCommand 'company_id' => 171, 'platform_id' => 25, 'store_id' => 255, - 'platform_store_id' => 931327381, - 'unique_id' => uniqid() . '_' . time(), - 'raw_data' => $order_data, // 包含 2 条原始记录 + 'platform_store_id' => 1669343109, + 'unique_id' => time() . '_' . uniqid(), + 'raw_data' => $raw_data, // 包含 2 条原始记录 ]; // 创建并发送消息 - $message = new ShopeeOrderProducer($messageData); + $message = new $producerClass($messageData); $producer->produce($message); $messageCount++;