o2o-castad-backend/docs/plan/insert_for_update_plan.md

12 KiB

INSERT → Upsert 변환 계획서

개요

이 문서는 프로젝트 내 INSERT 코드들을 존재 여부 확인 후 INSERT 또는 DB Lock + UPDATE 패턴으로 변환하기 위한 계획입니다.

적용 범위

  • 포함: video, song, lyric 모듈의 INSERT 코드
  • 제외: user 모듈, 이미지 업로드 엔드포인트

사용할 유틸리티

  • app/utils/db_utils.pyget_or_create_with_lock() 함수
  • app/utils/db_utils.pybulk_upsert() 함수 (대량 INSERT용)

대상 INSERT 위치 목록

# 파일 라인 테이블 키 컬럼 우선순위
1 video.py 252 Video task_id
2 song.py 188 Song task_id
3 song.py 477 SongTimestamp suno_audio_id + order_idx 높음
4 lyric.py 297 Project task_id
5 lyric.py 317 Lyric task_id

1. Video INSERT (video.py:252)

현재 코드

# app/video/api/routers/v1/video.py:244-254

video = Video(
    project_id=project_id,
    lyric_id=lyric_id,
    song_id=song_id,
    task_id=task_id,
    creatomate_render_id=None,
    status="processing",
)
session.add(video)
await session.commit()
video_id = video.id

문제점

  • task_id로 기존 레코드 존재 여부를 확인하지 않음
  • 동일한 task_id로 재요청 시 중복 INSERT 발생 가능

수정 계획

# app/video/api/routers/v1/video.py

# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock

# 기존 코드 대체 (244-254행)
result = await get_or_create_with_lock(
    session=session,
    model=Video,
    filter_by={'task_id': task_id},
    defaults={
        'project_id': project_id,
        'lyric_id': lyric_id,
        'song_id': song_id,
        'creatomate_render_id': None,
        'status': 'processing',
    },
    lock=True,
)

video = result.entity
if not result.created:
    # 이미 존재하는 경우: 상태 업데이트
    video.project_id = project_id
    video.lyric_id = lyric_id
    video.song_id = song_id
    video.status = 'processing'
    video.creatomate_render_id = None

await session.commit()
video_id = video.id

필수 사전 작업

  • Video 테이블에 task_id UNIQUE 인덱스 추가 필요
ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id);

2. Song INSERT (song.py:188)

현재 코드

# app/song/api/routers/v1/song.py:179-191

song = Song(
    project_id=project_id,
    lyric_id=lyric_id,
    task_id=task_id,
    suno_task_id=None,
    status="processing",
    song_prompt=song_prompt,
    language=request_body.language,
)
session.add(song)
await session.commit()
song_id = song.id

문제점

  • task_id로 기존 레코드 존재 여부를 확인하지 않음
  • 동일한 task_id로 재요청 시 중복 INSERT 발생 가능

수정 계획

# app/song/api/routers/v1/song.py

# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock

# 기존 코드 대체 (179-191행)
result = await get_or_create_with_lock(
    session=session,
    model=Song,
    filter_by={'task_id': task_id},
    defaults={
        'project_id': project_id,
        'lyric_id': lyric_id,
        'suno_task_id': None,
        'status': 'processing',
        'song_prompt': song_prompt,
        'language': request_body.language,
    },
    lock=True,
)

song = result.entity
if not result.created:
    # 이미 존재하는 경우: 상태 업데이트
    song.project_id = project_id
    song.lyric_id = lyric_id
    song.status = 'processing'
    song.song_prompt = song_prompt
    song.language = request_body.language

await session.commit()
song_id = song.id

필수 사전 작업

  • Song 테이블에 task_id UNIQUE 인덱스 추가 필요
ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id);

3. SongTimestamp INSERT (song.py:477) ⚠️ 높은 우선순위

현재 코드

# app/song/api/routers/v1/song.py:467-479

for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
    song_timestamp = SongTimestamp(
        suno_audio_id=suno_audio_id,
        order_idx=order_idx,
        lyric_line=timestamped_lyric["text"],
        start_time=timestamped_lyric["start_sec"],
        end_time=timestamped_lyric["end_sec"],
    )
    session.add(song_timestamp)

await session.commit()

문제점

  • 폴링 기반 요청으로 인해 중복 INSERT 위험이 높음
  • suno_audio_id + order_idx 조합으로 존재 여부 확인 없음
  • 여러 행을 루프에서 개별 INSERT하여 비효율적

수정 계획 (bulk_upsert 사용)

# app/song/api/routers/v1/song.py

# 상단 import 추가
from app.utils.db_utils import bulk_upsert

# 기존 코드 대체 (467-479행)
timestamp_records = [
    {
        'suno_audio_id': suno_audio_id,
        'order_idx': order_idx,
        'lyric_line': timestamped_lyric["text"],
        'start_time': timestamped_lyric["start_sec"],
        'end_time': timestamped_lyric["end_sec"],
    }
    for order_idx, timestamped_lyric in enumerate(timestamped_lyrics)
]

await bulk_upsert(
    session=session,
    model=SongTimestamp,
    unique_columns=['suno_audio_id', 'order_idx'],
    records=timestamp_records,
    update_columns=['lyric_line', 'start_time', 'end_time'],
)

await session.commit()

필수 사전 작업

  • SongTimestamp 테이블에 복합 UNIQUE 인덱스 추가 필요
ALTER TABLE song_timestamp
ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx);

대안: get_or_create_with_lock 사용 (개별 처리)

만약 bulk_upsert를 사용하지 않고 개별 처리가 필요한 경우:

from app.utils.db_utils import get_or_create_with_lock

for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
    result = await get_or_create_with_lock(
        session=session,
        model=SongTimestamp,
        filter_by={
            'suno_audio_id': suno_audio_id,
            'order_idx': order_idx,
        },
        defaults={
            'lyric_line': timestamped_lyric["text"],
            'start_time': timestamped_lyric["start_sec"],
            'end_time': timestamped_lyric["end_sec"],
        },
        lock=True,
    )

    if not result.created:
        # 이미 존재하는 경우: 업데이트
        result.entity.lyric_line = timestamped_lyric["text"]
        result.entity.start_time = timestamped_lyric["start_sec"]
        result.entity.end_time = timestamped_lyric["end_sec"]

await session.commit()

권장사항: bulk_upsert 사용 (성능 및 데드락 방지 측면에서 우수)


4. Project INSERT (lyric.py:297)

현재 코드

# app/lyric/api/routers/v1/lyric.py:290-299

project = Project(
    store_name=request_body.customer_name,
    region=request_body.region,
    task_id=task_id,
    detail_region_info=request_body.detail_region_info,
    language=request_body.language,
)
session.add(project)
await session.commit()
await session.refresh(project)

문제점

  • task_id로 기존 레코드 존재 여부를 확인하지 않음
  • 동일한 task_id로 재요청 시 중복 INSERT 발생 가능

수정 계획

# app/lyric/api/routers/v1/lyric.py

# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock

# 기존 코드 대체 (290-299행)
result = await get_or_create_with_lock(
    session=session,
    model=Project,
    filter_by={'task_id': task_id},
    defaults={
        'store_name': request_body.customer_name,
        'region': request_body.region,
        'detail_region_info': request_body.detail_region_info,
        'language': request_body.language,
    },
    lock=True,
)

project = result.entity
if not result.created:
    # 이미 존재하는 경우: 정보 업데이트
    project.store_name = request_body.customer_name
    project.region = request_body.region
    project.detail_region_info = request_body.detail_region_info
    project.language = request_body.language

await session.commit()
await session.refresh(project)

필수 사전 작업

  • Project 테이블에 task_id UNIQUE 인덱스 추가 필요
ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id);

5. Lyric INSERT (lyric.py:317)

현재 코드

# app/lyric/api/routers/v1/lyric.py:308-319

estimated_prompt = lyric_prompt.build_prompt(lyric_input_data)
lyric = Lyric(
    project_id=project.id,
    task_id=task_id,
    status="processing",
    lyric_prompt=estimated_prompt,
    lyric_result=None,
    language=request_body.language,
)
session.add(lyric)
await session.commit()
await session.refresh(lyric)

문제점

  • task_id로 기존 레코드 존재 여부를 확인하지 않음
  • 동일한 task_id로 재요청 시 중복 INSERT 발생 가능

수정 계획

# app/lyric/api/routers/v1/lyric.py

# 상단 import (이미 추가되어 있음)
from app.utils.db_utils import get_or_create_with_lock

# 기존 코드 대체 (308-319행)
estimated_prompt = lyric_prompt.build_prompt(lyric_input_data)

result = await get_or_create_with_lock(
    session=session,
    model=Lyric,
    filter_by={'task_id': task_id},
    defaults={
        'project_id': project.id,
        'status': 'processing',
        'lyric_prompt': estimated_prompt,
        'lyric_result': None,
        'language': request_body.language,
    },
    lock=True,
)

lyric = result.entity
if not result.created:
    # 이미 존재하는 경우: 정보 업데이트
    lyric.project_id = project.id
    lyric.status = 'processing'
    lyric.lyric_prompt = estimated_prompt
    lyric.lyric_result = None
    lyric.language = request_body.language

await session.commit()
await session.refresh(lyric)

필수 사전 작업

  • Lyric 테이블에 task_id UNIQUE 인덱스 추가 필요
ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id);

구현 순서 권장

  1. 1단계: DB 마이그레이션 (필수)

    -- 모든 UNIQUE 인덱스 추가
    ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id);
    ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id);
    ALTER TABLE song_timestamp ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx);
    ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id);
    ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id);
    
  2. 2단계: SongTimestamp (높은 우선순위)

    • 폴링으로 인한 중복 INSERT 위험이 가장 높음
    • bulk_upsert 사용 권장
  3. 3단계: 나머지 테이블 (중간 우선순위)

    • Video, Song, Project, Lyric 순으로 적용
    • 모두 get_or_create_with_lock 사용

롤백 계획

문제 발생 시 원래 코드로 복구:

  1. 코드 변경 사항 git revert
  2. UNIQUE 인덱스는 유지 (데이터 무결성에 도움됨)

테스트 체크리스트

각 수정 후 확인 사항:

  • 새로운 task_id로 요청 시 정상 INSERT
  • 동일한 task_id로 재요청 시 UPDATE (에러 없이)
  • 동시 요청 테스트 (2개 이상 동시 요청)
  • 성능 저하 없는지 확인

참고 문서