Compare commits

...

10 Commits

Author SHA1 Message Date
nz eae665d66f add admin materialize 2026-05-07 21:25:38 +08:00
nz 349f8e11b0 add aggr 2026-05-07 20:51:38 +08:00
nz 1e7de46c26 add backfill 2026-05-07 16:00:10 +08:00
nz 597d8ae948 add daily paid view 2026-05-07 14:21:30 +08:00
nz 785726caac add timescaledb extension migration and orders_daily_by_created continuous aggregate
P22.1 of Stage 22 materialization layer infrastructure. Adds explicit
CREATE EXTENSION migration so clean environments can run `migrate` end-to-end
without ops manual setup, then creates the first continuous aggregate
orders_daily_by_created (day-bucketed, grouped by company/platform/store)
with WITH NO DATA. Five composite indexes cover the high-frequency query
patterns documented in docs/data_query.md. P22.2/P22.3 will add the
by-paid view, refresh policy and historical backfill on top.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-07 12:07:17 +08:00
nz efc5cabfbb update sku mapping 2026-04-20 13:50:20 +08:00
nz dd80286e23 update sku mapping 2026-04-20 13:26:48 +08:00
nz 93518fd031 add route group sync 2026-04-20 13:13:17 +08:00
nz 95ec0f16aa update 2026-04-20 11:06:55 +08:00
nz 7898beef5a update api key manage 2026-04-20 10:40:48 +08:00
38 changed files with 1888 additions and 67 deletions
@@ -0,0 +1,46 @@
<?php
declare(strict_types=1);
namespace App\Command;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\DbConnection\Db;
#[Command]
class OrderAggregatesBackfillCommand extends HyperfCommand
{
public function __construct()
{
parent::__construct('orders:backfill-aggregates');
}
public function configure(): void
{
parent::configure();
$this->setDescription('一次性回填 orders_daily_by_created(连续聚合)和 orders_daily_by_paid(物化视图)的全部历史数据');
}
public function handle(): void
{
// 1. orders_daily_by_created:调用 TimescaleDB 内置 refresh
$this->line('Refreshing orders_daily_by_created (NULL → now() - 1 hour) ...');
Db::statement("CALL refresh_continuous_aggregate('orders_daily_by_created', NULL, now() - INTERVAL '1 hour')");
// 2. orders_daily_by_paidPG 物化视图。首次必须用非 CONCURRENTLY 模式填充,
// 后续重算才能走 CONCURRENTLYPG 硬约束:CONCURRENTLY cannot be used when not populated)。
$rows = Db::select("SELECT ispopulated FROM pg_matviews WHERE matviewname = 'orders_daily_by_paid'");
$populated = ! empty($rows) && $rows[0]->ispopulated;
if ($populated) {
$this->line('Refreshing orders_daily_by_paid (CONCURRENTLY) ...');
Db::statement('REFRESH MATERIALIZED VIEW CONCURRENTLY orders_daily_by_paid');
} else {
$this->line('Initial population of orders_daily_by_paid (non-concurrent) ...');
Db::statement('REFRESH MATERIALIZED VIEW orders_daily_by_paid');
}
$this->info('Done.');
}
}
@@ -0,0 +1,35 @@
<?php
declare(strict_types=1);
namespace App\Command;
use App\Service\OrderAggregatesRefreshJob;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
/**
* 手动触发聚合刷新队列消费
*
* 与 Crontab 共用 OrderAggregatesRefreshJob,便于运维即时补刷或调试。
*/
#[Command]
class OrderAggregatesRefreshCommand extends HyperfCommand
{
public function __construct(private OrderAggregatesRefreshJob $job)
{
parent::__construct('orders:refresh-aggregates');
}
public function configure(): void
{
parent::configure();
$this->setDescription('消费 aggregate_refresh_queue,对滞后日期调用 refresh_continuous_aggregate');
}
public function handle(): void
{
$result = $this->job->run();
$this->info(sprintf('Processed: %d, Failed: %d', $result['processed'], $result['failed']));
}
}
@@ -37,6 +37,7 @@ class RouteGroupSeedCommand extends HyperfCommand
['name' => 'user-permission', 'label' => '用户与权限', 'sort_order' => 8, 'patterns' => ['/api/v1/users', '/api/v1/roles', '/api/v1/route-groups', '/api/v1/routes']],
['name' => 'platform-management', 'label' => '平台管理', 'sort_order' => 9, 'patterns' => ['/api/v1/platforms']],
['name' => 'system', 'label' => '系统功能', 'sort_order' => 10, 'patterns' => ['/api/v1/me/', '/api/v1/dashboard', '/api/v1/mq', '/api/v1/failed-messages', '/api/v1/auth/']],
['name' => 'materialization-admin', 'label' => '物化任务管理', 'sort_order' => 11, 'patterns' => ['/api/v1/admin/materialization/']],
];
$group_count = 0;
@@ -21,18 +21,19 @@ class AdminApiKeyController extends AbstractController
/**
* 管理员列出所有 API Keys
*
* 支持按 user_id、enabled 筛选,关联用户信息
* 支持按 username、email、enabled 筛选,关联用户信息(含 email
*/
#[OA\Get(
path: '/admin/api-keys',
summary: '管理员列出所有 API Keys',
description: '分页列出所有用户的 API Keys,支持按 user_id、enabled 筛选,关联用户基本信息',
description: '分页列出所有用户的 API Keys,支持按 username、email、enabled 筛选,关联用户基本信息',
security: [['bearerAuth' => []]],
tags: ['Admin API Keys'],
parameters: [
new OA\Parameter(name: 'page', in: 'query', required: false, description: '页码,默认 1', schema: new OA\Schema(type: 'integer', default: 1)),
new OA\Parameter(name: 'per_page', in: 'query', required: false, description: '每页条数,默认 15,最大 100', schema: new OA\Schema(type: 'integer', default: 15)),
new OA\Parameter(name: 'user_id', in: 'query', required: false, description: '按用户 ID 筛选', schema: new OA\Schema(type: 'integer')),
new OA\Parameter(name: 'username', in: 'query', required: false, description: '按用户名模糊搜索', schema: new OA\Schema(type: 'string')),
new OA\Parameter(name: 'email', in: 'query', required: false, description: '按邮箱模糊搜索', schema: new OA\Schema(type: 'string')),
new OA\Parameter(name: 'enabled', in: 'query', required: false, description: '按启用状态筛选(0/1', schema: new OA\Schema(type: 'integer', enum: [0, 1])),
],
responses: [
@@ -55,6 +56,7 @@ class AdminApiKeyController extends AbstractController
new OA\Property(property: 'user', properties: [
new OA\Property(property: 'id', type: 'integer'),
new OA\Property(property: 'username', type: 'string'),
new OA\Property(property: 'email', type: 'string'),
new OA\Property(property: 'api_key_enabled', type: 'boolean'),
], type: 'object'),
])),
@@ -71,17 +73,29 @@ class AdminApiKeyController extends AbstractController
#[RequestMapping(path: "", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function index(): array
public function index(): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$page = (int) $this->request->input('page', 1);
$per_page = min((int) $this->request->input('per_page', 15), 100);
$query = ApiKey::query()->with('user:id,username,api_key_enabled');
$query = ApiKey::query()->with('user:id,username,email,api_key_enabled');
// 按用户 ID 筛选
$user_id = $this->request->input('user_id');
if ($user_id !== null && $user_id !== '') {
$query->where('user_id', (int) $user_id);
// 按用户名模糊搜索
$username = $this->request->input('username');
if ($username !== null && $username !== '') {
$query->whereHas('user', function ($q) use ($username) {
$q->where('username', 'like', '%' . $username . '%');
});
}
// 按邮箱模糊搜索
$email = $this->request->input('email');
if ($email !== null && $email !== '') {
$query->whereHas('user', function ($q) use ($email) {
$q->where('email', 'like', '%' . $email . '%');
});
}
// 按启用状态筛选
@@ -155,6 +169,8 @@ class AdminApiKeyController extends AbstractController
#[Middleware(PermissionMiddleware::class)]
public function toggle(int $id): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$api_key = ApiKey::query()->find($id);
if (!$api_key) {
@@ -213,6 +229,8 @@ class AdminApiKeyController extends AbstractController
#[Middleware(PermissionMiddleware::class)]
public function destroy(int $id): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$api_key = ApiKey::query()->find($id);
if (!$api_key) {
@@ -229,4 +247,16 @@ class AdminApiKeyController extends AbstractController
'message' => '删除成功',
];
}
private function requireAdmin(): ?ResponseInterface
{
$user = $this->getAuthUser();
if (!$user || !$user->isAdministrator()) {
return $this->response->json([
'code' => 403,
'message' => '仅管理员可访问',
])->withStatus(403);
}
return null;
}
}
@@ -0,0 +1,346 @@
<?php
declare(strict_types=1);
namespace App\Controller\Api\V1;
use App\Controller\AbstractController;
use App\Middleware\AuthMiddleware;
use App\Middleware\PermissionMiddleware;
use App\Model\AggregateRefreshQueue;
use Hyperf\DbConnection\Db;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\Middleware;
use Hyperf\HttpServer\Annotation\RequestMapping;
use OpenApi\Attributes as OA;
use Psr\Http\Message\ResponseInterface;
#[OA\Tag(name: 'Admin Materialization', description: '管理员物化层监控')]
#[Controller(prefix: "/api/v1/admin/materialization")]
class AdminMaterializationController extends AbstractController
{
private const VIEW_BY_CREATED = 'orders_daily_by_created';
private const VIEW_BY_PAID = 'orders_daily_by_paid';
private const ALLOWED_VIEWS = [self::VIEW_BY_CREATED, self::VIEW_BY_PAID];
/**
* 列出 aggregate_refresh_queue 待刷新条目
*/
#[OA\Get(
path: '/admin/materialization/queue',
summary: '列出聚合刷新队列',
description: '分页列出 aggregate_refresh_queue 表中的待刷新日期,支持按 view/from/to 过滤',
security: [['bearerAuth' => []]],
tags: ['Admin Materialization'],
parameters: [
new OA\Parameter(name: 'page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 1)),
new OA\Parameter(name: 'per_page', in: 'query', required: false, schema: new OA\Schema(type: 'integer', default: 20)),
new OA\Parameter(name: 'view', in: 'query', required: false, description: '聚合视图名(白名单)', schema: new OA\Schema(type: 'string', enum: self::ALLOWED_VIEWS)),
new OA\Parameter(name: 'from', in: 'query', required: false, description: 'refresh_date >= fromYYYY-MM-DD', schema: new OA\Schema(type: 'string', format: 'date')),
new OA\Parameter(name: 'to', in: 'query', required: false, description: 'refresh_date <= toYYYY-MM-DD', schema: new OA\Schema(type: 'string', format: 'date')),
],
responses: [
new OA\Response(
response: 200,
description: '获取成功',
content: new OA\JsonContent(properties: [
new OA\Property(property: 'code', type: 'integer', example: 0),
new OA\Property(property: 'message', type: 'string', example: '获取成功'),
new OA\Property(property: 'data', properties: [
new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
new OA\Property(property: 'refresh_date', type: 'string', format: 'date'),
new OA\Property(property: 'aggregate_view', type: 'string'),
new OA\Property(property: 'created_at', type: 'string', format: 'date-time'),
])),
new OA\Property(property: 'total', type: 'integer'),
new OA\Property(property: 'page', type: 'integer'),
new OA\Property(property: 'per_page', type: 'integer'),
], type: 'object'),
])
),
new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
]
)]
#[RequestMapping(path: "queue", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function queue(): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$page = max((int) $this->request->input('page', 1), 1);
$per_page = min(max((int) $this->request->input('per_page', 20), 1), 100);
$view = $this->request->input('view');
$from = $this->request->input('from');
$to = $this->request->input('to');
$query = AggregateRefreshQueue::query();
if ($view !== null && $view !== '') {
$query->where('aggregate_view', $view);
}
if ($from !== null && $from !== '') {
$query->where('refresh_date', '>=', $from);
}
if ($to !== null && $to !== '') {
$query->where('refresh_date', '<=', $to);
}
$total = $query->count();
$items = $query
->orderBy('refresh_date')
->orderBy('aggregate_view')
->offset(($page - 1) * $per_page)
->limit($per_page)
->get();
return [
'code' => 0,
'message' => '获取成功',
'data' => [
'items' => $items,
'total' => $total,
'page' => $page,
'per_page' => $per_page,
],
];
}
/**
* 手动触发指定时间窗口的物化刷新
*/
#[OA\Post(
path: '/admin/materialization/refresh',
summary: '手动刷新物化对象',
description: 'by_created 走 refresh_continuous_aggregate(view, from, to)by_paid 是 PG 物化视图,from/to 无效,整体走 REFRESH MATERIALIZED VIEW [CONCURRENTLY]',
security: [['bearerAuth' => []]],
tags: ['Admin Materialization'],
requestBody: new OA\RequestBody(
required: true,
content: new OA\JsonContent(
required: ['view'],
properties: [
new OA\Property(property: 'view', type: 'string', enum: self::ALLOWED_VIEWS),
new OA\Property(property: 'from', type: 'string', description: '起始时间戳(仅 by_created 生效,timestamptz 字面量)'),
new OA\Property(property: 'to', type: 'string', description: '结束时间戳(仅 by_created 生效,timestamptz 字面量)'),
]
)
),
responses: [
new OA\Response(
response: 200,
description: '刷新成功',
content: new OA\JsonContent(properties: [
new OA\Property(property: 'code', type: 'integer', example: 0),
new OA\Property(property: 'message', type: 'string', example: '刷新成功'),
new OA\Property(property: 'data', properties: [
new OA\Property(property: 'view', type: 'string'),
new OA\Property(property: 'from', type: 'string', nullable: true),
new OA\Property(property: 'to', type: 'string', nullable: true),
new OA\Property(property: 'mode', type: 'string', enum: ['incremental', 'full_refresh', 'full_refresh_concurrent']),
], type: 'object'),
])
),
new OA\Response(response: 400, description: '参数错误或刷新失败', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
]
)]
#[RequestMapping(path: "refresh", methods: "POST")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function refresh(): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$view = (string) $this->request->input('view', '');
$from = $this->request->input('from');
$to = $this->request->input('to');
if (!in_array($view, self::ALLOWED_VIEWS, true)) {
return $this->response->json([
'code' => 400,
'message' => 'invalid view: must be one of ' . implode(', ', self::ALLOWED_VIEWS),
'data' => null,
])->withStatus(400);
}
try {
if ($view === self::VIEW_BY_CREATED) {
Db::statement(
'CALL refresh_continuous_aggregate(?, ?::timestamptz, ?::timestamptz)',
[$view, $from, $to]
);
return [
'code' => 0,
'message' => '刷新成功',
'data' => [
'view' => $view,
'from' => $from,
'to' => $to,
'mode' => 'incremental',
],
];
}
// by_paidPG 物化视图,from/to 无效
$rows = Db::select("SELECT ispopulated FROM pg_matviews WHERE matviewname = ?", [self::VIEW_BY_PAID]);
$populated = !empty($rows) && $rows[0]->ispopulated;
$mode = $populated ? 'full_refresh_concurrent' : 'full_refresh';
$sql = $populated
? 'REFRESH MATERIALIZED VIEW CONCURRENTLY ' . self::VIEW_BY_PAID
: 'REFRESH MATERIALIZED VIEW ' . self::VIEW_BY_PAID;
Db::statement($sql);
return [
'code' => 0,
'message' => '刷新成功',
'data' => [
'view' => $view,
'from' => null,
'to' => null,
'mode' => $mode,
],
];
} catch (\Throwable $e) {
return $this->response->json([
'code' => 400,
'message' => 'refresh failed: ' . $e->getMessage(),
'data' => null,
])->withStatus(400);
}
}
/**
* 列出连续聚合视图及其滞后秒数
*/
#[OA\Get(
path: '/admin/materialization/aggregates',
summary: '查询连续聚合滞后情况',
description: '查询 timescaledb_information.continuous_aggregates,附加由 cagg_watermark 计算的 lag_seconds 字段',
security: [['bearerAuth' => []]],
tags: ['Admin Materialization'],
responses: [
new OA\Response(
response: 200,
description: '获取成功',
content: new OA\JsonContent(properties: [
new OA\Property(property: 'code', type: 'integer', example: 0),
new OA\Property(property: 'message', type: 'string', example: '获取成功'),
new OA\Property(property: 'data', properties: [
new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
new OA\Property(property: 'view_name', type: 'string'),
new OA\Property(property: 'view_schema', type: 'string'),
new OA\Property(property: 'materialization_hypertable_name', type: 'string'),
new OA\Property(property: 'lag_seconds', type: 'number', format: 'double', nullable: true),
])),
], type: 'object'),
])
),
new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
]
)]
#[RequestMapping(path: "aggregates", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function aggregates(): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$rows = Db::select(
"SELECT ca.view_name,
ca.view_schema,
ca.materialization_hypertable_name,
EXTRACT(EPOCH FROM (now() - to_timestamp(_timescaledb_functions.cagg_watermark(cagg.mat_hypertable_id)::float8 / 1000000))) AS lag_seconds
FROM timescaledb_information.continuous_aggregates ca
JOIN _timescaledb_catalog.continuous_agg cagg
ON cagg.user_view_name = ca.view_name
AND cagg.user_view_schema = ca.view_schema
ORDER BY ca.view_name"
);
return [
'code' => 0,
'message' => '获取成功',
'data' => ['items' => $rows],
];
}
/**
* 列出 TimescaleDB Crontab 任务及其最近执行状态
*/
#[OA\Get(
path: '/admin/materialization/jobs',
summary: '查询 Crontab 任务状态',
description: '查询 timescaledb_information.jobs LEFT JOIN job_stats,仅 policy_refresh_continuous_aggregate 类型',
security: [['bearerAuth' => []]],
tags: ['Admin Materialization'],
responses: [
new OA\Response(
response: 200,
description: '获取成功',
content: new OA\JsonContent(properties: [
new OA\Property(property: 'code', type: 'integer', example: 0),
new OA\Property(property: 'message', type: 'string', example: '获取成功'),
new OA\Property(property: 'data', properties: [
new OA\Property(property: 'items', type: 'array', items: new OA\Items(properties: [
new OA\Property(property: 'job_id', type: 'integer'),
new OA\Property(property: 'application_name', type: 'string'),
new OA\Property(property: 'proc_name', type: 'string'),
new OA\Property(property: 'hypertable_name', type: 'string', nullable: true),
new OA\Property(property: 'schedule_interval', type: 'string'),
new OA\Property(property: 'next_start', type: 'string', format: 'date-time', nullable: true),
new OA\Property(property: 'last_successful_finish', type: 'string', format: 'date-time', nullable: true),
new OA\Property(property: 'last_run_status', type: 'string', nullable: true),
])),
], type: 'object'),
])
),
new OA\Response(response: 401, description: '未认证', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
new OA\Response(response: 403, description: '无权限', content: new OA\JsonContent(ref: '#/components/schemas/ErrorResponse')),
]
)]
#[RequestMapping(path: "jobs", methods: "GET")]
#[Middleware(AuthMiddleware::class)]
#[Middleware(PermissionMiddleware::class)]
public function jobs(): ResponseInterface|array
{
if ($forbidden = $this->requireAdmin()) return $forbidden;
$rows = Db::select(
"SELECT j.job_id,
j.application_name,
j.proc_name,
j.hypertable_name,
j.schedule_interval,
j.next_start,
s.last_successful_finish,
s.last_run_status
FROM timescaledb_information.jobs j
LEFT JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id
WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
ORDER BY j.job_id"
);
return [
'code' => 0,
'message' => '获取成功',
'data' => ['items' => $rows],
];
}
private function requireAdmin(): ?ResponseInterface
{
$user = $this->getAuthUser();
if (!$user || !$user->isAdministrator()) {
return $this->response->json([
'code' => 403,
'message' => '仅管理员可访问',
])->withStatus(403);
}
return null;
}
}
@@ -0,0 +1,31 @@
<?php
declare(strict_types=1);
namespace App\Model;
/**
* @property string $refresh_date 格式 Y-m-d
* @property string $aggregate_view 视图名(如 orders_daily_by_created
* @property \Carbon\Carbon $created_at 入队时间
*/
class AggregateRefreshQueue extends Model
{
protected ?string $table = 'aggregate_refresh_queue';
public bool $timestamps = false;
public bool $incrementing = false;
protected string $primaryKey = 'refresh_date';
protected array $fillable = [
'refresh_date',
'aggregate_view',
'created_at',
];
protected array $casts = [
'created_at' => 'datetime',
];
}
+51 -1
View File
@@ -5,8 +5,10 @@ declare(strict_types=1);
namespace App\Platform;
use App\Entity\Parse\EntityParseFactory;
use App\Model\AggregateRefreshQueue;
use App\Platform\Traits\FailedMessageTrait;
use App\Utils\Log;
use Carbon\Carbon;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Builder\QueueBuilder;
use Hyperf\Amqp\Message\ConsumerMessage;
@@ -152,9 +154,10 @@ class OrderConsumer extends ConsumerMessage
// 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem
$this->processOrderItems($items);
// 5. 识别 ≥ 3 天前的 created_date 入队,补刷自动策略未覆盖的窗口
$this->enqueueAffectedDates($orders_data);
Db::commit();
// @TODO 触发事件通知,更新自动聚合任务
// 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。
return Result::ACK;
@@ -379,4 +382,51 @@ class OrderConsumer extends ConsumerMessage
}
}
/**
* 识别 payload 中 ≥ 3 天前的 created_date,入队 orders_daily_by_created 兜底刷新。
*
* 自动刷新策略仅覆盖最近 3 天窗口;3 天前的订单变更(补录、追溯调整)需由
* aggregate_refresh_queue + Crontab 任务补刷。仅服务 by_created 视图,
* by_paid 由全量 REFRESH 覆盖,不入此队列。
*
* @param array $payloads 来自 entityMap()->all() 的订单数组,每条含 created_date 字段
*/
protected function enqueueAffectedDates(array $payloads): void
{
if (empty($payloads)) {
return;
}
$threshold = Carbon::now()->subDays(3)->toDateString();
$unique_dates = [];
foreach ($payloads as $payload) {
$created = $payload['created_date'] ?? null;
if ($created === null) {
continue;
}
// entityMap 输出 'Y-m-d H:i:sP';用 Carbon::parse 兼容多种格式
$date = Carbon::parse($created)->toDateString();
// 严格小于阈值才入队(≥ 阈值的部分由自动刷新策略覆盖)
if ($date < $threshold) {
$unique_dates[$date] = true;
}
}
if (empty($unique_dates)) {
return;
}
$now = Carbon::now();
foreach (array_keys($unique_dates) as $date) {
AggregateRefreshQueue::query()->insertOrIgnore([
'refresh_date' => $date,
'aggregate_view' => 'orders_daily_by_created',
'created_at' => $now,
]);
}
}
}
@@ -0,0 +1,21 @@
<?php
declare(strict_types=1);
namespace App\Service;
/**
* 聚合视图刷新接口
*
* 隔离 PG procedure 调用,便于在单测中注入 mock 实现。
*/
interface AggregateRefresherInterface
{
/**
* 对指定聚合视图的指定日期刷新
*
* @param string $view 聚合视图名(如 'orders_daily_by_created'
* @param string $refresh_date Y-m-d 格式日期
*/
public function refresh(string $view, string $refresh_date): void;
}
@@ -0,0 +1,31 @@
<?php
declare(strict_types=1);
namespace App\Service;
use Hyperf\DbConnection\Db;
/**
* TimescaleDB 连续聚合刷新实现
*
* 调用 PG 的 refresh_continuous_aggregate 存储过程,按整日窗口刷新。
*/
class ContinuousAggregateRefresher implements AggregateRefresherInterface
{
public function refresh(string $view, string $refresh_date): void
{
$start = sprintf("'%s 00:00:00+00'::timestamptz", $refresh_date);
$end = sprintf("'%s 23:59:59.999999+00'::timestamptz", $refresh_date);
// view 在 P23.2 中硬编码为 'orders_daily_by_created',但仍走 PDO quote 防御
$quoted_view = Db::getPdo()->quote($view);
Db::statement(sprintf(
'CALL refresh_continuous_aggregate(%s, %s, %s)',
$quoted_view,
$start,
$end
));
}
}
@@ -0,0 +1,76 @@
<?php
declare(strict_types=1);
namespace App\Service;
use App\Model\AggregateRefreshQueue;
use App\Utils\Log;
use Carbon\Carbon;
use Throwable;
/**
* 订单聚合刷新任务
*
* 消费 aggregate_refresh_queue 中 created_at < now() - 1 hour 的项,
* 逐条调用 AggregateRefresherInterface 刷新对应日期的连续聚合,
* 成功后从队列中删除;失败保留队列项由下次任务重试。
*
* 由 OrderAggregatesRefreshCommandCLI 入口)和 Crontab(定时入口)共同调用。
*/
class OrderAggregatesRefreshJob
{
public function __construct(private AggregateRefresherInterface $refresher)
{
}
/**
* @return array{processed: int, failed: int} 处理与失败计数
*/
public function run(): array
{
$items = AggregateRefreshQueue::query()
->where('created_at', '<', Carbon::now()->subHour())
->orderBy('refresh_date')
->orderBy('aggregate_view')
->get();
if ($items->isEmpty()) {
Log::get()->info('orders:refresh-aggregates queue empty');
return ['processed' => 0, 'failed' => 0];
}
$processed = 0;
$failed = 0;
foreach ($items as $item) {
$view = $item->aggregate_view;
$date = $item->refresh_date;
try {
$this->refresher->refresh($view, $date);
AggregateRefreshQueue::query()
->where('refresh_date', $date)
->where('aggregate_view', $view)
->delete();
$processed++;
} catch (Throwable $e) {
Log::get()->error('orders:refresh-aggregates failed', [
'view' => $view,
'date' => $date,
'error' => $e->getMessage(),
]);
$failed++;
}
}
Log::get()->info('orders:refresh-aggregates done', [
'processed' => $processed,
'failed' => $failed,
]);
return ['processed' => $processed, 'failed' => $failed];
}
}
+1
View File
@@ -20,6 +20,7 @@
"hyperf/command": "~3.1.0",
"hyperf/config": "~3.1.0",
"hyperf/constants": "~3.1.0",
"hyperf/crontab": "~3.1.0",
"hyperf/database-pgsql": "^3.1",
"hyperf/db-connection": "~3.1.0",
"hyperf/engine": "^2.10",
+19
View File
@@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
use App\Service\OrderAggregatesRefreshJob;
use Hyperf\Crontab\Crontab;
return [
'enable' => true,
'crontab' => [
// 每天 02:07 处理 aggregate_refresh_queue 中的滞后聚合刷新。
// 02 时段为最低流量段;分钟取 :07 避开整点全互联网定时任务集中触发。
(new Crontab())
->setName('OrderAggregatesRefresh')
->setRule('7 2 * * *')
->setCallback([OrderAggregatesRefreshJob::class, 'run'])
->setMemo('每天 02:07 处理 aggregate_refresh_queue 中的滞后聚合刷新'),
],
];
+1
View File
@@ -11,4 +11,5 @@ declare(strict_types=1);
*/
return [
// 可以在这里配置接口到实现的绑定
App\Service\AggregateRefresherInterface::class => App\Service\ContinuousAggregateRefresher::class,
];
+1
View File
@@ -10,4 +10,5 @@ declare(strict_types=1);
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
return [
Hyperf\Crontab\Process\CrontabDispatcherProcess::class,
];
@@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
use Hyperf\DbConnection\Db;
use Hyperf\Database\Migrations\Migration;
return new class extends Migration
{
public function up(): void
{
Db::statement('CREATE EXTENSION IF NOT EXISTS timescaledb');
}
public function down(): void
{
// 不主动 DROP EXTENSIONexisting hypertables 依赖该扩展,drop 会破坏数据。
// 完全清空数据库时手动执行 `DROP EXTENSION timescaledb CASCADE`。
}
};
@@ -0,0 +1,52 @@
<?php
declare(strict_types=1);
use Hyperf\DbConnection\Db;
use Hyperf\Database\Migrations\Migration;
return new class extends Migration
{
public function up(): void
{
// 连续聚合视图:按订单创建日期日聚合(含未付订单)。
// WITH NO DATA:视图创建时不立即物化,由 P22.3 的回填命令一次性填充历史数据。
Db::statement(<<<'SQL'
CREATE MATERIALIZED VIEW orders_daily_by_created
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', created_date) AS day,
company_id,
platform_id,
store_id,
COUNT(*) AS total_orders,
COUNT(*) FILTER (WHERE paid_date IS NOT NULL) AS paid_orders,
COUNT(*) FILTER (WHERE paid_date IS NULL) AS unpaid_orders,
SUM(total_amount) AS sum_total_amount,
SUM(total_paid) AS sum_total_paid,
SUM(total_received) AS sum_total_received,
AVG(total_amount) AS avg_total_amount,
AVG(total_paid) FILTER (WHERE paid_date IS NOT NULL) AS avg_paid_amount,
SUM(freight_fee) AS sum_freight_fee,
SUM(tax_fee) AS sum_tax_fee,
SUM(commission_fee) AS sum_commission_fee,
SUM(discount_fee) AS sum_discount_fee
FROM orders
GROUP BY day, company_id, platform_id, store_id
WITH NO DATA;
SQL);
// 复合索引:覆盖"最近 N 日按 X 维度"与"指定 X 全历史时间序列"两类高频查询。
Db::statement('CREATE INDEX IF NOT EXISTS idx_orders_daily_by_created_day_company ON orders_daily_by_created (day DESC, company_id)');
Db::statement('CREATE INDEX IF NOT EXISTS idx_orders_daily_by_created_day_platform ON orders_daily_by_created (day DESC, platform_id)');
Db::statement('CREATE INDEX IF NOT EXISTS idx_orders_daily_by_created_day_store ON orders_daily_by_created (day DESC, store_id)');
Db::statement('CREATE INDEX IF NOT EXISTS idx_orders_daily_by_created_company_day ON orders_daily_by_created (company_id, day DESC)');
Db::statement('CREATE INDEX IF NOT EXISTS idx_orders_daily_by_created_store_day ON orders_daily_by_created (store_id, day DESC)');
}
public function down(): void
{
// CASCADE 一并 drop 索引以及后续阶段附加的刷新策略。
Db::statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_created CASCADE');
}
};
@@ -0,0 +1,69 @@
<?php
declare(strict_types=1);
use Hyperf\DbConnection\Db;
use Hyperf\Database\Migrations\Migration;
return new class extends Migration
{
public function up(): void
{
// PG 原生物化视图:按付款日聚合(仅已付订单)。
// 不带 WITH (timescaledb.continuous)TimescaleDB 连续聚合的 time_bucket
// 必须基于 hypertable 主时间列(created_date),无法对 paid_date 分桶。
// WITH NO DATA:避免 migrate 时长时间锁;首次填充由 P22.3 回填命令执行。
Db::statement(<<<'SQL'
CREATE MATERIALIZED VIEW orders_daily_by_paid AS
SELECT
paid_date::date AS day,
company_id,
platform_id,
store_id,
COUNT(*) AS paid_orders,
SUM(total_amount) AS sum_total_amount,
SUM(total_paid) AS sum_total_paid,
SUM(total_received) AS sum_total_received,
AVG(total_paid) AS avg_paid_amount,
SUM(freight_fee) AS sum_freight_fee,
SUM(tax_fee) AS sum_tax_fee,
SUM(commission_fee) AS sum_commission_fee,
SUM(discount_fee) AS sum_discount_fee
FROM orders
WHERE paid_date IS NOT NULL
GROUP BY day, company_id, platform_id, store_id
WITH NO DATA;
SQL);
// UNIQUE 索引:REFRESH MATERIALIZED VIEW CONCURRENTLY 的前置条件,缺失会直接报错。
Db::statement('CREATE UNIQUE INDEX idx_orders_daily_by_paid_unique ON orders_daily_by_paid (day, company_id, platform_id, store_id)');
// 5 复合索引:覆盖"最近 N 日按 X 维度"与"指定 X 全历史时间序列"两类高频查询。
Db::statement('CREATE INDEX idx_orders_daily_by_paid_day_company ON orders_daily_by_paid (day DESC, company_id)');
Db::statement('CREATE INDEX idx_orders_daily_by_paid_day_platform ON orders_daily_by_paid (day DESC, platform_id)');
Db::statement('CREATE INDEX idx_orders_daily_by_paid_day_store ON orders_daily_by_paid (day DESC, store_id)');
Db::statement('CREATE INDEX idx_orders_daily_by_paid_company_day ON orders_daily_by_paid (company_id, day DESC)');
Db::statement('CREATE INDEX idx_orders_daily_by_paid_store_day ON orders_daily_by_paid (store_id, day DESC)');
// by_created 连续聚合自动刷新策略:每小时刷新近 3 天水位线,1 小时写入缓冲避免与热数据冲突。
// 3 天前的修改由 Stage 23 aggregate_refresh_queue 兜底。
Db::statement(<<<'SQL'
SELECT add_continuous_aggregate_policy(
'orders_daily_by_created',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
SQL);
}
public function down(): void
{
// 先 remove policy,再 drop 视图 CASCADE。
// if_exists => true 处理 by_created 视图已被 P22.1 down CASCADE 间接带走的情况。
Db::statement("SELECT remove_continuous_aggregate_policy('orders_daily_by_created', if_exists => true)");
// CASCADE 一并 drop UNIQUE 索引和 5 复合索引。
Db::statement('DROP MATERIALIZED VIEW IF EXISTS orders_daily_by_paid CASCADE');
}
};
@@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
use Hyperf\Database\Schema\Schema;
use Hyperf\Database\Schema\Blueprint;
use Hyperf\Database\Migrations\Migration;
use Hyperf\DbConnection\Db;
return new class extends Migration
{
public function up(): void
{
Schema::create('aggregate_refresh_queue', function (Blueprint $table) {
$table->date('refresh_date')->comment('待刷新的聚合日');
$table->string('aggregate_view', 100)->comment('视图名(orders_daily_by_created / orders_daily_by_paid');
$table->timestampTz('created_at')->useCurrent()->comment('入队时间');
$table->primary(['refresh_date', 'aggregate_view'], 'pk_aggregate_refresh_queue');
$table->index('created_at', 'idx_aggregate_refresh_queue_created_at');
});
// Hyperf PostgresGrammar::compilePrimary 丢弃自定义 index 名(vendor/hyperf/database-pgsql/.../PostgresGrammar.php:230),
// PG 默认用 {table}_pkey;这里显式重命名以满足计划验收要求(名称 pk_aggregate_refresh_queue
Db::statement('ALTER TABLE aggregate_refresh_queue RENAME CONSTRAINT aggregate_refresh_queue_pkey TO pk_aggregate_refresh_queue');
}
public function down(): void
{
Schema::dropIfExists('aggregate_refresh_queue');
}
};
@@ -0,0 +1,161 @@
<?php
declare(strict_types=1);
namespace HyperfTest\Cases\Integration\Materialization;
use App\Model\AggregateRefreshQueue;
use App\Model\Role;
use App\Model\User;
use Carbon\Carbon;
use HyperfTest\TestCase;
use Qbhy\HyperfAuth\AuthManager;
use function Hyperf\Support\make;
/**
* AdminMaterializationController 集成测试
*
* @internal
* @coversNothing
*/
class AdminMaterializationControllerTest extends TestCase
{
protected function fetchAdminRole(): Role
{
return Role::query()->where('name', 'administrator')->firstOrFail();
}
protected function getAdminAuthToken(): string
{
$admin_role = $this->fetchAdminRole();
$user = User::query()
->where('status', 1)
->where('role_id', $admin_role->id)
->first();
if (!$user) {
$this->markTestSkipped('没有可用的 administrator 用户,无法测试');
}
$auth = make(AuthManager::class);
return $auth->guard('jwt')->login($user);
}
protected function adminHeaders(): array
{
return ['Authorization' => 'Bearer ' . $this->getAdminAuthToken()];
}
protected function getNonAdminToken(): array
{
$suffix = 'mat_nonadmin_' . uniqid();
$user = User::query()->create([
'username' => $suffix,
'password' => 'Pass_' . $suffix,
'email' => $suffix . '@example.com',
'status' => 1,
'api_key_enabled' => true,
]);
$auth = make(AuthManager::class);
$token = $auth->guard('jwt')->login($user);
return ['Authorization' => 'Bearer ' . $token];
}
public function test_queue_lists_pending(): void
{
$date = '2030-12-31';
$view = 'orders_daily_by_created';
AggregateRefreshQueue::query()->insertOrIgnore([[
'refresh_date' => $date,
'aggregate_view' => $view,
'created_at' => Carbon::now(),
]]);
try {
$response = $this->get(
'/api/v1/admin/materialization/queue',
['view' => $view, 'from' => $date, 'to' => $date],
$this->adminHeaders()
);
$response->assertStatus(200);
$response->assertJsonPath('code', 0);
$body = json_decode($response->getBody()->getContents(), true);
$this->assertArrayHasKey('items', $body['data']);
$this->assertArrayHasKey('total', $body['data']);
$this->assertArrayHasKey('page', $body['data']);
$this->assertArrayHasKey('per_page', $body['data']);
$this->assertGreaterThanOrEqual(1, $body['data']['total']);
$found = false;
foreach ($body['data']['items'] as $item) {
if ($item['refresh_date'] === $date && $item['aggregate_view'] === $view) {
$found = true;
break;
}
}
$this->assertTrue($found, '应在 queue 列表中找到刚插入的 fixture 行');
} finally {
AggregateRefreshQueue::query()
->where('refresh_date', $date)
->where('aggregate_view', $view)
->delete();
}
}
public function test_refresh_validates_view_whitelist(): void
{
$response = $this->post(
'/api/v1/admin/materialization/refresh',
[
'view' => 'evil_view',
'from' => '2026-01-01 00:00:00+00',
'to' => '2026-01-02 00:00:00+00',
],
$this->adminHeaders()
);
$response->assertStatus(400);
$body = json_decode($response->getBody()->getContents(), true);
$this->assertSame(400, $body['code']);
$this->assertStringContainsString('view', $body['message']);
}
public function test_aggregates_returns_lag_seconds(): void
{
$response = $this->get('/api/v1/admin/materialization/aggregates', [], $this->adminHeaders());
$response->assertStatus(200);
$response->assertJsonPath('code', 0);
$body = json_decode($response->getBody()->getContents(), true);
$this->assertArrayHasKey('items', $body['data']);
$items = $body['data']['items'];
$this->assertNotEmpty($items, 'aggregates 应至少返回一条连续聚合记录(orders_daily_by_created');
$this->assertArrayHasKey('view_name', $items[0]);
$this->assertArrayHasKey('lag_seconds', $items[0]);
}
public function test_jobs_lists_refresh_policy(): void
{
$response = $this->get('/api/v1/admin/materialization/jobs', [], $this->adminHeaders());
$response->assertStatus(200);
$response->assertJsonPath('code', 0);
$body = json_decode($response->getBody()->getContents(), true);
$this->assertArrayHasKey('items', $body['data']);
// by_created 注册了 1 条 policy_refresh_continuous_aggregateby_paid 由 Hyperf Crontab 调度,不入此表
$this->assertGreaterThanOrEqual(1, count($body['data']['items']));
$this->assertSame('policy_refresh_continuous_aggregate', $body['data']['items'][0]['proc_name']);
}
public function test_non_admin_blocked(): void
{
$headers = $this->getNonAdminToken();
$response = $this->get('/api/v1/admin/materialization/queue', [], $headers);
$response->assertStatus(403);
}
}
@@ -0,0 +1,94 @@
<?php
declare(strict_types=1);
namespace HyperfTest\Cases\Integration\Materialization;
use Hyperf\DbConnection\Db;
use PHPUnit\Framework\TestCase;
/**
* P22.3 物化对象集成测试:跨两类物化对象 + 跨命名空间索引的存在性断言。
*
* 与 P22.2 烟雾测试 `System/MaterializedViewSmokeTest` 各自关注点不同:
* - 烟雾测试:单点测试,专测 by_paid 视图与索引
* - 本测试:跨两类对象(连续聚合 + PG 物化视图)+ 跨命名空间索引(pg_indexes 不区分视图类型)
*
* @internal
* @coversNothing
*/
class MaterializationObjectsTest extends TestCase
{
/**
* @template T
* @param callable(): T $callback
* @return T
*/
protected function runInCoroutine(callable $callback): mixed
{
if (\Swoole\Coroutine::getCid() > 0) {
return $callback();
}
$result = null;
$exception = null;
\Swoole\Coroutine\run(static function () use ($callback, &$result, &$exception): void {
try {
$result = $callback();
} catch (\Throwable $e) {
$exception = $e;
}
});
if ($exception !== null) {
throw $exception;
}
return $result;
}
public function test_continuous_aggregate_exists(): void
{
$rows = $this->runInCoroutine(static fn () => Db::select(
"SELECT view_name FROM timescaledb_information.continuous_aggregates
WHERE view_name = 'orders_daily_by_created'"
));
$this->assertCount(1, $rows);
}
public function test_pg_materialized_view_exists(): void
{
$rows = $this->runInCoroutine(static fn () => Db::select(
"SELECT matviewname FROM pg_matviews
WHERE matviewname = 'orders_daily_by_paid'"
));
$this->assertCount(1, $rows);
}
public function test_refresh_policy_registered(): void
{
// 显式过滤 hypertable_name,避免未来追加其他连续聚合策略时此断言无关地失败。
$rows = $this->runInCoroutine(static fn () => Db::select(
"SELECT hypertable_name FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate'
AND hypertable_name = 'orders_daily_by_created'"
));
$this->assertCount(1, $rows);
}
public function test_by_created_indexes_present(): void
{
// P22.1 创建 5 复合索引;chunk 索引名是 `_hyper_*` 不会被 LIKE 'idx_*' 匹配。
$rows = $this->runInCoroutine(static fn () => Db::select(
"SELECT indexname FROM pg_indexes WHERE indexname LIKE 'idx_orders_daily_by_created%'"
));
$this->assertCount(5, $rows);
}
public function test_by_paid_indexes_present(): void
{
// P22.2 创建 1 UNIQUE + 5 复合 = 6 索引。
$rows = $this->runInCoroutine(static fn () => Db::select(
"SELECT indexname FROM pg_indexes WHERE indexname LIKE 'idx_orders_daily_by_paid%'"
));
$this->assertCount(6, $rows);
}
}
@@ -0,0 +1,69 @@
<?php
declare(strict_types=1);
namespace HyperfTest\Cases\Integration\System;
use Hyperf\DbConnection\Db;
use PHPUnit\Framework\TestCase;
/**
* P22.2 物化层 schema 烟雾测试:断言迁移已应用、索引齐备、刷新策略已注册。
*
* @internal
* @coversNothing
*/
class MaterializedViewSmokeTest extends TestCase
{
/**
* @template T
* @param callable(): T $callback
* @return T
*/
protected function runInCoroutine(callable $callback): mixed
{
if (\Swoole\Coroutine::getCid() > 0) {
return $callback();
}
$result = null;
$exception = null;
\Swoole\Coroutine\run(static function () use ($callback, &$result, &$exception): void {
try {
$result = $callback();
} catch (\Throwable $e) {
$exception = $e;
}
});
if ($exception !== null) {
throw $exception;
}
return $result;
}
public function test_orders_daily_by_paid_matview_exists(): void
{
$rows = $this->runInCoroutine(static fn () => Db::select(
"SELECT matviewname FROM pg_matviews WHERE matviewname = 'orders_daily_by_paid'"
));
$this->assertCount(1, $rows);
}
public function test_orders_daily_by_paid_has_six_indexes(): void
{
$rows = $this->runInCoroutine(static fn () => Db::select(
"SELECT indexname FROM pg_indexes WHERE tablename = 'orders_daily_by_paid'"
));
$this->assertCount(6, $rows, '应有 1 UNIQUE + 5 复合 = 6 个索引');
}
public function test_orders_daily_by_created_refresh_policy_registered(): void
{
$rows = $this->runInCoroutine(static fn () => Db::select(
"SELECT job_id FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate'
AND hypertable_name = 'orders_daily_by_created'"
));
$this->assertCount(1, $rows);
}
}
@@ -0,0 +1,89 @@
<?php
declare(strict_types=1);
namespace HyperfTest\Cases\Unit\Model;
use App\Model\AggregateRefreshQueue;
use Carbon\Carbon;
use PHPUnit\Framework\TestCase;
/**
* @internal
* @coversNothing
*/
class AggregateRefreshQueueTest extends TestCase
{
protected function runInCoroutine(callable $callback): void
{
if (\Swoole\Coroutine::getCid() > 0) {
$callback();
return;
}
$exception = null;
\Swoole\Coroutine\run(static function () use ($callback, &$exception): void {
try {
$callback();
} catch (\Throwable $e) {
$exception = $e;
}
});
if ($exception) {
throw $exception;
}
}
private function uniqueRefreshDate(): string
{
// 与 ApiKeyTest 的 bin2hex(random_bytes(4)) 同源策略:用唯一性而非全表清理来隔离用例
return sprintf('20%02d-%02d-%02d', random_int(50, 99), random_int(1, 12), random_int(1, 28));
}
public function test_attributes_persist_correctly(): void
{
$this->runInCoroutine(function (): void {
$unique_date = $this->uniqueRefreshDate();
try {
AggregateRefreshQueue::query()->create([
'refresh_date' => $unique_date,
'aggregate_view' => 'orders_daily_by_created',
'created_at' => Carbon::now(),
]);
$row = AggregateRefreshQueue::query()->where('refresh_date', $unique_date)->first();
$this->assertNotNull($row);
$this->assertSame($unique_date, $row->refresh_date);
$this->assertSame('orders_daily_by_created', $row->aggregate_view);
$this->assertInstanceOf(Carbon::class, $row->created_at);
} finally {
AggregateRefreshQueue::query()->where('refresh_date', $unique_date)->delete();
}
});
}
public function test_insert_or_ignore_dedups_on_composite_pk(): void
{
$this->runInCoroutine(function (): void {
$unique_date = $this->uniqueRefreshDate();
$payload = [
'refresh_date' => $unique_date,
'aggregate_view' => 'orders_daily_by_created',
'created_at' => Carbon::now(),
];
try {
AggregateRefreshQueue::query()->insertOrIgnore($payload);
AggregateRefreshQueue::query()->insertOrIgnore($payload);
$count = AggregateRefreshQueue::query()->where('refresh_date', $unique_date)->count();
$this->assertSame(1, $count);
} finally {
AggregateRefreshQueue::query()->where('refresh_date', $unique_date)->delete();
}
});
}
}
@@ -0,0 +1,164 @@
<?php
declare(strict_types=1);
namespace HyperfTest\Cases\Unit\Platform;
use App\Model\AggregateRefreshQueue;
use App\Platform\OrderConsumer;
use Carbon\Carbon;
use PHPUnit\Framework\TestCase;
use ReflectionMethod;
/**
* @internal
* @coversNothing
*/
class OrderConsumerEnqueueTest extends TestCase
{
protected function runInCoroutine(callable $callback): void
{
if (\Swoole\Coroutine::getCid() > 0) {
$callback();
return;
}
$exception = null;
\Swoole\Coroutine\run(static function () use ($callback, &$exception): void {
try {
$callback();
} catch (\Throwable $e) {
$exception = $e;
}
});
if ($exception) {
throw $exception;
}
}
/**
* 调用 protected enqueueAffectedDates 方法
*/
protected function invokeEnqueue(array $payloads): void
{
$consumer = new OrderConsumer();
$method = new ReflectionMethod($consumer, 'enqueueAffectedDates');
$method->setAccessible(true);
$method->invoke($consumer, $payloads);
}
/**
* 清理指定日期的队列条目(前置 + 后置双保险)
*/
protected function cleanupQueue(string $date): void
{
AggregateRefreshQueue::query()
->where('refresh_date', $date)
->where('aggregate_view', 'orders_daily_by_created')
->delete();
}
/**
* 当日订单不应入队(在自动刷新策略覆盖窗口内)
*/
public function test_today_payload_does_not_enqueue(): void
{
$this->runInCoroutine(function (): void {
$today = Carbon::now()->toDateString();
$payloads = [['created_date' => Carbon::now()->format('Y-m-d H:i:sP')]];
$this->cleanupQueue($today);
try {
$this->invokeEnqueue($payloads);
$row = AggregateRefreshQueue::query()
->where('refresh_date', $today)
->where('aggregate_view', 'orders_daily_by_created')
->first();
$this->assertNull($row);
} finally {
$this->cleanupQueue($today);
}
});
}
/**
* 2 天前订单不应入队(仍在自动刷新策略 [now-3d, now-1h] 窗口内)
*/
public function test_two_days_ago_payload_does_not_enqueue(): void
{
$this->runInCoroutine(function (): void {
$two_days_ago = Carbon::now()->subDays(2)->toDateString();
$payloads = [['created_date' => Carbon::now()->subDays(2)->format('Y-m-d H:i:sP')]];
$this->cleanupQueue($two_days_ago);
try {
$this->invokeEnqueue($payloads);
$row = AggregateRefreshQueue::query()
->where('refresh_date', $two_days_ago)
->where('aggregate_view', 'orders_daily_by_created')
->first();
$this->assertNull($row);
} finally {
$this->cleanupQueue($two_days_ago);
}
});
}
/**
* 30 天前订单应入队 by_created(自动策略未覆盖,需补刷)
*/
public function test_old_payload_enqueues_by_created(): void
{
$this->runInCoroutine(function (): void {
$old_date = Carbon::now()->subDays(30)->toDateString();
$payloads = [['created_date' => Carbon::now()->subDays(30)->format('Y-m-d H:i:sP')]];
$this->cleanupQueue($old_date);
try {
$this->invokeEnqueue($payloads);
$row = AggregateRefreshQueue::query()
->where('refresh_date', $old_date)
->where('aggregate_view', 'orders_daily_by_created')
->first();
$this->assertNotNull($row);
$this->assertSame($old_date, $row->refresh_date);
$this->assertSame('orders_daily_by_created', $row->aggregate_view);
} finally {
$this->cleanupQueue($old_date);
}
});
}
/**
* 同日期重复入队由复合主键 + insertOrIgnore 防重
*/
public function test_dedup_via_insert_or_ignore(): void
{
$this->runInCoroutine(function (): void {
$old_date = Carbon::now()->subDays(45)->toDateString();
$payloads = [
['created_date' => Carbon::now()->subDays(45)->format('Y-m-d H:i:sP')],
['created_date' => Carbon::now()->subDays(45)->format('Y-m-d H:i:sP')],
];
$this->cleanupQueue($old_date);
try {
// 单批次内同日期 → $unique_dates map 去重
$this->invokeEnqueue($payloads);
// 跨批次同日期 → 复合主键 insertOrIgnore 防重
$this->invokeEnqueue($payloads);
$count = AggregateRefreshQueue::query()
->where('refresh_date', $old_date)
->where('aggregate_view', 'orders_daily_by_created')
->count();
$this->assertSame(1, $count);
} finally {
$this->cleanupQueue($old_date);
}
});
}
}
@@ -0,0 +1,178 @@
<?php
declare(strict_types=1);
namespace HyperfTest\Cases\Unit\Service;
use App\Model\AggregateRefreshQueue;
use App\Service\AggregateRefresherInterface;
use App\Service\OrderAggregatesRefreshJob;
use Carbon\Carbon;
use PHPUnit\Framework\TestCase;
use RuntimeException;
/**
* 测试用 stub Refresher,记录每次 refresh 调用
*/
class RecordingRefresher implements AggregateRefresherInterface
{
/** @var array<int, array{view: string, date: string}> */
public array $calls = [];
public function refresh(string $view, string $refresh_date): void
{
$this->calls[] = ['view' => $view, 'date' => $refresh_date];
}
}
/**
* 测试用 stub Refresher,每次调用抛异常(模拟刷新失败)
*/
class ThrowingRefresher implements AggregateRefresherInterface
{
public function refresh(string $view, string $refresh_date): void
{
throw new RuntimeException('refresh failed');
}
}
/**
* @internal
* @coversNothing
*/
class OrderAggregatesRefreshJobTest extends TestCase
{
protected function runInCoroutine(callable $callback): void
{
if (\Swoole\Coroutine::getCid() > 0) {
$callback();
return;
}
$exception = null;
\Swoole\Coroutine\run(static function () use ($callback, &$exception): void {
try {
$callback();
} catch (\Throwable $e) {
$exception = $e;
}
});
if ($exception) {
throw $exception;
}
}
/**
* 沿用 P23.1 AggregateRefreshQueueTest 的随机未来日期隔离策略,
* 确保不同测试用例间无冲突,也不会污染生产数据。
*/
private function uniqueRefreshDate(): string
{
return sprintf('20%02d-%02d-%02d', random_int(50, 99), random_int(1, 12), random_int(1, 28));
}
/**
* 队列项 created_at < now()-1h 应被处理:调用 refresher 后从队列删除
*/
public function test_processes_old_items_and_clears_queue(): void
{
$this->runInCoroutine(function (): void {
$date_a = $this->uniqueRefreshDate();
$date_b = $this->uniqueRefreshDate();
while ($date_b === $date_a) {
$date_b = $this->uniqueRefreshDate();
}
// 前置清理(防 random 撞历史残留)
AggregateRefreshQueue::query()->whereIn('refresh_date', [$date_a, $date_b])->delete();
try {
$two_hours_ago = Carbon::now()->subHours(2);
AggregateRefreshQueue::query()->insertOrIgnore([
['refresh_date' => $date_a, 'aggregate_view' => 'orders_daily_by_created', 'created_at' => $two_hours_ago],
['refresh_date' => $date_b, 'aggregate_view' => 'orders_daily_by_created', 'created_at' => $two_hours_ago],
]);
$refresher = new RecordingRefresher();
$job = new OrderAggregatesRefreshJob($refresher);
$result = $job->run();
$this->assertSame(2, $result['processed']);
$this->assertSame(0, $result['failed']);
$this->assertCount(2, $refresher->calls);
// 队列已清空(针对本次测试的两条)
$remaining = AggregateRefreshQueue::query()
->whereIn('refresh_date', [$date_a, $date_b])
->count();
$this->assertSame(0, $remaining);
} finally {
AggregateRefreshQueue::query()->whereIn('refresh_date', [$date_a, $date_b])->delete();
}
});
}
/**
* 队列项 created_at >= now()-1h 应被跳过:refresher 无调用,队列保留
*/
public function test_recently_enqueued_items_are_skipped(): void
{
$this->runInCoroutine(function (): void {
$date = $this->uniqueRefreshDate();
AggregateRefreshQueue::query()->where('refresh_date', $date)->delete();
try {
AggregateRefreshQueue::query()->insertOrIgnore([
'refresh_date' => $date,
'aggregate_view' => 'orders_daily_by_created',
'created_at' => Carbon::now()->subMinutes(30), // < 1 hour
]);
$refresher = new RecordingRefresher();
$job = new OrderAggregatesRefreshJob($refresher);
$result = $job->run();
$this->assertSame(0, $result['processed']);
$this->assertSame(0, $result['failed']);
$this->assertCount(0, $refresher->calls);
// 队列项保留
$exists = AggregateRefreshQueue::query()->where('refresh_date', $date)->exists();
$this->assertTrue($exists);
} finally {
AggregateRefreshQueue::query()->where('refresh_date', $date)->delete();
}
});
}
/**
* refresher 抛异常时:队列项保留(下次任务重试),失败计数 +1
*/
public function test_failure_keeps_queue_item(): void
{
$this->runInCoroutine(function (): void {
$date = $this->uniqueRefreshDate();
AggregateRefreshQueue::query()->where('refresh_date', $date)->delete();
try {
AggregateRefreshQueue::query()->insertOrIgnore([
'refresh_date' => $date,
'aggregate_view' => 'orders_daily_by_created',
'created_at' => Carbon::now()->subHours(2),
]);
$job = new OrderAggregatesRefreshJob(new ThrowingRefresher());
$result = $job->run();
$this->assertSame(0, $result['processed']);
$this->assertSame(1, $result['failed']);
// 队列项保留以待重试
$exists = AggregateRefreshQueue::query()->where('refresh_date', $date)->exists();
$this->assertTrue($exists);
} finally {
AggregateRefreshQueue::query()->where('refresh_date', $date)->delete();
}
});
}
}
+1
View File
@@ -39,6 +39,7 @@ coverage
# Vitest
__screenshots__/
node_modules/.vite/
test-results/
playwright-report/
@@ -2,11 +2,12 @@ import { describe, it, expect } from 'vitest'
import { ADMIN_ONLY_PATH_PREFIXES, isAdminOnlyPath } from '../permissions'
describe('ADMIN_ONLY_PATH_PREFIXES', () => {
it('contains all 7 admin-only paths', () => {
expect(ADMIN_ONLY_PATH_PREFIXES).toHaveLength(7)
it('contains all 8 admin-only paths', () => {
expect(ADMIN_ONLY_PATH_PREFIXES).toHaveLength(8)
expect(ADMIN_ONLY_PATH_PREFIXES).toContain('/users')
expect(ADMIN_ONLY_PATH_PREFIXES).toContain('/roles')
expect(ADMIN_ONLY_PATH_PREFIXES).toContain('/route-groups')
expect(ADMIN_ONLY_PATH_PREFIXES).toContain('/api-keys')
expect(ADMIN_ONLY_PATH_PREFIXES).toContain('/mq-status')
expect(ADMIN_ONLY_PATH_PREFIXES).toContain('/failed-messages')
expect(ADMIN_ONLY_PATH_PREFIXES).toContain('/logs/requests')
+1
View File
@@ -8,6 +8,7 @@ export const ADMIN_ONLY_PATH_PREFIXES: readonly string[] = [
'/users',
'/roles',
'/route-groups',
'/api-keys',
'/mq-status',
'/failed-messages',
'/logs/requests',
@@ -108,7 +108,8 @@ describe('AdminApiKeyPage', () => {
it('重置按钮调用 resetFilters 并 fetchAllKeys', async () => {
await mountPage()
const store = useAdminApiKeyStore()
store.filters.user_id = 5
store.filters.username = 'testuser'
store.filters.email = 'test@example.com'
vi.mocked(api.get).mockClear()
setupApi([])
@@ -119,7 +120,8 @@ describe('AdminApiKeyPage', () => {
btn?.click()
await flushPromises()
expect(store.filters.user_id).toBeUndefined()
expect(store.filters.username).toBeUndefined()
expect(store.filters.email).toBeUndefined()
expect(vi.mocked(api.get)).toHaveBeenCalled()
})
+24 -12
View File
@@ -62,12 +62,19 @@ function handleDelete(id: number) {
<!-- 筛选区 -->
<div class="flex gap-3 mb-4 flex-wrap">
<a-input-number
v-model:value="store.filters.user_id"
placeholder="用户 ID"
:min="1"
style="width: 140px"
<a-input
v-model:value="store.filters.username"
placeholder="用户"
style="width: 150px"
allow-clear
@press-enter="handleSearch"
/>
<a-input
v-model:value="store.filters.email"
placeholder="邮箱"
style="width: 180px"
allow-clear
@press-enter="handleSearch"
/>
<a-select
v-model:value="enabledFilter"
@@ -100,13 +107,18 @@ function handleDelete(id: number) {
>
<template #bodyCell="{ column, record }">
<template v-if="column.key === 'user'">
<span>{{ (record as ApiKeyRecord).user?.username ?? record.user_id }}</span>
<a-tooltip
v-if="(record as ApiKeyRecord).user?.api_key_enabled === false"
title="该用户的 API Key 功能已关闭,所有 Key 均无法认证"
>
<a-tag color="red" class="ml-1">已停用</a-tag>
</a-tooltip>
<div>
<span>{{ (record as ApiKeyRecord).user?.username ?? record.user_id }}</span>
<a-tooltip
v-if="(record as ApiKeyRecord).user?.api_key_enabled === false"
title="该用户的 API Key 功能已关闭,所有 Key 均无法认证"
>
<a-tag color="red" class="ml-1">已停用</a-tag>
</a-tooltip>
</div>
<div v-if="(record as ApiKeyRecord).user?.email" class="text-xs text-gray-400">
{{ (record as ApiKeyRecord).user!.email }}
</div>
</template>
<template v-else-if="column.key === 'prefix'">
@@ -157,6 +157,22 @@ describe('useRouteGroupStore', () => {
expect(api.get).toHaveBeenCalledWith('/api/v1/routes', undefined)
})
})
describe('syncRoutes', () => {
it('calls sync API and refreshes groups + routes', async () => {
const store = useRouteGroupStore()
vi.mocked(api.post).mockResolvedValueOnce(undefined)
vi.mocked(api.get)
.mockResolvedValueOnce(mockGroups)
.mockResolvedValueOnce(mockRoutes)
await store.syncRoutes()
expect(api.post).toHaveBeenCalledWith('/api/v1/routes/sync')
expect(api.get).toHaveBeenCalledWith('/api/v1/route-groups')
expect(api.get).toHaveBeenCalledWith('/api/v1/routes', undefined)
})
})
})
// ─── Page 组件测试 ───
@@ -177,6 +193,7 @@ describe('RouteGroupsPage', () => {
EditOutlined: { template: '<span class="icon-edit" />' },
DeleteOutlined: { template: '<span class="icon-delete" />' },
SaveOutlined: { template: '<span class="icon-save" />' },
SyncOutlined: { template: '<span class="icon-sync" />' },
},
},
})
@@ -242,6 +259,23 @@ describe('RouteGroupsPage', () => {
expect(wrapper.text()).toContain('未分组')
})
it('renders 同步路由 button and calls syncRoutes on click', async () => {
await mountPage()
const syncBtn = wrapper.findAll('.ant-btn').find((b) => b.text().includes('同步路由'))
expect(syncBtn).toBeDefined()
vi.mocked(api.post).mockResolvedValueOnce(undefined)
vi.mocked(api.get)
.mockResolvedValueOnce(mockGroups)
.mockResolvedValueOnce(mockRoutes)
await syncBtn!.trigger('click')
await flushPromises()
expect(api.post).toHaveBeenCalledWith('/api/v1/routes/sync')
})
it('calls batchAssignRoutes on save button click', async () => {
await mountPage()
+23 -3
View File
@@ -6,6 +6,7 @@ import {
EditOutlined,
DeleteOutlined,
SaveOutlined,
SyncOutlined,
} from '@ant-design/icons-vue'
const groupStore = useRouteGroupStore()
@@ -55,6 +56,7 @@ const localCheckedKeys = ref<{ checked: (string | number)[], halfChecked: (strin
halfChecked: [],
})
const assignSaving = ref(false)
const syncing = ref(false)
// 当前选中组的名称
const selectedGroupName = computed(() => {
@@ -148,6 +150,19 @@ async function handleDelete(group: RouteGroupRecord) {
}
}
async function handleSyncRoutes() {
syncing.value = true
try {
await groupStore.syncRoutes()
message.success('路由同步成功')
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : '同步失败'
message.error(msg)
} finally {
syncing.value = false
}
}
async function handleGroupChange(routeId: number, groupId: number | null) {
try {
await groupStore.assignRouteToGroup(routeId, groupId)
@@ -171,9 +186,14 @@ onMounted(async () => {
<a-col :span="8">
<a-card title="路由组" :loading="groupStore.loading">
<template #extra>
<a-button type="primary" size="small" @click="openCreateModal">
<PlusOutlined /> 新建
</a-button>
<a-space>
<a-button size="small" :loading="syncing" @click="handleSyncRoutes">
<SyncOutlined /> 同步路由
</a-button>
<a-button type="primary" size="small" @click="openCreateModal">
<PlusOutlined /> 新建
</a-button>
</a-space>
</template>
<!-- 未分组入口 -->
@@ -30,9 +30,10 @@ import { api } from '@/utils/request'
const mockMappings = {
items: [
{ id: 1, company_id: 3, platform_id: 1, store_id: null, origin_sku: '0032', origin_sku_id: 1, platform_outer_sku: 'AMZ-0032', platform_product_id: 'ITEM-001', enabled: true, note: null, created_at: '2026-04-01T00:00:00', updated_at: '2026-04-01T00:00:00' },
{ id: 1, company_id: 3, platform_id: 1, store_id: null, origin_sku: '0032', origin_sku_id: 1, platform_outer_sku: 'AMZ-0032', platform_product_id: 'ITEM-001', enabled: true, bundled: true, note: null, created_at: '2026-04-01T00:00:00', updated_at: '2026-04-01T00:00:00' },
{ id: 2, company_id: 3, platform_id: 1, store_id: null, origin_sku: '0033', origin_sku_id: 2, platform_outer_sku: 'AMZ-0033', platform_product_id: 'ITEM-002', enabled: true, bundled: false, note: null, created_at: '2026-04-01T00:00:00', updated_at: '2026-04-01T00:00:00' },
],
total: 1,
total: 2,
page: 1,
per_page: 15,
}
@@ -148,4 +149,70 @@ describe('SkuMappingsPage', () => {
expect(buttonTexts.some((t) => t.includes('搜索'))).toBe(true)
expect(buttonTexts.some((t) => t.includes('重置'))).toBe(true)
})
it('P7: renders 组合商品 column with blue/default tags per bundled', async () => {
await mountPage()
const html = wrapper.html()
expect(html).toContain('组合商品')
const rows = wrapper.findAll('tbody tr')
const bundledRow = rows.find((r) => r.text().includes('AMZ-0032'))
const plainRow = rows.find((r) => r.text().includes('AMZ-0033'))
expect(bundledRow?.html()).toMatch(/ant-tag-blue[^>]*>\s*是/)
expect(plainRow?.html()).toMatch(/ant-tag[^>]*>\s*否/)
expect(plainRow?.html()).not.toMatch(/ant-tag-blue[^>]*>\s*否/)
})
it('P8: form contains bundled Switch defaulted to off on create', async () => {
await mountPage()
const newBtn = wrapper.findAll('.ant-btn').find((b) => b.text().includes('新建连接'))
await newBtn?.trigger('click')
await flushPromises()
await nextTick()
const bodyHtml = document.body.innerHTML
expect(bodyHtml).toContain('组合商品')
const switches = document.body.querySelectorAll('.ant-switch')
const bundledSwitch = Array.from(switches).find((s) => {
const label = s.closest('.ant-form-item')?.querySelector('.ant-form-item-label')
return label?.textContent?.includes('组合商品')
})
expect(bundledSwitch).toBeTruthy()
expect(bundledSwitch?.classList.contains('ant-switch-checked')).toBe(false)
})
it('P9: origin_sku_id required validation triggers error on empty submit', async () => {
await mountPage()
const newBtn = wrapper.findAll('.ant-btn').find((b) => b.text().includes('新建连接'))
await newBtn?.trigger('click')
await flushPromises()
await nextTick()
const okBtn = Array.from(document.body.querySelectorAll('.ant-modal-footer .ant-btn')).find(
(b) => {
const t = (b as HTMLElement).textContent ?? ''
return t.includes('确 定') || t.includes('确定') || t.includes('OK')
},
) as HTMLElement | undefined
okBtn?.click()
await flushPromises()
await nextTick()
await vi.waitFor(
() => {
expect(document.body.innerHTML).toContain('请选择内部 SKU')
},
{ timeout: 8000, interval: 100 },
)
}, 15000)
it('P10: filter area exposes bundled dropdown with 是/否 options', async () => {
await mountPage()
const formItemLabels = wrapper.findAll('.filter-form .ant-form-item-label label')
const bundledLabel = formItemLabels.find((l) => l.text().includes('组合商品'))
expect(bundledLabel).toBeTruthy()
})
})
+44 -9
View File
@@ -26,6 +26,7 @@ const columns = [
{ title: '内部 SKU', dataIndex: 'origin_sku', width: 140 },
{ title: '平台 SKU', dataIndex: 'platform_outer_sku', width: 160 },
{ title: '平台商品ID', dataIndex: 'platform_product_id', width: 160, ellipsis: true },
{ title: '组合商品', key: 'bundled', width: 100 },
{ title: '状态', key: 'enabled', width: 80 },
{ title: '更新时间', key: 'updated_at', width: 170 },
{ title: '操作', key: 'action', width: 200, fixed: 'right' as const },
@@ -49,6 +50,7 @@ const defaultForm = (): SkuMappingForm => ({
generation_strategy: 'prefix',
warehouse_id: undefined,
enabled: true,
bundled: false,
note: '',
})
@@ -57,7 +59,9 @@ const form = reactive<SkuMappingForm>(defaultForm())
const rules: Record<string, Rule[]> = {
company_id: [{ required: true, message: '请选择公司', trigger: 'change' }],
platform_id: [{ required: true, message: '请选择平台', trigger: 'change' }],
origin_sku: [{ required: true, message: '请输入内部 SKU', trigger: 'blur' }],
origin_sku_id: [
{ required: true, message: '请选择内部 SKU', trigger: 'change', type: 'number' },
],
platform_outer_sku: [{ required: true, message: '请输入或生成平台 SKU', trigger: 'blur' }],
}
@@ -175,6 +179,7 @@ function openEdit(record: SkuMappingForm & { id: number }) {
generation_strategy: record.generation_strategy || 'prefix',
warehouse_id: record.warehouse_id || undefined,
enabled: record.enabled ?? true,
bundled: record.bundled ?? false,
note: record.note || '',
})
nextTick(() => { skipCompanyWatch = false })
@@ -338,6 +343,18 @@ watch(
<a-select-option :value="false">禁用</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="组合商品">
<a-select
:value="(store.filters.bundled as any)"
@update:value="(v: any) => { store.filters.bundled = v }"
placeholder="全部"
allow-clear
style="width: 100px"
>
<a-select-option :value="true"></a-select-option>
<a-select-option :value="false"></a-select-option>
</a-select>
</a-form-item>
<a-form-item>
<a-space>
<a-button type="primary" html-type="submit">
@@ -384,6 +401,11 @@ watch(
</template>
<span v-else class="text-gray-400">平台默认</span>
</template>
<template v-else-if="column.key === 'bundled'">
<a-tag :color="record.bundled ? 'blue' : 'default'">
{{ record.bundled ? '是' : '否' }}
</a-tag>
</template>
<template v-else-if="column.key === 'enabled'">
<a-tag :color="record.enabled ? 'green' : 'default'">
{{ record.enabled ? '启用' : '禁用' }}
@@ -488,7 +510,7 @@ watch(
</a-form-item>
</a-col>
<a-col :span="12">
<a-form-item label="内部 SKU" name="origin_sku">
<a-form-item label="内部 SKU" name="origin_sku_id">
<a-select
v-model:value="form.origin_sku_id"
:placeholder="!form.company_id ? '请先选择公司' : '搜索(最少3个字)或输入SKU'"
@@ -544,13 +566,26 @@ watch(
<a-textarea v-model:value="form.note" placeholder="可选备注" :rows="2" />
</a-form-item>
<a-form-item label="状态">
<a-switch
v-model:checked="form.enabled"
checked-children="启用"
un-checked-children="禁用"
/>
</a-form-item>
<a-row :gutter="16">
<a-col :span="12">
<a-form-item label="状态">
<a-switch
v-model:checked="form.enabled"
checked-children="启用"
un-checked-children="禁用"
/>
</a-form-item>
</a-col>
<a-col :span="12">
<a-form-item label="组合商品" name="bundled">
<a-switch
v-model:checked="form.bundled"
checked-children=""
un-checked-children=""
/>
</a-form-item>
</a-col>
</a-row>
</a-form>
</a-modal>
@@ -22,7 +22,7 @@ describe('useAdminApiKeyStore', () => {
vi.restoreAllMocks()
})
describe('fetchAllKeys — enabled 查询参数契约', () => {
describe('fetchAllKeys — 查询参数契约', () => {
it('enabled=true 应序列化为 1(后端期望 integer 0/1', async () => {
vi.mocked(api.get).mockResolvedValueOnce(emptyPage)
@@ -60,20 +60,46 @@ describe('useAdminApiKeyStore', () => {
expect.objectContaining({ enabled: undefined }),
)
})
})
describe('toggleUserApiKeyEnabled — 请求体字段名契约', () => {
it('应发送 api_key_enabled 字段(后端 UserController::updateApiKeyEnabled 读取该字段)', async () => {
vi.mocked(api.patch).mockResolvedValueOnce(undefined)
it('username 过滤器应传递给后端', async () => {
vi.mocked(api.get).mockResolvedValueOnce(emptyPage)
const store = useAdminApiKeyStore()
await store.toggleUserApiKeyEnabled(42, true)
store.filters.username = 'testuser'
await store.fetchAllKeys()
expect(api.patch).toHaveBeenCalledWith(
'/api/v1/users/42/api-key-enabled',
{ api_key_enabled: true },
expect(api.get).toHaveBeenCalledWith(
'/api/v1/admin/api-keys',
expect.objectContaining({ username: 'testuser' }),
)
})
it('email 过滤器应传递给后端', async () => {
vi.mocked(api.get).mockResolvedValueOnce(emptyPage)
const store = useAdminApiKeyStore()
store.filters.email = 'test@example.com'
await store.fetchAllKeys()
expect(api.get).toHaveBeenCalledWith(
'/api/v1/admin/api-keys',
expect.objectContaining({ email: 'test@example.com' }),
)
})
it('空字符串过滤器应转换为 undefined', async () => {
vi.mocked(api.get).mockResolvedValueOnce(emptyPage)
const store = useAdminApiKeyStore()
store.filters.username = ''
store.filters.email = ''
await store.fetchAllKeys()
expect(api.get).toHaveBeenCalledWith(
'/api/v1/admin/api-keys',
expect.objectContaining({ username: undefined, email: undefined }),
)
})
})
})
+6 -14
View File
@@ -10,7 +10,8 @@ export const useAdminApiKeyStore = defineStore('admin-api-key', () => {
total: 0,
})
const filters = reactive<AdminApiKeyFilters>({
user_id: undefined,
username: undefined,
email: undefined,
enabled: undefined,
})
@@ -20,7 +21,8 @@ export const useAdminApiKeyStore = defineStore('admin-api-key', () => {
const data = await api.get<PaginatedData<ApiKeyRecord>>('/api/v1/admin/api-keys', {
page: pagination.page,
per_page: pagination.per_page,
user_id: filters.user_id,
username: filters.username || undefined,
email: filters.email || undefined,
enabled: filters.enabled === undefined ? undefined : filters.enabled ? 1 : 0,
})
keys.value = data.items
@@ -55,18 +57,9 @@ export const useAdminApiKeyStore = defineStore('admin-api-key', () => {
}
}
async function toggleUserApiKeyEnabled(userId: number, enabled: boolean) {
try {
await api.patch(`/api/v1/users/${userId}/api-key-enabled`, { api_key_enabled: enabled })
await fetchAllKeys()
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : '操作失败'
message.error(msg)
}
}
function resetFilters() {
filters.user_id = undefined
filters.username = undefined
filters.email = undefined
filters.enabled = undefined
pagination.page = 1
}
@@ -79,7 +72,6 @@ export const useAdminApiKeyStore = defineStore('admin-api-key', () => {
fetchAllKeys,
toggleKey,
deleteKey,
toggleUserApiKeyEnabled,
resetFilters,
}
})
+6
View File
@@ -86,6 +86,11 @@ export const useRouteGroupStore = defineStore('route-group', () => {
await Promise.all([fetchGroups(), fetchRoutes()])
}
async function syncRoutes() {
await api.post('/api/v1/routes/sync')
await Promise.all([fetchGroups(), fetchRoutes()])
}
// 按首段资源名分组,避免同名叶节点与目录共存导致 UI 混乱
const routeTree = computed(() => {
const root: RouteTreeNode[] = []
@@ -167,6 +172,7 @@ export const useRouteGroupStore = defineStore('route-group', () => {
fetchRoutes,
assignRouteToGroup,
batchAssignRoutes,
syncRoutes,
routeTree,
checkedRouteKeys,
}
+8 -2
View File
@@ -9,10 +9,11 @@ export interface SkuMappingRecord {
platform_id: number
store_id: number | null
origin_sku: string
origin_sku_id: number | null
platform_outer_sku: string | null
origin_sku_id: number
platform_outer_sku: string
platform_product_id: string
enabled: boolean
bundled: boolean
note: string | null
created_at: string
updated_at: string
@@ -22,6 +23,7 @@ export interface SkuMappingFilters {
origin_sku: string
platform_outer_sku: string
enabled: boolean | undefined
bundled: boolean | undefined
}
export interface SkuMappingForm {
@@ -35,6 +37,7 @@ export interface SkuMappingForm {
generation_strategy: string
warehouse_id: number | undefined
enabled: boolean
bundled?: boolean
note: string
}
@@ -90,6 +93,7 @@ export const useSkuMappingStore = defineStore('skuMapping', () => {
origin_sku: '',
platform_outer_sku: '',
enabled: undefined,
bundled: undefined,
})
// Lookups
@@ -178,6 +182,7 @@ export const useSkuMappingStore = defineStore('skuMapping', () => {
origin_sku: filters.origin_sku || undefined,
platform_outer_sku: filters.platform_outer_sku || undefined,
enabled: filters.enabled,
bundled: filters.bundled,
})
items.value = data.items
pagination.total = data.total
@@ -196,6 +201,7 @@ export const useSkuMappingStore = defineStore('skuMapping', () => {
filters.origin_sku = ''
filters.platform_outer_sku = ''
filters.enabled = undefined
filters.bundled = undefined
cascadeValue.company_id = undefined
cascadeValue.platform_id = undefined
cascadeValue.store_id = undefined
+3 -2
View File
@@ -315,7 +315,7 @@ export interface ApiKeyRecord {
expires_at: string | null
enabled: boolean
created_at: string
user?: { id: number; username: string; api_key_enabled?: boolean }
user?: { id: number; username: string; email?: string; api_key_enabled?: boolean }
}
export interface ApiKeyCreateParams {
@@ -329,7 +329,8 @@ export interface ApiKeyCreateResult {
}
export interface AdminApiKeyFilters {
user_id: number | undefined
username: string | undefined
email: string | undefined
enabled: boolean | undefined
}