diff --git a/backend/app/Platform/OrderConsumer.php b/backend/app/Platform/OrderConsumer.php index 8360ce4..94963bb 100644 --- a/backend/app/Platform/OrderConsumer.php +++ b/backend/app/Platform/OrderConsumer.php @@ -65,11 +65,11 @@ class OrderConsumer extends ConsumerMessage { // 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态 // 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒 - $debugDelay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0); - if ($debugDelay > 0) { - dump("Debug mode: sleeping for {$debugDelay} seconds..."); - - sleep($debugDelay); + $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0); + if ($debug_delay > 0) { + dump("Debug mode: sleeping for {$debug_delay} seconds..."); + + sleep($debug_delay); } // dump('---data'); @@ -81,10 +81,10 @@ class OrderConsumer extends ConsumerMessage dump('---'); // 获取重试次数 - $retryCount = $this->getRetryCount($message); - $maxRetries = (int) env('AMQP_MAX_RETRIES', 3); + $retry_count = $this->getRetryCount($message); + $max_retries = (int) env('AMQP_MAX_RETRIES', 3); - dump("Retry count: {$retryCount}/{$maxRetries}"); + dump("Retry count: {$retry_count}/{$max_retries}"); try { // EntityParseFactory 使用静态方法调用,无需依赖注入 @@ -106,19 +106,19 @@ class OrderConsumer extends ConsumerMessage // message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。 // 注意:raw_data 现在在 data 字段中 - $entityMapResult = $parse->entityMap($data['data'] ?? []); + $entity_map_result = $parse->entityMap($data['data'] ?? []); // 将 LazyCollection 转为数组,准备批量操作 - $dataToUpsert = $entityMapResult->all(); + $data_to_upsert = $entity_map_result->all(); - dump($entityMapResult->first()); + dump($entity_map_result->first()); - if (empty($dataToUpsert)) { + if (empty($data_to_upsert)) { dump('No data to process'); return Result::ACK; } - dump("Processing " . count($dataToUpsert) . " order(s) with batch upsert"); + dump("Processing " . count($data_to_upsert) . " order(s) with batch upsert"); // 检测订单解析器是否实现了 getOrderItems 方法 if( !method_exists($parse, 'formatOrderItemsFromRaw')){ @@ -139,8 +139,8 @@ class OrderConsumer extends ConsumerMessage // 收集订单数据 - $ordersData = $dataToUpsert; - $rawData = $data['data'] ?? []; + $orders_data = $data_to_upsert; + $raw_data = $data['data'] ?? []; Db::beginTransaction(); // @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理 @@ -149,21 +149,21 @@ class OrderConsumer extends ConsumerMessage // 1. 使用 upsert 批量处理订单插入和更新 // 利用数据库唯一索引自动判断是插入还是更新 // 解决了重复订单推送的问题:存在则更新,不存在则插入 - $uniqueBy = $parse->getUniqueBy(); + $unique_by = $parse->getUniqueBy(); $entity->newQuery()->upsert( - $ordersData, - $uniqueBy, // 从解析器获取唯一键字段 + $orders_data, + $unique_by, // 从解析器获取唯一键字段 $parse->getUpdateFields() // 从解析器获取可更新字段 ); // 2. 查询获取 ID 映射 [platform_order_id => local db id] // 通过唯一键查询刚写入的订单,获取数据库生成的 ID - $idMapping = $entity->newQuery() - ->where(function ($query) use ($ordersData, $uniqueBy) { - foreach ($ordersData as $orderData) { - $query->orWhere(function ($q) use ($orderData, $uniqueBy) { - foreach ($uniqueBy as $key) { - $q->where($key, $orderData[$key]); + $id_mapping = $entity->newQuery() + ->where(function ($query) use ($orders_data, $unique_by) { + foreach ($orders_data as $order_data) { + $query->orWhere(function ($q) use ($order_data, $unique_by) { + foreach ($unique_by as $key) { + $q->where($key, $order_data[$key]); } }); } @@ -171,10 +171,10 @@ class OrderConsumer extends ConsumerMessage ->pluck('id', 'platform_order_id') ->toArray(); - dump("ID mapping: " . count($idMapping) . " orders"); + dump("ID mapping: " . count($id_mapping) . " orders"); // 3. 格式化订单子项(传入 ID 映射,使子项能直接获取正确的 order_id) - $items = $parse->formatOrderItemsFromRaw($rawData, $idMapping); + $items = $parse->formatOrderItemsFromRaw($raw_data, $id_mapping); // 4. 处理订单子项 // 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem @@ -192,19 +192,19 @@ class OrderConsumer extends ConsumerMessage dump("File: " . $error->getFile() . ":" . $error->getLine()); dump("Stack trace:"); dump($error->getTraceAsString()); - dump("Retry Count: {$retryCount}"); - dump("Max Retries: {$maxRetries}"); - dump("Check: {$retryCount} >= {$maxRetries} = " . ($retryCount >= $maxRetries ? 'TRUE' : 'FALSE')); + dump("Retry Count: {$retry_count}"); + dump("Max Retries: {$max_retries}"); + dump("Check: {$retry_count} >= {$max_retries} = " . ($retry_count >= $max_retries ? 'TRUE' : 'FALSE')); Log::get()->error('Consumer processing failed', [ 'error' => $error->getMessage(), - 'retry_count' => $retryCount, - 'max_retries' => $maxRetries, + 'retry_count' => $retry_count, + 'max_retries' => $max_retries, ]); Db::rollBack(); // 检查是否超过最大重试次数 - if ($retryCount >= $maxRetries) { + if ($retry_count >= $max_retries) { // 超过重试次数,发送到错误队列 dump(">>> MAX RETRIES EXCEEDED! Sending to error queue..."); @@ -284,17 +284,17 @@ class OrderConsumer extends ConsumerMessage protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void { try { - $retryCount = $this->getRetryCount($message); + $retry_count = $this->getRetryCount($message); // 使用 ErrorProducer 发送到错误队列 $producer = ApplicationContext::getContainer()->get(Producer::class); - $errorProducer = new ErrorProducer($message, $error, $retryCount); - $producer->produce($errorProducer); + $error_producer = new ErrorProducer($message, $error, $retry_count); + $producer->produce($error_producer); // 记录日志 Log::get()->warning('Message sent to error queue after exceeding retry limit', [ - 'error_id' => $errorProducer->payload['error_id'] ?? 'unknown', - 'retry_count' => $retryCount, + 'error_id' => $error_producer->payload['error_id'] ?? 'unknown', + 'retry_count' => $retry_count, 'error_message' => $error->getMessage(), ]); } catch (Throwable $e) { @@ -317,44 +317,44 @@ class OrderConsumer extends ConsumerMessage * * 性能优势:减少一次批量查询,直接使用业务键进行 upsert * - * @param array $itemsByPlatformOrderId 以 platform_order_id 为键的子项数据数组 + * @param array $items_by_platform_order_id 以 platform_order_id 为键的子项数据数组 * @return void */ - protected function processOrderItems(array $itemsByPlatformOrderId): void + protected function processOrderItems(array $items_by_platform_order_id): void { - if (empty($itemsByPlatformOrderId)) { + if (empty($items_by_platform_order_id)) { dump('No order items to process'); return; } // 1. 构建所有子项数据(无需查询 order_id) - $allItemsToUpsert = []; - $itemSubOrderIdsByPlatformOrderId = []; // 记录每个平台订单的新子项 ID 列表 + $all_items_to_upsert = []; + $item_sub_order_ids_by_platform_order_id = []; // 记录每个平台订单的新子项 ID 列表 - foreach ($itemsByPlatformOrderId as $platformOrderId => $items) { - $subOrderIds = []; + foreach ($items_by_platform_order_id as $platform_order_id => $items) { + $sub_order_ids = []; foreach ($items as $item) { // order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充 - $allItemsToUpsert[] = $item; - $subOrderIds[] = $item['sub_order_id']; + $all_items_to_upsert[] = $item; + $sub_order_ids[] = $item['sub_order_id']; } - $itemSubOrderIdsByPlatformOrderId[$platformOrderId] = $subOrderIds; + $item_sub_order_ids_by_platform_order_id[$platform_order_id] = $sub_order_ids; } - if (empty($allItemsToUpsert)) { + if (empty($all_items_to_upsert)) { dump('No valid order items to upsert'); return; } - dump("Upserting " . count($allItemsToUpsert) . " order items"); + dump("Upserting " . count($all_items_to_upsert) . " order items"); // 2. 批量 upsert OrderItems(使用业务键作为唯一性约束) // order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充,无需后置 JOIN 更新 OrderItem::query()->upsert( - $allItemsToUpsert, - ['store_id', 'platform_order_id', 'sub_order_id'], // 唯一键(业务键) + $all_items_to_upsert, + ['store_id', 'platform_order_id', 'sub_order_id', 'created_date'], // 唯一键(业务键) [ 'order_id', // 直接更新 order_id 'company_id', @@ -368,7 +368,6 @@ class OrderConsumer extends ConsumerMessage 'quantity', 'discount', 'total', - 'created_date', 'ext', 'updated_at', ] @@ -377,49 +376,107 @@ class OrderConsumer extends ConsumerMessage dump("Upserted order items with order_id"); // 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略) - // 优化:一次性删除所有不匹配的旧子项,而不是逐个订单处理 + // 利用 hypertable 按 created_date 分区的特性,确保删除条件包含 created_date 以触发分区裁剪 // 此部分业务应该很少被调用,订单子项新增或删减的情况很少见 - - if (!empty($allItemsToUpsert)) { - // 构建本次更新的所有 (store_id, platform_order_id) 组合(用于限定删除范围) - $updatedOrders = []; - foreach ($itemsByPlatformOrderId as $platformOrderId => $items) { - if (!empty($items)) { - $storeId = (int)$items[0]['store_id']; - $platformOrderId = addslashes($platformOrderId); - $updatedOrders[] = "({$storeId}, '{$platformOrderId}')"; - } - } - - // 构建本次更新的所有 (store_id, platform_order_id, sub_order_id) 组合(保留的记录) - $validItems = []; - foreach ($allItemsToUpsert as $item) { - $storeId = (int)$item['store_id']; - $platformOrderId = addslashes($item['platform_order_id']); - $subOrderId = addslashes($item['sub_order_id']); - $validItems[] = "({$storeId}, '{$platformOrderId}', '{$subOrderId}')"; - } - - if (!empty($updatedOrders) && !empty($validItems)) { - $ordersIn = implode(', ', $updatedOrders); - $itemsNotIn = implode(', ', $validItems); - - // 批量删除:删除在本次更新订单范围内,但不在新数据中的旧子项 - // DELETE FROM order_items - // WHERE (store_id, platform_order_id) IN ((1, 'order1'), (1, 'order2')) - // AND (store_id, platform_order_id, sub_order_id) NOT IN ((1, 'order1', 'item1'), ...) - $deleted = Db::delete(" - DELETE FROM order_items - WHERE (store_id, platform_order_id) IN ({$ordersIn}) - AND (store_id, platform_order_id, sub_order_id) NOT IN ({$itemsNotIn}) - "); - - if ($deleted > 0) { - dump("Batch deleted {$deleted} obsolete order items"); - } - } - } + $this->deleteObsoleteOrderItems($all_items_to_upsert); dump("Order items processing completed"); } + + /** + * 删除不在新数据中的旧订单子项 + * + * 策略:集合差集(1 次 SELECT + 1 次 DELETE) + * 1. 查询数据库中涉及订单的所有 sub_order_id(集合 A) + * 2. 从新数据提取 sub_order_id(集合 B) + * 3. 集合 A - B = 需要删除的记录 + * + * @param array $new_items 新的订单子项数据 + * @return void + */ + protected function deleteObsoleteOrderItems(array $new_items): void + { + if (empty($new_items)) { + return; + } + + // 1. 提取唯一的订单键 (store_id, platform_order_id, created_date) 和新数据的完整键 + $order_keys = []; + $new_item_keys = []; // 集合 B + + foreach ($new_items as $item) { + $created_date = $item['created_date'] instanceof \DateTimeInterface + ? $item['created_date']->format('Y-m-d H:i:sP') + : $item['created_date']; + + // 订单级别的键(用于限定查询范围) + $order_key = sprintf('%d|%s|%s', $item['store_id'], $item['platform_order_id'], $created_date); + if (!isset($order_keys[$order_key])) { + $order_keys[$order_key] = [ + 'store_id' => $item['store_id'], + 'platform_order_id' => $item['platform_order_id'], + 'created_date' => $created_date, + ]; + } + + // 子项级别的键(用于差集计算) + $item_key = sprintf('%d|%s|%s|%s', $item['store_id'], $item['platform_order_id'], $item['sub_order_id'], $created_date); + $new_item_keys[$item_key] = true; + } + + // 2. 查询数据库中这些订单的所有现有 sub_order_id(集合 A) + // 利用 hypertable 的 created_date 分区裁剪 + 索引 order_items_store_platform_sub_unique + $existing_items = OrderItem::query() + ->where(function ($query) use ($order_keys) { + foreach ($order_keys as $key) { + $query->orWhere(function ($q) use ($key) { + $q->where('store_id', $key['store_id']) + ->where('platform_order_id', $key['platform_order_id']) + ->where('created_date', $key['created_date']); + }); + } + }) + ->select(['id', 'store_id', 'platform_order_id', 'sub_order_id', 'created_date']) + ->get(); + + if ($existing_items->isEmpty()) { + return; + } + + // 3. 计算差集:集合 A - 集合 B = 需要删除的记录 + $ids_to_delete = []; + $created_dates_to_delete = []; + + foreach ($existing_items as $existing) { + $existing_created_date = $existing->created_date instanceof \DateTimeInterface + ? $existing->created_date->format('Y-m-d H:i:sP') + : (string) $existing->created_date; + + $existing_key = sprintf( + '%d|%s|%s|%s', + $existing->store_id, + $existing->platform_order_id, + $existing->sub_order_id, + $existing_created_date + ); + + // 如果现有记录不在新数据中,则需要删除 + if (!isset($new_item_keys[$existing_key])) { + $ids_to_delete[] = $existing->id; + $created_dates_to_delete[$existing_created_date] = true; + } + } + + // 4. 批量删除(利用 hypertable 主键 (id, created_date) 进行高效删除) + if (!empty($ids_to_delete)) { + $deleted = OrderItem::query() + ->whereIn('id', $ids_to_delete) + ->whereIn('created_date', array_keys($created_dates_to_delete)) + ->delete(); + + if ($deleted > 0) { + dump("Batch deleted {$deleted} obsolete order items"); + } + } + } }