From 700435e5dcd348639b2f4699a037423fc7437fa1 Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Fri, 30 Jan 2026 17:02:53 +0800 Subject: [PATCH] add order consumer --- backend/app/Platform/OrderConsumer.php | 104 ++++++++++++++++--------- 1 file changed, 68 insertions(+), 36 deletions(-) diff --git a/backend/app/Platform/OrderConsumer.php b/backend/app/Platform/OrderConsumer.php index 4fbb1e1..8360ce4 100644 --- a/backend/app/Platform/OrderConsumer.php +++ b/backend/app/Platform/OrderConsumer.php @@ -15,6 +15,7 @@ use PhpAmqpLib\Message\AMQPMessage; use Hyperf\Di\Annotation\Inject; use Hyperf\DbConnection\Db; use App\Model\OrderItem; +use Exception; use Hyperf\Context\ApplicationContext; use Throwable; @@ -89,23 +90,29 @@ class OrderConsumer extends ConsumerMessage // EntityParseFactory 使用静态方法调用,无需依赖注入 $parse = EntityParseFactory::createFromMessage($message); - // 提取 metadata + // 提取 metadata(从 meta 字段中获取) + $meta = $data['meta'] ?? []; $metadata = [ - 'company_id' => $data['company_id'] ?? null, - 'platform_id' => $data['platform_id'] ?? null, - 'store_id' => $data['store_id'] ?? null, - 'unique_id' => $data['unique_id'] ?? null, + 'company_id' => $meta['company_id'] ?? null, + 'platform_id' => $meta['platform_id'] ?? null, + 'store_id' => $meta['store_id'] ?? null, + 'unique_id' => $meta['unique_id'] ?? null, ]; - // entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息。 + // entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息 + // 获取到的 $entity 示例为属性为空的 \App\Model\Model 对象,比如 \App\Model\Order $entity = $parse->entityMatch($metadata); + // message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。 - $entityMapResult = $parse->entityMap($data['raw_data'] ?? []); + // 注意:raw_data 现在在 data 字段中 + $entityMapResult = $parse->entityMap($data['data'] ?? []); // 将 LazyCollection 转为数组,准备批量操作 $dataToUpsert = $entityMapResult->all(); + dump($entityMapResult->first()); + if (empty($dataToUpsert)) { dump('No data to process'); return Result::ACK; @@ -113,15 +120,28 @@ class OrderConsumer extends ConsumerMessage dump("Processing " . count($dataToUpsert) . " order(s) with batch upsert"); - // 分离订单数据和子项数据 - $ordersData = []; - $itemsByPlatformOrderId = []; - - foreach ($dataToUpsert as $data) { - $ordersData[] = $data['order']; - $itemsByPlatformOrderId[$data['order']['platform_order_id']] = $data['items'] ?? []; + // 检测订单解析器是否实现了 getOrderItems 方法 + if( !method_exists($parse, 'formatOrderItemsFromRaw')){ + throw new Exception('formatOrderItemsFromRaw method must be implemented in ' . $parse::class ); } + // 检测订单解析器是否实现了 getOrderStatusId 方法 + // 该方法处理了电商平台订单状态到本地订单状态枚举值 \App\Constants\OrderStatus 的映射 + if( !method_exists($parse, 'getOrderStatusId')){ + throw new Exception('getOrderStatusId method must be implemented in ' . $parse::class ); + } + + // 检测订单解析器是否实现了 getPaymentMethodId 方法 + // 该方法处理了电商平台订单付款方式的枚举值 \App\Constants\PaymentMethod 的映射 + if( !method_exists($parse, 'getPaymentMethodId')){ + throw new Exception('getPaymentMethodId method must be implemented in ' . $parse::class ); + } + + + // 收集订单数据 + $ordersData = $dataToUpsert; + $rawData = $data['data'] ?? []; + Db::beginTransaction(); // @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理 // @attention 如果多条记录中有个别记录请求失败,可能会导致该批次写入失败,此时则需要判断和修复 @@ -129,15 +149,37 @@ class OrderConsumer extends ConsumerMessage // 1. 使用 upsert 批量处理订单插入和更新 // 利用数据库唯一索引自动判断是插入还是更新 // 解决了重复订单推送的问题:存在则更新,不存在则插入 + $uniqueBy = $parse->getUniqueBy(); $entity->newQuery()->upsert( $ordersData, - $parse->getUniqueBy(), // 从解析器获取唯一键字段 + $uniqueBy, // 从解析器获取唯一键字段 $parse->getUpdateFields() // 从解析器获取可更新字段 ); - // 2. 处理订单子项 + // 2. 查询获取 ID 映射 [platform_order_id => local db id] + // 通过唯一键查询刚写入的订单,获取数据库生成的 ID + $idMapping = $entity->newQuery() + ->where(function ($query) use ($ordersData, $uniqueBy) { + foreach ($ordersData as $orderData) { + $query->orWhere(function ($q) use ($orderData, $uniqueBy) { + foreach ($uniqueBy as $key) { + $q->where($key, $orderData[$key]); + } + }); + } + }) + ->pluck('id', 'platform_order_id') + ->toArray(); + + dump("ID mapping: " . count($idMapping) . " orders"); + + // 3. 格式化订单子项(传入 ID 映射,使子项能直接获取正确的 order_id) + $items = $parse->formatOrderItemsFromRaw($rawData, $idMapping); + + // 4. 处理订单子项 // 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem - $this->processOrderItems($itemsByPlatformOrderId); + $this->processOrderItems($items); + Db::commit(); // @TODO 触发事件通知,更新自动聚合任务 @@ -147,6 +189,9 @@ class OrderConsumer extends ConsumerMessage } catch (Throwable $error) { dump("=== Error Caught ==="); dump("Error: " . $error->getMessage()); + dump("File: " . $error->getFile() . ":" . $error->getLine()); + dump("Stack trace:"); + dump($error->getTraceAsString()); dump("Retry Count: {$retryCount}"); dump("Max Retries: {$maxRetries}"); dump("Check: {$retryCount} >= {$maxRetries} = " . ($retryCount >= $maxRetries ? 'TRUE' : 'FALSE')); @@ -290,9 +335,7 @@ class OrderConsumer extends ConsumerMessage $subOrderIds = []; foreach ($items as $item) { - // order_id 暂时设为 0,后续批量更新 - $item['order_id'] = 0; - + // order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充 $allItemsToUpsert[] = $item; $subOrderIds[] = $item['sub_order_id']; } @@ -308,10 +351,12 @@ class OrderConsumer extends ConsumerMessage dump("Upserting " . count($allItemsToUpsert) . " order items"); // 2. 批量 upsert OrderItems(使用业务键作为唯一性约束) + // order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充,无需后置 JOIN 更新 OrderItem::query()->upsert( $allItemsToUpsert, ['store_id', 'platform_order_id', 'sub_order_id'], // 唯一键(业务键) [ + 'order_id', // 直接更新 order_id 'company_id', 'platform_id', 'sub_order_type_id', @@ -323,26 +368,13 @@ class OrderConsumer extends ConsumerMessage 'quantity', 'discount', 'total', + 'created_date', 'ext', 'updated_at', - ] // 可更新字段(不包括 order_id,需要单独更新) + ] ); - // 3. 批量更新 order_id(通过 JOIN orders 表) - // UPDATE order_items SET order_id = orders.id - // FROM orders - // WHERE order_items.store_id = orders.store_id - // AND order_items.platform_order_id = orders.platform_order_id - Db::update(' - UPDATE order_items - SET order_id = orders.id - FROM orders - WHERE order_items.store_id = orders.store_id - AND order_items.platform_order_id = orders.platform_order_id - AND order_items.order_id = 0 - '); - - dump("Updated order_id for all items"); + dump("Upserted order items with order_id"); // 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略) // 优化:一次性删除所有不匹配的旧子项,而不是逐个订单处理