o2o-castad-backend/docs/plan/db_lock.md

27 KiB

DB Lock 및 Upsert 패턴 가이드 (MySQL 전용)

목차

  1. 현재 Insert 사용 현황
  2. Upsert 패턴 설계
  3. DB Lock 전략
  4. 데드락 방지 전략
  5. 제안 코드
  6. 사용 예시

1. 현재 Insert 사용 현황

1.1 테이블별 Insert 분석 및 우선순위

테이블 파일 위치 Unique 제약 동시 요청 가능성 Upsert 우선순위 키 조합
SongTimestamp song.py:477 - 높음 1순위 suno_audio_id + order_idx
Image home.py:503,552,799,821 - 중간 2순위 task_id + img_order
Song song.py:188 - 중간 3순위 task_id
Video video.py:252 - 중간 4순위 task_id
User auth.py:278 kakao_id 낮음 완료 kakao_id
Project lyric.py:297 - 낮음 선택 task_id
Lyric lyric.py:317 - 낮음 선택 task_id
RefreshToken auth.py:315 token_hash - 불필요 - (항상 새로 생성)

동시 요청 가능성 분석

가능성 테이블 발생 시나리오
높음 SongTimestamp 클라이언트가 상태 조회 API를 여러 번 호출 (폴링) → 동일 데이터 중복 삽입
중간 Image 네트워크 오류로 업로드 재시도, 클라이언트 중복 클릭
중간 Song/Video 백그라운드 태스크 재실행, 상태 확인 중복 호출
낮음 User OAuth 인가 코드 일회성으로 동시 요청 거의 불가능

참고: User 테이블의 경우 카카오 OAuth 인가 코드(authorization code)가 일회성이므로, 동일한 코드로 동시 요청은 불가능합니다. 다만 여러 탭에서 각각 로그인을 시작하는 극히 드문 경우에만 발생 가능합니다.

1.2 현재 코드 패턴의 문제점

실제 문제가 발생하는 케이스: SongTimestamp

# song.py:467-479 - 현재 패턴
# 클라이언트가 /song/status/{song_id}를 여러 번 호출하면 중복 삽입 발생!

for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
    song_timestamp = SongTimestamp(
        suno_audio_id=suno_audio_id,
        order_idx=order_idx,
        lyric_line=timestamped_lyric["text"],
        start_time=timestamped_lyric["start_sec"],
        end_time=timestamped_lyric["end_sec"],
    )
    session.add(song_timestamp)  # 동일 suno_audio_id로 중복 삽입!

await session.commit()

문제 시나리오:

  1. 클라이언트가 노래 생성 상태 확인을 위해 폴링
  2. SUCCESS 응답을 받은 후 타임스탬프 저장 로직 실행
  3. 네트워크 지연으로 클라이언트가 재요청
  4. 동일한 데이터가 중복 삽입됨

2. Upsert 패턴 설계 (MySQL 전용)

2.1 MySQL ON DUPLICATE KEY UPDATE (권장)

MySQL의 INSERT ... ON DUPLICATE KEY UPDATE 절을 사용한 원자적 Upsert:

from sqlalchemy.dialects.mysql import insert as mysql_insert

stmt = mysql_insert(User).values(
    kakao_id=kakao_id,
    nickname=nickname,
    email=email,
)
stmt = stmt.on_duplicate_key_update(
    nickname=stmt.inserted.nickname,  # MySQL은 stmt.inserted 사용
    email=stmt.inserted.email,
    updated_at=func.now(),
)
await session.execute(stmt)

# MySQL은 RETURNING 미지원 - 별도 조회 필요
result = await session.execute(select(User).where(User.kakao_id == kakao_id))
user = result.scalar_one()

장점:

  • 원자적 연산 (단일 쿼리)
  • 데드락 위험 최소화
  • 동시 요청에서도 안전

전제 조건:

  • Unique 인덱스 필수 (없으면 항상 INSERT만 됨)

2.3 비관적 잠금 (Pessimistic Locking)

SELECT ... FOR UPDATE를 사용한 행 수준 잠금:

result = await session.execute(
    select(User)
    .where(User.kakao_id == kakao_id)
    .with_for_update()  # FOR UPDATE 잠금
)
user = result.scalar_one_or_none()

3. DB Lock 전략

3.1 잠금 유형

잠금 유형 사용 시점 SQLAlchemy 구현
공유 잠금 (Shared) 읽기 작업 .with_for_update(read=True)
배타 잠금 (Exclusive) 쓰기 작업 .with_for_update()
NOWAIT 즉시 실패 .with_for_update(nowait=True)
SKIP LOCKED 잠긴 행 건너뛰기 .with_for_update(skip_locked=True)

3.2 잠금 범위 선택

# 1. 행 수준 잠금 (Row-level) - 권장
select(User).where(User.id == user_id).with_for_update()

# 2. 키 범위 잠금 (Key-range) - 범위 조회 시
select(Image).where(Image.task_id == task_id).with_for_update()

# 3. 테이블 잠금 - 피해야 함 (성능 저하)

3.3 트랜잭션 격리 수준

from sqlalchemy import text

# 세션별 격리 수준 설정
await session.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))

4. 데드락 방지 전략

4.1 핵심 원칙

  1. 일관된 잠금 순서: 항상 같은 순서로 리소스 접근
  2. 짧은 트랜잭션: 잠금 유지 시간 최소화
  3. 타임아웃 설정: 무한 대기 방지
  4. 재시도 로직: 데드락 발생 시 자동 재시도

4.2 잠금 순서 규칙

# 올바른 순서: 항상 PK 또는 정렬된 순서로 잠금
async def lock_resources_safely(session, resource_ids: list[int]):
    """리소스를 ID 순서로 정렬하여 잠금"""
    sorted_ids = sorted(resource_ids)  # 정렬!

    for resource_id in sorted_ids:
        await session.execute(
            select(Resource)
            .where(Resource.id == resource_id)
            .with_for_update()
        )

4.3 타임아웃 설정 (MySQL)

# MySQL 잠금 타임아웃 설정
await session.execute(text("SET innodb_lock_wait_timeout = 5"))

4.4 재시도 로직

import asyncio
from sqlalchemy.exc import OperationalError

async def execute_with_retry(
    session,
    operation,
    max_retries: int = 3,
    base_delay: float = 0.1,
):
    """지수 백오프를 사용한 재시도 로직"""
    for attempt in range(max_retries):
        try:
            return await operation(session)
        except OperationalError as e:
            if "deadlock" in str(e).lower() and attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                await asyncio.sleep(delay)
                await session.rollback()
                continue
            raise

5. 제안 코드 (MySQL 전용)

5.1 공통 Upsert 유틸리티

app/utils/db_utils.py 파일 ( 이미 생성됨):

"""
DB 유틸리티 - Upsert 및 Lock 관리 (MySQL 전용)

MySQL의 INSERT ... ON DUPLICATE KEY UPDATE를 사용한 안전한 Upsert 패턴과
데드락 방지를 위한 잠금 관리 유틸리티를 제공합니다.
"""

import asyncio
import logging
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar

from sqlalchemy import func, select
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase

logger = logging.getLogger(__name__)

T = TypeVar("T", bound=DeclarativeBase)


class UpsertResult:
    """Upsert 결과를 담는 클래스"""

    def __init__(self, entity: Any, created: bool):
        self.entity = entity
        self.created = created  # True: INSERT, False: UPDATE


async def upsert_by_unique_key(
    session: AsyncSession,
    model: Type[T],
    unique_columns: List[str],
    values: Dict[str, Any],
    update_columns: Optional[List[str]] = None,
    lock_timeout_sec: int = 5,
) -> UpsertResult:
    """
    Unique 키 기반 원자적 Upsert (MySQL ON DUPLICATE KEY UPDATE 사용)

    Args:
        session: AsyncSession 인스턴스
        model: SQLAlchemy 모델 클래스
        unique_columns: Unique 제약 컬럼 목록
        values: INSERT/UPDATE 값 딕셔너리
        update_columns: UPDATE 시 변경할 컬럼 목록 (None이면 unique 제외 전체)
        lock_timeout_sec: 잠금 타임아웃 (초)

    Returns:
        UpsertResult: 엔티티와 생성 여부

    Example:
        result = await upsert_by_unique_key(
            session=session,
            model=User,
            unique_columns=['kakao_id'],
            values={'kakao_id': 12345, 'nickname': '홍길동', 'email': 'test@test.com'},
            update_columns=['nickname', 'email'],
        )
        if result.created:
            print("새 사용자 생성됨")
        else:
            print("기존 사용자 업데이트됨")
    """
    from sqlalchemy import text

    # MySQL 잠금 타임아웃 설정
    await session.execute(text(f"SET innodb_lock_wait_timeout = {lock_timeout_sec}"))

    # UPDATE 컬럼 결정
    if update_columns is None:
        update_columns = [k for k in values.keys() if k not in unique_columns]

    # MySQL INSERT ... ON DUPLICATE KEY UPDATE
    stmt = mysql_insert(model).values(**values)

    update_dict = {col: stmt.inserted[col] for col in update_columns}
    if hasattr(model, 'updated_at'):
        update_dict['updated_at'] = func.now()

    stmt = stmt.on_duplicate_key_update(**update_dict)

    await session.execute(stmt)

    # MySQL은 RETURNING 미지원 - 별도 조회 필요
    filter_conditions = {col: values[col] for col in unique_columns}
    result = await session.execute(select(model).filter_by(**filter_conditions))
    entity = result.scalar_one()

    # created 여부 확인 (created_at == updated_at 비교)
    created = True
    if hasattr(entity, 'updated_at') and hasattr(entity, 'created_at'):
        if entity.created_at and entity.updated_at:
            created = abs((entity.updated_at - entity.created_at).total_seconds()) < 1

    return UpsertResult(entity=entity, created=created)


async def get_or_create_with_lock(
    session: AsyncSession,
    model: Type[T],
    filter_by: Dict[str, Any],
    defaults: Optional[Dict[str, Any]] = None,
    lock: bool = True,
    nowait: bool = False,
) -> UpsertResult:
    """
    SELECT FOR UPDATE를 사용한 안전한 Get or Create

    동시 요청에서도 안전하게 작동하며,
    행이 존재하면 잠금 후 반환, 없으면 생성합니다.

    Args:
        session: AsyncSession 인스턴스
        model: SQLAlchemy 모델 클래스
        filter_by: 조회 조건 딕셔너리
        defaults: 생성 시 추가할 기본값
        lock: FOR UPDATE 잠금 사용 여부
        nowait: 잠금 대기 안함 (즉시 예외 발생)

    Returns:
        UpsertResult: 엔티티와 생성 여부

    Example:
        result = await get_or_create_with_lock(
            session=session,
            model=User,
            filter_by={'kakao_id': 12345},
            defaults={'nickname': '홍길동'},
        )
    """
    # 조회 쿼리 구성
    query = select(model).filter_by(**filter_by)

    if lock:
        query = query.with_for_update(nowait=nowait)

    result = await session.execute(query)
    entity = result.scalar_one_or_none()

    if entity is not None:
        # 기존 엔티티 반환
        return UpsertResult(entity=entity, created=False)

    # 새 엔티티 생성
    create_values = {**filter_by, **(defaults or {})}
    entity = model(**create_values)
    session.add(entity)

    try:
        await session.flush()
    except IntegrityError:
        # 동시 INSERT로 인한 충돌 - 다시 조회
        await session.rollback()
        result = await session.execute(select(model).filter_by(**filter_by))
        entity = result.scalar_one()
        return UpsertResult(entity=entity, created=False)

    return UpsertResult(entity=entity, created=True)


async def bulk_upsert(
    session: AsyncSession,
    model: Type[T],
    unique_columns: List[str],
    records: List[Dict[str, Any]],
    update_columns: Optional[List[str]] = None,
) -> int:
    """
    대량 Upsert (MySQL ON DUPLICATE KEY UPDATE 사용)

    여러 레코드를 한 번에 Upsert합니다.
    데드락 방지를 위해 unique 키 기준으로 정렬 후 처리합니다.
    Unique 인덱스가 반드시 존재해야 합니다.

    Args:
        session: AsyncSession 인스턴스
        model: SQLAlchemy 모델 클래스
        unique_columns: Unique 제약 컬럼 목록
        records: Upsert할 레코드 딕셔너리 목록
        update_columns: UPDATE 시 변경할 컬럼 목록

    Returns:
        int: 처리된 레코드 수

    Example:
        count = await bulk_upsert(
            session=session,
            model=SongTimestamp,
            unique_columns=['suno_audio_id', 'order_idx'],
            records=[
                {'suno_audio_id': 'abc', 'order_idx': 0, 'lyric_line': '가사1'},
                {'suno_audio_id': 'abc', 'order_idx': 1, 'lyric_line': '가사2'},
            ],
        )
    """
    if not records:
        return 0

    # 데드락 방지: unique 키 기준 정렬
    sorted_records = sorted(
        records,
        key=lambda r: tuple(r.get(col, '') for col in unique_columns)
    )

    # UPDATE 컬럼 결정
    if update_columns is None:
        all_columns = set(sorted_records[0].keys())
        update_columns = list(all_columns - set(unique_columns))

    # MySQL INSERT ... ON DUPLICATE KEY UPDATE
    stmt = mysql_insert(model).values(sorted_records)

    update_dict = {col: stmt.inserted[col] for col in update_columns}
    if hasattr(model, 'updated_at'):
        update_dict['updated_at'] = func.now()

    stmt = stmt.on_duplicate_key_update(**update_dict)

    await session.execute(stmt)
    return len(sorted_records)


async def execute_with_retry(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 0.1,
    retry_on: tuple = (OperationalError,),
) -> Any:
    """
    지수 백오프를 사용한 재시도 래퍼

    데드락이나 일시적 오류 발생 시 자동으로 재시도합니다.

    Args:
        func: 실행할 비동기 함수 (인자 없음)
        max_retries: 최대 재시도 횟수
        base_delay: 기본 대기 시간 (초)
        retry_on: 재시도할 예외 타입들

    Returns:
        함수 실행 결과

    Example:
        async def do_work():
            async with AsyncSessionLocal() as session:
                await upsert_by_unique_key(...)
                await session.commit()

        result = await execute_with_retry(do_work)
    """
    last_exception = None

    for attempt in range(max_retries):
        try:
            return await func()
        except retry_on as e:
            last_exception = e
            error_msg = str(e).lower()

            # 데드락 또는 잠금 타임아웃인 경우만 재시도
            if "deadlock" in error_msg or "lock" in error_msg:
                if attempt < max_retries - 1:
                    delay = base_delay * (2 ** attempt)
                    logger.warning(
                        f"DB 작업 실패 (시도 {attempt + 1}/{max_retries}), "
                        f"{delay:.2f}초 후 재시도: {e}"
                    )
                    await asyncio.sleep(delay)
                    continue
            raise

    raise last_exception


class LockManager:
    """
    분산 잠금 관리자 (MySQL 전용)

    여러 리소스에 대한 잠금을 일관된 순서로 획득하여
    데드락을 방지합니다.
    """

    def __init__(self, session: AsyncSession, timeout_sec: int = 5):
        self.session = session
        self.timeout_sec = timeout_sec
        self._locked_resources: List[tuple] = []

    async def __aenter__(self):
        from sqlalchemy import text
        # MySQL 잠금 타임아웃 설정
        await self.session.execute(
            text(f"SET innodb_lock_wait_timeout = {self.timeout_sec}")
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 트랜잭션 종료 시 자동으로 잠금 해제됨
        self._locked_resources.clear()

    async def lock_rows(
        self,
        model: Type[T],
        ids: List[Any],
        id_column: str = "id",
        nowait: bool = False,
    ) -> List[T]:
        """
        여러 행을 ID 순서로 잠금

        Args:
            model: 모델 클래스
            ids: 잠금할 ID 목록
            id_column: ID 컬럼명
            nowait: 잠금 대기 안함

        Returns:
            잠긴 엔티티 목록
        """
        if not ids:
            return []

        # 데드락 방지: ID 정렬
        sorted_ids = sorted(ids)

        column = getattr(model, id_column)
        query = (
            select(model)
            .where(column.in_(sorted_ids))
            .order_by(column)  # 정렬 순서 유지
            .with_for_update(nowait=nowait)
        )

        result = await self.session.execute(query)
        entities = result.scalars().all()

        self._locked_resources.append((model.__tablename__, sorted_ids))
        return list(entities)

    async def lock_row(
        self,
        model: Type[T],
        id_value: Any,
        id_column: str = "id",
        nowait: bool = False,
    ) -> Optional[T]:
        """
        단일 행 잠금

        Args:
            model: 모델 클래스
            id_value: 잠금할 ID
            id_column: ID 컬럼명
            nowait: 잠금 대기 안함

        Returns:
            잠긴 엔티티 또는 None
        """
        entities = await self.lock_rows(model, [id_value], id_column, nowait)
        return entities[0] if entities else None

5.2 모델에 Unique Constraint 추가 (권장)

테이블별 Unique 제약 추가가 필요한 경우:

# app/home/models.py - Image 테이블
class Image(Base):
    __tablename__ = "image"
    __table_args__ = (
        # task_id + img_order 조합 유니크
        Index("idx_image_task_order", "task_id", "img_order", unique=True),
        ...
    )

# app/song/models.py - SongTimestamp 테이블
class SongTimestamp(Base):
    __tablename__ = "song_timestamp"
    __table_args__ = (
        # suno_audio_id + order_idx 조합 유니크
        Index("idx_song_ts_audio_order", "suno_audio_id", "order_idx", unique=True),
        ...
    )

6. 사용 예시

6.1 SongTimestamp Bulk Upsert (우선순위 1 - 실제 문제 발생)

가장 먼저 적용해야 하는 케이스입니다. 클라이언트의 상태 폴링으로 인해 동일 데이터가 중복 삽입될 수 있습니다.

# app/song/api/routers/v1/song.py

from app.utils.db_utils import bulk_upsert

# 기존 코드 (문제 있음 - 폴링 시 중복 삽입)
# for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
#     song_timestamp = SongTimestamp(...)
#     session.add(song_timestamp)

# 개선된 코드 (Upsert로 중복 방지)
records = [
    {
        'suno_audio_id': suno_audio_id,
        'order_idx': idx,
        'lyric_line': ts['text'],
        'start_time': ts['start_sec'],
        'end_time': ts['end_sec'],
    }
    for idx, ts in enumerate(timestamped_lyrics)
]

await bulk_upsert(
    session=session,
    model=SongTimestamp,
    unique_columns=['suno_audio_id', 'order_idx'],
    records=records,
    update_columns=['lyric_line', 'start_time', 'end_time'],
)

6.2 Image 중복 방지 Upsert (우선순위 2)

# app/home/api/routers/v1/home.py

from app.utils.db_utils import get_or_create_with_lock

async def save_image(session: AsyncSession, task_id: str, img_url: str, img_order: int):
    """이미지 저장 (중복 방지)"""
    result = await get_or_create_with_lock(
        session=session,
        model=Image,
        filter_by={'task_id': task_id, 'img_order': img_order},
        defaults={
            'img_name': f"image_{img_order}",
            'img_url': img_url,
        },
    )

    if not result.created:
        # 기존 이미지 URL 업데이트
        result.entity.img_url = img_url

    return result.entity

6.3 User IntegrityError 처리 ( 적용 완료)

참고: User 테이블은 OAuth 인가 코드의 일회성 특성상 동시 요청 가능성이 매우 낮습니다. kakao_id UNIQUE 제약이 있어 중복 시 IntegrityError가 발생하므로, IntegrityError 처리를 추가하여 500 에러 대신 기존 사용자를 재조회합니다.

# app/user/services/auth.py - 현재 적용된 코드

from sqlalchemy.exc import IntegrityError

# 신규 사용자 생성 부분
session.add(new_user)

try:
    await session.flush()
    await session.refresh(new_user)
    return new_user, True
except IntegrityError:
    # 동시 요청으로 인한 중복 삽입 시도 - 기존 사용자 조회
    logger.warning(
        f"[AUTH] IntegrityError 발생 (동시 요청 추정) - kakao_id: {kakao_id}"
    )
    await session.rollback()
    result = await session.execute(
        select(User).where(User.kakao_id == kakao_id)
    )
    existing_user = result.scalar_one_or_none()

    if existing_user is not None:
        # 프로필 정보 업데이트
        if profile:
            existing_user.nickname = profile.nickname
            existing_user.profile_image_url = profile.profile_image_url
            existing_user.thumbnail_image_url = profile.thumbnail_image_url
        if kakao_account and kakao_account.email:
            existing_user.email = kakao_account.email
        await session.flush()
        return existing_user, False

    # 재조회에도 실패한 경우 (매우 드문 경우)
    raise

이점:

  • Upsert 패턴 없이도 안전하게 처리
  • 기존 코드 구조 유지
  • 드문 경우의 500 에러 방지

6.4 재시도 로직 사용

from app.utils.db_utils import execute_with_retry, bulk_upsert

async def safe_timestamp_upsert(suno_audio_id: str, timestamps: list):
    """안전한 타임스탬프 Upsert (데드락 시 재시도)"""

    async def do_upsert():
        async with AsyncSessionLocal() as session:
            records = [
                {
                    'suno_audio_id': suno_audio_id,
                    'order_idx': idx,
                    'lyric_line': ts['text'],
                    'start_time': ts['start_sec'],
                    'end_time': ts['end_sec'],
                }
                for idx, ts in enumerate(timestamps)
            ]
            count = await bulk_upsert(
                session=session,
                model=SongTimestamp,
                unique_columns=['suno_audio_id', 'order_idx'],
                records=records,
            )
            await session.commit()
            return count

    return await execute_with_retry(do_upsert, max_retries=3)

6.5 LockManager 사용

from app.utils.db_utils import LockManager

async def update_multiple_resources(session: AsyncSession, project_ids: list[int]):
    """여러 프로젝트를 안전하게 업데이트"""

    async with LockManager(session, timeout_sec=10) as lock:
        # 프로젝트들을 ID 순서로 잠금 (데드락 방지)
        projects = await lock.lock_rows(Project, project_ids)

        for project in projects:
            project.status = "updated"

        await session.commit()

7. 마이그레이션 가이드

7.1 단계별 적용 순서 (우선순위 기반)

동시 요청 가능성이 높은 테이블부터 적용합니다:

  1. 1단계: app/utils/db_utils.py 파일 생성 (이미 완료)
  2. 2단계: SongTimestamp - Unique 인덱스 추가 + bulk_upsert 적용 (우선순위 1)
    • 동시 요청 가능성 높음 (폴링으로 인한 중복 삽입)
  3. 3단계: Image - Unique 인덱스 추가 + get_or_create_with_lock 적용 (우선순위 2)
    • 동시 요청 가능성 중간 (업로드 재시도)
  4. 4단계: Song/Video - 필요시 get_or_create_with_lock 적용 (우선순위 3-4)
    • 동시 요청 가능성 중간 (백그라운드 태스크)
  5. 5단계: User - IntegrityError 처리 추가 완료
    • 동시 요청 가능성 낮음 (OAuth 인가 코드 일회성)
    • kakao_id UNIQUE 제약 + IntegrityError 발생 시 기존 사용자 재조회 처리

권장: 1~3단계까지 우선 적용하고, 나머지는 필요에 따라 적용

7.2 Alembic 마이그레이션 예시

"""Add unique constraints for upsert support

Revision ID: xxxx
"""
from alembic import op
import sqlalchemy as sa

def upgrade():
    # SongTimestamp: suno_audio_id + order_idx unique
    op.create_index(
        'idx_song_ts_audio_order',
        'song_timestamp',
        ['suno_audio_id', 'order_idx'],
        unique=True
    )

    # Image: task_id + img_order unique (선택적)
    op.create_index(
        'idx_image_task_order',
        'image',
        ['task_id', 'img_order'],
        unique=True
    )

def downgrade():
    op.drop_index('idx_song_ts_audio_order', table_name='song_timestamp')
    op.drop_index('idx_image_task_order', table_name='image')

8. 모니터링 및 디버깅

8.1 잠금 모니터링 쿼리 (MySQL)

-- 현재 잠금 상태 확인 (InnoDB)
SELECT
    r.trx_id AS waiting_trx_id,
    r.trx_mysql_thread_id AS waiting_thread,
    r.trx_query AS waiting_query,
    b.trx_id AS blocking_trx_id,
    b.trx_mysql_thread_id AS blocking_thread,
    b.trx_query AS blocking_query
FROM information_schema.innodb_lock_waits w
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;

-- 현재 실행 중인 트랜잭션 확인
SELECT * FROM information_schema.innodb_trx;

-- 데드락 로그 확인
SHOW ENGINE INNODB STATUS;

-- 잠금 대기 중인 쿼리 확인
SELECT
    pl.id,
    pl.user,
    pl.state,
    pl.info AS query
FROM information_schema.processlist pl
WHERE pl.state LIKE '%lock%';

8.2 로깅 설정

# config.py 또는 logging 설정
import logging

# SQLAlchemy 엔진 로깅 (디버그 시)
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

# Upsert 유틸리티 로깅
logging.getLogger('app.utils.db_utils').setLevel(logging.DEBUG)

9. 요약

상황 권장 패턴 함수
단일 레코드 Upsert (Unique 키 존재) ON CONFLICT upsert_by_unique_key()
단일 레코드 Get or Create SELECT FOR UPDATE get_or_create_with_lock()
대량 레코드 Upsert Bulk ON CONFLICT bulk_upsert()
데드락 방지 재시도 지수 백오프 execute_with_retry()
다중 행 잠금 정렬된 순서 잠금 LockManager

작성일: 2026-01-26 작성자: Claude Code (AI Assistant)