add backfill
This commit is contained in:
@@ -0,0 +1,46 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace App\Command;
|
||||||
|
|
||||||
|
use Hyperf\Command\Annotation\Command;
|
||||||
|
use Hyperf\Command\Command as HyperfCommand;
|
||||||
|
use Hyperf\DbConnection\Db;
|
||||||
|
|
||||||
|
#[Command]
|
||||||
|
class OrderAggregatesBackfillCommand extends HyperfCommand
|
||||||
|
{
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
parent::__construct('orders:backfill-aggregates');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function configure(): void
|
||||||
|
{
|
||||||
|
parent::configure();
|
||||||
|
$this->setDescription('一次性回填 orders_daily_by_created(连续聚合)和 orders_daily_by_paid(物化视图)的全部历史数据');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function handle(): void
|
||||||
|
{
|
||||||
|
// 1. orders_daily_by_created:调用 TimescaleDB 内置 refresh
|
||||||
|
$this->line('Refreshing orders_daily_by_created (NULL → now() - 1 hour) ...');
|
||||||
|
Db::statement("CALL refresh_continuous_aggregate('orders_daily_by_created', NULL, now() - INTERVAL '1 hour')");
|
||||||
|
|
||||||
|
// 2. orders_daily_by_paid:PG 物化视图。首次必须用非 CONCURRENTLY 模式填充,
|
||||||
|
// 后续重算才能走 CONCURRENTLY(PG 硬约束:CONCURRENTLY cannot be used when not populated)。
|
||||||
|
$rows = Db::select("SELECT ispopulated FROM pg_matviews WHERE matviewname = 'orders_daily_by_paid'");
|
||||||
|
$populated = ! empty($rows) && $rows[0]->ispopulated;
|
||||||
|
|
||||||
|
if ($populated) {
|
||||||
|
$this->line('Refreshing orders_daily_by_paid (CONCURRENTLY) ...');
|
||||||
|
Db::statement('REFRESH MATERIALIZED VIEW CONCURRENTLY orders_daily_by_paid');
|
||||||
|
} else {
|
||||||
|
$this->line('Initial population of orders_daily_by_paid (non-concurrent) ...');
|
||||||
|
Db::statement('REFRESH MATERIALIZED VIEW orders_daily_by_paid');
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->info('Done.');
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,94 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace HyperfTest\Cases\Integration\Materialization;
|
||||||
|
|
||||||
|
use Hyperf\DbConnection\Db;
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* P22.3 物化对象集成测试:跨两类物化对象 + 跨命名空间索引的存在性断言。
|
||||||
|
*
|
||||||
|
* 与 P22.2 烟雾测试 `System/MaterializedViewSmokeTest` 各自关注点不同:
|
||||||
|
* - 烟雾测试:单点测试,专测 by_paid 视图与索引
|
||||||
|
* - 本测试:跨两类对象(连续聚合 + PG 物化视图)+ 跨命名空间索引(pg_indexes 不区分视图类型)
|
||||||
|
*
|
||||||
|
* @internal
|
||||||
|
* @coversNothing
|
||||||
|
*/
|
||||||
|
class MaterializationObjectsTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @template T
|
||||||
|
* @param callable(): T $callback
|
||||||
|
* @return T
|
||||||
|
*/
|
||||||
|
protected function runInCoroutine(callable $callback): mixed
|
||||||
|
{
|
||||||
|
if (\Swoole\Coroutine::getCid() > 0) {
|
||||||
|
return $callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
$result = null;
|
||||||
|
$exception = null;
|
||||||
|
\Swoole\Coroutine\run(static function () use ($callback, &$result, &$exception): void {
|
||||||
|
try {
|
||||||
|
$result = $callback();
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
$exception = $e;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if ($exception !== null) {
|
||||||
|
throw $exception;
|
||||||
|
}
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function test_continuous_aggregate_exists(): void
|
||||||
|
{
|
||||||
|
$rows = $this->runInCoroutine(static fn () => Db::select(
|
||||||
|
"SELECT view_name FROM timescaledb_information.continuous_aggregates
|
||||||
|
WHERE view_name = 'orders_daily_by_created'"
|
||||||
|
));
|
||||||
|
$this->assertCount(1, $rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function test_pg_materialized_view_exists(): void
|
||||||
|
{
|
||||||
|
$rows = $this->runInCoroutine(static fn () => Db::select(
|
||||||
|
"SELECT matviewname FROM pg_matviews
|
||||||
|
WHERE matviewname = 'orders_daily_by_paid'"
|
||||||
|
));
|
||||||
|
$this->assertCount(1, $rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function test_refresh_policy_registered(): void
|
||||||
|
{
|
||||||
|
// 显式过滤 hypertable_name,避免未来追加其他连续聚合策略时此断言无关地失败。
|
||||||
|
$rows = $this->runInCoroutine(static fn () => Db::select(
|
||||||
|
"SELECT hypertable_name FROM timescaledb_information.jobs
|
||||||
|
WHERE proc_name = 'policy_refresh_continuous_aggregate'
|
||||||
|
AND hypertable_name = 'orders_daily_by_created'"
|
||||||
|
));
|
||||||
|
$this->assertCount(1, $rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function test_by_created_indexes_present(): void
|
||||||
|
{
|
||||||
|
// P22.1 创建 5 复合索引;chunk 索引名是 `_hyper_*` 不会被 LIKE 'idx_*' 匹配。
|
||||||
|
$rows = $this->runInCoroutine(static fn () => Db::select(
|
||||||
|
"SELECT indexname FROM pg_indexes WHERE indexname LIKE 'idx_orders_daily_by_created%'"
|
||||||
|
));
|
||||||
|
$this->assertCount(5, $rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function test_by_paid_indexes_present(): void
|
||||||
|
{
|
||||||
|
// P22.2 创建 1 UNIQUE + 5 复合 = 6 索引。
|
||||||
|
$rows = $this->runInCoroutine(static fn () => Db::select(
|
||||||
|
"SELECT indexname FROM pg_indexes WHERE indexname LIKE 'idx_orders_daily_by_paid%'"
|
||||||
|
));
|
||||||
|
$this->assertCount(6, $rows);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user