0, 'prefetch_count' => 1, 'global' => false, ];

    /**
     * Keep requeue disabled so that a failed message is routed to the DLX
     * instead of being put back on the original queue.
     */
    protected bool $requeue = false;

    protected $entityType = 'product';

    /**
     * Override the queue builder to declare the queue with its arguments.
     *
     * The arguments name `dlx.products` as the dead-letter exchange and
     * `retry` as the dead-letter routing key.
     *
     * @return QueueBuilder
     */
    public function getQueueBuilder(): QueueBuilder
    {
        return (new QueueBuilder())
            ->setQueue($this->getQueue())
            ->setArguments([
                'x-dead-letter-exchange' => ['S', 'dlx.products'],
                'x-dead-letter-routing-key' => ['S', 'retry'],
                'x-message-ttl' => ['I', 86400000], // 24 hours
                'x-queue-type' => ['S', 'classic'],
            ]);
    }

    /**
     * Consume one product message and batch-upsert the products it carries.
     *
     * Flow:
     *  - read the retry count from the message headers;
     *  - build a parser from the message, resolve the entity model from the
     *    payload's `meta` section and map the payload's `data` section;
     *  - upsert the mapped rows inside a database transaction.
     *
     * On failure the message is NACKed while the retry count is below
     * AMQP_MAX_RETRIES (default 3); once the limit is reached it is forwarded
     * to the error queue and ACKed.
     *
     * @param mixed $data Decoded payload; the `meta` and `data` keys are read.
     * @param AMQPMessage $message Raw AMQP message (used for headers and parsing).
     * @return Result ACK when processed, empty, or handed to the error queue;
     *                NACK when the message should be retried.
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        // Optional artificial delay, for debugging only.
        $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
        if ($debug_delay > 0) {
            dump("Debug mode: sleeping for {$debug_delay} seconds...");
            sleep($debug_delay);
        }

        // Determine how many times this message has already been retried.
        $retry_count = $this->getRetryCount($message);
        $max_retries = (int) env('AMQP_MAX_RETRIES', 3);
        dump("Product Consumer - Retry count: {$retry_count}/{$max_retries}");

        // Tracks whether a transaction is open, so the error path only rolls
        // back a transaction that this call actually started.
        $transaction_started = false;

        try {
            // EntityParseFactory is invoked through its static factory method.
            $parse = EntityParseFactory::createFromMessage($message);

            // Extract the metadata used to resolve the entity model.
            $meta = $data['meta'] ?? [];
            $metadata = [
                'company_id' => $meta['company_id'] ?? null,
                'platform_id' => $meta['platform_id'] ?? null,
                'store_id' => $meta['store_id'] ?? null,
                'platform_store_id' => $meta['platform_store_id'] ?? null,
                'unique_id' => $meta['unique_id'] ?? null,
            ];

            // Resolve the entity model.
            $entity = $parse->entityMatch($metadata);

            // Map the raw payload rows.
            $entity_map_result = $parse->entityMap($data['data'] ?? []);

            // Convert to a plain array in preparation for the batch operation.
            $data_to_upsert = $entity_map_result->all();
            dump($entity_map_result->first());

            if (empty($data_to_upsert)) {
                dump('No data to process');
                return Result::ACK;
            }

            dump("Processing " . count($data_to_upsert) . " product(s) with batch upsert");

            // The product parser is required to implement getProductStatusId.
            if (!method_exists($parse, 'getProductStatusId')) {
                throw new \Exception('getProductStatusId method must be implemented in ' . $parse::class);
            }

            Db::beginTransaction();
            $transaction_started = true;

            // Insert and update the products in one batch via upsert.
            $unique_by = $parse->getUniqueBy();
            $entity->newQuery()->upsert(
                $data_to_upsert,
                $unique_by,
                $parse->getUpdateFields()
            );

            Db::commit();
            $transaction_started = false;

            dump("Successfully upserted " . count($data_to_upsert) . " product(s)");

            return Result::ACK;
        } catch (Throwable $error) {
            Log::get()->error('Product consumer processing failed', [
                'error' => $error->getMessage(),
                'retry_count' => $retry_count,
                'max_retries' => $max_retries,
            ]);

            // Failures before beginTransaction() (parsing, mapping, the
            // method_exists check) leave nothing to roll back.
            if ($transaction_started) {
                Db::rollBack();
            }

            // Check whether the maximum number of retries has been reached.
            if ($retry_count >= $max_retries) {
                dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
                try {
                    $this->sendToErrorQueue($message, $error);
                    dump(">>> Successfully sent to error queue!");
                } catch (Throwable $e) {
                    // Already logged by sendToErrorQueue(); the message is
                    // still ACKed below so it leaves the retry cycle.
                    dump(">>> FAILED to send to error queue: " . $e->getMessage());
                }
                return Result::ACK;
            }

            dump(">>> Retry not exceeded, sending to DLX (NACK)");
            return Result::NACK;
        }
    }

    /**
     * Whether this consumer is enabled; defers entirely to the parent.
     *
     * @return bool
     */
    public function isEnable(): bool
    {
        return parent::isEnable();
    }

    /**
     * Get the number of times the message has been retried.
     *
     * Reads the `count` field of the first `x-death` entry in the message's
     * application headers. Returns 0 when the headers, the `x-death` entry or
     * the count are missing.
     *
     * @param AMQPMessage $message
     * @return int
     */
    protected function getRetryCount(AMQPMessage $message): int
    {
        if (!$message->has('application_headers')) {
            return 0;
        }

        $headers = $message->get('application_headers');
        if (!$headers) {
            return 0;
        }

        $headerData = $headers->getNativeData();
        $xDeath = $headerData['x-death'] ?? [];
        if (!is_array($xDeath) || empty($xDeath)) {
            return 0;
        }

        // Cast so a non-int header value cannot violate the int return type.
        return (int) ($xDeath[0]['count'] ?? 0);
    }

    /**
     * Send the message to the error queue.
     *
     * A publishing failure is logged and then rethrown, so the caller can
     * tell a successful hand-off from a failed one.
     *
     * @param AMQPMessage $message Original message
     * @param Throwable $error Error that caused processing to fail
     * @return void
     * @throws Throwable When the error message could not be produced
     */
    protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
    {
        try {
            $retry_count = $this->getRetryCount($message);

            $producer = ApplicationContext::getContainer()->get(Producer::class);
            $error_producer = new ErrorProducer($message, $error, $retry_count);
            $producer->produce($error_producer);

            Log::get()->warning('Product message sent to error queue after exceeding retry limit', [
                'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
                'retry_count' => $retry_count,
                'error_message' => $error->getMessage(),
            ]);
        } catch (Throwable $e) {
            Log::get()->error('Failed to send product message to error queue', [
                'error' => $e->getMessage(),
                'original_error' => $error->getMessage(),
            ]);

            // Propagate so the caller does not report a false success.
            throw $e;
        }
    }
}