merge main

feature-youtube-upload
jaehwang 2026-01-28 16:59:24 +09:00
commit df3bfda594
14 changed files with 2814 additions and 93 deletions

View File

@ -24,7 +24,7 @@ from app.home.schemas.home_schema import (
ProcessedInfo,
)
from app.utils.upload_blob_as_request import AzureBlobUploader
from app.utils.chatgpt_prompt import ChatgptService
from app.utils.chatgpt_prompt import ChatgptService, ChatGPTResponseError
from app.utils.common import generate_task_id
from app.utils.logger import get_logger
from app.utils.nvMapScraper import NvMapScraper, GraphQLException
@ -64,7 +64,8 @@ KOREAN_CITIES = [
]
# fmt: on
router = APIRouter(tags=["Home"])
# router = APIRouter(tags=["Home"])
router = APIRouter()
def _extract_region_from_address(road_address: str | None) -> str:
@ -291,6 +292,15 @@ async def _crawling_logic(url:str):
f"[crawling] Step 3 완료 - 마케팅 분석 성공 ({step3_elapsed:.1f}ms)"
)
except ChatGPTResponseError as e:
step3_elapsed = (time.perf_counter() - step3_start) * 1000
logger.error(
f"[crawling] Step 3 FAILED - ChatGPT Error: status={e.status}, "
f"code={e.error_code}, message={e.error_message} ({step3_elapsed:.1f}ms)"
)
marketing_analysis = None
gpt_status = "failed"
except Exception as e:
step3_elapsed = (time.perf_counter() - step3_start) * 1000
logger.error(
@ -299,6 +309,7 @@ async def _crawling_logic(url:str):
logger.exception("[crawling] Step 3 상세 오류:")
# GPT 실패 시에도 크롤링 결과는 반환
marketing_analysis = None
gpt_status = "failed"
else:
step2_elapsed = (time.perf_counter() - step2_start) * 1000
logger.warning(
@ -318,6 +329,7 @@ async def _crawling_logic(url:str):
logger.info(f"[crawling] - GPT API 호출: {step3_3_elapsed:.1f}ms")
return {
"status": gpt_status if 'gpt_status' in locals() else "completed",
"image_list": scraper.image_link_list,
"image_count": len(scraper.image_link_list) if scraper.image_link_list else 0,
"processed_info": processed_info,

View File

@ -167,13 +167,37 @@ class MarketingAnalysis(BaseModel):
class CrawlingResponse(BaseModel):
"""크롤링 응답 스키마"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"status": "completed",
"image_list": ["https://example.com/image1.jpg", "https://example.com/image2.jpg"],
"image_count": 2,
"processed_info": {
"customer_name": "스테이 머뭄",
"region": "군산",
"detail_region_info": "전북특별자치도 군산시 절골길 18"
},
"marketing_analysis": {
"report": "마케팅 분석 리포트...",
"tags": ["힐링", "감성숙소"],
"facilities": ["조식", "주차"]
}
}
}
)
status: str = Field(
default="completed",
description="처리 상태 (completed: 성공, failed: ChatGPT 분석 실패)"
)
image_list: Optional[list[str]] = Field(None, description="이미지 URL 목록")
image_count: int = Field(..., description="이미지 개수")
processed_info: Optional[ProcessedInfo] = Field(
None, description="가공된 장소 정보 (customer_name, region, detail_region_info)"
)
marketing_analysis: Optional[MarketingAnalysis] = Field(
None, description="마케팅 분석 결과 (report, tags, facilities)"
None, description="마케팅 분석 결과 (report, tags, facilities). 실패 시 null"
)

View File

@ -108,15 +108,33 @@ class LyricStatusResponse(BaseModel):
Usage:
GET /lyric/status/{task_id}
Returns the current processing status of a lyric generation task.
Status Values:
- processing: 가사 생성 진행
- completed: 가사 생성 완료
- failed: ChatGPT API 오류 또는 생성 실패
"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"task_id": "0694b716-dbff-7219-8000-d08cb5fce431",
"status": "completed",
"message": "가사 생성이 완료되었습니다.",
}
"examples": [
{
"summary": "성공",
"value": {
"task_id": "0694b716-dbff-7219-8000-d08cb5fce431",
"status": "completed",
"message": "가사 생성이 완료되었습니다.",
}
},
{
"summary": "실패",
"value": {
"task_id": "0694b716-dbff-7219-8000-d08cb5fce431",
"status": "failed",
"message": "가사 생성에 실패했습니다.",
}
}
]
}
)
@ -131,26 +149,46 @@ class LyricDetailResponse(BaseModel):
Usage:
GET /lyric/{task_id}
Returns the generated lyric content for a specific task.
Note:
- status가 "failed" 경우 lyric_result에 에러 메시지가 저장됩니다.
- 에러 메시지 형식: "ChatGPT Error: {message}" 또는 "Error: {message}"
"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"id": 1,
"task_id": "0694b716-dbff-7219-8000-d08cb5fce431",
"project_id": 1,
"status": "completed",
"lyric_result": "인스타 감성의 스테이 머뭄\n군산 신흥동 말랭이 마을에서\n여유로운 하루를 보내며\n추억을 만들어가요",
"created_at": "2024-01-15T12:00:00",
}
"examples": [
{
"summary": "성공",
"value": {
"id": 1,
"task_id": "0694b716-dbff-7219-8000-d08cb5fce431",
"project_id": 1,
"status": "completed",
"lyric_result": "인스타 감성의 스테이 머뭄\n군산 신흥동 말랭이 마을에서\n여유로운 하루를 보내며\n추억을 만들어가요",
"created_at": "2024-01-15T12:00:00",
}
},
{
"summary": "실패",
"value": {
"id": 1,
"task_id": "0694b716-dbff-7219-8000-d08cb5fce431",
"project_id": 1,
"status": "failed",
"lyric_result": "ChatGPT Error: Response incomplete: max_output_tokens",
"created_at": "2024-01-15T12:00:00",
}
}
]
}
)
id: int = Field(..., description="가사 ID")
task_id: str = Field(..., description="작업 고유 식별자")
project_id: int = Field(..., description="프로젝트 ID")
status: str = Field(..., description="처리 상태")
lyric_result: Optional[str] = Field(None, description="생성된 가사")
status: str = Field(..., description="처리 상태 (processing, completed, failed)")
lyric_result: Optional[str] = Field(None, description="생성된 가사 또는 에러 메시지 (실패 시)")
created_at: Optional[datetime] = Field(None, description="생성 일시")

View File

@ -11,7 +11,7 @@ from sqlalchemy.exc import SQLAlchemyError
from app.database.session import BackgroundSessionLocal
from app.lyric.models import Lyric
from app.utils.chatgpt_prompt import ChatgptService
from app.utils.chatgpt_prompt import ChatgptService, ChatGPTResponseError
from app.utils.prompts.prompts import Prompt
from app.utils.logger import get_logger
@ -130,6 +130,14 @@ async def generate_lyric_background(
logger.debug(f"[generate_lyric_background] - Step 2 (GPT API 호출): {step2_elapsed:.1f}ms")
logger.debug(f"[generate_lyric_background] - Step 3 (DB 업데이트): {step3_elapsed:.1f}ms")
except ChatGPTResponseError as e:
elapsed = (time.perf_counter() - task_start) * 1000
logger.error(
f"[generate_lyric_background] ChatGPT ERROR - task_id: {task_id}, "
f"status: {e.status}, code: {e.error_code}, message: {e.error_message} ({elapsed:.1f}ms)"
)
await _update_lyric_status(task_id, "failed", f"ChatGPT Error: {e.error_message}")
except SQLAlchemyError as e:
elapsed = (time.perf_counter() - task_start) * 1000
logger.error(f"[generate_lyric_background] DB ERROR - task_id: {task_id}, error: {e} ({elapsed:.1f}ms)", exc_info=True)

View File

@ -9,6 +9,7 @@ from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from config import prj_settings
@ -276,10 +277,44 @@ class AuthService:
thumbnail_image_url=profile.thumbnail_image_url if profile else None,
)
session.add(new_user)
await session.flush()
await session.refresh(new_user)
logger.info(f"[AUTH] 신규 사용자 생성 완료 - user_id: {new_user.id}, is_new_user: True")
return new_user, True
try:
await session.flush()
await session.refresh(new_user)
logger.info(f"[AUTH] 신규 사용자 생성 완료 - user_id: {new_user.id}, is_new_user: True")
return new_user, True
except IntegrityError:
# 동시 요청으로 인한 중복 삽입 시도 - 기존 사용자 조회
logger.warning(
f"[AUTH] IntegrityError 발생 (동시 요청 추정) - kakao_id: {kakao_id}, "
"기존 사용자 재조회 시도"
)
await session.rollback()
result = await session.execute(
select(User).where(User.kakao_id == kakao_id)
)
existing_user = result.scalar_one_or_none()
if existing_user is not None:
logger.info(
f"[AUTH] 기존 사용자 재조회 성공 - user_id: {existing_user.id}, "
"is_new_user: False"
)
# 프로필 정보 업데이트
if profile:
existing_user.nickname = profile.nickname
existing_user.profile_image_url = profile.profile_image_url
existing_user.thumbnail_image_url = profile.thumbnail_image_url
if kakao_account and kakao_account.email:
existing_user.email = kakao_account.email
await session.flush()
return existing_user, False
# 재조회에도 실패한 경우 (매우 드문 경우)
logger.error(
f"[AUTH] IntegrityError 후 재조회 실패 - kakao_id: {kakao_id}"
)
raise
async def _save_refresh_token(
self,

View File

@ -4,45 +4,80 @@ from pydantic import BaseModel
from openai import AsyncOpenAI
from app.utils.logger import get_logger
from config import apikey_settings
from config import apikey_settings, recovery_settings
from app.utils.prompts.prompts import Prompt
# 로거 설정
logger = get_logger("chatgpt")
# fmt: on
class ChatGPTResponseError(Exception):
"""ChatGPT API 응답 에러"""
def __init__(self, status: str, error_code: str = None, error_message: str = None):
self.status = status
self.error_code = error_code
self.error_message = error_message
super().__init__(f"ChatGPT response failed: status={status}, code={error_code}, message={error_message}")
class ChatgptService:
"""ChatGPT API 서비스 클래스
GPT 5.0 모델을 사용하여 마케팅 가사 분석을 생성합니다.
"""
def __init__(self):
self.client = AsyncOpenAI(api_key=apikey_settings.CHATGPT_API_KEY)
async def _call_structured_output_with_response_gpt_api(self, prompt: str, output_format : dict, model:str) -> dict:
content = [{"type": "input_text", "text": prompt}]
response = await self.client.responses.create(
model=model,
input=[{"role": "user", "content": content}],
text = output_format
def __init__(self, timeout: float = None):
self.timeout = timeout or recovery_settings.CHATGPT_TIMEOUT
self.max_retries = recovery_settings.CHATGPT_MAX_RETRIES
self.client = AsyncOpenAI(
api_key=apikey_settings.CHATGPT_API_KEY,
timeout=self.timeout
)
structured_output = json.loads(response.output_text)
return structured_output or {}
async def _call_pydantic_output(self, prompt : str, output_format : BaseModel, model : str) -> BaseModel: # 입력 output_format의 경우 Pydantic BaseModel Class를 상속한 Class 자체임에 유의할 것
content = [{"type": "input_text", "text": prompt}]
response = await self.client.responses.parse(
model=model,
input=[{"role": "user", "content": content}],
text_format=output_format
)
structured_output = response.output_parsed
return structured_output.model_dump() or {}
last_error = None
for attempt in range(self.max_retries + 1):
response = await self.client.responses.parse(
model=model,
input=[{"role": "user", "content": content}],
text_format=output_format
)
# Response 디버그 로깅
logger.debug(f"[ChatgptService] Response ID: {response.id}")
logger.debug(f"[ChatgptService] Response status: {response.status}")
logger.debug(f"[ChatgptService] Response model: {response.model}")
# status 확인: completed, failed, incomplete, cancelled, queued, in_progress
if response.status == "completed":
logger.debug(f"[ChatgptService] Response output_text: {response.output_text[:200]}..." if len(response.output_text) > 200 else f"[ChatgptService] Response output_text: {response.output_text}")
structured_output = response.output_parsed
return structured_output.model_dump() or {}
# 에러 상태 처리
if response.status == "failed":
error_code = getattr(response.error, 'code', None) if response.error else None
error_message = getattr(response.error, 'message', None) if response.error else None
logger.warning(f"[ChatgptService] Response failed (attempt {attempt + 1}/{self.max_retries + 1}): code={error_code}, message={error_message}")
last_error = ChatGPTResponseError(response.status, error_code, error_message)
elif response.status == "incomplete":
reason = getattr(response.incomplete_details, 'reason', None) if response.incomplete_details else None
logger.warning(f"[ChatgptService] Response incomplete (attempt {attempt + 1}/{self.max_retries + 1}): reason={reason}")
last_error = ChatGPTResponseError(response.status, reason, f"Response incomplete: {reason}")
else:
# cancelled, queued, in_progress 등 예상치 못한 상태
logger.warning(f"[ChatgptService] Unexpected response status (attempt {attempt + 1}/{self.max_retries + 1}): {response.status}")
last_error = ChatGPTResponseError(response.status, None, f"Unexpected status: {response.status}")
# 마지막 시도가 아니면 재시도
if attempt < self.max_retries:
logger.info(f"[ChatgptService] Retrying request...")
# 모든 재시도 실패
logger.error(f"[ChatgptService] All retries exhausted. Last error: {last_error}")
raise last_error
async def generate_structured_output(
self,
prompt : Prompt,

View File

@ -36,12 +36,29 @@ from typing import Literal
import httpx
from app.utils.logger import get_logger
from config import apikey_settings, creatomate_settings
from config import apikey_settings, creatomate_settings, recovery_settings
# 로거 설정
logger = get_logger("creatomate")
class CreatomateResponseError(Exception):
"""Creatomate API 응답 오류 시 발생하는 예외
Creatomate API 렌더링 실패 또는 비정상 응답 사용됩니다.
재시도 로직에서 예외를 catch하여 재시도를 수행합니다.
Attributes:
message: 에러 메시지
original_response: 원본 API 응답 (있는 경우)
"""
def __init__(self, message: str, original_response: dict | None = None):
self.message = message
self.original_response = original_response
super().__init__(self.message)
# Orientation 타입 정의
OrientationType = Literal["horizontal", "vertical"]
@ -135,7 +152,10 @@ async def get_shared_client() -> httpx.AsyncClient:
global _shared_client
if _shared_client is None or _shared_client.is_closed:
_shared_client = httpx.AsyncClient(
timeout=httpx.Timeout(60.0, connect=10.0),
timeout=httpx.Timeout(
recovery_settings.CREATOMATE_RENDER_TIMEOUT,
connect=recovery_settings.CREATOMATE_CONNECT_TIMEOUT,
),
limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
)
return _shared_client
@ -217,7 +237,7 @@ class CreatomateService:
self,
method: str,
url: str,
timeout: float = 30.0,
timeout: float | None = None,
**kwargs,
) -> httpx.Response:
"""HTTP 요청을 수행합니다.
@ -225,7 +245,7 @@ class CreatomateService:
Args:
method: HTTP 메서드 ("GET", "POST", etc.)
url: 요청 URL
timeout: 요청 타임아웃 ()
timeout: 요청 타임아웃 (). None이면 기본값 사용
**kwargs: httpx 요청에 전달할 추가 인자
Returns:
@ -236,15 +256,18 @@ class CreatomateService:
"""
logger.info(f"[Creatomate] {method} {url}")
# timeout이 None이면 기본 타임아웃 사용
actual_timeout = timeout if timeout is not None else recovery_settings.CREATOMATE_DEFAULT_TIMEOUT
client = await get_shared_client()
if method.upper() == "GET":
response = await client.get(
url, headers=self.headers, timeout=timeout, **kwargs
url, headers=self.headers, timeout=actual_timeout, **kwargs
)
elif method.upper() == "POST":
response = await client.post(
url, headers=self.headers, timeout=timeout, **kwargs
url, headers=self.headers, timeout=actual_timeout, **kwargs
)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
@ -255,7 +278,7 @@ class CreatomateService:
async def get_all_templates_data(self) -> dict:
"""모든 템플릿 정보를 조회합니다."""
url = f"{self.BASE_URL}/v1/templates"
response = await self._request("GET", url, timeout=30.0)
response = await self._request("GET", url) # 기본 타임아웃 사용
response.raise_for_status()
return response.json()
@ -288,7 +311,7 @@ class CreatomateService:
# API 호출
url = f"{self.BASE_URL}/v1/templates/{template_id}"
response = await self._request("GET", url, timeout=30.0)
response = await self._request("GET", url) # 기본 타임아웃 사용
response.raise_for_status()
data = response.json()
@ -433,30 +456,147 @@ class CreatomateService:
async def make_creatomate_call(
self, template_id: str, modifications: dict
) -> dict:
"""Creatomate에 렌더링 요청을 보냅니다.
"""Creatomate에 렌더링 요청을 보냅니다 (재시도 로직 포함).
Args:
template_id: Creatomate 템플릿 ID
modifications: 수정사항 딕셔너리
Returns:
Creatomate API 응답 데이터
Raises:
CreatomateResponseError: API 오류 또는 재시도 실패
Note:
response에 요청 정보가 있으니 폴링 필요
"""
url = f"{self.BASE_URL}/v2/renders"
data = {
payload = {
"template_id": template_id,
"modifications": modifications,
}
response = await self._request("POST", url, timeout=60.0, json=data)
response.raise_for_status()
return response.json()
last_error: Exception | None = None
for attempt in range(recovery_settings.CREATOMATE_MAX_RETRIES + 1):
try:
response = await self._request(
"POST",
url,
timeout=recovery_settings.CREATOMATE_RENDER_TIMEOUT,
json=payload,
)
if response.status_code == 200 or response.status_code == 201:
return response.json()
# 재시도 불가능한 오류 (4xx 클라이언트 오류)
if 400 <= response.status_code < 500:
raise CreatomateResponseError(
f"Client error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text},
)
# 재시도 가능한 오류 (5xx 서버 오류)
last_error = CreatomateResponseError(
f"Server error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text},
)
except httpx.TimeoutException as e:
logger.warning(
f"[Creatomate] Timeout on attempt {attempt + 1}/{recovery_settings.CREATOMATE_MAX_RETRIES + 1}"
)
last_error = e
except httpx.HTTPError as e:
logger.warning(f"[Creatomate] HTTP error on attempt {attempt + 1}: {e}")
last_error = e
except CreatomateResponseError:
raise # CreatomateResponseError는 재시도하지 않고 즉시 전파
# 마지막 시도가 아니면 재시도
if attempt < recovery_settings.CREATOMATE_MAX_RETRIES:
logger.info(
f"[Creatomate] Retrying... ({attempt + 1}/{recovery_settings.CREATOMATE_MAX_RETRIES})"
)
# 모든 재시도 실패
raise CreatomateResponseError(
f"All {recovery_settings.CREATOMATE_MAX_RETRIES + 1} attempts failed",
original_response={"last_error": str(last_error)},
)
async def make_creatomate_custom_call(self, source: dict) -> dict:
"""템플릿 없이 Creatomate에 커스텀 렌더링 요청을 보냅니다.
"""템플릿 없이 Creatomate에 커스텀 렌더링 요청을 보냅니다 (재시도 로직 포함).
Args:
source: 렌더링 소스 딕셔너리
Returns:
Creatomate API 응답 데이터
Raises:
CreatomateResponseError: API 오류 또는 재시도 실패
Note:
response에 요청 정보가 있으니 폴링 필요
"""
url = f"{self.BASE_URL}/v2/renders"
response = await self._request("POST", url, timeout=60.0, json=source)
response.raise_for_status()
return response.json()
last_error: Exception | None = None
for attempt in range(recovery_settings.CREATOMATE_MAX_RETRIES + 1):
try:
response = await self._request(
"POST",
url,
timeout=recovery_settings.CREATOMATE_RENDER_TIMEOUT,
json=source,
)
if response.status_code == 200 or response.status_code == 201:
return response.json()
# 재시도 불가능한 오류 (4xx 클라이언트 오류)
if 400 <= response.status_code < 500:
raise CreatomateResponseError(
f"Client error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text},
)
# 재시도 가능한 오류 (5xx 서버 오류)
last_error = CreatomateResponseError(
f"Server error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text},
)
except httpx.TimeoutException as e:
logger.warning(
f"[Creatomate] Timeout on attempt {attempt + 1}/{recovery_settings.CREATOMATE_MAX_RETRIES + 1}"
)
last_error = e
except httpx.HTTPError as e:
logger.warning(f"[Creatomate] HTTP error on attempt {attempt + 1}: {e}")
last_error = e
except CreatomateResponseError:
raise # CreatomateResponseError는 재시도하지 않고 즉시 전파
# 마지막 시도가 아니면 재시도
if attempt < recovery_settings.CREATOMATE_MAX_RETRIES:
logger.info(
f"[Creatomate] Retrying... ({attempt + 1}/{recovery_settings.CREATOMATE_MAX_RETRIES})"
)
# 모든 재시도 실패
raise CreatomateResponseError(
f"All {recovery_settings.CREATOMATE_MAX_RETRIES + 1} attempts failed",
original_response={"last_error": str(last_error)},
)
# 하위 호환성을 위한 별칭 (deprecated)
async def make_creatomate_custom_call_async(self, source: dict) -> dict:
@ -485,7 +625,7 @@ class CreatomateService:
- failed: 실패
"""
url = f"{self.BASE_URL}/v1/renders/{render_id}"
response = await self._request("GET", url, timeout=30.0)
response = await self._request("GET", url) # 기본 타임아웃 사용
response.raise_for_status()
return response.json()

View File

@ -59,8 +59,28 @@ from typing import Any, List, Optional
import httpx
from config import apikey_settings
from app.song.schemas.song_schema import PollingSongResponse, SongClipData
from app.utils.logger import get_logger
from config import apikey_settings, recovery_settings
logger = get_logger("suno")
class SunoResponseError(Exception):
"""Suno API 응답 오류 시 발생하는 예외
Suno API 거부 응답 또는 비정상 응답 사용됩니다.
재시도 로직에서 예외를 catch하여 재시도를 수행합니다.
Attributes:
message: 에러 메시지
original_response: 원본 API 응답 (있는 경우)
"""
def __init__(self, message: str, original_response: dict | None = None):
self.message = message
self.original_response = original_response
super().__init__(self.message)
class SunoService:
@ -122,34 +142,74 @@ class SunoService:
if genre:
payload["style"] = genre
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/generate",
headers=self.headers,
json=payload,
timeout=30.0,
)
response.raise_for_status()
data = response.json()
last_error: Exception | None = None
# 응답: {"code": 200, "msg": "success", "data": {"taskId": "..."}}
# API 응답 검증
if data is None:
raise ValueError("Suno API returned empty response")
for attempt in range(recovery_settings.SUNO_MAX_RETRIES + 1):
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/generate",
headers=self.headers,
json=payload,
timeout=recovery_settings.SUNO_DEFAULT_TIMEOUT,
)
if data.get("code") != 200:
error_msg = data.get("msg", "Unknown error")
raise ValueError(f"Suno API error: {error_msg}")
if response.status_code == 200:
data = response.json()
response_data = data.get("data")
if response_data is None:
raise ValueError(f"Suno API response missing 'data' field: {data}")
# API 응답 검증
if data is None:
raise SunoResponseError("Suno API returned empty response")
task_id = response_data.get("taskId")
if task_id is None:
raise ValueError(f"Suno API response missing 'taskId': {response_data}")
if data.get("code") != 200:
error_msg = data.get("msg", "Unknown error")
raise SunoResponseError(f"Suno API error: {error_msg}", original_response=data)
return task_id
response_data = data.get("data")
if response_data is None:
raise SunoResponseError(f"Suno API response missing 'data' field", original_response=data)
task_id = response_data.get("taskId")
if task_id is None:
raise SunoResponseError(f"Suno API response missing 'taskId'", original_response=response_data)
return task_id
# 재시도 불가능한 오류 (4xx 클라이언트 오류)
if 400 <= response.status_code < 500:
raise SunoResponseError(
f"Client error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text},
)
# 재시도 가능한 오류 (5xx 서버 오류)
last_error = SunoResponseError(
f"Server error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text},
)
except httpx.TimeoutException as e:
logger.warning(
f"[Suno] Timeout on attempt {attempt + 1}/{recovery_settings.SUNO_MAX_RETRIES + 1}"
)
last_error = e
except httpx.HTTPError as e:
logger.warning(f"[Suno] HTTP error on attempt {attempt + 1}: {e}")
last_error = e
except SunoResponseError:
raise # SunoResponseError는 재시도하지 않고 즉시 전파
# 마지막 시도가 아니면 재시도
if attempt < recovery_settings.SUNO_MAX_RETRIES:
logger.info(f"[Suno] Retrying... ({attempt + 1}/{recovery_settings.SUNO_MAX_RETRIES})")
# 모든 재시도 실패
raise SunoResponseError(
f"All {recovery_settings.SUNO_MAX_RETRIES + 1} attempts failed",
original_response={"last_error": str(last_error)},
)
async def get_task_status(self, task_id: str) -> dict[str, Any]:
"""
@ -170,7 +230,7 @@ class SunoService:
f"{self.BASE_URL}/generate/record-info",
headers=self.headers,
params={"taskId": task_id},
timeout=30.0,
timeout=recovery_settings.SUNO_DEFAULT_TIMEOUT,
)
response.raise_for_status()
data = response.json()
@ -198,7 +258,7 @@ class SunoService:
f"{self.BASE_URL}/generate/get-timestamped-lyrics",
headers=self.headers,
json=payload,
timeout=120.0,
timeout=recovery_settings.SUNO_LYRIC_TIMEOUT,
)
response.raise_for_status()
data = response.json()

View File

@ -156,6 +156,63 @@ class PromptSettings(BaseSettings):
model_config = _base_config
class RecoverySettings(BaseSettings):
"""외부 API 복구 및 타임아웃 설정
ChatGPT, Suno, Creatomate API의 타임아웃 재시도 설정을 관리합니다.
"""
# ============================================================
# ChatGPT API 설정
# ============================================================
CHATGPT_TIMEOUT: float = Field(
default=600.0,
description="ChatGPT API 타임아웃 (초). OpenAI Python SDK 기본값: 600초 (10분)",
)
CHATGPT_MAX_RETRIES: int = Field(
default=1,
description="ChatGPT API 응답 실패 시 최대 재시도 횟수",
)
# ============================================================
# Suno API 설정
# ============================================================
SUNO_DEFAULT_TIMEOUT: float = Field(
default=30.0,
description="Suno API 기본 요청 타임아웃 (초) - 음악 생성 요청, 상태 조회 등",
)
SUNO_LYRIC_TIMEOUT: float = Field(
default=120.0,
description="Suno API 가사 타임스탬프 요청 타임아웃 (초) - 가사 동기화 처리에 더 긴 시간 필요",
)
SUNO_MAX_RETRIES: int = Field(
default=2,
description="Suno API 응답 실패 시 최대 재시도 횟수",
)
# ============================================================
# Creatomate API 설정
# ============================================================
CREATOMATE_DEFAULT_TIMEOUT: float = Field(
default=30.0,
description="Creatomate API 기본 요청 타임아웃 (초) - 템플릿 조회, 상태 조회 등 일반 API 호출",
)
CREATOMATE_RENDER_TIMEOUT: float = Field(
default=60.0,
description="Creatomate API 렌더링 요청 타임아웃 (초) - 영상 렌더링 요청 시 더 긴 시간 필요",
)
CREATOMATE_CONNECT_TIMEOUT: float = Field(
default=10.0,
description="Creatomate API 연결 타임아웃 (초) - 서버 연결 수립까지의 대기 시간",
)
CREATOMATE_MAX_RETRIES: int = Field(
default=2,
description="Creatomate API 응답 실패 시 최대 재시도 횟수",
)
model_config = _base_config
class KakaoSettings(BaseSettings):
"""카카오 OAuth 설정"""
@ -389,3 +446,4 @@ prompt_settings = PromptSettings()
log_settings = LogSettings()
kakao_settings = KakaoSettings()
jwt_settings = JWTSettings()
recovery_settings = RecoverySettings()

885
docs/plan/db_lock.md Normal file
View File

@ -0,0 +1,885 @@
# DB Lock 및 Upsert 패턴 가이드 (MySQL 전용)
## 목차
1. [현재 Insert 사용 현황](#1-현재-insert-사용-현황)
2. [Upsert 패턴 설계](#2-upsert-패턴-설계)
3. [DB Lock 전략](#3-db-lock-전략)
4. [데드락 방지 전략](#4-데드락-방지-전략)
5. [제안 코드](#5-제안-코드)
6. [사용 예시](#6-사용-예시)
---
## 1. 현재 Insert 사용 현황
### 1.1 테이블별 Insert 분석 및 우선순위
| 테이블 | 파일 위치 | Unique 제약 | 동시 요청 가능성 | Upsert 우선순위 | 키 조합 |
|--------|-----------|-------------|------------------|-----------------|---------|
| **SongTimestamp** | song.py:477 | - | **높음** | **1순위** | suno_audio_id + order_idx |
| **Image** | home.py:503,552,799,821 | - | **중간** | **2순위** | task_id + img_order |
| **Song** | song.py:188 | - | 중간 | 3순위 | task_id |
| **Video** | video.py:252 | - | 중간 | 4순위 | task_id |
| **User** | auth.py:278 | `kakao_id` | **낮음** | ✅ 완료 | kakao_id |
| **Project** | lyric.py:297 | - | 낮음 | 선택 | task_id |
| **Lyric** | lyric.py:317 | - | 낮음 | 선택 | task_id |
| **RefreshToken** | auth.py:315 | `token_hash` | - | 불필요 | - (항상 새로 생성) |
#### 동시 요청 가능성 분석
| 가능성 | 테이블 | 발생 시나리오 |
|--------|--------|---------------|
| **높음** | SongTimestamp | 클라이언트가 상태 조회 API를 여러 번 호출 (폴링) → 동일 데이터 중복 삽입 |
| **중간** | Image | 네트워크 오류로 업로드 재시도, 클라이언트 중복 클릭 |
| **중간** | Song/Video | 백그라운드 태스크 재실행, 상태 확인 중복 호출 |
| **낮음** | User | OAuth 인가 코드 일회성으로 동시 요청 거의 불가능 |
> **참고**: User 테이블의 경우 카카오 OAuth 인가 코드(authorization code)가 **일회성**이므로,
> 동일한 코드로 동시 요청은 불가능합니다. 다만 여러 탭에서 각각 로그인을 시작하는 극히 드문 경우에만 발생 가능합니다.
### 1.2 현재 코드 패턴의 문제점
**실제 문제가 발생하는 케이스: SongTimestamp**
```python
# song.py:467-479 - 현재 패턴
# 클라이언트가 /song/status/{song_id}를 여러 번 호출하면 중복 삽입 발생!
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
song_timestamp = SongTimestamp(
suno_audio_id=suno_audio_id,
order_idx=order_idx,
lyric_line=timestamped_lyric["text"],
start_time=timestamped_lyric["start_sec"],
end_time=timestamped_lyric["end_sec"],
)
session.add(song_timestamp) # 동일 suno_audio_id로 중복 삽입!
await session.commit()
```
**문제 시나리오:**
1. 클라이언트가 노래 생성 상태 확인을 위해 폴링
2. SUCCESS 응답을 받은 후 타임스탬프 저장 로직 실행
3. 네트워크 지연으로 클라이언트가 재요청
4. **동일한 데이터가 중복 삽입됨**
---
## 2. Upsert 패턴 설계 (MySQL 전용)
### 2.1 MySQL ON DUPLICATE KEY UPDATE (권장)
MySQL의 `INSERT ... ON DUPLICATE KEY UPDATE` 절을 사용한 원자적 Upsert:
```python
from sqlalchemy.dialects.mysql import insert as mysql_insert
stmt = mysql_insert(User).values(
kakao_id=kakao_id,
nickname=nickname,
email=email,
)
stmt = stmt.on_duplicate_key_update(
nickname=stmt.inserted.nickname, # MySQL은 stmt.inserted 사용
email=stmt.inserted.email,
updated_at=func.now(),
)
await session.execute(stmt)
# MySQL은 RETURNING 미지원 - 별도 조회 필요
result = await session.execute(select(User).where(User.kakao_id == kakao_id))
user = result.scalar_one()
```
**장점:**
- 원자적 연산 (단일 쿼리)
- 데드락 위험 최소화
- 동시 요청에서도 안전
**전제 조건:**
- Unique 인덱스 필수 (없으면 항상 INSERT만 됨)
### 2.3 비관적 잠금 (Pessimistic Locking)
`SELECT ... FOR UPDATE`를 사용한 행 수준 잠금:
```python
result = await session.execute(
select(User)
.where(User.kakao_id == kakao_id)
.with_for_update() # FOR UPDATE 잠금
)
user = result.scalar_one_or_none()
```
---
## 3. DB Lock 전략
### 3.1 잠금 유형
| 잠금 유형 | 사용 시점 | SQLAlchemy 구현 |
|-----------|-----------|-----------------|
| **공유 잠금 (Shared)** | 읽기 작업 | `.with_for_update(read=True)` |
| **배타 잠금 (Exclusive)** | 쓰기 작업 | `.with_for_update()` |
| **NOWAIT** | 즉시 실패 | `.with_for_update(nowait=True)` |
| **SKIP LOCKED** | 잠긴 행 건너뛰기 | `.with_for_update(skip_locked=True)` |
### 3.2 잠금 범위 선택
```python
# 1. 행 수준 잠금 (Row-level) - 권장
select(User).where(User.id == user_id).with_for_update()
# 2. 키 범위 잠금 (Key-range) - 범위 조회 시
select(Image).where(Image.task_id == task_id).with_for_update()
# 3. 테이블 잠금 - 피해야 함 (성능 저하)
```
### 3.3 트랜잭션 격리 수준
```python
from sqlalchemy import text
# 세션별 격리 수준 설정
await session.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))
```
---
## 4. 데드락 방지 전략
### 4.1 핵심 원칙
1. **일관된 잠금 순서**: 항상 같은 순서로 리소스 접근
2. **짧은 트랜잭션**: 잠금 유지 시간 최소화
3. **타임아웃 설정**: 무한 대기 방지
4. **재시도 로직**: 데드락 발생 시 자동 재시도
### 4.2 잠금 순서 규칙
```python
# 올바른 순서: 항상 PK 또는 정렬된 순서로 잠금
async def lock_resources_safely(session, resource_ids: list[int]):
"""리소스를 ID 순서로 정렬하여 잠금"""
sorted_ids = sorted(resource_ids) # 정렬!
for resource_id in sorted_ids:
await session.execute(
select(Resource)
.where(Resource.id == resource_id)
.with_for_update()
)
```
### 4.3 타임아웃 설정 (MySQL)
```python
# MySQL 잠금 타임아웃 설정
await session.execute(text("SET innodb_lock_wait_timeout = 5"))
```
### 4.4 재시도 로직
```python
import asyncio
from sqlalchemy.exc import OperationalError
async def execute_with_retry(
session,
operation,
max_retries: int = 3,
base_delay: float = 0.1,
):
"""지수 백오프를 사용한 재시도 로직"""
for attempt in range(max_retries):
try:
return await operation(session)
except OperationalError as e:
if "deadlock" in str(e).lower() and attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
await asyncio.sleep(delay)
await session.rollback()
continue
raise
```
---
## 5. 제안 코드 (MySQL 전용)
### 5.1 공통 Upsert 유틸리티
`app/utils/db_utils.py` 파일 (✅ 이미 생성됨):
```python
"""
DB 유틸리티 - Upsert 및 Lock 관리 (MySQL 전용)
MySQL의 INSERT ... ON DUPLICATE KEY UPDATE를 사용한 안전한 Upsert 패턴과
데드락 방지를 위한 잠금 관리 유틸리티를 제공합니다.
"""
import asyncio
import logging
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar
from sqlalchemy import func, select
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=DeclarativeBase)
class UpsertResult:
"""Upsert 결과를 담는 클래스"""
def __init__(self, entity: Any, created: bool):
self.entity = entity
self.created = created # True: INSERT, False: UPDATE
async def upsert_by_unique_key(
session: AsyncSession,
model: Type[T],
unique_columns: List[str],
values: Dict[str, Any],
update_columns: Optional[List[str]] = None,
lock_timeout_sec: int = 5,
) -> UpsertResult:
"""
Unique 키 기반 원자적 Upsert (MySQL ON DUPLICATE KEY UPDATE 사용)
Args:
session: AsyncSession 인스턴스
model: SQLAlchemy 모델 클래스
unique_columns: Unique 제약 컬럼 목록
values: INSERT/UPDATE 값 딕셔너리
update_columns: UPDATE 시 변경할 컬럼 목록 (None이면 unique 제외 전체)
lock_timeout_sec: 잠금 타임아웃 (초)
Returns:
UpsertResult: 엔티티와 생성 여부
Example:
result = await upsert_by_unique_key(
session=session,
model=User,
unique_columns=['kakao_id'],
values={'kakao_id': 12345, 'nickname': '홍길동', 'email': 'test@test.com'},
update_columns=['nickname', 'email'],
)
if result.created:
print("새 사용자 생성됨")
else:
print("기존 사용자 업데이트됨")
"""
from sqlalchemy import text
# MySQL 잠금 타임아웃 설정
await session.execute(text(f"SET innodb_lock_wait_timeout = {lock_timeout_sec}"))
# UPDATE 컬럼 결정
if update_columns is None:
update_columns = [k for k in values.keys() if k not in unique_columns]
# MySQL INSERT ... ON DUPLICATE KEY UPDATE
stmt = mysql_insert(model).values(**values)
update_dict = {col: stmt.inserted[col] for col in update_columns}
if hasattr(model, 'updated_at'):
update_dict['updated_at'] = func.now()
stmt = stmt.on_duplicate_key_update(**update_dict)
await session.execute(stmt)
# MySQL은 RETURNING 미지원 - 별도 조회 필요
filter_conditions = {col: values[col] for col in unique_columns}
result = await session.execute(select(model).filter_by(**filter_conditions))
entity = result.scalar_one()
# created 여부 확인 (created_at == updated_at 비교)
created = True
if hasattr(entity, 'updated_at') and hasattr(entity, 'created_at'):
if entity.created_at and entity.updated_at:
created = abs((entity.updated_at - entity.created_at).total_seconds()) < 1
return UpsertResult(entity=entity, created=created)
async def get_or_create_with_lock(
session: AsyncSession,
model: Type[T],
filter_by: Dict[str, Any],
defaults: Optional[Dict[str, Any]] = None,
lock: bool = True,
nowait: bool = False,
) -> UpsertResult:
"""
SELECT FOR UPDATE를 사용한 안전한 Get or Create
동시 요청에서도 안전하게 작동하며,
행이 존재하면 잠금 후 반환, 없으면 생성합니다.
Args:
session: AsyncSession 인스턴스
model: SQLAlchemy 모델 클래스
filter_by: 조회 조건 딕셔너리
defaults: 생성 시 추가할 기본값
lock: FOR UPDATE 잠금 사용 여부
nowait: 잠금 대기 안함 (즉시 예외 발생)
Returns:
UpsertResult: 엔티티와 생성 여부
Example:
result = await get_or_create_with_lock(
session=session,
model=User,
filter_by={'kakao_id': 12345},
defaults={'nickname': '홍길동'},
)
"""
# 조회 쿼리 구성
query = select(model).filter_by(**filter_by)
if lock:
query = query.with_for_update(nowait=nowait)
result = await session.execute(query)
entity = result.scalar_one_or_none()
if entity is not None:
# 기존 엔티티 반환
return UpsertResult(entity=entity, created=False)
# 새 엔티티 생성
create_values = {**filter_by, **(defaults or {})}
entity = model(**create_values)
session.add(entity)
try:
await session.flush()
except IntegrityError:
# 동시 INSERT로 인한 충돌 - 다시 조회
await session.rollback()
result = await session.execute(select(model).filter_by(**filter_by))
entity = result.scalar_one()
return UpsertResult(entity=entity, created=False)
return UpsertResult(entity=entity, created=True)
async def bulk_upsert(
session: AsyncSession,
model: Type[T],
unique_columns: List[str],
records: List[Dict[str, Any]],
update_columns: Optional[List[str]] = None,
) -> int:
"""
대량 Upsert (MySQL ON DUPLICATE KEY UPDATE 사용)
여러 레코드를 한 번에 Upsert합니다.
데드락 방지를 위해 unique 키 기준으로 정렬 후 처리합니다.
Unique 인덱스가 반드시 존재해야 합니다.
Args:
session: AsyncSession 인스턴스
model: SQLAlchemy 모델 클래스
unique_columns: Unique 제약 컬럼 목록
records: Upsert할 레코드 딕셔너리 목록
update_columns: UPDATE 시 변경할 컬럼 목록
Returns:
int: 처리된 레코드 수
Example:
count = await bulk_upsert(
session=session,
model=SongTimestamp,
unique_columns=['suno_audio_id', 'order_idx'],
records=[
{'suno_audio_id': 'abc', 'order_idx': 0, 'lyric_line': '가사1'},
{'suno_audio_id': 'abc', 'order_idx': 1, 'lyric_line': '가사2'},
],
)
"""
if not records:
return 0
# 데드락 방지: unique 키 기준 정렬
sorted_records = sorted(
records,
key=lambda r: tuple(r.get(col, '') for col in unique_columns)
)
# UPDATE 컬럼 결정
if update_columns is None:
all_columns = set(sorted_records[0].keys())
update_columns = list(all_columns - set(unique_columns))
# MySQL INSERT ... ON DUPLICATE KEY UPDATE
stmt = mysql_insert(model).values(sorted_records)
update_dict = {col: stmt.inserted[col] for col in update_columns}
if hasattr(model, 'updated_at'):
update_dict['updated_at'] = func.now()
stmt = stmt.on_duplicate_key_update(**update_dict)
await session.execute(stmt)
return len(sorted_records)
async def execute_with_retry(
func: Callable,
max_retries: int = 3,
base_delay: float = 0.1,
retry_on: tuple = (OperationalError,),
) -> Any:
"""
지수 백오프를 사용한 재시도 래퍼
데드락이나 일시적 오류 발생 시 자동으로 재시도합니다.
Args:
func: 실행할 비동기 함수 (인자 없음)
max_retries: 최대 재시도 횟수
base_delay: 기본 대기 시간 (초)
retry_on: 재시도할 예외 타입들
Returns:
함수 실행 결과
Example:
async def do_work():
async with AsyncSessionLocal() as session:
await upsert_by_unique_key(...)
await session.commit()
result = await execute_with_retry(do_work)
"""
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except retry_on as e:
last_exception = e
error_msg = str(e).lower()
# 데드락 또는 잠금 타임아웃인 경우만 재시도
if "deadlock" in error_msg or "lock" in error_msg:
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
logger.warning(
f"DB 작업 실패 (시도 {attempt + 1}/{max_retries}), "
f"{delay:.2f}초 후 재시도: {e}"
)
await asyncio.sleep(delay)
continue
raise
raise last_exception
class LockManager:
"""
분산 잠금 관리자 (MySQL 전용)
여러 리소스에 대한 잠금을 일관된 순서로 획득하여
데드락을 방지합니다.
"""
def __init__(self, session: AsyncSession, timeout_sec: int = 5):
self.session = session
self.timeout_sec = timeout_sec
self._locked_resources: List[tuple] = []
async def __aenter__(self):
from sqlalchemy import text
# MySQL 잠금 타임아웃 설정
await self.session.execute(
text(f"SET innodb_lock_wait_timeout = {self.timeout_sec}")
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 트랜잭션 종료 시 자동으로 잠금 해제됨
self._locked_resources.clear()
async def lock_rows(
self,
model: Type[T],
ids: List[Any],
id_column: str = "id",
nowait: bool = False,
) -> List[T]:
"""
여러 행을 ID 순서로 잠금
Args:
model: 모델 클래스
ids: 잠금할 ID 목록
id_column: ID 컬럼명
nowait: 잠금 대기 안함
Returns:
잠긴 엔티티 목록
"""
if not ids:
return []
# 데드락 방지: ID 정렬
sorted_ids = sorted(ids)
column = getattr(model, id_column)
query = (
select(model)
.where(column.in_(sorted_ids))
.order_by(column) # 정렬 순서 유지
.with_for_update(nowait=nowait)
)
result = await self.session.execute(query)
entities = result.scalars().all()
self._locked_resources.append((model.__tablename__, sorted_ids))
return list(entities)
async def lock_row(
self,
model: Type[T],
id_value: Any,
id_column: str = "id",
nowait: bool = False,
) -> Optional[T]:
"""
단일 행 잠금
Args:
model: 모델 클래스
id_value: 잠금할 ID
id_column: ID 컬럼명
nowait: 잠금 대기 안함
Returns:
잠긴 엔티티 또는 None
"""
entities = await self.lock_rows(model, [id_value], id_column, nowait)
return entities[0] if entities else None
```
### 5.2 모델에 Unique Constraint 추가 (권장)
테이블별 Unique 제약 추가가 필요한 경우:
```python
# app/home/models.py - Image 테이블
class Image(Base):
__tablename__ = "image"
__table_args__ = (
# task_id + img_order 조합 유니크
Index("idx_image_task_order", "task_id", "img_order", unique=True),
...
)
# app/song/models.py - SongTimestamp 테이블
class SongTimestamp(Base):
__tablename__ = "song_timestamp"
__table_args__ = (
# suno_audio_id + order_idx 조합 유니크
Index("idx_song_ts_audio_order", "suno_audio_id", "order_idx", unique=True),
...
)
```
---
## 6. 사용 예시
### 6.1 SongTimestamp Bulk Upsert (우선순위 1 - 실제 문제 발생)
**가장 먼저 적용해야 하는 케이스입니다.** 클라이언트의 상태 폴링으로 인해 동일 데이터가 중복 삽입될 수 있습니다.
```python
# app/song/api/routers/v1/song.py
from app.utils.db_utils import bulk_upsert
# 기존 코드 (문제 있음 - 폴링 시 중복 삽입)
# for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
# song_timestamp = SongTimestamp(...)
# session.add(song_timestamp)
# 개선된 코드 (Upsert로 중복 방지)
records = [
{
'suno_audio_id': suno_audio_id,
'order_idx': idx,
'lyric_line': ts['text'],
'start_time': ts['start_sec'],
'end_time': ts['end_sec'],
}
for idx, ts in enumerate(timestamped_lyrics)
]
await bulk_upsert(
session=session,
model=SongTimestamp,
unique_columns=['suno_audio_id', 'order_idx'],
records=records,
update_columns=['lyric_line', 'start_time', 'end_time'],
)
```
### 6.2 Image 중복 방지 Upsert (우선순위 2)
```python
# app/home/api/routers/v1/home.py
from app.utils.db_utils import get_or_create_with_lock
async def save_image(session: AsyncSession, task_id: str, img_url: str, img_order: int):
"""이미지 저장 (중복 방지)"""
result = await get_or_create_with_lock(
session=session,
model=Image,
filter_by={'task_id': task_id, 'img_order': img_order},
defaults={
'img_name': f"image_{img_order}",
'img_url': img_url,
},
)
if not result.created:
# 기존 이미지 URL 업데이트
result.entity.img_url = img_url
return result.entity
```
### 6.3 User IntegrityError 처리 (✅ 적용 완료)
> **참고**: User 테이블은 OAuth 인가 코드의 일회성 특성상 동시 요청 가능성이 **매우 낮습니다**.
> `kakao_id` UNIQUE 제약이 있어 중복 시 IntegrityError가 발생하므로,
> IntegrityError 처리를 추가하여 500 에러 대신 기존 사용자를 재조회합니다.
```python
# app/user/services/auth.py - 현재 적용된 코드
from sqlalchemy.exc import IntegrityError
# 신규 사용자 생성 부분
session.add(new_user)
try:
await session.flush()
await session.refresh(new_user)
return new_user, True
except IntegrityError:
# 동시 요청으로 인한 중복 삽입 시도 - 기존 사용자 조회
logger.warning(
f"[AUTH] IntegrityError 발생 (동시 요청 추정) - kakao_id: {kakao_id}"
)
await session.rollback()
result = await session.execute(
select(User).where(User.kakao_id == kakao_id)
)
existing_user = result.scalar_one_or_none()
if existing_user is not None:
# 프로필 정보 업데이트
if profile:
existing_user.nickname = profile.nickname
existing_user.profile_image_url = profile.profile_image_url
existing_user.thumbnail_image_url = profile.thumbnail_image_url
if kakao_account and kakao_account.email:
existing_user.email = kakao_account.email
await session.flush()
return existing_user, False
# 재조회에도 실패한 경우 (매우 드문 경우)
raise
```
**이점:**
- Upsert 패턴 없이도 안전하게 처리
- 기존 코드 구조 유지
- 드문 경우의 500 에러 방지
### 6.4 재시도 로직 사용
```python
from app.utils.db_utils import execute_with_retry, bulk_upsert
async def safe_timestamp_upsert(suno_audio_id: str, timestamps: list):
"""안전한 타임스탬프 Upsert (데드락 시 재시도)"""
async def do_upsert():
async with AsyncSessionLocal() as session:
records = [
{
'suno_audio_id': suno_audio_id,
'order_idx': idx,
'lyric_line': ts['text'],
'start_time': ts['start_sec'],
'end_time': ts['end_sec'],
}
for idx, ts in enumerate(timestamps)
]
count = await bulk_upsert(
session=session,
model=SongTimestamp,
unique_columns=['suno_audio_id', 'order_idx'],
records=records,
)
await session.commit()
return count
return await execute_with_retry(do_upsert, max_retries=3)
```
### 6.5 LockManager 사용
```python
from app.utils.db_utils import LockManager
async def update_multiple_resources(session: AsyncSession, project_ids: list[int]):
"""여러 프로젝트를 안전하게 업데이트"""
async with LockManager(session, timeout_sec=10) as lock:
# 프로젝트들을 ID 순서로 잠금 (데드락 방지)
projects = await lock.lock_rows(Project, project_ids)
for project in projects:
project.status = "updated"
await session.commit()
```
---
## 7. 마이그레이션 가이드
### 7.1 단계별 적용 순서 (우선순위 기반)
동시 요청 가능성이 높은 테이블부터 적용합니다:
1. **1단계**: `app/utils/db_utils.py` 파일 생성 ✅ (이미 완료)
2. **2단계**: **SongTimestamp** - Unique 인덱스 추가 + `bulk_upsert` 적용 (우선순위 1)
- 동시 요청 가능성 **높음** (폴링으로 인한 중복 삽입)
3. **3단계**: **Image** - Unique 인덱스 추가 + `get_or_create_with_lock` 적용 (우선순위 2)
- 동시 요청 가능성 **중간** (업로드 재시도)
4. **4단계**: **Song/Video** - 필요시 `get_or_create_with_lock` 적용 (우선순위 3-4)
- 동시 요청 가능성 중간 (백그라운드 태스크)
5. **5단계**: **User** - ✅ IntegrityError 처리 추가 완료
- 동시 요청 가능성 **낮음** (OAuth 인가 코드 일회성)
- `kakao_id` UNIQUE 제약 + IntegrityError 발생 시 기존 사용자 재조회 처리
> **권장**: 1~3단계까지 우선 적용하고, 나머지는 필요에 따라 적용
### 7.2 Alembic 마이그레이션 예시
```python
"""Add unique constraints for upsert support
Revision ID: xxxx
"""
from alembic import op
import sqlalchemy as sa
def upgrade():
# SongTimestamp: suno_audio_id + order_idx unique
op.create_index(
'idx_song_ts_audio_order',
'song_timestamp',
['suno_audio_id', 'order_idx'],
unique=True
)
# Image: task_id + img_order unique (선택적)
op.create_index(
'idx_image_task_order',
'image',
['task_id', 'img_order'],
unique=True
)
def downgrade():
op.drop_index('idx_song_ts_audio_order', table_name='song_timestamp')
op.drop_index('idx_image_task_order', table_name='image')
```
---
## 8. 모니터링 및 디버깅
### 8.1 잠금 모니터링 쿼리 (MySQL)
```sql
-- 현재 잠금 상태 확인 (InnoDB)
SELECT
r.trx_id AS waiting_trx_id,
r.trx_mysql_thread_id AS waiting_thread,
r.trx_query AS waiting_query,
b.trx_id AS blocking_trx_id,
b.trx_mysql_thread_id AS blocking_thread,
b.trx_query AS blocking_query
FROM information_schema.innodb_lock_waits w
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;
-- 현재 실행 중인 트랜잭션 확인
SELECT * FROM information_schema.innodb_trx;
-- 데드락 로그 확인
SHOW ENGINE INNODB STATUS;
-- 잠금 대기 중인 쿼리 확인
SELECT
pl.id,
pl.user,
pl.state,
pl.info AS query
FROM information_schema.processlist pl
WHERE pl.state LIKE '%lock%';
```
### 8.2 로깅 설정
```python
# config.py 또는 logging 설정
import logging
# SQLAlchemy 엔진 로깅 (디버그 시)
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
# Upsert 유틸리티 로깅
logging.getLogger('app.utils.db_utils').setLevel(logging.DEBUG)
```
---
## 9. 요약
| 상황 | 권장 패턴 | 함수 |
|------|-----------|------|
| 단일 레코드 Upsert (Unique 키 존재) | ON CONFLICT | `upsert_by_unique_key()` |
| 단일 레코드 Get or Create | SELECT FOR UPDATE | `get_or_create_with_lock()` |
| 대량 레코드 Upsert | Bulk ON CONFLICT | `bulk_upsert()` |
| 데드락 방지 재시도 | 지수 백오프 | `execute_with_retry()` |
| 다중 행 잠금 | 정렬된 순서 잠금 | `LockManager` |
---
**작성일**: 2026-01-26
**작성자**: Claude Code (AI Assistant)

408
docs/plan/fix_plan.md Normal file
View File

@ -0,0 +1,408 @@
# API 타임아웃 및 재시도 로직 개선 계획
## 개요
외부 API 호출 시 타임아웃 미설정 및 재시도 로직 부재로 인한 안정성 문제를 해결합니다.
---
## 현재 상태
| 모듈 | 외부 API | 타임아웃 | 재시도 |
|------|----------|----------|--------|
| Lyric | ChatGPT (OpenAI) | ❌ 미설정 (SDK 기본 ~600초) | ❌ 없음 |
| Song | Suno API | ✅ 30-120초 | ❌ 없음 |
| Video | Creatomate API | ✅ 30-60초 | ❌ 없음 |
---
## 수정 계획
### 1. ChatGPT API 타임아웃 설정
**파일:** `app/utils/chatgpt_prompt.py`
**현재 코드:**
```python
class ChatgptService:
def __init__(self):
self.client = AsyncOpenAI(api_key=apikey_settings.CHATGPT_API_KEY)
```
**수정 코드:**
```python
class ChatgptService:
# 타임아웃 설정 (초)
DEFAULT_TIMEOUT = 60.0 # 전체 타임아웃
CONNECT_TIMEOUT = 10.0 # 연결 타임아웃
def __init__(self):
self.client = AsyncOpenAI(
api_key=apikey_settings.CHATGPT_API_KEY,
timeout=httpx.Timeout(
self.DEFAULT_TIMEOUT,
connect=self.CONNECT_TIMEOUT,
),
)
```
**필요한 import 추가:**
```python
import httpx
```
---
### 2. 재시도 유틸리티 함수 생성
**파일:** `app/utils/retry.py` (새 파일)
```python
"""
API 호출 재시도 유틸리티
지수 백오프(Exponential Backoff)를 사용한 재시도 로직을 제공합니다.
"""
import asyncio
import logging
from functools import wraps
from typing import Callable, Tuple, Type
logger = logging.getLogger(__name__)
class RetryExhaustedError(Exception):
"""모든 재시도 실패 시 발생하는 예외"""
def __init__(self, message: str, last_exception: Exception):
super().__init__(message)
self.last_exception = last_exception
async def retry_async(
func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
exponential_base: float = 2.0,
retry_on: Tuple[Type[Exception], ...] = (Exception,),
on_retry: Callable[[int, Exception], None] | None = None,
):
"""
비동기 함수 재시도 실행
Args:
func: 실행할 비동기 함수 (인자 없음)
max_retries: 최대 재시도 횟수 (기본: 3)
base_delay: 첫 번째 재시도 대기 시간 (초)
max_delay: 최대 대기 시간 (초)
exponential_base: 지수 백오프 배수 (기본: 2.0)
retry_on: 재시도할 예외 타입들
on_retry: 재시도 시 호출될 콜백 (attempt, exception)
Returns:
함수 실행 결과
Raises:
RetryExhaustedError: 모든 재시도 실패 시
Example:
result = await retry_async(
lambda: api_call(),
max_retries=3,
retry_on=(httpx.TimeoutException, httpx.HTTPStatusError),
)
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func()
except retry_on as e:
last_exception = e
if attempt == max_retries:
break
# 지수 백오프 계산
delay = min(base_delay * (exponential_base ** attempt), max_delay)
logger.warning(
f"[retry_async] 시도 {attempt + 1}/{max_retries + 1} 실패, "
f"{delay:.1f}초 후 재시도: {type(e).__name__}: {e}"
)
if on_retry:
on_retry(attempt + 1, e)
await asyncio.sleep(delay)
raise RetryExhaustedError(
f"최대 재시도 횟수({max_retries + 1}회) 초과",
last_exception,
)
def with_retry(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
retry_on: Tuple[Type[Exception], ...] = (Exception,),
):
"""
재시도 데코레이터
Args:
max_retries: 최대 재시도 횟수
base_delay: 첫 번째 재시도 대기 시간 (초)
max_delay: 최대 대기 시간 (초)
retry_on: 재시도할 예외 타입들
Example:
@with_retry(max_retries=3, retry_on=(httpx.TimeoutException,))
async def call_api():
...
"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
return await retry_async(
lambda: func(*args, **kwargs),
max_retries=max_retries,
base_delay=base_delay,
max_delay=max_delay,
retry_on=retry_on,
)
return wrapper
return decorator
```
---
### 3. Suno API 재시도 로직 적용
**파일:** `app/utils/suno.py`
**수정 대상 메서드:**
- `generate()` - 노래 생성 요청
- `get_task_status()` - 상태 조회
- `get_lyric_timestamp()` - 타임스탬프 조회
**수정 예시 (generate 메서드):**
```python
# 상단 import 추가
import httpx
from app.utils.retry import retry_async
# 재시도 대상 예외 정의
RETRY_EXCEPTIONS = (
httpx.TimeoutException,
httpx.ConnectError,
httpx.ReadError,
)
async def generate(
self,
prompt: str,
genre: str | None = None,
callback_url: str | None = None,
) -> str:
# ... 기존 payload 구성 코드 ...
async def _call_api():
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/generate",
headers=self.headers,
json=payload,
timeout=30.0,
)
response.raise_for_status()
return response.json()
# 재시도 로직 적용
data = await retry_async(
_call_api,
max_retries=3,
base_delay=1.0,
retry_on=RETRY_EXCEPTIONS,
)
# ... 기존 응답 처리 코드 ...
```
---
### 4. Creatomate API 재시도 로직 적용
**파일:** `app/utils/creatomate.py`
**수정 대상:**
- `_request()` 메서드 (모든 API 호출의 기반)
**수정 코드:**
```python
# 상단 import 추가
from app.utils.retry import retry_async
# 재시도 대상 예외 정의
RETRY_EXCEPTIONS = (
httpx.TimeoutException,
httpx.ConnectError,
httpx.ReadError,
)
async def _request(
self,
method: str,
url: str,
timeout: float = 30.0,
max_retries: int = 3,
**kwargs,
) -> httpx.Response:
"""HTTP 요청을 수행합니다 (재시도 로직 포함)."""
logger.info(f"[Creatomate] {method} {url}")
async def _call():
client = await get_shared_client()
if method.upper() == "GET":
response = await client.get(
url, headers=self.headers, timeout=timeout, **kwargs
)
elif method.upper() == "POST":
response = await client.post(
url, headers=self.headers, timeout=timeout, **kwargs
)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
response.raise_for_status()
return response
response = await retry_async(
_call,
max_retries=max_retries,
base_delay=1.0,
retry_on=RETRY_EXCEPTIONS,
)
logger.info(f"[Creatomate] Response - Status: {response.status_code}")
return response
```
---
### 5. ChatGPT API 재시도 로직 적용
**파일:** `app/utils/chatgpt_prompt.py`
**수정 코드:**
```python
# 상단 import 추가
import httpx
from openai import APITimeoutError, APIConnectionError, RateLimitError
from app.utils.retry import retry_async
# 재시도 대상 예외 정의
RETRY_EXCEPTIONS = (
APITimeoutError,
APIConnectionError,
RateLimitError,
)
class ChatgptService:
DEFAULT_TIMEOUT = 60.0
CONNECT_TIMEOUT = 10.0
MAX_RETRIES = 3
def __init__(self):
self.client = AsyncOpenAI(
api_key=apikey_settings.CHATGPT_API_KEY,
timeout=httpx.Timeout(
self.DEFAULT_TIMEOUT,
connect=self.CONNECT_TIMEOUT,
),
)
async def _call_structured_output_with_response_gpt_api(
self, prompt: str, output_format: dict, model: str
) -> dict:
content = [{"type": "input_text", "text": prompt}]
async def _call():
response = await self.client.responses.create(
model=model,
input=[{"role": "user", "content": content}],
text=output_format,
)
return json.loads(response.output_text) or {}
return await retry_async(
_call,
max_retries=self.MAX_RETRIES,
base_delay=2.0, # OpenAI Rate Limit 대비 더 긴 대기
retry_on=RETRY_EXCEPTIONS,
)
```
---
## 타임아웃 설정 권장값
| API | 용도 | 권장 타임아웃 | 재시도 횟수 | 재시도 간격 |
|-----|------|---------------|-------------|-------------|
| ChatGPT | 가사 생성 | 60초 | 3회 | 2초 → 4초 → 8초 |
| Suno | 노래 생성 요청 | 30초 | 3회 | 1초 → 2초 → 4초 |
| Suno | 상태 조회 | 30초 | 2회 | 1초 → 2초 |
| Suno | 타임스탬프 | 120초 | 2회 | 2초 → 4초 |
| Creatomate | 템플릿 조회 | 30초 | 2회 | 1초 → 2초 |
| Creatomate | 렌더링 요청 | 60초 | 3회 | 1초 → 2초 → 4초 |
| Creatomate | 상태 조회 | 30초 | 2회 | 1초 → 2초 |
---
## 구현 순서
1. **1단계: retry.py 유틸리티 생성**
- 재사용 가능한 재시도 로직 구현
- 단위 테스트 작성
2. **2단계: ChatGPT 타임아웃 설정**
- 가장 시급한 문제 (현재 600초 기본값)
- 타임아웃 + 재시도 동시 적용
3. **3단계: Suno API 재시도 적용**
- generate(), get_task_status(), get_lyric_timestamp()
4. **4단계: Creatomate API 재시도 적용**
- _request() 메서드 수정으로 전체 적용
---
## 테스트 체크리스트
각 수정 후 확인 사항:
- [ ] 정상 요청 시 기존과 동일하게 동작
- [ ] 타임아웃 발생 시 지정된 시간 내 예외 발생
- [ ] 일시적 오류 시 재시도 후 성공
- [ ] 모든 재시도 실패 시 적절한 에러 메시지 반환
- [ ] 로그에 재시도 시도 기록 확인
---
## 롤백 계획
문제 발생 시:
1. retry.py 사용 코드 제거 (기존 직접 호출로 복구)
2. ChatGPT 타임아웃 설정 제거 (SDK 기본값으로 복구)
---
## 참고 사항
- OpenAI SDK는 내부적으로 일부 재시도 로직이 있으나, 커스텀 제어가 제한적
- httpx의 `TimeoutException``ConnectTimeout`, `ReadTimeout`, `WriteTimeout`, `PoolTimeout`을 포함
- Rate Limit 에러(429)는 재시도 시 더 긴 대기 시간 필요 (Retry-After 헤더 참고)

View File

@ -0,0 +1,447 @@
# INSERT → Upsert 변환 계획서
## 개요
이 문서는 프로젝트 내 INSERT 코드들을 **존재 여부 확인 후 INSERT 또는 DB Lock + UPDATE** 패턴으로 변환하기 위한 계획입니다.
### 적용 범위
- ✅ 포함: video, song, lyric 모듈의 INSERT 코드
- ❌ 제외: user 모듈, 이미지 업로드 엔드포인트
### 사용할 유틸리티
- `app/utils/db_utils.py``get_or_create_with_lock()` 함수
- `app/utils/db_utils.py``bulk_upsert()` 함수 (대량 INSERT용)
---
## 대상 INSERT 위치 목록
| # | 파일 | 라인 | 테이블 | 키 컬럼 | 우선순위 |
|---|------|------|--------|---------|----------|
| 1 | video.py | 252 | Video | task_id | 중 |
| 2 | song.py | 188 | Song | task_id | 중 |
| 3 | song.py | 477 | SongTimestamp | suno_audio_id + order_idx | **높음** |
| 4 | lyric.py | 297 | Project | task_id | 중 |
| 5 | lyric.py | 317 | Lyric | task_id | 중 |
---
## 1. Video INSERT (video.py:252)
### 현재 코드
```python
# app/video/api/routers/v1/video.py:244-254
video = Video(
project_id=project_id,
lyric_id=lyric_id,
song_id=song_id,
task_id=task_id,
creatomate_render_id=None,
status="processing",
)
session.add(video)
await session.commit()
video_id = video.id
```
### 문제점
- `task_id`로 기존 레코드 존재 여부를 확인하지 않음
- 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능
### 수정 계획
```python
# app/video/api/routers/v1/video.py
# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (244-254행)
result = await get_or_create_with_lock(
session=session,
model=Video,
filter_by={'task_id': task_id},
defaults={
'project_id': project_id,
'lyric_id': lyric_id,
'song_id': song_id,
'creatomate_render_id': None,
'status': 'processing',
},
lock=True,
)
video = result.entity
if not result.created:
# 이미 존재하는 경우: 상태 업데이트
video.project_id = project_id
video.lyric_id = lyric_id
video.song_id = song_id
video.status = 'processing'
video.creatomate_render_id = None
await session.commit()
video_id = video.id
```
### 필수 사전 작업
- Video 테이블에 `task_id` UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id);
```
---
## 2. Song INSERT (song.py:188)
### 현재 코드
```python
# app/song/api/routers/v1/song.py:179-191
song = Song(
project_id=project_id,
lyric_id=lyric_id,
task_id=task_id,
suno_task_id=None,
status="processing",
song_prompt=song_prompt,
language=request_body.language,
)
session.add(song)
await session.commit()
song_id = song.id
```
### 문제점
- `task_id`로 기존 레코드 존재 여부를 확인하지 않음
- 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능
### 수정 계획
```python
# app/song/api/routers/v1/song.py
# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (179-191행)
result = await get_or_create_with_lock(
session=session,
model=Song,
filter_by={'task_id': task_id},
defaults={
'project_id': project_id,
'lyric_id': lyric_id,
'suno_task_id': None,
'status': 'processing',
'song_prompt': song_prompt,
'language': request_body.language,
},
lock=True,
)
song = result.entity
if not result.created:
# 이미 존재하는 경우: 상태 업데이트
song.project_id = project_id
song.lyric_id = lyric_id
song.status = 'processing'
song.song_prompt = song_prompt
song.language = request_body.language
await session.commit()
song_id = song.id
```
### 필수 사전 작업
- Song 테이블에 `task_id` UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id);
```
---
## 3. SongTimestamp INSERT (song.py:477) ⚠️ 높은 우선순위
### 현재 코드
```python
# app/song/api/routers/v1/song.py:467-479
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
song_timestamp = SongTimestamp(
suno_audio_id=suno_audio_id,
order_idx=order_idx,
lyric_line=timestamped_lyric["text"],
start_time=timestamped_lyric["start_sec"],
end_time=timestamped_lyric["end_sec"],
)
session.add(song_timestamp)
await session.commit()
```
### 문제점
- **폴링 기반 요청으로 인해 중복 INSERT 위험이 높음**
- `suno_audio_id` + `order_idx` 조합으로 존재 여부 확인 없음
- 여러 행을 루프에서 개별 INSERT하여 비효율적
### 수정 계획 (bulk_upsert 사용)
```python
# app/song/api/routers/v1/song.py
# 상단 import 추가
from app.utils.db_utils import bulk_upsert
# 기존 코드 대체 (467-479행)
timestamp_records = [
{
'suno_audio_id': suno_audio_id,
'order_idx': order_idx,
'lyric_line': timestamped_lyric["text"],
'start_time': timestamped_lyric["start_sec"],
'end_time': timestamped_lyric["end_sec"],
}
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics)
]
await bulk_upsert(
session=session,
model=SongTimestamp,
unique_columns=['suno_audio_id', 'order_idx'],
records=timestamp_records,
update_columns=['lyric_line', 'start_time', 'end_time'],
)
await session.commit()
```
### 필수 사전 작업
- SongTimestamp 테이블에 복합 UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE song_timestamp
ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx);
```
### 대안: get_or_create_with_lock 사용 (개별 처리)
만약 `bulk_upsert`를 사용하지 않고 개별 처리가 필요한 경우:
```python
from app.utils.db_utils import get_or_create_with_lock
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
result = await get_or_create_with_lock(
session=session,
model=SongTimestamp,
filter_by={
'suno_audio_id': suno_audio_id,
'order_idx': order_idx,
},
defaults={
'lyric_line': timestamped_lyric["text"],
'start_time': timestamped_lyric["start_sec"],
'end_time': timestamped_lyric["end_sec"],
},
lock=True,
)
if not result.created:
# 이미 존재하는 경우: 업데이트
result.entity.lyric_line = timestamped_lyric["text"]
result.entity.start_time = timestamped_lyric["start_sec"]
result.entity.end_time = timestamped_lyric["end_sec"]
await session.commit()
```
**권장사항**: `bulk_upsert` 사용 (성능 및 데드락 방지 측면에서 우수)
---
## 4. Project INSERT (lyric.py:297)
### 현재 코드
```python
# app/lyric/api/routers/v1/lyric.py:290-299
project = Project(
store_name=request_body.customer_name,
region=request_body.region,
task_id=task_id,
detail_region_info=request_body.detail_region_info,
language=request_body.language,
)
session.add(project)
await session.commit()
await session.refresh(project)
```
### 문제점
- `task_id`로 기존 레코드 존재 여부를 확인하지 않음
- 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능
### 수정 계획
```python
# app/lyric/api/routers/v1/lyric.py
# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (290-299행)
result = await get_or_create_with_lock(
session=session,
model=Project,
filter_by={'task_id': task_id},
defaults={
'store_name': request_body.customer_name,
'region': request_body.region,
'detail_region_info': request_body.detail_region_info,
'language': request_body.language,
},
lock=True,
)
project = result.entity
if not result.created:
# 이미 존재하는 경우: 정보 업데이트
project.store_name = request_body.customer_name
project.region = request_body.region
project.detail_region_info = request_body.detail_region_info
project.language = request_body.language
await session.commit()
await session.refresh(project)
```
### 필수 사전 작업
- Project 테이블에 `task_id` UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id);
```
---
## 5. Lyric INSERT (lyric.py:317)
### 현재 코드
```python
# app/lyric/api/routers/v1/lyric.py:308-319
estimated_prompt = lyric_prompt.build_prompt(lyric_input_data)
lyric = Lyric(
project_id=project.id,
task_id=task_id,
status="processing",
lyric_prompt=estimated_prompt,
lyric_result=None,
language=request_body.language,
)
session.add(lyric)
await session.commit()
await session.refresh(lyric)
```
### 문제점
- `task_id`로 기존 레코드 존재 여부를 확인하지 않음
- 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능
### 수정 계획
```python
# app/lyric/api/routers/v1/lyric.py
# 상단 import (이미 추가되어 있음)
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (308-319행)
estimated_prompt = lyric_prompt.build_prompt(lyric_input_data)
result = await get_or_create_with_lock(
session=session,
model=Lyric,
filter_by={'task_id': task_id},
defaults={
'project_id': project.id,
'status': 'processing',
'lyric_prompt': estimated_prompt,
'lyric_result': None,
'language': request_body.language,
},
lock=True,
)
lyric = result.entity
if not result.created:
# 이미 존재하는 경우: 정보 업데이트
lyric.project_id = project.id
lyric.status = 'processing'
lyric.lyric_prompt = estimated_prompt
lyric.lyric_result = None
lyric.language = request_body.language
await session.commit()
await session.refresh(lyric)
```
### 필수 사전 작업
- Lyric 테이블에 `task_id` UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id);
```
---
## 구현 순서 권장
1. **1단계: DB 마이그레이션** (필수)
```sql
-- 모든 UNIQUE 인덱스 추가
ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id);
ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id);
ALTER TABLE song_timestamp ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx);
ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id);
ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id);
```
2. **2단계: SongTimestamp (높은 우선순위)**
- 폴링으로 인한 중복 INSERT 위험이 가장 높음
- `bulk_upsert` 사용 권장
3. **3단계: 나머지 테이블 (중간 우선순위)**
- Video, Song, Project, Lyric 순으로 적용
- 모두 `get_or_create_with_lock` 사용
---
## 롤백 계획
문제 발생 시 원래 코드로 복구:
1. 코드 변경 사항 git revert
2. UNIQUE 인덱스는 유지 (데이터 무결성에 도움됨)
---
## 테스트 체크리스트
각 수정 후 확인 사항:
- [ ] 새로운 task_id로 요청 시 정상 INSERT
- [ ] 동일한 task_id로 재요청 시 UPDATE (에러 없이)
- [ ] 동시 요청 테스트 (2개 이상 동시 요청)
- [ ] 성능 저하 없는지 확인
---
## 참고 문서
- [db_lock.md](./db_lock.md) - DB Lock 및 Upsert 패턴 가이드
- [db_utils.py](../../app/utils/db_utils.py) - 유틸리티 함수 구현체

571
error_plan.md Normal file
View File

@ -0,0 +1,571 @@
# Suno API & Creatomate API 에러 처리 작업 계획서
## 현재 상태 분석
### ✅ 이미 구현된 항목
| 항목 | Suno API | Creatomate API |
|------|----------|----------------|
| DB failed 상태 저장 | ✅ `song_task.py` | ✅ `video_task.py` |
| Response failed 상태 반환 | ✅ `song_schema.py` | ✅ `video_schema.py` |
### ❌ 미구현 항목
| 항목 | Suno API | Creatomate API |
|------|----------|----------------|
| 타임아웃 외부화 | ❌ 하드코딩됨 | ❌ 하드코딩됨 |
| 재시도 로직 | ❌ 없음 | ❌ 없음 |
| 커스텀 예외 클래스 | ❌ 없음 | ❌ 없음 |
---
## 1. RecoverySettings에 Suno/Creatomate 설정 추가
### 파일: `config.py`
**변경 전:**
```python
class RecoverySettings(BaseSettings):
"""ChatGPT API 복구 및 타임아웃 설정"""
CHATGPT_TIMEOUT: float = Field(
default=600.0,
description="ChatGPT API 타임아웃 (초). OpenAI Python SDK 기본값: 600초 (10분)",
)
CHATGPT_MAX_RETRIES: int = Field(
default=1,
description="ChatGPT API 응답 실패 시 최대 재시도 횟수",
)
model_config = _base_config
```
**변경 후:**
```python
class RecoverySettings(BaseSettings):
"""외부 API 복구 및 타임아웃 설정
ChatGPT, Suno, Creatomate API의 타임아웃 및 재시도 설정을 관리합니다.
"""
# ============================================================
# ChatGPT API 설정
# ============================================================
CHATGPT_TIMEOUT: float = Field(
default=600.0,
description="ChatGPT API 타임아웃 (초). OpenAI Python SDK 기본값: 600초 (10분)",
)
CHATGPT_MAX_RETRIES: int = Field(
default=1,
description="ChatGPT API 응답 실패 시 최대 재시도 횟수",
)
# ============================================================
# Suno API 설정
# ============================================================
SUNO_DEFAULT_TIMEOUT: float = Field(
default=30.0,
description="Suno API 기본 요청 타임아웃 (초)",
)
SUNO_LYRIC_TIMEOUT: float = Field(
default=120.0,
description="Suno API 가사 타임스탬프 요청 타임아웃 (초)",
)
SUNO_MAX_RETRIES: int = Field(
default=2,
description="Suno API 응답 실패 시 최대 재시도 횟수",
)
# ============================================================
# Creatomate API 설정
# ============================================================
CREATOMATE_DEFAULT_TIMEOUT: float = Field(
default=30.0,
description="Creatomate API 기본 요청 타임아웃 (초)",
)
CREATOMATE_RENDER_TIMEOUT: float = Field(
default=60.0,
description="Creatomate API 렌더링 요청 타임아웃 (초)",
)
CREATOMATE_CONNECT_TIMEOUT: float = Field(
default=10.0,
description="Creatomate API 연결 타임아웃 (초)",
)
CREATOMATE_MAX_RETRIES: int = Field(
default=2,
description="Creatomate API 응답 실패 시 최대 재시도 횟수",
)
model_config = _base_config
```
**이유:** 모든 외부 API의 타임아웃/재시도 설정을 `RecoverySettings` 하나에서 통합 관리하여 일관성을 유지합니다.
### 파일: `.env`
**추가할 내용:**
```env
# ============================================================
# 외부 API 타임아웃 및 재시도 설정 (RecoverySettings)
# ============================================================
# ChatGPT API (기존)
CHATGPT_TIMEOUT=600.0
CHATGPT_MAX_RETRIES=1
# Suno API
SUNO_DEFAULT_TIMEOUT=30.0
SUNO_LYRIC_TIMEOUT=120.0
SUNO_MAX_RETRIES=2
# Creatomate API
CREATOMATE_DEFAULT_TIMEOUT=30.0
CREATOMATE_RENDER_TIMEOUT=60.0
CREATOMATE_CONNECT_TIMEOUT=10.0
CREATOMATE_MAX_RETRIES=2
```
---
## 2. Suno API 커스텀 예외 클래스 추가
### 파일: `app/utils/suno.py`
**변경 전 (라인 1-20):**
```python
import httpx
import json
from typing import Optional
from app.utils.logger import get_logger
logger = get_logger("suno")
```
**변경 후:**
```python
import httpx
import json
from typing import Optional
from app.utils.logger import get_logger
from config import recovery_settings
logger = get_logger("suno")
class SunoResponseError(Exception):
"""Suno API 응답 오류 시 발생하는 예외
Suno API 거부 응답 또는 비정상 응답 시 사용됩니다.
재시도 로직에서 이 예외를 catch하여 재시도를 수행합니다.
Attributes:
message: 에러 메시지
original_response: 원본 API 응답 (있는 경우)
"""
def __init__(self, message: str, original_response: dict | None = None):
self.message = message
self.original_response = original_response
super().__init__(self.message)
```
**이유:** ChatGPT API와 동일하게 커스텀 예외 클래스를 추가하여 Suno API 오류를 명확히 구분하고 재시도 로직에서 활용합니다.
---
## 3. Suno API 타임아웃 적용
### 파일: `app/utils/suno.py`
**변경 전 (라인 130):**
```python
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/generate",
headers=self.headers,
json=payload,
timeout=30.0,
)
```
**변경 후:**
```python
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/generate",
headers=self.headers,
json=payload,
timeout=recovery_settings.SUNO_DEFAULT_TIMEOUT,
)
```
**변경 전 (라인 173):**
```python
timeout=30.0,
```
**변경 후:**
```python
timeout=recovery_settings.SUNO_DEFAULT_TIMEOUT,
```
**변경 전 (라인 201):**
```python
timeout=120.0,
```
**변경 후:**
```python
timeout=recovery_settings.SUNO_LYRIC_TIMEOUT,
```
**이유:** 환경변수로 타임아웃을 관리하여 배포 환경별로 유연하게 조정할 수 있습니다.
---
## 4. Suno API 재시도 로직 추가
### 파일: `app/utils/suno.py`
**변경 전 - generate() 메서드:**
```python
async def generate(
self,
lyric: str,
style: str,
title: str,
task_id: str,
) -> dict:
"""음악 생성 요청"""
payload = {
"prompt": lyric,
"style": style,
"title": title,
"customMode": True,
"callbackUrl": f"{self.callback_url}?task_id={task_id}",
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/generate",
headers=self.headers,
json=payload,
timeout=30.0,
)
if response.status_code != 200:
logger.error(f"Failed to generate music: {response.text}")
raise Exception(f"Failed to generate music: {response.status_code}")
return response.json()
```
**변경 후:**
```python
async def generate(
self,
lyric: str,
style: str,
title: str,
task_id: str,
) -> dict:
"""음악 생성 요청 (재시도 로직 포함)
Args:
lyric: 가사 텍스트
style: 음악 스타일
title: 곡 제목
task_id: 작업 고유 식별자
Returns:
Suno API 응답 데이터
Raises:
SunoResponseError: API 오류 또는 재시도 실패 시
"""
payload = {
"prompt": lyric,
"style": style,
"title": title,
"customMode": True,
"callbackUrl": f"{self.callback_url}?task_id={task_id}",
}
last_error: Exception | None = None
for attempt in range(recovery_settings.SUNO_MAX_RETRIES + 1):
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/generate",
headers=self.headers,
json=payload,
timeout=recovery_settings.SUNO_DEFAULT_TIMEOUT,
)
if response.status_code == 200:
return response.json()
# 재시도 불가능한 오류 (4xx 클라이언트 오류)
if 400 <= response.status_code < 500:
raise SunoResponseError(
f"Client error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text}
)
# 재시도 가능한 오류 (5xx 서버 오류)
last_error = SunoResponseError(
f"Server error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text}
)
except httpx.TimeoutException as e:
logger.warning(f"[Suno] Timeout on attempt {attempt + 1}/{recovery_settings.SUNO_MAX_RETRIES + 1}")
last_error = e
except httpx.HTTPError as e:
logger.warning(f"[Suno] HTTP error on attempt {attempt + 1}: {e}")
last_error = e
# 마지막 시도가 아니면 재시도
if attempt < recovery_settings.SUNO_MAX_RETRIES:
logger.info(f"[Suno] Retrying... ({attempt + 1}/{recovery_settings.SUNO_MAX_RETRIES})")
# 모든 재시도 실패
raise SunoResponseError(
f"All {recovery_settings.SUNO_MAX_RETRIES + 1} attempts failed",
original_response={"last_error": str(last_error)}
)
```
**이유:** 네트워크 오류나 일시적인 서버 오류 시 자동으로 재시도하여 안정성을 높입니다.
---
## 5. Creatomate API 커스텀 예외 클래스 추가
### 파일: `app/utils/creatomate.py`
**변경 전 (라인 1-20):**
```python
import asyncio
import json
from typing import Any, Optional
import httpx
from app.utils.logger import get_logger
from config import creatomate_settings
logger = get_logger("creatomate")
```
**변경 후:**
```python
import asyncio
import json
from typing import Any, Optional
import httpx
from app.utils.logger import get_logger
from config import creatomate_settings, recovery_settings
logger = get_logger("creatomate")
class CreatomateResponseError(Exception):
"""Creatomate API 응답 오류 시 발생하는 예외
Creatomate API 렌더링 실패 또는 비정상 응답 시 사용됩니다.
재시도 로직에서 이 예외를 catch하여 재시도를 수행합니다.
Attributes:
message: 에러 메시지
original_response: 원본 API 응답 (있는 경우)
"""
def __init__(self, message: str, original_response: dict | None = None):
self.message = message
self.original_response = original_response
super().__init__(self.message)
```
---
## 6. Creatomate API 타임아웃 적용
### 파일: `app/utils/creatomate.py`
**변경 전 (라인 138):**
```python
self._client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers=self._get_headers(),
timeout=httpx.Timeout(60.0, connect=10.0),
)
```
**변경 후:**
```python
self._client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers=self._get_headers(),
timeout=httpx.Timeout(
recovery_settings.CREATOMATE_RENDER_TIMEOUT,
connect=recovery_settings.CREATOMATE_CONNECT_TIMEOUT
),
)
```
**변경 전 (라인 258, 291):**
```python
timeout=30.0
```
**변경 후:**
```python
timeout=recovery_settings.CREATOMATE_DEFAULT_TIMEOUT
```
**변경 전 (라인 446, 457):**
```python
timeout=60.0
```
**변경 후:**
```python
timeout=recovery_settings.CREATOMATE_RENDER_TIMEOUT
```
---
## 7. Creatomate API 재시도 로직 추가
### 파일: `app/utils/creatomate.py`
**변경 전 - render_with_json() 메서드 (라인 440~):**
```python
async def render_with_json(
self,
template_id: str,
modifications: dict[str, Any],
task_id: str,
) -> dict:
"""JSON 수정사항으로 렌더링 요청"""
payload = {
"template_id": template_id,
"modifications": modifications,
"webhook_url": f"{creatomate_settings.CREATOMATE_CALLBACK_URL}?task_id={task_id}",
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/v1/renders",
headers=self._get_headers(),
json=payload,
timeout=60.0,
)
if response.status_code != 200:
logger.error(f"Failed to render: {response.text}")
raise Exception(f"Failed to render: {response.status_code}")
return response.json()
```
**변경 후:**
```python
async def render_with_json(
self,
template_id: str,
modifications: dict[str, Any],
task_id: str,
) -> dict:
"""JSON 수정사항으로 렌더링 요청 (재시도 로직 포함)
Args:
template_id: Creatomate 템플릿 ID
modifications: 수정사항 딕셔너리
task_id: 작업 고유 식별자
Returns:
Creatomate API 응답 데이터
Raises:
CreatomateResponseError: API 오류 또는 재시도 실패 시
"""
payload = {
"template_id": template_id,
"modifications": modifications,
"webhook_url": f"{creatomate_settings.CREATOMATE_CALLBACK_URL}?task_id={task_id}",
}
last_error: Exception | None = None
for attempt in range(recovery_settings.CREATOMATE_MAX_RETRIES + 1):
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/v1/renders",
headers=self._get_headers(),
json=payload,
timeout=recovery_settings.CREATOMATE_RENDER_TIMEOUT,
)
if response.status_code == 200:
return response.json()
# 재시도 불가능한 오류 (4xx 클라이언트 오류)
if 400 <= response.status_code < 500:
raise CreatomateResponseError(
f"Client error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text}
)
# 재시도 가능한 오류 (5xx 서버 오류)
last_error = CreatomateResponseError(
f"Server error: {response.status_code}",
original_response={"status": response.status_code, "text": response.text}
)
except httpx.TimeoutException as e:
logger.warning(f"[Creatomate] Timeout on attempt {attempt + 1}/{recovery_settings.CREATOMATE_MAX_RETRIES + 1}")
last_error = e
except httpx.HTTPError as e:
logger.warning(f"[Creatomate] HTTP error on attempt {attempt + 1}: {e}")
last_error = e
# 마지막 시도가 아니면 재시도
if attempt < recovery_settings.CREATOMATE_MAX_RETRIES:
logger.info(f"[Creatomate] Retrying... ({attempt + 1}/{recovery_settings.CREATOMATE_MAX_RETRIES})")
# 모든 재시도 실패
raise CreatomateResponseError(
f"All {recovery_settings.CREATOMATE_MAX_RETRIES + 1} attempts failed",
original_response={"last_error": str(last_error)}
)
```
---
## 작업 체크리스트
| 순번 | 작업 내용 | 파일 | 상태 |
|------|----------|------|------|
| 1 | RecoverySettings에 Suno/Creatomate 설정 추가 | `config.py` | ✅ |
| 2 | .env에 타임아웃/재시도 환경변수 추가 | `.env` | ✅ |
| 3 | SunoResponseError 예외 클래스 추가 | `app/utils/suno.py` | ✅ |
| 4 | Suno 타임아웃 적용 (recovery_settings 사용) | `app/utils/suno.py` | ✅ |
| 5 | Suno 재시도 로직 추가 | `app/utils/suno.py` | ✅ |
| 6 | CreatomateResponseError 예외 클래스 추가 | `app/utils/creatomate.py` | ✅ |
| 7 | Creatomate 타임아웃 적용 (recovery_settings 사용) | `app/utils/creatomate.py` | ✅ |
| 8 | Creatomate 재시도 로직 추가 | `app/utils/creatomate.py` | ✅ |
---
## 참고사항
- **DB failed 상태 저장**: `song_task.py``video_task.py`에 이미 구현되어 있습니다.
- **Response failed 상태**: 모든 스키마에 `success`, `status` 필드가 이미 존재합니다.
- 재시도는 5xx 서버 오류와 타임아웃에만 적용되며, 4xx 클라이언트 오류는 즉시 실패 처리합니다.
- 모든 타임아웃/재시도 설정은 `RecoverySettings`에서 통합 관리합니다.

View File

@ -38,10 +38,10 @@ tags_metadata = [
- **Refresh Token**: 7 유효, Access Token 갱신 사용
""",
},
{
"name": "Home",
"description": "홈 화면 및 프로젝트 관리 API",
},
# {
# "name": "Home",
# "description": "홈 화면 및 프로젝트 관리 API",
# },
{
"name": "Crawling",
"description": "네이버 지도 크롤링 API - 장소 정보 및 이미지 수집",