# INSERT → Upsert 변환 계획서 ## 개요 이 문서는 프로젝트 내 INSERT 코드들을 **존재 여부 확인 후 INSERT 또는 DB Lock + UPDATE** 패턴으로 변환하기 위한 계획입니다. ### 적용 범위 - ✅ 포함: video, song, lyric 모듈의 INSERT 코드 - ❌ 제외: user 모듈, 이미지 업로드 엔드포인트 ### 사용할 유틸리티 - `app/utils/db_utils.py`의 `get_or_create_with_lock()` 함수 - `app/utils/db_utils.py`의 `bulk_upsert()` 함수 (대량 INSERT용) --- ## 대상 INSERT 위치 목록 | # | 파일 | 라인 | 테이블 | 키 컬럼 | 우선순위 | |---|------|------|--------|---------|----------| | 1 | video.py | 252 | Video | task_id | 중 | | 2 | song.py | 188 | Song | task_id | 중 | | 3 | song.py | 477 | SongTimestamp | suno_audio_id + order_idx | **높음** | | 4 | lyric.py | 297 | Project | task_id | 중 | | 5 | lyric.py | 317 | Lyric | task_id | 중 | --- ## 1. Video INSERT (video.py:252) ### 현재 코드 ```python # app/video/api/routers/v1/video.py:244-254 video = Video( project_id=project_id, lyric_id=lyric_id, song_id=song_id, task_id=task_id, creatomate_render_id=None, status="processing", ) session.add(video) await session.commit() video_id = video.id ``` ### 문제점 - `task_id`로 기존 레코드 존재 여부를 확인하지 않음 - 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능 ### 수정 계획 ```python # app/video/api/routers/v1/video.py # 상단 import 추가 from app.utils.db_utils import get_or_create_with_lock # 기존 코드 대체 (244-254행) result = await get_or_create_with_lock( session=session, model=Video, filter_by={'task_id': task_id}, defaults={ 'project_id': project_id, 'lyric_id': lyric_id, 'song_id': song_id, 'creatomate_render_id': None, 'status': 'processing', }, lock=True, ) video = result.entity if not result.created: # 이미 존재하는 경우: 상태 업데이트 video.project_id = project_id video.lyric_id = lyric_id video.song_id = song_id video.status = 'processing' video.creatomate_render_id = None await session.commit() video_id = video.id ``` ### 필수 사전 작업 - Video 테이블에 `task_id` UNIQUE 인덱스 추가 필요 ```sql ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id); ``` --- ## 2. Song INSERT (song.py:188) ### 현재 코드 ```python # app/song/api/routers/v1/song.py:179-191 song = Song( project_id=project_id, lyric_id=lyric_id, task_id=task_id, suno_task_id=None, status="processing", song_prompt=song_prompt, language=request_body.language, ) session.add(song) await session.commit() song_id = song.id ``` ### 문제점 - `task_id`로 기존 레코드 존재 여부를 확인하지 않음 - 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능 ### 수정 계획 ```python # app/song/api/routers/v1/song.py # 상단 import 추가 from app.utils.db_utils import get_or_create_with_lock # 기존 코드 대체 (179-191행) result = await get_or_create_with_lock( session=session, model=Song, filter_by={'task_id': task_id}, defaults={ 'project_id': project_id, 'lyric_id': lyric_id, 'suno_task_id': None, 'status': 'processing', 'song_prompt': song_prompt, 'language': request_body.language, }, lock=True, ) song = result.entity if not result.created: # 이미 존재하는 경우: 상태 업데이트 song.project_id = project_id song.lyric_id = lyric_id song.status = 'processing' song.song_prompt = song_prompt song.language = request_body.language await session.commit() song_id = song.id ``` ### 필수 사전 작업 - Song 테이블에 `task_id` UNIQUE 인덱스 추가 필요 ```sql ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id); ``` --- ## 3. SongTimestamp INSERT (song.py:477) ⚠️ 높은 우선순위 ### 현재 코드 ```python # app/song/api/routers/v1/song.py:467-479 for order_idx, timestamped_lyric in enumerate(timestamped_lyrics): song_timestamp = SongTimestamp( suno_audio_id=suno_audio_id, order_idx=order_idx, lyric_line=timestamped_lyric["text"], start_time=timestamped_lyric["start_sec"], end_time=timestamped_lyric["end_sec"], ) session.add(song_timestamp) await session.commit() ``` ### 문제점 - **폴링 기반 요청으로 인해 중복 INSERT 위험이 높음** - `suno_audio_id` + `order_idx` 조합으로 존재 여부 확인 없음 - 여러 행을 루프에서 개별 INSERT하여 비효율적 ### 수정 계획 (bulk_upsert 사용) ```python # app/song/api/routers/v1/song.py # 상단 import 추가 from app.utils.db_utils import bulk_upsert # 기존 코드 대체 (467-479행) timestamp_records = [ { 'suno_audio_id': suno_audio_id, 'order_idx': order_idx, 'lyric_line': timestamped_lyric["text"], 'start_time': timestamped_lyric["start_sec"], 'end_time': timestamped_lyric["end_sec"], } for order_idx, timestamped_lyric in enumerate(timestamped_lyrics) ] await bulk_upsert( session=session, model=SongTimestamp, unique_columns=['suno_audio_id', 'order_idx'], records=timestamp_records, update_columns=['lyric_line', 'start_time', 'end_time'], ) await session.commit() ``` ### 필수 사전 작업 - SongTimestamp 테이블에 복합 UNIQUE 인덱스 추가 필요 ```sql ALTER TABLE song_timestamp ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx); ``` ### 대안: get_or_create_with_lock 사용 (개별 처리) 만약 `bulk_upsert`를 사용하지 않고 개별 처리가 필요한 경우: ```python from app.utils.db_utils import get_or_create_with_lock for order_idx, timestamped_lyric in enumerate(timestamped_lyrics): result = await get_or_create_with_lock( session=session, model=SongTimestamp, filter_by={ 'suno_audio_id': suno_audio_id, 'order_idx': order_idx, }, defaults={ 'lyric_line': timestamped_lyric["text"], 'start_time': timestamped_lyric["start_sec"], 'end_time': timestamped_lyric["end_sec"], }, lock=True, ) if not result.created: # 이미 존재하는 경우: 업데이트 result.entity.lyric_line = timestamped_lyric["text"] result.entity.start_time = timestamped_lyric["start_sec"] result.entity.end_time = timestamped_lyric["end_sec"] await session.commit() ``` **권장사항**: `bulk_upsert` 사용 (성능 및 데드락 방지 측면에서 우수) --- ## 4. Project INSERT (lyric.py:297) ### 현재 코드 ```python # app/lyric/api/routers/v1/lyric.py:290-299 project = Project( store_name=request_body.customer_name, region=request_body.region, task_id=task_id, detail_region_info=request_body.detail_region_info, language=request_body.language, ) session.add(project) await session.commit() await session.refresh(project) ``` ### 문제점 - `task_id`로 기존 레코드 존재 여부를 확인하지 않음 - 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능 ### 수정 계획 ```python # app/lyric/api/routers/v1/lyric.py # 상단 import 추가 from app.utils.db_utils import get_or_create_with_lock # 기존 코드 대체 (290-299행) result = await get_or_create_with_lock( session=session, model=Project, filter_by={'task_id': task_id}, defaults={ 'store_name': request_body.customer_name, 'region': request_body.region, 'detail_region_info': request_body.detail_region_info, 'language': request_body.language, }, lock=True, ) project = result.entity if not result.created: # 이미 존재하는 경우: 정보 업데이트 project.store_name = request_body.customer_name project.region = request_body.region project.detail_region_info = request_body.detail_region_info project.language = request_body.language await session.commit() await session.refresh(project) ``` ### 필수 사전 작업 - Project 테이블에 `task_id` UNIQUE 인덱스 추가 필요 ```sql ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id); ``` --- ## 5. Lyric INSERT (lyric.py:317) ### 현재 코드 ```python # app/lyric/api/routers/v1/lyric.py:308-319 estimated_prompt = lyric_prompt.build_prompt(lyric_input_data) lyric = Lyric( project_id=project.id, task_id=task_id, status="processing", lyric_prompt=estimated_prompt, lyric_result=None, language=request_body.language, ) session.add(lyric) await session.commit() await session.refresh(lyric) ``` ### 문제점 - `task_id`로 기존 레코드 존재 여부를 확인하지 않음 - 동일한 `task_id`로 재요청 시 중복 INSERT 발생 가능 ### 수정 계획 ```python # app/lyric/api/routers/v1/lyric.py # 상단 import (이미 추가되어 있음) from app.utils.db_utils import get_or_create_with_lock # 기존 코드 대체 (308-319행) estimated_prompt = lyric_prompt.build_prompt(lyric_input_data) result = await get_or_create_with_lock( session=session, model=Lyric, filter_by={'task_id': task_id}, defaults={ 'project_id': project.id, 'status': 'processing', 'lyric_prompt': estimated_prompt, 'lyric_result': None, 'language': request_body.language, }, lock=True, ) lyric = result.entity if not result.created: # 이미 존재하는 경우: 정보 업데이트 lyric.project_id = project.id lyric.status = 'processing' lyric.lyric_prompt = estimated_prompt lyric.lyric_result = None lyric.language = request_body.language await session.commit() await session.refresh(lyric) ``` ### 필수 사전 작업 - Lyric 테이블에 `task_id` UNIQUE 인덱스 추가 필요 ```sql ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id); ``` --- ## 구현 순서 권장 1. **1단계: DB 마이그레이션** (필수) ```sql -- 모든 UNIQUE 인덱스 추가 ALTER TABLE video ADD UNIQUE INDEX idx_video_task_id (task_id); ALTER TABLE song ADD UNIQUE INDEX idx_song_task_id (task_id); ALTER TABLE song_timestamp ADD UNIQUE INDEX idx_song_timestamp_audio_order (suno_audio_id, order_idx); ALTER TABLE project ADD UNIQUE INDEX idx_project_task_id (task_id); ALTER TABLE lyric ADD UNIQUE INDEX idx_lyric_task_id (task_id); ``` 2. **2단계: SongTimestamp (높은 우선순위)** - 폴링으로 인한 중복 INSERT 위험이 가장 높음 - `bulk_upsert` 사용 권장 3. **3단계: 나머지 테이블 (중간 우선순위)** - Video, Song, Project, Lyric 순으로 적용 - 모두 `get_or_create_with_lock` 사용 --- ## 롤백 계획 문제 발생 시 원래 코드로 복구: 1. 코드 변경 사항 git revert 2. UNIQUE 인덱스는 유지 (데이터 무결성에 도움됨) --- ## 테스트 체크리스트 각 수정 후 확인 사항: - [ ] 새로운 task_id로 요청 시 정상 INSERT - [ ] 동일한 task_id로 재요청 시 UPDATE (에러 없이) - [ ] 동시 요청 테스트 (2개 이상 동시 요청) - [ ] 성능 저하 없는지 확인 --- ## 참고 문서 - [db_lock.md](./db_lock.md) - DB Lock 및 Upsert 패턴 가이드 - [db_utils.py](../../app/utils/db_utils.py) - 유틸리티 함수 구현체