Files
datahub/backend/app/Command/AppMqStatus.php
T

190 lines
6.2 KiB
PHP
Raw Normal View History

2025-11-27 15:03:25 +08:00
<?php
declare(strict_types=1);
namespace App\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Psr\Container\ContainerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Hyperf\Contract\ConfigInterface;
#[Command]
class AppMqStatus extends HyperfCommand
{
public function __construct(protected ContainerInterface $container)
{
parent::__construct('app:mq:status');
}
public function configure()
{
parent::configure();
$this->setDescription('Display message counts for all accessible queues');
$this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds (default: 3)');
$this->addOption('interval', 'i', \Symfony\Component\Console\Input\InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3');
}
public function handle()
{
$watchMode = $this->input->getOption('watch');
$interval = (int) $this->input->getOption('interval');
if ($watchMode) {
$this->info("Watch mode enabled. Refreshing every {$interval} seconds. Press Ctrl+C to exit.");
$this->line('');
while (true) {
// 清屏(对于支持 ANSI 的终端)
$this->output->write("\033[2J\033[H");
$this->displayQueueStatus();
sleep($interval);
}
} else {
return $this->displayQueueStatus();
}
}
private function displayQueueStatus(): int
{
$this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info');
try {
$config = $this->container->get(ConfigInterface::class);
$consumerConfig = $config->get('amqp.default_consumer');
// 创建连接
$connection = new AMQPStreamConnection(
$consumerConfig['host'],
$consumerConfig['port'],
$consumerConfig['user'],
$consumerConfig['password'],
$consumerConfig['vhost'],
false,
'AMQPLAIN',
null,
'en_US',
$consumerConfig['params']['connection_timeout'] ?? 3.0,
$consumerConfig['params']['read_write_timeout'] ?? 3.0,
null,
$consumerConfig['params']['keepalive'] ?? false,
$consumerConfig['params']['heartbeat'] ?? 0
);
$channel = $connection->channel();
// 获取所有队列信息
$queues = $this->getQueuesFromAnnotations();
$tableData = [];
$totalMessages = 0;
foreach ($queues as $queueName) {
try {
// 使用 passive=true 来获取队列信息而不创建队列
// queue_declare 返回 [queue_name, message_count, consumer_count]
[$queue, $messageCount, $consumerCount] = $channel->queue_declare(
$queueName,
true, // passive
true, // durable
false, // exclusive
false // auto_delete
);
$tableData[] = [
'queue' => $queueName,
'messages' => $messageCount,
'consumers' => $consumerCount,
'status' => $messageCount > 0 ? '<fg=yellow>Has Messages</>' : '<fg=green>Empty</>',
];
$totalMessages += $messageCount;
} catch (\Exception $e) {
$tableData[] = [
'queue' => $queueName,
'messages' => 'N/A',
'consumers' => 'N/A',
'status' => '<fg=red>Error: ' . $e->getMessage() . '</>',
];
}
}
// 关闭连接
$channel->close();
$connection->close();
// 显示表格
if (empty($tableData)) {
$this->warn('No queues found.');
return 0;
}
$this->table(
['Queue Name', 'Messages', 'Consumers', 'Status'],
array_map(function($row) {
return [$row['queue'], $row['messages'], $row['consumers'], $row['status']];
}, $tableData)
);
$this->line('');
$this->info("Total messages across all queues: {$totalMessages}");
return 0;
} catch (\Exception $e) {
$this->error('Failed to fetch queue status: ' . $e->getMessage());
$this->line('Trace: ' . $e->getTraceAsString(), 'comment');
return 1;
}
}
/**
* 扫描所有使用 Consumer 注解的类,获取队列名称
*/
private function getQueuesFromAnnotations(): array
{
$queues = [];
// 扫描 Platform 目录下的所有 Consumer
$platformPath = BASE_PATH . '/app/Platform';
if (is_dir($platformPath)) {
$this->scanDirectory($platformPath, $queues);
}
// 如果没有找到任何队列,返回一些默认的队列名称
if (empty($queues)) {
$queues = ['orders.queue'];
}
return array_unique($queues);
}
/**
* 递归扫描目录,查找包含 Consumer 注解的类
*/
private function scanDirectory(string $path, array &$queues): void
{
$files = scandir($path);
foreach ($files as $file) {
if ($file === '.' || $file === '..') {
continue;
}
$filePath = $path . '/' . $file;
if (is_dir($filePath)) {
$this->scanDirectory($filePath, $queues);
} elseif (is_file($filePath) && pathinfo($filePath, PATHINFO_EXTENSION) === 'php') {
$content = file_get_contents($filePath);
// 查找 Consumer 注解中的 queue 参数
if (preg_match('/#\[Consumer\([^)]*queue:\s*["\']([^"\']+)["\']/', $content, $matches)) {
$queues[] = $matches[1];
}
}
}
}
}