Files
datahub/backend/app/Controller/Api/V1/MqStatusController.php
T
2026-03-17 10:30:31 +08:00

114 lines
4.4 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace App\Controller\Api\V1;
use App\Controller\AbstractController;
use App\Middleware\AuthMiddleware;
use App\Middleware\PermissionMiddleware;
use App\Service\MqStatusService;
use App\Utils\Log;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\Middleware;
use Hyperf\HttpServer\Annotation\RequestMapping;
use OpenApi\Attributes as OA;
use Psr\Http\Message\ResponseInterface;
/**
 * Message-queue status monitoring endpoint.
 *
 * Exposes GET /api/v1/mq/status, which reports the RabbitMQ queue status
 * produced by MqStatusService. Every request passes through AuthMiddleware
 * and PermissionMiddleware; per the API description, only the admin role is
 * expected to be granted access.
 */
#[OA\Tag(name: 'MQ Status', description: '消息队列状态监控')]
#[Controller(prefix: "/api/v1/mq")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
class MqStatusController extends AbstractController
{
    public function __construct(
        protected readonly MqStatusService $mqStatusService,
    ) {}

    /**
     * Return the current message-queue status.
     *
     * Reads the optional `queue` request input. When present, it must be
     * strictly equal to one of the values returned by
     * MqStatusService::getValidQueueTypes(); otherwise a 400 JSON response is
     * returned. Any Throwable raised while querying the status is logged with
     * its full message and trace, and a 500 JSON response carrying a masked
     * version of the message is returned.
     *
     * @return ResponseInterface|array A plain envelope array (`code`, `message`,
     *                                 `data`) on success; a JSON response object
     *                                 with status 400 or 500 on failure.
     */
    #[OA\Get(
        path: '/api/v1/mq/status',
        summary: '获取消息队列状态',
        description: '查询 RabbitMQ 各队列的消息数、消费者数和运行状态。支持按队列类型筛选。仅 admin 角色可访问。',
        security: [['bearerAuth' => []]],
        tags: ['MQ Status'],
        parameters: [
            new OA\Parameter(
                name: 'queue',
                in: 'query',
                required: false,
                description: '队列类型筛选(orders/products/refunds/inventory)',
                schema: new OA\Schema(type: 'string', enum: ['orders', 'products', 'refunds', 'inventory'])
            ),
        ],
        responses: [
            new OA\Response(
                response: 200,
                description: '获取成功',
                content: new OA\JsonContent(properties: [
                    new OA\Property(property: 'code', type: 'integer', example: 0),
                    new OA\Property(property: 'message', type: 'string', example: '获取成功'),
                    new OA\Property(property: 'data', ref: '#/components/schemas/MqQueueStatus'),
                ])
            ),
            new OA\Response(response: 400, description: '无效的队列类型参数', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
            new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
            new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
            new OA\Response(response: 500, description: 'RabbitMQ 连接异常', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        ]
    )]
    #[RequestMapping(path: "status", methods: "GET")]
    public function status(): ResponseInterface|array
    {
        $queueType = $this->request->input('queue');

        // Validate the queue-type filter only when the caller supplied one.
        if ($queueType !== null) {
            $validTypes = $this->mqStatusService->getValidQueueTypes();

            if (!in_array($queueType, $validTypes, true)) {
                // The input may be a non-string (e.g. `?queue[]=x` yields an
                // array). Interpolating an array raises an "Array to string
                // conversion" warning, so show its type name instead.
                $shownValue = is_scalar($queueType)
                    ? (string) $queueType
                    : get_debug_type($queueType);

                return $this->response->json([
                    'code' => 400,
                    'message' => "无效的队列类型: {$shownValue},可选值: " . implode(', ', $validTypes),
                    'data' => null,
                ])->withStatus(400);
            }
        }

        try {
            return [
                'code' => 0,
                'message' => '获取成功',
                'data' => $this->mqStatusService->getStatus($queueType),
            ];
        } catch (\Throwable $e) {
            // The log keeps the unmasked message and trace for diagnosis.
            Log::get()->error('MQ status query failed', [
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);

            return $this->response->json([
                'code' => 500,
                'message' => 'RabbitMQ 连接异常: ' . $this->maskSensitiveDetails($e->getMessage()),
                'data' => null,
            ])->withStatus(500);
        }
    }

    /**
     * Mask connection details (credentials, IPv4 addresses, ports) in an
     * exception message before it is sent to the client.
     *
     * @param string $message Raw exception message.
     *
     * @return string The masked message, or an empty string if the regex
     *                engine reports an error.
     */
    private function maskSensitiveDetails(string $message): string
    {
        $masked = preg_replace(
            [
                '/\/\/[^:]+:[^@]+@/',                    // "//user:pass@" in a URL
                '/\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/',  // dotted-quad IPv4 address
                '/:\d{4,5}(?=[\/\s\)]|$)/',              // 4-5 digit port number
            ],
            ['//***:***@', '***.***.***.***', ':****'],
            $message
        );

        // preg_replace() returns null on a PCRE error; never fall back to the
        // raw, unmasked message in that case.
        return $masked ?? '';
    }
}