0, // 同一个消费者,最高同时可以处理的消息数。 // @attention 默认值 100, test 设置为 1 'prefetch_count' => 1, // 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。 'global' => false, ]; /** * 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列 * 这样才能实现重试队列和错误队列的机制 */ protected bool $requeue = false; protected $entityType = 'order'; /** * 重写队列构建器,设置队列参数 * 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误 * * @return QueueBuilder */ public function getQueueBuilder(): QueueBuilder { return (new QueueBuilder()) ->setQueue($this->getQueue()) ->setArguments([ 'x-dead-letter-exchange' => ['S', 'dlx.orders'], 'x-dead-letter-routing-key' => ['S', 'retry'], 'x-message-ttl' => ['I', 86400000], // 24小时 'x-queue-type' => ['S', 'classic'], ]); } public function consumeMessage($data, AMQPMessage $message): Result { // 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态 // 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒 $debugDelay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0); if ($debugDelay > 0) { dump("Debug mode: sleeping for {$debugDelay} seconds..."); sleep($debugDelay); } // dump('---data'); // dump($data); // dump('---'); dump('---message'); dump(json_decode($message->getBody(), true)['message_id']); dump('---'); // 获取重试次数 $retryCount = $this->getRetryCount($message); $maxRetries = (int) env('AMQP_MAX_RETRIES', 3); dump("Retry count: {$retryCount}/{$maxRetries}"); try { // EntityParseFactory 使用静态方法调用,无需依赖注入 $parse = EntityParseFactory::createFromMessage($message); // 提取 metadata $metadata = [ 'company_id' => $data['company_id'] ?? null, 'platform_id' => $data['platform_id'] ?? null, 'store_id' => $data['store_id'] ?? null, 'unique_id' => $data['unique_id'] ?? null, ]; // entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息。 $entity = $parse->entityMatch($metadata); // message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。 $entityMapResult = $parse->entityMap($data['raw_data'] ?? []); // 将 LazyCollection 转为数组,准备批量操作 $dataToUpsert = $entityMapResult->all(); if (empty($dataToUpsert)) { dump('No data to process'); return Result::ACK; } dump("Processing " . count($dataToUpsert) . " order(s) with batch upsert"); // 分离订单数据和子项数据 $ordersData = []; $itemsByPlatformOrderId = []; foreach ($dataToUpsert as $data) { $ordersData[] = $data['order']; $itemsByPlatformOrderId[$data['order']['platform_order_id']] = $data['items'] ?? []; } Db::beginTransaction(); // @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理 // @attention 如果多条记录中有个别记录请求失败,可能会导致该批次写入失败,此时则需要判断和修复 // 1. 使用 upsert 批量处理订单插入和更新 // 利用数据库唯一索引自动判断是插入还是更新 // 解决了重复订单推送的问题:存在则更新,不存在则插入 $entity->newQuery()->upsert( $ordersData, $parse->getUniqueBy(), // 从解析器获取唯一键字段 $parse->getUpdateFields() // 从解析器获取可更新字段 ); // 2. 处理订单子项 // 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem $this->processOrderItems($itemsByPlatformOrderId); Db::commit(); // @TODO 触发事件通知,更新自动聚合任务 // 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。 return Result::ACK; } catch (Throwable $error) { dump("=== Error Caught ==="); dump("Error: " . $error->getMessage()); dump("Retry Count: {$retryCount}"); dump("Max Retries: {$maxRetries}"); dump("Check: {$retryCount} >= {$maxRetries} = " . ($retryCount >= $maxRetries ? 'TRUE' : 'FALSE')); Log::get()->error('Consumer processing failed', [ 'error' => $error->getMessage(), 'retry_count' => $retryCount, 'max_retries' => $maxRetries, ]); Db::rollBack(); // 检查是否超过最大重试次数 if ($retryCount >= $maxRetries) { // 超过重试次数,发送到错误队列 dump(">>> MAX RETRIES EXCEEDED! Sending to error queue..."); try { $this->sendToErrorQueue($message, $error); dump(">>> Successfully sent to error queue!"); } catch (Throwable $e) { dump(">>> FAILED to send to error queue: " . $e->getMessage()); dump(">>> Stack trace: " . $e->getTraceAsString()); } // 返回 ACK 避免消息再次重试 // 因为消息已经被发送到错误队列,不应该继续在主队列中循环 dump(">>> Returning ACK to prevent further retries"); return Result::ACK; } // 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列 dump(">>> Retry not exceeded, sending to DLX (NACK)"); return Result::NACK; } } public function isEnable(): bool { return parent::isEnable(); } /** * 获取消息的重试次数 * 通过检查 x-death header 中的 count 字段 * * @param AMQPMessage $message * @return int */ protected function getRetryCount(AMQPMessage $message): int { // 检查是否存在 application_headers 属性 // 首次处理的消息不会有这个属性,必须先用 has() 检查 if (!$message->has('application_headers')) { dump(">>> No application_headers, first time processing"); return 0; } $headers = $message->get('application_headers'); if (!$headers) { dump(">>> application_headers exists but is empty"); return 0; } $headerData = $headers->getNativeData(); $xDeath = $headerData['x-death'] ?? []; dump(">>> x-death header data:"); dump($xDeath); if (empty($xDeath)) { dump(">>> x-death is empty"); return 0; // 首次失败 } // x-death 是一个数组,第一个元素包含 count 字段 // count 表示消息从该队列死信的次数 $count = $xDeath[0]['count'] ?? 0; dump(">>> Extracted count from x-death: {$count}"); return $count; } /** * 发送消息到错误队列 * 当重试次数超过上限时调用 * * @param AMQPMessage $message 原始消息 * @param Throwable $error 错误信息 * @return void */ protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void { try { $retryCount = $this->getRetryCount($message); // 使用 ErrorProducer 发送到错误队列 $producer = ApplicationContext::getContainer()->get(Producer::class); $errorProducer = new ErrorProducer($message, $error, $retryCount); $producer->produce($errorProducer); // 记录日志 Log::get()->warning('Message sent to error queue after exceeding retry limit', [ 'error_id' => $errorProducer->payload['error_id'] ?? 'unknown', 'retry_count' => $retryCount, 'error_message' => $error->getMessage(), ]); } catch (Throwable $e) { // 发送到错误队列失败,记录日志 dump($e->getMessage()); Log::get()->error('Failed to send message to error queue', [ 'error' => $e->getMessage(), 'original_error' => $error->getMessage(), ]); } } /** * 处理订单子项的批量同步(优化版本) * * 策略优化:使用业务键 (store_id, platform_order_id, sub_order_id) 作为唯一性约束 * 1. 直接批量 upsert OrderItems(无需先查询 order_id) * 2. 批量更新 order_id(通过 JOIN orders 表) * 3. 删除不在新数据中的旧 OrderItem(完全同步) * * 性能优势:减少一次批量查询,直接使用业务键进行 upsert * * @param array $itemsByPlatformOrderId 以 platform_order_id 为键的子项数据数组 * @return void */ protected function processOrderItems(array $itemsByPlatformOrderId): void { if (empty($itemsByPlatformOrderId)) { dump('No order items to process'); return; } // 1. 构建所有子项数据(无需查询 order_id) $allItemsToUpsert = []; $itemSubOrderIdsByPlatformOrderId = []; // 记录每个平台订单的新子项 ID 列表 foreach ($itemsByPlatformOrderId as $platformOrderId => $items) { $subOrderIds = []; foreach ($items as $item) { // order_id 暂时设为 0,后续批量更新 $item['order_id'] = 0; $allItemsToUpsert[] = $item; $subOrderIds[] = $item['sub_order_id']; } $itemSubOrderIdsByPlatformOrderId[$platformOrderId] = $subOrderIds; } if (empty($allItemsToUpsert)) { dump('No valid order items to upsert'); return; } dump("Upserting " . count($allItemsToUpsert) . " order items"); // 2. 批量 upsert OrderItems(使用业务键作为唯一性约束) OrderItem::query()->upsert( $allItemsToUpsert, ['store_id', 'platform_order_id', 'sub_order_id'], // 唯一键(业务键) [ 'company_id', 'platform_id', 'sub_order_type_id', 'product_id', 'platform_product_id', 'product_sku', 'product_barcode', 'unit_price', 'quantity', 'discount', 'total', 'ext', 'updated_at', ] // 可更新字段(不包括 order_id,需要单独更新) ); // 3. 批量更新 order_id(通过 JOIN orders 表) // UPDATE order_items SET order_id = orders.id // FROM orders // WHERE order_items.store_id = orders.store_id // AND order_items.platform_order_id = orders.platform_order_id Db::update(' UPDATE order_items SET order_id = orders.id FROM orders WHERE order_items.store_id = orders.store_id AND order_items.platform_order_id = orders.platform_order_id AND order_items.order_id = 0 '); dump("Updated order_id for all items"); // 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略) // 优化:一次性删除所有不匹配的旧子项,而不是逐个订单处理 // 此部分业务应该很少被调用,订单子项新增或删减的情况很少见 if (!empty($allItemsToUpsert)) { // 构建本次更新的所有 (store_id, platform_order_id) 组合(用于限定删除范围) $updatedOrders = []; foreach ($itemsByPlatformOrderId as $platformOrderId => $items) { if (!empty($items)) { $storeId = (int)$items[0]['store_id']; $platformOrderId = addslashes($platformOrderId); $updatedOrders[] = "({$storeId}, '{$platformOrderId}')"; } } // 构建本次更新的所有 (store_id, platform_order_id, sub_order_id) 组合(保留的记录) $validItems = []; foreach ($allItemsToUpsert as $item) { $storeId = (int)$item['store_id']; $platformOrderId = addslashes($item['platform_order_id']); $subOrderId = addslashes($item['sub_order_id']); $validItems[] = "({$storeId}, '{$platformOrderId}', '{$subOrderId}')"; } if (!empty($updatedOrders) && !empty($validItems)) { $ordersIn = implode(', ', $updatedOrders); $itemsNotIn = implode(', ', $validItems); // 批量删除:删除在本次更新订单范围内,但不在新数据中的旧子项 // DELETE FROM order_items // WHERE (store_id, platform_order_id) IN ((1, 'order1'), (1, 'order2')) // AND (store_id, platform_order_id, sub_order_id) NOT IN ((1, 'order1', 'item1'), ...) $deleted = Db::delete(" DELETE FROM order_items WHERE (store_id, platform_order_id) IN ({$ordersIn}) AND (store_id, platform_order_id, sub_order_id) NOT IN ({$itemsNotIn}) "); if ($deleted > 0) { dump("Batch deleted {$deleted} obsolete order items"); } } } dump("Order items processing completed"); } }