Files
datahub/backend/app/Command/OrderAggregatesBackfillCommand.php
2026-05-07 16:00:10 +08:00

47 lines
1.7 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace App\Command;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\DbConnection\Db;
/**
 * Console command `orders:backfill-aggregates`.
 *
 * Performs a one-off backfill of the two daily order roll-ups:
 *  - `orders_daily_by_created`, refreshed via TimescaleDB's
 *    `refresh_continuous_aggregate` procedure;
 *  - `orders_daily_by_paid`, a PostgreSQL materialized view refreshed via
 *    `REFRESH MATERIALIZED VIEW`.
 */
#[Command]
class OrderAggregatesBackfillCommand extends HyperfCommand
{
    public function __construct()
    {
        parent::__construct('orders:backfill-aggregates');
    }

    /**
     * Registers the human-readable description shown in the command list.
     */
    public function configure(): void
    {
        parent::configure();
        $this->setDescription('一次性回填 orders_daily_by_created(连续聚合)和 orders_daily_by_paid(物化视图)的全部历史数据');
    }

    /**
     * Refreshes both aggregates in order and reports progress on the console.
     *
     * @throws \RuntimeException when the `orders_daily_by_paid` materialized view cannot be found
     */
    public function handle(): void
    {
        $this->refreshCreatedAggregate();
        $this->refreshPaidMaterializedView();

        $this->info('Done.');
    }

    /**
     * Step 1: refresh `orders_daily_by_created` with TimescaleDB's built-in procedure.
     *
     * The window start is NULL (no lower bound) and the window end is one hour
     * before the current time.
     */
    private function refreshCreatedAggregate(): void
    {
        $this->line('Refreshing orders_daily_by_created (NULL → now() - 1 hour) ...');
        Db::statement("CALL refresh_continuous_aggregate('orders_daily_by_created', NULL, now() - INTERVAL '1 hour')");
    }

    /**
     * Step 2: refresh the `orders_daily_by_paid` PostgreSQL materialized view.
     *
     * The first fill must use the plain (non-CONCURRENTLY) form; only later
     * recomputations may use CONCURRENTLY. This is a hard PostgreSQL constraint:
     * CONCURRENTLY cannot be used when the view is not populated.
     *
     * @throws \RuntimeException when the materialized view does not exist
     */
    private function refreshPaidMaterializedView(): void
    {
        // Resolve the view through to_regclass() so the populated flag is read
        // from the very relation the unqualified REFRESH below will target
        // (same search_path resolution). Matching pg_matviews by name alone
        // could pick up a same-named view living in a different schema.
        $rows = Db::select(
            'SELECT c.relispopulated AS ispopulated FROM pg_class c '
            . "WHERE c.oid = to_regclass('orders_daily_by_paid') AND c.relkind = 'm'"
        );

        // A missing view is not the same thing as an unpopulated one: fail with
        // an explicit message instead of announcing an "initial population"
        // that is bound to end in a raw SQL error.
        if ($rows === []) {
            throw new \RuntimeException(
                'Materialized view orders_daily_by_paid was not found on the current search_path; '
                . 'create it before running the backfill.'
            );
        }

        if ((bool) $rows[0]->ispopulated) {
            $this->line('Refreshing orders_daily_by_paid (CONCURRENTLY) ...');
            Db::statement('REFRESH MATERIALIZED VIEW CONCURRENTLY orders_daily_by_paid');

            return;
        }

        $this->line('Initial population of orders_daily_by_paid (non-concurrent) ...');
        Db::statement('REFRESH MATERIALIZED VIEW orders_daily_by_paid');
    }
}