Files
datahub/backend/app/Platform/OrderConsumer.php
T
2026-02-05 14:20:52 +08:00

482 lines
19 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace App\Platform;
use App\Entity\Parse\EntityParseFactory;
use App\Utils\Log;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Builder\QueueBuilder;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Producer;
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\DbConnection\Db;
use App\Model\OrderItem;
use Exception;
use Hyperf\Context\ApplicationContext;
use Throwable;
use function Hyperf\Support\env;
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", pool: "default_consumer", nums: 1, enable: true)]
class OrderConsumer extends ConsumerMessage
{
protected ?array $qos = [
// AMQP 默认并没有实现此配置。
'prefetch_size' => 0,
// 同一个消费者,最高同时可以处理的消息数。
// @attention 默认值 100 test 设置为 1
'prefetch_count' => 1,
// 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。
'global' => false,
];
/**
* 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列
* 这样才能实现重试队列和错误队列的机制
*/
protected bool $requeue = false;
protected $entityType = 'order';
/**
* 重写队列构建器,设置队列参数
* 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误
*
* @return QueueBuilder
*/
public function getQueueBuilder(): QueueBuilder
{
return (new QueueBuilder())
->setQueue($this->getQueue())
->setArguments([
'x-dead-letter-exchange' => ['S', 'dlx.orders'],
'x-dead-letter-routing-key' => ['S', 'retry'],
'x-message-ttl' => ['I', 86400000], // 24小时
'x-queue-type' => ['S', 'classic'],
]);
}
public function consumeMessage($data, AMQPMessage $message): Result
{
// 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态
// 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒
$debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
if ($debug_delay > 0) {
dump("Debug mode: sleeping for {$debug_delay} seconds...");
sleep($debug_delay);
}
// 获取重试次数
$retry_count = $this->getRetryCount($message);
$max_retries = (int) env('AMQP_MAX_RETRIES', 3);
dump("Retry count: {$retry_count}/{$max_retries}");
try {
// EntityParseFactory 使用静态方法调用,无需依赖注入
$parse = EntityParseFactory::createFromMessage($message);
// 提取 metadata(从 meta 字段中获取)
$meta = $data['meta'] ?? [];
$metadata = [
'company_id' => $meta['company_id'] ?? null,
'platform_id' => $meta['platform_id'] ?? null,
'store_id' => $meta['store_id'] ?? null,
'unique_id' => $meta['unique_id'] ?? null,
];
// entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息
// 获取到的 $entity 示例为属性为空的 \App\Model\Model 对象,比如 \App\Model\Order
$entity = $parse->entityMatch($metadata);
// message 中包含 raw dataraw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
// 注意:raw_data 现在在 data 字段中
$entity_map_result = $parse->entityMap($data['data'] ?? []);
// 将 LazyCollection 转为数组,准备批量操作
$data_to_upsert = $entity_map_result->all();
dump($entity_map_result->first());
if (empty($data_to_upsert)) {
dump('No data to process');
return Result::ACK;
}
dump("Processing " . count($data_to_upsert) . " order(s) with batch upsert");
// 检测订单解析器是否实现了 getOrderItems 方法
if( !method_exists($parse, 'formatOrderItemsFromRaw')){
throw new Exception('formatOrderItemsFromRaw method must be implemented in ' . $parse::class );
}
// 检测订单解析器是否实现了 getOrderStatusId 方法
// 该方法处理了电商平台订单状态到本地订单状态枚举值 \App\Constants\OrderStatus 的映射
if( !method_exists($parse, 'getOrderStatusId')){
throw new Exception('getOrderStatusId method must be implemented in ' . $parse::class );
}
// 检测订单解析器是否实现了 getPaymentMethodId 方法
// 该方法处理了电商平台订单付款方式的枚举值 \App\Constants\PaymentMethod 的映射
if( !method_exists($parse, 'getPaymentMethodId')){
throw new Exception('getPaymentMethodId method must be implemented in ' . $parse::class );
}
// 收集订单数据
$orders_data = $data_to_upsert;
$raw_data = $data['data'] ?? [];
Db::beginTransaction();
// @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理
// @attention 如果多条记录中有个别记录请求失败,可能会导致该批次写入失败,此时则需要判断和修复
// 1. 使用 upsert 批量处理订单插入和更新
// 利用数据库唯一索引自动判断是插入还是更新
// 解决了重复订单推送的问题:存在则更新,不存在则插入
$unique_by = $parse->getUniqueBy();
$entity->newQuery()->upsert(
$orders_data,
$unique_by, // 从解析器获取唯一键字段
$parse->getUpdateFields() // 从解析器获取可更新字段
);
// 2. 查询获取 ID 映射 [platform_order_id => local db id]
// 通过唯一键查询刚写入的订单,获取数据库生成的 ID
$platform_orders_id_to_local_db_order_id_map = $entity->newQuery()
->where(function ($query) use ($orders_data, $unique_by) {
foreach ($orders_data as $order_data) {
$query->orWhere(function ($q) use ($order_data, $unique_by) {
foreach ($unique_by as $key) {
$q->where($key, $order_data[$key]);
}
});
}
})
->pluck('id', 'platform_order_id')
->toArray();
dump("ID mapping: " . count($platform_orders_id_to_local_db_order_id_map) . " orders");
// 3. 格式化订单子项(传入 ID 映射,使子项能直接获取正确的 order_id)
$items = $parse->formatOrderItemsFromRaw($raw_data, $platform_orders_id_to_local_db_order_id_map);
// 4. 处理订单子项
// 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem
$this->processOrderItems($items);
Db::commit();
// @TODO 触发事件通知,更新自动聚合任务
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
return Result::ACK;
} catch (Throwable $error) {
// dump("=== Error Caught ===");
// dump("Error: " . $error->getMessage());
// dump("File: " . $error->getFile() . ":" . $error->getLine());
// dump("Stack trace:");
// dump($error->getTraceAsString());
// dump("Retry Count: {$retry_count}");
// dump("Max Retries: {$max_retries}");
// dump("Check: {$retry_count} >= {$max_retries} = " . ($retry_count >= $max_retries ? 'TRUE' : 'FALSE'));
Log::get()->error('Consumer processing failed', [
'error' => $error->getMessage(),
'retry_count' => $retry_count,
'max_retries' => $max_retries,
]);
Db::rollBack();
// 检查是否超过最大重试次数
if ($retry_count >= $max_retries) {
// 超过重试次数,发送到错误队列
dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
try {
$this->sendToErrorQueue($message, $error);
dump(">>> Successfully sent to error queue!");
} catch (Throwable $e) {
dump(">>> FAILED to send to error queue: " . $e->getMessage());
dump(">>> Stack trace: " . $e->getTraceAsString());
}
// 返回 ACK 避免消息再次重试
// 因为消息已经被发送到错误队列,不应该继续在主队列中循环
dump(">>> Returning ACK to prevent further retries");
return Result::ACK;
}
// 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列
dump(">>> Retry not exceeded, sending to DLX (NACK)");
return Result::NACK;
}
}
public function isEnable(): bool
{
return parent::isEnable();
}
/**
* 获取消息的重试次数
* 通过检查 x-death header 中的 count 字段
*
* @param AMQPMessage $message
* @return int
*/
protected function getRetryCount(AMQPMessage $message): int
{
// 检查是否存在 application_headers 属性
// 首次处理的消息不会有这个属性,必须先用 has() 检查
if (!$message->has('application_headers')) {
dump(">>> No application_headers, first time processing");
return 0;
}
$headers = $message->get('application_headers');
if (!$headers) {
dump(">>> application_headers exists but is empty");
return 0;
}
$headerData = $headers->getNativeData();
$xDeath = $headerData['x-death'] ?? [];
dump(">>> x-death header data:");
dump($xDeath);
if (empty($xDeath)) {
dump(">>> x-death is empty");
return 0; // 首次失败
}
// x-death 是一个数组,第一个元素包含 count 字段
// count 表示消息从该队列死信的次数
$count = $xDeath[0]['count'] ?? 0;
dump(">>> Extracted count from x-death: {$count}");
return $count;
}
/**
* 发送消息到错误队列
* 当重试次数超过上限时调用
*
* @param AMQPMessage $message 原始消息
* @param Throwable $error 错误信息
* @return void
*/
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
{
try {
$retry_count = $this->getRetryCount($message);
// 使用 ErrorProducer 发送到错误队列
$producer = ApplicationContext::getContainer()->get(Producer::class);
$error_producer = new ErrorProducer($message, $error, $retry_count);
$producer->produce($error_producer);
// 记录日志
Log::get()->warning('Message sent to error queue after exceeding retry limit', [
'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
'retry_count' => $retry_count,
'error_message' => $error->getMessage(),
]);
} catch (Throwable $e) {
// 发送到错误队列失败,记录日志
dump($e->getMessage());
Log::get()->error('Failed to send message to error queue', [
'error' => $e->getMessage(),
'original_error' => $error->getMessage(),
]);
}
}
/**
* 处理订单子项的批量同步(优化版本)
*
* 策略优化:使用业务键 (store_id, platform_order_id, sub_order_id) 作为唯一性约束
* 1. 直接批量 upsert OrderItems(无需先查询 order_id
* 2. 批量更新 order_id(通过 JOIN orders 表)
* 3. 删除不在新数据中的旧 OrderItem(完全同步)
*
* 性能优势:减少一次批量查询,直接使用业务键进行 upsert
*
* @param array $items_by_platform_order_id 以 platform_order_id 为键的子项数据数组
* @return void
*/
protected function processOrderItems(array $items_by_platform_order_id): void
{
dump('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!');
dump('processOrderItems');
dump($items_by_platform_order_id);
dump('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!');
if (empty($items_by_platform_order_id)) {
dump('No order items to process');
return;
}
// 1. 构建所有子项数据(无需查询 order_id)
$all_items_to_upsert = [];
$item_sub_order_ids_by_platform_order_id = []; // 记录每个平台订单的新子项 ID 列表
foreach ($items_by_platform_order_id as $item) {
$platform_order_id = $item['platform_order_id'];
// order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充
$all_items_to_upsert[] = $item;
$item_sub_order_ids_by_platform_order_id[$platform_order_id] = $item['sub_order_id'];
}
if (empty($all_items_to_upsert)) {
dump('No valid order items to upsert');
return;
}
dump("Upserting " . count($all_items_to_upsert) . " order items");
// 2. 批量 upsert OrderItems(使用业务键作为唯一性约束)
// order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充,无需后置 JOIN 更新
OrderItem::query()->upsert(
$all_items_to_upsert,
['store_id', 'platform_order_id', 'sub_order_id', 'created_date'], // 唯一键(业务键)
[
'order_id', // 直接更新 order_id
'company_id',
'platform_id',
'sub_order_type_id',
'product_id',
'platform_product_id',
'product_sku',
'product_barcode',
'unit_price',
'quantity',
'discount',
'total',
'ext',
'updated_at',
]
);
dump("Upserted order items with order_id");
// 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略)
// 利用 hypertable 按 created_date 分区的特性,确保删除条件包含 created_date 以触发分区裁剪
// 此部分业务应该很少被调用,订单子项新增或删减的情况很少见
$this->deleteObsoleteOrderItems($all_items_to_upsert);
dump("Order items processing completed");
}
/**
* 删除不在新数据中的旧订单子项
*
* 策略:集合差集(1 次 SELECT + 1 次 DELETE
* 1. 查询数据库中涉及订单的所有 sub_order_id(集合 A
* 2. 从新数据提取 sub_order_id(集合 B
* 3. 集合 A - B = 需要删除的记录
*
* @param array $new_items 新的订单子项数据
* @return void
*/
protected function deleteObsoleteOrderItems(array $new_items): void
{
if (empty($new_items)) {
return;
}
// 1. 提取唯一的订单键 (store_id, platform_order_id, created_date) 和新数据的完整键
$order_keys = [];
$new_item_keys = []; // 集合 B
foreach ($new_items as $item) {
// 统一格式化为 Y-m-d H:i:s(不带时区),确保与数据库查询结果匹配
$created_date = $item['created_date'] instanceof \DateTimeInterface
? $item['created_date']->format('Y-m-d H:i:s')
: (new \DateTime($item['created_date']))->format('Y-m-d H:i:s');
// 订单级别的键(用于限定查询范围)
$order_key = sprintf('%d|%s|%s', $item['store_id'], $item['platform_order_id'], $created_date);
if (!isset($order_keys[$order_key])) {
$order_keys[$order_key] = [
'store_id' => $item['store_id'],
'platform_order_id' => $item['platform_order_id'],
'created_date' => $created_date,
];
}
// 子项级别的键(用于差集计算)
$item_key = sprintf('%d|%s|%s|%s', $item['store_id'], $item['platform_order_id'], $item['sub_order_id'], $created_date);
$new_item_keys[$item_key] = true;
}
// 2. 查询数据库中这些订单的所有现有 sub_order_id(集合 A
// 利用 hypertable 的 created_date 分区裁剪 + 索引 order_items_store_platform_sub_unique
$existing_items = OrderItem::query()
->where(function ($query) use ($order_keys) {
foreach ($order_keys as $key) {
$query->orWhere(function ($q) use ($key) {
$q->where('store_id', $key['store_id'])
->where('platform_order_id', $key['platform_order_id'])
->where('created_date', $key['created_date']);
});
}
})
->select(['id', 'store_id', 'platform_order_id', 'sub_order_id', 'created_date'])
->get();
if ($existing_items->isEmpty()) {
return;
}
// 3. 计算差集:集合 A - 集合 B = 需要删除的记录
$ids_to_delete = [];
$created_dates_to_delete = [];
foreach ($existing_items as $existing) {
// 统一格式化为 Y-m-d H:i:s(不带时区),与新数据保持一致
$existing_created_date = $existing->created_date instanceof \DateTimeInterface
? $existing->created_date->format('Y-m-d H:i:s')
: (new \DateTime((string) $existing->created_date))->format('Y-m-d H:i:s');
$existing_key = sprintf(
'%d|%s|%s|%s',
$existing->store_id,
$existing->platform_order_id,
$existing->sub_order_id,
$existing_created_date
);
// 如果现有记录不在新数据中,则需要删除
if (!isset($new_item_keys[$existing_key])) {
$ids_to_delete[] = $existing->id;
$created_dates_to_delete[$existing_created_date] = true;
}
}
// 4. 批量删除(利用 hypertable 主键 (id, created_date) 进行高效删除)
if (!empty($ids_to_delete)) {
$deleted = OrderItem::query()
->whereIn('id', $ids_to_delete)
->whereIn('created_date', array_keys($created_dates_to_delete))
->delete();
if ($deleted > 0) {
dump("Batch deleted {$deleted} obsolete order items");
}
}
}
}