setDescription('Display message counts for all accessible queues');
        $this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds');
        $this->addOption('interval', 'i', InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3');
        $this->addOption('queue', null, InputOption::VALUE_OPTIONAL, 'Filter by queue type (orders, products, refunds, inventory)');
    }

    /**
     * Command entry point: print the queue status once, or keep refreshing it in watch mode.
     *
     * In watch mode the method loops until the process is interrupted and never returns;
     * otherwise it returns the exit code produced by displayQueueStatus().
     */
    public function handle()
    {
        $watchMode = $this->input->getOption('watch');

        // A missing, non-numeric or zero interval casts to 0, which would turn the watch
        // loop into a busy loop; a negative one makes sleep() throw on PHP 8.
        // Enforce a minimum of one second.
        $interval = max(1, (int) $this->input->getOption('interval'));

        if (!$watchMode) {
            return $this->displayQueueStatus();
        }

        $this->info("Watch mode enabled. Refreshing every {$interval} seconds. Press Ctrl+C to exit.");
        $this->line('');

        while (true) {
            // Clear the screen (for terminals that support ANSI escape sequences).
            $this->output->write("\033[2J\033[H");
            $this->displayQueueStatus();
            sleep($interval);
        }
    }

    /**
     * Fetch the queue status from MqStatusService and print all sections.
     *
     * @return int 0 on success, 1 when the --queue filter is invalid or fetching fails
     */
    private function displayQueueStatus(): int
    {
        $this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info');
        $this->line('');

        try {
            $service = $this->container->get(MqStatusService::class);

            // Read the --queue option.
            $filter_queue = $this->input->getOption('queue');

            // Validate the queue type. Compare against null/'' explicitly so that a value
            // such as "0" is validated instead of slipping through a truthiness check.
            if ($filter_queue !== null && $filter_queue !== '') {
                $valid_types = $service->getValidQueueTypes();

                if (!in_array($filter_queue, $valid_types, true)) {
                    $this->error("Invalid queue type: {$filter_queue}");
                    $this->line("Valid types: " . implode(', ', $valid_types));

                    return 1;
                }
            }

            // Fetch the queue status data through the service.
            $status = $service->getStatus($filter_queue);

            // The retry and error sections are optional in the display logic below, so
            // treat a missing key the same as an empty value everywhere they are used.
            $retry_queues = $status['retry_queues'] ?? [];
            $error_queue = $status['error_queue'] ?? [];

            // Main business queues table.
            $this->displayBusinessQueues($status['business_queues'], $filter_queue);

            // Dead letter queues (retry queues).
            if (!empty($retry_queues)) {
                $this->line('');
                $this->displayDeadLetterQueues($retry_queues, $filter_queue);
            }

            // Shared error queue.
            if (!empty($error_queue)) {
                $this->line('');
                $this->displaySharedQueues([$error_queue]);
            }

            // Summary.
            $this->line('');
            $this->info("=== Summary ===");
            $this->line("Total messages: {$status['summary']['total_messages']}");
            $this->line("Total active consumers: {$status['summary']['total_consumers']}");
            $this->line('');

            // List the names of all monitored queues.
            $all_queue_names = array_merge(
                array_column($status['business_queues'], 'queue'),
                array_column($retry_queues, 'queue'),
            );
            if (!empty($error_queue)) {
                $all_queue_names[] = $error_queue['queue'];
            }

            $this->line('Queues monitored:');
            foreach ($all_queue_names as $queue_name) {
                $this->line("  - {$queue_name}");
            }

            return 0;
        } catch (\Throwable $e) {
            $this->error('Failed to fetch queue status: ' . $e->getMessage());
            $this->line('Trace: ' . $e->getTraceAsString(), 'comment');

            return 1;
        }
    }

    /**
     * Map a status value returned by the service to its CLI display label.
     *
     * Unknown values are returned unchanged.
     */
    private function formatStatus(string $status): string
    {
        return match ($status) {
            'high_load' => '⚠ High Load',
            'processing' => '⚡ Processing',
            'active' => '✓ Active',
            'empty' => '✓ Empty',
            'error' => '✗ Error',
            default => $status,
        };
    }

    /**
     * Display the business queues (all queue groups merged) as a transposed table.
     */
    private function displayBusinessQueues(array $all_groups_data, ?string $filter_queue): void
    {
        $title = $filter_queue ? "Business Queues (Filtered: {$filter_queue})" : "Business Queues";
        $this->line("=== {$title} ===");

        $this->renderQueueTable($all_groups_data);
        $this->line('');
    }

    /**
     * Display the dead letter queues (retry queues) as a transposed table.
     */
    private function displayDeadLetterQueues(array $queues_data, ?string $filter_queue): void
    {
        $title = $filter_queue
            ? "Dead Letter Queues (Filtered: {$filter_queue})"
            : "Dead Letter Queues (Retry Queues)";
        $this->line("=== {$title} ===");

        $this->renderQueueTable($queues_data);

        // hasMessages() is false for an empty list, so no separate emptiness check is needed.
        if ($this->hasMessages($queues_data)) {
            $this->line('ℹ These queues receive messages from DLX when main queue processing fails');
        }
    }

    /**
     * Display the shared queues (errors.queue) as a transposed table.
     */
    private function displaySharedQueues(array $queues_data): void
    {
        $this->line("=== Shared Queues ===");

        $this->renderQueueTable($queues_data);

        // hasMessages() is false for an empty list, so no separate emptiness check is needed.
        if ($this->hasMessages($queues_data)) {
            $this->line('ℹ Error queue contains messages that exceeded max retry count');
        }
    }

    /**
     * Render a transposed table: one column per queue, one row per metric
     * (Messages, Consumers, Status).
     *
     * Column headers are the queue names with every ".queue" occurrence removed.
     */
    private function renderQueueTable(array $queues_data): void
    {
        $headers = ['Metric'];
        foreach ($queues_data as $queue_info) {
            $headers[] = str_replace('.queue', '', $queue_info['queue']);
        }

        $rows = [
            $this->buildMetricRow('Messages', $queues_data, 'messages'),
            $this->buildMetricRow('Consumers', $queues_data, 'consumers'),
            $this->buildMetricRow('Status', $queues_data, 'status'),
        ];

        $this->table($headers, $rows);
    }

    /**
     * Check whether any of the given queues holds at least one message.
     *
     * Non-numeric "messages" values are ignored.
     */
    private function hasMessages(array $queues_data): bool
    {
        foreach ($queues_data as $queue_info) {
            if (is_numeric($queue_info['messages']) && $queue_info['messages'] > 0) {
                return true;
            }
        }

        return false;
    }

    /**
     * Build one table row for a metric: the metric label followed by one cell per queue.
     *
     * "status" cells go through formatStatus(), "messages" and "consumers" cells through
     * formatNumber(); any other field is passed through unchanged.
     */
    private function buildMetricRow(string $metric_name, array $queues_data, string $field): array
    {
        $row = [$metric_name];

        foreach ($queues_data as $queue_info) {
            $value = $queue_info[$field];

            $row[] = match ($field) {
                'status' => $this->formatStatus((string) $value),
                'messages', 'consumers' => $this->formatNumber($value),
                default => $value,
            };
        }

        return $row;
    }

    /**
     * Format a numeric cell for display.
     *
     * Non-numeric values are returned as strings unchanged. Numeric values are cast to int;
     * values above 100 are rendered with number_format() (thousands separators), everything
     * else as the plain integer.
     *
     * NOTE(review): the previous implementation had separate tiers for > 1000, > 100 and > 0
     * that only wrapped the value in empty strings, so all tiers produced unstyled output.
     * If per-tier styling was intended, reintroduce it here.
     */
    private function formatNumber(mixed $value): string
    {
        if (!is_numeric($value)) {
            return (string) $value;
        }

        $num = (int) $value;

        return $num > 100 ? number_format($num) : (string) $num;
    }
}