has('application_headers')) { return 0; } $headers = $message->get('application_headers'); if (!$headers) { return 0; } $header_data = $headers->getNativeData(); $x_death = $header_data['x-death'] ?? []; if (empty($x_death)) { return 0; } return $x_death[0]['count'] ?? 0; } /** * 发送消息到错误队列 * 当重试次数超过上限时调用 * * @param AMQPMessage $message 原始消息 * @param Throwable $error 错误信息 */ protected function sendToErrorQueue(AMQPMessage $message, Throwable $error): void { try { $retry_count = $this->getRetryCount($message); $producer = ApplicationContext::getContainer()->get(Producer::class); $error_producer = new ErrorProducer($message, $error, $retry_count); $producer->produce($error_producer); // 同步写入 failed_messages 表 $this->persistFailedMessage($error_producer->payload); Log::get()->warning('Message sent to error queue after exceeding retry limit', [ 'error_id' => $error_producer->payload['error_id'] ?? 'unknown', 'retry_count' => $retry_count, 'error_message' => $error->getMessage(), ]); } catch (Throwable $e) { Log::get()->error('Failed to send message to error queue', [ 'error' => $e->getMessage(), 'original_error' => $error->getMessage(), ]); } } /** * 持久化失败消息到数据库 */ protected function persistFailedMessage(array $payload): void { try { FailedMessage::query()->create([ 'error_id' => $payload['error_id'], 'data_type' => $payload['metadata']['data_type'] ?? 'unknown', 'platform' => $payload['metadata']['platform'] ?? null, 'platform_id' => $payload['metadata']['platform_id'] ?? null, 'company_id' => $payload['metadata']['company_id'] ?? null, 'store_id' => $payload['metadata']['store_id'] ?? null, 'error_type' => $payload['error']['type'] ?? 'Unknown', 'error_message' => $payload['error']['message'] ?? '', 'error_code' => $payload['error']['code'] ?? 0, 'error_trace' => $payload['error']['trace'] ?? '', 'original_message' => $payload['original_message'] ?? [], 'retry_count' => $payload['metadata']['retry_count'] ?? 0, 'message_id' => $payload['metadata']['message_id'] ?? null, 'failed_at' => $payload['metadata']['failed_at'] ?? date('c'), ]); } catch (Throwable $e) { Log::get()->error('Failed to persist failed message to database', [ 'error' => $e->getMessage(), 'error_id' => $payload['error_id'] ?? 'unknown', ]); } } }