351 lines
11 KiB
Python
351 lines
11 KiB
Python
"""
|
|
Song API Router
|
|
|
|
이 모듈은 Suno API를 통한 노래 생성 관련 API 엔드포인트를 정의합니다.
|
|
|
|
엔드포인트 목록:
|
|
- POST /song/generate: 노래 생성 요청
|
|
- GET /song/status/{task_id}: 노래 생성 상태 조회
|
|
- GET /song/download/{task_id}: 노래 다운로드
|
|
|
|
사용 예시:
|
|
from app.song.api.routers.v1.song import router
|
|
app.include_router(router, prefix="/api/v1")
|
|
"""
|
|
|
|
from datetime import date
|
|
from pathlib import Path
|
|
|
|
import aiofiles
|
|
import httpx
|
|
from fastapi import APIRouter
|
|
from uuid_extensions import uuid7str
|
|
|
|
from app.song.schemas.song_schema import (
|
|
DownloadSongResponse,
|
|
GenerateSongRequest,
|
|
GenerateSongResponse,
|
|
PollingSongResponse,
|
|
SongClipData,
|
|
)
|
|
from app.utils.suno import SunoService
|
|
from config import prj_settings
|
|
|
|
|
|
def _parse_suno_status_response(result: dict | None) -> PollingSongResponse:
|
|
"""Suno API 상태 응답을 파싱하여 PollingSongResponse로 변환합니다."""
|
|
if result is None:
|
|
return PollingSongResponse(
|
|
success=False,
|
|
status="error",
|
|
message="Suno API 응답이 비어있습니다.",
|
|
clips=None,
|
|
raw_response=None,
|
|
error_message="Suno API returned None response",
|
|
)
|
|
|
|
code = result.get("code", 0)
|
|
data = result.get("data", {})
|
|
|
|
if code != 200:
|
|
return PollingSongResponse(
|
|
success=False,
|
|
status="failed",
|
|
message="Suno API 응답 오류",
|
|
clips=None,
|
|
raw_response=result,
|
|
error_message=result.get("msg", "Unknown error"),
|
|
)
|
|
|
|
status = data.get("status", "unknown")
|
|
|
|
# 클립 데이터는 data.response.sunoData에 있음 (camelCase)
|
|
# data.get()이 None을 반환할 수 있으므로 or {}로 처리
|
|
response_data = data.get("response") or {}
|
|
clips_data = response_data.get("sunoData") or []
|
|
|
|
# 상태별 메시지 (Suno API는 다양한 상태값 반환)
|
|
status_messages = {
|
|
"pending": "노래 생성 대기 중입니다.",
|
|
"processing": "노래를 생성하고 있습니다.",
|
|
"complete": "노래 생성이 완료되었습니다.",
|
|
"SUCCESS": "노래 생성이 완료되었습니다.",
|
|
"TEXT_SUCCESS": "노래 생성이 완료되었습니다.",
|
|
"failed": "노래 생성에 실패했습니다.",
|
|
}
|
|
|
|
# 클립 데이터 파싱 (Suno API는 camelCase 사용)
|
|
clips = None
|
|
if clips_data:
|
|
clips = [
|
|
SongClipData(
|
|
id=clip.get("id"),
|
|
audio_url=clip.get("audioUrl"),
|
|
stream_audio_url=clip.get("streamAudioUrl"),
|
|
image_url=clip.get("imageUrl"),
|
|
title=clip.get("title"),
|
|
status=clip.get("status"),
|
|
duration=clip.get("duration"),
|
|
)
|
|
for clip in clips_data
|
|
]
|
|
|
|
return PollingSongResponse(
|
|
success=True,
|
|
status=status,
|
|
message=status_messages.get(status, f"상태: {status}"),
|
|
clips=clips,
|
|
raw_response=result,
|
|
error_message=None,
|
|
)
|
|
|
|
router = APIRouter(prefix="/song", tags=["song"])
|
|
|
|
|
|
@router.post(
|
|
"/generate",
|
|
summary="노래 생성 요청",
|
|
description="""
|
|
Suno API를 통해 노래 생성을 요청합니다.
|
|
|
|
## 요청 필드
|
|
- **lyrics**: 노래에 사용할 가사 (필수)
|
|
- **genre**: 음악 장르 (필수) - K-Pop, Pop, R&B, Hip-Hop, Ballad, EDM, Rock, Jazz 등
|
|
- **language**: 노래 언어 (선택, 기본값: Korean)
|
|
|
|
## 반환 정보
|
|
- **success**: 요청 성공 여부
|
|
- **task_id**: Suno 작업 ID (폴링에 사용)
|
|
- **message**: 응답 메시지
|
|
|
|
## 사용 예시
|
|
```
|
|
POST /song/generate
|
|
{
|
|
"lyrics": "여기 군산에서 만나요\\n아름다운 하루를 함께",
|
|
"genre": "K-Pop",
|
|
"language": "Korean"
|
|
}
|
|
```
|
|
|
|
## 참고
|
|
- 생성되는 노래는 약 1분 이내 길이입니다.
|
|
- task_id를 사용하여 /status/{task_id} 엔드포인트에서 생성 상태를 확인할 수 있습니다.
|
|
""",
|
|
response_model=GenerateSongResponse,
|
|
responses={
|
|
200: {"description": "노래 생성 요청 성공"},
|
|
500: {"description": "노래 생성 요청 실패"},
|
|
},
|
|
)
|
|
async def generate_song(
|
|
request_body: GenerateSongRequest,
|
|
) -> GenerateSongResponse:
|
|
"""가사와 장르를 기반으로 Suno API를 통해 노래를 생성합니다."""
|
|
try:
|
|
suno_service = SunoService()
|
|
|
|
task_id = await suno_service.generate(
|
|
prompt=request_body.lyrics,
|
|
genre=request_body.genre,
|
|
)
|
|
|
|
return GenerateSongResponse(
|
|
success=True,
|
|
task_id=task_id,
|
|
message="노래 생성 요청이 접수되었습니다. task_id로 상태를 조회하세요.",
|
|
error_message=None,
|
|
)
|
|
except Exception as e:
|
|
return GenerateSongResponse(
|
|
success=False,
|
|
task_id=None,
|
|
message="노래 생성 요청에 실패했습니다.",
|
|
error_message=str(e),
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/status/{task_id}",
|
|
summary="노래 생성 상태 조회",
|
|
description="""
|
|
Suno API를 통해 노래 생성 작업의 상태를 조회합니다.
|
|
|
|
## 경로 파라미터
|
|
- **task_id**: 노래 생성 시 반환된 작업 ID (필수)
|
|
|
|
## 반환 정보
|
|
- **success**: 조회 성공 여부
|
|
- **status**: 작업 상태 (pending, processing, complete, failed)
|
|
- **message**: 상태 메시지
|
|
- **clips**: 생성된 노래 클립 목록 (완료 시)
|
|
- **raw_response**: Suno API 원본 응답
|
|
|
|
## 사용 예시
|
|
```
|
|
GET /song/status/abc123...
|
|
```
|
|
|
|
## 상태 값
|
|
- **pending**: 대기 중
|
|
- **processing**: 생성 중
|
|
- **complete**: 생성 완료
|
|
- **failed**: 생성 실패
|
|
|
|
## 참고
|
|
- 스트림 URL: 30-40초 내 생성
|
|
- 다운로드 URL: 2-3분 내 생성
|
|
""",
|
|
response_model=PollingSongResponse,
|
|
responses={
|
|
200: {"description": "상태 조회 성공"},
|
|
500: {"description": "상태 조회 실패"},
|
|
},
|
|
)
|
|
async def get_song_status(
|
|
task_id: str,
|
|
) -> PollingSongResponse:
|
|
"""task_id로 노래 생성 작업의 상태를 조회합니다."""
|
|
try:
|
|
suno_service = SunoService()
|
|
result = await suno_service.get_task_status(task_id)
|
|
return _parse_suno_status_response(result)
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
return PollingSongResponse(
|
|
success=False,
|
|
status="error",
|
|
message="상태 조회에 실패했습니다.",
|
|
clips=None,
|
|
raw_response=None,
|
|
error_message=f"{type(e).__name__}: {e}\n{traceback.format_exc()}",
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/download/{task_id}",
|
|
summary="노래 다운로드",
|
|
description="""
|
|
완료된 노래를 서버에 다운로드하고 접근 가능한 URL을 반환합니다.
|
|
|
|
## 경로 파라미터
|
|
- **task_id**: 노래 생성 시 반환된 작업 ID (필수)
|
|
|
|
## 반환 정보
|
|
- **success**: 다운로드 성공 여부
|
|
- **message**: 응답 메시지
|
|
- **file_path**: 저장된 파일의 상대 경로
|
|
- **file_url**: 프론트엔드에서 접근 가능한 파일 URL
|
|
|
|
## 사용 예시
|
|
```
|
|
GET /song/download/abc123...
|
|
```
|
|
|
|
## 참고
|
|
- 노래 생성이 완료된 상태(complete)에서만 다운로드 가능합니다.
|
|
- 파일은 /media/{날짜}/{uuid7}/song.mp3 경로에 저장됩니다.
|
|
- 반환된 file_url을 사용하여 프론트엔드에서 MP3를 재생할 수 있습니다.
|
|
""",
|
|
response_model=DownloadSongResponse,
|
|
responses={
|
|
200: {"description": "다운로드 성공"},
|
|
400: {"description": "노래 생성이 완료되지 않음"},
|
|
500: {"description": "다운로드 실패"},
|
|
},
|
|
)
|
|
async def download_song(
|
|
task_id: str,
|
|
) -> DownloadSongResponse:
|
|
"""완료된 노래를 다운로드하여 서버에 저장하고 접근 URL을 반환합니다."""
|
|
try:
|
|
suno_service = SunoService()
|
|
result = await suno_service.get_task_status(task_id)
|
|
|
|
# API 응답 확인
|
|
if result.get("code") != 200:
|
|
return DownloadSongResponse(
|
|
success=False,
|
|
message="Suno API 응답 오류",
|
|
error_message=result.get("msg", "Unknown error"),
|
|
)
|
|
|
|
data = result.get("data", {})
|
|
status = data.get("status", "unknown")
|
|
|
|
# 완료 상태 확인 (Suno API는 다양한 완료 상태값 반환)
|
|
completed_statuses = {"complete", "SUCCESS", "TEXT_SUCCESS"}
|
|
if status not in completed_statuses:
|
|
return DownloadSongResponse(
|
|
success=False,
|
|
message=f"노래 생성이 완료되지 않았습니다. 현재 상태: {status}",
|
|
error_message="노래 생성 완료 후 다운로드해 주세요.",
|
|
)
|
|
|
|
# 클립 데이터는 data.response.sunoData에 있음 (camelCase)
|
|
# data.get()이 None을 반환할 수 있으므로 or {}로 처리
|
|
response_data = data.get("response") or {}
|
|
clips_data = response_data.get("sunoData") or []
|
|
if not clips_data:
|
|
return DownloadSongResponse(
|
|
success=False,
|
|
message="생성된 노래 클립이 없습니다.",
|
|
error_message="sunoData is empty",
|
|
)
|
|
|
|
# 첫 번째 클립의 streamAudioUrl 가져오기 (camelCase)
|
|
first_clip = clips_data[0]
|
|
stream_audio_url = first_clip.get("streamAudioUrl")
|
|
|
|
if not stream_audio_url:
|
|
return DownloadSongResponse(
|
|
success=False,
|
|
message="스트리밍 오디오 URL을 찾을 수 없습니다.",
|
|
error_message="stream_audio_url is missing",
|
|
)
|
|
|
|
# 저장 경로 생성: media/{날짜}/{uuid7}/song.mp3
|
|
today = date.today().isoformat()
|
|
unique_id = uuid7str()
|
|
relative_dir = f"{today}/{unique_id}"
|
|
file_name = "song.mp3"
|
|
|
|
# 절대 경로 생성
|
|
media_dir = Path("media") / today / unique_id
|
|
media_dir.mkdir(parents=True, exist_ok=True)
|
|
file_path = media_dir / file_name
|
|
|
|
# 오디오 파일 다운로드 (비동기 파일 쓰기)
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.get(stream_audio_url, timeout=60.0)
|
|
response.raise_for_status()
|
|
|
|
async with aiofiles.open(file_path, "wb") as f:
|
|
await f.write(response.content)
|
|
|
|
# 프론트엔드에서 접근 가능한 URL 생성
|
|
relative_path = f"/media/{relative_dir}/{file_name}"
|
|
base_url = f"http://{prj_settings.PROJECT_DOMAIN}"
|
|
file_url = f"{base_url}{relative_path}"
|
|
|
|
return DownloadSongResponse(
|
|
success=True,
|
|
message="노래 다운로드가 완료되었습니다.",
|
|
file_path=relative_path,
|
|
file_url=file_url,
|
|
)
|
|
|
|
except httpx.HTTPError as e:
|
|
return DownloadSongResponse(
|
|
success=False,
|
|
message="오디오 파일 다운로드에 실패했습니다.",
|
|
error_message=str(e),
|
|
)
|
|
except Exception as e:
|
|
return DownloadSongResponse(
|
|
success=False,
|
|
message="노래 다운로드에 실패했습니다.",
|
|
error_message=str(e),
|
|
)
|