12 KiB
12 KiB
INSERT → Upsert 변환 계획서
개요
이 문서는 프로젝트 내 INSERT 코드들을 존재 여부 확인 후 INSERT 또는 DB Lock + UPDATE 패턴으로 변환하기 위한 계획입니다.
적용 범위
- ✅ 포함: video, song, lyric 모듈의 INSERT 코드
- ❌ 제외: user 모듈, 이미지 업로드 엔드포인트
사용할 유틸리티
app/utils/db_utils.py의get_or_create_with_lock()함수app/utils/db_utils.py의bulk_upsert()함수 (대량 INSERT용)
대상 INSERT 위치 목록
| # | 파일 | 라인 | 테이블 | 키 컬럼 | 우선순위 |
|---|---|---|---|---|---|
| 1 | video.py | 252 | Video | task_id | 중 |
| 2 | song.py | 188 | Song | task_id | 중 |
| 3 | song.py | 477 | SongTimestamp | suno_audio_id + order_idx | 높음 |
| 4 | lyric.py | 297 | Project | task_id | 중 |
| 5 | lyric.py | 317 | Lyric | task_id | 중 |
1. Video INSERT (video.py:252)
현재 코드
# app/video/api/routers/v1/video.py:244-254
video = Video(
project_id=project_id,
lyric_id=lyric_id,
song_id=song_id,
task_id=task_id,
creatomate_render_id=None,
status="processing",
)
session.add(video)
await session.commit()
video_id = video.id
문제점
task_id로 기존 레코드 존재 여부를 확인하지 않음- 동일한
task_id로 재요청 시 중복 INSERT 발생 가능
수정 계획
# app/video/api/routers/v1/video.py
# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (244-254행)
result = await get_or_create_with_lock(
session=session,
model=Video,
filter_by={'task_id': task_id},
defaults={
'project_id': project_id,
'lyric_id': lyric_id,
'song_id': song_id,
'creatomate_render_id': None,
'status': 'processing',
},
lock=True,
)
video = result.entity
if not result.created:
# 이미 존재하는 경우: 상태 업데이트
video.project_id = project_id
video.lyric_id = lyric_id
video.song_id = song_id
video.status = 'processing'
video.creatomate_render_id = None
await session.commit()
video_id = video.id
필수 사전 작업
- Video 테이블에
task_idUNIQUE 인덱스 추가 필요
ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id);
2. Song INSERT (song.py:188)
현재 코드
# app/song/api/routers/v1/song.py:179-191
song = Song(
project_id=project_id,
lyric_id=lyric_id,
task_id=task_id,
suno_task_id=None,
status="processing",
song_prompt=song_prompt,
language=request_body.language,
)
session.add(song)
await session.commit()
song_id = song.id
문제점
task_id로 기존 레코드 존재 여부를 확인하지 않음- 동일한
task_id로 재요청 시 중복 INSERT 발생 가능
수정 계획
# app/song/api/routers/v1/song.py
# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (179-191행)
result = await get_or_create_with_lock(
session=session,
model=Song,
filter_by={'task_id': task_id},
defaults={
'project_id': project_id,
'lyric_id': lyric_id,
'suno_task_id': None,
'status': 'processing',
'song_prompt': song_prompt,
'language': request_body.language,
},
lock=True,
)
song = result.entity
if not result.created:
# 이미 존재하는 경우: 상태 업데이트
song.project_id = project_id
song.lyric_id = lyric_id
song.status = 'processing'
song.song_prompt = song_prompt
song.language = request_body.language
await session.commit()
song_id = song.id
필수 사전 작업
- Song 테이블에
task_idUNIQUE 인덱스 추가 필요
ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id);
3. SongTimestamp INSERT (song.py:477) ⚠️ 높은 우선순위
현재 코드
# app/song/api/routers/v1/song.py:467-479
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
song_timestamp = SongTimestamp(
suno_audio_id=suno_audio_id,
order_idx=order_idx,
lyric_line=timestamped_lyric["text"],
start_time=timestamped_lyric["start_sec"],
end_time=timestamped_lyric["end_sec"],
)
session.add(song_timestamp)
await session.commit()
문제점
- 폴링 기반 요청으로 인해 중복 INSERT 위험이 높음
suno_audio_id+order_idx조합으로 존재 여부 확인 없음- 여러 행을 루프에서 개별 INSERT하여 비효율적
수정 계획 (bulk_upsert 사용)
# app/song/api/routers/v1/song.py
# 상단 import 추가
from app.utils.db_utils import bulk_upsert
# 기존 코드 대체 (467-479행)
timestamp_records = [
{
'suno_audio_id': suno_audio_id,
'order_idx': order_idx,
'lyric_line': timestamped_lyric["text"],
'start_time': timestamped_lyric["start_sec"],
'end_time': timestamped_lyric["end_sec"],
}
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics)
]
await bulk_upsert(
session=session,
model=SongTimestamp,
unique_columns=['suno_audio_id', 'order_idx'],
records=timestamp_records,
update_columns=['lyric_line', 'start_time', 'end_time'],
)
await session.commit()
필수 사전 작업
- SongTimestamp 테이블에 복합 UNIQUE 인덱스 추가 필요
ALTER TABLE song_timestamp
ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx);
대안: get_or_create_with_lock 사용 (개별 처리)
만약 bulk_upsert를 사용하지 않고 개별 처리가 필요한 경우:
from app.utils.db_utils import get_or_create_with_lock
for order_idx, timestamped_lyric in enumerate(timestamped_lyrics):
result = await get_or_create_with_lock(
session=session,
model=SongTimestamp,
filter_by={
'suno_audio_id': suno_audio_id,
'order_idx': order_idx,
},
defaults={
'lyric_line': timestamped_lyric["text"],
'start_time': timestamped_lyric["start_sec"],
'end_time': timestamped_lyric["end_sec"],
},
lock=True,
)
if not result.created:
# 이미 존재하는 경우: 업데이트
result.entity.lyric_line = timestamped_lyric["text"]
result.entity.start_time = timestamped_lyric["start_sec"]
result.entity.end_time = timestamped_lyric["end_sec"]
await session.commit()
권장사항: bulk_upsert 사용 (성능 및 데드락 방지 측면에서 우수)
4. Project INSERT (lyric.py:297)
현재 코드
# app/lyric/api/routers/v1/lyric.py:290-299
project = Project(
store_name=request_body.customer_name,
region=request_body.region,
task_id=task_id,
detail_region_info=request_body.detail_region_info,
language=request_body.language,
)
session.add(project)
await session.commit()
await session.refresh(project)
문제점
task_id로 기존 레코드 존재 여부를 확인하지 않음- 동일한
task_id로 재요청 시 중복 INSERT 발생 가능
수정 계획
# app/lyric/api/routers/v1/lyric.py
# 상단 import 추가
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (290-299행)
result = await get_or_create_with_lock(
session=session,
model=Project,
filter_by={'task_id': task_id},
defaults={
'store_name': request_body.customer_name,
'region': request_body.region,
'detail_region_info': request_body.detail_region_info,
'language': request_body.language,
},
lock=True,
)
project = result.entity
if not result.created:
# 이미 존재하는 경우: 정보 업데이트
project.store_name = request_body.customer_name
project.region = request_body.region
project.detail_region_info = request_body.detail_region_info
project.language = request_body.language
await session.commit()
await session.refresh(project)
필수 사전 작업
- Project 테이블에
task_idUNIQUE 인덱스 추가 필요
ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id);
5. Lyric INSERT (lyric.py:317)
현재 코드
# app/lyric/api/routers/v1/lyric.py:308-319
estimated_prompt = lyric_prompt.build_prompt(lyric_input_data)
lyric = Lyric(
project_id=project.id,
task_id=task_id,
status="processing",
lyric_prompt=estimated_prompt,
lyric_result=None,
language=request_body.language,
)
session.add(lyric)
await session.commit()
await session.refresh(lyric)
문제점
task_id로 기존 레코드 존재 여부를 확인하지 않음- 동일한
task_id로 재요청 시 중복 INSERT 발생 가능
수정 계획
# app/lyric/api/routers/v1/lyric.py
# 상단 import (이미 추가되어 있음)
from app.utils.db_utils import get_or_create_with_lock
# 기존 코드 대체 (308-319행)
estimated_prompt = lyric_prompt.build_prompt(lyric_input_data)
result = await get_or_create_with_lock(
session=session,
model=Lyric,
filter_by={'task_id': task_id},
defaults={
'project_id': project.id,
'status': 'processing',
'lyric_prompt': estimated_prompt,
'lyric_result': None,
'language': request_body.language,
},
lock=True,
)
lyric = result.entity
if not result.created:
# 이미 존재하는 경우: 정보 업데이트
lyric.project_id = project.id
lyric.status = 'processing'
lyric.lyric_prompt = estimated_prompt
lyric.lyric_result = None
lyric.language = request_body.language
await session.commit()
await session.refresh(lyric)
필수 사전 작업
- Lyric 테이블에
task_idUNIQUE 인덱스 추가 필요
ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id);
구현 순서 권장
-
1단계: DB 마이그레이션 (필수)
-- 모든 UNIQUE 인덱스 추가 ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id); ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id); ALTER TABLE song_timestamp ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx); ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id); ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id); -
2단계: SongTimestamp (높은 우선순위)
- 폴링으로 인한 중복 INSERT 위험이 가장 높음
bulk_upsert사용 권장
-
3단계: 나머지 테이블 (중간 우선순위)
- Video, Song, Project, Lyric 순으로 적용
- 모두
get_or_create_with_lock사용
롤백 계획
문제 발생 시 원래 코드로 복구:
- 코드 변경 사항 git revert
- UNIQUE 인덱스는 유지 (데이터 무결성에 도움됨)
테스트 체크리스트
각 수정 후 확인 사항:
- 새로운 task_id로 요청 시 정상 INSERT
- 동일한 task_id로 재요청 시 UPDATE (에러 없이)
- 동시 요청 테스트 (2개 이상 동시 요청)
- 성능 저하 없는지 확인
참고 문서
- db_lock.md - DB Lock 및 Upsert 패턴 가이드
- db_utils.py - 유틸리티 함수 구현체