Files
datahub/backend/app/Platform/OrderConsumer.php
T
2025-12-01 16:31:13 +08:00

237 lines
8.5 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace App\Platform;
use App\Entity\Parse\EntityParseFactory;
use App\Utils\Log;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Builder\QueueBuilder;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Producer;
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\Di\Annotation\Inject;
use Hyperf\DbConnection\Db;
use Hyperf\Context\ApplicationContext;
use Throwable;
use function Hyperf\Support\env;
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", pool: "default_consumer", nums: 1, enable: true)]
class OrderConsumer extends ConsumerMessage
{
protected ?array $qos = [
// AMQP 默认并没有实现此配置。
'prefetch_size' => 0,
// 同一个消费者,最高同时可以处理的消息数。
// @attention 默认值 100 test 设置为 1
'prefetch_count' => 1,
// 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。
'global' => false,
];
/**
* 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列
* 这样才能实现重试队列和错误队列的机制
*/
protected bool $requeue = false;
protected $entityType = 'order';
/**
* 重写队列构建器,设置队列参数
* 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误
*
* @return QueueBuilder
*/
public function getQueueBuilder(): QueueBuilder
{
return (new QueueBuilder())
->setQueue($this->getQueue())
->setArguments([
'x-dead-letter-exchange' => ['S', 'dlx.orders'],
'x-dead-letter-routing-key' => ['S', 'retry'],
'x-message-ttl' => ['I', 86400000], // 24小时
'x-queue-type' => ['S', 'classic'],
]);
}
public function consumeMessage($data, AMQPMessage $message): Result
{
// 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态
// 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒
$debugDelay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
if ($debugDelay > 0) {
dump("Debug mode: sleeping for {$debugDelay} seconds...");
sleep($debugDelay);
}
// dump('---data');
// dump($data);
// dump('---');
dump('---message');
dump(json_decode($message->getBody(), true)['message_id']);
dump('---');
// 获取重试次数
$retryCount = $this->getRetryCount($message);
$maxRetries = (int) env('AMQP_MAX_RETRIES', 3);
dump("Retry count: {$retryCount}/{$maxRetries}");
try {
// EntityParseFactory 使用静态方法调用,无需依赖注入
$parse = EntityParseFactory::createFromMessage($message);
// 提取 metadata
$metadata = [
'company_id' => $data['company_id'] ?? null,
'platform_id' => $data['platform_id'] ?? null,
'store_id' => $data['store_id'] ?? null,
'unique_id' => $data['unique_id'] ?? null,
];
// entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息。
$entity = $parse->entityMatch($metadata);
// message 中包含 raw dataraw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
$entityMapResult = $parse->entityMap($data['raw_data'] ?? []);
// $entityMapResult 应该是一个内部元素为 Model 的集合
Db::beginTransaction();
// 假设 $entityMapResult 为集合 @Collection 对象
$entityMapResult->each(function ($el) use ($entity) {
$clone = clone $entity;
$clone->fill($el);
$clone->save();
});
Db::commit();
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
return Result::ACK;
} catch (Throwable $error) {
dump("=== Error Caught ===");
dump("Error: " . $error->getMessage());
dump("Retry Count: {$retryCount}");
dump("Max Retries: {$maxRetries}");
dump("Check: {$retryCount} >= {$maxRetries} = " . ($retryCount >= $maxRetries ? 'TRUE' : 'FALSE'));
Log::get()->error('Consumer processing failed', [
'error' => $error->getMessage(),
'retry_count' => $retryCount,
'max_retries' => $maxRetries,
]);
Db::rollBack();
// 检查是否超过最大重试次数
if ($retryCount >= $maxRetries) {
// 超过重试次数,发送到错误队列
dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
try {
$this->sendToErrorQueue($message, $error);
dump(">>> Successfully sent to error queue!");
} catch (Throwable $e) {
dump(">>> FAILED to send to error queue: " . $e->getMessage());
dump(">>> Stack trace: " . $e->getTraceAsString());
}
// 返回 ACK 避免消息再次重试
// 因为消息已经被发送到错误队列,不应该继续在主队列中循环
dump(">>> Returning ACK to prevent further retries");
return Result::ACK;
}
// 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列
dump(">>> Retry not exceeded, sending to DLX (NACK)");
return Result::NACK;
}
}
public function isEnable(): bool
{
return parent::isEnable();
}
/**
* 获取消息的重试次数
* 通过检查 x-death header 中的 count 字段
*
* @param AMQPMessage $message
* @return int
*/
protected function getRetryCount(AMQPMessage $message): int
{
// 检查是否存在 application_headers 属性
// 首次处理的消息不会有这个属性,必须先用 has() 检查
if (!$message->has('application_headers')) {
dump(">>> No application_headers, first time processing");
return 0;
}
$headers = $message->get('application_headers');
if (!$headers) {
dump(">>> application_headers exists but is empty");
return 0;
}
$headerData = $headers->getNativeData();
$xDeath = $headerData['x-death'] ?? [];
dump(">>> x-death header data:");
dump($xDeath);
if (empty($xDeath)) {
dump(">>> x-death is empty");
return 0; // 首次失败
}
// x-death 是一个数组,第一个元素包含 count 字段
// count 表示消息从该队列死信的次数
$count = $xDeath[0]['count'] ?? 0;
dump(">>> Extracted count from x-death: {$count}");
return $count;
}
/**
* 发送消息到错误队列
* 当重试次数超过上限时调用
*
* @param AMQPMessage $message 原始消息
* @param Throwable $error 错误信息
* @return void
*/
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
{
try {
$retryCount = $this->getRetryCount($message);
// 使用 ErrorProducer 发送到错误队列
$producer = ApplicationContext::getContainer()->get(Producer::class);
$errorProducer = new ErrorProducer($message, $error, $retryCount);
$producer->produce($errorProducer);
// 记录日志
Log::get()->warning('Message sent to error queue after exceeding retry limit', [
'error_id' => $errorProducer->payload['error_id'] ?? 'unknown',
'retry_count' => $retryCount,
'error_message' => $error->getMessage(),
]);
} catch (Throwable $e) {
// 发送到错误队列失败,记录日志
dump($e->getMessage());
Log::get()->error('Failed to send message to error queue', [
'error' => $e->getMessage(),
'original_error' => $error->getMessage(),
]);
}
}
}