2025-11-26 09:44:55 +08:00
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
|
|
namespace App\Platform;
|
|
|
|
|
|
|
2025-11-27 15:03:25 +08:00
|
|
|
|
use App\Entity\Parse\EntityParseFactory;
|
2025-12-01 11:13:42 +08:00
|
|
|
|
use App\Utils\Log;
|
2025-11-26 09:44:55 +08:00
|
|
|
|
use Hyperf\Amqp\Annotation\Consumer;
|
2025-12-01 11:13:42 +08:00
|
|
|
|
use Hyperf\Amqp\Builder\QueueBuilder;
|
2025-11-26 09:44:55 +08:00
|
|
|
|
use Hyperf\Amqp\Message\ConsumerMessage;
|
|
|
|
|
|
use Hyperf\Amqp\Result;
|
2025-12-01 11:13:42 +08:00
|
|
|
|
use Hyperf\Amqp\Producer;
|
2025-11-26 09:44:55 +08:00
|
|
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
2025-11-27 15:03:25 +08:00
|
|
|
|
use Hyperf\DbConnection\Db;
|
2025-12-15 15:22:22 +08:00
|
|
|
|
use App\Model\OrderItem;
|
2026-01-30 17:02:53 +08:00
|
|
|
|
use Exception;
|
2025-12-01 11:13:42 +08:00
|
|
|
|
use Hyperf\Context\ApplicationContext;
|
2025-11-27 15:03:25 +08:00
|
|
|
|
use Throwable;
|
2025-11-26 09:44:55 +08:00
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
use function Hyperf\Support\env;
|
|
|
|
|
|
|
2025-11-27 16:25:53 +08:00
|
|
|
|
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", pool: "default_consumer", nums: 1, enable: true)]
|
2025-11-26 09:44:55 +08:00
|
|
|
|
class OrderConsumer extends ConsumerMessage
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
protected ?array $qos = [
|
|
|
|
|
|
// AMQP 默认并没有实现此配置。
|
|
|
|
|
|
'prefetch_size' => 0,
|
|
|
|
|
|
// 同一个消费者,最高同时可以处理的消息数。
|
2025-12-01 11:13:42 +08:00
|
|
|
|
// @attention 默认值 100, test 设置为 1
|
|
|
|
|
|
'prefetch_count' => 1,
|
2025-11-26 09:44:55 +08:00
|
|
|
|
// 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。
|
|
|
|
|
|
'global' => false,
|
|
|
|
|
|
];
|
|
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
/**
|
|
|
|
|
|
* 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列
|
|
|
|
|
|
* 这样才能实现重试队列和错误队列的机制
|
|
|
|
|
|
*/
|
|
|
|
|
|
protected bool $requeue = false;
|
|
|
|
|
|
|
2025-11-26 09:44:55 +08:00
|
|
|
|
protected $entityType = 'order';
|
2025-12-01 11:13:42 +08:00
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 重写队列构建器,设置队列参数
|
|
|
|
|
|
* 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return QueueBuilder
|
|
|
|
|
|
*/
|
|
|
|
|
|
public function getQueueBuilder(): QueueBuilder
|
|
|
|
|
|
{
|
|
|
|
|
|
return (new QueueBuilder())
|
|
|
|
|
|
->setQueue($this->getQueue())
|
|
|
|
|
|
->setArguments([
|
|
|
|
|
|
'x-dead-letter-exchange' => ['S', 'dlx.orders'],
|
|
|
|
|
|
'x-dead-letter-routing-key' => ['S', 'retry'],
|
|
|
|
|
|
'x-message-ttl' => ['I', 86400000], // 24小时
|
|
|
|
|
|
'x-queue-type' => ['S', 'classic'],
|
|
|
|
|
|
]);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-11-26 09:44:55 +08:00
|
|
|
|
public function consumeMessage($data, AMQPMessage $message): Result
|
|
|
|
|
|
{
|
2025-12-01 11:13:42 +08:00
|
|
|
|
// 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态
|
|
|
|
|
|
// 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
|
|
|
|
|
|
if ($debug_delay > 0) {
|
|
|
|
|
|
dump("Debug mode: sleeping for {$debug_delay} seconds...");
|
|
|
|
|
|
|
|
|
|
|
|
sleep($debug_delay);
|
2025-12-01 11:13:42 +08:00
|
|
|
|
}
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
// 获取重试次数
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$retry_count = $this->getRetryCount($message);
|
|
|
|
|
|
$max_retries = (int) env('AMQP_MAX_RETRIES', 3);
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
dump("Retry count: {$retry_count}/{$max_retries}");
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
try {
|
2025-12-01 16:31:13 +08:00
|
|
|
|
// EntityParseFactory 使用静态方法调用,无需依赖注入
|
|
|
|
|
|
$parse = EntityParseFactory::createFromMessage($message);
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2026-01-30 17:02:53 +08:00
|
|
|
|
// 提取 metadata(从 meta 字段中获取)
|
|
|
|
|
|
$meta = $data['meta'] ?? [];
|
2025-12-01 11:13:42 +08:00
|
|
|
|
$metadata = [
|
2026-01-30 17:02:53 +08:00
|
|
|
|
'company_id' => $meta['company_id'] ?? null,
|
|
|
|
|
|
'platform_id' => $meta['platform_id'] ?? null,
|
|
|
|
|
|
'store_id' => $meta['store_id'] ?? null,
|
|
|
|
|
|
'unique_id' => $meta['unique_id'] ?? null,
|
2025-12-01 11:13:42 +08:00
|
|
|
|
];
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2026-01-30 17:02:53 +08:00
|
|
|
|
// entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息
|
|
|
|
|
|
// 获取到的 $entity 示例为属性为空的 \App\Model\Model 对象,比如 \App\Model\Order
|
2025-12-01 11:13:42 +08:00
|
|
|
|
$entity = $parse->entityMatch($metadata);
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2026-01-30 17:02:53 +08:00
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
// message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
|
2026-01-30 17:02:53 +08:00
|
|
|
|
// 注意:raw_data 现在在 data 字段中
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$entity_map_result = $parse->entityMap($data['data'] ?? []);
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-12 11:12:52 +08:00
|
|
|
|
// 将 LazyCollection 转为数组,准备批量操作
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$data_to_upsert = $entity_map_result->all();
|
2025-12-12 11:12:52 +08:00
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
dump($entity_map_result->first());
|
2026-01-30 17:02:53 +08:00
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
if (empty($data_to_upsert)) {
|
2025-12-12 11:12:52 +08:00
|
|
|
|
dump('No data to process');
|
|
|
|
|
|
return Result::ACK;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
dump("Processing " . count($data_to_upsert) . " order(s) with batch upsert");
|
2025-12-12 11:12:52 +08:00
|
|
|
|
|
2026-01-30 17:02:53 +08:00
|
|
|
|
// 检测订单解析器是否实现了 getOrderItems 方法
|
|
|
|
|
|
if( !method_exists($parse, 'formatOrderItemsFromRaw')){
|
|
|
|
|
|
throw new Exception('formatOrderItemsFromRaw method must be implemented in ' . $parse::class );
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检测订单解析器是否实现了 getOrderStatusId 方法
|
|
|
|
|
|
// 该方法处理了电商平台订单状态到本地订单状态枚举值 \App\Constants\OrderStatus 的映射
|
|
|
|
|
|
if( !method_exists($parse, 'getOrderStatusId')){
|
|
|
|
|
|
throw new Exception('getOrderStatusId method must be implemented in ' . $parse::class );
|
|
|
|
|
|
}
|
2025-12-15 15:22:22 +08:00
|
|
|
|
|
2026-01-30 17:02:53 +08:00
|
|
|
|
// 检测订单解析器是否实现了 getPaymentMethodId 方法
|
|
|
|
|
|
// 该方法处理了电商平台订单付款方式的枚举值 \App\Constants\PaymentMethod 的映射
|
|
|
|
|
|
if( !method_exists($parse, 'getPaymentMethodId')){
|
|
|
|
|
|
throw new Exception('getPaymentMethodId method must be implemented in ' . $parse::class );
|
2025-12-15 15:22:22 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-30 17:02:53 +08:00
|
|
|
|
|
|
|
|
|
|
// 收集订单数据
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$orders_data = $data_to_upsert;
|
|
|
|
|
|
$raw_data = $data['data'] ?? [];
|
2026-01-30 17:02:53 +08:00
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
Db::beginTransaction();
|
2025-12-15 15:22:22 +08:00
|
|
|
|
// @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理
|
|
|
|
|
|
// @attention 如果多条记录中有个别记录请求失败,可能会导致该批次写入失败,此时则需要判断和修复
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2025-12-15 15:22:22 +08:00
|
|
|
|
// 1. 使用 upsert 批量处理订单插入和更新
|
2025-12-12 11:12:52 +08:00
|
|
|
|
// 利用数据库唯一索引自动判断是插入还是更新
|
|
|
|
|
|
// 解决了重复订单推送的问题:存在则更新,不存在则插入
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$unique_by = $parse->getUniqueBy();
|
2025-12-12 11:12:52 +08:00
|
|
|
|
$entity->newQuery()->upsert(
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$orders_data,
|
|
|
|
|
|
$unique_by, // 从解析器获取唯一键字段
|
2025-12-12 11:12:52 +08:00
|
|
|
|
$parse->getUpdateFields() // 从解析器获取可更新字段
|
|
|
|
|
|
);
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
2026-01-30 17:02:53 +08:00
|
|
|
|
// 2. 查询获取 ID 映射 [platform_order_id => local db id]
|
|
|
|
|
|
// 通过唯一键查询刚写入的订单,获取数据库生成的 ID
|
2026-02-05 10:38:59 +08:00
|
|
|
|
$platform_orders_id_to_local_db_order_id_map = $entity->newQuery()
|
2026-02-02 13:47:17 +08:00
|
|
|
|
->where(function ($query) use ($orders_data, $unique_by) {
|
|
|
|
|
|
foreach ($orders_data as $order_data) {
|
|
|
|
|
|
$query->orWhere(function ($q) use ($order_data, $unique_by) {
|
|
|
|
|
|
foreach ($unique_by as $key) {
|
|
|
|
|
|
$q->where($key, $order_data[$key]);
|
2026-01-30 17:02:53 +08:00
|
|
|
|
}
|
|
|
|
|
|
});
|
|
|
|
|
|
}
|
|
|
|
|
|
})
|
|
|
|
|
|
->pluck('id', 'platform_order_id')
|
|
|
|
|
|
->toArray();
|
|
|
|
|
|
|
2026-02-05 10:38:59 +08:00
|
|
|
|
dump("ID mapping: " . count($platform_orders_id_to_local_db_order_id_map) . " orders");
|
2026-01-30 17:02:53 +08:00
|
|
|
|
|
|
|
|
|
|
// 3. 格式化订单子项(传入 ID 映射,使子项能直接获取正确的 order_id)
|
2026-02-05 10:38:59 +08:00
|
|
|
|
$items = $parse->formatOrderItemsFromRaw($raw_data, $platform_orders_id_to_local_db_order_id_map);
|
2026-01-30 17:02:53 +08:00
|
|
|
|
|
|
|
|
|
|
// 4. 处理订单子项
|
2025-12-15 15:22:22 +08:00
|
|
|
|
// 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem
|
2026-01-30 17:02:53 +08:00
|
|
|
|
$this->processOrderItems($items);
|
|
|
|
|
|
|
2025-12-15 15:22:22 +08:00
|
|
|
|
|
2025-11-27 15:03:25 +08:00
|
|
|
|
Db::commit();
|
2025-12-15 15:22:22 +08:00
|
|
|
|
// @TODO 触发事件通知,更新自动聚合任务
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
|
|
|
|
|
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
|
|
|
|
|
|
return Result::ACK;
|
2026-02-02 15:13:46 +08:00
|
|
|
|
|
2025-11-27 15:03:25 +08:00
|
|
|
|
} catch (Throwable $error) {
|
2026-02-05 14:20:52 +08:00
|
|
|
|
// dump("=== Error Caught ===");
|
|
|
|
|
|
// dump("Error: " . $error->getMessage());
|
|
|
|
|
|
// dump("File: " . $error->getFile() . ":" . $error->getLine());
|
|
|
|
|
|
// dump("Stack trace:");
|
|
|
|
|
|
// dump($error->getTraceAsString());
|
|
|
|
|
|
// dump("Retry Count: {$retry_count}");
|
|
|
|
|
|
// dump("Max Retries: {$max_retries}");
|
|
|
|
|
|
// dump("Check: {$retry_count} >= {$max_retries} = " . ($retry_count >= $max_retries ? 'TRUE' : 'FALSE'));
|
2025-12-01 16:31:13 +08:00
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
Log::get()->error('Consumer processing failed', [
|
|
|
|
|
|
'error' => $error->getMessage(),
|
2026-02-02 13:47:17 +08:00
|
|
|
|
'retry_count' => $retry_count,
|
|
|
|
|
|
'max_retries' => $max_retries,
|
2025-12-01 11:13:42 +08:00
|
|
|
|
]);
|
2025-11-27 15:03:25 +08:00
|
|
|
|
Db::rollBack();
|
2025-12-01 11:13:42 +08:00
|
|
|
|
|
|
|
|
|
|
// 检查是否超过最大重试次数
|
2026-02-02 13:47:17 +08:00
|
|
|
|
if ($retry_count >= $max_retries) {
|
2025-12-01 11:13:42 +08:00
|
|
|
|
// 超过重试次数,发送到错误队列
|
2025-12-01 16:31:13 +08:00
|
|
|
|
dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
$this->sendToErrorQueue($message, $error);
|
|
|
|
|
|
dump(">>> Successfully sent to error queue!");
|
|
|
|
|
|
} catch (Throwable $e) {
|
|
|
|
|
|
dump(">>> FAILED to send to error queue: " . $e->getMessage());
|
|
|
|
|
|
dump(">>> Stack trace: " . $e->getTraceAsString());
|
|
|
|
|
|
}
|
2025-12-01 11:13:42 +08:00
|
|
|
|
|
|
|
|
|
|
// 返回 ACK 避免消息再次重试
|
|
|
|
|
|
// 因为消息已经被发送到错误队列,不应该继续在主队列中循环
|
2025-12-01 16:31:13 +08:00
|
|
|
|
dump(">>> Returning ACK to prevent further retries");
|
2025-12-01 11:13:42 +08:00
|
|
|
|
return Result::ACK;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列
|
2025-12-01 16:31:13 +08:00
|
|
|
|
dump(">>> Retry not exceeded, sending to DLX (NACK)");
|
2025-11-27 15:03:25 +08:00
|
|
|
|
return Result::NACK;
|
|
|
|
|
|
}
|
2025-11-26 09:44:55 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public function isEnable(): bool
|
|
|
|
|
|
{
|
|
|
|
|
|
return parent::isEnable();
|
|
|
|
|
|
}
|
2025-12-01 11:13:42 +08:00
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 获取消息的重试次数
|
|
|
|
|
|
* 通过检查 x-death header 中的 count 字段
|
|
|
|
|
|
*
|
|
|
|
|
|
* @param AMQPMessage $message
|
|
|
|
|
|
* @return int
|
|
|
|
|
|
*/
|
|
|
|
|
|
protected function getRetryCount(AMQPMessage $message): int
|
|
|
|
|
|
{
|
2025-12-01 16:31:13 +08:00
|
|
|
|
// 检查是否存在 application_headers 属性
|
|
|
|
|
|
// 首次处理的消息不会有这个属性,必须先用 has() 检查
|
|
|
|
|
|
if (!$message->has('application_headers')) {
|
|
|
|
|
|
dump(">>> No application_headers, first time processing");
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
$headers = $message->get('application_headers');
|
|
|
|
|
|
if (!$headers) {
|
2025-12-01 16:31:13 +08:00
|
|
|
|
dump(">>> application_headers exists but is empty");
|
2025-12-01 11:13:42 +08:00
|
|
|
|
return 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
$headerData = $headers->getNativeData();
|
|
|
|
|
|
$xDeath = $headerData['x-death'] ?? [];
|
|
|
|
|
|
|
2025-12-01 16:31:13 +08:00
|
|
|
|
dump(">>> x-death header data:");
|
|
|
|
|
|
dump($xDeath);
|
|
|
|
|
|
|
2025-12-01 11:13:42 +08:00
|
|
|
|
if (empty($xDeath)) {
|
2025-12-01 16:31:13 +08:00
|
|
|
|
dump(">>> x-death is empty");
|
2025-12-01 11:13:42 +08:00
|
|
|
|
return 0; // 首次失败
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// x-death 是一个数组,第一个元素包含 count 字段
|
|
|
|
|
|
// count 表示消息从该队列死信的次数
|
2025-12-01 16:31:13 +08:00
|
|
|
|
$count = $xDeath[0]['count'] ?? 0;
|
|
|
|
|
|
dump(">>> Extracted count from x-death: {$count}");
|
|
|
|
|
|
return $count;
|
2025-12-01 11:13:42 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发送消息到错误队列
|
|
|
|
|
|
* 当重试次数超过上限时调用
|
|
|
|
|
|
*
|
|
|
|
|
|
* @param AMQPMessage $message 原始消息
|
|
|
|
|
|
* @param Throwable $error 错误信息
|
|
|
|
|
|
* @return void
|
|
|
|
|
|
*/
|
|
|
|
|
|
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
|
|
|
|
|
|
{
|
|
|
|
|
|
try {
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$retry_count = $this->getRetryCount($message);
|
2025-12-01 11:13:42 +08:00
|
|
|
|
|
2025-12-01 16:31:13 +08:00
|
|
|
|
// 使用 ErrorProducer 发送到错误队列
|
|
|
|
|
|
$producer = ApplicationContext::getContainer()->get(Producer::class);
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$error_producer = new ErrorProducer($message, $error, $retry_count);
|
|
|
|
|
|
$producer->produce($error_producer);
|
2025-12-01 11:13:42 +08:00
|
|
|
|
|
2025-12-01 16:31:13 +08:00
|
|
|
|
// 记录日志
|
2025-12-01 11:13:42 +08:00
|
|
|
|
Log::get()->warning('Message sent to error queue after exceeding retry limit', [
|
2026-02-02 13:47:17 +08:00
|
|
|
|
'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
|
|
|
|
|
|
'retry_count' => $retry_count,
|
2025-12-01 11:13:42 +08:00
|
|
|
|
'error_message' => $error->getMessage(),
|
|
|
|
|
|
]);
|
|
|
|
|
|
} catch (Throwable $e) {
|
|
|
|
|
|
// 发送到错误队列失败,记录日志
|
2025-12-01 16:31:13 +08:00
|
|
|
|
dump($e->getMessage());
|
2025-12-01 11:13:42 +08:00
|
|
|
|
Log::get()->error('Failed to send message to error queue', [
|
|
|
|
|
|
'error' => $e->getMessage(),
|
|
|
|
|
|
'original_error' => $error->getMessage(),
|
|
|
|
|
|
]);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-12-15 15:22:22 +08:00
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理订单子项的批量同步(优化版本)
|
|
|
|
|
|
*
|
|
|
|
|
|
* 策略优化:使用业务键 (store_id, platform_order_id, sub_order_id) 作为唯一性约束
|
|
|
|
|
|
* 1. 直接批量 upsert OrderItems(无需先查询 order_id)
|
|
|
|
|
|
* 2. 批量更新 order_id(通过 JOIN orders 表)
|
|
|
|
|
|
* 3. 删除不在新数据中的旧 OrderItem(完全同步)
|
|
|
|
|
|
*
|
|
|
|
|
|
* 性能优势:减少一次批量查询,直接使用业务键进行 upsert
|
|
|
|
|
|
*
|
2026-02-02 13:47:17 +08:00
|
|
|
|
* @param array $items_by_platform_order_id 以 platform_order_id 为键的子项数据数组
|
2025-12-15 15:22:22 +08:00
|
|
|
|
* @return void
|
|
|
|
|
|
*/
|
2026-02-02 13:47:17 +08:00
|
|
|
|
protected function processOrderItems(array $items_by_platform_order_id): void
|
2025-12-15 15:22:22 +08:00
|
|
|
|
{
|
2026-02-02 15:13:46 +08:00
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
if (empty($items_by_platform_order_id)) {
|
2025-12-15 15:22:22 +08:00
|
|
|
|
dump('No order items to process');
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 1. 构建所有子项数据(无需查询 order_id)
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$all_items_to_upsert = [];
|
|
|
|
|
|
$item_sub_order_ids_by_platform_order_id = []; // 记录每个平台订单的新子项 ID 列表
|
2025-12-15 15:22:22 +08:00
|
|
|
|
|
2026-02-02 15:13:46 +08:00
|
|
|
|
foreach ($items_by_platform_order_id as $item) {
|
|
|
|
|
|
$platform_order_id = $item['platform_order_id'];
|
|
|
|
|
|
// order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充
|
|
|
|
|
|
$all_items_to_upsert[] = $item;
|
|
|
|
|
|
$item_sub_order_ids_by_platform_order_id[$platform_order_id] = $item['sub_order_id'];
|
2025-12-15 15:22:22 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
if (empty($all_items_to_upsert)) {
|
2025-12-15 15:22:22 +08:00
|
|
|
|
dump('No valid order items to upsert');
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
dump("Upserting " . count($all_items_to_upsert) . " order items");
|
2025-12-15 15:22:22 +08:00
|
|
|
|
|
|
|
|
|
|
// 2. 批量 upsert OrderItems(使用业务键作为唯一性约束)
|
2026-01-30 17:02:53 +08:00
|
|
|
|
// order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充,无需后置 JOIN 更新
|
2025-12-15 15:22:22 +08:00
|
|
|
|
OrderItem::query()->upsert(
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$all_items_to_upsert,
|
|
|
|
|
|
['store_id', 'platform_order_id', 'sub_order_id', 'created_date'], // 唯一键(业务键)
|
2025-12-15 15:22:22 +08:00
|
|
|
|
[
|
2026-01-30 17:02:53 +08:00
|
|
|
|
'order_id', // 直接更新 order_id
|
2025-12-15 15:22:22 +08:00
|
|
|
|
'company_id',
|
|
|
|
|
|
'platform_id',
|
|
|
|
|
|
'sub_order_type_id',
|
|
|
|
|
|
'product_id',
|
|
|
|
|
|
'platform_product_id',
|
|
|
|
|
|
'product_sku',
|
|
|
|
|
|
'product_barcode',
|
|
|
|
|
|
'unit_price',
|
|
|
|
|
|
'quantity',
|
|
|
|
|
|
'discount',
|
|
|
|
|
|
'total',
|
|
|
|
|
|
'ext',
|
|
|
|
|
|
'updated_at',
|
2026-01-30 17:02:53 +08:00
|
|
|
|
]
|
2025-12-15 15:22:22 +08:00
|
|
|
|
);
|
|
|
|
|
|
|
2026-01-30 17:02:53 +08:00
|
|
|
|
dump("Upserted order items with order_id");
|
2025-12-15 15:22:22 +08:00
|
|
|
|
|
|
|
|
|
|
// 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略)
|
2026-02-02 13:47:17 +08:00
|
|
|
|
// 利用 hypertable 按 created_date 分区的特性,确保删除条件包含 created_date 以触发分区裁剪
|
2025-12-15 15:22:22 +08:00
|
|
|
|
// 此部分业务应该很少被调用,订单子项新增或删减的情况很少见
|
2026-02-02 15:13:46 +08:00
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$this->deleteObsoleteOrderItems($all_items_to_upsert);
|
2025-12-15 15:22:22 +08:00
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
dump("Order items processing completed");
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 删除不在新数据中的旧订单子项
|
|
|
|
|
|
*
|
|
|
|
|
|
* 策略:集合差集(1 次 SELECT + 1 次 DELETE)
|
|
|
|
|
|
* 1. 查询数据库中涉及订单的所有 sub_order_id(集合 A)
|
|
|
|
|
|
* 2. 从新数据提取 sub_order_id(集合 B)
|
|
|
|
|
|
* 3. 集合 A - B = 需要删除的记录
|
|
|
|
|
|
*
|
|
|
|
|
|
* @param array $new_items 新的订单子项数据
|
|
|
|
|
|
* @return void
|
|
|
|
|
|
*/
|
|
|
|
|
|
protected function deleteObsoleteOrderItems(array $new_items): void
|
|
|
|
|
|
{
|
|
|
|
|
|
if (empty($new_items)) {
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
2025-12-15 15:22:22 +08:00
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
// 1. 提取唯一的订单键 (store_id, platform_order_id, created_date) 和新数据的完整键
|
|
|
|
|
|
$order_keys = [];
|
|
|
|
|
|
$new_item_keys = []; // 集合 B
|
|
|
|
|
|
|
|
|
|
|
|
foreach ($new_items as $item) {
|
2026-02-02 15:13:46 +08:00
|
|
|
|
// 统一格式化为 Y-m-d H:i:s(不带时区),确保与数据库查询结果匹配
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$created_date = $item['created_date'] instanceof \DateTimeInterface
|
2026-02-02 15:13:46 +08:00
|
|
|
|
? $item['created_date']->format('Y-m-d H:i:s')
|
|
|
|
|
|
: (new \DateTime($item['created_date']))->format('Y-m-d H:i:s');
|
2026-02-02 13:47:17 +08:00
|
|
|
|
|
|
|
|
|
|
// 订单级别的键(用于限定查询范围)
|
|
|
|
|
|
$order_key = sprintf('%d|%s|%s', $item['store_id'], $item['platform_order_id'], $created_date);
|
|
|
|
|
|
if (!isset($order_keys[$order_key])) {
|
|
|
|
|
|
$order_keys[$order_key] = [
|
|
|
|
|
|
'store_id' => $item['store_id'],
|
|
|
|
|
|
'platform_order_id' => $item['platform_order_id'],
|
|
|
|
|
|
'created_date' => $created_date,
|
|
|
|
|
|
];
|
2025-12-15 15:22:22 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
// 子项级别的键(用于差集计算)
|
|
|
|
|
|
$item_key = sprintf('%d|%s|%s|%s', $item['store_id'], $item['platform_order_id'], $item['sub_order_id'], $created_date);
|
|
|
|
|
|
$new_item_keys[$item_key] = true;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 查询数据库中这些订单的所有现有 sub_order_id(集合 A)
|
|
|
|
|
|
// 利用 hypertable 的 created_date 分区裁剪 + 索引 order_items_store_platform_sub_unique
|
|
|
|
|
|
$existing_items = OrderItem::query()
|
|
|
|
|
|
->where(function ($query) use ($order_keys) {
|
|
|
|
|
|
foreach ($order_keys as $key) {
|
|
|
|
|
|
$query->orWhere(function ($q) use ($key) {
|
|
|
|
|
|
$q->where('store_id', $key['store_id'])
|
|
|
|
|
|
->where('platform_order_id', $key['platform_order_id'])
|
|
|
|
|
|
->where('created_date', $key['created_date']);
|
|
|
|
|
|
});
|
2025-12-15 15:22:22 +08:00
|
|
|
|
}
|
2026-02-02 13:47:17 +08:00
|
|
|
|
})
|
|
|
|
|
|
->select(['id', 'store_id', 'platform_order_id', 'sub_order_id', 'created_date'])
|
|
|
|
|
|
->get();
|
|
|
|
|
|
|
|
|
|
|
|
if ($existing_items->isEmpty()) {
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 计算差集:集合 A - 集合 B = 需要删除的记录
|
|
|
|
|
|
$ids_to_delete = [];
|
|
|
|
|
|
$created_dates_to_delete = [];
|
|
|
|
|
|
|
|
|
|
|
|
foreach ($existing_items as $existing) {
|
2026-02-02 15:13:46 +08:00
|
|
|
|
// 统一格式化为 Y-m-d H:i:s(不带时区),与新数据保持一致
|
2026-02-02 13:47:17 +08:00
|
|
|
|
$existing_created_date = $existing->created_date instanceof \DateTimeInterface
|
2026-02-02 15:13:46 +08:00
|
|
|
|
? $existing->created_date->format('Y-m-d H:i:s')
|
|
|
|
|
|
: (new \DateTime((string) $existing->created_date))->format('Y-m-d H:i:s');
|
2026-02-02 13:47:17 +08:00
|
|
|
|
|
|
|
|
|
|
$existing_key = sprintf(
|
|
|
|
|
|
'%d|%s|%s|%s',
|
|
|
|
|
|
$existing->store_id,
|
|
|
|
|
|
$existing->platform_order_id,
|
|
|
|
|
|
$existing->sub_order_id,
|
|
|
|
|
|
$existing_created_date
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
// 如果现有记录不在新数据中,则需要删除
|
|
|
|
|
|
if (!isset($new_item_keys[$existing_key])) {
|
|
|
|
|
|
$ids_to_delete[] = $existing->id;
|
|
|
|
|
|
$created_dates_to_delete[$existing_created_date] = true;
|
2025-12-15 15:22:22 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-02 15:13:46 +08:00
|
|
|
|
|
2026-02-02 13:47:17 +08:00
|
|
|
|
// 4. 批量删除(利用 hypertable 主键 (id, created_date) 进行高效删除)
|
|
|
|
|
|
if (!empty($ids_to_delete)) {
|
|
|
|
|
|
$deleted = OrderItem::query()
|
|
|
|
|
|
->whereIn('id', $ids_to_delete)
|
|
|
|
|
|
->whereIn('created_date', array_keys($created_dates_to_delete))
|
|
|
|
|
|
->delete();
|
|
|
|
|
|
|
|
|
|
|
|
if ($deleted > 0) {
|
|
|
|
|
|
dump("Batch deleted {$deleted} obsolete order items");
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-12-15 15:22:22 +08:00
|
|
|
|
}
|
2025-11-26 09:44:55 +08:00
|
|
|
|
}
|