Files
datahub/backend/app/Service/MqStatusService.php
2026-03-17 10:37:02 +08:00

193 lines
5.6 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace App\Service;
use GuzzleHttp\Client;
use GuzzleHttp\HandlerStack;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Guzzle\CoroutineHandler;
/**
* 消息队列状态服务
*
* 通过 RabbitMQ Management HTTP API 查询队列状态,供 CLI 命令和 API 接口共用
*/
class MqStatusService
{
/**
* 业务队列类型
*/
public const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory'];
private ?Client $httpClient = null;
public function __construct(
protected readonly ConfigInterface $config,
) {}
/**
* 获取合法的队列类型列表
*
* @return array<string>
*/
public function getValidQueueTypes(): array
{
return self::QUEUE_TYPES;
}
/**
* 获取队列状态
*
* @param string|null $queue_type 筛选队列类型(orders/products/refunds/inventory
* @return array{business_queues: array, retry_queues: array, error_queue: array, summary: array, fetched_at: string}
*/
public function getStatus(?string $queue_type = null): array
{
$queue_types = $queue_type ? [$queue_type] : self::QUEUE_TYPES;
$business_queues = [];
$retry_queues = [];
$total_messages = 0;
$total_consumers = 0;
foreach ($queue_types as $type) {
$group_data = $this->fetchQueueGroupData($type);
foreach ($group_data as $queue_info) {
if (str_ends_with($queue_info['queue'], '.retry.queue')) {
$retry_queues[] = $queue_info;
} else {
$business_queues[] = $queue_info;
}
if (is_numeric($queue_info['messages'])) {
$total_messages += $queue_info['messages'];
}
if (is_numeric($queue_info['consumers'])) {
$total_consumers += $queue_info['consumers'];
}
}
}
// 错误队列仅在未筛选时返回
$error_queue = [];
if ($queue_type === null) {
$error_queue = $this->fetchQueueData('errors.queue');
if (is_numeric($error_queue['messages'])) {
$total_messages += $error_queue['messages'];
}
if (is_numeric($error_queue['consumers'])) {
$total_consumers += $error_queue['consumers'];
}
}
return [
'business_queues' => $business_queues,
'retry_queues' => $retry_queues,
'error_queue' => $error_queue,
'summary' => [
'total_messages' => $total_messages,
'total_consumers' => $total_consumers,
],
'fetched_at' => date('Y-m-d H:i:s'),
];
}
/**
* 获取队列组数据(主队列 + 重试队列)
*
* @return array<array{queue: string, messages: int|string, consumers: int|string, status: string}>
*/
private function fetchQueueGroupData(string $type): array
{
$queues = [
"{$type}.queue",
"{$type}.retry.queue",
];
$group_data = [];
foreach ($queues as $queue_name) {
$group_data[] = $this->fetchQueueData($queue_name);
}
return $group_data;
}
/**
* 通过 Management API 获取单个队列的数据
*
* @return array{queue: string, messages: int|string, consumers: int|string, status: string}
*/
private function fetchQueueData(string $queue_name): array
{
try {
$vhost = urlencode($this->config->get('amqp.default_consumer.vhost', '/'));
$response = $this->getHttpClient()->get("/api/queues/{$vhost}/{$queue_name}");
$data = json_decode($response->getBody()->getContents(), true);
return [
'queue' => $queue_name,
'messages' => $data['messages'] ?? 0,
'consumers' => $data['consumers'] ?? 0,
'status' => $this->getQueueStatus($data['messages'] ?? 0),
];
} catch (\Throwable) {
return [
'queue' => $queue_name,
'messages' => 'N/A',
'consumers' => 'N/A',
'status' => 'error',
];
}
}
/**
* 根据消息数量判断队列状态
*
* @return string 状态枚举:high_load/processing/active/empty
*/
public function getQueueStatus(int $message_count): string
{
if ($message_count > 100) {
return 'high_load';
}
if ($message_count > 10) {
return 'processing';
}
if ($message_count > 0) {
return 'active';
}
return 'empty';
}
/**
* 获取 Management API HTTP 客户端(懒初始化,协程安全)
*/
private function getHttpClient(): Client
{
if ($this->httpClient === null) {
$management = $this->config->get('amqp.management', []);
$this->httpClient = new Client([
'base_uri' => sprintf(
'http://%s:%d',
$management['host'] ?? 'localhost',
$management['port'] ?? 15672,
),
'auth' => [
$management['user'] ?? 'guest',
$management['password'] ?? 'guest',
],
'handler' => HandlerStack::create(new CoroutineHandler()),
'timeout' => 5,
'connect_timeout' => 3,
]);
}
return $this->httpClient;
}
}