update entity parse batch insert
This commit is contained in:
@@ -31,6 +31,14 @@ abstract class EntityParse implements EntityParseInterface
|
|||||||
protected ?Company $company = null;
|
protected ?Company $company = null;
|
||||||
protected ?Store $store = null;
|
protected ?Store $store = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 表字段静态缓存
|
||||||
|
* 在 Hyperf 常驻进程中,此缓存会一直保持到进程重启
|
||||||
|
*
|
||||||
|
* @var array<string, array<string>>
|
||||||
|
*/
|
||||||
|
protected static array $tableColumnsCache = [];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 禁止直接使用构造函数
|
* 禁止直接使用构造函数
|
||||||
* 使用 create() 工厂方法创建实例
|
* 使用 create() 工厂方法创建实例
|
||||||
@@ -162,28 +170,55 @@ abstract class EntityParse implements EntityParseInterface
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 唯一标识符提取 - 提供默认实现
|
* 获取实体的唯一键字段(用于 upsert 的 uniqueBy 参数)
|
||||||
*
|
*
|
||||||
* 子类需要覆盖此方法以实现自定义逻辑
|
* 必须由子类实现,定义哪些字段组成唯一约束
|
||||||
*
|
*
|
||||||
* @param array $metadata
|
* 示例:
|
||||||
* @return array
|
* ```php
|
||||||
* @throws InvalidArgumentException
|
* public function getUniqueBy(): array
|
||||||
|
* {
|
||||||
|
* return ['store_id', 'platform_order_id'];
|
||||||
|
* }
|
||||||
|
* ```
|
||||||
|
*
|
||||||
|
* @return array 唯一键字段名数组
|
||||||
*/
|
*/
|
||||||
public function entityUniqueIdentifierExtract(array $metadata): array
|
abstract public function getUniqueBy(): array;
|
||||||
{
|
|
||||||
// 举例说明,当 Order Entity 实体需要根据 平台 ID 和 平台订单 ID 来确定唯一性时
|
|
||||||
// 确认仓库中 Order 模型 对应的数据库字段约束条件为
|
|
||||||
// CONSTRAINT "orders_store_platform_order_unique" UNIQUE ("store_id", "platform_order_id")
|
|
||||||
// 可以从 $metadata 中分别提取 store_id 和 platform_order_id 的值
|
|
||||||
// 返回构造后的结果即可
|
|
||||||
// return ['store_id' => 123, 'platform_order_id' => 123123]
|
|
||||||
|
|
||||||
$className = static::class;
|
/**
|
||||||
throw new InvalidArgumentException(
|
* 获取可更新的字段列表(用于 upsert 的 update 参数)
|
||||||
"Method entityUniqueIdentifierExtract() must be implemented in class '{$className}'"
|
*
|
||||||
);
|
* 必须由子类实现,明确定义哪些字段在更新时可以被修改
|
||||||
}
|
* 通常需要排除:主键、唯一键、创建时间、关联 ID 等不应变更的字段
|
||||||
|
*
|
||||||
|
* 示例实现(针对 Order 模型):
|
||||||
|
* ```php
|
||||||
|
* public function getUpdateFields(): array
|
||||||
|
* {
|
||||||
|
* // 方案1:手动指定(推荐,最明确)
|
||||||
|
* return [
|
||||||
|
* 'order_status_id',
|
||||||
|
* 'payment_method_id',
|
||||||
|
* 'buyer_user_id',
|
||||||
|
* 'total_amount',
|
||||||
|
* 'updated_date',
|
||||||
|
* 'raw',
|
||||||
|
* // ... 其他可更新字段
|
||||||
|
* ];
|
||||||
|
*
|
||||||
|
* // 方案2:动态计算(使用辅助方法)
|
||||||
|
* $excludeFields = array_merge(
|
||||||
|
* ['id', 'created_at', 'created_date', 'company_id', 'platform_id'],
|
||||||
|
* $this->getUniqueBy()
|
||||||
|
* );
|
||||||
|
* return $this->getTableColumnsExcept($excludeFields);
|
||||||
|
* }
|
||||||
|
* ```
|
||||||
|
*
|
||||||
|
* @return array 可更新字段名数组
|
||||||
|
*/
|
||||||
|
abstract public function getUpdateFields(): array;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 公司作用域匹配 - 抽象方法
|
* 公司作用域匹配 - 抽象方法
|
||||||
@@ -206,15 +241,55 @@ abstract class EntityParse implements EntityParseInterface
|
|||||||
abstract public function storeScopeMatch(array $metadata): Store;
|
abstract public function storeScopeMatch(array $metadata): Store;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 实体数据映射 - 抽象方法
|
* 实体数据映射 - 抽象方法 - 负责将消息体内的数据映射为数据库模型的字段
|
||||||
*
|
*
|
||||||
* 必须由子类实现,因为不同平台的数据结构不同
|
* 必须由子类实现,因为不同平台的数据结构不同
|
||||||
|
* 映射的结果会影响不同平台最终聚合的结果
|
||||||
*
|
*
|
||||||
* @param array $rawData
|
* @param array $rawData
|
||||||
* @return LazyCollection
|
* @return LazyCollection
|
||||||
*/
|
*/
|
||||||
abstract public function entityMap(array $rawData): LazyCollection;
|
abstract public function entityMap(array $rawData): LazyCollection;
|
||||||
|
|
||||||
|
// ==================== 辅助方法 ====================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取模型表的所有字段(带静态缓存)
|
||||||
|
*
|
||||||
|
* 利用 Hyperf 常驻进程特性,首次查询后缓存在静态属性中
|
||||||
|
* 进程重启后自动刷新,正好适配表结构变更场景
|
||||||
|
*
|
||||||
|
* @param Entity $entity 实体模型实例
|
||||||
|
* @return array 字段名数组
|
||||||
|
*/
|
||||||
|
protected function getTableColumns(Entity $entity): array
|
||||||
|
{
|
||||||
|
$table = $entity->getTable();
|
||||||
|
|
||||||
|
if (!isset(self::$tableColumnsCache[$table])) {
|
||||||
|
// 首次查询,从数据库 schema 获取字段列表
|
||||||
|
$columns = \Hyperf\DbConnection\Db::getSchemaBuilder()->getColumnListing($table);
|
||||||
|
self::$tableColumnsCache[$table] = $columns;
|
||||||
|
}
|
||||||
|
|
||||||
|
return self::$tableColumnsCache[$table];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取表字段列表,排除指定字段
|
||||||
|
*
|
||||||
|
* 用于动态计算 getUpdateFields()
|
||||||
|
*
|
||||||
|
* @param Entity $entity 实体模型实例
|
||||||
|
* @param array $excludeFields 要排除的字段
|
||||||
|
* @return array 过滤后的字段数组
|
||||||
|
*/
|
||||||
|
protected function getTableColumnsExcept(Entity $entity, array $excludeFields): array
|
||||||
|
{
|
||||||
|
$allColumns = $this->getTableColumns($entity);
|
||||||
|
return array_values(array_diff($allColumns, $excludeFields));
|
||||||
|
}
|
||||||
|
|
||||||
// ==================== Getter 方法 ====================
|
// ==================== Getter 方法 ====================
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -102,15 +102,26 @@ class OrderConsumer extends ConsumerMessage
|
|||||||
// message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
|
// message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。
|
||||||
$entityMapResult = $parse->entityMap($data['raw_data'] ?? []);
|
$entityMapResult = $parse->entityMap($data['raw_data'] ?? []);
|
||||||
|
|
||||||
// $entityMapResult 应该是一个内部元素为 Model 的集合
|
// 将 LazyCollection 转为数组,准备批量操作
|
||||||
|
$dataToUpsert = $entityMapResult->all();
|
||||||
|
|
||||||
|
if (empty($dataToUpsert)) {
|
||||||
|
dump('No data to process');
|
||||||
|
return Result::ACK;
|
||||||
|
}
|
||||||
|
|
||||||
|
dump("Processing " . count($dataToUpsert) . " order(s) with batch upsert");
|
||||||
|
|
||||||
Db::beginTransaction();
|
Db::beginTransaction();
|
||||||
|
|
||||||
// 假设 $entityMapResult 为集合 @Collection 对象
|
// 使用 upsert 批量处理插入和更新
|
||||||
$entityMapResult->each(function ($el) use ($entity) {
|
// 利用数据库唯一索引自动判断是插入还是更新
|
||||||
$clone = clone $entity;
|
// 解决了重复订单推送的问题:存在则更新,不存在则插入
|
||||||
$clone->fill($el);
|
$entity->newQuery()->upsert(
|
||||||
$clone->save();
|
$dataToUpsert,
|
||||||
});
|
$parse->getUniqueBy(), // 从解析器获取唯一键字段
|
||||||
|
$parse->getUpdateFields() // 从解析器获取可更新字段
|
||||||
|
);
|
||||||
|
|
||||||
Db::commit();
|
Db::commit();
|
||||||
|
|
||||||
|
|||||||
@@ -210,12 +210,66 @@ class Order extends EntityParse
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 可选:覆盖唯一标识符提取逻辑
|
* 获取唯一键字段(对应数据库唯一索引)
|
||||||
*
|
*
|
||||||
* 如果使用默认的 unique_id 提取逻辑,则无需覆盖此方法
|
* 对应数据库约束:
|
||||||
|
* UNIQUE INDEX orders_store_platform_order_unique (store_id, platform_order_id)
|
||||||
|
*
|
||||||
|
* @return array
|
||||||
*/
|
*/
|
||||||
// public function entityUniqueIdentifierExtract(array $metadata): string|int
|
public function getUniqueBy(): array
|
||||||
// {
|
{
|
||||||
// return $metadata['custom_id_field'] ?? throw new InvalidArgumentException('custom_id_field not found');
|
return ['store_id', 'platform_order_id'];
|
||||||
// }
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取可更新字段列表
|
||||||
|
*
|
||||||
|
* 排除:主键、唯一键、创建时间、关联 ID
|
||||||
|
*
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
public function getUpdateFields(): array
|
||||||
|
{
|
||||||
|
// 手动指定(推荐:明确且高效,无数据库查询开销)
|
||||||
|
return [
|
||||||
|
'order_status_id',
|
||||||
|
'payment_method_id',
|
||||||
|
'buyer_user_id',
|
||||||
|
'presale',
|
||||||
|
'total_amount',
|
||||||
|
'total_paid',
|
||||||
|
'total_discount',
|
||||||
|
'total_received',
|
||||||
|
'freight_fee',
|
||||||
|
'tax_fee',
|
||||||
|
'discount_fee',
|
||||||
|
'commission_fee',
|
||||||
|
'coupon_amount',
|
||||||
|
'voucher_amount',
|
||||||
|
'order_type_id',
|
||||||
|
'updated_date',
|
||||||
|
'paid_date',
|
||||||
|
'shipping_date',
|
||||||
|
'zipcode',
|
||||||
|
'city',
|
||||||
|
'province',
|
||||||
|
'country',
|
||||||
|
'raw',
|
||||||
|
'ext',
|
||||||
|
'updated_at',
|
||||||
|
];
|
||||||
|
|
||||||
|
// 动态计算方案(如果表字段经常变化,可以使用):
|
||||||
|
// $entity = $this->entityMatch([
|
||||||
|
// 'company_id' => $this->getCompany()->id,
|
||||||
|
// 'platform_id' => $this->getPlatform()->id,
|
||||||
|
// 'store_id' => $this->getStore()->id,
|
||||||
|
// ]);
|
||||||
|
// $excludeFields = array_merge(
|
||||||
|
// ['id', 'created_at', 'created_date', 'company_id', 'platform_id'],
|
||||||
|
// $this->getUniqueBy()
|
||||||
|
// );
|
||||||
|
// return $this->getTableColumnsExcept($entity, $excludeFields);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user