*/
    public function getValidQueueTypes(): array
    {
        return self::QUEUE_TYPES;
    }

    /**
     * Get the status of the monitored queues.
     *
     * For every selected queue type the main queue ("<type>.queue") and its
     * retry queue ("<type>.retry.queue") are fetched. The shared error queue
     * ("errors.queue") is only fetched when no filter is applied. Queues that
     * could not be fetched report 'N/A' counters and are skipped in the summary.
     *
     * @param string|null $queue_type Queue type filter (orders/products/refunds/inventory);
     *                                null (or any falsy string) means "all queue types"
     * @return array{business_queues: array, retry_queues: array, error_queue: array, summary: array, fetched_at: string}
     */
    public function getStatus(?string $queue_type = null): array
    {
        // Decide once whether a filter is active, so the queue list and the
        // error-queue branch can never disagree. Previously '' / '0' selected
        // every queue type but still suppressed the error queue.
        $has_filter = (bool) $queue_type;
        $queue_types = $has_filter ? [$queue_type] : self::QUEUE_TYPES;

        $business_queues = [];
        $retry_queues = [];

        foreach ($queue_types as $type) {
            foreach ($this->fetchQueueGroupData($type) as $queue_info) {
                if (str_ends_with($queue_info['queue'], '.retry.queue')) {
                    $retry_queues[] = $queue_info;
                } else {
                    $business_queues[] = $queue_info;
                }
            }
        }

        // The error queue is only returned when no filter is applied.
        $error_queue = $has_filter ? [] : $this->fetchQueueData('errors.queue');

        $counted_queues = [...$business_queues, ...$retry_queues];
        if ($error_queue !== []) {
            $counted_queues[] = $error_queue;
        }

        $total_messages = 0;
        $total_consumers = 0;

        foreach ($counted_queues as $queue_info) {
            // Unreachable queues carry 'N/A' instead of a number; leave them out of the totals.
            if (is_numeric($queue_info['messages'])) {
                $total_messages += $queue_info['messages'];
            }

            if (is_numeric($queue_info['consumers'])) {
                $total_consumers += $queue_info['consumers'];
            }
        }

        return [
            'business_queues' => $business_queues,
            'retry_queues' => $retry_queues,
            'error_queue' => $error_queue,
            'summary' => [
                'total_messages' => $total_messages,
                'total_consumers' => $total_consumers,
            ],
            'fetched_at' => date('Y-m-d H:i:s'),
        ];
    }

    /**
     * Get the data of a queue group (main queue + retry queue).
     *
     * @return array List with the main queue entry first and the retry queue entry second
     */
    private function fetchQueueGroupData(string $type): array
    {
        return array_map(
            fn (string $queue_name): array => $this->fetchQueueData($queue_name),
            ["{$type}.queue", "{$type}.retry.queue"],
        );
    }

    /**
     * Fetch the data of a single queue through the Management API.
     *
     * Best effort: any failure (transport error, HTTP error raised by the
     * client, unparseable body) yields an entry with 'N/A' counters and the
     * status 'error' instead of an exception.
     *
     * @return array{queue: string, messages: int|string, consumers: int|string, status: string}
     */
    private function fetchQueueData(string $queue_name): array
    {
        $unavailable = [
            'queue' => $queue_name,
            'messages' => 'N/A',
            'consumers' => 'N/A',
            'status' => 'error',
        ];

        try {
            // Both values are URL path segments, so they are encoded with
            // rawurlencode(): the default vhost '/' becomes %2F, spaces become
            // %20 (not '+'), and a queue name containing '/', '?' or '#'
            // cannot redirect the request to a different API resource.
            $vhost = rawurlencode($this->config->get('amqp.default_consumer.vhost', '/'));
            $queue = rawurlencode($queue_name);

            $response = $this->getHttpClient()->get("/api/queues/{$vhost}/{$queue}");
            $data = json_decode($response->getBody()->getContents(), true);

            // A body that is not a JSON object tells us nothing about the
            // queue; report it as unavailable rather than as an empty queue.
            if (!is_array($data)) {
                return $unavailable;
            }

            return [
                'queue' => $queue_name,
                'messages' => $data['messages'] ?? 0,
                'consumers' => $data['consumers'] ?? 0,
                'status' => $this->getQueueStatus($data['messages'] ?? 0),
            ];
        } catch (\Throwable) {
            return $unavailable;
        }
    }

    /**
     * Derive the queue status from the number of messages.
     *
     * More than 100 messages is 'high_load', more than 10 is 'processing',
     * more than 0 is 'active', anything else is 'empty'.
     *
     * @return string Status enum: high_load/processing/active/empty
     */
    public function getQueueStatus(int $message_count): string
    {
        return match (true) {
            $message_count > 100 => 'high_load',
            $message_count > 10 => 'processing',
            $message_count > 0 => 'active',
            default => 'empty',
        };
    }

    /**
     * Get the Management API HTTP client (lazily initialised).
     *
     * The client is built on first use from the 'amqp.management' config
     * (host, port, user, password; defaults localhost:15672, guest/guest) and
     * then reused.
     *
     * NOTE(review): the original comment describes this as coroutine-safe.
     * Requests go through a CoroutineHandler-based handler stack, but the lazy
     * initialisation itself is not guarded — confirm this is sufficient.
     */
    private function getHttpClient(): Client
    {
        if ($this->httpClient !== null) {
            return $this->httpClient;
        }

        $management = $this->config->get('amqp.management', []);

        $this->httpClient = new Client([
            'base_uri' => sprintf(
                'http://%s:%d',
                $management['host'] ?? 'localhost',
                $management['port'] ?? 15672,
            ),
            'auth' => [
                $management['user'] ?? 'guest',
                $management['password'] ?? 'guest',
            ],
            'handler' => HandlerStack::create(new CoroutineHandler()),
            'timeout' => 5,
            'connect_timeout' => 3,
        ]);

        return $this->httpClient;
    }
}