2026-02-06 15:33:55 +08:00
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Platform;
|
|
|
|
|
|
|
|
|
|
use App\Entity\Parse\EntityParseFactory;
|
2026-03-13 15:01:55 +08:00
|
|
|
use App\Model\FailedMessage;
|
2026-02-06 15:33:55 +08:00
|
|
|
use App\Utils\Log;
|
|
|
|
|
use Hyperf\Amqp\Annotation\Consumer;
|
|
|
|
|
use Hyperf\Amqp\Builder\QueueBuilder;
|
|
|
|
|
use Hyperf\Amqp\Message\ConsumerMessage;
|
|
|
|
|
use Hyperf\Amqp\Result;
|
|
|
|
|
use Hyperf\Amqp\Producer;
|
|
|
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
|
|
|
use Hyperf\DbConnection\Db;
|
|
|
|
|
use Hyperf\Context\ApplicationContext;
|
|
|
|
|
use Throwable;
|
|
|
|
|
|
|
|
|
|
use function Hyperf\Support\env;
|
|
|
|
|
|
|
|
|
|
#[Consumer(exchange: "main.exchange", routingKey: "product.#", queue: "products.queue", pool: "default_consumer", nums: 1, enable: true)]
class ProductConsumer extends ConsumerMessage
{
    /**
     * Channel QoS settings: prefetch a single message at a time, per consumer.
     */
    protected ?array $qos = [
        'prefetch_size' => 0,
        'prefetch_count' => 1,
        'global' => false,
    ];

    /**
     * Set requeue=false so that a failed (NACKed) message is routed to the DLX
     * instead of going back to the original queue.
     */
    protected bool $requeue = false;

    /**
     * Entity type handled by this consumer. Not read anywhere inside this class.
     *
     * @var string
     */
    protected $entityType = 'product';

    /**
     * Override the queue builder to declare the queue with its arguments:
     * dead-letter exchange and routing key, a 24 hour message TTL and the
     * classic queue type.
     *
     * @return QueueBuilder
     */
    public function getQueueBuilder(): QueueBuilder
    {
        return (new QueueBuilder())
            ->setQueue($this->getQueue())
            ->setArguments([
                'x-dead-letter-exchange' => ['S', 'dlx.products'],
                'x-dead-letter-routing-key' => ['S', 'retry'],
                'x-message-ttl' => ['I', 86400000], // 24 hours
                'x-queue-type' => ['S', 'classic'],
            ]);
    }

    /**
     * Parse a product message and upsert the mapped rows inside a transaction.
     *
     * Outcomes:
     *  - ACK  when there is nothing to upsert or the upsert committed;
     *  - NACK when processing failed and the retry limit (AMQP_MAX_RETRIES,
     *         default 3) has not been reached yet, so the broker dead-letters
     *         the message;
     *  - ACK  when processing failed and the retry limit has been reached,
     *         after attempting to hand the message to the error queue.
     *
     * @param mixed $data Decoded message body; the 'meta' and 'data' keys are read.
     * @param AMQPMessage $message Raw AMQP message (used for headers and parsing).
     * @return Result
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        // Optional artificial delay for debugging.
        $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0);
        if ($debug_delay > 0) {
            dump("Debug mode: sleeping for {$debug_delay} seconds...");
            sleep($debug_delay);
        }

        // Retry bookkeeping.
        $retry_count = $this->getRetryCount($message);
        $max_retries = (int) env('AMQP_MAX_RETRIES', 3);

        dump("Product Consumer - Retry count: {$retry_count}/{$max_retries}");

        // Tracks whether a transaction is open, so the failure path only rolls
        // back what was actually started.
        $transaction_open = false;

        try {
            // EntityParseFactory is used through its static factory method.
            $parse = EntityParseFactory::createFromMessage($message);

            // Extract metadata.
            $meta = $data['meta'] ?? [];
            $metadata = [
                'company_id' => $meta['company_id'] ?? null,
                'platform_id' => $meta['platform_id'] ?? null,
                'store_id' => $meta['store_id'] ?? null,
                'platform_store_id' => $meta['platform_store_id'] ?? null,
                'unique_id' => $meta['unique_id'] ?? null,
            ];

            // Resolve the entity model.
            $entity = $parse->entityMatch($metadata);

            // Map the raw payload.
            $entity_map_result = $parse->entityMap($data['data'] ?? []);

            // Convert to an array for the batch operation.
            $data_to_upsert = $entity_map_result->all();

            dump($entity_map_result->first());

            if (empty($data_to_upsert)) {
                dump('No data to process');
                return Result::ACK;
            }

            dump("Processing " . count($data_to_upsert) . " product(s) with batch upsert");

            Db::beginTransaction();
            $transaction_open = true;

            // Insert and update products in one batch via upsert.
            $unique_by = $parse->getUniqueBy();
            $entity->newQuery()->upsert(
                $data_to_upsert,
                $unique_by,
                $parse->getUpdateFields()
            );

            Db::commit();
            $transaction_open = false;

            dump("Successfully upserted " . count($data_to_upsert) . " product(s)");

            return Result::ACK;
        } catch (Throwable $error) {
            dump($error->getMessage());
            dump($error->getTraceAsString());
            Log::get()->error('Product consumer processing failed', [
                'error' => $error->getMessage(),
                'retry_count' => $retry_count,
                'max_retries' => $max_retries,
            ]);

            if ($transaction_open) {
                $this->rollBackQuietly();
            }

            // Has the retry limit been reached?
            if ($retry_count >= $max_retries) {
                dump(">>> MAX RETRIES EXCEEDED! Sending to error queue...");

                try {
                    $this->sendToErrorQueue($message, $error);
                    dump(">>> Successfully sent to error queue!");
                } catch (Throwable $e) {
                    // sendToErrorQueue() has already logged the failure.
                    dump(">>> FAILED to send to error queue: " . $e->getMessage());
                }

                // ACK in both cases so the message stops cycling through the
                // retry path once the limit has been reached.
                return Result::ACK;
            }

            dump(">>> Retry not exceeded, sending to DLX (NACK)");
            return Result::NACK;
        }
    }

    /**
     * Whether this consumer is enabled; defers to the parent implementation.
     *
     * @return bool
     */
    public function isEnable(): bool
    {
        return parent::isEnable();
    }

    /**
     * Read the retry count of a message from its 'x-death' header.
     *
     * Returns 0 when the message has no application headers, no 'x-death'
     * entry, or when the first entry does not carry a numeric 'count'.
     *
     * @param AMQPMessage $message
     * @return int
     */
    protected function getRetryCount(AMQPMessage $message): int
    {
        if (!$message->has('application_headers')) {
            return 0;
        }

        $headers = $message->get('application_headers');
        if (!$headers) {
            return 0;
        }

        $header_data = $headers->getNativeData();
        $x_death = $header_data['x-death'] ?? [];

        if (empty($x_death) || !is_array($x_death)) {
            return 0;
        }

        $count = $x_death[0]['count'] ?? 0;

        // The declared return type is int and strict_types is on, so a
        // non-int header value must be normalised instead of returned as-is.
        return is_numeric($count) ? (int) $count : 0;
    }

    /**
     * Publish a message to the error queue and record it in failed_messages.
     *
     * A failure to publish is logged and re-thrown so the caller can tell it
     * apart from success. Failures in the bookkeeping that follows a
     * successful publish are logged only.
     *
     * @param AMQPMessage $message Original message
     * @param Throwable $error Error that caused the failure
     * @return void
     * @throws Throwable When the message could not be published to the error queue
     */
    protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void
    {
        try {
            $retry_count = $this->getRetryCount($message);

            $producer = ApplicationContext::getContainer()->get(Producer::class);
            $error_producer = new ErrorProducer($message, $error, $retry_count);

            if ($producer->produce($error_producer) === false) {
                throw new \RuntimeException('Error queue producer reported an unsuccessful publish');
            }
        } catch (Throwable $e) {
            dump($e->getMessage());
            Log::get()->error('Failed to send product message to error queue', [
                'error' => $e->getMessage(),
                'original_error' => $error->getMessage(),
            ]);

            throw $e;
        }

        try {
            // Synchronously write to the failed_messages table.
            $this->persistFailedMessage($error_producer->payload);

            Log::get()->warning('Product message sent to error queue after exceeding retry limit', [
                'error_id' => $error_producer->payload['error_id'] ?? 'unknown',
                'retry_count' => $retry_count,
                'error_message' => $error->getMessage(),
            ]);
        } catch (Throwable $e) {
            // The message is already in the error queue at this point, so this
            // is reported but not treated as a publish failure.
            dump($e->getMessage());
            Log::get()->error('Product message was sent to error queue but follow-up bookkeeping failed', [
                'error' => $e->getMessage(),
                'original_error' => $error->getMessage(),
            ]);
        }
    }

    /**
     * Persist a failed message to the database.
     *
     * Best effort: any error while writing the row is logged and swallowed.
     *
     * @param array $payload Error payload; 'error_id', 'metadata', 'error' and
     *                       'original_message' keys are read.
     * @return void
     */
    protected function persistFailedMessage(array $payload): void
    {
        try {
            FailedMessage::query()->create([
                'error_id' => $payload['error_id'],
                'data_type' => $payload['metadata']['data_type'] ?? 'product',
                'platform' => $payload['metadata']['platform'] ?? null,
                'platform_id' => $payload['metadata']['platform_id'] ?? null,
                'company_id' => $payload['metadata']['company_id'] ?? null,
                'store_id' => $payload['metadata']['store_id'] ?? null,
                'error_type' => $payload['error']['type'] ?? 'Unknown',
                'error_message' => $payload['error']['message'] ?? '',
                'error_code' => $payload['error']['code'] ?? 0,
                'error_trace' => $payload['error']['trace'] ?? '',
                'original_message' => $payload['original_message'] ?? [],
                'retry_count' => $payload['metadata']['retry_count'] ?? 0,
                'message_id' => $payload['metadata']['message_id'] ?? null,
                'failed_at' => $payload['metadata']['failed_at'] ?? date('c'),
            ]);
        } catch (Throwable $e) {
            Log::get()->error('Failed to persist failed message to database', [
                'error' => $e->getMessage(),
                'error_id' => $payload['error_id'] ?? 'unknown',
            ]);
        }
    }

    /**
     * Roll back the open transaction without letting a rollback failure
     * escape, so the caller can still decide between retry and error queue.
     */
    private function rollBackQuietly(): void
    {
        try {
            Db::rollBack();
        } catch (Throwable $rollback_error) {
            Log::get()->error('Product consumer transaction rollback failed', [
                'error' => $rollback_error->getMessage(),
            ]);
        }
    }
}
|