diff --git a/backend/app/Command/RouteGroupSeedCommand.php b/backend/app/Command/RouteGroupSeedCommand.php index af8ab21..e16f10a 100644 --- a/backend/app/Command/RouteGroupSeedCommand.php +++ b/backend/app/Command/RouteGroupSeedCommand.php @@ -37,6 +37,7 @@ class RouteGroupSeedCommand extends HyperfCommand ['name' => 'user-permission', 'label' => '用户与权限', 'sort_order' => 8, 'patterns' => ['/api/v1/users', '/api/v1/roles', '/api/v1/route-groups', '/api/v1/routes']], ['name' => 'platform-management', 'label' => '平台管理', 'sort_order' => 9, 'patterns' => ['/api/v1/platforms']], ['name' => 'system', 'label' => '系统功能', 'sort_order' => 10, 'patterns' => ['/api/v1/me/', '/api/v1/dashboard', '/api/v1/mq', '/api/v1/failed-messages', '/api/v1/auth/']], + ['name' => 'materialization-admin', 'label' => '物化任务管理', 'sort_order' => 11, 'patterns' => ['/api/v1/admin/materialization/']], ]; $group_count = 0; diff --git a/backend/app/Controller/Api/V1/AdminMaterializationController.php b/backend/app/Controller/Api/V1/AdminMaterializationController.php new file mode 100644 index 0000000..616369b --- /dev/null +++ b/backend/app/Controller/Api/V1/AdminMaterializationController.php @@ -0,0 +1,346 @@ + []]], + tags: ['Admin Materialization'], + parameters: [ + new OA\Parameter(name: 'page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 1)), + new OA\Parameter(name: 'per_page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 20)), + new OA\Parameter(name: 'view', in: 'query', required: false, description: '聚合视图名(白名单)', schema: new OA\Schema(type: 'string', enum: self::ALLOWED_VIEWS)), + new OA\Parameter(name: 'from', in: 'query', required: false, description: 'refresh_date >= from(YYYY-MM-DD)', schema: new OA\Schema(type: 'string', format: 'date')), + new OA\Parameter(name: 'to', in: 'query', required: false, description: 'refresh_date <= to(YYYY-MM-DD)', schema: new OA\Schema(type: 'string', format: 'date')), + ], + responses: [ + new OA\Response( + response: 200, + description: '获取成功', + content: new OA\JsonContent(properties: [ + new OA\Property(property: 'code', type: 'integer', example: 0), + new OA\Property(property: 'message', type: 'string', example: '获取成功'), + new OA\Property(property: 'data', properties: [ + new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [ + new OA\Property(property: 'refresh_date', type: 'string', format: 'date'), + new OA\Property(property: 'aggregate_view', type: 'string'), + new OA\Property(property: 'created_at', type: 'string', format: 'date-time'), + ])), + new OA\Property(property: 'total', type: 'integer'), + new OA\Property(property: 'page', type: 'integer'), + new OA\Property(property: 'per_page', type: 'integer'), + ], type: 'object'), + ]) + ), + new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')), + new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')), + ] + )] + #[RequestMapping(path: "queue", methods: "GET")] + #[Middleware(AuthMiddleware::class)] + #[Middleware(PermissionMiddleware::class)] + public function queue(): ResponseInterface|array + { + if ($forbidden = $this->requireAdmin()) return $forbidden; + + $page = max((int) $this->request->input('page', 1), 1); + $per_page = min(max((int) $this->request->input('per_page', 20), 1), 100); + $view = $this->request->input('view'); + $from = $this->request->input('from'); + $to = $this->request->input('to'); + + $query = AggregateRefreshQueue::query(); + if ($view !== null && $view !== '') { + $query->where('aggregate_view', $view); + } + if ($from !== null && $from !== '') { + $query->where('refresh_date', '>=', $from); + } + if ($to !== null && $to !== '') { + $query->where('refresh_date', '<=', $to); + } + + $total = $query->count(); + $items = $query + ->orderBy('refresh_date') + ->orderBy('aggregate_view') + ->offset(($page - 1) * $per_page) + ->limit($per_page) + ->get(); + + return [ + 'code' => 0, + 'message' => '获取成功', + 'data' => [ + 'items' => $items, + 'total' => $total, + 'page' => $page, + 'per_page' => $per_page, + ], + ]; + } + + /** + * 手动触发指定时间窗口的物化刷新 + */ + #[OA\Post( + path: '/admin/materialization/refresh', + summary: '手动刷新物化对象', + description: 'by_created 走 refresh_continuous_aggregate(view, from, to);by_paid 是 PG 物化视图,from/to 无效,整体走 REFRESH MATERIALIZED VIEW [CONCURRENTLY]', + security: [['bearerAuth' => []]], + tags: ['Admin Materialization'], + requestBody: new OA\RequestBody( + required: true, + content: new OA\JsonContent( + required: ['view'], + properties: [ + new OA\Property(property: 'view', type: 'string', enum: self::ALLOWED_VIEWS), + new OA\Property(property: 'from', type: 'string', description: '起始时间戳(仅 by_created 生效,timestamptz 字面量)'), + new OA\Property(property: 'to', type: 'string', description: '结束时间戳(仅 by_created 生效,timestamptz 字面量)'), + ] + ) + ), + responses: [ + new OA\Response( + response: 200, + description: '刷新成功', + content: new OA\JsonContent(properties: [ + new OA\Property(property: 'code', type: 'integer', example: 0), + new OA\Property(property: 'message', type: 'string', example: '刷新成功'), + new OA\Property(property: 'data', properties: [ + new OA\Property(property: 'view', type: 'string'), + new OA\Property(property: 'from', type: 'string', nullable: true), + new OA\Property(property: 'to', type: 'string', nullable: true), + new OA\Property(property: 'mode', type: 'string', enum: ['incremental', 'full_refresh', 'full_refresh_concurrent']), + ], type: 'object'), + ]) + ), + new OA\Response(response: 400, description: '参数错误或刷新失败', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')), + new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')), + new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')), + ] + )] + #[RequestMapping(path: "refresh", methods: "POST")] + #[Middleware(AuthMiddleware::class)] + #[Middleware(PermissionMiddleware::class)] + public function refresh(): ResponseInterface|array + { + if ($forbidden = $this->requireAdmin()) return $forbidden; + + $view = (string) $this->request->input('view', ''); + $from = $this->request->input('from'); + $to = $this->request->input('to'); + + if (!in_array($view, self::ALLOWED_VIEWS, true)) { + return $this->response->json([ + 'code' => 400, + 'message' => 'invalid view: must be one of ' . implode(', ', self::ALLOWED_VIEWS), + 'data' => null, + ])->withStatus(400); + } + + try { + if ($view === self::VIEW_BY_CREATED) { + Db::statement( + 'CALL refresh_continuous_aggregate(?, ?::timestamptz, ?::timestamptz)', + [$view, $from, $to] + ); + + return [ + 'code' => 0, + 'message' => '刷新成功', + 'data' => [ + 'view' => $view, + 'from' => $from, + 'to' => $to, + 'mode' => 'incremental', + ], + ]; + } + + // by_paid:PG 物化视图,from/to 无效 + $rows = Db::select("SELECT ispopulated FROM pg_matviews WHERE matviewname = ?", [self::VIEW_BY_PAID]); + $populated = !empty($rows) && $rows[0]->ispopulated; + $mode = $populated ? 'full_refresh_concurrent' : 'full_refresh'; + $sql = $populated + ? 'REFRESH MATERIALIZED VIEW CONCURRENTLY ' . self::VIEW_BY_PAID + : 'REFRESH MATERIALIZED VIEW ' . self::VIEW_BY_PAID; + Db::statement($sql); + + return [ + 'code' => 0, + 'message' => '刷新成功', + 'data' => [ + 'view' => $view, + 'from' => null, + 'to' => null, + 'mode' => $mode, + ], + ]; + } catch (\Throwable $e) { + return $this->response->json([ + 'code' => 400, + 'message' => 'refresh failed: ' . $e->getMessage(), + 'data' => null, + ])->withStatus(400); + } + } + + /** + * 列出连续聚合视图及其滞后秒数 + */ + #[OA\Get( + path: '/admin/materialization/aggregates', + summary: '查询连续聚合滞后情况', + description: '查询 timescaledb_information.continuous_aggregates,附加由 cagg_watermark 计算的 lag_seconds 字段', + security: [['bearerAuth' => []]], + tags: ['Admin Materialization'], + responses: [ + new OA\Response( + response: 200, + description: '获取成功', + content: new OA\JsonContent(properties: [ + new OA\Property(property: 'code', type: 'integer', example: 0), + new OA\Property(property: 'message', type: 'string', example: '获取成功'), + new OA\Property(property: 'data', properties: [ + new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [ + new OA\Property(property: 'view_name', type: 'string'), + new OA\Property(property: 'view_schema', type: 'string'), + new OA\Property(property: 'materialization_hypertable_name', type: 'string'), + new OA\Property(property: 'lag_seconds', type: 'number', format: 'double', nullable: true), + ])), + ], type: 'object'), + ]) + ), + new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')), + new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')), + ] + )] + #[RequestMapping(path: "aggregates", methods: "GET")] + #[Middleware(AuthMiddleware::class)] + #[Middleware(PermissionMiddleware::class)] + public function aggregates(): ResponseInterface|array + { + if ($forbidden = $this->requireAdmin()) return $forbidden; + + $rows = Db::select( + "SELECT ca.view_name, + ca.view_schema, + ca.materialization_hypertable_name, + EXTRACT(EPOCH FROM (now() - to_timestamp(_timescaledb_functions.cagg_watermark(cagg.mat_hypertable_id)::float8 / 1000000))) AS lag_seconds + FROM timescaledb_information.continuous_aggregates ca + JOIN _timescaledb_catalog.continuous_agg cagg + ON cagg.user_view_name = ca.view_name + AND cagg.user_view_schema = ca.view_schema + ORDER BY ca.view_name" + ); + + return [ + 'code' => 0, + 'message' => '获取成功', + 'data' => ['items' => $rows], + ]; + } + + /** + * 列出 TimescaleDB Crontab 任务及其最近执行状态 + */ + #[OA\Get( + path: '/admin/materialization/jobs', + summary: '查询 Crontab 任务状态', + description: '查询 timescaledb_information.jobs LEFT JOIN job_stats,仅 policy_refresh_continuous_aggregate 类型', + security: [['bearerAuth' => []]], + tags: ['Admin Materialization'], + responses: [ + new OA\Response( + response: 200, + description: '获取成功', + content: new OA\JsonContent(properties: [ + new OA\Property(property: 'code', type: 'integer', example: 0), + new OA\Property(property: 'message', type: 'string', example: '获取成功'), + new OA\Property(property: 'data', properties: [ + new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [ + new OA\Property(property: 'job_id', type: 'integer'), + new OA\Property(property: 'application_name', type: 'string'), + new OA\Property(property: 'proc_name', type: 'string'), + new OA\Property(property: 'hypertable_name', type: 'string', nullable: true), + new OA\Property(property: 'schedule_interval', type: 'string'), + new OA\Property(property: 'next_start', type: 'string', format: 'date-time', nullable: true), + new OA\Property(property: 'last_successful_finish', type: 'string', format: 'date-time', nullable: true), + new OA\Property(property: 'last_run_status', type: 'string', nullable: true), + ])), + ], type: 'object'), + ]) + ), + new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')), + new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')), + ] + )] + #[RequestMapping(path: "jobs", methods: "GET")] + #[Middleware(AuthMiddleware::class)] + #[Middleware(PermissionMiddleware::class)] + public function jobs(): ResponseInterface|array + { + if ($forbidden = $this->requireAdmin()) return $forbidden; + + $rows = Db::select( + "SELECT j.job_id, + j.application_name, + j.proc_name, + j.hypertable_name, + j.schedule_interval, + j.next_start, + s.last_successful_finish, + s.last_run_status + FROM timescaledb_information.jobs j + LEFT JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id + WHERE j.proc_name = 'policy_refresh_continuous_aggregate' + ORDER BY j.job_id" + ); + + return [ + 'code' => 0, + 'message' => '获取成功', + 'data' => ['items' => $rows], + ]; + } + + private function requireAdmin(): ?ResponseInterface + { + $user = $this->getAuthUser(); + if (!$user || !$user->isAdministrator()) { + return $this->response->json([ + 'code' => 403, + 'message' => '仅管理员可访问', + ])->withStatus(403); + } + return null; + } +} diff --git a/backend/test/Cases/Integration/Materialization/AdminMaterializationControllerTest.php b/backend/test/Cases/Integration/Materialization/AdminMaterializationControllerTest.php new file mode 100644 index 0000000..a58f214 --- /dev/null +++ b/backend/test/Cases/Integration/Materialization/AdminMaterializationControllerTest.php @@ -0,0 +1,161 @@ +where('name', 'administrator')->firstOrFail(); + } + + protected function getAdminAuthToken(): string + { + $admin_role = $this->fetchAdminRole(); + $user = User::query() + ->where('status', 1) + ->where('role_id', $admin_role->id) + ->first(); + if (!$user) { + $this->markTestSkipped('没有可用的 administrator 用户,无法测试'); + } + + $auth = make(AuthManager::class); + return $auth->guard('jwt')->login($user); + } + + protected function adminHeaders(): array + { + return ['Authorization' => 'Bearer ' . $this->getAdminAuthToken()]; + } + + protected function getNonAdminToken(): array + { + $suffix = 'mat_nonadmin_' . uniqid(); + $user = User::query()->create([ + 'username' => $suffix, + 'password' => 'Pass_' . $suffix, + 'email' => $suffix . '@example.com', + 'status' => 1, + 'api_key_enabled' => true, + ]); + $auth = make(AuthManager::class); + $token = $auth->guard('jwt')->login($user); + return ['Authorization' => 'Bearer ' . $token]; + } + + public function test_queue_lists_pending(): void + { + $date = '2030-12-31'; + $view = 'orders_daily_by_created'; + + AggregateRefreshQueue::query()->insertOrIgnore([[ + 'refresh_date' => $date, + 'aggregate_view' => $view, + 'created_at' => Carbon::now(), + ]]); + + try { + $response = $this->get( + '/api/v1/admin/materialization/queue', + ['view' => $view, 'from' => $date, 'to' => $date], + $this->adminHeaders() + ); + + $response->assertStatus(200); + $response->assertJsonPath('code', 0); + + $body = json_decode($response->getBody()->getContents(), true); + $this->assertArrayHasKey('items', $body['data']); + $this->assertArrayHasKey('total', $body['data']); + $this->assertArrayHasKey('page', $body['data']); + $this->assertArrayHasKey('per_page', $body['data']); + $this->assertGreaterThanOrEqual(1, $body['data']['total']); + + $found = false; + foreach ($body['data']['items'] as $item) { + if ($item['refresh_date'] === $date && $item['aggregate_view'] === $view) { + $found = true; + break; + } + } + $this->assertTrue($found, '应在 queue 列表中找到刚插入的 fixture 行'); + } finally { + AggregateRefreshQueue::query() + ->where('refresh_date', $date) + ->where('aggregate_view', $view) + ->delete(); + } + } + + public function test_refresh_validates_view_whitelist(): void + { + $response = $this->post( + '/api/v1/admin/materialization/refresh', + [ + 'view' => 'evil_view', + 'from' => '2026-01-01 00:00:00+00', + 'to' => '2026-01-02 00:00:00+00', + ], + $this->adminHeaders() + ); + + $response->assertStatus(400); + $body = json_decode($response->getBody()->getContents(), true); + $this->assertSame(400, $body['code']); + $this->assertStringContainsString('view', $body['message']); + } + + public function test_aggregates_returns_lag_seconds(): void + { + $response = $this->get('/api/v1/admin/materialization/aggregates', [], $this->adminHeaders()); + + $response->assertStatus(200); + $response->assertJsonPath('code', 0); + + $body = json_decode($response->getBody()->getContents(), true); + $this->assertArrayHasKey('items', $body['data']); + $items = $body['data']['items']; + $this->assertNotEmpty($items, 'aggregates 应至少返回一条连续聚合记录(orders_daily_by_created)'); + $this->assertArrayHasKey('view_name', $items[0]); + $this->assertArrayHasKey('lag_seconds', $items[0]); + } + + public function test_jobs_lists_refresh_policy(): void + { + $response = $this->get('/api/v1/admin/materialization/jobs', [], $this->adminHeaders()); + + $response->assertStatus(200); + $response->assertJsonPath('code', 0); + + $body = json_decode($response->getBody()->getContents(), true); + $this->assertArrayHasKey('items', $body['data']); + // by_created 注册了 1 条 policy_refresh_continuous_aggregate;by_paid 由 Hyperf Crontab 调度,不入此表 + $this->assertGreaterThanOrEqual(1, count($body['data']['items'])); + $this->assertSame('policy_refresh_continuous_aggregate', $body['data']['items'][0]['proc_name']); + } + + public function test_non_admin_blocked(): void + { + $headers = $this->getNonAdminToken(); + $response = $this->get('/api/v1/admin/materialization/queue', [], $headers); + $response->assertStatus(403); + } +}