diff --git a/backend/app/Command/AppMessageQueuePushTmall.php b/backend/app/Command/AppMessageQueuePushTmall.php index cfc4910..3d23a1f 100644 --- a/backend/app/Command/AppMessageQueuePushTmall.php +++ b/backend/app/Command/AppMessageQueuePushTmall.php @@ -33,7 +33,7 @@ class AppMessageQueuePushTmall extends HyperfCommand // 从 raw 数据库连接获取数据 $orders = Db::connection('raw') ->table('wpic_taobao_order')->orderBy('id', 'desc') - ->limit(10)->get('order_raw')->lazy(); + ->limit(4)->get('order_raw')->lazy(); // dump($orders->first()); // return; diff --git a/backend/app/Command/AppMqStatus.php b/backend/app/Command/AppMqStatus.php index 4aa3010..8e6a27d 100644 --- a/backend/app/Command/AppMqStatus.php +++ b/backend/app/Command/AppMqStatus.php @@ -9,10 +9,16 @@ use Hyperf\Command\Annotation\Command; use Psr\Container\ContainerInterface; use PhpAmqpLib\Connection\AMQPStreamConnection; use Hyperf\Contract\ConfigInterface; +use Symfony\Component\Console\Input\InputOption; #[Command] class AppMqStatus extends HyperfCommand { + /** + * 业务队列类型 + */ + private const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory']; + public function __construct(protected ContainerInterface $container) { parent::__construct('app:mq:status'); @@ -22,8 +28,9 @@ class AppMqStatus extends HyperfCommand { parent::configure(); $this->setDescription('Display message counts for all accessible queues'); - $this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds (default: 3)'); - $this->addOption('interval', 'i', \Symfony\Component\Console\Input\InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3'); + $this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds'); + $this->addOption('interval', 'i', InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3'); + $this->addOption('queue', null, InputOption::VALUE_OPTIONAL, 'Filter by queue type (orders, products, refunds, inventory)'); } public function handle() @@ -49,6 +56,7 @@ class AppMqStatus extends HyperfCommand private function displayQueueStatus(): int { $this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info'); + $this->line(''); try { $config = $this->container->get(ConfigInterface::class); @@ -74,61 +82,90 @@ class AppMqStatus extends HyperfCommand $channel = $connection->channel(); - // 获取所有队列信息 - $queues = $this->getQueuesFromAnnotations(); + // 获取 --queue 参数 + $filterQueue = $this->input->getOption('queue'); + + // 确定要显示的队列组 + $queueTypes = $filterQueue + ? [$filterQueue] + : self::QUEUE_TYPES; + + // 验证队列类型 + if ($filterQueue && !in_array($filterQueue, self::QUEUE_TYPES)) { + $this->error("Invalid queue type: {$filterQueue}"); + $this->line("Valid types: " . implode(', ', self::QUEUE_TYPES)); + $channel->close(); + $connection->close(); + return 1; + } - $tableData = []; $totalMessages = 0; + $totalConsumers = 0; + $allQueueNames = []; - foreach ($queues as $queueName) { - try { - // 使用 passive=true 来获取队列信息而不创建队列 - // queue_declare 返回 [queue_name, message_count, consumer_count] - [$queue, $messageCount, $consumerCount] = $channel->queue_declare( - $queueName, - true, // passive - true, // durable - false, // exclusive - false // auto_delete - ); + // 收集主业务队列和死信队列的数据 + $businessQueuesData = []; + $deadLetterQueuesData = []; - $tableData[] = [ - 'queue' => $queueName, - 'messages' => $messageCount, - 'consumers' => $consumerCount, - 'status' => $messageCount > 0 ? 'Has Messages' : 'Empty', - ]; + foreach ($queueTypes as $type) { + $groupData = $this->fetchQueueGroupData($channel, $type); - $totalMessages += $messageCount; - } catch (\Exception $e) { - $tableData[] = [ - 'queue' => $queueName, - 'messages' => 'N/A', - 'consumers' => 'N/A', - 'status' => 'Error: ' . $e->getMessage() . '', - ]; + foreach ($groupData as $queueInfo) { + // 区分主队列和重试队列(死信队列) + if (str_ends_with($queueInfo['queue'], '.retry.queue')) { + $deadLetterQueuesData[] = $queueInfo; + } else { + $businessQueuesData[] = $queueInfo; + } + + if (is_numeric($queueInfo['messages'])) { + $totalMessages += $queueInfo['messages']; + } + if (is_numeric($queueInfo['consumers'])) { + $totalConsumers += $queueInfo['consumers']; + } + $allQueueNames[] = $queueInfo['queue']; } } + // 显示主业务队列表格 + $this->displayBusinessQueues($businessQueuesData, $filterQueue); + + // 显示死信队列(重试队列) + if (!empty($deadLetterQueuesData)) { + $this->line(''); + $this->displayDeadLetterQueues($deadLetterQueuesData, $filterQueue); + } + + // 显示共享的错误队列 + if (!$filterQueue) { + $this->line(''); + $errorQueueData = $this->fetchQueueData($channel, 'errors.queue'); + $this->displaySharedQueues([$errorQueueData]); + + if (is_numeric($errorQueueData['messages'])) { + $totalMessages += $errorQueueData['messages']; + } + if (is_numeric($errorQueueData['consumers'])) { + $totalConsumers += $errorQueueData['consumers']; + } + $allQueueNames[] = $errorQueueData['queue']; + } + // 关闭连接 $channel->close(); $connection->close(); - // 显示表格 - if (empty($tableData)) { - $this->warn('No queues found.'); - return 0; - } - - $this->table( - ['Queue Name', 'Messages', 'Consumers', 'Status'], - array_map(function($row) { - return [$row['queue'], $row['messages'], $row['consumers'], $row['status']]; - }, $tableData) - ); - + // 显示汇总信息 $this->line(''); - $this->info("Total messages across all queues: {$totalMessages}"); + $this->info("=== Summary ==="); + $this->line("Total messages: {$totalMessages}"); + $this->line("Total active consumers: {$totalConsumers}"); + $this->line(''); + $this->line('Queues monitored:'); + foreach ($allQueueNames as $queueName) { + $this->line(" - {$queueName}"); + } return 0; } catch (\Exception $e) { @@ -139,51 +176,221 @@ class AppMqStatus extends HyperfCommand } /** - * 扫描所有使用 Consumer 注解的类,获取队列名称 + * 获取队列组数据(主队列 + 重试队列) */ - private function getQueuesFromAnnotations(): array + private function fetchQueueGroupData($channel, string $type): array { - $queues = []; + $queues = [ + "{$type}.queue", // 主业务队列 + "{$type}.retry.queue", // 重试队列 + ]; - // 扫描 Platform 目录下的所有 Consumer - $platformPath = BASE_PATH . '/app/Platform'; - - if (is_dir($platformPath)) { - $this->scanDirectory($platformPath, $queues); + $groupData = []; + foreach ($queues as $queueName) { + $groupData[] = $this->fetchQueueData($channel, $queueName); } - // 如果没有找到任何队列,返回一些默认的队列名称 - if (empty($queues)) { - $queues = ['orders.queue']; - } - - return array_unique($queues); + return $groupData; } /** - * 递归扫描目录,查找包含 Consumer 注解的类 + * 获取单个队列的数据 */ - private function scanDirectory(string $path, array &$queues): void + private function fetchQueueData($channel, string $queueName): array { - $files = scandir($path); + try { + // 使用 passive=true 来获取队列信息而不创建队列 + [$queue, $messageCount, $consumerCount] = $channel->queue_declare( + $queueName, + true, // passive + true, // durable + false, // exclusive + false // auto_delete + ); - foreach ($files as $file) { - if ($file === '.' || $file === '..') { - continue; + return [ + 'queue' => $queueName, + 'messages' => $messageCount, + 'consumers' => $consumerCount, + 'status' => $this->getQueueStatus($messageCount, $consumerCount), + ]; + } catch (\Exception $e) { + return [ + 'queue' => $queueName, + 'messages' => 'N/A', + 'consumers' => 'N/A', + 'status' => 'Error: ' . $e->getMessage() . '', + ]; + } + } + + /** + * 获取队列状态描述 + */ + private function getQueueStatus(int $messageCount, int $consumerCount): string + { + if ($messageCount > 100) { + return '⚠ High Load'; + } elseif ($messageCount > 10) { + return '⚡ Processing'; + } elseif ($messageCount > 0) { + return '✓ Active'; + } else { + return '✓ Empty'; + } + } + + /** + * 显示业务队列(合并所有队列组)- 转置显示 + */ + private function displayBusinessQueues(array $allGroupsData, ?string $filterQueue): void + { + // 构建标题 + $title = $filterQueue + ? "Business Queues (Filtered: {$filterQueue})" + : "Business Queues"; + + $this->line("=== {$title} ==="); + + // 构建表头 - 使用简化的队列名称(去掉 .queue) + $headers = ['Metric']; + foreach ($allGroupsData as $queueInfo) { + $queueName = $queueInfo['queue']; + // 去掉 .queue 后缀 + $simpleName = str_replace('.queue', '', $queueName); + $headers[] = $simpleName; + } + + // 构建行数据 - 转置显示 + $rows = [ + $this->buildMetricRow('Messages', $allGroupsData, 'messages'), + $this->buildMetricRow('Consumers', $allGroupsData, 'consumers'), + $this->buildMetricRow('Status', $allGroupsData, 'status'), + ]; + + $this->table($headers, $rows); + $this->line(''); + } + + /** + * 显示死信队列(重试队列)- 转置显示 + */ + private function displayDeadLetterQueues(array $queuesData, ?string $filterQueue): void + { + $title = $filterQueue + ? "Dead Letter Queues (Filtered: {$filterQueue})" + : "Dead Letter Queues (Retry Queues)"; + + $this->line("=== {$title} ==="); + + // 构建表头 + $headers = ['Metric']; + foreach ($queuesData as $queueInfo) { + $queueName = $queueInfo['queue']; + // 去掉 .queue 后缀,保留 retry 标识 + $simpleName = str_replace('.queue', '', $queueName); + $headers[] = $simpleName; + } + + // 构建行数据 + $rows = [ + $this->buildMetricRow('Messages', $queuesData, 'messages'), + $this->buildMetricRow('Consumers', $queuesData, 'consumers'), + $this->buildMetricRow('Status', $queuesData, 'status'), + ]; + + $this->table($headers, $rows); + + // 添加说明 + if (!empty($queuesData) && $this->hasMessages($queuesData)) { + $this->line('ℹ These queues receive messages from DLX when main queue processing fails'); + } + } + + /** + * 显示共享队列(errors.queue)- 转置显示 + */ + private function displaySharedQueues(array $queuesData): void + { + $this->line("=== Shared Queues ==="); + + // 构建表头 + $headers = ['Metric']; + foreach ($queuesData as $queueInfo) { + $queueName = $queueInfo['queue']; + // 去掉 .queue 后缀 + $simpleName = str_replace('.queue', '', $queueName); + $headers[] = $simpleName; + } + + // 构建行数据 + $rows = [ + $this->buildMetricRow('Messages', $queuesData, 'messages'), + $this->buildMetricRow('Consumers', $queuesData, 'consumers'), + $this->buildMetricRow('Status', $queuesData, 'status'), + ]; + + $this->table($headers, $rows); + + // 添加说明 + if (!empty($queuesData) && $this->hasMessages($queuesData)) { + $this->line('ℹ Error queue contains messages that exceeded max retry count'); + } + } + + /** + * 检查队列中是否有消息 + */ + private function hasMessages(array $queuesData): bool + { + foreach ($queuesData as $queueInfo) { + if (is_numeric($queueInfo['messages']) && $queueInfo['messages'] > 0) { + return true; } + } + return false; + } - $filePath = $path . '/' . $file; + /** + * 构建指标行数据 + */ + private function buildMetricRow(string $metricName, array $queuesData, string $field): array + { + $row = [$metricName]; - if (is_dir($filePath)) { - $this->scanDirectory($filePath, $queues); - } elseif (is_file($filePath) && pathinfo($filePath, PATHINFO_EXTENSION) === 'php') { - $content = file_get_contents($filePath); + foreach ($queuesData as $queueInfo) { + $value = $queueInfo[$field]; - // 查找 Consumer 注解中的 queue 参数 - if (preg_match('/#\[Consumer\([^)]*queue:\s*["\']([^"\']+)["\']/', $content, $matches)) { - $queues[] = $matches[1]; - } + // 根据字段类型格式化显示 + if ($field === 'messages' || $field === 'consumers') { + $row[] = $this->formatNumber($value); + } else { + $row[] = $value; } } + + return $row; + } + + /** + * 格式化数字显示 + */ + private function formatNumber($value): string + { + if (!is_numeric($value)) { + return (string) $value; + } + + $num = (int) $value; + + if ($num > 1000) { + return '' . number_format($num) . ''; + } elseif ($num > 100) { + return '' . number_format($num) . ''; + } elseif ($num > 0) { + return '' . $num . ''; + } else { + return '' . $num . ''; + } } } diff --git a/backend/app/Entity/Parse/EntityParseFactory.php b/backend/app/Entity/Parse/EntityParseFactory.php index dd225c7..3065cf9 100644 --- a/backend/app/Entity/Parse/EntityParseFactory.php +++ b/backend/app/Entity/Parse/EntityParseFactory.php @@ -135,22 +135,19 @@ class EntityParseFactory } /** - * 从数据中提取平台名称 + * 从数据中提取平台名称(从 meta 字段中获取) * * @param array $data * @return string + * @throws InvalidArgumentException */ private static function extractPlatformNameFromData(array $data): string { - // 优先从 platform_id 获取,如果没有则尝试从其他字段获取 - if (isset($data['platform_id'])) { - // 这里可以根据 platform_id 映射到平台名称 - // 简化处理:假设需要从数据库或配置中查找 - // 当前先直接使用 platform_id 作为平台名称 - return self::resolvePlatformName($data['platform_id']); + if (!isset($data['meta']['platform_id'])) { + throw new InvalidArgumentException("Cannot extract platform name from data: meta.platform_id missing"); } - throw new InvalidArgumentException("Cannot extract platform name from data: platform_id missing"); + return self::resolvePlatformName($data['meta']['platform_id']); } /** diff --git a/backend/app/Platform/Tmall/TmallOrderParser.php b/backend/app/Platform/Tmall/TmallOrderParser.php index 7d2b56c..c891405 100644 --- a/backend/app/Platform/Tmall/TmallOrderParser.php +++ b/backend/app/Platform/Tmall/TmallOrderParser.php @@ -8,7 +8,7 @@ use App\Model\Company; use App\Model\Model as Entity; use App\Model\Store; use App\Entity\Parse\EntityParse; -use Hyperf\Amqp\Message\ConsumerMessageInterface; +use Hyperf\Collection\LazyCollection; use InvalidArgumentException; /** @@ -21,20 +21,20 @@ class TmallOrderParser extends EntityParse /** * 公司作用域匹配 * - * 从消息体中提取 company_id,然后查询数据库获取公司对象 + * 从 metadata 中提取 company_id,然后查询数据库获取公司对象 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return Company * @throws InvalidArgumentException */ - public function companyScopeMatch(ConsumerMessageInterface $message): Company + public function companyScopeMatch(array $metadata): Company { - $data = $this->extractMessageData($message); - // 验证必需字段 - $this->validateRequiredFields($data, ['company_id']); + if (!isset($metadata['company_id'])) { + throw new InvalidArgumentException('company_id is required in metadata'); + } - $companyId = $data['company_id']; + $companyId = $metadata['company_id']; $company = Company::find($companyId); @@ -48,20 +48,20 @@ class TmallOrderParser extends EntityParse /** * 店铺作用域匹配 * - * 从消息体中提取 store_id,然后查询数据库获取店铺对象 + * 从 metadata 中提取 store_id,然后查询数据库获取店铺对象 * - * @param ConsumerMessageInterface $message + * @param array $metadata * @return Store * @throws InvalidArgumentException */ - public function storeScopeMatch(ConsumerMessageInterface $message): Store + public function storeScopeMatch(array $metadata): Store { - $data = $this->extractMessageData($message); - // 验证必需字段 - $this->validateRequiredFields($data, ['store_id']); + if (!isset($metadata['store_id'])) { + throw new InvalidArgumentException('store_id is required in metadata'); + } - $storeId = $data['store_id']; + $storeId = $metadata['store_id']; $store = Store::find($storeId); @@ -75,50 +75,44 @@ class TmallOrderParser extends EntityParse /** * 实体数据映射 * - * 将原始数据映射到实体对象 + * 将原始数据映射为可供 Model 使用的数组集合 * - * @param array $data - * @param Entity $entity - * @return Entity + * @param array $rawData 原始数据数组,通常来自 $data['raw_data'] + * @return LazyCollection 返回 LazyCollection,每个元素为可供 Model::fill() 使用的数组 */ - public function entityMap(array $data, Entity $entity): Entity + public function entityMap(array $rawData): LazyCollection { - // 假设数据结构为: - // { - // "platform_id": 2, - // "company_id": 188, - // "store_id": 292, - // "unique_id": "abc123", - // "raw_data": [...] - // } + // 使用 LazyCollection 进行延迟处理,提高性能 + return LazyCollection::make(function () use ($rawData) { + // 如果 rawData 是单个记录,转换为数组 + $records = isset($rawData[0]) ? $rawData : [$rawData]; - $rawData = $data['raw_data'] ?? []; - - // 映射基本信息 - $entity->platform_id = $this->getPlatform()->id; - $entity->company_id = $this->getCompany()->id; - $entity->store_id = $this->getStore()->id; - $entity->unique_id = $data['unique_id'] ?? null; - - // 映射原始数据 - if (!empty($rawData)) { - $entity->raw_data = json_encode($rawData); - } - - // 映射其他字段(根据实际业务需求) - // ... - - return $entity; + foreach ($records as $record) { + // 映射每条原始数据到 Model 可用的数组格式 + yield [ + 'platform_id' => $this->getPlatform()->id, + 'company_id' => $this->getCompany()->id, + 'store_id' => $this->getStore()->id, + 'unique_id' => $record['unique_id'] ?? $this->getData()['unique_id'] ?? null, + 'raw_data' => json_encode($record), + // 根据实际业务需求映射其他字段 + // 例如: + // 'order_id' => $record['order_id'] ?? null, + // 'order_status' => $record['order_status'] ?? null, + // 'order_amount' => $record['order_amount'] ?? 0, + // ... + ]; + } + }); } /** * 可选:覆盖唯一标识符提取逻辑 * - * 如果使用默认的 id/unique_id 提取逻辑,则无需覆盖此方法 + * 如果使用默认的 unique_id 提取逻辑,则无需覆盖此方法 */ - // public function entityUniqueIdentifierExtract(ConsumerMessageInterface $message): string|int + // public function entityUniqueIdentifierExtract(array $metadata): string|int // { - // $data = $this->extractMessageData($message); - // return $this->extractUniqueIdentifier($data, 'custom_id_field'); + // return $metadata['custom_id_field'] ?? throw new InvalidArgumentException('custom_id_field not found'); // } }