update mq status controller

This commit is contained in:
2026-03-17 10:37:02 +08:00
parent fcd376cab4
commit a5189f46ca
2 changed files with 88 additions and 89 deletions
+86 -86
View File
@@ -4,13 +4,15 @@ declare(strict_types=1);
namespace App\Service; namespace App\Service;
use GuzzleHttp\Client;
use GuzzleHttp\HandlerStack;
use Hyperf\Contract\ConfigInterface; use Hyperf\Contract\ConfigInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection; use Hyperf\Guzzle\CoroutineHandler;
/** /**
* 消息队列状态服务 * 消息队列状态服务
* *
* 提供 RabbitMQ 队列状态查询能力,供 CLI 命令和 API 接口共用 * 通过 RabbitMQ Management HTTP API 查询队列状态,供 CLI 命令和 API 接口共用
*/ */
class MqStatusService class MqStatusService
{ {
@@ -19,6 +21,8 @@ class MqStatusService
*/ */
public const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory']; public const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory'];
private ?Client $httpClient = null;
public function __construct( public function __construct(
protected readonly ConfigInterface $config, protected readonly ConfigInterface $config,
) {} ) {}
@@ -41,81 +45,55 @@ class MqStatusService
*/ */
public function getStatus(?string $queue_type = null): array public function getStatus(?string $queue_type = null): array
{ {
$consumer_config = $this->config->get('amqp.default_consumer'); $queue_types = $queue_type ? [$queue_type] : self::QUEUE_TYPES;
$connection = new AMQPStreamConnection( $business_queues = [];
$consumer_config['host'], $retry_queues = [];
$consumer_config['port'], $total_messages = 0;
$consumer_config['user'], $total_consumers = 0;
$consumer_config['password'],
$consumer_config['vhost'],
false,
'AMQPLAIN',
null,
'en_US',
$consumer_config['params']['connection_timeout'] ?? 3.0,
$consumer_config['params']['read_write_timeout'] ?? 3.0,
null,
$consumer_config['params']['keepalive'] ?? false,
$consumer_config['params']['heartbeat'] ?? 0
);
$channel = $connection->channel(); foreach ($queue_types as $type) {
$group_data = $this->fetchQueueGroupData($type);
try { foreach ($group_data as $queue_info) {
$queue_types = $queue_type ? [$queue_type] : self::QUEUE_TYPES; if (str_ends_with($queue_info['queue'], '.retry.queue')) {
$retry_queues[] = $queue_info;
} else {
$business_queues[] = $queue_info;
}
$business_queues = []; if (is_numeric($queue_info['messages'])) {
$retry_queues = []; $total_messages += $queue_info['messages'];
$total_messages = 0; }
$total_consumers = 0; if (is_numeric($queue_info['consumers'])) {
$total_consumers += $queue_info['consumers'];
foreach ($queue_types as $type) {
$group_data = $this->fetchQueueGroupData($channel, $type);
foreach ($group_data as $queue_info) {
if (str_ends_with($queue_info['queue'], '.retry.queue')) {
$retry_queues[] = $queue_info;
} else {
$business_queues[] = $queue_info;
}
if (is_numeric($queue_info['messages'])) {
$total_messages += $queue_info['messages'];
}
if (is_numeric($queue_info['consumers'])) {
$total_consumers += $queue_info['consumers'];
}
} }
} }
// 错误队列仅在未筛选时返回
$error_queue = [];
if ($queue_type === null) {
$error_queue = $this->fetchQueueData($channel, 'errors.queue');
if (is_numeric($error_queue['messages'])) {
$total_messages += $error_queue['messages'];
}
if (is_numeric($error_queue['consumers'])) {
$total_consumers += $error_queue['consumers'];
}
}
return [
'business_queues' => $business_queues,
'retry_queues' => $retry_queues,
'error_queue' => $error_queue,
'summary' => [
'total_messages' => $total_messages,
'total_consumers' => $total_consumers,
],
'fetched_at' => date('Y-m-d H:i:s'),
];
} finally {
$channel->close();
$connection->close();
} }
// 错误队列仅在未筛选时返回
$error_queue = [];
if ($queue_type === null) {
$error_queue = $this->fetchQueueData('errors.queue');
if (is_numeric($error_queue['messages'])) {
$total_messages += $error_queue['messages'];
}
if (is_numeric($error_queue['consumers'])) {
$total_consumers += $error_queue['consumers'];
}
}
return [
'business_queues' => $business_queues,
'retry_queues' => $retry_queues,
'error_queue' => $error_queue,
'summary' => [
'total_messages' => $total_messages,
'total_consumers' => $total_consumers,
],
'fetched_at' => date('Y-m-d H:i:s'),
];
} }
/** /**
@@ -123,7 +101,7 @@ class MqStatusService
* *
* @return array<array{queue: string, messages: int|string, consumers: int|string, status: string}> * @return array<array{queue: string, messages: int|string, consumers: int|string, status: string}>
*/ */
private function fetchQueueGroupData(mixed $channel, string $type): array private function fetchQueueGroupData(string $type): array
{ {
$queues = [ $queues = [
"{$type}.queue", "{$type}.queue",
@@ -132,36 +110,31 @@ class MqStatusService
$group_data = []; $group_data = [];
foreach ($queues as $queue_name) { foreach ($queues as $queue_name) {
$group_data[] = $this->fetchQueueData($channel, $queue_name); $group_data[] = $this->fetchQueueData($queue_name);
} }
return $group_data; return $group_data;
} }
/** /**
* 获取单个队列的数据 * 通过 Management API 获取单个队列的数据
* *
* @return array{queue: string, messages: int|string, consumers: int|string, status: string} * @return array{queue: string, messages: int|string, consumers: int|string, status: string}
*/ */
private function fetchQueueData(mixed $channel, string $queue_name): array private function fetchQueueData(string $queue_name): array
{ {
try { try {
// 使用 passive=true 获取队列信息而不创建队列 $vhost = urlencode($this->config->get('amqp.default_consumer.vhost', '/'));
[, $message_count, $consumer_count] = $channel->queue_declare( $response = $this->getHttpClient()->get("/api/queues/{$vhost}/{$queue_name}");
$queue_name, $data = json_decode($response->getBody()->getContents(), true);
true, // passive
true, // durable
false, // exclusive
false // auto_delete
);
return [ return [
'queue' => $queue_name, 'queue' => $queue_name,
'messages' => $message_count, 'messages' => $data['messages'] ?? 0,
'consumers' => $consumer_count, 'consumers' => $data['consumers'] ?? 0,
'status' => $this->getQueueStatus($message_count), 'status' => $this->getQueueStatus($data['messages'] ?? 0),
]; ];
} catch (\Throwable $e) { } catch (\Throwable) {
return [ return [
'queue' => $queue_name, 'queue' => $queue_name,
'messages' => 'N/A', 'messages' => 'N/A',
@@ -189,4 +162,31 @@ class MqStatusService
} }
return 'empty'; return 'empty';
} }
/**
* 获取 Management API HTTP 客户端(懒初始化,协程安全)
*/
private function getHttpClient(): Client
{
if ($this->httpClient === null) {
$management = $this->config->get('amqp.management', []);
$this->httpClient = new Client([
'base_uri' => sprintf(
'http://%s:%d',
$management['host'] ?? 'localhost',
$management['port'] ?? 15672,
),
'auth' => [
$management['user'] ?? 'guest',
$management['password'] ?? 'guest',
],
'handler' => HandlerStack::create(new CoroutineHandler()),
'timeout' => 5,
'connect_timeout' => 3,
]);
}
return $this->httpClient;
}
} }
@@ -31,7 +31,7 @@ class MqStatusControllerTest extends TestCase
// 连接不可用时返回 500,仍验证响应格式 // 连接不可用时返回 500,仍验证响应格式
$response->assertJsonStructure(['code', 'message', 'data']); $response->assertJsonStructure(['code', 'message', 'data']);
$this->assertSame(500, $response->json('code')); $this->assertSame(500, $response->json('code'));
return; $this->markTestSkipped('RabbitMQ not available');
} }
$response->assertStatus(200); $response->assertStatus(200);
@@ -61,8 +61,7 @@ class MqStatusControllerTest extends TestCase
$status = $response->getStatusCode(); $status = $response->getStatusCode();
if ($status === 500) { if ($status === 500) {
// RabbitMQ 不可用,跳过具体数据验证 $this->markTestSkipped('RabbitMQ not available');
return;
} }
$response->assertStatus(200); $response->assertStatus(200);