import json
import re
from typing import Optional

from openai import AsyncOpenAI

from app.utils.logger import get_logger
from config import apikey_settings, recovery_settings
from app.utils.prompts.prompts import Prompt

# Module-level logger for this service.
logger = get_logger("chatgpt")


class ChatGPTResponseError(Exception):
    """Raised when a ChatGPT API response does not end in a usable state.

    Attributes:
        status: The response status reported by the API.
        error_code: Error code (or incomplete reason), if one was available.
        error_message: Human-readable description, if one was available.
    """

    def __init__(
        self,
        status: str,
        error_code: Optional[str] = None,
        error_message: Optional[str] = None,
    ):
        self.status = status
        self.error_code = error_code
        self.error_message = error_message
        super().__init__(
            f"ChatGPT response failed: status={status}, code={error_code}, message={error_message}"
        )


class ChatgptService:
    """Service class wrapping structured-output calls to the ChatGPT API."""

    def __init__(self, timeout: Optional[float] = None):
        """Create the async client.

        Args:
            timeout: Request timeout. A falsy value (``None`` or ``0``) falls
                back to ``recovery_settings.CHATGPT_TIMEOUT``.
        """
        self.timeout = timeout or recovery_settings.CHATGPT_TIMEOUT
        self.max_retries = recovery_settings.CHATGPT_MAX_RETRIES
        self.client = AsyncOpenAI(
            api_key=apikey_settings.CHATGPT_API_KEY,
            timeout=self.timeout,
        )

    async def _call_structured_output_with_response_gpt_api(
        self, prompt: str, output_format: dict, model: str
    ) -> dict:
        """Call ``client.responses.create`` and return the parsed JSON output.

        The request is re-issued when the response status is anything other
        than ``"completed"``, up to ``self.max_retries`` additional times.

        Args:
            prompt: Prompt text sent as a single ``input_text`` user message.
            output_format: Value forwarded as the ``text`` argument of
                ``responses.create``.
            model: Model name forwarded to the API.

        Returns:
            The value parsed from ``response.output_text`` with ``json.loads``,
            or ``{}`` when the parsed value is falsy.

        Raises:
            ChatGPTResponseError: When every attempt ended in a non-completed
                status; carries the details of the last attempt.

        Note:
            Exceptions raised by ``responses.create`` itself and
            ``json.JSONDecodeError`` from parsing are not caught here, so they
            propagate to the caller without triggering a retry.
        """
        content = [{"type": "input_text", "text": prompt}]

        # Always make at least one attempt. With a negative max_retries the
        # loop would otherwise never run and ``raise last_error`` would raise
        # ``TypeError`` because ``last_error`` is still None.
        total_attempts = max(self.max_retries, 0) + 1
        last_error = None

        for attempt in range(total_attempts):
            attempt_label = f"attempt {attempt + 1}/{total_attempts}"

            response = await self.client.responses.create(
                model=model,
                input=[{"role": "user", "content": content}],
                text=output_format,
                timeout=self.timeout,
            )

            # Debug logging of the response metadata.
            logger.debug(f"[ChatgptService] Response ID: {response.id}")
            logger.debug(f"[ChatgptService] Response status: {response.status}")
            logger.debug(f"[ChatgptService] Response model: {response.model}")

            # Possible statuses: completed, failed, incomplete, cancelled,
            # queued, in_progress.
            if response.status == "completed":
                output_text = response.output_text
                # Truncate long payloads so the debug log stays readable.
                if len(output_text) > 200:
                    logger.debug(f"[ChatgptService] Response output_text: {output_text[:200]}...")
                else:
                    logger.debug(f"[ChatgptService] Response output_text: {output_text}")
                structured_output = json.loads(output_text)
                return structured_output or {}

            # Error-status handling: remember the failure, then retry.
            if response.status == "failed":
                error = response.error
                error_code = getattr(error, "code", None) if error else None
                error_message = getattr(error, "message", None) if error else None
                logger.warning(
                    f"[ChatgptService] Response failed ({attempt_label}): "
                    f"code={error_code}, message={error_message}"
                )
                last_error = ChatGPTResponseError(response.status, error_code, error_message)
            elif response.status == "incomplete":
                details = response.incomplete_details
                reason = getattr(details, "reason", None) if details else None
                logger.warning(
                    f"[ChatgptService] Response incomplete ({attempt_label}): reason={reason}"
                )
                last_error = ChatGPTResponseError(
                    response.status, reason, f"Response incomplete: {reason}"
                )
            else:
                # Unexpected states such as cancelled, queued, in_progress.
                logger.warning(
                    f"[ChatgptService] Unexpected response status ({attempt_label}): {response.status}"
                )
                last_error = ChatGPTResponseError(
                    response.status, None, f"Unexpected status: {response.status}"
                )

            # Retry unless this was the final attempt.
            if attempt < total_attempts - 1:
                logger.info("[ChatgptService] Retrying request...")

        # Every attempt failed.
        logger.error(f"[ChatgptService] All retries exhausted. Last error: {last_error}")
        raise last_error

    async def generate_structured_output(
        self,
        prompt: Prompt,
        input_data: dict,
    ) -> dict:
        """Build the prompt text from ``input_data`` and request structured output.

        Args:
            prompt: Prompt object; its ``build_prompt``, ``prompt_output`` and
                ``prompt_model`` members are used.
            input_data: Data passed to ``prompt.build_prompt``.

        Returns:
            The parsed structured output returned by
            ``_call_structured_output_with_response_gpt_api``.

        Raises:
            ChatGPTResponseError: When all attempts end in a non-completed status.
        """
        prompt_text = prompt.build_prompt(input_data)
        logger.debug(f"[ChatgptService] Generated Prompt (length: {len(prompt_text)})")
        logger.info(
            f"[ChatgptService] Starting GPT request with structured output with model: {prompt.prompt_model}"
        )

        # Call the GPT API.
        response = await self._call_structured_output_with_response_gpt_api(
            prompt_text, prompt.prompt_output, prompt.prompt_model
        )
        return response