diff --git a/backend/app/Entity/Parse/EntityParse.php b/backend/app/Entity/Parse/EntityParse.php index 76ab86c..f4b96a3 100644 --- a/backend/app/Entity/Parse/EntityParse.php +++ b/backend/app/Entity/Parse/EntityParse.php @@ -9,9 +9,9 @@ use App\Model\Company; use App\Model\Platform; use App\Model\Store; use App\Entity\Parse\Traits\EntityParseHelper; -use Hyperf\Amqp\Message\ConsumerMessageInterface; use Hyperf\Collection\LazyCollection; use InvalidArgumentException; +use PhpAmqpLib\Message\AMQPMessage; /** * EntityParse 抽象类 @@ -19,14 +19,14 @@ use InvalidArgumentException; * 使用工厂方法模式 + 延迟初始化 * 提供消息解析的通用框架和默认实现 * - * @method static static create(array $data, ConsumerMessageInterface $message) + * @method static static create(array $data, AMQPMessage $message) */ abstract class EntityParse implements EntityParseInterface { use EntityParseHelper; protected array $data; - protected ConsumerMessageInterface $message; + protected AMQPMessage $message; protected ?Platform $platform = null; protected ?Company $company = null; protected ?Store $store = null; @@ -52,11 +52,11 @@ abstract class EntityParse implements EntityParseInterface * 工厂方法:创建解析器实例 * * @param array $data - * @param ConsumerMessageInterface $message + * @param AMQPMessage $message * @return static * @throws InvalidArgumentException */ - public static function create(array $data, ConsumerMessageInterface $message): static + public static function create(array $data, AMQPMessage $message): static { $instance = new static(); $instance->data = $data; @@ -82,15 +82,24 @@ abstract class EntityParse implements EntityParseInterface */ public function messageValidate(array $data): bool { - // 验证必需字段 - $requiredFields = ['company_id', 'platform_id', 'store_id', 'unique_id', 'raw_data']; + // 验证 meta 字段存在 + if (!isset($data['meta'])) { + return false; + } - foreach ($requiredFields as $field) { - if (!isset($data[$field])) { + // 验证 meta 中的必需字段 + $requiredMetaFields = ['company_id', 'platform_id', 'store_id', 'unique_id']; + foreach ($requiredMetaFields as $field) { + if (!isset($data['meta'][$field])) { return false; } } + // 验证 data 字段存在 + if (!isset($data['data'])) { + return false; + } + return true; } @@ -103,12 +112,13 @@ abstract class EntityParse implements EntityParseInterface */ protected function initialize(): void { - // 提取 metadata + // 提取 metadata(从 meta 字段中获取) + $meta = $this->data['meta'] ?? []; $metadata = [ - 'company_id' => $this->data['company_id'], - 'platform_id' => $this->data['platform_id'], - 'store_id' => $this->data['store_id'], - 'unique_id' => $this->data['unique_id'], + 'company_id' => $meta['company_id'] ?? null, + 'platform_id' => $meta['platform_id'] ?? null, + 'store_id' => $meta['store_id'] ?? null, + 'unique_id' => $meta['unique_id'] ?? null, ]; $this->platform = $this->platformScopeMatch($metadata); @@ -305,9 +315,9 @@ abstract class EntityParse implements EntityParseInterface /** * 获取消息对象 * - * @return ConsumerMessageInterface + * @return AMQPMessage */ - public function getMessage(): ConsumerMessageInterface + public function getMessage(): AMQPMessage { return $this->message; } diff --git a/backend/app/Entity/Parse/Traits/EntityParseHelper.php b/backend/app/Entity/Parse/Traits/EntityParseHelper.php index ef05424..9ddfac8 100644 --- a/backend/app/Entity/Parse/Traits/EntityParseHelper.php +++ b/backend/app/Entity/Parse/Traits/EntityParseHelper.php @@ -6,9 +6,9 @@ namespace App\Entity\Parse\Traits; use App\Model\Model as Entity; use App\Model\Platform; -use Hyperf\Amqp\Message\ConsumerMessageInterface; use InvalidArgumentException; use Hyperf\Stringable\Str; +use PhpAmqpLib\Message\AMQPMessage; /** * EntityParseHelper Trait @@ -18,26 +18,28 @@ use Hyperf\Stringable\Str; trait EntityParseHelper { /** - * 从 exchange 名称中提取平台信息 + * 从消息体中提取平台信息 * - * 规则:exchange 格式为 "platform.exchange" - * 例如:tmall.exchange -> Tmall 平台 + * 从消息体的 meta.platform_id 获取平台 ID,查询 Platform * - * @param ConsumerMessageInterface $message + * @param AMQPMessage $message * @return Platform * @throws InvalidArgumentException */ - protected function extractPlatformFromExchange(ConsumerMessageInterface $message): Platform + protected function extractPlatformFromExchange(AMQPMessage $message): Platform { - $platformName = Str::of($message->getExchange()) - ->before('.') - ->ucfirst() - ->toString(); + $data = json_decode($message->getBody(), true); - $platform = Platform::where('name', '=', $platformName)->first(); + $platformId = $data['meta']['platform_id'] ?? null; + + if (!$platformId) { + throw new InvalidArgumentException("Missing platform_id in message meta"); + } + + $platform = Platform::find($platformId); if (!$platform) { - throw new InvalidArgumentException("Platform name '{$platformName}' does not exist!"); + throw new InvalidArgumentException("Platform with id '{$platformId}' does not exist!"); } return $platform; @@ -49,11 +51,11 @@ trait EntityParseHelper * 规则:routing key 格式为 "entity.platform" * 例如:order.tmall -> Order 实体 * - * @param ConsumerMessageInterface $message + * @param AMQPMessage $message * @return Entity * @throws InvalidArgumentException */ - protected function extractEntityFromRoutingKey(ConsumerMessageInterface $message): Entity + protected function extractEntityFromRoutingKey(AMQPMessage $message): Entity { $entityName = Str::of($message->getRoutingKey()) ->before('.') @@ -78,11 +80,11 @@ trait EntityParseHelper /** * 从消息体中提取 JSON 数据 * - * @param ConsumerMessageInterface $message + * @param AMQPMessage $message * @return array * @throws InvalidArgumentException */ - protected function extractMessageData(ConsumerMessageInterface $message): array + protected function extractMessageData(AMQPMessage $message): array { $body = $message->getBody();