*/
    public array $calls = [];

    public function refresh(string $view, string $refresh_date): void
    {
        $this->calls[] = ['view' => $view, 'date' => $refresh_date];
    }
}

/**
 * Test stub refresher that throws on every call, simulating a refresh failure.
 */
class ThrowingRefresher implements AggregateRefresherInterface
{
    /**
     * Always fails, regardless of the arguments it receives.
     *
     * @throws RuntimeException on every invocation
     */
    public function refresh(string $view, string $refresh_date): void
    {
        throw new RuntimeException('refresh failed');
    }
}

/**
 * @internal
 * @coversNothing
 */
class OrderAggregatesRefreshJobTest extends TestCase
{
    /**
     * Runs the callback inside a Swoole coroutine.
     *
     * When already inside a coroutine the callback is invoked directly.
     * Otherwise a coroutine scheduler is started, and anything thrown by the
     * callback is captured and rethrown once the scheduler has returned.
     */
    protected function runInCoroutine(callable $callback): void
    {
        if (\Swoole\Coroutine::getCid() <= 0) {
            $caught = null;

            \Swoole\Coroutine\run(static function () use ($callback, &$caught): void {
                try {
                    $callback();
                } catch (\Throwable $error) {
                    // Hand the failure back to the caller through the by-reference capture.
                    $caught = $error;
                }
            });

            if ($caught !== null) {
                throw $caught;
            }

            return;
        }

        $callback();
    }

    /**
     * Reuses the random-future-date isolation strategy from the P23.1
     * AggregateRefreshQueueTest so that test cases do not collide with each
     * other and do not pollute production data.
     *
     * The year falls in 2050-2099 and the day in 1-28.
     */
    private function uniqueRefreshDate(): string
    {
        $year = random_int(50, 99);
        $month = random_int(1, 12);
        $day = random_int(1, 28);

        return sprintf('20%02d-%02d-%02d', $year, $month, $day);
    }

    /**
     * Queue items with created_at < now() - 1h should be processed: the
     * refresher is called and the items are then removed from the queue.
     */
    public function test_processes_old_items_and_clears_queue(): void
    {
        $this->runInCoroutine(function (): void {
            $first_date = $this->uniqueRefreshDate();
            do {
                $second_date = $this->uniqueRefreshDate();
            } while ($second_date === $first_date);

            $dates = [$first_date, $second_date];

            // Pre-clean, in case a random date collides with leftover rows.
            AggregateRefreshQueue::query()->whereIn('refresh_date', $dates)->delete();

            try {
                $stale_at = Carbon::now()->subHours(2);
                $rows = array_map(
                    static fn (string $date): array => [
                        'refresh_date' => $date,
                        'aggregate_view' => 'orders_daily_by_created',
                        'created_at' => $stale_at,
                    ],
                    $dates
                );
                AggregateRefreshQueue::query()->insertOrIgnore($rows);

                $refresher = new RecordingRefresher();
                $result = (new OrderAggregatesRefreshJob($refresher))->run();

                self::assertSame(2, $result['processed']);
                self::assertSame(0, $result['failed']);
                self::assertCount(2, $refresher->calls);

                // The queue no longer holds the two rows inserted by this test.
                $left_over = AggregateRefreshQueue::query()
                    ->whereIn('refresh_date', $dates)
                    ->count();
                self::assertSame(0, $left_over);
            } finally {
                AggregateRefreshQueue::query()->whereIn('refresh_date', $dates)->delete();
            }
        });
    }

    /**
     * Queue items with created_at >= now() - 1h should be skipped: the
     * refresher is not called and the item stays in the queue.
     */
    public function test_recently_enqueued_items_are_skipped(): void
    {
        $this->runInCoroutine(function (): void {
            $refresh_date = $this->uniqueRefreshDate();

            AggregateRefreshQueue::query()->where('refresh_date', $refresh_date)->delete();

            try {
                $fresh_row = [
                    'refresh_date' => $refresh_date,
                    'aggregate_view' => 'orders_daily_by_created',
                    // Enqueued less than one hour ago.
                    'created_at' => Carbon::now()->subMinutes(30),
                ];
                AggregateRefreshQueue::query()->insertOrIgnore($fresh_row);

                $refresher = new RecordingRefresher();
                $result = (new OrderAggregatesRefreshJob($refresher))->run();

                self::assertSame(0, $result['processed']);
                self::assertSame(0, $result['failed']);
                self::assertCount(0, $refresher->calls);

                // The queue item is kept.
                self::assertTrue(
                    AggregateRefreshQueue::query()->where('refresh_date', $refresh_date)->exists()
                );
            } finally {
                AggregateRefreshQueue::query()->where('refresh_date', $refresh_date)->delete();
            }
        });
    }

    /**
     * When the refresher throws, the queue item is kept (to be retried by the
     * next run) and the failure count is incremented by one.
     */
    public function test_failure_keeps_queue_item(): void
    {
        $this->runInCoroutine(function (): void {
            $refresh_date = $this->uniqueRefreshDate();

            AggregateRefreshQueue::query()->where('refresh_date', $refresh_date)->delete();

            try {
                $stale_row = [
                    'refresh_date' => $refresh_date,
                    'aggregate_view' => 'orders_daily_by_created',
                    'created_at' => Carbon::now()->subHours(2),
                ];
                AggregateRefreshQueue::query()->insertOrIgnore($stale_row);

                $refresher = new ThrowingRefresher();
                $result = (new OrderAggregatesRefreshJob($refresher))->run();

                self::assertSame(0, $result['processed']);
                self::assertSame(1, $result['failed']);

                // The queue item is kept so it can be retried.
                self::assertTrue(
                    AggregateRefreshQueue::query()->where('refresh_date', $refresh_date)->exists()
                );
            } finally {
                AggregateRefreshQueue::query()->where('refresh_date', $refresh_date)->delete();
            }
        });
    }
}