setDescription('Display message counts for all accessible queues');
        $this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds');
        $this->addOption('interval', 'i', InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3');
        $this->addOption('queue', null, InputOption::VALUE_OPTIONAL, 'Filter by queue type (orders, products, refunds, inventory)');
    }

    /**
     * Render the queue status once, or repeatedly when --watch is given.
     *
     * @return int|null Exit code (0 on success, 1 on failure) in one-shot mode.
     *                  Watch mode loops until the process is interrupted and
     *                  only returns early (1) when the --queue filter is invalid.
     */
    public function handle()
    {
        // Validate once, up front: in watch mode the per-refresh return code is
        // ignored, so an invalid filter would otherwise be reported forever.
        if ($this->rejectUnknownQueueFilter($this->input->getOption('queue'))) {
            return 1;
        }

        if (!$this->input->getOption('watch')) {
            return $this->displayQueueStatus();
        }

        // sleep() throws ValueError for negative values on PHP 8 and returns
        // immediately for 0 (busy loop), so never refresh faster than once a second.
        $interval = max(1, (int) $this->input->getOption('interval'));

        while (true) {
            // Clear the screen (for terminals that support ANSI escapes).
            $this->output->write("\033[2J\033[H");

            // Printed after the clear so the hint stays visible on every refresh.
            $this->info("Watch mode enabled. Refreshing every {$interval} seconds. Press Ctrl+C to exit.");
            $this->line('');

            $this->displayQueueStatus();
            sleep($interval);
        }
    }

    /**
     * Report an unknown --queue filter to the user.
     *
     * An empty/absent filter means "all queue types" and is always accepted.
     *
     * @param mixed $filterQueue Raw value of the --queue option.
     *
     * @return bool True when the filter was rejected (an error has been printed).
     */
    private function rejectUnknownQueueFilter($filterQueue): bool
    {
        if (!$filterQueue || in_array($filterQueue, self::QUEUE_TYPES, true)) {
            return false;
        }

        $this->error("Invalid queue type: {$filterQueue}");
        $this->line('Valid types: ' . implode(', ', self::QUEUE_TYPES));

        return true;
    }

    /**
     * Connect to the broker, print the per-queue tables and a summary.
     *
     * The channel and connection are always closed before returning, including
     * when an exception interrupts the run.
     *
     * @return int 0 on success, 1 when the filter is invalid, the configuration
     *             is missing, or fetching the status failed.
     */
    private function displayQueueStatus(): int
    {
        $this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info');
        $this->line('');

        // Read the --queue option and reject bad values before opening a connection.
        $filterQueue = $this->input->getOption('queue');
        if ($this->rejectUnknownQueueFilter($filterQueue)) {
            return 1;
        }

        $connection = null;
        $channel = null;

        try {
            $config = $this->container->get(ConfigInterface::class);
            $consumerConfig = $config->get('amqp.default_consumer');

            if (!is_array($consumerConfig)) {
                $this->error('Failed to fetch queue status: amqp.default_consumer configuration is missing');

                return 1;
            }

            $params = $consumerConfig['params'] ?? [];

            // Create the connection.
            $connection = new AMQPStreamConnection(
                $consumerConfig['host'],
                $consumerConfig['port'],
                $consumerConfig['user'],
                $consumerConfig['password'],
                $consumerConfig['vhost'],
                false,
                'AMQPLAIN',
                null,
                'en_US',
                $params['connection_timeout'] ?? 3.0,
                $params['read_write_timeout'] ?? 3.0,
                null,
                $params['keepalive'] ?? false,
                $params['heartbeat'] ?? 0
            );
            $channel = $connection->channel();

            // Decide which queue groups to show.
            $queueTypes = $filterQueue ? [$filterQueue] : self::QUEUE_TYPES;

            // Collect main business queues and retry (dead-letter) queues separately;
            // $monitored keeps every row in fetch order for the summary.
            $businessQueuesData = [];
            $deadLetterQueuesData = [];
            $monitored = [];

            foreach ($queueTypes as $type) {
                foreach ($this->fetchQueueGroupData($channel, $type) as $queueInfo) {
                    if (str_ends_with($queueInfo['queue'], '.retry.queue')) {
                        $deadLetterQueuesData[] = $queueInfo;
                    } else {
                        $businessQueuesData[] = $queueInfo;
                    }
                    $monitored[] = $queueInfo;
                }
            }

            // Main business queues table.
            $this->displayBusinessQueues($businessQueuesData, $filterQueue);

            // Retry (dead-letter) queues table.
            if ($deadLetterQueuesData !== []) {
                $this->line('');
                $this->displayDeadLetterQueues($deadLetterQueuesData, $filterQueue);
            }

            // The shared error queue is only shown when no filter is active.
            if (!$filterQueue) {
                $this->line('');
                $errorQueueData = $this->fetchQueueData($channel, 'errors.queue');
                $this->displaySharedQueues([$errorQueueData]);
                $monitored[] = $errorQueueData;
            }

            $totalMessages = $this->sumNumericField($monitored, 'messages');
            $totalConsumers = $this->sumNumericField($monitored, 'consumers');

            // Summary.
            $this->line('');
            $this->info("=== Summary ===");
            $this->line("Total messages: {$totalMessages}");
            $this->line("Total active consumers: {$totalConsumers}");
            $this->line('');
            $this->line('Queues monitored:');
            foreach ($monitored as $queueInfo) {
                $this->line("  - {$queueInfo['queue']}");
            }

            return 0;
        } catch (\Exception $e) {
            $this->error('Failed to fetch queue status: ' . $e->getMessage());
            $this->line('Trace: ' . $e->getTraceAsString(), 'comment');

            return 1;
        } finally {
            $this->closeQuietly($channel, $connection);
        }
    }

    /**
     * Close the channel and connection, ignoring failures during cleanup.
     *
     * @param object|null $channel    Channel to close, or null if never opened.
     * @param object|null $connection Connection to close, or null if never opened.
     */
    private function closeQuietly($channel, $connection): void
    {
        foreach ([$channel, $connection] as $resource) {
            if ($resource === null) {
                continue;
            }

            try {
                $resource->close();
            } catch (\Exception $e) {
                // Best-effort cleanup: the status (or the original error) has
                // already been reported, so a failing close must not mask it.
            }
        }
    }

    /**
     * Sum one field across queue rows, skipping non-numeric placeholders
     * such as 'N/A'.
     *
     * @param array  $queuesData Rows as returned by fetchQueueData().
     * @param string $field      Row key to sum ('messages' or 'consumers').
     */
    private function sumNumericField(array $queuesData, string $field): int
    {
        $total = 0;
        foreach ($queuesData as $queueInfo) {
            if (is_numeric($queueInfo[$field])) {
                $total += (int) $queueInfo[$field];
            }
        }

        return $total;
    }

    /**
     * Fetch the data for one queue group: the main queue plus its retry queue.
     *
     * @param mixed  $channel Open channel used for the lookups.
     * @param string $type    Queue type, used as the queue name prefix.
     *
     * @return array Two rows: "{type}.queue" then "{type}.retry.queue".
     */
    private function fetchQueueGroupData($channel, string $type): array
    {
        return array_map(
            fn (string $queueName): array => $this->fetchQueueData($channel, $queueName),
            [
                "{$type}.queue",       // main business queue
                "{$type}.retry.queue", // retry queue
            ]
        );
    }

    /**
     * Fetch message and consumer counts for a single queue.
     *
     * Failures are not propagated: the row is returned with 'N/A' counts and
     * the error message in 'status'.
     *
     * NOTE(review): in AMQP a failed passive declare (e.g. queue not found)
     * is a channel-level error, so the broker may close $channel and every
     * later lookup on the same channel would then fail too — confirm, and
     * consider reopening the channel after a failure.
     *
     * @param mixed  $channel   Open channel used for the lookup.
     * @param string $queueName Full queue name.
     *
     * @return array Row with keys 'queue', 'messages', 'consumers', 'status'.
     */
    private function fetchQueueData($channel, string $queueName): array
    {
        try {
            // passive=true inspects the queue without creating it.
            [, $messageCount, $consumerCount] = $channel->queue_declare(
                $queueName,
                true,  // passive
                true,  // durable
                false, // exclusive
                false  // auto_delete
            );

            return [
                'queue' => $queueName,
                'messages' => $messageCount,
                'consumers' => $consumerCount,
                'status' => $this->getQueueStatus($messageCount, $consumerCount),
            ];
        } catch (\Exception $e) {
            return [
                'queue' => $queueName,
                'messages' => 'N/A',
                'consumers' => 'N/A',
                'status' => 'Error: ' . $e->getMessage(),
            ];
        }
    }

    /**
     * Describe a queue's load based on its message count.
     *
     * @param int $messageCount  Number of messages reported for the queue.
     * @param int $consumerCount Currently unused; kept for signature compatibility.
     */
    private function getQueueStatus(int $messageCount, int $consumerCount): string
    {
        return match (true) {
            $messageCount > 100 => '⚠ High Load',
            $messageCount > 10 => '⚡ Processing',
            $messageCount > 0 => '✓ Active',
            default => '✓ Empty',
        };
    }

    /**
     * Print the business queues table (all groups merged, transposed layout),
     * followed by a blank line.
     *
     * @param array       $allGroupsData Rows as returned by fetchQueueData().
     * @param string|null $filterQueue   Active --queue filter, if any.
     */
    private function displayBusinessQueues(array $allGroupsData, ?string $filterQueue): void
    {
        $title = $filterQueue ? "Business Queues (Filtered: {$filterQueue})" : "Business Queues";
        $this->line("=== {$title} ===");

        $this->renderQueueTable($allGroupsData);
        $this->line('');
    }

    /**
     * Print the retry (dead-letter) queues table in transposed layout, plus an
     * explanatory note when any of them holds messages.
     *
     * @param array       $queuesData  Rows as returned by fetchQueueData().
     * @param string|null $filterQueue Active --queue filter, if any.
     */
    private function displayDeadLetterQueues(array $queuesData, ?string $filterQueue): void
    {
        $title = $filterQueue
            ? "Dead Letter Queues (Filtered: {$filterQueue})"
            : "Dead Letter Queues (Retry Queues)";
        $this->line("=== {$title} ===");

        $this->renderQueueTable($queuesData);

        if ($this->hasMessages($queuesData)) {
            $this->line('ℹ These queues receive messages from DLX when main queue processing fails');
        }
    }

    /**
     * Print the shared queues table (errors.queue) in transposed layout, plus
     * an explanatory note when it holds messages.
     *
     * @param array $queuesData Rows as returned by fetchQueueData().
     */
    private function displaySharedQueues(array $queuesData): void
    {
        $this->line("=== Shared Queues ===");

        $this->renderQueueTable($queuesData);

        if ($this->hasMessages($queuesData)) {
            $this->line('ℹ Error queue contains messages that exceeded max retry count');
        }
    }

    /**
     * Render queue rows as a transposed table: one column per queue and one
     * row per metric (Messages, Consumers, Status).
     *
     * @param array $queuesData Rows as returned by fetchQueueData().
     */
    private function renderQueueTable(array $queuesData): void
    {
        // Column headers use the queue name without its trailing ".queue".
        $headers = ['Metric'];
        foreach ($queuesData as $queueInfo) {
            $queueName = $queueInfo['queue'];
            $headers[] = str_ends_with($queueName, '.queue')
                ? substr($queueName, 0, -strlen('.queue'))
                : $queueName;
        }

        $rows = [
            $this->buildMetricRow('Messages', $queuesData, 'messages'),
            $this->buildMetricRow('Consumers', $queuesData, 'consumers'),
            $this->buildMetricRow('Status', $queuesData, 'status'),
        ];

        $this->table($headers, $rows);
    }

    /**
     * Tell whether any queue row reports a positive, numeric message count.
     *
     * @param array $queuesData Rows as returned by fetchQueueData().
     */
    private function hasMessages(array $queuesData): bool
    {
        foreach ($queuesData as $queueInfo) {
            if (is_numeric($queueInfo['messages']) && $queueInfo['messages'] > 0) {
                return true;
            }
        }

        return false;
    }

    /**
     * Build one table row: the metric label followed by that metric's value
     * for every queue.
     *
     * @param string $metricName Label shown in the first column.
     * @param array  $queuesData Rows as returned by fetchQueueData().
     * @param string $field      Row key to read ('messages', 'consumers' or 'status').
     */
    private function buildMetricRow(string $metricName, array $queuesData, string $field): array
    {
        $isCount = $field === 'messages' || $field === 'consumers';

        $row = [$metricName];
        foreach ($queuesData as $queueInfo) {
            $value = $queueInfo[$field];
            // Counts get number formatting; everything else is shown verbatim.
            $row[] = $isCount ? $this->formatNumber($value) : $value;
        }

        return $row;
    }

    /**
     * Format a count for display.
     *
     * Non-numeric values (e.g. 'N/A') are returned unchanged as strings.
     * Values above 100 go through number_format() (thousands separators);
     * smaller values are printed as plain integers.
     *
     * @param mixed $value Raw count or placeholder.
     */
    private function formatNumber($value): string
    {
        if (!is_numeric($value)) {
            return (string) $value;
        }

        $num = (int) $value;

        return $num > 100 ? number_format($num) : (string) $num;
    }
}