242 lines
7.8 KiB
PHP
242 lines
7.8 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace App\Platform;
|
||
|
||
use App\Entity\Parse\EntityParseFactory;
|
||
use App\Model\RefundItem;
|
||
use App\Platform\Traits\FailedMessageTrait;
|
||
use App\Utils\Log;
|
||
use Hyperf\Amqp\Annotation\Consumer;
|
||
use Hyperf\Amqp\Builder\QueueBuilder;
|
||
use Hyperf\Amqp\Message\ConsumerMessage;
|
||
use Hyperf\Amqp\Result;
|
||
use PhpAmqpLib\Message\AMQPMessage;
|
||
use Hyperf\DbConnection\Db;
|
||
use Throwable;
|
||
|
||
use function Hyperf\Support\env;
|
||
|
||
#[Consumer(exchange: "main.exchange", routingKey: "refund.#", queue: "refunds.queue", pool: "default_consumer", nums: 1, enable: true)]
|
||
class RefundConsumer extends ConsumerMessage
|
||
{
|
||
use FailedMessageTrait;
|
||
|
||
protected ?array $qos = [
|
||
'prefetch_size' => 0,
|
||
'prefetch_count' => 1,
|
||
'global' => false,
|
||
];
|
||
|
||
/**
|
||
* 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列
|
||
*/
|
||
protected bool $requeue = false;
|
||
|
||
protected $entityType = 'refund';
|
||
|
||
/**
|
||
* 重写队列构建器,设置队列参数
|
||
*
|
||
* @return QueueBuilder
|
||
*/
|
||
public function getQueueBuilder(): QueueBuilder
|
||
{
|
||
return (new QueueBuilder())
|
||
->setQueue($this->getQueue())
|
||
->setArguments([
|
||
'x-dead-letter-exchange' => ['S', 'dlx.refunds'],
|
||
'x-dead-letter-routing-key' => ['S', 'retry'],
|
||
'x-message-ttl' => ['I', 86400000], // 24小时
|
||
'x-queue-type' => ['S', 'classic'],
|
||
]);
|
||
}
|
||
|
||
public function consumeMessage($data, AMQPMessage $message): Result
|
||
{
|
||
$debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
|
||
if ($debug_delay > 0) {
|
||
dump("Debug mode: sleeping for {$debug_delay} seconds...");
|
||
sleep($debug_delay);
|
||
}
|
||
|
||
$retry_count = $this->getRetryCount($message);
|
||
$max_retries = (int) env('AMQP_MAX_RETRIES', 3);
|
||
|
||
dump("Retry count: {$retry_count}/{$max_retries}");
|
||
|
||
try {
|
||
$parse = EntityParseFactory::createFromMessage($message);
|
||
|
||
$meta = $data['meta'] ?? [];
|
||
$metadata = [
|
||
'company_id' => $meta['company_id'] ?? null,
|
||
'platform_id' => $meta['platform_id'] ?? null,
|
||
'store_id' => $meta['store_id'] ?? null,
|
||
'unique_id' => $meta['unique_id'] ?? null,
|
||
];
|
||
|
||
$entity = $parse->entityMatch($metadata);
|
||
|
||
$raw_data = $data['data'] ?? [];
|
||
$entity_map_result = $parse->entityMap($raw_data);
|
||
|
||
$data_to_upsert = $entity_map_result->all();
|
||
$platform_refund_id_to_local_refund_id_map = null;
|
||
|
||
// 退款信息为空且有父退款,直接返回 ACK
|
||
|
||
if (empty($data_to_upsert) && $parse->hasParentRefund()) {
|
||
dump('No data to process');
|
||
return Result::ACK;
|
||
}
|
||
|
||
// 无父级退款,直接使用平台售后单的情况, 比如 Tmall
|
||
if(!$parse->hasParentRefund()){
|
||
goto PROCESS_REFUND_ITEMS;
|
||
}
|
||
|
||
// 有父退款,且要处理退款子项的情况
|
||
|
||
$refunds_data = $data_to_upsert;
|
||
$unique_by = $parse->getUniqueBy();
|
||
|
||
Db::beginTransaction();
|
||
|
||
// 1. 批量 upsert refunds
|
||
$entity->newQuery()->upsert(
|
||
$refunds_data,
|
||
$unique_by,
|
||
$parse->getUpdateFields()
|
||
);
|
||
|
||
// 2. 查询获取 ID 映射 [platform_refund_id => local db refund id]
|
||
$platform_refund_id_to_local_refund_id_map = $entity->newQuery()
|
||
->where(function ($query) use ($refunds_data, $unique_by) {
|
||
foreach ($refunds_data as $refund_data) {
|
||
$query->orWhere(function ($q) use ($refund_data, $unique_by) {
|
||
foreach ($unique_by as $key) {
|
||
$q->where($key, $refund_data[$key]);
|
||
}
|
||
});
|
||
}
|
||
})
|
||
->pluck('id', 'platform_refund_id')
|
||
->toArray();
|
||
|
||
if ($parse->hasParentRefund() && empty($platform_refund_id_to_local_refund_id_map)) {
|
||
Log::get()->warning('Refund ID mapping is empty after upsert, refund_items.refund_id will not be populated', [
|
||
'store_id' => $parse->getStore()->id,
|
||
'platform' => $parse->getPlatform()->name ?? '',
|
||
]);
|
||
}
|
||
|
||
|
||
// @TODO refund type 也需要进一步确定 app/Constants/RefundType.php
|
||
|
||
|
||
// @attention 没有父退款的平台,业务直接跳到此处执行
|
||
PROCESS_REFUND_ITEMS:
|
||
|
||
// 3. 格式化退款子项(传入 ID 映射,使子项能获取正确的 refund_id)
|
||
$items = $parse->formatRefundItemsFromRaw($raw_data, $platform_refund_id_to_local_refund_id_map);
|
||
|
||
// 4. 处理退款子项
|
||
$this->processRefundItems($items);
|
||
|
||
// 5. Tmall 闪电退款订单重建(从退款数据恢复缺失的 Order/OrderItem)
|
||
// @TODO 添加平台检查,而不仅限于 Tmall, 其他平台可能存在类似情况
|
||
if (method_exists($parse, 'fixInstantRefundOrders')) {
|
||
$parse->fixInstantRefundOrders($raw_data);
|
||
}
|
||
|
||
Db::commit();
|
||
|
||
return Result::ACK;
|
||
} catch (Throwable $error) {
|
||
|
||
dump($error->getMessage());
|
||
dump($error->getTraceAsString());
|
||
|
||
Log::get()->error('Refund consumer processing failed', [
|
||
'error' => $error->getMessage(),
|
||
'retry_count' => $retry_count,
|
||
'max_retries' => $max_retries,
|
||
]);
|
||
Db::rollBack();
|
||
|
||
if ($retry_count >= $max_retries) {
|
||
dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
|
||
|
||
try {
|
||
$this->sendToErrorQueue($message, $error);
|
||
dump(">>> Successfully sent to error queue!");
|
||
} catch (Throwable $e) {
|
||
dump(">>> FAILED to send to error queue: " . $e->getMessage());
|
||
}
|
||
|
||
dump(">>> Returning ACK to prevent further retries");
|
||
return Result::ACK;
|
||
}
|
||
|
||
dump(">>> Retry not exceeded, sending to DLX (NACK)");
|
||
return Result::NACK;
|
||
}
|
||
}
|
||
|
||
public function isEnable(): bool
|
||
{
|
||
return parent::isEnable();
|
||
}
|
||
|
||
/**
|
||
* 处理退款子项的批量同步
|
||
*
|
||
* 使用 upsert 批量写入 refund_items 表
|
||
* 唯一键:store_id + platform_parent_refund_id + platform_refund_id
|
||
*
|
||
* @param array $items 退款子项数据数组
|
||
* @return void
|
||
*/
|
||
protected function processRefundItems(array $items): void
|
||
{
|
||
if (empty($items)) {
|
||
dump('No refund items to process');
|
||
return;
|
||
}
|
||
|
||
dump("Upserting " . count($items) . " refund items");
|
||
|
||
RefundItem::query()->upsert(
|
||
$items,
|
||
['store_id', 'platform_parent_refund_id', 'platform_refund_id'],
|
||
[
|
||
'refund_id',
|
||
'company_id',
|
||
'platform_id',
|
||
'refund_status_id',
|
||
'refund_type_id',
|
||
'reason',
|
||
'currency',
|
||
'buyer_user_id',
|
||
'platform_order_id',
|
||
'platform_sub_order_id',
|
||
'platform_product_id',
|
||
'quantity',
|
||
'refund_amount',
|
||
'order_created_date',
|
||
'order_paid_date',
|
||
'updated_date',
|
||
'completed_date',
|
||
'raw',
|
||
'ext',
|
||
'updated_at',
|
||
]
|
||
);
|
||
|
||
dump("Refund items processing completed");
|
||
}
|
||
|
||
}
|