# Source: o2o-castad-backend/app/utils/prompts/chatgpt_prompt.py
# (191 lines, 9.4 KiB, Python)

import json
import re
from pydantic import BaseModel
from typing import List, Optional
from openai import AsyncOpenAI
from app.utils.logger import get_logger
from config import apikey_settings, recovery_settings
from app.utils.prompts.prompts import Prompt
# 로거 설정
logger = get_logger("chatgpt")
class ChatGPTResponseError(Exception):
    """Raised when a ChatGPT API response does not complete successfully.

    Attributes:
        status: Status string supplied by the caller (e.g. the response
            status, or "failed" / "incomplete").
        error_code: Optional error code or reason supplied by the caller.
        error_message: Optional human-readable description of the failure.
    """

    def __init__(
        self,
        status: str,
        error_code: Optional[str] = None,
        error_message: Optional[str] = None,
    ):
        self.status = status
        self.error_code = error_code
        self.error_message = error_message
        super().__init__(
            f"ChatGPT response failed: status={status}, code={error_code}, message={error_message}"
        )
class ChatgptService:
"""ChatGPT API 서비스 클래스
"""
model_type : str
def __init__(self, model_type:str = "gpt", timeout: float = None):
self.timeout = timeout or recovery_settings.CHATGPT_TIMEOUT
self.max_retries = recovery_settings.CHATGPT_MAX_RETRIES
self.model_type = model_type
match model_type:
case "gpt":
self.client = AsyncOpenAI(
api_key=apikey_settings.CHATGPT_API_KEY,
timeout=self.timeout
)
case "gemini":
self.client = AsyncOpenAI(
api_key=apikey_settings.GEMINI_API_KEY,
base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
timeout=self.timeout
)
case _:
raise NotImplementedError(f"Unknown Provider : {model_type}")
async def _call_pydantic_output(
self,
prompt : str,
output_format : BaseModel, #입력 output_format의 경우 Pydantic BaseModel Class를 상속한 Class 자체임에 유의할 것
model : str,
img_url : str,
image_detail_high : bool) -> BaseModel:
content = []
if img_url:
content.append({
"type" : "input_image",
"image_url" : img_url,
"detail": "high" if image_detail_high else "low"
})
content.append({
"type": "input_text",
"text": prompt}
)
last_error = None
for attempt in range(self.max_retries + 1):
response = await self.client.responses.parse(
model=model,
input=[{"role": "user", "content": content}],
text_format=output_format
)
# Response 디버그 로깅
logger.debug(f"[ChatgptService({self.model_type})] attempt: {attempt}")
logger.debug(f"[ChatgptService({self.model_type})] Response ID: {response.id}")
logger.debug(f"[ChatgptService({self.model_type})] Response status: {response.status}")
logger.debug(f"[ChatgptService({self.model_type})] Response model: {response.model}")
# status 확인: completed, failed, incomplete, cancelled, queued, in_progress
if response.status == "completed":
logger.debug(f"[ChatgptService({self.model_type})] Response output_text: {response.output_text[:200]}..." if len(response.output_text) > 200 else f"[ChatgptService] Response output_text: {response.output_text}")
structured_output = response.output_parsed
return structured_output #.model_dump() or {}
# 에러 상태 처리
if response.status == "failed":
error_code = getattr(response.error, 'code', None) if response.error else None
error_message = getattr(response.error, 'message', None) if response.error else None
logger.warning(f"[ChatgptService({self.model_type})] Response failed (attempt {attempt + 1}/{self.max_retries + 1}): code={error_code}, message={error_message}")
last_error = ChatGPTResponseError(response.status, error_code, error_message)
elif response.status == "incomplete":
reason = getattr(response.incomplete_details, 'reason', None) if response.incomplete_details else None
logger.warning(f"[ChatgptService({self.model_type})] Response incomplete (attempt {attempt + 1}/{self.max_retries + 1}): reason={reason}")
last_error = ChatGPTResponseError(response.status, reason, f"Response incomplete: {reason}")
else:
# cancelled, queued, in_progress 등 예상치 못한 상태
logger.warning(f"[ChatgptService({self.model_type})] Unexpected response status (attempt {attempt + 1}/{self.max_retries + 1}): {response.status}")
last_error = ChatGPTResponseError(response.status, None, f"Unexpected status: {response.status}")
# 마지막 시도가 아니면 재시도
if attempt < self.max_retries:
logger.info(f"[ChatgptService({self.model_type})] Retrying request...")
# 모든 재시도 실패
logger.error(f"[ChatgptService({self.model_type})] All retries exhausted. Last error: {last_error}")
raise last_error
async def _call_pydantic_output_chat_completion( # alter version
self,
prompt : str,
output_format : BaseModel, #입력 output_format의 경우 Pydantic BaseModel Class를 상속한 Class 자체임에 유의할 것
model : str,
img_url : str,
image_detail_high : bool) -> BaseModel:
content = []
if img_url:
content.append({
"type": "image_url",
"image_url": {
"url": img_url,
"detail": "high" if image_detail_high else "low"
}
})
content.append({
"type": "text",
"text": prompt
})
last_error = None
for attempt in range(self.max_retries + 1):
response = await self.client.beta.chat.completions.parse(
model=model,
messages=[{"role": "user", "content": content}],
response_format=output_format
)
# Response 디버그 로깅
logger.debug(f"[ChatgptService({self.model_type})] attempt: {attempt}")
logger.debug(f"[ChatgptService({self.model_type})] Response ID: {response.id}")
logger.debug(f"[ChatgptService({self.model_type})] Response finish_reason: {response.id}")
logger.debug(f"[ChatgptService({self.model_type})] Response model: {response.model}")
choice = response.choices[0]
finish_reason = choice.finish_reason
if finish_reason == "stop":
output_text = choice.message.content or ""
logger.debug(f"[ChatgptService({self.model_type})] Response output_text: {output_text[:200]}..." if len(output_text) > 200 else f"[ChatgptService] Response output_text: {output_text}")
return choice.message.parsed
elif finish_reason == "length":
logger.warning(f"[ChatgptService({self.model_type})] Response incomplete - token limit reached (attempt {attempt + 1}/{self.max_retries + 1})")
last_error = ChatGPTResponseError("incomplete", finish_reason, "Response incomplete: max tokens reached")
elif finish_reason == "content_filter":
logger.warning(f"[ChatgptService({self.model_type})] Response blocked by content filter (attempt {attempt + 1}/{self.max_retries + 1})")
last_error = ChatGPTResponseError("failed", finish_reason, "Response blocked by content filter")
else:
logger.warning(f"[ChatgptService({self.model_type})] Unexpected finish_reason (attempt {attempt + 1}/{self.max_retries + 1}): {finish_reason}")
last_error = ChatGPTResponseError("failed", finish_reason, f"Unexpected finish_reason: {finish_reason}")
# 마지막 시도가 아니면 재시도
if attempt < self.max_retries:
logger.info(f"[ChatgptService({self.model_type})] Retrying request...")
# 모든 재시도 실패
logger.error(f"[ChatgptService({self.model_type})] All retries exhausted. Last error: {last_error}")
raise last_error
async def generate_structured_output(
self,
prompt : Prompt,
input_data : dict,
img_url : Optional[str] = None,
img_detail_high : bool = False,
silent : bool = False
) -> BaseModel:
prompt_text = prompt.build_prompt(input_data, silent)
logger.debug(f"[ChatgptService({self.model_type})] Generated Prompt (length: {len(prompt_text)})")
if not silent:
logger.info(f"[ChatgptService({self.model_type})] Starting GPT request with structured output with model: {prompt.prompt_model}")
# GPT API 호출
#parsed = await self._call_structured_output_with_response_gpt_api(prompt_text, prompt.prompt_output, prompt.prompt_model)
# parsed = await self._call_pydantic_output(prompt_text, prompt.prompt_output_class, prompt.prompt_model, img_url, img_detail_high)
parsed = await self._call_pydantic_output_chat_completion(prompt_text, prompt.prompt_output_class, prompt.prompt_model, img_url, img_detail_high)
return parsed