""" Song API Router 이 모듈은 Suno API를 통한 노래 생성 관련 API 엔드포인트를 정의합니다. 엔드포인트 목록: - POST /song/generate/{task_id}: 노래 생성 요청 (task_id로 Project/Lyric 연결) - GET /song/status/{suno_task_id}: Suno API 노래 생성 상태 조회 - GET /song/download/{task_id}: 노래 다운로드 상태 조회 (DB polling) 사용 예시: from app.song.api.routers.v1.song import router app.include_router(router, prefix="/api/v1") """ from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.database.session import get_session from app.dependencies.pagination import ( PaginationParams, get_pagination_params, ) from app.home.models import Project from app.lyric.models import Lyric from app.song.models import Song from app.song.schemas.song_schema import ( DownloadSongResponse, GenerateSongRequest, GenerateSongResponse, PollingSongResponse, SongListItem, ) from app.utils.pagination import PaginatedResponse from app.utils.suno import SunoService router = APIRouter(prefix="/song", tags=["song"]) @router.post( "/generate/{task_id}", summary="노래 생성 요청", description=""" Suno API를 통해 노래 생성을 요청합니다. ## 경로 파라미터 - **task_id**: Project/Lyric의 task_id (필수) - 연관된 프로젝트와 가사를 조회하는 데 사용 ## 요청 필드 - **lyrics**: 노래에 사용할 가사 (필수) - **genre**: 음악 장르 (필수) - K-Pop, Pop, R&B, Hip-Hop, Ballad, EDM, Rock, Jazz 등 - **language**: 노래 언어 (선택, 기본값: Korean) ## 반환 정보 - **success**: 요청 성공 여부 - **task_id**: 내부 작업 ID (Project/Lyric task_id) - **suno_task_id**: Suno API 작업 ID (상태 조회에 사용) - **message**: 응답 메시지 ## 사용 예시 ``` POST /song/generate/019123ab-cdef-7890-abcd-ef1234567890 { "lyrics": "여기 군산에서 만나요\\n아름다운 하루를 함께", "genre": "K-Pop", "language": "Korean" } ``` ## 참고 - 생성되는 노래는 약 1분 이내 길이입니다. - suno_task_id를 사용하여 /status/{suno_task_id} 엔드포인트에서 생성 상태를 확인할 수 있습니다. - Song 테이블에 데이터가 저장되며, project_id와 lyric_id가 자동으로 연결됩니다. """, response_model=GenerateSongResponse, responses={ 200: {"description": "노래 생성 요청 성공"}, 404: {"description": "Project 또는 Lyric을 찾을 수 없음"}, 500: {"description": "노래 생성 요청 실패"}, }, ) async def generate_song( task_id: str, request_body: GenerateSongRequest, session: AsyncSession = Depends(get_session), ) -> GenerateSongResponse: """가사와 장르를 기반으로 Suno API를 통해 노래를 생성합니다. 1. task_id로 Project와 Lyric 조회 2. Song 테이블에 초기 데이터 저장 (status: processing) 3. Suno API 호출 4. suno_task_id 업데이트 후 응답 반환 """ print(f"[generate_song] START - task_id: {task_id}, genre: {request_body.genre}, language: {request_body.language}") try: # 1. task_id로 Project 조회 project_result = await session.execute( select(Project).where(Project.task_id == task_id) ) project = project_result.scalar_one_or_none() if not project: print(f"[generate_song] Project NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.", ) print(f"[generate_song] Project found - project_id: {project.id}, task_id: {task_id}") # 2. task_id로 Lyric 조회 lyric_result = await session.execute( select(Lyric).where(Lyric.task_id == task_id) ) lyric = lyric_result.scalar_one_or_none() if not lyric: print(f"[generate_song] Lyric NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.", ) print(f"[generate_song] Lyric found - lyric_id: {lyric.id}, task_id: {task_id}") # 3. Song 테이블에 초기 데이터 저장 song_prompt = ( f"[Lyrics]\n{request_body.lyrics}\n\n[Genre]\n{request_body.genre}" ) song = Song( project_id=project.id, lyric_id=lyric.id, task_id=task_id, suno_task_id=None, status="processing", song_prompt=song_prompt, language=request_body.language, ) session.add(song) await session.flush() # ID 생성을 위해 flush print(f"[generate_song] Song saved (processing) - task_id: {task_id}") # 4. Suno API 호출 print(f"[generate_song] Suno API generation started - task_id: {task_id}") suno_service = SunoService() suno_task_id = await suno_service.generate( prompt=request_body.lyrics, genre=request_body.genre, ) # 5. suno_task_id 업데이트 song.suno_task_id = suno_task_id await session.commit() print(f"[generate_song] SUCCESS - task_id: {task_id}, suno_task_id: {suno_task_id}") return GenerateSongResponse( success=True, task_id=task_id, suno_task_id=suno_task_id, message="노래 생성 요청이 접수되었습니다. suno_task_id로 상태를 조회하세요.", error_message=None, ) except HTTPException: raise except Exception as e: print(f"[generate_song] EXCEPTION - task_id: {task_id}, error: {e}") await session.rollback() return GenerateSongResponse( success=False, task_id=task_id, suno_task_id=None, message="노래 생성 요청에 실패했습니다.", error_message=str(e), ) @router.get( "/status/{suno_task_id}", summary="노래 생성 상태 조회", description=""" Suno API를 통해 노래 생성 작업의 상태를 조회합니다. SUCCESS 상태인 경우 백그라운드에서 MP3 파일을 다운로드하고 Azure Blob Storage에 업로드한 뒤 Song 테이블을 업데이트합니다. ## 경로 파라미터 - **suno_task_id**: 노래 생성 시 반환된 Suno API 작업 ID (필수) ## 반환 정보 - **success**: 조회 성공 여부 - **status**: 작업 상태 (PENDING, processing, SUCCESS, failed) - **message**: 상태 메시지 - **clips**: 생성된 노래 클립 목록 (완료 시) - **raw_response**: Suno API 원본 응답 ## 사용 예시 ``` GET /song/status/abc123... ``` ## 상태 값 - **PENDING**: 대기 중 - **processing**: 생성 중 - **SUCCESS**: 생성 완료 - **failed**: 생성 실패 ## 참고 - 스트림 URL: 30-40초 내 생성 - 다운로드 URL: 2-3분 내 생성 - SUCCESS 시 백그라운드에서 MP3 다운로드 → Azure Blob Storage 업로드 → Song 테이블 업데이트 진행 - 저장 경로: Azure Blob Storage ({BASE_URL}/{task_id}/song/{store_name}.mp3) - Song 테이블의 song_result_url에 Blob URL이 저장됩니다 """, response_model=PollingSongResponse, responses={ 200: {"description": "상태 조회 성공"}, 500: {"description": "상태 조회 실패"}, }, ) async def get_song_status( suno_task_id: str, session: AsyncSession = Depends(get_session), ) -> PollingSongResponse: """suno_task_id로 노래 생성 작업의 상태를 조회합니다. SUCCESS 상태인 경우 백그라운드에서 MP3 파일을 다운로드하고 Azure Blob Storage에 업로드한 뒤 Song 테이블의 status를 completed로, song_result_url을 Blob URL로 업데이트합니다. """ print(f"[get_song_status] START - suno_task_id: {suno_task_id}") try: suno_service = SunoService() result = await suno_service.get_task_status(suno_task_id) parsed_response = suno_service.parse_status_response(result) print(f"[get_song_status] Suno API response - suno_task_id: {suno_task_id}, status: {parsed_response.status}") # SUCCESS 상태인 경우 첫 번째 클립 정보를 DB에 직접 저장 if parsed_response.status == "SUCCESS" and parsed_response.clips: # 첫 번째 클립(clips[0])의 audioUrl과 duration 사용 first_clip = parsed_response.clips[0] audio_url = first_clip.audio_url clip_duration = first_clip.duration print(f"[get_song_status] Using first clip - id: {first_clip.id}, audio_url: {audio_url}, duration: {clip_duration}") if audio_url: # suno_task_id로 Song 조회 song_result = await session.execute( select(Song) .where(Song.suno_task_id == suno_task_id) .order_by(Song.created_at.desc()) .limit(1) ) song = song_result.scalar_one_or_none() if song and song.status != "completed": # 첫 번째 클립의 audio_url과 duration을 직접 DB에 저장 song.status = "completed" song.song_result_url = audio_url if clip_duration is not None: song.duration = clip_duration await session.commit() print(f"[get_song_status] Song updated - suno_task_id: {suno_task_id}, status: completed, song_result_url: {audio_url}, duration: {clip_duration}") elif song and song.status == "completed": print(f"[get_song_status] SKIPPED - Song already completed, suno_task_id: {suno_task_id}") print(f"[get_song_status] SUCCESS - suno_task_id: {suno_task_id}") return parsed_response except Exception as e: import traceback print(f"[get_song_status] EXCEPTION - suno_task_id: {suno_task_id}, error: {e}") return PollingSongResponse( success=False, status="error", message="상태 조회에 실패했습니다.", clips=None, raw_response=None, error_message=f"{type(e).__name__}: {e}\n{traceback.format_exc()}", ) @router.get( "/download/{task_id}", summary="노래 생성 URL 조회", description=""" task_id를 기반으로 Song 테이블의 상태를 조회하고, completed인 경우 Project 정보와 노래 URL을 반환합니다. ## 경로 파라미터 - **task_id**: 프로젝트 task_id (필수) ## 반환 정보 - **success**: 조회 성공 여부 - **status**: 처리 상태 (processing, completed, failed, not_found) - **message**: 응답 메시지 - **store_name**: 업체명 - **region**: 지역명 - **detail_region_info**: 상세 지역 정보 - **task_id**: 작업 고유 식별자 - **language**: 언어 - **song_result_url**: 노래 결과 URL (completed 시, Azure Blob Storage URL) - **created_at**: 생성 일시 ## 사용 예시 ``` GET /song/download/019123ab-cdef-7890-abcd-ef1234567890 ``` ## 참고 - processing 상태인 경우 song_result_url은 null입니다. - completed 상태인 경우 Project 정보와 함께 song_result_url (Azure Blob URL)을 반환합니다. - song_result_url 형식: {AZURE_BLOB_BASE_URL}/{task_id}/song/{store_name}.mp3 """, response_model=DownloadSongResponse, responses={ 200: {"description": "조회 성공"}, 404: {"description": "Song을 찾을 수 없음"}, 500: {"description": "조회 실패"}, }, ) async def download_song( task_id: str, session: AsyncSession = Depends(get_session), ) -> DownloadSongResponse: """task_id로 Song 상태를 polling하고 completed 시 Project 정보와 노래 URL을 반환합니다.""" print(f"[download_song] START - task_id: {task_id}") try: # task_id로 Song 조회 (여러 개 있을 경우 가장 최근 것 선택) song_result = await session.execute( select(Song) .where(Song.task_id == task_id) .order_by(Song.created_at.desc()) .limit(1) ) song = song_result.scalar_one_or_none() if not song: print(f"[download_song] Song NOT FOUND - task_id: {task_id}") return DownloadSongResponse( success=False, status="not_found", message=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.", error_message="Song not found", ) print(f"[download_song] Song found - task_id: {task_id}, status: {song.status}") # processing 상태인 경우 if song.status == "processing": print(f"[download_song] PROCESSING - task_id: {task_id}") return DownloadSongResponse( success=True, status="processing", message="노래 생성이 진행 중입니다.", task_id=task_id, ) # failed 상태인 경우 if song.status == "failed": print(f"[download_song] FAILED - task_id: {task_id}") return DownloadSongResponse( success=False, status="failed", message="노래 생성에 실패했습니다.", task_id=task_id, error_message="Song generation failed", ) # completed 상태인 경우 - Project 정보 조회 project_result = await session.execute( select(Project).where(Project.id == song.project_id) ) project = project_result.scalar_one_or_none() print(f"[download_song] COMPLETED - task_id: {task_id}, song_result_url: {song.song_result_url}") return DownloadSongResponse( success=True, status="completed", message="노래 다운로드가 완료되었습니다.", store_name=project.store_name if project else None, region=project.region if project else None, detail_region_info=project.detail_region_info if project else None, task_id=task_id, language=project.language if project else None, song_result_url=song.song_result_url, created_at=song.created_at, ) except Exception as e: print(f"[download_song] EXCEPTION - task_id: {task_id}, error: {e}") return DownloadSongResponse( success=False, status="error", message="노래 다운로드 조회에 실패했습니다.", error_message=str(e), ) @router.get( "s/", summary="생성된 노래 목록 조회", description=""" 완료된 노래 목록을 페이지네이션하여 조회합니다. ## 쿼리 파라미터 - **page**: 페이지 번호 (1부터 시작, 기본값: 1) - **page_size**: 페이지당 데이터 수 (기본값: 10, 최대: 100) ## 반환 정보 - **items**: 노래 목록 (store_name, region, task_id, language, song_result_url, created_at) - **total**: 전체 데이터 수 - **page**: 현재 페이지 - **page_size**: 페이지당 데이터 수 - **total_pages**: 전체 페이지 수 - **has_next**: 다음 페이지 존재 여부 - **has_prev**: 이전 페이지 존재 여부 ## 사용 예시 ``` GET /songs/?page=1&page_size=10 ``` ## 참고 - status가 'completed'인 노래만 반환됩니다. - created_at 기준 내림차순 정렬됩니다. """, response_model=PaginatedResponse[SongListItem], responses={ 200: {"description": "노래 목록 조회 성공"}, 500: {"description": "조회 실패"}, }, ) async def get_songs( session: AsyncSession = Depends(get_session), pagination: PaginationParams = Depends(get_pagination_params), ) -> PaginatedResponse[SongListItem]: """완료된 노래 목록을 페이지네이션하여 반환합니다.""" print(f"[get_songs] START - page: {pagination.page}, page_size: {pagination.page_size}") try: offset = (pagination.page - 1) * pagination.page_size # 서브쿼리: task_id별 최신 Song의 id 조회 (completed 상태, created_at 기준) from sqlalchemy import and_ # task_id별 최신 created_at 조회 latest_subquery = ( select( Song.task_id, func.max(Song.created_at).label("max_created_at") ) .where(Song.status == "completed") .group_by(Song.task_id) .subquery() ) # 전체 개수 조회 (task_id별 최신 1개만) count_query = select(func.count()).select_from(latest_subquery) total_result = await session.execute(count_query) total = total_result.scalar() or 0 # 데이터 조회 (completed 상태, task_id별 created_at 기준 최신 1개만, 최신순) query = ( select(Song) .join( latest_subquery, and_( Song.task_id == latest_subquery.c.task_id, Song.created_at == latest_subquery.c.max_created_at ) ) .where(Song.status == "completed") .order_by(Song.created_at.desc()) .offset(offset) .limit(pagination.page_size) ) result = await session.execute(query) songs = result.scalars().all() # Project 정보와 함께 SongListItem으로 변환 items = [] for song in songs: # Project 조회 (song.project_id 직접 사용) project_result = await session.execute( select(Project).where(Project.id == song.project_id) ) project = project_result.scalar_one_or_none() item = SongListItem( store_name=project.store_name if project else None, region=project.region if project else None, task_id=song.task_id, language=song.language, song_result_url=song.song_result_url, created_at=song.created_at, ) items.append(item) # 개별 아이템 로그 print( f"[get_songs] Item - store_name: {item.store_name}, region: {item.region}, " f"task_id: {item.task_id}, language: {item.language}, " f"song_result_url: {item.song_result_url}, created_at: {item.created_at}" ) response = PaginatedResponse.create( items=items, total=total, page=pagination.page, page_size=pagination.page_size, ) print( f"[get_songs] SUCCESS - total: {total}, page: {pagination.page}, " f"page_size: {pagination.page_size}, items_count: {len(items)}" ) return response except Exception as e: print(f"[get_songs] EXCEPTION - error: {e}") raise HTTPException( status_code=500, detail=f"노래 목록 조회에 실패했습니다: {str(e)}", )