update main
parent
ee6069e5d5
commit
da59f3d6e3
|
|
@ -333,7 +333,7 @@ GET /song/status/abc123...
|
|||
},
|
||||
)
|
||||
async def get_song_status(
|
||||
song_id: str,
|
||||
suno_task_id: str,
|
||||
background_tasks: BackgroundTasks,
|
||||
session: AsyncSession = Depends(get_session),
|
||||
) -> PollingSongResponse:
|
||||
|
|
@ -343,13 +343,13 @@ async def get_song_status(
|
|||
Azure Blob Storage에 업로드한 뒤 Song 테이블의 status를 completed로,
|
||||
song_result_url을 Blob URL로 업데이트합니다.
|
||||
"""
|
||||
logger.info(f"[get_song_status] START - song_id: {song_id}")
|
||||
logger.info(f"[get_song_status] START - song_id: {suno_task_id}")
|
||||
try:
|
||||
suno_service = SunoService()
|
||||
result = await suno_service.get_task_status(song_id)
|
||||
logger.debug(f"[get_song_status] Suno API raw response - song_id: {song_id}, result: {result}")
|
||||
result = await suno_service.get_task_status(suno_task_id)
|
||||
logger.debug(f"[get_song_status] Suno API raw response - song_id: {suno_task_id}, result: {result}")
|
||||
parsed_response = suno_service.parse_status_response(result)
|
||||
logger.info(f"[get_song_status] Suno API response - song_id: {song_id}, status: {parsed_response.status}")
|
||||
logger.info(f"[get_song_status] Suno API response - song_id: {suno_task_id}, status: {parsed_response.status}")
|
||||
|
||||
# SUCCESS 상태인 경우 백그라운드에서 MP3 다운로드 및 Blob 업로드 진행
|
||||
if parsed_response.status == "SUCCESS" and result:
|
||||
|
|
@ -369,11 +369,38 @@ async def get_song_status(
|
|||
# song_id로 Song 조회
|
||||
song_result = await session.execute(
|
||||
select(Song)
|
||||
.where(Song.suno_task_id == song_id)
|
||||
.where(Song.suno_task_id == suno_task_id)
|
||||
.order_by(Song.created_at.desc())
|
||||
.limit(1)
|
||||
)
|
||||
song = song_result.scalar_one_or_none()
|
||||
|
||||
suno_audio_id = first_clip.id
|
||||
word_data = await suno_service.get_lyric_timestamp(suno_task_id, suno_audio_id)
|
||||
lyric_result = await session.execute(
|
||||
select(Lyric)
|
||||
.where(Lyric.task_id == song.task_id)
|
||||
.order_by(Lyric.created_at.desc())
|
||||
.limit(1)
|
||||
)
|
||||
lyric = lyric_result.scalar_one_or_none()
|
||||
gt_lyric = lyric.lyric_result
|
||||
lyric_line_list = gt_lyric.split("\n")
|
||||
sentences = [lyric_line.strip(',. ') for lyric_line in lyric_line_list if lyric_line and lyric_line != "---"]
|
||||
|
||||
timestamped_lyrics = await suno_service.align_lyrics(word_data, sentences)
|
||||
# TODO : DB upload timestamped_lyrics
|
||||
|
||||
if song and song.status != "completed":
|
||||
# 첫 번째 클립의 audio_url과 duration을 직접 DB에 저장
|
||||
song.status = "completed"
|
||||
song.song_result_url = audio_url
|
||||
if clip_duration is not None:
|
||||
song.duration = clip_duration
|
||||
await session.commit()
|
||||
logger.info(f"[get_song_status] Song updated - suno_task_id: {suno_task_id}, status: completed, song_result_url: {audio_url}, duration: {clip_duration}")
|
||||
elif song and song.status == "completed":
|
||||
logger.info(f"[get_song_status] SKIPPED - Song already completed, suno_task_id: {suno_task_id}")
|
||||
|
||||
# processing 상태인 경우에만 백그라운드 태스크 실행 (중복 방지)
|
||||
if song and song.status == "processing":
|
||||
|
|
@ -388,24 +415,24 @@ async def get_song_status(
|
|||
song.status = "uploading"
|
||||
song.suno_audio_id = first_clip.get('id')
|
||||
await session.commit()
|
||||
logger.info(f"[get_song_status] Song status changed to uploading - song_id: {song_id}")
|
||||
logger.info(f"[get_song_status] Song status changed to uploading - song_id: {suno_task_id}")
|
||||
|
||||
# 백그라운드 태스크로 MP3 다운로드 및 Blob 업로드 실행
|
||||
background_tasks.add_task(
|
||||
download_and_upload_song_by_suno_task_id,
|
||||
suno_task_id=song_id,
|
||||
suno_task_id=suno_task_id,
|
||||
audio_url=audio_url,
|
||||
store_name=store_name,
|
||||
duration=clip_duration,
|
||||
)
|
||||
logger.info(f"[get_song_status] Background task scheduled - song_id: {song_id}, store_name: {store_name}")
|
||||
logger.info(f"[get_song_status] Background task scheduled - song_id: {suno_task_id}, store_name: {store_name}")
|
||||
|
||||
elif song and song.status == "uploading":
|
||||
logger.info(f"[get_song_status] SKIPPED - Song is already uploading, song_id: {song_id}")
|
||||
logger.info(f"[get_song_status] SKIPPED - Song is already uploading, song_id: {suno_task_id}")
|
||||
elif song and song.status == "completed":
|
||||
logger.info(f"[get_song_status] SKIPPED - Song already completed, song_id: {song_id}")
|
||||
logger.info(f"[get_song_status] SKIPPED - Song already completed, song_id: {suno_task_id}")
|
||||
|
||||
logger.info(f"[get_song_status] SUCCESS - song_id: {song_id}")
|
||||
logger.info(f"[get_song_status] SUCCESS - song_id: {suno_task_id}")
|
||||
return parsed_response
|
||||
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -179,6 +179,37 @@ class SunoService:
|
|||
raise ValueError("Suno API returned empty response for task status")
|
||||
|
||||
return data
|
||||
|
||||
async def get_lyric_timestamp(self, task_id: str, audio_id: str) -> dict[str, Any]:
|
||||
"""
|
||||
음악 타임스탬프 정보 추출
|
||||
|
||||
Args:
|
||||
task_id: generate()에서 반환된 작업 ID
|
||||
audio_id: 사용할 audio id
|
||||
|
||||
Returns:
|
||||
data.alignedWords: 수노 가사 input - startS endS 시간 데이터 매핑
|
||||
"""
|
||||
|
||||
payload = {
|
||||
"task_id" : task_id,
|
||||
"audio_id" : audio_id
|
||||
}
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{self.BASE_URL}/generate/get-timestamped-lyrics",
|
||||
headers=self.headers,
|
||||
json = payload,
|
||||
timeout=30.0,
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data is None:
|
||||
raise ValueError("Suno API returned empty response for task status")
|
||||
|
||||
return data['alignedWords']
|
||||
|
||||
def parse_status_response(self, result: dict | None) -> PollingSongResponse:
|
||||
"""Suno API 상태 응답을 파싱하여 PollingSongResponse로 변환합니다.
|
||||
|
|
@ -253,3 +284,83 @@ class SunoService:
|
|||
message=status_messages.get(status, f"상태: {status}"),
|
||||
error_message=None,
|
||||
)
|
||||
|
||||
def align_lyrics(self, word_data: list[dict], sentences: list[str]) -> list[dict]:
|
||||
"""
|
||||
word의 시작/끝 포지션만 저장하고, 시간은 word에서 참조
|
||||
"""
|
||||
|
||||
# Step 1: 전체 텍스트 + word별 포지션 범위 저장
|
||||
full_text = ""
|
||||
word_ranges = [] # [(start_pos, end_pos, entry), ...]
|
||||
|
||||
for entry in word_data:
|
||||
word = entry['word']
|
||||
start_pos = len(full_text)
|
||||
full_text += word
|
||||
end_pos = len(full_text) - 1
|
||||
|
||||
word_ranges.append((start_pos, end_pos, entry))
|
||||
|
||||
# Step 2: 메타데이터 제거 + 포지션 재매핑
|
||||
meta_ranges = []
|
||||
i = 0
|
||||
while i < len(full_text):
|
||||
if full_text[i] == '[':
|
||||
start = i
|
||||
while i < len(full_text) and full_text[i] != ']':
|
||||
i += 1
|
||||
meta_ranges.append((start, i + 1))
|
||||
i += 1
|
||||
|
||||
clean_text = ""
|
||||
new_to_old = {} # 클린 포지션 -> 원본 포지션
|
||||
|
||||
for old_pos, char in enumerate(full_text):
|
||||
in_meta = any(s <= old_pos < e for s, e in meta_ranges)
|
||||
if not in_meta:
|
||||
new_to_old[len(clean_text)] = old_pos
|
||||
clean_text += char
|
||||
|
||||
# Step 3: 포지션으로 word 찾기
|
||||
def get_word_at(old_pos: int):
|
||||
for start, end, entry in word_ranges:
|
||||
if start <= old_pos <= end:
|
||||
return entry
|
||||
return None
|
||||
|
||||
# Step 4: 문장 매칭
|
||||
def normalize(text):
|
||||
return ''.join(c for c in text if c not in ' \n\t-')
|
||||
|
||||
norm_clean = normalize(clean_text)
|
||||
norm_to_clean = [i for i, c in enumerate(clean_text) if c not in ' \n\t-']
|
||||
|
||||
results = []
|
||||
search_pos = 0
|
||||
|
||||
for sentence in sentences:
|
||||
norm_sentence = normalize(sentence)
|
||||
found_pos = norm_clean.find(norm_sentence, search_pos)
|
||||
|
||||
if found_pos != -1:
|
||||
clean_start = norm_to_clean[found_pos]
|
||||
clean_end = norm_to_clean[found_pos + len(norm_sentence) - 1]
|
||||
|
||||
old_start = new_to_old[clean_start]
|
||||
old_end = new_to_old[clean_end]
|
||||
|
||||
word_start = get_word_at(old_start)
|
||||
word_end = get_word_at(old_end)
|
||||
|
||||
results.append({
|
||||
'text': sentence,
|
||||
'startS': word_start['startS'],
|
||||
'endS': word_end['endS'],
|
||||
})
|
||||
|
||||
search_pos = found_pos + len(norm_sentence)
|
||||
else:
|
||||
results.append({'text': sentence, 'startS': None, 'endS': None})
|
||||
|
||||
return results
|
||||
File diff suppressed because it is too large
Load Diff
|
|
@ -1,42 +0,0 @@
|
|||
from openai import OpenAI
|
||||
from difflib import SequenceMatcher
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Tuple
|
||||
import aiohttp, json
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimestampedLyric:
|
||||
text: str
|
||||
start: float
|
||||
end: float
|
||||
|
||||
SUNO_BASE_URL="https://api.sunoapi.org"
|
||||
SUNO_TIMESTAMP_ROUTE = "/api/v1/generate/get-timestamped-lyrics"
|
||||
SUNO_DETAIL_ROUTE = "/api/v1/generate/record-info"
|
||||
|
||||
class LyricTimestampMapper:
|
||||
suno_api_key : str
|
||||
def __init__(self, suno_api_key):
|
||||
self.suno_api_key = suno_api_key
|
||||
|
||||
async def get_suno_audio_id_from_task_id(self, suno_task_id): # expire if db save audio id
|
||||
url = f"{SUNO_BASE_URL}{SUNO_DETAIL_ROUTE}"
|
||||
headers = {"Authorization": f"Bearer {self.suno_api_key}"}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, headers=headers, params={"taskId" : suno_task_id}) as response:
|
||||
detail = await response.json()
|
||||
result = detail['data']['response']['sunoData'][0]['id']
|
||||
return result
|
||||
|
||||
async def get_suno_timestamp(self, suno_task_id, suno_audio_id): # expire if db save audio id
|
||||
url = f"{SUNO_BASE_URL}{SUNO_TIMESTAMP_ROUTE}"
|
||||
headers = {"Authorization": f"Bearer {self.suno_api_key}"}
|
||||
payload = {
|
||||
"task_id" : suno_task_id,
|
||||
"audio_id" : suno_audio_id
|
||||
}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, headers=headers, data=json.dumps(payload)) as response:
|
||||
result = await response.json()
|
||||
return result
|
||||
|
|
@ -1,19 +0,0 @@
|
|||
from lyric_timestamp_mapper import LyricTimestampMapper
|
||||
|
||||
API_KEY = "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"
|
||||
AUDIO_PATH = "test_audio.mp3"
|
||||
|
||||
GROUND_TRUTH_LYRICS = [
|
||||
"첫 번째 가사 라인입니다",
|
||||
"두 번째 가사 라인입니다",
|
||||
"세 번째 가사 라인입니다",
|
||||
]
|
||||
|
||||
mapper = LyricTimestampMapper(api_key=API_KEY)
|
||||
result = mapper.map_ground_truth(AUDIO_PATH, GROUND_TRUTH_LYRICS)
|
||||
|
||||
for lyric in result:
|
||||
if lyric.start >= 0:
|
||||
print(f"[{lyric.start:.2f} - {lyric.end:.2f}] {lyric.text}")
|
||||
else:
|
||||
print(f"[매칭 실패] {lyric.text}")
|
||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue