47 lines
1.7 KiB
PHP
47 lines
1.7 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace App\Command;
|
||
|
||
use Hyperf\Command\Annotation\Command;
|
||
use Hyperf\Command\Command as HyperfCommand;
|
||
use Hyperf\DbConnection\Db;
|
||
|
||
#[Command]
|
||
class OrderAggregatesBackfillCommand extends HyperfCommand
|
||
{
|
||
public function __construct()
|
||
{
|
||
parent::__construct('orders:backfill-aggregates');
|
||
}
|
||
|
||
public function configure(): void
|
||
{
|
||
parent::configure();
|
||
$this->setDescription('一次性回填 orders_daily_by_created(连续聚合)和 orders_daily_by_paid(物化视图)的全部历史数据');
|
||
}
|
||
|
||
public function handle(): void
|
||
{
|
||
// 1. orders_daily_by_created:调用 TimescaleDB 内置 refresh
|
||
$this->line('Refreshing orders_daily_by_created (NULL → now() - 1 hour) ...');
|
||
Db::statement("CALL refresh_continuous_aggregate('orders_daily_by_created', NULL, now() - INTERVAL '1 hour')");
|
||
|
||
// 2. orders_daily_by_paid:PG 物化视图。首次必须用非 CONCURRENTLY 模式填充,
|
||
// 后续重算才能走 CONCURRENTLY(PG 硬约束:CONCURRENTLY cannot be used when not populated)。
|
||
$rows = Db::select("SELECT ispopulated FROM pg_matviews WHERE matviewname = 'orders_daily_by_paid'");
|
||
$populated = ! empty($rows) && $rows[0]->ispopulated;
|
||
|
||
if ($populated) {
|
||
$this->line('Refreshing orders_daily_by_paid (CONCURRENTLY) ...');
|
||
Db::statement('REFRESH MATERIALIZED VIEW CONCURRENTLY orders_daily_by_paid');
|
||
} else {
|
||
$this->line('Initial population of orders_daily_by_paid (non-concurrent) ...');
|
||
Db::statement('REFRESH MATERIALIZED VIEW orders_daily_by_paid');
|
||
}
|
||
|
||
$this->info('Done.');
|
||
}
|
||
}
|