From a5189f46ca26cf1034c7226d72bc616fec8ee0f7 Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Tue, 17 Mar 2026 10:37:02 +0800 Subject: [PATCH] update mq status controller --- backend/app/Service/MqStatusService.php | 172 +++++++++--------- .../System/MqStatusControllerTest.php | 5 +- 2 files changed, 88 insertions(+), 89 deletions(-) diff --git a/backend/app/Service/MqStatusService.php b/backend/app/Service/MqStatusService.php index fc6acfd..7aa8a91 100644 --- a/backend/app/Service/MqStatusService.php +++ b/backend/app/Service/MqStatusService.php @@ -4,13 +4,15 @@ declare(strict_types=1); namespace App\Service; +use GuzzleHttp\Client; +use GuzzleHttp\HandlerStack; use Hyperf\Contract\ConfigInterface; -use PhpAmqpLib\Connection\AMQPStreamConnection; +use Hyperf\Guzzle\CoroutineHandler; /** * 消息队列状态服务 * - * 提供 RabbitMQ 队列状态查询能力,供 CLI 命令和 API 接口共用 + * 通过 RabbitMQ Management HTTP API 查询队列状态,供 CLI 命令和 API 接口共用 */ class MqStatusService { @@ -19,6 +21,8 @@ class MqStatusService */ public const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory']; + private ?Client $httpClient = null; + public function __construct( protected readonly ConfigInterface $config, ) {} @@ -41,81 +45,55 @@ class MqStatusService */ public function getStatus(?string $queue_type = null): array { - $consumer_config = $this->config->get('amqp.default_consumer'); + $queue_types = $queue_type ? [$queue_type] : self::QUEUE_TYPES; - $connection = new AMQPStreamConnection( - $consumer_config['host'], - $consumer_config['port'], - $consumer_config['user'], - $consumer_config['password'], - $consumer_config['vhost'], - false, - 'AMQPLAIN', - null, - 'en_US', - $consumer_config['params']['connection_timeout'] ?? 3.0, - $consumer_config['params']['read_write_timeout'] ?? 3.0, - null, - $consumer_config['params']['keepalive'] ?? false, - $consumer_config['params']['heartbeat'] ?? 0 - ); + $business_queues = []; + $retry_queues = []; + $total_messages = 0; + $total_consumers = 0; - $channel = $connection->channel(); + foreach ($queue_types as $type) { + $group_data = $this->fetchQueueGroupData($type); - try { - $queue_types = $queue_type ? [$queue_type] : self::QUEUE_TYPES; + foreach ($group_data as $queue_info) { + if (str_ends_with($queue_info['queue'], '.retry.queue')) { + $retry_queues[] = $queue_info; + } else { + $business_queues[] = $queue_info; + } - $business_queues = []; - $retry_queues = []; - $total_messages = 0; - $total_consumers = 0; - - foreach ($queue_types as $type) { - $group_data = $this->fetchQueueGroupData($channel, $type); - - foreach ($group_data as $queue_info) { - if (str_ends_with($queue_info['queue'], '.retry.queue')) { - $retry_queues[] = $queue_info; - } else { - $business_queues[] = $queue_info; - } - - if (is_numeric($queue_info['messages'])) { - $total_messages += $queue_info['messages']; - } - if (is_numeric($queue_info['consumers'])) { - $total_consumers += $queue_info['consumers']; - } + if (is_numeric($queue_info['messages'])) { + $total_messages += $queue_info['messages']; + } + if (is_numeric($queue_info['consumers'])) { + $total_consumers += $queue_info['consumers']; } } - - // 错误队列仅在未筛选时返回 - $error_queue = []; - if ($queue_type === null) { - $error_queue = $this->fetchQueueData($channel, 'errors.queue'); - - if (is_numeric($error_queue['messages'])) { - $total_messages += $error_queue['messages']; - } - if (is_numeric($error_queue['consumers'])) { - $total_consumers += $error_queue['consumers']; - } - } - - return [ - 'business_queues' => $business_queues, - 'retry_queues' => $retry_queues, - 'error_queue' => $error_queue, - 'summary' => [ - 'total_messages' => $total_messages, - 'total_consumers' => $total_consumers, - ], - 'fetched_at' => date('Y-m-d H:i:s'), - ]; - } finally { - $channel->close(); - $connection->close(); } + + // 错误队列仅在未筛选时返回 + $error_queue = []; + if ($queue_type === null) { + $error_queue = $this->fetchQueueData('errors.queue'); + + if (is_numeric($error_queue['messages'])) { + $total_messages += $error_queue['messages']; + } + if (is_numeric($error_queue['consumers'])) { + $total_consumers += $error_queue['consumers']; + } + } + + return [ + 'business_queues' => $business_queues, + 'retry_queues' => $retry_queues, + 'error_queue' => $error_queue, + 'summary' => [ + 'total_messages' => $total_messages, + 'total_consumers' => $total_consumers, + ], + 'fetched_at' => date('Y-m-d H:i:s'), + ]; } /** @@ -123,7 +101,7 @@ class MqStatusService * * @return array */ - private function fetchQueueGroupData(mixed $channel, string $type): array + private function fetchQueueGroupData(string $type): array { $queues = [ "{$type}.queue", @@ -132,36 +110,31 @@ class MqStatusService $group_data = []; foreach ($queues as $queue_name) { - $group_data[] = $this->fetchQueueData($channel, $queue_name); + $group_data[] = $this->fetchQueueData($queue_name); } return $group_data; } /** - * 获取单个队列的数据 + * 通过 Management API 获取单个队列的数据 * * @return array{queue: string, messages: int|string, consumers: int|string, status: string} */ - private function fetchQueueData(mixed $channel, string $queue_name): array + private function fetchQueueData(string $queue_name): array { try { - // 使用 passive=true 获取队列信息而不创建队列 - [, $message_count, $consumer_count] = $channel->queue_declare( - $queue_name, - true, // passive - true, // durable - false, // exclusive - false // auto_delete - ); + $vhost = urlencode($this->config->get('amqp.default_consumer.vhost', '/')); + $response = $this->getHttpClient()->get("/api/queues/{$vhost}/{$queue_name}"); + $data = json_decode($response->getBody()->getContents(), true); return [ 'queue' => $queue_name, - 'messages' => $message_count, - 'consumers' => $consumer_count, - 'status' => $this->getQueueStatus($message_count), + 'messages' => $data['messages'] ?? 0, + 'consumers' => $data['consumers'] ?? 0, + 'status' => $this->getQueueStatus($data['messages'] ?? 0), ]; - } catch (\Throwable $e) { + } catch (\Throwable) { return [ 'queue' => $queue_name, 'messages' => 'N/A', @@ -189,4 +162,31 @@ class MqStatusService } return 'empty'; } + + /** + * 获取 Management API HTTP 客户端(懒初始化,协程安全) + */ + private function getHttpClient(): Client + { + if ($this->httpClient === null) { + $management = $this->config->get('amqp.management', []); + + $this->httpClient = new Client([ + 'base_uri' => sprintf( + 'http://%s:%d', + $management['host'] ?? 'localhost', + $management['port'] ?? 15672, + ), + 'auth' => [ + $management['user'] ?? 'guest', + $management['password'] ?? 'guest', + ], + 'handler' => HandlerStack::create(new CoroutineHandler()), + 'timeout' => 5, + 'connect_timeout' => 3, + ]); + } + + return $this->httpClient; + } } diff --git a/backend/test/Cases/Integration/System/MqStatusControllerTest.php b/backend/test/Cases/Integration/System/MqStatusControllerTest.php index dcafbe6..8e902c1 100644 --- a/backend/test/Cases/Integration/System/MqStatusControllerTest.php +++ b/backend/test/Cases/Integration/System/MqStatusControllerTest.php @@ -31,7 +31,7 @@ class MqStatusControllerTest extends TestCase // 连接不可用时返回 500,仍验证响应格式 $response->assertJsonStructure(['code', 'message', 'data']); $this->assertSame(500, $response->json('code')); - return; + $this->markTestSkipped('RabbitMQ not available'); } $response->assertStatus(200); @@ -61,8 +61,7 @@ class MqStatusControllerTest extends TestCase $status = $response->getStatusCode(); if ($status === 500) { - // RabbitMQ 不可用,跳过具体数据验证 - return; + $this->markTestSkipped('RabbitMQ not available'); } $response->assertStatus(200);