diff --git a/backend/app/Platform/ProductConsumer.php b/backend/app/Platform/ProductConsumer.php new file mode 100644 index 0000000..c606044 --- /dev/null +++ b/backend/app/Platform/ProductConsumer.php @@ -0,0 +1,210 @@ + 0, + 'prefetch_count' => 1, + 'global' => false, + ]; + + /** + * 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列 + */ + protected bool $requeue = false; + + protected $entityType = 'product'; + + /** + * 重写队列构建器,设置队列参数 + * + * @return QueueBuilder + */ + public function getQueueBuilder(): QueueBuilder + { + return (new QueueBuilder()) + ->setQueue($this->getQueue()) + ->setArguments([ + 'x-dead-letter-exchange' => ['S', 'dlx.products'], + 'x-dead-letter-routing-key' => ['S', 'retry'], + 'x-message-ttl' => ['I', 86400000], // 24小时 + 'x-queue-type' => ['S', 'classic'], + ]); + } + + public function consumeMessage($data, AMQPMessage $message): Result + { + // 调试延迟 + $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0); + if ($debug_delay > 0) { + dump("Debug mode: sleeping for {$debug_delay} seconds..."); + sleep($debug_delay); + } + + // 获取重试次数 + $retry_count = $this->getRetryCount($message); + $max_retries = (int) env('AMQP_MAX_RETRIES', 3); + + dump("Product Consumer - Retry count: {$retry_count}/{$max_retries}"); + + try { + // EntityParseFactory 使用静态方法调用 + $parse = EntityParseFactory::createFromMessage($message); + + // 提取 metadata + $meta = $data['meta'] ?? []; + $metadata = [ + 'company_id' => $meta['company_id'] ?? null, + 'platform_id' => $meta['platform_id'] ?? null, + 'store_id' => $meta['store_id'] ?? null, + 'platform_store_id' => $meta['platform_store_id'] ?? null, + 'unique_id' => $meta['unique_id'] ?? null, + ]; + + // 获取实体模型 + $entity = $parse->entityMatch($metadata); + + // 映射原始数据 + $entity_map_result = $parse->entityMap($data['data'] ?? []); + + // 转为数组,准备批量操作 + $data_to_upsert = $entity_map_result->all(); + + dump($entity_map_result->first()); + + if (empty($data_to_upsert)) { + dump('No data to process'); + return Result::ACK; + } + + dump("Processing " . count($data_to_upsert) . " product(s) with batch upsert"); + + // 检测产品解析器是否实现了 getProductStatusId 方法 + if (!method_exists($parse, 'getProductStatusId')) { + throw new \Exception('getProductStatusId method must be implemented in ' . $parse::class); + } + + Db::beginTransaction(); + + // 使用 upsert 批量处理产品插入和更新 + $unique_by = $parse->getUniqueBy(); + $entity->newQuery()->upsert( + $data_to_upsert, + $unique_by, + $parse->getUpdateFields() + ); + + Db::commit(); + + dump("Successfully upserted " . count($data_to_upsert) . " product(s)"); + + return Result::ACK; + + } catch (Throwable $error) { + Log::get()->error('Product consumer processing failed', [ + 'error' => $error->getMessage(), + 'retry_count' => $retry_count, + 'max_retries' => $max_retries, + ]); + Db::rollBack(); + + // 检查是否超过最大重试次数 + if ($retry_count >= $max_retries) { + dump(">>> MAX RETRIES EXCEEDED! Sending to error queue..."); + + try { + $this->sendToErrorQueue($message, $error); + dump(">>> Successfully sent to error queue!"); + } catch (Throwable $e) { + dump(">>> FAILED to send to error queue: " . $e->getMessage()); + } + + return Result::ACK; + } + + dump(">>> Retry not exceeded, sending to DLX (NACK)"); + return Result::NACK; + } + } + + public function isEnable(): bool + { + return parent::isEnable(); + } + + /** + * 获取消息的重试次数 + * + * @param AMQPMessage $message + * @return int + */ + protected function getRetryCount(AMQPMessage $message): int + { + if (!$message->has('application_headers')) { + return 0; + } + + $headers = $message->get('application_headers'); + if (!$headers) { + return 0; + } + + $headerData = $headers->getNativeData(); + $xDeath = $headerData['x-death'] ?? []; + + if (empty($xDeath)) { + return 0; + } + + return $xDeath[0]['count'] ?? 0; + } + + /** + * 发送消息到错误队列 + * + * @param AMQPMessage $message 原始消息 + * @param Throwable $error 错误信息 + * @return void + */ + protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void + { + try { + $retry_count = $this->getRetryCount($message); + + $producer = ApplicationContext::getContainer()->get(Producer::class); + $error_producer = new ErrorProducer($message, $error, $retry_count); + $producer->produce($error_producer); + + Log::get()->warning('Product message sent to error queue after exceeding retry limit', [ + 'error_id' => $error_producer->payload['error_id'] ?? 'unknown', + 'retry_count' => $retry_count, + 'error_message' => $error->getMessage(), + ]); + } catch (Throwable $e) { + dump($e->getMessage()); + Log::get()->error('Failed to send product message to error queue', [ + 'error' => $e->getMessage(), + 'original_error' => $error->getMessage(), + ]); + } + } +}