""" Suno API 클라이언트 모듈 API 문서: https://docs.sunoapi.org ## 사용법 ```python from app.utils.suno import SunoService # config에서 자동으로 API 키를 가져옴 suno = SunoService() # 또는 명시적으로 API 키 전달 suno = SunoService(api_key="your_api_key") # 음악 생성 요청 task_id = await suno.generate( prompt="[Verse]\\n오늘도 좋은 하루...", style="K-Pop, Happy, 110 BPM", title="좋은 하루" ) # 상태 확인 (폴링 방식) result = await suno.get_task_status(task_id) # 상태 응답 파싱 parsed = suno.parse_status_response(result) ``` ## 콜백 URL 사용법 generate() 호출 시 callback_url 파라미터를 전달하면 생성 완료 시 해당 URL로 POST 요청이 전송됩니다. 콜백 요청 형식: ```json { "code": 200, "msg": "All generated successfully.", "data": { "callbackType": "complete", "task_id": "작업ID", "data": [ { "id": "clip_id", "audio_url": "https://...", "image_url": "https://...", "title": "곡 제목", "status": "complete" } ] } } ``` 콜백 주의사항: - HTTPS 프로토콜 권장 - 15초 내 응답 필수 - 동일 task_id에 대해 여러 콜백 수신 가능 (멱등성 처리 필요) """ from typing import Any, List, Optional import httpx from config import apikey_settings from app.song.schemas.song_schema import PollingSongResponse, SongClipData class SunoService: """Suno API를 통한 AI 음악 생성 서비스""" BASE_URL = "https://api.sunoapi.org/api/v1" def __init__(self, api_key: str | None = None): """ Args: api_key: Suno API 키 (Bearer token으로 사용) None일 경우 config에서 자동으로 가져옴 """ self.api_key = api_key or apikey_settings.SUNO_API_KEY self.headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", } async def generate( self, prompt: str, genre: str | None = None, callback_url: str | None = None, ) -> str: """ 음악 생성 요청 Args: prompt: 가사 (customMode=true일 때 가사로 사용) 1분 이내 길이의 노래에 적합한 가사여야 함 genre: 음악 장르 (예: "K-Pop", "Pop", "R&B", "Hip-Hop", "Ballad", "EDM") None일 경우 style 파라미터를 전송하지 않음 callback_url: 생성 완료 시 알림 받을 URL (None일 경우 config에서 기본값 사용) Returns: task_id: 작업 추적용 ID Note: - 스트림 URL: 30-40초 내 생성 - 다운로드 URL: 2-3분 내 생성 - 생성되는 노래는 약 1분 이내의 길이 """ # 정확히 1분 길이의 노래 생성을 위한 프롬프트 조건 추가 formatted_prompt = f"[Song Duration: Exactly 1 minute - Must be precisely 60 seconds]\n{prompt}" # callback_url이 없으면 config에서 기본값 사용 (Suno API 필수 파라미터) actual_callback_url = callback_url or apikey_settings.SUNO_CALLBACK_URL payload: dict[str, Any] = { "model": "V5", "customMode": True, "instrumental": False, "prompt": formatted_prompt, "callBackUrl": actual_callback_url, } # genre가 있을 때만 style 추가 if genre: payload["style"] = genre async with httpx.AsyncClient() as client: response = await client.post( f"{self.BASE_URL}/generate", headers=self.headers, json=payload, timeout=30.0, ) response.raise_for_status() data = response.json() # 응답: {"code": 200, "msg": "success", "data": {"taskId": "..."}} # API 응답 검증 if data is None: raise ValueError("Suno API returned empty response") if data.get("code") != 200: error_msg = data.get("msg", "Unknown error") raise ValueError(f"Suno API error: {error_msg}") response_data = data.get("data") if response_data is None: raise ValueError(f"Suno API response missing 'data' field: {data}") task_id = response_data.get("taskId") if task_id is None: raise ValueError(f"Suno API response missing 'taskId': {response_data}") return task_id async def get_task_status(self, task_id: str) -> dict[str, Any]: """ 음악 생성 작업 상태 확인 Args: task_id: generate()에서 반환된 작업 ID Returns: 작업 상태 정보 (status, audio_url, image_url 등 포함) Note: 폴링 방식으로 상태 확인 시 사용. 콜백 URL을 사용하면 폴링 없이 결과를 받을 수 있음. """ async with httpx.AsyncClient() as client: response = await client.get( f"{self.BASE_URL}/generate/record-info", headers=self.headers, params={"taskId": task_id}, timeout=30.0, ) response.raise_for_status() data = response.json() if data is None: raise ValueError("Suno API returned empty response for task status") return data async def get_lyric_timestamp(self, task_id: str, audio_id: str) -> dict[str, Any]: """ 음악 타임스탬프 정보 추출 Args: task_id: generate()에서 반환된 작업 ID audio_id: 사용할 audio id Returns: data.alignedWords: 수노 가사 input - startS endS 시간 데이터 매핑 """ payload = { "task_id" : task_id, "audio_id" : audio_id } async with httpx.AsyncClient() as client: response = await client.post( f"{self.BASE_URL}/generate/get-timestamped-lyrics", headers=self.headers, json=payload, timeout=30.0, ) response.raise_for_status() data = response.json() if not data or not data['data']: raise ValueError("Suno API returned empty response for task status") return data['data']['alignedWords'] def parse_status_response(self, result: dict | None) -> PollingSongResponse: """Suno API 상태 응답을 파싱하여 PollingSongResponse로 변환합니다. Args: result: get_task_status()에서 반환된 원본 응답 Returns: PollingSongResponse: 파싱된 상태 응답 Note: 응답 구조: - PENDING 상태: data.response가 null, data.status가 "PENDING" - SUCCESS 상태: data.response.sunoData에 클립 데이터 배열, data.status가 "SUCCESS" """ if result is None: return PollingSongResponse( success=False, status="error", message="Suno API 응답이 비어있습니다.", error_message="Suno API returned None response", ) code = result.get("code", 0) data = result.get("data") or {} if code != 200: return PollingSongResponse( success=False, status="failed", message="Suno API 응답 오류", error_message=result.get("msg", "Unknown error"), ) # status는 data.status에 있음 (PENDING, SUCCESS 등) status = data.get("status", "unknown") # 클립 데이터는 data.response.sunoData에 있음 (camelCase) # PENDING 상태에서는 response가 null response_data = data.get("response") or {} clips_data = response_data.get("sunoData") or [] # 상태별 메시지 status_messages = { "PENDING": "노래 생성 대기 중입니다.", "processing": "노래를 생성하고 있습니다.", "complete": "노래 생성이 완료되었습니다.", "SUCCESS": "노래 생성이 완료되었습니다.", "TEXT_SUCCESS": "노래 생성이 완료되었습니다.", "failed": "노래 생성에 실패했습니다.", } # 클립 데이터 파싱 (Suno API는 camelCase 사용) clips = None if clips_data: clips = [ SongClipData( id=clip.get("id"), audio_url=clip.get("audioUrl"), stream_audio_url=clip.get("streamAudioUrl"), image_url=clip.get("imageUrl"), title=clip.get("title"), status=clip.get("status"), duration=clip.get("duration"), ) for clip in clips_data ] return PollingSongResponse( success=True, status=status, message=status_messages.get(status, f"상태: {status}"), error_message=None, ) def align_lyrics(self, word_data: list[dict], sentences: list[str]) -> list[dict]: """ word의 시작/끝 포지션만 저장하고, 시간은 word에서 참조 """ # Step 1: 전체 텍스트 + word별 포지션 범위 저장 full_text = "" word_ranges = [] # [(start_pos, end_pos, entry), ...] for entry in word_data: word = entry['word'] start_pos = len(full_text) full_text += word end_pos = len(full_text) - 1 word_ranges.append((start_pos, end_pos, entry)) # Step 2: 메타데이터 제거 + 포지션 재매핑 meta_ranges = [] i = 0 while i < len(full_text): if full_text[i] == '[': start = i while i < len(full_text) and full_text[i] != ']': i += 1 meta_ranges.append((start, i + 1)) i += 1 clean_text = "" new_to_old = {} # 클린 포지션 -> 원본 포지션 for old_pos, char in enumerate(full_text): in_meta = any(s <= old_pos < e for s, e in meta_ranges) if not in_meta: new_to_old[len(clean_text)] = old_pos clean_text += char # Step 3: 포지션으로 word 찾기 def get_word_at(old_pos: int): for start, end, entry in word_ranges: if start <= old_pos <= end: return entry return None # Step 4: 문장 매칭 def normalize(text): return ''.join(c for c in text if c not in ' \n\t-') norm_clean = normalize(clean_text) norm_to_clean = [i for i, c in enumerate(clean_text) if c not in ' \n\t-'] results = [] search_pos = 0 for sentence in sentences: norm_sentence = normalize(sentence) found_pos = norm_clean.find(norm_sentence, search_pos) if found_pos != -1: clean_start = norm_to_clean[found_pos] clean_end = norm_to_clean[found_pos + len(norm_sentence) - 1] old_start = new_to_old[clean_start] old_end = new_to_old[clean_end] word_start = get_word_at(old_start) word_end = get_word_at(old_end) results.append({ 'text': sentence, 'start_sec': word_start['startS'], 'end_sec': word_end['endS'], }) search_pos = found_pos + len(norm_sentence) else: results.append({'text': sentence, 'start_sec': None, 'end_sec': None}) return results