update order consumer

This commit is contained in:
2026-02-02 13:47:17 +08:00
parent 2aeaec79d6
commit e8c83cb911
+142 -85
View File
@@ -65,11 +65,11 @@ class OrderConsumer extends ConsumerMessage
{ {
// 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态 // 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态
// 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒 // 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒
$debugDelay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0); $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
if ($debugDelay > 0) { if ($debug_delay > 0) {
dump("Debug mode: sleeping for {$debugDelay} seconds..."); dump("Debug mode: sleeping for {$debug_delay} seconds...");
sleep($debugDelay); sleep($debug_delay);
} }
// dump('---data'); // dump('---data');
@@ -81,10 +81,10 @@ class OrderConsumer extends ConsumerMessage
dump('---'); dump('---');
// 获取重试次数 // 获取重试次数
$retryCount = $this->getRetryCount($message); $retry_count = $this->getRetryCount($message);
$maxRetries = (int) env('AMQP_MAX_RETRIES', 3); $max_retries = (int) env('AMQP_MAX_RETRIES', 3);
dump("Retry count: {$retryCount}/{$maxRetries}"); dump("Retry count: {$retry_count}/{$max_retries}");
try { try {
// EntityParseFactory 使用静态方法调用,无需依赖注入 // EntityParseFactory 使用静态方法调用,无需依赖注入
@@ -106,19 +106,19 @@ class OrderConsumer extends ConsumerMessage
// message 中包含 raw dataraw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。 // message 中包含 raw dataraw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
// 注意:raw_data 现在在 data 字段中 // 注意:raw_data 现在在 data 字段中
$entityMapResult = $parse->entityMap($data['data'] ?? []); $entity_map_result = $parse->entityMap($data['data'] ?? []);
// 将 LazyCollection 转为数组,准备批量操作 // 将 LazyCollection 转为数组,准备批量操作
$dataToUpsert = $entityMapResult->all(); $data_to_upsert = $entity_map_result->all();
dump($entityMapResult->first()); dump($entity_map_result->first());
if (empty($dataToUpsert)) { if (empty($data_to_upsert)) {
dump('No data to process'); dump('No data to process');
return Result::ACK; return Result::ACK;
} }
dump("Processing " . count($dataToUpsert) . " order(s) with batch upsert"); dump("Processing " . count($data_to_upsert) . " order(s) with batch upsert");
// 检测订单解析器是否实现了 getOrderItems 方法 // 检测订单解析器是否实现了 getOrderItems 方法
if( !method_exists($parse, 'formatOrderItemsFromRaw')){ if( !method_exists($parse, 'formatOrderItemsFromRaw')){
@@ -139,8 +139,8 @@ class OrderConsumer extends ConsumerMessage
// 收集订单数据 // 收集订单数据
$ordersData = $dataToUpsert; $orders_data = $data_to_upsert;
$rawData = $data['data'] ?? []; $raw_data = $data['data'] ?? [];
Db::beginTransaction(); Db::beginTransaction();
// @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理 // @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理
@@ -149,21 +149,21 @@ class OrderConsumer extends ConsumerMessage
// 1. 使用 upsert 批量处理订单插入和更新 // 1. 使用 upsert 批量处理订单插入和更新
// 利用数据库唯一索引自动判断是插入还是更新 // 利用数据库唯一索引自动判断是插入还是更新
// 解决了重复订单推送的问题:存在则更新,不存在则插入 // 解决了重复订单推送的问题:存在则更新,不存在则插入
$uniqueBy = $parse->getUniqueBy(); $unique_by = $parse->getUniqueBy();
$entity->newQuery()->upsert( $entity->newQuery()->upsert(
$ordersData, $orders_data,
$uniqueBy, // 从解析器获取唯一键字段 $unique_by, // 从解析器获取唯一键字段
$parse->getUpdateFields() // 从解析器获取可更新字段 $parse->getUpdateFields() // 从解析器获取可更新字段
); );
// 2. 查询获取 ID 映射 [platform_order_id => local db id] // 2. 查询获取 ID 映射 [platform_order_id => local db id]
// 通过唯一键查询刚写入的订单,获取数据库生成的 ID // 通过唯一键查询刚写入的订单,获取数据库生成的 ID
$idMapping = $entity->newQuery() $id_mapping = $entity->newQuery()
->where(function ($query) use ($ordersData, $uniqueBy) { ->where(function ($query) use ($orders_data, $unique_by) {
foreach ($ordersData as $orderData) { foreach ($orders_data as $order_data) {
$query->orWhere(function ($q) use ($orderData, $uniqueBy) { $query->orWhere(function ($q) use ($order_data, $unique_by) {
foreach ($uniqueBy as $key) { foreach ($unique_by as $key) {
$q->where($key, $orderData[$key]); $q->where($key, $order_data[$key]);
} }
}); });
} }
@@ -171,10 +171,10 @@ class OrderConsumer extends ConsumerMessage
->pluck('id', 'platform_order_id') ->pluck('id', 'platform_order_id')
->toArray(); ->toArray();
dump("ID mapping: " . count($idMapping) . " orders"); dump("ID mapping: " . count($id_mapping) . " orders");
// 3. 格式化订单子项(传入 ID 映射,使子项能直接获取正确的 order_id) // 3. 格式化订单子项(传入 ID 映射,使子项能直接获取正确的 order_id)
$items = $parse->formatOrderItemsFromRaw($rawData, $idMapping); $items = $parse->formatOrderItemsFromRaw($raw_data, $id_mapping);
// 4. 处理订单子项 // 4. 处理订单子项
// 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem // 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem
@@ -192,19 +192,19 @@ class OrderConsumer extends ConsumerMessage
dump("File: " . $error->getFile() . ":" . $error->getLine()); dump("File: " . $error->getFile() . ":" . $error->getLine());
dump("Stack trace:"); dump("Stack trace:");
dump($error->getTraceAsString()); dump($error->getTraceAsString());
dump("Retry Count: {$retryCount}"); dump("Retry Count: {$retry_count}");
dump("Max Retries: {$maxRetries}"); dump("Max Retries: {$max_retries}");
dump("Check: {$retryCount} >= {$maxRetries} = " . ($retryCount >= $maxRetries ? 'TRUE' : 'FALSE')); dump("Check: {$retry_count} >= {$max_retries} = " . ($retry_count >= $max_retries ? 'TRUE' : 'FALSE'));
Log::get()->error('Consumer processing failed', [ Log::get()->error('Consumer processing failed', [
'error' => $error->getMessage(), 'error' => $error->getMessage(),
'retry_count' => $retryCount, 'retry_count' => $retry_count,
'max_retries' => $maxRetries, 'max_retries' => $max_retries,
]); ]);
Db::rollBack(); Db::rollBack();
// 检查是否超过最大重试次数 // 检查是否超过最大重试次数
if ($retryCount >= $maxRetries) { if ($retry_count >= $max_retries) {
// 超过重试次数,发送到错误队列 // 超过重试次数,发送到错误队列
dump(">>> MAX RETRIES EXCEEDED! Sending to error queue..."); dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
@@ -284,17 +284,17 @@ class OrderConsumer extends ConsumerMessage
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
{ {
try { try {
$retryCount = $this->getRetryCount($message); $retry_count = $this->getRetryCount($message);
// 使用 ErrorProducer 发送到错误队列 // 使用 ErrorProducer 发送到错误队列
$producer = ApplicationContext::getContainer()->get(Producer::class); $producer = ApplicationContext::getContainer()->get(Producer::class);
$errorProducer = new ErrorProducer($message, $error, $retryCount); $error_producer = new ErrorProducer($message, $error, $retry_count);
$producer->produce($errorProducer); $producer->produce($error_producer);
// 记录日志 // 记录日志
Log::get()->warning('Message sent to error queue after exceeding retry limit', [ Log::get()->warning('Message sent to error queue after exceeding retry limit', [
'error_id' => $errorProducer->payload['error_id'] ?? 'unknown', 'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
'retry_count' => $retryCount, 'retry_count' => $retry_count,
'error_message' => $error->getMessage(), 'error_message' => $error->getMessage(),
]); ]);
} catch (Throwable $e) { } catch (Throwable $e) {
@@ -317,44 +317,44 @@ class OrderConsumer extends ConsumerMessage
* *
* 性能优势:减少一次批量查询,直接使用业务键进行 upsert * 性能优势:减少一次批量查询,直接使用业务键进行 upsert
* *
* @param array $itemsByPlatformOrderId 以 platform_order_id 为键的子项数据数组 * @param array $items_by_platform_order_id 以 platform_order_id 为键的子项数据数组
* @return void * @return void
*/ */
protected function processOrderItems(array $itemsByPlatformOrderId): void protected function processOrderItems(array $items_by_platform_order_id): void
{ {
if (empty($itemsByPlatformOrderId)) { if (empty($items_by_platform_order_id)) {
dump('No order items to process'); dump('No order items to process');
return; return;
} }
// 1. 构建所有子项数据(无需查询 order_id) // 1. 构建所有子项数据(无需查询 order_id)
$allItemsToUpsert = []; $all_items_to_upsert = [];
$itemSubOrderIdsByPlatformOrderId = []; // 记录每个平台订单的新子项 ID 列表 $item_sub_order_ids_by_platform_order_id = []; // 记录每个平台订单的新子项 ID 列表
foreach ($itemsByPlatformOrderId as $platformOrderId => $items) { foreach ($items_by_platform_order_id as $platform_order_id => $items) {
$subOrderIds = []; $sub_order_ids = [];
foreach ($items as $item) { foreach ($items as $item) {
// order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充 // order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充
$allItemsToUpsert[] = $item; $all_items_to_upsert[] = $item;
$subOrderIds[] = $item['sub_order_id']; $sub_order_ids[] = $item['sub_order_id'];
} }
$itemSubOrderIdsByPlatformOrderId[$platformOrderId] = $subOrderIds; $item_sub_order_ids_by_platform_order_id[$platform_order_id] = $sub_order_ids;
} }
if (empty($allItemsToUpsert)) { if (empty($all_items_to_upsert)) {
dump('No valid order items to upsert'); dump('No valid order items to upsert');
return; return;
} }
dump("Upserting " . count($allItemsToUpsert) . " order items"); dump("Upserting " . count($all_items_to_upsert) . " order items");
// 2. 批量 upsert OrderItems(使用业务键作为唯一性约束) // 2. 批量 upsert OrderItems(使用业务键作为唯一性约束)
// order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充,无需后置 JOIN 更新 // order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充,无需后置 JOIN 更新
OrderItem::query()->upsert( OrderItem::query()->upsert(
$allItemsToUpsert, $all_items_to_upsert,
['store_id', 'platform_order_id', 'sub_order_id'], // 唯一键(业务键) ['store_id', 'platform_order_id', 'sub_order_id', 'created_date'], // 唯一键(业务键)
[ [
'order_id', // 直接更新 order_id 'order_id', // 直接更新 order_id
'company_id', 'company_id',
@@ -368,7 +368,6 @@ class OrderConsumer extends ConsumerMessage
'quantity', 'quantity',
'discount', 'discount',
'total', 'total',
'created_date',
'ext', 'ext',
'updated_at', 'updated_at',
] ]
@@ -377,49 +376,107 @@ class OrderConsumer extends ConsumerMessage
dump("Upserted order items with order_id"); dump("Upserted order items with order_id");
// 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略) // 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略)
// 优化:一次性删除所有不匹配的旧子项,而不是逐个订单处理 // 利用 hypertable 按 created_date 分区的特性,确保删除条件包含 created_date 以触发分区裁剪
// 此部分业务应该很少被调用,订单子项新增或删减的情况很少见 // 此部分业务应该很少被调用,订单子项新增或删减的情况很少见
$this->deleteObsoleteOrderItems($all_items_to_upsert);
if (!empty($allItemsToUpsert)) { dump("Order items processing completed");
// 构建本次更新的所有 (store_id, platform_order_id) 组合(用于限定删除范围) }
$updatedOrders = [];
foreach ($itemsByPlatformOrderId as $platformOrderId => $items) { /**
if (!empty($items)) { * 删除不在新数据中的旧订单子项
$storeId = (int)$items[0]['store_id']; *
$platformOrderId = addslashes($platformOrderId); * 策略:集合差集(1 次 SELECT + 1 次 DELETE
$updatedOrders[] = "({$storeId}, '{$platformOrderId}')"; * 1. 查询数据库中涉及订单的所有 sub_order_id(集合 A
* 2. 从新数据提取 sub_order_id(集合 B
* 3. 集合 A - B = 需要删除的记录
*
* @param array $new_items 新的订单子项数据
* @return void
*/
protected function deleteObsoleteOrderItems(array $new_items): void
{
if (empty($new_items)) {
return;
}
// 1. 提取唯一的订单键 (store_id, platform_order_id, created_date) 和新数据的完整键
$order_keys = [];
$new_item_keys = []; // 集合 B
foreach ($new_items as $item) {
$created_date = $item['created_date'] instanceof \DateTimeInterface
? $item['created_date']->format('Y-m-d H:i:sP')
: $item['created_date'];
// 订单级别的键(用于限定查询范围)
$order_key = sprintf('%d|%s|%s', $item['store_id'], $item['platform_order_id'], $created_date);
if (!isset($order_keys[$order_key])) {
$order_keys[$order_key] = [
'store_id' => $item['store_id'],
'platform_order_id' => $item['platform_order_id'],
'created_date' => $created_date,
];
}
// 子项级别的键(用于差集计算)
$item_key = sprintf('%d|%s|%s|%s', $item['store_id'], $item['platform_order_id'], $item['sub_order_id'], $created_date);
$new_item_keys[$item_key] = true;
}
// 2. 查询数据库中这些订单的所有现有 sub_order_id(集合 A
// 利用 hypertable 的 created_date 分区裁剪 + 索引 order_items_store_platform_sub_unique
$existing_items = OrderItem::query()
->where(function ($query) use ($order_keys) {
foreach ($order_keys as $key) {
$query->orWhere(function ($q) use ($key) {
$q->where('store_id', $key['store_id'])
->where('platform_order_id', $key['platform_order_id'])
->where('created_date', $key['created_date']);
});
}
})
->select(['id', 'store_id', 'platform_order_id', 'sub_order_id', 'created_date'])
->get();
if ($existing_items->isEmpty()) {
return;
}
// 3. 计算差集:集合 A - 集合 B = 需要删除的记录
$ids_to_delete = [];
$created_dates_to_delete = [];
foreach ($existing_items as $existing) {
$existing_created_date = $existing->created_date instanceof \DateTimeInterface
? $existing->created_date->format('Y-m-d H:i:sP')
: (string) $existing->created_date;
$existing_key = sprintf(
'%d|%s|%s|%s',
$existing->store_id,
$existing->platform_order_id,
$existing->sub_order_id,
$existing_created_date
);
// 如果现有记录不在新数据中,则需要删除
if (!isset($new_item_keys[$existing_key])) {
$ids_to_delete[] = $existing->id;
$created_dates_to_delete[$existing_created_date] = true;
} }
} }
// 构建本次更新的所有 (store_id, platform_order_id, sub_order_id) 组合(保留的记录 // 4. 批量删除(利用 hypertable 主键 (id, created_date) 进行高效删除
$validItems = []; if (!empty($ids_to_delete)) {
foreach ($allItemsToUpsert as $item) { $deleted = OrderItem::query()
$storeId = (int)$item['store_id']; ->whereIn('id', $ids_to_delete)
$platformOrderId = addslashes($item['platform_order_id']); ->whereIn('created_date', array_keys($created_dates_to_delete))
$subOrderId = addslashes($item['sub_order_id']); ->delete();
$validItems[] = "({$storeId}, '{$platformOrderId}', '{$subOrderId}')";
}
if (!empty($updatedOrders) && !empty($validItems)) {
$ordersIn = implode(', ', $updatedOrders);
$itemsNotIn = implode(', ', $validItems);
// 批量删除:删除在本次更新订单范围内,但不在新数据中的旧子项
// DELETE FROM order_items
// WHERE (store_id, platform_order_id) IN ((1, 'order1'), (1, 'order2'))
// AND (store_id, platform_order_id, sub_order_id) NOT IN ((1, 'order1', 'item1'), ...)
$deleted = Db::delete("
DELETE FROM order_items
WHERE (store_id, platform_order_id) IN ({$ordersIn})
AND (store_id, platform_order_id, sub_order_id) NOT IN ({$itemsNotIn})
");
if ($deleted > 0) { if ($deleted > 0) {
dump("Batch deleted {$deleted} obsolete order items"); dump("Batch deleted {$deleted} obsolete order items");
} }
} }
} }
dump("Order items processing completed");
}
} }