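diff --git a/docs/data_query.md b/docs/data_query.md
new file mode 100644
index 0000000..5cfa9c5
--- /dev/null
+++ b/docs/data_query.md
@@ -0,0 +1,1482 @@
+# Multi-Time-Dimension Sales Statistics with TimescaleDB
+
+## Overview
+
+This document describes how to use TimescaleDB to build multi-time-dimension sales statistics for an order system, with fast aggregate queries along two dimensions: **creation time** and **payment time**.
+
+### Business requirements
+
+- Report order trends by creation date, covering all orders (including unpaid ones)
+- Report actual revenue by payment date (paid orders only)
+- Group by company, platform, and store
+- Support daily, weekly, and monthly granularity
+- Historical queries should return in milliseconds
+- Data for the current day must be real-time
+
+### Technical approach
+
+- **Base table**: `created_date` is the hypertable partitioning key
+- **Continuous aggregates**: two materialized views, one bucketed by creation time and one by payment time
+- **Query strategy**: read history from the aggregates and compute the current day from the base table
+
+---
+
+## Core concepts and how they work
+
+### 1. Hypertable: time-partitioned storage
+
+**What is a hypertable?**
+
+A hypertable is TimescaleDB's core abstraction. It turns an ordinary PostgreSQL table into a table that is **automatically partitioned by time**.
+
+**What a hypertable does**:
+
+| Capability | Description |
+|------|------|
+| **Automatic partitioning** | Creates and manages partitions (chunks) based on the time column |
+| **Transparent queries** | Routes each query to the relevant chunks; no partition has to be named |
+| **Efficient writes** | New rows land in the most recent chunk, which reduces lock contention |
+| **Compression** | Older chunks can be compressed automatically to save space |
+| **Data retention** | Chunks older than the retention period can be dropped automatically |
+
+**What a hypertable does not do**:
+
+- ❌ **No automatic aggregation**: it stores raw rows only
+- ❌ **No automatic statistics views**: those are configured separately
+- ❌ **No precomputed aggregates**: an aggregate query still reads every row in the chunks that match its time filter
+
+**Example**:
+
+```sql
+-- Create the hypertable
+SELECT create_hypertable('orders', 'created_date',
+    chunk_time_interval => INTERVAL '7 days');
+
+-- Effect (chunk names and boundaries are illustrative):
+-- rows for 2025-01-01 ~ 2025-01-07 are stored in chunk_1
+-- rows for 2025-01-08 ~ 2025-01-14 are stored in chunk_2
+-- rows for 2025-01-15 ~ 2025-01-21 are stored in chunk_3
+-- ...
+
+-- Queries are routed automatically (transparent to the caller):
+SELECT * FROM orders
+WHERE created_date >= '2025-01-10'
+  AND created_date < '2025-01-12';
+-- ↑ TimescaleDB scans only chunk_2 and skips the other chunks
+```
+
+**Key takeaway**:
+
+> A hypertable optimizes **storage and time-range filtering**. It does not precompute anything.
+> To speed up aggregate queries you also need **continuous aggregates**.
+
+---
+
+### 2. Comparing the three kinds of views
+
+| Property | Plain view (VIEW) | PostgreSQL materialized view (MATERIALIZED VIEW) | TimescaleDB continuous aggregate (Continuous Aggregate) |
+|------|-------------------|------------------------------------------|----------------------------------------------|
+| **Data storage** | ❌ Nothing stored; recomputed on every query | ✅ Stores precomputed results | ✅ Stores precomputed results |
+| **Query performance** | Slow (re-aggregates every time) | Fast (reads precomputed results) | Fast (reads precomputed buckets) |
+| **Refresh** | Always current | Manual (`REFRESH MATERIALIZED VIEW`) | Automatic and incremental (background job) |
+| **Refresh cost** | N/A | High (full recomputation) | Low (only changed time ranges) |
+| **Updates to historical rows** | Reflected immediately | Reflected after a manual refresh | Reflected inside the policy window, or after a manual refresh |
+| **Real-time results** | ✅ Yes | ❌ No (must refresh first) | ✅ Yes, when real-time aggregation is enabled (`timescaledb.materialized_only = false`; it is off by default in recent TimescaleDB releases) |
+| **Best suited for** | Small data, strict freshness | Medium data, manual refresh is acceptable | Time-series data that must be maintained automatically |
+
+**Side-by-side example** (three alternative definitions of the same view):
+
+```sql
+-- 1. Plain view (re-aggregates on every query, slow)
+CREATE VIEW daily_sales AS
+SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue
+FROM orders
+GROUP BY day;
+
+SELECT * FROM daily_sales WHERE day = '2025-01-01';
+-- ↑ aggregates the orders rows at query time
+
+
+-- 2. PostgreSQL materialized view (precomputed, refreshed manually)
+CREATE MATERIALIZED VIEW daily_sales AS
+SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue
+FROM orders
+GROUP BY day;
+
+SELECT * FROM daily_sales WHERE day = '2025-01-01';
+-- ↑ reads the stored result, fast
+
+-- After the data changes, a manual full recomputation is required
+REFRESH MATERIALIZED VIEW daily_sales;
+
+
+-- 3. TimescaleDB continuous aggregate (refreshed automatically and incrementally)
+CREATE MATERIALIZED VIEW daily_sales
+WITH (timescaledb.continuous) AS
+SELECT time_bucket('1 day', created_date) AS day, SUM(total_amount) AS revenue
+FROM orders
+GROUP BY day
+WITH NO DATA;
+
+-- Refresh policy: every hour, refresh the window from 3 days ago to 1 hour ago
+SELECT add_continuous_aggregate_policy('daily_sales',
+    start_offset => INTERVAL '3 days',
+    end_offset => INTERVAL '1 hour',
+    schedule_interval => INTERVAL '1 hour'
+);
+
+SELECT * FROM daily_sales WHERE day = '2025-01-01';
+-- ↑ reads the materialized buckets; rows that are not yet materialized are
+--   included only if real-time aggregation is enabled on the view
+```
+
+---
+
+### 3. What WITH (timescaledb.continuous) does
+
+**Standard materialized view**:
+
+```sql
+CREATE MATERIALIZED VIEW my_view AS
+SELECT ...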
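+FROM ...;
+-- ↑ A standard PostgreSQL materialized view
+--   - must be refreshed manually with REFRESH MATERIALIZED VIEW
+--   - every refresh is a full recomputation
+```
+
+**Continuous aggregate**:
+
+```sql
+CREATE MATERIALIZED VIEW my_view
+WITH (timescaledb.continuous) AS  -- ← the key option
+SELECT time_bucket('1 day', time_column) AS bucket, ...
+FROM hypertable
+GROUP BY bucket;
+-- ↑ A TimescaleDB continuous aggregate
+--   - refreshed in the background (once a refresh policy is configured)
+--   - refreshed incrementally (only time ranges whose data changed)
+--   - can mix in not-yet-materialized rows when real-time aggregation is enabled
+```
+
+**What WITH (timescaledb.continuous) enables**:
+
+1. **Incremental materialization**: only the time ranges that changed are recomputed, not the whole view
+2. **Automatic refresh**: a background job can be configured to keep the data up to date
+3. **Real-time aggregation**: a query can combine materialized data with rows that are not materialized yet (controlled by `timescaledb.materialized_only`)
+4. **Invalidation tracking**: TimescaleDB records which time ranges need to be recomputed
+
+**Key takeaway**:
+
+> `WITH (timescaledb.continuous)` does not turn refreshing on.
+> It makes incremental refresh policies possible.
+> After creating the view you still have to add a refresh policy before it updates automatically.
+
+---
+
+### 4. Full integration: three steps
+
+TimescaleDB statistics are not set up in a single step. **Three steps have to be completed in order**:
+
+```
+Step 1: create the hypertable → Step 2: create the continuous aggregate → Step 3: configure the refresh policy → statistics are maintained automatically
+```
+
+#### Step 1: Create the hypertable
+
+**Purpose**: convert an ordinary table into a time-partitioned table to optimize storage and time-range queries
+
+```sql
+-- Turn the existing orders table into a hypertable
+-- (if the table already contains rows, create_hypertable also needs migrate_data => TRUE)
+SELECT create_hypertable('orders', 'created_date',
+    chunk_time_interval => INTERVAL '7 days'
+);
+```
+
+**Result**:
+- ✅ Rows are partitioned by time automatically
+- ✅ Queries are routed to the relevant chunks
+- ❌ Aggregate queries are not precomputed, so they are still slow on large ranges
+
+#### Step 2: Create the continuous aggregate
+
+**Purpose**: create a pre-aggregated view that stores the statistics
+
+```sql
+CREATE MATERIALIZED VIEW orders_daily_stats
+WITH (timescaledb.continuous) AS
+SELECT
+    time_bucket('1 day', created_date) AS day,
+    company_id,
+    COUNT(*) AS total_orders,
+    SUM(total_amount) AS revenue
+FROM orders
+GROUP BY day, company_id
+WITH NO DATA;  -- do not populate yet; wait for the refresh policy
+```
+
+**Result**:
+- ✅ The view exists and can be queried
+- ❌ It holds no data (WITH NO DATA)
+- ❌ It does not update automatically (no refresh policy yet)
+
+#### Step 3: Configure the refresh policy
+
+**Purpose**: enable background refreshes so the data stays current
+
+```sql
+SELECT add_continuous_aggregate_policy('orders_daily_stats',
+    start_offset => INTERVAL '3 days',      -- refresh starts 3 days back
+    end_offset => INTERVAL '1 hour',        -- refresh stops 1 hour back (skips rows still being written)
+    schedule_interval => INTERVAL '1 hour'  -- run once per hour
+);
+```
+
+**Result**:
+- ✅ A background job runs every hour
+- ✅ Only the window [now() - 3 days, now() - 1 hour] is refreshed
+- ✅ The computation is incremental, so it is cheap
+
+**Initial backfill**:
+
+```sql
+-- The policy only covers its configured window; older history has to be refreshed once by hand
+CALL refresh_continuous_aggregate('orders_daily_stats',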
+    '2024-01-01',  -- start of the range
+    '2025-01-01'   -- end of the range (exclusive)
+);
+```
+
+---
+
+### 5. Data flow and business flow
+
+#### Write path
+
+```
+Application writes an order
+    ↓
+orders table receives the row
+    ↓
+TimescaleDB routes it by time
+    ├→ Chunk 1 (2025-01-01~07)
+    ├→ Chunk 2 (2025-01-08~14)
+    ├→ Chunk 3 (2025-01-15~21)
+    └→ Chunk N (latest data)
+```
+
+#### Aggregation path
+
+```
+Background refresh job (runs every hour)
+    ↓
+Detect time ranges that changed
+    ├→ Range A has new rows     → recompute its buckets
+    ├→ Range B has updated rows → recompute its buckets
+    └→ Range C is unchanged     → skip
+    ↓
+Update the orders_daily_by_created view
+    ↓
+Aggregated data is up to date
+```
+
+#### Query path (real-time + history)
+
+```
+Query sales for the last 30 days
+    ↓
+Split the time range (in the application layer)
+    ├→ History (30 days ago ~ yesterday)
+    │   ↓
+    │   Read the continuous aggregate orders_daily_by_created
+    │   (milliseconds)
+    │
+    └→ Real-time (today)
+        ↓
+        Aggregate the orders table on the fly
+        (slower per row, but only one day of data)
+    ↓
+Merge the results → return the complete statistics
+```
+
+#### End-to-end business flow
+
+```
+Application-level operations
+
+1. Order write
+   INSERT INTO orders (...) VALUES (...);
+       ↓
+   [written to the hypertable and routed to the matching chunk]
+
+2. Automatic background aggregation (every hour)
+   [TimescaleDB background job]
+       ↓
+   detect changes within the last 3 days
+       ↓
+   recompute the affected aggregate buckets
+       ↓
+   update the orders_daily_by_created view
+
+3. Update to a historical order (outside the refresh window)
+   UPDATE orders SET total_paid = 100 WHERE id = 123;
+       ↓
+   [application detects that the order was created 30 days ago]
+       ↓
+   INSERT INTO aggregate_refresh_queue ...
+       ↓
+   [scheduled job processes the queue every night]
+       ↓
+   CALL refresh_continuous_aggregate('...', '2024-12-01', ...)
+
+4. Statistics query
+   SELECT * FROM orders_daily_by_created
+   WHERE day >= '2024-12-01' AND day < CURRENT_DATE
+       ↓
+   [reads precomputed aggregates, returns in milliseconds]
+
+   UNION ALL
+
+   SELECT ... FROM orders
+   WHERE created_date >= CURRENT_DATE
+       ↓
+   [aggregates today's rows on the fly]
+```
+
+**Key time ranges**:
+
+| Data age | Handling | Query source | How it is updated |
+|---------|---------|---------|---------|
+| **Today** | Real-time data | orders table (aggregated on the fly) | Visible as soon as it is written |
+| **1 hour ago ~ 3 days ago** | Automatic refresh window | Continuous aggregate | Background job, hourly |
+| **3 days ago ~ N years ago** | Historical data | Continuous aggregate | Manual refresh or refresh queue |
+
+---
+
+## Architecture
+
+### 1. Hypertable partitioning strategy
+
+A TimescaleDB hypertable has exactly one time partitioning key. We choose `created_date`:
+
+**Reasons**:
+- Every order has a `created_date` (never NULL)
+- `paid_date` is NULL until the order is paid, so it cannot be the partitioning key
+- Orders grow naturally along creation time, which fits time-series storage
+
+**Configuration**:
+```sql
+SELECT create_hypertable('orders', 'created_date',
+    chunk_time_interval => INTERVAL '7 days');
+```
+
+### 2. Continuous aggregates
+
+Continuous aggregates are materialized views that TimescaleDB maintains automatically:
+
+**Properties**:
+- Incremental updates: only changed data is recomputed
+- Configurable refresh policy (time window and frequency)
+- Much faster than aggregating on the fly, because results are precomputed
+- Can be combined with real-time data
+
+**The two aggregate views**:
+
+1. **orders_daily_by_created**: statistics by creation date
+   - Counts all orders (including unpaid ones)
+   - Separates paid and unpaid order counts
+   - Used for order trends and conversion rates
+
+2. **orders_daily_by_paid**: statistics by payment date
+   - Counts paid orders only (WHERE paid_date IS NOT NULL)
+   - Reflects when revenue was actually received
+   - Used for financial reports and revenue analysis
+
+**Caveat**: TimescaleDB expects the `time_bucket` call in a continuous aggregate to be applied to the hypertable's time-dimension column. `orders` is partitioned on `created_date`, so the definition of `orders_daily_by_paid` (bucketed on `paid_date`) should be validated on your TimescaleDB version before you rely on it. If it is rejected, one common alternative is a separate hypertable keyed on `paid_date`.
+
+---
+
+## Implementation steps
+
+### Step 1: Modify the orders table migration
+
+File: `backend/migrations/2025_11_11_083522_create_orders_table.php`
+
+Add the TimescaleDB configuration at the end of `up()`:
+
+```php
+public function up(): void
+{
+    // ... existing table creation and index code ...
+
+    // GIN indexes for the jsonb columns (PostgreSQL)
+    Schema::getConnection()->statement('CREATE INDEX orders_raw_gin_idx ON orders USING gin (raw)');
+    Schema::getConnection()->statement('CREATE INDEX orders_ext_gin_idx ON orders USING gin (ext)');
+
+    // ==================== TimescaleDB configuration ====================
+
+    // 1. Enable the TimescaleDB extension (if it is not enabled yet)
+    Schema::getConnection()->statement('CREATE EXTENSION IF NOT EXISTS timescaledb');
+
+    // 2. Convert orders into a hypertable
+    //    - created_date is the time partitioning key
+    //    - each chunk holds 7 days of data
+    //    - if_not_exists => TRUE avoids an error when this runs twice
+    //    - every primary key / unique index on orders must include created_date
+    Schema::getConnection()->statement(
+        "SELECT create_hypertable('orders', 'created_date',
+            chunk_time_interval => INTERVAL '7 days',
+            if_not_exists => TRUE)"
+    );
+
+    // 3. (Optional) Retention policy: automatically drop data older than 2 years
+    // Schema::getConnection()->statement(
+    //     "SELECT add_retention_policy('orders', INTERVAL '2 years')"
+    // );
+}
+```
+
+**Parameters**:
+- `chunk_time_interval`: the chunk interval; tune it to the data volume
+  - Small volume: 7 days (the default)
+  - Medium volume: 1 day
+  - Large volume: half a day or less
+- `if_not_exists`: avoids an error if the hypertable already exists
+
+### Step 2: Create the continuous aggregate migration
+
+Create the file: `backend/migrations/2025_11_12_XXXXXX_create_orders_continuous_aggregates.php`
+
+```php
+<?php
+
+// NOTE: the file header was lost when this document was extracted. The `use`
+// statements are framework-specific and omitted here; the class opening and the
+// first lines of up() are reconstructed from the rest of the file.
+return new class extends Migration
+{
+    public function up(): void
+    {
+        $conn = Schema::getConnection();
+
+        // ========== Continuous aggregate 1: by creation date ==========
+
+        $conn->statement("
+            CREATE MATERIALIZED VIEW orders_daily_by_created
+            WITH (timescaledb.continuous) AS
+            SELECT
+                time_bucket('1 day', created_date) AS day,
+                company_id,
+                platform_id,
+                store_id,
+
+                -- order counts
+                COUNT(*) AS total_orders,
+                COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
+                COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
+
+                -- sales amounts
+                SUM(total_amount) AS sum_total_amount,
+                SUM(total_paid) AS sum_total_paid,
+                SUM(total_received) AS sum_total_received,
+
+                -- averages
+                AVG(total_amount) AS avg_order_amount,
+                AVG(total_paid) FILTER (WHERE paid_date IS NOT NULL) AS avg_paid_amount,
+
+                -- fees
+                SUM(freight_fee) AS sum_freight_fee,
+                SUM(tax_fee) AS sum_tax_fee,
+                SUM(commission_fee) AS sum_commission_fee,
+                SUM(discount_fee) AS sum_discount_fee
+
+            FROM orders
+            GROUP BY day, company_id, platform_id, store_id
+            WITH NO DATA;
+        ");
+
+        // Automatic refresh policy
+        // - refreshes the window from 3 days ago to 1 hour ago
+        // - runs once per hour
+        $conn->statement("
+            SELECT add_continuous_aggregate_policy('orders_daily_by_created',
+                start_offset => INTERVAL '3 days',
+                end_offset => INTERVAL '1 hour',
+                schedule_interval => INTERVAL '1 hour'
+            );
+        ");
+
+        // Indexes to speed up queries
+        $conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, company_id)');
+        $conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, platform_id)');
+        $conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, store_id)');
+        $conn->statement('CREATE INDEX ON orders_daily_by_created (company_id, day DESC)');
+        $conn->statement('CREATE INDEX ON orders_daily_by_created (store_id, day DESC)');
+
+        // ========== Continuous aggregate 2: by payment date ==========
+        // NOTE: time_bucket here is applied to paid_date, which is not the
+        // hypertable's partitioning column (created_date). Verify that your
+        // TimescaleDB version accepts this definition before deploying.
+
+        $conn->statement("
+            CREATE MATERIALIZED VIEW orders_daily_by_paid
+            WITH (timescaledb.continuous) AS
+            SELECT
+                time_bucket('1 day', paid_date) AS day,
+                company_id,
+                platform_id,
+                store_id,
+
+                -- paid order count
+                COUNT(*) AS paid_orders,
+
+                -- sales amounts
+                SUM(total_amount) AS sum_total_amount,
+                SUM(total_paid) AS sum_total_paid,
+                SUM(total_received) AS sum_total_received,
+
+                -- averages
+                AVG(total_amount) AS avg_order_amount,
+                AVG(total_paid) AS avg_paid_amount,
+
+                -- fees
+                SUM(freight_fee) AS sum_freight_fee,
+                SUM(tax_fee) AS sum_tax_fee,
+                SUM(commission_fee) AS sum_commission_fee,
+                SUM(discount_fee) AS sum_discount_fee
+
+            FROM orders
+            WHERE paid_date IS NOT NULL  -- paid orders only
+            GROUP BY day, company_id, platform_id, store_id
+            WITH NO DATA;
+        ");
+
+        // Automatic refresh policy
+        $conn->statement("
+            SELECT add_continuous_aggregate_policy('orders_daily_by_paid',
+                start_offset => INTERVAL '3 days',
+                end_offset => INTERVAL '1 hour',
+                schedule_interval => INTERVAL '1 hour'
+            );
+        ");
+
+        // Indexes
+        $conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, company_id)');
+        $conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, platform_id)');
+        $conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, store_id)');
+        $conn->statement('CREATE INDEX ON orders_daily_by_paid (company_id, day DESC)');
+        $conn->statement('CREATE INDEX ON orders_daily_by_paid (store_id, day DESC)');
+    }
+
+    /**
+     * Reverse the migrations.
+     */
+    public function down(): void
+    {
+        $conn = Schema::getConnection();
+
+        // Drop the continuous aggregates (CASCADE also removes their refresh policies)
+        $conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_paid CASCADE');
+        $conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_created CASCADE');
+    }
+};
+```
+
+**Refresh policy parameters**:
+- `start_offset`: how far back the refresh starts (relative to now)
+- `end_offset`: how far back the refresh stops (keeps rows that are still being written out of the refresh)
+- `schedule_interval`: how often the job runs
+
+**Example configuration**:
+```sql
+-- Every hour, refresh the last 3 days
+start_offset => INTERVAL '3 days'
+end_offset => INTERVAL '1 hour'
+schedule_interval => INTERVAL '1 hour'
+
+-- Effective refresh window: [now() - 3 days, now() - 1 hour]
+```
+
+---
+
+## Keeping aggregates in sync with historical updates
+
+### The problem
+
+A refresh policy only covers its configured window (for example the last 3 days). When **a historical order outside that window is modified**, the aggregate is not updated automatically.
+
+**Typical cases**:
+- Refund on a historical order (changes total_paid)
+- Backfilled payment time (updates paid_date)
+- Corrected order status
+- Adjusted amounts
+
+### Options
+
+| Option | Pros | Cons | When to use |
+|------|------|------|----------|
+| **Manual refresh** | Precise, low overhead | Has to be handled by the application | Few historical edits |
+| **Aggregate on the fly** | Always accurate | Slower queries | Accuracy is critical |
+| **Wider refresh window** | Automatic | Higher refresh cost | Edits cluster in the recent past |
+| **Refresh queue** | Balances performance and accuracy | More complex to implement | Recommended for production |
+
+### Recommended: a refresh queue
+
+**Idea**:
+1. When an order is modified, record the dates that need a refresh
+2. A scheduled job refreshes the queued dates in batches
+3. Together with the automatic policy this covers most cases
+
+#### 1. Create the refresh queue table
+
+```php
+// Migration
+Schema::create('aggregate_refresh_queue', function (Blueprint $table) {
+    $table->date('refresh_date')->comment('Date that needs a refresh');
+    $table->string('aggregate_view', 100)->comment('Name of the aggregate view');
+    $table->timestamp('created_at')->comment('Time the entry was queued');
+    $table->primary(['refresh_date', 'aggregate_view']);
+    $table->index('created_at');
+});
+```
+
+#### 2. Enqueue when an order is updated
+
+```php
+// Order update method
+public function updateOrder($orderId, array $data): Order
+{
+    $order = Order::findOrFail($orderId);
+
+    // Any change (amounts, status, dates) affects the buckets of the order's
+    // current created_date and paid_date, so those are always collected.
+    $affectedDates = [Carbon::parse($order->created_date)->format('Y-m-d')];
+    if ($order->paid_date) {
+        $affectedDates[] = Carbon::parse($order->paid_date)->format('Y-m-d');
+    }
+
+    // Dates introduced by this update
+    if (!empty($data['created_date'])) {
+        $affectedDates[] = Carbon::parse($data['created_date'])->format('Y-m-d');
+    }
+    if (!empty($data['paid_date'])) {
+        $affectedDates[] = Carbon::parse($data['paid_date'])->format('Y-m-d');
+    }
+
+    // Update the order
+    $order->update($data);
+
+    // Enqueue the dates that need a refresh
+    foreach (array_unique($affectedDates) as $date) {
+        $this->enqueueRefresh($date);
+    }
+
+    return $order;
+}
+
+private function enqueueRefresh(string $date): void
+{
+    // Only dates older than 3 days are queued (the policy handles the last 3 days)
+    if (Carbon::parse($date)->lt(Carbon::today()->subDays(3))) {
+        DB::table('aggregate_refresh_queue')->insertOrIgnore([
+            ['refresh_date' => $date, 'aggregate_view' => 'orders_daily_by_created', 'created_at' => now()],
+            ['refresh_date' => $date, 'aggregate_view' => 'orders_daily_by_paid', 'created_at' => now()],
+        ]);
+    }
+}
+```
+
+#### 3. Batch refresh in a scheduled job
+
+```php
+// app/Command/RefreshOrderAggregatesCommand.php
+// NOTE: the original file header (namespace, imports, class declaration) was lost
+// in extraction; the opening lines of handle() below are reconstructed.
+class RefreshOrderAggregatesCommand /* extends the framework's command base class */
+{
+    public function handle()
+    {
+        $queue = DB::table('aggregate_refresh_queue')
+            ->where('created_at', '<', now()->subHour())  // wait 1 hour to avoid refreshing too often
+            ->orderBy('refresh_date')
+            ->get();
+
+        if ($queue->isEmpty()) {
+            $this->info('No aggregates to refresh.');
+            return;
+        }
+
+        $grouped = $queue->groupBy('aggregate_view');
+
+        foreach ($grouped as $view => $items) {
+            $dates = $items->pluck('refresh_date')->unique();
+            $this->info("Refreshing {$view}: " . $dates->count() . " dates");
+
+            foreach ($dates as $date) {
+                // The window end is exclusive and only buckets that lie entirely
+                // inside the window are refreshed, so end at the next day's start.
+                $start = Carbon::parse($date)->startOfDay();
+                $end = $start->copy()->addDay();
+
+                try {
+                    // Must run outside a transaction block.
+                    DB::statement(
+                        "CALL refresh_continuous_aggregate(?, ?::timestamptz, ?::timestamptz)",
+                        [$view, $start->toDateTimeString(), $end->toDateTimeString()]
+                    );
+
+                    // Remove the entry once it has been refreshed
+                    DB::table('aggregate_refresh_queue')
+                        ->where('refresh_date', $date)
+                        ->where('aggregate_view', $view)
+                        ->delete();
+
+                    $this->line("  ✓ Refreshed {$date}");
+                } catch (\Exception $e) {
+                    $this->error("  ✗ Failed to refresh {$date}: " . $e->getMessage());
+                }
+            }
+        }
+
+        $this->info('Refresh completed.');
+    }
+}
+```
+
+**Schedule the job** (every day at 02:00):
+```php
+// config/autoload/crontab.php
+return [
+    'enable' => true,
+    'crontab' => [
+        [
+            'name' => 'refresh-order-aggregates',
+            'rule' => '0 2 * * *',  // every day at 02:00
+            'callback' => [App\Command\RefreshOrderAggregatesCommand::class, 'handle'],
+            'memo' => 'Refresh order aggregates',
+        ],
+    ],
+];
+```
+
+---
+
+## Query examples
+
+### 1. Statistics by creation date (history + today)
+
+```sql
+-- Order statistics for the last 30 days (by creation date)
+SELECT * FROM (
+    -- History (read from the continuous aggregate, fast)
+    SELECT
+        day,
+        company_id,
+        platform_id,
+        store_id,
+        total_orders,
+        paid_orders,
+        unpaid_orders,
+        sum_total_amount,
+        sum_total_paid,
+        avg_order_amount
+    FROM orders_daily_by_created
+    WHERE day >= CURRENT_DATE - INTERVAL '30 days'
+      AND day < CURRENT_DATE
+
+    UNION ALL
+
+    -- Today, computed on the fly
+    SELECT
+        time_bucket('1 day', created_date) AS day,
+        company_id,
+        platform_id,
+        store_id,
+        COUNT(*) AS total_orders,
+        COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
+        COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
+        SUM(total_amount) AS sum_total_amount,
+        SUM(total_paid) AS sum_total_paid,
+        AVG(total_amount) AS avg_order_amount
+    FROM orders
+    WHERE created_date >= CURRENT_DATE
+    GROUP BY day, company_id, platform_id, store_id
+) AS combined
+ORDER BY day DESC, company_id, platform_id, store_id;
+```
+
+### 2. Statistics by payment date (history + today)
+
+```sql
+-- Actual revenue for the last 30 days (by payment date)
+SELECT * FROM (
+    -- History
+    SELECT
+        day,
+        company_id,
+        platform_id,
+        store_id,
+        paid_orders,
+        sum_total_paid,
+        avg_paid_amount
+    FROM orders_daily_by_paid
+    WHERE day >= CURRENT_DATE - INTERVAL '30 days'
+      AND day < CURRENT_DATE
+
+    UNION ALL
+
+    -- Today, computed on the fly.
+    -- This filters on paid_date only, so chunk exclusion (which works on
+    -- created_date) does not apply; add a created_date lower bound if orders
+    -- are always paid within a known period.
+    SELECT
+        time_bucket('1 day', paid_date) AS day,
+        company_id,
+        platform_id,
+        store_id,
+        COUNT(*) AS paid_orders,
+        SUM(total_paid) AS sum_total_paid,
+        AVG(total_paid) AS avg_paid_amount
+    FROM orders
+    WHERE paid_date >= CURRENT_DATE
+    GROUP BY day, company_id, platform_id, store_id
+) AS combined
+ORDER BY day DESC;
+```
+
+### 3. Grouping by company
+
+```sql
+-- Daily order statistics per company
+-- (averages are recomputed from sums; averaging daily averages would be skewed)
+SELECT
+    day,
+    company_id,
+    SUM(total_orders) AS total_orders,
+    SUM(paid_orders) AS paid_orders,
+    SUM(sum_total_amount) AS total_amount,
+    SUM(sum_total_paid) AS total_paid,
+    SUM(sum_total_amount) / NULLIF(SUM(total_orders), 0) AS avg_order_amount
+FROM orders_daily_by_created
+WHERE day >= '2025-01-01' AND day < '2025-02-01'
+GROUP BY day, company_id
+ORDER BY day DESC, company_id;
+```
+
+```sql
+-- Month-to-date revenue per company (by payment time)
+SELECT
+    company_id,
+    SUM(paid_orders) AS total_paid_orders,
+    SUM(sum_total_paid) AS total_revenue,
+    SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_order_value
+FROM orders_daily_by_paid
+WHERE day >= date_trunc('month', CURRENT_DATE)
+  AND day < CURRENT_DATE
+GROUP BY company_id
+ORDER BY total_revenue DESC;
+```
+
+### 4. Grouping by platform
+
+```sql
+-- Compare daily order volume and sales across platforms
+SELECT
+    day,
+    platform_id,
+    SUM(total_orders) AS total_orders,
+    SUM(paid_orders) AS paid_orders,
+    SUM(sum_total_amount) AS total_amount,
+    SUM(sum_total_paid) AS total_paid,
+    ROUND(SUM(paid_orders)::numeric / NULLIF(SUM(total_orders), 0) * 100, 2) AS payment_rate
+FROM orders_daily_by_created
+WHERE day >= CURRENT_DATE - INTERVAL '7 days'
+  AND day < CURRENT_DATE
+GROUP BY day, platform_id
+ORDER BY day DESC, total_paid DESC;
+```
+
+```sql
+-- Daily actual revenue per platform for the current week (by payment time)
+SELECT
+    day,
+    platform_id,
+    SUM(paid_orders) AS orders,
+    SUM(sum_total_paid) AS revenue,
+    SUM(sum_freight_fee) AS freight,
+    SUM(sum_commission_fee) AS commission
+FROM orders_daily_by_paid
+WHERE day >= date_trunc('week', CURRENT_DATE)
+  AND day < CURRENT_DATE
+GROUP BY day, platform_id
+ORDER BY day DESC, revenue DESC;
+```
+
+### 5. Grouping by store
+
+```sql
+-- Daily order statistics per store
+SELECT
+    day,
+    store_id,
+    SUM(total_orders) AS total_orders,
+    SUM(paid_orders) AS paid_orders,
+    SUM(unpaid_orders) AS unpaid_orders,
+    SUM(sum_total_amount) AS total_amount,
+    SUM(sum_total_paid) AS total_paid
+FROM orders_daily_by_created
+WHERE day >= '2025-01-01' AND day < '2025-02-01'
+GROUP BY day, store_id
+ORDER BY day DESC, total_paid DESC;
+```
+
+```sql
+-- Daily sales trend for one store (by payment time)
+SELECT
+    day,
+    paid_orders,
+    sum_total_paid AS revenue,
+    avg_paid_amount,
+    sum_freight_fee,
+    sum_tax_fee
+FROM orders_daily_by_paid
+WHERE store_id = 123
+  AND day >= CURRENT_DATE - INTERVAL '30 days'
+  AND day < CURRENT_DATE
+ORDER BY day DESC;
+```
+
+### 6. Combining dimensions
+
+```sql
+-- Daily sales per store within one company
+SELECT
+    day,
+    company_id,
+    store_id,
+    SUM(total_orders) AS orders,
+    SUM(paid_orders) AS paid,
+    SUM(sum_total_paid) AS revenue,
+    SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_value
+FROM orders_daily_by_created
+WHERE company_id = 100
+  AND day >= '2025-01-01' AND day < '2025-02-01'
+GROUP BY day, company_id, store_id
+ORDER BY day DESC, revenue DESC;
+```
+
+```sql
+-- Compare each company's sales across platforms (by payment time)
+SELECT
+    company_id,
+    platform_id,
+    SUM(paid_orders) AS total_orders,
+    SUM(sum_total_paid) AS total_revenue,
+    SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_order_value,
+    SUM(sum_commission_fee) AS total_commission,
+    ROUND((SUM(sum_commission_fee) / NULLIF(SUM(sum_total_paid), 0) * 100), 2) AS commission_rate
+FROM orders_daily_by_paid
+WHERE day >= date_trunc('month', CURRENT_DATE)
+  AND day < CURRENT_DATE
+GROUP BY company_id, platform_id
+ORDER BY company_id, total_revenue DESC;
+```
+
+```sql
+-- Sales trend for one company, platform, and store
+SELECT
+    day,
+    total_orders,
+    paid_orders,
+    unpaid_orders,
+    sum_total_amount,
+    sum_total_paid,
+    avg_order_amount
+FROM orders_daily_by_created
+WHERE company_id = 100
+  AND platform_id = 200
+  AND store_id = 300
+  AND day >= CURRENT_DATE - INTERVAL '90 days'
+  AND day < CURRENT_DATE
+ORDER BY day DESC;
+```
+
+### 7. Weekly and monthly roll-ups
+
+```sql
+-- Weekly statistics (by creation time)
+SELECT
+    time_bucket('7 days', day) AS week,
+    company_id,
+    SUM(total_orders) AS weekly_orders,
+    SUM(paid_orders) AS weekly_paid_orders,
+    SUM(sum_total_paid) AS weekly_revenue,
+    SUM(sum_total_amount) / NULLIF(SUM(total_orders), 0) AS avg_order_value
+FROM orders_daily_by_created
+WHERE day >= '2025-01-01'
+GROUP BY week, company_id
+ORDER BY week DESC, company_id;
+```
+
+```sql
+-- Monthly statistics (by payment time)
+SELECT
+    date_trunc('month', day) AS month,
+    platform_id,
+    SUM(paid_orders) AS monthly_orders,
+    SUM(sum_total_paid) AS monthly_revenue,
+    SUM(sum_freight_fee) AS monthly_freight,
+    SUM(sum_commission_fee) AS monthly_commission
+FROM orders_daily_by_paid
+WHERE day >= '2024-01-01'
+GROUP BY month, platform_id
+ORDER BY month DESC, monthly_revenue DESC;
+```
+
+### 8. Real-time + history in the application layer
+
+**PHP example**:
+
+```php
+<?php
+
+// NOTE: the file header, the class declaration and the signature of the first
+// method were lost in extraction. `OrderStatsService`, `getDailyStatsByCreated`
+// and the parameter list are placeholders reconstructed from the variables used below.
+class OrderStatsService
+{
+    public function getDailyStatsByCreated(int $storeId, Carbon $start, Carbon $end): array
+    {
+        $today = Carbon::today();
+        $results = [];
+
+        // Historical data from the continuous aggregate
+        if ($start->lt($today)) {
+            $historyEnd = $end->lt($today) ? $end : $today->copy()->subDay();
+
+            $history = Db::select("
+                SELECT
+                    day::date,
+                    total_orders,
+                    paid_orders,
+                    unpaid_orders,
+                    sum_total_amount,
+                    sum_total_paid,
+                    avg_order_amount
+                FROM orders_daily_by_created
+                WHERE store_id = ?
+                  AND day >= ?
+                  AND day <= ?
+                ORDER BY day DESC
+            ", [$storeId, $start->format('Y-m-d'), $historyEnd->format('Y-m-d')]);
+
+            $results = array_merge($results, $history);
+        }
+
+        // Today's data, computed on the fly
+        if ($end->gte($today)) {
+            $realtime = Db::select("
+                SELECT
+                    time_bucket('1 day', created_date)::date AS day,
+                    COUNT(*) AS total_orders,
+                    COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
+                    COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
+                    SUM(total_amount) AS sum_total_amount,
+                    SUM(total_paid) AS sum_total_paid,
+                    AVG(total_amount) AS avg_order_amount
+                FROM orders
+                WHERE store_id = ?
+                  AND created_date >= ?
+                  AND created_date < ?
+                GROUP BY day
+                ORDER BY day DESC
+            ", [$storeId, $today->format('Y-m-d H:i:s'), $end->copy()->addDay()->format('Y-m-d H:i:s')]);
+            // copy() keeps the caller's $end unchanged (Carbon instances are mutable)
+
+            $results = array_merge($results, $realtime);
+        }
+
+        return $results;
+    }
+
+    /**
+     * Month-to-date revenue ranking per company (by payment time)
+     *
+     * @return array
+     */
+    public function getMonthlyRevenueByCompany(): array
+    {
+        $monthStart = Carbon::now()->startOfMonth();
+        $today = Carbon::today();
+
+        // History: the average is derived from sums, not from daily averages
+        $history = Db::select("
+            SELECT
+                company_id,
+                SUM(paid_orders) AS total_orders,
+                SUM(sum_total_paid) AS total_revenue,
+                SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_order_value
+            FROM orders_daily_by_paid
+            WHERE day >= ?
+              AND day < ?
+            GROUP BY company_id
+        ", [$monthStart->format('Y-m-d'), $today->format('Y-m-d')]);
+
+        // Today, computed on the fly
+        $realtime = Db::select("
+            SELECT
+                company_id,
+                COUNT(*) AS total_orders,
+                SUM(total_paid) AS total_revenue,
+                AVG(total_paid) AS avg_order_value
+            FROM orders
+            WHERE paid_date >= ?
+            GROUP BY company_id
+        ", [$today->format('Y-m-d H:i:s')]);
+
+        // Merge
+        $combined = [];
+        foreach ($history as $row) {
+            $combined[$row->company_id] = [
+                'company_id' => $row->company_id,
+                'total_orders' => $row->total_orders,
+                'total_revenue' => $row->total_revenue,
+                'avg_order_value' => $row->avg_order_value,
+            ];
+        }
+
+        foreach ($realtime as $row) {
+            if (isset($combined[$row->company_id])) {
+                $combined[$row->company_id]['total_orders'] += $row->total_orders;
+                $combined[$row->company_id]['total_revenue'] += $row->total_revenue;
+                // Recompute the average from the merged totals
+                $orders = $combined[$row->company_id]['total_orders'];
+                $combined[$row->company_id]['avg_order_value'] =
+                    $orders > 0 ? $combined[$row->company_id]['total_revenue'] / $orders : null;
+            } else {
+                $combined[$row->company_id] = [
+                    'company_id' => $row->company_id,
+                    'total_orders' => $row->total_orders,
+                    'total_revenue' => $row->total_revenue,
+                    'avg_order_value' => $row->avg_order_value,
+                ];
+            }
+        }
+
+        // Sort by revenue, highest first
+        usort($combined, fn($a, $b) => $b['total_revenue'] <=> $a['total_revenue']);
+
+        return $combined;
+    }
+}
+```
+
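+When history rows and today's rows are merged, the average order value has to be recomputed from the merged totals; averaging two averages weights a quiet day the same as a busy one. The standalone sketch below illustrates that rule. `mergeCompanyTotals` and the sample figures are hypothetical and are not part of the service above.
+
+```php
+<?php
+declare(strict_types=1);
+
+/**
+ * Merge per-company totals from several sources and derive the average
+ * from the merged sums. Each input row is an array with the keys
+ * company_id, total_orders and total_revenue.
+ */
+function mergeCompanyTotals(array ...$sources): array
+{
+    $combined = [];
+    foreach (array_merge(...$sources) as $row) {
+        $id = $row['company_id'];
+        if (!isset($combined[$id])) {
+            $combined[$id] = ['company_id' => $id, 'total_orders' => 0, 'total_revenue' => 0.0];
+        }
+        $combined[$id]['total_orders'] += (int) $row['total_orders'];
+        $combined[$id]['total_revenue'] += (float) $row['total_revenue'];
+    }
+
+    // Weighted average: total revenue divided by total orders (null when there are no orders)
+    foreach ($combined as &$entry) {
+        $entry['avg_order_value'] = $entry['total_orders'] > 0
+            ? $entry['total_revenue'] / $entry['total_orders']
+            : null;
+    }
+    unset($entry);
+
+    // Highest revenue first
+    usort($combined, fn ($a, $b) => $b['total_revenue'] <=> $a['total_revenue']);
+
+    return $combined;
+}
+
+// Hypothetical figures: 90 historical orders averaging 10.0, 10 orders today averaging 30.0
+$history = [
+    ['company_id' => 1, 'total_orders' => 90, 'total_revenue' => 900.0],
+    ['company_id' => 2, 'total_orders' => 10, 'total_revenue' => 500.0],
+];
+$realtime = [
+    ['company_id' => 1, 'total_orders' => 10, 'total_revenue' => 300.0],
+];
+
+$merged = mergeCompanyTotals($history, $realtime);
+printf("company %d: avg %.2f\n", $merged[0]['company_id'], $merged[0]['avg_order_value']);
+```
+
+For company 1 the merged average is 1200 / 100 = 12.00, whereas the mean of the two averages would be 20.00. The same reasoning is why roll-up queries over the daily views should divide summed amounts by summed counts.
+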
+---
+
+## Best practices
+
+### 1. Chunk size tuning (important)
+
+#### Core principles
+
+**Rules of thumb** (the memory guideline follows TimescaleDB's sizing advice; the size ranges are working targets for this plan):
+- **Target size**: 500 MB ~ 2 GB per chunk
+- **Memory limit**: the chunks that are actively written and queried, including their indexes, should fit in about 25% of available memory
+- **Query efficiency**: a single query should ideally touch fewer than 10 chunks
+- **Upper bound**: avoid chunks larger than 5 GB
+
+#### Working out your chunk size
+
+**Step 1: Estimate the size of one row**
+
+```
+Estimate for one orders row (including indexes):
+- Basic columns (id, company_id, amounts, ...): ~300 bytes
+- Time columns (4 × timestampTz): ~32 bytes
+- Text columns (zipcode, city, province, country): ~200 bytes
+- JSONB columns (raw, ext): ~500-1000 bytes (depends on content)
+- Index overhead (14 indexes): ~800 bytes
+─────────────────────────────────────────
+Total: about 1.5 - 2.5 KB per row (2 KB on average)
+```
+
+**Step 2: Compute the daily data volume**
+
+```
+Formula: daily volume = orders per day × row size
+
+Examples:
+- 10,000 orders/day × 2 KB = 20 MB/day
+- 50,000 orders/day × 2 KB = 100 MB/day
+- 100,000 orders/day × 2 KB = 200 MB/day
+- 500,000 orders/day × 2 KB = 1 GB/day
+```
+
+**Step 3: Choose chunk_time_interval**
+
+| Orders per day | Daily volume | Suggested interval | Chunk size | Assessment |
+|---------|---------|---------|-----------|---------|
+| < 5,000 | < 10 MB/day | **14-30 days** | 140-300 MB | ✅ Best: fewer chunks |
+| 5,000-20,000 | 10-40 MB/day | **7-14 days** | 70-560 MB | ✅ Best: balanced |
+| 20,000-100,000 | 40-200 MB/day | **3-7 days** | 120 MB-1.4 GB | ✅ Recommended |
+| 100,000-500,000 | 200 MB-1 GB/day | **1-3 days** | 200 MB-3 GB | ✅ Recommended |
+| 500,000-1,000,000 | 1-2 GB/day | **12-24 hours** | 0.5-2 GB | ✅ Acceptable |
+| > 1,000,000 | > 2 GB/day | **6-12 hours** | > 0.5 GB | ⚠️ Monitor performance |
+
+#### How query patterns interact with chunk size
+
+**Typical query range vs. chunk configuration**:
+
+```sql
+-- Assume the most common query covers the last 30 days
+SELECT * FROM orders
+WHERE created_date >= CURRENT_DATE - INTERVAL '30 days';
+
+-- Approximate number of chunks scanned per configuration:
+-- 1-day interval:  ~30 chunks ❌ too many, planning takes longer
+-- 3-day interval:  ~10 chunks ✅ fine
+-- 7-day interval:  ~5 chunks  ✅ best
+-- 14-day interval: ~3 chunks  ✅ good
+-- 30-day interval: 1-2 chunks ⚠️ chunks may be too large for memory
+```
+
+**Best practice**:
+> Pick chunk_time_interval so that your most common query range touches 3-8 chunks.
+
+#### Configuration examples
+
+```php
+// Scenario 1: small shop (about 5,000 orders/day)
+Schema::getConnection()->statement(
+    "SELECT create_hypertable('orders', 'created_date',
+        chunk_time_interval => INTERVAL '14 days',  -- about 140 MB per chunk
+        if_not_exists => TRUE)"
+);
+
+// Scenario 2: medium volume (about 50,000 orders/day)
+Schema::getConnection()->statement(
+    "SELECT create_hypertable('orders', 'created_date',
+        chunk_time_interval => INTERVAL '7 days',  -- about 700 MB per chunk
+        if_not_exists => TRUE)"
+);
+
+// Scenario 3: large platform (about 500,000 orders/day)
+Schema::getConnection()->statement(
+    "SELECT create_hypertable('orders', 'created_date',
+        chunk_time_interval => INTERVAL '1 day',  -- about 1 GB per chunk
+        if_not_exists => TRUE)"
+);
+
+// Scenario 4: very large volume (about 2,000,000 orders/day)
+Schema::getConnection()->statement(
+    "SELECT create_hypertable('orders', 'created_date',
+        chunk_time_interval => INTERVAL '12 hours',  -- about 2 GB per chunk
+        if_not_exists => TRUE)"
+);
+```
+
+#### Changing the chunk interval later
+
+If the chunk size turns out to be wrong after the hypertable exists, the interval can be adjusted:
+
+```sql
+-- Inspect the current chunk sizes
+SELECT
+    chunk_name,
+    pg_size_pretty(pg_total_relation_size(chunk_schema || '.' || chunk_name)) AS size,
+    range_start,
+    range_end
+FROM timescaledb_information.chunks
+WHERE hypertable_name = 'orders'
+ORDER BY range_start DESC
+LIMIT 10;
+
+-- If chunks are too small (< 100 MB) or too large (> 5 GB), change the interval
+-- Note: this only affects chunks created afterwards; existing chunks keep their size
+SELECT set_chunk_time_interval('orders', INTERVAL '7 days');
+```
+
+#### Metrics to monitor
+
+Check the following regularly:
+
+```sql
+-- 1. Chunk count and size distribution
+SELECT
+    COUNT(*) AS total_chunks,
+    pg_size_pretty(SUM(pg_total_relation_size(chunk_schema || '.' || chunk_name))) AS total_size,
+    pg_size_pretty(AVG(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS avg_chunk_size,
+    pg_size_pretty(MIN(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS min_chunk_size,
+    pg_size_pretty(MAX(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS max_chunk_size
+FROM timescaledb_information.chunks
+WHERE hypertable_name = 'orders';
+
+-- 2. Query performance (EXPLAIN ANALYZE)
+EXPLAIN ANALYZE
+SELECT COUNT(*), SUM(total_amount)
+FROM orders
+WHERE created_date >= CURRENT_DATE - INTERVAL '30 days';
+-- Check that only chunks overlapping the time range appear in the plan.
+-- TimescaleDB reports skipped chunks on a line such as
+-- "Chunks excluded during startup"; a higher number means fewer chunks scanned.
+```
+
+**Warning signs**:
+- ❌ Average chunk size < 50 MB → the interval is too small; increase it
+- ❌ Average chunk size > 5 GB → the interval is too large; decrease it
+- ❌ A query scans > 20 chunks → consider a larger interval or a narrower query
+- ❌ Planning Time > Execution Time → this may indicate too many chunks; consider a larger interval
+
+### 2. Refresh policy configuration
+
+**Low latency** (freshness matters most):
+```sql
+SELECT add_continuous_aggregate_policy('orders_daily_by_created',
+    start_offset => INTERVAL '7 days',         -- refresh the last 7 days
+    end_offset => INTERVAL '5 minutes',        -- exclude only the last 5 minutes
+    schedule_interval => INTERVAL '5 minutes'  -- run every 5 minutes
+);
+```
+
+**Standard** (balances cost and freshness):
+```sql
+SELECT add_continuous_aggregate_policy('orders_daily_by_created',
+    start_offset => INTERVAL '3 days',
+    end_offset => INTERVAL '1 hour',
+    schedule_interval => INTERVAL '1 hour'
+);
+```
+
+**Low overhead** (mostly historical data):
+```sql
+-- The window between start_offset and end_offset must span at least two
+-- buckets (two days for a 1-day bucket), otherwise the policy is rejected.
+SELECT add_continuous_aggregate_policy('orders_daily_by_created',
+    start_offset => INTERVAL '3 days',       -- refresh completed days only
+    end_offset => INTERVAL '1 day',
+    schedule_interval => INTERVAL '6 hours'  -- run every 6 hours
+);
+```
+
+### 3. Index tuning
+
+**Principle**: create indexes that match the common query patterns
+
+```sql
+-- Scenario 1: filter by time, then store
+CREATE INDEX ON orders_daily_by_created (day DESC, store_id);
+
+-- Scenario 2: filter by store, order by time
+CREATE INDEX ON orders_daily_by_created (store_id, day DESC);
+
+-- Scenario 3: company + platform + time
+CREATE INDEX ON orders_daily_by_created (company_id, platform_id, day DESC);
+```
+
+### 4. Data retention
+
+Drop old data automatically (optional):
+
+```sql
+-- Keep the last 2 years of raw orders and drop anything older
+SELECT add_retention_policy('orders', INTERVAL '2 years');
+
+-- Inspect retention policies
+SELECT * FROM timescaledb_information.jobs
+WHERE proc_name = 'policy_retention';
+
+-- Remove the retention policy
+SELECT remove_retention_policy('orders');
+```
+
+### 5. Monitoring
+
+**Inspect chunks**:
+```sql
+SELECT chunk_name, range_start, range_end
+FROM timescaledb_information.chunks
+WHERE hypertable_name = 'orders'
+ORDER BY range_start DESC
+LIMIT 10;
+```
+
+**Inspect the continuous aggregates**:
+```sql
+-- Column names follow the TimescaleDB 2.x information views
+SELECT view_name, materialized_only, materialization_hypertable_name
+FROM timescaledb_information.continuous_aggregates;
+```
+
+**Inspect background jobs**:
+```sql
+-- Run history lives in job_stats, not in jobs
+SELECT j.job_id, j.proc_name, j.scheduled, j.next_start,
+       s.last_run_status, s.last_successful_finish
+FROM timescaledb_information.jobs j
+LEFT JOIN timescaledb_information.job_stats s USING (job_id)
+ORDER BY j.job_id;
+```
+
+### 6. Troubleshooting
+
+**Refresh an aggregate manually**:
+```sql
+-- Refresh a specific time range
+CALL refresh_continuous_aggregate('orders_daily_by_created',
+    '2025-01-01', '2025-02-01');
+
+-- Refresh everything (use with care, this can take a long time)
+CALL refresh_continuous_aggregate('orders_daily_by_created', NULL, NULL);
+```
+
+**Check whether the aggregates are up to date**:
+```sql
+-- Time since each refresh policy last finished successfully
+SELECT j.job_id,
+       j.config,
+       s.last_successful_finish,
+       NOW() - s.last_successful_finish AS lag
+FROM timescaledb_information.jobs j
+JOIN timescaledb_information.job_stats s USING (job_id)
+WHERE j.proc_name = 'policy_refresh_continuous_aggregate';
+```
+
+---
+
+## Summary
+
+### Strengths of this architecture
+
+1. **Performance**: historical queries drop from seconds to milliseconds
+2. **Flexibility**: two time dimensions (creation time and payment time)
+3. **Freshness**: today's data is computed on the fly, history is pre-aggregated
+4. **Extensibility**: grouping by company, platform, and store
+5. **Maintainability**: automatic refresh policies plus a manual refresh queue
+
+### When to use it
+
+- ✅ High order volume (more than 10,000 orders per day)
+- ✅ Analysis along several time dimensions
+- ✅ Frequent queries over historical data
+- ✅ Daily, weekly, and monthly granularity
+- ✅ Grouping by several dimensions (company/platform/store)
+
+### Points to watch
+
+1. **Data updates**: edits to history outside the refresh window need a manual refresh or the refresh queue
+2. **Index planning**: create indexes that match the actual query patterns
+3. **Chunk interval**: tune it to the data volume so there are neither too many nor too few chunks
+4. **Monitoring and alerting**: check the refresh status regularly so the aggregates do not fall behind
+
+---
+
+## References
+
+- [TimescaleDB documentation](https://docs.timescale.com/)
+- [Continuous aggregates](https://docs.timescale.com/use-timescale/latest/continuous-aggregates/)
+- [Hypertables best practices](https://docs.timescale.com/use-timescale/latest/hypertables/)
+- [TimescaleDB time_bucket function](https://docs.timescale.com/api/latest/hyperfunctions/time_bucket/)