update mq
This commit is contained in:
@@ -5,49 +5,76 @@ declare(strict_types=1);
|
|||||||
namespace App\Platform;
|
namespace App\Platform;
|
||||||
|
|
||||||
use App\Entity\Parse\EntityParseFactory;
|
use App\Entity\Parse\EntityParseFactory;
|
||||||
|
use App\Utils\Log;
|
||||||
use Hyperf\Amqp\Annotation\Consumer;
|
use Hyperf\Amqp\Annotation\Consumer;
|
||||||
|
use Hyperf\Amqp\Builder\QueueBuilder;
|
||||||
use Hyperf\Amqp\Message\ConsumerMessage;
|
use Hyperf\Amqp\Message\ConsumerMessage;
|
||||||
use Hyperf\Amqp\Result;
|
use Hyperf\Amqp\Result;
|
||||||
|
use Hyperf\Amqp\Producer;
|
||||||
use PhpAmqpLib\Message\AMQPMessage;
|
use PhpAmqpLib\Message\AMQPMessage;
|
||||||
use Hyperf\Di\Annotation\Inject;
|
use Hyperf\Di\Annotation\Inject;
|
||||||
use Hyperf\DbConnection\Db;
|
use Hyperf\DbConnection\Db;
|
||||||
|
use Hyperf\Context\ApplicationContext;
|
||||||
use Throwable;
|
use Throwable;
|
||||||
|
|
||||||
|
use function Hyperf\Support\env;
|
||||||
|
|
||||||
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", pool: "default_consumer", nums: 1, enable: true)]
|
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", pool: "default_consumer", nums: 1, enable: true)]
|
||||||
class OrderConsumer extends ConsumerMessage
|
class OrderConsumer extends ConsumerMessage
|
||||||
{
|
{
|
||||||
|
|
||||||
/**
|
|
||||||
* 队列参数配置
|
|
||||||
* 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误
|
|
||||||
*
|
|
||||||
* 参数说明:
|
|
||||||
* - x-message-ttl: 消息存活时间(毫秒),24小时 = 86400000ms
|
|
||||||
* - x-dead-letter-exchange: 死信交换机,用于重试机制
|
|
||||||
* - x-dead-letter-routing-key: 死信路由键
|
|
||||||
*/
|
|
||||||
protected array $queueOptions = [
|
|
||||||
'x-message-ttl' => ['I', 86400000], // 24小时 TTL
|
|
||||||
'x-dead-letter-exchange' => ['S', 'dlx.orders'], // 死信交换机
|
|
||||||
'x-dead-letter-routing-key' => ['S', 'retry'], // 死信路由键
|
|
||||||
];
|
|
||||||
|
|
||||||
protected ?array $qos = [
|
protected ?array $qos = [
|
||||||
// AMQP 默认并没有实现此配置。
|
// AMQP 默认并没有实现此配置。
|
||||||
'prefetch_size' => 0,
|
'prefetch_size' => 0,
|
||||||
// 同一个消费者,最高同时可以处理的消息数。
|
// 同一个消费者,最高同时可以处理的消息数。
|
||||||
'prefetch_count' => 100,
|
// @attention 默认值 100, test 设置为 1
|
||||||
|
'prefetch_count' => 1,
|
||||||
// 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。
|
// 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。
|
||||||
'global' => false,
|
'global' => false,
|
||||||
];
|
];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列
|
||||||
|
* 这样才能实现重试队列和错误队列的机制
|
||||||
|
*/
|
||||||
|
protected bool $requeue = false;
|
||||||
|
|
||||||
protected $entityType = 'order';
|
protected $entityType = 'order';
|
||||||
|
|
||||||
#[Inject]
|
#[Inject]
|
||||||
protected EntityParseFactory $entityParseFactory;
|
protected EntityParseFactory $entityParseFactory;
|
||||||
|
|
||||||
|
#[Inject]
|
||||||
|
protected Producer $producer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重写队列构建器,设置队列参数
|
||||||
|
* 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误
|
||||||
|
*
|
||||||
|
* @return QueueBuilder
|
||||||
|
*/
|
||||||
|
public function getQueueBuilder(): QueueBuilder
|
||||||
|
{
|
||||||
|
return (new QueueBuilder())
|
||||||
|
->setQueue($this->getQueue())
|
||||||
|
->setArguments([
|
||||||
|
'x-dead-letter-exchange' => ['S', 'dlx.orders'],
|
||||||
|
'x-dead-letter-routing-key' => ['S', 'retry'],
|
||||||
|
'x-message-ttl' => ['I', 86400000], // 24小时
|
||||||
|
'x-queue-type' => ['S', 'classic'],
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
public function consumeMessage($data, AMQPMessage $message): Result
|
public function consumeMessage($data, AMQPMessage $message): Result
|
||||||
{
|
{
|
||||||
|
// 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态
|
||||||
|
// 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒
|
||||||
|
$debugDelay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
|
||||||
|
if ($debugDelay > 0) {
|
||||||
|
dump("Debug mode: sleeping for {$debugDelay} seconds...");
|
||||||
|
|
||||||
|
sleep($debugDelay);
|
||||||
|
}
|
||||||
|
|
||||||
// dump('---data');
|
// dump('---data');
|
||||||
// dump($data);
|
// dump($data);
|
||||||
@@ -57,7 +84,13 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
dump(json_decode($message->getBody(), true));
|
dump(json_decode($message->getBody(), true));
|
||||||
dump('---');
|
dump('---');
|
||||||
|
|
||||||
|
// 获取重试次数
|
||||||
|
$retryCount = $this->getRetryCount($message);
|
||||||
|
$maxRetries = (int) env('AMQP_MAX_RETRIES', 3);
|
||||||
|
|
||||||
|
dump("Retry count: {$retryCount}/{$maxRetries}");
|
||||||
|
|
||||||
|
try {
|
||||||
$parse = $this->entityParseFactory->createFromMessage($message);
|
$parse = $this->entityParseFactory->createFromMessage($message);
|
||||||
|
|
||||||
// 提取 metadata
|
// 提取 metadata
|
||||||
@@ -74,11 +107,9 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
// message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
|
// message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
|
||||||
$entityMapResult = $parse->entityMap($data['raw_data'] ?? []);
|
$entityMapResult = $parse->entityMap($data['raw_data'] ?? []);
|
||||||
|
|
||||||
|
|
||||||
// $entityMapResult 应该是一个内部元素为 Model 的集合
|
// $entityMapResult 应该是一个内部元素为 Model 的集合
|
||||||
Db::beginTransaction();
|
Db::beginTransaction();
|
||||||
|
|
||||||
try {
|
|
||||||
// 假设 $entityMapResult 为集合 @Collection 对象
|
// 假设 $entityMapResult 为集合 @Collection 对象
|
||||||
$entityMapResult->each(function ($el) use ($entity) {
|
$entityMapResult->each(function ($el) use ($entity) {
|
||||||
$clone = clone $entity;
|
$clone = clone $entity;
|
||||||
@@ -92,7 +123,26 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
return Result::ACK;
|
return Result::ACK;
|
||||||
} catch (Throwable $error) {
|
} catch (Throwable $error) {
|
||||||
dump($error->getMessage());
|
dump($error->getMessage());
|
||||||
|
Log::get()->error('Consumer processing failed', [
|
||||||
|
'error' => $error->getMessage(),
|
||||||
|
'retry_count' => $retryCount,
|
||||||
|
'max_retries' => $maxRetries,
|
||||||
|
]);
|
||||||
Db::rollBack();
|
Db::rollBack();
|
||||||
|
|
||||||
|
// 检查是否超过最大重试次数
|
||||||
|
if ($retryCount >= $maxRetries) {
|
||||||
|
// 超过重试次数,发送到错误队列
|
||||||
|
dump("Max retries exceeded ({$retryCount}/{$maxRetries}), sending to error queue...");
|
||||||
|
$this->sendToErrorQueue($message, $error);
|
||||||
|
|
||||||
|
// 返回 ACK 避免消息再次重试
|
||||||
|
// 因为消息已经被发送到错误队列,不应该继续在主队列中循环
|
||||||
|
return Result::ACK;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列
|
||||||
|
dump("Retry {$retryCount}/{$maxRetries}, sending to retry queue...");
|
||||||
return Result::NACK;
|
return Result::NACK;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -101,4 +151,95 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
{
|
{
|
||||||
return parent::isEnable();
|
return parent::isEnable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取消息的重试次数
|
||||||
|
* 通过检查 x-death header 中的 count 字段
|
||||||
|
*
|
||||||
|
* @param AMQPMessage $message
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
protected function getRetryCount(AMQPMessage $message): int
|
||||||
|
{
|
||||||
|
$headers = $message->get('application_headers');
|
||||||
|
if (!$headers) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
$headerData = $headers->getNativeData();
|
||||||
|
$xDeath = $headerData['x-death'] ?? [];
|
||||||
|
|
||||||
|
if (empty($xDeath)) {
|
||||||
|
return 0; // 首次失败
|
||||||
|
}
|
||||||
|
|
||||||
|
// x-death 是一个数组,第一个元素包含 count 字段
|
||||||
|
// count 表示消息从该队列死信的次数
|
||||||
|
return $xDeath[0]['count'] ?? 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送消息到错误队列
|
||||||
|
* 当重试次数超过上限时调用
|
||||||
|
*
|
||||||
|
* @param AMQPMessage $message 原始消息
|
||||||
|
* @param Throwable $error 错误信息
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
$originalData = json_decode($message->getBody(), true);
|
||||||
|
|
||||||
|
// 构建错误消息
|
||||||
|
$errorMessage = [
|
||||||
|
'error_id' => uniqid('err_', true),
|
||||||
|
'original_message' => $originalData,
|
||||||
|
'error' => [
|
||||||
|
'type' => get_class($error),
|
||||||
|
'message' => $error->getMessage(),
|
||||||
|
'trace' => $error->getTraceAsString(),
|
||||||
|
'timestamp' => date('c'),
|
||||||
|
],
|
||||||
|
'metadata' => [
|
||||||
|
'platform' => $originalData['platform'] ?? 'unknown',
|
||||||
|
'platform_id' => $originalData['meta']['platform_id'] ?? null,
|
||||||
|
'company_id' => $originalData['meta']['company_id'] ?? null,
|
||||||
|
'store_id' => $originalData['meta']['store_id'] ?? null,
|
||||||
|
'data_type' => $originalData['data_type'] ?? 'unknown',
|
||||||
|
'failed_at' => date('c'),
|
||||||
|
'retry_count' => $this->getRetryCount($message),
|
||||||
|
],
|
||||||
|
];
|
||||||
|
|
||||||
|
// 直接发布到 errors.queue
|
||||||
|
// 注意:这里直接发送到队列,不经过 exchange
|
||||||
|
$this->producer->produce(
|
||||||
|
new class($errorMessage) extends \Hyperf\Amqp\Message\ProducerMessage {
|
||||||
|
protected string $exchange = '';
|
||||||
|
protected string|array $routingKey = 'errors.queue';
|
||||||
|
protected array $properties = [
|
||||||
|
'delivery_mode' => 2, // 持久化
|
||||||
|
];
|
||||||
|
|
||||||
|
public function __construct(array $data)
|
||||||
|
{
|
||||||
|
$this->payload = $data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
Log::get()->warning('Message sent to error queue after exceeding retry limit', [
|
||||||
|
'error_id' => $errorMessage['error_id'],
|
||||||
|
'retry_count' => $errorMessage['metadata']['retry_count'],
|
||||||
|
'error_message' => $error->getMessage(),
|
||||||
|
]);
|
||||||
|
} catch (Throwable $e) {
|
||||||
|
// 发送到错误队列失败,记录日志
|
||||||
|
Log::get()->error('Failed to send message to error queue', [
|
||||||
|
'error' => $e->getMessage(),
|
||||||
|
'original_error' => $error->getMessage(),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,10 +74,11 @@ class OrderProducer extends ProducerMessage
|
|||||||
'timestamp' => date('c'), // ISO 8601 格式
|
'timestamp' => date('c'), // ISO 8601 格式
|
||||||
'platform' => 'tmall',
|
'platform' => 'tmall',
|
||||||
'data_type' => 'order',
|
'data_type' => 'order',
|
||||||
'metadata' => [
|
'meta' => [
|
||||||
'platform_id' => $data['platform_id'] ?? null,
|
'platform_id' => $data['platform_id'] ?? null,
|
||||||
'company_id' => $data['company_id'] ?? null,
|
'company_id' => $data['company_id'] ?? null,
|
||||||
'store_id' => $data['store_id'] ?? null,
|
'store_id' => $data['store_id'] ?? null,
|
||||||
|
'unique_id' => $data['unique_id'] ?? null,
|
||||||
'source_system' => 'tmall-open-api',
|
'source_system' => 'tmall-open-api',
|
||||||
'retry_count' => 0,
|
'retry_count' => 0,
|
||||||
'data_version' => $data['data_version'] ?? time(),
|
'data_version' => $data['data_version'] ?? time(),
|
||||||
|
|||||||
+12
-12
@@ -121,21 +121,17 @@ for dtype in "${DATA_TYPES[@]}"; do
|
|||||||
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
|
echo "✓ 绑定: main.exchange → ${dtype}.queue (routing_key: ${dtype%s}.#)"
|
||||||
done
|
done
|
||||||
|
|
||||||
# 8. 绑定 DLX 到重试队列和错误队列
|
# 8. 绑定 DLX 到重试队列
|
||||||
echo ""
|
echo ""
|
||||||
echo "绑定 DLX 到重试队列和错误队列..."
|
echo "绑定 DLX 到重试队列..."
|
||||||
for dtype in "${DATA_TYPES[@]}"; do
|
for dtype in "${DATA_TYPES[@]}"; do
|
||||||
# DLX -> 重试队列
|
# DLX -> 重试队列
|
||||||
|
# 注意:方案B中,超过重试次数的消息由 Consumer 直接发送到 errors.queue
|
||||||
|
# 不使用 DLX 的 routing_key="error" 路由,因此不需要绑定到 errors.queue
|
||||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||||
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
|
declare binding --source "dlx.${dtype}" --destination "${dtype}.retry.queue" \
|
||||||
--destination-type queue --vhost "$VHOST" --routing-key "retry"
|
--destination-type queue --vhost "$VHOST" --routing-key "retry"
|
||||||
echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)"
|
echo "✓ 绑定: dlx.${dtype} → ${dtype}.retry.queue (routing_key: retry)"
|
||||||
|
|
||||||
# DLX -> 错误队列
|
|
||||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
|
||||||
declare binding --source "dlx.${dtype}" --destination "errors.queue" \
|
|
||||||
--destination-type queue --vhost "$VHOST" --routing-key "error"
|
|
||||||
echo "✓ 绑定: dlx.${dtype} → errors.queue (routing_key: error)"
|
|
||||||
done
|
done
|
||||||
|
|
||||||
# 9. 为每个平台创建 Exchange 和 Binding
|
# 9. 为每个平台创建 Exchange 和 Binding
|
||||||
@@ -206,12 +202,16 @@ rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ
|
|||||||
declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags ""
|
declare user --name "user_dataflow_consumer" --password "change_me_consumer" --tags ""
|
||||||
echo "✓ 创建用户: user_dataflow_consumer"
|
echo "✓ 创建用户: user_dataflow_consumer"
|
||||||
|
|
||||||
|
# Consumer 需要完整的权限来声明和消费队列
|
||||||
|
# - configure: 允许声明 main.exchange、DLX exchanges 和所有队列
|
||||||
|
# - write: 允许向业务队列写入(ACK/NACK)、DLX 和 errors exchange
|
||||||
|
# - read: 允许读取 main.exchange、业务队列和 DLX exchanges
|
||||||
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
rabbitmqadmin -H $RABBITMQ_HOST -P $RABBITMQ_PORT -u $RABBITMQ_USER -p $RABBITMQ_PASS \
|
||||||
declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \
|
declare permissions --vhost "$VHOST" --user "user_dataflow_consumer" \
|
||||||
--configure "^(orders|products|refunds|inventory).*\\.queue$" \
|
--configure "^(main\\.exchange|dlx\\..*)|(.*\\.queue)$" \
|
||||||
--write "(dlx\\..*)|(.*\\.errors\\.exchange)$" \
|
--write "^(orders|products|refunds|inventory).*\\.queue$|(dlx\\..*)|(.*\\.errors\\.exchange)$" \
|
||||||
--read "^(orders|products|refunds|inventory).*\\.queue$"
|
--read "^(main\\.exchange|(orders|products|refunds|inventory).*\\.queue|dlx\\..*)$"
|
||||||
echo "✓ 配置用户权限"
|
echo "✓ 配置用户权限 (configure: main.exchange+DLX+queues, write: business queues+DLX+errors, read: main.exchange+business queues+DLX)"
|
||||||
|
|
||||||
# 11. 创建运维监控用户
|
# 11. 创建运维监控用户
|
||||||
echo ""
|
echo ""
|
||||||
|
|||||||
Reference in New Issue
Block a user