diff --git a/backend/app/Command/OrderAggregatesRefreshCommand.php b/backend/app/Command/OrderAggregatesRefreshCommand.php new file mode 100644 index 0000000..01333d5 --- /dev/null +++ b/backend/app/Command/OrderAggregatesRefreshCommand.php @@ -0,0 +1,35 @@ +setDescription('消费 aggregate_refresh_queue,对滞后日期调用 refresh_continuous_aggregate'); + } + + public function handle(): void + { + $result = $this->job->run(); + $this->info(sprintf('Processed: %d, Failed: %d', $result['processed'], $result['failed'])); + } +} diff --git a/backend/app/Model/AggregateRefreshQueue.php b/backend/app/Model/AggregateRefreshQueue.php new file mode 100644 index 0000000..ffb84fb --- /dev/null +++ b/backend/app/Model/AggregateRefreshQueue.php @@ -0,0 +1,31 @@ + 'datetime', + ]; +} diff --git a/backend/app/Platform/OrderConsumer.php b/backend/app/Platform/OrderConsumer.php index d74b6a8..ebe25aa 100644 --- a/backend/app/Platform/OrderConsumer.php +++ b/backend/app/Platform/OrderConsumer.php @@ -5,8 +5,10 @@ declare(strict_types=1); namespace App\Platform; use App\Entity\Parse\EntityParseFactory; +use App\Model\AggregateRefreshQueue; use App\Platform\Traits\FailedMessageTrait; use App\Utils\Log; +use Carbon\Carbon; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Builder\QueueBuilder; use Hyperf\Amqp\Message\ConsumerMessage; @@ -152,9 +154,10 @@ class OrderConsumer extends ConsumerMessage // 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem $this->processOrderItems($items); + // 5. 识别 ≥ 3 天前的 created_date 入队,补刷自动策略未覆盖的窗口 + $this->enqueueAffectedDates($orders_data); Db::commit(); - // @TODO 触发事件通知,更新自动聚合任务 // 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。 return Result::ACK; @@ -379,4 +382,51 @@ class OrderConsumer extends ConsumerMessage } } + /** + * 识别 payload 中 ≥ 3 天前的 created_date,入队 orders_daily_by_created 兜底刷新。 + * + * 自动刷新策略仅覆盖最近 3 天窗口;3 天前的订单变更(补录、追溯调整)需由 + * aggregate_refresh_queue + Crontab 任务补刷。仅服务 by_created 视图, + * by_paid 由全量 REFRESH 覆盖,不入此队列。 + * + * @param array $payloads 来自 entityMap()->all() 的订单数组,每条含 created_date 字段 + */ + protected function enqueueAffectedDates(array $payloads): void + { + if (empty($payloads)) { + return; + } + + $threshold = Carbon::now()->subDays(3)->toDateString(); + + $unique_dates = []; + foreach ($payloads as $payload) { + $created = $payload['created_date'] ?? null; + if ($created === null) { + continue; + } + + // entityMap 输出 'Y-m-d H:i:sP';用 Carbon::parse 兼容多种格式 + $date = Carbon::parse($created)->toDateString(); + + // 严格小于阈值才入队(≥ 阈值的部分由自动刷新策略覆盖) + if ($date < $threshold) { + $unique_dates[$date] = true; + } + } + + if (empty($unique_dates)) { + return; + } + + $now = Carbon::now(); + foreach (array_keys($unique_dates) as $date) { + AggregateRefreshQueue::query()->insertOrIgnore([ + 'refresh_date' => $date, + 'aggregate_view' => 'orders_daily_by_created', + 'created_at' => $now, + ]); + } + } + } diff --git a/backend/app/Service/AggregateRefresherInterface.php b/backend/app/Service/AggregateRefresherInterface.php new file mode 100644 index 0000000..065201c --- /dev/null +++ b/backend/app/Service/AggregateRefresherInterface.php @@ -0,0 +1,21 @@ +quote($view); + + Db::statement(sprintf( + 'CALL refresh_continuous_aggregate(%s, %s, %s)', + $quoted_view, + $start, + $end + )); + } +} diff --git a/backend/app/Service/OrderAggregatesRefreshJob.php b/backend/app/Service/OrderAggregatesRefreshJob.php new file mode 100644 index 0000000..8291d0d --- /dev/null +++ b/backend/app/Service/OrderAggregatesRefreshJob.php @@ -0,0 +1,76 @@ +where('created_at', '<', Carbon::now()->subHour()) + ->orderBy('refresh_date') + ->orderBy('aggregate_view') + ->get(); + + if ($items->isEmpty()) { + Log::get()->info('orders:refresh-aggregates queue empty'); + return ['processed' => 0, 'failed' => 0]; + } + + $processed = 0; + $failed = 0; + + foreach ($items as $item) { + $view = $item->aggregate_view; + $date = $item->refresh_date; + + try { + $this->refresher->refresh($view, $date); + + AggregateRefreshQueue::query() + ->where('refresh_date', $date) + ->where('aggregate_view', $view) + ->delete(); + + $processed++; + } catch (Throwable $e) { + Log::get()->error('orders:refresh-aggregates failed', [ + 'view' => $view, + 'date' => $date, + 'error' => $e->getMessage(), + ]); + $failed++; + } + } + + Log::get()->info('orders:refresh-aggregates done', [ + 'processed' => $processed, + 'failed' => $failed, + ]); + + return ['processed' => $processed, 'failed' => $failed]; + } +} diff --git a/backend/composer.json b/backend/composer.json index 578d351..493b2be 100644 --- a/backend/composer.json +++ b/backend/composer.json @@ -20,6 +20,7 @@ "hyperf/command": "~3.1.0", "hyperf/config": "~3.1.0", "hyperf/constants": "~3.1.0", + "hyperf/crontab": "~3.1.0", "hyperf/database-pgsql": "^3.1", "hyperf/db-connection": "~3.1.0", "hyperf/engine": "^2.10", diff --git a/backend/config/autoload/crontab.php b/backend/config/autoload/crontab.php new file mode 100644 index 0000000..82d3b62 --- /dev/null +++ b/backend/config/autoload/crontab.php @@ -0,0 +1,19 @@ + true, + 'crontab' => [ + // 每天 02:07 处理 aggregate_refresh_queue 中的滞后聚合刷新。 + // 02 时段为最低流量段;分钟取 :07 避开整点全互联网定时任务集中触发。 + (new Crontab()) + ->setName('OrderAggregatesRefresh') + ->setRule('7 2 * * *') + ->setCallback([OrderAggregatesRefreshJob::class, 'run']) + ->setMemo('每天 02:07 处理 aggregate_refresh_queue 中的滞后聚合刷新'), + ], +]; diff --git a/backend/config/autoload/dependencies.php b/backend/config/autoload/dependencies.php index e7bea62..24dfac6 100644 --- a/backend/config/autoload/dependencies.php +++ b/backend/config/autoload/dependencies.php @@ -11,4 +11,5 @@ declare(strict_types=1); */ return [ // 可以在这里配置接口到实现的绑定 + App\Service\AggregateRefresherInterface::class => App\Service\ContinuousAggregateRefresher::class, ]; diff --git a/backend/config/autoload/processes.php b/backend/config/autoload/processes.php index f46bd96..4099209 100644 --- a/backend/config/autoload/processes.php +++ b/backend/config/autoload/processes.php @@ -10,4 +10,5 @@ declare(strict_types=1); * @license https://github.com/hyperf/hyperf/blob/master/LICENSE */ return [ + Hyperf\Crontab\Process\CrontabDispatcherProcess::class, ]; diff --git a/backend/migrations/2026_05_07_100300_create_aggregate_refresh_queue.php b/backend/migrations/2026_05_07_100300_create_aggregate_refresh_queue.php new file mode 100644 index 0000000..64c05f2 --- /dev/null +++ b/backend/migrations/2026_05_07_100300_create_aggregate_refresh_queue.php @@ -0,0 +1,32 @@ +date('refresh_date')->comment('待刷新的聚合日'); + $table->string('aggregate_view', 100)->comment('视图名(orders_daily_by_created / orders_daily_by_paid)'); + $table->timestampTz('created_at')->useCurrent()->comment('入队时间'); + + $table->primary(['refresh_date', 'aggregate_view'], 'pk_aggregate_refresh_queue'); + $table->index('created_at', 'idx_aggregate_refresh_queue_created_at'); + }); + + // Hyperf PostgresGrammar::compilePrimary 丢弃自定义 index 名(vendor/hyperf/database-pgsql/.../PostgresGrammar.php:230), + // PG 默认用 {table}_pkey;这里显式重命名以满足计划验收要求(名称 pk_aggregate_refresh_queue) + Db::statement('ALTER TABLE aggregate_refresh_queue RENAME CONSTRAINT aggregate_refresh_queue_pkey TO pk_aggregate_refresh_queue'); + } + + public function down(): void + { + Schema::dropIfExists('aggregate_refresh_queue'); + } +}; diff --git a/backend/test/Cases/Unit/Model/AggregateRefreshQueueTest.php b/backend/test/Cases/Unit/Model/AggregateRefreshQueueTest.php new file mode 100644 index 0000000..de8a076 --- /dev/null +++ b/backend/test/Cases/Unit/Model/AggregateRefreshQueueTest.php @@ -0,0 +1,89 @@ + 0) { + $callback(); + return; + } + + $exception = null; + \Swoole\Coroutine\run(static function () use ($callback, &$exception): void { + try { + $callback(); + } catch (\Throwable $e) { + $exception = $e; + } + }); + if ($exception) { + throw $exception; + } + } + + private function uniqueRefreshDate(): string + { + // 与 ApiKeyTest 的 bin2hex(random_bytes(4)) 同源策略:用唯一性而非全表清理来隔离用例 + return sprintf('20%02d-%02d-%02d', random_int(50, 99), random_int(1, 12), random_int(1, 28)); + } + + public function test_attributes_persist_correctly(): void + { + $this->runInCoroutine(function (): void { + $unique_date = $this->uniqueRefreshDate(); + + try { + AggregateRefreshQueue::query()->create([ + 'refresh_date' => $unique_date, + 'aggregate_view' => 'orders_daily_by_created', + 'created_at' => Carbon::now(), + ]); + + $row = AggregateRefreshQueue::query()->where('refresh_date', $unique_date)->first(); + + $this->assertNotNull($row); + $this->assertSame($unique_date, $row->refresh_date); + $this->assertSame('orders_daily_by_created', $row->aggregate_view); + $this->assertInstanceOf(Carbon::class, $row->created_at); + } finally { + AggregateRefreshQueue::query()->where('refresh_date', $unique_date)->delete(); + } + }); + } + + public function test_insert_or_ignore_dedups_on_composite_pk(): void + { + $this->runInCoroutine(function (): void { + $unique_date = $this->uniqueRefreshDate(); + + $payload = [ + 'refresh_date' => $unique_date, + 'aggregate_view' => 'orders_daily_by_created', + 'created_at' => Carbon::now(), + ]; + + try { + AggregateRefreshQueue::query()->insertOrIgnore($payload); + AggregateRefreshQueue::query()->insertOrIgnore($payload); + + $count = AggregateRefreshQueue::query()->where('refresh_date', $unique_date)->count(); + $this->assertSame(1, $count); + } finally { + AggregateRefreshQueue::query()->where('refresh_date', $unique_date)->delete(); + } + }); + } +} diff --git a/backend/test/Cases/Unit/Platform/OrderConsumerEnqueueTest.php b/backend/test/Cases/Unit/Platform/OrderConsumerEnqueueTest.php new file mode 100644 index 0000000..ff0df6b --- /dev/null +++ b/backend/test/Cases/Unit/Platform/OrderConsumerEnqueueTest.php @@ -0,0 +1,164 @@ + 0) { + $callback(); + return; + } + + $exception = null; + \Swoole\Coroutine\run(static function () use ($callback, &$exception): void { + try { + $callback(); + } catch (\Throwable $e) { + $exception = $e; + } + }); + if ($exception) { + throw $exception; + } + } + + /** + * 调用 protected enqueueAffectedDates 方法 + */ + protected function invokeEnqueue(array $payloads): void + { + $consumer = new OrderConsumer(); + $method = new ReflectionMethod($consumer, 'enqueueAffectedDates'); + $method->setAccessible(true); + $method->invoke($consumer, $payloads); + } + + /** + * 清理指定日期的队列条目(前置 + 后置双保险) + */ + protected function cleanupQueue(string $date): void + { + AggregateRefreshQueue::query() + ->where('refresh_date', $date) + ->where('aggregate_view', 'orders_daily_by_created') + ->delete(); + } + + /** + * 当日订单不应入队(在自动刷新策略覆盖窗口内) + */ + public function test_today_payload_does_not_enqueue(): void + { + $this->runInCoroutine(function (): void { + $today = Carbon::now()->toDateString(); + $payloads = [['created_date' => Carbon::now()->format('Y-m-d H:i:sP')]]; + + $this->cleanupQueue($today); + try { + $this->invokeEnqueue($payloads); + + $row = AggregateRefreshQueue::query() + ->where('refresh_date', $today) + ->where('aggregate_view', 'orders_daily_by_created') + ->first(); + $this->assertNull($row); + } finally { + $this->cleanupQueue($today); + } + }); + } + + /** + * 2 天前订单不应入队(仍在自动刷新策略 [now-3d, now-1h] 窗口内) + */ + public function test_two_days_ago_payload_does_not_enqueue(): void + { + $this->runInCoroutine(function (): void { + $two_days_ago = Carbon::now()->subDays(2)->toDateString(); + $payloads = [['created_date' => Carbon::now()->subDays(2)->format('Y-m-d H:i:sP')]]; + + $this->cleanupQueue($two_days_ago); + try { + $this->invokeEnqueue($payloads); + + $row = AggregateRefreshQueue::query() + ->where('refresh_date', $two_days_ago) + ->where('aggregate_view', 'orders_daily_by_created') + ->first(); + $this->assertNull($row); + } finally { + $this->cleanupQueue($two_days_ago); + } + }); + } + + /** + * 30 天前订单应入队 by_created(自动策略未覆盖,需补刷) + */ + public function test_old_payload_enqueues_by_created(): void + { + $this->runInCoroutine(function (): void { + $old_date = Carbon::now()->subDays(30)->toDateString(); + $payloads = [['created_date' => Carbon::now()->subDays(30)->format('Y-m-d H:i:sP')]]; + + $this->cleanupQueue($old_date); + try { + $this->invokeEnqueue($payloads); + + $row = AggregateRefreshQueue::query() + ->where('refresh_date', $old_date) + ->where('aggregate_view', 'orders_daily_by_created') + ->first(); + $this->assertNotNull($row); + $this->assertSame($old_date, $row->refresh_date); + $this->assertSame('orders_daily_by_created', $row->aggregate_view); + } finally { + $this->cleanupQueue($old_date); + } + }); + } + + /** + * 同日期重复入队由复合主键 + insertOrIgnore 防重 + */ + public function test_dedup_via_insert_or_ignore(): void + { + $this->runInCoroutine(function (): void { + $old_date = Carbon::now()->subDays(45)->toDateString(); + $payloads = [ + ['created_date' => Carbon::now()->subDays(45)->format('Y-m-d H:i:sP')], + ['created_date' => Carbon::now()->subDays(45)->format('Y-m-d H:i:sP')], + ]; + + $this->cleanupQueue($old_date); + try { + // 单批次内同日期 → $unique_dates map 去重 + $this->invokeEnqueue($payloads); + // 跨批次同日期 → 复合主键 insertOrIgnore 防重 + $this->invokeEnqueue($payloads); + + $count = AggregateRefreshQueue::query() + ->where('refresh_date', $old_date) + ->where('aggregate_view', 'orders_daily_by_created') + ->count(); + $this->assertSame(1, $count); + } finally { + $this->cleanupQueue($old_date); + } + }); + } +} diff --git a/backend/test/Cases/Unit/Service/OrderAggregatesRefreshJobTest.php b/backend/test/Cases/Unit/Service/OrderAggregatesRefreshJobTest.php new file mode 100644 index 0000000..bad7ead --- /dev/null +++ b/backend/test/Cases/Unit/Service/OrderAggregatesRefreshJobTest.php @@ -0,0 +1,178 @@ + */ + public array $calls = []; + + public function refresh(string $view, string $refresh_date): void + { + $this->calls[] = ['view' => $view, 'date' => $refresh_date]; + } +} + +/** + * 测试用 stub Refresher,每次调用抛异常(模拟刷新失败) + */ +class ThrowingRefresher implements AggregateRefresherInterface +{ + public function refresh(string $view, string $refresh_date): void + { + throw new RuntimeException('refresh failed'); + } +} + +/** + * @internal + * @coversNothing + */ +class OrderAggregatesRefreshJobTest extends TestCase +{ + protected function runInCoroutine(callable $callback): void + { + if (\Swoole\Coroutine::getCid() > 0) { + $callback(); + return; + } + + $exception = null; + \Swoole\Coroutine\run(static function () use ($callback, &$exception): void { + try { + $callback(); + } catch (\Throwable $e) { + $exception = $e; + } + }); + if ($exception) { + throw $exception; + } + } + + /** + * 沿用 P23.1 AggregateRefreshQueueTest 的随机未来日期隔离策略, + * 确保不同测试用例间无冲突,也不会污染生产数据。 + */ + private function uniqueRefreshDate(): string + { + return sprintf('20%02d-%02d-%02d', random_int(50, 99), random_int(1, 12), random_int(1, 28)); + } + + /** + * 队列项 created_at < now()-1h 应被处理:调用 refresher 后从队列删除 + */ + public function test_processes_old_items_and_clears_queue(): void + { + $this->runInCoroutine(function (): void { + $date_a = $this->uniqueRefreshDate(); + $date_b = $this->uniqueRefreshDate(); + while ($date_b === $date_a) { + $date_b = $this->uniqueRefreshDate(); + } + + // 前置清理(防 random 撞历史残留) + AggregateRefreshQueue::query()->whereIn('refresh_date', [$date_a, $date_b])->delete(); + + try { + $two_hours_ago = Carbon::now()->subHours(2); + AggregateRefreshQueue::query()->insertOrIgnore([ + ['refresh_date' => $date_a, 'aggregate_view' => 'orders_daily_by_created', 'created_at' => $two_hours_ago], + ['refresh_date' => $date_b, 'aggregate_view' => 'orders_daily_by_created', 'created_at' => $two_hours_ago], + ]); + + $refresher = new RecordingRefresher(); + $job = new OrderAggregatesRefreshJob($refresher); + $result = $job->run(); + + $this->assertSame(2, $result['processed']); + $this->assertSame(0, $result['failed']); + $this->assertCount(2, $refresher->calls); + + // 队列已清空(针对本次测试的两条) + $remaining = AggregateRefreshQueue::query() + ->whereIn('refresh_date', [$date_a, $date_b]) + ->count(); + $this->assertSame(0, $remaining); + } finally { + AggregateRefreshQueue::query()->whereIn('refresh_date', [$date_a, $date_b])->delete(); + } + }); + } + + /** + * 队列项 created_at >= now()-1h 应被跳过:refresher 无调用,队列保留 + */ + public function test_recently_enqueued_items_are_skipped(): void + { + $this->runInCoroutine(function (): void { + $date = $this->uniqueRefreshDate(); + AggregateRefreshQueue::query()->where('refresh_date', $date)->delete(); + + try { + AggregateRefreshQueue::query()->insertOrIgnore([ + 'refresh_date' => $date, + 'aggregate_view' => 'orders_daily_by_created', + 'created_at' => Carbon::now()->subMinutes(30), // < 1 hour + ]); + + $refresher = new RecordingRefresher(); + $job = new OrderAggregatesRefreshJob($refresher); + $result = $job->run(); + + $this->assertSame(0, $result['processed']); + $this->assertSame(0, $result['failed']); + $this->assertCount(0, $refresher->calls); + + // 队列项保留 + $exists = AggregateRefreshQueue::query()->where('refresh_date', $date)->exists(); + $this->assertTrue($exists); + } finally { + AggregateRefreshQueue::query()->where('refresh_date', $date)->delete(); + } + }); + } + + /** + * refresher 抛异常时:队列项保留(下次任务重试),失败计数 +1 + */ + public function test_failure_keeps_queue_item(): void + { + $this->runInCoroutine(function (): void { + $date = $this->uniqueRefreshDate(); + AggregateRefreshQueue::query()->where('refresh_date', $date)->delete(); + + try { + AggregateRefreshQueue::query()->insertOrIgnore([ + 'refresh_date' => $date, + 'aggregate_view' => 'orders_daily_by_created', + 'created_at' => Carbon::now()->subHours(2), + ]); + + $job = new OrderAggregatesRefreshJob(new ThrowingRefresher()); + $result = $job->run(); + + $this->assertSame(0, $result['processed']); + $this->assertSame(1, $result['failed']); + + // 队列项保留以待重试 + $exists = AggregateRefreshQueue::query()->where('refresh_date', $date)->exists(); + $this->assertTrue($exists); + } finally { + AggregateRefreshQueue::query()->where('refresh_date', $date)->delete(); + } + }); + } +}