update entiity parse

This commit is contained in:
2025-11-27 15:03:25 +08:00
parent 0d634a2ca9
commit e9068ac73d
7 changed files with 569 additions and 122 deletions
@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace App\Command; namespace App\Command;
use Hyperf\Collection\LazyCollection;
use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command; use Hyperf\Command\Annotation\Command;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
@@ -26,39 +27,45 @@ class AppMessageQueuePushTmall extends HyperfCommand
$this->setDescription('Test push message with Tmall km data'); $this->setDescription('Test push message with Tmall km data');
} }
public function handle() public function handle(): void
{ {
try { try {
// 从 raw 数据库连接获取数据 // 从 raw 数据库连接获取数据
$orders = Db::connection('raw') $orders = Db::connection('raw')
->table('wpic_taobao_order') ->table('wpic_taobao_order')->orderBy('id', 'desc')
->orderBy('id', 'desc') ->limit(10)->get('order_raw')->lazy();
->limit(10)
->get()
->toArray();
if (empty($orders)) { // dump($orders->first());
// return;
if ($orders->isEmpty()) {
$this->warn('No orders found in wpic_taobao_order table'); $this->warn('No orders found in wpic_taobao_order table');
return 0; return;
} }
$this->info(sprintf('Found %d orders, processing...', count($orders))); $this->info(sprintf('Found %d orders, processing...', $orders->count()));
// 获取 Producer 实例 // 获取 Producer 实例
$producer = $this->container->get(Producer::class); $producer = $this->container->get(Producer::class);
// 每 2 条记录组成一条消息 // 每 2 条记录组成一条消息 - 实际生产环境需要增大这个值
$chunks = array_chunk($orders, 2); // $orders->chunk(2)->each(function($collection) use ($producer) {
$messageCount = 0; $messageCount = 0;
foreach ($chunks as $index => $chunk) { $orders->chunk(2)->each(function (LazyCollection $collection) use ($producer, &$messageCount) {
// 构造消息数据(根据实际表结构调整字段映射)
$order_data = $collection->pluck('order_raw')->map(function ($item) {
return json_decode($item, true);
})->toArray();
//@ATTENTION 生产环境需要注意, 暂时使用 KM 进行测试
$messageData = [ $messageData = [
'company_id' => $chunk[0]->company_id ?? 'default_company', 'company_id' => 188,
'platform_id' => $chunk[0]->platform_id ?? 'tmall', 'platform_id' => 2,
'store_id' => $chunk[0]->store_id ?? 'default_store', 'store_id' => 292,
'unique_id' => implode('_', array_column($chunk, 'id')), 'unique_id' => uniqid() . '_' . time(),
'raw_data' => $chunk, // 包含 2 条原始记录 'raw_data' => $order_data, // 包含 2 条原始记录
]; ];
// 创建并发送消息 // 创建并发送消息
@@ -66,19 +73,21 @@ class AppMessageQueuePushTmall extends HyperfCommand
$producer->produce($message); $producer->produce($message);
$messageCount++; $messageCount++;
$this->line(sprintf('Sent message %d with order IDs: %s', $this->line(sprintf('Sent message %d with order IDs: %s',
$messageCount, $messageCount,
$messageData['unique_id'] $messageData['unique_id'],
)); ));
} });
$this->info(sprintf('Successfully sent %d messages to RabbitMQ', $messageCount)); $this->info(sprintf('Successfully sent %d messages to RabbitMQ', $messageCount));
return 0; return;
} catch (Exception $e) { } catch (Exception $e) {
$this->error('Error pushing messages: ' . $e->getMessage()); $this->error('Error pushing messages: ' . $e->getMessage());
$this->error($e->getTraceAsString()); $this->error($e->getTraceAsString());
return 1; return;
} }
} }
} }
+125
View File
@@ -0,0 +1,125 @@
<?php
declare(strict_types=1);
namespace App\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Psr\Container\ContainerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Hyperf\Contract\ConfigInterface;
use Symfony\Component\Console\Input\InputArgument;
#[Command]
class AppMqClear extends HyperfCommand
{
public function __construct(protected ContainerInterface $container)
{
parent::__construct('app:mq:clear');
}
public function configure()
{
parent::configure();
$this->setDescription('Clear all messages from a specified queue (development mode only)');
$this->addArgument('queue', InputArgument::REQUIRED, 'The queue name to clear');
$this->addOption('force', 'f', null, 'Force clear without confirmation');
}
public function handle()
{
$queueName = $this->input->getArgument('queue');
$force = $this->input->getOption('force');
$this->warn("You are about to clear all messages from queue: {$queueName}");
// 如果不是强制模式,需要确认
if (!$force) {
$confirm = $this->confirm('Are you sure you want to continue?', false);
if (!$confirm) {
$this->info('Operation cancelled.');
return 0;
}
}
try {
$config = $this->container->get(ConfigInterface::class);
$consumerConfig = $config->get('amqp.default_consumer');
// 创建连接
$connection = new AMQPStreamConnection(
$consumerConfig['host'],
$consumerConfig['port'],
$consumerConfig['user'],
$consumerConfig['password'],
$consumerConfig['vhost'],
false,
'AMQPLAIN',
null,
'en_US',
$consumerConfig['params']['connection_timeout'] ?? 3.0,
$consumerConfig['params']['read_write_timeout'] ?? 3.0,
null,
$consumerConfig['params']['keepalive'] ?? false,
$consumerConfig['params']['heartbeat'] ?? 0
);
$channel = $connection->channel();
// 先检查队列是否存在并获取当前消息数
try {
[$queue, $messageCount, $consumerCount] = $channel->queue_declare(
$queueName,
true, // passive - 只检查,不创建
true, // durable
false, // exclusive
false // auto_delete
);
$this->line("Queue '{$queueName}' has {$messageCount} messages before clearing.", 'info');
if ($messageCount === 0) {
$this->info('Queue is already empty. Nothing to clear.');
$channel->close();
$connection->close();
return 0;
}
// 清除队列中的所有消息
$channel->queue_purge($queueName);
// 再次检查确认已清除
[$queue, $remainingCount, $consumerCount] = $channel->queue_declare(
$queueName,
true, // passive
true, // durable
false, // exclusive
false // auto_delete
);
$this->line('');
$this->info("Successfully cleared {$messageCount} messages from queue '{$queueName}'.");
$this->line("Remaining messages: {$remainingCount}", 'comment');
} catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
$this->error("Queue '{$queueName}' does not exist.");
$this->line('Available queues can be found using: php bin/hyperf.php app:mq:status', 'comment');
$channel->close();
$connection->close();
return 1;
}
// 关闭连接
$channel->close();
$connection->close();
return 0;
} catch (\Exception $e) {
$this->error('Failed to clear queue: ' . $e->getMessage());
$this->line('Trace: ' . $e->getTraceAsString(), 'comment');
return 1;
}
}
}
+189
View File
@@ -0,0 +1,189 @@
<?php
declare(strict_types=1);
namespace App\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Psr\Container\ContainerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Hyperf\Contract\ConfigInterface;
#[Command]
class AppMqStatus extends HyperfCommand
{
public function __construct(protected ContainerInterface $container)
{
parent::__construct('app:mq:status');
}
public function configure()
{
parent::configure();
$this->setDescription('Display message counts for all accessible queues');
$this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds (default: 3)');
$this->addOption('interval', 'i', \Symfony\Component\Console\Input\InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3');
}
public function handle()
{
$watchMode = $this->input->getOption('watch');
$interval = (int) $this->input->getOption('interval');
if ($watchMode) {
$this->info("Watch mode enabled. Refreshing every {$interval} seconds. Press Ctrl+C to exit.");
$this->line('');
while (true) {
// 清屏(对于支持 ANSI 的终端)
$this->output->write("\033[2J\033[H");
$this->displayQueueStatus();
sleep($interval);
}
} else {
return $this->displayQueueStatus();
}
}
private function displayQueueStatus(): int
{
$this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info');
try {
$config = $this->container->get(ConfigInterface::class);
$consumerConfig = $config->get('amqp.default_consumer');
// 创建连接
$connection = new AMQPStreamConnection(
$consumerConfig['host'],
$consumerConfig['port'],
$consumerConfig['user'],
$consumerConfig['password'],
$consumerConfig['vhost'],
false,
'AMQPLAIN',
null,
'en_US',
$consumerConfig['params']['connection_timeout'] ?? 3.0,
$consumerConfig['params']['read_write_timeout'] ?? 3.0,
null,
$consumerConfig['params']['keepalive'] ?? false,
$consumerConfig['params']['heartbeat'] ?? 0
);
$channel = $connection->channel();
// 获取所有队列信息
$queues = $this->getQueuesFromAnnotations();
$tableData = [];
$totalMessages = 0;
foreach ($queues as $queueName) {
try {
// 使用 passive=true 来获取队列信息而不创建队列
// queue_declare 返回 [queue_name, message_count, consumer_count]
[$queue, $messageCount, $consumerCount] = $channel->queue_declare(
$queueName,
true, // passive
true, // durable
false, // exclusive
false // auto_delete
);
$tableData[] = [
'queue' => $queueName,
'messages' => $messageCount,
'consumers' => $consumerCount,
'status' => $messageCount > 0 ? '<fg=yellow>Has Messages</>' : '<fg=green>Empty</>',
];
$totalMessages += $messageCount;
} catch (\Exception $e) {
$tableData[] = [
'queue' => $queueName,
'messages' => 'N/A',
'consumers' => 'N/A',
'status' => '<fg=red>Error: ' . $e->getMessage() . '</>',
];
}
}
// 关闭连接
$channel->close();
$connection->close();
// 显示表格
if (empty($tableData)) {
$this->warn('No queues found.');
return 0;
}
$this->table(
['Queue Name', 'Messages', 'Consumers', 'Status'],
array_map(function($row) {
return [$row['queue'], $row['messages'], $row['consumers'], $row['status']];
}, $tableData)
);
$this->line('');
$this->info("Total messages across all queues: {$totalMessages}");
return 0;
} catch (\Exception $e) {
$this->error('Failed to fetch queue status: ' . $e->getMessage());
$this->line('Trace: ' . $e->getTraceAsString(), 'comment');
return 1;
}
}
/**
* 扫描所有使用 Consumer 注解的类,获取队列名称
*/
private function getQueuesFromAnnotations(): array
{
$queues = [];
// 扫描 Platform 目录下的所有 Consumer
$platformPath = BASE_PATH . '/app/Platform';
if (is_dir($platformPath)) {
$this->scanDirectory($platformPath, $queues);
}
// 如果没有找到任何队列,返回一些默认的队列名称
if (empty($queues)) {
$queues = ['orders.queue'];
}
return array_unique($queues);
}
/**
* 递归扫描目录,查找包含 Consumer 注解的类
*/
private function scanDirectory(string $path, array &$queues): void
{
$files = scandir($path);
foreach ($files as $file) {
if ($file === '.' || $file === '..') {
continue;
}
$filePath = $path . '/' . $file;
if (is_dir($filePath)) {
$this->scanDirectory($filePath, $queues);
} elseif (is_file($filePath) && pathinfo($filePath, PATHINFO_EXTENSION) === 'php') {
$content = file_get_contents($filePath);
// 查找 Consumer 注解中的 queue 参数
if (preg_match('/#\[Consumer\([^)]*queue:\s*["\']([^"\']+)["\']/', $content, $matches)) {
$queues[] = $matches[1];
}
}
}
}
}
+70 -35
View File
@@ -9,7 +9,7 @@ use App\Model\Company;
use App\Model\Platform; use App\Model\Platform;
use App\Model\Store; use App\Model\Store;
use App\Entity\Parse\Traits\EntityParseHelper; use App\Entity\Parse\Traits\EntityParseHelper;
use Hyperf\Amqp\Message\ConsumerMessageInterface; use Hyperf\Collection\LazyCollection;
use InvalidArgumentException; use InvalidArgumentException;
/** /**
@@ -18,13 +18,13 @@ use InvalidArgumentException;
* 使用工厂方法模式 + 延迟初始化 * 使用工厂方法模式 + 延迟初始化
* 提供消息解析的通用框架和默认实现 * 提供消息解析的通用框架和默认实现
* *
* @method static static create(ConsumerMessageInterface $message) * @method static static create(array $data)
*/ */
abstract class EntityParse implements EntityParseInterface abstract class EntityParse implements EntityParseInterface
{ {
use EntityParseHelper; use EntityParseHelper;
protected ConsumerMessageInterface $message; protected array $data;
protected ?Platform $platform = null; protected ?Platform $platform = null;
protected ?Company $company = null; protected ?Company $company = null;
protected ?Store $store = null; protected ?Store $store = null;
@@ -41,30 +41,67 @@ abstract class EntityParse implements EntityParseInterface
/** /**
* 工厂方法:创建解析器实例 * 工厂方法:创建解析器实例
* *
* @param ConsumerMessageInterface $message * @param array $data
* @return static * @return static
* @throws InvalidArgumentException * @throws InvalidArgumentException
*/ */
public static function create(ConsumerMessageInterface $message): static public static function create(array $data): static
{ {
$instance = new static(); $instance = new static();
$instance->message = $message; $instance->data = $data;
// 在初始化前先验证数据
if (!$instance->messageValidate($data)) {
throw new InvalidArgumentException('Message validation failed: required fields missing');
}
$instance->initialize(); $instance->initialize();
return $instance; return $instance;
} }
/**
* 消息数据验证
*
* 默认实现:验证必需字段
* 子类可以覆写以添加自定义验证逻辑
*
* @param array $data
* @return bool
*/
public function messageValidate(array $data): bool
{
// 验证必需字段
$requiredFields = ['company_id', 'platform_id', 'store_id', 'unique_id', 'raw_data'];
foreach ($requiredFields as $field) {
if (!isset($data[$field])) {
return false;
}
}
return true;
}
/** /**
* 延迟初始化 * 延迟初始化
* 在 message 设置后执行作用域匹配和验证 * 在 data 设置后执行作用域匹配和验证
* *
* @return void * @return void
* @throws InvalidArgumentException * @throws InvalidArgumentException
*/ */
protected function initialize(): void protected function initialize(): void
{ {
$this->platform = $this->platformScopeMatch($this->message); // 提取 metadata
$this->company = $this->companyScopeMatch($this->message); $metadata = [
$this->store = $this->storeScopeMatch($this->message); 'company_id' => $this->data['company_id'],
'platform_id' => $this->data['platform_id'],
'store_id' => $this->data['store_id'],
'unique_id' => $this->data['unique_id'],
];
$this->platform = $this->platformScopeMatch($metadata);
$this->company = $this->companyScopeMatch($metadata);
$this->store = $this->storeScopeMatch($metadata);
$this->validateScope(); $this->validateScope();
} }
@@ -93,47 +130,46 @@ abstract class EntityParse implements EntityParseInterface
/** /**
* 平台作用域匹配 - 提供默认实现 * 平台作用域匹配 - 提供默认实现
* *
* 从 exchange 名称中提取平台信息 * 从 metadata 中提取平台信息
* 子类可以覆盖此方法以实现自定义逻辑 * 子类可以覆盖此方法以实现自定义逻辑
* *
* @param ConsumerMessageInterface $message * @param array $metadata
* @return Platform * @return Platform
* @throws InvalidArgumentException * @throws InvalidArgumentException
*/ */
public function platformScopeMatch(ConsumerMessageInterface $message): Platform public function platformScopeMatch(array $metadata): Platform
{ {
return $this->extractPlatformFromExchange($message); return $this->extractPlatformFromMetadata($metadata);
} }
/** /**
* 实体匹配 - 提供默认实现 * 实体匹配 - 提供默认实现
* *
* 从 routing key 中提取实体类型 * 从 metadata 中提取实体类型
* 子类可以覆盖此方法以实现自定义逻辑 * 子类可以覆盖此方法以实现自定义逻辑
* *
* @param ConsumerMessageInterface $message * @param array $metadata
* @return Entity * @return Entity
* @throws InvalidArgumentException * @throws InvalidArgumentException
*/ */
public function entityMatch(ConsumerMessageInterface $message): Entity public function entityMatch(array $metadata): Entity
{ {
return $this->extractEntityFromRoutingKey($message); return $this->extractEntityFromMetadata($metadata);
} }
/** /**
* 唯一标识符提取 - 提供默认实现 * 唯一标识符提取 - 提供默认实现
* *
* 默认从消息体中提取 id 或 unique_id * 默认从 metadata 中提取 unique_id
* 子类可以覆盖此方法以实现自定义逻辑 * 子类可以覆盖此方法以实现自定义逻辑
* *
* @param ConsumerMessageInterface $message * @param array $metadata
* @return string|int * @return string|int
* @throws InvalidArgumentException * @throws InvalidArgumentException
*/ */
public function entityUniqueIdentifierExtract(ConsumerMessageInterface $message): string|int public function entityUniqueIdentifierExtract(array $metadata): string|int
{ {
$data = $this->extractMessageData($message); return $metadata['unique_id'] ?? throw new InvalidArgumentException('unique_id not found in metadata');
return $this->extractUniqueIdentifier($data);
} }
/** /**
@@ -141,42 +177,41 @@ abstract class EntityParse implements EntityParseInterface
* *
* 必须由子类实现,因为不同平台的公司识别逻辑不同 * 必须由子类实现,因为不同平台的公司识别逻辑不同
* *
* @param ConsumerMessageInterface $message * @param array $metadata
* @return Company * @return Company
*/ */
abstract public function companyScopeMatch(ConsumerMessageInterface $message): Company; abstract public function companyScopeMatch(array $metadata): Company;
/** /**
* 店铺作用域匹配 - 抽象方法 * 店铺作用域匹配 - 抽象方法
* *
* 必须由子类实现,因为不同平台的店铺识别逻辑不同 * 必须由子类实现,因为不同平台的店铺识别逻辑不同
* *
* @param ConsumerMessageInterface $message * @param array $metadata
* @return Store * @return Store
*/ */
abstract public function storeScopeMatch(ConsumerMessageInterface $message): Store; abstract public function storeScopeMatch(array $metadata): Store;
/** /**
* 实体数据映射 - 抽象方法 * 实体数据映射 - 抽象方法
* *
* 必须由子类实现,因为不同平台的数据结构不同 * 必须由子类实现,因为不同平台的数据结构不同
* *
* @param array $data * @param array $rawData
* @param Entity $entity * @return LazyCollection
* @return Entity
*/ */
abstract public function entityMap(array $data, Entity $entity): Entity; abstract public function entityMap(array $rawData): LazyCollection;
// ==================== Getter 方法 ==================== // ==================== Getter 方法 ====================
/** /**
* 获取消息对象 * 获取消息数据
* *
* @return ConsumerMessageInterface * @return array
*/ */
public function getMessage(): ConsumerMessageInterface public function getData(): array
{ {
return $this->message; return $this->data;
} }
/** /**
+61 -36
View File
@@ -9,6 +9,7 @@ use Hyperf\Contract\ConfigInterface;
use Hyperf\Stringable\Str; use Hyperf\Stringable\Str;
use InvalidArgumentException; use InvalidArgumentException;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
use PhpAmqpLib\Message\AMQPMessage;
/** /**
* EntityParseFactory 工厂类 * EntityParseFactory 工厂类
@@ -102,66 +103,90 @@ class EntityParseFactory
/** /**
* 根据消息自动创建 Parser 实例(静态方法) * 根据消息自动创建 Parser 实例(静态方法)
* *
* @param ConsumerMessageInterface $message * @param AMQPMessage $message
* @param string|null $entityType 实体类型(可选,如果不指定则从 routing key 中提取) * @param string|null $entityType 实体类型(可选,如果不指定则从 payload 中提取)
* @return EntityParseInterface * @return EntityParseInterface
* @throws InvalidArgumentException * @throws InvalidArgumentException
*/ */
public static function createFromMessage( public static function createFromMessage(
ConsumerMessageInterface $message, AMQPMessage $message,
?string $entityType = null ?string $entityType = null
): EntityParseInterface { ): EntityParseInterface {
// 1. 从 exchange 中提取平台名称 // 1. 从消息体中解析数据
$platformName = self::extractPlatformName($message); $data = json_decode($message->getBody(), true);
// 2. 如果未指定实体类型,从 routing key 中提取 if (!is_array($data)) {
if ($entityType === null) { throw new InvalidArgumentException('Invalid message body: expected JSON array');
$entityType = self::extractEntityType($message);
} }
// 3. 获取对应的 Parser 类 // 2. 从 data 中提取平台名称
$platformName = self::extractPlatformNameFromData($data);
// 3. 如果未指定实体类型,从 data 或 routing key 中提取
if ($entityType === null) {
$entityType = self::extractEntityTypeFromData($data, $message);
}
// 4. 获取对应的 Parser 类
$parserClass = self::resolveParserClass($platformName, $entityType); $parserClass = self::resolveParserClass($platformName, $entityType);
// 4. 创建并返回 Parser 实例 // 5. 创建并返回 Parser 实例,传递解析后的数据
return $parserClass::create($message); return $parserClass::create($data);
} }
/** /**
* 从 exchange 中提取平台名称 * 从数据中提取平台名称
* *
* 规则:exchange 格式为 "platform.exchange" * @param array $data
* 例如:tmall.exchange -> tmall
*
* @param ConsumerMessageInterface $message
* @return string * @return string
*/ */
private static function extractPlatformName(ConsumerMessageInterface $message): string private static function extractPlatformNameFromData(array $data): string
{ {
$exchange = $message->getExchange(); // 优先从 platform_id 获取,如果没有则尝试从其他字段获取
if (isset($data['platform_id'])) {
$platformName = Str::of($exchange) // 这里可以根据 platform_id 映射到平台名称
->before('.') // 简化处理:假设需要从数据库或配置中查找
->lower() // 当前先直接使用 platform_id 作为平台名称
->toString(); return self::resolvePlatformName($data['platform_id']);
if (empty($platformName)) {
throw new InvalidArgumentException("Cannot extract platform name from exchange: {$exchange}");
} }
return $platformName; throw new InvalidArgumentException("Cannot extract platform name from data: platform_id missing");
} }
/** /**
* 从 routing key 中提取实体类型 * 根据 platform_id 解析平台名称
* *
* 规则:routing key 格式为 "entity.action" * @param mixed $platformId
* 例如:order.create -> order
*
* @param ConsumerMessageInterface $message
* @return string * @return string
*/ */
private static function extractEntityType(ConsumerMessageInterface $message): string private static function resolvePlatformName($platformId): string
{ {
// TODO: 从配置或数据库中根据 platform_id 查找平台名称
// 临时方案:使用简单的映射
$platformMap = [
1 => 'taobao',
2 => 'tmall',
3 => 'jd',
// ... 其他平台
];
if (isset($platformMap[$platformId])) {
return $platformMap[$platformId];
}
throw new InvalidArgumentException("Unknown platform_id: {$platformId}");
}
/**
* 从数据或路由键中提取实体类型
*
* @param array $data
* @param AMQPMessage $message
* @return string
*/
private static function extractEntityTypeFromData(array $data, AMQPMessage $message): string
{
// 优先从 routing key 中提取
$routingKey = $message->getRoutingKey(); $routingKey = $message->getRoutingKey();
$entityType = Str::of($routingKey) $entityType = Str::of($routingKey)
@@ -169,11 +194,11 @@ class EntityParseFactory
->lower() ->lower()
->toString(); ->toString();
if (empty($entityType)) { if (!empty($entityType)) {
throw new InvalidArgumentException("Cannot extract entity type from routing key: {$routingKey}"); return $entityType;
} }
return $entityType; throw new InvalidArgumentException("Cannot extract entity type from routing key: {$routingKey}");
} }
/** /**
@@ -4,11 +4,11 @@ declare(strict_types=1);
namespace App\Entity\Parse; namespace App\Entity\Parse;
use Hyperf\Amqp\Message\ConsumerMessageInterface;
use App\Model\Company; use App\Model\Company;
use App\Model\Platform; use App\Model\Platform;
use App\Model\Store; use App\Model\Store;
use App\Model\Model as Entity; use App\Model\Model as Entity;
use Hyperf\Collection\LazyCollection;
/** /**
* EntityParseInterface 接口 * EntityParseInterface 接口
@@ -17,61 +17,74 @@ use App\Model\Model as Entity;
*/ */
interface EntityParseInterface interface EntityParseInterface
{ {
/**
* 消息数据验证
*
* 验证消息数据是否包含必需字段
*
* @param array $data
* @return bool
*/
public function messageValidate(array $data): bool;
/** /**
* 公司作用域匹配 * 公司作用域匹配
* *
* @param ConsumerMessageInterface $message * @param array $metadata
* @return Company * @return Company
*/ */
public function companyScopeMatch(ConsumerMessageInterface $message): Company; public function companyScopeMatch(array $metadata): Company;
/** /**
* 平台作用域匹配 * 平台作用域匹配
* *
* @param ConsumerMessageInterface $message * @param array $metadata
* @return Platform * @return Platform
*/ */
public function platformScopeMatch(ConsumerMessageInterface $message): Platform; public function platformScopeMatch(array $metadata): Platform;
/** /**
* 店铺作用域匹配 * 店铺作用域匹配
* *
* @param ConsumerMessageInterface $message * @param array $metadata
* @return Store * @return Store
*/ */
public function storeScopeMatch(ConsumerMessageInterface $message): Store; public function storeScopeMatch(array $metadata): Store;
/** /**
* 实体类型匹配 * 实体类型匹配
* *
* @param ConsumerMessageInterface $message * 根据 metadata 返回实体模板实例
*
* @param array $metadata
* @return Entity * @return Entity
*/ */
public function entityMatch(ConsumerMessageInterface $message): Entity; public function entityMatch(array $metadata): Entity;
/** /**
* 实体数据映射 * 实体数据映射
* *
* @param array $data * 将原始数据转换为可填充到 Model 的数据集合
* @param Entity $entity *
* @return Entity * @param array $rawData
* @return LazyCollection
*/ */
public function entityMap(array $data, Entity $entity): Entity; public function entityMap(array $rawData): LazyCollection;
/** /**
* 提取实体唯一标识符 * 提取实体唯一标识符
* *
* @param ConsumerMessageInterface $message * @param array $metadata
* @return string|int * @return string|int
*/ */
public function entityUniqueIdentifierExtract(ConsumerMessageInterface $message): string|int; public function entityUniqueIdentifierExtract(array $metadata): string|int;
/** /**
* 获取消息对象 * 获取消息数据
* *
* @return ConsumerMessageInterface * @return array
*/ */
public function getMessage(): ConsumerMessageInterface; public function getData(): array;
/** /**
* 获取平台对象 * 获取平台对象
+53 -2
View File
@@ -4,11 +4,14 @@ declare(strict_types=1);
namespace App\Platform; namespace App\Platform;
use Exception; use App\Entity\Parse\EntityParseFactory;
use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result; use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Message\AMQPMessage;
use Hyperf\Di\Annotation\Inject;
use Hyperf\DbConnection\Db;
use Throwable;
#[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", nums: 1, enable: true)] #[Consumer(exchange: "main.exchange", routingKey: "order.#", queue: "orders.queue", nums: 1, enable: true)]
class OrderConsumer extends ConsumerMessage class OrderConsumer extends ConsumerMessage
@@ -25,11 +28,59 @@ class OrderConsumer extends ConsumerMessage
protected $entityType = 'order'; protected $entityType = 'order';
#[Inject]
protected EntityParseFactory $entityParseFactory;
public function consumeMessage($data, AMQPMessage $message): Result public function consumeMessage($data, AMQPMessage $message): Result
{ {
dump($data);
// dump('---data');
// dump($data);
// dump('---');
dump('---message');
dump( json_decode($message->getBody(), true));
dump('---');
$parse = $this->entityParseFactory->createFromMessage($message);
// 提取 metadata
$metadata = [
'company_id' => $data['company_id'] ?? null,
'platform_id' => $data['platform_id'] ?? null,
'store_id' => $data['store_id'] ?? null,
'unique_id' => $data['unique_id'] ?? null,
];
// entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息。
$entity = $parse->entityMatch($metadata);
// message 中包含 raw dataraw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
$entityMapResult = $parse->entityMap($data['raw_data'] ?? []);
// $entityMapResult 应该是一个内部元素为 Model 的集合
Db::beginTransaction();
try {
// 假设 $entityMapResult 为集合 @Collection 对象
$entityMapResult->each(function($el) use ($entity) {
$clone = clone $entity;
$clone->fill($el);
$clone->save();
});
Db::commit();
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
return Result::ACK;
} catch (Throwable $error) {
dump($error->getMessage());
Db::rollBack();
return Result::NACK; return Result::NACK;
} }
}
public function isEnable(): bool public function isEnable(): bool
{ {