519 lines
19 KiB
Python
519 lines
19 KiB
Python
"""
|
|
Song API Router
|
|
|
|
이 모듈은 Suno API를 통한 노래 생성 관련 API 엔드포인트를 정의합니다.
|
|
|
|
엔드포인트 목록:
|
|
- POST /song/generate/{task_id}: 노래 생성 요청 (task_id로 Project/Lyric 연결)
|
|
- GET /song/status/{suno_task_id}: Suno API 노래 생성 상태 조회
|
|
- GET /song/download/{task_id}: 노래 다운로드 상태 조회 (DB polling)
|
|
|
|
사용 예시:
|
|
from app.song.api.routers.v1.song import router
|
|
app.include_router(router, prefix="/api/v1")
|
|
"""
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.database.session import get_session
|
|
from app.dependencies.pagination import (
|
|
PaginationParams,
|
|
get_pagination_params,
|
|
)
|
|
from app.home.models import Project
|
|
from app.lyric.models import Lyric
|
|
from app.song.models import Song
|
|
from app.song.schemas.song_schema import (
|
|
DownloadSongResponse,
|
|
GenerateSongRequest,
|
|
GenerateSongResponse,
|
|
PollingSongResponse,
|
|
SongListItem,
|
|
)
|
|
from app.song.worker.song_task import download_and_upload_song_to_blob
|
|
from app.utils.pagination import PaginatedResponse
|
|
from app.utils.suno import SunoService
|
|
|
|
|
|
router = APIRouter(prefix="/song", tags=["song"])
|
|
|
|
|
|
@router.post(
|
|
"/generate/{task_id}",
|
|
summary="노래 생성 요청",
|
|
description="""
|
|
Suno API를 통해 노래 생성을 요청합니다.
|
|
|
|
## 경로 파라미터
|
|
- **task_id**: Project/Lyric의 task_id (필수) - 연관된 프로젝트와 가사를 조회하는 데 사용
|
|
|
|
## 요청 필드
|
|
- **lyrics**: 노래에 사용할 가사 (필수)
|
|
- **genre**: 음악 장르 (필수) - K-Pop, Pop, R&B, Hip-Hop, Ballad, EDM, Rock, Jazz 등
|
|
- **language**: 노래 언어 (선택, 기본값: Korean)
|
|
|
|
## 반환 정보
|
|
- **success**: 요청 성공 여부
|
|
- **task_id**: 내부 작업 ID (Project/Lyric task_id)
|
|
- **suno_task_id**: Suno API 작업 ID (상태 조회에 사용)
|
|
- **message**: 응답 메시지
|
|
|
|
## 사용 예시
|
|
```
|
|
POST /song/generate/019123ab-cdef-7890-abcd-ef1234567890
|
|
{
|
|
"lyrics": "여기 군산에서 만나요\\n아름다운 하루를 함께",
|
|
"genre": "K-Pop",
|
|
"language": "Korean"
|
|
}
|
|
```
|
|
|
|
## 참고
|
|
- 생성되는 노래는 약 1분 이내 길이입니다.
|
|
- suno_task_id를 사용하여 /status/{suno_task_id} 엔드포인트에서 생성 상태를 확인할 수 있습니다.
|
|
- Song 테이블에 데이터가 저장되며, project_id와 lyric_id가 자동으로 연결됩니다.
|
|
""",
|
|
response_model=GenerateSongResponse,
|
|
responses={
|
|
200: {"description": "노래 생성 요청 성공"},
|
|
404: {"description": "Project 또는 Lyric을 찾을 수 없음"},
|
|
500: {"description": "노래 생성 요청 실패"},
|
|
},
|
|
)
|
|
async def generate_song(
|
|
task_id: str,
|
|
request_body: GenerateSongRequest,
|
|
session: AsyncSession = Depends(get_session),
|
|
) -> GenerateSongResponse:
|
|
"""가사와 장르를 기반으로 Suno API를 통해 노래를 생성합니다.
|
|
|
|
1. task_id로 Project와 Lyric 조회
|
|
2. Song 테이블에 초기 데이터 저장 (status: processing)
|
|
3. Suno API 호출
|
|
4. suno_task_id 업데이트 후 응답 반환
|
|
"""
|
|
print(f"[generate_song] START - task_id: {task_id}, genre: {request_body.genre}, language: {request_body.language}")
|
|
try:
|
|
# 1. task_id로 Project 조회
|
|
project_result = await session.execute(
|
|
select(Project).where(Project.task_id == task_id)
|
|
)
|
|
project = project_result.scalar_one_or_none()
|
|
|
|
if not project:
|
|
print(f"[generate_song] Project NOT FOUND - task_id: {task_id}")
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.",
|
|
)
|
|
print(f"[generate_song] Project found - project_id: {project.id}, task_id: {task_id}")
|
|
|
|
# 2. task_id로 Lyric 조회
|
|
lyric_result = await session.execute(
|
|
select(Lyric).where(Lyric.task_id == task_id)
|
|
)
|
|
lyric = lyric_result.scalar_one_or_none()
|
|
|
|
if not lyric:
|
|
print(f"[generate_song] Lyric NOT FOUND - task_id: {task_id}")
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.",
|
|
)
|
|
print(f"[generate_song] Lyric found - lyric_id: {lyric.id}, task_id: {task_id}")
|
|
|
|
# 3. Song 테이블에 초기 데이터 저장
|
|
song_prompt = (
|
|
f"[Lyrics]\n{request_body.lyrics}\n\n[Genre]\n{request_body.genre}"
|
|
)
|
|
|
|
song = Song(
|
|
project_id=project.id,
|
|
lyric_id=lyric.id,
|
|
task_id=task_id,
|
|
suno_task_id=None,
|
|
status="processing",
|
|
song_prompt=song_prompt,
|
|
language=request_body.language,
|
|
)
|
|
session.add(song)
|
|
await session.flush() # ID 생성을 위해 flush
|
|
print(f"[generate_song] Song saved (processing) - task_id: {task_id}")
|
|
|
|
# 4. Suno API 호출
|
|
print(f"[generate_song] Suno API generation started - task_id: {task_id}")
|
|
suno_service = SunoService()
|
|
suno_task_id = await suno_service.generate(
|
|
prompt=request_body.lyrics,
|
|
genre=request_body.genre,
|
|
)
|
|
|
|
# 5. suno_task_id 업데이트
|
|
song.suno_task_id = suno_task_id
|
|
await session.commit()
|
|
print(f"[generate_song] SUCCESS - task_id: {task_id}, suno_task_id: {suno_task_id}")
|
|
|
|
return GenerateSongResponse(
|
|
success=True,
|
|
task_id=task_id,
|
|
suno_task_id=suno_task_id,
|
|
message="노래 생성 요청이 접수되었습니다. suno_task_id로 상태를 조회하세요.",
|
|
error_message=None,
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
print(f"[generate_song] EXCEPTION - task_id: {task_id}, error: {e}")
|
|
await session.rollback()
|
|
return GenerateSongResponse(
|
|
success=False,
|
|
task_id=task_id,
|
|
suno_task_id=None,
|
|
message="노래 생성 요청에 실패했습니다.",
|
|
error_message=str(e),
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/status/{suno_task_id}",
|
|
summary="노래 생성 상태 조회",
|
|
description="""
|
|
Suno API를 통해 노래 생성 작업의 상태를 조회합니다.
|
|
SUCCESS 상태인 경우 백그라운드에서 MP3 파일을 다운로드하고 Azure Blob Storage에 업로드한 뒤 Song 테이블을 업데이트합니다.
|
|
|
|
## 경로 파라미터
|
|
- **suno_task_id**: 노래 생성 시 반환된 Suno API 작업 ID (필수)
|
|
|
|
## 반환 정보
|
|
- **success**: 조회 성공 여부
|
|
- **status**: 작업 상태 (PENDING, processing, SUCCESS, failed)
|
|
- **message**: 상태 메시지
|
|
- **clips**: 생성된 노래 클립 목록 (완료 시)
|
|
- **raw_response**: Suno API 원본 응답
|
|
|
|
## 사용 예시
|
|
```
|
|
GET /song/status/abc123...
|
|
```
|
|
|
|
## 상태 값
|
|
- **PENDING**: 대기 중
|
|
- **processing**: 생성 중
|
|
- **SUCCESS**: 생성 완료
|
|
- **failed**: 생성 실패
|
|
|
|
## 참고
|
|
- 스트림 URL: 30-40초 내 생성
|
|
- 다운로드 URL: 2-3분 내 생성
|
|
- SUCCESS 시 백그라운드에서 MP3 다운로드 → Azure Blob Storage 업로드 → Song 테이블 업데이트 진행
|
|
- 저장 경로: Azure Blob Storage ({BASE_URL}/{task_id}/song/{store_name}.mp3)
|
|
- Song 테이블의 song_result_url에 Blob URL이 저장됩니다
|
|
""",
|
|
response_model=PollingSongResponse,
|
|
responses={
|
|
200: {"description": "상태 조회 성공"},
|
|
500: {"description": "상태 조회 실패"},
|
|
},
|
|
)
|
|
async def get_song_status(
|
|
suno_task_id: str,
|
|
background_tasks: BackgroundTasks,
|
|
session: AsyncSession = Depends(get_session),
|
|
) -> PollingSongResponse:
|
|
"""suno_task_id로 노래 생성 작업의 상태를 조회합니다.
|
|
|
|
SUCCESS 상태인 경우 백그라운드에서 MP3 파일을 다운로드하고
|
|
Azure Blob Storage에 업로드한 뒤 Song 테이블의 status를 completed로,
|
|
song_result_url을 Blob URL로 업데이트합니다.
|
|
"""
|
|
print(f"[get_song_status] START - suno_task_id: {suno_task_id}")
|
|
try:
|
|
suno_service = SunoService()
|
|
result = await suno_service.get_task_status(suno_task_id)
|
|
parsed_response = suno_service.parse_status_response(result)
|
|
print(f"[get_song_status] Suno API response - suno_task_id: {suno_task_id}, status: {parsed_response.status}")
|
|
|
|
# SUCCESS 상태인 경우 백그라운드 태스크 실행
|
|
if parsed_response.status == "SUCCESS" and parsed_response.clips:
|
|
# 첫 번째 클립의 audioUrl 가져오기
|
|
first_clip = parsed_response.clips[0]
|
|
audio_url = first_clip.audio_url
|
|
|
|
if audio_url:
|
|
# suno_task_id로 Song 조회하여 task_id 가져오기 (여러 개 있을 경우 가장 최근 것 선택)
|
|
song_result = await session.execute(
|
|
select(Song)
|
|
.where(Song.suno_task_id == suno_task_id)
|
|
.order_by(Song.created_at.desc())
|
|
.limit(1)
|
|
)
|
|
song = song_result.scalar_one_or_none()
|
|
|
|
if song:
|
|
# task_id로 Project 조회하여 store_name 가져오기
|
|
project_result = await session.execute(
|
|
select(Project).where(Project.id == song.project_id)
|
|
)
|
|
project = project_result.scalar_one_or_none()
|
|
|
|
store_name = project.store_name if project else "song"
|
|
|
|
# 백그라운드 태스크로 MP3 다운로드 및 Blob 업로드, DB 업데이트
|
|
print(f"[get_song_status] Background task args - task_id: {song.task_id}, audio_url: {audio_url}, store_name: {store_name}")
|
|
background_tasks.add_task(
|
|
download_and_upload_song_to_blob,
|
|
task_id=song.task_id,
|
|
audio_url=audio_url,
|
|
store_name=store_name,
|
|
)
|
|
|
|
print(f"[get_song_status] SUCCESS - suno_task_id: {suno_task_id}")
|
|
return parsed_response
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
print(f"[get_song_status] EXCEPTION - suno_task_id: {suno_task_id}, error: {e}")
|
|
return PollingSongResponse(
|
|
success=False,
|
|
status="error",
|
|
message="상태 조회에 실패했습니다.",
|
|
clips=None,
|
|
raw_response=None,
|
|
error_message=f"{type(e).__name__}: {e}\n{traceback.format_exc()}",
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/download/{task_id}",
|
|
summary="노래 다운로드 상태 조회",
|
|
description="""
|
|
task_id를 기반으로 Song 테이블의 상태를 polling하고,
|
|
completed인 경우 Project 정보와 노래 URL을 반환합니다.
|
|
|
|
## 경로 파라미터
|
|
- **task_id**: 프로젝트 task_id (필수)
|
|
|
|
## 반환 정보
|
|
- **success**: 조회 성공 여부
|
|
- **status**: 처리 상태 (processing, completed, failed, not_found)
|
|
- **message**: 응답 메시지
|
|
- **store_name**: 업체명
|
|
- **region**: 지역명
|
|
- **detail_region_info**: 상세 지역 정보
|
|
- **task_id**: 작업 고유 식별자
|
|
- **language**: 언어
|
|
- **song_result_url**: 노래 결과 URL (completed 시, Azure Blob Storage URL)
|
|
- **created_at**: 생성 일시
|
|
|
|
## 사용 예시
|
|
```
|
|
GET /song/download/019123ab-cdef-7890-abcd-ef1234567890
|
|
```
|
|
|
|
## 참고
|
|
- processing 상태인 경우 song_result_url은 null입니다.
|
|
- completed 상태인 경우 Project 정보와 함께 song_result_url (Azure Blob URL)을 반환합니다.
|
|
- song_result_url 형식: {AZURE_BLOB_BASE_URL}/{task_id}/song/{store_name}.mp3
|
|
""",
|
|
response_model=DownloadSongResponse,
|
|
responses={
|
|
200: {"description": "조회 성공"},
|
|
404: {"description": "Song을 찾을 수 없음"},
|
|
500: {"description": "조회 실패"},
|
|
},
|
|
)
|
|
async def download_song(
|
|
task_id: str,
|
|
session: AsyncSession = Depends(get_session),
|
|
) -> DownloadSongResponse:
|
|
"""task_id로 Song 상태를 polling하고 completed 시 Project 정보와 노래 URL을 반환합니다."""
|
|
print(f"[download_song] START - task_id: {task_id}")
|
|
try:
|
|
# task_id로 Song 조회 (여러 개 있을 경우 가장 최근 것 선택)
|
|
song_result = await session.execute(
|
|
select(Song)
|
|
.where(Song.task_id == task_id)
|
|
.order_by(Song.created_at.desc())
|
|
.limit(1)
|
|
)
|
|
song = song_result.scalar_one_or_none()
|
|
|
|
if not song:
|
|
print(f"[download_song] Song NOT FOUND - task_id: {task_id}")
|
|
return DownloadSongResponse(
|
|
success=False,
|
|
status="not_found",
|
|
message=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.",
|
|
error_message="Song not found",
|
|
)
|
|
|
|
print(f"[download_song] Song found - task_id: {task_id}, status: {song.status}")
|
|
|
|
# processing 상태인 경우
|
|
if song.status == "processing":
|
|
print(f"[download_song] PROCESSING - task_id: {task_id}")
|
|
return DownloadSongResponse(
|
|
success=True,
|
|
status="processing",
|
|
message="노래 생성이 진행 중입니다.",
|
|
task_id=task_id,
|
|
)
|
|
|
|
# failed 상태인 경우
|
|
if song.status == "failed":
|
|
print(f"[download_song] FAILED - task_id: {task_id}")
|
|
return DownloadSongResponse(
|
|
success=False,
|
|
status="failed",
|
|
message="노래 생성에 실패했습니다.",
|
|
task_id=task_id,
|
|
error_message="Song generation failed",
|
|
)
|
|
|
|
# completed 상태인 경우 - Project 정보 조회
|
|
project_result = await session.execute(
|
|
select(Project).where(Project.id == song.project_id)
|
|
)
|
|
project = project_result.scalar_one_or_none()
|
|
|
|
print(f"[download_song] COMPLETED - task_id: {task_id}, song_result_url: {song.song_result_url}")
|
|
return DownloadSongResponse(
|
|
success=True,
|
|
status="completed",
|
|
message="노래 다운로드가 완료되었습니다.",
|
|
store_name=project.store_name if project else None,
|
|
region=project.region if project else None,
|
|
detail_region_info=project.detail_region_info if project else None,
|
|
task_id=task_id,
|
|
language=project.language if project else None,
|
|
song_result_url=song.song_result_url,
|
|
created_at=song.created_at,
|
|
)
|
|
|
|
except Exception as e:
|
|
print(f"[download_song] EXCEPTION - task_id: {task_id}, error: {e}")
|
|
return DownloadSongResponse(
|
|
success=False,
|
|
status="error",
|
|
message="노래 다운로드 조회에 실패했습니다.",
|
|
error_message=str(e),
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"s/",
|
|
summary="생성된 노래 목록 조회",
|
|
description="""
|
|
완료된 노래 목록을 페이지네이션하여 조회합니다.
|
|
|
|
## 쿼리 파라미터
|
|
- **page**: 페이지 번호 (1부터 시작, 기본값: 1)
|
|
- **page_size**: 페이지당 데이터 수 (기본값: 10, 최대: 100)
|
|
|
|
## 반환 정보
|
|
- **items**: 노래 목록 (store_name, region, task_id, language, song_result_url, created_at)
|
|
- **total**: 전체 데이터 수
|
|
- **page**: 현재 페이지
|
|
- **page_size**: 페이지당 데이터 수
|
|
- **total_pages**: 전체 페이지 수
|
|
- **has_next**: 다음 페이지 존재 여부
|
|
- **has_prev**: 이전 페이지 존재 여부
|
|
|
|
## 사용 예시
|
|
```
|
|
GET /songs/?page=1&page_size=10
|
|
```
|
|
|
|
## 참고
|
|
- status가 'completed'인 노래만 반환됩니다.
|
|
- created_at 기준 내림차순 정렬됩니다.
|
|
""",
|
|
response_model=PaginatedResponse[SongListItem],
|
|
responses={
|
|
200: {"description": "노래 목록 조회 성공"},
|
|
500: {"description": "조회 실패"},
|
|
},
|
|
)
|
|
async def get_songs(
|
|
session: AsyncSession = Depends(get_session),
|
|
pagination: PaginationParams = Depends(get_pagination_params),
|
|
) -> PaginatedResponse[SongListItem]:
|
|
"""완료된 노래 목록을 페이지네이션하여 반환합니다."""
|
|
print(f"[get_songs] START - page: {pagination.page}, page_size: {pagination.page_size}")
|
|
try:
|
|
offset = (pagination.page - 1) * pagination.page_size
|
|
|
|
# 서브쿼리: task_id별 최신 Song의 id 조회 (completed 상태만)
|
|
subquery = (
|
|
select(func.max(Song.id).label("max_id"))
|
|
.where(Song.status == "completed")
|
|
.group_by(Song.task_id)
|
|
.subquery()
|
|
)
|
|
|
|
# 전체 개수 조회 (task_id별 최신 1개만)
|
|
count_query = select(func.count()).select_from(subquery)
|
|
total_result = await session.execute(count_query)
|
|
total = total_result.scalar() or 0
|
|
|
|
# 데이터 조회 (completed 상태, task_id별 최신 1개만, 최신순)
|
|
query = (
|
|
select(Song)
|
|
.where(Song.id.in_(select(subquery.c.max_id)))
|
|
.order_by(Song.created_at.desc())
|
|
.offset(offset)
|
|
.limit(pagination.page_size)
|
|
)
|
|
result = await session.execute(query)
|
|
songs = result.scalars().all()
|
|
|
|
# Project 정보와 함께 SongListItem으로 변환
|
|
items = []
|
|
for song in songs:
|
|
# Project 조회 (song.project_id 직접 사용)
|
|
project_result = await session.execute(
|
|
select(Project).where(Project.id == song.project_id)
|
|
)
|
|
project = project_result.scalar_one_or_none()
|
|
|
|
item = SongListItem(
|
|
store_name=project.store_name if project else None,
|
|
region=project.region if project else None,
|
|
task_id=song.task_id,
|
|
language=song.language,
|
|
song_result_url=song.song_result_url,
|
|
created_at=song.created_at,
|
|
)
|
|
items.append(item)
|
|
|
|
# 개별 아이템 로그
|
|
print(
|
|
f"[get_songs] Item - store_name: {item.store_name}, region: {item.region}, "
|
|
f"task_id: {item.task_id}, language: {item.language}, "
|
|
f"song_result_url: {item.song_result_url}, created_at: {item.created_at}"
|
|
)
|
|
|
|
response = PaginatedResponse.create(
|
|
items=items,
|
|
total=total,
|
|
page=pagination.page,
|
|
page_size=pagination.page_size,
|
|
)
|
|
|
|
print(
|
|
f"[get_songs] SUCCESS - total: {total}, page: {pagination.page}, "
|
|
f"page_size: {pagination.page_size}, items_count: {len(items)}"
|
|
)
|
|
return response
|
|
|
|
except Exception as e:
|
|
print(f"[get_songs] EXCEPTION - error: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"노래 목록 조회에 실패했습니다: {str(e)}",
|
|
)
|