add mq controller

This commit is contained in:
2026-03-13 14:50:06 +08:00
parent f165419e7c
commit a3e8a5cf5d
7 changed files with 7424 additions and 203 deletions
+79 -203
View File
@@ -4,21 +4,15 @@ declare(strict_types=1);
namespace App\Command;
use App\Service\MqStatusService;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Psr\Container\ContainerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Hyperf\Contract\ConfigInterface;
use Symfony\Component\Console\Input\InputOption;
#[Command]
class AppMqStatus extends HyperfCommand
{
/**
* 业务队列类型
*/
private const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory'];
public function __construct(protected ContainerInterface $container)
{
parent::__construct('app:mq:status');
@@ -59,116 +53,62 @@ class AppMqStatus extends HyperfCommand
$this->line('');
try {
$config = $this->container->get(ConfigInterface::class);
$consumerConfig = $config->get('amqp.default_consumer');
// 创建连接
$connection = new AMQPStreamConnection(
$consumerConfig['host'],
$consumerConfig['port'],
$consumerConfig['user'],
$consumerConfig['password'],
$consumerConfig['vhost'],
false,
'AMQPLAIN',
null,
'en_US',
$consumerConfig['params']['connection_timeout'] ?? 3.0,
$consumerConfig['params']['read_write_timeout'] ?? 3.0,
null,
$consumerConfig['params']['keepalive'] ?? false,
$consumerConfig['params']['heartbeat'] ?? 0
);
$channel = $connection->channel();
$service = $this->container->get(MqStatusService::class);
// 获取 --queue 参数
$filterQueue = $this->input->getOption('queue');
// 确定要显示的队列组
$queueTypes = $filterQueue
? [$filterQueue]
: self::QUEUE_TYPES;
$filter_queue = $this->input->getOption('queue');
// 验证队列类型
if ($filterQueue && !in_array($filterQueue, self::QUEUE_TYPES)) {
$this->error("Invalid queue type: {$filterQueue}");
$this->line("Valid types: " . implode(', ', self::QUEUE_TYPES));
$channel->close();
$connection->close();
if ($filter_queue && !in_array($filter_queue, $service->getValidQueueTypes())) {
$this->error("Invalid queue type: {$filter_queue}");
$this->line("Valid types: " . implode(', ', $service->getValidQueueTypes()));
return 1;
}
$totalMessages = 0;
$totalConsumers = 0;
$allQueueNames = [];
// 收集主业务队列和死信队列的数据
$businessQueuesData = [];
$deadLetterQueuesData = [];
foreach ($queueTypes as $type) {
$groupData = $this->fetchQueueGroupData($channel, $type);
foreach ($groupData as $queueInfo) {
// 区分主队列和重试队列(死信队列)
if (str_ends_with($queueInfo['queue'], '.retry.queue')) {
$deadLetterQueuesData[] = $queueInfo;
} else {
$businessQueuesData[] = $queueInfo;
}
if (is_numeric($queueInfo['messages'])) {
$totalMessages += $queueInfo['messages'];
}
if (is_numeric($queueInfo['consumers'])) {
$totalConsumers += $queueInfo['consumers'];
}
$allQueueNames[] = $queueInfo['queue'];
}
}
// 通过 Service 获取队列状态数据
$status = $service->getStatus($filter_queue);
// 显示主业务队列表格
$this->displayBusinessQueues($businessQueuesData, $filterQueue);
$this->displayBusinessQueues($status['business_queues'], $filter_queue);
// 显示死信队列(重试队列)
if (!empty($deadLetterQueuesData)) {
if (!empty($status['retry_queues'])) {
$this->line('');
$this->displayDeadLetterQueues($deadLetterQueuesData, $filterQueue);
$this->displayDeadLetterQueues($status['retry_queues'], $filter_queue);
}
// 显示共享的错误队列
if (!$filterQueue) {
if (!empty($status['error_queue'])) {
$this->line('');
$errorQueueData = $this->fetchQueueData($channel, 'errors.queue');
$this->displaySharedQueues([$errorQueueData]);
if (is_numeric($errorQueueData['messages'])) {
$totalMessages += $errorQueueData['messages'];
}
if (is_numeric($errorQueueData['consumers'])) {
$totalConsumers += $errorQueueData['consumers'];
}
$allQueueNames[] = $errorQueueData['queue'];
$this->displaySharedQueues([$status['error_queue']]);
}
// 关闭连接
$channel->close();
$connection->close();
// 显示汇总信息
$this->line('');
$this->info("=== Summary ===");
$this->line("Total messages: <fg=yellow>{$totalMessages}</>");
$this->line("Total active consumers: <fg=cyan>{$totalConsumers}</>");
$this->line("Total messages: <fg=yellow>{$status['summary']['total_messages']}</>");
$this->line("Total active consumers: <fg=cyan>{$status['summary']['total_consumers']}</>");
$this->line('');
// 列出所有监控的队列名称
$all_queue_names = [];
foreach ($status['business_queues'] as $q) {
$all_queue_names[] = $q['queue'];
}
foreach ($status['retry_queues'] as $q) {
$all_queue_names[] = $q['queue'];
}
if (!empty($status['error_queue'])) {
$all_queue_names[] = $status['error_queue']['queue'];
}
$this->line('Queues monitored:');
foreach ($allQueueNames as $queueName) {
$this->line(" - {$queueName}");
foreach ($all_queue_names as $queue_name) {
$this->line(" - {$queue_name}");
}
return 0;
} catch (\Exception $e) {
} catch (\Throwable $e) {
$this->error('Failed to fetch queue status: ' . $e->getMessage());
$this->line('Trace: ' . $e->getTraceAsString(), 'comment');
return 1;
@@ -176,96 +116,41 @@ class AppMqStatus extends HyperfCommand
}
/**
* 获取队列组数据(主队列 + 重试队列)
* 将 Service 返回的 status 枚举值转为 CLI ANSI 格式
*/
private function fetchQueueGroupData($channel, string $type): array
private function formatStatus(string $status): string
{
$queues = [
"{$type}.queue", // 主业务队列
"{$type}.retry.queue", // 重试队列
];
$groupData = [];
foreach ($queues as $queueName) {
$groupData[] = $this->fetchQueueData($channel, $queueName);
}
return $groupData;
}
/**
* 获取单个队列的数据
*/
private function fetchQueueData($channel, string $queueName): array
{
try {
// 使用 passive=true 来获取队列信息而不创建队列
[$queue, $messageCount, $consumerCount] = $channel->queue_declare(
$queueName,
true, // passive
true, // durable
false, // exclusive
false // auto_delete
);
return [
'queue' => $queueName,
'messages' => $messageCount,
'consumers' => $consumerCount,
'status' => $this->getQueueStatus($messageCount, $consumerCount),
];
} catch (\Exception $e) {
return [
'queue' => $queueName,
'messages' => 'N/A',
'consumers' => 'N/A',
'status' => '<fg=red>Error: ' . $e->getMessage() . '</>',
];
}
}
/**
* 获取队列状态描述
*/
private function getQueueStatus(int $messageCount, int $consumerCount): string
{
if ($messageCount > 100) {
return '<fg=red>⚠ High Load</>';
} elseif ($messageCount > 10) {
return '<fg=yellow>⚡ Processing</>';
} elseif ($messageCount > 0) {
return '<fg=cyan>✓ Active</>';
} else {
return '<fg=green>✓ Empty</>';
}
return match ($status) {
'high_load' => '<fg=red>⚠ High Load</>',
'processing' => '<fg=yellow>⚡ Processing</>',
'active' => '<fg=cyan>✓ Active</>',
'empty' => '<fg=green>✓ Empty</>',
'error' => '<fg=red>✗ Error</>',
default => $status,
};
}
/**
* 显示业务队列(合并所有队列组)- 转置显示
*/
private function displayBusinessQueues(array $allGroupsData, ?string $filterQueue): void
private function displayBusinessQueues(array $all_groups_data, ?string $filter_queue): void
{
// 构建标题
$title = $filterQueue
? "Business Queues (Filtered: {$filterQueue})"
$title = $filter_queue
? "Business Queues (Filtered: {$filter_queue})"
: "Business Queues";
$this->line("<fg=blue>=== {$title} ===</>");
// 构建表头 - 使用简化的队列名称(去掉 .queue)
$headers = ['Metric'];
foreach ($allGroupsData as $queueInfo) {
$queueName = $queueInfo['queue'];
// 去掉 .queue 后缀
$simpleName = str_replace('.queue', '', $queueName);
$headers[] = $simpleName;
foreach ($all_groups_data as $queue_info) {
$simple_name = str_replace('.queue', '', $queue_info['queue']);
$headers[] = $simple_name;
}
// 构建行数据 - 转置显示
$rows = [
$this->buildMetricRow('Messages', $allGroupsData, 'messages'),
$this->buildMetricRow('Consumers', $allGroupsData, 'consumers'),
$this->buildMetricRow('Status', $allGroupsData, 'status'),
$this->buildMetricRow('Messages', $all_groups_data, 'messages'),
$this->buildMetricRow('Consumers', $all_groups_data, 'consumers'),
$this->buildMetricRow('Status', $all_groups_data, 'status'),
];
$this->table($headers, $rows);
@@ -275,34 +160,29 @@ class AppMqStatus extends HyperfCommand
/**
* 显示死信队列(重试队列)- 转置显示
*/
private function displayDeadLetterQueues(array $queuesData, ?string $filterQueue): void
private function displayDeadLetterQueues(array $queues_data, ?string $filter_queue): void
{
$title = $filterQueue
? "Dead Letter Queues (Filtered: {$filterQueue})"
$title = $filter_queue
? "Dead Letter Queues (Filtered: {$filter_queue})"
: "Dead Letter Queues (Retry Queues)";
$this->line("<fg=magenta>=== {$title} ===</>");
// 构建表头
$headers = ['Metric'];
foreach ($queuesData as $queueInfo) {
$queueName = $queueInfo['queue'];
// 去掉 .queue 后缀,保留 retry 标识
$simpleName = str_replace('.queue', '', $queueName);
$headers[] = $simpleName;
foreach ($queues_data as $queue_info) {
$simple_name = str_replace('.queue', '', $queue_info['queue']);
$headers[] = $simple_name;
}
// 构建行数据
$rows = [
$this->buildMetricRow('Messages', $queuesData, 'messages'),
$this->buildMetricRow('Consumers', $queuesData, 'consumers'),
$this->buildMetricRow('Status', $queuesData, 'status'),
$this->buildMetricRow('Messages', $queues_data, 'messages'),
$this->buildMetricRow('Consumers', $queues_data, 'consumers'),
$this->buildMetricRow('Status', $queues_data, 'status'),
];
$this->table($headers, $rows);
// 添加说明
if (!empty($queuesData) && $this->hasMessages($queuesData)) {
if (!empty($queues_data) && $this->hasMessages($queues_data)) {
$this->line('<fg=yellow> These queues receive messages from DLX when main queue processing fails</>');
}
}
@@ -310,30 +190,25 @@ class AppMqStatus extends HyperfCommand
/**
* 显示共享队列(errors.queue- 转置显示
*/
private function displaySharedQueues(array $queuesData): void
private function displaySharedQueues(array $queues_data): void
{
$this->line("<fg=blue>=== Shared Queues ===</>");
// 构建表头
$headers = ['Metric'];
foreach ($queuesData as $queueInfo) {
$queueName = $queueInfo['queue'];
// 去掉 .queue 后缀
$simpleName = str_replace('.queue', '', $queueName);
$headers[] = $simpleName;
foreach ($queues_data as $queue_info) {
$simple_name = str_replace('.queue', '', $queue_info['queue']);
$headers[] = $simple_name;
}
// 构建行数据
$rows = [
$this->buildMetricRow('Messages', $queuesData, 'messages'),
$this->buildMetricRow('Consumers', $queuesData, 'consumers'),
$this->buildMetricRow('Status', $queuesData, 'status'),
$this->buildMetricRow('Messages', $queues_data, 'messages'),
$this->buildMetricRow('Consumers', $queues_data, 'consumers'),
$this->buildMetricRow('Status', $queues_data, 'status'),
];
$this->table($headers, $rows);
// 添加说明
if (!empty($queuesData) && $this->hasMessages($queuesData)) {
if (!empty($queues_data) && $this->hasMessages($queues_data)) {
$this->line('<fg=yellow> Error queue contains messages that exceeded max retry count</>');
}
}
@@ -341,10 +216,10 @@ class AppMqStatus extends HyperfCommand
/**
* 检查队列中是否有消息
*/
private function hasMessages(array $queuesData): bool
private function hasMessages(array $queues_data): bool
{
foreach ($queuesData as $queueInfo) {
if (is_numeric($queueInfo['messages']) && $queueInfo['messages'] > 0) {
foreach ($queues_data as $queue_info) {
if (is_numeric($queue_info['messages']) && $queue_info['messages'] > 0) {
return true;
}
}
@@ -354,15 +229,16 @@ class AppMqStatus extends HyperfCommand
/**
* 构建指标行数据
*/
private function buildMetricRow(string $metricName, array $queuesData, string $field): array
private function buildMetricRow(string $metric_name, array $queues_data, string $field): array
{
$row = [$metricName];
$row = [$metric_name];
foreach ($queuesData as $queueInfo) {
$value = $queueInfo[$field];
foreach ($queues_data as $queue_info) {
$value = $queue_info[$field];
// 根据字段类型格式化显示
if ($field === 'messages' || $field === 'consumers') {
if ($field === 'status') {
$row[] = $this->formatStatus((string) $value);
} elseif ($field === 'messages' || $field === 'consumers') {
$row[] = $this->formatNumber($value);
} else {
$row[] = $value;
@@ -375,7 +251,7 @@ class AppMqStatus extends HyperfCommand
/**
* 格式化数字显示
*/
private function formatNumber($value): string
private function formatNumber(mixed $value): string
{
if (!is_numeric($value)) {
return (string) $value;