add aggr
This commit is contained in:
@@ -0,0 +1,35 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Command;
|
||||
|
||||
use App\Service\OrderAggregatesRefreshJob;
|
||||
use Hyperf\Command\Annotation\Command;
|
||||
use Hyperf\Command\Command as HyperfCommand;
|
||||
|
||||
/**
|
||||
* 手动触发聚合刷新队列消费
|
||||
*
|
||||
* 与 Crontab 共用 OrderAggregatesRefreshJob,便于运维即时补刷或调试。
|
||||
*/
|
||||
#[Command]
|
||||
class OrderAggregatesRefreshCommand extends HyperfCommand
|
||||
{
|
||||
public function __construct(private OrderAggregatesRefreshJob $job)
|
||||
{
|
||||
parent::__construct('orders:refresh-aggregates');
|
||||
}
|
||||
|
||||
public function configure(): void
|
||||
{
|
||||
parent::configure();
|
||||
$this->setDescription('消费 aggregate_refresh_queue,对滞后日期调用 refresh_continuous_aggregate');
|
||||
}
|
||||
|
||||
public function handle(): void
|
||||
{
|
||||
$result = $this->job->run();
|
||||
$this->info(sprintf('Processed: %d, Failed: %d', $result['processed'], $result['failed']));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Model;
|
||||
|
||||
/**
|
||||
* @property string $refresh_date 格式 Y-m-d
|
||||
* @property string $aggregate_view 视图名(如 orders_daily_by_created)
|
||||
* @property \Carbon\Carbon $created_at 入队时间
|
||||
*/
|
||||
class AggregateRefreshQueue extends Model
|
||||
{
|
||||
protected ?string $table = 'aggregate_refresh_queue';
|
||||
|
||||
public bool $timestamps = false;
|
||||
|
||||
public bool $incrementing = false;
|
||||
|
||||
protected string $primaryKey = 'refresh_date';
|
||||
|
||||
protected array $fillable = [
|
||||
'refresh_date',
|
||||
'aggregate_view',
|
||||
'created_at',
|
||||
];
|
||||
|
||||
protected array $casts = [
|
||||
'created_at' => 'datetime',
|
||||
];
|
||||
}
|
||||
@@ -5,8 +5,10 @@ declare(strict_types=1);
|
||||
namespace App\Platform;
|
||||
|
||||
use App\Entity\Parse\EntityParseFactory;
|
||||
use App\Model\AggregateRefreshQueue;
|
||||
use App\Platform\Traits\FailedMessageTrait;
|
||||
use App\Utils\Log;
|
||||
use Carbon\Carbon;
|
||||
use Hyperf\Amqp\Annotation\Consumer;
|
||||
use Hyperf\Amqp\Builder\QueueBuilder;
|
||||
use Hyperf\Amqp\Message\ConsumerMessage;
|
||||
@@ -152,9 +154,10 @@ class OrderConsumer extends ConsumerMessage
|
||||
// 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem
|
||||
$this->processOrderItems($items);
|
||||
|
||||
// 5. 识别 ≥ 3 天前的 created_date 入队,补刷自动策略未覆盖的窗口
|
||||
$this->enqueueAffectedDates($orders_data);
|
||||
|
||||
Db::commit();
|
||||
// @TODO 触发事件通知,更新自动聚合任务
|
||||
|
||||
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
|
||||
return Result::ACK;
|
||||
@@ -379,4 +382,51 @@ class OrderConsumer extends ConsumerMessage
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 识别 payload 中 ≥ 3 天前的 created_date,入队 orders_daily_by_created 兜底刷新。
|
||||
*
|
||||
* 自动刷新策略仅覆盖最近 3 天窗口;3 天前的订单变更(补录、追溯调整)需由
|
||||
* aggregate_refresh_queue + Crontab 任务补刷。仅服务 by_created 视图,
|
||||
* by_paid 由全量 REFRESH 覆盖,不入此队列。
|
||||
*
|
||||
* @param array $payloads 来自 entityMap()->all() 的订单数组,每条含 created_date 字段
|
||||
*/
|
||||
protected function enqueueAffectedDates(array $payloads): void
|
||||
{
|
||||
if (empty($payloads)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$threshold = Carbon::now()->subDays(3)->toDateString();
|
||||
|
||||
$unique_dates = [];
|
||||
foreach ($payloads as $payload) {
|
||||
$created = $payload['created_date'] ?? null;
|
||||
if ($created === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// entityMap 输出 'Y-m-d H:i:sP';用 Carbon::parse 兼容多种格式
|
||||
$date = Carbon::parse($created)->toDateString();
|
||||
|
||||
// 严格小于阈值才入队(≥ 阈值的部分由自动刷新策略覆盖)
|
||||
if ($date < $threshold) {
|
||||
$unique_dates[$date] = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (empty($unique_dates)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$now = Carbon::now();
|
||||
foreach (array_keys($unique_dates) as $date) {
|
||||
AggregateRefreshQueue::query()->insertOrIgnore([
|
||||
'refresh_date' => $date,
|
||||
'aggregate_view' => 'orders_daily_by_created',
|
||||
'created_at' => $now,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Service;
|
||||
|
||||
/**
|
||||
* 聚合视图刷新接口
|
||||
*
|
||||
* 隔离 PG procedure 调用,便于在单测中注入 mock 实现。
|
||||
*/
|
||||
interface AggregateRefresherInterface
|
||||
{
|
||||
/**
|
||||
* 对指定聚合视图的指定日期刷新
|
||||
*
|
||||
* @param string $view 聚合视图名(如 'orders_daily_by_created')
|
||||
* @param string $refresh_date Y-m-d 格式日期
|
||||
*/
|
||||
public function refresh(string $view, string $refresh_date): void;
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Service;
|
||||
|
||||
use Hyperf\DbConnection\Db;
|
||||
|
||||
/**
|
||||
* TimescaleDB 连续聚合刷新实现
|
||||
*
|
||||
* 调用 PG 的 refresh_continuous_aggregate 存储过程,按整日窗口刷新。
|
||||
*/
|
||||
class ContinuousAggregateRefresher implements AggregateRefresherInterface
|
||||
{
|
||||
public function refresh(string $view, string $refresh_date): void
|
||||
{
|
||||
$start = sprintf("'%s 00:00:00+00'::timestamptz", $refresh_date);
|
||||
$end = sprintf("'%s 23:59:59.999999+00'::timestamptz", $refresh_date);
|
||||
|
||||
// view 在 P23.2 中硬编码为 'orders_daily_by_created',但仍走 PDO quote 防御
|
||||
$quoted_view = Db::getPdo()->quote($view);
|
||||
|
||||
Db::statement(sprintf(
|
||||
'CALL refresh_continuous_aggregate(%s, %s, %s)',
|
||||
$quoted_view,
|
||||
$start,
|
||||
$end
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Service;
|
||||
|
||||
use App\Model\AggregateRefreshQueue;
|
||||
use App\Utils\Log;
|
||||
use Carbon\Carbon;
|
||||
use Throwable;
|
||||
|
||||
/**
|
||||
* 订单聚合刷新任务
|
||||
*
|
||||
* 消费 aggregate_refresh_queue 中 created_at < now() - 1 hour 的项,
|
||||
* 逐条调用 AggregateRefresherInterface 刷新对应日期的连续聚合,
|
||||
* 成功后从队列中删除;失败保留队列项由下次任务重试。
|
||||
*
|
||||
* 由 OrderAggregatesRefreshCommand(CLI 入口)和 Crontab(定时入口)共同调用。
|
||||
*/
|
||||
class OrderAggregatesRefreshJob
|
||||
{
|
||||
public function __construct(private AggregateRefresherInterface $refresher)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{processed: int, failed: int} 处理与失败计数
|
||||
*/
|
||||
public function run(): array
|
||||
{
|
||||
$items = AggregateRefreshQueue::query()
|
||||
->where('created_at', '<', Carbon::now()->subHour())
|
||||
->orderBy('refresh_date')
|
||||
->orderBy('aggregate_view')
|
||||
->get();
|
||||
|
||||
if ($items->isEmpty()) {
|
||||
Log::get()->info('orders:refresh-aggregates queue empty');
|
||||
return ['processed' => 0, 'failed' => 0];
|
||||
}
|
||||
|
||||
$processed = 0;
|
||||
$failed = 0;
|
||||
|
||||
foreach ($items as $item) {
|
||||
$view = $item->aggregate_view;
|
||||
$date = $item->refresh_date;
|
||||
|
||||
try {
|
||||
$this->refresher->refresh($view, $date);
|
||||
|
||||
AggregateRefreshQueue::query()
|
||||
->where('refresh_date', $date)
|
||||
->where('aggregate_view', $view)
|
||||
->delete();
|
||||
|
||||
$processed++;
|
||||
} catch (Throwable $e) {
|
||||
Log::get()->error('orders:refresh-aggregates failed', [
|
||||
'view' => $view,
|
||||
'date' => $date,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
$failed++;
|
||||
}
|
||||
}
|
||||
|
||||
Log::get()->info('orders:refresh-aggregates done', [
|
||||
'processed' => $processed,
|
||||
'failed' => $failed,
|
||||
]);
|
||||
|
||||
return ['processed' => $processed, 'failed' => $failed];
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user