0, // 同一个消费者,最高同时可以处理的消息数。 // @attention 默认值 100, test 设置为 1 'prefetch_count' => 1, // 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。 'global' => false, ]; /** * 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列 * 这样才能实现重试队列和错误队列的机制 */ protected bool $requeue = false; protected $entityType = 'order'; /** * 重写队列构建器,设置队列参数 * 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误 * * @return QueueBuilder */ public function getQueueBuilder(): QueueBuilder { return (new QueueBuilder()) ->setQueue($this->getQueue()) ->setArguments([ 'x-dead-letter-exchange' => ['S', 'dlx.orders'], 'x-dead-letter-routing-key' => ['S', 'retry'], 'x-message-ttl' => ['I', 86400000], // 24小时 'x-queue-type' => ['S', 'classic'], ]); } public function consumeMessage($data, AMQPMessage $message): Result { // 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态 // 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒 $debugDelay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0); if ($debugDelay > 0) { dump("Debug mode: sleeping for {$debugDelay} seconds..."); sleep($debugDelay); } // dump('---data'); // dump($data); // dump('---'); dump('---message'); dump(json_decode($message->getBody(), true)['message_id']); dump('---'); // 获取重试次数 $retryCount = $this->getRetryCount($message); $maxRetries = (int) env('AMQP_MAX_RETRIES', 3); dump("Retry count: {$retryCount}/{$maxRetries}"); try { // EntityParseFactory 使用静态方法调用,无需依赖注入 $parse = EntityParseFactory::createFromMessage($message); // 提取 metadata $metadata = [ 'company_id' => $data['company_id'] ?? null, 'platform_id' => $data['platform_id'] ?? null, 'store_id' => $data['store_id'] ?? null, 'unique_id' => $data['unique_id'] ?? null, ]; // entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息。 $entity = $parse->entityMatch($metadata); // message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。 $entityMapResult = $parse->entityMap($data['raw_data'] ?? []); // $entityMapResult 应该是一个内部元素为 Model 的集合 Db::beginTransaction(); // 假设 $entityMapResult 为集合 @Collection 对象 $entityMapResult->each(function ($el) use ($entity) { $clone = clone $entity; $clone->fill($el); $clone->save(); }); Db::commit(); // 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。 return Result::ACK; } catch (Throwable $error) { dump("=== Error Caught ==="); dump("Error: " . $error->getMessage()); dump("Retry Count: {$retryCount}"); dump("Max Retries: {$maxRetries}"); dump("Check: {$retryCount} >= {$maxRetries} = " . ($retryCount >= $maxRetries ? 'TRUE' : 'FALSE')); Log::get()->error('Consumer processing failed', [ 'error' => $error->getMessage(), 'retry_count' => $retryCount, 'max_retries' => $maxRetries, ]); Db::rollBack(); // 检查是否超过最大重试次数 if ($retryCount >= $maxRetries) { // 超过重试次数,发送到错误队列 dump(">>> MAX RETRIES EXCEEDED! Sending to error queue..."); try { $this->sendToErrorQueue($message, $error); dump(">>> Successfully sent to error queue!"); } catch (Throwable $e) { dump(">>> FAILED to send to error queue: " . $e->getMessage()); dump(">>> Stack trace: " . $e->getTraceAsString()); } // 返回 ACK 避免消息再次重试 // 因为消息已经被发送到错误队列,不应该继续在主队列中循环 dump(">>> Returning ACK to prevent further retries"); return Result::ACK; } // 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列 dump(">>> Retry not exceeded, sending to DLX (NACK)"); return Result::NACK; } } public function isEnable(): bool { return parent::isEnable(); } /** * 获取消息的重试次数 * 通过检查 x-death header 中的 count 字段 * * @param AMQPMessage $message * @return int */ protected function getRetryCount(AMQPMessage $message): int { // 检查是否存在 application_headers 属性 // 首次处理的消息不会有这个属性,必须先用 has() 检查 if (!$message->has('application_headers')) { dump(">>> No application_headers, first time processing"); return 0; } $headers = $message->get('application_headers'); if (!$headers) { dump(">>> application_headers exists but is empty"); return 0; } $headerData = $headers->getNativeData(); $xDeath = $headerData['x-death'] ?? []; dump(">>> x-death header data:"); dump($xDeath); if (empty($xDeath)) { dump(">>> x-death is empty"); return 0; // 首次失败 } // x-death 是一个数组,第一个元素包含 count 字段 // count 表示消息从该队列死信的次数 $count = $xDeath[0]['count'] ?? 0; dump(">>> Extracted count from x-death: {$count}"); return $count; } /** * 发送消息到错误队列 * 当重试次数超过上限时调用 * * @param AMQPMessage $message 原始消息 * @param Throwable $error 错误信息 * @return void */ protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void { try { $retryCount = $this->getRetryCount($message); // 使用 ErrorProducer 发送到错误队列 $producer = ApplicationContext::getContainer()->get(Producer::class); $errorProducer = new ErrorProducer($message, $error, $retryCount); $producer->produce($errorProducer); // 记录日志 Log::get()->warning('Message sent to error queue after exceeding retry limit', [ 'error_id' => $errorProducer->payload['error_id'] ?? 'unknown', 'retry_count' => $retryCount, 'error_message' => $error->getMessage(), ]); } catch (Throwable $e) { // 发送到错误队列失败,记录日志 dump($e->getMessage()); Log::get()->error('Failed to send message to error queue', [ 'error' => $e->getMessage(), 'original_error' => $error->getMessage(), ]); } } }