From 2c32a0c0af1b63761bcfe0b52193f29176a4f774 Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Mon, 1 Dec 2025 16:31:13 +0800 Subject: [PATCH] update order consumer --- backend/.env.example | 4 + backend/app/Platform/ErrorProducer.php | 114 +++++++++++++++++++++++++ backend/app/Platform/OrderConsumer.php | 97 ++++++++++----------- backend/bin/rabbitmq.sh | 35 ++++++-- 4 files changed, 188 insertions(+), 62 deletions(-) create mode 100644 backend/app/Platform/ErrorProducer.php diff --git a/backend/.env.example b/backend/.env.example index 6c32b2f..b1371aa 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -16,6 +16,10 @@ AMQP_PORT=5672 AMQP_USER="user_tmall" AMQP_PASSWORD="change_me_tmall" AMQP_VHOST="dataflow" +# 消息最大重试次数(默认3次),超过后进入错误队列 +AMQP_MAX_RETRIES=3 +# 调试模式:消费者处理每条消息的延迟秒数(默认0,设置为2可方便在mq:status中观察) +AMQP_CONSUMER_DEBUG_DELAY=0 REDIS_HOST=localhost REDIS_AUTH=(null) diff --git a/backend/app/Platform/ErrorProducer.php b/backend/app/Platform/ErrorProducer.php new file mode 100644 index 0000000..8de9b05 --- /dev/null +++ b/backend/app/Platform/ErrorProducer.php @@ -0,0 +1,114 @@ + 2, // 持久化消息 + ]; + + /** + * Exchange 配置 + */ + protected string $exchange = 'errors.exchange'; + + /** + * Routing Key 配置 + */ + protected string|array $routingKey = 'error'; + + /** + * 构造错误消息 + * + * @param AMQPMessage $originalMessage 原始失败的消息 + * @param Throwable $error 错误异常 + * @param int $retryCount 重试次数 + */ + public function __construct( + AMQPMessage $originalMessage, + Throwable $error, + int $retryCount + ) { + $this->payload = $this->buildErrorMessage($originalMessage, $error, $retryCount); + } + + /** + * 构建错误消息格式 + * + * @param AMQPMessage $originalMessage 原始消息 + * @param Throwable $error 错误信息 + * @param int $retryCount 重试次数 + * @return array + */ + protected function buildErrorMessage( + AMQPMessage $originalMessage, + Throwable $error, + int $retryCount + ): array { + $originalData = json_decode($originalMessage->getBody(), true); + + return [ + 'error_id' => $this->generateErrorId(), + 'original_message' => $originalData, + 'error' => [ + 'type' => get_class($error), + 'message' => $error->getMessage(), + 'code' => $error->getCode(), + 'file' => $error->getFile(), + 'line' => $error->getLine(), + 'trace' => $error->getTraceAsString(), + 'timestamp' => date('c'), + ], + 'metadata' => [ + 'platform' => $originalData['platform'] ?? 'unknown', + 'platform_id' => $originalData['meta']['platform_id'] ?? null, + 'company_id' => $originalData['meta']['company_id'] ?? null, + 'store_id' => $originalData['meta']['store_id'] ?? null, + 'data_type' => $originalData['data_type'] ?? 'unknown', + 'message_id' => $originalData['message_id'] ?? null, + 'failed_at' => date('c'), + 'retry_count' => $retryCount, + ], + ]; + } + + /** + * 生成错误ID + * + * 格式: err_{timestamp}_{random} + * + * @return string + */ + protected function generateErrorId(): string + { + return uniqid('err_', true); + } +} diff --git a/backend/app/Platform/OrderConsumer.php b/backend/app/Platform/OrderConsumer.php index 299b1fc..30bbd7f 100644 --- a/backend/app/Platform/OrderConsumer.php +++ b/backend/app/Platform/OrderConsumer.php @@ -41,12 +41,6 @@ class OrderConsumer extends ConsumerMessage protected $entityType = 'order'; - #[Inject] - protected EntityParseFactory $entityParseFactory; - - #[Inject] - protected Producer $producer; - /** * 重写队列构建器,设置队列参数 * 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误 @@ -81,7 +75,7 @@ class OrderConsumer extends ConsumerMessage // dump('---'); dump('---message'); - dump(json_decode($message->getBody(), true)); + dump(json_decode($message->getBody(), true)['message_id']); dump('---'); // 获取重试次数 @@ -91,7 +85,8 @@ class OrderConsumer extends ConsumerMessage dump("Retry count: {$retryCount}/{$maxRetries}"); try { - $parse = $this->entityParseFactory->createFromMessage($message); + // EntityParseFactory 使用静态方法调用,无需依赖注入 + $parse = EntityParseFactory::createFromMessage($message); // 提取 metadata $metadata = [ @@ -122,7 +117,12 @@ class OrderConsumer extends ConsumerMessage // 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。 return Result::ACK; } catch (Throwable $error) { - dump($error->getMessage()); + dump("=== Error Caught ==="); + dump("Error: " . $error->getMessage()); + dump("Retry Count: {$retryCount}"); + dump("Max Retries: {$maxRetries}"); + dump("Check: {$retryCount} >= {$maxRetries} = " . ($retryCount >= $maxRetries ? 'TRUE' : 'FALSE')); + Log::get()->error('Consumer processing failed', [ 'error' => $error->getMessage(), 'retry_count' => $retryCount, @@ -133,16 +133,24 @@ class OrderConsumer extends ConsumerMessage // 检查是否超过最大重试次数 if ($retryCount >= $maxRetries) { // 超过重试次数,发送到错误队列 - dump("Max retries exceeded ({$retryCount}/{$maxRetries}), sending to error queue..."); - $this->sendToErrorQueue($message, $error); + dump(">>> MAX RETRIES EXCEEDED! Sending to error queue..."); + + try { + $this->sendToErrorQueue($message, $error); + dump(">>> Successfully sent to error queue!"); + } catch (Throwable $e) { + dump(">>> FAILED to send to error queue: " . $e->getMessage()); + dump(">>> Stack trace: " . $e->getTraceAsString()); + } // 返回 ACK 避免消息再次重试 // 因为消息已经被发送到错误队列,不应该继续在主队列中循环 + dump(">>> Returning ACK to prevent further retries"); return Result::ACK; } // 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列 - dump("Retry {$retryCount}/{$maxRetries}, sending to retry queue..."); + dump(">>> Retry not exceeded, sending to DLX (NACK)"); return Result::NACK; } } @@ -161,21 +169,35 @@ class OrderConsumer extends ConsumerMessage */ protected function getRetryCount(AMQPMessage $message): int { + // 检查是否存在 application_headers 属性 + // 首次处理的消息不会有这个属性,必须先用 has() 检查 + if (!$message->has('application_headers')) { + dump(">>> No application_headers, first time processing"); + return 0; + } + $headers = $message->get('application_headers'); if (!$headers) { + dump(">>> application_headers exists but is empty"); return 0; } $headerData = $headers->getNativeData(); $xDeath = $headerData['x-death'] ?? []; + dump(">>> x-death header data:"); + dump($xDeath); + if (empty($xDeath)) { + dump(">>> x-death is empty"); return 0; // 首次失败 } // x-death 是一个数组,第一个元素包含 count 字段 // count 表示消息从该队列死信的次数 - return $xDeath[0]['count'] ?? 0; + $count = $xDeath[0]['count'] ?? 0; + dump(">>> Extracted count from x-death: {$count}"); + return $count; } /** @@ -189,53 +211,22 @@ class OrderConsumer extends ConsumerMessage protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void { try { - $originalData = json_decode($message->getBody(), true); + $retryCount = $this->getRetryCount($message); - // 构建错误消息 - $errorMessage = [ - 'error_id' => uniqid('err_', true), - 'original_message' => $originalData, - 'error' => [ - 'type' => get_class($error), - 'message' => $error->getMessage(), - 'trace' => $error->getTraceAsString(), - 'timestamp' => date('c'), - ], - 'metadata' => [ - 'platform' => $originalData['platform'] ?? 'unknown', - 'platform_id' => $originalData['meta']['platform_id'] ?? null, - 'company_id' => $originalData['meta']['company_id'] ?? null, - 'store_id' => $originalData['meta']['store_id'] ?? null, - 'data_type' => $originalData['data_type'] ?? 'unknown', - 'failed_at' => date('c'), - 'retry_count' => $this->getRetryCount($message), - ], - ]; - - // 直接发布到 errors.queue - // 注意:这里直接发送到队列,不经过 exchange - $this->producer->produce( - new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage { - protected string $exchange = ''; - protected string|array $routingKey = 'errors.queue'; - protected array $properties = [ - 'delivery_mode' => 2, // 持久化 - ]; - - public function __construct(array $data) - { - $this->payload = $data; - } - } - ); + // 使用 ErrorProducer 发送到错误队列 + $producer = ApplicationContext::getContainer()->get(Producer::class); + $errorProducer = new ErrorProducer($message, $error, $retryCount); + $producer->produce($errorProducer); + // 记录日志 Log::get()->warning('Message sent to error queue after exceeding retry limit', [ - 'error_id' => $errorMessage['error_id'], - 'retry_count' => $errorMessage['metadata']['retry_count'], + 'error_id' => $errorProducer->payload['error_id'] ?? 'unknown', + 'retry_count' => $retryCount, 'error_message' => $error->getMessage(), ]); } catch (Throwable $e) { // 发送到错误队列失败,记录日志 + dump($e->getMessage()); Log::get()->error('Failed to send message to error queue', [ 'error' => $e->getMessage(), 'original_error' => $error->getMessage(), diff --git a/backend/bin/rabbitmq.sh b/backend/bin/rabbitmq.sh index b9842ef..a5e5a92 100755 --- a/backend/bin/rabbitmq.sh +++ b/backend/bin/rabbitmq.sh @@ -96,20 +96,37 @@ done echo "" echo "创建重试队列..." for dtype in "${DATA_TYPES[@]}"; do + # 去掉末尾的 's' 得到单数形式(orders -> order) + dtype_singular="${dtype%s}" + + # 配置回流时的 routing key 为 {type}.retry,可以被 {type}.# 匹配 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \ - --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}" - echo "✓ 创建重试队列: ${dtype}.retry.queue" + --arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}" + echo "✓ 创建重试队列: ${dtype}.retry.queue (回流 routing key: ${dtype_singular}.retry)" done -# 6. 创建错误队列 +# 6. 创建错误 Exchange 和错误队列 echo "" -echo "创建错误队列..." +echo "创建错误 Exchange 和错误队列..." +# 创建通用错误 Exchange(供 Consumer 发送错误消息使用) +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare exchange --name "errors.exchange" --vhost "$VHOST" \ + --type topic --durable true +echo "✓ 创建错误 Exchange: errors.exchange" + +# 创建错误队列 rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare queue --name "errors.queue" --vhost "$VHOST" --durable true \ --arguments '{"x-message-ttl":604800000}' echo "✓ 创建错误队列: errors.queue" +# 绑定通用错误 Exchange 到错误队列 +rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ + declare binding --source "errors.exchange" --destination "errors.queue" \ + --destination-type queue --vhost "$VHOST" --routing-key "#" +echo "✓ 绑定: errors.exchange → errors.queue (routing_key: #)" + # 7. 绑定主 Exchange 到主队列(使用通配符) echo "" echo "绑定主 Exchange 到主队列..." @@ -203,15 +220,15 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ echo "✓ 创建用户: user_dataflow_consumer" # Consumer 需要完整的权限来声明和消费队列 -# - configure: 允许声明 main.exchange、DLX exchanges 和所有队列 -# - write: 允许向业务队列写入(ACK/NACK)、DLX 和 errors exchange +# - configure: 允许声明 main.exchange、DLX exchanges、通用 errors.exchange 和所有队列 +# - write: 允许向业务队列写入(ACK/NACK)、DLX、通用 errors.exchange 和平台 errors exchanges # - read: 允许读取 main.exchange、业务队列和 DLX exchanges rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \ declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \ - --configure "^(main\\.exchange|dlx\\..*)|(.*\\.queue)$" \ - --write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(.*\\.errors\\.exchange)$" \ + --configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \ + --write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \ --read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$" -echo "✓ 配置用户权限 (configure: main.exchange+DLX+queues, write: business queues+DLX+errors, read: main.exchange+business queues+DLX)" +echo "✓ 配置用户权限 (configure: main.exchange+errors.exchange+DLX+queues, write: business queues+DLX+errors.exchange, read: main.exchange+business queues+DLX)" # 11. 创建运维监控用户 echo ""