[]]],
        tags: ['Admin Materialization'],
        parameters: [
            new OA\Parameter(name: 'page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 1)),
            new OA\Parameter(name: 'per_page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 20)),
            new OA\Parameter(name: 'view', in: 'query', required: false, description: '聚合视图名(白名单)', schema: new OA\Schema(type: 'string', enum: self::ALLOWED_VIEWS)),
            new OA\Parameter(name: 'from', in: 'query', required: false, description: 'refresh_date >= from(YYYY-MM-DD)', schema: new OA\Schema(type: 'string', format: 'date')),
            new OA\Parameter(name: 'to', in: 'query', required: false, description: 'refresh_date <= to(YYYY-MM-DD)', schema: new OA\Schema(type: 'string', format: 'date')),
        ],
        responses: [
            new OA\Response(
                response: 200,
                description: '获取成功',
                content: new OA\JsonContent(properties: [
                    new OA\Property(property: 'code', type: 'integer', example: 0),
                    new OA\Property(property: 'message', type: 'string', example: '获取成功'),
                    new OA\Property(property: 'data', properties: [
                        new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
                            new OA\Property(property: 'refresh_date', type: 'string', format: 'date'),
                            new OA\Property(property: 'aggregate_view', type: 'string'),
                            new OA\Property(property: 'created_at', type: 'string', format: 'date-time'),
                        ])),
                        new OA\Property(property: 'total', type: 'integer'),
                        new OA\Property(property: 'page', type: 'integer'),
                        new OA\Property(property: 'per_page', type: 'integer'),
                    ], type: 'object'),
                ])
            ),
            new OA\Response(response: 400, description: '参数错误', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
            new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
            new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        ]
    )]
    #[RequestMapping(path: "queue", methods: "GET")]
    #[Middleware(AuthMiddleware::class)]
    #[Middleware(PermissionMiddleware::class)]
    public function queue(): ResponseInterface|array
    {
        // Paginated listing of AggregateRefreshQueue rows, optionally filtered by
        // view name and by an inclusive refresh_date range.
        if ($forbidden = $this->requireAdmin()) {
            return $forbidden;
        }

        // page is at least 1; per_page is clamped to the range 1..100.
        $page = max((int) $this->request->input('page', 1), 1);
        $perPage = min(max((int) $this->request->input('per_page', 20), 1), 100);

        // An empty string is treated exactly like an absent filter.
        $filters = [];
        foreach (['view', 'from', 'to'] as $name) {
            $raw = $this->request->input($name);
            $filters[$name] = ($raw === null || $raw === '') ? null : $raw;
        }

        // The view filter is documented as a whitelist, so enforce it here rather
        // than silently returning an empty page for an unknown name.
        if ($filters['view'] !== null && !in_array($filters['view'], self::ALLOWED_VIEWS, true)) {
            return $this->badRequest('invalid view: must be one of ' . implode(', ', self::ALLOWED_VIEWS));
        }

        // Reject malformed dates up front so they never reach the database
        // comparison and surface as an unhandled query exception.
        foreach (['from', 'to'] as $name) {
            if ($filters[$name] !== null && !$this->isIsoDate($filters[$name])) {
                return $this->badRequest("invalid {$name}: expected YYYY-MM-DD");
            }
        }

        $query = AggregateRefreshQueue::query();
        if ($filters['view'] !== null) {
            $query->where('aggregate_view', $filters['view']);
        }
        if ($filters['from'] !== null) {
            $query->where('refresh_date', '>=', $filters['from']);
        }
        if ($filters['to'] !== null) {
            $query->where('refresh_date', '<=', $filters['to']);
        }

        // Count before ordering/paging so total reflects the whole filtered set.
        $total = $query->count();
        $items = $query
            ->orderBy('refresh_date')
            ->orderBy('aggregate_view')
            ->offset(($page - 1) * $perPage)
            ->limit($perPage)
            ->get();

        return [
            'code' => 0,
            'message' => '获取成功',
            'data' => [
                'items' => $items,
                'total' => $total,
                'page' => $page,
                'per_page' => $perPage,
            ],
        ];
    }

    /**
     * Manually trigger a materialization refresh for one of the allowed views.
     *
     * For VIEW_BY_CREATED the request's from/to values are forwarded to
     * refresh_continuous_aggregate(); for the other allowed view a full
     * REFRESH MATERIALIZED VIEW is issued and from/to are ignored.
     *
     * @return ResponseInterface|array Success envelope (view, from, to, mode), or a
     *                                 400/403 JSON response on invalid input,
     *                                 refresh failure or missing admin rights.
     */
    #[OA\Post(
        path: '/admin/materialization/refresh',
        summary: '手动刷新物化对象',
        description: 'by_created 走 refresh_continuous_aggregate(view, from, to);by_paid 是 PG 物化视图,from/to 无效,整体走 REFRESH MATERIALIZED VIEW [CONCURRENTLY]',
        security: [['bearerAuth' => []]],
        tags: ['Admin Materialization'],
        requestBody: new OA\RequestBody(
            required: true,
            content: new OA\JsonContent(
                required: ['view'],
                properties: [
                    new OA\Property(property: 'view', type: 'string', enum: self::ALLOWED_VIEWS),
                    new OA\Property(property: 'from', type: 'string', description: '起始时间戳(仅 by_created 生效,timestamptz 字面量)'),
                    new OA\Property(property: 'to', type: 'string', description: '结束时间戳(仅 by_created 生效,timestamptz 字面量)'),
                ]
            )
        ),
        responses: [
            new OA\Response(
                response: 200,
                description: '刷新成功',
                content: new OA\JsonContent(properties: [
                    new OA\Property(property: 'code', type: 'integer', example: 0),
                    new OA\Property(property: 'message', type: 'string', example: '刷新成功'),
                    new OA\Property(property: 'data', properties: [
                        new OA\Property(property: 'view', type: 'string'),
                        new OA\Property(property: 'from', type: 'string', nullable: true),
                        new OA\Property(property: 'to', type: 'string', nullable: true),
                        new OA\Property(property: 'mode', type: 'string', enum: ['incremental', 'full_refresh', 'full_refresh_concurrent']),
                    ], type: 'object'),
                ])
            ),
            new OA\Response(response: 400, description: '参数错误或刷新失败', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
            new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
            new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        ]
    )]
    #[RequestMapping(path: "refresh", methods: "POST")]
    #[Middleware(AuthMiddleware::class)]
    #[Middleware(PermissionMiddleware::class)]
    public function refresh(): ResponseInterface|array
    {
        if ($forbidden = $this->requireAdmin()) {
            return $forbidden;
        }

        // A non-string view (e.g. an array) can never match the whitelist; map it
        // to '' instead of casting, which would emit an array-to-string warning.
        $rawView = $this->request->input('view', '');
        $view = is_string($rawView) ? $rawView : '';
        $from = $this->request->input('from');
        $to = $this->request->input('to');

        if (!in_array($view, self::ALLOWED_VIEWS, true)) {
            return $this->badRequest('invalid view: must be one of ' . implode(', ', self::ALLOWED_VIEWS));
        }

        try {
            if ($view === self::VIEW_BY_CREATED) {
                // Windowed refresh: from/to are bound as parameters and cast to
                // timestamptz by the database.
                Db::statement(
                    'CALL refresh_continuous_aggregate(?, ?::timestamptz, ?::timestamptz)',
                    [$view, $from, $to]
                );

                return [
                    'code' => 0,
                    'message' => '刷新成功',
                    'data' => [
                        'view' => $view,
                        'from' => $from,
                        'to' => $to,
                        'mode' => 'incremental',
                    ],
                ];
            }

            // by_paid is a plain PostgreSQL materialized view: from/to do not apply.
            // CONCURRENTLY is only requested when pg_matviews reports the view as
            // already populated; otherwise a plain (blocking) refresh is issued.
            $rows = Db::select("SELECT ispopulated FROM pg_matviews WHERE matviewname = ?", [self::VIEW_BY_PAID]);
            $populated = !empty($rows) && $rows[0]->ispopulated;

            Db::statement('REFRESH MATERIALIZED VIEW ' . ($populated ? 'CONCURRENTLY ' : '') . self::VIEW_BY_PAID);

            return [
                'code' => 0,
                'message' => '刷新成功',
                'data' => [
                    'view' => $view,
                    'from' => null,
                    'to' => null,
                    'mode' => $populated ? 'full_refresh_concurrent' : 'full_refresh',
                ],
            ];
        } catch (\Throwable $e) {
            // Any failure while refreshing is reported to the caller as a 400
            // carrying the underlying error message.
            return $this->badRequest('refresh failed: ' . $e->getMessage());
        }
    }

    /**
     * List continuous aggregates together with their lag in seconds.
     *
     * lag_seconds is computed in SQL as now() minus the aggregate's
     * cagg_watermark value (divided by 1,000,000 before to_timestamp()).
     *
     * @return ResponseInterface|array Success envelope with the raw result rows
     *                                 under data.items, or a 403 JSON response.
     */
    #[OA\Get(
        path: '/admin/materialization/aggregates',
        summary: '查询连续聚合滞后情况',
        description: '查询 timescaledb_information.continuous_aggregates,附加由 cagg_watermark 计算的 lag_seconds 字段',
        security: [['bearerAuth' => []]],
        tags: ['Admin Materialization'],
        responses: [
            new OA\Response(
                response: 200,
                description: '获取成功',
                content: new OA\JsonContent(properties: [
                    new OA\Property(property: 'code', type: 'integer', example: 0),
                    new OA\Property(property: 'message', type: 'string', example: '获取成功'),
                    new OA\Property(property: 'data', properties: [
                        new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
                            new OA\Property(property: 'view_name', type: 'string'),
                            new OA\Property(property: 'view_schema', type: 'string'),
                            new OA\Property(property: 'materialization_hypertable_name', type: 'string'),
                            new OA\Property(property: 'lag_seconds', type: 'number', format: 'double', nullable: true),
                        ])),
                    ], type: 'object'),
                ])
            ),
            new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
            new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        ]
    )]
    #[RequestMapping(path: "aggregates", methods: "GET")]
    #[Middleware(AuthMiddleware::class)]
    #[Middleware(PermissionMiddleware::class)]
    public function aggregates(): ResponseInterface|array
    {
        if ($forbidden = $this->requireAdmin()) {
            return $forbidden;
        }

        // The catalog table is joined on view name + schema to obtain the
        // mat_hypertable_id that cagg_watermark() takes as its argument.
        $rows = Db::select(
            "SELECT ca.view_name, ca.view_schema, ca.materialization_hypertable_name, "
            . "EXTRACT(EPOCH FROM (now() - to_timestamp(_timescaledb_functions.cagg_watermark(cagg.mat_hypertable_id)::float8 / 1000000))) AS lag_seconds "
            . "FROM timescaledb_information.continuous_aggregates ca "
            . "JOIN _timescaledb_catalog.continuous_agg cagg "
            . "ON cagg.user_view_name = ca.view_name AND cagg.user_view_schema = ca.view_schema "
            . "ORDER BY ca.view_name"
        );

        return [
            'code' => 0,
            'message' => '获取成功',
            'data' => ['items' => $rows],
        ];
    }

    /**
     * List TimescaleDB jobs whose proc_name is policy_refresh_continuous_aggregate,
     * with their last-run statistics from job_stats (LEFT JOIN, so the stats
     * columns may be null).
     *
     * @return ResponseInterface|array Success envelope with the raw result rows
     *                                 under data.items, or a 403 JSON response.
     */
    #[OA\Get(
        path: '/admin/materialization/jobs',
        summary: '查询 Crontab 任务状态',
        description: '查询 timescaledb_information.jobs LEFT JOIN job_stats,仅 policy_refresh_continuous_aggregate 类型',
        security: [['bearerAuth' => []]],
        tags: ['Admin Materialization'],
        responses: [
            new OA\Response(
                response: 200,
                description: '获取成功',
                content: new OA\JsonContent(properties: [
                    new OA\Property(property: 'code', type: 'integer', example: 0),
                    new OA\Property(property: 'message', type: 'string', example: '获取成功'),
                    new OA\Property(property: 'data', properties: [
                        new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
                            new OA\Property(property: 'job_id', type: 'integer'),
                            new OA\Property(property: 'application_name', type: 'string'),
                            new OA\Property(property: 'proc_name', type: 'string'),
                            new OA\Property(property: 'hypertable_name', type: 'string', nullable: true),
                            new OA\Property(property: 'schedule_interval', type: 'string'),
                            new OA\Property(property: 'next_start', type: 'string', format: 'date-time', nullable: true),
                            new OA\Property(property: 'last_successful_finish', type: 'string', format: 'date-time', nullable: true),
                            new OA\Property(property: 'last_run_status', type: 'string', nullable: true),
                        ])),
                    ], type: 'object'),
                ])
            ),
            new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
            new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
        ]
    )]
    #[RequestMapping(path: "jobs", methods: "GET")]
    #[Middleware(AuthMiddleware::class)]
    #[Middleware(PermissionMiddleware::class)]
    public function jobs(): ResponseInterface|array
    {
        if ($forbidden = $this->requireAdmin()) {
            return $forbidden;
        }

        $rows = Db::select(
            "SELECT j.job_id, j.application_name, j.proc_name, j.hypertable_name, j.schedule_interval, j.next_start, s.last_successful_finish, s.last_run_status "
            . "FROM timescaledb_information.jobs j "
            . "LEFT JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id "
            . "WHERE j.proc_name = 'policy_refresh_continuous_aggregate' "
            . "ORDER BY j.job_id"
        );

        return [
            'code' => 0,
            'message' => '获取成功',
            'data' => ['items' => $rows],
        ];
    }

    /**
     * Gate an endpoint to administrators.
     *
     * @return ResponseInterface|null A 403 JSON response when there is no
     *                                authenticated user or the user is not an
     *                                administrator; null when access is allowed.
     */
    private function requireAdmin(): ?ResponseInterface
    {
        $user = $this->getAuthUser();
        if ($user && $user->isAdministrator()) {
            return null;
        }

        // 'data' => null keeps this envelope shaped like the other error responses.
        return $this->response->json([
            'code' => 403,
            'message' => '仅管理员可访问',
            'data' => null,
        ])->withStatus(403);
    }

    /**
     * Build the shared 400 error envelope.
     *
     * @param string $message Human-readable reason placed in the "message" field.
     */
    private function badRequest(string $message): ResponseInterface
    {
        return $this->response->json([
            'code' => 400,
            'message' => $message,
            'data' => null,
        ])->withStatus(400);
    }

    /**
     * Tell whether a request value is a real calendar date written as YYYY-MM-DD.
     *
     * The parsed value is formatted back and compared with the input, so
     * overflowing dates (e.g. 2024-02-30) and unpadded forms are rejected.
     */
    private function isIsoDate(mixed $value): bool
    {
        if (!is_string($value)) {
            return false;
        }

        $parsed = \DateTimeImmutable::createFromFormat('!Y-m-d', $value);

        return $parsed !== false && $parsed->format('Y-m-d') === $value;
    }
}