update mq controller

This commit is contained in:
2026-03-13 15:01:55 +08:00
parent a3e8a5cf5d
commit 0c013a8c10
9 changed files with 991 additions and 1 deletions
+34
View File
@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace App\Platform;
use App\Entity\Parse\EntityParseFactory;
use App\Model\FailedMessage;
use App\Utils\Log;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Builder\QueueBuilder;
@@ -265,6 +266,9 @@ class OrderConsumer extends ConsumerMessage
$error_producer = new ErrorProducer($message, $error, $retry_count);
$producer->produce($error_producer);
// 同步写入 failed_messages 表
$this->persistFailedMessage($error_producer->payload);
// 记录日志
Log::get()->warning('Message sent to error queue after exceeding retry limit', [
'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
@@ -453,4 +457,34 @@ class OrderConsumer extends ConsumerMessage
}
}
}
/**
* 持久化失败消息到数据库
*/
protected function persistFailedMessage(array $payload): void
{
try {
FailedMessage::query()->create([
'error_id' => $payload['error_id'],
'data_type' => $payload['metadata']['data_type'] ?? 'order',
'platform' => $payload['metadata']['platform'] ?? null,
'platform_id' => $payload['metadata']['platform_id'] ?? null,
'company_id' => $payload['metadata']['company_id'] ?? null,
'store_id' => $payload['metadata']['store_id'] ?? null,
'error_type' => $payload['error']['type'] ?? 'Unknown',
'error_message' => $payload['error']['message'] ?? '',
'error_code' => $payload['error']['code'] ?? 0,
'error_trace' => $payload['error']['trace'] ?? '',
'original_message' => $payload['original_message'] ?? [],
'retry_count' => $payload['metadata']['retry_count'] ?? 0,
'message_id' => $payload['metadata']['message_id'] ?? null,
'failed_at' => $payload['metadata']['failed_at'] ?? date('c'),
]);
} catch (Throwable $e) {
Log::get()->error('Failed to persist failed message to database', [
'error' => $e->getMessage(),
'error_id' => $payload['error_id'] ?? 'unknown',
]);
}
}
}
+34
View File
@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace App\Platform;
use App\Entity\Parse\EntityParseFactory;
use App\Model\FailedMessage;
use App\Utils\Log;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Builder\QueueBuilder;
@@ -191,6 +192,9 @@ class ProductConsumer extends ConsumerMessage
$error_producer = new ErrorProducer($message, $error, $retry_count);
$producer->produce($error_producer);
// 同步写入 failed_messages 表
$this->persistFailedMessage($error_producer->payload);
Log::get()->warning('Product message sent to error queue after exceeding retry limit', [
'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
'retry_count' => $retry_count,
@@ -204,4 +208,34 @@ class ProductConsumer extends ConsumerMessage
]);
}
}
/**
* 持久化失败消息到数据库
*/
protected function persistFailedMessage(array $payload): void
{
try {
FailedMessage::query()->create([
'error_id' => $payload['error_id'],
'data_type' => $payload['metadata']['data_type'] ?? 'product',
'platform' => $payload['metadata']['platform'] ?? null,
'platform_id' => $payload['metadata']['platform_id'] ?? null,
'company_id' => $payload['metadata']['company_id'] ?? null,
'store_id' => $payload['metadata']['store_id'] ?? null,
'error_type' => $payload['error']['type'] ?? 'Unknown',
'error_message' => $payload['error']['message'] ?? '',
'error_code' => $payload['error']['code'] ?? 0,
'error_trace' => $payload['error']['trace'] ?? '',
'original_message' => $payload['original_message'] ?? [],
'retry_count' => $payload['metadata']['retry_count'] ?? 0,
'message_id' => $payload['metadata']['message_id'] ?? null,
'failed_at' => $payload['metadata']['failed_at'] ?? date('c'),
]);
} catch (Throwable $e) {
Log::get()->error('Failed to persist failed message to database', [
'error' => $e->getMessage(),
'error_id' => $payload['error_id'] ?? 'unknown',
]);
}
}
}
+35 -1
View File
@@ -5,6 +5,8 @@ declare(strict_types=1);
namespace App\Platform;
use App\Entity\Parse\EntityParseFactory;
use App\Model\FailedMessage;
use App\Model\RefundItem;
use App\Utils\Log;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Builder\QueueBuilder;
@@ -13,7 +15,6 @@ use Hyperf\Amqp\Result;
use Hyperf\Amqp\Producer;
use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\DbConnection\Db;
use App\Model\RefundItem;
use Hyperf\Context\ApplicationContext;
use Throwable;
@@ -233,6 +234,9 @@ class RefundConsumer extends ConsumerMessage
$error_producer = new ErrorProducer($message, $error, $retry_count);
$producer->produce($error_producer);
// 同步写入 failed_messages 表
$this->persistFailedMessage($error_producer->payload);
Log::get()->warning('Refund message sent to error queue after exceeding retry limit', [
'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
'retry_count' => $retry_count,
@@ -247,6 +251,36 @@ class RefundConsumer extends ConsumerMessage
}
}
/**
* 持久化失败消息到数据库
*/
protected function persistFailedMessage(array $payload): void
{
try {
FailedMessage::query()->create([
'error_id' => $payload['error_id'],
'data_type' => $payload['metadata']['data_type'] ?? 'refund',
'platform' => $payload['metadata']['platform'] ?? null,
'platform_id' => $payload['metadata']['platform_id'] ?? null,
'company_id' => $payload['metadata']['company_id'] ?? null,
'store_id' => $payload['metadata']['store_id'] ?? null,
'error_type' => $payload['error']['type'] ?? 'Unknown',
'error_message' => $payload['error']['message'] ?? '',
'error_code' => $payload['error']['code'] ?? 0,
'error_trace' => $payload['error']['trace'] ?? '',
'original_message' => $payload['original_message'] ?? [],
'retry_count' => $payload['metadata']['retry_count'] ?? 0,
'message_id' => $payload['metadata']['message_id'] ?? null,
'failed_at' => $payload['metadata']['failed_at'] ?? date('c'),
]);
} catch (Throwable $e) {
Log::get()->error('Failed to persist failed message to database', [
'error' => $e->getMessage(),
'error_id' => $payload['error_id'] ?? 'unknown',
]);
}
}
/**
* 处理退款子项的批量同步
*