update entity parse
This commit is contained in:
@@ -9,9 +9,9 @@ use App\Model\Company;
|
|||||||
use App\Model\Platform;
|
use App\Model\Platform;
|
||||||
use App\Model\Store;
|
use App\Model\Store;
|
||||||
use App\Entity\Parse\Traits\EntityParseHelper;
|
use App\Entity\Parse\Traits\EntityParseHelper;
|
||||||
use Hyperf\Amqp\Message\ConsumerMessageInterface;
|
|
||||||
use Hyperf\Collection\LazyCollection;
|
use Hyperf\Collection\LazyCollection;
|
||||||
use InvalidArgumentException;
|
use InvalidArgumentException;
|
||||||
|
use PhpAmqpLib\Message\AMQPMessage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* EntityParse 抽象类
|
* EntityParse 抽象类
|
||||||
@@ -19,14 +19,14 @@ use InvalidArgumentException;
|
|||||||
* 使用工厂方法模式 + 延迟初始化
|
* 使用工厂方法模式 + 延迟初始化
|
||||||
* 提供消息解析的通用框架和默认实现
|
* 提供消息解析的通用框架和默认实现
|
||||||
*
|
*
|
||||||
* @method static static create(array $data, ConsumerMessageInterface $message)
|
* @method static static create(array $data, AMQPMessage $message)
|
||||||
*/
|
*/
|
||||||
abstract class EntityParse implements EntityParseInterface
|
abstract class EntityParse implements EntityParseInterface
|
||||||
{
|
{
|
||||||
use EntityParseHelper;
|
use EntityParseHelper;
|
||||||
|
|
||||||
protected array $data;
|
protected array $data;
|
||||||
protected ConsumerMessageInterface $message;
|
protected AMQPMessage $message;
|
||||||
protected ?Platform $platform = null;
|
protected ?Platform $platform = null;
|
||||||
protected ?Company $company = null;
|
protected ?Company $company = null;
|
||||||
protected ?Store $store = null;
|
protected ?Store $store = null;
|
||||||
@@ -52,11 +52,11 @@ abstract class EntityParse implements EntityParseInterface
|
|||||||
* 工厂方法:创建解析器实例
|
* 工厂方法:创建解析器实例
|
||||||
*
|
*
|
||||||
* @param array $data
|
* @param array $data
|
||||||
* @param ConsumerMessageInterface $message
|
* @param AMQPMessage $message
|
||||||
* @return static
|
* @return static
|
||||||
* @throws InvalidArgumentException
|
* @throws InvalidArgumentException
|
||||||
*/
|
*/
|
||||||
public static function create(array $data, ConsumerMessageInterface $message): static
|
public static function create(array $data, AMQPMessage $message): static
|
||||||
{
|
{
|
||||||
$instance = new static();
|
$instance = new static();
|
||||||
$instance->data = $data;
|
$instance->data = $data;
|
||||||
@@ -82,15 +82,24 @@ abstract class EntityParse implements EntityParseInterface
|
|||||||
*/
|
*/
|
||||||
public function messageValidate(array $data): bool
|
public function messageValidate(array $data): bool
|
||||||
{
|
{
|
||||||
// 验证必需字段
|
// 验证 meta 字段存在
|
||||||
$requiredFields = ['company_id', 'platform_id', 'store_id', 'unique_id', 'raw_data'];
|
if (!isset($data['meta'])) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
foreach ($requiredFields as $field) {
|
// 验证 meta 中的必需字段
|
||||||
if (!isset($data[$field])) {
|
$requiredMetaFields = ['company_id', 'platform_id', 'store_id', 'unique_id'];
|
||||||
|
foreach ($requiredMetaFields as $field) {
|
||||||
|
if (!isset($data['meta'][$field])) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 验证 data 字段存在
|
||||||
|
if (!isset($data['data'])) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -103,12 +112,13 @@ abstract class EntityParse implements EntityParseInterface
|
|||||||
*/
|
*/
|
||||||
protected function initialize(): void
|
protected function initialize(): void
|
||||||
{
|
{
|
||||||
// 提取 metadata
|
// 提取 metadata(从 meta 字段中获取)
|
||||||
|
$meta = $this->data['meta'] ?? [];
|
||||||
$metadata = [
|
$metadata = [
|
||||||
'company_id' => $this->data['company_id'],
|
'company_id' => $meta['company_id'] ?? null,
|
||||||
'platform_id' => $this->data['platform_id'],
|
'platform_id' => $meta['platform_id'] ?? null,
|
||||||
'store_id' => $this->data['store_id'],
|
'store_id' => $meta['store_id'] ?? null,
|
||||||
'unique_id' => $this->data['unique_id'],
|
'unique_id' => $meta['unique_id'] ?? null,
|
||||||
];
|
];
|
||||||
|
|
||||||
$this->platform = $this->platformScopeMatch($metadata);
|
$this->platform = $this->platformScopeMatch($metadata);
|
||||||
@@ -305,9 +315,9 @@ abstract class EntityParse implements EntityParseInterface
|
|||||||
/**
|
/**
|
||||||
* 获取消息对象
|
* 获取消息对象
|
||||||
*
|
*
|
||||||
* @return ConsumerMessageInterface
|
* @return AMQPMessage
|
||||||
*/
|
*/
|
||||||
public function getMessage(): ConsumerMessageInterface
|
public function getMessage(): AMQPMessage
|
||||||
{
|
{
|
||||||
return $this->message;
|
return $this->message;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,9 +6,9 @@ namespace App\Entity\Parse\Traits;
|
|||||||
|
|
||||||
use App\Model\Model as Entity;
|
use App\Model\Model as Entity;
|
||||||
use App\Model\Platform;
|
use App\Model\Platform;
|
||||||
use Hyperf\Amqp\Message\ConsumerMessageInterface;
|
|
||||||
use InvalidArgumentException;
|
use InvalidArgumentException;
|
||||||
use Hyperf\Stringable\Str;
|
use Hyperf\Stringable\Str;
|
||||||
|
use PhpAmqpLib\Message\AMQPMessage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* EntityParseHelper Trait
|
* EntityParseHelper Trait
|
||||||
@@ -18,26 +18,28 @@ use Hyperf\Stringable\Str;
|
|||||||
trait EntityParseHelper
|
trait EntityParseHelper
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* 从 exchange 名称中提取平台信息
|
* 从消息体中提取平台信息
|
||||||
*
|
*
|
||||||
* 规则:exchange 格式为 "platform.exchange"
|
* 从消息体的 meta.platform_id 获取平台 ID,查询 Platform
|
||||||
* 例如:tmall.exchange -> Tmall 平台
|
|
||||||
*
|
*
|
||||||
* @param ConsumerMessageInterface $message
|
* @param AMQPMessage $message
|
||||||
* @return Platform
|
* @return Platform
|
||||||
* @throws InvalidArgumentException
|
* @throws InvalidArgumentException
|
||||||
*/
|
*/
|
||||||
protected function extractPlatformFromExchange(ConsumerMessageInterface $message): Platform
|
protected function extractPlatformFromExchange(AMQPMessage $message): Platform
|
||||||
{
|
{
|
||||||
$platformName = Str::of($message->getExchange())
|
$data = json_decode($message->getBody(), true);
|
||||||
->before('.')
|
|
||||||
->ucfirst()
|
|
||||||
->toString();
|
|
||||||
|
|
||||||
$platform = Platform::where('name', '=', $platformName)->first();
|
$platformId = $data['meta']['platform_id'] ?? null;
|
||||||
|
|
||||||
|
if (!$platformId) {
|
||||||
|
throw new InvalidArgumentException("Missing platform_id in message meta");
|
||||||
|
}
|
||||||
|
|
||||||
|
$platform = Platform::find($platformId);
|
||||||
|
|
||||||
if (!$platform) {
|
if (!$platform) {
|
||||||
throw new InvalidArgumentException("Platform name '{$platformName}' does not exist!");
|
throw new InvalidArgumentException("Platform with id '{$platformId}' does not exist!");
|
||||||
}
|
}
|
||||||
|
|
||||||
return $platform;
|
return $platform;
|
||||||
@@ -49,11 +51,11 @@ trait EntityParseHelper
|
|||||||
* 规则:routing key 格式为 "entity.platform"
|
* 规则:routing key 格式为 "entity.platform"
|
||||||
* 例如:order.tmall -> Order 实体
|
* 例如:order.tmall -> Order 实体
|
||||||
*
|
*
|
||||||
* @param ConsumerMessageInterface $message
|
* @param AMQPMessage $message
|
||||||
* @return Entity
|
* @return Entity
|
||||||
* @throws InvalidArgumentException
|
* @throws InvalidArgumentException
|
||||||
*/
|
*/
|
||||||
protected function extractEntityFromRoutingKey(ConsumerMessageInterface $message): Entity
|
protected function extractEntityFromRoutingKey(AMQPMessage $message): Entity
|
||||||
{
|
{
|
||||||
$entityName = Str::of($message->getRoutingKey())
|
$entityName = Str::of($message->getRoutingKey())
|
||||||
->before('.')
|
->before('.')
|
||||||
@@ -78,11 +80,11 @@ trait EntityParseHelper
|
|||||||
/**
|
/**
|
||||||
* 从消息体中提取 JSON 数据
|
* 从消息体中提取 JSON 数据
|
||||||
*
|
*
|
||||||
* @param ConsumerMessageInterface $message
|
* @param AMQPMessage $message
|
||||||
* @return array
|
* @return array
|
||||||
* @throws InvalidArgumentException
|
* @throws InvalidArgumentException
|
||||||
*/
|
*/
|
||||||
protected function extractMessageData(ConsumerMessageInterface $message): array
|
protected function extractMessageData(AMQPMessage $message): array
|
||||||
{
|
{
|
||||||
$body = $message->getBody();
|
$body = $message->getBody();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user