o2o-castad-backend/app/utils/chatgpt_prompt.py

95 lines
4.6 KiB
Python

import json
import re
from pydantic import BaseModel
from openai import AsyncOpenAI
from app.utils.logger import get_logger
from config import apikey_settings, recovery_settings
from app.utils.prompts.prompts import Prompt
# 로거 설정
logger = get_logger("chatgpt")
class ChatGPTResponseError(Exception):
"""ChatGPT API 응답 에러"""
def __init__(self, status: str, error_code: str = None, error_message: str = None):
self.status = status
self.error_code = error_code
self.error_message = error_message
super().__init__(f"ChatGPT response failed: status={status}, code={error_code}, message={error_message}")
class ChatgptService:
"""ChatGPT API 서비스 클래스
"""
def __init__(self, timeout: float = None):
self.timeout = timeout or recovery_settings.CHATGPT_TIMEOUT
self.max_retries = recovery_settings.CHATGPT_MAX_RETRIES
self.client = AsyncOpenAI(
api_key=apikey_settings.CHATGPT_API_KEY,
timeout=self.timeout
)
async def _call_pydantic_output(self, prompt : str, output_format : BaseModel, model : str) -> BaseModel: # 입력 output_format의 경우 Pydantic BaseModel Class를 상속한 Class 자체임에 유의할 것
content = [{"type": "input_text", "text": prompt}]
last_error = None
for attempt in range(self.max_retries + 1):
response = await self.client.responses.parse(
model=model,
input=[{"role": "user", "content": content}],
text_format=output_format
)
# Response 디버그 로깅
logger.debug(f"[ChatgptService] Response ID: {response.id}")
logger.debug(f"[ChatgptService] Response status: {response.status}")
logger.debug(f"[ChatgptService] Response model: {response.model}")
# status 확인: completed, failed, incomplete, cancelled, queued, in_progress
if response.status == "completed":
logger.debug(f"[ChatgptService] Response output_text: {response.output_text[:200]}..." if len(response.output_text) > 200 else f"[ChatgptService] Response output_text: {response.output_text}")
structured_output = response.output_parsed
return structured_output #.model_dump() or {}
# 에러 상태 처리
if response.status == "failed":
error_code = getattr(response.error, 'code', None) if response.error else None
error_message = getattr(response.error, 'message', None) if response.error else None
logger.warning(f"[ChatgptService] Response failed (attempt {attempt + 1}/{self.max_retries + 1}): code={error_code}, message={error_message}")
last_error = ChatGPTResponseError(response.status, error_code, error_message)
elif response.status == "incomplete":
reason = getattr(response.incomplete_details, 'reason', None) if response.incomplete_details else None
logger.warning(f"[ChatgptService] Response incomplete (attempt {attempt + 1}/{self.max_retries + 1}): reason={reason}")
last_error = ChatGPTResponseError(response.status, reason, f"Response incomplete: {reason}")
else:
# cancelled, queued, in_progress 등 예상치 못한 상태
logger.warning(f"[ChatgptService] Unexpected response status (attempt {attempt + 1}/{self.max_retries + 1}): {response.status}")
last_error = ChatGPTResponseError(response.status, None, f"Unexpected status: {response.status}")
# 마지막 시도가 아니면 재시도
if attempt < self.max_retries:
logger.info(f"[ChatgptService] Retrying request...")
# 모든 재시도 실패
logger.error(f"[ChatgptService] All retries exhausted. Last error: {last_error}")
raise last_error
async def generate_structured_output(
self,
prompt : Prompt,
input_data : dict,
) -> str:
prompt_text = prompt.build_prompt(input_data)
logger.debug(f"[ChatgptService] Generated Prompt (length: {len(prompt_text)})")
logger.info(f"[ChatgptService] Starting GPT request with structured output with model: {prompt.prompt_model}")
# GPT API 호출
#response = await self._call_structured_output_with_response_gpt_api(prompt_text, prompt.prompt_output, prompt.prompt_model)
response = await self._call_pydantic_output(prompt_text, prompt.prompt_output_class, prompt.prompt_model)
return response