Files
datahub/backend/app/Service/OrderAggregatesRefreshJob.php
2026-05-07 20:51:38 +08:00

77 lines
2.1 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace App\Service;
use App\Model\AggregateRefreshQueue;
use App\Utils\Log;
use Carbon\Carbon;
use Throwable;
/**
* 订单聚合刷新任务
*
* 消费 aggregate_refresh_queue 中 created_at < now() - 1 hour 的项,
* 逐条调用 AggregateRefresherInterface 刷新对应日期的连续聚合,
* 成功后从队列中删除;失败保留队列项由下次任务重试。
*
* 由 OrderAggregatesRefreshCommandCLI 入口)和 Crontab(定时入口)共同调用。
*/
class OrderAggregatesRefreshJob
{
public function __construct(private AggregateRefresherInterface $refresher)
{
}
/**
* @return array{processed: int, failed: int} 处理与失败计数
*/
public function run(): array
{
$items = AggregateRefreshQueue::query()
->where('created_at', '<', Carbon::now()->subHour())
->orderBy('refresh_date')
->orderBy('aggregate_view')
->get();
if ($items->isEmpty()) {
Log::get()->info('orders:refresh-aggregates queue empty');
return ['processed' => 0, 'failed' => 0];
}
$processed = 0;
$failed = 0;
foreach ($items as $item) {
$view = $item->aggregate_view;
$date = $item->refresh_date;
try {
$this->refresher->refresh($view, $date);
AggregateRefreshQueue::query()
->where('refresh_date', $date)
->where('aggregate_view', $view)
->delete();
$processed++;
} catch (Throwable $e) {
Log::get()->error('orders:refresh-aggregates failed', [
'view' => $view,
'date' => $date,
'error' => $e->getMessage(),
]);
$failed++;
}
}
Log::get()->info('orders:refresh-aggregates done', [
'processed' => $processed,
'failed' => $failed,
]);
return ['processed' => $processed, 'failed' => $failed];
}
}