2026-03-13 14:50:06 +08:00
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
|
|
namespace App\Service;
|
|
|
|
|
|
|
2026-03-17 10:37:02 +08:00
|
|
|
|
use GuzzleHttp\Client;
|
|
|
|
|
|
use GuzzleHttp\HandlerStack;
|
2026-03-13 14:50:06 +08:00
|
|
|
|
use Hyperf\Contract\ConfigInterface;
|
2026-03-17 10:37:02 +08:00
|
|
|
|
use Hyperf\Guzzle\CoroutineHandler;
|
2026-03-13 14:50:06 +08:00
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 消息队列状态服务
|
|
|
|
|
|
*
|
2026-03-17 10:37:02 +08:00
|
|
|
|
* 通过 RabbitMQ Management HTTP API 查询队列状态,供 CLI 命令和 API 接口共用
|
2026-03-13 14:50:06 +08:00
|
|
|
|
*/
|
|
|
|
|
|
class MqStatusService
|
|
|
|
|
|
{
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 业务队列类型
|
|
|
|
|
|
*/
|
|
|
|
|
|
public const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory'];
|
|
|
|
|
|
|
2026-03-17 10:37:02 +08:00
|
|
|
|
private ?Client $httpClient = null;
|
|
|
|
|
|
|
2026-03-13 14:50:06 +08:00
|
|
|
|
public function __construct(
|
|
|
|
|
|
protected readonly ConfigInterface $config,
|
|
|
|
|
|
) {}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 获取合法的队列类型列表
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return array<string>
|
|
|
|
|
|
*/
|
|
|
|
|
|
public function getValidQueueTypes(): array
|
|
|
|
|
|
{
|
|
|
|
|
|
return self::QUEUE_TYPES;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 获取队列状态
|
|
|
|
|
|
*
|
|
|
|
|
|
* @param string|null $queue_type 筛选队列类型(orders/products/refunds/inventory)
|
|
|
|
|
|
* @return array{business_queues: array, retry_queues: array, error_queue: array, summary: array, fetched_at: string}
|
|
|
|
|
|
*/
|
|
|
|
|
|
public function getStatus(?string $queue_type = null): array
|
|
|
|
|
|
{
|
2026-03-17 10:37:02 +08:00
|
|
|
|
$queue_types = $queue_type ? [$queue_type] : self::QUEUE_TYPES;
|
2026-03-13 14:50:06 +08:00
|
|
|
|
|
2026-03-17 10:37:02 +08:00
|
|
|
|
$business_queues = [];
|
|
|
|
|
|
$retry_queues = [];
|
|
|
|
|
|
$total_messages = 0;
|
|
|
|
|
|
$total_consumers = 0;
|
|
|
|
|
|
|
|
|
|
|
|
foreach ($queue_types as $type) {
|
|
|
|
|
|
$group_data = $this->fetchQueueGroupData($type);
|
2026-03-13 14:50:06 +08:00
|
|
|
|
|
2026-03-17 10:37:02 +08:00
|
|
|
|
foreach ($group_data as $queue_info) {
|
|
|
|
|
|
if (str_ends_with($queue_info['queue'], '.retry.queue')) {
|
|
|
|
|
|
$retry_queues[] = $queue_info;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
$business_queues[] = $queue_info;
|
|
|
|
|
|
}
|
2026-03-13 14:50:06 +08:00
|
|
|
|
|
2026-03-17 10:37:02 +08:00
|
|
|
|
if (is_numeric($queue_info['messages'])) {
|
|
|
|
|
|
$total_messages += $queue_info['messages'];
|
2026-03-13 14:50:06 +08:00
|
|
|
|
}
|
2026-03-17 10:37:02 +08:00
|
|
|
|
if (is_numeric($queue_info['consumers'])) {
|
|
|
|
|
|
$total_consumers += $queue_info['consumers'];
|
2026-03-13 14:50:06 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-03-17 10:37:02 +08:00
|
|
|
|
}
|
2026-03-13 14:50:06 +08:00
|
|
|
|
|
2026-03-17 10:37:02 +08:00
|
|
|
|
// 错误队列仅在未筛选时返回
|
|
|
|
|
|
$error_queue = [];
|
|
|
|
|
|
if ($queue_type === null) {
|
|
|
|
|
|
$error_queue = $this->fetchQueueData('errors.queue');
|
|
|
|
|
|
|
|
|
|
|
|
if (is_numeric($error_queue['messages'])) {
|
|
|
|
|
|
$total_messages += $error_queue['messages'];
|
|
|
|
|
|
}
|
|
|
|
|
|
if (is_numeric($error_queue['consumers'])) {
|
|
|
|
|
|
$total_consumers += $error_queue['consumers'];
|
|
|
|
|
|
}
|
2026-03-13 14:50:06 +08:00
|
|
|
|
}
|
2026-03-17 10:37:02 +08:00
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
|
'business_queues' => $business_queues,
|
|
|
|
|
|
'retry_queues' => $retry_queues,
|
|
|
|
|
|
'error_queue' => $error_queue,
|
|
|
|
|
|
'summary' => [
|
|
|
|
|
|
'total_messages' => $total_messages,
|
|
|
|
|
|
'total_consumers' => $total_consumers,
|
|
|
|
|
|
],
|
|
|
|
|
|
'fetched_at' => date('Y-m-d H:i:s'),
|
|
|
|
|
|
];
|
2026-03-13 14:50:06 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 获取队列组数据(主队列 + 重试队列)
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return array<array{queue: string, messages: int|string, consumers: int|string, status: string}>
|
|
|
|
|
|
*/
|
2026-03-17 10:37:02 +08:00
|
|
|
|
private function fetchQueueGroupData(string $type): array
|
2026-03-13 14:50:06 +08:00
|
|
|
|
{
|
|
|
|
|
|
$queues = [
|
|
|
|
|
|
"{$type}.queue",
|
|
|
|
|
|
"{$type}.retry.queue",
|
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
|
|
$group_data = [];
|
|
|
|
|
|
foreach ($queues as $queue_name) {
|
2026-03-17 10:37:02 +08:00
|
|
|
|
$group_data[] = $this->fetchQueueData($queue_name);
|
2026-03-13 14:50:06 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return $group_data;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
2026-03-17 10:37:02 +08:00
|
|
|
|
* 通过 Management API 获取单个队列的数据
|
2026-03-13 14:50:06 +08:00
|
|
|
|
*
|
|
|
|
|
|
* @return array{queue: string, messages: int|string, consumers: int|string, status: string}
|
|
|
|
|
|
*/
|
2026-03-17 10:37:02 +08:00
|
|
|
|
private function fetchQueueData(string $queue_name): array
|
2026-03-13 14:50:06 +08:00
|
|
|
|
{
|
|
|
|
|
|
try {
|
2026-03-17 10:37:02 +08:00
|
|
|
|
$vhost = urlencode($this->config->get('amqp.default_consumer.vhost', '/'));
|
|
|
|
|
|
$response = $this->getHttpClient()->get("/api/queues/{$vhost}/{$queue_name}");
|
|
|
|
|
|
$data = json_decode($response->getBody()->getContents(), true);
|
2026-03-13 14:50:06 +08:00
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
|
'queue' => $queue_name,
|
2026-03-17 10:37:02 +08:00
|
|
|
|
'messages' => $data['messages'] ?? 0,
|
|
|
|
|
|
'consumers' => $data['consumers'] ?? 0,
|
|
|
|
|
|
'status' => $this->getQueueStatus($data['messages'] ?? 0),
|
2026-03-13 14:50:06 +08:00
|
|
|
|
];
|
2026-03-17 10:37:02 +08:00
|
|
|
|
} catch (\Throwable) {
|
2026-03-13 14:50:06 +08:00
|
|
|
|
return [
|
|
|
|
|
|
'queue' => $queue_name,
|
|
|
|
|
|
'messages' => 'N/A',
|
|
|
|
|
|
'consumers' => 'N/A',
|
|
|
|
|
|
'status' => 'error',
|
|
|
|
|
|
];
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 根据消息数量判断队列状态
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return string 状态枚举:high_load/processing/active/empty
|
|
|
|
|
|
*/
|
|
|
|
|
|
public function getQueueStatus(int $message_count): string
|
|
|
|
|
|
{
|
|
|
|
|
|
if ($message_count > 100) {
|
|
|
|
|
|
return 'high_load';
|
|
|
|
|
|
}
|
|
|
|
|
|
if ($message_count > 10) {
|
|
|
|
|
|
return 'processing';
|
|
|
|
|
|
}
|
|
|
|
|
|
if ($message_count > 0) {
|
|
|
|
|
|
return 'active';
|
|
|
|
|
|
}
|
|
|
|
|
|
return 'empty';
|
|
|
|
|
|
}
|
2026-03-17 10:37:02 +08:00
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 获取 Management API HTTP 客户端(懒初始化,协程安全)
|
|
|
|
|
|
*/
|
|
|
|
|
|
private function getHttpClient(): Client
|
|
|
|
|
|
{
|
|
|
|
|
|
if ($this->httpClient === null) {
|
|
|
|
|
|
$management = $this->config->get('amqp.management', []);
|
|
|
|
|
|
|
|
|
|
|
|
$this->httpClient = new Client([
|
|
|
|
|
|
'base_uri' => sprintf(
|
|
|
|
|
|
'http://%s:%d',
|
|
|
|
|
|
$management['host'] ?? 'localhost',
|
|
|
|
|
|
$management['port'] ?? 15672,
|
|
|
|
|
|
),
|
|
|
|
|
|
'auth' => [
|
|
|
|
|
|
$management['user'] ?? 'guest',
|
|
|
|
|
|
$management['password'] ?? 'guest',
|
|
|
|
|
|
],
|
|
|
|
|
|
'handler' => HandlerStack::create(new CoroutineHandler()),
|
|
|
|
|
|
'timeout' => 5,
|
|
|
|
|
|
'connect_timeout' => 3,
|
|
|
|
|
|
]);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return $this->httpClient;
|
|
|
|
|
|
}
|
2026-03-13 14:50:06 +08:00
|
|
|
|
}
|