From 47b5fe2f8a572d2accec72b2666d8128738e7770 Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Mon, 15 Dec 2025 15:22:22 +0800 Subject: [PATCH] add order item constraint to order parse --- backend/app/Platform/AbstractOrderParse.php | 53 ++++++ backend/app/Platform/OrderConsumer.php | 152 +++++++++++++++++- backend/app/Platform/OrderContract.php | 28 ++++ .../app/Platform/Shopee/EntityParse/Order.php | 129 ++++++++++----- ..._add_unique_index_to_order_items_table.php | 38 +++++ 5 files changed, 358 insertions(+), 42 deletions(-) create mode 100644 backend/app/Platform/AbstractOrderParse.php create mode 100644 backend/app/Platform/OrderContract.php create mode 100644 backend/migrations/2025_12_15_141034_add_unique_index_to_order_items_table.php diff --git a/backend/app/Platform/AbstractOrderParse.php b/backend/app/Platform/AbstractOrderParse.php new file mode 100644 index 0000000..dfbe50f --- /dev/null +++ b/backend/app/Platform/AbstractOrderParse.php @@ -0,0 +1,53 @@ +newQuery()->upsert( - $dataToUpsert, + $ordersData, $parse->getUniqueBy(), // 从解析器获取唯一键字段 $parse->getUpdateFields() // 从解析器获取可更新字段 ); + // 2. 处理订单子项 + // 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem + $this->processOrderItems($itemsByPlatformOrderId); + Db::commit(); + // @TODO 触发事件通知,更新自动聚合任务 // 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。 return Result::ACK; @@ -244,4 +261,133 @@ class OrderConsumer extends ConsumerMessage ]); } } + + /** + * 处理订单子项的批量同步(优化版本) + * + * 策略优化:使用业务键 (store_id, platform_order_id, sub_order_id) 作为唯一性约束 + * 1. 直接批量 upsert OrderItems(无需先查询 order_id) + * 2. 批量更新 order_id(通过 JOIN orders 表) + * 3. 删除不在新数据中的旧 OrderItem(完全同步) + * + * 性能优势:减少一次批量查询,直接使用业务键进行 upsert + * + * @param array $itemsByPlatformOrderId 以 platform_order_id 为键的子项数据数组 + * @return void + */ + protected function processOrderItems(array $itemsByPlatformOrderId): void + { + if (empty($itemsByPlatformOrderId)) { + dump('No order items to process'); + return; + } + + // 1. 构建所有子项数据(无需查询 order_id) + $allItemsToUpsert = []; + $itemSubOrderIdsByPlatformOrderId = []; // 记录每个平台订单的新子项 ID 列表 + + foreach ($itemsByPlatformOrderId as $platformOrderId => $items) { + $subOrderIds = []; + + foreach ($items as $item) { + // order_id 暂时设为 0,后续批量更新 + $item['order_id'] = 0; + + $allItemsToUpsert[] = $item; + $subOrderIds[] = $item['sub_order_id']; + } + + $itemSubOrderIdsByPlatformOrderId[$platformOrderId] = $subOrderIds; + } + + if (empty($allItemsToUpsert)) { + dump('No valid order items to upsert'); + return; + } + + dump("Upserting " . count($allItemsToUpsert) . " order items"); + + // 2. 批量 upsert OrderItems(使用业务键作为唯一性约束) + OrderItem::query()->upsert( + $allItemsToUpsert, + ['store_id', 'platform_order_id', 'sub_order_id'], // 唯一键(业务键) + [ + 'company_id', + 'platform_id', + 'sub_order_type_id', + 'product_id', + 'platform_product_id', + 'product_sku', + 'product_barcode', + 'unit_price', + 'quantity', + 'discount', + 'total', + 'ext', + 'updated_at', + ] // 可更新字段(不包括 order_id,需要单独更新) + ); + + // 3. 批量更新 order_id(通过 JOIN orders 表) + // UPDATE order_items SET order_id = orders.id + // FROM orders + // WHERE order_items.store_id = orders.store_id + // AND order_items.platform_order_id = orders.platform_order_id + Db::update(' + UPDATE order_items + SET order_id = orders.id + FROM orders + WHERE order_items.store_id = orders.store_id + AND order_items.platform_order_id = orders.platform_order_id + AND order_items.order_id = 0 + '); + + dump("Updated order_id for all items"); + + // 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略) + // 优化:一次性删除所有不匹配的旧子项,而不是逐个订单处理 + // 此部分业务应该很少被调用,订单子项新增或删减的情况很少见 + + if (!empty($allItemsToUpsert)) { + // 构建本次更新的所有 (store_id, platform_order_id) 组合(用于限定删除范围) + $updatedOrders = []; + foreach ($itemsByPlatformOrderId as $platformOrderId => $items) { + if (!empty($items)) { + $storeId = (int)$items[0]['store_id']; + $platformOrderId = addslashes($platformOrderId); + $updatedOrders[] = "({$storeId}, '{$platformOrderId}')"; + } + } + + // 构建本次更新的所有 (store_id, platform_order_id, sub_order_id) 组合(保留的记录) + $validItems = []; + foreach ($allItemsToUpsert as $item) { + $storeId = (int)$item['store_id']; + $platformOrderId = addslashes($item['platform_order_id']); + $subOrderId = addslashes($item['sub_order_id']); + $validItems[] = "({$storeId}, '{$platformOrderId}', '{$subOrderId}')"; + } + + if (!empty($updatedOrders) && !empty($validItems)) { + $ordersIn = implode(', ', $updatedOrders); + $itemsNotIn = implode(', ', $validItems); + + // 批量删除:删除在本次更新订单范围内,但不在新数据中的旧子项 + // DELETE FROM order_items + // WHERE (store_id, platform_order_id) IN ((1, 'order1'), (1, 'order2')) + // AND (store_id, platform_order_id, sub_order_id) NOT IN ((1, 'order1', 'item1'), ...) + $deleted = Db::delete(" + DELETE FROM order_items + WHERE (store_id, platform_order_id) IN ({$ordersIn}) + AND (store_id, platform_order_id, sub_order_id) NOT IN ({$itemsNotIn}) + "); + + if ($deleted > 0) { + dump("Batch deleted {$deleted} obsolete order items"); + } + } + } + + dump("Order items processing completed"); + } } diff --git a/backend/app/Platform/OrderContract.php b/backend/app/Platform/OrderContract.php new file mode 100644 index 0000000..9636bef --- /dev/null +++ b/backend/app/Platform/OrderContract.php @@ -0,0 +1,28 @@ +parseOrderItems($_origin_order_item, $record['order_sn']); + // 根据实际业务需求映射其他字段 // 映射每条原始数据到 Order Model 可用的数组格式 // @attention 此处业务决定了 电商平台数据 和 数据仓库 之间的映射关系 + + // 返回包含订单数据和子项数据的结构 yield [ - 'company_id' => $this->getCompany()->id, - 'platform_id' => 25, - 'store_id' => $this->getStore()->id, - 'order_status_id'=> OrderStatus::{$record['order_status']}->value, - 'platform_order_id' => $record['order_sn'], - 'payment_method_id' => $this->parsePaymentMethod($record['payment_method']), - 'buyer_user_id' => $record['buyer_user_id'] ?? null, - 'presale' => false, // shopee 平台暂不存类型为 presales 的订单 - 'total_amount' => $record['total_amount'] ?? 0, - // @link https://open.shopee.com/documents/v2/v2.order.get_order_detail?module=94&type=1 - // shopee 定义 $record['total_amount'] 为买家付款金额字段 - // The total amount paid by the buyer for the order. This amount includes the total sale price of items, - // shipping cost beared by buyer; and offset by Shopee promotions if applicable. This value - // will only return after the buyer has completed payment for the order. - 'total_paid' => $record['total_amount'] ?? 0, - 'total_discount' => 0, - 'total_received' => 0, - // @link https://open.shopee.com/documents/v2/v2.order.get_order_detail?module=94&type=1 - // $record['actual_shipping_fee'] - The actual shipping fee of the order if available from external logistics partners. - 'freight_fee' => $record['actual_shipping_fee'] ?? 0, - 'tax_fee' => 0, - 'discount_fee' => 0, - 'commission_fee' => 0, - 'coupon_amount' => 0, - 'voucher_amount' => 0, - 'order_type_id' => 0, - 'created_date' => Carbon::createFromTimestamp($record['create_time'], $this->getStore()->getTimezoneString()), // 订单的创建时间 - 'updated_date' => Carbon::createFromTimestamp($record['update_time'], $this->getStore()->getTimezoneString()), // 最近的一次订单更新时间 - 'paid_date' => $record['pay_time'] ? Carbon::createFromTimestamp($record['pay_time'], $this->getStore()->getTimezoneString()) : null, - 'shipping_date' => $record['pickup_done_time'] > 0 ? Carbon::createFromTimestamp($record['pickup_done_time'], $this->getStore()->getTimezoneString()) : null, - 'country' => $record['region'], // address 等地理位置信息因 shopee 数据保护协议已被隐藏, 目前仅提供 ISO 格式国家代码 - 'raw' => json_encode($record['raw']), + 'order' => [ + 'company_id' => $this->getCompany()->id, + 'platform_id' => 25, + 'store_id' => $this->getStore()->id, + 'order_status_id'=> OrderStatus::{$record['order_status']}->value, + 'platform_order_id' => $record['order_sn'], + 'payment_method_id' => $this->parsePaymentMethod($record['payment_method']), + 'buyer_user_id' => $record['buyer_user_id'] ?? null, + 'presale' => false, // shopee 平台暂不存类型为 presales 的订单 + 'total_amount' => $record['total_amount'] ?? 0, + // @link https://open.shopee.com/documents/v2/v2.order.get_order_detail?module=94&type=1 + // shopee 定义 $record['total_amount'] 为买家付款金额字段 + // The total amount paid by the buyer for the order. This amount includes the total sale price of items, + // shipping cost beared by buyer; and offset by Shopee promotions if applicable. This value + // will only return after the buyer has completed payment for the order. + 'total_paid' => $record['total_amount'] ?? 0, + 'total_discount' => 0, + 'total_received' => 0, + // @link https://open.shopee.com/documents/v2/v2.order.get_order_detail?module=94&type=1 + // $record['actual_shipping_fee'] - The actual shipping fee of the order if available from external logistics partners. + 'freight_fee' => $record['actual_shipping_fee'] ?? 0, + 'tax_fee' => 0, + 'discount_fee' => 0, + 'commission_fee' => 0, + 'coupon_amount' => 0, + 'voucher_amount' => 0, + 'order_type_id' => 0, + 'created_date' => Carbon::createFromTimestamp($record['create_time'], $this->getStore()->getTimezoneString()), // 订单的创建时间 + 'updated_date' => Carbon::createFromTimestamp($record['update_time'], $this->getStore()->getTimezoneString()), // 最近的一次订单更新时间 + 'paid_date' => $record['pay_time'] ? Carbon::createFromTimestamp($record['pay_time'], $this->getStore()->getTimezoneString()) : null, + 'shipping_date' => $record['pickup_done_time'] > 0 ? Carbon::createFromTimestamp($record['pickup_done_time'], $this->getStore()->getTimezoneString()) : null, + 'country' => $record['region'], // address 等地理位置信息因 shopee 数据保护协议已被隐藏, 目前仅提供 ISO 格式国家代码 + 'raw' => json_encode($record['raw']), + ], + 'items' => $order_items, ]; } }); } - private function orderItemParse( array $order_items) : array { - return []; + /** + * 解析订单子项数据(实现 OrderContract 契约方法) + * + * Shopee API 文档:https://open.shopee.com/documents/v2/v2.order.get_order_detail?module=94&type=1 + * + * @param array $orderItems Shopee 原始订单子项数组 + * @param string $platformOrderId 平台订单 ID (order_sn) + * @return array 解析后的订单子项数据数组 + */ + public function parseOrderItems(array $orderItems, string $platformOrderId): array + { + $parsedItems = []; + + foreach ($orderItems as $item) { + // 价格和数量 + $originalPrice = (float)($item['model_original_price'] ?? 0); + $discountedPrice = (float)($item['model_discounted_price'] ?? $originalPrice); + $quantity = (int)($item['model_quantity_purchased'] ?? 1); + + // SKU: 优先使用 model_sku(变体SKU),如果为空则使用 item_sku(商品SKU) + $sku = !empty($item['model_sku']) ? $item['model_sku'] : ($item['item_sku'] ?? ''); + + $parsedItems[] = [ + 'company_id' => $this->getCompany()->id, + 'platform_id' => 25, + 'store_id' => $this->getStore()->id, + // order_id 将在 Consumer 中补充 + 'platform_order_id' => $platformOrderId, // 平台订单 ID(从 order_sn 传入) + 'sub_order_id' => (string)$item['order_item_id'], // 使用 order_item_id 作为订单子项的唯一标识 + 'sub_order_type_id' => 0, + 'product_id' => 0, // 需要根据 platform_product_id 匹配,暂设为 0 + 'platform_product_id' => (string)$item['item_id'], // Shopee 商品 ID + 'product_sku' => $sku, + 'product_barcode' => '', + 'unit_price' => $discountedPrice, // 实际成交单价(折扣后) + 'quantity' => $quantity, + 'discount' => ($originalPrice - $discountedPrice) * $quantity, // 总折扣金额 + 'total' => $discountedPrice * $quantity, // 总金额 = 折扣价 × 数量 + 'ext' => '', // 原始信息已在 Order.raw 中记录,此处留空 + ]; + } + + return $parsedItems; } /** diff --git a/backend/migrations/2025_12_15_141034_add_unique_index_to_order_items_table.php b/backend/migrations/2025_12_15_141034_add_unique_index_to_order_items_table.php new file mode 100644 index 0000000..0ec6e2e --- /dev/null +++ b/backend/migrations/2025_12_15_141034_add_unique_index_to_order_items_table.php @@ -0,0 +1,38 @@ +unique(['order_id', 'sub_order_id'], 'order_items_order_sub_order_unique'); + + // 索引2:基于业务键的唯一性(用于 upsert 操作,无需先查询 order_id) + // 与 orders 表的 (store_id, platform_order_id) 唯一索引配合使用 + $table->unique( + ['store_id', 'platform_order_id', 'sub_order_id'], + 'order_items_store_platform_sub_unique' + ); + }); + } + + /** + * Reverse the migrations. + */ + public function down(): void + { + Schema::table('order_items', function (Blueprint $table) { + // 删除两个唯一索引 + $table->dropUnique('order_items_order_sub_order_unique'); + $table->dropUnique('order_items_store_platform_sub_unique'); + }); + } +};