47 KiB
TimescaleDB 多时间维度销售统计方案
概述
本文档描述如何使用 TimescaleDB 实现订单系统的多时间维度销售统计,支持按创建时间和付款时间两个维度进行高性能聚合查询。
业务需求
- 按订单创建日期统计所有订单趋势(包括未支付订单)
- 按订单付款日期统计实际收入(仅已支付订单)
- 支持按 company、platform、store 多维度分组
- 支持天/周/月等多种时间粒度统计
- 历史数据查询性能要求高(毫秒级)
- 当天数据要求实时
技术方案
- 主表:使用
created_date作为 hypertable 分区键 - 连续聚合:创建两个物化视图分别按创建时间和付款时间统计
- 查询策略:历史数据查询聚合表,当天数据实时查询主表
核心概念与工作原理
1. Hypertable:时间分区存储引擎
Hypertable 是什么?
Hypertable 是 TimescaleDB 的核心抽象层,它将普通的 PostgreSQL 表转换为自动按时间分区的表。
Hypertable 的职责:
| 功能 | 说明 |
|---|---|
| 自动分区 | 根据时间列自动创建和管理分区(chunks) |
| 透明查询 | 查询时自动路由到相关分区,无需手动指定分区 |
| 高效写入 | 新数据写入最新分区,避免锁竞争 |
| 压缩存储 | 支持历史分区自动压缩,节省存储空间 |
| 数据保留 | 支持自动删除超过保留期的历史分区 |
Hypertable 不负责的工作:
- ❌ 不会自动聚合数据 - 仅存储原始数据
- ❌ 不会自动创建统计视图 - 需要单独配置
- ❌ 不会加速聚合查询 - 聚合查询仍需扫描所有数据
示例:
-- 创建 hypertable
SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days');
-- 效果:
-- 2025-01-01 ~ 2025-01-07 的数据存储在 chunk_1
-- 2025-01-08 ~ 2025-01-14 的数据存储在 chunk_2
-- 2025-01-15 ~ 2025-01-21 的数据存储在 chunk_3
-- ...
-- 查询时自动路由(用户无感知):
SELECT * FROM orders
WHERE created_date >= '2025-01-10'
AND created_date < '2025-01-12';
-- ↑ TimescaleDB 自动只扫描 chunk_2,不扫描其他分区
关键理解:
Hypertable 只是存储优化,不是查询优化。 要加速聚合查询,还需要配置连续聚合。
2. 物化视图对比:三种视图的本质区别
| 特性 | 普通视图 (VIEW) |
PostgreSQL 物化视图 (MATERIALIZED VIEW) |
TimescaleDB 连续聚合 (Continuous Aggregate) |
|---|---|---|---|
| 数据存储 | ❌ 不存储,每次查询重新计算 | ✅ 存储预计算结果 | ✅ 存储预计算结果 |
| 查询性能 | 慢(每次重新聚合) | 快(查询预计算结果) | 极快(查询预计算结果) |
| 数据刷新 | 实时(总是最新) | 手动刷新(REFRESH MATERIALIZED VIEW) |
自动增量刷新(后台任务) |
| 刷新开销 | N/A | 高(全量重新计算) | 低(仅计算变化的数据) |
| 历史数据更新 | 实时反映 | 手动刷新后才反映 | 配置刷新窗口或手动刷新 |
| 实时查询 | ✅ 支持 | ❌ 不支持(必须先刷新) | ✅ 支持(与实时数据混合查询) |
| 适用场景 | 数据量小,实时性要求高 | 数据量中等,可容忍手动刷新 | 时间序列数据,需要自动维护 |
示例对比:
-- 1. 普通视图(每次查询都重新聚合,慢)
CREATE VIEW daily_sales AS
SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue
FROM orders
GROUP BY day;
SELECT * FROM daily_sales WHERE day = '2025-01-01';
-- ↑ 扫描 orders 表中所有 2025-01-01 的数据,实时计算
-- 2. PostgreSQL 物化视图(预计算,需手动刷新)
CREATE MATERIALIZED VIEW daily_sales AS
SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue
FROM orders
GROUP BY day;
SELECT * FROM daily_sales WHERE day = '2025-01-01';
-- ↑ 直接查询已计算的结果,快
-- 数据更新后必须手动刷新(全量重新计算)
REFRESH MATERIALIZED VIEW daily_sales;
-- 3. TimescaleDB 连续聚合(自动增量刷新)
CREATE MATERIALIZED VIEW daily_sales
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', created_date) AS day, SUM(total_amount) AS revenue
FROM orders
GROUP BY day
WITH NO DATA;
-- 配置自动刷新策略(每小时刷新最近 3 天的数据)
SELECT add_continuous_aggregate_policy('daily_sales',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
SELECT * FROM daily_sales WHERE day = '2025-01-01';
-- ↑ 查询预计算结果 + 自动包含最新数据(实时混合查询)
3. WITH (timescaledb.continuous) 的作用
标准物化视图:
CREATE MATERIALIZED VIEW my_view AS
SELECT ... FROM ...;
-- ↑ 这是 PostgreSQL 标准物化视图
-- - 需要手动 REFRESH MATERIALIZED VIEW
-- - 每次刷新都是全量重新计算
连续聚合:
CREATE MATERIALIZED VIEW my_view
WITH (timescaledb.continuous) AS -- ← 关键参数
SELECT time_bucket('1 day', time_column) AS bucket, ...
FROM hypertable
GROUP BY bucket;
-- ↑ 这是 TimescaleDB 连续聚合
-- - 自动后台刷新(配置刷新策略后)
-- - 增量刷新(只计算变化的 chunks)
-- - 支持实时查询(自动混合最新数据)
WITH (timescaledb.continuous) 启用的特性:
- 增量物化 - 只计算有变化的时间分区(chunks),不是全量重新计算
- 自动刷新机制 - 可配置后台任务自动更新数据
- 实时查询能力 - 查询时自动混合物化数据 + 未物化的实时数据
- 失效跟踪 - 自动跟踪哪些时间段的数据需要重新计算
关键理解:
WITH (timescaledb.continuous)不是"自动开启刷新", 而是"允许配置增量刷新策略"。 创建后还需要配置刷新策略才会自动更新!
4. 完整集成步骤:三步法
TimescaleDB 的统计功能不是一步到位的,需要按顺序完成三个步骤:
步骤1: 创建 Hypertable → 步骤2: 创建连续聚合 → 步骤3: 配置刷新策略 → 自动维护统计数据
步骤 1:创建 Hypertable
目的:将普通表转换为时间分区表,优化存储和查询
-- 在已有的 orders 表上创建 hypertable
SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days'
);
效果:
- ✅ 数据按时间自动分区存储
- ✅ 查询时自动路由到相关分区
- ❌ 聚合查询仍然很慢(未加速)
步骤 2:创建连续聚合
目的:创建预聚合视图,存储统计结果
CREATE MATERIALIZED VIEW orders_daily_stats
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', created_date) AS day,
company_id,
COUNT(*) AS total_orders,
SUM(total_amount) AS revenue
FROM orders
GROUP BY day, company_id
WITH NO DATA; -- 先不填充数据,等待刷新策略触发
效果:
- ✅ 视图已创建,可以查询
- ❌ 数据为空(WITH NO DATA)
- ❌ 不会自动更新(未配置刷新策略)
步骤 3:配置刷新策略
目的:启用自动后台刷新,保持数据最新
SELECT add_continuous_aggregate_policy('orders_daily_stats',
start_offset => INTERVAL '3 days', -- 从 3 天前开始刷新
end_offset => INTERVAL '1 hour', -- 刷新到 1 小时前(排除正在写入的数据)
schedule_interval => INTERVAL '1 hour' -- 每小时执行一次
);
效果:
- ✅ 后台任务每小时自动刷新
- ✅ 只刷新 [now() - 3天, now() - 1小时] 窗口内的数据
- ✅ 增量计算,性能高
初次填充数据:
-- 刷新策略只刷新配置的窗口,历史数据需要手动触发一次
CALL refresh_continuous_aggregate('orders_daily_stats',
'2024-01-01', -- 从这个日期开始
'2025-01-01' -- 刷新到这个日期
);
5. 数据流与业务流程
数据写入流程
应用写入订单数据
↓
orders 表接收数据
↓
TimescaleDB 路由器判断时间
├→ Chunk 1 (2025-01-01~07)
├→ Chunk 2 (2025-01-08~14)
├→ Chunk 3 (2025-01-15~21)
└→ Chunk N (最新数据)
数据聚合流程
后台刷新任务(每小时执行)
↓
检测变化的 Chunks
├→ Chunk 10 有新数据 → 重新计算 Chunk 10 的聚合
├→ Chunk 11 有更新 → 重新计算 Chunk 11 的聚合
└→ Chunk 12 无变化 → 跳过
↓
更新 orders_daily_by_created 视图
↓
聚合数据已更新
查询流程(实时 + 历史混合)
查询最近 30 天销售额
↓
查询优化器分析
├→ 历史数据(30天前 ~ 昨天)
│ ↓
│ 查询连续聚合 orders_daily_by_created
│ (速度:毫秒级)
│
└→ 实时数据(今天)
↓
查询 orders 表实时计算
(速度:秒级,但数据量少)
↓
合并结果 → 返回完整统计数据
完整业务流
┌────────────────────────────────────────────────────────────────┐
│ 应用层操作 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 1. 订单写入 │
│ INSERT INTO orders (...) VALUES (...); │
│ ↓ │
│ [写入 hypertable,自动路由到对应 chunk] │
│ │
│ 2. 后台自动聚合(每小时执行) │
│ [TimescaleDB 后台任务] │
│ ↓ │
│ 检测最近 3 天有变化的 chunks │
│ ↓ │
│ 重新计算这些 chunks 的聚合数据 │
│ ↓ │
│ 更新 orders_daily_by_created 视图 │
│ │
│ 3. 历史订单更新(超出刷新窗口) │
│ UPDATE orders SET total_paid = 100 WHERE id = 123; │
│ ↓ │
│ [应用层检测:订单创建于 30 天前] │
│ ↓ │
│ INSERT INTO aggregate_refresh_queue ... │
│ ↓ │
│ [定时任务每天凌晨处理队列] │
│ ↓ │
│ CALL refresh_continuous_aggregate('...', '2024-12-01', ...) │
│ │
│ 4. 查询统计数据 │
│ SELECT * FROM orders_daily_by_created │
│ WHERE day >= '2024-12-01' AND day < CURRENT_DATE │
│ ↓ │
│ [查询预计算的聚合数据,毫秒级返回] │
│ │
│ UNION ALL │
│ │
│ SELECT ... FROM orders │
│ WHERE created_date >= CURRENT_DATE │
│ ↓ │
│ [实时聚合当天数据,秒级返回] │
│ │
└────────────────────────────────────────────────────────────────┘
关键时间点:
| 数据年龄 | 处理方式 | 查询来源 | 更新方式 |
|---|---|---|---|
| 今天 | 实时数据 | orders 表(实时聚合) | 写入即可见 |
| 1小时前 ~ 3天前 | 自动刷新窗口 | 连续聚合 | 后台任务每小时刷新 |
| 3天前 ~ N年前 | 历史数据 | 连续聚合 | 手动刷新或刷新队列 |
架构设计
1. Hypertable 分区策略
TimescaleDB 的 hypertable 只能有一个时间分区键,我们选择 created_date:
原因:
- 所有订单都有
created_date(非 NULL) paid_date在未支付时为 NULL,无法作为分区键- 订单按创建时间自然增长,适合时间序列存储
配置:
SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days');
2. 连续聚合机制
连续聚合(Continuous Aggregates)是 TimescaleDB 提供的自动维护的物化视图:
特性:
- 自动增量更新,只计算变化的数据
- 支持配置刷新策略(时间窗口、刷新频率)
- 查询速度远快于实时聚合(预计算结果)
- 支持与实时数据混合查询
两个聚合视图:
-
orders_daily_by_created - 按创建日期统计
- 统计所有订单(包括未支付)
- 区分已支付和未支付订单数量
- 适用于分析订单趋势、转化率
-
orders_daily_by_paid - 按付款日期统计
- 仅统计已支付订单(WHERE paid_date IS NOT NULL)
- 反映真实收入时间分布
- 适用于财务报表、收入分析
实施步骤
步骤 1:修改 orders 表迁移文件
文件:backend/migrations/2025_11_11_083522_create_orders_table.php
在 up() 方法末尾添加 TimescaleDB 配置:
public function up(): void
{
// ... 现有的表创建和索引代码 ...
// 为 jsonb 字段创建 GIN 索引(PostgreSQL)
Schema::getConnection()->statement('CREATE INDEX orders_raw_gin_idx ON orders USING gin (raw)');
Schema::getConnection()->statement('CREATE INDEX orders_ext_gin_idx ON orders USING gin (ext)');
// ==================== TimescaleDB 配置 ====================
// 1. 启用 TimescaleDB 扩展(如果未启用)
Schema::getConnection()->statement('CREATE EXTENSION IF NOT EXISTS timescaledb');
// 2. 将 orders 表转换为 hypertable
// - 使用 created_date 作为时间分区键
// - 每个分区(chunk)存储 7 天数据
// - if_not_exists => true 避免重复创建报错
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days',
if_not_exists => TRUE)"
);
// 3. (可选)设置数据保留策略 - 自动删除 2 年前的数据
// Schema::getConnection()->statement(
// "SELECT add_retention_policy('orders', INTERVAL '2 years')"
// );
}
参数说明:
chunk_time_interval: 分区间隔,根据数据量调整- 小数据量:7 天(默认)
- 中等数据量:1 天
- 大数据量:半天或更小
if_not_exists: 避免重复创建时报错
步骤 2:创建连续聚合迁移文件
创建文件:backend/migrations/2025_11_12_XXXXXX_create_orders_continuous_aggregates.php
<?php
use Hyperf\Database\Schema\Schema;
use Hyperf\Database\Migrations\Migration;
return new class extends Migration
{
/**
* Run the migrations.
*/
public function up(): void
{
$conn = Schema::getConnection();
// ========== 连续聚合 1:按创建日期统计 ==========
$conn->statement("
CREATE MATERIALIZED VIEW orders_daily_by_created
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', created_date) AS day,
company_id,
platform_id,
store_id,
-- 订单数量指标
COUNT(*) AS total_orders,
COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
-- 销售额指标
SUM(total_amount) AS sum_total_amount,
SUM(total_paid) AS sum_total_paid,
SUM(total_received) AS sum_total_received,
-- 平均值指标
AVG(total_amount) AS avg_order_amount,
AVG(total_paid) FILTER (WHERE paid_date IS NOT NULL) AS avg_paid_amount,
-- 费用指标
SUM(freight_fee) AS sum_freight_fee,
SUM(tax_fee) AS sum_tax_fee,
SUM(commission_fee) AS sum_commission_fee,
SUM(discount_fee) AS sum_discount_fee
FROM orders
GROUP BY day, company_id, platform_id, store_id
WITH NO DATA;
");
// 配置自动刷新策略
// - 刷新最近 3 天到 1 小时前的数据
// - 每小时执行一次
$conn->statement("
SELECT add_continuous_aggregate_policy('orders_daily_by_created',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
");
// 创建索引加速查询
$conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, company_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, platform_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, store_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_created (company_id, day DESC)');
$conn->statement('CREATE INDEX ON orders_daily_by_created (store_id, day DESC)');
// ========== 连续聚合 2:按付款日期统计 ==========
$conn->statement("
CREATE MATERIALIZED VIEW orders_daily_by_paid
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', paid_date) AS day,
company_id,
platform_id,
store_id,
-- 已支付订单统计
COUNT(*) AS paid_orders,
-- 销售额指标
SUM(total_amount) AS sum_total_amount,
SUM(total_paid) AS sum_total_paid,
SUM(total_received) AS sum_total_received,
-- 平均值指标
AVG(total_amount) AS avg_order_amount,
AVG(total_paid) AS avg_paid_amount,
-- 费用指标
SUM(freight_fee) AS sum_freight_fee,
SUM(tax_fee) AS sum_tax_fee,
SUM(commission_fee) AS sum_commission_fee,
SUM(discount_fee) AS sum_discount_fee
FROM orders
WHERE paid_date IS NOT NULL -- 只统计已支付订单
GROUP BY day, company_id, platform_id, store_id
WITH NO DATA;
");
// 配置自动刷新策略
$conn->statement("
SELECT add_continuous_aggregate_policy('orders_daily_by_paid',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
");
// 创建索引
$conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, company_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, platform_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, store_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_paid (company_id, day DESC)');
$conn->statement('CREATE INDEX ON orders_daily_by_paid (store_id, day DESC)');
}
/**
* Reverse the migrations.
*/
public function down(): void
{
$conn = Schema::getConnection();
// 删除连续聚合视图(CASCADE 会自动删除关联的刷新策略)
$conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_paid CASCADE');
$conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_created CASCADE');
}
};
刷新策略参数说明:
start_offset: 从多久之前开始刷新(相对于当前时间)end_offset: 刷新到多久之前结束(避免刷新正在写入的数据)schedule_interval: 刷新频率
示例配置:
-- 每小时刷新最近 3 天的数据
start_offset => INTERVAL '3 days'
end_offset => INTERVAL '1 hour'
schedule_interval => INTERVAL '1 hour'
-- 实际刷新窗口:[now() - 3 days, now() - 1 hour]
历史数据更新同步问题
问题描述
连续聚合的刷新策略只覆盖配置的时间窗口(如最近 3 天),当超出窗口的历史订单被修改时,聚合数据不会自动更新。
常见场景:
- 历史订单退款(修改 total_paid)
- 补录付款时间(更新 paid_date)
- 订单状态更正
- 金额调整
解决方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 手动刷新 | 精准控制,开销小 | 需要应用层处理 | 历史修改较少 |
| 实时聚合 | 数据始终准确 | 查询性能下降 | 对准确性要求极高 |
| 扩大刷新窗口 | 自动处理 | 刷新开销增加 | 历史修改集中在近期 |
| 刷新队列 | 平衡性能和准确性 | 实现复杂度高 | 生产环境推荐 |
推荐方案:刷新队列机制
实现思路:
- 订单被修改时,记录需要刷新的日期
- 定时任务批量刷新队列中的日期
- 结合自动刷新策略,覆盖大部分场景
1. 创建刷新队列表
// 迁移文件
Schema::create('aggregate_refresh_queue', function (Blueprint $table) {
$table->date('refresh_date')->comment('需要刷新的日期');
$table->string('aggregate_view', 100)->comment('聚合视图名称');
$table->timestamp('created_at')->comment('入队时间');
$table->primary(['refresh_date', 'aggregate_view']);
$table->index('created_at');
});
2. 订单更新时入队
// 订单更新方法
public function updateOrder($orderId, array $data): Order
{
$order = Order::findOrFail($orderId);
$affectedDates = [];
// 收集受影响的日期
if (isset($data['created_date']) && $data['created_date'] != $order->created_date) {
$affectedDates[] = Carbon::parse($order->created_date)->format('Y-m-d');
$affectedDates[] = Carbon::parse($data['created_date'])->format('Y-m-d');
}
if (isset($data['paid_date'])) {
if ($order->paid_date) {
$affectedDates[] = Carbon::parse($order->paid_date)->format('Y-m-d');
}
if ($data['paid_date']) {
$affectedDates[] = Carbon::parse($data['paid_date'])->format('Y-m-d');
}
}
// 更新订单
$order->update($data);
// 入队需要刷新的日期
foreach (array_unique($affectedDates) as $date) {
$this->enqueueRefresh($date);
}
return $order;
}
private function enqueueRefresh(string $date): void
{
// 只刷新 3 天前的数据(3 天内的由自动策略处理)
if (Carbon::parse($date)->lt(Carbon::today()->subDays(3))) {
DB::table('aggregate_refresh_queue')->insertOrIgnore([
['refresh_date' => $date, 'aggregate_view' => 'orders_daily_by_created', 'created_at' => now()],
['refresh_date' => $date, 'aggregate_view' => 'orders_daily_by_paid', 'created_at' => now()],
]);
}
}
3. 定时任务批量刷新
// app/Command/RefreshOrderAggregatesCommand.php
<?php
namespace App\Command;
use Hyperf\Command\Command;
use Hyperf\DbConnection\Db;
use Carbon\Carbon;
class RefreshOrderAggregatesCommand extends Command
{
protected ?string $signature = 'orders:refresh-aggregates';
protected string $description = '刷新订单聚合数据队列';
public function handle()
{
$queue = DB::table('aggregate_refresh_queue')
->where('created_at', '<', now()->subHour()) // 延迟 1 小时,避免频繁刷新
->orderBy('refresh_date')
->get();
if ($queue->isEmpty()) {
$this->info('No aggregates to refresh.');
return;
}
$grouped = $queue->groupBy('aggregate_view');
foreach ($grouped as $view => $items) {
$dates = $items->pluck('refresh_date')->unique();
$this->info("Refreshing {$view}: " . $dates->count() . " dates");
foreach ($dates as $date) {
$start = Carbon::parse($date)->startOfDay();
$end = Carbon::parse($date)->endOfDay();
try {
DB::statement(
"CALL refresh_continuous_aggregate(?, ?, ?)",
[$view, $start, $end]
);
// 删除已刷新的记录
DB::table('aggregate_refresh_queue')
->where('refresh_date', $date)
->where('aggregate_view', $view)
->delete();
$this->line(" ✓ Refreshed {$date}");
} catch (\Exception $e) {
$this->error(" ✗ Failed to refresh {$date}: " . $e->getMessage());
}
}
}
$this->info('Refresh completed.');
}
}
配置定时任务(每天凌晨 2 点执行):
// config/autoload/crontab.php
return [
'enable' => true,
'crontab' => [
[
'name' => 'refresh-order-aggregates',
'rule' => '0 2 * * *', // 每天凌晨 2 点
'callback' => [App\Command\RefreshOrderAggregatesCommand::class, 'handle'],
'memo' => '刷新订单聚合数据',
],
],
];
查询示例
1. 按创建日期统计(历史 + 当天)
-- 查询最近 30 天的订单统计(按创建日期)
SELECT * FROM (
-- 历史数据(从连续聚合查询,速度快)
SELECT
day,
company_id,
platform_id,
store_id,
total_orders,
paid_orders,
unpaid_orders,
sum_total_amount,
sum_total_paid,
avg_order_amount
FROM orders_daily_by_created
WHERE day >= CURRENT_DATE - INTERVAL '30 days'
AND day < CURRENT_DATE
UNION ALL
-- 当天实时数据
SELECT
time_bucket('1 day', created_date) AS day,
company_id,
platform_id,
store_id,
COUNT(*) AS total_orders,
COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
SUM(total_amount) AS sum_total_amount,
SUM(total_paid) AS sum_total_paid,
AVG(total_amount) AS avg_order_amount
FROM orders
WHERE created_date >= CURRENT_DATE
GROUP BY day, company_id, platform_id, store_id
) AS combined
ORDER BY day DESC, company_id, platform_id, store_id;
2. 按付款日期统计(历史 + 当天)
-- 查询最近 30 天的实际收入(按付款日期)
SELECT * FROM (
-- 历史数据
SELECT
day,
company_id,
platform_id,
store_id,
paid_orders,
sum_total_paid,
avg_paid_amount
FROM orders_daily_by_paid
WHERE day >= CURRENT_DATE - INTERVAL '30 days'
AND day < CURRENT_DATE
UNION ALL
-- 当天实时数据
SELECT
time_bucket('1 day', paid_date) AS day,
company_id,
platform_id,
store_id,
COUNT(*) AS paid_orders,
SUM(total_paid) AS sum_total_paid,
AVG(total_paid) AS avg_paid_amount
FROM orders
WHERE paid_date >= CURRENT_DATE
AND paid_date IS NOT NULL
GROUP BY day, company_id, platform_id, store_id
) AS combined
ORDER BY day DESC;
3. 按公司(Company)分组统计
-- 统计各公司每天的订单情况
SELECT
day,
company_id,
SUM(total_orders) AS total_orders,
SUM(paid_orders) AS paid_orders,
SUM(sum_total_amount) AS total_amount,
SUM(sum_total_paid) AS total_paid,
AVG(avg_order_amount) AS avg_order_amount
FROM orders_daily_by_created
WHERE day >= '2025-01-01' AND day < '2025-02-01'
GROUP BY day, company_id
ORDER BY day DESC, company_id;
-- 统计各公司本月累计销售额(按付款时间)
SELECT
company_id,
SUM(paid_orders) AS total_paid_orders,
SUM(sum_total_paid) AS total_revenue,
AVG(avg_paid_amount) AS avg_order_value
FROM orders_daily_by_paid
WHERE day >= date_trunc('month', CURRENT_DATE)
AND day < CURRENT_DATE
GROUP BY company_id
ORDER BY total_revenue DESC;
4. 按平台(Platform)分组统计
-- 对比各平台每天的订单量和销售额
SELECT
day,
platform_id,
SUM(total_orders) AS total_orders,
SUM(paid_orders) AS paid_orders,
SUM(sum_total_amount) AS total_amount,
SUM(sum_total_paid) AS total_paid,
ROUND(SUM(paid_orders)::numeric / NULLIF(SUM(total_orders), 0) * 100, 2) AS payment_rate
FROM orders_daily_by_created
WHERE day >= CURRENT_DATE - INTERVAL '7 days'
AND day < CURRENT_DATE
GROUP BY day, platform_id
ORDER BY day DESC, total_paid DESC;
-- 统计各平台本周每天的实际收入(按付款时间)
SELECT
day,
platform_id,
SUM(paid_orders) AS orders,
SUM(sum_total_paid) AS revenue,
SUM(sum_freight_fee) AS freight,
SUM(sum_commission_fee) AS commission
FROM orders_daily_by_paid
WHERE day >= date_trunc('week', CURRENT_DATE)
AND day < CURRENT_DATE
GROUP BY day, platform_id
ORDER BY day DESC, revenue DESC;
5. 按店铺(Store)分组统计
-- 统计各店铺每天的订单情况
SELECT
day,
store_id,
SUM(total_orders) AS total_orders,
SUM(paid_orders) AS paid_orders,
SUM(unpaid_orders) AS unpaid_orders,
SUM(sum_total_amount) AS total_amount,
SUM(sum_total_paid) AS total_paid
FROM orders_daily_by_created
WHERE day >= '2025-01-01' AND day < '2025-02-01'
GROUP BY day, store_id
ORDER BY day DESC, total_paid DESC;
-- 查询指定店铺的日销售趋势(按付款时间)
SELECT
day,
paid_orders,
sum_total_paid AS revenue,
avg_paid_amount,
sum_freight_fee,
sum_tax_fee
FROM orders_daily_by_paid
WHERE store_id = 123
AND day >= CURRENT_DATE - INTERVAL '30 days'
AND day < CURRENT_DATE
ORDER BY day DESC;
6. 多维度组合查询
-- 查询特定公司下各店铺的每日销售情况
SELECT
day,
company_id,
store_id,
SUM(total_orders) AS orders,
SUM(paid_orders) AS paid,
SUM(sum_total_paid) AS revenue,
AVG(avg_paid_amount) AS avg_value
FROM orders_daily_by_created
WHERE company_id = 100
AND day >= '2025-01-01' AND day < '2025-02-01'
GROUP BY day, company_id, store_id
ORDER BY day DESC, revenue DESC;
-- 对比各公司在不同平台的销售表现(按付款时间)
SELECT
company_id,
platform_id,
SUM(paid_orders) AS total_orders,
SUM(sum_total_paid) AS total_revenue,
AVG(avg_paid_amount) AS avg_order_value,
SUM(sum_commission_fee) AS total_commission,
ROUND((SUM(sum_commission_fee) / NULLIF(SUM(sum_total_paid), 0) * 100), 2) AS commission_rate
FROM orders_daily_by_paid
WHERE day >= date_trunc('month', CURRENT_DATE)
AND day < CURRENT_DATE
GROUP BY company_id, platform_id
ORDER BY company_id, total_revenue DESC;
-- 查询指定公司、平台、店铺的销售趋势
SELECT
day,
total_orders,
paid_orders,
unpaid_orders,
sum_total_amount,
sum_total_paid,
avg_order_amount
FROM orders_daily_by_created
WHERE company_id = 100
AND platform_id = 200
AND store_id = 300
AND day >= CURRENT_DATE - INTERVAL '90 days'
AND day < CURRENT_DATE
ORDER BY day DESC;
7. 周/月聚合查询
-- 按周统计(按创建时间)
SELECT
time_bucket('7 days', day) AS week,
company_id,
SUM(total_orders) AS weekly_orders,
SUM(paid_orders) AS weekly_paid_orders,
SUM(sum_total_paid) AS weekly_revenue,
AVG(avg_order_amount) AS avg_order_value
FROM orders_daily_by_created
WHERE day >= '2025-01-01'
GROUP BY week, company_id
ORDER BY week DESC, company_id;
-- 按月统计(按付款时间)
SELECT
date_trunc('month', day) AS month,
platform_id,
SUM(paid_orders) AS monthly_orders,
SUM(sum_total_paid) AS monthly_revenue,
SUM(sum_freight_fee) AS monthly_freight,
SUM(sum_commission_fee) AS monthly_commission
FROM orders_daily_by_paid
WHERE day >= '2024-01-01'
GROUP BY month, platform_id
ORDER BY month DESC, monthly_revenue DESC;
8. 实时 + 历史混合查询(应用层封装)
PHP 查询示例:
<?php
namespace App\Service;
use Hyperf\DbConnection\Db;
use Carbon\Carbon;
class OrderStatisticsService
{
/**
* 查询指定店铺的日销售统计(按创建时间)
*
* @param int $storeId 店铺 ID
* @param string $startDate 开始日期 (Y-m-d)
* @param string $endDate 结束日期 (Y-m-d)
* @return array
*/
public function getDailyStatsByStore(int $storeId, string $startDate, string $endDate): array
{
$start = Carbon::parse($startDate);
$end = Carbon::parse($endDate);
$today = Carbon::today();
$results = [];
// 查询历史数据(从聚合表)
if ($start->lt($today)) {
$historyEnd = $end->lt($today) ? $end : $today->copy()->subDay();
$history = Db::select("
SELECT
day::date,
total_orders,
paid_orders,
unpaid_orders,
sum_total_amount,
sum_total_paid,
avg_order_amount
FROM orders_daily_by_created
WHERE store_id = ?
AND day >= ?
AND day <= ?
ORDER BY day DESC
", [$storeId, $start->format('Y-m-d'), $historyEnd->format('Y-m-d')]);
$results = array_merge($results, $history);
}
// 查询当天实时数据
if ($end->gte($today)) {
$realtime = Db::select("
SELECT
time_bucket('1 day', created_date)::date AS day,
COUNT(*) AS total_orders,
COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
SUM(total_amount) AS sum_total_amount,
SUM(total_paid) AS sum_total_paid,
AVG(total_amount) AS avg_order_amount
FROM orders
WHERE store_id = ?
AND created_date >= ?
AND created_date < ?
GROUP BY day
ORDER BY day DESC
", [$storeId, $today->format('Y-m-d H:i:s'), $end->addDay()->format('Y-m-d H:i:s')]);
$results = array_merge($results, $realtime);
}
return $results;
}
/**
* 查询各公司本月销售排名(按付款时间)
*
* @return array
*/
public function getMonthlyRevenueByCompany(): array
{
$monthStart = Carbon::now()->startOfMonth();
$today = Carbon::today();
// 历史数据
$history = Db::select("
SELECT
company_id,
SUM(paid_orders) AS total_orders,
SUM(sum_total_paid) AS total_revenue,
AVG(avg_paid_amount) AS avg_order_value
FROM orders_daily_by_paid
WHERE day >= ?
AND day < ?
GROUP BY company_id
", [$monthStart->format('Y-m-d'), $today->format('Y-m-d')]);
// 当天实时数据
$realtime = Db::select("
SELECT
company_id,
COUNT(*) AS total_orders,
SUM(total_paid) AS total_revenue,
AVG(total_paid) AS avg_order_value
FROM orders
WHERE paid_date >= ?
AND paid_date IS NOT NULL
GROUP BY company_id
", [$today->format('Y-m-d H:i:s')]);
// 合并数据
$combined = [];
foreach ($history as $row) {
$combined[$row->company_id] = [
'company_id' => $row->company_id,
'total_orders' => $row->total_orders,
'total_revenue' => $row->total_revenue,
'avg_order_value' => $row->avg_order_value,
];
}
foreach ($realtime as $row) {
if (isset($combined[$row->company_id])) {
$combined[$row->company_id]['total_orders'] += $row->total_orders;
$combined[$row->company_id]['total_revenue'] += $row->total_revenue;
// 重新计算平均值
$combined[$row->company_id]['avg_order_value'] =
$combined[$row->company_id]['total_revenue'] / $combined[$row->company_id]['total_orders'];
} else {
$combined[$row->company_id] = [
'company_id' => $row->company_id,
'total_orders' => $row->total_orders,
'total_revenue' => $row->total_revenue,
'avg_order_value' => $row->avg_order_value,
];
}
}
// 按收入排序
usort($combined, fn($a, $b) => $b['total_revenue'] <=> $a['total_revenue']);
return $combined;
}
}
最佳实践
1. Chunk 大小优化(重要)
Chunk 大小的核心原则
TimescaleDB 官方建议:
- 目标大小:500 MB ~ 2 GB per chunk(最优性能范围)
- 内存限制:Chunk 大小应 ≤ 25% 可用内存
- 查询效率:单次查询扫描的 chunk 数量建议 < 10 个
- 上限:单个 chunk 不建议超过 5 GB
计算您的 Chunk 大小
步骤 1:估算单行数据大小
orders 表单行估算(含索引):
- 基础字段(id, company_id, 金额等):~300 bytes
- 时间字段(4个 timestampTz):~32 bytes
- 文本字段(zipcode, city, province, country):~200 bytes
- JSONB 字段(raw, ext):~500-1000 bytes(视内容而定)
- 索引开销(14个索引):~800 bytes
─────────────────────────────────────────
总计:约 1.5 - 2.5 KB/行(平均 2KB)
步骤 2:计算每天数据量
公式:日数据量 = 日订单数 × 单行大小
示例:
- 10,000 单/天 × 2KB = 20 MB/天
- 50,000 单/天 × 2KB = 100 MB/天
- 100,000 单/天 × 2KB = 200 MB/天
- 500,000 单/天 × 2KB = 1 GB/天
步骤 3:选择 chunk_time_interval
| 日订单量 | 日数据量 | 推荐间隔 | Chunk 大小 | 性能评价 |
|---|---|---|---|---|
| < 5千 | < 10 MB/天 | 14-30天 | 140-300 MB | ✅ 最优:减少 chunk 数量 |
| 5千-2万 | 10-40 MB/天 | 7-14天 | 70-560 MB | ✅ 最优:平衡性能 |
| 2万-10万 | 40-200 MB/天 | 3-7天 | 120MB-1.4GB | ✅ 推荐 |
| 10万-50万 | 200MB-1GB/天 | 1-3天 | 200MB-3GB | ✅ 推荐 |
| 50万-100万 | 1-2 GB/天 | 12-24小时 | 1.5-4 GB | ✅ 可接受 |
| > 100万 | > 2 GB/天 | 6-12小时 | 2-4 GB | ⚠️ 需监控性能 |
查询模式影响分析
常见查询时间范围 vs Chunk 配置:
-- 假设您最常查询"最近 30 天"数据
SELECT * FROM orders
WHERE created_date >= CURRENT_DATE - INTERVAL '30 days';
-- 不同配置扫描的 chunk 数量:
-- 1天间隔:30个chunks ❌ 过多,规划时间长
-- 3天间隔:10个chunks ✅ 理想
-- 7天间隔:5个chunks ✅ 最优
-- 14天间隔:3个chunks ✅ 很好
-- 30天间隔:1个chunk ⚠️ chunk 过大,可能超出内存
最佳实践:
根据您最常见的查询时间范围,配置 chunk_time_interval 使得单次查询扫描 3-8 个 chunks。
实际配置示例
// 场景 1:小规模电商(日均 5000 单)
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '14 days', -- 约 140 MB/chunk
if_not_exists => TRUE)"
);
// 场景 2:中等规模(日均 5 万单)
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days', -- 约 700 MB/chunk
if_not_exists => TRUE)"
);
// 场景 3:大规模平台(日均 50 万单)
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '1 day', -- 约 1 GB/chunk
if_not_exists => TRUE)"
);
// 场景 4:超大规模(日均 200 万单)
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '12 hours', -- 约 2 GB/chunk
if_not_exists => TRUE)"
);
动态调整 Chunk 间隔
如果创建 hypertable 后发现 chunk 大小不合适,可以调整:
-- 查看当前 chunk 大小
SELECT
chunk_name,
pg_size_pretty(pg_total_relation_size(chunk_schema || '.' || chunk_name)) AS size,
range_start,
range_end
FROM timescaledb_information.chunks
WHERE hypertable_name = 'orders'
ORDER BY range_start DESC
LIMIT 10;
-- 如果 chunk 过小(< 100 MB)或过大(> 5 GB),调整间隔
-- 注意:只影响新创建的 chunk,已有 chunk 不受影响
SELECT set_chunk_time_interval('orders', INTERVAL '7 days');
性能监控指标
定期检查以下指标:
-- 1. 查看 chunk 数量和大小分布
SELECT
COUNT(*) AS total_chunks,
pg_size_pretty(SUM(pg_total_relation_size(chunk_schema || '.' || chunk_name))) AS total_size,
pg_size_pretty(AVG(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS avg_chunk_size,
pg_size_pretty(MIN(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS min_chunk_size,
pg_size_pretty(MAX(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS max_chunk_size
FROM timescaledb_information.chunks
WHERE hypertable_name = 'orders';
-- 2. 查看查询性能(EXPLAIN ANALYZE)
EXPLAIN ANALYZE
SELECT COUNT(*), SUM(total_amount)
FROM orders
WHERE created_date >= CURRENT_DATE - INTERVAL '30 days';
-- 关注 "Chunks excluded by constraint" - 数值越高越好
警告信号:
- ❌ 平均 chunk 大小 < 50 MB → chunk 间隔太小,增大间隔
- ❌ 平均 chunk 大小 > 5 GB → chunk 间隔太大,减小间隔
- ❌ 查询扫描 > 20 个 chunks → 考虑增大 chunk 间隔或优化查询
- ❌ Planning Time > Execution Time → chunk 数量过多,需要增大间隔
2. 刷新策略配置
低延迟场景(数据实时性要求高):
SELECT add_continuous_aggregate_policy('orders_daily_by_created',
start_offset => INTERVAL '7 days', -- 刷新最近 7 天
end_offset => INTERVAL '5 minutes', -- 只排除最近 5 分钟
schedule_interval => INTERVAL '5 minutes' -- 每 5 分钟刷新
);
标准场景(平衡性能和实时性):
SELECT add_continuous_aggregate_policy('orders_daily_by_created',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
低开销场景(历史数据为主):
SELECT add_continuous_aggregate_policy('orders_daily_by_created',
start_offset => INTERVAL '1 day', -- 只刷新昨天
end_offset => INTERVAL '12 hours',
schedule_interval => INTERVAL '6 hours' -- 每 6 小时刷新
);
3. 索引优化
原则:根据常用查询模式创建索引
-- 场景 1:按时间 + 店铺查询
CREATE INDEX ON orders_daily_by_created (day DESC, store_id);
-- 场景 2:按店铺 + 时间排序
CREATE INDEX ON orders_daily_by_created (store_id, day DESC);
-- 场景 3:按公司 + 平台 + 时间
CREATE INDEX ON orders_daily_by_created (company_id, platform_id, day DESC);
4. 数据保留策略
自动删除旧数据(可选):
-- 保留最近 2 年的数据,自动删除更早的
SELECT add_retention_policy('orders', INTERVAL '2 years');
-- 查看保留策略
SELECT * FROM timescaledb_information.jobs
WHERE proc_name = 'policy_retention';
-- 删除保留策略
SELECT remove_retention_policy('orders');
5. 性能监控
查看 chunk 分区情况:
SELECT chunk_name, range_start, range_end
FROM timescaledb_information.chunks
WHERE hypertable_name = 'orders'
ORDER BY range_start DESC
LIMIT 10;
查看连续聚合刷新状态:
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregates;
查看后台任务执行情况:
SELECT job_id, proc_name, scheduled, next_start, last_finish, last_successful_finish
FROM timescaledb_information.jobs
ORDER BY job_id;
6. 故障排查
手动刷新聚合数据:
-- 刷新指定时间范围
CALL refresh_continuous_aggregate('orders_daily_by_created',
'2025-01-01', '2025-02-01');
-- 刷新所有数据(慎用,耗时长)
CALL refresh_continuous_aggregate('orders_daily_by_created', NULL, NULL);
检查聚合数据是否最新:
-- 查看最新的物化数据时间点
SELECT view_name,
completed_threshold AS last_materialized_time,
NOW() - completed_threshold AS lag
FROM timescaledb_information.continuous_aggregates
WHERE view_name IN ('orders_daily_by_created', 'orders_daily_by_paid');
总结
架构优势
- 高性能:历史数据查询从秒级优化到毫秒级
- 灵活性:支持双时间维度(创建时间、付款时间)
- 实时性:当天数据实时查询,历史数据预聚合
- 可扩展:支持多维度分组(company/platform/store)
- 可维护:自动刷新策略 + 手动刷新队列
适用场景
- ✅ 订单量大(日订单 > 1 万)
- ✅ 需要多时间维度分析
- ✅ 历史数据查询频繁
- ✅ 需要天/周/月等多种时间粒度统计
- ✅ 需要按多维度分组(公司/平台/店铺)
注意事项
- 数据更新:超出刷新窗口的历史修改需要手动刷新或使用刷新队列
- 索引规划:根据实际查询模式创建合适的索引
- 分区间隔:根据数据量调整,避免分区过多或过少
- 监控告警:定期检查聚合刷新状态,防止数据滞后