diff --git a/backend/app/Platform/OrderConsumer.php b/backend/app/Platform/OrderConsumer.php index cd94d3e..299b1fc 100644 --- a/backend/app/Platform/OrderConsumer.php +++ b/backend/app/Platform/OrderConsumer.php @@ -5,82 +5,113 @@ declare(strict_types=1); namespace App\Platform; use App\Entity\Parse\EntityParseFactory; +use App\Utils\Log; use Hyperf\Amqp\Annotation\Consumer; +use Hyperf\Amqp\Builder\QueueBuilder; use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Result; +use Hyperf\Amqp\Producer; use PhpAmqpLib\Message\AMQPMessage; use Hyperf\Di\Annotation\Inject; use Hyperf\DbConnection\Db; +use Hyperf\Context\ApplicationContext; use Throwable; +use function Hyperf\Support\env; + #[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", pool: "default_consumer", nums: 1, enable: true)] class OrderConsumer extends ConsumerMessage { - /** - * 队列参数配置 - * 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误 - * - * 参数说明: - * - x-message-ttl: 消息存活时间(毫秒),24小时 = 86400000ms - * - x-dead-letter-exchange: 死信交换机,用于重试机制 - * - x-dead-letter-routing-key: 死信路由键 - */ - protected array $queueOptions = [ - 'x-message-ttl' => ['I', 86400000], // 24小时 TTL - 'x-dead-letter-exchange' => ['S', 'dlx.orders'], // 死信交换机 - 'x-dead-letter-routing-key' => ['S', 'retry'], // 死信路由键 - ]; - protected ?array $qos = [ // AMQP 默认并没有实现此配置。 'prefetch_size' => 0, // 同一个消费者,最高同时可以处理的消息数。 - 'prefetch_count' => 100, + // @attention 默认值 100, test 设置为 1 + 'prefetch_count' => 1, // 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。 'global' => false, ]; + /** + * 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列 + * 这样才能实现重试队列和错误队列的机制 + */ + protected bool $requeue = false; + protected $entityType = 'order'; - + #[Inject] protected EntityParseFactory $entityParseFactory; + #[Inject] + protected Producer $producer; + + /** + * 重写队列构建器,设置队列参数 + * 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误 + * + * @return QueueBuilder + */ + public function getQueueBuilder(): QueueBuilder + { + return (new QueueBuilder()) + ->setQueue($this->getQueue()) + ->setArguments([ + 'x-dead-letter-exchange' => ['S', 'dlx.orders'], + 'x-dead-letter-routing-key' => ['S', 'retry'], + 'x-message-ttl' => ['I', 86400000], // 24小时 + 'x-queue-type' => ['S', 'classic'], + ]); + } + public function consumeMessage($data, AMQPMessage $message): Result { + // 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态 + // 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒 + $debugDelay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0); + if ($debugDelay > 0) { + dump("Debug mode: sleeping for {$debugDelay} seconds..."); + + sleep($debugDelay); + } // dump('---data'); // dump($data); // dump('---'); dump('---message'); - dump( json_decode($message->getBody(), true)); + dump(json_decode($message->getBody(), true)); dump('---'); + // 获取重试次数 + $retryCount = $this->getRetryCount($message); + $maxRetries = (int) env('AMQP_MAX_RETRIES', 3); - $parse = $this->entityParseFactory->createFromMessage($message); - - // 提取 metadata - $metadata = [ - 'company_id' => $data['company_id'] ?? null, - 'platform_id' => $data['platform_id'] ?? null, - 'store_id' => $data['store_id'] ?? null, - 'unique_id' => $data['unique_id'] ?? null, - ]; - - // entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息。 - $entity = $parse->entityMatch($metadata); - - // message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。 - $entityMapResult = $parse->entityMap($data['raw_data'] ?? []); - - - // $entityMapResult 应该是一个内部元素为 Model 的集合 - Db::beginTransaction(); + dump("Retry count: {$retryCount}/{$maxRetries}"); try { + $parse = $this->entityParseFactory->createFromMessage($message); + + // 提取 metadata + $metadata = [ + 'company_id' => $data['company_id'] ?? null, + 'platform_id' => $data['platform_id'] ?? null, + 'store_id' => $data['store_id'] ?? null, + 'unique_id' => $data['unique_id'] ?? null, + ]; + + // entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息。 + $entity = $parse->entityMatch($metadata); + + // message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。 + $entityMapResult = $parse->entityMap($data['raw_data'] ?? []); + + // $entityMapResult 应该是一个内部元素为 Model 的集合 + Db::beginTransaction(); + // 假设 $entityMapResult 为集合 @Collection 对象 - $entityMapResult->each(function($el) use ($entity) { + $entityMapResult->each(function ($el) use ($entity) { $clone = clone $entity; $clone->fill($el); $clone->save(); @@ -92,7 +123,26 @@ class OrderConsumer extends ConsumerMessage return Result::ACK; } catch (Throwable $error) { dump($error->getMessage()); + Log::get()->error('Consumer processing failed', [ + 'error' => $error->getMessage(), + 'retry_count' => $retryCount, + 'max_retries' => $maxRetries, + ]); Db::rollBack(); + + // 检查是否超过最大重试次数 + if ($retryCount >= $maxRetries) { + // 超过重试次数,发送到错误队列 + dump("Max retries exceeded ({$retryCount}/{$maxRetries}), sending to error queue..."); + $this->sendToErrorQueue($message, $error); + + // 返回 ACK 避免消息再次重试 + // 因为消息已经被发送到错误队列,不应该继续在主队列中循环 + return Result::ACK; + } + + // 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列 + dump("Retry {$retryCount}/{$maxRetries}, sending to retry queue..."); return Result::NACK; } } @@ -101,4 +151,95 @@ class OrderConsumer extends ConsumerMessage { return parent::isEnable(); } + + /** + * 获取消息的重试次数 + * 通过检查 x-death header 中的 count 字段 + * + * @param AMQPMessage $message + * @return int + */ + protected function getRetryCount(AMQPMessage $message): int + { + $headers = $message->get('application_headers'); + if (!$headers) { + return 0; + } + + $headerData = $headers->getNativeData(); + $xDeath = $headerData['x-death'] ?? []; + + if (empty($xDeath)) { + return 0; // 首次失败 + } + + // x-death 是一个数组,第一个元素包含 count 字段 + // count 表示消息从该队列死信的次数 + return $xDeath[0]['count'] ?? 0; + } + + /** + * 发送消息到错误队列 + * 当重试次数超过上限时调用 + * + * @param AMQPMessage $message 原始消息 + * @param Throwable $error 错误信息 + * @return void + */ + protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void + { + try { + $originalData = json_decode($message->getBody(), true); + + // 构建错误消息 + $errorMessage = [ + 'error_id' => uniqid('err_', true), + 'original_message' => $originalData, + 'error' => [ + 'type' => get_class($error), + 'message' => $error->getMessage(), + 'trace' => $error->getTraceAsString(), + 'timestamp' => date('c'), + ], + 'metadata' => [ + 'platform' => $originalData['platform'] ?? 'unknown', + 'platform_id' => $originalData['meta']['platform_id'] ?? null, + 'company_id' => $originalData['meta']['company_id'] ?? null, + 'store_id' => $originalData['meta']['store_id'] ?? null, + 'data_type' => $originalData['data_type'] ?? 'unknown', + 'failed_at' => date('c'), + 'retry_count' => $this->getRetryCount($message), + ], + ]; + + // 直接发布到 errors.queue + // 注意:这里直接发送到队列,不经过 exchange + $this->producer->produce( + new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage { + protected string $exchange = ''; + protected string|array $routingKey = 'errors.queue'; + protected array $properties = [ + 'delivery_mode' => 2, // 持久化 + ]; + + public function __construct(array $data) + { + $this->payload = $data; + } + } + ); + + Log::get()->warning('Message sent to error queue after exceeding retry limit', [ + 'error_id' => $errorMessage['error_id'], + 'retry_count' => $errorMessage['metadata']['retry_count'], + 'error_message' => $error->getMessage(), + ]); + } catch (Throwable $e) { + // 发送到错误队列失败,记录日志 + Log::get()->error('Failed to send message to error queue', [ + 'error' => $e->getMessage(), + 'original_error' => $error->getMessage(), + ]); + } + } } diff --git a/backend/app/Platform/OrderProducer.php b/backend/app/Platform/OrderProducer.php index afe5a6d..1b45ba8 100644 --- a/backend/app/Platform/OrderProducer.php +++ b/backend/app/Platform/OrderProducer.php @@ -74,10 +74,11 @@ class OrderProducer extends ProducerMessage 'timestamp' => date('c'), // ISO 8601 格式 'platform' => 'tmall', 'data_type' => 'order', - 'metadata' => [ + 'meta' => [ 'platform_id' => $data['platform_id'] ?? null, 'company_id' => $data['company_id'] ?? null, 'store_id' => $data['store_id'] ?? null, + 'unique_id' => $data['unique_id'] ?? null, 'source_system' => 'tmall-open-api', 'retry_count' => 0, 'data_version' => $data['data_version'] ?? time(), diff --git a/backend/bin/rabbitmq.sh b/backend/bin/rabbitmq.sh index 3b979f9..b9842ef 100755 --- a/backend/bin/rabbitmq.sh +++ b/backend/bin/rabbitmq.sh @@ -121,21 +121,17 @@ for dtype in "${DATA_TYPES[@]}"; do echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)" done -# 8. 绑定 DLX 到重试队列和错误队列 +# 8. 绑定 DLX 到重试队列 echo "" -echo "绑定 DLX 到重试队列和错误队列..." +echo "绑定 DLX 到重试队列..." for dtype in "${DATA_TYPES[@]}"; do # DLX -> 重试队列 + # 注意:方案B中,超过重试次数的消息由 Consumer 直接发送到 errors.queue + # 不使用 DLX 的 routing_key="error" 路由,因此不需要绑定到 errors.queue rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \ --destination-type queue --vhost "$VHOST" --routing-key "retry" echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)" - - # DLX -> 错误队列 - rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ - declare binding --source "dlx.${dtype}" --destination "errors.queue" \ - --destination-type queue --vhost "$VHOST" --routing-key "error" - echo "✓ 绑定: dlx.${dtype} → errors.queue (routing_key: error)" done # 9. 为每个平台创建 Exchange 和 Binding @@ -206,12 +202,16 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags "" echo "✓ 创建用户: user_dataflow_consumer" +# Consumer 需要完整的权限来声明和消费队列 +# - configure: 允许声明 main.exchange、DLX exchanges 和所有队列 +# - write: 允许向业务队列写入(ACK/NACK)、DLX 和 errors exchange +# - read: 允许读取 main.exchange、业务队列和 DLX exchanges rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \ - --configure "^(orders|products|refunds|inventory).*\\.queue$" \ - --write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \ - --read "^(orders|products|refunds|inventory).*\\.queue$" -echo "✓ 配置用户权限" + --configure "^(main\\.exchange|dlx\\..*)|(.*\\.queue)$" \ + --write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(.*\\.errors\\.exchange)$" \ + --read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$" +echo "✓ 配置用户权限 (configure: main.exchange+DLX+queues, write: business queues+DLX+errors, read: main.exchange+business queues+DLX)" # 11. 创建运维监控用户 echo ""