347 lines
16 KiB
PHP
347 lines
16 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace App\Controller\Api\V1;
|
||
|
||
use App\Controller\AbstractController;
|
||
use App\Middleware\AuthMiddleware;
|
||
use App\Middleware\PermissionMiddleware;
|
||
use App\Model\AggregateRefreshQueue;
|
||
use Hyperf\DbConnection\Db;
|
||
use Hyperf\HttpServer\Annotation\Controller;
|
||
use Hyperf\HttpServer\Annotation\Middleware;
|
||
use Hyperf\HttpServer\Annotation\RequestMapping;
|
||
use OpenApi\Attributes as OA;
|
||
use Psr\Http\Message\ResponseInterface;
|
||
|
||
#[OA\Tag(name: 'Admin Materialization', description: '管理员物化层监控')]
|
||
#[Controller(prefix: "/api/v1/admin/materialization")]
|
||
class AdminMaterializationController extends AbstractController
|
||
{
|
||
/** View name that refresh() passes to refresh_continuous_aggregate(). */
private const VIEW_BY_CREATED = 'orders_daily_by_created';

/** View name that refresh() refreshes with REFRESH MATERIALIZED VIEW. */
private const VIEW_BY_PAID = 'orders_daily_by_paid';

/** Whitelist of view names: checked by refresh() and published as the OpenAPI enum. */
private const ALLOWED_VIEWS = [
    self::VIEW_BY_CREATED,
    self::VIEW_BY_PAID,
];
|
||
|
||
/**
 * List pending entries of the aggregate_refresh_queue table, paginated.
 *
 * Query parameters:
 *  - page      1-based page number; values below 1 are clamped to 1.
 *  - per_page  page size, clamped to 1..100 (default 20).
 *  - view      optional aggregate view name; must be one of ALLOWED_VIEWS.
 *  - from, to  optional inclusive bounds on refresh_date, strictly YYYY-MM-DD.
 *
 * A missing parameter and an empty string both mean "no filter". Any other
 * value that fails validation is rejected with HTTP 400 before a query is
 * built, so malformed input never reaches the database as a date literal.
 *
 * @return ResponseInterface|array a 403 or 400 JSON response, otherwise the success
 *                                 envelope with `items`, `total`, `page` and `per_page`
 */
#[OA\Get(
    path: '/admin/materialization/queue',
    summary: '列出聚合刷新队列',
    description: '分页列出 aggregate_refresh_queue 表中的待刷新日期,支持按 view/from/to 过滤',
    security: [['bearerAuth' => []]],
    tags: ['Admin Materialization'],
    parameters: [
        new OA\Parameter(name: 'page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 1)),
        new OA\Parameter(name: 'per_page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 20)),
        new OA\Parameter(name: 'view', in: 'query', required: false, description: '聚合视图名(白名单)', schema: new OA\Schema(type: 'string', enum: self::ALLOWED_VIEWS)),
        new OA\Parameter(name: 'from', in: 'query', required: false, description: 'refresh_date >= from(YYYY-MM-DD)', schema: new OA\Schema(type: 'string', format: 'date')),
        new OA\Parameter(name: 'to', in: 'query', required: false, description: 'refresh_date <= to(YYYY-MM-DD)', schema: new OA\Schema(type: 'string', format: 'date')),
    ],
    responses: [
        new OA\Response(
            response: 200,
            description: '获取成功',
            content: new OA\JsonContent(properties: [
                new OA\Property(property: 'code', type: 'integer', example: 0),
                new OA\Property(property: 'message', type: 'string', example: '获取成功'),
                new OA\Property(property: 'data', properties: [
                    new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
                        new OA\Property(property: 'refresh_date', type: 'string', format: 'date'),
                        new OA\Property(property: 'aggregate_view', type: 'string'),
                        new OA\Property(property: 'created_at', type: 'string', format: 'date-time'),
                    ])),
                    new OA\Property(property: 'total', type: 'integer'),
                    new OA\Property(property: 'page', type: 'integer'),
                    new OA\Property(property: 'per_page', type: 'integer'),
                ], type: 'object'),
            ])
        ),
        new OA\Response(response: 400, description: '参数错误', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
    ]
)]
#[RequestMapping(path: "queue", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function queue(): ResponseInterface|array
{
    if ($forbidden = $this->requireAdmin()) {
        return $forbidden;
    }

    $page = max((int) $this->request->input('page', 1), 1);
    $perPage = min(max((int) $this->request->input('per_page', 20), 1), 100);

    // Normalise the optional filters: null and '' both mean "not supplied".
    $filters = [];
    foreach (['view', 'from', 'to'] as $name) {
        $value = $this->request->input($name);
        $filters[$name] = ($value === null || $value === '') ? null : $value;
    }

    // The OpenAPI contract publishes `view` as an enum, so enforce it here too
    // (strict comparison also rejects arrays and other non-string input).
    if ($filters['view'] !== null && !in_array($filters['view'], self::ALLOWED_VIEWS, true)) {
        return $this->response->json([
            'code' => 400,
            'message' => 'invalid view: must be one of ' . implode(', ', self::ALLOWED_VIEWS),
            'data' => null,
        ])->withStatus(400);
    }

    $isDate = static function (mixed $value): bool {
        if (!is_string($value)) {
            return false;
        }
        $parsed = \DateTimeImmutable::createFromFormat('!Y-m-d', $value);

        // The round trip rejects values PHP would silently roll over (e.g. 2024-02-31)
        // as well as non-padded forms such as 2024-1-5.
        return $parsed !== false && $parsed->format('Y-m-d') === $value;
    };

    foreach (['from', 'to'] as $name) {
        if ($filters[$name] !== null && !$isDate($filters[$name])) {
            return $this->response->json([
                'code' => 400,
                'message' => "invalid {$name}: expected a date in YYYY-MM-DD format",
                'data' => null,
            ])->withStatus(400);
        }
    }

    $query = AggregateRefreshQueue::query();
    if ($filters['view'] !== null) {
        $query->where('aggregate_view', $filters['view']);
    }
    if ($filters['from'] !== null) {
        $query->where('refresh_date', '>=', $filters['from']);
    }
    if ($filters['to'] !== null) {
        $query->where('refresh_date', '<=', $filters['to']);
    }

    // Count first, then page through the same filtered query.
    $total = $query->count();
    $items = $query
        ->orderBy('refresh_date')
        ->orderBy('aggregate_view')
        ->offset(($page - 1) * $perPage)
        ->limit($perPage)
        ->get();

    return [
        'code' => 0,
        'message' => '获取成功',
        'data' => [
            'items' => $items,
            'total' => $total,
            'page' => $page,
            'per_page' => $perPage,
        ],
    ];
}
|
||
|
||
/**
 * Manually trigger a refresh of one whitelisted materialized object.
 *
 * Request fields read: `view`, `from`, `to`. A `view` outside ALLOWED_VIEWS is
 * rejected with HTTP 400.
 *
 *  - orders_daily_by_created: runs CALL refresh_continuous_aggregate(view, from, to)
 *    with both bounds cast to timestamptz; reported as mode "incremental".
 *  - orders_daily_by_paid: `from`/`to` are ignored. The view is refreshed with
 *    REFRESH MATERIALIZED VIEW, adding CONCURRENTLY when pg_matviews reports it as
 *    populated; reported as "full_refresh_concurrent" or "full_refresh".
 *
 * Any Throwable raised while refreshing is turned into an HTTP 400 response whose
 * message carries the exception text.
 *
 * @return ResponseInterface|array a 403 or 400 JSON response, otherwise the success
 *                                 envelope with `view`, `from`, `to` and `mode`
 */
#[OA\Post(
    path: '/admin/materialization/refresh',
    summary: '手动刷新物化对象',
    description: 'by_created 走 refresh_continuous_aggregate(view, from, to);by_paid 是 PG 物化视图,from/to 无效,整体走 REFRESH MATERIALIZED VIEW [CONCURRENTLY]',
    security: [['bearerAuth' => []]],
    tags: ['Admin Materialization'],
    requestBody: new OA\RequestBody(
        required: true,
        content: new OA\JsonContent(
            required: ['view'],
            properties: [
                new OA\Property(property: 'view', type: 'string', enum: self::ALLOWED_VIEWS),
                new OA\Property(property: 'from', type: 'string', description: '起始时间戳(仅 by_created 生效,timestamptz 字面量)'),
                new OA\Property(property: 'to', type: 'string', description: '结束时间戳(仅 by_created 生效,timestamptz 字面量)'),
            ]
        )
    ),
    responses: [
        new OA\Response(
            response: 200,
            description: '刷新成功',
            content: new OA\JsonContent(properties: [
                new OA\Property(property: 'code', type: 'integer', example: 0),
                new OA\Property(property: 'message', type: 'string', example: '刷新成功'),
                new OA\Property(property: 'data', properties: [
                    new OA\Property(property: 'view', type: 'string'),
                    new OA\Property(property: 'from', type: 'string', nullable: true),
                    new OA\Property(property: 'to', type: 'string', nullable: true),
                    new OA\Property(property: 'mode', type: 'string', enum: ['incremental', 'full_refresh', 'full_refresh_concurrent']),
                ], type: 'object'),
            ])
        ),
        new OA\Response(response: 400, description: '参数错误或刷新失败', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
    ]
)]
#[RequestMapping(path: "refresh", methods: "POST")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function refresh(): ResponseInterface|array
{
    $denied = $this->requireAdmin();
    if ($denied) {
        return $denied;
    }

    $target = (string) $this->request->input('view', '');
    $windowStart = $this->request->input('from');
    $windowEnd = $this->request->input('to');

    if (!in_array($target, self::ALLOWED_VIEWS, true)) {
        $rejection = [
            'code' => 400,
            'message' => 'invalid view: must be one of ' . implode(', ', self::ALLOWED_VIEWS),
            'data' => null,
        ];

        return $this->response->json($rejection)->withStatus(400);
    }

    try {
        if ($target !== self::VIEW_BY_CREATED) {
            // by_paid: a PostgreSQL materialized view, so from/to do not apply.
            $matviews = Db::select("SELECT ispopulated FROM pg_matviews WHERE matviewname = ?", [self::VIEW_BY_PAID]);
            $isPopulated = !empty($matviews) && $matviews[0]->ispopulated;

            // CONCURRENTLY is only requested once the view is reported as populated.
            Db::statement(
                'REFRESH MATERIALIZED VIEW ' . ($isPopulated ? 'CONCURRENTLY ' : '') . self::VIEW_BY_PAID
            );

            $outcome = [
                'view' => $target,
                'from' => null,
                'to' => null,
                'mode' => $isPopulated ? 'full_refresh_concurrent' : 'full_refresh',
            ];
        } else {
            Db::statement(
                'CALL refresh_continuous_aggregate(?, ?::timestamptz, ?::timestamptz)',
                [$target, $windowStart, $windowEnd]
            );

            $outcome = [
                'view' => $target,
                'from' => $windowStart,
                'to' => $windowEnd,
                'mode' => 'incremental',
            ];
        }
    } catch (\Throwable $failure) {
        return $this->response->json([
            'code' => 400,
            'message' => 'refresh failed: ' . $failure->getMessage(),
            'data' => null,
        ])->withStatus(400);
    }

    return [
        'code' => 0,
        'message' => '刷新成功',
        'data' => $outcome,
    ];
}
|
||
|
||
/**
 * List continuous aggregates together with how far each one lags behind now().
 *
 * Reads timescaledb_information.continuous_aggregates joined to
 * _timescaledb_catalog.continuous_agg on view name and schema. `lag_seconds` is
 * now() minus the cagg_watermark value, which the query divides by 1,000,000
 * before passing it to to_timestamp().
 *
 * @return ResponseInterface|array a 403 JSON response, otherwise the success envelope
 *                                 whose `items` are the raw result rows
 */
#[OA\Get(
    path: '/admin/materialization/aggregates',
    summary: '查询连续聚合滞后情况',
    description: '查询 timescaledb_information.continuous_aggregates,附加由 cagg_watermark 计算的 lag_seconds 字段',
    security: [['bearerAuth' => []]],
    tags: ['Admin Materialization'],
    responses: [
        new OA\Response(
            response: 200,
            description: '获取成功',
            content: new OA\JsonContent(properties: [
                new OA\Property(property: 'code', type: 'integer', example: 0),
                new OA\Property(property: 'message', type: 'string', example: '获取成功'),
                new OA\Property(property: 'data', properties: [
                    new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
                        new OA\Property(property: 'view_name', type: 'string'),
                        new OA\Property(property: 'view_schema', type: 'string'),
                        new OA\Property(property: 'materialization_hypertable_name', type: 'string'),
                        new OA\Property(property: 'lag_seconds', type: 'number', format: 'double', nullable: true),
                    ])),
                ], type: 'object'),
            ])
        ),
        new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
    ]
)]
#[RequestMapping(path: "aggregates", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function aggregates(): ResponseInterface|array
{
    $denied = $this->requireAdmin();
    if ($denied) {
        return $denied;
    }

    $sql = <<<'SQL'
        SELECT ca.view_name,
               ca.view_schema,
               ca.materialization_hypertable_name,
               EXTRACT(EPOCH FROM (now() - to_timestamp(_timescaledb_functions.cagg_watermark(cagg.mat_hypertable_id)::float8 / 1000000))) AS lag_seconds
        FROM timescaledb_information.continuous_aggregates ca
        JOIN _timescaledb_catalog.continuous_agg cagg
          ON cagg.user_view_name = ca.view_name
         AND cagg.user_view_schema = ca.view_schema
        ORDER BY ca.view_name
        SQL;

    $aggregates = Db::select($sql);

    return [
        'code' => 0,
        'message' => '获取成功',
        'data' => ['items' => $aggregates],
    ];
}
|
||
|
||
/**
 * List TimescaleDB jobs and their most recent run status.
 *
 * Reads timescaledb_information.jobs LEFT JOINed to job_stats, restricted to
 * rows whose proc_name is 'policy_refresh_continuous_aggregate' and ordered by
 * job_id. Jobs without a stats row still appear, with null status columns.
 *
 * @return ResponseInterface|array a 403 JSON response, otherwise the success envelope
 *                                 whose `items` are the raw result rows
 */
#[OA\Get(
    path: '/admin/materialization/jobs',
    summary: '查询 Crontab 任务状态',
    description: '查询 timescaledb_information.jobs LEFT JOIN job_stats,仅 policy_refresh_continuous_aggregate 类型',
    security: [['bearerAuth' => []]],
    tags: ['Admin Materialization'],
    responses: [
        new OA\Response(
            response: 200,
            description: '获取成功',
            content: new OA\JsonContent(properties: [
                new OA\Property(property: 'code', type: 'integer', example: 0),
                new OA\Property(property: 'message', type: 'string', example: '获取成功'),
                new OA\Property(property: 'data', properties: [
                    new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
                        new OA\Property(property: 'job_id', type: 'integer'),
                        new OA\Property(property: 'application_name', type: 'string'),
                        new OA\Property(property: 'proc_name', type: 'string'),
                        new OA\Property(property: 'hypertable_name', type: 'string', nullable: true),
                        new OA\Property(property: 'schedule_interval', type: 'string'),
                        new OA\Property(property: 'next_start', type: 'string', format: 'date-time', nullable: true),
                        new OA\Property(property: 'last_successful_finish', type: 'string', format: 'date-time', nullable: true),
                        new OA\Property(property: 'last_run_status', type: 'string', nullable: true),
                    ])),
                ], type: 'object'),
            ])
        ),
        new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
    ]
)]
#[RequestMapping(path: "jobs", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function jobs(): ResponseInterface|array
{
    $denied = $this->requireAdmin();
    if ($denied) {
        return $denied;
    }

    $sql = <<<'SQL'
        SELECT j.job_id,
               j.application_name,
               j.proc_name,
               j.hypertable_name,
               j.schedule_interval,
               j.next_start,
               s.last_successful_finish,
               s.last_run_status
        FROM timescaledb_information.jobs j
        LEFT JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id
        WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
        ORDER BY j.job_id
        SQL;

    $jobRows = Db::select($sql);

    return [
        'code' => 0,
        'message' => '获取成功',
        'data' => ['items' => $jobRows],
    ];
}
|
||
|
||
/**
 * Guard shared by the controller actions: let the request through only for an administrator.
 *
 * @return ResponseInterface|null a 403 JSON response when getAuthUser() yields no user
 *                                or the user is not an administrator; null when access is allowed
 */
private function requireAdmin(): ?ResponseInterface
{
    $currentUser = $this->getAuthUser();

    // Allow-path first: an authenticated administrator needs no response.
    if ($currentUser && $currentUser->isAdministrator()) {
        return null;
    }

    $payload = [
        'code' => 403,
        'message' => '仅管理员可访问',
    ];

    return $this->response->json($payload)->withStatus(403);
}
|
||
}
|