Files
datahub/backend/app/Platform/OrderConsumer.php
T

426 lines
17 KiB
PHP
Raw Normal View History

2025-11-26 09:44:55 +08:00
<?php
declare(strict_types=1);
namespace App\Platform;
2025-11-27 15:03:25 +08:00
use App\Entity\Parse\EntityParseFactory;
2025-12-01 11:13:42 +08:00
use App\Utils\Log;
2025-11-26 09:44:55 +08:00
use Hyperf\Amqp\Annotation\Consumer;
2025-12-01 11:13:42 +08:00
use Hyperf\Amqp\Builder\QueueBuilder;
2025-11-26 09:44:55 +08:00
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
2025-12-01 11:13:42 +08:00
use Hyperf\Amqp\Producer;
2025-11-26 09:44:55 +08:00
use PhpAmqpLib\Message\AMQPMessage;
2025-11-27 15:03:25 +08:00
use Hyperf\Di\Annotation\Inject;
use Hyperf\DbConnection\Db;
2025-12-15 15:22:22 +08:00
use App\Model\OrderItem;
2026-01-30 17:02:53 +08:00
use Exception;
2025-12-01 11:13:42 +08:00
use Hyperf\Context\ApplicationContext;
2025-11-27 15:03:25 +08:00
use Throwable;
2025-11-26 09:44:55 +08:00
2025-12-01 11:13:42 +08:00
use function Hyperf\Support\env;
2025-11-27 16:25:53 +08:00
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", pool: "default_consumer", nums: 1, enable: true)]
2025-11-26 09:44:55 +08:00
class OrderConsumer extends ConsumerMessage
{
protected ?array $qos = [
// AMQP 默认并没有实现此配置。
'prefetch_size' => 0,
// 同一个消费者,最高同时可以处理的消息数。
2025-12-01 11:13:42 +08:00
// @attention 默认值 100 test 设置为 1
'prefetch_count' => 1,
2025-11-26 09:44:55 +08:00
// 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。
'global' => false,
];
2025-12-01 11:13:42 +08:00
/**
* 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列
* 这样才能实现重试队列和错误队列的机制
*/
protected bool $requeue = false;
2025-11-26 09:44:55 +08:00
protected $entityType = 'order';
2025-12-01 11:13:42 +08:00
/**
* 重写队列构建器,设置队列参数
* 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误
*
* @return QueueBuilder
*/
public function getQueueBuilder(): QueueBuilder
{
return (new QueueBuilder())
->setQueue($this->getQueue())
->setArguments([
'x-dead-letter-exchange' => ['S', 'dlx.orders'],
'x-dead-letter-routing-key' => ['S', 'retry'],
'x-message-ttl' => ['I', 86400000], // 24小时
'x-queue-type' => ['S', 'classic'],
]);
}
2025-11-26 09:44:55 +08:00
public function consumeMessage($data, AMQPMessage $message): Result
{
2025-12-01 11:13:42 +08:00
// 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态
// 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒
$debugDelay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
if ($debugDelay > 0) {
dump("Debug mode: sleeping for {$debugDelay} seconds...");
sleep($debugDelay);
}
2025-11-27 15:03:25 +08:00
// dump('---data');
// dump($data);
// dump('---');
dump('---message');
2025-12-01 16:31:13 +08:00
dump(json_decode($message->getBody(), true)['message_id']);
2025-11-27 15:03:25 +08:00
dump('---');
2025-12-01 11:13:42 +08:00
// 获取重试次数
$retryCount = $this->getRetryCount($message);
$maxRetries = (int) env('AMQP_MAX_RETRIES', 3);
2025-11-27 15:03:25 +08:00
2025-12-01 11:13:42 +08:00
dump("Retry count: {$retryCount}/{$maxRetries}");
2025-11-27 15:03:25 +08:00
2025-12-01 11:13:42 +08:00
try {
2025-12-01 16:31:13 +08:00
// EntityParseFactory 使用静态方法调用,无需依赖注入
$parse = EntityParseFactory::createFromMessage($message);
2025-11-27 15:03:25 +08:00
2026-01-30 17:02:53 +08:00
// 提取 metadata(从 meta 字段中获取)
$meta = $data['meta'] ?? [];
2025-12-01 11:13:42 +08:00
$metadata = [
2026-01-30 17:02:53 +08:00
'company_id' => $meta['company_id'] ?? null,
'platform_id' => $meta['platform_id'] ?? null,
'store_id' => $meta['store_id'] ?? null,
'unique_id' => $meta['unique_id'] ?? null,
2025-12-01 11:13:42 +08:00
];
2025-11-27 15:03:25 +08:00
2026-01-30 17:02:53 +08:00
// entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息
// 获取到的 $entity 示例为属性为空的 \App\Model\Model 对象,比如 \App\Model\Order
2025-12-01 11:13:42 +08:00
$entity = $parse->entityMatch($metadata);
2025-11-27 15:03:25 +08:00
2026-01-30 17:02:53 +08:00
2025-12-01 11:13:42 +08:00
// message 中包含 raw dataraw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
2026-01-30 17:02:53 +08:00
// 注意:raw_data 现在在 data 字段中
$entityMapResult = $parse->entityMap($data['data'] ?? []);
2025-11-27 15:03:25 +08:00
2025-12-12 11:12:52 +08:00
// 将 LazyCollection 转为数组,准备批量操作
$dataToUpsert = $entityMapResult->all();
2026-01-30 17:02:53 +08:00
dump($entityMapResult->first());
2025-12-12 11:12:52 +08:00
if (empty($dataToUpsert)) {
dump('No data to process');
return Result::ACK;
}
dump("Processing " . count($dataToUpsert) . " order(s) with batch upsert");
2026-01-30 17:02:53 +08:00
// 检测订单解析器是否实现了 getOrderItems 方法
if( !method_exists($parse, 'formatOrderItemsFromRaw')){
throw new Exception('formatOrderItemsFromRaw method must be implemented in ' . $parse::class );
}
// 检测订单解析器是否实现了 getOrderStatusId 方法
// 该方法处理了电商平台订单状态到本地订单状态枚举值 \App\Constants\OrderStatus 的映射
if( !method_exists($parse, 'getOrderStatusId')){
throw new Exception('getOrderStatusId method must be implemented in ' . $parse::class );
}
2025-12-15 15:22:22 +08:00
2026-01-30 17:02:53 +08:00
// 检测订单解析器是否实现了 getPaymentMethodId 方法
// 该方法处理了电商平台订单付款方式的枚举值 \App\Constants\PaymentMethod 的映射
if( !method_exists($parse, 'getPaymentMethodId')){
throw new Exception('getPaymentMethodId method must be implemented in ' . $parse::class );
2025-12-15 15:22:22 +08:00
}
2026-01-30 17:02:53 +08:00
// 收集订单数据
$ordersData = $dataToUpsert;
$rawData = $data['data'] ?? [];
2025-12-01 11:13:42 +08:00
Db::beginTransaction();
2025-12-15 15:22:22 +08:00
// @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理
// @attention 如果多条记录中有个别记录请求失败,可能会导致该批次写入失败,此时则需要判断和修复
2025-11-27 15:03:25 +08:00
2025-12-15 15:22:22 +08:00
// 1. 使用 upsert 批量处理订单插入和更新
2025-12-12 11:12:52 +08:00
// 利用数据库唯一索引自动判断是插入还是更新
// 解决了重复订单推送的问题:存在则更新,不存在则插入
2026-01-30 17:02:53 +08:00
$uniqueBy = $parse->getUniqueBy();
2025-12-12 11:12:52 +08:00
$entity->newQuery()->upsert(
2025-12-15 15:22:22 +08:00
$ordersData,
2026-01-30 17:02:53 +08:00
$uniqueBy, // 从解析器获取唯一键字段
2025-12-12 11:12:52 +08:00
$parse->getUpdateFields() // 从解析器获取可更新字段
);
2025-11-27 15:03:25 +08:00
2026-01-30 17:02:53 +08:00
// 2. 查询获取 ID 映射 [platform_order_id => local db id]
// 通过唯一键查询刚写入的订单,获取数据库生成的 ID
$idMapping = $entity->newQuery()
->where(function ($query) use ($ordersData, $uniqueBy) {
foreach ($ordersData as $orderData) {
$query->orWhere(function ($q) use ($orderData, $uniqueBy) {
foreach ($uniqueBy as $key) {
$q->where($key, $orderData[$key]);
}
});
}
})
->pluck('id', 'platform_order_id')
->toArray();
dump("ID mapping: " . count($idMapping) . " orders");
// 3. 格式化订单子项(传入 ID 映射,使子项能直接获取正确的 order_id)
$items = $parse->formatOrderItemsFromRaw($rawData, $idMapping);
// 4. 处理订单子项
2025-12-15 15:22:22 +08:00
// 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem
2026-01-30 17:02:53 +08:00
$this->processOrderItems($items);
2025-12-15 15:22:22 +08:00
2025-11-27 15:03:25 +08:00
Db::commit();
2025-12-15 15:22:22 +08:00
// @TODO 触发事件通知,更新自动聚合任务
2025-11-27 15:03:25 +08:00
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
return Result::ACK;
} catch (Throwable $error) {
2025-12-01 16:31:13 +08:00
dump("=== Error Caught ===");
dump("Error: " . $error->getMessage());
2026-01-30 17:02:53 +08:00
dump("File: " . $error->getFile() . ":" . $error->getLine());
dump("Stack trace:");
dump($error->getTraceAsString());
2025-12-01 16:31:13 +08:00
dump("Retry Count: {$retryCount}");
dump("Max Retries: {$maxRetries}");
dump("Check: {$retryCount} >= {$maxRetries} = " . ($retryCount >= $maxRetries ? 'TRUE' : 'FALSE'));
2025-12-01 11:13:42 +08:00
Log::get()->error('Consumer processing failed', [
'error' => $error->getMessage(),
'retry_count' => $retryCount,
'max_retries' => $maxRetries,
]);
2025-11-27 15:03:25 +08:00
Db::rollBack();
2025-12-01 11:13:42 +08:00
// 检查是否超过最大重试次数
if ($retryCount >= $maxRetries) {
// 超过重试次数,发送到错误队列
2025-12-01 16:31:13 +08:00
dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
try {
$this->sendToErrorQueue($message, $error);
dump(">>> Successfully sent to error queue!");
} catch (Throwable $e) {
dump(">>> FAILED to send to error queue: " . $e->getMessage());
dump(">>> Stack trace: " . $e->getTraceAsString());
}
2025-12-01 11:13:42 +08:00
// 返回 ACK 避免消息再次重试
// 因为消息已经被发送到错误队列,不应该继续在主队列中循环
2025-12-01 16:31:13 +08:00
dump(">>> Returning ACK to prevent further retries");
2025-12-01 11:13:42 +08:00
return Result::ACK;
}
// 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列
2025-12-01 16:31:13 +08:00
dump(">>> Retry not exceeded, sending to DLX (NACK)");
2025-11-27 15:03:25 +08:00
return Result::NACK;
}
2025-11-26 09:44:55 +08:00
}
public function isEnable(): bool
{
return parent::isEnable();
}
2025-12-01 11:13:42 +08:00
/**
* 获取消息的重试次数
* 通过检查 x-death header 中的 count 字段
*
* @param AMQPMessage $message
* @return int
*/
protected function getRetryCount(AMQPMessage $message): int
{
2025-12-01 16:31:13 +08:00
// 检查是否存在 application_headers 属性
// 首次处理的消息不会有这个属性,必须先用 has() 检查
if (!$message->has('application_headers')) {
dump(">>> No application_headers, first time processing");
return 0;
}
2025-12-01 11:13:42 +08:00
$headers = $message->get('application_headers');
if (!$headers) {
2025-12-01 16:31:13 +08:00
dump(">>> application_headers exists but is empty");
2025-12-01 11:13:42 +08:00
return 0;
}
$headerData = $headers->getNativeData();
$xDeath = $headerData['x-death'] ?? [];
2025-12-01 16:31:13 +08:00
dump(">>> x-death header data:");
dump($xDeath);
2025-12-01 11:13:42 +08:00
if (empty($xDeath)) {
2025-12-01 16:31:13 +08:00
dump(">>> x-death is empty");
2025-12-01 11:13:42 +08:00
return 0; // 首次失败
}
// x-death 是一个数组,第一个元素包含 count 字段
// count 表示消息从该队列死信的次数
2025-12-01 16:31:13 +08:00
$count = $xDeath[0]['count'] ?? 0;
dump(">>> Extracted count from x-death: {$count}");
return $count;
2025-12-01 11:13:42 +08:00
}
/**
* 发送消息到错误队列
* 当重试次数超过上限时调用
*
* @param AMQPMessage $message 原始消息
* @param Throwable $error 错误信息
* @return void
*/
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
{
try {
2025-12-01 16:31:13 +08:00
$retryCount = $this->getRetryCount($message);
2025-12-01 11:13:42 +08:00
2025-12-01 16:31:13 +08:00
// 使用 ErrorProducer 发送到错误队列
$producer = ApplicationContext::getContainer()->get(Producer::class);
$errorProducer = new ErrorProducer($message, $error, $retryCount);
$producer->produce($errorProducer);
2025-12-01 11:13:42 +08:00
2025-12-01 16:31:13 +08:00
// 记录日志
2025-12-01 11:13:42 +08:00
Log::get()->warning('Message sent to error queue after exceeding retry limit', [
2025-12-01 16:31:13 +08:00
'error_id' => $errorProducer->payload['error_id'] ?? 'unknown',
'retry_count' => $retryCount,
2025-12-01 11:13:42 +08:00
'error_message' => $error->getMessage(),
]);
} catch (Throwable $e) {
// 发送到错误队列失败,记录日志
2025-12-01 16:31:13 +08:00
dump($e->getMessage());
2025-12-01 11:13:42 +08:00
Log::get()->error('Failed to send message to error queue', [
'error' => $e->getMessage(),
'original_error' => $error->getMessage(),
]);
}
}
2025-12-15 15:22:22 +08:00
/**
* 处理订单子项的批量同步(优化版本)
*
* 策略优化:使用业务键 (store_id, platform_order_id, sub_order_id) 作为唯一性约束
* 1. 直接批量 upsert OrderItems(无需先查询 order_id
* 2. 批量更新 order_id(通过 JOIN orders 表)
* 3. 删除不在新数据中的旧 OrderItem(完全同步)
*
* 性能优势:减少一次批量查询,直接使用业务键进行 upsert
*
* @param array $itemsByPlatformOrderId 以 platform_order_id 为键的子项数据数组
* @return void
*/
protected function processOrderItems(array $itemsByPlatformOrderId): void
{
if (empty($itemsByPlatformOrderId)) {
dump('No order items to process');
return;
}
// 1. 构建所有子项数据(无需查询 order_id)
$allItemsToUpsert = [];
$itemSubOrderIdsByPlatformOrderId = []; // 记录每个平台订单的新子项 ID 列表
foreach ($itemsByPlatformOrderId as $platformOrderId => $items) {
$subOrderIds = [];
foreach ($items as $item) {
2026-01-30 17:02:53 +08:00
// order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充
2025-12-15 15:22:22 +08:00
$allItemsToUpsert[] = $item;
$subOrderIds[] = $item['sub_order_id'];
}
$itemSubOrderIdsByPlatformOrderId[$platformOrderId] = $subOrderIds;
}
if (empty($allItemsToUpsert)) {
dump('No valid order items to upsert');
return;
}
dump("Upserting " . count($allItemsToUpsert) . " order items");
// 2. 批量 upsert OrderItems(使用业务键作为唯一性约束)
2026-01-30 17:02:53 +08:00
// order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充,无需后置 JOIN 更新
2025-12-15 15:22:22 +08:00
OrderItem::query()->upsert(
$allItemsToUpsert,
['store_id', 'platform_order_id', 'sub_order_id'], // 唯一键(业务键)
[
2026-01-30 17:02:53 +08:00
'order_id', // 直接更新 order_id
2025-12-15 15:22:22 +08:00
'company_id',
'platform_id',
'sub_order_type_id',
'product_id',
'platform_product_id',
'product_sku',
'product_barcode',
'unit_price',
'quantity',
'discount',
'total',
2026-01-30 17:02:53 +08:00
'created_date',
2025-12-15 15:22:22 +08:00
'ext',
'updated_at',
2026-01-30 17:02:53 +08:00
]
2025-12-15 15:22:22 +08:00
);
2026-01-30 17:02:53 +08:00
dump("Upserted order items with order_id");
2025-12-15 15:22:22 +08:00
// 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略)
// 优化:一次性删除所有不匹配的旧子项,而不是逐个订单处理
// 此部分业务应该很少被调用,订单子项新增或删减的情况很少见
if (!empty($allItemsToUpsert)) {
// 构建本次更新的所有 (store_id, platform_order_id) 组合(用于限定删除范围)
$updatedOrders = [];
foreach ($itemsByPlatformOrderId as $platformOrderId => $items) {
if (!empty($items)) {
$storeId = (int)$items[0]['store_id'];
$platformOrderId = addslashes($platformOrderId);
$updatedOrders[] = "({$storeId}, '{$platformOrderId}')";
}
}
// 构建本次更新的所有 (store_id, platform_order_id, sub_order_id) 组合(保留的记录)
$validItems = [];
foreach ($allItemsToUpsert as $item) {
$storeId = (int)$item['store_id'];
$platformOrderId = addslashes($item['platform_order_id']);
$subOrderId = addslashes($item['sub_order_id']);
$validItems[] = "({$storeId}, '{$platformOrderId}', '{$subOrderId}')";
}
if (!empty($updatedOrders) && !empty($validItems)) {
$ordersIn = implode(', ', $updatedOrders);
$itemsNotIn = implode(', ', $validItems);
// 批量删除:删除在本次更新订单范围内,但不在新数据中的旧子项
// DELETE FROM order_items
// WHERE (store_id, platform_order_id) IN ((1, 'order1'), (1, 'order2'))
// AND (store_id, platform_order_id, sub_order_id) NOT IN ((1, 'order1', 'item1'), ...)
$deleted = Db::delete("
DELETE FROM order_items
WHERE (store_id, platform_order_id) IN ({$ordersIn})
AND (store_id, platform_order_id, sub_order_id) NOT IN ({$itemsNotIn})
");
if ($deleted > 0) {
dump("Batch deleted {$deleted} obsolete order items");
}
}
}
dump("Order items processing completed");
}
2025-11-26 09:44:55 +08:00
}