0, // 同一个消费者,最高同时可以处理的消息数。 // @attention 默认值 100, test 设置为 1 'prefetch_count' => 1, // 因为 Hyperf 默认一个 Channel 只消费一个 队列,所以 global 设置为 true/false 效果是一样的。 'global' => false, ]; /** * 设置 requeue=false,让失败的消息进入 DLX 而不是回到原队列 * 这样才能实现重试队列和错误队列的机制 */ protected bool $requeue = false; protected $entityType = 'order'; /** * 重写队列构建器,设置队列参数 * 必须与 RabbitMQ 中现有队列的参数完全一致,否则会报 PRECONDITION_FAILED 错误 * * @return QueueBuilder */ public function getQueueBuilder(): QueueBuilder { return (new QueueBuilder()) ->setQueue($this->getQueue()) ->setArguments([ 'x-dead-letter-exchange' => ['S', 'dlx.orders'], 'x-dead-letter-routing-key' => ['S', 'retry'], 'x-message-ttl' => ['I', 86400000], // 24小时 'x-queue-type' => ['S', 'classic'], ]); } public function consumeMessage($data, AMQPMessage $message): Result { // 调试延迟:通过环境变量控制,方便在 mq:status 中观察消息状态 // 设置 AMQP_CONSUMER_DEBUG_DELAY=2 可以让每条消息处理延迟2秒 $debug_delay = (int) env('AMQP_CONSUMER_DEBUG_DELAY', 0); if ($debug_delay > 0) { dump("Debug mode: sleeping for {$debug_delay} seconds..."); sleep($debug_delay); } // 获取重试次数 $retry_count = $this->getRetryCount($message); $max_retries = (int) env('AMQP_MAX_RETRIES', 3); dump("Retry count: {$retry_count}/{$max_retries}"); try { // EntityParseFactory 使用静态方法调用,无需依赖注入 $parse = EntityParseFactory::createFromMessage($message); // 提取 metadata(从 meta 字段中获取) $meta = $data['meta'] ?? []; $metadata = [ 'company_id' => $meta['company_id'] ?? null, 'platform_id' => $meta['platform_id'] ?? null, 'store_id' => $meta['store_id'] ?? null, 'unique_id' => $meta['unique_id'] ?? null, ]; // entityMatch 则需要实现 根据 message 的 metadata 或其他字段的数据 获取 scope 如所属的 company / platform / store 的信息 // 获取到的 $entity 示例为属性为空的 \App\Model\Model 对象,比如 \App\Model\Order $entity = $parse->entityMatch($metadata); // message 中包含 raw data,raw data (数组或集合 -> 优先为集合类型 ) 则需要通过 entityMap 方法转换为 ORM 对象。 // 注意:raw_data 现在在 data 字段中 $entity_map_result = $parse->entityMap($data['data'] ?? []); // 将 LazyCollection 转为数组,准备批量操作 $data_to_upsert = $entity_map_result->all(); dump($entity_map_result->first()); if (empty($data_to_upsert)) { dump('No data to process'); return Result::ACK; } dump("Processing " . count($data_to_upsert) . " order(s) with batch upsert"); // 收集订单数据 $orders_data = $data_to_upsert; $raw_data = $data['data'] ?? []; Db::beginTransaction(); // @attention 为考虑数据写入的时效性和执行效率,采用批量写入 + 事务方式处理 // @attention 如果多条记录中有个别记录请求失败,可能会导致该批次写入失败,此时则需要判断和修复 // 1. 使用 upsert 批量处理订单插入和更新 // 利用数据库唯一索引自动判断是插入还是更新 // 解决了重复订单推送的问题:存在则更新,不存在则插入 $unique_by = $parse->getUniqueBy(); $entity->newQuery()->upsert( $orders_data, $unique_by, // 从解析器获取唯一键字段 $parse->getUpdateFields() // 从解析器获取可更新字段 ); // 2. 查询获取 ID 映射 [platform_order_id => local db id] // 通过唯一键查询刚写入的订单,获取数据库生成的 ID $platform_orders_id_to_local_db_order_id_map = $entity->newQuery() ->where(function ($query) use ($orders_data, $unique_by) { foreach ($orders_data as $order_data) { $query->orWhere(function ($q) use ($order_data, $unique_by) { foreach ($unique_by as $key) { $q->where($key, $order_data[$key]); } }); } }) ->pluck('id', 'platform_order_id') ->toArray(); dump("ID mapping: " . count($platform_orders_id_to_local_db_order_id_map) . " orders"); // 3. 格式化订单子项(传入 ID 映射,使子项能直接获取正确的 order_id) $items = $parse->formatOrderItemsFromRaw($raw_data, $platform_orders_id_to_local_db_order_id_map); // 4. 处理订单子项 // 鉴于定义子项为了保留足够的灵活性,因此每次订单更新,我们都需要完整更新 OrderItem $this->processOrderItems($items); // 5. 识别 ≥ 3 天前的 created_date 入队,补刷自动策略未覆盖的窗口 $this->enqueueAffectedDates($orders_data); Db::commit(); // 在数据库事务中尝试对 $entityMapResult 中的元素进行持久化,如果没有问题, 则返回 ACK,否则这是 NACK 且 回滚事务。 return Result::ACK; } catch (Throwable $error) { // dump("=== Error Caught ==="); // dump("Error: " . $error->getMessage()); // dump("File: " . $error->getFile() . ":" . $error->getLine()); // dump("Stack trace:"); // dump($error->getTraceAsString()); // dump("Retry Count: {$retry_count}"); // dump("Max Retries: {$max_retries}"); // dump("Check: {$retry_count} >= {$max_retries} = " . ($retry_count >= $max_retries ? 'TRUE' : 'FALSE')); Log::get()->error('Consumer processing failed', [ 'error' => $error->getMessage(), 'retry_count' => $retry_count, 'max_retries' => $max_retries, ]); Db::rollBack(); // 检查是否超过最大重试次数 if ($retry_count >= $max_retries) { // 超过重试次数,发送到错误队列 dump(">>> MAX RETRIES EXCEEDED! Sending to error queue..."); try { $this->sendToErrorQueue($message, $error); dump(">>> Successfully sent to error queue!"); } catch (Throwable $e) { dump(">>> FAILED to send to error queue: " . $e->getMessage()); dump(">>> Stack trace: " . $e->getTraceAsString()); } // 返回 ACK 避免消息再次重试 // 因为消息已经被发送到错误队列,不应该继续在主队列中循环 dump(">>> Returning ACK to prevent further retries"); return Result::ACK; } // 未超过重试次数,返回 NACK 让消息进入 DLX -> retry 队列 dump(">>> Retry not exceeded, sending to DLX (NACK)"); return Result::NACK; } } public function isEnable(): bool { return parent::isEnable(); } /** * 处理订单子项的批量同步(优化版本) * * 策略优化:使用业务键 (store_id, platform_order_id, sub_order_id) 作为唯一性约束 * 1. 直接批量 upsert OrderItems(无需先查询 order_id) * 2. 批量更新 order_id(通过 JOIN orders 表) * 3. 删除不在新数据中的旧 OrderItem(完全同步) * * 性能优势:减少一次批量查询,直接使用业务键进行 upsert * * @param array $items_by_platform_order_id 以 platform_order_id 为键的子项数据数组 * @return void */ protected function processOrderItems(array $items_by_platform_order_id): void { if (empty($items_by_platform_order_id)) { dump('No order items to process'); return; } // 1. 构建所有子项数据(无需查询 order_id) $all_items_to_upsert = []; $item_sub_order_ids_by_platform_order_id = []; // 记录每个平台订单的新子项 ID 列表 foreach ($items_by_platform_order_id as $item) { $platform_order_id = $item['platform_order_id']; // order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充 $all_items_to_upsert[] = $item; $item_sub_order_ids_by_platform_order_id[$platform_order_id] = $item['sub_order_id']; } if (empty($all_items_to_upsert)) { dump('No valid order items to upsert'); return; } dump("Upserting " . count($all_items_to_upsert) . " order items"); // 2. 批量 upsert OrderItems(使用业务键作为唯一性约束) // order_id 已由 formatOrderItemsFromRaw 通过 idMapping 填充,无需后置 JOIN 更新 OrderItem::query()->upsert( $all_items_to_upsert, ['store_id', 'platform_order_id', 'sub_order_id', 'created_date'], // 唯一键(业务键) [ 'order_id', // 直接更新 order_id 'company_id', 'platform_id', 'sub_order_type_id', 'product_id', 'platform_product_id', 'product_sku', 'product_barcode', 'unit_price', 'quantity', 'discount', 'total', 'ext', 'updated_at', ] ); dump("Upserted order items with order_id"); // 4. 批量删除不在新数据中的旧 OrderItem(完全同步策略) // 利用 hypertable 按 created_date 分区的特性,确保删除条件包含 created_date 以触发分区裁剪 // 此部分业务应该很少被调用,订单子项新增或删减的情况很少见 $this->deleteObsoleteOrderItems($all_items_to_upsert); dump("Order items processing completed"); } /** * 删除不在新数据中的旧订单子项 * * 策略:集合差集(1 次 SELECT + 1 次 DELETE) * 1. 查询数据库中涉及订单的所有 sub_order_id(集合 A) * 2. 从新数据提取 sub_order_id(集合 B) * 3. 集合 A - B = 需要删除的记录 * * @param array $new_items 新的订单子项数据 * @return void */ protected function deleteObsoleteOrderItems(array $new_items): void { if (empty($new_items)) { return; } // 1. 提取唯一的订单键 (store_id, platform_order_id, created_date) 和新数据的完整键 $order_keys = []; $new_item_keys = []; // 集合 B foreach ($new_items as $item) { // 统一格式化为 Y-m-d H:i:s(不带时区),确保与数据库查询结果匹配 $created_date = $item['created_date'] instanceof \DateTimeInterface ? $item['created_date']->format('Y-m-d H:i:s') : (new \DateTime($item['created_date']))->format('Y-m-d H:i:s'); // 订单级别的键(用于限定查询范围) $order_key = sprintf('%d|%s|%s', $item['store_id'], $item['platform_order_id'], $created_date); if (!isset($order_keys[$order_key])) { $order_keys[$order_key] = [ 'store_id' => $item['store_id'], 'platform_order_id' => $item['platform_order_id'], 'created_date' => $created_date, ]; } // 子项级别的键(用于差集计算) $item_key = sprintf('%d|%s|%s|%s', $item['store_id'], $item['platform_order_id'], $item['sub_order_id'], $created_date); $new_item_keys[$item_key] = true; } // 2. 查询数据库中这些订单的所有现有 sub_order_id(集合 A) // 利用 hypertable 的 created_date 分区裁剪 + 索引 order_items_store_platform_sub_unique $existing_items = OrderItem::query() ->where(function ($query) use ($order_keys) { foreach ($order_keys as $key) { $query->orWhere(function ($q) use ($key) { $q->where('store_id', $key['store_id']) ->where('platform_order_id', $key['platform_order_id']) ->where('created_date', $key['created_date']); }); } }) ->select(['id', 'store_id', 'platform_order_id', 'sub_order_id', 'created_date']) ->get(); if ($existing_items->isEmpty()) { return; } // 3. 计算差集:集合 A - 集合 B = 需要删除的记录 $ids_to_delete = []; $created_dates_to_delete = []; foreach ($existing_items as $existing) { // 统一格式化为 Y-m-d H:i:s(不带时区),与新数据保持一致 $existing_created_date = $existing->created_date instanceof \DateTimeInterface ? $existing->created_date->format('Y-m-d H:i:s') : (new \DateTime((string) $existing->created_date))->format('Y-m-d H:i:s'); $existing_key = sprintf( '%d|%s|%s|%s', $existing->store_id, $existing->platform_order_id, $existing->sub_order_id, $existing_created_date ); // 如果现有记录不在新数据中,则需要删除 if (!isset($new_item_keys[$existing_key])) { $ids_to_delete[] = $existing->id; $created_dates_to_delete[$existing_created_date] = true; } } // 4. 批量删除(利用 hypertable 主键 (id, created_date) 进行高效删除) if (!empty($ids_to_delete)) { $deleted = OrderItem::query() ->whereIn('id', $ids_to_delete) ->whereIn('created_date', array_keys($created_dates_to_delete)) ->delete(); if ($deleted > 0) { dump("Batch deleted {$deleted} obsolete order items"); } } } /** * 识别 payload 中 ≥ 3 天前的 created_date,入队 orders_daily_by_created 兜底刷新。 * * 自动刷新策略仅覆盖最近 3 天窗口;3 天前的订单变更(补录、追溯调整)需由 * aggregate_refresh_queue + Crontab 任务补刷。仅服务 by_created 视图, * by_paid 由全量 REFRESH 覆盖,不入此队列。 * * @param array $payloads 来自 entityMap()->all() 的订单数组,每条含 created_date 字段 */ protected function enqueueAffectedDates(array $payloads): void { if (empty($payloads)) { return; } $threshold = Carbon::now()->subDays(3)->toDateString(); $unique_dates = []; foreach ($payloads as $payload) { $created = $payload['created_date'] ?? null; if ($created === null) { continue; } // entityMap 输出 'Y-m-d H:i:sP';用 Carbon::parse 兼容多种格式 $date = Carbon::parse($created)->toDateString(); // 严格小于阈值才入队(≥ 阈值的部分由自动刷新策略覆盖) if ($date < $threshold) { $unique_dates[$date] = true; } } if (empty($unique_dates)) { return; } $now = Carbon::now(); foreach (array_keys($unique_dates) as $date) { AggregateRefreshQueue::query()->insertOrIgnore([ 'refresh_date' => $date, 'aggregate_view' => 'orders_daily_by_created', 'created_at' => $now, ]); } } }