Files
datahub/backend/app/Command/AppMqStatus.php
T

273 lines
8.8 KiB
PHP
Raw Normal View History

2025-11-27 15:03:25 +08:00
<?php
declare(strict_types=1);
namespace App\Command;
2026-03-13 14:50:06 +08:00
use App\Service\MqStatusService;
2025-11-27 15:03:25 +08:00
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Psr\Container\ContainerInterface;
2025-12-01 11:12:33 +08:00
use Symfony\Component\Console\Input\InputOption;
2025-11-27 15:03:25 +08:00
#[Command]
class AppMqStatus extends HyperfCommand
{
public function __construct(protected ContainerInterface $container)
{
parent::__construct('app:mq:status');
}
public function configure()
{
parent::configure();
$this->setDescription('Display message counts for all accessible queues');
2025-12-01 11:12:33 +08:00
$this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds');
$this->addOption('interval', 'i', InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3');
$this->addOption('queue', null, InputOption::VALUE_OPTIONAL, 'Filter by queue type (orders, products, refunds, inventory)');
2025-11-27 15:03:25 +08:00
}
public function handle()
{
$watchMode = $this->input->getOption('watch');
$interval = (int) $this->input->getOption('interval');
if ($watchMode) {
$this->info("Watch mode enabled. Refreshing every {$interval} seconds. Press Ctrl+C to exit.");
$this->line('');
while (true) {
// 清屏(对于支持 ANSI 的终端)
$this->output->write("\033[2J\033[H");
$this->displayQueueStatus();
sleep($interval);
}
} else {
return $this->displayQueueStatus();
}
}
private function displayQueueStatus(): int
{
$this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info');
2025-12-01 11:12:33 +08:00
$this->line('');
2025-11-27 15:03:25 +08:00
try {
2026-03-13 14:50:06 +08:00
$service = $this->container->get(MqStatusService::class);
2025-11-27 15:03:25 +08:00
2025-12-01 11:12:33 +08:00
// 获取 --queue 参数
2026-03-13 14:50:06 +08:00
$filter_queue = $this->input->getOption('queue');
2025-12-01 11:12:33 +08:00
// 验证队列类型
2026-03-13 14:50:06 +08:00
if ($filter_queue && !in_array($filter_queue, $service->getValidQueueTypes())) {
$this->error("Invalid queue type: {$filter_queue}");
$this->line("Valid types: " . implode(', ', $service->getValidQueueTypes()));
2025-12-01 11:12:33 +08:00
return 1;
}
2025-11-27 15:03:25 +08:00
2026-03-13 14:50:06 +08:00
// 通过 Service 获取队列状态数据
$status = $service->getStatus($filter_queue);
2025-12-01 11:12:33 +08:00
// 显示主业务队列表格
2026-03-13 14:50:06 +08:00
$this->displayBusinessQueues($status['business_queues'], $filter_queue);
2025-12-01 11:12:33 +08:00
// 显示死信队列(重试队列)
2026-03-13 14:50:06 +08:00
if (!empty($status['retry_queues'])) {
2025-12-01 11:12:33 +08:00
$this->line('');
2026-03-13 14:50:06 +08:00
$this->displayDeadLetterQueues($status['retry_queues'], $filter_queue);
2025-12-01 11:12:33 +08:00
}
// 显示共享的错误队列
2026-03-13 14:50:06 +08:00
if (!empty($status['error_queue'])) {
2025-12-01 11:12:33 +08:00
$this->line('');
2026-03-13 14:50:06 +08:00
$this->displaySharedQueues([$status['error_queue']]);
2025-11-27 15:03:25 +08:00
}
2025-12-01 11:12:33 +08:00
// 显示汇总信息
2025-11-27 15:03:25 +08:00
$this->line('');
2025-12-01 11:12:33 +08:00
$this->info("=== Summary ===");
2026-03-13 14:50:06 +08:00
$this->line("Total messages: <fg=yellow>{$status['summary']['total_messages']}</>");
$this->line("Total active consumers: <fg=cyan>{$status['summary']['total_consumers']}</>");
2025-12-01 11:12:33 +08:00
$this->line('');
2026-03-13 14:50:06 +08:00
// 列出所有监控的队列名称
$all_queue_names = [];
foreach ($status['business_queues'] as $q) {
$all_queue_names[] = $q['queue'];
}
foreach ($status['retry_queues'] as $q) {
$all_queue_names[] = $q['queue'];
}
if (!empty($status['error_queue'])) {
$all_queue_names[] = $status['error_queue']['queue'];
}
2025-12-01 11:12:33 +08:00
$this->line('Queues monitored:');
2026-03-13 14:50:06 +08:00
foreach ($all_queue_names as $queue_name) {
$this->line(" - {$queue_name}");
2025-12-01 11:12:33 +08:00
}
2025-11-27 15:03:25 +08:00
return 0;
2026-03-13 14:50:06 +08:00
} catch (\Throwable $e) {
2025-11-27 15:03:25 +08:00
$this->error('Failed to fetch queue status: ' . $e->getMessage());
$this->line('Trace: ' . $e->getTraceAsString(), 'comment');
return 1;
}
}
/**
2026-03-13 14:50:06 +08:00
* 将 Service 返回的 status 枚举值转为 CLI ANSI 格式
2025-11-27 15:03:25 +08:00
*/
2026-03-13 14:50:06 +08:00
private function formatStatus(string $status): string
2025-11-27 15:03:25 +08:00
{
2026-03-13 14:50:06 +08:00
return match ($status) {
'high_load' => '<fg=red>⚠ High Load</>',
'processing' => '<fg=yellow>⚡ Processing</>',
'active' => '<fg=cyan>✓ Active</>',
'empty' => '<fg=green>✓ Empty</>',
'error' => '<fg=red>✗ Error</>',
default => $status,
};
2025-12-01 11:12:33 +08:00
}
/**
* 显示业务队列(合并所有队列组)- 转置显示
*/
2026-03-13 14:50:06 +08:00
private function displayBusinessQueues(array $all_groups_data, ?string $filter_queue): void
2025-12-01 11:12:33 +08:00
{
2026-03-13 14:50:06 +08:00
$title = $filter_queue
? "Business Queues (Filtered: {$filter_queue})"
2025-12-01 11:12:33 +08:00
: "Business Queues";
$this->line("<fg=blue>=== {$title} ===</>");
$headers = ['Metric'];
2026-03-13 14:50:06 +08:00
foreach ($all_groups_data as $queue_info) {
$simple_name = str_replace('.queue', '', $queue_info['queue']);
$headers[] = $simple_name;
2025-11-27 15:03:25 +08:00
}
2025-12-01 11:12:33 +08:00
$rows = [
2026-03-13 14:50:06 +08:00
$this->buildMetricRow('Messages', $all_groups_data, 'messages'),
$this->buildMetricRow('Consumers', $all_groups_data, 'consumers'),
$this->buildMetricRow('Status', $all_groups_data, 'status'),
2025-12-01 11:12:33 +08:00
];
$this->table($headers, $rows);
$this->line('');
}
/**
* 显示死信队列(重试队列)- 转置显示
*/
2026-03-13 14:50:06 +08:00
private function displayDeadLetterQueues(array $queues_data, ?string $filter_queue): void
2025-12-01 11:12:33 +08:00
{
2026-03-13 14:50:06 +08:00
$title = $filter_queue
? "Dead Letter Queues (Filtered: {$filter_queue})"
2025-12-01 11:12:33 +08:00
: "Dead Letter Queues (Retry Queues)";
$this->line("<fg=magenta>=== {$title} ===</>");
$headers = ['Metric'];
2026-03-13 14:50:06 +08:00
foreach ($queues_data as $queue_info) {
$simple_name = str_replace('.queue', '', $queue_info['queue']);
$headers[] = $simple_name;
2025-11-27 15:03:25 +08:00
}
2025-12-01 11:12:33 +08:00
$rows = [
2026-03-13 14:50:06 +08:00
$this->buildMetricRow('Messages', $queues_data, 'messages'),
$this->buildMetricRow('Consumers', $queues_data, 'consumers'),
$this->buildMetricRow('Status', $queues_data, 'status'),
2025-12-01 11:12:33 +08:00
];
$this->table($headers, $rows);
2026-03-13 14:50:06 +08:00
if (!empty($queues_data) && $this->hasMessages($queues_data)) {
2025-12-01 11:12:33 +08:00
$this->line('<fg=yellow> These queues receive messages from DLX when main queue processing fails</>');
}
2025-11-27 15:03:25 +08:00
}
/**
2025-12-01 11:12:33 +08:00
* 显示共享队列(errors.queue- 转置显示
2025-11-27 15:03:25 +08:00
*/
2026-03-13 14:50:06 +08:00
private function displaySharedQueues(array $queues_data): void
2025-11-27 15:03:25 +08:00
{
2025-12-01 11:12:33 +08:00
$this->line("<fg=blue>=== Shared Queues ===</>");
$headers = ['Metric'];
2026-03-13 14:50:06 +08:00
foreach ($queues_data as $queue_info) {
$simple_name = str_replace('.queue', '', $queue_info['queue']);
$headers[] = $simple_name;
2025-12-01 11:12:33 +08:00
}
$rows = [
2026-03-13 14:50:06 +08:00
$this->buildMetricRow('Messages', $queues_data, 'messages'),
$this->buildMetricRow('Consumers', $queues_data, 'consumers'),
$this->buildMetricRow('Status', $queues_data, 'status'),
2025-12-01 11:12:33 +08:00
];
2025-11-27 15:03:25 +08:00
2025-12-01 11:12:33 +08:00
$this->table($headers, $rows);
2026-03-13 14:50:06 +08:00
if (!empty($queues_data) && $this->hasMessages($queues_data)) {
2025-12-01 11:12:33 +08:00
$this->line('<fg=yellow> Error queue contains messages that exceeded max retry count</>');
}
}
/**
* 检查队列中是否有消息
*/
2026-03-13 14:50:06 +08:00
private function hasMessages(array $queues_data): bool
2025-12-01 11:12:33 +08:00
{
2026-03-13 14:50:06 +08:00
foreach ($queues_data as $queue_info) {
if (is_numeric($queue_info['messages']) && $queue_info['messages'] > 0) {
2025-12-01 11:12:33 +08:00
return true;
2025-11-27 15:03:25 +08:00
}
2025-12-01 11:12:33 +08:00
}
return false;
}
2025-11-27 15:03:25 +08:00
2025-12-01 11:12:33 +08:00
/**
* 构建指标行数据
*/
2026-03-13 14:50:06 +08:00
private function buildMetricRow(string $metric_name, array $queues_data, string $field): array
2025-12-01 11:12:33 +08:00
{
2026-03-13 14:50:06 +08:00
$row = [$metric_name];
2025-11-27 15:03:25 +08:00
2026-03-13 14:50:06 +08:00
foreach ($queues_data as $queue_info) {
$value = $queue_info[$field];
2025-11-27 15:03:25 +08:00
2026-03-13 14:50:06 +08:00
if ($field === 'status') {
$row[] = $this->formatStatus((string) $value);
} elseif ($field === 'messages' || $field === 'consumers') {
2025-12-01 11:12:33 +08:00
$row[] = $this->formatNumber($value);
} else {
$row[] = $value;
2025-11-27 15:03:25 +08:00
}
}
2025-12-01 11:12:33 +08:00
return $row;
}
/**
* 格式化数字显示
*/
2026-03-13 14:50:06 +08:00
private function formatNumber(mixed $value): string
2025-12-01 11:12:33 +08:00
{
if (!is_numeric($value)) {
return (string) $value;
}
$num = (int) $value;
if ($num > 1000) {
return '<fg=red>' . number_format($num) . '</>';
} elseif ($num > 100) {
return '<fg=yellow>' . number_format($num) . '</>';
} elseif ($num > 0) {
return '<fg=cyan>' . $num . '</>';
} else {
return '<fg=gray>' . $num . '</>';
}
2025-11-27 15:03:25 +08:00
}
}