""" Song API Router 이 모듈은 Suno API를 통한 노래 생성 관련 API 엔드포인트를 정의합니다. 엔드포인트 목록: - POST /song/generate: 노래 생성 요청 - GET /song/status/{task_id}: 노래 생성 상태 조회 - GET /song/download/{task_id}: 노래 다운로드 사용 예시: from app.song.api.routers.v1.song import router app.include_router(router, prefix="/api/v1") """ from datetime import date from pathlib import Path import aiofiles import httpx from fastapi import APIRouter from uuid_extensions import uuid7str from app.song.schemas.song_schema import ( DownloadSongResponse, GenerateSongRequest, GenerateSongResponse, PollingSongResponse, SongClipData, ) from app.utils.suno import SunoService from config import prj_settings def _parse_suno_status_response(result: dict | None) -> PollingSongResponse: """Suno API 상태 응답을 파싱하여 PollingSongResponse로 변환합니다.""" if result is None: return PollingSongResponse( success=False, status="error", message="Suno API 응답이 비어있습니다.", clips=None, raw_response=None, error_message="Suno API returned None response", ) code = result.get("code", 0) data = result.get("data", {}) if code != 200: return PollingSongResponse( success=False, status="failed", message="Suno API 응답 오류", clips=None, raw_response=result, error_message=result.get("msg", "Unknown error"), ) status = data.get("status", "unknown") # 클립 데이터는 data.response.sunoData에 있음 (camelCase) # data.get()이 None을 반환할 수 있으므로 or {}로 처리 response_data = data.get("response") or {} clips_data = response_data.get("sunoData") or [] # 상태별 메시지 (Suno API는 다양한 상태값 반환) status_messages = { "pending": "노래 생성 대기 중입니다.", "processing": "노래를 생성하고 있습니다.", "complete": "노래 생성이 완료되었습니다.", "SUCCESS": "노래 생성이 완료되었습니다.", "TEXT_SUCCESS": "노래 생성이 완료되었습니다.", "failed": "노래 생성에 실패했습니다.", } # 클립 데이터 파싱 (Suno API는 camelCase 사용) clips = None if clips_data: clips = [ SongClipData( id=clip.get("id"), audio_url=clip.get("audioUrl"), stream_audio_url=clip.get("streamAudioUrl"), image_url=clip.get("imageUrl"), title=clip.get("title"), status=clip.get("status"), duration=clip.get("duration"), ) for clip in clips_data ] return PollingSongResponse( success=True, status=status, message=status_messages.get(status, f"상태: {status}"), clips=clips, raw_response=result, error_message=None, ) router = APIRouter(prefix="/song", tags=["song"]) @router.post( "/generate", summary="노래 생성 요청", description=""" Suno API를 통해 노래 생성을 요청합니다. ## 요청 필드 - **lyrics**: 노래에 사용할 가사 (필수) - **genre**: 음악 장르 (필수) - K-Pop, Pop, R&B, Hip-Hop, Ballad, EDM, Rock, Jazz 등 - **language**: 노래 언어 (선택, 기본값: Korean) ## 반환 정보 - **success**: 요청 성공 여부 - **task_id**: Suno 작업 ID (폴링에 사용) - **message**: 응답 메시지 ## 사용 예시 ``` POST /song/generate { "lyrics": "여기 군산에서 만나요\\n아름다운 하루를 함께", "genre": "K-Pop", "language": "Korean" } ``` ## 참고 - 생성되는 노래는 약 1분 이내 길이입니다. - task_id를 사용하여 /status/{task_id} 엔드포인트에서 생성 상태를 확인할 수 있습니다. """, response_model=GenerateSongResponse, responses={ 200: {"description": "노래 생성 요청 성공"}, 500: {"description": "노래 생성 요청 실패"}, }, ) async def generate_song( request_body: GenerateSongRequest, ) -> GenerateSongResponse: """가사와 장르를 기반으로 Suno API를 통해 노래를 생성합니다.""" try: suno_service = SunoService() task_id = await suno_service.generate( prompt=request_body.lyrics, genre=request_body.genre, ) return GenerateSongResponse( success=True, task_id=task_id, message="노래 생성 요청이 접수되었습니다. task_id로 상태를 조회하세요.", error_message=None, ) except Exception as e: return GenerateSongResponse( success=False, task_id=None, message="노래 생성 요청에 실패했습니다.", error_message=str(e), ) @router.get( "/status/{task_id}", summary="노래 생성 상태 조회", description=""" Suno API를 통해 노래 생성 작업의 상태를 조회합니다. ## 경로 파라미터 - **task_id**: 노래 생성 시 반환된 작업 ID (필수) ## 반환 정보 - **success**: 조회 성공 여부 - **status**: 작업 상태 (pending, processing, complete, failed) - **message**: 상태 메시지 - **clips**: 생성된 노래 클립 목록 (완료 시) - **raw_response**: Suno API 원본 응답 ## 사용 예시 ``` GET /song/status/abc123... ``` ## 상태 값 - **pending**: 대기 중 - **processing**: 생성 중 - **complete**: 생성 완료 - **failed**: 생성 실패 ## 참고 - 스트림 URL: 30-40초 내 생성 - 다운로드 URL: 2-3분 내 생성 """, response_model=PollingSongResponse, responses={ 200: {"description": "상태 조회 성공"}, 500: {"description": "상태 조회 실패"}, }, ) async def get_song_status( task_id: str, ) -> PollingSongResponse: """task_id로 노래 생성 작업의 상태를 조회합니다.""" try: suno_service = SunoService() result = await suno_service.get_task_status(task_id) return _parse_suno_status_response(result) except Exception as e: import traceback return PollingSongResponse( success=False, status="error", message="상태 조회에 실패했습니다.", clips=None, raw_response=None, error_message=f"{type(e).__name__}: {e}\n{traceback.format_exc()}", ) @router.get( "/download/{task_id}", summary="노래 다운로드", description=""" 완료된 노래를 서버에 다운로드하고 접근 가능한 URL을 반환합니다. ## 경로 파라미터 - **task_id**: 노래 생성 시 반환된 작업 ID (필수) ## 반환 정보 - **success**: 다운로드 성공 여부 - **message**: 응답 메시지 - **file_path**: 저장된 파일의 상대 경로 - **file_url**: 프론트엔드에서 접근 가능한 파일 URL ## 사용 예시 ``` GET /song/download/abc123... ``` ## 참고 - 노래 생성이 완료된 상태(complete)에서만 다운로드 가능합니다. - 파일은 /media/{날짜}/{uuid7}/song.mp3 경로에 저장됩니다. - 반환된 file_url을 사용하여 프론트엔드에서 MP3를 재생할 수 있습니다. """, response_model=DownloadSongResponse, responses={ 200: {"description": "다운로드 성공"}, 400: {"description": "노래 생성이 완료되지 않음"}, 500: {"description": "다운로드 실패"}, }, ) async def download_song( task_id: str, ) -> DownloadSongResponse: """완료된 노래를 다운로드하여 서버에 저장하고 접근 URL을 반환합니다.""" try: suno_service = SunoService() result = await suno_service.get_task_status(task_id) # API 응답 확인 if result.get("code") != 200: return DownloadSongResponse( success=False, message="Suno API 응답 오류", error_message=result.get("msg", "Unknown error"), ) data = result.get("data", {}) status = data.get("status", "unknown") # 완료 상태 확인 (Suno API는 다양한 완료 상태값 반환) completed_statuses = {"complete", "SUCCESS", "TEXT_SUCCESS"} if status not in completed_statuses: return DownloadSongResponse( success=False, message=f"노래 생성이 완료되지 않았습니다. 현재 상태: {status}", error_message="노래 생성 완료 후 다운로드해 주세요.", ) # 클립 데이터는 data.response.sunoData에 있음 (camelCase) # data.get()이 None을 반환할 수 있으므로 or {}로 처리 response_data = data.get("response") or {} clips_data = response_data.get("sunoData") or [] if not clips_data: return DownloadSongResponse( success=False, message="생성된 노래 클립이 없습니다.", error_message="sunoData is empty", ) # 첫 번째 클립의 streamAudioUrl 가져오기 (camelCase) first_clip = clips_data[0] stream_audio_url = first_clip.get("streamAudioUrl") if not stream_audio_url: return DownloadSongResponse( success=False, message="스트리밍 오디오 URL을 찾을 수 없습니다.", error_message="stream_audio_url is missing", ) # 저장 경로 생성: media/{날짜}/{uuid7}/song.mp3 today = date.today().isoformat() unique_id = uuid7str() relative_dir = f"{today}/{unique_id}" file_name = "song.mp3" # 절대 경로 생성 media_dir = Path("media") / today / unique_id media_dir.mkdir(parents=True, exist_ok=True) file_path = media_dir / file_name # 오디오 파일 다운로드 (비동기 파일 쓰기) async with httpx.AsyncClient() as client: response = await client.get(stream_audio_url, timeout=60.0) response.raise_for_status() # aiofiles는 Path 객체를 문자열로 변환하여 사용 async with aiofiles.open(str(file_path), "wb") as f: await f.write(response.content) # 프론트엔드에서 접근 가능한 URL 생성 relative_path = f"/media/{relative_dir}/{file_name}" base_url = f"http://{prj_settings.PROJECT_DOMAIN}" file_url = f"{base_url}{relative_path}" return DownloadSongResponse( success=True, message="노래 다운로드가 완료되었습니다.", file_path=relative_path, file_url=file_url, ) except httpx.HTTPError as e: return DownloadSongResponse( success=False, message="오디오 파일 다운로드에 실패했습니다.", error_message=str(e), ) except Exception as e: return DownloadSongResponse( success=False, message="노래 다운로드에 실패했습니다.", error_message=str(e), )