0, 'prefetch_count' => 1, 'global' => false, ];

    /**
     * Set requeue=false so that failed messages are routed to the DLX instead of
     * being returned to the original queue.
     */
    protected bool $requeue = false;

    protected $entityType = 'product';

    /**
     * Override the queue builder to set the queue arguments.
     *
     * The queue is declared with a dead-letter exchange ("dlx.products", routing
     * key "retry"), a message TTL of 86400000 ms and the "classic" queue type.
     *
     * @return QueueBuilder
     */
    public function getQueueBuilder(): QueueBuilder
    {
        return (new QueueBuilder())
            ->setQueue($this->getQueue())
            ->setArguments([
                'x-dead-letter-exchange' => ['S', 'dlx.products'],
                'x-dead-letter-routing-key' => ['S', 'retry'],
                'x-message-ttl' => ['I', 86400000], // 24 hours
                'x-queue-type' => ['S', 'classic'],
            ]);
    }

    /**
     * Map the incoming payload to product rows and batch-upsert them.
     *
     * Outcomes:
     *  - Result::ACK  when there is nothing to process, when the upsert commits,
     *                 or when the retry limit is reached (after attempting to
     *                 hand the message to the error queue).
     *  - Result::NACK when processing failed and the retry limit has not been
     *                 reached yet.
     *
     * @param mixed       $data    Decoded payload; the keys "meta" and "data" are read when present.
     * @param AMQPMessage $message Raw AMQP message, used for the retry count and the error queue.
     *
     * @return Result
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        // Optional artificial delay, for debugging only
        $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
        if ($debug_delay > 0) {
            dump("Debug mode: sleeping for {$debug_delay} seconds...");
            sleep($debug_delay);
        }

        // Current retry count as reported by getRetryCount(), and the configured limit
        $retry_count = $this->getRetryCount($message);
        $max_retries = (int) env('AMQP_MAX_RETRIES', 3);

        dump("Product Consumer - Retry count: {$retry_count}/{$max_retries}");

        // Tracks whether THIS call opened a transaction, so the failure path only
        // rolls back work it actually started.
        $transaction_open = false;

        try {
            // EntityParseFactory is invoked through its static factory method
            $parse = EntityParseFactory::createFromMessage($message);

            // Extract the metadata
            $meta = $data['meta'] ?? [];
            $metadata = [
                'company_id' => $meta['company_id'] ?? null,
                'platform_id' => $meta['platform_id'] ?? null,
                'store_id' => $meta['store_id'] ?? null,
                'platform_store_id' => $meta['platform_store_id'] ?? null,
                'unique_id' => $meta['unique_id'] ?? null,
            ];

            // Resolve the entity model
            $entity = $parse->entityMatch($metadata);

            // Map the raw data
            $entity_map_result = $parse->entityMap($data['data'] ?? []);

            // Convert to an array, ready for the batch operation
            $data_to_upsert = $entity_map_result->all();
            dump($entity_map_result->first());

            if (empty($data_to_upsert)) {
                dump('No data to process');
                return Result::ACK;
            }

            $row_count = count($data_to_upsert);
            dump("Processing " . $row_count . " product(s) with batch upsert");

            Db::beginTransaction();
            $transaction_open = true;

            // Use upsert to insert and update the products in one batch
            $unique_by = $parse->getUniqueBy();
            $entity->newQuery()->upsert(
                $data_to_upsert,
                $unique_by,
                $parse->getUpdateFields()
            );

            Db::commit();
            $transaction_open = false;

            dump("Successfully upserted " . $row_count . " product(s)");

            return Result::ACK;
        } catch (Throwable $error) {
            dump($error->getMessage());
            dump($error->getTraceAsString());

            Log::get()->error('Product consumer processing failed', [
                'error' => $error->getMessage(),
                'retry_count' => $retry_count,
                'max_retries' => $max_retries,
            ]);

            // Roll back only a transaction opened above. A failing rollback is
            // logged rather than rethrown so the retry / error-queue routing
            // below still runs.
            if ($transaction_open) {
                try {
                    Db::rollBack();
                } catch (Throwable $rollback_error) {
                    Log::get()->error('Product consumer rollback failed', [
                        'error' => $rollback_error->getMessage(),
                    ]);
                }
            }

            // Check whether the maximum number of retries has been reached
            if ($retry_count >= $max_retries) {
                dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");

                try {
                    $this->sendToErrorQueue($message, $error);
                    dump(">>> Successfully sent to error queue!");
                } catch (Throwable $e) {
                    dump(">>> FAILED to send to error queue: " . $e->getMessage());

                    // NOTE(review): the message is still ACKed below, so it is
                    // dropped in this case; log it so the loss is traceable.
                    Log::get()->error('Product consumer failed to send message to error queue', [
                        'error' => $e->getMessage(),
                        'original_error' => $error->getMessage(),
                        'retry_count' => $retry_count,
                    ]);
                }

                return Result::ACK;
            }

            dump(">>> Retry not exceeded, sending to DLX (NACK)");
            return Result::NACK;
        }
    }

    /**
     * Whether this consumer is enabled; delegates to the parent implementation.
     */
    public function isEnable(): bool
    {
        return parent::isEnable();
    }
}