datahub/docs/data_query.md
2025-11-13 14:42:54 +08:00


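TimescaleDB Multi-Time-Dimension Sales Statistics

Overview

This document describes how to use TimescaleDB to build sales statistics for the order system across multiple time dimensions, with fast aggregate queries by both order creation time and payment time.

Business requirements

  • Report the trend of all orders by creation date (including unpaid orders)
  • Report actual revenue by payment date (paid orders only)
  • Group by company, platform, and store
  • Support day, week, and month granularity
  • Historical queries must return in milliseconds
  • Data for the current day must be real-time

Technical approach

  • Base table: a hypertable partitioned on created_date
  • Continuous aggregates: two materialized views, one bucketed by creation time and one by payment time
  • Query strategy: read history from the aggregates and compute the current day from the base table in real time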

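Core concepts and how they work

1. Hypertable: time-partitioned storage

What is a hypertable?

A hypertable is the core abstraction of TimescaleDB. It turns a regular PostgreSQL table into a table that is partitioned by time automatically.

What a hypertable does

| Feature | Description |
|---|---|
| Automatic partitioning | Creates and manages partitions (chunks) based on the time column |
| Transparent queries | Routes queries to the relevant chunks; callers never name a chunk |
| Efficient writes | New rows go to the most recent chunk, which reduces lock contention |
| Compression | Older chunks can be compressed automatically to save space |
| Data retention | Chunks older than the retention period can be dropped automatically |

What a hypertable does not do

  • It does not aggregate data; it only stores the raw rows
  • It does not create statistics views; those are configured separately
  • It does not precompute aggregates; an aggregate query still scans every raw row in the chunks that match its time filter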

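Example

-- Create the hypertable
SELECT create_hypertable('orders', 'created_date',
  chunk_time_interval => INTERVAL '7 days');

-- Effect (boundaries are illustrative; TimescaleDB chooses the actual alignment):
-- rows for 2025-01-01 ~ 2025-01-07 are stored in chunk_1
-- rows for 2025-01-08 ~ 2025-01-14 are stored in chunk_2
-- rows for 2025-01-15 ~ 2025-01-21 are stored in chunk_3
-- ...

-- Queries are routed automatically (transparent to the caller):
SELECT * FROM orders
WHERE created_date >= '2025-01-10'
  AND created_date < '2025-01-12';
-- ↑ TimescaleDB scans only chunk_2 and skips the other chunks

Key takeaway

A hypertable optimizes storage and time-range filtering, but it does not precompute aggregates. To speed up aggregate queries, you also need continuous aggregates.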
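
The chunk routing described above can be pictured with a small sketch. It is a simplified model, not TimescaleDB's implementation: the function name overlappingChunks and the origin-based alignment are assumptions made for illustration, and the range is assumed to start on or after the origin.

```php
<?php
declare(strict_types=1);

/**
 * Simplified model of chunk exclusion: return the zero-based indexes of the
 * fixed-width chunks that overlap the half-open range [$from, $to).
 * Assumption: chunks are aligned to $origin and $from is not before $origin.
 */
function overlappingChunks(string $origin, int $chunkDays, string $from, string $to): array
{
    $utc = new DateTimeZone('UTC');
    $originTs = (new DateTimeImmutable($origin, $utc))->getTimestamp();
    $fromTs = (new DateTimeImmutable($from, $utc))->getTimestamp();
    $toTs = (new DateTimeImmutable($to, $utc))->getTimestamp();
    $width = $chunkDays * 86400; // chunk width in seconds

    if ($toTs <= $fromTs) {
        return []; // empty range touches no chunk
    }

    $first = intdiv($fromTs - $originTs, $width);
    $last = intdiv($toTs - 1 - $originTs, $width); // $to is exclusive

    return range($first, $last);
}

// The query from the example touches only the second chunk (index 1).
print_r(overlappingChunks('2025-01-01', 7, '2025-01-10', '2025-01-12'));
```

Every chunk whose index is not returned can be skipped without reading a single row, which is why a narrow time filter stays cheap even on a large table.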


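2. Comparing three kinds of views

| Property | Regular view (VIEW) | PostgreSQL materialized view (MATERIALIZED VIEW) | TimescaleDB continuous aggregate (Continuous Aggregate) |
|---|---|---|---|
| Data storage | None; recomputed on every query | Stores precomputed results | Stores precomputed results |
| Query performance | Slow (re-aggregates every time) | Fast (reads precomputed results) | Fast (reads precomputed results) |
| Refresh | Always current | Manual (REFRESH MATERIALIZED VIEW) | Automatic and incremental (background job) |
| Refresh cost | N/A | High (full recomputation) | Low (only changed data is recomputed) |
| Historical updates | Reflected immediately | Reflected only after a manual refresh | Needs a configured refresh window or a manual refresh |
| Real-time queries | Supported | Not supported (must refresh first) | Supported when real-time aggregation is enabled |
| Use case | Small data volume, strict freshness | Medium data volume, manual refresh acceptable | Time-series data that must be maintained automatically |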

Side-by-side example

-- 1. Regular view (re-aggregates on every query, slow)
CREATE VIEW daily_sales AS
SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue
FROM orders
GROUP BY day;

SELECT * FROM daily_sales WHERE day = '2025-01-01';
-- ↑ Aggregates the raw rows of orders at query time, on every call


-- 2. PostgreSQL materialized view (precomputed, refreshed manually)
CREATE MATERIALIZED VIEW daily_sales AS
SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue
FROM orders
GROUP BY day;

SELECT * FROM daily_sales WHERE day = '2025-01-01';
-- ↑ Reads the stored result directly, fast

-- After the data changes, the view must be refreshed manually (full recomputation)
REFRESH MATERIALIZED VIEW daily_sales;


-- 3. TimescaleDB continuous aggregate (automatic incremental refresh)
CREATE MATERIALIZED VIEW daily_sales
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', created_date) AS day, SUM(total_amount) AS revenue
FROM orders
GROUP BY day
WITH NO DATA;

-- Add a refresh policy (every hour, refresh the last 3 days)
SELECT add_continuous_aggregate_policy('daily_sales',
  start_offset => INTERVAL '3 days',
  end_offset => INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour'
);

SELECT * FROM daily_sales WHERE day = '2025-01-01';
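-- ↑ Reads the precomputed result; rows that are not yet materialized are merged in
--   only when real-time aggregation is enabled (timescaledb.materialized_only = false)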

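3. What WITH (timescaledb.continuous) does

Standard materialized view

CREATE MATERIALIZED VIEW my_view AS
SELECT ... FROM ...;
-- ↑ A standard PostgreSQL materialized view
-- - must be refreshed manually with REFRESH MATERIALIZED VIEW
-- - every refresh recomputes everything

Continuous aggregate

CREATE MATERIALIZED VIEW my_view
WITH (timescaledb.continuous) AS  -- ← the key option
SELECT time_bucket('1 day', time_column) AS bucket, ...
FROM hypertable
GROUP BY bucket;
-- ↑ A TimescaleDB continuous aggregate
-- - refreshed in the background (once a refresh policy is configured)
-- - refreshed incrementally (only time ranges whose data changed)
-- - can serve real-time queries (merges in data that is not yet materialized)

Features enabled by WITH (timescaledb.continuous)

  1. Incremental materialization - only time ranges with changes are recomputed, not the whole view
  2. Automatic refresh - a background job can be configured to keep the data current
  3. Real-time queries - a query can combine materialized data with raw data that is not yet materialized; depending on the TimescaleDB version this may have to be switched on with timescaledb.materialized_only = false
  4. Invalidation tracking - TimescaleDB records which time ranges need to be recomputed

Key takeaway

WITH (timescaledb.continuous) does not turn refreshing on by itself; it makes an incremental refresh policy possible. The view is updated automatically only after a refresh policy has been added.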


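4. Full setup in three steps

TimescaleDB statistics are not a single switch. Three steps have to be completed in order:

Step 1: create the hypertable → Step 2: create the continuous aggregate → Step 3: add a refresh policy → statistics are maintained automatically

Step 1: Create the hypertable

Goal: convert a regular table into a time-partitioned table to optimize storage and time-range queries

-- Create the hypertable on the existing orders table
-- (if the table already contains rows, pass migrate_data => TRUE as well)
SELECT create_hypertable('orders', 'created_date',
  chunk_time_interval => INTERVAL '7 days'
);

Result

  • Data is stored in time-based chunks automatically
  • Queries are routed to the relevant chunks
  • Aggregate queries are still slow (nothing is precomputed yet)

Step 2: Create the continuous aggregate

Goal: create a pre-aggregated view that stores the statistics

CREATE MATERIALIZED VIEW orders_daily_stats
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 day', created_date) AS day,
  company_id,
  COUNT(*) AS total_orders,
  SUM(total_amount) AS revenue
FROM orders
GROUP BY day, company_id
WITH NO DATA;  -- do not populate yet; the refresh policy fills the view later

Result

  • The view exists and can be queried
  • It is empty (WITH NO DATA)
  • It is not updated automatically (no refresh policy yet)

Step 3: Add a refresh policy

Goal: enable the background refresh that keeps the data current

SELECT add_continuous_aggregate_policy('orders_daily_stats',
  start_offset => INTERVAL '3 days',   -- the window starts 3 days ago
  end_offset => INTERVAL '1 hour',     -- and ends 1 hour ago (skips data still being written)
  schedule_interval => INTERVAL '1 hour'  -- run once per hour
);

Result

  • A background job refreshes the view every hour
  • Only data inside the window [now() - 3 days, now() - 1 hour] is refreshed
  • The computation is incremental, so each run is cheap

Initial backfill

-- The policy only covers its configured window; older data must be refreshed manually once
CALL refresh_continuous_aggregate('orders_daily_stats',
  '2024-01-01',  -- start of the window (inclusive)
  '2025-01-01'   -- end of the window (exclusive)
);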
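
For a long history, the one-off backfill can be split into smaller windows so that each refresh call stays short. The sketch below only generates the windows; the function name backfillWindows and the month-sized step are assumptions chosen for illustration, not part of the original plan.

```php
<?php
declare(strict_types=1);

/**
 * Split the half-open range [$start, $end) into month-aligned windows.
 * Each pair can be passed to one refresh_continuous_aggregate() call.
 * Hypothetical helper; the month granularity is an arbitrary choice.
 */
function backfillWindows(string $start, string $end): array
{
    $utc = new DateTimeZone('UTC');
    $cursor = new DateTimeImmutable($start, $utc);
    $stop = new DateTimeImmutable($end, $utc);
    $windows = [];

    while ($cursor < $stop) {
        // Jump to 00:00 on the first day of the following month.
        $next = $cursor->modify('first day of next month midnight');
        if ($next > $stop) {
            $next = $stop; // last window may be shorter than a month
        }
        $windows[] = [$cursor->format('Y-m-d'), $next->format('Y-m-d')];
        $cursor = $next;
    }

    return $windows;
}

// The 2024 backfill from the example becomes twelve monthly windows.
foreach (backfillWindows('2024-01-01', '2025-01-01') as [$from, $to]) {
    echo "CALL refresh_continuous_aggregate('orders_daily_stats', '{$from}', '{$to}');\n";
}
```

Because the windows are half-open and share their boundaries, every daily bucket falls into exactly one window.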

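5. Data flow and business flow

Write path

Application writes an order
    ↓
The orders table receives the row
    ↓
TimescaleDB routes it by timestamp
    ├→ Chunk 1 (2025-01-01~07)
    ├→ Chunk 2 (2025-01-08~14)
    ├→ Chunk 3 (2025-01-15~21)
    └→ Chunk N (latest data)

Aggregation path

Background refresh job (runs hourly)
    ↓
Finds the time ranges that changed
    ├→ Range 10 has new rows     → recompute the aggregates for range 10
    ├→ Range 11 has updated rows → recompute the aggregates for range 11
    └→ Range 12 is unchanged     → skip
         ↓
Updates the orders_daily_by_created view
    ↓
Aggregated data is current

Query path (real-time + history)

Query the sales of the last 30 days
    ↓
The query is split in two
    ├→ Historical data (30 days ago ~ yesterday)
    │    ↓
    │  read the continuous aggregate orders_daily_by_created
    │  (speed: milliseconds)
    │
    └→ Real-time data (today)
         ↓
       aggregate the orders table at query time
       (speed: seconds, but over few rows)
         ↓
Merge the results → return the complete statistics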
End-to-end business flow

┌──────────────────────────────────────────────────────────────────┐
│                    Application-level operations                  │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. Order write                                                  │
│     INSERT INTO orders (...) VALUES (...);                       │
│     ↓                                                            │
│     [written to the hypertable, routed to the matching chunk]    │
│                                                                  │
│  2. Automatic background aggregation (runs hourly)               │
│     [TimescaleDB background job]                                 │
│     ↓                                                            │
│     find time ranges changed in the last 3 days                  │
│     ↓                                                            │
│     recompute the aggregates for those ranges                    │
│     ↓                                                            │
│     update the orders_daily_by_created view                      │
│                                                                  │
│  3. Historical order update (outside the refresh window)         │
│     UPDATE orders SET total_paid = 100 WHERE id = 123;           │
│     ↓                                                            │
│     [application check: order was created 30 days ago]           │
│     ↓                                                            │
│     INSERT INTO aggregate_refresh_queue ...                      │
│     ↓                                                            │
│     [scheduled job drains the queue nightly]                     │
│     ↓                                                            │
│     CALL refresh_continuous_aggregate('...', '2024-12-01', ...)  │
│                                                                  │
│  4. Querying statistics                                          │
│     SELECT * FROM orders_daily_by_created                        │
│     WHERE day >= '2024-12-01' AND day < CURRENT_DATE             │
│     ↓                                                            │
│     [reads precomputed aggregates, returns in milliseconds]      │
│                                                                  │
│     UNION ALL                                                    │
│                                                                  │
│     SELECT ... FROM orders                                       │
│     WHERE created_date >= CURRENT_DATE                           │
│     ↓                                                            │
│     [aggregates today's rows in real time, returns in seconds]   │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

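Key time ranges

| Data age | Handling | Query source | How it is updated |
|---|---|---|---|
| Today | Real-time data | orders table (aggregated at query time) | Visible as soon as it is written |
| 1 hour ago ~ 3 days ago | Automatic refresh window | Continuous aggregate | Background job, hourly |
| 3 days ago ~ N years ago | Historical data | Continuous aggregate | Manual refresh or refresh queue |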

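Architecture

1. Hypertable partitioning strategy

A TimescaleDB hypertable has exactly one time partitioning column. This design uses created_date.

Reasons

  • Every order has a created_date (never NULL)
  • paid_date is NULL until the order is paid, so it cannot be the partitioning column
  • Orders grow naturally with creation time, which suits time-series storage

Configuration

SELECT create_hypertable('orders', 'created_date',
  chunk_time_interval => INTERVAL '7 days');

2. Continuous aggregates

Continuous aggregates are materialized views that TimescaleDB maintains automatically.

Properties

  • Incremental updates: only changed data is recomputed
  • Configurable refresh policy (time window, frequency)
  • Much faster to query than aggregating at query time (results are precomputed)
  • Can be combined with real-time data in one query

The two aggregate views

  1. orders_daily_by_created - statistics by creation date

    • Covers all orders (including unpaid ones)
    • Distinguishes paid and unpaid order counts
    • Suited to order trends and conversion analysis
  2. orders_daily_by_paid - statistics by payment date

    • Covers paid orders only (WHERE paid_date IS NOT NULL)
    • Reflects when revenue was actually received
    • Suited to financial reports and revenue analysis
    • Caveat: a continuous aggregate has to apply time_bucket to the partitioning column of its source hypertable. Since orders is partitioned on created_date, TimescaleDB is expected to reject a continuous aggregate bucketed on paid_date; verify this on your version before relying on the view. Possible alternatives are a separate hypertable partitioned on paid_date or a regular materialized view.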

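Implementation steps

Step 1: Update the orders table migration

File: backend/migrations/2025_11_11_083522_create_orders_table.php

Add the TimescaleDB configuration at the end of the up() method:

public function up(): void
{
    // ... existing table creation and index code ...
    // Note: TimescaleDB requires the primary key and every unique index
    // to include the partitioning column (created_date).

    // GIN indexes for the jsonb columns (PostgreSQL)
    Schema::getConnection()->statement('CREATE INDEX orders_raw_gin_idx ON orders USING gin (raw)');
    Schema::getConnection()->statement('CREATE INDEX orders_ext_gin_idx ON orders USING gin (ext)');

    // ==================== TimescaleDB configuration ====================

    // 1. Enable the TimescaleDB extension (if not enabled yet)
    Schema::getConnection()->statement('CREATE EXTENSION IF NOT EXISTS timescaledb');

    // 2. Convert orders into a hypertable
    //    - created_date is the time partitioning column
    //    - each chunk holds 7 days of data
    //    - if_not_exists => TRUE avoids an error when the hypertable already exists
    //    - add migrate_data => TRUE if the table already contains rows
    Schema::getConnection()->statement(
        "SELECT create_hypertable('orders', 'created_date',
          chunk_time_interval => INTERVAL '7 days',
          if_not_exists => TRUE)"
    );

    // 3. (Optional) retention policy - drop data older than 2 years automatically
    // Schema::getConnection()->statement(
    //     "SELECT add_retention_policy('orders', INTERVAL '2 years')"
    // );
}

Parameters

  • chunk_time_interval: the chunk width; adjust it to the data volume
    • Small volume: 7 days (the default)
    • Medium volume: 1 day
    • Large volume: half a day or less
  • if_not_exists: avoids an error when the hypertable has already been created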

Step 2: Create the continuous aggregate migration

Create the file: backend/migrations/2025_11_12_XXXXXX_create_orders_continuous_aggregates.php

<?php

use Hyperf\Database\Schema\Schema;
use Hyperf\Database\Migrations\Migration;

return new class extends Migration
{
    /**
     * Run the migrations.
     */
    public function up(): void
    {
        $conn = Schema::getConnection();

        // ========== Continuous aggregate 1: by creation date ==========

        $conn->statement("
            CREATE MATERIALIZED VIEW orders_daily_by_created
            WITH (timescaledb.continuous) AS
            SELECT
              time_bucket('1 day', created_date) AS day,
              company_id,
              platform_id,
              store_id,

              -- Order counts
              COUNT(*) AS total_orders,
              COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
              COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,

              -- Sales amounts
              SUM(total_amount) AS sum_total_amount,
              SUM(total_paid) AS sum_total_paid,
              SUM(total_received) AS sum_total_received,

              -- Averages
              AVG(total_amount) AS avg_order_amount,
              AVG(total_paid) FILTER (WHERE paid_date IS NOT NULL) AS avg_paid_amount,

              -- Fees
              SUM(freight_fee) AS sum_freight_fee,
              SUM(tax_fee) AS sum_tax_fee,
              SUM(commission_fee) AS sum_commission_fee,
              SUM(discount_fee) AS sum_discount_fee

            FROM orders
            GROUP BY day, company_id, platform_id, store_id
            WITH NO DATA;
        ");

        // Add the refresh policy
        // - refreshes data from 3 days ago up to 1 hour ago
        // - runs once per hour
        $conn->statement("
            SELECT add_continuous_aggregate_policy('orders_daily_by_created',
              start_offset => INTERVAL '3 days',
              end_offset => INTERVAL '1 hour',
              schedule_interval => INTERVAL '1 hour'
            );
        ");

        // Indexes to speed up queries
        $conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, company_id)');
        $conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, platform_id)');
        $conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, store_id)');
        $conn->statement('CREATE INDEX ON orders_daily_by_created (company_id, day DESC)');
        $conn->statement('CREATE INDEX ON orders_daily_by_created (store_id, day DESC)');

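        // ========== Continuous aggregate 2: by payment date ==========
        // Caveat: paid_date is not the partitioning column of orders. TimescaleDB is
        // expected to reject a continuous aggregate bucketed on it; test this statement
        // against your version before deploying.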

        $conn->statement("
            CREATE MATERIALIZED VIEW orders_daily_by_paid
            WITH (timescaledb.continuous) AS
            SELECT
              time_bucket('1 day', paid_date) AS day,
              company_id,
              platform_id,
              store_id,

              -- Paid order count
              COUNT(*) AS paid_orders,

              -- Sales amounts
              SUM(total_amount) AS sum_total_amount,
              SUM(total_paid) AS sum_total_paid,
              SUM(total_received) AS sum_total_received,

              -- Averages
              AVG(total_amount) AS avg_order_amount,
              AVG(total_paid) AS avg_paid_amount,

              -- Fees
              SUM(freight_fee) AS sum_freight_fee,
              SUM(tax_fee) AS sum_tax_fee,
              SUM(commission_fee) AS sum_commission_fee,
              SUM(discount_fee) AS sum_discount_fee

            FROM orders
            WHERE paid_date IS NOT NULL  -- paid orders only
            GROUP BY day, company_id, platform_id, store_id
            WITH NO DATA;
        ");

        // Add the refresh policy
        $conn->statement("
            SELECT add_continuous_aggregate_policy('orders_daily_by_paid',
              start_offset => INTERVAL '3 days',
              end_offset => INTERVAL '1 hour',
              schedule_interval => INTERVAL '1 hour'
            );
        ");

        // Indexes
        $conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, company_id)');
        $conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, platform_id)');
        $conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, store_id)');
        $conn->statement('CREATE INDEX ON orders_daily_by_paid (company_id, day DESC)');
        $conn->statement('CREATE INDEX ON orders_daily_by_paid (store_id, day DESC)');
    }

    /**
     * Reverse the migrations.
     */
    public function down(): void
    {
        $conn = Schema::getConnection();

        // Drop the continuous aggregates (their refresh policies are removed with them)
        $conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_paid CASCADE');
        $conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_created CASCADE');
    }
};

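Refresh policy parameters

  • start_offset: how far back the refresh window starts (relative to the current time)
  • end_offset: how far back the refresh window ends (keeps data that is still being written out of the refresh)
  • schedule_interval: how often the job runs

Example configuration

-- Every hour, refresh the last 3 days
start_offset => INTERVAL '3 days'
end_offset => INTERVAL '1 hour'
schedule_interval => INTERVAL '1 hour'

-- Effective refresh window: [now() - 3 days, now() - 1 hour]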
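
To see which daily buckets such a window actually covers, the sketch below lists the buckets that lie entirely inside it. It rests on two assumptions that should be checked against your TimescaleDB version: that only buckets fully contained in the window are materialized, and that daily buckets start at 00:00 UTC. The function name bucketsRefreshedByPolicy is hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * List the daily buckets (UTC) that lie entirely inside the window
 * [$now - $startOffsetHours, $now - $endOffsetHours].
 * Assumption: partially covered buckets are not refreshed.
 */
function bucketsRefreshedByPolicy(DateTimeImmutable $now, int $startOffsetHours, int $endOffsetHours): array
{
    $windowStart = $now->modify("-{$startOffsetHours} hours");
    $windowEnd = $now->modify("-{$endOffsetHours} hours");

    // First bucket boundary at or after the window start.
    $day = $windowStart->setTime(0, 0);
    if ($day < $windowStart) {
        $day = $day->modify('+1 day');
    }

    $buckets = [];
    // A bucket counts only if its end is still inside the window.
    while ($day->modify('+1 day') <= $windowEnd) {
        $buckets[] = $day->format('Y-m-d');
        $day = $day->modify('+1 day');
    }

    return $buckets;
}

$now = new DateTimeImmutable('2025-01-10 14:00:00', new DateTimeZone('UTC'));
print_r(bucketsRefreshedByPolicy($now, 72, 1)); // 3 days back, 1 hour back
```

Under these assumptions the bucket at the old edge of the window (2025-01-07 in the call above) is only partially covered, which is worth keeping in mind when deciding which dates the refresh queue has to handle.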

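Keeping aggregates in sync with historical updates

Problem

The refresh policy only covers its configured window (for example the last 3 days). When an order outside that window is modified, the aggregates are not updated automatically.

Typical scenarios

  • A historical order is refunded (total_paid changes)
  • A payment time is recorded late (paid_date is updated)
  • An order status is corrected
  • An amount is adjusted

Options

| Option | Pros | Cons | When to use |
|---|---|---|---|
| Manual refresh | Precise control, low cost | Must be handled by the application | Few historical changes |
| Real-time aggregation | Data is always accurate | Slower queries | Accuracy is critical |
| Wider refresh window | Handled automatically | Higher refresh cost | Changes cluster in the recent past |
| Refresh queue | Balances performance and accuracy | More complex to implement | Recommended for production |

Recommended option: refresh queue

Approach

  1. When an order is modified, record the dates that need a refresh
  2. A scheduled job refreshes the queued dates in batches
  3. Together with the automatic policy this covers most scenarios

1. Create the refresh queue table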

// Migration file
Schema::create('aggregate_refresh_queue', function (Blueprint $table) {
    $table->date('refresh_date')->comment('Date that needs a refresh');
    $table->string('aggregate_view', 100)->comment('Aggregate view name');
    $table->timestamp('created_at')->comment('Time the entry was queued');
    $table->primary(['refresh_date', 'aggregate_view']);
    $table->index('created_at');
});

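2. Enqueue on order update

// Order update method
public function updateOrder($orderId, array $data): Order
{
    $order = Order::findOrFail($orderId);

    // Dates to refresh, collected per aggregate view
    $affected = ['orders_daily_by_created' => [], 'orders_daily_by_paid' => []];

    // Any change to amounts, status, or paid_date alters the bucket the order was
    // created in, so the current creation date is always affected.
    $affected['orders_daily_by_created'][] = Carbon::parse($order->created_date)->format('Y-m-d');
    if (isset($data['created_date'])) {
        $affected['orders_daily_by_created'][] = Carbon::parse($data['created_date'])->format('Y-m-d');
    }

    // The paid-date view is affected on the old and on the new payment date.
    // array_key_exists() also covers paid_date being reset to null.
    if ($order->paid_date) {
        $affected['orders_daily_by_paid'][] = Carbon::parse($order->paid_date)->format('Y-m-d');
    }
    if (array_key_exists('paid_date', $data) && $data['paid_date']) {
        $affected['orders_daily_by_paid'][] = Carbon::parse($data['paid_date'])->format('Y-m-d');
    }

    // Update the order
    $order->update($data);

    // Enqueue the dates that need a refresh
    foreach ($affected as $view => $dates) {
        foreach (array_unique($dates) as $date) {
            $this->enqueueRefresh($view, $date);
        }
    }

    return $order;
}

private function enqueueRefresh(string $view, string $date): void
{
    // Enqueue dates that are at least 2 days old; newer ones are left to the policy.
    // The overlap with the 3-day policy window is deliberate: the bucket at the old
    // edge of the window is only partially covered and may not be refreshed by it.
    if (Carbon::parse($date)->lte(Carbon::today()->subDays(2))) {
        DB::table('aggregate_refresh_queue')->insertOrIgnore([
            'refresh_date' => $date,
            'aggregate_view' => $view,
            'created_at' => Carbon::now(),
        ]);
    }
}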

3. Batch refresh in a scheduled job

// app/Command/RefreshOrderAggregatesCommand.php
<?php

namespace App\Command;

use Carbon\Carbon;
use Hyperf\Command\Command;
use Hyperf\DbConnection\Db;

class RefreshOrderAggregatesCommand extends Command
{
    protected ?string $signature = 'orders:refresh-aggregates';
    protected string $description = 'Refresh queued order aggregate dates';

    public function handle()
    {
        $queue = Db::table('aggregate_refresh_queue')
            ->where('created_at', '<', Carbon::now()->subHour()) // wait 1 hour so repeated edits are batched
            ->orderBy('refresh_date')
            ->get();

        if ($queue->isEmpty()) {
            $this->info('No aggregates to refresh.');
            return;
        }

        $grouped = $queue->groupBy('aggregate_view');

        foreach ($grouped as $view => $items) {
            $dates = $items->pluck('refresh_date')->unique();
            $this->info("Refreshing {$view}: " . $dates->count() . " dates");

            foreach ($dates as $date) {
                // The window must cover the whole daily bucket, so it ends at the start
                // of the next day rather than at 23:59:59. This assumes the session time
                // zone matches the bucket alignment (UTC by default for timestamptz).
                $start = Carbon::parse($date)->startOfDay();
                $end = $start->copy()->addDay();

                try {
                    // Explicit casts keep the parameter types unambiguous.
                    // The call must not run inside a transaction block.
                    Db::statement(
                        'CALL refresh_continuous_aggregate(?::regclass, ?::timestamptz, ?::timestamptz)',
                        [$view, $start->toDateTimeString(), $end->toDateTimeString()]
                    );

                    // Remove the entry that has been refreshed
                    Db::table('aggregate_refresh_queue')
                        ->where('refresh_date', $date)
                        ->where('aggregate_view', $view)
                        ->delete();

                    $this->line("  ✓ Refreshed {$date}");
                } catch (\Exception $e) {
                    $this->error("  ✗ Failed to refresh {$date}: " . $e->getMessage());
                }
            }
        }

        $this->info('Refresh completed.');
    }
}
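
When many consecutive dates are queued, refreshing them one by one issues more calls than necessary. One possible refinement, sketched here with a hypothetical helper named mergeDatesIntoRanges, is to merge adjacent dates into half-open ranges first and refresh each range with a single call.

```php
<?php
declare(strict_types=1);

/**
 * Merge a list of 'Y-m-d' dates into contiguous half-open ranges [from, to).
 * Hypothetical helper; not part of the command above.
 */
function mergeDatesIntoRanges(array $dates): array
{
    $dates = array_values(array_unique($dates));
    sort($dates); // 'Y-m-d' strings sort chronologically

    $ranges = [];
    foreach ($dates as $date) {
        $day = new DateTimeImmutable($date, new DateTimeZone('UTC'));
        $nextDay = $day->modify('+1 day')->format('Y-m-d');
        $last = count($ranges) - 1;

        if ($last >= 0 && $ranges[$last][1] === $date) {
            $ranges[$last][1] = $nextDay; // extends the previous range
        } else {
            $ranges[] = [$date, $nextDay]; // starts a new range
        }
    }

    return $ranges;
}

// Four queued dates collapse into two refresh windows.
print_r(mergeDatesIntoRanges(['2024-12-03', '2024-12-01', '2024-12-02', '2024-12-10']));
```

Each resulting pair can be used directly as the start and end of a refresh window, since the end is already the exclusive start of the following day.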

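Schedule the job (every day at 02:00):

// config/autoload/crontab.php
use Hyperf\Crontab\Crontab;

return [
    'enable' => true,
    'crontab' => [
        // Run the console command rather than calling handle() directly,
        // so that the command's input and output are initialized.
        (new Crontab())
            ->setType('command')
            ->setName('refresh-order-aggregates')
            ->setRule('0 2 * * *') // every day at 02:00
            ->setCallback(['command' => 'orders:refresh-aggregates'])
            ->setMemo('Refresh order aggregate data'),
    ],
];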

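Query examples

1. Statistics by creation date (history + today)

-- Order statistics for the last 30 days (by creation date)
SELECT * FROM (
  -- Historical data (read from the continuous aggregate, fast)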
  SELECT
    day,
    company_id,
    platform_id,
    store_id,
    total_orders,
    paid_orders,
    unpaid_orders,
    sum_total_amount,
    sum_total_paid,
    avg_order_amount
  FROM orders_daily_by_created
  WHERE day >= CURRENT_DATE - INTERVAL '30 days'
    AND day < CURRENT_DATE

  UNION ALL

  -- Real-time data for today
  SELECT
    time_bucket('1 day', created_date) AS day,
    company_id,
    platform_id,
    store_id,
    COUNT(*) AS total_orders,
    COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
    COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
    SUM(total_amount) AS sum_total_amount,
    SUM(total_paid) AS sum_total_paid,
    AVG(total_amount) AS avg_order_amount
  FROM orders
  WHERE created_date >= CURRENT_DATE
  GROUP BY day, company_id, platform_id, store_id
) AS combined
ORDER BY day DESC, company_id, platform_id, store_id;

2. Statistics by payment date (history + today)

-- Actual revenue for the last 30 days (by payment date)
SELECT * FROM (
  -- Historical data
  SELECT
    day,
    company_id,
    platform_id,
    store_id,
    paid_orders,
    sum_total_paid,
    avg_paid_amount
  FROM orders_daily_by_paid
  WHERE day >= CURRENT_DATE - INTERVAL '30 days'
    AND day < CURRENT_DATE

  UNION ALL

  -- Real-time data for today
  SELECT
    time_bucket('1 day', paid_date) AS day,
    company_id,
    platform_id,
    store_id,
    COUNT(*) AS paid_orders,
    SUM(total_paid) AS sum_total_paid,
    AVG(total_paid) AS avg_paid_amount
  FROM orders
  WHERE paid_date >= CURRENT_DATE
    AND paid_date IS NOT NULL
  GROUP BY day, company_id, platform_id, store_id
) AS combined
ORDER BY day DESC;

3. Grouping by company

-- Daily order statistics per company
SELECT
  day,
  company_id,
  SUM(total_orders) AS total_orders,
  SUM(paid_orders) AS paid_orders,
  SUM(sum_total_amount) AS total_amount,
  SUM(sum_total_paid) AS total_paid,
  -- Weighted average; AVG(avg_order_amount) would average the per-group averages
  SUM(sum_total_amount) / NULLIF(SUM(total_orders), 0) AS avg_order_amount
FROM orders_daily_by_created
WHERE day >= '2025-01-01' AND day < '2025-02-01'
GROUP BY day, company_id
ORDER BY day DESC, company_id;

-- Month-to-date revenue per company (by payment time)
SELECT
  company_id,
  SUM(paid_orders) AS total_paid_orders,
  SUM(sum_total_paid) AS total_revenue,
  SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_order_value
FROM orders_daily_by_paid
WHERE day >= date_trunc('month', CURRENT_DATE)
  AND day < CURRENT_DATE
GROUP BY company_id
ORDER BY total_revenue DESC;
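
When daily rows are rolled up, an average has to be rebuilt from sums and counts; averaging the stored daily averages gives every day the same weight regardless of how many orders it had. The sketch below shows the difference on two made-up rows; the function names and the sample numbers are illustrative only.

```php
<?php
declare(strict_types=1);

/** Weighted average: total amount divided by total order count. */
function overallAverage(array $dailyRows): ?float
{
    $sum = 0.0;
    $count = 0;
    foreach ($dailyRows as $row) {
        $sum += (float) $row['sum_total_amount'];
        $count += (int) $row['total_orders'];
    }
    return $count > 0 ? $sum / $count : null;
}

/** Naive average of the per-day averages, shown only for comparison. */
function averageOfAverages(array $dailyRows): ?float
{
    if ($dailyRows === []) {
        return null;
    }
    $total = 0.0;
    foreach ($dailyRows as $row) {
        $total += (float) $row['sum_total_amount'] / (int) $row['total_orders'];
    }
    return $total / count($dailyRows);
}

// Made-up sample: one large order on day 1, nine small orders on day 2.
$rows = [
    ['total_orders' => 1, 'sum_total_amount' => 100.0],
    ['total_orders' => 9, 'sum_total_amount' => 90.0],
];
var_dump(overallAverage($rows));    // 190 / 10
var_dump(averageOfAverages($rows)); // (100 + 10) / 2
```

The same reasoning applies whenever daily buckets are grouped into weeks, months, or across stores and platforms.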

4. Grouping by platform

-- Compare daily order volume and sales across platforms
SELECT
  day,
  platform_id,
  SUM(total_orders) AS total_orders,
  SUM(paid_orders) AS paid_orders,
  SUM(sum_total_amount) AS total_amount,
  SUM(sum_total_paid) AS total_paid,
  ROUND(SUM(paid_orders)::numeric / NULLIF(SUM(total_orders), 0) * 100, 2) AS payment_rate
FROM orders_daily_by_created
WHERE day >= CURRENT_DATE - INTERVAL '7 days'
  AND day < CURRENT_DATE
GROUP BY day, platform_id
ORDER BY day DESC, total_paid DESC;

-- Daily actual revenue per platform for the current week (by payment time)
SELECT
  day,
  platform_id,
  SUM(paid_orders) AS orders,
  SUM(sum_total_paid) AS revenue,
  SUM(sum_freight_fee) AS freight,
  SUM(sum_commission_fee) AS commission
FROM orders_daily_by_paid
WHERE day >= date_trunc('week', CURRENT_DATE)
  AND day < CURRENT_DATE
GROUP BY day, platform_id
ORDER BY day DESC, revenue DESC;

5. Grouping by store

-- Daily order statistics per store
SELECT
  day,
  store_id,
  SUM(total_orders) AS total_orders,
  SUM(paid_orders) AS paid_orders,
  SUM(unpaid_orders) AS unpaid_orders,
  SUM(sum_total_amount) AS total_amount,
  SUM(sum_total_paid) AS total_paid
FROM orders_daily_by_created
WHERE day >= '2025-01-01' AND day < '2025-02-01'
GROUP BY day, store_id
ORDER BY day DESC, total_paid DESC;

-- Daily sales trend of one store (by payment time)
SELECT
  day,
  paid_orders,
  sum_total_paid AS revenue,
  avg_paid_amount,
  sum_freight_fee,
  sum_tax_fee
FROM orders_daily_by_paid
WHERE store_id = 123
  AND day >= CURRENT_DATE - INTERVAL '30 days'
  AND day < CURRENT_DATE
ORDER BY day DESC;

6. Combining dimensions

-- Daily sales per store within one company
SELECT
  day,
  company_id,
  store_id,
  SUM(total_orders) AS orders,
  SUM(paid_orders) AS paid,
  SUM(sum_total_paid) AS revenue,
  SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_value  -- assumes total_paid is 0 or NULL for unpaid orders
FROM orders_daily_by_created
WHERE company_id = 100
  AND day >= '2025-01-01' AND day < '2025-02-01'
GROUP BY day, company_id, store_id
ORDER BY day DESC, revenue DESC;

-- Compare each company's sales across platforms (by payment time)
SELECT
  company_id,
  platform_id,
  SUM(paid_orders) AS total_orders,
  SUM(sum_total_paid) AS total_revenue,
  SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_order_value,
  SUM(sum_commission_fee) AS total_commission,
  ROUND((SUM(sum_commission_fee) / NULLIF(SUM(sum_total_paid), 0) * 100), 2) AS commission_rate
FROM orders_daily_by_paid
WHERE day >= date_trunc('month', CURRENT_DATE)
  AND day < CURRENT_DATE
GROUP BY company_id, platform_id
ORDER BY company_id, total_revenue DESC;

-- Sales trend for one company, platform, and store
SELECT
  day,
  total_orders,
  paid_orders,
  unpaid_orders,
  sum_total_amount,
  sum_total_paid,
  avg_order_amount
FROM orders_daily_by_created
WHERE company_id = 100
  AND platform_id = 200
  AND store_id = 300
  AND day >= CURRENT_DATE - INTERVAL '90 days'
  AND day < CURRENT_DATE
ORDER BY day DESC;

7. Weekly and monthly rollups

-- Weekly statistics (by creation time)
SELECT
  time_bucket('7 days', day) AS week,
  company_id,
  SUM(total_orders) AS weekly_orders,
  SUM(paid_orders) AS weekly_paid_orders,
  SUM(sum_total_paid) AS weekly_revenue,
  SUM(sum_total_amount) / NULLIF(SUM(total_orders), 0) AS avg_order_value
FROM orders_daily_by_created
WHERE day >= '2025-01-01'
GROUP BY week, company_id
ORDER BY week DESC, company_id;

-- Monthly statistics (by payment time)
SELECT
  date_trunc('month', day) AS month,
  platform_id,
  SUM(paid_orders) AS monthly_orders,
  SUM(sum_total_paid) AS monthly_revenue,
  SUM(sum_freight_fee) AS monthly_freight,
  SUM(sum_commission_fee) AS monthly_commission
FROM orders_daily_by_paid
WHERE day >= '2024-01-01'
GROUP BY month, platform_id
ORDER BY month DESC, monthly_revenue DESC;

8. Real-time + historical queries (application-level wrapper)

PHP query example

<?php

namespace App\Service;

use Hyperf\DbConnection\Db;
use Carbon\Carbon;

class OrderStatisticsService
{
    /**
     * Daily sales statistics for one store (by creation time)
     *
     * @param int $storeId Store ID
     * @param string $startDate Start date (Y-m-d)
     * @param string $endDate End date (Y-m-d), inclusive
     * @return array
     */
    public function getDailyStatsByStore(int $storeId, string $startDate, string $endDate): array
    {
        $start = Carbon::parse($startDate);
        $end = Carbon::parse($endDate);
        $today = Carbon::today();

        $results = [];

        // Historical data (from the aggregate view)
        if ($start->lt($today)) {
            $historyEnd = $end->lt($today) ? $end : $today->copy()->subDay();

            $history = Db::select("
                SELECT
                    day::date AS day,
                    total_orders,
                    paid_orders,
                    unpaid_orders,
                    sum_total_amount,
                    sum_total_paid,
                    avg_order_amount
                FROM orders_daily_by_created
                WHERE store_id = ?
                  AND day >= ?
                  AND day <= ?
                ORDER BY day DESC
            ", [$storeId, $start->format('Y-m-d'), $historyEnd->format('Y-m-d')]);

            $results = array_merge($results, $history);
        }

        // Real-time data for today
        if ($end->gte($today)) {
            // Never start before today: earlier days come from the aggregate view
            $realtimeStart = $start->gt($today) ? $start : $today;

            $realtime = Db::select("
                SELECT
                    time_bucket('1 day', created_date)::date AS day,
                    COUNT(*) AS total_orders,
                    COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
                    COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
                    SUM(total_amount) AS sum_total_amount,
                    SUM(total_paid) AS sum_total_paid,
                    AVG(total_amount) AS avg_order_amount
                FROM orders
                WHERE store_id = ?
                  AND created_date >= ?
                  AND created_date < ?
                GROUP BY day
                ORDER BY day DESC
            ", [$storeId, $realtimeStart->format('Y-m-d H:i:s'), $end->copy()->addDay()->format('Y-m-d H:i:s')]);

            // Keep the newest-first order: real-time rows precede the historical rows
            $results = array_merge($realtime, $results);
        }

        return $results;
    }

    /**
     * Month-to-date revenue ranking per company (by payment time)
     *
     * @return array
     */
    public function getMonthlyRevenueByCompany(): array
    {
        $monthStart = Carbon::now()->startOfMonth();
        $today = Carbon::today();

        // Historical data
        $history = Db::select("
            SELECT
                company_id,
                SUM(paid_orders) AS total_orders,
                SUM(sum_total_paid) AS total_revenue,
                SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_order_value
            FROM orders_daily_by_paid
            WHERE day >= ?
              AND day < ?
            GROUP BY company_id
        ", [$monthStart->format('Y-m-d'), $today->format('Y-m-d')]);

        // Real-time data for today
        $realtime = Db::select("
            SELECT
                company_id,
                COUNT(*) AS total_orders,
                SUM(total_paid) AS total_revenue,
                AVG(total_paid) AS avg_order_value
            FROM orders
            WHERE paid_date >= ?
              AND paid_date IS NOT NULL
            GROUP BY company_id
        ", [$today->format('Y-m-d H:i:s')]);

        // Merge both result sets
        $combined = [];
        foreach ($history as $row) {
            $combined[$row->company_id] = [
                'company_id' => $row->company_id,
                'total_orders' => $row->total_orders,
                'total_revenue' => $row->total_revenue,
                'avg_order_value' => $row->avg_order_value,
            ];
        }

        foreach ($realtime as $row) {
            if (isset($combined[$row->company_id])) {
                $combined[$row->company_id]['total_orders'] += $row->total_orders;
                $combined[$row->company_id]['total_revenue'] += $row->total_revenue;
                // Recompute the average from the merged totals
                $combined[$row->company_id]['avg_order_value'] =
                    $combined[$row->company_id]['total_revenue'] / $combined[$row->company_id]['total_orders'];
            } else {
                $combined[$row->company_id] = [
                    'company_id' => $row->company_id,
                    'total_orders' => $row->total_orders,
                    'total_revenue' => $row->total_revenue,
                    'avg_order_value' => $row->avg_order_value,
                ];
            }
        }

        // Sort by revenue, highest first
        usort($combined, fn($a, $b) => $b['total_revenue'] <=> $a['total_revenue']);

        return $combined;
    }
}
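
The date handling in the service can be isolated into a small pure function, which makes the boundary between aggregate and raw-table queries easy to test. This is a sketch; the function name splitRange and the array shape are assumptions, and dates are plain 'Y-m-d' strings with an inclusive end date.

```php
<?php
declare(strict_types=1);

/**
 * Split the inclusive date range [$start, $end] into a historical part (read from
 * the continuous aggregate) and a real-time part (read from the orders table).
 * Both parts are returned as half-open ranges [from, to), or null if empty.
 */
function splitRange(string $start, string $end, string $today): array
{
    $endExclusive = (new DateTimeImmutable($end, new DateTimeZone('UTC')))
        ->modify('+1 day')
        ->format('Y-m-d');

    // 'Y-m-d' strings compare chronologically, so plain string comparison is enough.
    $history = null;
    if ($start < $today) {
        $history = [$start, min($endExclusive, $today)];
    }

    $realtime = null;
    if ($end >= $today) {
        $realtime = [max($start, $today), $endExclusive];
    }

    return ['history' => $history, 'realtime' => $realtime];
}

print_r(splitRange('2025-01-01', '2025-01-10', '2025-01-10'));
```

The two parts never overlap, so rows from the aggregate and from the raw table can be concatenated without double counting.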

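Best practices

1. Chunk size tuning (important)

Core principles for chunk size

Guidelines

  • Memory: the TimescaleDB documentation recommends that recent chunks, including their indexes, fit in roughly 25% of available memory
  • Target size: 500 MB ~ 2 GB per chunk (rule of thumb used in this document)
  • Query efficiency: a single query should ideally scan fewer than 10 chunks (rule of thumb)
  • Upper bound: avoid chunks larger than 5 GB (rule of thumb)

Calculating your chunk size

Step 1: Estimate the size of a single row

Estimated size of one orders row (including indexes):
- Basic columns (id, company_id, amounts, ...): ~300 bytes
- Time columns (4 timestampTz): ~32 bytes
- Text columns (zipcode, city, province, country): ~200 bytes
- JSONB columns (raw, ext): ~500-1000 bytes (depends on content)
- Index overhead (14 indexes): ~800 bytes
─────────────────────────────────────────
Total: about 1.5 - 2.5 KB per row (2 KB on average)

Step 2: Calculate the daily data volume

Formula: daily volume = orders per day × row size

Examples:
- 10,000 orders/day × 2 KB = 20 MB/day
- 50,000 orders/day × 2 KB = 100 MB/day
- 100,000 orders/day × 2 KB = 200 MB/day
- 500,000 orders/day × 2 KB = 1 GB/day

Step 3: Choose chunk_time_interval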

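| Orders per day | Daily volume | Suggested interval | Chunk size | Assessment |
|---|---|---|---|---|
| < 5,000 | < 10 MB/day | 14-30 days | up to 140-300 MB | Best: fewer chunks |
| 5,000-20,000 | 10-40 MB/day | 7-14 days | 70-560 MB | Best: balanced |
| 20,000-100,000 | 40-200 MB/day | 3-7 days | 120 MB-1.4 GB | Recommended |
| 100,000-500,000 | 200 MB-1 GB/day | 1-3 days | 200 MB-3 GB | Recommended |
| 500,000-1,000,000 | 1-2 GB/day | 12-24 hours | 0.5-2 GB | Acceptable |
| > 1,000,000 | > 2 GB/day | 6-12 hours | > 0.5-1 GB | ⚠️ Monitor performance |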
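
The arithmetic behind the table can be captured in a few lines. The sketch below is an assumption-laden helper, not an official formula: the candidate intervals, the 1000 MB default target, and the function names are all chosen for illustration, and sizes use decimal units as in the examples above.

```php
<?php
declare(strict_types=1);

/** Estimated chunk size in MB for a given order volume, row size, and interval. */
function estimateChunkMb(int $ordersPerDay, float $rowKb, float $intervalDays): float
{
    return $ordersPerDay * $rowKb * $intervalDays / 1000;
}

/**
 * Pick the largest candidate interval (in days) whose estimated chunk size
 * stays at or below $targetMb. Falls back to the smallest candidate.
 */
function suggestIntervalDays(int $ordersPerDay, float $rowKb, float $targetMb = 1000.0): float
{
    $candidates = [30.0, 14.0, 7.0, 3.0, 1.0, 0.5, 0.25]; // largest first
    foreach ($candidates as $days) {
        if (estimateChunkMb($ordersPerDay, $rowKb, $days) <= $targetMb) {
            return $days;
        }
    }
    return 0.25;
}

foreach ([5000, 50000, 500000, 2000000] as $orders) {
    $days = suggestIntervalDays($orders, 2.0);
    printf("%d orders/day -> %s days, ~%.0f MB per chunk\n",
        $orders, $days, estimateChunkMb($orders, 2.0, $days));
}
```

Raising or lowering the target shifts the suggestion within the ranges given in the table, so the measured row size of the real table should drive the final choice.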

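How query patterns affect the choice

Typical query range vs. chunk configuration

-- Suppose the most common query covers the last 30 days
SELECT * FROM orders
WHERE created_date >= CURRENT_DATE - INTERVAL '30 days';

-- Approximate number of chunks scanned per configuration:
-- 1-day interval:  30 chunks  ❌ too many, planning takes long
-- 3-day interval:  10 chunks  ✅ fine
-- 7-day interval:  5 chunks   ✅ best
-- 14-day interval: 3 chunks   ✅ good
-- 30-day interval: 1 chunk    ⚠️ chunk too large, may exceed memory

Recommendation

Choose chunk_time_interval so that your most common query range scans 3-8 chunks.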

Configuration examples

// Scenario 1: small shop (5,000 orders/day on average)
Schema::getConnection()->statement(
    "SELECT create_hypertable('orders', 'created_date',
      chunk_time_interval => INTERVAL '14 days',  -- about 140 MB per chunk
      if_not_exists => TRUE)"
);

// Scenario 2: medium scale (50,000 orders/day on average)
Schema::getConnection()->statement(
    "SELECT create_hypertable('orders', 'created_date',
      chunk_time_interval => INTERVAL '7 days',   -- about 700 MB per chunk
      if_not_exists => TRUE)"
);

// Scenario 3: large platform (500,000 orders/day on average)
Schema::getConnection()->statement(
    "SELECT create_hypertable('orders', 'created_date',
      chunk_time_interval => INTERVAL '1 day',    -- about 1 GB per chunk
      if_not_exists => TRUE)"
);

// Scenario 4: very large scale (2,000,000 orders/day on average)
Schema::getConnection()->statement(
    "SELECT create_hypertable('orders', 'created_date',
      chunk_time_interval => INTERVAL '12 hours', -- about 2 GB per chunk
      if_not_exists => TRUE)"
);

Adjusting the chunk interval later

If the chunk size turns out to be unsuitable after the hypertable has been created, it can be changed:

-- Inspect the current chunk sizes
SELECT
  chunk_name,
  pg_size_pretty(pg_total_relation_size(format('%I.%I', chunk_schema, chunk_name)::regclass)) AS size,
  range_start,
  range_end
FROM timescaledb_information.chunks
WHERE hypertable_name = 'orders'
ORDER BY range_start DESC
LIMIT 10;

-- If chunks are too small (< 100 MB) or too large (> 5 GB), change the interval
-- Note: this only affects chunks created afterwards; existing chunks stay as they are
SELECT set_chunk_time_interval('orders', INTERVAL '7 days');

Performance indicators

Check the following regularly:

-- 1. Chunk count and size distribution
SELECT
  COUNT(*) AS total_chunks,
  pg_size_pretty(SUM(pg_total_relation_size(format('%I.%I', chunk_schema, chunk_name)::regclass))) AS total_size,
  pg_size_pretty(AVG(pg_total_relation_size(format('%I.%I', chunk_schema, chunk_name)::regclass))::bigint) AS avg_chunk_size,
  pg_size_pretty(MIN(pg_total_relation_size(format('%I.%I', chunk_schema, chunk_name)::regclass))::bigint) AS min_chunk_size,
  pg_size_pretty(MAX(pg_total_relation_size(format('%I.%I', chunk_schema, chunk_name)::regclass))::bigint) AS max_chunk_size
FROM timescaledb_information.chunks
WHERE hypertable_name = 'orders';

-- 2. Query performance (EXPLAIN ANALYZE)
EXPLAIN ANALYZE
SELECT COUNT(*), SUM(total_amount)
FROM orders
WHERE created_date >= CURRENT_DATE - INTERVAL '30 days';
-- Check how many chunks appear in the plan (and any "Chunks excluded during startup" line):
-- the fewer chunks are scanned, the better

Warning signs

  • Average chunk size < 50 MB → the interval is too small; increase it
  • Average chunk size > 5 GB → the interval is too large; decrease it
  • A query scans > 20 chunks → consider a larger interval or a narrower query
  • Planning Time > Execution Time → too many chunks; increase the interval
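
These warning signs are simple threshold checks, so they can be automated in a monitoring script. The sketch below encodes the four rules from the list; the function name chunkWarnings and the message wording are assumptions, and the inputs would come from the monitoring queries and EXPLAIN ANALYZE output.

```php
<?php
declare(strict_types=1);

/**
 * Evaluate the warning signs listed above and return the matching messages.
 * Thresholds are the ones given in this document, with 5 GB taken as 5000 MB.
 */
function chunkWarnings(float $avgChunkMb, int $chunksScanned, float $planningMs, float $executionMs): array
{
    $warnings = [];
    if ($avgChunkMb < 50) {
        $warnings[] = 'average chunk below 50 MB: increase chunk_time_interval';
    }
    if ($avgChunkMb > 5000) {
        $warnings[] = 'average chunk above 5 GB: decrease chunk_time_interval';
    }
    if ($chunksScanned > 20) {
        $warnings[] = 'query scans more than 20 chunks: widen the interval or narrow the query';
    }
    if ($planningMs > $executionMs) {
        $warnings[] = 'planning time exceeds execution time: too many chunks';
    }
    return $warnings;
}

// Example input: small chunks, many scanned, slow planning.
print_r(chunkWarnings(30.0, 25, 5.0, 2.0));
```

A healthy configuration returns an empty list, which makes the function easy to wire into an alert that fires only when something needs attention.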

2. Refresh policy configuration

Low latency (strict freshness requirements):

SELECT add_continuous_aggregate_policy('orders_daily_by_created',
  start_offset => INTERVAL '7 days',   -- refresh the last 7 days
  end_offset => INTERVAL '5 minutes',  -- exclude only the last 5 minutes
  schedule_interval => INTERVAL '5 minutes'  -- run every 5 minutes
);

Standard (balances cost and freshness):

SELECT add_continuous_aggregate_policy('orders_daily_by_created',
  start_offset => INTERVAL '3 days',
  end_offset => INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour'
);

Low overhead (mostly historical data):

SELECT add_continuous_aggregate_policy('orders_daily_by_created',
  start_offset => INTERVAL '1 day',    -- window starts 1 day ago
  end_offset => INTERVAL '12 hours',
  schedule_interval => INTERVAL '6 hours'  -- run every 6 hours
);

3. Index tuning

Principle: create indexes that match the common query patterns

-- Case 1: filter by time, then store
CREATE INDEX ON orders_daily_by_created (day DESC, store_id);

-- Case 2: filter by store, sort by time
CREATE INDEX ON orders_daily_by_created (store_id, day DESC);

-- Case 3: filter by company and platform, sort by time
CREATE INDEX ON orders_daily_by_created (company_id, platform_id, day DESC);

4. Data retention

Drop old data automatically (optional):

-- Keep the last 2 years and drop anything older
SELECT add_retention_policy('orders', INTERVAL '2 years');

-- Inspect the retention policy
SELECT * FROM timescaledb_information.jobs
WHERE proc_name = 'policy_retention';

-- Remove the retention policy
SELECT remove_retention_policy('orders');

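5. Monitoring

Chunk layout

SELECT chunk_name, range_start, range_end
FROM timescaledb_information.chunks
WHERE hypertable_name = 'orders'
ORDER BY range_start DESC
LIMIT 10;

Continuous aggregate configuration

SELECT view_name, materialized_only, materialization_hypertable_name
FROM timescaledb_information.continuous_aggregates;

Background job execution

-- Run statistics live in job_stats, not in jobs
SELECT j.job_id, j.proc_name, j.scheduled, j.next_start,
       s.last_run_status, s.last_successful_finish
FROM timescaledb_information.jobs j
LEFT JOIN timescaledb_information.job_stats s USING (job_id)
ORDER BY j.job_id;

6. Troubleshooting

Refreshing aggregates manually

-- Refresh a specific time range
CALL refresh_continuous_aggregate('orders_daily_by_created',
  '2025-01-01', '2025-02-01');

-- Refresh everything (use with care, this can take a long time)
CALL refresh_continuous_aggregate('orders_daily_by_created', NULL, NULL);

Checking whether the aggregates are current

-- Latest bucket in each view and how far it lags behind now
-- (with real-time aggregation enabled, max(day) also includes data that is not yet materialized)
SELECT 'orders_daily_by_created' AS view_name, max(day) AS last_bucket, now() - max(day) AS lag
FROM orders_daily_by_created
UNION ALL
SELECT 'orders_daily_by_paid', max(day), now() - max(day)
FROM orders_daily_by_paid;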

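Summary

Strengths of the design

  1. Performance: historical queries go from seconds to milliseconds
  2. Flexibility: two time dimensions (creation time, payment time)
  3. Freshness: today's data is queried in real time, history is pre-aggregated
  4. Extensibility: grouping by several dimensions (company/platform/store)
  5. Maintainability: automatic refresh policy plus a manual refresh queue

When to use it

  • High order volume (more than 10,000 orders per day)
  • Analysis across several time dimensions
  • Frequent queries over historical data
  • Statistics at day, week, and month granularity
  • Grouping by several dimensions (company/platform/store)

Caveats

  1. Data updates: historical changes outside the refresh window need a manual refresh or the refresh queue
  2. Index planning: create indexes that match the actual query patterns
  3. Chunk interval: adjust it to the data volume so that there are neither too many nor too few chunks
  4. Monitoring and alerting: check the refresh status regularly to catch stale aggregates

References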