update entity map
This commit is contained in:
@@ -33,7 +33,7 @@ class AppMessageQueuePushTmall extends HyperfCommand
|
|||||||
// 从 raw 数据库连接获取数据
|
// 从 raw 数据库连接获取数据
|
||||||
$orders = Db::connection('raw')
|
$orders = Db::connection('raw')
|
||||||
->table('wpic_taobao_order')->orderBy('id', 'desc')
|
->table('wpic_taobao_order')->orderBy('id', 'desc')
|
||||||
->limit(10)->get('order_raw')->lazy();
|
->limit(4)->get('order_raw')->lazy();
|
||||||
|
|
||||||
// dump($orders->first());
|
// dump($orders->first());
|
||||||
// return;
|
// return;
|
||||||
|
|||||||
@@ -9,10 +9,16 @@ use Hyperf\Command\Annotation\Command;
|
|||||||
use Psr\Container\ContainerInterface;
|
use Psr\Container\ContainerInterface;
|
||||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||||
use Hyperf\Contract\ConfigInterface;
|
use Hyperf\Contract\ConfigInterface;
|
||||||
|
use Symfony\Component\Console\Input\InputOption;
|
||||||
|
|
||||||
#[Command]
|
#[Command]
|
||||||
class AppMqStatus extends HyperfCommand
|
class AppMqStatus extends HyperfCommand
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* 业务队列类型
|
||||||
|
*/
|
||||||
|
private const QUEUE_TYPES = ['orders', 'products', 'refunds', 'inventory'];
|
||||||
|
|
||||||
public function __construct(protected ContainerInterface $container)
|
public function __construct(protected ContainerInterface $container)
|
||||||
{
|
{
|
||||||
parent::__construct('app:mq:status');
|
parent::__construct('app:mq:status');
|
||||||
@@ -22,8 +28,9 @@ class AppMqStatus extends HyperfCommand
|
|||||||
{
|
{
|
||||||
parent::configure();
|
parent::configure();
|
||||||
$this->setDescription('Display message counts for all accessible queues');
|
$this->setDescription('Display message counts for all accessible queues');
|
||||||
$this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds (default: 3)');
|
$this->addOption('watch', 'w', null, 'Watch mode - refresh every N seconds');
|
||||||
$this->addOption('interval', 'i', \Symfony\Component\Console\Input\InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3');
|
$this->addOption('interval', 'i', InputOption::VALUE_OPTIONAL, 'Refresh interval in seconds', '3');
|
||||||
|
$this->addOption('queue', null, InputOption::VALUE_OPTIONAL, 'Filter by queue type (orders, products, refunds, inventory)');
|
||||||
}
|
}
|
||||||
|
|
||||||
public function handle()
|
public function handle()
|
||||||
@@ -49,6 +56,7 @@ class AppMqStatus extends HyperfCommand
|
|||||||
private function displayQueueStatus(): int
|
private function displayQueueStatus(): int
|
||||||
{
|
{
|
||||||
$this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info');
|
$this->line('Fetching queue status... ' . date('Y-m-d H:i:s'), 'info');
|
||||||
|
$this->line('');
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$config = $this->container->get(ConfigInterface::class);
|
$config = $this->container->get(ConfigInterface::class);
|
||||||
@@ -74,61 +82,90 @@ class AppMqStatus extends HyperfCommand
|
|||||||
|
|
||||||
$channel = $connection->channel();
|
$channel = $connection->channel();
|
||||||
|
|
||||||
// 获取所有队列信息
|
// 获取 --queue 参数
|
||||||
$queues = $this->getQueuesFromAnnotations();
|
$filterQueue = $this->input->getOption('queue');
|
||||||
|
|
||||||
|
// 确定要显示的队列组
|
||||||
|
$queueTypes = $filterQueue
|
||||||
|
? [$filterQueue]
|
||||||
|
: self::QUEUE_TYPES;
|
||||||
|
|
||||||
|
// 验证队列类型
|
||||||
|
if ($filterQueue && !in_array($filterQueue, self::QUEUE_TYPES)) {
|
||||||
|
$this->error("Invalid queue type: {$filterQueue}");
|
||||||
|
$this->line("Valid types: " . implode(', ', self::QUEUE_TYPES));
|
||||||
|
$channel->close();
|
||||||
|
$connection->close();
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
$tableData = [];
|
|
||||||
$totalMessages = 0;
|
$totalMessages = 0;
|
||||||
|
$totalConsumers = 0;
|
||||||
|
$allQueueNames = [];
|
||||||
|
|
||||||
foreach ($queues as $queueName) {
|
// 收集主业务队列和死信队列的数据
|
||||||
try {
|
$businessQueuesData = [];
|
||||||
// 使用 passive=true 来获取队列信息而不创建队列
|
$deadLetterQueuesData = [];
|
||||||
// queue_declare 返回 [queue_name, message_count, consumer_count]
|
|
||||||
[$queue, $messageCount, $consumerCount] = $channel->queue_declare(
|
|
||||||
$queueName,
|
|
||||||
true, // passive
|
|
||||||
true, // durable
|
|
||||||
false, // exclusive
|
|
||||||
false // auto_delete
|
|
||||||
);
|
|
||||||
|
|
||||||
$tableData[] = [
|
foreach ($queueTypes as $type) {
|
||||||
'queue' => $queueName,
|
$groupData = $this->fetchQueueGroupData($channel, $type);
|
||||||
'messages' => $messageCount,
|
|
||||||
'consumers' => $consumerCount,
|
|
||||||
'status' => $messageCount > 0 ? '<fg=yellow>Has Messages</>' : '<fg=green>Empty</>',
|
|
||||||
];
|
|
||||||
|
|
||||||
$totalMessages += $messageCount;
|
foreach ($groupData as $queueInfo) {
|
||||||
} catch (\Exception $e) {
|
// 区分主队列和重试队列(死信队列)
|
||||||
$tableData[] = [
|
if (str_ends_with($queueInfo['queue'], '.retry.queue')) {
|
||||||
'queue' => $queueName,
|
$deadLetterQueuesData[] = $queueInfo;
|
||||||
'messages' => 'N/A',
|
} else {
|
||||||
'consumers' => 'N/A',
|
$businessQueuesData[] = $queueInfo;
|
||||||
'status' => '<fg=red>Error: ' . $e->getMessage() . '</>',
|
}
|
||||||
];
|
|
||||||
|
if (is_numeric($queueInfo['messages'])) {
|
||||||
|
$totalMessages += $queueInfo['messages'];
|
||||||
|
}
|
||||||
|
if (is_numeric($queueInfo['consumers'])) {
|
||||||
|
$totalConsumers += $queueInfo['consumers'];
|
||||||
|
}
|
||||||
|
$allQueueNames[] = $queueInfo['queue'];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 显示主业务队列表格
|
||||||
|
$this->displayBusinessQueues($businessQueuesData, $filterQueue);
|
||||||
|
|
||||||
|
// 显示死信队列(重试队列)
|
||||||
|
if (!empty($deadLetterQueuesData)) {
|
||||||
|
$this->line('');
|
||||||
|
$this->displayDeadLetterQueues($deadLetterQueuesData, $filterQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 显示共享的错误队列
|
||||||
|
if (!$filterQueue) {
|
||||||
|
$this->line('');
|
||||||
|
$errorQueueData = $this->fetchQueueData($channel, 'errors.queue');
|
||||||
|
$this->displaySharedQueues([$errorQueueData]);
|
||||||
|
|
||||||
|
if (is_numeric($errorQueueData['messages'])) {
|
||||||
|
$totalMessages += $errorQueueData['messages'];
|
||||||
|
}
|
||||||
|
if (is_numeric($errorQueueData['consumers'])) {
|
||||||
|
$totalConsumers += $errorQueueData['consumers'];
|
||||||
|
}
|
||||||
|
$allQueueNames[] = $errorQueueData['queue'];
|
||||||
|
}
|
||||||
|
|
||||||
// 关闭连接
|
// 关闭连接
|
||||||
$channel->close();
|
$channel->close();
|
||||||
$connection->close();
|
$connection->close();
|
||||||
|
|
||||||
// 显示表格
|
// 显示汇总信息
|
||||||
if (empty($tableData)) {
|
|
||||||
$this->warn('No queues found.');
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->table(
|
|
||||||
['Queue Name', 'Messages', 'Consumers', 'Status'],
|
|
||||||
array_map(function($row) {
|
|
||||||
return [$row['queue'], $row['messages'], $row['consumers'], $row['status']];
|
|
||||||
}, $tableData)
|
|
||||||
);
|
|
||||||
|
|
||||||
$this->line('');
|
$this->line('');
|
||||||
$this->info("Total messages across all queues: {$totalMessages}");
|
$this->info("=== Summary ===");
|
||||||
|
$this->line("Total messages: <fg=yellow>{$totalMessages}</>");
|
||||||
|
$this->line("Total active consumers: <fg=cyan>{$totalConsumers}</>");
|
||||||
|
$this->line('');
|
||||||
|
$this->line('Queues monitored:');
|
||||||
|
foreach ($allQueueNames as $queueName) {
|
||||||
|
$this->line(" - {$queueName}");
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
@@ -139,51 +176,221 @@ class AppMqStatus extends HyperfCommand
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 扫描所有使用 Consumer 注解的类,获取队列名称
|
* 获取队列组数据(主队列 + 重试队列)
|
||||||
*/
|
*/
|
||||||
private function getQueuesFromAnnotations(): array
|
private function fetchQueueGroupData($channel, string $type): array
|
||||||
{
|
{
|
||||||
$queues = [];
|
$queues = [
|
||||||
|
"{$type}.queue", // 主业务队列
|
||||||
|
"{$type}.retry.queue", // 重试队列
|
||||||
|
];
|
||||||
|
|
||||||
// 扫描 Platform 目录下的所有 Consumer
|
$groupData = [];
|
||||||
$platformPath = BASE_PATH . '/app/Platform';
|
foreach ($queues as $queueName) {
|
||||||
|
$groupData[] = $this->fetchQueueData($channel, $queueName);
|
||||||
if (is_dir($platformPath)) {
|
|
||||||
$this->scanDirectory($platformPath, $queues);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 如果没有找到任何队列,返回一些默认的队列名称
|
return $groupData;
|
||||||
if (empty($queues)) {
|
|
||||||
$queues = ['orders.queue'];
|
|
||||||
}
|
|
||||||
|
|
||||||
return array_unique($queues);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 递归扫描目录,查找包含 Consumer 注解的类
|
* 获取单个队列的数据
|
||||||
*/
|
*/
|
||||||
private function scanDirectory(string $path, array &$queues): void
|
private function fetchQueueData($channel, string $queueName): array
|
||||||
{
|
{
|
||||||
$files = scandir($path);
|
try {
|
||||||
|
// 使用 passive=true 来获取队列信息而不创建队列
|
||||||
|
[$queue, $messageCount, $consumerCount] = $channel->queue_declare(
|
||||||
|
$queueName,
|
||||||
|
true, // passive
|
||||||
|
true, // durable
|
||||||
|
false, // exclusive
|
||||||
|
false // auto_delete
|
||||||
|
);
|
||||||
|
|
||||||
foreach ($files as $file) {
|
return [
|
||||||
if ($file === '.' || $file === '..') {
|
'queue' => $queueName,
|
||||||
continue;
|
'messages' => $messageCount,
|
||||||
|
'consumers' => $consumerCount,
|
||||||
|
'status' => $this->getQueueStatus($messageCount, $consumerCount),
|
||||||
|
];
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
return [
|
||||||
|
'queue' => $queueName,
|
||||||
|
'messages' => 'N/A',
|
||||||
|
'consumers' => 'N/A',
|
||||||
|
'status' => '<fg=red>Error: ' . $e->getMessage() . '</>',
|
||||||
|
];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取队列状态描述
|
||||||
|
*/
|
||||||
|
private function getQueueStatus(int $messageCount, int $consumerCount): string
|
||||||
|
{
|
||||||
|
if ($messageCount > 100) {
|
||||||
|
return '<fg=red>⚠ High Load</>';
|
||||||
|
} elseif ($messageCount > 10) {
|
||||||
|
return '<fg=yellow>⚡ Processing</>';
|
||||||
|
} elseif ($messageCount > 0) {
|
||||||
|
return '<fg=cyan>✓ Active</>';
|
||||||
|
} else {
|
||||||
|
return '<fg=green>✓ Empty</>';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 显示业务队列(合并所有队列组)- 转置显示
|
||||||
|
*/
|
||||||
|
private function displayBusinessQueues(array $allGroupsData, ?string $filterQueue): void
|
||||||
|
{
|
||||||
|
// 构建标题
|
||||||
|
$title = $filterQueue
|
||||||
|
? "Business Queues (Filtered: {$filterQueue})"
|
||||||
|
: "Business Queues";
|
||||||
|
|
||||||
|
$this->line("<fg=blue>=== {$title} ===</>");
|
||||||
|
|
||||||
|
// 构建表头 - 使用简化的队列名称(去掉 .queue)
|
||||||
|
$headers = ['Metric'];
|
||||||
|
foreach ($allGroupsData as $queueInfo) {
|
||||||
|
$queueName = $queueInfo['queue'];
|
||||||
|
// 去掉 .queue 后缀
|
||||||
|
$simpleName = str_replace('.queue', '', $queueName);
|
||||||
|
$headers[] = $simpleName;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建行数据 - 转置显示
|
||||||
|
$rows = [
|
||||||
|
$this->buildMetricRow('Messages', $allGroupsData, 'messages'),
|
||||||
|
$this->buildMetricRow('Consumers', $allGroupsData, 'consumers'),
|
||||||
|
$this->buildMetricRow('Status', $allGroupsData, 'status'),
|
||||||
|
];
|
||||||
|
|
||||||
|
$this->table($headers, $rows);
|
||||||
|
$this->line('');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 显示死信队列(重试队列)- 转置显示
|
||||||
|
*/
|
||||||
|
private function displayDeadLetterQueues(array $queuesData, ?string $filterQueue): void
|
||||||
|
{
|
||||||
|
$title = $filterQueue
|
||||||
|
? "Dead Letter Queues (Filtered: {$filterQueue})"
|
||||||
|
: "Dead Letter Queues (Retry Queues)";
|
||||||
|
|
||||||
|
$this->line("<fg=magenta>=== {$title} ===</>");
|
||||||
|
|
||||||
|
// 构建表头
|
||||||
|
$headers = ['Metric'];
|
||||||
|
foreach ($queuesData as $queueInfo) {
|
||||||
|
$queueName = $queueInfo['queue'];
|
||||||
|
// 去掉 .queue 后缀,保留 retry 标识
|
||||||
|
$simpleName = str_replace('.queue', '', $queueName);
|
||||||
|
$headers[] = $simpleName;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建行数据
|
||||||
|
$rows = [
|
||||||
|
$this->buildMetricRow('Messages', $queuesData, 'messages'),
|
||||||
|
$this->buildMetricRow('Consumers', $queuesData, 'consumers'),
|
||||||
|
$this->buildMetricRow('Status', $queuesData, 'status'),
|
||||||
|
];
|
||||||
|
|
||||||
|
$this->table($headers, $rows);
|
||||||
|
|
||||||
|
// 添加说明
|
||||||
|
if (!empty($queuesData) && $this->hasMessages($queuesData)) {
|
||||||
|
$this->line('<fg=yellow>ℹ These queues receive messages from DLX when main queue processing fails</>');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 显示共享队列(errors.queue)- 转置显示
|
||||||
|
*/
|
||||||
|
private function displaySharedQueues(array $queuesData): void
|
||||||
|
{
|
||||||
|
$this->line("<fg=blue>=== Shared Queues ===</>");
|
||||||
|
|
||||||
|
// 构建表头
|
||||||
|
$headers = ['Metric'];
|
||||||
|
foreach ($queuesData as $queueInfo) {
|
||||||
|
$queueName = $queueInfo['queue'];
|
||||||
|
// 去掉 .queue 后缀
|
||||||
|
$simpleName = str_replace('.queue', '', $queueName);
|
||||||
|
$headers[] = $simpleName;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建行数据
|
||||||
|
$rows = [
|
||||||
|
$this->buildMetricRow('Messages', $queuesData, 'messages'),
|
||||||
|
$this->buildMetricRow('Consumers', $queuesData, 'consumers'),
|
||||||
|
$this->buildMetricRow('Status', $queuesData, 'status'),
|
||||||
|
];
|
||||||
|
|
||||||
|
$this->table($headers, $rows);
|
||||||
|
|
||||||
|
// 添加说明
|
||||||
|
if (!empty($queuesData) && $this->hasMessages($queuesData)) {
|
||||||
|
$this->line('<fg=yellow>ℹ Error queue contains messages that exceeded max retry count</>');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检查队列中是否有消息
|
||||||
|
*/
|
||||||
|
private function hasMessages(array $queuesData): bool
|
||||||
|
{
|
||||||
|
foreach ($queuesData as $queueInfo) {
|
||||||
|
if (is_numeric($queueInfo['messages']) && $queueInfo['messages'] > 0) {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
$filePath = $path . '/' . $file;
|
/**
|
||||||
|
* 构建指标行数据
|
||||||
|
*/
|
||||||
|
private function buildMetricRow(string $metricName, array $queuesData, string $field): array
|
||||||
|
{
|
||||||
|
$row = [$metricName];
|
||||||
|
|
||||||
if (is_dir($filePath)) {
|
foreach ($queuesData as $queueInfo) {
|
||||||
$this->scanDirectory($filePath, $queues);
|
$value = $queueInfo[$field];
|
||||||
} elseif (is_file($filePath) && pathinfo($filePath, PATHINFO_EXTENSION) === 'php') {
|
|
||||||
$content = file_get_contents($filePath);
|
|
||||||
|
|
||||||
// 查找 Consumer 注解中的 queue 参数
|
// 根据字段类型格式化显示
|
||||||
if (preg_match('/#\[Consumer\([^)]*queue:\s*["\']([^"\']+)["\']/', $content, $matches)) {
|
if ($field === 'messages' || $field === 'consumers') {
|
||||||
$queues[] = $matches[1];
|
$row[] = $this->formatNumber($value);
|
||||||
}
|
} else {
|
||||||
|
$row[] = $value;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return $row;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 格式化数字显示
|
||||||
|
*/
|
||||||
|
private function formatNumber($value): string
|
||||||
|
{
|
||||||
|
if (!is_numeric($value)) {
|
||||||
|
return (string) $value;
|
||||||
|
}
|
||||||
|
|
||||||
|
$num = (int) $value;
|
||||||
|
|
||||||
|
if ($num > 1000) {
|
||||||
|
return '<fg=red>' . number_format($num) . '</>';
|
||||||
|
} elseif ($num > 100) {
|
||||||
|
return '<fg=yellow>' . number_format($num) . '</>';
|
||||||
|
} elseif ($num > 0) {
|
||||||
|
return '<fg=cyan>' . $num . '</>';
|
||||||
|
} else {
|
||||||
|
return '<fg=gray>' . $num . '</>';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -135,22 +135,19 @@ class EntityParseFactory
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 从数据中提取平台名称
|
* 从数据中提取平台名称(从 meta 字段中获取)
|
||||||
*
|
*
|
||||||
* @param array $data
|
* @param array $data
|
||||||
* @return string
|
* @return string
|
||||||
|
* @throws InvalidArgumentException
|
||||||
*/
|
*/
|
||||||
private static function extractPlatformNameFromData(array $data): string
|
private static function extractPlatformNameFromData(array $data): string
|
||||||
{
|
{
|
||||||
// 优先从 platform_id 获取,如果没有则尝试从其他字段获取
|
if (!isset($data['meta']['platform_id'])) {
|
||||||
if (isset($data['platform_id'])) {
|
throw new InvalidArgumentException("Cannot extract platform name from data: meta.platform_id missing");
|
||||||
// 这里可以根据 platform_id 映射到平台名称
|
|
||||||
// 简化处理:假设需要从数据库或配置中查找
|
|
||||||
// 当前先直接使用 platform_id 作为平台名称
|
|
||||||
return self::resolvePlatformName($data['platform_id']);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new InvalidArgumentException("Cannot extract platform name from data: platform_id missing");
|
return self::resolvePlatformName($data['meta']['platform_id']);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use App\Model\Company;
|
|||||||
use App\Model\Model as Entity;
|
use App\Model\Model as Entity;
|
||||||
use App\Model\Store;
|
use App\Model\Store;
|
||||||
use App\Entity\Parse\EntityParse;
|
use App\Entity\Parse\EntityParse;
|
||||||
use Hyperf\Amqp\Message\ConsumerMessageInterface;
|
use Hyperf\Collection\LazyCollection;
|
||||||
use InvalidArgumentException;
|
use InvalidArgumentException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -21,20 +21,20 @@ class TmallOrderParser extends EntityParse
|
|||||||
/**
|
/**
|
||||||
* 公司作用域匹配
|
* 公司作用域匹配
|
||||||
*
|
*
|
||||||
* 从消息体中提取 company_id,然后查询数据库获取公司对象
|
* 从 metadata 中提取 company_id,然后查询数据库获取公司对象
|
||||||
*
|
*
|
||||||
* @param ConsumerMessageInterface $message
|
* @param array $metadata
|
||||||
* @return Company
|
* @return Company
|
||||||
* @throws InvalidArgumentException
|
* @throws InvalidArgumentException
|
||||||
*/
|
*/
|
||||||
public function companyScopeMatch(ConsumerMessageInterface $message): Company
|
public function companyScopeMatch(array $metadata): Company
|
||||||
{
|
{
|
||||||
$data = $this->extractMessageData($message);
|
|
||||||
|
|
||||||
// 验证必需字段
|
// 验证必需字段
|
||||||
$this->validateRequiredFields($data, ['company_id']);
|
if (!isset($metadata['company_id'])) {
|
||||||
|
throw new InvalidArgumentException('company_id is required in metadata');
|
||||||
|
}
|
||||||
|
|
||||||
$companyId = $data['company_id'];
|
$companyId = $metadata['company_id'];
|
||||||
|
|
||||||
$company = Company::find($companyId);
|
$company = Company::find($companyId);
|
||||||
|
|
||||||
@@ -48,20 +48,20 @@ class TmallOrderParser extends EntityParse
|
|||||||
/**
|
/**
|
||||||
* 店铺作用域匹配
|
* 店铺作用域匹配
|
||||||
*
|
*
|
||||||
* 从消息体中提取 store_id,然后查询数据库获取店铺对象
|
* 从 metadata 中提取 store_id,然后查询数据库获取店铺对象
|
||||||
*
|
*
|
||||||
* @param ConsumerMessageInterface $message
|
* @param array $metadata
|
||||||
* @return Store
|
* @return Store
|
||||||
* @throws InvalidArgumentException
|
* @throws InvalidArgumentException
|
||||||
*/
|
*/
|
||||||
public function storeScopeMatch(ConsumerMessageInterface $message): Store
|
public function storeScopeMatch(array $metadata): Store
|
||||||
{
|
{
|
||||||
$data = $this->extractMessageData($message);
|
|
||||||
|
|
||||||
// 验证必需字段
|
// 验证必需字段
|
||||||
$this->validateRequiredFields($data, ['store_id']);
|
if (!isset($metadata['store_id'])) {
|
||||||
|
throw new InvalidArgumentException('store_id is required in metadata');
|
||||||
|
}
|
||||||
|
|
||||||
$storeId = $data['store_id'];
|
$storeId = $metadata['store_id'];
|
||||||
|
|
||||||
$store = Store::find($storeId);
|
$store = Store::find($storeId);
|
||||||
|
|
||||||
@@ -75,50 +75,44 @@ class TmallOrderParser extends EntityParse
|
|||||||
/**
|
/**
|
||||||
* 实体数据映射
|
* 实体数据映射
|
||||||
*
|
*
|
||||||
* 将原始数据映射到实体对象
|
* 将原始数据映射为可供 Model 使用的数组集合
|
||||||
*
|
*
|
||||||
* @param array $data
|
* @param array $rawData 原始数据数组,通常来自 $data['raw_data']
|
||||||
* @param Entity $entity
|
* @return LazyCollection 返回 LazyCollection,每个元素为可供 Model::fill() 使用的数组
|
||||||
* @return Entity
|
|
||||||
*/
|
*/
|
||||||
public function entityMap(array $data, Entity $entity): Entity
|
public function entityMap(array $rawData): LazyCollection
|
||||||
{
|
{
|
||||||
// 假设数据结构为:
|
// 使用 LazyCollection 进行延迟处理,提高性能
|
||||||
// {
|
return LazyCollection::make(function () use ($rawData) {
|
||||||
// "platform_id": 2,
|
// 如果 rawData 是单个记录,转换为数组
|
||||||
// "company_id": 188,
|
$records = isset($rawData[0]) ? $rawData : [$rawData];
|
||||||
// "store_id": 292,
|
|
||||||
// "unique_id": "abc123",
|
|
||||||
// "raw_data": [...]
|
|
||||||
// }
|
|
||||||
|
|
||||||
$rawData = $data['raw_data'] ?? [];
|
foreach ($records as $record) {
|
||||||
|
// 映射每条原始数据到 Model 可用的数组格式
|
||||||
// 映射基本信息
|
yield [
|
||||||
$entity->platform_id = $this->getPlatform()->id;
|
'platform_id' => $this->getPlatform()->id,
|
||||||
$entity->company_id = $this->getCompany()->id;
|
'company_id' => $this->getCompany()->id,
|
||||||
$entity->store_id = $this->getStore()->id;
|
'store_id' => $this->getStore()->id,
|
||||||
$entity->unique_id = $data['unique_id'] ?? null;
|
'unique_id' => $record['unique_id'] ?? $this->getData()['unique_id'] ?? null,
|
||||||
|
'raw_data' => json_encode($record),
|
||||||
// 映射原始数据
|
// 根据实际业务需求映射其他字段
|
||||||
if (!empty($rawData)) {
|
// 例如:
|
||||||
$entity->raw_data = json_encode($rawData);
|
// 'order_id' => $record['order_id'] ?? null,
|
||||||
}
|
// 'order_status' => $record['order_status'] ?? null,
|
||||||
|
// 'order_amount' => $record['order_amount'] ?? 0,
|
||||||
// 映射其他字段(根据实际业务需求)
|
// ...
|
||||||
// ...
|
];
|
||||||
|
}
|
||||||
return $entity;
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 可选:覆盖唯一标识符提取逻辑
|
* 可选:覆盖唯一标识符提取逻辑
|
||||||
*
|
*
|
||||||
* 如果使用默认的 id/unique_id 提取逻辑,则无需覆盖此方法
|
* 如果使用默认的 unique_id 提取逻辑,则无需覆盖此方法
|
||||||
*/
|
*/
|
||||||
// public function entityUniqueIdentifierExtract(ConsumerMessageInterface $message): string|int
|
// public function entityUniqueIdentifierExtract(array $metadata): string|int
|
||||||
// {
|
// {
|
||||||
// $data = $this->extractMessageData($message);
|
// return $metadata['custom_id_field'] ?? throw new InvalidArgumentException('custom_id_field not found');
|
||||||
// return $this->extractUniqueIdentifier($data, 'custom_id_field');
|
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user