diff --git a/backend/app/Platform/OrderConsumer.php b/backend/app/Platform/OrderConsumer.php index f064f5b..3f9b859 100644 --- a/backend/app/Platform/OrderConsumer.php +++ b/backend/app/Platform/OrderConsumer.php @@ -314,13 +314,6 @@ class OrderConsumer extends ConsumerMessage */ protected function processOrderItems(array $items_by_platform_order_id): void { - dump('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!'); - - dump('processOrderItems'); - dump($items_by_platform_order_id); - - dump('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!'); - if (empty($items_by_platform_order_id)) { dump('No order items to process'); diff --git a/backend/app/Platform/RefundConsumer.php b/backend/app/Platform/RefundConsumer.php new file mode 100644 index 0000000..118c85e --- /dev/null +++ b/backend/app/Platform/RefundConsumer.php @@ -0,0 +1,278 @@ + 0, + 'prefetch_count' => 1, + 'global' => false, + ]; + + /** + * 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列 + */ + protected bool $requeue = false; + + protected $entityType = 'refund'; + + /** + * 重写队列构建器,设置队列参数 + * + * @return QueueBuilder + */ + public function getQueueBuilder(): QueueBuilder + { + return (new QueueBuilder()) + ->setQueue($this->getQueue()) + ->setArguments([ + 'x-dead-letter-exchange' => ['S', 'dlx.refunds'], + 'x-dead-letter-routing-key' => ['S', 'retry'], + 'x-message-ttl' => ['I', 86400000], // 24小时 + 'x-queue-type' => ['S', 'classic'], + ]); + } + + public function consumeMessage($data, AMQPMessage $message): Result + { + $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0); + if ($debug_delay > 0) { + dump("Debug mode: sleeping for {$debug_delay} seconds..."); + sleep($debug_delay); + } + + $retry_count = $this->getRetryCount($message); + $max_retries = (int) env('AMQP_MAX_RETRIES', 3); + + dump("Retry count: {$retry_count}/{$max_retries}"); + + try { + $parse = EntityParseFactory::createFromMessage($message); + + $meta = $data['meta'] ?? []; + $metadata = [ + 'company_id' => $meta['company_id'] ?? null, + 'platform_id' => $meta['platform_id'] ?? null, + 'store_id' => $meta['store_id'] ?? null, + 'unique_id' => $meta['unique_id'] ?? null, + ]; + + $entity = $parse->entityMatch($metadata); + + $raw_data = $data['data'] ?? []; + $entity_map_result = $parse->entityMap($raw_data); + $data_to_upsert = $entity_map_result->all(); + + if (empty($data_to_upsert)) { + dump('No data to process'); + return Result::ACK; + } + + dump("Processing " . count($data_to_upsert) . " refund(s) with batch upsert"); + + if (!method_exists($parse, 'formatRefundItemsFromRaw')) { + throw new Exception('formatRefundItemsFromRaw method must be implemented in ' . $parse::class); + } + + if (!method_exists($parse, 'getRefundStatusId')) { + throw new Exception('getRefundStatusId method must be implemented in ' . $parse::class); + } + + if (!method_exists($parse, 'getRefundTypeId')) { + throw new Exception('getRefundTypeId method must be implemented in ' . $parse::class); + } + + $refunds_data = $data_to_upsert; + $unique_by = $parse->getUniqueBy(); + + Db::beginTransaction(); + + // 1. 批量 upsert refunds + $entity->newQuery()->upsert( + $refunds_data, + $unique_by, + $parse->getUpdateFields() + ); + + // 2. 查询获取 ID 映射 [platform_refund_id => local db refund id] + $platform_refund_id_to_local_refund_id_map = $entity->newQuery() + ->where(function ($query) use ($refunds_data, $unique_by) { + foreach ($refunds_data as $refund_data) { + $query->orWhere(function ($q) use ($refund_data, $unique_by) { + foreach ($unique_by as $key) { + $q->where($key, $refund_data[$key]); + } + }); + } + }) + ->pluck('id', 'platform_refund_id') + ->toArray(); + + dump("ID mapping: " . count($platform_refund_id_to_local_refund_id_map) . " refunds"); + + // 3. 格式化退款子项(传入 ID 映射,使子项能获取正确的 refund_id) + $items = $parse->formatRefundItemsFromRaw($raw_data, $platform_refund_id_to_local_refund_id_map); + + // 4. 处理退款子项 + $this->processRefundItems($items); + + Db::commit(); + + return Result::ACK; + + } catch (Throwable $error) { + Log::get()->error('Refund consumer processing failed', [ + 'error' => $error->getMessage(), + 'retry_count' => $retry_count, + 'max_retries' => $max_retries, + ]); + Db::rollBack(); + + if ($retry_count >= $max_retries) { + dump(">>> MAX RETRIES EXCEEDED! Sending to error queue..."); + + try { + $this->sendToErrorQueue($message, $error); + dump(">>> Successfully sent to error queue!"); + } catch (Throwable $e) { + dump(">>> FAILED to send to error queue: " . $e->getMessage()); + } + + dump(">>> Returning ACK to prevent further retries"); + return Result::ACK; + } + + dump(">>> Retry not exceeded, sending to DLX (NACK)"); + return Result::NACK; + } + } + + public function isEnable(): bool + { + return parent::isEnable(); + } + + /** + * 获取消息的重试次数 + * + * @param AMQPMessage $message + * @return int + */ + protected function getRetryCount(AMQPMessage $message): int + { + if (!$message->has('application_headers')) { + return 0; + } + + $headers = $message->get('application_headers'); + if (!$headers) { + return 0; + } + + $header_data = $headers->getNativeData(); + $x_death = $header_data['x-death'] ?? []; + + if (empty($x_death)) { + return 0; + } + + return $x_death[0]['count'] ?? 0; + } + + /** + * 发送消息到错误队列 + * + * @param AMQPMessage $message 原始消息 + * @param Throwable $error 错误信息 + * @return void + */ + protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void + { + try { + $retry_count = $this->getRetryCount($message); + + $producer = ApplicationContext::getContainer()->get(Producer::class); + $error_producer = new ErrorProducer($message, $error, $retry_count); + $producer->produce($error_producer); + + Log::get()->warning('Refund message sent to error queue after exceeding retry limit', [ + 'error_id' => $error_producer->payload['error_id'] ?? 'unknown', + 'retry_count' => $retry_count, + 'error_message' => $error->getMessage(), + ]); + } catch (Throwable $e) { + dump($e->getMessage()); + Log::get()->error('Failed to send refund message to error queue', [ + 'error' => $e->getMessage(), + 'original_error' => $error->getMessage(), + ]); + } + } + + /** + * 处理退款子项的批量同步 + * + * 使用 upsert 批量写入 refund_items 表 + * 唯一键:store_id + platform_parent_refund_id + platform_refund_id + * + * @param array $items 退款子项数据数组 + * @return void + */ + protected function processRefundItems(array $items): void + { + if (empty($items)) { + dump('No refund items to process'); + return; + } + + dump("Upserting " . count($items) . " refund items"); + + RefundItem::query()->upsert( + $items, + ['store_id', 'platform_parent_refund_id', 'platform_refund_id'], + [ + 'refund_id', + 'company_id', + 'platform_id', + 'refund_status_id', + 'refund_type_id', + 'reason', + 'currency', + 'buyer_user_id', + 'platform_order_id', + 'platform_sub_order_id', + 'platform_product_id', + 'quantity', + 'refund_amount', + 'updated_date', + 'completed_date', + 'raw', + 'ext', + 'updated_at', + ] + ); + + dump("Refund items processing completed"); + } +}