add order item constraint to order parse

This commit is contained in:
2025-12-15 15:22:22 +08:00
parent 651de05bb5
commit 47b5fe2f8a
5 changed files with 358 additions and 42 deletions
+149 -3
View File
@@ -14,6 +14,7 @@ use Hyperf\Amqp\Producer;
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\Di\Annotation\Inject;
use Hyperf\DbConnection\Db;
use App\Model\OrderItem;
use Hyperf\Context\ApplicationContext;
use Throwable;
@@ -112,18 +113,34 @@ class OrderConsumer extends ConsumerMessage
dump("Processing " . count($dataToUpsert) . " order(s) with batch upsert");
Db::beginTransaction();
// 分离订单数据和子项数据
$ordersData = [];
$itemsByPlatformOrderId = [];
// 使用 upsert 批量处理插入和更新
foreach ($dataToUpsert as $data) {
$ordersData[] = $data['order'];
$itemsByPlatformOrderId[$data['order']['platform_order_id']] = $data['items'] ?? [];
}
Db::beginTransaction();
// @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理
// @attention 如果多条记录中有个别记录请求失败,可能会导致该批次写入失败,此时则需要判断和修复
// 1. 使用 upsert 批量处理订单插入和更新
// 利用数据库唯一索引自动判断是插入还是更新
// 解决了重复订单推送的问题:存在则更新,不存在则插入
$entity->newQuery()->upsert(
$dataToUpsert,
$ordersData,
$parse->getUniqueBy(), // 从解析器获取唯一键字段
$parse->getUpdateFields() // 从解析器获取可更新字段
);
// 2. 处理订单子项
// 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem
$this->processOrderItems($itemsByPlatformOrderId);
Db::commit();
// @TODO 触发事件通知,更新自动聚合任务
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
return Result::ACK;
@@ -244,4 +261,133 @@ class OrderConsumer extends ConsumerMessage
]);
}
}
/**
* 处理订单子项的批量同步(优化版本)
*
* 策略优化:使用业务键 (store_id, platform_order_id, sub_order_id) 作为唯一性约束
* 1. 直接批量 upsert OrderItems(无需先查询 order_id
* 2. 批量更新 order_id(通过 JOIN orders 表)
* 3. 删除不在新数据中的旧 OrderItem(完全同步)
*
* 性能优势:减少一次批量查询,直接使用业务键进行 upsert
*
* @param array $itemsByPlatformOrderId 以 platform_order_id 为键的子项数据数组
* @return void
*/
protected function processOrderItems(array $itemsByPlatformOrderId): void
{
if (empty($itemsByPlatformOrderId)) {
dump('No order items to process');
return;
}
// 1. 构建所有子项数据(无需查询 order_id)
$allItemsToUpsert = [];
$itemSubOrderIdsByPlatformOrderId = []; // 记录每个平台订单的新子项 ID 列表
foreach ($itemsByPlatformOrderId as $platformOrderId => $items) {
$subOrderIds = [];
foreach ($items as $item) {
// order_id 暂时设为 0,后续批量更新
$item['order_id'] = 0;
$allItemsToUpsert[] = $item;
$subOrderIds[] = $item['sub_order_id'];
}
$itemSubOrderIdsByPlatformOrderId[$platformOrderId] = $subOrderIds;
}
if (empty($allItemsToUpsert)) {
dump('No valid order items to upsert');
return;
}
dump("Upserting " . count($allItemsToUpsert) . " order items");
// 2. 批量 upsert OrderItems(使用业务键作为唯一性约束)
OrderItem::query()->upsert(
$allItemsToUpsert,
['store_id', 'platform_order_id', 'sub_order_id'], // 唯一键(业务键)
[
'company_id',
'platform_id',
'sub_order_type_id',
'product_id',
'platform_product_id',
'product_sku',
'product_barcode',
'unit_price',
'quantity',
'discount',
'total',
'ext',
'updated_at',
] // 可更新字段(不包括 order_id,需要单独更新)
);
// 3. 批量更新 order_id(通过 JOIN orders 表)
// UPDATE order_items SET order_id = orders.id
// FROM orders
// WHERE order_items.store_id = orders.store_id
// AND order_items.platform_order_id = orders.platform_order_id
Db::update('
UPDATE order_items
SET order_id = orders.id
FROM orders
WHERE order_items.store_id = orders.store_id
AND order_items.platform_order_id = orders.platform_order_id
AND order_items.order_id = 0
');
dump("Updated order_id for all items");
// 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略)
// 优化:一次性删除所有不匹配的旧子项,而不是逐个订单处理
// 此部分业务应该很少被调用,订单子项新增或删减的情况很少见
if (!empty($allItemsToUpsert)) {
// 构建本次更新的所有 (store_id, platform_order_id) 组合(用于限定删除范围)
$updatedOrders = [];
foreach ($itemsByPlatformOrderId as $platformOrderId => $items) {
if (!empty($items)) {
$storeId = (int)$items[0]['store_id'];
$platformOrderId = addslashes($platformOrderId);
$updatedOrders[] = "({$storeId}, '{$platformOrderId}')";
}
}
// 构建本次更新的所有 (store_id, platform_order_id, sub_order_id) 组合(保留的记录)
$validItems = [];
foreach ($allItemsToUpsert as $item) {
$storeId = (int)$item['store_id'];
$platformOrderId = addslashes($item['platform_order_id']);
$subOrderId = addslashes($item['sub_order_id']);
$validItems[] = "({$storeId}, '{$platformOrderId}', '{$subOrderId}')";
}
if (!empty($updatedOrders) && !empty($validItems)) {
$ordersIn = implode(', ', $updatedOrders);
$itemsNotIn = implode(', ', $validItems);
// 批量删除:删除在本次更新订单范围内,但不在新数据中的旧子项
// DELETE FROM order_items
// WHERE (store_id, platform_order_id) IN ((1, 'order1'), (1, 'order2'))
// AND (store_id, platform_order_id, sub_order_id) NOT IN ((1, 'order1', 'item1'), ...)
$deleted = Db::delete("
DELETE FROM order_items
WHERE (store_id, platform_order_id) IN ({$ordersIn})
AND (store_id, platform_order_id, sub_order_id) NOT IN ({$itemsNotIn})
");
if ($deleted > 0) {
dump("Batch deleted {$deleted} obsolete order items");
}
}
}
dump("Order items processing completed");
}
}