2025-11-27 15:03:25 +08:00
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
|
|
namespace App\Command;
|
|
|
|
|
|
|
|
|
|
|
|
use Hyperf\Command\Command as HyperfCommand;
|
|
|
|
|
|
use Hyperf\Command\Annotation\Command;
|
|
|
|
|
|
use Psr\Container\ContainerInterface;
|
|
|
|
|
|
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
|
|
|
|
|
use Hyperf\Contract\ConfigInterface;
|
2025-12-01 11:12:33 +08:00
|
|
|
|
use Symfony\Component\Console\Input\InputOption;
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
|
|
|
|
|
#[Command]
|
|
|
|
|
|
class AppMqStatus extends HyperfCommand
|
|
|
|
|
|
{
|
2025-12-01 11:12:33 +08:00
|
|
|
|
/**
|
|
|
|
|
|
* 业务队列类型
|
|
|
|
|
|
*/
|
|
|
|
|
|
private const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory'];
|
|
|
|
|
|
|
2025-11-27 15:03:25 +08:00
|
|
|
|
public function __construct(protected ContainerInterface $container)
|
|
|
|
|
|
{
|
|
|
|
|
|
parent::__construct('app:mq:status');
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public function configure()
|
|
|
|
|
|
{
|
|
|
|
|
|
parent::configure();
|
|
|
|
|
|
$this->setDescription('Display message counts for all accessible queues');
|
2025-12-01 11:12:33 +08:00
|
|
|
|
$this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds');
|
|
|
|
|
|
$this->addOption('interval', 'i', InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3');
|
|
|
|
|
|
$this->addOption('queue', null, InputOption::VALUE_OPTIONAL, 'Filter by queue type (orders, products, refunds, inventory)');
|
2025-11-27 15:03:25 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public function handle()
|
|
|
|
|
|
{
|
|
|
|
|
|
$watchMode = $this->input->getOption('watch');
|
|
|
|
|
|
$interval = (int) $this->input->getOption('interval');
|
|
|
|
|
|
|
|
|
|
|
|
if ($watchMode) {
|
|
|
|
|
|
$this->info("Watch mode enabled. Refreshing every {$interval} seconds. Press Ctrl+C to exit.");
|
|
|
|
|
|
$this->line('');
|
|
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
|
// 清屏(对于支持 ANSI 的终端)
|
|
|
|
|
|
$this->output->write("\033[2J\033[H");
|
|
|
|
|
|
$this->displayQueueStatus();
|
|
|
|
|
|
sleep($interval);
|
|
|
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
|
|
|
return $this->displayQueueStatus();
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private function displayQueueStatus(): int
|
|
|
|
|
|
{
|
|
|
|
|
|
$this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info');
|
2025-12-01 11:12:33 +08:00
|
|
|
|
$this->line('');
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
$config = $this->container->get(ConfigInterface::class);
|
|
|
|
|
|
$consumerConfig = $config->get('amqp.default_consumer');
|
|
|
|
|
|
|
|
|
|
|
|
// 创建连接
|
|
|
|
|
|
$connection = new AMQPStreamConnection(
|
|
|
|
|
|
$consumerConfig['host'],
|
|
|
|
|
|
$consumerConfig['port'],
|
|
|
|
|
|
$consumerConfig['user'],
|
|
|
|
|
|
$consumerConfig['password'],
|
|
|
|
|
|
$consumerConfig['vhost'],
|
|
|
|
|
|
false,
|
|
|
|
|
|
'AMQPLAIN',
|
|
|
|
|
|
null,
|
|
|
|
|
|
'en_US',
|
|
|
|
|
|
$consumerConfig['params']['connection_timeout'] ?? 3.0,
|
|
|
|
|
|
$consumerConfig['params']['read_write_timeout'] ?? 3.0,
|
|
|
|
|
|
null,
|
|
|
|
|
|
$consumerConfig['params']['keepalive'] ?? false,
|
|
|
|
|
|
$consumerConfig['params']['heartbeat'] ?? 0
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
$channel = $connection->channel();
|
|
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
// 获取 --queue 参数
|
|
|
|
|
|
$filterQueue = $this->input->getOption('queue');
|
|
|
|
|
|
|
|
|
|
|
|
// 确定要显示的队列组
|
|
|
|
|
|
$queueTypes = $filterQueue
|
|
|
|
|
|
? [$filterQueue]
|
|
|
|
|
|
: self::QUEUE_TYPES;
|
|
|
|
|
|
|
|
|
|
|
|
// 验证队列类型
|
|
|
|
|
|
if ($filterQueue && !in_array($filterQueue, self::QUEUE_TYPES)) {
|
|
|
|
|
|
$this->error("Invalid queue type: {$filterQueue}");
|
|
|
|
|
|
$this->line("Valid types: " . implode(', ', self::QUEUE_TYPES));
|
|
|
|
|
|
$channel->close();
|
|
|
|
|
|
$connection->close();
|
|
|
|
|
|
return 1;
|
|
|
|
|
|
}
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
|
|
|
|
|
$totalMessages = 0;
|
2025-12-01 11:12:33 +08:00
|
|
|
|
$totalConsumers = 0;
|
|
|
|
|
|
$allQueueNames = [];
|
|
|
|
|
|
|
|
|
|
|
|
// 收集主业务队列和死信队列的数据
|
|
|
|
|
|
$businessQueuesData = [];
|
|
|
|
|
|
$deadLetterQueuesData = [];
|
|
|
|
|
|
|
|
|
|
|
|
foreach ($queueTypes as $type) {
|
|
|
|
|
|
$groupData = $this->fetchQueueGroupData($channel, $type);
|
|
|
|
|
|
|
|
|
|
|
|
foreach ($groupData as $queueInfo) {
|
|
|
|
|
|
// 区分主队列和重试队列(死信队列)
|
|
|
|
|
|
if (str_ends_with($queueInfo['queue'], '.retry.queue')) {
|
|
|
|
|
|
$deadLetterQueuesData[] = $queueInfo;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
$businessQueuesData[] = $queueInfo;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (is_numeric($queueInfo['messages'])) {
|
|
|
|
|
|
$totalMessages += $queueInfo['messages'];
|
|
|
|
|
|
}
|
|
|
|
|
|
if (is_numeric($queueInfo['consumers'])) {
|
|
|
|
|
|
$totalConsumers += $queueInfo['consumers'];
|
|
|
|
|
|
}
|
|
|
|
|
|
$allQueueNames[] = $queueInfo['queue'];
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 显示主业务队列表格
|
|
|
|
|
|
$this->displayBusinessQueues($businessQueuesData, $filterQueue);
|
|
|
|
|
|
|
|
|
|
|
|
// 显示死信队列(重试队列)
|
|
|
|
|
|
if (!empty($deadLetterQueuesData)) {
|
|
|
|
|
|
$this->line('');
|
|
|
|
|
|
$this->displayDeadLetterQueues($deadLetterQueuesData, $filterQueue);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 显示共享的错误队列
|
|
|
|
|
|
if (!$filterQueue) {
|
|
|
|
|
|
$this->line('');
|
|
|
|
|
|
$errorQueueData = $this->fetchQueueData($channel, 'errors.queue');
|
|
|
|
|
|
$this->displaySharedQueues([$errorQueueData]);
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
if (is_numeric($errorQueueData['messages'])) {
|
|
|
|
|
|
$totalMessages += $errorQueueData['messages'];
|
2025-11-27 15:03:25 +08:00
|
|
|
|
}
|
2025-12-01 11:12:33 +08:00
|
|
|
|
if (is_numeric($errorQueueData['consumers'])) {
|
|
|
|
|
|
$totalConsumers += $errorQueueData['consumers'];
|
|
|
|
|
|
}
|
|
|
|
|
|
$allQueueNames[] = $errorQueueData['queue'];
|
2025-11-27 15:03:25 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭连接
|
|
|
|
|
|
$channel->close();
|
|
|
|
|
|
$connection->close();
|
|
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
// 显示汇总信息
|
2025-11-27 15:03:25 +08:00
|
|
|
|
$this->line('');
|
2025-12-01 11:12:33 +08:00
|
|
|
|
$this->info("=== Summary ===");
|
|
|
|
|
|
$this->line("Total messages: <fg=yellow>{$totalMessages}</>");
|
|
|
|
|
|
$this->line("Total active consumers: <fg=cyan>{$totalConsumers}</>");
|
|
|
|
|
|
$this->line('');
|
|
|
|
|
|
$this->line('Queues monitored:');
|
|
|
|
|
|
foreach ($allQueueNames as $queueName) {
|
|
|
|
|
|
$this->line(" - {$queueName}");
|
|
|
|
|
|
}
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
} catch (\Exception $e) {
|
|
|
|
|
|
$this->error('Failed to fetch queue status: ' . $e->getMessage());
|
|
|
|
|
|
$this->line('Trace: ' . $e->getTraceAsString(), 'comment');
|
|
|
|
|
|
return 1;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
2025-12-01 11:12:33 +08:00
|
|
|
|
* 获取队列组数据(主队列 + 重试队列)
|
2025-11-27 15:03:25 +08:00
|
|
|
|
*/
|
2025-12-01 11:12:33 +08:00
|
|
|
|
private function fetchQueueGroupData($channel, string $type): array
|
2025-11-27 15:03:25 +08:00
|
|
|
|
{
|
2025-12-01 11:12:33 +08:00
|
|
|
|
$queues = [
|
|
|
|
|
|
"{$type}.queue", // 主业务队列
|
|
|
|
|
|
"{$type}.retry.queue", // 重试队列
|
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
|
|
$groupData = [];
|
|
|
|
|
|
foreach ($queues as $queueName) {
|
|
|
|
|
|
$groupData[] = $this->fetchQueueData($channel, $queueName);
|
|
|
|
|
|
}
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
return $groupData;
|
|
|
|
|
|
}
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
/**
|
|
|
|
|
|
* 获取单个队列的数据
|
|
|
|
|
|
*/
|
|
|
|
|
|
private function fetchQueueData($channel, string $queueName): array
|
|
|
|
|
|
{
|
|
|
|
|
|
try {
|
|
|
|
|
|
// 使用 passive=true 来获取队列信息而不创建队列
|
|
|
|
|
|
[$queue, $messageCount, $consumerCount] = $channel->queue_declare(
|
|
|
|
|
|
$queueName,
|
|
|
|
|
|
true, // passive
|
|
|
|
|
|
true, // durable
|
|
|
|
|
|
false, // exclusive
|
|
|
|
|
|
false // auto_delete
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
|
'queue' => $queueName,
|
|
|
|
|
|
'messages' => $messageCount,
|
|
|
|
|
|
'consumers' => $consumerCount,
|
|
|
|
|
|
'status' => $this->getQueueStatus($messageCount, $consumerCount),
|
|
|
|
|
|
];
|
|
|
|
|
|
} catch (\Exception $e) {
|
|
|
|
|
|
return [
|
|
|
|
|
|
'queue' => $queueName,
|
|
|
|
|
|
'messages' => 'N/A',
|
|
|
|
|
|
'consumers' => 'N/A',
|
|
|
|
|
|
'status' => '<fg=red>Error: ' . $e->getMessage() . '</>',
|
|
|
|
|
|
];
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 获取队列状态描述
|
|
|
|
|
|
*/
|
|
|
|
|
|
private function getQueueStatus(int $messageCount, int $consumerCount): string
|
|
|
|
|
|
{
|
|
|
|
|
|
if ($messageCount > 100) {
|
|
|
|
|
|
return '<fg=red>⚠ High Load</>';
|
|
|
|
|
|
} elseif ($messageCount > 10) {
|
|
|
|
|
|
return '<fg=yellow>⚡ Processing</>';
|
|
|
|
|
|
} elseif ($messageCount > 0) {
|
|
|
|
|
|
return '<fg=cyan>✓ Active</>';
|
|
|
|
|
|
} else {
|
|
|
|
|
|
return '<fg=green>✓ Empty</>';
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 显示业务队列(合并所有队列组)- 转置显示
|
|
|
|
|
|
*/
|
|
|
|
|
|
private function displayBusinessQueues(array $allGroupsData, ?string $filterQueue): void
|
|
|
|
|
|
{
|
|
|
|
|
|
// 构建标题
|
|
|
|
|
|
$title = $filterQueue
|
|
|
|
|
|
? "Business Queues (Filtered: {$filterQueue})"
|
|
|
|
|
|
: "Business Queues";
|
|
|
|
|
|
|
|
|
|
|
|
$this->line("<fg=blue>=== {$title} ===</>");
|
|
|
|
|
|
|
|
|
|
|
|
// 构建表头 - 使用简化的队列名称(去掉 .queue)
|
|
|
|
|
|
$headers = ['Metric'];
|
|
|
|
|
|
foreach ($allGroupsData as $queueInfo) {
|
|
|
|
|
|
$queueName = $queueInfo['queue'];
|
|
|
|
|
|
// 去掉 .queue 后缀
|
|
|
|
|
|
$simpleName = str_replace('.queue', '', $queueName);
|
|
|
|
|
|
$headers[] = $simpleName;
|
2025-11-27 15:03:25 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
// 构建行数据 - 转置显示
|
|
|
|
|
|
$rows = [
|
|
|
|
|
|
$this->buildMetricRow('Messages', $allGroupsData, 'messages'),
|
|
|
|
|
|
$this->buildMetricRow('Consumers', $allGroupsData, 'consumers'),
|
|
|
|
|
|
$this->buildMetricRow('Status', $allGroupsData, 'status'),
|
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
|
|
$this->table($headers, $rows);
|
|
|
|
|
|
$this->line('');
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 显示死信队列(重试队列)- 转置显示
|
|
|
|
|
|
*/
|
|
|
|
|
|
private function displayDeadLetterQueues(array $queuesData, ?string $filterQueue): void
|
|
|
|
|
|
{
|
|
|
|
|
|
$title = $filterQueue
|
|
|
|
|
|
? "Dead Letter Queues (Filtered: {$filterQueue})"
|
|
|
|
|
|
: "Dead Letter Queues (Retry Queues)";
|
|
|
|
|
|
|
|
|
|
|
|
$this->line("<fg=magenta>=== {$title} ===</>");
|
|
|
|
|
|
|
|
|
|
|
|
// 构建表头
|
|
|
|
|
|
$headers = ['Metric'];
|
|
|
|
|
|
foreach ($queuesData as $queueInfo) {
|
|
|
|
|
|
$queueName = $queueInfo['queue'];
|
|
|
|
|
|
// 去掉 .queue 后缀,保留 retry 标识
|
|
|
|
|
|
$simpleName = str_replace('.queue', '', $queueName);
|
|
|
|
|
|
$headers[] = $simpleName;
|
2025-11-27 15:03:25 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
// 构建行数据
|
|
|
|
|
|
$rows = [
|
|
|
|
|
|
$this->buildMetricRow('Messages', $queuesData, 'messages'),
|
|
|
|
|
|
$this->buildMetricRow('Consumers', $queuesData, 'consumers'),
|
|
|
|
|
|
$this->buildMetricRow('Status', $queuesData, 'status'),
|
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
|
|
$this->table($headers, $rows);
|
|
|
|
|
|
|
|
|
|
|
|
// 添加说明
|
|
|
|
|
|
if (!empty($queuesData) && $this->hasMessages($queuesData)) {
|
|
|
|
|
|
$this->line('<fg=yellow>ℹ These queues receive messages from DLX when main queue processing fails</>');
|
|
|
|
|
|
}
|
2025-11-27 15:03:25 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
2025-12-01 11:12:33 +08:00
|
|
|
|
* 显示共享队列(errors.queue)- 转置显示
|
2025-11-27 15:03:25 +08:00
|
|
|
|
*/
|
2025-12-01 11:12:33 +08:00
|
|
|
|
private function displaySharedQueues(array $queuesData): void
|
2025-11-27 15:03:25 +08:00
|
|
|
|
{
|
2025-12-01 11:12:33 +08:00
|
|
|
|
$this->line("<fg=blue>=== Shared Queues ===</>");
|
|
|
|
|
|
|
|
|
|
|
|
// 构建表头
|
|
|
|
|
|
$headers = ['Metric'];
|
|
|
|
|
|
foreach ($queuesData as $queueInfo) {
|
|
|
|
|
|
$queueName = $queueInfo['queue'];
|
|
|
|
|
|
// 去掉 .queue 后缀
|
|
|
|
|
|
$simpleName = str_replace('.queue', '', $queueName);
|
|
|
|
|
|
$headers[] = $simpleName;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 构建行数据
|
|
|
|
|
|
$rows = [
|
|
|
|
|
|
$this->buildMetricRow('Messages', $queuesData, 'messages'),
|
|
|
|
|
|
$this->buildMetricRow('Consumers', $queuesData, 'consumers'),
|
|
|
|
|
|
$this->buildMetricRow('Status', $queuesData, 'status'),
|
|
|
|
|
|
];
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
$this->table($headers, $rows);
|
|
|
|
|
|
|
|
|
|
|
|
// 添加说明
|
|
|
|
|
|
if (!empty($queuesData) && $this->hasMessages($queuesData)) {
|
|
|
|
|
|
$this->line('<fg=yellow>ℹ Error queue contains messages that exceeded max retry count</>');
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 检查队列中是否有消息
|
|
|
|
|
|
*/
|
|
|
|
|
|
private function hasMessages(array $queuesData): bool
|
|
|
|
|
|
{
|
|
|
|
|
|
foreach ($queuesData as $queueInfo) {
|
|
|
|
|
|
if (is_numeric($queueInfo['messages']) && $queueInfo['messages'] > 0) {
|
|
|
|
|
|
return true;
|
2025-11-27 15:03:25 +08:00
|
|
|
|
}
|
2025-12-01 11:12:33 +08:00
|
|
|
|
}
|
|
|
|
|
|
return false;
|
|
|
|
|
|
}
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
/**
|
|
|
|
|
|
* 构建指标行数据
|
|
|
|
|
|
*/
|
|
|
|
|
|
private function buildMetricRow(string $metricName, array $queuesData, string $field): array
|
|
|
|
|
|
{
|
|
|
|
|
|
$row = [$metricName];
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
foreach ($queuesData as $queueInfo) {
|
|
|
|
|
|
$value = $queueInfo[$field];
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-01 11:12:33 +08:00
|
|
|
|
// 根据字段类型格式化显示
|
|
|
|
|
|
if ($field === 'messages' || $field === 'consumers') {
|
|
|
|
|
|
$row[] = $this->formatNumber($value);
|
|
|
|
|
|
} else {
|
|
|
|
|
|
$row[] = $value;
|
2025-11-27 15:03:25 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-12-01 11:12:33 +08:00
|
|
|
|
|
|
|
|
|
|
return $row;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 格式化数字显示
|
|
|
|
|
|
*/
|
|
|
|
|
|
private function formatNumber($value): string
|
|
|
|
|
|
{
|
|
|
|
|
|
if (!is_numeric($value)) {
|
|
|
|
|
|
return (string) $value;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
$num = (int) $value;
|
|
|
|
|
|
|
|
|
|
|
|
if ($num > 1000) {
|
|
|
|
|
|
return '<fg=red>' . number_format($num) . '</>';
|
|
|
|
|
|
} elseif ($num > 100) {
|
|
|
|
|
|
return '<fg=yellow>' . number_format($num) . '</>';
|
|
|
|
|
|
} elseif ($num > 0) {
|
|
|
|
|
|
return '<fg=cyan>' . $num . '</>';
|
|
|
|
|
|
} else {
|
|
|
|
|
|
return '<fg=gray>' . $num . '</>';
|
|
|
|
|
|
}
|
2025-11-27 15:03:25 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|