From fcd376cab46fd1d95579342a1aca4ebceb2d84f9 Mon Sep 17 00:00:00 2001 From: Nick Zeng Date: Tue, 17 Mar 2026 10:36:38 +0800 Subject: [PATCH] implement FailedMessageTrait --- backend/app/Platform/OrderConsumer.php | 112 +----------------- backend/app/Platform/ProductConsumer.php | 95 +-------------- backend/app/Platform/RefundConsumer.php | 95 +-------------- .../Platform/Traits/FailedMessageTrait.php | 107 +++++++++++++++++ 4 files changed, 114 insertions(+), 295 deletions(-) create mode 100644 backend/app/Platform/Traits/FailedMessageTrait.php diff --git a/backend/app/Platform/OrderConsumer.php b/backend/app/Platform/OrderConsumer.php index 7c22f84..d74b6a8 100644 --- a/backend/app/Platform/OrderConsumer.php +++ b/backend/app/Platform/OrderConsumer.php @@ -5,18 +5,15 @@ declare(strict_types=1); namespace App\Platform; use App\Entity\Parse\EntityParseFactory; -use App\Model\FailedMessage; +use App\Platform\Traits\FailedMessageTrait; use App\Utils\Log; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Builder\QueueBuilder; use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Result; -use Hyperf\Amqp\Producer; use PhpAmqpLib\Message\AMQPMessage; use Hyperf\DbConnection\Db; use App\Model\OrderItem; -use Exception; -use Hyperf\Context\ApplicationContext; use Throwable; use function Hyperf\Support\env; @@ -24,6 +21,7 @@ use function Hyperf\Support\env; #[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", pool: "default_consumer", nums: 1, enable: true)] class OrderConsumer extends ConsumerMessage { + use FailedMessageTrait; protected ?array $qos = [ // AMQP 默认并没有实现此配置。 @@ -208,83 +206,6 @@ class OrderConsumer extends ConsumerMessage return parent::isEnable(); } - /** - * 获取消息的重试次数 - * 通过检查 x-death header 中的 count 字段 - * - * @param AMQPMessage $message - * @return int - */ - protected function getRetryCount(AMQPMessage $message): int - { - // 检查是否存在 application_headers 属性 - // 首次处理的消息不会有这个属性,必须先用 has() 检查 - if (!$message->has('application_headers')) { - dump(">>> No application_headers, first time processing"); - return 0; - } - - $headers = $message->get('application_headers'); - if (!$headers) { - dump(">>> application_headers exists but is empty"); - return 0; - } - - $headerData = $headers->getNativeData(); - $xDeath = $headerData['x-death'] ?? []; - - dump(">>> x-death header data:"); - dump($xDeath); - - if (empty($xDeath)) { - dump(">>> x-death is empty"); - return 0; // 首次失败 - } - - // x-death 是一个数组,第一个元素包含 count 字段 - // count 表示消息从该队列死信的次数 - $count = $xDeath[0]['count'] ?? 0; - dump(">>> Extracted count from x-death: {$count}"); - return $count; - } - - /** - * 发送消息到错误队列 - * 当重试次数超过上限时调用 - * - * @param AMQPMessage $message 原始消息 - * @param Throwable $error 错误信息 - * @return void - */ - protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void - { - try { - $retry_count = $this->getRetryCount($message); - - // 使用 ErrorProducer 发送到错误队列 - $producer = ApplicationContext::getContainer()->get(Producer::class); - $error_producer = new ErrorProducer($message, $error, $retry_count); - $producer->produce($error_producer); - - // 同步写入 failed_messages 表 - $this->persistFailedMessage($error_producer->payload); - - // 记录日志 - Log::get()->warning('Message sent to error queue after exceeding retry limit', [ - 'error_id' => $error_producer->payload['error_id'] ?? 'unknown', - 'retry_count' => $retry_count, - 'error_message' => $error->getMessage(), - ]); - } catch (Throwable $e) { - // 发送到错误队列失败,记录日志 - dump($e->getMessage()); - Log::get()->error('Failed to send message to error queue', [ - 'error' => $e->getMessage(), - 'original_error' => $error->getMessage(), - ]); - } - } - /** * 处理订单子项的批量同步(优化版本) * @@ -458,33 +379,4 @@ class OrderConsumer extends ConsumerMessage } } - /** - * 持久化失败消息到数据库 - */ - protected function persistFailedMessage(array $payload): void - { - try { - FailedMessage::query()->create([ - 'error_id' => $payload['error_id'], - 'data_type' => $payload['metadata']['data_type'] ?? 'order', - 'platform' => $payload['metadata']['platform'] ?? null, - 'platform_id' => $payload['metadata']['platform_id'] ?? null, - 'company_id' => $payload['metadata']['company_id'] ?? null, - 'store_id' => $payload['metadata']['store_id'] ?? null, - 'error_type' => $payload['error']['type'] ?? 'Unknown', - 'error_message' => $payload['error']['message'] ?? '', - 'error_code' => $payload['error']['code'] ?? 0, - 'error_trace' => $payload['error']['trace'] ?? '', - 'original_message' => $payload['original_message'] ?? [], - 'retry_count' => $payload['metadata']['retry_count'] ?? 0, - 'message_id' => $payload['metadata']['message_id'] ?? null, - 'failed_at' => $payload['metadata']['failed_at'] ?? date('c'), - ]); - } catch (Throwable $e) { - Log::get()->error('Failed to persist failed message to database', [ - 'error' => $e->getMessage(), - 'error_id' => $payload['error_id'] ?? 'unknown', - ]); - } - } } diff --git a/backend/app/Platform/ProductConsumer.php b/backend/app/Platform/ProductConsumer.php index a10d7b8..63fac5c 100644 --- a/backend/app/Platform/ProductConsumer.php +++ b/backend/app/Platform/ProductConsumer.php @@ -5,16 +5,14 @@ declare(strict_types=1); namespace App\Platform; use App\Entity\Parse\EntityParseFactory; -use App\Model\FailedMessage; +use App\Platform\Traits\FailedMessageTrait; use App\Utils\Log; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Builder\QueueBuilder; use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Result; -use Hyperf\Amqp\Producer; use PhpAmqpLib\Message\AMQPMessage; use Hyperf\DbConnection\Db; -use Hyperf\Context\ApplicationContext; use Throwable; use function Hyperf\Support\env; @@ -22,6 +20,8 @@ use function Hyperf\Support\env; #[Consumer(exchange: "main.exchange", routingKey: "product.#", queue: "products.queue", pool: "default_consumer", nums: 1, enable: true)] class ProductConsumer extends ConsumerMessage { + use FailedMessageTrait; + protected ?array $qos = [ 'prefetch_size' => 0, 'prefetch_count' => 1, @@ -149,93 +149,4 @@ class ProductConsumer extends ConsumerMessage return parent::isEnable(); } - /** - * 获取消息的重试次数 - * - * @param AMQPMessage $message - * @return int - */ - protected function getRetryCount(AMQPMessage $message): int - { - if (!$message->has('application_headers')) { - return 0; - } - - $headers = $message->get('application_headers'); - if (!$headers) { - return 0; - } - - $headerData = $headers->getNativeData(); - $xDeath = $headerData['x-death'] ?? []; - - if (empty($xDeath)) { - return 0; - } - - return $xDeath[0]['count'] ?? 0; - } - - /** - * 发送消息到错误队列 - * - * @param AMQPMessage $message 原始消息 - * @param Throwable $error 错误信息 - * @return void - */ - protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void - { - try { - $retry_count = $this->getRetryCount($message); - - $producer = ApplicationContext::getContainer()->get(Producer::class); - $error_producer = new ErrorProducer($message, $error, $retry_count); - $producer->produce($error_producer); - - // 同步写入 failed_messages 表 - $this->persistFailedMessage($error_producer->payload); - - Log::get()->warning('Product message sent to error queue after exceeding retry limit', [ - 'error_id' => $error_producer->payload['error_id'] ?? 'unknown', - 'retry_count' => $retry_count, - 'error_message' => $error->getMessage(), - ]); - } catch (Throwable $e) { - dump($e->getMessage()); - Log::get()->error('Failed to send product message to error queue', [ - 'error' => $e->getMessage(), - 'original_error' => $error->getMessage(), - ]); - } - } - - /** - * 持久化失败消息到数据库 - */ - protected function persistFailedMessage(array $payload): void - { - try { - FailedMessage::query()->create([ - 'error_id' => $payload['error_id'], - 'data_type' => $payload['metadata']['data_type'] ?? 'product', - 'platform' => $payload['metadata']['platform'] ?? null, - 'platform_id' => $payload['metadata']['platform_id'] ?? null, - 'company_id' => $payload['metadata']['company_id'] ?? null, - 'store_id' => $payload['metadata']['store_id'] ?? null, - 'error_type' => $payload['error']['type'] ?? 'Unknown', - 'error_message' => $payload['error']['message'] ?? '', - 'error_code' => $payload['error']['code'] ?? 0, - 'error_trace' => $payload['error']['trace'] ?? '', - 'original_message' => $payload['original_message'] ?? [], - 'retry_count' => $payload['metadata']['retry_count'] ?? 0, - 'message_id' => $payload['metadata']['message_id'] ?? null, - 'failed_at' => $payload['metadata']['failed_at'] ?? date('c'), - ]); - } catch (Throwable $e) { - Log::get()->error('Failed to persist failed message to database', [ - 'error' => $e->getMessage(), - 'error_id' => $payload['error_id'] ?? 'unknown', - ]); - } - } } diff --git a/backend/app/Platform/RefundConsumer.php b/backend/app/Platform/RefundConsumer.php index dd7c6e6..a0a6e14 100644 --- a/backend/app/Platform/RefundConsumer.php +++ b/backend/app/Platform/RefundConsumer.php @@ -5,17 +5,15 @@ declare(strict_types=1); namespace App\Platform; use App\Entity\Parse\EntityParseFactory; -use App\Model\FailedMessage; use App\Model\RefundItem; +use App\Platform\Traits\FailedMessageTrait; use App\Utils\Log; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Builder\QueueBuilder; use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Result; -use Hyperf\Amqp\Producer; use PhpAmqpLib\Message\AMQPMessage; use Hyperf\DbConnection\Db; -use Hyperf\Context\ApplicationContext; use Throwable; use function Hyperf\Support\env; @@ -23,6 +21,7 @@ use function Hyperf\Support\env; #[Consumer(exchange: "main.exchange", routingKey: "refund.#", queue: "refunds.queue", pool: "default_consumer", nums: 1, enable: true)] class RefundConsumer extends ConsumerMessage { + use FailedMessageTrait; protected ?array $qos = [ 'prefetch_size' => 0, @@ -191,96 +190,6 @@ class RefundConsumer extends ConsumerMessage return parent::isEnable(); } - /** - * 获取消息的重试次数 - * - * @param AMQPMessage $message - * @return int - */ - protected function getRetryCount(AMQPMessage $message): int - { - if (!$message->has('application_headers')) { - return 0; - } - - $headers = $message->get('application_headers'); - if (!$headers) { - return 0; - } - - $header_data = $headers->getNativeData(); - $x_death = $header_data['x-death'] ?? []; - - if (empty($x_death)) { - return 0; - } - - return $x_death[0]['count'] ?? 0; - } - - /** - * 发送消息到错误队列 - * - * @param AMQPMessage $message 原始消息 - * @param Throwable $error 错误信息 - * @return void - */ - protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void - { - try { - $retry_count = $this->getRetryCount($message); - - $producer = ApplicationContext::getContainer()->get(Producer::class); - $error_producer = new ErrorProducer($message, $error, $retry_count); - $producer->produce($error_producer); - - // 同步写入 failed_messages 表 - $this->persistFailedMessage($error_producer->payload); - - Log::get()->warning('Refund message sent to error queue after exceeding retry limit', [ - 'error_id' => $error_producer->payload['error_id'] ?? 'unknown', - 'retry_count' => $retry_count, - 'error_message' => $error->getMessage(), - ]); - } catch (Throwable $e) { - dump($e->getMessage()); - Log::get()->error('Failed to send refund message to error queue', [ - 'error' => $e->getMessage(), - 'original_error' => $error->getMessage(), - ]); - } - } - - /** - * 持久化失败消息到数据库 - */ - protected function persistFailedMessage(array $payload): void - { - try { - FailedMessage::query()->create([ - 'error_id' => $payload['error_id'], - 'data_type' => $payload['metadata']['data_type'] ?? 'refund', - 'platform' => $payload['metadata']['platform'] ?? null, - 'platform_id' => $payload['metadata']['platform_id'] ?? null, - 'company_id' => $payload['metadata']['company_id'] ?? null, - 'store_id' => $payload['metadata']['store_id'] ?? null, - 'error_type' => $payload['error']['type'] ?? 'Unknown', - 'error_message' => $payload['error']['message'] ?? '', - 'error_code' => $payload['error']['code'] ?? 0, - 'error_trace' => $payload['error']['trace'] ?? '', - 'original_message' => $payload['original_message'] ?? [], - 'retry_count' => $payload['metadata']['retry_count'] ?? 0, - 'message_id' => $payload['metadata']['message_id'] ?? null, - 'failed_at' => $payload['metadata']['failed_at'] ?? date('c'), - ]); - } catch (Throwable $e) { - Log::get()->error('Failed to persist failed message to database', [ - 'error' => $e->getMessage(), - 'error_id' => $payload['error_id'] ?? 'unknown', - ]); - } - } - /** * 处理退款子项的批量同步 * diff --git a/backend/app/Platform/Traits/FailedMessageTrait.php b/backend/app/Platform/Traits/FailedMessageTrait.php new file mode 100644 index 0000000..7cb5549 --- /dev/null +++ b/backend/app/Platform/Traits/FailedMessageTrait.php @@ -0,0 +1,107 @@ +has('application_headers')) { + return 0; + } + + $headers = $message->get('application_headers'); + if (!$headers) { + return 0; + } + + $header_data = $headers->getNativeData(); + $x_death = $header_data['x-death'] ?? []; + + if (empty($x_death)) { + return 0; + } + + return $x_death[0]['count'] ?? 0; + } + + /** + * 发送消息到错误队列 + * 当重试次数超过上限时调用 + * + * @param AMQPMessage $message 原始消息 + * @param Throwable $error 错误信息 + */ + protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void + { + try { + $retry_count = $this->getRetryCount($message); + + $producer = ApplicationContext::getContainer()->get(Producer::class); + $error_producer = new ErrorProducer($message, $error, $retry_count); + $producer->produce($error_producer); + + // 同步写入 failed_messages 表 + $this->persistFailedMessage($error_producer->payload); + + Log::get()->warning('Message sent to error queue after exceeding retry limit', [ + 'error_id' => $error_producer->payload['error_id'] ?? 'unknown', + 'retry_count' => $retry_count, + 'error_message' => $error->getMessage(), + ]); + } catch (Throwable $e) { + Log::get()->error('Failed to send message to error queue', [ + 'error' => $e->getMessage(), + 'original_error' => $error->getMessage(), + ]); + } + } + + /** + * 持久化失败消息到数据库 + */ + protected function persistFailedMessage(array $payload): void + { + try { + FailedMessage::query()->create([ + 'error_id' => $payload['error_id'], + 'data_type' => $payload['metadata']['data_type'] ?? 'unknown', + 'platform' => $payload['metadata']['platform'] ?? null, + 'platform_id' => $payload['metadata']['platform_id'] ?? null, + 'company_id' => $payload['metadata']['company_id'] ?? null, + 'store_id' => $payload['metadata']['store_id'] ?? null, + 'error_type' => $payload['error']['type'] ?? 'Unknown', + 'error_message' => $payload['error']['message'] ?? '', + 'error_code' => $payload['error']['code'] ?? 0, + 'error_trace' => $payload['error']['trace'] ?? '', + 'original_message' => $payload['original_message'] ?? [], + 'retry_count' => $payload['metadata']['retry_count'] ?? 0, + 'message_id' => $payload['metadata']['message_id'] ?? null, + 'failed_at' => $payload['metadata']['failed_at'] ?? date('c'), + ]); + } catch (Throwable $e) { + Log::get()->error('Failed to persist failed message to database', [ + 'error' => $e->getMessage(), + 'error_id' => $payload['error_id'] ?? 'unknown', + ]); + } + } +}