add daily paid view
This commit is contained in:
@@ -0,0 +1,69 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
use Hyperf\DbConnection\Db;
|
||||||
|
use Hyperf\Database\Migrations\Migration;
|
||||||
|
|
||||||
|
return new class extends Migration
|
||||||
|
{
|
||||||
|
public function up(): void
|
||||||
|
{
|
||||||
|
// PG 原生物化视图:按付款日聚合(仅已付订单)。
|
||||||
|
// 不带 WITH (timescaledb.continuous):TimescaleDB 连续聚合的 time_bucket
|
||||||
|
// 必须基于 hypertable 主时间列(created_date),无法对 paid_date 分桶。
|
||||||
|
// WITH NO DATA:避免 migrate 时长时间锁;首次填充由 P22.3 回填命令执行。
|
||||||
|
Db::statement(<<<'SQL'
|
||||||
|
CREATE MATERIALIZED VIEW orders_daily_by_paid AS
|
||||||
|
SELECT
|
||||||
|
paid_date::date AS day,
|
||||||
|
company_id,
|
||||||
|
platform_id,
|
||||||
|
store_id,
|
||||||
|
COUNT(*) AS paid_orders,
|
||||||
|
SUM(total_amount) AS sum_total_amount,
|
||||||
|
SUM(total_paid) AS sum_total_paid,
|
||||||
|
SUM(total_received) AS sum_total_received,
|
||||||
|
AVG(total_paid) AS avg_paid_amount,
|
||||||
|
SUM(freight_fee) AS sum_freight_fee,
|
||||||
|
SUM(tax_fee) AS sum_tax_fee,
|
||||||
|
SUM(commission_fee) AS sum_commission_fee,
|
||||||
|
SUM(discount_fee) AS sum_discount_fee
|
||||||
|
FROM orders
|
||||||
|
WHERE paid_date IS NOT NULL
|
||||||
|
GROUP BY day, company_id, platform_id, store_id
|
||||||
|
WITH NO DATA;
|
||||||
|
SQL);
|
||||||
|
|
||||||
|
// UNIQUE 索引:REFRESH MATERIALIZED VIEW CONCURRENTLY 的前置条件,缺失会直接报错。
|
||||||
|
Db::statement('CREATE UNIQUE INDEX idx_orders_daily_by_paid_unique ON orders_daily_by_paid (day, company_id, platform_id, store_id)');
|
||||||
|
|
||||||
|
// 5 复合索引:覆盖"最近 N 日按 X 维度"与"指定 X 全历史时间序列"两类高频查询。
|
||||||
|
Db::statement('CREATE INDEX idx_orders_daily_by_paid_day_company ON orders_daily_by_paid (day DESC, company_id)');
|
||||||
|
Db::statement('CREATE INDEX idx_orders_daily_by_paid_day_platform ON orders_daily_by_paid (day DESC, platform_id)');
|
||||||
|
Db::statement('CREATE INDEX idx_orders_daily_by_paid_day_store ON orders_daily_by_paid (day DESC, store_id)');
|
||||||
|
Db::statement('CREATE INDEX idx_orders_daily_by_paid_company_day ON orders_daily_by_paid (company_id, day DESC)');
|
||||||
|
Db::statement('CREATE INDEX idx_orders_daily_by_paid_store_day ON orders_daily_by_paid (store_id, day DESC)');
|
||||||
|
|
||||||
|
// by_created 连续聚合自动刷新策略:每小时刷新近 3 天水位线,1 小时写入缓冲避免与热数据冲突。
|
||||||
|
// 3 天前的修改由 Stage 23 aggregate_refresh_queue 兜底。
|
||||||
|
Db::statement(<<<'SQL'
|
||||||
|
SELECT add_continuous_aggregate_policy(
|
||||||
|
'orders_daily_by_created',
|
||||||
|
start_offset => INTERVAL '3 days',
|
||||||
|
end_offset => INTERVAL '1 hour',
|
||||||
|
schedule_interval => INTERVAL '1 hour'
|
||||||
|
);
|
||||||
|
SQL);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function down(): void
|
||||||
|
{
|
||||||
|
// 先 remove policy,再 drop 视图 CASCADE。
|
||||||
|
// if_exists => true 处理 by_created 视图已被 P22.1 down CASCADE 间接带走的情况。
|
||||||
|
Db::statement("SELECT remove_continuous_aggregate_policy('orders_daily_by_created', if_exists => true)");
|
||||||
|
|
||||||
|
// CASCADE 一并 drop UNIQUE 索引和 5 复合索引。
|
||||||
|
Db::statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_paid CASCADE');
|
||||||
|
}
|
||||||
|
};
|
||||||
@@ -0,0 +1,69 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace HyperfTest\Cases\Integration\System;
|
||||||
|
|
||||||
|
use Hyperf\DbConnection\Db;
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* P22.2 物化层 schema 烟雾测试:断言迁移已应用、索引齐备、刷新策略已注册。
|
||||||
|
*
|
||||||
|
* @internal
|
||||||
|
* @coversNothing
|
||||||
|
*/
|
||||||
|
class MaterializedViewSmokeTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @template T
|
||||||
|
* @param callable(): T $callback
|
||||||
|
* @return T
|
||||||
|
*/
|
||||||
|
protected function runInCoroutine(callable $callback): mixed
|
||||||
|
{
|
||||||
|
if (\Swoole\Coroutine::getCid() > 0) {
|
||||||
|
return $callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
$result = null;
|
||||||
|
$exception = null;
|
||||||
|
\Swoole\Coroutine\run(static function () use ($callback, &$result, &$exception): void {
|
||||||
|
try {
|
||||||
|
$result = $callback();
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
$exception = $e;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if ($exception !== null) {
|
||||||
|
throw $exception;
|
||||||
|
}
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function test_orders_daily_by_paid_matview_exists(): void
|
||||||
|
{
|
||||||
|
$rows = $this->runInCoroutine(static fn () => Db::select(
|
||||||
|
"SELECT matviewname FROM pg_matviews WHERE matviewname = 'orders_daily_by_paid'"
|
||||||
|
));
|
||||||
|
$this->assertCount(1, $rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function test_orders_daily_by_paid_has_six_indexes(): void
|
||||||
|
{
|
||||||
|
$rows = $this->runInCoroutine(static fn () => Db::select(
|
||||||
|
"SELECT indexname FROM pg_indexes WHERE tablename = 'orders_daily_by_paid'"
|
||||||
|
));
|
||||||
|
$this->assertCount(6, $rows, '应有 1 UNIQUE + 5 复合 = 6 个索引');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function test_orders_daily_by_created_refresh_policy_registered(): void
|
||||||
|
{
|
||||||
|
$rows = $this->runInCoroutine(static fn () => Db::select(
|
||||||
|
"SELECT job_id FROM timescaledb_information.jobs
|
||||||
|
WHERE proc_name = 'policy_refresh_continuous_aggregate'
|
||||||
|
AND hypertable_name = 'orders_daily_by_created'"
|
||||||
|
));
|
||||||
|
$this->assertCount(1, $rows);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user