setDescription('Manually pull and consume one message from orders.queue');
    }

    /**
     * Pull a single message from `orders.queue` and run it through the order consumer.
     *
     * The message is fetched with basic_get, decoded as JSON, printed, handed to
     * $this->orderConsumer->consumeMessage(), and then acknowledged or rejected
     * on the channel according to the Result value the consumer returns.
     *
     * @return int 0 when the queue is empty or the consumer returned a known
     *             Result; 1 when processing threw or the result was not recognised.
     */
    public function handle()
    {
        $connectionFactory = $this->container->get(ConnectionFactory::class);
        $connection = $connectionFactory->getConnection('default_consumer');
        $channel = $connection->getChannel();

        $queueName = 'orders.queue';
        $this->info("Pulling one message from queue: {$queueName}");

        // Manually pull one message with basic_get (non-blocking). The second
        // argument is passed as false, so the delivery must be settled explicitly below.
        $message = $channel->basic_get($queueName, false);

        if ($message === null) {
            $this->warn('No messages available in queue');
            return 0;
        }

        try {
            // Parse the message body.
            // NOTE(review): json_decode() yields null for malformed JSON; that null is
            // passed to the consumer unchanged — confirm the consumer handles it.
            $data = json_decode($message->body, true);

            $this->info('Received message:');
            $this->line(json_encode($data, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));

            // Hand the decoded payload and the raw message to the consumer.
            $result = $this->orderConsumer->consumeMessage($data, $message);

            // Settle the delivery (ACK/NACK) according to the consumer's result.
            switch ($result) {
                case Result::ACK:
                    $channel->basic_ack($message->getDeliveryTag());
                    $this->info('Message consumed successfully (ACK)');
                    break;

                case Result::NACK:
                    $channel->basic_nack($message->getDeliveryTag(), false, true);
                    $this->warn('Message consumption failed (NACK - requeue)');
                    break;

                case Result::DROP:
                    $channel->basic_nack($message->getDeliveryTag(), false, false);
                    $this->warn('Message dropped (NACK - no requeue)');
                    break;

                case Result::REQUEUE:
                    $channel->basic_nack($message->getDeliveryTag(), false, true);
                    $this->warn('Message requeued');
                    break;

                default:
                    // An unrecognised result must not leave the delivery unsettled
                    // while the command reports success: requeue it and fail.
                    $channel->basic_nack($message->getDeliveryTag(), false, true);
                    $this->error('Unknown consume result; message requeued (NACK - requeue)');
                    return 1;
            }

            return 0;
        } catch (\Throwable $e) {
            $this->error('Error consuming message: ' . $e->getMessage());
            $this->error($e->getTraceAsString());

            // On any exception, NACK the message and put it back on the queue.
            $channel->basic_nack($message->getDeliveryTag(), false, true);

            return 1;
        }
    }
}