add admin materialize

This commit is contained in:
2026-05-07 21:25:38 +08:00
parent 349f8e11b0
commit eae665d66f
3 changed files with 508 additions and 0 deletions
@@ -0,0 +1,346 @@
<?php
declare(strict_types=1);
namespace App\Controller\Api\V1;
use App\Controller\AbstractController;
use App\Middleware\AuthMiddleware;
use App\Middleware\PermissionMiddleware;
use App\Model\AggregateRefreshQueue;
use Hyperf\DbConnection\Db;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\Middleware;
use Hyperf\HttpServer\Annotation\RequestMapping;
use OpenApi\Attributes as OA;
use Psr\Http\Message\ResponseInterface;
#[OA\Tag(name: 'Admin Materialization', description: '管理员物化层监控')]
#[Controller(prefix: "/api/v1/admin/materialization")]
class AdminMaterializationController extends AbstractController
{
private const VIEW_BY_CREATED = 'orders_daily_by_created';
private const VIEW_BY_PAID = 'orders_daily_by_paid';
private const ALLOWED_VIEWS = [self::VIEW_BY_CREATED, self::VIEW_BY_PAID];
/**
* 列出 aggregate_refresh_queue 待刷新条目
*/
#[OA\Get(
path: '/admin/materialization/queue',
summary: '列出聚合刷新队列',
description: '分页列出 aggregate_refresh_queue 表中的待刷新日期,支持按 view/from/to 过滤',
security: [['bearerAuth' => []]],
tags: ['Admin Materialization'],
parameters: [
new OA\Parameter(name: 'page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 1)),
new OA\Parameter(name: 'per_page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 20)),
new OA\Parameter(name: 'view', in: 'query', required: false, description: '聚合视图名(白名单)', schema: new OA\Schema(type: 'string', enum: self::ALLOWED_VIEWS)),
new OA\Parameter(name: 'from', in: 'query', required: false, description: 'refresh_date >= fromYYYY-MM-DD', schema: new OA\Schema(type: 'string', format: 'date')),
new OA\Parameter(name: 'to', in: 'query', required: false, description: 'refresh_date <= toYYYY-MM-DD', schema: new OA\Schema(type: 'string', format: 'date')),
],
responses: [
new OA\Response(
response: 200,
description: '获取成功',
content: new OA\JsonContent(properties: [
new OA\Property(property: 'code', type: 'integer', example: 0),
new OA\Property(property: 'message', type: 'string', example: '获取成功'),
new OA\Property(property: 'data', properties: [
new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
new OA\Property(property: 'refresh_date', type: 'string', format: 'date'),
new OA\Property(property: 'aggregate_view', type: 'string'),
new OA\Property(property: 'created_at', type: 'string', format: 'date-time'),
])),
new OA\Property(property: 'total', type: 'integer'),
new OA\Property(property: 'page', type: 'integer'),
new OA\Property(property: 'per_page', type: 'integer'),
], type: 'object'),
])
),
new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
]
)]
#[RequestMapping(path: "queue", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function queue(): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$page = max((int) $this->request->input('page', 1), 1);
$per_page = min(max((int) $this->request->input('per_page', 20), 1), 100);
$view = $this->request->input('view');
$from = $this->request->input('from');
$to = $this->request->input('to');
$query = AggregateRefreshQueue::query();
if ($view !== null && $view !== '') {
$query->where('aggregate_view', $view);
}
if ($from !== null && $from !== '') {
$query->where('refresh_date', '>=', $from);
}
if ($to !== null && $to !== '') {
$query->where('refresh_date', '<=', $to);
}
$total = $query->count();
$items = $query
->orderBy('refresh_date')
->orderBy('aggregate_view')
->offset(($page - 1) * $per_page)
->limit($per_page)
->get();
return [
'code' => 0,
'message' => '获取成功',
'data' => [
'items' => $items,
'total' => $total,
'page' => $page,
'per_page' => $per_page,
],
];
}
/**
* 手动触发指定时间窗口的物化刷新
*/
#[OA\Post(
path: '/admin/materialization/refresh',
summary: '手动刷新物化对象',
description: 'by_created 走 refresh_continuous_aggregate(view, from, to)by_paid 是 PG 物化视图,from/to 无效,整体走 REFRESH MATERIALIZED VIEW [CONCURRENTLY]',
security: [['bearerAuth' => []]],
tags: ['Admin Materialization'],
requestBody: new OA\RequestBody(
required: true,
content: new OA\JsonContent(
required: ['view'],
properties: [
new OA\Property(property: 'view', type: 'string', enum: self::ALLOWED_VIEWS),
new OA\Property(property: 'from', type: 'string', description: '起始时间戳(仅 by_created 生效,timestamptz 字面量)'),
new OA\Property(property: 'to', type: 'string', description: '结束时间戳(仅 by_created 生效,timestamptz 字面量)'),
]
)
),
responses: [
new OA\Response(
response: 200,
description: '刷新成功',
content: new OA\JsonContent(properties: [
new OA\Property(property: 'code', type: 'integer', example: 0),
new OA\Property(property: 'message', type: 'string', example: '刷新成功'),
new OA\Property(property: 'data', properties: [
new OA\Property(property: 'view', type: 'string'),
new OA\Property(property: 'from', type: 'string', nullable: true),
new OA\Property(property: 'to', type: 'string', nullable: true),
new OA\Property(property: 'mode', type: 'string', enum: ['incremental', 'full_refresh', 'full_refresh_concurrent']),
], type: 'object'),
])
),
new OA\Response(response: 400, description: '参数错误或刷新失败', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
]
)]
#[RequestMapping(path: "refresh", methods: "POST")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function refresh(): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$view = (string) $this->request->input('view', '');
$from = $this->request->input('from');
$to = $this->request->input('to');
if (!in_array($view, self::ALLOWED_VIEWS, true)) {
return $this->response->json([
'code' => 400,
'message' => 'invalid view: must be one of ' . implode(', ', self::ALLOWED_VIEWS),
'data' => null,
])->withStatus(400);
}
try {
if ($view === self::VIEW_BY_CREATED) {
Db::statement(
'CALL refresh_continuous_aggregate(?, ?::timestamptz, ?::timestamptz)',
[$view, $from, $to]
);
return [
'code' => 0,
'message' => '刷新成功',
'data' => [
'view' => $view,
'from' => $from,
'to' => $to,
'mode' => 'incremental',
],
];
}
// by_paidPG 物化视图,from/to 无效
$rows = Db::select("SELECT ispopulated FROM pg_matviews WHERE matviewname = ?", [self::VIEW_BY_PAID]);
$populated = !empty($rows) && $rows[0]->ispopulated;
$mode = $populated ? 'full_refresh_concurrent' : 'full_refresh';
$sql = $populated
? 'REFRESH MATERIALIZED VIEW CONCURRENTLY ' . self::VIEW_BY_PAID
: 'REFRESH MATERIALIZED VIEW ' . self::VIEW_BY_PAID;
Db::statement($sql);
return [
'code' => 0,
'message' => '刷新成功',
'data' => [
'view' => $view,
'from' => null,
'to' => null,
'mode' => $mode,
],
];
} catch (\Throwable $e) {
return $this->response->json([
'code' => 400,
'message' => 'refresh failed: ' . $e->getMessage(),
'data' => null,
])->withStatus(400);
}
}
/**
* 列出连续聚合视图及其滞后秒数
*/
#[OA\Get(
path: '/admin/materialization/aggregates',
summary: '查询连续聚合滞后情况',
description: '查询 timescaledb_information.continuous_aggregates,附加由 cagg_watermark 计算的 lag_seconds 字段',
security: [['bearerAuth' => []]],
tags: ['Admin Materialization'],
responses: [
new OA\Response(
response: 200,
description: '获取成功',
content: new OA\JsonContent(properties: [
new OA\Property(property: 'code', type: 'integer', example: 0),
new OA\Property(property: 'message', type: 'string', example: '获取成功'),
new OA\Property(property: 'data', properties: [
new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
new OA\Property(property: 'view_name', type: 'string'),
new OA\Property(property: 'view_schema', type: 'string'),
new OA\Property(property: 'materialization_hypertable_name', type: 'string'),
new OA\Property(property: 'lag_seconds', type: 'number', format: 'double', nullable: true),
])),
], type: 'object'),
])
),
new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
]
)]
#[RequestMapping(path: "aggregates", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function aggregates(): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$rows = Db::select(
"SELECT ca.view_name,
ca.view_schema,
ca.materialization_hypertable_name,
EXTRACT(EPOCH FROM (now() - to_timestamp(_timescaledb_functions.cagg_watermark(cagg.mat_hypertable_id)::float8 / 1000000))) AS lag_seconds
FROM timescaledb_information.continuous_aggregates ca
JOIN _timescaledb_catalog.continuous_agg cagg
ON cagg.user_view_name = ca.view_name
AND cagg.user_view_schema = ca.view_schema
ORDER BY ca.view_name"
);
return [
'code' => 0,
'message' => '获取成功',
'data' => ['items' => $rows],
];
}
/**
* 列出 TimescaleDB Crontab 任务及其最近执行状态
*/
#[OA\Get(
path: '/admin/materialization/jobs',
summary: '查询 Crontab 任务状态',
description: '查询 timescaledb_information.jobs LEFT JOIN job_stats,仅 policy_refresh_continuous_aggregate 类型',
security: [['bearerAuth' => []]],
tags: ['Admin Materialization'],
responses: [
new OA\Response(
response: 200,
description: '获取成功',
content: new OA\JsonContent(properties: [
new OA\Property(property: 'code', type: 'integer', example: 0),
new OA\Property(property: 'message', type: 'string', example: '获取成功'),
new OA\Property(property: 'data', properties: [
new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
new OA\Property(property: 'job_id', type: 'integer'),
new OA\Property(property: 'application_name', type: 'string'),
new OA\Property(property: 'proc_name', type: 'string'),
new OA\Property(property: 'hypertable_name', type: 'string', nullable: true),
new OA\Property(property: 'schedule_interval', type: 'string'),
new OA\Property(property: 'next_start', type: 'string', format: 'date-time', nullable: true),
new OA\Property(property: 'last_successful_finish', type: 'string', format: 'date-time', nullable: true),
new OA\Property(property: 'last_run_status', type: 'string', nullable: true),
])),
], type: 'object'),
])
),
new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
]
)]
#[RequestMapping(path: "jobs", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function jobs(): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$rows = Db::select(
"SELECT j.job_id,
j.application_name,
j.proc_name,
j.hypertable_name,
j.schedule_interval,
j.next_start,
s.last_successful_finish,
s.last_run_status
FROM timescaledb_information.jobs j
LEFT JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id
WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
ORDER BY j.job_id"
);
return [
'code' => 0,
'message' => '获取成功',
'data' => ['items' => $rows],
];
}
private function requireAdmin(): ?ResponseInterface
{
$user = $this->getAuthUser();
if (!$user || !$user->isAdministrator()) {
return $this->response->json([
'code' => 403,
'message' => '仅管理员可访问',
])->withStatus(403);
}
return null;
}
}