""" Song API Router 이 모듈은 Suno API를 통한 노래 생성 관련 API 엔드포인트를 정의합니다. 엔드포인트 목록: - POST /song/generate/{task_id}: 노래 생성 요청 (task_id로 Project/Lyric 연결) - GET /song/status/{song_id}: Suno API 노래 생성 상태 조회 사용 예시: from app.song.api.routers.v1.song import router app.include_router(router, prefix="/api/v1") """ from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.database.session import get_session from app.home.models import Project from app.lyric.models import Lyric from app.song.models import Song, SongTimestamp from app.song.schemas.song_schema import ( GenerateSongRequest, GenerateSongResponse, PollingSongResponse, ) from app.song.worker.song_task import download_and_upload_song_by_suno_task_id from app.utils.logger import get_logger from app.utils.suno import SunoService logger = get_logger("song") router = APIRouter(prefix="/song", tags=["Song"]) @router.post( "/generate/{task_id}", summary="노래 생성 요청", description=""" Suno API를 통해 노래 생성을 요청합니다. ## 경로 파라미터 - **task_id**: Project/Lyric의 task_id (필수) - 연관된 프로젝트와 가사를 조회하는 데 사용 ## 요청 필드 - **lyrics**: 노래에 사용할 가사 (필수) - **genre**: 음악 장르 (필수) - K-Pop, Pop, R&B, Hip-Hop, Ballad, EDM, Rock, Jazz 등 - **language**: 노래 언어 (선택, 기본값: Korean) ## 반환 정보 - **success**: 요청 성공 여부 - **task_id**: 내부 작업 ID (Project/Lyric task_id) - **song_id**: Suno API 작업 ID (상태 조회에 사용) - **message**: 응답 메시지 ## 사용 예시 ``` POST /song/generate/019123ab-cdef-7890-abcd-ef1234567890 { "lyrics": "여기 군산에서 만나요\\n아름다운 하루를 함께", "genre": "K-Pop", "language": "Korean" } ``` ## 참고 - 생성되는 노래는 약 1분 이내 길이입니다. - song_id를 사용하여 /status/{song_id} 엔드포인트에서 생성 상태를 확인할 수 있습니다. - Song 테이블에 데이터가 저장되며, project_id와 lyric_id가 자동으로 연결됩니다. """, response_model=GenerateSongResponse, responses={ 200: {"description": "노래 생성 요청 성공"}, 404: {"description": "Project 또는 Lyric을 찾을 수 없음"}, 500: {"description": "노래 생성 요청 실패"}, }, ) async def generate_song( task_id: str, request_body: GenerateSongRequest, ) -> GenerateSongResponse: """가사와 장르를 기반으로 Suno API를 통해 노래를 생성합니다. 1. task_id로 Project와 Lyric 조회 2. Song 테이블에 초기 데이터 저장 (status: processing) 3. Suno API 호출 (세션 닫힌 상태) 4. suno_task_id 업데이트 후 응답 반환 Note: 이 함수는 Depends(get_session)을 사용하지 않고 명시적으로 세션을 관리합니다. 외부 API 호출 중 DB 커넥션이 유지되지 않도록 하여 커넥션 타임아웃 문제를 방지합니다. """ import time from app.database.session import AsyncSessionLocal request_start = time.perf_counter() logger.info( f"[generate_song] START - task_id: {task_id}, " f"genre: {request_body.genre}, language: {request_body.language}" ) # 외부 API 호출 전에 필요한 데이터를 저장할 변수들 project_id: int | None = None lyric_id: int | None = None song_id: int | None = None # ========================================================================== # 1단계: DB 조회 및 초기 데이터 저장 (세션을 명시적으로 열고 닫음) # ========================================================================== try: async with AsyncSessionLocal() as session: # Project 조회 (중복 시 최신 것 선택) project_result = await session.execute( select(Project) .where(Project.task_id == task_id) .order_by(Project.created_at.desc()) .limit(1) ) project = project_result.scalar_one_or_none() if not project: logger.warning( f"[generate_song] Project NOT FOUND - task_id: {task_id}" ) raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.", ) project_id = project.id # Lyric 조회 (중복 시 최신 것 선택) lyric_result = await session.execute( select(Lyric) .where(Lyric.task_id == task_id) .order_by(Lyric.created_at.desc()) .limit(1) ) lyric = lyric_result.scalar_one_or_none() logger.debug( f"[generate_song] Lyric query result - " f"id: {lyric.id if lyric else None}, " f"project_id: {lyric.project_id if lyric else None}, " f"task_id: {lyric.task_id if lyric else None}, " f"lyric_result: {lyric.lyric_result if lyric else None}" ) if not lyric: logger.warning(f"[generate_song] Lyric NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.", ) lyric_id = lyric.id query_time = time.perf_counter() logger.info( f"[generate_song] Queries completed - task_id: {task_id}, " f"project_id: {project_id}, lyric_id: {lyric_id}, " f"elapsed: {(query_time - request_start) * 1000:.1f}ms" ) # Song 테이블에 초기 데이터 저장 song_prompt = ( f"[Lyrics]\n{request_body.lyrics}\n\n[Genre]\n{request_body.genre}" ) logger.debug( f"[generate_song] Lyrics comparison - task_id: {task_id}\n" f"{'=' * 60}\n" f"[lyric.lyric_result]\n" f"{'-' * 60}\n" f"{lyric.lyric_result}\n" f"{'=' * 60}\n" f"[song_prompt]\n" f"{'-' * 60}\n" f"{song_prompt}\n" f"{'=' * 60}" ) song = Song( project_id=project_id, lyric_id=lyric_id, task_id=task_id, suno_task_id=None, status="processing", song_prompt=song_prompt, language=request_body.language, ) session.add(song) await session.commit() song_id = song.id stage1_time = time.perf_counter() logger.info( f"[generate_song] Stage 1 DONE - Song saved - " f"task_id: {task_id}, song_id: {song_id}, " f"elapsed: {(stage1_time - request_start) * 1000:.1f}ms" ) # 세션이 여기서 자동으로 닫힘 except HTTPException: raise except Exception as e: logger.error( f"[generate_song] Stage 1 EXCEPTION - " f"task_id: {task_id}, error: {type(e).__name__}: {e}" ) return GenerateSongResponse( success=False, task_id=task_id, song_id=None, message="노래 생성 요청에 실패했습니다.", error_message=str(e), ) # ========================================================================== # 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음) # ========================================================================== stage2_start = time.perf_counter() suno_task_id: str | None = None try: logger.info(f"[generate_song] Stage 2 START - Suno API - task_id: {task_id}") suno_service = SunoService() suno_task_id = await suno_service.generate( prompt=request_body.lyrics, genre=request_body.genre, ) stage2_time = time.perf_counter() logger.info( f"[generate_song] Stage 2 DONE - task_id: {task_id}, " f"suno_task_id: {suno_task_id}, " f"elapsed: {(stage2_time - stage2_start) * 1000:.1f}ms" ) except Exception as e: logger.error( f"[generate_song] Stage 2 EXCEPTION - Suno API failed - " f"task_id: {task_id}, error: {type(e).__name__}: {e}" ) # 외부 API 실패 시 Song 상태를 failed로 업데이트 async with AsyncSessionLocal() as update_session: song_result = await update_session.execute( select(Song).where(Song.id == song_id) ) song_to_update = song_result.scalar_one_or_none() if song_to_update: song_to_update.status = "failed" await update_session.commit() return GenerateSongResponse( success=False, task_id=task_id, song_id=None, message="노래 생성 요청에 실패했습니다.", error_message=str(e), ) # ========================================================================== # 3단계: suno_task_id 업데이트 (새 세션으로 빠르게 처리) # ========================================================================== stage3_start = time.perf_counter() logger.info(f"[generate_song] Stage 3 START - DB update - task_id: {task_id}") try: async with AsyncSessionLocal() as update_session: song_result = await update_session.execute( select(Song).where(Song.id == song_id) ) song_to_update = song_result.scalar_one_or_none() if song_to_update: song_to_update.suno_task_id = suno_task_id await update_session.commit() stage3_time = time.perf_counter() total_time = stage3_time - request_start logger.info( f"[generate_song] Stage 3 DONE - task_id: {task_id}, " f"elapsed: {(stage3_time - stage3_start) * 1000:.1f}ms" ) logger.info( f"[generate_song] SUCCESS - task_id: {task_id}, " f"suno_task_id: {suno_task_id}, " f"total_time: {total_time * 1000:.1f}ms" ) return GenerateSongResponse( success=True, task_id=task_id, song_id=suno_task_id, message="노래 생성 요청이 접수되었습니다. song_id로 상태를 조회하세요.", error_message=None, ) except Exception as e: logger.error( f"[generate_song] Stage 3 EXCEPTION - " f"task_id: {task_id}, error: {type(e).__name__}: {e}" ) return GenerateSongResponse( success=False, task_id=task_id, song_id=suno_task_id, message="노래 생성은 요청되었으나 DB 업데이트에 실패했습니다.", error_message=str(e), ) @router.get( "/status/{song_id}", summary="노래 생성 상태 조회 (Suno API)", description=""" Suno API를 통해 노래 생성 작업의 상태를 조회합니다. SUCCESS 상태인 경우 백그라운드에서 MP3 파일을 다운로드하고 Azure Blob Storage에 업로드를 시작합니다. ## 경로 파라미터 - **song_id**: 노래 생성 시 반환된 Suno API 작업 ID (필수) ## 반환 정보 - **success**: 조회 성공 여부 - **status**: Suno API 작업 상태 - **message**: 상태 메시지 ## 사용 예시 ``` GET /song/status/abc123... ``` ## 상태 값 (Suno API 응답) - **PENDING**: Suno API 대기 중 - **processing**: Suno API에서 노래 생성 중 - **SUCCESS**: Suno API 노래 생성 완료 (백그라운드 Blob 업로드 시작) - **TEXT_SUCCESS**: Suno API 노래 생성 완료 - **failed**: Suno API 노래 생성 실패 - **error**: API 조회 오류 ## 참고 - 이 엔드포인트는 Suno API의 상태를 반환합니다 - SUCCESS 응답 시 백그라운드에서 MP3 다운로드 → Azure Blob Storage 업로드가 시작됩니다 - Song 테이블 상태: processing → uploading → completed """, response_model=PollingSongResponse, responses={ 200: {"description": "상태 조회 성공"}, }, ) async def get_song_status( song_id: str, background_tasks: BackgroundTasks, session: AsyncSession = Depends(get_session), ) -> PollingSongResponse: """song_id로 노래 생성 작업의 상태를 조회합니다. SUCCESS 상태인 경우 백그라운드에서 MP3 파일을 다운로드하고 Azure Blob Storage에 업로드한 뒤 Song 테이블의 status를 completed로, song_result_url을 Blob URL로 업데이트합니다. """ suno_task_id = song_id # 임시방편 / 외부 suno 노출 방지 logger.info(f"[get_song_status] START - song_id: {suno_task_id}") try: suno_service = SunoService() result = await suno_service.get_task_status(suno_task_id) logger.debug( f"[get_song_status] Suno API raw response - song_id: {suno_task_id}, result: {result}" ) parsed_response = suno_service.parse_status_response(result) logger.info( f"[get_song_status] Suno API response - song_id: {suno_task_id}, status: {parsed_response.status}" ) if parsed_response.status == "TEXT_SUCCESS" and result: parsed_response.status = "processing" return parsed_response # SUCCESS 상태인 경우 백그라운드에서 MP3 다운로드 및 Blob 업로드 진행 if parsed_response.status == "SUCCESS" and result: # result에서 직접 clips 데이터 추출 data = result.get("data", {}) response_data = data.get("response") or {} clips_data = response_data.get("sunoData") or [] if clips_data: # 첫 번째 클립(clips[0])의 audioUrl과 duration 사용 first_clip = clips_data[0] audio_url = first_clip.get("audioUrl") clip_duration = first_clip.get("duration") logger.debug( f"[get_song_status] Using first clip - id: {first_clip.get('id')}, audio_url: {audio_url}, duration: {clip_duration}" ) if audio_url: # song_id로 Song 조회 song_result = await session.execute( select(Song) .where(Song.suno_task_id == song_id) .order_by(Song.created_at.desc()) .limit(1) ) song = song_result.scalar_one_or_none() # processing 상태인 경우에만 백그라운드 태스크 실행 (중복 방지) if song and song.status == "processing": # store_name 조회 project_result = await session.execute( select(Project).where(Project.id == song.project_id) ) project = project_result.scalar_one_or_none() store_name = project.store_name if project else "song" # 상태를 uploading으로 변경 (중복 호출 방지) song.status = "uploading" song.suno_audio_id = first_clip.get("id") await session.commit() logger.info( f"[get_song_status] Song status changed to uploading - song_id: {suno_task_id}" ) # 백그라운드 태스크로 MP3 다운로드 및 Blob 업로드 실행 background_tasks.add_task( download_and_upload_song_by_suno_task_id, suno_task_id=song_id, audio_url=audio_url, store_name=store_name, duration=clip_duration, ) logger.info( f"[get_song_status] Background task scheduled - song_id: {suno_task_id}, store_name: {store_name}" ) suno_audio_id = first_clip.get("id") word_data = await suno_service.get_lyric_timestamp( suno_task_id, suno_audio_id ) logger.debug( f"[get_song_status] word_data from get_lyric_timestamp - " f"suno_task_id: {suno_task_id}, suno_audio_id: {suno_audio_id}, " f"word_data: {word_data}" ) lyric_result = await session.execute( select(Lyric) .where(Lyric.task_id == song.task_id) .order_by(Lyric.created_at.desc()) .limit(1) ) lyric = lyric_result.scalar_one_or_none() gt_lyric = lyric.lyric_result lyric_line_list = gt_lyric.split("\n") sentences = [ lyric_line.strip(",. ") for lyric_line in lyric_line_list if lyric_line and lyric_line != "---" ] logger.debug( f"[get_song_status] sentences from lyric - " f"sentences: {sentences}" ) timestamped_lyrics = suno_service.align_lyrics( word_data, sentences ) logger.debug( f"[get_song_status] sentences from lyric - " f"sentences: {sentences}" ) # TODO : DB upload timestamped_lyrics for order_idx, timestamped_lyric in enumerate( timestamped_lyrics ): song_timestamp = SongTimestamp( suno_audio_id=suno_audio_id, order_idx=order_idx, lyric_line=timestamped_lyric["text"], start_time=timestamped_lyric["start_sec"], end_time=timestamped_lyric["end_sec"], ) session.add(song_timestamp) await session.commit() parsed_response.status = "processing" elif song and song.status == "uploading": logger.info( f"[get_song_status] SKIPPED - Song is already uploading, song_id: {suno_task_id}" ) parsed_response.status = "uploading" elif song and song.status == "completed": logger.info( f"[get_song_status] SKIPPED - Song already completed, song_id: {suno_task_id}" ) parsed_response.song_result_url = song.song_result_url else: # audio_url이 없는 경우 에러 반환 logger.error( f"[get_song_status] ERROR - audio_url not found in clips_data, song_id: {suno_task_id}" ) return PollingSongResponse( success=False, status="error", message="Suno API 응답에서 audio_url을 찾을 수 없습니다.", error_message="audio_url not found in Suno API response", ) else: # clips_data가 없는 경우 에러 반환 logger.error( f"[get_song_status] ERROR - clips_data not found, song_id: {suno_task_id}" ) return PollingSongResponse( success=False, status="error", message="Suno API 응답에서 클립 데이터를 찾을 수 없습니다.", error_message="clips_data not found in Suno API response", ) logger.info(f"[get_song_status] END - song_id: {suno_task_id}") return parsed_response except Exception as e: import traceback logger.error(f"[get_song_status] EXCEPTION - song_id: {song_id}, error: {e}") return PollingSongResponse( success=False, status="error", message="상태 조회에 실패했습니다.", error_message=f"{type(e).__name__}: {e}\n{traceback.format_exc()}", )