Files
datahub/backend/app/Command/AppMqStatus.php
T
2025-12-01 11:12:33 +08:00

397 lines
13 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace App\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Psr\Container\ContainerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Hyperf\Contract\ConfigInterface;
use Symfony\Component\Console\Input\InputOption;
#[Command]
class AppMqStatus extends HyperfCommand
{
/**
* Business queue types; each type is probed as "{type}.queue" and "{type}.retry.queue".
*/
private const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory'];
/**
 * Registers the command under the name `app:mq:status`.
 *
 * @param ContainerInterface $container Container used to look up the configuration service.
 */
public function __construct(
    protected ContainerInterface $container,
) {
    parent::__construct('app:mq:status');
}
/**
 * Declares the command description and its three options:
 * `--watch/-w`, `--interval/-i` (default "3") and `--queue`.
 */
public function configure()
{
    parent::configure();

    $this
        ->setDescription('Display message counts for all accessible queues')
        // Mode left as null: the option is registered with the library's default mode.
        ->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds')
        ->addOption(
            'interval',
            'i',
            InputOption::VALUE_OPTIONAL,
            'Refresh interval in seconds',
            '3'
        )
        ->addOption(
            'queue',
            null,
            InputOption::VALUE_OPTIONAL,
            'Filter by queue type (orders, products, refunds, inventory)'
        );
}
/**
 * Command entry point: prints the queue status once, or repeatedly in watch mode.
 *
 * Without `--watch` the report is printed once and its exit code is returned.
 * With `--watch` the screen is cleared and the report is re-printed every
 * `--interval` seconds until the process is interrupted; that branch never returns.
 *
 * @return int Exit code of the single run (result of displayQueueStatus()).
 */
public function handle()
{
    if (!$this->input->getOption('watch')) {
        return $this->displayQueueStatus();
    }

    // `--interval` given without a value, or with a non-numeric value, casts to 0,
    // which would turn the loop below into a busy loop; a negative value makes
    // sleep() throw a ValueError. Enforce a minimum of one second.
    $interval = max(1, (int) $this->input->getOption('interval'));

    $this->info("Watch mode enabled. Refreshing every {$interval} seconds. Press Ctrl+C to exit.");
    $this->line('');

    while (true) {
        // Clear the screen and move the cursor home (ANSI-capable terminals only).
        $this->output->write("\033[2J\033[H");
        $this->displayQueueStatus();
        sleep($interval);
    }
}
/**
 * Prints one status report for the selected queues and returns an exit code.
 *
 * Steps: validate the `--queue` filter, connect to the broker using the
 * `amqp.default_consumer` configuration, collect per-queue data, print the
 * business / retry / shared tables and a summary. The channel and connection
 * are always released, including when an error occurs part-way through.
 *
 * @return int 0 when the report was printed; 1 when the `--queue` value is not
 *             one of QUEUE_TYPES or when fetching the status failed.
 */
private function displayQueueStatus(): int
{
    $this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info');
    $this->line('');

    // Normalise the filter: a missing or empty value means "no filter". Any other
    // string (including "0") must be a known queue type.
    $requestedQueue = $this->input->getOption('queue');
    $filterQueue = is_string($requestedQueue) && $requestedQueue !== '' ? $requestedQueue : null;

    // Validate before connecting so a typo is reported without touching the broker.
    if ($filterQueue !== null && !in_array($filterQueue, self::QUEUE_TYPES, true)) {
        $this->error("Invalid queue type: {$filterQueue}");
        $this->line("Valid types: " . implode(', ', self::QUEUE_TYPES));

        return 1;
    }

    $queueTypes = $filterQueue === null ? self::QUEUE_TYPES : [$filterQueue];

    $connection = null;
    $channel = null;

    try {
        $config = $this->container->get(ConfigInterface::class);
        $consumerConfig = $config->get('amqp.default_consumer');
        if (!is_array($consumerConfig)) {
            throw new \RuntimeException('Configuration "amqp.default_consumer" is missing or not an array');
        }

        $connection = new AMQPStreamConnection(
            $consumerConfig['host'],
            $consumerConfig['port'],
            $consumerConfig['user'],
            $consumerConfig['password'],
            $consumerConfig['vhost'],
            false,
            'AMQPLAIN',
            null,
            'en_US',
            $consumerConfig['params']['connection_timeout'] ?? 3.0,
            $consumerConfig['params']['read_write_timeout'] ?? 3.0,
            null,
            $consumerConfig['params']['keepalive'] ?? false,
            $consumerConfig['params']['heartbeat'] ?? 0
        );
        $channel = $connection->channel();

        $totalMessages = 0;
        $totalConsumers = 0;
        $allQueueNames = [];
        $businessQueuesData = [];
        $deadLetterQueuesData = [];

        foreach ($queueTypes as $type) {
            foreach ($this->fetchQueueGroupData($channel, $type) as $queueInfo) {
                // Retry queues are reported in their own table.
                if (str_ends_with($queueInfo['queue'], '.retry.queue')) {
                    $deadLetterQueuesData[] = $queueInfo;
                } else {
                    $businessQueuesData[] = $queueInfo;
                }

                // Failed lookups carry 'N/A' instead of a number; skip those in the totals.
                if (is_numeric($queueInfo['messages'])) {
                    $totalMessages += $queueInfo['messages'];
                }
                if (is_numeric($queueInfo['consumers'])) {
                    $totalConsumers += $queueInfo['consumers'];
                }
                $allQueueNames[] = $queueInfo['queue'];
            }
        }

        $this->displayBusinessQueues($businessQueuesData, $filterQueue);

        if (!empty($deadLetterQueuesData)) {
            $this->line('');
            $this->displayDeadLetterQueues($deadLetterQueuesData, $filterQueue);
        }

        // The shared error queue is only shown when no filter is active.
        if ($filterQueue === null) {
            $this->line('');
            $errorQueueData = $this->fetchQueueData($channel, 'errors.queue');
            $this->displaySharedQueues([$errorQueueData]);

            if (is_numeric($errorQueueData['messages'])) {
                $totalMessages += $errorQueueData['messages'];
            }
            if (is_numeric($errorQueueData['consumers'])) {
                $totalConsumers += $errorQueueData['consumers'];
            }
            $allQueueNames[] = $errorQueueData['queue'];
        }

        $this->line('');
        $this->info("=== Summary ===");
        $this->line("Total messages: <fg=yellow>{$totalMessages}</>");
        $this->line("Total active consumers: <fg=cyan>{$totalConsumers}</>");
        $this->line('');
        $this->line('Queues monitored:');
        foreach ($allQueueNames as $queueName) {
            $this->line(" - {$queueName}");
        }

        return 0;
    } catch (\Exception $e) {
        $this->error('Failed to fetch queue status: ' . $e->getMessage());
        $this->line('Trace: ' . $e->getTraceAsString(), 'comment');

        return 1;
    } finally {
        // Release broker resources on every path; in watch mode a leaked connection
        // would otherwise accumulate on each failed refresh. Closing is best-effort:
        // a failure here must not replace the result or error reported above.
        foreach ([$channel, $connection] as $resource) {
            if ($resource === null) {
                continue;
            }
            try {
                $resource->close();
            } catch (\Throwable $closeError) {
                // Intentionally ignored: the resource is being discarded anyway.
            }
        }
    }
}
/**
 * Fetches the data of one queue group: the main queue followed by its retry queue.
 *
 * @param mixed  $channel Channel handed through to fetchQueueData().
 * @param string $type    Queue type used as the name prefix.
 *
 * @return array List with two entries, "{type}.queue" first and "{type}.retry.queue" second.
 */
private function fetchQueueGroupData($channel, string $type): array
{
    return array_map(
        fn (string $name): array => $this->fetchQueueData($channel, $name),
        [
            "{$type}.queue",
            "{$type}.retry.queue",
        ]
    );
}
/**
 * Fetches message and consumer counts for a single queue.
 *
 * The lookup is a passive queue_declare, which inspects the queue without
 * creating it. When a passive declare fails (for example the queue does not
 * exist) the broker closes the channel it was issued on, so each lookup runs on
 * its own short-lived channel; the caller's channel stays usable for the next
 * queue instead of failing for every queue after the first missing one.
 *
 * @param mixed  $channel   Open channel; only its connection is used here
 *                          (expected to offer getConnection(), as php-amqplib channels do).
 * @param string $queueName Name of the queue to inspect.
 *
 * @return array Keys 'queue', 'messages', 'consumers', 'status'. On failure
 *               'messages' and 'consumers' are 'N/A' and 'status' holds the error text.
 */
private function fetchQueueData($channel, string $queueName): array
{
    $probe = null;

    try {
        $connection = $channel->getConnection();
        if ($connection === null) {
            throw new \RuntimeException('AMQP channel is no longer attached to a connection');
        }

        $probe = $connection->channel();

        // Only the counts are needed; the echoed queue name is skipped.
        [, $messageCount, $consumerCount] = $probe->queue_declare(
            $queueName,
            true,  // passive
            true,  // durable
            false, // exclusive
            false  // auto_delete
        );

        return [
            'queue' => $queueName,
            'messages' => $messageCount,
            'consumers' => $consumerCount,
            'status' => $this->getQueueStatus($messageCount, $consumerCount),
        ];
    } catch (\Exception $e) {
        return [
            'queue' => $queueName,
            'messages' => 'N/A',
            'consumers' => 'N/A',
            'status' => '<fg=red>Error: ' . $e->getMessage() . '</>',
        ];
    } finally {
        // After a failed declare the broker has already closed the probe channel;
        // only close it ourselves when it is still open.
        if ($probe !== null && $probe->is_open()) {
            try {
                $probe->close();
            } catch (\Exception $closeError) {
                // Best-effort cleanup: the lookup result above is what matters.
            }
        }
    }
}
/**
 * Maps a queue's message count to a coloured status label.
 *
 * More than 100 messages is "High Load", more than 10 is "Processing",
 * at least one is "Active", otherwise "Empty".
 *
 * @param int $messageCount  Number of messages in the queue.
 * @param int $consumerCount Number of consumers; not used by the current rules.
 *
 * @return string Console-formatted status label.
 */
private function getQueueStatus(int $messageCount, int $consumerCount): string
{
    return match (true) {
        $messageCount > 100 => '<fg=red>⚠ High Load</>',
        $messageCount > 10 => '<fg=yellow>⚡ Processing</>',
        $messageCount > 0 => '<fg=cyan>✓ Active</>',
        default => '<fg=green>✓ Empty</>',
    };
}
/**
 * Prints the business queues as a transposed table: one column per queue,
 * one row per metric (messages, consumers, status), followed by a blank line.
 *
 * @param array       $allGroupsData Queue entries as produced by fetchQueueData().
 * @param string|null $filterQueue   Active queue-type filter, shown in the title when set.
 */
private function displayBusinessQueues(array $allGroupsData, ?string $filterQueue): void
{
    $title = 'Business Queues';
    if ($filterQueue) {
        $title = "Business Queues (Filtered: {$filterQueue})";
    }
    $this->line("<fg=blue>=== {$title} ===</>");

    // Column headers are the queue names with ".queue" removed.
    $columnNames = array_map(
        static fn (array $info): string => str_replace('.queue', '', $info['queue']),
        $allGroupsData
    );
    $headers = ['Metric', ...array_values($columnNames)];

    $rows = [];
    foreach (['Messages' => 'messages', 'Consumers' => 'consumers', 'Status' => 'status'] as $label => $field) {
        $rows[] = $this->buildMetricRow($label, $allGroupsData, $field);
    }

    $this->table($headers, $rows);
    $this->line('');
}
/**
 * Prints the dead-letter (retry) queues as a transposed table and, when any of
 * them holds messages, a short explanatory note.
 *
 * @param array       $queuesData  Queue entries as produced by fetchQueueData().
 * @param string|null $filterQueue Active queue-type filter, shown in the title when set.
 */
private function displayDeadLetterQueues(array $queuesData, ?string $filterQueue): void
{
    $title = 'Dead Letter Queues (Retry Queues)';
    if ($filterQueue) {
        $title = "Dead Letter Queues (Filtered: {$filterQueue})";
    }
    $this->line("<fg=magenta>=== {$title} ===</>");

    // Column headers drop ".queue" but keep the "retry" marker.
    $columnNames = array_map(
        static fn (array $info): string => str_replace('.queue', '', $info['queue']),
        $queuesData
    );
    $headers = ['Metric', ...array_values($columnNames)];

    $rows = [];
    foreach (['Messages' => 'messages', 'Consumers' => 'consumers', 'Status' => 'status'] as $label => $field) {
        $rows[] = $this->buildMetricRow($label, $queuesData, $field);
    }

    $this->table($headers, $rows);

    // hasMessages() is false for an empty list, so no separate emptiness check is needed.
    if ($this->hasMessages($queuesData)) {
        $this->line('<fg=yellow> These queues receive messages from DLX when main queue processing fails</>');
    }
}
/**
 * Prints the shared queues (errors.queue) as a transposed table and, when any
 * of them holds messages, a short explanatory note.
 *
 * @param array $queuesData Queue entries as produced by fetchQueueData().
 */
private function displaySharedQueues(array $queuesData): void
{
    $this->line("<fg=blue>=== Shared Queues ===</>");

    // Column headers are the queue names with ".queue" removed.
    $columnNames = array_map(
        static fn (array $info): string => str_replace('.queue', '', $info['queue']),
        $queuesData
    );
    $headers = ['Metric', ...array_values($columnNames)];

    $rows = [];
    foreach (['Messages' => 'messages', 'Consumers' => 'consumers', 'Status' => 'status'] as $label => $field) {
        $rows[] = $this->buildMetricRow($label, $queuesData, $field);
    }

    $this->table($headers, $rows);

    // hasMessages() is false for an empty list, so no separate emptiness check is needed.
    if ($this->hasMessages($queuesData)) {
        $this->line('<fg=yellow> Error queue contains messages that exceeded max retry count</>');
    }
}
/**
 * Tells whether at least one queue entry reports a positive message count.
 *
 * Entries whose 'messages' value is not numeric (e.g. 'N/A') are ignored.
 *
 * @param array $queuesData Queue entries, each with a 'messages' key.
 *
 * @return bool True when any entry has a numeric message count above zero.
 */
private function hasMessages(array $queuesData): bool
{
    foreach ($queuesData as $entry) {
        $count = $entry['messages'];

        if (!is_numeric($count)) {
            continue;
        }
        if ($count > 0) {
            return true;
        }
    }

    return false;
}
/**
 * Builds one table row: the metric label followed by that metric's value for
 * every queue, in input order.
 *
 * @param string $metricName Label placed in the first cell.
 * @param array  $queuesData Queue entries to read from.
 * @param string $field      Entry key to read; 'messages' and 'consumers' are
 *                           passed through formatNumber(), anything else is used as-is.
 *
 * @return array The row cells.
 */
private function buildMetricRow(string $metricName, array $queuesData, string $field): array
{
    $isCountField = in_array($field, ['messages', 'consumers'], true);

    $cells = array_map(
        fn ($queueInfo) => $isCountField
            ? $this->formatNumber($queueInfo[$field])
            : $queueInfo[$field],
        $queuesData
    );

    return [$metricName, ...array_values($cells)];
}
/**
 * Formats a count for console output, coloured by magnitude.
 *
 * Non-numeric values are returned unchanged as strings. Numeric values are cast
 * to int and wrapped in a colour tag: red above 1000, yellow above 100, cyan
 * above 0, gray otherwise. Thousands separators are added only above 100.
 *
 * @param mixed $value Count to format, or a placeholder such as 'N/A'.
 *
 * @return string Console-formatted text.
 */
private function formatNumber($value): string
{
    if (!is_numeric($value)) {
        return (string) $value;
    }

    $count = (int) $value;

    $colour = match (true) {
        $count > 1000 => 'red',
        $count > 100 => 'yellow',
        $count > 0 => 'cyan',
        default => 'gray',
    };
    $text = $count > 100 ? number_format($count) : (string) $count;

    return "<fg={$colour}>{$text}</>";
}
}