0,
        'prefetch_count' => 1,
        'global' => false,
    ];

    /**
     * requeue is disabled so that a failed message is dead-lettered to the DLX
     * instead of going back to the original queue.
     */
    protected bool $requeue = false;

    protected $entityType = 'refund';

    /**
     * Overrides the queue builder to declare the queue arguments:
     * dead-letter exchange / routing key, message TTL and queue type.
     *
     * NOTE(review): the 'S' / 'I' markers look like AMQP table type tags
     * (string / integer) — confirm against the AMQP client in use.
     *
     * @return QueueBuilder
     */
    public function getQueueBuilder(): QueueBuilder
    {
        return (new QueueBuilder())
            ->setQueue($this->getQueue())
            ->setArguments([
                'x-dead-letter-exchange' => ['S', 'dlx.refunds'],
                'x-dead-letter-routing-key' => ['S', 'retry'],
                'x-message-ttl' => ['I', 86400000], // 24 hours
                'x-queue-type' => ['S', 'classic'],
            ]);
    }

    /**
     * Consumes one refund message: upserts parent refunds (when the platform
     * has them), upserts the refund items and, when the parser supports it,
     * rebuilds orders for instant refunds.
     *
     * All database writes of one message run inside a single transaction,
     * regardless of whether the platform has parent refunds. On failure the
     * transaction is rolled back and the message is NACKed until the retry
     * count reaches AMQP_MAX_RETRIES; after that it is forwarded to the error
     * queue and ACKed so it is not retried again.
     *
     * @param mixed       $data    Decoded payload; the 'meta' and 'data' keys are read.
     * @param AMQPMessage $message Raw AMQP message (used for parsing and retry headers).
     * @return Result Result::ACK on success / nothing to do / retries exhausted, Result::NACK to retry.
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
        if ($debug_delay > 0) {
            dump("Debug mode: sleeping for {$debug_delay} seconds...");
            sleep($debug_delay);
        }

        $retry_count = $this->getRetryCount($message);
        $max_retries = (int) env('AMQP_MAX_RETRIES', 3);
        dump("Retry count: {$retry_count}/{$max_retries}");

        // Tracks whether this call opened a transaction, so the error handler
        // only rolls back work that was actually started here.
        $transaction_started = false;

        try {
            $parse = EntityParseFactory::createFromMessage($message);
            $this->parseValidate($parse);

            $meta = $data['meta'] ?? [];
            $metadata = [
                'company_id' => $meta['company_id'] ?? null,
                'platform_id' => $meta['platform_id'] ?? null,
                'store_id' => $meta['store_id'] ?? null,
                'unique_id' => $meta['unique_id'] ?? null,
            ];
            $entity = $parse->entityMatch($metadata);

            $raw_data = $data['data'] ?? [];
            $entity_map_result = $parse->entityMap($raw_data);
            $data_to_upsert = $entity_map_result->all();
            $has_parent_refund = $parse->hasParentRefund();

            // Stays null for platforms without a parent refund.
            $platform_refund_id_to_local_refund_id_map = null;

            // Parent refunds are expected but the message carries none: nothing to do.
            if ($has_parent_refund && empty($data_to_upsert)) {
                dump('No data to process');

                return Result::ACK;
            }

            Db::beginTransaction();
            $transaction_started = true;

            // Platforms without a parent refund (e.g. Tmall) use the platform
            // after-sale record directly and skip steps 1 and 2.
            if ($has_parent_refund) {
                $refunds_data = $data_to_upsert;
                $unique_by = $parse->getUniqueBy();

                // 1. Bulk upsert the refunds.
                $entity->newQuery()->upsert(
                    $refunds_data,
                    $unique_by,
                    $parse->getUpdateFields()
                );

                // 2. Read back the ID mapping [platform_refund_id => local refund id],
                //    matching every upserted row on its unique-key columns.
                $platform_refund_id_to_local_refund_id_map = $entity->newQuery()
                    ->where(function ($query) use ($refunds_data, $unique_by) {
                        foreach ($refunds_data as $refund_data) {
                            $query->orWhere(function ($q) use ($refund_data, $unique_by) {
                                foreach ($unique_by as $key) {
                                    $q->where($key, $refund_data[$key]);
                                }
                            });
                        }
                    })
                    ->pluck('id', 'platform_refund_id')
                    ->toArray();

                if (empty($platform_refund_id_to_local_refund_id_map)) {
                    Log::get()->warning('Refund ID mapping is empty after upsert, refund_items.refund_id will not be populated', [
                        'store_id' => $parse->getStore()->id,
                        'platform' => $parse->getPlatform()->name ?? '',
                    ]);
                }
            }

            // @TODO the refund type also needs to be pinned down further, see app/Constants/RefundType.php

            // 3. Format the refund items; the ID mapping lets each item resolve its refund_id.
            $items = $parse->formatRefundItemsFromRaw($raw_data, $platform_refund_id_to_local_refund_id_map);

            // 4. Persist the refund items.
            $this->processRefundItems($items);

            // 5. Rebuild orders for Tmall instant refunds (restore a missing
            //    Order/OrderItem from the refund data).
            // @TODO add a platform check instead of limiting this to Tmall;
            //       other platforms may have similar cases.
            if (method_exists($parse, 'fixInstantRefundOrders')) {
                $parse->fixInstantRefundOrders($raw_data);
            }

            Db::commit();
            $transaction_started = false;

            return Result::ACK;
        } catch (Throwable $error) {
            dump($error->getMessage());
            dump($error->getTraceAsString());
            Log::get()->error('Refund consumer processing failed', [
                'error' => $error->getMessage(),
                'retry_count' => $retry_count,
                'max_retries' => $max_retries,
            ]);

            if ($transaction_started) {
                try {
                    Db::rollBack();
                } catch (Throwable $rollback_error) {
                    // A failing rollback must not prevent the retry / error-queue decision below.
                    Log::get()->error('Refund consumer transaction rollback failed', [
                        'error' => $rollback_error->getMessage(),
                        'original_error' => $error->getMessage(),
                    ]);
                }
            }

            if ($retry_count >= $max_retries) {
                dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
                try {
                    // sendToErrorQueue() logs and swallows its own failures, so this
                    // catch only fires if an overriding implementation throws.
                    $this->sendToErrorQueue($message, $error);
                    dump(">>> Successfully sent to error queue!");
                } catch (Throwable $e) {
                    dump(">>> FAILED to send to error queue: " . $e->getMessage());
                }
                dump(">>> Returning ACK to prevent further retries");

                return Result::ACK;
            }

            dump(">>> Retry not exceeded, sending to DLX (NACK)");

            return Result::NACK;
        }
    }

    /**
     * Delegates to the parent implementation.
     */
    public function isEnable(): bool
    {
        return parent::isEnable();
    }

    /**
     * Reads the retry count of a message from the first entry of its
     * "x-death" application header.
     *
     * @param AMQPMessage $message
     * @return int 0 when the header (or its count) is missing.
     */
    protected function getRetryCount(AMQPMessage $message): int
    {
        if (!$message->has('application_headers')) {
            return 0;
        }

        $headers = $message->get('application_headers');
        if (!$headers) {
            return 0;
        }

        $header_data = $headers->getNativeData();
        $x_death = $header_data['x-death'] ?? [];
        if (empty($x_death)) {
            return 0;
        }

        // Cast explicitly: the value comes from a message header, while the
        // method promises an int.
        return (int) ($x_death[0]['count'] ?? 0);
    }

    /**
     * Publishes the failed message to the error queue.
     *
     * Best-effort: any failure while publishing is logged and swallowed,
     * so this method does not throw.
     *
     * @param AMQPMessage $message Original message.
     * @param Throwable   $error   Error that caused processing to fail.
     * @return void
     */
    protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
    {
        try {
            $retry_count = $this->getRetryCount($message);

            $producer = ApplicationContext::getContainer()->get(Producer::class);
            $error_producer = new ErrorProducer($message, $error, $retry_count);
            $producer->produce($error_producer);

            Log::get()->warning('Refund message sent to error queue after exceeding retry limit', [
                'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
                'retry_count' => $retry_count,
                'error_message' => $error->getMessage(),
            ]);
        } catch (Throwable $e) {
            dump($e->getMessage());
            Log::get()->error('Failed to send refund message to error queue', [
                'error' => $e->getMessage(),
                'original_error' => $error->getMessage(),
            ]);
        }
    }

    /**
     * Bulk-syncs refund items.
     *
     * Upserts the rows through the RefundItem model.
     * Unique key: store_id + platform_parent_refund_id + platform_refund_id.
     *
     * @param array $items Refund item rows; an empty array is a no-op.
     * @return void
     */
    protected function processRefundItems(array $items): void
    {
        if (empty($items)) {
            dump('No refund items to process');

            return;
        }

        dump("Upserting " . count($items) . " refund items");

        RefundItem::query()->upsert(
            $items,
            ['store_id', 'platform_parent_refund_id', 'platform_refund_id'],
            [
                'refund_id',
                'company_id',
                'platform_id',
                'refund_status_id',
                'refund_type_id',
                'reason',
                'currency',
                'buyer_user_id',
                'platform_order_id',
                'platform_sub_order_id',
                'platform_product_id',
                'quantity',
                'refund_amount',
                'order_created_date',
                'order_paid_date',
                'updated_date',
                'completed_date',
                'raw',
                'ext',
                'updated_at',
            ]
        );

        dump("Refund items processing completed");
    }

    /**
     * Ensures the parser exposes every refund-specific method this consumer needs.
     *
     * @param EntityParseInterface $parse
     * @return void
     * @throws Exception Naming the first missing method and the parser class.
     */
    protected function parseValidate(EntityParseInterface $parse): void
    {
        $required_methods = [
            'hasParentRefund',
            'formatRefundItemsFromRaw',
            'getRefundStatusId',
            'getRefundTypeId',
        ];

        foreach ($required_methods as $method) {
            if (!method_exists($parse, $method)) {
                throw new Exception($method . ' method must be implemented in ' . $parse::class);
            }
        }
    }
}