2025-11-27 13:40:58 +08:00
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Entity\Parse;
|
|
|
|
|
|
|
|
|
|
use App\Model\Model as Entity;
|
|
|
|
|
use App\Model\Company;
|
|
|
|
|
use App\Model\Platform;
|
|
|
|
|
use App\Model\Store;
|
|
|
|
|
use App\Entity\Parse\Traits\EntityParseHelper;
|
2025-11-27 15:03:25 +08:00
|
|
|
use Hyperf\Collection\LazyCollection;
|
2025-11-27 13:40:58 +08:00
|
|
|
use InvalidArgumentException;
|
2026-01-30 13:30:17 +08:00
|
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
2025-11-27 13:40:58 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* EntityParse 抽象类
|
|
|
|
|
*
|
|
|
|
|
* 使用工厂方法模式 + 延迟初始化
|
|
|
|
|
* 提供消息解析的通用框架和默认实现
|
|
|
|
|
*
|
2026-01-30 13:30:17 +08:00
|
|
|
* @method static static create(array $data, AMQPMessage $message)
|
2025-11-27 13:40:58 +08:00
|
|
|
*/
|
|
|
|
|
abstract class EntityParse implements EntityParseInterface
|
|
|
|
|
{
|
|
|
|
|
use EntityParseHelper;
|
|
|
|
|
|
2025-11-27 15:03:25 +08:00
|
|
|
protected array $data;
|
2026-01-30 13:30:17 +08:00
|
|
|
protected AMQPMessage $message;
|
2025-11-27 13:40:58 +08:00
|
|
|
protected ?Platform $platform = null;
|
|
|
|
|
protected ?Company $company = null;
|
|
|
|
|
protected ?Store $store = null;
|
|
|
|
|
|
2025-12-12 11:12:52 +08:00
|
|
|
/**
|
|
|
|
|
* 表字段静态缓存
|
|
|
|
|
* 在 Hyperf 常驻进程中,此缓存会一直保持到进程重启
|
|
|
|
|
*
|
|
|
|
|
* @var array<string, array<string>>
|
|
|
|
|
*/
|
|
|
|
|
protected static array $tableColumnsCache = [];
|
|
|
|
|
|
2025-11-27 13:40:58 +08:00
|
|
|
/**
|
|
|
|
|
* 禁止直接使用构造函数
|
|
|
|
|
* 使用 create() 工厂方法创建实例
|
|
|
|
|
*/
|
|
|
|
|
protected function __construct()
|
|
|
|
|
{
|
|
|
|
|
// 空构造函数,延迟初始化
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 工厂方法:创建解析器实例
|
|
|
|
|
*
|
2025-11-27 15:03:25 +08:00
|
|
|
* @param array $data
|
2026-01-30 13:30:17 +08:00
|
|
|
* @param AMQPMessage $message
|
2025-11-27 13:40:58 +08:00
|
|
|
* @return static
|
|
|
|
|
* @throws InvalidArgumentException
|
|
|
|
|
*/
|
2026-01-30 13:30:17 +08:00
|
|
|
public static function create(array $data, AMQPMessage $message): static
|
2025-11-27 13:40:58 +08:00
|
|
|
{
|
|
|
|
|
$instance = new static();
|
2025-11-27 15:03:25 +08:00
|
|
|
$instance->data = $data;
|
2025-12-11 16:38:29 +08:00
|
|
|
$instance->message = $message;
|
2025-11-27 15:03:25 +08:00
|
|
|
|
|
|
|
|
// 在初始化前先验证数据
|
|
|
|
|
if (!$instance->messageValidate($data)) {
|
|
|
|
|
throw new InvalidArgumentException('Message validation failed: required fields missing');
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-27 13:40:58 +08:00
|
|
|
$instance->initialize();
|
|
|
|
|
return $instance;
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-27 15:03:25 +08:00
|
|
|
/**
|
|
|
|
|
* 消息数据验证
|
|
|
|
|
*
|
|
|
|
|
* 默认实现:验证必需字段
|
|
|
|
|
* 子类可以覆写以添加自定义验证逻辑
|
|
|
|
|
*
|
|
|
|
|
* @param array $data
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
public function messageValidate(array $data): bool
|
|
|
|
|
{
|
2026-01-30 13:30:17 +08:00
|
|
|
// 验证 meta 字段存在
|
|
|
|
|
if (!isset($data['meta'])) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
2025-11-27 15:03:25 +08:00
|
|
|
|
2026-01-30 13:30:17 +08:00
|
|
|
// 验证 meta 中的必需字段
|
|
|
|
|
$requiredMetaFields = ['company_id', 'platform_id', 'store_id', 'unique_id'];
|
|
|
|
|
foreach ($requiredMetaFields as $field) {
|
|
|
|
|
if (!isset($data['meta'][$field])) {
|
2025-11-27 15:03:25 +08:00
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-01-30 13:30:17 +08:00
|
|
|
// 验证 data 字段存在
|
|
|
|
|
if (!isset($data['data'])) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-27 15:03:25 +08:00
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-27 13:40:58 +08:00
|
|
|
/**
|
|
|
|
|
* 延迟初始化
|
2025-11-27 15:03:25 +08:00
|
|
|
* 在 data 设置后执行作用域匹配和验证
|
2025-11-27 13:40:58 +08:00
|
|
|
*
|
|
|
|
|
* @return void
|
|
|
|
|
* @throws InvalidArgumentException
|
|
|
|
|
*/
|
|
|
|
|
protected function initialize(): void
|
|
|
|
|
{
|
2026-01-30 13:30:17 +08:00
|
|
|
// 提取 metadata(从 meta 字段中获取)
|
|
|
|
|
$meta = $this->data['meta'] ?? [];
|
2025-11-27 15:03:25 +08:00
|
|
|
$metadata = [
|
2026-01-30 13:30:17 +08:00
|
|
|
'company_id' => $meta['company_id'] ?? null,
|
|
|
|
|
'platform_id' => $meta['platform_id'] ?? null,
|
|
|
|
|
'store_id' => $meta['store_id'] ?? null,
|
|
|
|
|
'unique_id' => $meta['unique_id'] ?? null,
|
2025-11-27 15:03:25 +08:00
|
|
|
];
|
|
|
|
|
|
|
|
|
|
$this->platform = $this->platformScopeMatch($metadata);
|
|
|
|
|
$this->company = $this->companyScopeMatch($metadata);
|
|
|
|
|
$this->store = $this->storeScopeMatch($metadata);
|
2025-11-27 13:40:58 +08:00
|
|
|
|
|
|
|
|
$this->validateScope();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 验证作用域对象是否有效
|
|
|
|
|
*
|
|
|
|
|
* @return void
|
|
|
|
|
* @throws InvalidArgumentException
|
|
|
|
|
*/
|
|
|
|
|
protected function validateScope(): void
|
|
|
|
|
{
|
|
|
|
|
if (!$this->platform instanceof Platform || !isset($this->platform->id)) {
|
|
|
|
|
throw new InvalidArgumentException('Platform is undefined or invalid');
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!$this->company instanceof Company || !isset($this->company->id)) {
|
|
|
|
|
throw new InvalidArgumentException('Company is undefined or invalid');
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!$this->store instanceof Store || !isset($this->store->id)) {
|
|
|
|
|
throw new InvalidArgumentException('Store is undefined or invalid');
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 平台作用域匹配 - 提供默认实现
|
|
|
|
|
*
|
2025-12-11 16:38:29 +08:00
|
|
|
* 从 exchange 中提取平台信息
|
2025-11-27 13:40:58 +08:00
|
|
|
* 子类可以覆盖此方法以实现自定义逻辑
|
|
|
|
|
*
|
2025-11-27 15:03:25 +08:00
|
|
|
* @param array $metadata
|
2025-11-27 13:40:58 +08:00
|
|
|
* @return Platform
|
|
|
|
|
* @throws InvalidArgumentException
|
|
|
|
|
*/
|
2025-11-27 15:03:25 +08:00
|
|
|
public function platformScopeMatch(array $metadata): Platform
|
2025-11-27 13:40:58 +08:00
|
|
|
{
|
2025-12-11 16:38:29 +08:00
|
|
|
return $this->extractPlatformFromExchange($this->message);
|
2025-11-27 13:40:58 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 实体匹配 - 提供默认实现
|
|
|
|
|
*
|
2025-12-11 16:38:29 +08:00
|
|
|
* 从 routing key 中提取实体类型
|
2025-11-27 13:40:58 +08:00
|
|
|
* 子类可以覆盖此方法以实现自定义逻辑
|
|
|
|
|
*
|
2025-11-27 15:03:25 +08:00
|
|
|
* @param array $metadata
|
2025-11-27 13:40:58 +08:00
|
|
|
* @return Entity
|
|
|
|
|
* @throws InvalidArgumentException
|
|
|
|
|
*/
|
2025-11-27 15:03:25 +08:00
|
|
|
public function entityMatch(array $metadata): Entity
|
2025-11-27 13:40:58 +08:00
|
|
|
{
|
2025-12-11 16:38:29 +08:00
|
|
|
return $this->extractEntityFromRoutingKey($this->message);
|
2025-11-27 13:40:58 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2025-12-12 11:12:52 +08:00
|
|
|
* 获取实体的唯一键字段(用于 upsert 的 uniqueBy 参数)
|
2025-11-27 13:40:58 +08:00
|
|
|
*
|
2025-12-12 11:12:52 +08:00
|
|
|
* 必须由子类实现,定义哪些字段组成唯一约束
|
2025-11-27 13:40:58 +08:00
|
|
|
*
|
2025-12-12 11:12:52 +08:00
|
|
|
* 示例:
|
|
|
|
|
* ```php
|
|
|
|
|
* public function getUniqueBy(): array
|
|
|
|
|
* {
|
|
|
|
|
* return ['store_id', 'platform_order_id'];
|
|
|
|
|
* }
|
|
|
|
|
* ```
|
|
|
|
|
*
|
|
|
|
|
* @return array 唯一键字段名数组
|
2025-11-27 13:40:58 +08:00
|
|
|
*/
|
2025-12-12 11:12:52 +08:00
|
|
|
abstract public function getUniqueBy(): array;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取可更新的字段列表(用于 upsert 的 update 参数)
|
|
|
|
|
*
|
|
|
|
|
* 必须由子类实现,明确定义哪些字段在更新时可以被修改
|
|
|
|
|
* 通常需要排除:主键、唯一键、创建时间、关联 ID 等不应变更的字段
|
|
|
|
|
*
|
|
|
|
|
* 示例实现(针对 Order 模型):
|
|
|
|
|
* ```php
|
|
|
|
|
* public function getUpdateFields(): array
|
|
|
|
|
* {
|
|
|
|
|
* // 方案1:手动指定(推荐,最明确)
|
|
|
|
|
* return [
|
|
|
|
|
* 'order_status_id',
|
|
|
|
|
* 'payment_method_id',
|
|
|
|
|
* 'buyer_user_id',
|
|
|
|
|
* 'total_amount',
|
|
|
|
|
* 'updated_date',
|
|
|
|
|
* 'raw',
|
|
|
|
|
* // ... 其他可更新字段
|
|
|
|
|
* ];
|
|
|
|
|
*
|
|
|
|
|
* // 方案2:动态计算(使用辅助方法)
|
|
|
|
|
* $excludeFields = array_merge(
|
|
|
|
|
* ['id', 'created_at', 'created_date', 'company_id', 'platform_id'],
|
|
|
|
|
* $this->getUniqueBy()
|
|
|
|
|
* );
|
|
|
|
|
* return $this->getTableColumnsExcept($excludeFields);
|
|
|
|
|
* }
|
|
|
|
|
* ```
|
|
|
|
|
*
|
|
|
|
|
* @return array 可更新字段名数组
|
|
|
|
|
*/
|
|
|
|
|
abstract public function getUpdateFields(): array;
|
2025-11-27 13:40:58 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 公司作用域匹配 - 抽象方法
|
|
|
|
|
*
|
|
|
|
|
* 必须由子类实现,因为不同平台的公司识别逻辑不同
|
|
|
|
|
*
|
2025-11-27 15:03:25 +08:00
|
|
|
* @param array $metadata
|
2025-11-27 13:40:58 +08:00
|
|
|
* @return Company
|
|
|
|
|
*/
|
2025-11-27 15:03:25 +08:00
|
|
|
abstract public function companyScopeMatch(array $metadata): Company;
|
2025-11-27 13:40:58 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 店铺作用域匹配 - 抽象方法
|
|
|
|
|
*
|
|
|
|
|
* 必须由子类实现,因为不同平台的店铺识别逻辑不同
|
|
|
|
|
*
|
2025-11-27 15:03:25 +08:00
|
|
|
* @param array $metadata
|
2025-11-27 13:40:58 +08:00
|
|
|
* @return Store
|
|
|
|
|
*/
|
2025-11-27 15:03:25 +08:00
|
|
|
abstract public function storeScopeMatch(array $metadata): Store;
|
2025-11-27 13:40:58 +08:00
|
|
|
|
|
|
|
|
/**
|
2025-12-12 11:12:52 +08:00
|
|
|
* 实体数据映射 - 抽象方法 - 负责将消息体内的数据映射为数据库模型的字段
|
2025-11-27 13:40:58 +08:00
|
|
|
*
|
|
|
|
|
* 必须由子类实现,因为不同平台的数据结构不同
|
2025-12-12 11:12:52 +08:00
|
|
|
* 映射的结果会影响不同平台最终聚合的结果
|
2025-11-27 13:40:58 +08:00
|
|
|
*
|
2025-11-27 15:03:25 +08:00
|
|
|
* @param array $rawData
|
|
|
|
|
* @return LazyCollection
|
2025-11-27 13:40:58 +08:00
|
|
|
*/
|
2025-11-27 15:03:25 +08:00
|
|
|
abstract public function entityMap(array $rawData): LazyCollection;
|
2025-11-27 13:40:58 +08:00
|
|
|
|
2025-12-12 11:12:52 +08:00
|
|
|
// ==================== 辅助方法 ====================
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取模型表的所有字段(带静态缓存)
|
|
|
|
|
*
|
|
|
|
|
* 利用 Hyperf 常驻进程特性,首次查询后缓存在静态属性中
|
|
|
|
|
* 进程重启后自动刷新,正好适配表结构变更场景
|
|
|
|
|
*
|
|
|
|
|
* @param Entity $entity 实体模型实例
|
|
|
|
|
* @return array 字段名数组
|
|
|
|
|
*/
|
|
|
|
|
protected function getTableColumns(Entity $entity): array
|
|
|
|
|
{
|
|
|
|
|
$table = $entity->getTable();
|
|
|
|
|
|
|
|
|
|
if (!isset(self::$tableColumnsCache[$table])) {
|
|
|
|
|
// 首次查询,从数据库 schema 获取字段列表
|
|
|
|
|
$columns = \Hyperf\DbConnection\Db::getSchemaBuilder()->getColumnListing($table);
|
|
|
|
|
self::$tableColumnsCache[$table] = $columns;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return self::$tableColumnsCache[$table];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取表字段列表,排除指定字段
|
|
|
|
|
*
|
|
|
|
|
* 用于动态计算 getUpdateFields()
|
|
|
|
|
*
|
|
|
|
|
* @param Entity $entity 实体模型实例
|
|
|
|
|
* @param array $excludeFields 要排除的字段
|
|
|
|
|
* @return array 过滤后的字段数组
|
|
|
|
|
*/
|
|
|
|
|
protected function getTableColumnsExcept(Entity $entity, array $excludeFields): array
|
|
|
|
|
{
|
|
|
|
|
$allColumns = $this->getTableColumns($entity);
|
|
|
|
|
return array_values(array_diff($allColumns, $excludeFields));
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-27 13:40:58 +08:00
|
|
|
// ==================== Getter 方法 ====================
|
|
|
|
|
|
|
|
|
|
/**
|
2025-11-27 15:03:25 +08:00
|
|
|
* 获取消息数据
|
2025-11-27 13:40:58 +08:00
|
|
|
*
|
2025-11-27 15:03:25 +08:00
|
|
|
* @return array
|
2025-11-27 13:40:58 +08:00
|
|
|
*/
|
2025-11-27 15:03:25 +08:00
|
|
|
public function getData(): array
|
2025-11-27 13:40:58 +08:00
|
|
|
{
|
2025-11-27 15:03:25 +08:00
|
|
|
return $this->data;
|
2025-11-27 13:40:58 +08:00
|
|
|
}
|
|
|
|
|
|
2025-12-11 16:38:29 +08:00
|
|
|
/**
|
|
|
|
|
* 获取消息对象
|
|
|
|
|
*
|
2026-01-30 13:30:17 +08:00
|
|
|
* @return AMQPMessage
|
2025-12-11 16:38:29 +08:00
|
|
|
*/
|
2026-01-30 13:30:17 +08:00
|
|
|
public function getMessage(): AMQPMessage
|
2025-12-11 16:38:29 +08:00
|
|
|
{
|
|
|
|
|
return $this->message;
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-27 13:40:58 +08:00
|
|
|
/**
|
|
|
|
|
* 获取平台对象
|
|
|
|
|
*
|
|
|
|
|
* @return Platform
|
|
|
|
|
*/
|
|
|
|
|
public function getPlatform(): Platform
|
|
|
|
|
{
|
|
|
|
|
return $this->platform;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取公司对象
|
|
|
|
|
*
|
|
|
|
|
* @return Company
|
|
|
|
|
*/
|
|
|
|
|
public function getCompany(): Company
|
|
|
|
|
{
|
|
|
|
|
return $this->company;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取店铺对象
|
|
|
|
|
*
|
|
|
|
|
* @return Store
|
|
|
|
|
*/
|
|
|
|
|
public function getStore(): Store
|
|
|
|
|
{
|
|
|
|
|
return $this->store;
|
|
|
|
|
}
|
|
|
|
|
}
|