886 lines
27 KiB
Markdown
886 lines
27 KiB
Markdown
# DB Lock 및 Upsert 패턴 가이드 (MySQL 전용)
|
|
|
|
## 목차
|
|
1. [현재 Insert 사용 현황](#1-현재-insert-사용-현황)
|
|
2. [Upsert 패턴 설계](#2-upsert-패턴-설계)
|
|
3. [DB Lock 전략](#3-db-lock-전략)
|
|
4. [데드락 방지 전략](#4-데드락-방지-전략)
|
|
5. [제안 코드](#5-제안-코드)
|
|
6. [사용 예시](#6-사용-예시)
|
|
|
|
---
|
|
|
|
## 1. 현재 Insert 사용 현황
|
|
|
|
### 1.1 테이블별 Insert 분석 및 우선순위
|
|
|
|
| 테이블 | 파일 위치 | Unique 제약 | 동시 요청 가능성 | Upsert 우선순위 | 키 조합 |
|
|
|--------|-----------|-------------|------------------|-----------------|---------|
|
|
| **SongTimestamp** | song.py:477 | - | **높음** | **1순위** | suno_audio_id + order_idx |
|
|
| **Image** | home.py:503,552,799,821 | - | **중간** | **2순위** | task_id + img_order |
|
|
| **Song** | song.py:188 | - | 중간 | 3순위 | task_id |
|
|
| **Video** | video.py:252 | - | 중간 | 4순위 | task_id |
|
|
| **User** | auth.py:278 | `kakao_id` | **낮음** | ✅ 완료 | kakao_id |
|
|
| **Project** | lyric.py:297 | - | 낮음 | 선택 | task_id |
|
|
| **Lyric** | lyric.py:317 | - | 낮음 | 선택 | task_id |
|
|
| **RefreshToken** | auth.py:315 | `token_hash` | - | 불필요 | - (항상 새로 생성) |
|
|
|
|
#### 동시 요청 가능성 분석
|
|
|
|
| 가능성 | 테이블 | 발생 시나리오 |
|
|
|--------|--------|---------------|
|
|
| **높음** | SongTimestamp | 클라이언트가 상태 조회 API를 여러 번 호출 (폴링) → 동일 데이터 중복 삽입 |
|
|
| **중간** | Image | 네트워크 오류로 업로드 재시도, 클라이언트 중복 클릭 |
|
|
| **중간** | Song/Video | 백그라운드 태스크 재실행, 상태 확인 중복 호출 |
|
|
| **낮음** | User | OAuth 인가 코드 일회성으로 동시 요청 거의 불가능 |
|
|
|
|
> **참고**: User 테이블의 경우 카카오 OAuth 인가 코드(authorization code)가 **일회성**이므로,
|
|
> 동일한 코드로 동시 요청은 불가능합니다. 다만 여러 탭에서 각각 로그인을 시작하는 극히 드문 경우에만 발생 가능합니다.
|
|
|
|
### 1.2 현재 코드 패턴의 문제점
|
|
|
|
**실제 문제가 발생하는 케이스: SongTimestamp**
|
|
|
|
```python
|
|
# song.py:467-479 - 현재 패턴
|
|
# 클라이언트가 /song/status/{song_id}를 여러 번 호출하면 중복 삽입 발생!
|
|
|
|
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
|
|
song_timestamp = SongTimestamp(
|
|
suno_audio_id=suno_audio_id,
|
|
order_idx=order_idx,
|
|
lyric_line=timestamped_lyric["text"],
|
|
start_time=timestamped_lyric["start_sec"],
|
|
end_time=timestamped_lyric["end_sec"],
|
|
)
|
|
session.add(song_timestamp) # 동일 suno_audio_id로 중복 삽입!
|
|
|
|
await session.commit()
|
|
```
|
|
|
|
**문제 시나리오:**
|
|
1. 클라이언트가 노래 생성 상태 확인을 위해 폴링
|
|
2. SUCCESS 응답을 받은 후 타임스탬프 저장 로직 실행
|
|
3. 네트워크 지연으로 클라이언트가 재요청
|
|
4. **동일한 데이터가 중복 삽입됨**
|
|
|
|
---
|
|
|
|
## 2. Upsert 패턴 설계 (MySQL 전용)
|
|
|
|
### 2.1 MySQL ON DUPLICATE KEY UPDATE (권장)
|
|
|
|
MySQL의 `INSERT ... ON DUPLICATE KEY UPDATE` 절을 사용한 원자적 Upsert:
|
|
|
|
```python
|
|
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
|
|
|
stmt = mysql_insert(User).values(
|
|
kakao_id=kakao_id,
|
|
nickname=nickname,
|
|
email=email,
|
|
)
|
|
stmt = stmt.on_duplicate_key_update(
|
|
nickname=stmt.inserted.nickname, # MySQL은 stmt.inserted 사용
|
|
email=stmt.inserted.email,
|
|
updated_at=func.now(),
|
|
)
|
|
await session.execute(stmt)
|
|
|
|
# MySQL은 RETURNING 미지원 - 별도 조회 필요
|
|
result = await session.execute(select(User).where(User.kakao_id == kakao_id))
|
|
user = result.scalar_one()
|
|
```
|
|
|
|
**장점:**
|
|
- 원자적 연산 (단일 쿼리)
|
|
- 데드락 위험 최소화
|
|
- 동시 요청에서도 안전
|
|
|
|
**전제 조건:**
|
|
- Unique 인덱스 필수 (없으면 항상 INSERT만 됨)
|
|
|
|
### 2.3 비관적 잠금 (Pessimistic Locking)
|
|
|
|
`SELECT ... FOR UPDATE`를 사용한 행 수준 잠금:
|
|
|
|
```python
|
|
result = await session.execute(
|
|
select(User)
|
|
.where(User.kakao_id == kakao_id)
|
|
.with_for_update() # FOR UPDATE 잠금
|
|
)
|
|
user = result.scalar_one_or_none()
|
|
```
|
|
|
|
---
|
|
|
|
## 3. DB Lock 전략
|
|
|
|
### 3.1 잠금 유형
|
|
|
|
| 잠금 유형 | 사용 시점 | SQLAlchemy 구현 |
|
|
|-----------|-----------|-----------------|
|
|
| **공유 잠금 (Shared)** | 읽기 작업 | `.with_for_update(read=True)` |
|
|
| **배타 잠금 (Exclusive)** | 쓰기 작업 | `.with_for_update()` |
|
|
| **NOWAIT** | 즉시 실패 | `.with_for_update(nowait=True)` |
|
|
| **SKIP LOCKED** | 잠긴 행 건너뛰기 | `.with_for_update(skip_locked=True)` |
|
|
|
|
### 3.2 잠금 범위 선택
|
|
|
|
```python
|
|
# 1. 행 수준 잠금 (Row-level) - 권장
|
|
select(User).where(User.id == user_id).with_for_update()
|
|
|
|
# 2. 키 범위 잠금 (Key-range) - 범위 조회 시
|
|
select(Image).where(Image.task_id == task_id).with_for_update()
|
|
|
|
# 3. 테이블 잠금 - 피해야 함 (성능 저하)
|
|
```
|
|
|
|
### 3.3 트랜잭션 격리 수준
|
|
|
|
```python
|
|
from sqlalchemy import text
|
|
|
|
# 세션별 격리 수준 설정
|
|
await session.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))
|
|
```
|
|
|
|
---
|
|
|
|
## 4. 데드락 방지 전략
|
|
|
|
### 4.1 핵심 원칙
|
|
|
|
1. **일관된 잠금 순서**: 항상 같은 순서로 리소스 접근
|
|
2. **짧은 트랜잭션**: 잠금 유지 시간 최소화
|
|
3. **타임아웃 설정**: 무한 대기 방지
|
|
4. **재시도 로직**: 데드락 발생 시 자동 재시도
|
|
|
|
### 4.2 잠금 순서 규칙
|
|
|
|
```python
|
|
# 올바른 순서: 항상 PK 또는 정렬된 순서로 잠금
|
|
async def lock_resources_safely(session, resource_ids: list[int]):
|
|
"""리소스를 ID 순서로 정렬하여 잠금"""
|
|
sorted_ids = sorted(resource_ids) # 정렬!
|
|
|
|
for resource_id in sorted_ids:
|
|
await session.execute(
|
|
select(Resource)
|
|
.where(Resource.id == resource_id)
|
|
.with_for_update()
|
|
)
|
|
```
|
|
|
|
### 4.3 타임아웃 설정 (MySQL)
|
|
|
|
```python
|
|
# MySQL 잠금 타임아웃 설정
|
|
await session.execute(text("SET innodb_lock_wait_timeout = 5"))
|
|
```
|
|
|
|
### 4.4 재시도 로직
|
|
|
|
```python
|
|
import asyncio
|
|
from sqlalchemy.exc import OperationalError
|
|
|
|
async def execute_with_retry(
|
|
session,
|
|
operation,
|
|
max_retries: int = 3,
|
|
base_delay: float = 0.1,
|
|
):
|
|
"""지수 백오프를 사용한 재시도 로직"""
|
|
for attempt in range(max_retries):
|
|
try:
|
|
return await operation(session)
|
|
except OperationalError as e:
|
|
if "deadlock" in str(e).lower() and attempt < max_retries - 1:
|
|
delay = base_delay * (2 ** attempt)
|
|
await asyncio.sleep(delay)
|
|
await session.rollback()
|
|
continue
|
|
raise
|
|
```
|
|
|
|
---
|
|
|
|
## 5. 제안 코드 (MySQL 전용)
|
|
|
|
### 5.1 공통 Upsert 유틸리티
|
|
|
|
`app/utils/db_utils.py` 파일 (✅ 이미 생성됨):
|
|
|
|
```python
|
|
"""
|
|
DB 유틸리티 - Upsert 및 Lock 관리 (MySQL 전용)
|
|
|
|
MySQL의 INSERT ... ON DUPLICATE KEY UPDATE를 사용한 안전한 Upsert 패턴과
|
|
데드락 방지를 위한 잠금 관리 유틸리티를 제공합니다.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
|
from sqlalchemy.exc import IntegrityError, OperationalError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import DeclarativeBase
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
T = TypeVar("T", bound=DeclarativeBase)
|
|
|
|
|
|
class UpsertResult:
|
|
"""Upsert 결과를 담는 클래스"""
|
|
|
|
def __init__(self, entity: Any, created: bool):
|
|
self.entity = entity
|
|
self.created = created # True: INSERT, False: UPDATE
|
|
|
|
|
|
async def upsert_by_unique_key(
|
|
session: AsyncSession,
|
|
model: Type[T],
|
|
unique_columns: List[str],
|
|
values: Dict[str, Any],
|
|
update_columns: Optional[List[str]] = None,
|
|
lock_timeout_sec: int = 5,
|
|
) -> UpsertResult:
|
|
"""
|
|
Unique 키 기반 원자적 Upsert (MySQL ON DUPLICATE KEY UPDATE 사용)
|
|
|
|
Args:
|
|
session: AsyncSession 인스턴스
|
|
model: SQLAlchemy 모델 클래스
|
|
unique_columns: Unique 제약 컬럼 목록
|
|
values: INSERT/UPDATE 값 딕셔너리
|
|
update_columns: UPDATE 시 변경할 컬럼 목록 (None이면 unique 제외 전체)
|
|
lock_timeout_sec: 잠금 타임아웃 (초)
|
|
|
|
Returns:
|
|
UpsertResult: 엔티티와 생성 여부
|
|
|
|
Example:
|
|
result = await upsert_by_unique_key(
|
|
session=session,
|
|
model=User,
|
|
unique_columns=['kakao_id'],
|
|
values={'kakao_id': 12345, 'nickname': '홍길동', 'email': 'test@test.com'},
|
|
update_columns=['nickname', 'email'],
|
|
)
|
|
if result.created:
|
|
print("새 사용자 생성됨")
|
|
else:
|
|
print("기존 사용자 업데이트됨")
|
|
"""
|
|
from sqlalchemy import text
|
|
|
|
# MySQL 잠금 타임아웃 설정
|
|
await session.execute(text(f"SET innodb_lock_wait_timeout = {lock_timeout_sec}"))
|
|
|
|
# UPDATE 컬럼 결정
|
|
if update_columns is None:
|
|
update_columns = [k for k in values.keys() if k not in unique_columns]
|
|
|
|
# MySQL INSERT ... ON DUPLICATE KEY UPDATE
|
|
stmt = mysql_insert(model).values(**values)
|
|
|
|
update_dict = {col: stmt.inserted[col] for col in update_columns}
|
|
if hasattr(model, 'updated_at'):
|
|
update_dict['updated_at'] = func.now()
|
|
|
|
stmt = stmt.on_duplicate_key_update(**update_dict)
|
|
|
|
await session.execute(stmt)
|
|
|
|
# MySQL은 RETURNING 미지원 - 별도 조회 필요
|
|
filter_conditions = {col: values[col] for col in unique_columns}
|
|
result = await session.execute(select(model).filter_by(**filter_conditions))
|
|
entity = result.scalar_one()
|
|
|
|
# created 여부 확인 (created_at == updated_at 비교)
|
|
created = True
|
|
if hasattr(entity, 'updated_at') and hasattr(entity, 'created_at'):
|
|
if entity.created_at and entity.updated_at:
|
|
created = abs((entity.updated_at - entity.created_at).total_seconds()) < 1
|
|
|
|
return UpsertResult(entity=entity, created=created)
|
|
|
|
|
|
async def get_or_create_with_lock(
|
|
session: AsyncSession,
|
|
model: Type[T],
|
|
filter_by: Dict[str, Any],
|
|
defaults: Optional[Dict[str, Any]] = None,
|
|
lock: bool = True,
|
|
nowait: bool = False,
|
|
) -> UpsertResult:
|
|
"""
|
|
SELECT FOR UPDATE를 사용한 안전한 Get or Create
|
|
|
|
동시 요청에서도 안전하게 작동하며,
|
|
행이 존재하면 잠금 후 반환, 없으면 생성합니다.
|
|
|
|
Args:
|
|
session: AsyncSession 인스턴스
|
|
model: SQLAlchemy 모델 클래스
|
|
filter_by: 조회 조건 딕셔너리
|
|
defaults: 생성 시 추가할 기본값
|
|
lock: FOR UPDATE 잠금 사용 여부
|
|
nowait: 잠금 대기 안함 (즉시 예외 발생)
|
|
|
|
Returns:
|
|
UpsertResult: 엔티티와 생성 여부
|
|
|
|
Example:
|
|
result = await get_or_create_with_lock(
|
|
session=session,
|
|
model=User,
|
|
filter_by={'kakao_id': 12345},
|
|
defaults={'nickname': '홍길동'},
|
|
)
|
|
"""
|
|
# 조회 쿼리 구성
|
|
query = select(model).filter_by(**filter_by)
|
|
|
|
if lock:
|
|
query = query.with_for_update(nowait=nowait)
|
|
|
|
result = await session.execute(query)
|
|
entity = result.scalar_one_or_none()
|
|
|
|
if entity is not None:
|
|
# 기존 엔티티 반환
|
|
return UpsertResult(entity=entity, created=False)
|
|
|
|
# 새 엔티티 생성
|
|
create_values = {**filter_by, **(defaults or {})}
|
|
entity = model(**create_values)
|
|
session.add(entity)
|
|
|
|
try:
|
|
await session.flush()
|
|
except IntegrityError:
|
|
# 동시 INSERT로 인한 충돌 - 다시 조회
|
|
await session.rollback()
|
|
result = await session.execute(select(model).filter_by(**filter_by))
|
|
entity = result.scalar_one()
|
|
return UpsertResult(entity=entity, created=False)
|
|
|
|
return UpsertResult(entity=entity, created=True)
|
|
|
|
|
|
async def bulk_upsert(
|
|
session: AsyncSession,
|
|
model: Type[T],
|
|
unique_columns: List[str],
|
|
records: List[Dict[str, Any]],
|
|
update_columns: Optional[List[str]] = None,
|
|
) -> int:
|
|
"""
|
|
대량 Upsert (MySQL ON DUPLICATE KEY UPDATE 사용)
|
|
|
|
여러 레코드를 한 번에 Upsert합니다.
|
|
데드락 방지를 위해 unique 키 기준으로 정렬 후 처리합니다.
|
|
Unique 인덱스가 반드시 존재해야 합니다.
|
|
|
|
Args:
|
|
session: AsyncSession 인스턴스
|
|
model: SQLAlchemy 모델 클래스
|
|
unique_columns: Unique 제약 컬럼 목록
|
|
records: Upsert할 레코드 딕셔너리 목록
|
|
update_columns: UPDATE 시 변경할 컬럼 목록
|
|
|
|
Returns:
|
|
int: 처리된 레코드 수
|
|
|
|
Example:
|
|
count = await bulk_upsert(
|
|
session=session,
|
|
model=SongTimestamp,
|
|
unique_columns=['suno_audio_id', 'order_idx'],
|
|
records=[
|
|
{'suno_audio_id': 'abc', 'order_idx': 0, 'lyric_line': '가사1'},
|
|
{'suno_audio_id': 'abc', 'order_idx': 1, 'lyric_line': '가사2'},
|
|
],
|
|
)
|
|
"""
|
|
if not records:
|
|
return 0
|
|
|
|
# 데드락 방지: unique 키 기준 정렬
|
|
sorted_records = sorted(
|
|
records,
|
|
key=lambda r: tuple(r.get(col, '') for col in unique_columns)
|
|
)
|
|
|
|
# UPDATE 컬럼 결정
|
|
if update_columns is None:
|
|
all_columns = set(sorted_records[0].keys())
|
|
update_columns = list(all_columns - set(unique_columns))
|
|
|
|
# MySQL INSERT ... ON DUPLICATE KEY UPDATE
|
|
stmt = mysql_insert(model).values(sorted_records)
|
|
|
|
update_dict = {col: stmt.inserted[col] for col in update_columns}
|
|
if hasattr(model, 'updated_at'):
|
|
update_dict['updated_at'] = func.now()
|
|
|
|
stmt = stmt.on_duplicate_key_update(**update_dict)
|
|
|
|
await session.execute(stmt)
|
|
return len(sorted_records)
|
|
|
|
|
|
async def execute_with_retry(
|
|
func: Callable,
|
|
max_retries: int = 3,
|
|
base_delay: float = 0.1,
|
|
retry_on: tuple = (OperationalError,),
|
|
) -> Any:
|
|
"""
|
|
지수 백오프를 사용한 재시도 래퍼
|
|
|
|
데드락이나 일시적 오류 발생 시 자동으로 재시도합니다.
|
|
|
|
Args:
|
|
func: 실행할 비동기 함수 (인자 없음)
|
|
max_retries: 최대 재시도 횟수
|
|
base_delay: 기본 대기 시간 (초)
|
|
retry_on: 재시도할 예외 타입들
|
|
|
|
Returns:
|
|
함수 실행 결과
|
|
|
|
Example:
|
|
async def do_work():
|
|
async with AsyncSessionLocal() as session:
|
|
await upsert_by_unique_key(...)
|
|
await session.commit()
|
|
|
|
result = await execute_with_retry(do_work)
|
|
"""
|
|
last_exception = None
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
return await func()
|
|
except retry_on as e:
|
|
last_exception = e
|
|
error_msg = str(e).lower()
|
|
|
|
# 데드락 또는 잠금 타임아웃인 경우만 재시도
|
|
if "deadlock" in error_msg or "lock" in error_msg:
|
|
if attempt < max_retries - 1:
|
|
delay = base_delay * (2 ** attempt)
|
|
logger.warning(
|
|
f"DB 작업 실패 (시도 {attempt + 1}/{max_retries}), "
|
|
f"{delay:.2f}초 후 재시도: {e}"
|
|
)
|
|
await asyncio.sleep(delay)
|
|
continue
|
|
raise
|
|
|
|
raise last_exception
|
|
|
|
|
|
class LockManager:
|
|
"""
|
|
분산 잠금 관리자 (MySQL 전용)
|
|
|
|
여러 리소스에 대한 잠금을 일관된 순서로 획득하여
|
|
데드락을 방지합니다.
|
|
"""
|
|
|
|
def __init__(self, session: AsyncSession, timeout_sec: int = 5):
|
|
self.session = session
|
|
self.timeout_sec = timeout_sec
|
|
self._locked_resources: List[tuple] = []
|
|
|
|
async def __aenter__(self):
|
|
from sqlalchemy import text
|
|
# MySQL 잠금 타임아웃 설정
|
|
await self.session.execute(
|
|
text(f"SET innodb_lock_wait_timeout = {self.timeout_sec}")
|
|
)
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
# 트랜잭션 종료 시 자동으로 잠금 해제됨
|
|
self._locked_resources.clear()
|
|
|
|
async def lock_rows(
|
|
self,
|
|
model: Type[T],
|
|
ids: List[Any],
|
|
id_column: str = "id",
|
|
nowait: bool = False,
|
|
) -> List[T]:
|
|
"""
|
|
여러 행을 ID 순서로 잠금
|
|
|
|
Args:
|
|
model: 모델 클래스
|
|
ids: 잠금할 ID 목록
|
|
id_column: ID 컬럼명
|
|
nowait: 잠금 대기 안함
|
|
|
|
Returns:
|
|
잠긴 엔티티 목록
|
|
"""
|
|
if not ids:
|
|
return []
|
|
|
|
# 데드락 방지: ID 정렬
|
|
sorted_ids = sorted(ids)
|
|
|
|
column = getattr(model, id_column)
|
|
query = (
|
|
select(model)
|
|
.where(column.in_(sorted_ids))
|
|
.order_by(column) # 정렬 순서 유지
|
|
.with_for_update(nowait=nowait)
|
|
)
|
|
|
|
result = await self.session.execute(query)
|
|
entities = result.scalars().all()
|
|
|
|
self._locked_resources.append((model.__tablename__, sorted_ids))
|
|
return list(entities)
|
|
|
|
async def lock_row(
|
|
self,
|
|
model: Type[T],
|
|
id_value: Any,
|
|
id_column: str = "id",
|
|
nowait: bool = False,
|
|
) -> Optional[T]:
|
|
"""
|
|
단일 행 잠금
|
|
|
|
Args:
|
|
model: 모델 클래스
|
|
id_value: 잠금할 ID
|
|
id_column: ID 컬럼명
|
|
nowait: 잠금 대기 안함
|
|
|
|
Returns:
|
|
잠긴 엔티티 또는 None
|
|
"""
|
|
entities = await self.lock_rows(model, [id_value], id_column, nowait)
|
|
return entities[0] if entities else None
|
|
```
|
|
|
|
### 5.2 모델에 Unique Constraint 추가 (권장)
|
|
|
|
테이블별 Unique 제약 추가가 필요한 경우:
|
|
|
|
```python
|
|
# app/home/models.py - Image 테이블
|
|
class Image(Base):
|
|
__tablename__ = "image"
|
|
__table_args__ = (
|
|
# task_id + img_order 조합 유니크
|
|
Index("idx_image_task_order", "task_id", "img_order", unique=True),
|
|
...
|
|
)
|
|
|
|
# app/song/models.py - SongTimestamp 테이블
|
|
class SongTimestamp(Base):
|
|
__tablename__ = "song_timestamp"
|
|
__table_args__ = (
|
|
# suno_audio_id + order_idx 조합 유니크
|
|
Index("idx_song_ts_audio_order", "suno_audio_id", "order_idx", unique=True),
|
|
...
|
|
)
|
|
```
|
|
|
|
---
|
|
|
|
## 6. 사용 예시
|
|
|
|
### 6.1 SongTimestamp Bulk Upsert (우선순위 1 - 실제 문제 발생)
|
|
|
|
**가장 먼저 적용해야 하는 케이스입니다.** 클라이언트의 상태 폴링으로 인해 동일 데이터가 중복 삽입될 수 있습니다.
|
|
|
|
```python
|
|
# app/song/api/routers/v1/song.py
|
|
|
|
from app.utils.db_utils import bulk_upsert
|
|
|
|
# 기존 코드 (문제 있음 - 폴링 시 중복 삽입)
|
|
# for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
|
|
# song_timestamp = SongTimestamp(...)
|
|
# session.add(song_timestamp)
|
|
|
|
# 개선된 코드 (Upsert로 중복 방지)
|
|
records = [
|
|
{
|
|
'suno_audio_id': suno_audio_id,
|
|
'order_idx': idx,
|
|
'lyric_line': ts['text'],
|
|
'start_time': ts['start_sec'],
|
|
'end_time': ts['end_sec'],
|
|
}
|
|
for idx, ts in enumerate(timestamped_lyrics)
|
|
]
|
|
|
|
await bulk_upsert(
|
|
session=session,
|
|
model=SongTimestamp,
|
|
unique_columns=['suno_audio_id', 'order_idx'],
|
|
records=records,
|
|
update_columns=['lyric_line', 'start_time', 'end_time'],
|
|
)
|
|
```
|
|
|
|
### 6.2 Image 중복 방지 Upsert (우선순위 2)
|
|
|
|
```python
|
|
# app/home/api/routers/v1/home.py
|
|
|
|
from app.utils.db_utils import get_or_create_with_lock
|
|
|
|
async def save_image(session: AsyncSession, task_id: str, img_url: str, img_order: int):
|
|
"""이미지 저장 (중복 방지)"""
|
|
result = await get_or_create_with_lock(
|
|
session=session,
|
|
model=Image,
|
|
filter_by={'task_id': task_id, 'img_order': img_order},
|
|
defaults={
|
|
'img_name': f"image_{img_order}",
|
|
'img_url': img_url,
|
|
},
|
|
)
|
|
|
|
if not result.created:
|
|
# 기존 이미지 URL 업데이트
|
|
result.entity.img_url = img_url
|
|
|
|
return result.entity
|
|
```
|
|
|
|
### 6.3 User IntegrityError 처리 (✅ 적용 완료)
|
|
|
|
> **참고**: User 테이블은 OAuth 인가 코드의 일회성 특성상 동시 요청 가능성이 **매우 낮습니다**.
|
|
> `kakao_id` UNIQUE 제약이 있어 중복 시 IntegrityError가 발생하므로,
|
|
> IntegrityError 처리를 추가하여 500 에러 대신 기존 사용자를 재조회합니다.
|
|
|
|
```python
|
|
# app/user/services/auth.py - 현재 적용된 코드
|
|
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
# 신규 사용자 생성 부분
|
|
session.add(new_user)
|
|
|
|
try:
|
|
await session.flush()
|
|
await session.refresh(new_user)
|
|
return new_user, True
|
|
except IntegrityError:
|
|
# 동시 요청으로 인한 중복 삽입 시도 - 기존 사용자 조회
|
|
logger.warning(
|
|
f"[AUTH] IntegrityError 발생 (동시 요청 추정) - kakao_id: {kakao_id}"
|
|
)
|
|
await session.rollback()
|
|
result = await session.execute(
|
|
select(User).where(User.kakao_id == kakao_id)
|
|
)
|
|
existing_user = result.scalar_one_or_none()
|
|
|
|
if existing_user is not None:
|
|
# 프로필 정보 업데이트
|
|
if profile:
|
|
existing_user.nickname = profile.nickname
|
|
existing_user.profile_image_url = profile.profile_image_url
|
|
existing_user.thumbnail_image_url = profile.thumbnail_image_url
|
|
if kakao_account and kakao_account.email:
|
|
existing_user.email = kakao_account.email
|
|
await session.flush()
|
|
return existing_user, False
|
|
|
|
# 재조회에도 실패한 경우 (매우 드문 경우)
|
|
raise
|
|
```
|
|
|
|
**이점:**
|
|
- Upsert 패턴 없이도 안전하게 처리
|
|
- 기존 코드 구조 유지
|
|
- 드문 경우의 500 에러 방지
|
|
|
|
### 6.4 재시도 로직 사용
|
|
|
|
```python
|
|
from app.utils.db_utils import execute_with_retry, bulk_upsert
|
|
|
|
async def safe_timestamp_upsert(suno_audio_id: str, timestamps: list):
|
|
"""안전한 타임스탬프 Upsert (데드락 시 재시도)"""
|
|
|
|
async def do_upsert():
|
|
async with AsyncSessionLocal() as session:
|
|
records = [
|
|
{
|
|
'suno_audio_id': suno_audio_id,
|
|
'order_idx': idx,
|
|
'lyric_line': ts['text'],
|
|
'start_time': ts['start_sec'],
|
|
'end_time': ts['end_sec'],
|
|
}
|
|
for idx, ts in enumerate(timestamps)
|
|
]
|
|
count = await bulk_upsert(
|
|
session=session,
|
|
model=SongTimestamp,
|
|
unique_columns=['suno_audio_id', 'order_idx'],
|
|
records=records,
|
|
)
|
|
await session.commit()
|
|
return count
|
|
|
|
return await execute_with_retry(do_upsert, max_retries=3)
|
|
```
|
|
|
|
### 6.5 LockManager 사용
|
|
|
|
```python
|
|
from app.utils.db_utils import LockManager
|
|
|
|
async def update_multiple_resources(session: AsyncSession, project_ids: list[int]):
|
|
"""여러 프로젝트를 안전하게 업데이트"""
|
|
|
|
async with LockManager(session, timeout_sec=10) as lock:
|
|
# 프로젝트들을 ID 순서로 잠금 (데드락 방지)
|
|
projects = await lock.lock_rows(Project, project_ids)
|
|
|
|
for project in projects:
|
|
project.status = "updated"
|
|
|
|
await session.commit()
|
|
```
|
|
|
|
---
|
|
|
|
## 7. 마이그레이션 가이드
|
|
|
|
### 7.1 단계별 적용 순서 (우선순위 기반)
|
|
|
|
동시 요청 가능성이 높은 테이블부터 적용합니다:
|
|
|
|
1. **1단계**: `app/utils/db_utils.py` 파일 생성 ✅ (이미 완료)
|
|
2. **2단계**: **SongTimestamp** - Unique 인덱스 추가 + `bulk_upsert` 적용 (우선순위 1)
|
|
- 동시 요청 가능성 **높음** (폴링으로 인한 중복 삽입)
|
|
3. **3단계**: **Image** - Unique 인덱스 추가 + `get_or_create_with_lock` 적용 (우선순위 2)
|
|
- 동시 요청 가능성 **중간** (업로드 재시도)
|
|
4. **4단계**: **Song/Video** - 필요시 `get_or_create_with_lock` 적용 (우선순위 3-4)
|
|
- 동시 요청 가능성 중간 (백그라운드 태스크)
|
|
5. **5단계**: **User** - ✅ IntegrityError 처리 추가 완료
|
|
- 동시 요청 가능성 **낮음** (OAuth 인가 코드 일회성)
|
|
- `kakao_id` UNIQUE 제약 + IntegrityError 발생 시 기존 사용자 재조회 처리
|
|
|
|
> **권장**: 1~3단계까지 우선 적용하고, 나머지는 필요에 따라 적용
|
|
|
|
### 7.2 Alembic 마이그레이션 예시
|
|
|
|
```python
|
|
"""Add unique constraints for upsert support
|
|
|
|
Revision ID: xxxx
|
|
"""
|
|
from alembic import op
|
|
import sqlalchemy as sa
|
|
|
|
def upgrade():
|
|
# SongTimestamp: suno_audio_id + order_idx unique
|
|
op.create_index(
|
|
'idx_song_ts_audio_order',
|
|
'song_timestamp',
|
|
['suno_audio_id', 'order_idx'],
|
|
unique=True
|
|
)
|
|
|
|
# Image: task_id + img_order unique (선택적)
|
|
op.create_index(
|
|
'idx_image_task_order',
|
|
'image',
|
|
['task_id', 'img_order'],
|
|
unique=True
|
|
)
|
|
|
|
def downgrade():
|
|
op.drop_index('idx_song_ts_audio_order', table_name='song_timestamp')
|
|
op.drop_index('idx_image_task_order', table_name='image')
|
|
```
|
|
|
|
---
|
|
|
|
## 8. 모니터링 및 디버깅
|
|
|
|
### 8.1 잠금 모니터링 쿼리 (MySQL)
|
|
|
|
```sql
|
|
-- 현재 잠금 상태 확인 (InnoDB)
|
|
SELECT
|
|
r.trx_id AS waiting_trx_id,
|
|
r.trx_mysql_thread_id AS waiting_thread,
|
|
r.trx_query AS waiting_query,
|
|
b.trx_id AS blocking_trx_id,
|
|
b.trx_mysql_thread_id AS blocking_thread,
|
|
b.trx_query AS blocking_query
|
|
FROM information_schema.innodb_lock_waits w
|
|
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
|
|
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;
|
|
|
|
-- 현재 실행 중인 트랜잭션 확인
|
|
SELECT * FROM information_schema.innodb_trx;
|
|
|
|
-- 데드락 로그 확인
|
|
SHOW ENGINE INNODB STATUS;
|
|
|
|
-- 잠금 대기 중인 쿼리 확인
|
|
SELECT
|
|
pl.id,
|
|
pl.user,
|
|
pl.state,
|
|
pl.info AS query
|
|
FROM information_schema.processlist pl
|
|
WHERE pl.state LIKE '%lock%';
|
|
```
|
|
|
|
### 8.2 로깅 설정
|
|
|
|
```python
|
|
# config.py 또는 logging 설정
|
|
import logging
|
|
|
|
# SQLAlchemy 엔진 로깅 (디버그 시)
|
|
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
|
|
|
|
# Upsert 유틸리티 로깅
|
|
logging.getLogger('app.utils.db_utils').setLevel(logging.DEBUG)
|
|
```
|
|
|
|
---
|
|
|
|
## 9. 요약
|
|
|
|
| 상황 | 권장 패턴 | 함수 |
|
|
|------|-----------|------|
|
|
| 단일 레코드 Upsert (Unique 키 존재) | ON CONFLICT | `upsert_by_unique_key()` |
|
|
| 단일 레코드 Get or Create | SELECT FOR UPDATE | `get_or_create_with_lock()` |
|
|
| 대량 레코드 Upsert | Bulk ON CONFLICT | `bulk_upsert()` |
|
|
| 데드락 방지 재시도 | 지수 백오프 | `execute_with_retry()` |
|
|
| 다중 행 잠금 | 정렬된 순서 잠금 | `LockManager` |
|
|
|
|
---
|
|
|
|
**작성일**: 2026-01-26
|
|
**작성자**: Claude Code (AI Assistant)
|