update order consumer
This commit is contained in:
@@ -16,6 +16,10 @@ AMQP_PORT=5672
|
|||||||
AMQP_USER="user_tmall"
|
AMQP_USER="user_tmall"
|
||||||
AMQP_PASSWORD="change_me_tmall"
|
AMQP_PASSWORD="change_me_tmall"
|
||||||
AMQP_VHOST="dataflow"
|
AMQP_VHOST="dataflow"
|
||||||
|
# 消息最大重试次数(默认3次),超过后进入错误队列
|
||||||
|
AMQP_MAX_RETRIES=3
|
||||||
|
# 调试模式:消费者处理每条消息的延迟秒数(默认0,设置为2可方便在mq:status中观察)
|
||||||
|
AMQP_CONSUMER_DEBUG_DELAY=0
|
||||||
|
|
||||||
REDIS_HOST=localhost
|
REDIS_HOST=localhost
|
||||||
REDIS_AUTH=(null)
|
REDIS_AUTH=(null)
|
||||||
|
|||||||
@@ -0,0 +1,114 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace App\Platform;
|
||||||
|
|
||||||
|
use Hyperf\Amqp\Annotation\Producer;
|
||||||
|
use Hyperf\Amqp\Message\ProducerMessage;
|
||||||
|
use PhpAmqpLib\Message\AMQPMessage;
|
||||||
|
use Throwable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 错误消息生产者
|
||||||
|
*
|
||||||
|
* 用于将处理失败且超过重试次数的消息发送到 errors.queue
|
||||||
|
* 使用 errors.exchange 进行路由
|
||||||
|
*/
|
||||||
|
#[Producer(exchange: 'errors.exchange', routingKey: 'error')]
|
||||||
|
class ErrorProducer extends ProducerMessage
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* 使用 consumer 连接池(因为是在 consumer 中调用)
|
||||||
|
*/
|
||||||
|
protected string $poolName = 'default_consumer';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* VHost
|
||||||
|
*/
|
||||||
|
protected string $vhost = 'dataflow';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息持久化
|
||||||
|
*/
|
||||||
|
protected array $properties = [
|
||||||
|
'delivery_mode' => 2, // 持久化消息
|
||||||
|
];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exchange 配置
|
||||||
|
*/
|
||||||
|
protected string $exchange = 'errors.exchange';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Routing Key 配置
|
||||||
|
*/
|
||||||
|
protected string|array $routingKey = 'error';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构造错误消息
|
||||||
|
*
|
||||||
|
* @param AMQPMessage $originalMessage 原始失败的消息
|
||||||
|
* @param Throwable $error 错误异常
|
||||||
|
* @param int $retryCount 重试次数
|
||||||
|
*/
|
||||||
|
public function __construct(
|
||||||
|
AMQPMessage $originalMessage,
|
||||||
|
Throwable $error,
|
||||||
|
int $retryCount
|
||||||
|
) {
|
||||||
|
$this->payload = $this->buildErrorMessage($originalMessage, $error, $retryCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构建错误消息格式
|
||||||
|
*
|
||||||
|
* @param AMQPMessage $originalMessage 原始消息
|
||||||
|
* @param Throwable $error 错误信息
|
||||||
|
* @param int $retryCount 重试次数
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
protected function buildErrorMessage(
|
||||||
|
AMQPMessage $originalMessage,
|
||||||
|
Throwable $error,
|
||||||
|
int $retryCount
|
||||||
|
): array {
|
||||||
|
$originalData = json_decode($originalMessage->getBody(), true);
|
||||||
|
|
||||||
|
return [
|
||||||
|
'error_id' => $this->generateErrorId(),
|
||||||
|
'original_message' => $originalData,
|
||||||
|
'error' => [
|
||||||
|
'type' => get_class($error),
|
||||||
|
'message' => $error->getMessage(),
|
||||||
|
'code' => $error->getCode(),
|
||||||
|
'file' => $error->getFile(),
|
||||||
|
'line' => $error->getLine(),
|
||||||
|
'trace' => $error->getTraceAsString(),
|
||||||
|
'timestamp' => date('c'),
|
||||||
|
],
|
||||||
|
'metadata' => [
|
||||||
|
'platform' => $originalData['platform'] ?? 'unknown',
|
||||||
|
'platform_id' => $originalData['meta']['platform_id'] ?? null,
|
||||||
|
'company_id' => $originalData['meta']['company_id'] ?? null,
|
||||||
|
'store_id' => $originalData['meta']['store_id'] ?? null,
|
||||||
|
'data_type' => $originalData['data_type'] ?? 'unknown',
|
||||||
|
'message_id' => $originalData['message_id'] ?? null,
|
||||||
|
'failed_at' => date('c'),
|
||||||
|
'retry_count' => $retryCount,
|
||||||
|
],
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 生成错误ID
|
||||||
|
*
|
||||||
|
* 格式: err_{timestamp}_{random}
|
||||||
|
*
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
protected function generateErrorId(): string
|
||||||
|
{
|
||||||
|
return uniqid('err_', true);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -41,12 +41,6 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
|
|
||||||
protected $entityType = 'order';
|
protected $entityType = 'order';
|
||||||
|
|
||||||
#[Inject]
|
|
||||||
protected EntityParseFactory $entityParseFactory;
|
|
||||||
|
|
||||||
#[Inject]
|
|
||||||
protected Producer $producer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 重写队列构建器,设置队列参数
|
* 重写队列构建器,设置队列参数
|
||||||
* 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误
|
* 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误
|
||||||
@@ -81,7 +75,7 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
// dump('---');
|
// dump('---');
|
||||||
|
|
||||||
dump('---message');
|
dump('---message');
|
||||||
dump(json_decode($message->getBody(), true));
|
dump(json_decode($message->getBody(), true)['message_id']);
|
||||||
dump('---');
|
dump('---');
|
||||||
|
|
||||||
// 获取重试次数
|
// 获取重试次数
|
||||||
@@ -91,7 +85,8 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
dump("Retry count: {$retryCount}/{$maxRetries}");
|
dump("Retry count: {$retryCount}/{$maxRetries}");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$parse = $this->entityParseFactory->createFromMessage($message);
|
// EntityParseFactory 使用静态方法调用,无需依赖注入
|
||||||
|
$parse = EntityParseFactory::createFromMessage($message);
|
||||||
|
|
||||||
// 提取 metadata
|
// 提取 metadata
|
||||||
$metadata = [
|
$metadata = [
|
||||||
@@ -122,7 +117,12 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
|
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
|
||||||
return Result::ACK;
|
return Result::ACK;
|
||||||
} catch (Throwable $error) {
|
} catch (Throwable $error) {
|
||||||
dump($error->getMessage());
|
dump("=== Error Caught ===");
|
||||||
|
dump("Error: " . $error->getMessage());
|
||||||
|
dump("Retry Count: {$retryCount}");
|
||||||
|
dump("Max Retries: {$maxRetries}");
|
||||||
|
dump("Check: {$retryCount} >= {$maxRetries} = " . ($retryCount >= $maxRetries ? 'TRUE' : 'FALSE'));
|
||||||
|
|
||||||
Log::get()->error('Consumer processing failed', [
|
Log::get()->error('Consumer processing failed', [
|
||||||
'error' => $error->getMessage(),
|
'error' => $error->getMessage(),
|
||||||
'retry_count' => $retryCount,
|
'retry_count' => $retryCount,
|
||||||
@@ -133,16 +133,24 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
// 检查是否超过最大重试次数
|
// 检查是否超过最大重试次数
|
||||||
if ($retryCount >= $maxRetries) {
|
if ($retryCount >= $maxRetries) {
|
||||||
// 超过重试次数,发送到错误队列
|
// 超过重试次数,发送到错误队列
|
||||||
dump("Max retries exceeded ({$retryCount}/{$maxRetries}), sending to error queue...");
|
dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");
|
||||||
|
|
||||||
|
try {
|
||||||
$this->sendToErrorQueue($message, $error);
|
$this->sendToErrorQueue($message, $error);
|
||||||
|
dump(">>> Successfully sent to error queue!");
|
||||||
|
} catch (Throwable $e) {
|
||||||
|
dump(">>> FAILED to send to error queue: " . $e->getMessage());
|
||||||
|
dump(">>> Stack trace: " . $e->getTraceAsString());
|
||||||
|
}
|
||||||
|
|
||||||
// 返回 ACK 避免消息再次重试
|
// 返回 ACK 避免消息再次重试
|
||||||
// 因为消息已经被发送到错误队列,不应该继续在主队列中循环
|
// 因为消息已经被发送到错误队列,不应该继续在主队列中循环
|
||||||
|
dump(">>> Returning ACK to prevent further retries");
|
||||||
return Result::ACK;
|
return Result::ACK;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列
|
// 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列
|
||||||
dump("Retry {$retryCount}/{$maxRetries}, sending to retry queue...");
|
dump(">>> Retry not exceeded, sending to DLX (NACK)");
|
||||||
return Result::NACK;
|
return Result::NACK;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -161,21 +169,35 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
*/
|
*/
|
||||||
protected function getRetryCount(AMQPMessage $message): int
|
protected function getRetryCount(AMQPMessage $message): int
|
||||||
{
|
{
|
||||||
|
// 检查是否存在 application_headers 属性
|
||||||
|
// 首次处理的消息不会有这个属性,必须先用 has() 检查
|
||||||
|
if (!$message->has('application_headers')) {
|
||||||
|
dump(">>> No application_headers, first time processing");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
$headers = $message->get('application_headers');
|
$headers = $message->get('application_headers');
|
||||||
if (!$headers) {
|
if (!$headers) {
|
||||||
|
dump(">>> application_headers exists but is empty");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
$headerData = $headers->getNativeData();
|
$headerData = $headers->getNativeData();
|
||||||
$xDeath = $headerData['x-death'] ?? [];
|
$xDeath = $headerData['x-death'] ?? [];
|
||||||
|
|
||||||
|
dump(">>> x-death header data:");
|
||||||
|
dump($xDeath);
|
||||||
|
|
||||||
if (empty($xDeath)) {
|
if (empty($xDeath)) {
|
||||||
|
dump(">>> x-death is empty");
|
||||||
return 0; // 首次失败
|
return 0; // 首次失败
|
||||||
}
|
}
|
||||||
|
|
||||||
// x-death 是一个数组,第一个元素包含 count 字段
|
// x-death 是一个数组,第一个元素包含 count 字段
|
||||||
// count 表示消息从该队列死信的次数
|
// count 表示消息从该队列死信的次数
|
||||||
return $xDeath[0]['count'] ?? 0;
|
$count = $xDeath[0]['count'] ?? 0;
|
||||||
|
dump(">>> Extracted count from x-death: {$count}");
|
||||||
|
return $count;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -189,53 +211,22 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
|
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
$originalData = json_decode($message->getBody(), true);
|
$retryCount = $this->getRetryCount($message);
|
||||||
|
|
||||||
// 构建错误消息
|
// 使用 ErrorProducer 发送到错误队列
|
||||||
$errorMessage = [
|
$producer = ApplicationContext::getContainer()->get(Producer::class);
|
||||||
'error_id' => uniqid('err_', true),
|
$errorProducer = new ErrorProducer($message, $error, $retryCount);
|
||||||
'original_message' => $originalData,
|
$producer->produce($errorProducer);
|
||||||
'error' => [
|
|
||||||
'type' => get_class($error),
|
|
||||||
'message' => $error->getMessage(),
|
|
||||||
'trace' => $error->getTraceAsString(),
|
|
||||||
'timestamp' => date('c'),
|
|
||||||
],
|
|
||||||
'metadata' => [
|
|
||||||
'platform' => $originalData['platform'] ?? 'unknown',
|
|
||||||
'platform_id' => $originalData['meta']['platform_id'] ?? null,
|
|
||||||
'company_id' => $originalData['meta']['company_id'] ?? null,
|
|
||||||
'store_id' => $originalData['meta']['store_id'] ?? null,
|
|
||||||
'data_type' => $originalData['data_type'] ?? 'unknown',
|
|
||||||
'failed_at' => date('c'),
|
|
||||||
'retry_count' => $this->getRetryCount($message),
|
|
||||||
],
|
|
||||||
];
|
|
||||||
|
|
||||||
// 直接发布到 errors.queue
|
|
||||||
// 注意:这里直接发送到队列,不经过 exchange
|
|
||||||
$this->producer->produce(
|
|
||||||
new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage {
|
|
||||||
protected string $exchange = '';
|
|
||||||
protected string|array $routingKey = 'errors.queue';
|
|
||||||
protected array $properties = [
|
|
||||||
'delivery_mode' => 2, // 持久化
|
|
||||||
];
|
|
||||||
|
|
||||||
public function __construct(array $data)
|
|
||||||
{
|
|
||||||
$this->payload = $data;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
|
// 记录日志
|
||||||
Log::get()->warning('Message sent to error queue after exceeding retry limit', [
|
Log::get()->warning('Message sent to error queue after exceeding retry limit', [
|
||||||
'error_id' => $errorMessage['error_id'],
|
'error_id' => $errorProducer->payload['error_id'] ?? 'unknown',
|
||||||
'retry_count' => $errorMessage['metadata']['retry_count'],
|
'retry_count' => $retryCount,
|
||||||
'error_message' => $error->getMessage(),
|
'error_message' => $error->getMessage(),
|
||||||
]);
|
]);
|
||||||
} catch (Throwable $e) {
|
} catch (Throwable $e) {
|
||||||
// 发送到错误队列失败,记录日志
|
// 发送到错误队列失败,记录日志
|
||||||
|
dump($e->getMessage());
|
||||||
Log::get()->error('Failed to send message to error queue', [
|
Log::get()->error('Failed to send message to error queue', [
|
||||||
'error' => $e->getMessage(),
|
'error' => $e->getMessage(),
|
||||||
'original_error' => $error->getMessage(),
|
'original_error' => $error->getMessage(),
|
||||||
|
|||||||
+26
-9
@@ -96,20 +96,37 @@ done
|
|||||||
echo ""
|
echo ""
|
||||||
echo "创建重试队列..."
|
echo "创建重试队列..."
|
||||||
for dtype in "${DATA_TYPES[@]}"; do
|
for dtype in "${DATA_TYPES[@]}"; do
|
||||||
|
# 去掉末尾的 's' 得到单数形式(orders -> order)
|
||||||
|
dtype_singular="${dtype%s}"
|
||||||
|
|
||||||
|
# 配置回流时的 routing key 为 {type}.retry,可以被 {type}.# 匹配
|
||||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||||
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \
|
declare queue --name "${dtype}.retry.queue" --vhost "$VHOST" --durable true \
|
||||||
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\"}"
|
--arguments "{\"x-message-ttl\":5000,\"x-dead-letter-exchange\":\"main.exchange\",\"x-dead-letter-routing-key\":\"${dtype_singular}.retry\"}"
|
||||||
echo "✓ 创建重试队列: ${dtype}.retry.queue"
|
echo "✓ 创建重试队列: ${dtype}.retry.queue (回流 routing key: ${dtype_singular}.retry)"
|
||||||
done
|
done
|
||||||
|
|
||||||
# 6. 创建错误队列
|
# 6. 创建错误 Exchange 和错误队列
|
||||||
echo ""
|
echo ""
|
||||||
echo "创建错误队列..."
|
echo "创建错误 Exchange 和错误队列..."
|
||||||
|
# 创建通用错误 Exchange(供 Consumer 发送错误消息使用)
|
||||||
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||||
|
declare exchange --name "errors.exchange" --vhost "$VHOST" \
|
||||||
|
--type topic --durable true
|
||||||
|
echo "✓ 创建错误 Exchange: errors.exchange"
|
||||||
|
|
||||||
|
# 创建错误队列
|
||||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||||
declare queue --name "errors.queue" --vhost "$VHOST" --durable true \
|
declare queue --name "errors.queue" --vhost "$VHOST" --durable true \
|
||||||
--arguments '{"x-message-ttl":604800000}'
|
--arguments '{"x-message-ttl":604800000}'
|
||||||
echo "✓ 创建错误队列: errors.queue"
|
echo "✓ 创建错误队列: errors.queue"
|
||||||
|
|
||||||
|
# 绑定通用错误 Exchange 到错误队列
|
||||||
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||||
|
declare binding --source "errors.exchange" --destination "errors.queue" \
|
||||||
|
--destination-type queue --vhost "$VHOST" --routing-key "#"
|
||||||
|
echo "✓ 绑定: errors.exchange → errors.queue (routing_key: #)"
|
||||||
|
|
||||||
# 7. 绑定主 Exchange 到主队列(使用通配符)
|
# 7. 绑定主 Exchange 到主队列(使用通配符)
|
||||||
echo ""
|
echo ""
|
||||||
echo "绑定主 Exchange 到主队列..."
|
echo "绑定主 Exchange 到主队列..."
|
||||||
@@ -203,15 +220,15 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ
|
|||||||
echo "✓ 创建用户: user_dataflow_consumer"
|
echo "✓ 创建用户: user_dataflow_consumer"
|
||||||
|
|
||||||
# Consumer 需要完整的权限来声明和消费队列
|
# Consumer 需要完整的权限来声明和消费队列
|
||||||
# - configure: 允许声明 main.exchange、DLX exchanges 和所有队列
|
# - configure: 允许声明 main.exchange、DLX exchanges、通用 errors.exchange 和所有队列
|
||||||
# - write: 允许向业务队列写入(ACK/NACK)、DLX 和 errors exchange
|
# - write: 允许向业务队列写入(ACK/NACK)、DLX、通用 errors.exchange 和平台 errors exchanges
|
||||||
# - read: 允许读取 main.exchange、业务队列和 DLX exchanges
|
# - read: 允许读取 main.exchange、业务队列和 DLX exchanges
|
||||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||||
declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \
|
declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \
|
||||||
--configure "^(main\\.exchange|dlx\\..*)|(.*\\.queue)$" \
|
--configure "^(main\\.exchange|errors\\.exchange|dlx\\..*)|(.*\\.queue)$" \
|
||||||
--write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(.*\\.errors\\.exchange)$" \
|
--write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(errors\\.exchange)|(.*\\.errors\\.exchange)$" \
|
||||||
--read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$"
|
--read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$"
|
||||||
echo "✓ 配置用户权限 (configure: main.exchange+DLX+queues, write: business queues+DLX+errors, read: main.exchange+business queues+DLX)"
|
echo "✓ 配置用户权限 (configure: main.exchange+errors.exchange+DLX+queues, write: business queues+DLX+errors.exchange, read: main.exchange+business queues+DLX)"
|
||||||
|
|
||||||
# 11. 创建运维监控用户
|
# 11. 创建运维监控用户
|
||||||
echo ""
|
echo ""
|
||||||
|
|||||||
Reference in New Issue
Block a user