Files
datahub/backend/app/Platform/RefundConsumer.php
T

279 lines
8.7 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Platform;
use App\Entity\Parse\EntityParseFactory;
use App\Utils\Log;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Builder\QueueBuilder;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Producer;
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\DbConnection\Db;
use App\Model\RefundItem;
use Exception;
use Hyperf\Context\ApplicationContext;
use Throwable;
use function Hyperf\Support\env;
#[Consumer(exchange: "main.exchange", routingKey: "refund.#", queue: "refunds.queue", pool: "default_consumer", nums: 1, enable: true)]
class RefundConsumer extends ConsumerMessage
{
protected ?array $qos = [
'prefetch_size' => 0,
'prefetch_count' => 1,
'global' => false,
];
/**
* 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列
*/
protected bool $requeue = false;
protected $entityType = 'refund';
/**
* 重写队列构建器,设置队列参数
*
* @return QueueBuilder
*/
public function getQueueBuilder(): QueueBuilder
{
return (new QueueBuilder())
->setQueue($this->getQueue())
->setArguments([
'x-dead-letter-exchange' => ['S', 'dlx.refunds'],
'x-dead-letter-routing-key' => ['S', 'retry'],
'x-message-ttl' => ['I', 86400000], // 24小时
'x-queue-type' => ['S', 'classic'],
]);
}
public function consumeMessage($data, AMQPMessage $message): Result
{
$debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
if ($debug_delay > 0) {
dump("Debug mode: sleeping for {$debug_delay} seconds...");
sleep($debug_delay);
}
$retry_count = $this->getRetryCount($message);
$max_retries = (int) env('AMQP_MAX_RETRIES', 3);
dump("Retry count: {$retry_count}/{$max_retries}");
try {
$parse = EntityParseFactory::createFromMessage($message);
$meta = $data['meta'] ?? [];
$metadata = [
'company_id' => $meta['company_id'] ?? null,
'platform_id' => $meta['platform_id'] ?? null,
'store_id' => $meta['store_id'] ?? null,
'unique_id' => $meta['unique_id'] ?? null,
];
$entity = $parse->entityMatch($metadata);
$raw_data = $data['data'] ?? [];
$entity_map_result = $parse->entityMap($raw_data);
$data_to_upsert = $entity_map_result->all();
if (empty($data_to_upsert)) {
dump('No data to process');
return Result::ACK;
}
dump("Processing " . count($data_to_upsert) . " refund(s) with batch upsert");
if (!method_exists($parse, 'formatRefundItemsFromRaw')) {
throw new Exception('formatRefundItemsFromRaw method must be implemented in ' . $parse::class);
}
if (!method_exists($parse, 'getRefundStatusId')) {
throw new Exception('getRefundStatusId method must be implemented in ' . $parse::class);
}
if (!method_exists($parse, 'getRefundTypeId')) {
throw new Exception('getRefundTypeId method must be implemented in ' . $parse::class);
}
$refunds_data = $data_to_upsert;
$unique_by = $parse->getUniqueBy();
Db::beginTransaction();
// 1. 批量 upsert refunds
$entity->newQuery()->upsert(
$refunds_data,
$unique_by,
$parse->getUpdateFields()
);
// 2. 查询获取 ID 映射 [platform_refund_id => local db refund id]
$platform_refund_id_to_local_refund_id_map = $entity->newQuery()
->where(function ($query) use ($refunds_data, $unique_by) {
foreach ($refunds_data as $refund_data) {
$query->orWhere(function ($q) use ($refund_data, $unique_by) {
foreach ($unique_by as $key) {
$q->where($key, $refund_data[$key]);
}
});
}
})
->pluck('id', 'platform_refund_id')
->toArray();
dump("ID mapping: " . count($platform_refund_id_to_local_refund_id_map) . " refunds");
// 3. 格式化退款子项(传入 ID 映射,使子项能获取正确的 refund_id)
$items = $parse->formatRefundItemsFromRaw($raw_data, $platform_refund_id_to_local_refund_id_map);
// 4. 处理退款子项
$this->processRefundItems($items);
Db::commit();
return Result::ACK;
} catch (Throwable $error) {
Log::get()->error('Refund consumer processing failed', [
'error' => $error->getMessage(),
'retry_count' => $retry_count,
'max_retries' => $max_retries,
]);
Db::rollBack();
if ($retry_count >= $max_retries) {
dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
try {
$this->sendToErrorQueue($message, $error);
dump(">>> Successfully sent to error queue!");
} catch (Throwable $e) {
dump(">>> FAILED to send to error queue: " . $e->getMessage());
}
dump(">>> Returning ACK to prevent further retries");
return Result::ACK;
}
dump(">>> Retry not exceeded, sending to DLX (NACK)");
return Result::NACK;
}
}
public function isEnable(): bool
{
return parent::isEnable();
}
/**
* 获取消息的重试次数
*
* @param AMQPMessage $message
* @return int
*/
protected function getRetryCount(AMQPMessage $message): int
{
if (!$message->has('application_headers')) {
return 0;
}
$headers = $message->get('application_headers');
if (!$headers) {
return 0;
}
$header_data = $headers->getNativeData();
$x_death = $header_data['x-death'] ?? [];
if (empty($x_death)) {
return 0;
}
return $x_death[0]['count'] ?? 0;
}
/**
* 发送消息到错误队列
*
* @param AMQPMessage $message 原始消息
* @param Throwable $error 错误信息
* @return void
*/
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
{
try {
$retry_count = $this->getRetryCount($message);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$error_producer = new ErrorProducer($message, $error, $retry_count);
$producer->produce($error_producer);
Log::get()->warning('Refund message sent to error queue after exceeding retry limit', [
'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
'retry_count' => $retry_count,
'error_message' => $error->getMessage(),
]);
} catch (Throwable $e) {
dump($e->getMessage());
Log::get()->error('Failed to send refund message to error queue', [
'error' => $e->getMessage(),
'original_error' => $error->getMessage(),
]);
}
}
/**
* 处理退款子项的批量同步
*
* 使用 upsert 批量写入 refund_items 表
* 唯一键:store_id + platform_parent_refund_id + platform_refund_id
*
* @param array $items 退款子项数据数组
* @return void
*/
protected function processRefundItems(array $items): void
{
if (empty($items)) {
dump('No refund items to process');
return;
}
dump("Upserting " . count($items) . " refund items");
RefundItem::query()->upsert(
$items,
['store_id', 'platform_parent_refund_id', 'platform_refund_id'],
[
'refund_id',
'company_id',
'platform_id',
'refund_status_id',
'refund_type_id',
'reason',
'currency',
'buyer_user_id',
'platform_order_id',
'platform_sub_order_id',
'platform_product_id',
'quantity',
'refund_amount',
'updated_date',
'completed_date',
'raw',
'ext',
'updated_at',
]
);
dump("Refund items processing completed");
}
}