o2o-castad-backend/app/utils/suno.py

366 lines
12 KiB
Python

"""
Suno API 클라이언트 모듈
API 문서: https://docs.sunoapi.org
## 사용법
```python
from app.utils.suno import SunoService
# config에서 자동으로 API 키를 가져옴
suno = SunoService()
# 또는 명시적으로 API 키 전달
suno = SunoService(api_key="your_api_key")
# 음악 생성 요청
task_id = await suno.generate(
prompt="[Verse]\\n오늘도 좋은 하루...",
style="K-Pop, Happy, 110 BPM",
title="좋은 하루"
)
# 상태 확인 (폴링 방식)
result = await suno.get_task_status(task_id)
# 상태 응답 파싱
parsed = suno.parse_status_response(result)
```
## 콜백 URL 사용법
generate() 호출 시 callback_url 파라미터를 전달하면 생성 완료 시 해당 URL로 POST 요청이 전송됩니다.
콜백 요청 형식:
```json
{
"code": 200,
"msg": "All generated successfully.",
"data": {
"callbackType": "complete",
"task_id": "작업ID",
"data": [
{
"id": "clip_id",
"audio_url": "https://...",
"image_url": "https://...",
"title": "곡 제목",
"status": "complete"
}
]
}
}
```
콜백 주의사항:
- HTTPS 프로토콜 권장
- 15초 내 응답 필수
- 동일 task_id에 대해 여러 콜백 수신 가능 (멱등성 처리 필요)
"""
from typing import Any, List, Optional
import httpx
from config import apikey_settings
from app.song.schemas.song_schema import PollingSongResponse, SongClipData
class SunoService:
"""Suno API를 통한 AI 음악 생성 서비스"""
BASE_URL = "https://api.sunoapi.org/api/v1"
def __init__(self, api_key: str | None = None):
"""
Args:
api_key: Suno API 키 (Bearer token으로 사용)
None일 경우 config에서 자동으로 가져옴
"""
self.api_key = api_key or apikey_settings.SUNO_API_KEY
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
async def generate(
self,
prompt: str,
genre: str | None = None,
callback_url: str | None = None,
) -> str:
"""
음악 생성 요청
Args:
prompt: 가사 (customMode=true일 때 가사로 사용)
1분 이내 길이의 노래에 적합한 가사여야 함
genre: 음악 장르 (예: "K-Pop", "Pop", "R&B", "Hip-Hop", "Ballad", "EDM")
None일 경우 style 파라미터를 전송하지 않음
callback_url: 생성 완료 시 알림 받을 URL (None일 경우 config에서 기본값 사용)
Returns:
task_id: 작업 추적용 ID
Note:
- 스트림 URL: 30-40초 내 생성
- 다운로드 URL: 2-3분 내 생성
- 생성되는 노래는 약 1분 이내의 길이
"""
# 정확히 1분 길이의 노래 생성을 위한 프롬프트 조건 추가
formatted_prompt = f"[Song Duration: Exactly 1 minute - Must be precisely 60 seconds]\n{prompt}"
# callback_url이 없으면 config에서 기본값 사용 (Suno API 필수 파라미터)
actual_callback_url = callback_url or apikey_settings.SUNO_CALLBACK_URL
payload: dict[str, Any] = {
"model": "V5",
"customMode": True,
"instrumental": False,
"prompt": formatted_prompt,
"callBackUrl": actual_callback_url,
}
# genre가 있을 때만 style 추가
if genre:
payload["style"] = genre
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/generate",
headers=self.headers,
json=payload,
timeout=30.0,
)
response.raise_for_status()
data = response.json()
# 응답: {"code": 200, "msg": "success", "data": {"taskId": "..."}}
# API 응답 검증
if data is None:
raise ValueError("Suno API returned empty response")
if data.get("code") != 200:
error_msg = data.get("msg", "Unknown error")
raise ValueError(f"Suno API error: {error_msg}")
response_data = data.get("data")
if response_data is None:
raise ValueError(f"Suno API response missing 'data' field: {data}")
task_id = response_data.get("taskId")
if task_id is None:
raise ValueError(f"Suno API response missing 'taskId': {response_data}")
return task_id
async def get_task_status(self, task_id: str) -> dict[str, Any]:
"""
음악 생성 작업 상태 확인
Args:
task_id: generate()에서 반환된 작업 ID
Returns:
작업 상태 정보 (status, audio_url, image_url 등 포함)
Note:
폴링 방식으로 상태 확인 시 사용.
콜백 URL을 사용하면 폴링 없이 결과를 받을 수 있음.
"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/generate/record-info",
headers=self.headers,
params={"taskId": task_id},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
if data is None:
raise ValueError("Suno API returned empty response for task status")
return data
async def get_lyric_timestamp(self, task_id: str, audio_id: str) -> dict[str, Any]:
"""
음악 타임스탬프 정보 추출
Args:
task_id: generate()에서 반환된 작업 ID
audio_id: 사용할 audio id
Returns:
data.alignedWords: 수노 가사 input - startS endS 시간 데이터 매핑
"""
payload = {"task_id": task_id, "audio_id": audio_id}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/generate/get-timestamped-lyrics",
headers=self.headers,
json=payload,
timeout=120.0,
)
response.raise_for_status()
data = response.json()
if not data or not data["data"]:
raise ValueError("Suno API returned empty response for task status")
return data["data"]["alignedWords"]
def parse_status_response(self, result: dict | None) -> PollingSongResponse:
"""Suno API 상태 응답을 파싱하여 PollingSongResponse로 변환합니다.
Args:
result: get_task_status()에서 반환된 원본 응답
Returns:
PollingSongResponse: 파싱된 상태 응답
Note:
응답 구조:
- PENDING 상태: data.response가 null, data.status가 "PENDING"
- SUCCESS 상태: data.response.sunoData에 클립 데이터 배열, data.status가 "SUCCESS"
"""
if result is None:
return PollingSongResponse(
success=False,
status="error",
message="Suno API 응답이 비어있습니다.",
error_message="Suno API returned None response",
)
code = result.get("code", 0)
data = result.get("data", {})
if code != 200:
return PollingSongResponse(
success=False,
status="failed",
message="Suno API 응답 오류",
error_message=result.get("msg", "Unknown error"),
)
# status는 data.status에 있음 (PENDING, SUCCESS 등)
status = data.get("status", "unknown")
# 클립 데이터는 data.response.sunoData에 있음 (camelCase)
# PENDING 상태에서는 response가 null
response_data = data.get("response") or {}
clips_data = response_data.get("sunoData") or []
# 상태별 메시지
status_messages = {
"PENDING": "노래 생성 대기 중입니다.",
"processing": "노래를 생성하고 있습니다.",
"complete": "노래 생성이 완료되었습니다.",
"SUCCESS": "노래 생성이 완료되었습니다.",
"TEXT_SUCCESS": "노래 생성이 완료되었습니다.",
"failed": "노래 생성에 실패했습니다.",
}
# 클립 데이터 파싱 (Suno API는 camelCase 사용)
clips = None
if clips_data:
clips = [
SongClipData(
id=clip.get("id"),
audio_url=clip.get("audioUrl"),
stream_audio_url=clip.get("streamAudioUrl"),
image_url=clip.get("imageUrl"),
title=clip.get("title"),
status=clip.get("status"),
duration=clip.get("duration"),
)
for clip in clips_data
]
return PollingSongResponse(
success=True,
status=status,
message=status_messages.get(status, f"상태: {status}"),
error_message=None,
)
def align_lyrics(self, word_data: list[dict], sentences: list[str]) -> list[dict]:
"""
word의 시작/끝 포지션만 저장하고, 시간은 word에서 참조
"""
# Step 1: 전체 텍스트 + word별 포지션 범위 저장
full_text = ""
word_ranges = [] # [(start_pos, end_pos, entry), ...]
for entry in word_data:
word = entry["word"]
start_pos = len(full_text)
full_text += word
end_pos = len(full_text) - 1
word_ranges.append((start_pos, end_pos, entry))
# Step 2: 메타데이터 제거 + 포지션 재매핑
meta_ranges = []
i = 0
while i < len(full_text):
if full_text[i] == "[":
start = i
while i < len(full_text) and full_text[i] != "]":
i += 1
meta_ranges.append((start, i + 1))
i += 1
clean_text = ""
new_to_old = {} # 클린 포지션 -> 원본 포지션
for old_pos, char in enumerate(full_text):
in_meta = any(s <= old_pos < e for s, e in meta_ranges)
if not in_meta:
new_to_old[len(clean_text)] = old_pos
clean_text += char
# Step 3: 포지션으로 word 찾기
def get_word_at(old_pos: int):
for start, end, entry in word_ranges:
if start <= old_pos <= end:
return entry
return None
# Step 4: 문장 매칭
def normalize(text):
return "".join(c for c in text if c not in " \n\t-")
norm_clean = normalize(clean_text)
norm_to_clean = [i for i, c in enumerate(clean_text) if c not in " \n\t-"]
results = []
search_pos = 0
for sentence in sentences:
norm_sentence = normalize(sentence)
found_pos = norm_clean.find(norm_sentence, search_pos)
if found_pos != -1:
clean_start = norm_to_clean[found_pos]
clean_end = norm_to_clean[found_pos + len(norm_sentence) - 1]
old_start = new_to_old[clean_start]
old_end = new_to_old[clean_end]
word_start = get_word_at(old_start)
word_end = get_word_at(old_end)
results.append(
{
"text": sentence,
"start_sec": word_start["startS"],
"end_sec": word_end["endS"],
}
)
search_pos = found_pos + len(norm_sentence)
else:
results.append({"text": sentence, "start_sec": None, "end_sec": None})
return results