datahub/docs/data_query.md
2025-11-13 14:42:54 +08:00

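# TimescaleDB Multi-Time-Dimension Sales Statistics
## Overview
This document describes how to use TimescaleDB to compute sales statistics for the order system across multiple time dimensions. It supports high-performance aggregate queries by both **creation time** and **payment time**.
### Business Requirements
- Track order trends by order creation date (including unpaid orders)
- Track actual revenue by payment date (paid orders only)
- Group by company, platform, and store
- Support day/week/month granularities
- Millisecond-level query latency for historical data
- Real-time figures for the current day
### Technical Approach
- **Main table**: a hypertable partitioned on `created_date`
- **Continuous aggregates**: two materialized views, one bucketed by creation time and one by payment time
- **Query strategy**: historical data comes from the aggregates; the current day is queried live from the main table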
---
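## Core Concepts and How They Work
### 1. Hypertable: The Time-Partitioned Storage Engine
**What is a hypertable?**
A hypertable is TimescaleDB's core abstraction. It turns a regular PostgreSQL table into a table that is **automatically partitioned by time**.
**What a hypertable does**
| Feature | Description |
|------|------|
| **Automatic partitioning** | Creates and manages partitions (chunks) based on the time column |
| **Transparent queries** | Routes queries to the relevant chunks; no need to reference partitions by hand |
| **Efficient writes** | New rows go to the latest chunk, reducing lock contention |
| **Compression** | Can compress older chunks automatically to save storage |
| **Data retention** | Can drop chunks older than a retention period automatically |
**What a hypertable does not do**
- ❌ **Does not aggregate data**: it stores raw rows only
- ❌ **Does not create statistics views**: these must be configured separately
- ❌ **Does not precompute aggregates**: chunk exclusion limits a query to the chunks in its time range, but every raw row in that range is still read and aggregated
**Example**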
```sql
-- Create the hypertable
SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days');
-- Effect (illustrative chunk ranges):
-- rows from 2025-01-01 ~ 2025-01-07 are stored in chunk_1
-- rows from 2025-01-08 ~ 2025-01-14 are stored in chunk_2
-- rows from 2025-01-15 ~ 2025-01-21 are stored in chunk_3
-- ...
-- Queries are routed automatically (transparent to the user):
SELECT * FROM orders
WHERE created_date >= '2025-01-10'
AND created_date < '2025-01-12';
-- ↑ TimescaleDB scans only chunk_2 and skips the other chunks
```
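**Key takeaway**
> A hypertable is mainly a **storage and partitioning optimization**. It lets time-range queries skip irrelevant chunks, but it does not precompute anything.
> To speed up aggregate queries you also need **continuous aggregates**.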
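The chunk labels in the example above are illustrative. As we understand it, fixed-interval chunks are aligned to multiples of `chunk_time_interval` counted from the Unix epoch (UTC), not to the first inserted row. Under that assumption, 7-day chunks would begin on Thursdays. The sketch below models the assumption in plain PHP. The helper `chunkRange` is hypothetical, and you should check the real boundaries in `timescaledb_information.chunks`.

```php
<?php
// Hypothetical helper: the [start, end) range of the chunk that would hold a timestamp.
// Assumption: chunk boundaries are multiples of chunk_time_interval counted from the
// Unix epoch (1970-01-01 00:00 UTC), independent of when the first row was inserted.

function chunkRange(DateTimeImmutable $ts, int $intervalSeconds): array
{
    $t = $ts->getTimestamp();
    // Floor division (intdiv truncates toward zero, so fix up negative timestamps)
    $index = intdiv($t, $intervalSeconds);
    if ($t < 0 && $t % $intervalSeconds !== 0) {
        $index--;
    }
    $startTs = $index * $intervalSeconds;
    $utc = new DateTimeZone('UTC');
    return [
        (new DateTimeImmutable('@' . $startTs))->setTimezone($utc),
        (new DateTimeImmutable('@' . ($startTs + $intervalSeconds)))->setTimezone($utc),
    ];
}

$utc = new DateTimeZone('UTC');
[$start, $end] = chunkRange(new DateTimeImmutable('2025-01-10 12:00:00', $utc), 7 * 86400);
echo $start->format('Y-m-d D'), ' ~ ', $end->format('Y-m-d D'), PHP_EOL;
// 2025-01-09 Thu ~ 2025-01-16 Thu
```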
---
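### 2. Materialized Views Compared: Three Kinds of Views
| Property | Regular view<br>VIEW | PostgreSQL materialized view<br>MATERIALIZED VIEW | TimescaleDB continuous aggregate<br>Continuous Aggregate |
|------|-------------------|------------------------------------------|----------------------------------------------|
| **Storage** | ❌ Nothing stored; recomputed on every query | ✅ Stores precomputed results | ✅ Stores precomputed results |
| **Query performance** | Slow (re-aggregates every time) | Fast (reads precomputed results) | Fast (reads precomputed results) |
| **Refresh** | Always current | Manual (`REFRESH MATERIALIZED VIEW`) | Automatic incremental refresh (background job, once a policy is added) |
| **Refresh cost** | N/A | High (full recomputation) | Low (only invalidated time ranges are recomputed) |
| **Updates to historical data** | Reflected immediately | Reflected only after a manual refresh | Reflected after a refresh that covers the changed range (policy window or manual) |
| **Real-time queries** | ✅ Yes | ❌ No (must refresh first) | ✅ Optional (real-time aggregation merges in rows not yet materialized; off by default since TimescaleDB 2.13) |
| **Best for** | Small data, strict freshness | Medium data, manual refresh acceptable | Time-series data that needs automatic maintenance |
**Example comparison**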
```sql
-- The three examples are alternatives; each defines a view named daily_sales
-- 1. Regular view (re-aggregates on every query; slow)
CREATE VIEW daily_sales AS
SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue
FROM orders
GROUP BY day;
SELECT * FROM daily_sales WHERE day = '2025-01-01';
-- ↑ computes the aggregate from the raw rows in orders on every query
-- 2. PostgreSQL materialized view (precomputed; refreshed manually)
CREATE MATERIALIZED VIEW daily_sales AS
SELECT DATE(created_date) AS day, SUM(total_amount) AS revenue
FROM orders
GROUP BY day;
SELECT * FROM daily_sales WHERE day = '2025-01-01';
-- ↑ reads the stored results directly; fast
-- After data changes, it must be refreshed manually (full recomputation)
REFRESH MATERIALIZED VIEW daily_sales;
-- 3. TimescaleDB continuous aggregate (automatic incremental refresh)
CREATE MATERIALIZED VIEW daily_sales
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', created_date) AS day, SUM(total_amount) AS revenue
FROM orders
GROUP BY day
WITH NO DATA;
-- Refresh policy: every hour, refresh the window from 3 days ago to 1 hour ago
SELECT add_continuous_aggregate_policy('daily_sales',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
SELECT * FROM daily_sales WHERE day = '2025-01-01';
-- ↑ reads precomputed results; with real-time aggregation enabled
--   (timescaledb.materialized_only = false), it also includes rows not yet materialized
```
---
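### 3. What `WITH (timescaledb.continuous)` Does
**Standard materialized view**
```sql
CREATE MATERIALIZED VIEW my_view AS
SELECT ... FROM ...;
-- ↑ a standard PostgreSQL materialized view
-- - must be refreshed manually with REFRESH MATERIALIZED VIEW
-- - every refresh recomputes the whole view
```
**Continuous aggregate**
```sql
CREATE MATERIALIZED VIEW my_view
WITH (timescaledb.continuous) AS -- ← the key option
SELECT time_bucket('1 day', time_column) AS bucket, ...
FROM hypertable
GROUP BY bucket;
-- ↑ a TimescaleDB continuous aggregate
-- - refreshed automatically in the background (once a refresh policy exists)
-- - refreshed incrementally (only invalidated time ranges are recomputed)
-- - can serve real-time queries that merge in the latest raw rows (if enabled)
```
**Features enabled by `WITH (timescaledb.continuous)`**
1. **Incremental materialization**: recomputes only the time ranges that changed, instead of the whole view
2. **Automatic refresh**: a background job can be configured to keep the data up to date
3. **Real-time aggregation**: optionally merges materialized data with rows not yet materialized (controlled by `timescaledb.materialized_only`)
4. **Invalidation tracking**: automatically records which time ranges need to be recomputed
**Key takeaway**
> `WITH (timescaledb.continuous)` does not "turn on automatic refresh";
> it makes incremental refresh policies possible.
> You still need to add a refresh policy before the view updates automatically!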
---
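### 4. End-to-End Setup: Three Steps
TimescaleDB statistics are not set up in one step. You need to **complete three steps in order**:
```
Step 1: Create hypertable → Step 2: Create continuous aggregate → Step 3: Add refresh policy → Statistics maintained automatically
```
#### Step 1: Create the Hypertable
**Purpose**: convert the regular table into a time-partitioned table to optimize storage and time-range queries
```sql
-- Convert the existing orders table into a hypertable
SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days'
);
```
**Effect**:
- ✅ Data is partitioned by time automatically
- ✅ Queries are routed to the relevant chunks
- ❌ Aggregate queries are still slow (not accelerated)
#### Step 2: Create the Continuous Aggregate
**Purpose**: create a pre-aggregated view that stores the statistics
```sql
CREATE MATERIALIZED VIEW orders_daily_stats
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', created_date) AS day,
company_id,
COUNT(*) AS total_orders,
SUM(total_amount) AS revenue
FROM orders
GROUP BY day, company_id
WITH NO DATA; -- do not populate yet; a refresh fills it later
```
**Effect**:
- ✅ The view exists and can be queried
- ❌ It is empty (WITH NO DATA)
- ❌ It does not update automatically (no refresh policy yet)
#### Step 3: Add a Refresh Policy
**Purpose**: enable automatic background refresh to keep the data current
```sql
SELECT add_continuous_aggregate_policy('orders_daily_stats',
start_offset => INTERVAL '3 days', -- window starts 3 days ago
end_offset => INTERVAL '1 hour', -- window ends 1 hour ago (skips data still being written)
schedule_interval => INTERVAL '1 hour' -- runs once an hour
);
```
**Effect**:
- ✅ A background job refreshes the view every hour
- ✅ Only data in the window [now() - 3 days, now() - 1 hour] is refreshed
- ✅ Computation is incremental, so each run is cheap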
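The sketch below shows which days a policy actually recomputes. It lists the 1-day buckets that lie entirely inside the window `[now - start_offset, now - end_offset)`.

It assumes the refresh is aligned to whole buckets inside the window, which is how manual refreshes behave. Policy runs may round differently depending on the TimescaleDB version. `bucketsInWindow` is a hypothetical helper that takes ISO 8601 durations such as `P3D` and `PT1H`.

```php
<?php
// Hypothetical sketch: which 1-day (UTC) buckets a refresh window fully covers.
// Assumption: only buckets lying entirely inside [now - start_offset, now - end_offset)
// are recomputed (the window is shrunk to whole buckets).

/** @return string[] bucket start dates (Y-m-d, UTC) */
function bucketsInWindow(DateTimeImmutable $now, string $startOffset, string $endOffset): array
{
    $day = 86400;
    $windowStart = $now->sub(new DateInterval($startOffset))->getTimestamp();
    $windowEnd = $now->sub(new DateInterval($endOffset))->getTimestamp();

    // Round the start up and the end down to bucket boundaries
    $first = intdiv($windowStart + $day - 1, $day) * $day;
    $last = intdiv($windowEnd, $day) * $day;

    $buckets = [];
    for ($b = $first; $b + $day <= $last; $b += $day) {
        $buckets[] = gmdate('Y-m-d', $b);
    }
    return $buckets;
}

$now = new DateTimeImmutable('2025-01-10 10:30:00', new DateTimeZone('UTC'));
print_r(bucketsInWindow($now, 'P3D', 'PT1H'));
```

With the 3-day/1-hour policy, today's bucket never qualifies, which is consistent with reading today's figures from the raw `orders` table. A window shorter than two buckets (for example, 1 day back to 12 hours back) yields no complete bucket at all.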
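**Initial backfill**
```sql
-- The policy only refreshes its own window; historical data needs one manual refresh
-- (run it outside an explicit transaction block)
CALL refresh_continuous_aggregate('orders_daily_stats',
'2024-01-01', -- window start
'2025-01-01' -- window end (exclusive)
);
```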
---
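### 5. Data Flow and Business Process
#### Write path
```
Application writes order data
    ↓
orders table receives the rows
    ↓
TimescaleDB routes each row by its created_date
    ├→ Chunk 1 (2025-01-01~07)
    ├→ Chunk 2 (2025-01-08~14)
    ├→ Chunk 3 (2025-01-15~21)
    └→ Chunk N (latest data)
```
#### Aggregation path
```
Background refresh job (runs hourly)
    ↓
Look up invalidated time ranges (recorded whenever rows are inserted, updated or deleted)
    ├→ 2025-01-10: new rows      → recompute the 2025-01-10 bucket
    ├→ 2025-01-11: rows updated  → recompute the 2025-01-11 bucket
    └→ 2025-01-12: no changes    → skip
    ↓
Update the orders_daily_by_created view
    ↓
Aggregated data is up to date
```
#### Query path (real-time + historical)
```
Query sales for the last 30 days
    ↓
Split the range by date (in the application, or internally when real-time aggregation is enabled)
    ├→ Historical data (30 days ago ~ yesterday)
    │       ↓
    │   Query the continuous aggregate orders_daily_by_created
    │   (latency: milliseconds)
    └→ Real-time data (today)
            ↓
        Aggregate the orders table on the fly
        (latency: up to seconds, but only one day of data)
    ↓
Merge the results → return the complete statistics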
```
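The split in the query path can be expressed as a small date calculation. The hypothetical `splitQueryRange` below returns two things: the inclusive date range to read from the continuous aggregate, and whether today's rows must be aggregated from `orders`. It assumes dates are `Y-m-d` strings in the application's time zone.

```php
<?php
// Hypothetical helper: complete days come from the continuous aggregate,
// today comes from the raw orders table. Dates are Y-m-d strings.

/**
 * @return array{history: ?array{0: string, 1: string}, realtime: bool}
 *         history is an inclusive [from, to] range, or null when empty
 */
function splitQueryRange(string $from, string $to, string $today): array
{
    $yesterday = (new DateTimeImmutable($today))->modify('-1 day')->format('Y-m-d');
    // Y-m-d strings sort chronologically, so plain string comparison works
    $historyTo = min($to, $yesterday);
    return [
        'history' => $from <= $historyTo ? [$from, $historyTo] : null,
        'realtime' => $from <= $today && $to >= $today,
    ];
}

print_r(splitQueryRange('2025-01-01', '2025-01-31', '2025-01-10'));
```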
#### End-to-end business flow
```
┌────────────────────────────────────────────────────────────────────┐
│ Application-level operations                                       │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│ 1. Order write                                                     │
│    INSERT INTO orders (...) VALUES (...);                          │
│      ↓                                                             │
│    [written to the hypertable, routed to the matching chunk]       │
│                                                                    │
│ 2. Background aggregation (hourly)                                 │
│    [TimescaleDB background job]                                    │
│      ↓                                                             │
│    Find invalidated ranges within the last 3 days                  │
│      ↓                                                             │
│    Recompute the affected daily buckets                            │
│      ↓                                                             │
│    Update orders_daily_by_created                                  │
│                                                                    │
│ 3. Historical order update (outside the refresh window)            │
│    UPDATE orders SET total_paid = 100 WHERE id = 123;              │
│      ↓                                                             │
│    [application detects: order created 30 days ago]                │
│      ↓                                                             │
│    INSERT INTO aggregate_refresh_queue ...                         │
│      ↓                                                             │
│    [scheduled job processes the queue every night]                 │
│      ↓                                                             │
│    CALL refresh_continuous_aggregate('...', '2024-12-01', ...)     │
│                                                                    │
│ 4. Query statistics                                                │
│    SELECT * FROM orders_daily_by_created                           │
│    WHERE day >= '2024-12-01' AND day < CURRENT_DATE                │
│      ↓                                                             │
│    [reads precomputed aggregates, returns in milliseconds]         │
│                                                                    │
│    UNION ALL                                                       │
│                                                                    │
│    SELECT ... FROM orders                                          │
│    WHERE created_date >= CURRENT_DATE                              │
│      ↓                                                             │
│    [aggregates today's rows on the fly, returns within seconds]    │
│                                                                    │
└────────────────────────────────────────────────────────────────────┘
```
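**Key time ranges**
| Data age | Handling | Query source | Update mechanism |
|---------|---------|---------|---------|
| **Today** | Real-time data | orders table (aggregated on the fly) | Visible as soon as written |
| **1 hour ~ 3 days ago** | Automatic refresh window | Continuous aggregate | Background job refreshes hourly |
| **Older than 3 days** | Historical data | Continuous aggregate | Manual refresh or refresh queue |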
---
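## Architecture Design
### 1. Hypertable Partitioning Strategy
A TimescaleDB hypertable has exactly one time partitioning column; we use `created_date`.
**Reasons**:
- Every order has a `created_date` (NOT NULL)
- `paid_date` is NULL for unpaid orders, so it cannot be the partitioning column
- Orders accumulate in creation-time order, which suits time-series storage
**Configuration**: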
```sql
SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days');
```
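### 2. How Continuous Aggregates Work
Continuous aggregates are materialized views that TimescaleDB maintains automatically.
**Features**:
- Incremental updates: only changed data is recomputed
- Configurable refresh policies (time window, frequency)
- Much faster to query than on-the-fly aggregation, because results are precomputed
- Can be combined with real-time data in a single query
**The two aggregate views**:
1. **orders_daily_by_created**: statistics by creation date
   - counts all orders (including unpaid ones)
   - distinguishes paid and unpaid order counts
   - suited to analyzing order trends and conversion rates
2. **orders_daily_by_paid**: statistics by payment date
   - counts paid orders only (WHERE paid_date IS NOT NULL)
   - reflects when revenue was actually received
   - suited to financial reports and revenue analysis

⚠️ Caveat: TimescaleDB requires the `time_bucket` of a continuous aggregate to use the hypertable's time partitioning column. Because `orders` is partitioned on `created_date`, a continuous aggregate bucketed on `paid_date` is likely to be rejected when it is created; verify this on your TimescaleDB version. One possible workaround (an assumption, not part of this design) is to record payments in a separate hypertable partitioned by `paid_date` and build `orders_daily_by_paid` on that table.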
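Below is a minimal in-memory model of the two views. It assumes each order is an array with `created_date`, `paid_date` (or `null`) and `total_paid`. The same order is counted on its creation day in one view and, if paid, on its payment day in the other. `aggregateByDay` is a hypothetical helper used only for illustration.

```php
<?php
// Hypothetical in-memory model of the two views (dates are plain Y-m-d strings).

function aggregateByDay(array $orders, string $dateField, bool $paidOnly): array
{
    $result = [];
    foreach ($orders as $o) {
        if ($paidOnly && $o['paid_date'] === null) {
            continue; // like orders_daily_by_paid, skip unpaid orders
        }
        $day = $o[$dateField];
        $result[$day] ??= ['orders' => 0, 'paid' => 0.0];
        $result[$day]['orders']++;
        $result[$day]['paid'] += $o['total_paid'];
    }
    ksort($result);
    return $result;
}

$orders = [
    ['created_date' => '2025-01-01', 'paid_date' => '2025-01-02', 'total_paid' => 100.0],
    ['created_date' => '2025-01-01', 'paid_date' => null, 'total_paid' => 0.0],
    ['created_date' => '2025-01-02', 'paid_date' => '2025-01-02', 'total_paid' => 50.0],
];
$byCreated = aggregateByDay($orders, 'created_date', false); // like orders_daily_by_created
$byPaid = aggregateByDay($orders, 'paid_date', true);        // like orders_daily_by_paid
print_r($byCreated);
print_r($byPaid);
```

Here the order created on 2025-01-01 and paid on 2025-01-02 counts toward 2025-01-01 in the creation view but toward 2025-01-02 in the payment view. That is why the two views report different daily revenue.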
---
## Implementation Steps
### Step 1: Modify the orders Table Migration
File: `backend/migrations/2025_11_11_083522_create_orders_table.php`
Add the TimescaleDB setup at the end of the `up()` method:
```php
public function up(): void
{
// ... existing table creation and index code ...
// GIN indexes on the jsonb columns (PostgreSQL)
Schema::getConnection()->statement('CREATE INDEX orders_raw_gin_idx ON orders USING gin (raw)');
Schema::getConnection()->statement('CREATE INDEX orders_ext_gin_idx ON orders USING gin (ext)');
// ==================== TimescaleDB setup ====================
// 1. Enable the TimescaleDB extension (if it is not enabled yet)
Schema::getConnection()->statement('CREATE EXTENSION IF NOT EXISTS timescaledb');
// 2. Convert orders into a hypertable
// - created_date is the time partitioning column
// - each chunk holds 7 days of data
// - if_not_exists => TRUE avoids an error if it is already a hypertable
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days',
if_not_exists => TRUE)"
);
// 3. (Optional) Retention policy: automatically drop data older than 2 years
// Schema::getConnection()->statement(
// "SELECT add_retention_policy('orders', INTERVAL '2 years')"
// );
}
```
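**Parameters**:
- `chunk_time_interval`: the time span of each chunk; tune it to your data volume
  - Low volume: 7 days (the default)
  - Medium volume: 1 day
  - High volume: half a day or less
- `if_not_exists`: avoids an error when the call is repeated
- Every primary key or unique index on a hypertable must include the partitioning column. If `orders` has a primary key on `id` alone, `create_hypertable` fails; use a composite key such as `(id, created_date)`.
### Step 2: Create the Continuous Aggregate Migration
Create the file: `backend/migrations/2025_11_12_XXXXXX_create_orders_continuous_aggregates.php`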
```php
<?php
use Hyperf\Database\Schema\Schema;
use Hyperf\Database\Migrations\Migration;
return new class extends Migration
{
/**
* Run the migrations.
*/
public function up(): void
{
$conn = Schema::getConnection();
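// ========== Continuous aggregate 1: by creation date ==========
// Note: on a timestamptz column, time_bucket('1 day', ...) splits days at UTC midnight.
// For +08:00 business days, recent TimescaleDB versions accept a time zone argument,
// e.g. time_bucket('1 day', created_date, 'Asia/Shanghai'); check your version first.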
$conn->statement("
CREATE MATERIALIZED VIEW orders_daily_by_created
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', created_date) AS day,
company_id,
platform_id,
store_id,
-- order counts
COUNT(*) AS total_orders,
COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
-- sales amounts
SUM(total_amount) AS sum_total_amount,
SUM(total_paid) AS sum_total_paid,
SUM(total_received) AS sum_total_received,
-- averages
AVG(total_amount) AS avg_order_amount,
AVG(total_paid) FILTER (WHERE paid_date IS NOT NULL) AS avg_paid_amount,
-- fees
SUM(freight_fee) AS sum_freight_fee,
SUM(tax_fee) AS sum_tax_fee,
SUM(commission_fee) AS sum_commission_fee,
SUM(discount_fee) AS sum_discount_fee
FROM orders
GROUP BY day, company_id, platform_id, store_id
WITH NO DATA;
");
// Automatic refresh policy
// - refreshes data from 3 days ago up to 1 hour ago
// - runs once an hour
$conn->statement("
SELECT add_continuous_aggregate_policy('orders_daily_by_created',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
");
// Indexes to speed up common queries
$conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, company_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, platform_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_created (day DESC, store_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_created (company_id, day DESC)');
$conn->statement('CREATE INDEX ON orders_daily_by_created (store_id, day DESC)');
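// ========== Continuous aggregate 2: by payment date ==========
// Caution: a continuous aggregate must bucket on the hypertable's partitioning column
// (created_date here). Bucketing on paid_date is likely to be rejected with an error,
// so test this statement against your TimescaleDB version before relying on it.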
$conn->statement("
CREATE MATERIALIZED VIEW orders_daily_by_paid
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', paid_date) AS day,
company_id,
platform_id,
store_id,
-- paid order count
COUNT(*) AS paid_orders,
-- sales amounts
SUM(total_amount) AS sum_total_amount,
SUM(total_paid) AS sum_total_paid,
SUM(total_received) AS sum_total_received,
-- averages
AVG(total_amount) AS avg_order_amount,
AVG(total_paid) AS avg_paid_amount,
-- fees
SUM(freight_fee) AS sum_freight_fee,
SUM(tax_fee) AS sum_tax_fee,
SUM(commission_fee) AS sum_commission_fee,
SUM(discount_fee) AS sum_discount_fee
FROM orders
WHERE paid_date IS NOT NULL -- paid orders only
GROUP BY day, company_id, platform_id, store_id
WITH NO DATA;
");
// Automatic refresh policy
$conn->statement("
SELECT add_continuous_aggregate_policy('orders_daily_by_paid',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
");
// Indexes
$conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, company_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, platform_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_paid (day DESC, store_id)');
$conn->statement('CREATE INDEX ON orders_daily_by_paid (company_id, day DESC)');
$conn->statement('CREATE INDEX ON orders_daily_by_paid (store_id, day DESC)');
}
/**
* Reverse the migrations.
*/
public function down(): void
{
$conn = Schema::getConnection();
// Drop the continuous aggregates (CASCADE also removes their refresh policies)
$conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_paid CASCADE');
$conn->statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_created CASCADE');
}
};
```
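**Refresh policy parameters**:
- `start_offset`: where the refresh window starts, relative to the current time
- `end_offset`: where the refresh window ends, relative to the current time (keeps the refresh away from data still being written)
- `schedule_interval`: how often the policy runs
**Example configuration**:
```sql
-- Refresh the last 3 days every hour
start_offset => INTERVAL '3 days'
end_offset => INTERVAL '1 hour'
schedule_interval => INTERVAL '1 hour'
-- Effective refresh window: [now() - 3 days, now() - 1 hour]
```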
---
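## Keeping Aggregates in Sync with Historical Updates
### Problem
A continuous aggregate's refresh policy only covers its configured window (for example, the last 3 days). When **historical orders outside that window are modified**, the aggregates are not updated automatically. TimescaleDB still records the change as an invalidation, and applies it the next time a refresh covers that range.
**Common scenarios**:
- Refunds on historical orders (total_paid changes)
- Backfilled payment times (paid_date updated)
- Order status corrections
- Amount adjustments
### Options Compared
| Option | Pros | Cons | When to use |
|------|------|------|----------|
| **Manual refresh** | Precise control, low overhead | Application must trigger it | Historical edits are rare |
| **Real-time aggregation** | Always accurate | Slower queries | Accuracy is critical |
| **Wider refresh window** | Handled automatically | Higher refresh cost | Edits concentrate on recent data |
| **Refresh queue** | Balances performance and accuracy | More complex to implement | Recommended for production |
### Recommended: A Refresh Queue
**Approach**:
1. When an order is modified, record the dates whose aggregates must be refreshed
2. A scheduled job refreshes the queued dates in batches
3. Combined with the automatic refresh policy, this covers most cases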
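The first step can be written as a pure function, which makes the rule easy to test. The sketch below is hypothetical: `datesToEnqueue` and its parameters are not part of the codebase. It assumes `Y-m-d` strings and queues every affected date older than a cutoff.

Any update, including an amount-only change such as a refund, touches the buckets of the order's current dates. A changed date also touches its new bucket.

```php
<?php
// Hypothetical pure function: which dates an order update should queue for refresh.
// Assumption: dates on or after the cutoff are left to the automatic refresh policy.

function datesToEnqueue(
    string $oldCreated,
    ?string $oldPaid,
    ?string $newCreated,
    ?string $newPaid,
    string $today,
    int $cutoffDays = 2
): array {
    // Current dates are always affected; new dates (if any) are affected too
    $dates = array_filter([$oldCreated, $oldPaid, $newCreated, $newPaid]);
    $cutoff = (new DateTimeImmutable($today))->modify("-{$cutoffDays} days")->format('Y-m-d');
    // Y-m-d strings compare chronologically
    $queued = array_filter(array_unique($dates), fn (string $d): bool => $d < $cutoff);
    sort($queued);
    return $queued;
}

// A refund on an old order: both of its buckets must be refreshed
print_r(datesToEnqueue('2024-12-01', '2024-12-02', null, null, '2025-01-10'));
```

The 2-day cutoff (instead of 3) deliberately overlaps the 3-day policy window by one day. Refreshing a bucket twice is harmless. Missing the oldest day, which the window may only partially cover, would leave that day stale.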
#### 1. Create the Refresh Queue Table
```php
// Migration file
use Hyperf\Database\Schema\Blueprint;
use Hyperf\Database\Schema\Schema;

Schema::create('aggregate_refresh_queue', function (Blueprint $table) {
    $table->date('refresh_date')->comment('date to refresh');
    $table->string('aggregate_view', 100)->comment('continuous aggregate name');
    $table->timestamp('created_at')->comment('time the entry was queued');
    $table->primary(['refresh_date', 'aggregate_view']);
    $table->index('created_at');
});
```
#### 2. Enqueue on Order Update
```php
// Order update method (assumes Order, Carbon\Carbon and Hyperf\DbConnection\Db are imported)
public function updateOrder($orderId, array $data): Order
{
    $order = Order::findOrFail($orderId);
    // Every update (including amount-only changes such as refunds) affects the
    // buckets of the order's current creation and payment dates
    $affectedDates = [Carbon::parse($order->created_date)->format('Y-m-d')];
    if ($order->paid_date) {
        $affectedDates[] = Carbon::parse($order->paid_date)->format('Y-m-d');
    }
    // Changed dates also affect their new buckets
    if (! empty($data['created_date'])) {
        $affectedDates[] = Carbon::parse($data['created_date'])->format('Y-m-d');
    }
    if (! empty($data['paid_date'])) {
        $affectedDates[] = Carbon::parse($data['paid_date'])->format('Y-m-d');
    }
    // Update the order
    $order->update($data);
    // Queue the dates that need refreshing
    foreach (array_unique($affectedDates) as $date) {
        $this->enqueueRefresh($date);
    }
    return $order;
}

private function enqueueRefresh(string $date): void
{
    // Queue dates older than 2 days; the policy's 3-day window handles newer ones.
    // The one-day overlap covers the oldest day, which the window may only partly include.
    if (Carbon::parse($date)->lt(Carbon::today()->subDays(2))) {
        $now = Carbon::now(); // now() is a Laravel global helper; use Carbon directly in Hyperf
        Db::table('aggregate_refresh_queue')->insertOrIgnore([
            ['refresh_date' => $date, 'aggregate_view' => 'orders_daily_by_created', 'created_at' => $now],
            ['refresh_date' => $date, 'aggregate_view' => 'orders_daily_by_paid', 'created_at' => $now],
        ]);
    }
}
```
#### 3. Batch Refresh with a Scheduled Command
```php
<?php
// app/Command/RefreshOrderAggregatesCommand.php
namespace App\Command;

use Carbon\Carbon;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\DbConnection\Db;

#[Command]
class RefreshOrderAggregatesCommand extends HyperfCommand
{
    protected ?string $signature = 'orders:refresh-aggregates';

    protected string $description = 'Refresh queued order aggregate dates';

    public function handle()
    {
        // Only entries queued more than 1 hour ago (the delay avoids refreshing too often)
        $queue = Db::table('aggregate_refresh_queue')
            ->where('created_at', '<', Carbon::now()->subHour())
            ->orderBy('refresh_date')
            ->get();
        if ($queue->isEmpty()) {
            $this->info('No aggregates to refresh.');
            return;
        }
        foreach ($queue->groupBy('aggregate_view') as $view => $items) {
            $dates = $items->pluck('refresh_date')->unique();
            $this->info("Refreshing {$view}: " . $dates->count() . ' dates');
            foreach ($dates as $date) {
                // The window must contain whole buckets. Assuming the view buckets by UTC day
                // (time_bucket without a time zone) while queued dates are local dates,
                // pad one day on each side.
                $start = Carbon::parse($date, 'UTC')->subDay();
                $end = $start->copy()->addDays(3);
                try {
                    // Explicit casts because bound parameters arrive untyped.
                    // This CALL cannot run inside a transaction block.
                    Db::statement(
                        'CALL refresh_continuous_aggregate(?::regclass, ?::timestamptz, ?::timestamptz)',
                        [$view, $start->toIso8601String(), $end->toIso8601String()]
                    );
                    // Remove the refreshed entries
                    Db::table('aggregate_refresh_queue')
                        ->where('refresh_date', $date)
                        ->where('aggregate_view', $view)
                        ->delete();
                    $this->line("  ✓ Refreshed {$date}");
                } catch (\Throwable $e) {
                    $this->error("  ✗ Failed to refresh {$date}: " . $e->getMessage());
                }
            }
        }
        $this->info('Refresh completed.');
    }
}
```
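One possible refinement, which is not part of the original command, is to merge consecutive queued dates into `[start, end)` ranges. A run of adjacent days then needs only a single `refresh_continuous_aggregate` call. `coalesceDates` is a hypothetical helper operating on `Y-m-d` strings.

```php
<?php
// Hypothetical refinement: merge consecutive dates into [start, end) ranges.

/**
 * @param string[] $dates Y-m-d strings, in any order, possibly duplicated
 * @return array<array{0: string, 1: string}> ranges with an exclusive end date
 */
function coalesceDates(array $dates): array
{
    $dates = array_values(array_unique($dates));
    sort($dates);
    $ranges = [];
    foreach ($dates as $d) {
        $next = (new DateTimeImmutable($d))->modify('+1 day')->format('Y-m-d');
        $lastIdx = count($ranges) - 1;
        if ($lastIdx >= 0 && $ranges[$lastIdx][1] === $d) {
            $ranges[$lastIdx][1] = $next; // extend the current range
        } else {
            $ranges[] = [$d, $next]; // start a new range
        }
    }
    return $ranges;
}

print_r(coalesceDates(['2024-12-03', '2024-12-01', '2024-12-02', '2024-12-10']));
```

Each returned pair can be passed as a window start and end. Assuming UTC-day buckets, the exclusive end falls on a day boundary, so every day in the range is a complete bucket.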
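**Schedule the job** (daily at 2 AM). Hyperf's crontab component runs console commands through the `command` task type. Depending on the Hyperf version, the crontab dispatcher process may also need to be enabled.
```php
<?php
// config/autoload/crontab.php
use Hyperf\Crontab\Crontab;

return [
    'enable' => true,
    'crontab' => [
        // A 'command' task gets a proper console input/output, unlike calling
        // the command's handle() method directly as a callback
        (new Crontab())
            ->setType('command')
            ->setName('refresh-order-aggregates')
            ->setRule('0 2 * * *') // daily at 2 AM
            ->setCallback(['command' => 'orders:refresh-aggregates'])
            ->setMemo('Refresh order aggregates'),
    ],
];
```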
---
## Query Examples
### 1. By Creation Date (History + Today)
```sql
-- Order statistics for the last 30 days (by creation date)
SELECT * FROM (
-- Historical data (from the continuous aggregate; fast)
SELECT
day,
company_id,
platform_id,
store_id,
total_orders,
paid_orders,
unpaid_orders,
sum_total_amount,
sum_total_paid,
avg_order_amount
FROM orders_daily_by_created
WHERE day >= CURRENT_DATE - INTERVAL '30 days'
AND day < CURRENT_DATE
UNION ALL
-- Today's data, aggregated on the fly
SELECT
time_bucket('1 day', created_date) AS day,
company_id,
platform_id,
store_id,
COUNT(*) AS total_orders,
COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
SUM(total_amount) AS sum_total_amount,
SUM(total_paid) AS sum_total_paid,
AVG(total_amount) AS avg_order_amount
FROM orders
WHERE created_date >= CURRENT_DATE
GROUP BY day, company_id, platform_id, store_id
) AS combined
ORDER BY day DESC, company_id, platform_id, store_id;
```
### 2. By Payment Date (History + Today)
```sql
-- Actual revenue for the last 30 days (by payment date)
SELECT * FROM (
-- Historical data
SELECT
day,
company_id,
platform_id,
store_id,
paid_orders,
sum_total_paid,
avg_paid_amount
FROM orders_daily_by_paid
WHERE day >= CURRENT_DATE - INTERVAL '30 days'
AND day < CURRENT_DATE
UNION ALL
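-- Today's data, aggregated on the fly (orders is partitioned by created_date, so this paid_date filter cannot prune chunks; an index on paid_date helps)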
SELECT
time_bucket('1 day', paid_date) AS day,
company_id,
platform_id,
store_id,
COUNT(*) AS paid_orders,
SUM(total_paid) AS sum_total_paid,
AVG(total_paid) AS avg_paid_amount
FROM orders
WHERE paid_date >= CURRENT_DATE
AND paid_date IS NOT NULL
GROUP BY day, company_id, platform_id, store_id
) AS combined
ORDER BY day DESC;
```
### 3. Grouped by Company
```sql
-- Daily orders per company
SELECT
day,
company_id,
SUM(total_orders) AS total_orders,
SUM(paid_orders) AS paid_orders,
SUM(sum_total_amount) AS total_amount,
SUM(sum_total_paid) AS total_paid,
SUM(sum_total_amount) / NULLIF(SUM(total_orders), 0) AS avg_order_amount -- weighted by order count
FROM orders_daily_by_created
WHERE day >= '2025-01-01' AND day < '2025-02-01'
GROUP BY day, company_id
ORDER BY day DESC, company_id;
```
```sql
-- Month-to-date revenue per company, excluding today (by payment time)
SELECT
company_id,
SUM(paid_orders) AS total_paid_orders,
SUM(sum_total_paid) AS total_revenue,
SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_order_value -- weighted by order count
FROM orders_daily_by_paid
WHERE day >= date_trunc('month', CURRENT_DATE)
AND day < CURRENT_DATE
GROUP BY company_id
ORDER BY total_revenue DESC;
```
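Averaging the daily `avg_*` columns weights every day equally, however many orders it had. The self-contained PHP sketch below uses two made-up days to compare that unweighted mean with the order-weighted value `SUM(sum_total_paid) / SUM(paid_orders)`.

```php
<?php
// Two made-up days: one with a single large order, one with many small orders.
$days = [
    ['paid_orders' => 1, 'sum_total_paid' => 1000.0],
    ['paid_orders' => 99, 'sum_total_paid' => 990.0],
];

// Unweighted: mean of the daily averages (what AVG(avg_paid_amount) computes)
$dailyAvgs = array_map(fn (array $d): float => $d['sum_total_paid'] / $d['paid_orders'], $days);
$unweighted = array_sum($dailyAvgs) / count($dailyAvgs);

// Weighted: total revenue divided by total orders (SUM(...) / SUM(...))
$weighted = array_sum(array_column($days, 'sum_total_paid'))
    / array_sum(array_column($days, 'paid_orders'));

printf("unweighted: %.2f, weighted: %.2f\n", $unweighted, $weighted);
// unweighted: 505.00, weighted: 19.90
```

The unweighted mean is 505, while the true average order value is 19.9. The weighted form is the one that matches the business meaning.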
### 4. Grouped by Platform
```sql
-- Daily order count and sales per platform
SELECT
day,
platform_id,
SUM(total_orders) AS total_orders,
SUM(paid_orders) AS paid_orders,
SUM(sum_total_amount) AS total_amount,
SUM(sum_total_paid) AS total_paid,
ROUND(SUM(paid_orders)::numeric / NULLIF(SUM(total_orders), 0) * 100, 2) AS payment_rate
FROM orders_daily_by_created
WHERE day >= CURRENT_DATE - INTERVAL '7 days'
AND day < CURRENT_DATE
GROUP BY day, platform_id
ORDER BY day DESC, total_paid DESC;
```
```sql
-- Daily revenue per platform this week, excluding today (by payment time)
SELECT
day,
platform_id,
SUM(paid_orders) AS orders,
SUM(sum_total_paid) AS revenue,
SUM(sum_freight_fee) AS freight,
SUM(sum_commission_fee) AS commission
FROM orders_daily_by_paid
WHERE day >= date_trunc('week', CURRENT_DATE)
AND day < CURRENT_DATE
GROUP BY day, platform_id
ORDER BY day DESC, revenue DESC;
```
### 5. Grouped by Store
```sql
-- Daily orders per store
SELECT
day,
store_id,
SUM(total_orders) AS total_orders,
SUM(paid_orders) AS paid_orders,
SUM(unpaid_orders) AS unpaid_orders,
SUM(sum_total_amount) AS total_amount,
SUM(sum_total_paid) AS total_paid
FROM orders_daily_by_created
WHERE day >= '2025-01-01' AND day < '2025-02-01'
GROUP BY day, store_id
ORDER BY day DESC, total_paid DESC;
```
```sql
-- Daily sales trend for one store (by payment time)
SELECT
day,
paid_orders,
sum_total_paid AS revenue,
avg_paid_amount,
sum_freight_fee,
sum_tax_fee
FROM orders_daily_by_paid
WHERE store_id = 123
AND day >= CURRENT_DATE - INTERVAL '30 days'
AND day < CURRENT_DATE
ORDER BY day DESC;
```
### 6. Multi-Dimension Queries
```sql
-- Daily sales per store within one company
SELECT
day,
company_id,
store_id,
SUM(total_orders) AS orders,
SUM(paid_orders) AS paid,
SUM(sum_total_paid) AS revenue,
SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_value
FROM orders_daily_by_created
WHERE company_id = 100
AND day >= '2025-01-01' AND day < '2025-02-01'
GROUP BY day, company_id, store_id
ORDER BY day DESC, revenue DESC;
```
```sql
-- Sales by company and platform (by payment time)
SELECT
company_id,
platform_id,
SUM(paid_orders) AS total_orders,
SUM(sum_total_paid) AS total_revenue,
SUM(sum_total_paid) / NULLIF(SUM(paid_orders), 0) AS avg_order_value,
SUM(sum_commission_fee) AS total_commission,
ROUND(SUM(sum_commission_fee)::numeric / NULLIF(SUM(sum_total_paid), 0) * 100, 2) AS commission_rate
FROM orders_daily_by_paid
WHERE day >= date_trunc('month', CURRENT_DATE)
AND day < CURRENT_DATE
GROUP BY company_id, platform_id
ORDER BY company_id, total_revenue DESC;
```
```sql
-- Sales trend for a specific company, platform and store
SELECT
day,
total_orders,
paid_orders,
unpaid_orders,
sum_total_amount,
sum_total_paid,
avg_order_amount
FROM orders_daily_by_created
WHERE company_id = 100
AND platform_id = 200
AND store_id = 300
AND day >= CURRENT_DATE - INTERVAL '90 days'
AND day < CURRENT_DATE
ORDER BY day DESC;
```
### 7. Weekly and Monthly Rollups
```sql
-- Weekly (by creation time); 7-day buckets start on Mondays by default
SELECT
time_bucket('7 days', day) AS week,
company_id,
SUM(total_orders) AS weekly_orders,
SUM(paid_orders) AS weekly_paid_orders,
SUM(sum_total_paid) AS weekly_revenue,
SUM(sum_total_amount) / NULLIF(SUM(total_orders), 0) AS avg_order_value
FROM orders_daily_by_created
WHERE day >= '2025-01-01'
GROUP BY week, company_id
ORDER BY week DESC, company_id;
```
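`time_bucket` aligns fixed-width buckets to an origin. For timestamps, the documented default origin is 2000-01-03, a Monday, which is why `'7 days'` buckets start on Mondays. The hypothetical `timeBucket` below emulates only the fixed-width, UTC case (no month buckets, offsets or time zones) to show where a week boundary falls.

```php
<?php
// Hypothetical emulation of time_bucket() for fixed-width buckets on UTC timestamps.
// Assumption: origin 2000-01-03 00:00 UTC (a Monday), the default for timestamps.

function timeBucket(int $widthSeconds, DateTimeImmutable $ts): DateTimeImmutable
{
    $utc = new DateTimeZone('UTC');
    $origin = (new DateTimeImmutable('2000-01-03 00:00:00', $utc))->getTimestamp();
    $delta = $ts->getTimestamp() - $origin;
    // Floor division so timestamps before the origin land in the earlier bucket
    $n = intdiv($delta, $widthSeconds);
    if ($delta < 0 && $delta % $widthSeconds !== 0) {
        $n--;
    }
    return (new DateTimeImmutable('@' . ($origin + $n * $widthSeconds)))->setTimezone($utc);
}

$utc = new DateTimeZone('UTC');
echo timeBucket(7 * 86400, new DateTimeImmutable('2025-01-01 15:00', $utc))->format('Y-m-d D'), PHP_EOL;
// 2024-12-30 Mon
```

Because 2025-01-01 is a Wednesday, a weekly report filtered with `day >= '2025-01-01'` starts with a partial bucket labeled 2024-12-30.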
```sql
-- Monthly (by payment time)
SELECT
date_trunc('month', day) AS month,
platform_id,
SUM(paid_orders) AS monthly_orders,
SUM(sum_total_paid) AS monthly_revenue,
SUM(sum_freight_fee) AS monthly_freight,
SUM(sum_commission_fee) AS monthly_commission
FROM orders_daily_by_paid
WHERE day >= '2024-01-01'
GROUP BY month, platform_id
ORDER BY month DESC, monthly_revenue DESC;
```
### 8. Real-Time + Historical Queries (Application Layer)
**PHP example**:
```php
<?php
namespace App\Service;

use Carbon\Carbon;
use Hyperf\DbConnection\Db;

class OrderStatisticsService
{
    /**
     * Daily statistics for one store (by creation time)
     *
     * @param int $storeId store ID
     * @param string $startDate start date (Y-m-d)
     * @param string $endDate end date (Y-m-d), inclusive
     * @return array rows ordered by day, newest first
     */
    public function getDailyStatsByStore(int $storeId, string $startDate, string $endDate): array
    {
        $start = Carbon::parse($startDate)->startOfDay();
        $end = Carbon::parse($endDate)->startOfDay();
        $today = Carbon::today();
        $results = [];
        // Today's data, aggregated on the fly (listed first because rows are newest first)
        if ($start->lte($today) && $end->gte($today)) {
            $results = Db::select("
                SELECT
                    time_bucket('1 day', created_date)::date AS day,
                    COUNT(*) AS total_orders,
                    COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
                    COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
                    SUM(total_amount) AS sum_total_amount,
                    SUM(total_paid) AS sum_total_paid,
                    AVG(total_amount) AS avg_order_amount
                FROM orders
                WHERE store_id = ?
                  AND created_date >= ?
                  AND created_date < ?
                GROUP BY day
                ORDER BY day DESC
            ", [$storeId, $today->toDateTimeString(), $today->copy()->addDay()->toDateTimeString()]);
        }
        // Historical data (complete days before today, from the continuous aggregate)
        if ($start->lt($today)) {
            $historyEnd = $end->lt($today) ? $end : $today->copy()->subDay();
            $history = Db::select("
                SELECT
                    day::date AS day,
                    total_orders,
                    paid_orders,
                    unpaid_orders,
                    sum_total_amount,
                    sum_total_paid,
                    avg_order_amount
                FROM orders_daily_by_created
                WHERE store_id = ?
                  AND day >= ?
                  AND day <= ?
                ORDER BY day DESC
            ", [$storeId, $start->toDateString(), $historyEnd->toDateString()]);
            $results = array_merge($results, $history);
        }
        return $results;
    }

    /**
     * Month-to-date revenue ranking per company (by payment time)
     *
     * @return array rows ordered by revenue, highest first
     */
    public function getMonthlyRevenueByCompany(): array
    {
        $monthStart = Carbon::now()->startOfMonth();
        $today = Carbon::today();
        // Historical data (this month, before today)
        $history = Db::select("
            SELECT
                company_id,
                SUM(paid_orders) AS total_orders,
                SUM(sum_total_paid) AS total_revenue
            FROM orders_daily_by_paid
            WHERE day >= ?
              AND day < ?
            GROUP BY company_id
        ", [$monthStart->toDateString(), $today->toDateString()]);
        // Today's data, aggregated on the fly
        $realtime = Db::select("
            SELECT
                company_id,
                COUNT(*) AS total_orders,
                SUM(total_paid) AS total_revenue
            FROM orders
            WHERE paid_date >= ?
            GROUP BY company_id
        ", [$today->toDateTimeString()]);
        // Merge both sources, then derive the average from the totals (order-weighted)
        $combined = [];
        foreach (array_merge($history, $realtime) as $row) {
            $id = $row->company_id;
            $combined[$id] ??= ['company_id' => $id, 'total_orders' => 0, 'total_revenue' => 0.0];
            $combined[$id]['total_orders'] += (int) $row->total_orders;
            $combined[$id]['total_revenue'] += (float) $row->total_revenue;
        }
        foreach ($combined as &$item) {
            $item['avg_order_value'] = $item['total_orders'] > 0
                ? $item['total_revenue'] / $item['total_orders']
                : null;
        }
        unset($item);
        // Sort by revenue, highest first
        usort($combined, fn ($a, $b) => $b['total_revenue'] <=> $a['total_revenue']);
        return $combined;
    }
}
```
---
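## Best Practices
### 1. Chunk Size Optimization (Important)
#### Core Principles
**Guidelines** (the memory rule is the official TimescaleDB recommendation; the other figures are common rules of thumb):
- **Target size**: 500 MB ~ 2 GB per chunk
- **Memory limit**: the most recent chunks, including their indexes, should fit in about 25% of available memory
- **Query efficiency**: a typical query should scan fewer than about 10 chunks
- **Upper bound**: avoid chunks larger than about 5 GB
#### Estimating Your Chunk Size
**Step 1: estimate the size of one row**
```
Row size estimate for the orders table (including indexes):
- Base columns (id, company_id, amounts, etc.): ~300 bytes
- Time columns (4 × timestamptz): ~32 bytes
- Text columns (zipcode, city, province, country): ~200 bytes
- JSONB columns (raw, ext): ~500-1000 bytes (depends on content)
- Index overhead (14 indexes): ~800 bytes
─────────────────────────────────────────
Total: about 1.5 - 2.5 KB per row (2 KB on average)
```
**Step 2: compute the daily data volume**
```
Formula: daily volume = orders per day × row size
Examples:
- 10,000 orders/day × 2 KB = 20 MB/day
- 50,000 orders/day × 2 KB = 100 MB/day
- 100,000 orders/day × 2 KB = 200 MB/day
- 500,000 orders/day × 2 KB = 1 GB/day
```
**Step 3: choose chunk_time_interval**
| Orders per day | Daily volume | Suggested interval | Chunk size | Assessment |
|---------|---------|---------|-----------|---------|
| < 5K | < 10 MB/day | **14-30 days** | 140-300 MB | ✅ Best: fewer chunks |
| 5K-20K | 10-40 MB/day | **7-14 days** | 70-560 MB | ✅ Best: balanced |
| 20K-100K | 40-200 MB/day | **3-7 days** | 120 MB-1.4 GB | ✅ Recommended |
| 100K-500K | 200 MB-1 GB/day | **1-3 days** | 200 MB-3 GB | ✅ Recommended |
| 500K-1M | 1-2 GB/day | **12-24 hours** | 0.5-2 GB | ✅ Acceptable |
| > 1M | > 2 GB/day | **6-12 hours** | 0.5 GB and up (grows with volume) | ⚠️ Monitor performance |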
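The table follows from multiplying daily volume by the interval. The hypothetical helpers below reproduce that arithmetic using the ~2 KB per row estimated in step 1, in decimal units as in the examples. Substitute a row size measured on your own data.

```php
<?php
// Hypothetical helpers reproducing the sizing arithmetic.
// Assumption: ~2 KB (2,000 bytes) per row including indexes, decimal MB/GB.

function estimateChunkBytes(int $ordersPerDay, float $intervalDays, int $bytesPerRow = 2000): float
{
    // chunk size = orders per day × bytes per row × days per chunk
    return $ordersPerDay * $bytesPerRow * $intervalDays;
}

function intervalDaysForTarget(int $ordersPerDay, float $targetBytes, int $bytesPerRow = 2000): float
{
    // days per chunk needed to reach a target chunk size
    return $targetBytes / ($ordersPerDay * $bytesPerRow);
}

function formatMb(float $bytes): string
{
    return round($bytes / 1_000_000) . ' MB';
}

echo formatMb(estimateChunkBytes(5_000, 14)), PHP_EOL;        // scenario 1: 140 MB
echo formatMb(estimateChunkBytes(50_000, 7)), PHP_EOL;        // scenario 2: 700 MB
echo formatMb(estimateChunkBytes(2_000_000, 0.5)), PHP_EOL;   // scenario 4: 2000 MB
echo intervalDaysForTarget(500_000, 1e9), ' day(s)', PHP_EOL; // 1 GB target at 500K/day: 1 day(s)
```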
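#### How Query Patterns Matter
**Typical query range vs. chunk configuration**
```sql
-- Suppose your most common query covers the last 30 days
SELECT * FROM orders
WHERE created_date >= CURRENT_DATE - INTERVAL '30 days';
-- Approximate number of chunks scanned per interval
-- (a range rarely lines up with chunk boundaries, so expect up to one more):
-- 1-day interval: 30 chunks ❌ many chunks, longer planning time
-- 3-day interval: 10 chunks ✅ ideal
-- 7-day interval: 5 chunks ✅ best
-- 14-day interval: 3 chunks ✅ good
-- 30-day interval: 1 chunk ⚠️ very large chunks that may not fit in memory
```
**Best practice**:
> As a rule of thumb, set chunk_time_interval so that your most common query range scans about 3-8 chunks.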
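Because chunk boundaries are fixed, a query range usually overlaps one more chunk than `range / interval` suggests. The hypothetical `chunksTouched` below counts overlapped chunks for a `[from, to)` range. It assumes boundaries are multiples of the interval counted from the Unix epoch in UTC.

```php
<?php
// Hypothetical helper: number of fixed-interval chunks a [from, to) range overlaps.
// Assumption: chunk boundaries are multiples of the interval from the Unix epoch (UTC).

function chunksTouched(DateTimeImmutable $from, DateTimeImmutable $to, int $intervalSeconds): int
{
    // Floor division that also handles timestamps before 1970
    $floorDiv = static function (int $a, int $b): int {
        $q = intdiv($a, $b);
        return ($a < 0 && $a % $b !== 0) ? $q - 1 : $q;
    };
    $first = $floorDiv($from->getTimestamp(), $intervalSeconds);
    $last = $floorDiv($to->getTimestamp() - 1, $intervalSeconds); // `to` is exclusive
    return $last - $first + 1;
}

$utc = new DateTimeZone('UTC');
$from = new DateTimeImmutable('2025-01-01', $utc);
$to = new DateTimeImmutable('2025-01-31', $utc);
foreach ([1, 3, 7] as $days) {
    echo "{$days}-day interval: ", chunksTouched($from, $to, $days * 86400), " chunks", PHP_EOL;
}
```

For the 30 days starting 2025-01-01, this gives 30 chunks at a 1-day interval, 11 at 3 days and 6 at 7 days. Those counts are slightly higher than the rounded figures above.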
#### Example Configurations
```php
// Alternatives: pick the one that matches your volume
// Scenario 1: small e-commerce site (~5,000 orders/day)
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '14 days', -- about 140 MB per chunk
if_not_exists => TRUE)"
);
// Scenario 2: medium scale (~50,000 orders/day)
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '7 days', -- about 700 MB per chunk
if_not_exists => TRUE)"
);
// Scenario 3: large platform (~500,000 orders/day)
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '1 day', -- about 1 GB per chunk
if_not_exists => TRUE)"
);
// Scenario 4: very large scale (~2 million orders/day)
Schema::getConnection()->statement(
"SELECT create_hypertable('orders', 'created_date',
chunk_time_interval => INTERVAL '12 hours', -- about 2 GB per chunk
if_not_exists => TRUE)"
);
```
#### Adjusting the Chunk Interval Later
If the chunk size turns out to be unsuitable after the hypertable is created, you can change the interval:
```sql
-- Check the size of the most recent chunks
SELECT
chunk_name,
pg_size_pretty(pg_total_relation_size(chunk_schema || '.' || chunk_name)) AS size,
range_start,
range_end
FROM timescaledb_information.chunks
WHERE hypertable_name = 'orders'
ORDER BY range_start DESC
LIMIT 10;
-- If chunks are too small (< 100 MB) or too large (> 5 GB), change the interval
-- Note: this only affects chunks created afterwards; existing chunks are unchanged
SELECT set_chunk_time_interval('orders', INTERVAL '7 days');
```
#### Performance Metrics
Check these regularly:
```sql
-- 1. Chunk count and size distribution
SELECT
COUNT(*) AS total_chunks,
pg_size_pretty(SUM(pg_total_relation_size(chunk_schema || '.' || chunk_name))) AS total_size,
pg_size_pretty(AVG(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS avg_chunk_size,
pg_size_pretty(MIN(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS min_chunk_size,
pg_size_pretty(MAX(pg_total_relation_size(chunk_schema || '.' || chunk_name))::bigint) AS max_chunk_size
FROM timescaledb_information.chunks
WHERE hypertable_name = 'orders';
-- 2. Query performance (EXPLAIN ANALYZE)
EXPLAIN ANALYZE
SELECT COUNT(*), SUM(total_amount)
FROM orders
WHERE created_date >= CURRENT_DATE - INTERVAL '30 days';
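-- Count the chunks that appear in the plan; for filters relative to now(), also check "Chunks excluded during startup" on the ChunkAppend node (higher is better)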
```
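**Warning signs**:
- ❌ Average chunk size < 50 MB → interval too small; increase it
- ❌ Average chunk size > 5 GB → interval too large; decrease it
- ❌ Queries scan > 20 chunks → consider a larger interval or narrower queries
- ❌ Planning time > execution time → too many chunks; increase the interval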
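### 2. Refresh Policy Configuration
The configurations below are alternatives. A continuous aggregate has only one refresh policy, so remove the existing one with `remove_continuous_aggregate_policy` before adding a different one.
**Low-latency scenario** (freshness matters most):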
```sql
SELECT add_continuous_aggregate_policy('orders_daily_by_created',
start_offset => INTERVAL '7 days', -- refresh the last 7 days
end_offset => INTERVAL '5 minutes', -- exclude only the last 5 minutes
schedule_interval => INTERVAL '5 minutes' -- refresh every 5 minutes
);
```
**Standard scenario** (balances performance and freshness):
```sql
SELECT add_continuous_aggregate_policy('orders_daily_by_created',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
```
**Low-overhead scenario** (mostly historical queries):
```sql
SELECT add_continuous_aggregate_policy('orders_daily_by_created',
start_offset => INTERVAL '3 days', -- the window must span at least two 1-day buckets
end_offset => INTERVAL '12 hours',
schedule_interval => INTERVAL '6 hours' -- refresh every 6 hours
);
```
### 3. Index Optimization
**Principle**: create indexes that match your common query patterns
```sql
-- Pattern 1: filter by time + store
CREATE INDEX ON orders_daily_by_created (day DESC, store_id);
-- Pattern 2: filter by store, order by time
CREATE INDEX ON orders_daily_by_created (store_id, day DESC);
-- Pattern 3: company + platform + time
CREATE INDEX ON orders_daily_by_created (company_id, platform_id, day DESC);
```
### 4. Data Retention Policy
Automatically drop old data (optional):
```sql
-- Keep the last 2 years of data and drop older chunks automatically
SELECT add_retention_policy('orders', INTERVAL '2 years');
-- Inspect the retention policy
SELECT * FROM timescaledb_information.jobs
WHERE proc_name = 'policy_retention';
-- Remove the retention policy
SELECT remove_retention_policy('orders');
```
### 5. Monitoring
**Chunk layout**
```sql
SELECT chunk_name, range_start, range_end
FROM timescaledb_information.chunks
WHERE hypertable_name = 'orders'
ORDER BY range_start DESC
LIMIT 10;
```
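**Continuous aggregate refresh policies** (TimescaleDB 2.x information views; column names can differ between versions)
```sql
SELECT ca.view_name, j.job_id, j.schedule_interval, j.config
FROM timescaledb_information.continuous_aggregates ca
JOIN timescaledb_information.jobs j
  ON j.hypertable_schema = ca.materialization_hypertable_schema
 AND j.hypertable_name = ca.materialization_hypertable_name
WHERE j.proc_name = 'policy_refresh_continuous_aggregate';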
```
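**Background job status**
```sql
SELECT j.job_id, j.proc_name, j.scheduled, s.next_start,
       s.last_run_status, s.last_successful_finish
FROM timescaledb_information.jobs j
LEFT JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id
ORDER BY j.job_id;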
```
### 6. Troubleshooting
**Manually refresh aggregates**
```sql
-- Refresh a specific time range
CALL refresh_continuous_aggregate('orders_daily_by_created',
'2025-01-01', '2025-02-01');
-- Refresh everything (use with care; can take a long time)
CALL refresh_continuous_aggregate('orders_daily_by_created', NULL, NULL);
```
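**Check whether the aggregates are current**
```sql
-- Time since each refresh policy last succeeded
SELECT ca.view_name,
       s.last_run_status,
       s.last_successful_finish,
       NOW() - s.last_successful_finish AS lag
FROM timescaledb_information.continuous_aggregates ca
JOIN timescaledb_information.jobs j
  ON j.hypertable_schema = ca.materialization_hypertable_schema
 AND j.hypertable_name = ca.materialization_hypertable_name
JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id
WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
  AND ca.view_name IN ('orders_daily_by_created', 'orders_daily_by_paid');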
```
---
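## Summary
### Architecture Benefits
1. **Performance**: historical queries go from seconds to milliseconds
2. **Flexibility**: two time dimensions (creation time and payment time)
3. **Freshness**: today's data is queried live; historical data is pre-aggregated
4. **Scalability**: grouping by multiple dimensions (company/platform/store)
5. **Maintainability**: automatic refresh policies plus a manual refresh queue
### When to Use It
- ✅ High order volume (more than 10,000 orders per day)
- ✅ Analysis across multiple time dimensions
- ✅ Frequent historical queries
- ✅ Day/week/month granularities
- ✅ Grouping by several dimensions (company/platform/store)
### Caveats
1. **Data updates**: historical changes outside the refresh window need a manual refresh or the refresh queue
2. **Index planning**: create indexes that match your actual query patterns
3. **Chunk interval**: size it to your data volume to avoid too many or too few chunks
4. **Monitoring and alerting**: check aggregate refresh status regularly so the data does not fall behind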
---
## References
- [TimescaleDB documentation](https://docs.timescale.com/)
- [Continuous aggregates](https://docs.timescale.com/use-timescale/latest/continuous-aggregates/)
- [Hypertables](https://docs.timescale.com/use-timescale/latest/hypertables/)
- [TimescaleDB time_bucket function](https://docs.timescale.com/api/latest/hyperfunctions/time_bucket/)