o2o-castad-backend/app/song/api/routers/v1/song.py

352 lines
11 KiB
Python

"""
Song API Router
이 모듈은 Suno API를 통한 노래 생성 관련 API 엔드포인트를 정의합니다.
엔드포인트 목록:
- POST /song/generate: 노래 생성 요청
- GET /song/status/{task_id}: 노래 생성 상태 조회
- GET /song/download/{task_id}: 노래 다운로드
사용 예시:
from app.song.api.routers.v1.song import router
app.include_router(router, prefix="/api/v1")
"""
from datetime import date
from pathlib import Path
import aiofiles
import httpx
from fastapi import APIRouter
from uuid_extensions import uuid7str
from app.song.schemas.song_schema import (
DownloadSongResponse,
GenerateSongRequest,
GenerateSongResponse,
PollingSongResponse,
SongClipData,
)
from app.utils.suno import SunoService
from config import prj_settings
def _parse_suno_status_response(result: dict | None) -> PollingSongResponse:
"""Suno API 상태 응답을 파싱하여 PollingSongResponse로 변환합니다."""
if result is None:
return PollingSongResponse(
success=False,
status="error",
message="Suno API 응답이 비어있습니다.",
clips=None,
raw_response=None,
error_message="Suno API returned None response",
)
code = result.get("code", 0)
data = result.get("data", {})
if code != 200:
return PollingSongResponse(
success=False,
status="failed",
message="Suno API 응답 오류",
clips=None,
raw_response=result,
error_message=result.get("msg", "Unknown error"),
)
status = data.get("status", "unknown")
# 클립 데이터는 data.response.sunoData에 있음 (camelCase)
# data.get()이 None을 반환할 수 있으므로 or {}로 처리
response_data = data.get("response") or {}
clips_data = response_data.get("sunoData") or []
# 상태별 메시지 (Suno API는 다양한 상태값 반환)
status_messages = {
"pending": "노래 생성 대기 중입니다.",
"processing": "노래를 생성하고 있습니다.",
"complete": "노래 생성이 완료되었습니다.",
"SUCCESS": "노래 생성이 완료되었습니다.",
"TEXT_SUCCESS": "노래 생성이 완료되었습니다.",
"failed": "노래 생성에 실패했습니다.",
}
# 클립 데이터 파싱 (Suno API는 camelCase 사용)
clips = None
if clips_data:
clips = [
SongClipData(
id=clip.get("id"),
audio_url=clip.get("audioUrl"),
stream_audio_url=clip.get("streamAudioUrl"),
image_url=clip.get("imageUrl"),
title=clip.get("title"),
status=clip.get("status"),
duration=clip.get("duration"),
)
for clip in clips_data
]
return PollingSongResponse(
success=True,
status=status,
message=status_messages.get(status, f"상태: {status}"),
clips=clips,
raw_response=result,
error_message=None,
)
router = APIRouter(prefix="/song", tags=["song"])
@router.post(
"/generate",
summary="노래 생성 요청",
description="""
Suno API를 통해 노래 생성을 요청합니다.
## 요청 필드
- **lyrics**: 노래에 사용할 가사 (필수)
- **genre**: 음악 장르 (필수) - K-Pop, Pop, R&B, Hip-Hop, Ballad, EDM, Rock, Jazz 등
- **language**: 노래 언어 (선택, 기본값: Korean)
## 반환 정보
- **success**: 요청 성공 여부
- **task_id**: Suno 작업 ID (폴링에 사용)
- **message**: 응답 메시지
## 사용 예시
```
POST /song/generate
{
"lyrics": "여기 군산에서 만나요\\n아름다운 하루를 함께",
"genre": "K-Pop",
"language": "Korean"
}
```
## 참고
- 생성되는 노래는 약 1분 이내 길이입니다.
- task_id를 사용하여 /status/{task_id} 엔드포인트에서 생성 상태를 확인할 수 있습니다.
""",
response_model=GenerateSongResponse,
responses={
200: {"description": "노래 생성 요청 성공"},
500: {"description": "노래 생성 요청 실패"},
},
)
async def generate_song(
request_body: GenerateSongRequest,
) -> GenerateSongResponse:
"""가사와 장르를 기반으로 Suno API를 통해 노래를 생성합니다."""
try:
suno_service = SunoService()
task_id = await suno_service.generate(
prompt=request_body.lyrics,
genre=request_body.genre,
)
return GenerateSongResponse(
success=True,
task_id=task_id,
message="노래 생성 요청이 접수되었습니다. task_id로 상태를 조회하세요.",
error_message=None,
)
except Exception as e:
return GenerateSongResponse(
success=False,
task_id=None,
message="노래 생성 요청에 실패했습니다.",
error_message=str(e),
)
@router.get(
"/status/{task_id}",
summary="노래 생성 상태 조회",
description="""
Suno API를 통해 노래 생성 작업의 상태를 조회합니다.
## 경로 파라미터
- **task_id**: 노래 생성 시 반환된 작업 ID (필수)
## 반환 정보
- **success**: 조회 성공 여부
- **status**: 작업 상태 (pending, processing, complete, failed)
- **message**: 상태 메시지
- **clips**: 생성된 노래 클립 목록 (완료 시)
- **raw_response**: Suno API 원본 응답
## 사용 예시
```
GET /song/status/abc123...
```
## 상태 값
- **pending**: 대기 중
- **processing**: 생성 중
- **complete**: 생성 완료
- **failed**: 생성 실패
## 참고
- 스트림 URL: 30-40초 내 생성
- 다운로드 URL: 2-3분 내 생성
""",
response_model=PollingSongResponse,
responses={
200: {"description": "상태 조회 성공"},
500: {"description": "상태 조회 실패"},
},
)
async def get_song_status(
task_id: str,
) -> PollingSongResponse:
"""task_id로 노래 생성 작업의 상태를 조회합니다."""
try:
suno_service = SunoService()
result = await suno_service.get_task_status(task_id)
return _parse_suno_status_response(result)
except Exception as e:
import traceback
return PollingSongResponse(
success=False,
status="error",
message="상태 조회에 실패했습니다.",
clips=None,
raw_response=None,
error_message=f"{type(e).__name__}: {e}\n{traceback.format_exc()}",
)
@router.get(
"/download/{task_id}",
summary="노래 다운로드",
description="""
완료된 노래를 서버에 다운로드하고 접근 가능한 URL을 반환합니다.
## 경로 파라미터
- **task_id**: 노래 생성 시 반환된 작업 ID (필수)
## 반환 정보
- **success**: 다운로드 성공 여부
- **message**: 응답 메시지
- **file_path**: 저장된 파일의 상대 경로
- **file_url**: 프론트엔드에서 접근 가능한 파일 URL
## 사용 예시
```
GET /song/download/abc123...
```
## 참고
- 노래 생성이 완료된 상태(complete)에서만 다운로드 가능합니다.
- 파일은 /media/{날짜}/{uuid7}/song.mp3 경로에 저장됩니다.
- 반환된 file_url을 사용하여 프론트엔드에서 MP3를 재생할 수 있습니다.
""",
response_model=DownloadSongResponse,
responses={
200: {"description": "다운로드 성공"},
400: {"description": "노래 생성이 완료되지 않음"},
500: {"description": "다운로드 실패"},
},
)
async def download_song(
task_id: str,
) -> DownloadSongResponse:
"""완료된 노래를 다운로드하여 서버에 저장하고 접근 URL을 반환합니다."""
try:
suno_service = SunoService()
result = await suno_service.get_task_status(task_id)
# API 응답 확인
if result.get("code") != 200:
return DownloadSongResponse(
success=False,
message="Suno API 응답 오류",
error_message=result.get("msg", "Unknown error"),
)
data = result.get("data", {})
status = data.get("status", "unknown")
# 완료 상태 확인 (Suno API는 다양한 완료 상태값 반환)
completed_statuses = {"complete", "SUCCESS", "TEXT_SUCCESS"}
if status not in completed_statuses:
return DownloadSongResponse(
success=False,
message=f"노래 생성이 완료되지 않았습니다. 현재 상태: {status}",
error_message="노래 생성 완료 후 다운로드해 주세요.",
)
# 클립 데이터는 data.response.sunoData에 있음 (camelCase)
# data.get()이 None을 반환할 수 있으므로 or {}로 처리
response_data = data.get("response") or {}
clips_data = response_data.get("sunoData") or []
if not clips_data:
return DownloadSongResponse(
success=False,
message="생성된 노래 클립이 없습니다.",
error_message="sunoData is empty",
)
# 첫 번째 클립의 streamAudioUrl 가져오기 (camelCase)
first_clip = clips_data[0]
stream_audio_url = first_clip.get("streamAudioUrl")
if not stream_audio_url:
return DownloadSongResponse(
success=False,
message="스트리밍 오디오 URL을 찾을 수 없습니다.",
error_message="stream_audio_url is missing",
)
# 저장 경로 생성: media/{날짜}/{uuid7}/song.mp3
today = date.today().isoformat()
unique_id = uuid7str()
relative_dir = f"{today}/{unique_id}"
file_name = "song.mp3"
# 절대 경로 생성
media_dir = Path("media") / today / unique_id
media_dir.mkdir(parents=True, exist_ok=True)
file_path = media_dir / file_name
# 오디오 파일 다운로드 (비동기 파일 쓰기)
async with httpx.AsyncClient() as client:
response = await client.get(stream_audio_url, timeout=60.0)
response.raise_for_status()
# aiofiles는 Path 객체를 문자열로 변환하여 사용
async with aiofiles.open(str(file_path), "wb") as f:
await f.write(response.content)
# 프론트엔드에서 접근 가능한 URL 생성
relative_path = f"/media/{relative_dir}/{file_name}"
base_url = f"http://{prj_settings.PROJECT_DOMAIN}"
file_url = f"{base_url}{relative_path}"
return DownloadSongResponse(
success=True,
message="노래 다운로드가 완료되었습니다.",
file_path=relative_path,
file_url=file_url,
)
except httpx.HTTPError as e:
return DownloadSongResponse(
success=False,
message="오디오 파일 다운로드에 실패했습니다.",
error_message=str(e),
)
except Exception as e:
return DownloadSongResponse(
success=False,
message="노래 다운로드에 실패했습니다.",
error_message=str(e),
)