*/ public function getValidQueueTypes(): array { return self::QUEUE_TYPES; } /** * 获取队列状态 * * @param string|null $queue_type 筛选队列类型(orders/products/refunds/inventory) * @return array{business_queues: array, retry_queues: array, error_queue: array, summary: array, fetched_at: string} */ public function getStatus(?string $queue_type = null): array { $consumer_config = $this->config->get('amqp.default_consumer'); $connection = new AMQPStreamConnection( $consumer_config['host'], $consumer_config['port'], $consumer_config['user'], $consumer_config['password'], $consumer_config['vhost'], false, 'AMQPLAIN', null, 'en_US', $consumer_config['params']['connection_timeout'] ?? 3.0, $consumer_config['params']['read_write_timeout'] ?? 3.0, null, $consumer_config['params']['keepalive'] ?? false, $consumer_config['params']['heartbeat'] ?? 0 ); $channel = $connection->channel(); try { $queue_types = $queue_type ? [$queue_type] : self::QUEUE_TYPES; $business_queues = []; $retry_queues = []; $total_messages = 0; $total_consumers = 0; foreach ($queue_types as $type) { $group_data = $this->fetchQueueGroupData($channel, $type); foreach ($group_data as $queue_info) { if (str_ends_with($queue_info['queue'], '.retry.queue')) { $retry_queues[] = $queue_info; } else { $business_queues[] = $queue_info; } if (is_numeric($queue_info['messages'])) { $total_messages += $queue_info['messages']; } if (is_numeric($queue_info['consumers'])) { $total_consumers += $queue_info['consumers']; } } } // 错误队列仅在未筛选时返回 $error_queue = []; if ($queue_type === null) { $error_queue = $this->fetchQueueData($channel, 'errors.queue'); if (is_numeric($error_queue['messages'])) { $total_messages += $error_queue['messages']; } if (is_numeric($error_queue['consumers'])) { $total_consumers += $error_queue['consumers']; } } return [ 'business_queues' => $business_queues, 'retry_queues' => $retry_queues, 'error_queue' => $error_queue, 'summary' => [ 'total_messages' => $total_messages, 'total_consumers' => $total_consumers, ], 'fetched_at' => date('Y-m-d H:i:s'), ]; } finally { $channel->close(); $connection->close(); } } /** * 获取队列组数据(主队列 + 重试队列) * * @return array */ private function fetchQueueGroupData(mixed $channel, string $type): array { $queues = [ "{$type}.queue", "{$type}.retry.queue", ]; $group_data = []; foreach ($queues as $queue_name) { $group_data[] = $this->fetchQueueData($channel, $queue_name); } return $group_data; } /** * 获取单个队列的数据 * * @return array{queue: string, messages: int|string, consumers: int|string, status: string} */ private function fetchQueueData(mixed $channel, string $queue_name): array { try { // 使用 passive=true 获取队列信息而不创建队列 [, $message_count, $consumer_count] = $channel->queue_declare( $queue_name, true, // passive true, // durable false, // exclusive false // auto_delete ); return [ 'queue' => $queue_name, 'messages' => $message_count, 'consumers' => $consumer_count, 'status' => $this->getQueueStatus($message_count), ]; } catch (\Throwable $e) { return [ 'queue' => $queue_name, 'messages' => 'N/A', 'consumers' => 'N/A', 'status' => 'error', ]; } } /** * 根据消息数量判断队列状态 * * @return string 状态枚举:high_load/processing/active/empty */ public function getQueueStatus(int $message_count): string { if ($message_count > 100) { return 'high_load'; } if ($message_count > 10) { return 'processing'; } if ($message_count > 0) { return 'active'; } return 'empty'; } }