Files
datahub/backend/app/Platform/RefundConsumer.php
T
2026-03-17 10:36:38 +08:00

242 lines
7.8 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace App\Platform;
use App\Entity\Parse\EntityParseFactory;
use App\Model\RefundItem;
use App\Platform\Traits\FailedMessageTrait;
use App\Utils\Log;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Builder\QueueBuilder;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\DbConnection\Db;
use Throwable;
use function Hyperf\Support\env;
#[Consumer(exchange: "main.exchange", routingKey: "refund.#", queue: "refunds.queue", pool: "default_consumer", nums: 1, enable: true)]
/**
 * AMQP consumer for refund messages (routing key "refund.#" on "refunds.queue").
 *
 * For each message it upserts the parent refunds (when the platform has them),
 * then upserts the refund items, and finally lets the parser rebuild orders if
 * it exposes fixInstantRefundOrders(). All writes of one message share a single
 * database transaction.
 */
class RefundConsumer extends ConsumerMessage
{
    use FailedMessageTrait;

    /**
     * QoS settings passed to the consumer; prefetch_count = 1.
     */
    protected ?array $qos = [
        'prefetch_size' => 0,
        'prefetch_count' => 1,
        'global' => false,
    ];

    /**
     * Set requeue=false so failed messages go to the DLX instead of returning
     * to the original queue.
     */
    protected bool $requeue = false;

    protected $entityType = 'refund';

    /**
     * Override the queue builder to set the queue arguments
     * (dead-letter exchange / routing key, message TTL and queue type).
     *
     * @return QueueBuilder
     */
    public function getQueueBuilder(): QueueBuilder
    {
        return (new QueueBuilder())
            ->setQueue($this->getQueue())
            ->setArguments([
                'x-dead-letter-exchange' => ['S', 'dlx.refunds'],
                'x-dead-letter-routing-key' => ['S', 'retry'],
                'x-message-ttl' => ['I', 86400000], // 24 hours
                'x-queue-type' => ['S', 'classic'],
            ]);
    }

    /**
     * Process one refund message.
     *
     * Returns ACK when the message was processed, when there is nothing to
     * process, or when the retry budget (env AMQP_MAX_RETRIES, default 3) is
     * exhausted and the message was handed to the error queue. Returns NACK
     * otherwise so the message is dead-lettered for a retry.
     *
     * @param mixed $data Decoded message payload; the 'meta' and 'data' keys are read.
     * @param AMQPMessage $message Raw AMQP message (used for parsing and retry bookkeeping).
     * @return Result
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
        if ($debug_delay > 0) {
            dump("Debug mode: sleeping for {$debug_delay} seconds...");
            sleep($debug_delay);
        }

        $retry_count = $this->getRetryCount($message);
        $max_retries = (int) env('AMQP_MAX_RETRIES', 3);
        dump("Retry count: {$retry_count}/{$max_retries}");

        // Tracks whether this call opened a transaction, so the failure path
        // only rolls back what was actually started here.
        $transaction_open = false;

        try {
            $parse = EntityParseFactory::createFromMessage($message);
            $meta = $data['meta'] ?? [];
            $metadata = [
                'company_id' => $meta['company_id'] ?? null,
                'platform_id' => $meta['platform_id'] ?? null,
                'store_id' => $meta['store_id'] ?? null,
                'unique_id' => $meta['unique_id'] ?? null,
            ];
            $entity = $parse->entityMatch($metadata);
            $raw_data = $data['data'] ?? [];
            $data_to_upsert = $parse->entityMap($raw_data)->all();
            $has_parent_refund = $parse->hasParentRefund();

            // Empty refund data on a platform with parent refunds: nothing to do, ACK.
            if ($has_parent_refund && empty($data_to_upsert)) {
                dump('No data to process');
                return Result::ACK;
            }

            // One transaction for both flows, so parent refunds, refund items and
            // the order rebuild are committed or rolled back together.
            Db::beginTransaction();
            $transaction_open = true;

            // Platforms without a parent refund (e.g. Tmall) use the platform
            // after-sales record directly and skip steps 1-2; the map stays null.
            $platform_refund_id_to_local_refund_id_map = null;
            if ($has_parent_refund) {
                $platform_refund_id_to_local_refund_id_map = $this->upsertRefundsAndMapIds(
                    $parse,
                    $entity,
                    $data_to_upsert
                );
            }

            // @TODO refund type still needs to be settled, see app/Constants/RefundType.php
            // 3. Format the refund items (the ID map lets items resolve their refund_id).
            $items = $parse->formatRefundItemsFromRaw($raw_data, $platform_refund_id_to_local_refund_id_map);

            // 4. Persist the refund items.
            $this->processRefundItems($items);

            // 5. Tmall instant-refund order rebuild (restore missing Order/OrderItem from the refund data).
            // @TODO add a platform check instead of relying on Tmall only; other platforms may need this too.
            if (method_exists($parse, 'fixInstantRefundOrders')) {
                $parse->fixInstantRefundOrders($raw_data);
            }

            Db::commit();
            $transaction_open = false;

            return Result::ACK;
        } catch (Throwable $error) {
            dump($error->getMessage());
            dump($error->getTraceAsString());
            Log::get()->error('Refund consumer processing failed', [
                'error' => $error->getMessage(),
                'retry_count' => $retry_count,
                'max_retries' => $max_retries,
            ]);

            if ($transaction_open) {
                try {
                    Db::rollBack();
                } catch (Throwable $rollback_error) {
                    // A failed rollback must not skip the retry / error-queue decision below.
                    Log::get()->error('Refund consumer rollback failed', [
                        'error' => $rollback_error->getMessage(),
                    ]);
                }
            }

            if ($retry_count >= $max_retries) {
                dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
                try {
                    $this->sendToErrorQueue($message, $error);
                    dump(">>> Successfully sent to error queue!");
                } catch (Throwable $e) {
                    dump(">>> FAILED to send to error queue: " . $e->getMessage());
                    // The message is ACKed below regardless, so record the loss in the log.
                    Log::get()->error('Refund consumer failed to send message to error queue', [
                        'error' => $e->getMessage(),
                        'original_error' => $error->getMessage(),
                    ]);
                }
                dump(">>> Returning ACK to prevent further retries");
                return Result::ACK;
            }

            dump(">>> Retry not exceeded, sending to DLX (NACK)");
            return Result::NACK;
        }
    }

    public function isEnable(): bool
    {
        return parent::isEnable();
    }

    /**
     * Upsert the parent refunds and return the ID map
     * [platform_refund_id => local refund id].
     *
     * Logs a warning when the map comes back empty, because refund items then
     * cannot be linked to a local refund row.
     *
     * @param object $parse Parser created by EntityParseFactory for this message.
     * @param object $entity Model returned by the parser's entityMatch().
     * @param array $refunds_data Non-empty list of refund rows to upsert.
     * @return array
     */
    private function upsertRefundsAndMapIds($parse, $entity, array $refunds_data): array
    {
        $unique_by = $parse->getUniqueBy();

        // 1. Bulk upsert the refunds.
        $entity->newQuery()->upsert(
            $refunds_data,
            $unique_by,
            $parse->getUpdateFields()
        );

        // 2. Read the rows back by their unique key to build the ID map.
        $id_map = $entity->newQuery()
            ->where(function ($query) use ($refunds_data, $unique_by) {
                foreach ($refunds_data as $refund_data) {
                    $query->orWhere(function ($q) use ($refund_data, $unique_by) {
                        foreach ($unique_by as $key) {
                            $q->where($key, $refund_data[$key]);
                        }
                    });
                }
            })
            ->pluck('id', 'platform_refund_id')
            ->toArray();

        if (empty($id_map)) {
            Log::get()->warning('Refund ID mapping is empty after upsert, refund_items.refund_id will not be populated', [
                'store_id' => $parse->getStore()->id,
                'platform' => $parse->getPlatform()->name ?? '',
            ]);
        }

        return $id_map;
    }

    /**
     * Bulk-sync the refund items.
     *
     * Writes to the refund_items table with a single upsert.
     * Unique key: store_id + platform_parent_refund_id + platform_refund_id.
     *
     * @param array $items Refund item rows; an empty array is a no-op.
     * @return void
     */
    protected function processRefundItems(array $items): void
    {
        if (empty($items)) {
            dump('No refund items to process');
            return;
        }

        dump("Upserting " . count($items) . " refund items");
        RefundItem::query()->upsert(
            $items,
            ['store_id', 'platform_parent_refund_id', 'platform_refund_id'],
            [
                'refund_id',
                'company_id',
                'platform_id',
                'refund_status_id',
                'refund_type_id',
                'reason',
                'currency',
                'buyer_user_id',
                'platform_order_id',
                'platform_sub_order_id',
                'platform_product_id',
                'quantity',
                'refund_amount',
                'order_created_date',
                'order_paid_date',
                'updated_date',
                'completed_date',
                'raw',
                'ext',
                'updated_at',
            ]
        );
        dump("Refund items processing completed");
    }
}