ChatGPT API timeout 및 retry 설정 추가

insta
Dohyun Lim 2026-01-27 15:28:44 +09:00
parent 6d2961cee2
commit fea30e79fd
6 changed files with 1858 additions and 17 deletions

View File

@ -9,6 +9,7 @@ from datetime import datetime, timezone
from typing import Optional from typing import Optional
from sqlalchemy import select, update from sqlalchemy import select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from config import prj_settings from config import prj_settings
@ -276,10 +277,44 @@ class AuthService:
thumbnail_image_url=profile.thumbnail_image_url if profile else None, thumbnail_image_url=profile.thumbnail_image_url if profile else None,
) )
session.add(new_user) session.add(new_user)
try:
await session.flush() await session.flush()
await session.refresh(new_user) await session.refresh(new_user)
logger.info(f"[AUTH] 신규 사용자 생성 완료 - user_id: {new_user.id}, is_new_user: True") logger.info(f"[AUTH] 신규 사용자 생성 완료 - user_id: {new_user.id}, is_new_user: True")
return new_user, True return new_user, True
except IntegrityError:
# 동시 요청으로 인한 중복 삽입 시도 - 기존 사용자 조회
logger.warning(
f"[AUTH] IntegrityError 발생 (동시 요청 추정) - kakao_id: {kakao_id}, "
"기존 사용자 재조회 시도"
)
await session.rollback()
result = await session.execute(
select(User).where(User.kakao_id == kakao_id)
)
existing_user = result.scalar_one_or_none()
if existing_user is not None:
logger.info(
f"[AUTH] 기존 사용자 재조회 성공 - user_id: {existing_user.id}, "
"is_new_user: False"
)
# 프로필 정보 업데이트
if profile:
existing_user.nickname = profile.nickname
existing_user.profile_image_url = profile.profile_image_url
existing_user.thumbnail_image_url = profile.thumbnail_image_url
if kakao_account and kakao_account.email:
existing_user.email = kakao_account.email
await session.flush()
return existing_user, False
# 재조회에도 실패한 경우 (매우 드문 경우)
logger.error(
f"[AUTH] IntegrityError 후 재조회 실패 - kakao_id: {kakao_id}"
)
raise
async def _save_refresh_token( async def _save_refresh_token(
self, self,

View File

@ -4,13 +4,20 @@ import re
from openai import AsyncOpenAI from openai import AsyncOpenAI
from app.utils.logger import get_logger from app.utils.logger import get_logger
from config import apikey_settings from config import apikey_settings, recovery_settings
from app.utils.prompts.prompts import Prompt from app.utils.prompts.prompts import Prompt
# 로거 설정 # 로거 설정
logger = get_logger("chatgpt") logger = get_logger("chatgpt")
# fmt: on
class ChatGPTResponseError(Exception):
"""ChatGPT API 응답 에러"""
def __init__(self, status: str, error_code: str = None, error_message: str = None):
self.status = status
self.error_code = error_code
self.error_message = error_message
super().__init__(f"ChatGPT response failed: status={status}, code={error_code}, message={error_message}")
class ChatgptService: class ChatgptService:
@ -19,19 +26,62 @@ class ChatgptService:
GPT 5.0 모델을 사용하여 마케팅 가사 분석을 생성합니다. GPT 5.0 모델을 사용하여 마케팅 가사 분석을 생성합니다.
""" """
def __init__(self): def __init__(self, timeout: float = None):
self.client = AsyncOpenAI(api_key=apikey_settings.CHATGPT_API_KEY) self.timeout = timeout or recovery_settings.CHATGPT_TIMEOUT
self.max_retries = recovery_settings.CHATGPT_MAX_RETRIES
self.client = AsyncOpenAI(
api_key=apikey_settings.CHATGPT_API_KEY,
timeout=self.timeout
)
async def _call_structured_output_with_response_gpt_api(self, prompt: str, output_format: dict, model: str) -> dict: async def _call_structured_output_with_response_gpt_api(self, prompt: str, output_format: dict, model: str) -> dict:
content = [{"type": "input_text", "text": prompt}] content = [{"type": "input_text", "text": prompt}]
last_error = None
for attempt in range(self.max_retries + 1):
response = await self.client.responses.create( response = await self.client.responses.create(
model=model, model=model,
input=[{"role": "user", "content": content}], input=[{"role": "user", "content": content}],
text = output_format text=output_format,
timeout=self.timeout
) )
# Response 디버그 로깅
logger.debug(f"[ChatgptService] Response ID: {response.id}")
logger.debug(f"[ChatgptService] Response status: {response.status}")
logger.debug(f"[ChatgptService] Response model: {response.model}")
# status 확인: completed, failed, incomplete, cancelled, queued, in_progress
if response.status == "completed":
logger.debug(f"[ChatgptService] Response output_text: {response.output_text[:200]}..." if len(response.output_text) > 200 else f"[ChatgptService] Response output_text: {response.output_text}")
structured_output = json.loads(response.output_text) structured_output = json.loads(response.output_text)
return structured_output or {} return structured_output or {}
# 에러 상태 처리
if response.status == "failed":
error_code = getattr(response.error, 'code', None) if response.error else None
error_message = getattr(response.error, 'message', None) if response.error else None
logger.warning(f"[ChatgptService] Response failed (attempt {attempt + 1}/{self.max_retries + 1}): code={error_code}, message={error_message}")
last_error = ChatGPTResponseError(response.status, error_code, error_message)
elif response.status == "incomplete":
reason = getattr(response.incomplete_details, 'reason', None) if response.incomplete_details else None
logger.warning(f"[ChatgptService] Response incomplete (attempt {attempt + 1}/{self.max_retries + 1}): reason={reason}")
last_error = ChatGPTResponseError(response.status, reason, f"Response incomplete: {reason}")
else:
# cancelled, queued, in_progress 등 예상치 못한 상태
logger.warning(f"[ChatgptService] Unexpected response status (attempt {attempt + 1}/{self.max_retries + 1}): {response.status}")
last_error = ChatGPTResponseError(response.status, None, f"Unexpected status: {response.status}")
# 마지막 시도가 아니면 재시도
if attempt < self.max_retries:
logger.info(f"[ChatgptService] Retrying request...")
# 모든 재시도 실패
logger.error(f"[ChatgptService] All retries exhausted. Last error: {last_error}")
raise last_error
async def generate_structured_output( async def generate_structured_output(
self, self,
prompt : Prompt, prompt : Prompt,

View File

@ -153,6 +153,21 @@ class PromptSettings(BaseSettings):
model_config = _base_config model_config = _base_config
class RecoverySettings(BaseSettings):
"""ChatGPT API 복구 및 타임아웃 설정"""
CHATGPT_TIMEOUT: float = Field(
default=600.0,
description="ChatGPT API 타임아웃 (초). OpenAI Python SDK 기본값: 600초 (10분)",
)
CHATGPT_MAX_RETRIES: int = Field(
default=1,
description="ChatGPT API 응답 실패 시 최대 재시도 횟수",
)
model_config = _base_config
class KakaoSettings(BaseSettings): class KakaoSettings(BaseSettings):
"""카카오 OAuth 설정""" """카카오 OAuth 설정"""
@ -386,3 +401,4 @@ prompt_settings = PromptSettings()
log_settings = LogSettings() log_settings = LogSettings()
kakao_settings = KakaoSettings() kakao_settings = KakaoSettings()
jwt_settings = JWTSettings() jwt_settings = JWTSettings()
recovery_settings = RecoverySettings()

885
docs/plan/db_lock.md Normal file
View File

@ -0,0 +1,885 @@
# DB Lock 및 Upsert 패턴 가이드 (MySQL 전용)
## 목차
1. [현재 Insert 사용 현황](#1-현재-insert-사용-현황)
2. [Upsert 패턴 설계](#2-upsert-패턴-설계)
3. [DB Lock 전략](#3-db-lock-전략)
4. [데드락 방지 전략](#4-데드락-방지-전략)
5. [제안 코드](#5-제안-코드)
6. [사용 예시](#6-사용-예시)
---
## 1. 현재 Insert 사용 현황
### 1.1 테이블별 Insert 분석 및 우선순위
| 테이블 | 파일 위치 | Unique 제약 | 동시 요청 가능성 | Upsert 우선순위 | 키 조합 |
|--------|-----------|-------------|------------------|-----------------|---------|
| **SongTimestamp** | song.py:477 | - | **높음** | **1순위** | suno_audio_id + order_idx |
| **Image** | home.py:503,552,799,821 | - | **중간** | **2순위** | task_id + img_order |
| **Song** | song.py:188 | - | 중간 | 3순위 | task_id |
| **Video** | video.py:252 | - | 중간 | 4순위 | task_id |
| **User** | auth.py:278 | `kakao_id` | **낮음** | ✅ 완료 | kakao_id |
| **Project** | lyric.py:297 | - | 낮음 | 선택 | task_id |
| **Lyric** | lyric.py:317 | - | 낮음 | 선택 | task_id |
| **RefreshToken** | auth.py:315 | `token_hash` | - | 불필요 | - (항상 새로 생성) |
#### 동시 요청 가능성 분석
| 가능성 | 테이블 | 발생 시나리오 |
|--------|--------|---------------|
| **높음** | SongTimestamp | 클라이언트가 상태 조회 API를 여러 번 호출 (폴링) → 동일 데이터 중복 삽입 |
| **중간** | Image | 네트워크 오류로 업로드 재시도, 클라이언트 중복 클릭 |
| **중간** | Song/Video | 백그라운드 태스크 재실행, 상태 확인 중복 호출 |
| **낮음** | User | OAuth 인가 코드 일회성으로 동시 요청 거의 불가능 |
> **참고**: User 테이블의 경우 카카오 OAuth 인가 코드(authorization code)가 **일회성**이므로,
> 동일한 코드로 동시 요청은 불가능합니다. 다만 여러 탭에서 각각 로그인을 시작하는 극히 드문 경우에만 발생 가능합니다.
### 1.2 현재 코드 패턴의 문제점
**실제 문제가 발생하는 케이스: SongTimestamp**
```python
# song.py:467-479 - 현재 패턴
# 클라이언트가 /song/status/{song_id}를 여러 번 호출하면 중복 삽입 발생!
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
song_timestamp = SongTimestamp(
suno_audio_id=suno_audio_id,
order_idx=order_idx,
lyric_line=timestamped_lyric["text"],
start_time=timestamped_lyric["start_sec"],
end_time=timestamped_lyric["end_sec"],
)
session.add(song_timestamp) # 동일 suno_audio_id로 중복 삽입!
await session.commit()
```
**문제 시나리오:**
1. 클라이언트가 노래 생성 상태 확인을 위해 폴링
2. SUCCESS 응답을 받은 후 타임스탬프 저장 로직 실행
3. 네트워크 지연으로 클라이언트가 재요청
4. **동일한 데이터가 중복 삽입됨**
---
## 2. Upsert 패턴 설계 (MySQL 전용)
### 2.1 MySQL ON DUPLICATE KEY UPDATE (권장)
MySQL의 `INSERT ... ON DUPLICATE KEY UPDATE` 절을 사용한 원자적 Upsert:
```python
from sqlalchemy.dialects.mysql import insert as mysql_insert
stmt = mysql_insert(User).values(
kakao_id=kakao_id,
nickname=nickname,
email=email,
)
stmt = stmt.on_duplicate_key_update(
nickname=stmt.inserted.nickname, # MySQL은 stmt.inserted 사용
email=stmt.inserted.email,
updated_at=func.now(),
)
await session.execute(stmt)
# MySQL은 RETURNING 미지원 - 별도 조회 필요
result = await session.execute(select(User).where(User.kakao_id == kakao_id))
user = result.scalar_one()
```
**장점:**
- 원자적 연산 (단일 쿼리)
- 데드락 위험 최소화
- 동시 요청에서도 안전
**전제 조건:**
- Unique 인덱스 필수 (없으면 항상 INSERT만 됨)
### 2.3 비관적 잠금 (Pessimistic Locking)
`SELECT ... FOR UPDATE`를 사용한 행 수준 잠금:
```python
result = await session.execute(
select(User)
.where(User.kakao_id == kakao_id)
.with_for_update() # FOR UPDATE 잠금
)
user = result.scalar_one_or_none()
```
---
## 3. DB Lock 전략
### 3.1 잠금 유형
| 잠금 유형 | 사용 시점 | SQLAlchemy 구현 |
|-----------|-----------|-----------------|
| **공유 잠금 (Shared)** | 읽기 작업 | `.with_for_update(read=True)` |
| **배타 잠금 (Exclusive)** | 쓰기 작업 | `.with_for_update()` |
| **NOWAIT** | 즉시 실패 | `.with_for_update(nowait=True)` |
| **SKIP LOCKED** | 잠긴 행 건너뛰기 | `.with_for_update(skip_locked=True)` |
### 3.2 잠금 범위 선택
```python
# 1. 행 수준 잠금 (Row-level) - 권장
select(User).where(User.id == user_id).with_for_update()
# 2. 키 범위 잠금 (Key-range) - 범위 조회 시
select(Image).where(Image.task_id == task_id).with_for_update()
# 3. 테이블 잠금 - 피해야 함 (성능 저하)
```
### 3.3 트랜잭션 격리 수준
```python
from sqlalchemy import text
# 세션별 격리 수준 설정
await session.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))
```
---
## 4. 데드락 방지 전략
### 4.1 핵심 원칙
1. **일관된 잠금 순서**: 항상 같은 순서로 리소스 접근
2. **짧은 트랜잭션**: 잠금 유지 시간 최소화
3. **타임아웃 설정**: 무한 대기 방지
4. **재시도 로직**: 데드락 발생 시 자동 재시도
### 4.2 잠금 순서 규칙
```python
# 올바른 순서: 항상 PK 또는 정렬된 순서로 잠금
async def lock_resources_safely(session, resource_ids: list[int]):
"""리소스를 ID 순서로 정렬하여 잠금"""
sorted_ids = sorted(resource_ids) # 정렬!
for resource_id in sorted_ids:
await session.execute(
select(Resource)
.where(Resource.id == resource_id)
.with_for_update()
)
```
### 4.3 타임아웃 설정 (MySQL)
```python
# MySQL 잠금 타임아웃 설정
await session.execute(text("SET innodb_lock_wait_timeout = 5"))
```
### 4.4 재시도 로직
```python
import asyncio
from sqlalchemy.exc import OperationalError
async def execute_with_retry(
session,
operation,
max_retries: int = 3,
base_delay: float = 0.1,
):
"""지수 백오프를 사용한 재시도 로직"""
for attempt in range(max_retries):
try:
return await operation(session)
except OperationalError as e:
if "deadlock" in str(e).lower() and attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
await asyncio.sleep(delay)
await session.rollback()
continue
raise
```
---
## 5. 제안 코드 (MySQL 전용)
### 5.1 공통 Upsert 유틸리티
`app/utils/db_utils.py` 파일 (✅ 이미 생성됨):
```python
"""
DB 유틸리티 - Upsert 및 Lock 관리 (MySQL 전용)
MySQL의 INSERT ... ON DUPLICATE KEY UPDATE를 사용한 안전한 Upsert 패턴과
데드락 방지를 위한 잠금 관리 유틸리티를 제공합니다.
"""
import asyncio
import logging
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar
from sqlalchemy import func, select
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=DeclarativeBase)
class UpsertResult:
"""Upsert 결과를 담는 클래스"""
def __init__(self, entity: Any, created: bool):
self.entity = entity
self.created = created # True: INSERT, False: UPDATE
async def upsert_by_unique_key(
session: AsyncSession,
model: Type[T],
unique_columns: List[str],
values: Dict[str, Any],
update_columns: Optional[List[str]] = None,
lock_timeout_sec: int = 5,
) -> UpsertResult:
"""
Unique 키 기반 원자적 Upsert (MySQL ON DUPLICATE KEY UPDATE 사용)
Args:
session: AsyncSession 인스턴스
model: SQLAlchemy 모델 클래스
unique_columns: Unique 제약 컬럼 목록
values: INSERT/UPDATE 값 딕셔너리
update_columns: UPDATE 시 변경할 컬럼 목록 (None이면 unique 제외 전체)
lock_timeout_sec: 잠금 타임아웃 (초)
Returns:
UpsertResult: 엔티티와 생성 여부
Example:
result = await upsert_by_unique_key(
session=session,
model=User,
unique_columns=['kakao_id'],
values={'kakao_id': 12345, 'nickname': '홍길동', 'email': 'test@test.com'},
update_columns=['nickname', 'email'],
)
if result.created:
print("새 사용자 생성됨")
else:
print("기존 사용자 업데이트됨")
"""
from sqlalchemy import text
# MySQL 잠금 타임아웃 설정
await session.execute(text(f"SET innodb_lock_wait_timeout = {lock_timeout_sec}"))
# UPDATE 컬럼 결정
if update_columns is None:
update_columns = [k for k in values.keys() if k not in unique_columns]
# MySQL INSERT ... ON DUPLICATE KEY UPDATE
stmt = mysql_insert(model).values(**values)
update_dict = {col: stmt.inserted[col] for col in update_columns}
if hasattr(model, 'updated_at'):
update_dict['updated_at'] = func.now()
stmt = stmt.on_duplicate_key_update(**update_dict)
await session.execute(stmt)
# MySQL은 RETURNING 미지원 - 별도 조회 필요
filter_conditions = {col: values[col] for col in unique_columns}
result = await session.execute(select(model).filter_by(**filter_conditions))
entity = result.scalar_one()
# created 여부 확인 (created_at == updated_at 비교)
created = True
if hasattr(entity, 'updated_at') and hasattr(entity, 'created_at'):
if entity.created_at and entity.updated_at:
created = abs((entity.updated_at - entity.created_at).total_seconds()) < 1
return UpsertResult(entity=entity, created=created)
async def get_or_create_with_lock(
session: AsyncSession,
model: Type[T],
filter_by: Dict[str, Any],
defaults: Optional[Dict[str, Any]] = None,
lock: bool = True,
nowait: bool = False,
) -> UpsertResult:
"""
SELECT FOR UPDATE를 사용한 안전한 Get or Create
동시 요청에서도 안전하게 작동하며,
행이 존재하면 잠금 후 반환, 없으면 생성합니다.
Args:
session: AsyncSession 인스턴스
model: SQLAlchemy 모델 클래스
filter_by: 조회 조건 딕셔너리
defaults: 생성 시 추가할 기본값
lock: FOR UPDATE 잠금 사용 여부
nowait: 잠금 대기 안함 (즉시 예외 발생)
Returns:
UpsertResult: 엔티티와 생성 여부
Example:
result = await get_or_create_with_lock(
session=session,
model=User,
filter_by={'kakao_id': 12345},
defaults={'nickname': '홍길동'},
)
"""
# 조회 쿼리 구성
query = select(model).filter_by(**filter_by)
if lock:
query = query.with_for_update(nowait=nowait)
result = await session.execute(query)
entity = result.scalar_one_or_none()
if entity is not None:
# 기존 엔티티 반환
return UpsertResult(entity=entity, created=False)
# 새 엔티티 생성
create_values = {**filter_by, **(defaults or {})}
entity = model(**create_values)
session.add(entity)
try:
await session.flush()
except IntegrityError:
# 동시 INSERT로 인한 충돌 - 다시 조회
await session.rollback()
result = await session.execute(select(model).filter_by(**filter_by))
entity = result.scalar_one()
return UpsertResult(entity=entity, created=False)
return UpsertResult(entity=entity, created=True)
async def bulk_upsert(
session: AsyncSession,
model: Type[T],
unique_columns: List[str],
records: List[Dict[str, Any]],
update_columns: Optional[List[str]] = None,
) -> int:
"""
대량 Upsert (MySQL ON DUPLICATE KEY UPDATE 사용)
여러 레코드를 한 번에 Upsert합니다.
데드락 방지를 위해 unique 키 기준으로 정렬 후 처리합니다.
Unique 인덱스가 반드시 존재해야 합니다.
Args:
session: AsyncSession 인스턴스
model: SQLAlchemy 모델 클래스
unique_columns: Unique 제약 컬럼 목록
records: Upsert할 레코드 딕셔너리 목록
update_columns: UPDATE 시 변경할 컬럼 목록
Returns:
int: 처리된 레코드 수
Example:
count = await bulk_upsert(
session=session,
model=SongTimestamp,
unique_columns=['suno_audio_id', 'order_idx'],
records=[
{'suno_audio_id': 'abc', 'order_idx': 0, 'lyric_line': '가사1'},
{'suno_audio_id': 'abc', 'order_idx': 1, 'lyric_line': '가사2'},
],
)
"""
if not records:
return 0
# 데드락 방지: unique 키 기준 정렬
sorted_records = sorted(
records,
key=lambda r: tuple(r.get(col, '') for col in unique_columns)
)
# UPDATE 컬럼 결정
if update_columns is None:
all_columns = set(sorted_records[0].keys())
update_columns = list(all_columns - set(unique_columns))
# MySQL INSERT ... ON DUPLICATE KEY UPDATE
stmt = mysql_insert(model).values(sorted_records)
update_dict = {col: stmt.inserted[col] for col in update_columns}
if hasattr(model, 'updated_at'):
update_dict['updated_at'] = func.now()
stmt = stmt.on_duplicate_key_update(**update_dict)
await session.execute(stmt)
return len(sorted_records)
async def execute_with_retry(
func: Callable,
max_retries: int = 3,
base_delay: float = 0.1,
retry_on: tuple = (OperationalError,),
) -> Any:
"""
지수 백오프를 사용한 재시도 래퍼
데드락이나 일시적 오류 발생 시 자동으로 재시도합니다.
Args:
func: 실행할 비동기 함수 (인자 없음)
max_retries: 최대 재시도 횟수
base_delay: 기본 대기 시간 (초)
retry_on: 재시도할 예외 타입들
Returns:
함수 실행 결과
Example:
async def do_work():
async with AsyncSessionLocal() as session:
await upsert_by_unique_key(...)
await session.commit()
result = await execute_with_retry(do_work)
"""
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except retry_on as e:
last_exception = e
error_msg = str(e).lower()
# 데드락 또는 잠금 타임아웃인 경우만 재시도
if "deadlock" in error_msg or "lock" in error_msg:
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
logger.warning(
f"DB 작업 실패 (시도 {attempt + 1}/{max_retries}), "
f"{delay:.2f}초 후 재시도: {e}"
)
await asyncio.sleep(delay)
continue
raise
raise last_exception
class LockManager:
"""
분산 잠금 관리자 (MySQL 전용)
여러 리소스에 대한 잠금을 일관된 순서로 획득하여
데드락을 방지합니다.
"""
def __init__(self, session: AsyncSession, timeout_sec: int = 5):
self.session = session
self.timeout_sec = timeout_sec
self._locked_resources: List[tuple] = []
async def __aenter__(self):
from sqlalchemy import text
# MySQL 잠금 타임아웃 설정
await self.session.execute(
text(f"SET innodb_lock_wait_timeout = {self.timeout_sec}")
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 트랜잭션 종료 시 자동으로 잠금 해제됨
self._locked_resources.clear()
async def lock_rows(
self,
model: Type[T],
ids: List[Any],
id_column: str = "id",
nowait: bool = False,
) -> List[T]:
"""
여러 행을 ID 순서로 잠금
Args:
model: 모델 클래스
ids: 잠금할 ID 목록
id_column: ID 컬럼명
nowait: 잠금 대기 안함
Returns:
잠긴 엔티티 목록
"""
if not ids:
return []
# 데드락 방지: ID 정렬
sorted_ids = sorted(ids)
column = getattr(model, id_column)
query = (
select(model)
.where(column.in_(sorted_ids))
.order_by(column) # 정렬 순서 유지
.with_for_update(nowait=nowait)
)
result = await self.session.execute(query)
entities = result.scalars().all()
self._locked_resources.append((model.__tablename__, sorted_ids))
return list(entities)
async def lock_row(
self,
model: Type[T],
id_value: Any,
id_column: str = "id",
nowait: bool = False,
) -> Optional[T]:
"""
단일 행 잠금
Args:
model: 모델 클래스
id_value: 잠금할 ID
id_column: ID 컬럼명
nowait: 잠금 대기 안함
Returns:
잠긴 엔티티 또는 None
"""
entities = await self.lock_rows(model, [id_value], id_column, nowait)
return entities[0] if entities else None
```
### 5.2 모델에 Unique Constraint 추가 (권장)
테이블별 Unique 제약 추가가 필요한 경우:
```python
# app/home/models.py - Image 테이블
class Image(Base):
__tablename__ = "image"
__table_args__ = (
# task_id + img_order 조합 유니크
Index("idx_image_task_order", "task_id", "img_order", unique=True),
...
)
# app/song/models.py - SongTimestamp 테이블
class SongTimestamp(Base):
__tablename__ = "song_timestamp"
__table_args__ = (
# suno_audio_id + order_idx 조합 유니크
Index("idx_song_ts_audio_order", "suno_audio_id", "order_idx", unique=True),
...
)
```
---
## 6. 사용 예시
### 6.1 SongTimestamp Bulk Upsert (우선순위 1 - 실제 문제 발생)
**가장 먼저 적용해야 하는 케이스입니다.** 클라이언트의 상태 폴링으로 인해 동일 데이터가 중복 삽입될 수 있습니다.
```python
# app/song/api/routers/v1/song.py
from app.utils.db_utils import bulk_upsert
# 기존 코드 (문제 있음 - 폴링 시 중복 삽입)
# for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
# song_timestamp = SongTimestamp(...)
# session.add(song_timestamp)
# 개선된 코드 (Upsert로 중복 방지)
records = [
{
'suno_audio_id': suno_audio_id,
'order_idx': idx,
'lyric_line': ts['text'],
'start_time': ts['start_sec'],
'end_time': ts['end_sec'],
}
for idx, ts in enumerate(timestamped_lyrics)
]
await bulk_upsert(
session=session,
model=SongTimestamp,
unique_columns=['suno_audio_id', 'order_idx'],
records=records,
update_columns=['lyric_line', 'start_time', 'end_time'],
)
```
### 6.2 Image 중복 방지 Upsert (우선순위 2)
```python
# app/home/api/routers/v1/home.py
from app.utils.db_utils import get_or_create_with_lock
async def save_image(session: AsyncSession, task_id: str, img_url: str, img_order: int):
"""이미지 저장 (중복 방지)"""
result = await get_or_create_with_lock(
session=session,
model=Image,
filter_by={'task_id': task_id, 'img_order': img_order},
defaults={
'img_name': f"image_{img_order}",
'img_url': img_url,
},
)
if not result.created:
# 기존 이미지 URL 업데이트
result.entity.img_url = img_url
return result.entity
```
### 6.3 User IntegrityError 처리 (✅ 적용 완료)
> **참고**: User 테이블은 OAuth 인가 코드의 일회성 특성상 동시 요청 가능성이 **매우 낮습니다**.
> `kakao_id` UNIQUE 제약이 있어 중복 시 IntegrityError가 발생하므로,
> IntegrityError 처리를 추가하여 500 에러 대신 기존 사용자를 재조회합니다.
```python
# app/user/services/auth.py - 현재 적용된 코드
from sqlalchemy.exc import IntegrityError
# 신규 사용자 생성 부분
session.add(new_user)
try:
await session.flush()
await session.refresh(new_user)
return new_user, True
except IntegrityError:
# 동시 요청으로 인한 중복 삽입 시도 - 기존 사용자 조회
logger.warning(
f"[AUTH] IntegrityError 발생 (동시 요청 추정) - kakao_id: {kakao_id}"
)
await session.rollback()
result = await session.execute(
select(User).where(User.kakao_id == kakao_id)
)
existing_user = result.scalar_one_or_none()
if existing_user is not None:
# 프로필 정보 업데이트
if profile:
existing_user.nickname = profile.nickname
existing_user.profile_image_url = profile.profile_image_url
existing_user.thumbnail_image_url = profile.thumbnail_image_url
if kakao_account and kakao_account.email:
existing_user.email = kakao_account.email
await session.flush()
return existing_user, False
# 재조회에도 실패한 경우 (매우 드문 경우)
raise
```
**이점:**
- Upsert 패턴 없이도 안전하게 처리
- 기존 코드 구조 유지
- 드문 경우의 500 에러 방지
### 6.4 재시도 로직 사용
```python
from app.utils.db_utils import execute_with_retry, bulk_upsert
async def safe_timestamp_upsert(suno_audio_id: str, timestamps: list):
"""안전한 타임스탬프 Upsert (데드락 시 재시도)"""
async def do_upsert():
async with AsyncSessionLocal() as session:
records = [
{
'suno_audio_id': suno_audio_id,
'order_idx': idx,
'lyric_line': ts['text'],
'start_time': ts['start_sec'],
'end_time': ts['end_sec'],
}
for idx, ts in enumerate(timestamps)
]
count = await bulk_upsert(
session=session,
model=SongTimestamp,
unique_columns=['suno_audio_id', 'order_idx'],
records=records,
)
await session.commit()
return count
return await execute_with_retry(do_upsert, max_retries=3)
```
### 6.5 LockManager 사용
```python
from app.utils.db_utils import LockManager
async def update_multiple_resources(session: AsyncSession, project_ids: list[int]):
"""여러 프로젝트를 안전하게 업데이트"""
async with LockManager(session, timeout_sec=10) as lock:
# 프로젝트들을 ID 순서로 잠금 (데드락 방지)
projects = await lock.lock_rows(Project, project_ids)
for project in projects:
project.status = "updated"
await session.commit()
```
---
## 7. 마이그레이션 가이드
### 7.1 단계별 적용 순서 (우선순위 기반)
동시 요청 가능성이 높은 테이블부터 적용합니다:
1. **1단계**: `app/utils/db_utils.py` 파일 생성 ✅ (이미 완료)
2. **2단계**: **SongTimestamp** - Unique 인덱스 추가 + `bulk_upsert` 적용 (우선순위 1)
- 동시 요청 가능성 **높음** (폴링으로 인한 중복 삽입)
3. **3단계**: **Image** - Unique 인덱스 추가 + `get_or_create_with_lock` 적용 (우선순위 2)
- 동시 요청 가능성 **중간** (업로드 재시도)
4. **4단계**: **Song/Video** - 필요시 `get_or_create_with_lock` 적용 (우선순위 3-4)
- 동시 요청 가능성 중간 (백그라운드 태스크)
5. **5단계**: **User** - ✅ IntegrityError 처리 추가 완료
- 동시 요청 가능성 **낮음** (OAuth 인가 코드 일회성)
- `kakao_id` UNIQUE 제약 + IntegrityError 발생 시 기존 사용자 재조회 처리
> **권장**: 1~3단계까지 우선 적용하고, 나머지는 필요에 따라 적용
### 7.2 Alembic 마이그레이션 예시
```python
"""Add unique constraints for upsert support
Revision ID: xxxx
"""
from alembic import op
import sqlalchemy as sa
def upgrade():
# SongTimestamp: suno_audio_id + order_idx unique
op.create_index(
'idx_song_ts_audio_order',
'song_timestamp',
['suno_audio_id', 'order_idx'],
unique=True
)
# Image: task_id + img_order unique (선택적)
op.create_index(
'idx_image_task_order',
'image',
['task_id', 'img_order'],
unique=True
)
def downgrade():
op.drop_index('idx_song_ts_audio_order', table_name='song_timestamp')
op.drop_index('idx_image_task_order', table_name='image')
```
---
## 8. 모니터링 및 디버깅
### 8.1 잠금 모니터링 쿼리 (MySQL)
```sql
-- 현재 잠금 상태 확인 (InnoDB)
SELECT
r.trx_id AS waiting_trx_id,
r.trx_mysql_thread_id AS waiting_thread,
r.trx_query AS waiting_query,
b.trx_id AS blocking_trx_id,
b.trx_mysql_thread_id AS blocking_thread,
b.trx_query AS blocking_query
FROM information_schema.innodb_lock_waits w
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;
-- 현재 실행 중인 트랜잭션 확인
SELECT * FROM information_schema.innodb_trx;
-- 데드락 로그 확인
SHOW ENGINE INNODB STATUS;
-- 잠금 대기 중인 쿼리 확인
SELECT
pl.id,
pl.user,
pl.state,
pl.info AS query
FROM information_schema.processlist pl
WHERE pl.state LIKE '%lock%';
```
### 8.2 로깅 설정
```python
# config.py 또는 logging 설정
import logging
# SQLAlchemy 엔진 로깅 (디버그 시)
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
# Upsert 유틸리티 로깅
logging.getLogger('app.utils.db_utils').setLevel(logging.DEBUG)
```
---
## 9. 요약
| 상황 | 권장 패턴 | 함수 |
|------|-----------|------|
| 단일 레코드 Upsert (Unique 키 존재) | ON CONFLICT | `upsert_by_unique_key()` |
| 단일 레코드 Get or Create | SELECT FOR UPDATE | `get_or_create_with_lock()` |
| 대량 레코드 Upsert | Bulk ON CONFLICT | `bulk_upsert()` |
| 데드락 방지 재시도 | 지수 백오프 | `execute_with_retry()` |
| 다중 행 잠금 | 정렬된 순서 잠금 | `LockManager` |
---
**작성일**: 2026-01-26
**작성자**: Claude Code (AI Assistant)

408
docs/plan/fix_plan.md Normal file
View File

@ -0,0 +1,408 @@
# API 타임아웃 및 재시도 로직 개선 계획
## 개요
외부 API 호출 시 타임아웃 미설정 및 재시도 로직 부재로 인한 안정성 문제를 해결합니다.
---
## 현재 상태
| 모듈 | 외부 API | 타임아웃 | 재시도 |
|------|----------|----------|--------|
| Lyric | ChatGPT (OpenAI) | ❌ 미설정 (SDK 기본 ~600초) | ❌ 없음 |
| Song | Suno API | ✅ 30-120초 | ❌ 없음 |
| Video | Creatomate API | ✅ 30-60초 | ❌ 없음 |
---
## 수정 계획
### 1. ChatGPT API 타임아웃 설정
**파일:** `app/utils/chatgpt_prompt.py`
**현재 코드:**
```python
class ChatgptService:
def __init__(self):
self.client = AsyncOpenAI(api_key=apikey_settings.CHATGPT_API_KEY)
```
**수정 코드:**
```python
class ChatgptService:
# 타임아웃 설정 (초)
DEFAULT_TIMEOUT = 60.0 # 전체 타임아웃
CONNECT_TIMEOUT = 10.0 # 연결 타임아웃
def __init__(self):
self.client = AsyncOpenAI(
api_key=apikey_settings.CHATGPT_API_KEY,
timeout=httpx.Timeout(
self.DEFAULT_TIMEOUT,
connect=self.CONNECT_TIMEOUT,
),
)
```
**필요한 import 추가:**
```python
import httpx
```
---
### 2. 재시도 유틸리티 함수 생성
**파일:** `app/utils/retry.py` (새 파일)
```python
"""
API 호출 재시도 유틸리티
지수 백오프(Exponential Backoff)를 사용한 재시도 로직을 제공합니다.
"""
import asyncio
import logging
from functools import wraps
from typing import Callable, Tuple, Type
logger = logging.getLogger(__name__)
class RetryExhaustedError(Exception):
"""모든 재시도 실패 시 발생하는 예외"""
def __init__(self, message: str, last_exception: Exception):
super().__init__(message)
self.last_exception = last_exception
async def retry_async(
func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
exponential_base: float = 2.0,
retry_on: Tuple[Type[Exception], ...] = (Exception,),
on_retry: Callable[[int, Exception], None] | None = None,
):
"""
비동기 함수 재시도 실행
Args:
func: 실행할 비동기 함수 (인자 없음)
max_retries: 최대 재시도 횟수 (기본: 3)
base_delay: 첫 번째 재시도 대기 시간 (초)
max_delay: 최대 대기 시간 (초)
exponential_base: 지수 백오프 배수 (기본: 2.0)
retry_on: 재시도할 예외 타입들
on_retry: 재시도 시 호출될 콜백 (attempt, exception)
Returns:
함수 실행 결과
Raises:
RetryExhaustedError: 모든 재시도 실패 시
Example:
result = await retry_async(
lambda: api_call(),
max_retries=3,
retry_on=(httpx.TimeoutException, httpx.HTTPStatusError),
)
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func()
except retry_on as e:
last_exception = e
if attempt == max_retries:
break
# 지수 백오프 계산
delay = min(base_delay * (exponential_base ** attempt), max_delay)
logger.warning(
f"[retry_async] 시도 {attempt + 1}/{max_retries + 1} 실패, "
f"{delay:.1f}초 후 재시도: {type(e).__name__}: {e}"
)
if on_retry:
on_retry(attempt + 1, e)
await asyncio.sleep(delay)
raise RetryExhaustedError(
f"최대 재시도 횟수({max_retries + 1}회) 초과",
last_exception,
)
def with_retry(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
retry_on: Tuple[Type[Exception], ...] = (Exception,),
):
"""
재시도 데코레이터
Args:
max_retries: 최대 재시도 횟수
base_delay: 첫 번째 재시도 대기 시간 (초)
max_delay: 최대 대기 시간 (초)
retry_on: 재시도할 예외 타입들
Example:
@with_retry(max_retries=3, retry_on=(httpx.TimeoutException,))
async def call_api():
...
"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
return await retry_async(
lambda: func(*args, **kwargs),
max_retries=max_retries,
base_delay=base_delay,
max_delay=max_delay,
retry_on=retry_on,
)
return wrapper
return decorator
```
---
### 3. Suno API 재시도 로직 적용
**파일:** `app/utils/suno.py`
**수정 대상 메서드:**
- `generate()` - 노래 생성 요청
- `get_task_status()` - 상태 조회
- `get_lyric_timestamp()` - 타임스탬프 조회
**수정 예시 (generate 메서드):**
```python
# 상단 import 추가
import httpx
from app.utils.retry import retry_async
# 재시도 대상 예외 정의
RETRY_EXCEPTIONS = (
httpx.TimeoutException,
httpx.ConnectError,
httpx.ReadError,
)
async def generate(
self,
prompt: str,
genre: str | None = None,
callback_url: str | None = None,
) -> str:
# ... 기존 payload 구성 코드 ...
async def _call_api():
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/generate",
headers=self.headers,
json=payload,
timeout=30.0,
)
response.raise_for_status()
return response.json()
# 재시도 로직 적용
data = await retry_async(
_call_api,
max_retries=3,
base_delay=1.0,
retry_on=RETRY_EXCEPTIONS,
)
# ... 기존 응답 처리 코드 ...
```
---
### 4. Creatomate API 재시도 로직 적용
**파일:** `app/utils/creatomate.py`
**수정 대상:**
- `_request()` 메서드 (모든 API 호출의 기반)
**수정 코드:**
```python
# 상단 import 추가
from app.utils.retry import retry_async
# 재시도 대상 예외 정의
RETRY_EXCEPTIONS = (
httpx.TimeoutException,
httpx.ConnectError,
httpx.ReadError,
)
async def _request(
self,
method: str,
url: str,
timeout: float = 30.0,
max_retries: int = 3,
**kwargs,
) -> httpx.Response:
"""HTTP 요청을 수행합니다 (재시도 로직 포함)."""
logger.info(f"[Creatomate] {method} {url}")
async def _call():
client = await get_shared_client()
if method.upper() == "GET":
response = await client.get(
url, headers=self.headers, timeout=timeout, **kwargs
)
elif method.upper() == "POST":
response = await client.post(
url, headers=self.headers, timeout=timeout, **kwargs
)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
response.raise_for_status()
return response
response = await retry_async(
_call,
max_retries=max_retries,
base_delay=1.0,
retry_on=RETRY_EXCEPTIONS,
)
logger.info(f"[Creatomate] Response - Status: {response.status_code}")
return response
```
---
### 5. ChatGPT API 재시도 로직 적용
**파일:** `app/utils/chatgpt_prompt.py`
**수정 코드:**
```python
# 상단 import 추가
import httpx
from openai import APITimeoutError, APIConnectionError, RateLimitError
from app.utils.retry import retry_async
# 재시도 대상 예외 정의
RETRY_EXCEPTIONS = (
APITimeoutError,
APIConnectionError,
RateLimitError,
)
class ChatgptService:
DEFAULT_TIMEOUT = 60.0
CONNECT_TIMEOUT = 10.0
MAX_RETRIES = 3
def __init__(self):
self.client = AsyncOpenAI(
api_key=apikey_settings.CHATGPT_API_KEY,
timeout=httpx.Timeout(
self.DEFAULT_TIMEOUT,
connect=self.CONNECT_TIMEOUT,
),
)
async def _call_structured_output_with_response_gpt_api(
self, prompt: str, output_format: dict, model: str
) -> dict:
content = [{"type": "input_text", "text": prompt}]
async def _call():
response = await self.client.responses.create(
model=model,
input=[{"role": "user", "content": content}],
text=output_format,
)
return json.loads(response.output_text) or {}
return await retry_async(
_call,
max_retries=self.MAX_RETRIES,
base_delay=2.0, # OpenAI Rate Limit 대비 더 긴 대기
retry_on=RETRY_EXCEPTIONS,
)
```
---
## 타임아웃 설정 권장값
| API | 용도 | 권장 타임아웃 | 재시도 횟수 | 재시도 간격 |
|-----|------|---------------|-------------|-------------|
| ChatGPT | 가사 생성 | 60초 | 3회 | 2초 → 4초 → 8초 |
| Suno | 노래 생성 요청 | 30초 | 3회 | 1초 → 2초 → 4초 |
| Suno | 상태 조회 | 30초 | 2회 | 1초 → 2초 |
| Suno | 타임스탬프 | 120초 | 2회 | 2초 → 4초 |
| Creatomate | 템플릿 조회 | 30초 | 2회 | 1초 → 2초 |
| Creatomate | 렌더링 요청 | 60초 | 3회 | 1초 → 2초 → 4초 |
| Creatomate | 상태 조회 | 30초 | 2회 | 1초 → 2초 |
---
## 구현 순서
1. **1단계: retry.py 유틸리티 생성**
- 재사용 가능한 재시도 로직 구현
- 단위 테스트 작성
2. **2단계: ChatGPT 타임아웃 설정**
- 가장 시급한 문제 (현재 600초 기본값)
- 타임아웃 + 재시도 동시 적용
3. **3단계: Suno API 재시도 적용**
- generate(), get_task_status(), get_lyric_timestamp()
4. **4단계: Creatomate API 재시도 적용**
- _request() 메서드 수정으로 전체 적용
---
## 테스트 체크리스트
각 수정 후 확인 사항:
- [ ] 정상 요청 시 기존과 동일하게 동작
- [ ] 타임아웃 발생 시 지정된 시간 내 예외 발생
- [ ] 일시적 오류 시 재시도 후 성공
- [ ] 모든 재시도 실패 시 적절한 에러 메시지 반환
- [ ] 로그에 재시도 시도 기록 확인
---
## 롤백 계획
문제 발생 시:
1. retry.py 사용 코드 제거 (기존 직접 호출로 복구)
2. ChatGPT 타임아웃 설정 제거 (SDK 기본값으로 복구)
---
## 참고 사항
- OpenAI SDK는 내부적으로 일부 재시도 로직이 있으나, 커스텀 제어가 제한적
- httpx의 `TimeoutException``ConnectTimeout`, `ReadTimeout`, `WriteTimeout`, `PoolTimeout`을 포함
- Rate Limit 에러(429)는 재시도 시 더 긴 대기 시간 필요 (Retry-After 헤더 참고)

View File

@ -0,0 +1,447 @@
# INSERT → Upsert 변환 계획서
## 개요
이 문서는 프로젝트 내 INSERT 코드들을 **존재 여부 확인 후 INSERT 또는 DB Lock + UPDATE** 패턴으로 변환하기 위한 계획입니다.
### 적용 범위
- ✅ 포함: video, song, lyric 모듈의 INSERT 코드
- ❌ 제외: user 모듈, 이미지 업로드 엔드포인트
### 사용할 유틸리티
- `app/utils/db_utils.py``get_or_create_with_lock()` 함수
- `app/utils/db_utils.py``bulk_upsert()` 함수 (대량 INSERT용)
---
## 대상 INSERT 위치 목록
| # | 파일 | 라인 | 테이블 | 키 컬럼 | 우선순위 |
|---|------|------|--------|---------|----------|
| 1 | video.py | 252 | Video | task_id | 중 |
| 2 | song.py | 188 | Song | task_id | 중 |
| 3 | song.py | 477 | SongTimestamp | suno_audio_id + order_idx | **높음** |
| 4 | lyric.py | 297 | Project | task_id | 중 |
| 5 | lyric.py | 317 | Lyric | task_id | 중 |
---
## 1. Video INSERT (video.py:252)
### 현재 코드
```python
# app/video/api/routers/v1/video.py:244-254
video = Video(
project_id=project_id,
lyric_id=lyric_id,
song_id=song_id,
task_id=task_id,
creatomate_render_id=None,
status="processing",
)
session.add(video)
await session.commit()
video_id = video.id
```
### 문제점
- `task_id`로 기존 레코드 존재 여부를 확인하지 않음
- 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능
### 수정 계획
```python
# app/video/api/routers/v1/video.py
# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (244-254행)
result = await get_or_create_with_lock(
session=session,
model=Video,
filter_by={'task_id': task_id},
defaults={
'project_id': project_id,
'lyric_id': lyric_id,
'song_id': song_id,
'creatomate_render_id': None,
'status': 'processing',
},
lock=True,
)
video = result.entity
if not result.created:
# 이미 존재하는 경우: 상태 업데이트
video.project_id = project_id
video.lyric_id = lyric_id
video.song_id = song_id
video.status = 'processing'
video.creatomate_render_id = None
await session.commit()
video_id = video.id
```
### 필수 사전 작업
- Video 테이블에 `task_id` UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id);
```
---
## 2. Song INSERT (song.py:188)
### 현재 코드
```python
# app/song/api/routers/v1/song.py:179-191
song = Song(
project_id=project_id,
lyric_id=lyric_id,
task_id=task_id,
suno_task_id=None,
status="processing",
song_prompt=song_prompt,
language=request_body.language,
)
session.add(song)
await session.commit()
song_id = song.id
```
### 문제점
- `task_id`로 기존 레코드 존재 여부를 확인하지 않음
- 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능
### 수정 계획
```python
# app/song/api/routers/v1/song.py
# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (179-191행)
result = await get_or_create_with_lock(
session=session,
model=Song,
filter_by={'task_id': task_id},
defaults={
'project_id': project_id,
'lyric_id': lyric_id,
'suno_task_id': None,
'status': 'processing',
'song_prompt': song_prompt,
'language': request_body.language,
},
lock=True,
)
song = result.entity
if not result.created:
# 이미 존재하는 경우: 상태 업데이트
song.project_id = project_id
song.lyric_id = lyric_id
song.status = 'processing'
song.song_prompt = song_prompt
song.language = request_body.language
await session.commit()
song_id = song.id
```
### 필수 사전 작업
- Song 테이블에 `task_id` UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id);
```
---
## 3. SongTimestamp INSERT (song.py:477) ⚠️ 높은 우선순위
### 현재 코드
```python
# app/song/api/routers/v1/song.py:467-479
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
song_timestamp = SongTimestamp(
suno_audio_id=suno_audio_id,
order_idx=order_idx,
lyric_line=timestamped_lyric["text"],
start_time=timestamped_lyric["start_sec"],
end_time=timestamped_lyric["end_sec"],
)
session.add(song_timestamp)
await session.commit()
```
### 문제점
- **폴링 기반 요청으로 인해 중복 INSERT 위험이 높음**
- `suno_audio_id` + `order_idx` 조합으로 존재 여부 확인 없음
- 여러 행을 루프에서 개별 INSERT하여 비효율적
### 수정 계획 (bulk_upsert 사용)
```python
# app/song/api/routers/v1/song.py
# 상단 import 추가
from app.utils.db_utils import bulk_upsert
# 기존 코드 대체 (467-479행)
timestamp_records = [
{
'suno_audio_id': suno_audio_id,
'order_idx': order_idx,
'lyric_line': timestamped_lyric["text"],
'start_time': timestamped_lyric["start_sec"],
'end_time': timestamped_lyric["end_sec"],
}
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics)
]
await bulk_upsert(
session=session,
model=SongTimestamp,
unique_columns=['suno_audio_id', 'order_idx'],
records=timestamp_records,
update_columns=['lyric_line', 'start_time', 'end_time'],
)
await session.commit()
```
### 필수 사전 작업
- SongTimestamp 테이블에 복합 UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE song_timestamp
ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx);
```
### 대안: get_or_create_with_lock 사용 (개별 처리)
만약 `bulk_upsert`를 사용하지 않고 개별 처리가 필요한 경우:
```python
from app.utils.db_utils import get_or_create_with_lock
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
result = await get_or_create_with_lock(
session=session,
model=SongTimestamp,
filter_by={
'suno_audio_id': suno_audio_id,
'order_idx': order_idx,
},
defaults={
'lyric_line': timestamped_lyric["text"],
'start_time': timestamped_lyric["start_sec"],
'end_time': timestamped_lyric["end_sec"],
},
lock=True,
)
if not result.created:
# 이미 존재하는 경우: 업데이트
result.entity.lyric_line = timestamped_lyric["text"]
result.entity.start_time = timestamped_lyric["start_sec"]
result.entity.end_time = timestamped_lyric["end_sec"]
await session.commit()
```
**권장사항**: `bulk_upsert` 사용 (성능 및 데드락 방지 측면에서 우수)
---
## 4. Project INSERT (lyric.py:297)
### 현재 코드
```python
# app/lyric/api/routers/v1/lyric.py:290-299
project = Project(
store_name=request_body.customer_name,
region=request_body.region,
task_id=task_id,
detail_region_info=request_body.detail_region_info,
language=request_body.language,
)
session.add(project)
await session.commit()
await session.refresh(project)
```
### 문제점
- `task_id`로 기존 레코드 존재 여부를 확인하지 않음
- 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능
### 수정 계획
```python
# app/lyric/api/routers/v1/lyric.py
# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (290-299행)
result = await get_or_create_with_lock(
session=session,
model=Project,
filter_by={'task_id': task_id},
defaults={
'store_name': request_body.customer_name,
'region': request_body.region,
'detail_region_info': request_body.detail_region_info,
'language': request_body.language,
},
lock=True,
)
project = result.entity
if not result.created:
# 이미 존재하는 경우: 정보 업데이트
project.store_name = request_body.customer_name
project.region = request_body.region
project.detail_region_info = request_body.detail_region_info
project.language = request_body.language
await session.commit()
await session.refresh(project)
```
### 필수 사전 작업
- Project 테이블에 `task_id` UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id);
```
---
## 5. Lyric INSERT (lyric.py:317)
### 현재 코드
```python
# app/lyric/api/routers/v1/lyric.py:308-319
estimated_prompt = lyric_prompt.build_prompt(lyric_input_data)
lyric = Lyric(
project_id=project.id,
task_id=task_id,
status="processing",
lyric_prompt=estimated_prompt,
lyric_result=None,
language=request_body.language,
)
session.add(lyric)
await session.commit()
await session.refresh(lyric)
```
### 문제점
- `task_id`로 기존 레코드 존재 여부를 확인하지 않음
- 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능
### 수정 계획
```python
# app/lyric/api/routers/v1/lyric.py
# 상단 import (이미 추가되어 있음)
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (308-319행)
estimated_prompt = lyric_prompt.build_prompt(lyric_input_data)
result = await get_or_create_with_lock(
session=session,
model=Lyric,
filter_by={'task_id': task_id},
defaults={
'project_id': project.id,
'status': 'processing',
'lyric_prompt': estimated_prompt,
'lyric_result': None,
'language': request_body.language,
},
lock=True,
)
lyric = result.entity
if not result.created:
# 이미 존재하는 경우: 정보 업데이트
lyric.project_id = project.id
lyric.status = 'processing'
lyric.lyric_prompt = estimated_prompt
lyric.lyric_result = None
lyric.language = request_body.language
await session.commit()
await session.refresh(lyric)
```
### 필수 사전 작업
- Lyric 테이블에 `task_id` UNIQUE 인덱스 추가 필요
```sql
ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id);
```
---
## 구현 순서 권장
1. **1단계: DB 마이그레이션** (필수)
```sql
-- 모든 UNIQUE 인덱스 추가
ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id);
ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id);
ALTER TABLE song_timestamp ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx);
ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id);
ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id);
```
2. **2단계: SongTimestamp (높은 우선순위)**
- 폴링으로 인한 중복 INSERT 위험이 가장 높음
- `bulk_upsert` 사용 권장
3. **3단계: 나머지 테이블 (중간 우선순위)**
- Video, Song, Project, Lyric 순으로 적용
- 모두 `get_or_create_with_lock` 사용
---
## 롤백 계획
문제 발생 시 원래 코드로 복구:
1. 코드 변경 사항 git revert
2. UNIQUE 인덱스는 유지 (데이터 무결성에 도움됨)
---
## 테스트 체크리스트
각 수정 후 확인 사항:
- [ ] 새로운 task_id로 요청 시 정상 INSERT
- [ ] 동일한 task_id로 재요청 시 UPDATE (에러 없이)
- [ ] 동시 요청 테스트 (2개 이상 동시 요청)
- [ ] 성능 저하 없는지 확인
---
## 참고 문서
- [db_lock.md](./db_lock.md) - DB Lock 및 Upsert 패턴 가이드
- [db_utils.py](../../app/utils/db_utils.py) - 유틸리티 함수 구현체