o2o-castad-backend/app/video/api/routers/v1/video.py

737 lines
29 KiB
Python

"""
Video API Router
이 모듈은 Creatomate API를 통한 영상 생성 관련 API 엔드포인트를 정의합니다.
엔드포인트 목록:
- POST /video/generate/{task_id}: 영상 생성 요청 (task_id로 Project/Lyric/Song 연결)
- GET /video/status/{creatomate_render_id}: Creatomate API 영상 생성 상태 조회
- GET /video/download/{task_id}: 영상 다운로드 상태 조회 (DB polling)
사용 예시:
from app.video.api.routers.v1.video import router
app.include_router(router)
"""
import json
from typing import Literal
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session
from app.user.dependencies.auth import get_current_user
from app.user.models import User
from app.home.models import Image, Project
from app.lyric.models import Lyric
from app.song.models import Song, SongTimestamp
from app.utils.creatomate import CreatomateService
from app.utils.logger import get_logger
from app.video.models import Video
from app.video.schemas.video_schema import (
DownloadVideoResponse,
GenerateVideoResponse,
PollingVideoResponse,
VideoRenderData,
)
from app.video.worker.video_task import download_and_upload_video_to_blob
from config import creatomate_settings
logger = get_logger("video")
router = APIRouter(prefix="/video", tags=["Video"])
@router.get(
"/generate/{task_id}",
summary="영상 생성 요청",
description="""
Creatomate API를 통해 영상 생성을 요청합니다.
## 인증
**Bearer 토큰 필수** - `Authorization: Bearer {access_token}` 헤더를 포함해야 합니다.
## 경로 파라미터
- **task_id**: Project/Lyric/Song/Image의 task_id (필수) - 연관된 프로젝트, 가사, 노래, 이미지를 조회하는 데 사용
## 쿼리 파라미터
- **orientation**: 영상 방향 (horizontal: 가로형, vertical: 세로형, 기본값: vertical) - 선택
## 자동 조회 정보
- **image_urls**: Image 테이블에서 task_id로 조회 (img_order 순서로 정렬)
- **music_url**: Song 테이블의 song_result_url 사용
- **duration**: Song 테이블의 duration 사용
- **lyrics**: Song 테이블의 song_prompt (가사) 사용
## 반환 정보
- **success**: 요청 성공 여부
- **task_id**: 내부 작업 ID (Project task_id)
- **creatomate_render_id**: Creatomate 렌더 ID (상태 조회에 사용)
- **message**: 응답 메시지
## 사용 예시 (cURL)
```bash
# 세로형 영상 생성 (기본값)
curl -X GET "http://localhost:8000/video/generate/0694b716-dbff-7219-8000-d08cb5fce431" \\
-H "Authorization: Bearer {access_token}"
# 가로형 영상 생성
curl -X GET "http://localhost:8000/video/generate/0694b716-dbff-7219-8000-d08cb5fce431?orientation=horizontal" \\
-H "Authorization: Bearer {access_token}"
```
## 참고
- 이미지는 task_id로 Image 테이블에서 자동 조회됩니다 (img_order 순서).
- 배경 음악(music_url), 영상 길이(duration), 가사(lyrics)는 task_id로 Song 테이블을 조회하여 자동으로 가져옵니다.
- 같은 task_id로 여러 Song이 있을 경우 **가장 최근 생성된 노래**를 사용합니다.
- Song의 song_result_url과 song_prompt가 있어야 영상 생성이 가능합니다.
- creatomate_render_id를 사용하여 /status/{creatomate_render_id} 엔드포인트에서 생성 상태를 확인할 수 있습니다.
- Video 테이블에 데이터가 저장되며, project_id, lyric_id, song_id가 자동으로 연결됩니다.
""",
response_model=GenerateVideoResponse,
responses={
200: {"description": "영상 생성 요청 성공"},
400: {"description": "Song의 음악 URL, 가사(song_prompt) 또는 이미지가 없음"},
401: {"description": "인증 실패 (토큰 없음/만료)"},
404: {"description": "Project, Lyric, Song 또는 Image를 찾을 수 없음"},
500: {"description": "영상 생성 요청 실패"},
},
)
async def generate_video(
task_id: str,
orientation: Literal["horizontal", "vertical"] = Query(
default="vertical",
description="영상 방향 (horizontal: 가로형, vertical: 세로형)",
),
current_user: User = Depends(get_current_user),
) -> GenerateVideoResponse:
"""Creatomate API를 통해 영상을 생성합니다.
1. task_id로 Project, Lyric, Song, Image 순차 조회
2. Video 테이블에 초기 데이터 저장 (status: processing)
3. Creatomate API 호출 (orientation에 따른 템플릿 자동 선택)
4. creatomate_render_id 업데이트 후 응답 반환
Note: 이 함수는 Depends(get_session)을 사용하지 않고 명시적으로 세션을 관리합니다.
외부 API 호출 중 DB 커넥션이 유지되지 않도록 하여 커넥션 타임아웃 문제를 방지합니다.
중요: SQLAlchemy AsyncSession은 단일 세션에서 동시에 여러 쿼리를 실행하는 것을
지원하지 않습니다. asyncio.gather()로 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다.
따라서 쿼리는 순차적으로 실행합니다.
"""
import time
from app.database.session import AsyncSessionLocal
request_start = time.perf_counter()
logger.info(
f"[generate_video] START - task_id: {task_id}, orientation: {orientation}"
)
# ==========================================================================
# 1단계: DB 조회 및 초기 데이터 저장 (세션을 명시적으로 열고 닫음)
# ==========================================================================
# 외부 API 호출 전에 필요한 데이터를 저장할 변수들
project_id: int | None = None
lyric_id: int | None = None
song_id: int | None = None
video_id: int | None = None
music_url: str | None = None
song_duration: float | None = None
lyrics: str | None = None
image_urls: list[str] = []
try:
# 세션을 명시적으로 열고 DB 작업 후 바로 닫음
async with AsyncSessionLocal() as session:
# ===== 순차 쿼리 실행: Project, Lyric, Song, Image =====
# Note: AsyncSession은 동일 세션에서 병렬 쿼리를 지원하지 않음
# Project 조회
project_result = await session.execute(
select(Project)
.where(Project.task_id == task_id)
.order_by(Project.created_at.desc())
.limit(1)
)
# Lyric 조회
lyric_result = await session.execute(
select(Lyric)
.where(Lyric.task_id == task_id)
.order_by(Lyric.created_at.desc())
.limit(1)
)
# Song 조회
song_result = await session.execute(
select(Song)
.where(Song.task_id == task_id)
.order_by(Song.created_at.desc())
.limit(1)
)
# Image 조회
image_result = await session.execute(
select(Image)
.where(Image.task_id == task_id)
.order_by(Image.img_order.asc())
)
query_time = time.perf_counter()
logger.debug(
f"[generate_video] Queries completed - task_id: {task_id}, "
f"elapsed: {(query_time - request_start) * 1000:.1f}ms"
)
# ===== 결과 처리: Project =====
project = project_result.scalar_one_or_none()
if not project:
logger.warning(
f"[generate_video] Project NOT FOUND - task_id: {task_id}"
)
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.",
)
project_id = project.id
store_address = project.detail_region_info
# ===== 결과 처리: Lyric =====
lyric = lyric_result.scalar_one_or_none()
if not lyric:
logger.warning(f"[generate_video] Lyric NOT FOUND - task_id: {task_id}")
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.",
)
lyric_id = lyric.id
lyric_language = lyric.language
# ===== 결과 처리: Song =====
song = song_result.scalar_one_or_none()
if not song:
logger.warning(f"[generate_video] Song NOT FOUND - task_id: {task_id}")
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.",
)
song_id = song.id
music_url = song.song_result_url
song_duration = song.duration
lyrics = song.song_prompt
if not music_url:
raise HTTPException(
status_code=400,
detail=f"Song(id={song_id})의 음악 URL이 없습니다.",
)
if not lyrics:
raise HTTPException(
status_code=400,
detail=f"Song(id={song_id})의 가사(song_prompt)가 없습니다.",
)
# ===== 결과 처리: Image =====
images = image_result.scalars().all()
if not images:
logger.warning(f"[generate_video] Image NOT FOUND - task_id: {task_id}")
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 이미지를 찾을 수 없습니다.",
)
image_urls = [img.img_url for img in images]
logger.info(
f"[generate_video] Data loaded - task_id: {task_id}, "
f"project_id: {project_id}, lyric_id: {lyric_id}, "
f"song_id: {song_id}, images: {len(image_urls)}"
)
# ===== Video 테이블에 초기 데이터 저장 및 커밋 =====
video = Video(
project_id=project_id,
lyric_id=lyric_id,
song_id=song_id,
task_id=task_id,
creatomate_render_id=None,
status="processing",
)
session.add(video)
await session.commit()
video_id = video.id
stage1_time = time.perf_counter()
logger.info(
f"[generate_video] Video saved - task_id: {task_id}, id: {video_id}, "
f"stage1_elapsed: {(stage1_time - request_start) * 1000:.1f}ms"
)
# 세션이 여기서 자동으로 닫힘 (async with 블록 종료)
except HTTPException:
raise
except Exception as e:
logger.error(f"[generate_video] DB EXCEPTION - task_id: {task_id}, error: {e}")
return GenerateVideoResponse(
success=False,
task_id=task_id,
creatomate_render_id=None,
message="영상 생성 요청에 실패했습니다.",
error_message=str(e),
)
# ==========================================================================
# 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음)
# ==========================================================================
stage2_start = time.perf_counter()
try:
logger.info(
f"[generate_video] Stage 2 START - Creatomate API - task_id: {task_id}"
)
creatomate_service = CreatomateService(
orientation=orientation,
target_duration=song_duration,
)
logger.debug(
f"[generate_video] Using template_id: {creatomate_service.template_id}, duration: {creatomate_service.target_duration} (song duration: {song_duration})"
)
# 6-1. 템플릿 조회 (비동기)
template = await creatomate_service.get_one_template_data_async(
creatomate_service.template_id
)
logger.debug(f"[generate_video] Template fetched - task_id: {task_id}")
# 6-2. elements에서 리소스 매핑 생성
modifications = creatomate_service.elements_connect_resource_blackbox(
elements=template["source"]["elements"],
image_url_list=image_urls,
lyric=lyrics,
music_url=music_url,
address=store_address
)
logger.debug(f"[generate_video] Modifications created - task_id: {task_id}")
# 6-3. elements 수정
new_elements = creatomate_service.modify_element(
template["source"]["elements"],
modifications,
)
template["source"]["elements"] = new_elements
logger.debug(f"[generate_video] Elements modified - task_id: {task_id}")
# 6-4. duration 확장
final_template = creatomate_service.extend_template_duration(
template,
creatomate_service.target_duration,
)
logger.debug(
f"[generate_video] Duration extended to {creatomate_service.target_duration}s - task_id: {task_id}"
)
song_timestamp_result = await session.execute(
select(SongTimestamp).where(
SongTimestamp.suno_audio_id == song.suno_audio_id
)
)
song_timestamp_list = song_timestamp_result.scalars().all()
logger.debug(
f"[generate_video] song_timestamp_list count: {len(song_timestamp_list)}"
)
for i, ts in enumerate(song_timestamp_list):
logger.debug(
f"[generate_video] timestamp[{i}]: lyric_line={ts.lyric_line}, start_time={ts.start_time}, end_time={ts.end_time}"
)
match lyric_language:
case "English" :
lyric_font = "Noto Sans"
# lyric_font = "Pretendard" # 없어요
case _ :
lyric_font = "Noto Sans"
# LYRIC AUTO 결정부
if (creatomate_settings.DEBUG_AUTO_LYRIC):
auto_text_template = creatomate_service.get_auto_text_template()
final_template["source"]["elements"].append(creatomate_service.auto_lyric(auto_text_template))
else :
text_template = creatomate_service.get_text_template()
for idx, aligned in enumerate(song_timestamp_list):
caption = creatomate_service.lining_lyric(
text_template,
idx,
aligned.lyric_line,
aligned.start_time,
aligned.end_time,
lyric_font
)
final_template["source"]["elements"].append(caption)
# END - LYRIC AUTO 결정부
# logger.debug(
# f"[generate_video] final_template: {json.dumps(final_template, indent=2, ensure_ascii=False)}"
# )
# 6-5. 커스텀 렌더링 요청 (비동기)
render_response = await creatomate_service.make_creatomate_custom_call_async(
final_template["source"],
)
logger.debug(
f"[generate_video] Creatomate API response - task_id: {task_id}, response: {render_response}"
)
# 렌더 ID 추출
if isinstance(render_response, list) and len(render_response) > 0:
creatomate_render_id = render_response[0].get("id")
elif isinstance(render_response, dict):
creatomate_render_id = render_response.get("id")
else:
creatomate_render_id = None
stage2_time = time.perf_counter()
logger.info(
f"[generate_video] Stage 2 DONE - task_id: {task_id}, "
f"render_id: {creatomate_render_id}, "
f"stage2_elapsed: {(stage2_time - stage2_start) * 1000:.1f}ms"
)
except Exception as e:
logger.error(
f"[generate_video] Creatomate API EXCEPTION - task_id: {task_id}, error: {e}"
)
# 외부 API 실패 시 Video 상태를 failed로 업데이트
from app.database.session import AsyncSessionLocal
async with AsyncSessionLocal() as update_session:
video_result = await update_session.execute(
select(Video).where(Video.id == video_id)
)
video_to_update = video_result.scalar_one_or_none()
if video_to_update:
video_to_update.status = "failed"
await update_session.commit()
return GenerateVideoResponse(
success=False,
task_id=task_id,
creatomate_render_id=None,
message="영상 생성 요청에 실패했습니다.",
error_message=str(e),
)
# ==========================================================================
# 3단계: creatomate_render_id 업데이트 (새 세션으로 빠르게 처리)
# ==========================================================================
stage3_start = time.perf_counter()
logger.info(f"[generate_video] Stage 3 START - DB update - task_id: {task_id}")
try:
from app.database.session import AsyncSessionLocal
async with AsyncSessionLocal() as update_session:
video_result = await update_session.execute(
select(Video).where(Video.id == video_id)
)
video_to_update = video_result.scalar_one_or_none()
if video_to_update:
video_to_update.creatomate_render_id = creatomate_render_id
await update_session.commit()
stage3_time = time.perf_counter()
total_time = stage3_time - request_start
logger.debug(
f"[generate_video] Stage 3 DONE - task_id: {task_id}, "
f"stage3_elapsed: {(stage3_time - stage3_start) * 1000:.1f}ms"
)
logger.info(
f"[generate_video] SUCCESS - task_id: {task_id}, "
f"render_id: {creatomate_render_id}, "
f"total_time: {total_time * 1000:.1f}ms"
)
return GenerateVideoResponse(
success=True,
task_id=task_id,
creatomate_render_id=creatomate_render_id,
message="영상 생성 요청이 접수되었습니다. creatomate_render_id로 상태를 조회하세요.",
error_message=None,
)
except Exception as e:
logger.error(
f"[generate_video] Update EXCEPTION - task_id: {task_id}, error: {e}"
)
return GenerateVideoResponse(
success=False,
task_id=task_id,
creatomate_render_id=creatomate_render_id,
message="영상 생성은 요청되었으나 DB 업데이트에 실패했습니다.",
error_message=str(e),
)
@router.get(
"/status/{creatomate_render_id}",
summary="영상 생성 상태 조회",
description="""
Creatomate API를 통해 영상 생성 작업의 상태를 조회합니다.
succeeded 상태인 경우 백그라운드에서 MP4 파일을 다운로드하고 Video 테이블을 업데이트합니다.
## 인증
**Bearer 토큰 필수** - `Authorization: Bearer {access_token}` 헤더를 포함해야 합니다.
## 경로 파라미터
- **creatomate_render_id**: 영상 생성 시 반환된 Creatomate 렌더 ID (필수)
## 반환 정보
- **success**: 조회 성공 여부
- **status**: 작업 상태 (planned, waiting, rendering, succeeded, failed)
- **message**: 상태 메시지
- **render_data**: 렌더링 결과 데이터 (완료 시)
- **raw_response**: Creatomate API 원본 응답
## 사용 예시 (cURL)
```bash
curl -X GET "http://localhost:8000/video/status/{creatomate_render_id}" \\
-H "Authorization: Bearer {access_token}"
```
## 상태 값
- **planned**: 예약됨
- **waiting**: 대기 중
- **transcribing**: 트랜스크립션 중
- **rendering**: 렌더링 중
- **succeeded**: 성공
- **failed**: 실패
## 참고
- succeeded 시 백그라운드에서 MP4 다운로드 및 DB 업데이트 진행
""",
response_model=PollingVideoResponse,
responses={
200: {"description": "상태 조회 성공"},
401: {"description": "인증 실패 (토큰 없음/만료)"},
500: {"description": "상태 조회 실패"},
},
)
async def get_video_status(
creatomate_render_id: str,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> PollingVideoResponse:
"""creatomate_render_id로 영상 생성 작업의 상태를 조회합니다.
succeeded 상태인 경우 백그라운드에서 MP4 파일을 다운로드하고
Video 테이블의 status를 completed로, result_movie_url을 업데이트합니다.
"""
logger.info(
f"[get_video_status] START - creatomate_render_id: {creatomate_render_id}"
)
try:
creatomate_service = CreatomateService()
result = await creatomate_service.get_render_status_async(creatomate_render_id)
logger.debug(
f"[get_video_status] Creatomate API response - creatomate_render_id: {creatomate_render_id}, status: {result.get('status')}"
)
status = result.get("status", "unknown")
video_url = result.get("url")
# 상태별 메시지 설정
status_messages = {
"planned": "영상 생성이 예약되었습니다.",
"waiting": "영상 생성 대기 중입니다.",
"transcribing": "트랜스크립션 진행 중입니다.",
"rendering": "영상을 렌더링하고 있습니다.",
"succeeded": "영상 생성이 완료되었습니다.",
"failed": "영상 생성에 실패했습니다.",
}
message = status_messages.get(status, f"상태: {status}")
# succeeded 상태인 경우 백그라운드 태스크 실행
if status == "succeeded" and video_url:
# creatomate_render_id로 Video 조회하여 task_id 가져오기
video_result = await session.execute(
select(Video)
.where(Video.creatomate_render_id == creatomate_render_id)
.order_by(Video.created_at.desc())
.limit(1)
)
video = video_result.scalar_one_or_none()
if video and video.status != "completed":
# 이미 완료된 경우 백그라운드 작업 중복 실행 방지
# 백그라운드 태스크로 MP4 다운로드 → Blob 업로드 → DB 업데이트 → 임시 파일 삭제
logger.info(
f"[get_video_status] Background task args - task_id: {video.task_id}, video_url: {video_url}, creatomate_render_id: {creatomate_render_id}"
)
background_tasks.add_task(
download_and_upload_video_to_blob,
task_id=video.task_id,
video_url=video_url,
creatomate_render_id=creatomate_render_id,
user_uuid=current_user.user_uuid,
)
elif video and video.status == "completed":
logger.debug(
f"[get_video_status] SKIPPED - Video already completed, creatomate_render_id: {creatomate_render_id}"
)
render_data = VideoRenderData(
id=result.get("id"),
status=status,
url=video_url,
snapshot_url=result.get("snapshot_url"),
)
logger.info(
f"[get_video_status] SUCCESS - creatomate_render_id: {creatomate_render_id}"
)
return PollingVideoResponse(
success=True,
status=status,
message=message,
render_data=render_data,
raw_response=result,
error_message=None,
)
except Exception as e:
import traceback
logger.error(
f"[get_video_status] EXCEPTION - creatomate_render_id: {creatomate_render_id}, error: {e}"
)
return PollingVideoResponse(
success=False,
status="error",
message="상태 조회에 실패했습니다.",
render_data=None,
raw_response=None,
error_message=f"{type(e).__name__}: {e}\n{traceback.format_exc()}",
)
@router.get(
"/download/{task_id}",
summary="영상 생성 URL 조회",
description="""
task_id를 기반으로 Video 테이블의 상태를 polling하고,
completed인 경우 Project 정보와 영상 URL을 반환합니다.
## 인증
**Bearer 토큰 필수** - `Authorization: Bearer {access_token}` 헤더를 포함해야 합니다.
## 경로 파라미터
- **task_id**: 프로젝트 task_id (필수)
## 반환 정보
- **success**: 조회 성공 여부
- **status**: 처리 상태 (processing, completed, failed)
- **message**: 응답 메시지
- **store_name**: 업체명
- **region**: 지역명
- **task_id**: 작업 고유 식별자
- **result_movie_url**: 영상 결과 URL (completed 시)
- **created_at**: 생성 일시
## 사용 예시 (cURL)
```bash
curl -X GET "http://localhost:8000/video/download/019123ab-cdef-7890-abcd-ef1234567890" \\
-H "Authorization: Bearer {access_token}"
```
## 참고
- processing 상태인 경우 result_movie_url은 null입니다.
- completed 상태인 경우 Project 정보와 함께 result_movie_url을 반환합니다.
""",
response_model=DownloadVideoResponse,
responses={
200: {"description": "조회 성공"},
401: {"description": "인증 실패 (토큰 없음/만료)"},
404: {"description": "Video를 찾을 수 없음"},
500: {"description": "조회 실패"},
},
)
async def download_video(
task_id: str,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> DownloadVideoResponse:
"""task_id로 Video 상태를 polling하고 completed 시 Project 정보와 영상 URL을 반환합니다."""
logger.info(f"[download_video] START - task_id: {task_id}")
try:
# task_id로 Video 조회 (여러 개 있을 경우 가장 최근 것 선택)
video_result = await session.execute(
select(Video)
.where(Video.task_id == task_id)
.order_by(Video.created_at.desc())
.limit(1)
)
video = video_result.scalar_one_or_none()
if not video:
logger.warning(f"[download_video] Video NOT FOUND - task_id: {task_id}")
return DownloadVideoResponse(
success=False,
status="not_found",
message=f"task_id '{task_id}'에 해당하는 Video를 찾을 수 없습니다.",
error_message="Video not found",
)
logger.debug(
f"[download_video] Video found - task_id: {task_id}, status: {video.status}"
)
# processing 상태인 경우
if video.status == "processing":
logger.debug(f"[download_video] PROCESSING - task_id: {task_id}")
return DownloadVideoResponse(
success=True,
status="processing",
message="영상 생성이 진행 중입니다.",
task_id=task_id,
)
# failed 상태인 경우
if video.status == "failed":
logger.error(f"[download_video] FAILED - task_id: {task_id}")
return DownloadVideoResponse(
success=False,
status="failed",
message="영상 생성에 실패했습니다.",
task_id=task_id,
error_message="Video generation failed",
)
# completed 상태인 경우 - Project 정보 조회
project_result = await session.execute(
select(Project).where(Project.id == video.project_id)
)
project = project_result.scalar_one_or_none()
logger.info(
f"[download_video] COMPLETED - task_id: {task_id}, result_movie_url: {video.result_movie_url}"
)
return DownloadVideoResponse(
success=True,
status="completed",
message="영상 다운로드가 완료되었습니다.",
store_name=project.store_name if project else None,
region=project.region if project else None,
task_id=task_id,
result_movie_url=video.result_movie_url,
created_at=video.created_at,
)
except Exception as e:
logger.error(f"[download_video] EXCEPTION - task_id: {task_id}, error: {e}")
return DownloadVideoResponse(
success=False,
status="error",
message="영상 다운로드 조회에 실패했습니다.",
error_message=str(e),
)