# TimescaleDB 多时间维度销售统计方案 ## 概述 本文档描述如何使用 TimescaleDB 实现订单系统的多时间维度销售统计,支持按**创建时间**和**付款时间**两个维度进行高性能聚合查询。 ### 业务需求 - 按订单创建日期统计所有订单趋势(包括未支付订单) - 按订单付款日期统计实际收入(仅已支付订单) - 支持按 company、platform、store 多维度分组 - 支持天/周/月等多种时间粒度统计 - 历史数据查询性能要求高(毫秒级) - 当天数据要求实时 ### 技术方案 - **主表**:使用 `created_date` 作为 hypertable 分区键 - **连续聚合**:创建两个物化视图分别按创建时间和付款时间统计 - **查询策略**:历史数据查询聚合表,当天数据实时查询主表 --- ## 核心概念与工作原理 ### 1. Hypertable:时间分区存储引擎 **Hypertable 是什么?** Hypertable 是 TimescaleDB 的核心抽象层,它将普通的 PostgreSQL 表转换为**自动按时间分区的表**。 **Hypertable 的职责**: | 功能 | 说明 | |------|------| | **自动分区** | 根据时间列自动创建和管理分区(chunks) | | **透明查询** | 查询时自动路由到相关分区,无需手动指定分区 | | **高效写入** | 新数据写入最新分区,避免锁竞争 | | **压缩存储** | 支持历史分区自动压缩,节省存储空间 | | **数据保留** | 支持自动删除超过保留期的历史分区 | **Hypertable 不负责的工作**: - ❌ **不会自动聚合数据** - 仅存储原始数据 - ❌ **不会自动创建统计视图** - 需要单独配置 - ❌ **不会加速聚合查询** - 聚合查询仍需扫描所有数据 **示例**: ```sql -- 创建 hypertable SELECT create_hypertable('orders', 'created_date', chunk_time_interval => INTERVAL '7 days'); -- 效果: -- 2025-01-01 ~ 2025-01-07 的数据存储在 chunk_1 -- 2025-01-08 ~ 2025-01-14 的数据存储在 chunk_2 -- 2025-01-15 ~ 2025-01-21 的数据存储在 chunk_3 -- ... -- 查询时自动路由(用户无感知): SELECT * FROM orders WHERE created_date >= '2025-01-10' AND created_date < '2025-01-12'; -- ↑ TimescaleDB 自动只扫描 chunk_2,不扫描其他分区 ``` **关键理解**: > Hypertable 只是**存储优化**,不是**查询优化**。 > 要加速聚合查询,还需要配置**连续聚合**。 --- ### 2. 物化视图对比:三种视图的本质区别 | 特性 | 普通视图
(VIEW) | PostgreSQL 物化视图
(MATERIALIZED VIEW) | TimescaleDB 连续聚合
(Continuous Aggregate) | |------|-------------------|------------------------------------------|----------------------------------------------| | **数据存储** | ❌ 不存储,每次查询重新计算 | ✅ 存储预计算结果 | ✅ 存储预计算结果 | | **查询性能** | 慢(每次重新聚合) | 快(查询预计算结果) | 极快(查询预计算结果) | | **数据刷新** | 实时(总是最新) | 手动刷新(`REFRESH MATERIALIZED VIEW`) | 自动增量刷新(后台任务) | | **刷新开销** | N/A | 高(全量重新计算) | 低(仅计算变化的数据) | | **历史数据更新** | 实时反映 | 手动刷新后才反映 | 配置刷新窗口或手动刷新 | | **实时查询** | ✅ 支持 | ❌ 不支持(必须先刷新) | ✅ 支持(与实时数据混合查询) | | **适用场景** | 数据量小,实时性要求高 | 数据量中等,可容忍手动刷新 | 时间序列数据,需要自动维护 | **示例对比**: ```sql -- 1. 普通视图(每次查询都重新聚合,慢) CREATE VIEW daily_sales AS SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue FROM orders GROUP BY day; SELECT * FROM daily_sales WHERE day = '2025-01-01'; -- ↑ 扫描 orders 表中所有 2025-01-01 的数据,实时计算 -- 2. PostgreSQL 物化视图(预计算,需手动刷新) CREATE MATERIALIZED VIEW daily_sales AS SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue FROM orders GROUP BY day; SELECT * FROM daily_sales WHERE day = '2025-01-01'; -- ↑ 直接查询已计算的结果,快 -- 数据更新后必须手动刷新(全量重新计算) REFRESH MATERIALIZED VIEW daily_sales; -- 3. TimescaleDB 连续聚合(自动增量刷新) CREATE MATERIALIZED VIEW daily_sales WITH (timescaledb.continuous) AS SELECT time_bucket('1 day', created_date) AS day, SUM(total_amount) AS revenue FROM orders GROUP BY day WITH NO DATA; -- 配置自动刷新策略(每小时刷新最近 3 天的数据) SELECT add_continuous_aggregate_policy('daily_sales', start_offset => INTERVAL '3 days', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '1 hour' ); SELECT * FROM daily_sales WHERE day = '2025-01-01'; -- ↑ 查询预计算结果 + 自动包含最新数据(实时混合查询) ``` --- ### 3. WITH (timescaledb.continuous) 的作用 **标准物化视图**: ```sql CREATE MATERIALIZED VIEW my_view AS SELECT ... FROM ...; -- ↑ 这是 PostgreSQL 标准物化视图 -- - 需要手动 REFRESH MATERIALIZED VIEW -- - 每次刷新都是全量重新计算 ``` **连续聚合**: ```sql CREATE MATERIALIZED VIEW my_view WITH (timescaledb.continuous) AS -- ← 关键参数 SELECT time_bucket('1 day', time_column) AS bucket, ... FROM hypertable GROUP BY bucket; -- ↑ 这是 TimescaleDB 连续聚合 -- - 自动后台刷新(配置刷新策略后) -- - 增量刷新(只计算变化的 chunks) -- - 支持实时查询(自动混合最新数据) ``` **WITH (timescaledb.continuous) 启用的特性**: 1. **增量物化** - 只计算有变化的时间分区(chunks),不是全量重新计算 2. **自动刷新机制** - 可配置后台任务自动更新数据 3. **实时查询能力** - 查询时自动混合物化数据 + 未物化的实时数据 4. **失效跟踪** - 自动跟踪哪些时间段的数据需要重新计算 **关键理解**: > `WITH (timescaledb.continuous)` 不是"自动开启刷新", > 而是"允许配置增量刷新策略"。 > 创建后还需要配置刷新策略才会自动更新! --- ### 4. 完整集成步骤:三步法 TimescaleDB 的统计功能不是一步到位的,需要**按顺序完成三个步骤**: ``` 步骤1: 创建 Hypertable → 步骤2: 创建连续聚合 → 步骤3: 配置刷新策略 → 自动维护统计数据 ``` #### 步骤 1:创建 Hypertable **目的**:将普通表转换为时间分区表,优化存储和查询 ```sql -- 在已有的 orders 表上创建 hypertable SELECT create_hypertable('orders', 'created_date', chunk_time_interval => INTERVAL '7 days' ); ``` **效果**: - ✅ 数据按时间自动分区存储 - ✅ 查询时自动路由到相关分区 - ❌ 聚合查询仍然很慢(未加速) #### 步骤 2:创建连续聚合 **目的**:创建预聚合视图,存储统计结果 ```sql CREATE MATERIALIZED VIEW orders_daily_stats WITH (timescaledb.continuous) AS SELECT time_bucket('1 day', created_date) AS day, company_id, COUNT(*) AS total_orders, SUM(total_amount) AS revenue FROM orders GROUP BY day, company_id WITH NO DATA; -- 先不填充数据,等待刷新策略触发 ``` **效果**: - ✅ 视图已创建,可以查询 - ❌ 数据为空(WITH NO DATA) - ❌ 不会自动更新(未配置刷新策略) #### 步骤 3:配置刷新策略 **目的**:启用自动后台刷新,保持数据最新 ```sql SELECT add_continuous_aggregate_policy('orders_daily_stats', start_offset => INTERVAL '3 days', -- 从 3 天前开始刷新 end_offset => INTERVAL '1 hour', -- 刷新到 1 小时前(排除正在写入的数据) schedule_interval => INTERVAL '1 hour' -- 每小时执行一次 ); ``` **效果**: - ✅ 后台任务每小时自动刷新 - ✅ 只刷新 [now() - 3天, now() - 1小时] 窗口内的数据 - ✅ 增量计算,性能高 **初次填充数据**: ```sql -- 刷新策略只刷新配置的窗口,历史数据需要手动触发一次 CALL refresh_continuous_aggregate('orders_daily_stats', '2024-01-01', -- 从这个日期开始 '2025-01-01' -- 刷新到这个日期 ); ``` --- ### 5. 数据流与业务流程 #### 数据写入流程 ``` 应用写入订单数据 ↓ orders 表接收数据 ↓ TimescaleDB 路由器判断时间 ├→ Chunk 1 (2025-01-01~07) ├→ Chunk 2 (2025-01-08~14) ├→ Chunk 3 (2025-01-15~21) └→ Chunk N (最新数据) ``` #### 数据聚合流程 ``` 后台刷新任务(每小时执行) ↓ 检测变化的 Chunks ├→ Chunk 10 有新数据 → 重新计算 Chunk 10 的聚合 ├→ Chunk 11 有更新 → 重新计算 Chunk 11 的聚合 └→ Chunk 12 无变化 → 跳过 ↓ 更新 orders_daily_by_created 视图 ↓ 聚合数据已更新 ``` #### 查询流程(实时 + 历史混合) ``` 查询最近 30 天销售额 ↓ 查询优化器分析 ├→ 历史数据(30天前 ~ 昨天) │ ↓ │ 查询连续聚合 orders_daily_by_created │ (速度:毫秒级) │ └→ 实时数据(今天) ↓ 查询 orders 表实时计算 (速度:秒级,但数据量少) ↓ 合并结果 → 返回完整统计数据 ``` #### 完整业务流 ``` ┌────────────────────────────────────────────────────────────────┐ │ 应用层操作 │ ├────────────────────────────────────────────────────────────────┤ │ │ │ 1. 订单写入 │ │ INSERT INTO orders (...) VALUES (...); │ │ ↓ │ │ [写入 hypertable,自动路由到对应 chunk] │ │ │ │ 2. 后台自动聚合(每小时执行) │ │ [TimescaleDB 后台任务] │ │ ↓ │ │ 检测最近 3 天有变化的 chunks │ │ ↓ │ │ 重新计算这些 chunks 的聚合数据 │ │ ↓ │ │ 更新 orders_daily_by_created 视图 │ │ │ │ 3. 历史订单更新(超出刷新窗口) │ │ UPDATE orders SET total_paid = 100 WHERE id = 123; │ │ ↓ │ │ [应用层检测:订单创建于 30 天前] │ │ ↓ │ │ INSERT INTO aggregate_refresh_queue ... │ │ ↓ │ │ [定时任务每天凌晨处理队列] │ │ ↓ │ │ CALL refresh_continuous_aggregate('...', '2024-12-01', ...) │ │ │ │ 4. 查询统计数据 │ │ SELECT * FROM orders_daily_by_created │ │ WHERE day >= '2024-12-01' AND day < CURRENT_DATE │ │ ↓ │ │ [查询预计算的聚合数据,毫秒级返回] │ │ │ │ UNION ALL │ │ │ │ SELECT ... FROM orders │ │ WHERE created_date >= CURRENT_DATE │ │ ↓ │ │ [实时聚合当天数据,秒级返回] │ │ │ └────────────────────────────────────────────────────────────────┘ ``` **关键时间点**: | 数据年龄 | 处理方式 | 查询来源 | 更新方式 | |---------|---------|---------|---------| | **今天** | 实时数据 | orders 表(实时聚合) | 写入即可见 | | **1小时前 ~ 3天前** | 自动刷新窗口 | 连续聚合 | 后台任务每小时刷新 | | **3天前 ~ N年前** | 历史数据 | 连续聚合 | 手动刷新或刷新队列 | --- ## 架构设计 ### 1. Hypertable 分区策略 TimescaleDB 的 hypertable 只能有一个时间分区键,我们选择 `created_date`: **原因**: - 所有订单都有 `created_date`(非 NULL) - `paid_date` 在未支付时为 NULL,无法作为分区键 - 订单按创建时间自然增长,适合时间序列存储 **配置**: ```sql SELECT create_hypertable('orders', 'created_date', chunk_time_interval => INTERVAL '7 days'); ``` ### 2. 连续聚合机制 连续聚合(Continuous Aggregates)是 TimescaleDB 提供的自动维护的物化视图: **特性**: - 自动增量更新,只计算变化的数据 - 支持配置刷新策略(时间窗口、刷新频率) - 查询速度远快于实时聚合(预计算结果) - 支持与实时数据混合查询 **两个聚合视图**: 1. **orders_daily_by_created** - 按创建日期统计 - 统计所有订单(包括未支付) - 区分已支付和未支付订单数量 - 适用于分析订单趋势、转化率 2. **orders_daily_by_paid** - 按付款日期统计 - 仅统计已支付订单(WHERE paid_date IS NOT NULL) - 反映真实收入时间分布 - 适用于财务报表、收入分析 --- ## 实施步骤 ### 步骤 1:修改 orders 表迁移文件 文件:`backend/migrations/2025_11_11_083522_create_orders_table.php` 在 `up()` 方法末尾添加 TimescaleDB 配置: ```php public function up(): void { // ... 现有的表创建和索引代码 ... // 为 jsonb 字段创建 GIN 索引(PostgreSQL) Schema::getConnection()->statement('CREATE INDEX orders_raw_gin_idx ON orders USING gin (raw)'); Schema::getConnection()->statement('CREATE INDEX orders_ext_gin_idx ON orders USING gin (ext)'); // ==================== TimescaleDB 配置 ==================== // 1. 启用 TimescaleDB 扩展(如果未启用) Schema::getConnection()->statement('CREATE EXTENSION IF NOT EXISTS timescaledb'); // 2. 将 orders 表转换为 hypertable // - 使用 created_date 作为时间分区键 // - 每个分区(chunk)存储 7 天数据 // - if_not_exists => true 避免重复创建报错 Schema::getConnection()->statement( "SELECT create_hypertable('orders', 'created_date', chunk_time_interval => INTERVAL '7 days', if_not_exists => TRUE)" ); // 3. (可选)设置数据保留策略 - 自动删除 2 年前的数据 // Schema::getConnection()->statement( // "SELECT add_retention_policy('orders', INTERVAL '2 years')" // ); } ``` **参数说明**: - `chunk_time_interval`: 分区间隔,根据数据量调整 - 小数据量:7 天(默认) - 中等数据量:1 天 - 大数据量:半天或更小 - `if_not_exists`: 避免重复创建时报错 ### 步骤 2:创建连续聚合迁移文件 创建文件:`backend/migrations/2025_11_12_XXXXXX_create_orders_continuous_aggregates.php` ```php statement(" CREATE MATERIALIZED VIEW orders_daily_by_created WITH (timescaledb.continuous) AS SELECT time_bucket('1 day', created_date) AS day, company_id, platform_id, store_id, -- 订单数量指标 COUNT(*) AS total_orders, COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders, COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders, -- 销售额指标 SUM(total_amount) AS sum_total_amount, SUM(total_paid) AS sum_total_paid, SUM(total_received) AS sum_total_received, -- 平均值指标 AVG(total_amount) AS avg_order_amount, AVG(total_paid) FILTER (WHERE paid_date IS NOT NULL) AS avg_paid_amount, -- 费用指标 SUM(freight_fee) AS sum_freight_fee, SUM(tax_fee) AS sum_tax_fee, SUM(commission_fee) AS sum_commission_fee, SUM(discount_fee) AS sum_discount_fee FROM orders GROUP BY day, company_id, platform_id, store_id WITH NO DATA; "); // 配置自动刷新策略 // - 刷新最近 3 天到 1 小时前的数据 // - 每小时执行一次 $conn->statement(" SELECT add_continuous_aggregate_policy('orders_daily_by_created', start_offset => INTERVAL '3 days', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '1 hour' ); "); // 创建索引加速查询 $conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, company_id)'); $conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, platform_id)'); $conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, store_id)'); $conn->statement('CREATE INDEX ON orders_daily_by_created (company_id, day DESC)'); $conn->statement('CREATE INDEX ON orders_daily_by_created (store_id, day DESC)'); // ========== 连续聚合 2:按付款日期统计 ========== $conn->statement(" CREATE MATERIALIZED VIEW orders_daily_by_paid WITH (timescaledb.continuous) AS SELECT time_bucket('1 day', paid_date) AS day, company_id, platform_id, store_id, -- 已支付订单统计 COUNT(*) AS paid_orders, -- 销售额指标 SUM(total_amount) AS sum_total_amount, SUM(total_paid) AS sum_total_paid, SUM(total_received) AS sum_total_received, -- 平均值指标 AVG(total_amount) AS avg_order_amount, AVG(total_paid) AS avg_paid_amount, -- 费用指标 SUM(freight_fee) AS sum_freight_fee, SUM(tax_fee) AS sum_tax_fee, SUM(commission_fee) AS sum_commission_fee, SUM(discount_fee) AS sum_discount_fee FROM orders WHERE paid_date IS NOT NULL -- 只统计已支付订单 GROUP BY day, company_id, platform_id, store_id WITH NO DATA; "); // 配置自动刷新策略 $conn->statement(" SELECT add_continuous_aggregate_policy('orders_daily_by_paid', start_offset => INTERVAL '3 days', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '1 hour' ); "); // 创建索引 $conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, company_id)'); $conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, platform_id)'); $conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, store_id)'); $conn->statement('CREATE INDEX ON orders_daily_by_paid (company_id, day DESC)'); $conn->statement('CREATE INDEX ON orders_daily_by_paid (store_id, day DESC)'); } /** * Reverse the migrations. */ public function down(): void { $conn = Schema::getConnection(); // 删除连续聚合视图(CASCADE 会自动删除关联的刷新策略) $conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_paid CASCADE'); $conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_created CASCADE'); } }; ``` **刷新策略参数说明**: - `start_offset`: 从多久之前开始刷新(相对于当前时间) - `end_offset`: 刷新到多久之前结束(避免刷新正在写入的数据) - `schedule_interval`: 刷新频率 **示例配置**: ```sql -- 每小时刷新最近 3 天的数据 start_offset => INTERVAL '3 days' end_offset => INTERVAL '1 hour' schedule_interval => INTERVAL '1 hour' -- 实际刷新窗口:[now() - 3 days, now() - 1 hour] ``` --- ## 历史数据更新同步问题 ### 问题描述 连续聚合的刷新策略只覆盖配置的时间窗口(如最近 3 天),当**超出窗口的历史订单被修改**时,聚合数据不会自动更新。 **常见场景**: - 历史订单退款(修改 total_paid) - 补录付款时间(更新 paid_date) - 订单状态更正 - 金额调整 ### 解决方案对比 | 方案 | 优点 | 缺点 | 适用场景 | |------|------|------|----------| | **手动刷新** | 精准控制,开销小 | 需要应用层处理 | 历史修改较少 | | **实时聚合** | 数据始终准确 | 查询性能下降 | 对准确性要求极高 | | **扩大刷新窗口** | 自动处理 | 刷新开销增加 | 历史修改集中在近期 | | **刷新队列** | 平衡性能和准确性 | 实现复杂度高 | 生产环境推荐 | ### 推荐方案:刷新队列机制 **实现思路**: 1. 订单被修改时,记录需要刷新的日期 2. 定时任务批量刷新队列中的日期 3. 结合自动刷新策略,覆盖大部分场景 #### 1. 创建刷新队列表 ```php // 迁移文件 Schema::create('aggregate_refresh_queue', function (Blueprint $table) { $table->date('refresh_date')->comment('需要刷新的日期'); $table->string('aggregate_view', 100)->comment('聚合视图名称'); $table->timestamp('created_at')->comment('入队时间'); $table->primary(['refresh_date', 'aggregate_view']); $table->index('created_at'); }); ``` #### 2. 订单更新时入队 ```php // 订单更新方法 public function updateOrder($orderId, array $data): Order { $order = Order::findOrFail($orderId); $affectedDates = []; // 收集受影响的日期 if (isset($data['created_date']) && $data['created_date'] != $order->created_date) { $affectedDates[] = Carbon::parse($order->created_date)->format('Y-m-d'); $affectedDates[] = Carbon::parse($data['created_date'])->format('Y-m-d'); } if (isset($data['paid_date'])) { if ($order->paid_date) { $affectedDates[] = Carbon::parse($order->paid_date)->format('Y-m-d'); } if ($data['paid_date']) { $affectedDates[] = Carbon::parse($data['paid_date'])->format('Y-m-d'); } } // 更新订单 $order->update($data); // 入队需要刷新的日期 foreach (array_unique($affectedDates) as $date) { $this->enqueueRefresh($date); } return $order; } private function enqueueRefresh(string $date): void { // 只刷新 3 天前的数据(3 天内的由自动策略处理) if (Carbon::parse($date)->lt(Carbon::today()->subDays(3))) { DB::table('aggregate_refresh_queue')->insertOrIgnore([ ['refresh_date' => $date, 'aggregate_view' => 'orders_daily_by_created', 'created_at' => now()], ['refresh_date' => $date, 'aggregate_view' => 'orders_daily_by_paid', 'created_at' => now()], ]); } } ``` #### 3. 定时任务批量刷新 ```php // app/Command/RefreshOrderAggregatesCommand.php where('created_at', '<', now()->subHour()) // 延迟 1 小时,避免频繁刷新 ->orderBy('refresh_date') ->get(); if ($queue->isEmpty()) { $this->info('No aggregates to refresh.'); return; } $grouped = $queue->groupBy('aggregate_view'); foreach ($grouped as $view => $items) { $dates = $items->pluck('refresh_date')->unique(); $this->info("Refreshing {$view}: " . $dates->count() . " dates"); foreach ($dates as $date) { $start = Carbon::parse($date)->startOfDay(); $end = Carbon::parse($date)->endOfDay(); try { DB::statement( "CALL refresh_continuous_aggregate(?, ?, ?)", [$view, $start, $end] ); // 删除已刷新的记录 DB::table('aggregate_refresh_queue') ->where('refresh_date', $date) ->where('aggregate_view', $view) ->delete(); $this->line(" ✓ Refreshed {$date}"); } catch (\Exception $e) { $this->error(" ✗ Failed to refresh {$date}: " . $e->getMessage()); } } } $this->info('Refresh completed.'); } } ``` **配置定时任务**(每天凌晨 2 点执行): ```php // config/autoload/crontab.php return [ 'enable' => true, 'crontab' => [ [ 'name' => 'refresh-order-aggregates', 'rule' => '0 2 * * *', // 每天凌晨 2 点 'callback' => [App\Command\RefreshOrderAggregatesCommand::class, 'handle'], 'memo' => '刷新订单聚合数据', ], ], ]; ``` --- ## 查询示例 ### 1. 按创建日期统计(历史 + 当天) ```sql -- 查询最近 30 天的订单统计(按创建日期) SELECT * FROM ( -- 历史数据(从连续聚合查询,速度快) SELECT day, company_id, platform_id, store_id, total_orders, paid_orders, unpaid_orders, sum_total_amount, sum_total_paid, avg_order_amount FROM orders_daily_by_created WHERE day >= CURRENT_DATE - INTERVAL '30 days' AND day < CURRENT_DATE UNION ALL -- 当天实时数据 SELECT time_bucket('1 day', created_date) AS day, company_id, platform_id, store_id, COUNT(*) AS total_orders, COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders, COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders, SUM(total_amount) AS sum_total_amount, SUM(total_paid) AS sum_total_paid, AVG(total_amount) AS avg_order_amount FROM orders WHERE created_date >= CURRENT_DATE GROUP BY day, company_id, platform_id, store_id ) AS combined ORDER BY day DESC, company_id, platform_id, store_id; ``` ### 2. 按付款日期统计(历史 + 当天) ```sql -- 查询最近 30 天的实际收入(按付款日期) SELECT * FROM ( -- 历史数据 SELECT day, company_id, platform_id, store_id, paid_orders, sum_total_paid, avg_paid_amount FROM orders_daily_by_paid WHERE day >= CURRENT_DATE - INTERVAL '30 days' AND day < CURRENT_DATE UNION ALL -- 当天实时数据 SELECT time_bucket('1 day', paid_date) AS day, company_id, platform_id, store_id, COUNT(*) AS paid_orders, SUM(total_paid) AS sum_total_paid, AVG(total_paid) AS avg_paid_amount FROM orders WHERE paid_date >= CURRENT_DATE AND paid_date IS NOT NULL GROUP BY day, company_id, platform_id, store_id ) AS combined ORDER BY day DESC; ``` ### 3. 按公司(Company)分组统计 ```sql -- 统计各公司每天的订单情况 SELECT day, company_id, SUM(total_orders) AS total_orders, SUM(paid_orders) AS paid_orders, SUM(sum_total_amount) AS total_amount, SUM(sum_total_paid) AS total_paid, AVG(avg_order_amount) AS avg_order_amount FROM orders_daily_by_created WHERE day >= '2025-01-01' AND day < '2025-02-01' GROUP BY day, company_id ORDER BY day DESC, company_id; ``` ```sql -- 统计各公司本月累计销售额(按付款时间) SELECT company_id, SUM(paid_orders) AS total_paid_orders, SUM(sum_total_paid) AS total_revenue, AVG(avg_paid_amount) AS avg_order_value FROM orders_daily_by_paid WHERE day >= date_trunc('month', CURRENT_DATE) AND day < CURRENT_DATE GROUP BY company_id ORDER BY total_revenue DESC; ``` ### 4. 按平台(Platform)分组统计 ```sql -- 对比各平台每天的订单量和销售额 SELECT day, platform_id, SUM(total_orders) AS total_orders, SUM(paid_orders) AS paid_orders, SUM(sum_total_amount) AS total_amount, SUM(sum_total_paid) AS total_paid, ROUND(SUM(paid_orders)::numeric / NULLIF(SUM(total_orders), 0) * 100, 2) AS payment_rate FROM orders_daily_by_created WHERE day >= CURRENT_DATE - INTERVAL '7 days' AND day < CURRENT_DATE GROUP BY day, platform_id ORDER BY day DESC, total_paid DESC; ``` ```sql -- 统计各平台本周每天的实际收入(按付款时间) SELECT day, platform_id, SUM(paid_orders) AS orders, SUM(sum_total_paid) AS revenue, SUM(sum_freight_fee) AS freight, SUM(sum_commission_fee) AS commission FROM orders_daily_by_paid WHERE day >= date_trunc('week', CURRENT_DATE) AND day < CURRENT_DATE GROUP BY day, platform_id ORDER BY day DESC, revenue DESC; ``` ### 5. 按店铺(Store)分组统计 ```sql -- 统计各店铺每天的订单情况 SELECT day, store_id, SUM(total_orders) AS total_orders, SUM(paid_orders) AS paid_orders, SUM(unpaid_orders) AS unpaid_orders, SUM(sum_total_amount) AS total_amount, SUM(sum_total_paid) AS total_paid FROM orders_daily_by_created WHERE day >= '2025-01-01' AND day < '2025-02-01' GROUP BY day, store_id ORDER BY day DESC, total_paid DESC; ``` ```sql -- 查询指定店铺的日销售趋势(按付款时间) SELECT day, paid_orders, sum_total_paid AS revenue, avg_paid_amount, sum_freight_fee, sum_tax_fee FROM orders_daily_by_paid WHERE store_id = 123 AND day >= CURRENT_DATE - INTERVAL '30 days' AND day < CURRENT_DATE ORDER BY day DESC; ``` ### 6. 多维度组合查询 ```sql -- 查询特定公司下各店铺的每日销售情况 SELECT day, company_id, store_id, SUM(total_orders) AS orders, SUM(paid_orders) AS paid, SUM(sum_total_paid) AS revenue, AVG(avg_paid_amount) AS avg_value FROM orders_daily_by_created WHERE company_id = 100 AND day >= '2025-01-01' AND day < '2025-02-01' GROUP BY day, company_id, store_id ORDER BY day DESC, revenue DESC; ``` ```sql -- 对比各公司在不同平台的销售表现(按付款时间) SELECT company_id, platform_id, SUM(paid_orders) AS total_orders, SUM(sum_total_paid) AS total_revenue, AVG(avg_paid_amount) AS avg_order_value, SUM(sum_commission_fee) AS total_commission, ROUND((SUM(sum_commission_fee) / NULLIF(SUM(sum_total_paid), 0) * 100), 2) AS commission_rate FROM orders_daily_by_paid WHERE day >= date_trunc('month', CURRENT_DATE) AND day < CURRENT_DATE GROUP BY company_id, platform_id ORDER BY company_id, total_revenue DESC; ``` ```sql -- 查询指定公司、平台、店铺的销售趋势 SELECT day, total_orders, paid_orders, unpaid_orders, sum_total_amount, sum_total_paid, avg_order_amount FROM orders_daily_by_created WHERE company_id = 100 AND platform_id = 200 AND store_id = 300 AND day >= CURRENT_DATE - INTERVAL '90 days' AND day < CURRENT_DATE ORDER BY day DESC; ``` ### 7. 周/月聚合查询 ```sql -- 按周统计(按创建时间) SELECT time_bucket('7 days', day) AS week, company_id, SUM(total_orders) AS weekly_orders, SUM(paid_orders) AS weekly_paid_orders, SUM(sum_total_paid) AS weekly_revenue, AVG(avg_order_amount) AS avg_order_value FROM orders_daily_by_created WHERE day >= '2025-01-01' GROUP BY week, company_id ORDER BY week DESC, company_id; ``` ```sql -- 按月统计(按付款时间) SELECT date_trunc('month', day) AS month, platform_id, SUM(paid_orders) AS monthly_orders, SUM(sum_total_paid) AS monthly_revenue, SUM(sum_freight_fee) AS monthly_freight, SUM(sum_commission_fee) AS monthly_commission FROM orders_daily_by_paid WHERE day >= '2024-01-01' GROUP BY month, platform_id ORDER BY month DESC, monthly_revenue DESC; ``` ### 8. 实时 + 历史混合查询(应用层封装) **PHP 查询示例**: ```php lt($today)) { $historyEnd = $end->lt($today) ? $end : $today->copy()->subDay(); $history = Db::select(" SELECT day::date, total_orders, paid_orders, unpaid_orders, sum_total_amount, sum_total_paid, avg_order_amount FROM orders_daily_by_created WHERE store_id = ? AND day >= ? AND day <= ? ORDER BY day DESC ", [$storeId, $start->format('Y-m-d'), $historyEnd->format('Y-m-d')]); $results = array_merge($results, $history); } // 查询当天实时数据 if ($end->gte($today)) { $realtime = Db::select(" SELECT time_bucket('1 day', created_date)::date AS day, COUNT(*) AS total_orders, COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders, COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders, SUM(total_amount) AS sum_total_amount, SUM(total_paid) AS sum_total_paid, AVG(total_amount) AS avg_order_amount FROM orders WHERE store_id = ? AND created_date >= ? AND created_date < ? GROUP BY day ORDER BY day DESC ", [$storeId, $today->format('Y-m-d H:i:s'), $end->addDay()->format('Y-m-d H:i:s')]); $results = array_merge($results, $realtime); } return $results; } /** * 查询各公司本月销售排名(按付款时间) * * @return array */ public function getMonthlyRevenueByCompany(): array { $monthStart = Carbon::now()->startOfMonth(); $today = Carbon::today(); // 历史数据 $history = Db::select(" SELECT company_id, SUM(paid_orders) AS total_orders, SUM(sum_total_paid) AS total_revenue, AVG(avg_paid_amount) AS avg_order_value FROM orders_daily_by_paid WHERE day >= ? AND day < ? GROUP BY company_id ", [$monthStart->format('Y-m-d'), $today->format('Y-m-d')]); // 当天实时数据 $realtime = Db::select(" SELECT company_id, COUNT(*) AS total_orders, SUM(total_paid) AS total_revenue, AVG(total_paid) AS avg_order_value FROM orders WHERE paid_date >= ? AND paid_date IS NOT NULL GROUP BY company_id ", [$today->format('Y-m-d H:i:s')]); // 合并数据 $combined = []; foreach ($history as $row) { $combined[$row->company_id] = [ 'company_id' => $row->company_id, 'total_orders' => $row->total_orders, 'total_revenue' => $row->total_revenue, 'avg_order_value' => $row->avg_order_value, ]; } foreach ($realtime as $row) { if (isset($combined[$row->company_id])) { $combined[$row->company_id]['total_orders'] += $row->total_orders; $combined[$row->company_id]['total_revenue'] += $row->total_revenue; // 重新计算平均值 $combined[$row->company_id]['avg_order_value'] = $combined[$row->company_id]['total_revenue'] / $combined[$row->company_id]['total_orders']; } else { $combined[$row->company_id] = [ 'company_id' => $row->company_id, 'total_orders' => $row->total_orders, 'total_revenue' => $row->total_revenue, 'avg_order_value' => $row->avg_order_value, ]; } } // 按收入排序 usort($combined, fn($a, $b) => $b['total_revenue'] <=> $a['total_revenue']); return $combined; } } ``` --- ## 最佳实践 ### 1. Chunk 大小优化(重要) #### Chunk 大小的核心原则 **TimescaleDB 官方建议**: - **目标大小**:500 MB ~ 2 GB per chunk(最优性能范围) - **内存限制**:Chunk 大小应 ≤ 25% 可用内存 - **查询效率**:单次查询扫描的 chunk 数量建议 < 10 个 - **上限**:单个 chunk 不建议超过 5 GB #### 计算您的 Chunk 大小 **步骤 1:估算单行数据大小** ``` orders 表单行估算(含索引): - 基础字段(id, company_id, 金额等):~300 bytes - 时间字段(4个 timestampTz):~32 bytes - 文本字段(zipcode, city, province, country):~200 bytes - JSONB 字段(raw, ext):~500-1000 bytes(视内容而定) - 索引开销(14个索引):~800 bytes ───────────────────────────────────────── 总计:约 1.5 - 2.5 KB/行(平均 2KB) ``` **步骤 2:计算每天数据量** ``` 公式:日数据量 = 日订单数 × 单行大小 示例: - 10,000 单/天 × 2KB = 20 MB/天 - 50,000 单/天 × 2KB = 100 MB/天 - 100,000 单/天 × 2KB = 200 MB/天 - 500,000 单/天 × 2KB = 1 GB/天 ``` **步骤 3:选择 chunk_time_interval** | 日订单量 | 日数据量 | 推荐间隔 | Chunk 大小 | 性能评价 | |---------|---------|---------|-----------|---------| | < 5千 | < 10 MB/天 | **14-30天** | 140-300 MB | ✅ 最优:减少 chunk 数量 | | 5千-2万 | 10-40 MB/天 | **7-14天** | 70-560 MB | ✅ 最优:平衡性能 | | 2万-10万 | 40-200 MB/天 | **3-7天** | 120MB-1.4GB | ✅ 推荐 | | 10万-50万 | 200MB-1GB/天 | **1-3天** | 200MB-3GB | ✅ 推荐 | | 50万-100万 | 1-2 GB/天 | **12-24小时** | 1.5-4 GB | ✅ 可接受 | | > 100万 | > 2 GB/天 | **6-12小时** | 2-4 GB | ⚠️ 需监控性能 | #### 查询模式影响分析 **常见查询时间范围 vs Chunk 配置**: ```sql -- 假设您最常查询"最近 30 天"数据 SELECT * FROM orders WHERE created_date >= CURRENT_DATE - INTERVAL '30 days'; -- 不同配置扫描的 chunk 数量: -- 1天间隔:30个chunks ❌ 过多,规划时间长 -- 3天间隔:10个chunks ✅ 理想 -- 7天间隔:5个chunks ✅ 最优 -- 14天间隔:3个chunks ✅ 很好 -- 30天间隔:1个chunk ⚠️ chunk 过大,可能超出内存 ``` **最佳实践**: > 根据您最常见的查询时间范围,配置 chunk_time_interval 使得单次查询扫描 3-8 个 chunks。 #### 实际配置示例 ```php // 场景 1:小规模电商(日均 5000 单) Schema::getConnection()->statement( "SELECT create_hypertable('orders', 'created_date', chunk_time_interval => INTERVAL '14 days', -- 约 140 MB/chunk if_not_exists => TRUE)" ); // 场景 2:中等规模(日均 5 万单) Schema::getConnection()->statement( "SELECT create_hypertable('orders', 'created_date', chunk_time_interval => INTERVAL '7 days', -- 约 700 MB/chunk if_not_exists => TRUE)" ); // 场景 3:大规模平台(日均 50 万单) Schema::getConnection()->statement( "SELECT create_hypertable('orders', 'created_date', chunk_time_interval => INTERVAL '1 day', -- 约 1 GB/chunk if_not_exists => TRUE)" ); // 场景 4:超大规模(日均 200 万单) Schema::getConnection()->statement( "SELECT create_hypertable('orders', 'created_date', chunk_time_interval => INTERVAL '12 hours', -- 约 2 GB/chunk if_not_exists => TRUE)" ); ``` #### 动态调整 Chunk 间隔 如果创建 hypertable 后发现 chunk 大小不合适,可以调整: ```sql -- 查看当前 chunk 大小 SELECT chunk_name, pg_size_pretty(pg_total_relation_size(chunk_schema || '.' || chunk_name)) AS size, range_start, range_end FROM timescaledb_information.chunks WHERE hypertable_name = 'orders' ORDER BY range_start DESC LIMIT 10; -- 如果 chunk 过小(< 100 MB)或过大(> 5 GB),调整间隔 -- 注意:只影响新创建的 chunk,已有 chunk 不受影响 SELECT set_chunk_time_interval('orders', INTERVAL '7 days'); ``` #### 性能监控指标 定期检查以下指标: ```sql -- 1. 查看 chunk 数量和大小分布 SELECT COUNT(*) AS total_chunks, pg_size_pretty(SUM(pg_total_relation_size(chunk_schema || '.' || chunk_name))) AS total_size, pg_size_pretty(AVG(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS avg_chunk_size, pg_size_pretty(MIN(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS min_chunk_size, pg_size_pretty(MAX(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS max_chunk_size FROM timescaledb_information.chunks WHERE hypertable_name = 'orders'; -- 2. 查看查询性能(EXPLAIN ANALYZE) EXPLAIN ANALYZE SELECT COUNT(*), SUM(total_amount) FROM orders WHERE created_date >= CURRENT_DATE - INTERVAL '30 days'; -- 关注 "Chunks excluded by constraint" - 数值越高越好 ``` **警告信号**: - ❌ 平均 chunk 大小 < 50 MB → chunk 间隔太小,增大间隔 - ❌ 平均 chunk 大小 > 5 GB → chunk 间隔太大,减小间隔 - ❌ 查询扫描 > 20 个 chunks → 考虑增大 chunk 间隔或优化查询 - ❌ Planning Time > Execution Time → chunk 数量过多,需要增大间隔 ### 2. 刷新策略配置 **低延迟场景**(数据实时性要求高): ```sql SELECT add_continuous_aggregate_policy('orders_daily_by_created', start_offset => INTERVAL '7 days', -- 刷新最近 7 天 end_offset => INTERVAL '5 minutes', -- 只排除最近 5 分钟 schedule_interval => INTERVAL '5 minutes' -- 每 5 分钟刷新 ); ``` **标准场景**(平衡性能和实时性): ```sql SELECT add_continuous_aggregate_policy('orders_daily_by_created', start_offset => INTERVAL '3 days', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '1 hour' ); ``` **低开销场景**(历史数据为主): ```sql SELECT add_continuous_aggregate_policy('orders_daily_by_created', start_offset => INTERVAL '1 day', -- 只刷新昨天 end_offset => INTERVAL '12 hours', schedule_interval => INTERVAL '6 hours' -- 每 6 小时刷新 ); ``` ### 3. 索引优化 **原则**:根据常用查询模式创建索引 ```sql -- 场景 1:按时间 + 店铺查询 CREATE INDEX ON orders_daily_by_created (day DESC, store_id); -- 场景 2:按店铺 + 时间排序 CREATE INDEX ON orders_daily_by_created (store_id, day DESC); -- 场景 3:按公司 + 平台 + 时间 CREATE INDEX ON orders_daily_by_created (company_id, platform_id, day DESC); ``` ### 4. 数据保留策略 自动删除旧数据(可选): ```sql -- 保留最近 2 年的数据,自动删除更早的 SELECT add_retention_policy('orders', INTERVAL '2 years'); -- 查看保留策略 SELECT * FROM timescaledb_information.jobs WHERE proc_name = 'policy_retention'; -- 删除保留策略 SELECT remove_retention_policy('orders'); ``` ### 5. 性能监控 **查看 chunk 分区情况**: ```sql SELECT chunk_name, range_start, range_end FROM timescaledb_information.chunks WHERE hypertable_name = 'orders' ORDER BY range_start DESC LIMIT 10; ``` **查看连续聚合刷新状态**: ```sql SELECT view_name, completed_threshold, invalidation_threshold FROM timescaledb_information.continuous_aggregates; ``` **查看后台任务执行情况**: ```sql SELECT job_id, proc_name, scheduled, next_start, last_finish, last_successful_finish FROM timescaledb_information.jobs ORDER BY job_id; ``` ### 6. 故障排查 **手动刷新聚合数据**: ```sql -- 刷新指定时间范围 CALL refresh_continuous_aggregate('orders_daily_by_created', '2025-01-01', '2025-02-01'); -- 刷新所有数据(慎用,耗时长) CALL refresh_continuous_aggregate('orders_daily_by_created', NULL, NULL); ``` **检查聚合数据是否最新**: ```sql -- 查看最新的物化数据时间点 SELECT view_name, completed_threshold AS last_materialized_time, NOW() - completed_threshold AS lag FROM timescaledb_information.continuous_aggregates WHERE view_name IN ('orders_daily_by_created', 'orders_daily_by_paid'); ``` --- ## 总结 ### 架构优势 1. **高性能**:历史数据查询从秒级优化到毫秒级 2. **灵活性**:支持双时间维度(创建时间、付款时间) 3. **实时性**:当天数据实时查询,历史数据预聚合 4. **可扩展**:支持多维度分组(company/platform/store) 5. **可维护**:自动刷新策略 + 手动刷新队列 ### 适用场景 - ✅ 订单量大(日订单 > 1 万) - ✅ 需要多时间维度分析 - ✅ 历史数据查询频繁 - ✅ 需要天/周/月等多种时间粒度统计 - ✅ 需要按多维度分组(公司/平台/店铺) ### 注意事项 1. **数据更新**:超出刷新窗口的历史修改需要手动刷新或使用刷新队列 2. **索引规划**:根据实际查询模式创建合适的索引 3. **分区间隔**:根据数据量调整,避免分区过多或过少 4. **监控告警**:定期检查聚合刷新状态,防止数据滞后 --- ## 参考资料 - [TimescaleDB 官方文档](https://docs.timescale.com/) - [Continuous Aggregates 详解](https://docs.timescale.com/use-timescale/latest/continuous-aggregates/) - [Hypertables 最佳实践](https://docs.timescale.com/use-timescale/latest/hypertables/) - [PostgreSQL time_bucket 函数](https://docs.timescale.com/api/latest/hyperfunctions/time_bucket/)