""" Video API Router 이 모듈은 Creatomate API를 통한 영상 생성 관련 API 엔드포인트를 정의합니다. 엔드포인트 목록: - POST /video/generate/{task_id}: 영상 생성 요청 (task_id로 Project/Lyric/Song 연결) - GET /video/status/{creatomate_render_id}: Creatomate API 영상 생성 상태 조회 - GET /video/download/{task_id}: 영상 다운로드 상태 조회 (DB polling) - GET /videos/: 완료된 영상 목록 조회 (페이지네이션) 사용 예시: from app.video.api.routers.v1.video import router app.include_router(router, prefix="/api/v1") """ from typing import Literal from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.database.session import get_session from app.dependencies.pagination import ( PaginationParams, get_pagination_params, ) from app.home.models import Image, Project from app.lyric.models import Lyric from app.song.models import Song from app.video.models import Video from app.video.schemas.video_schema import ( DownloadVideoResponse, GenerateVideoResponse, PollingVideoResponse, VideoListItem, VideoRenderData, ) from app.video.worker.video_task import download_and_upload_video_to_blob from app.utils.creatomate import CreatomateService from app.utils.pagination import PaginatedResponse router = APIRouter(prefix="/video", tags=["video"]) @router.get( "/generate/{task_id}", summary="영상 생성 요청", description=""" Creatomate API를 통해 영상 생성을 요청합니다. ## 경로 파라미터 - **task_id**: Project/Lyric/Song/Image의 task_id (필수) - 연관된 프로젝트, 가사, 노래, 이미지를 조회하는 데 사용 ## 쿼리 파라미터 - **orientation**: 영상 방향 (horizontal: 가로형, vertical: 세로형, 기본값: vertical) - 선택 ## 자동 조회 정보 - **image_urls**: Image 테이블에서 task_id로 조회 (img_order 순서로 정렬) - **music_url**: Song 테이블의 song_result_url 사용 - **duration**: Song 테이블의 duration 사용 - **lyrics**: Song 테이블의 song_prompt (가사) 사용 ## 반환 정보 - **success**: 요청 성공 여부 - **task_id**: 내부 작업 ID (Project task_id) - **creatomate_render_id**: Creatomate 렌더 ID (상태 조회에 사용) - **message**: 응답 메시지 ## 사용 예시 ``` GET /video/generate/0694b716-dbff-7219-8000-d08cb5fce431 GET /video/generate/0694b716-dbff-7219-8000-d08cb5fce431?orientation=horizontal ``` ## 참고 - 이미지는 task_id로 Image 테이블에서 자동 조회됩니다 (img_order 순서). - 배경 음악(music_url), 영상 길이(duration), 가사(lyrics)는 task_id로 Song 테이블을 조회하여 자동으로 가져옵니다. - 같은 task_id로 여러 Song이 있을 경우 **가장 최근 생성된 노래**를 사용합니다. - Song의 song_result_url과 song_prompt가 있어야 영상 생성이 가능합니다. - creatomate_render_id를 사용하여 /status/{creatomate_render_id} 엔드포인트에서 생성 상태를 확인할 수 있습니다. - Video 테이블에 데이터가 저장되며, project_id, lyric_id, song_id가 자동으로 연결됩니다. """, response_model=GenerateVideoResponse, responses={ 200: {"description": "영상 생성 요청 성공"}, 400: {"description": "Song의 음악 URL, 가사(song_prompt) 또는 이미지가 없음"}, 404: {"description": "Project, Lyric, Song 또는 Image를 찾을 수 없음"}, 500: {"description": "영상 생성 요청 실패"}, }, ) async def generate_video( task_id: str, orientation: Literal["horizontal", "vertical"] = Query( default="vertical", description="영상 방향 (horizontal: 가로형, vertical: 세로형)", ), session: AsyncSession = Depends(get_session), ) -> GenerateVideoResponse: """Creatomate API를 통해 영상을 생성합니다. 1. task_id로 Project, Lyric, Song, Image 조회 2. Video 테이블에 초기 데이터 저장 (status: processing) 3. Creatomate API 호출 (orientation에 따른 템플릿 자동 선택) 4. creatomate_render_id 업데이트 후 응답 반환 """ print(f"[generate_video] START - task_id: {task_id}, orientation: {orientation}") try: # 1. task_id로 Project 조회 (중복 시 최신 것 선택) project_result = await session.execute( select(Project) .where(Project.task_id == task_id) .order_by(Project.created_at.desc()) .limit(1) ) project = project_result.scalar_one_or_none() if not project: print(f"[generate_video] Project NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.", ) print(f"[generate_video] Project found - project_id: {project.id}, task_id: {task_id}") # 2. task_id로 Lyric 조회 (중복 시 최신 것 선택) lyric_result = await session.execute( select(Lyric) .where(Lyric.task_id == task_id) .order_by(Lyric.created_at.desc()) .limit(1) ) lyric = lyric_result.scalar_one_or_none() if not lyric: print(f"[generate_video] Lyric NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.", ) print(f"[generate_video] Lyric found - lyric_id: {lyric.id}, task_id: {task_id}") # 3. task_id로 Song 조회 (가장 최근 것) song_result = await session.execute( select(Song) .where(Song.task_id == task_id) .order_by(Song.created_at.desc()) .limit(1) ) song = song_result.scalar_one_or_none() if not song: print(f"[generate_video] Song NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.", ) # Song에서 music_url과 duration 가져오기 music_url = song.song_result_url if not music_url: print(f"[generate_video] Song has no result URL - task_id: {task_id}, song_id: {song.id}") raise HTTPException( status_code=400, detail=f"Song(id={song.id})의 음악 URL이 없습니다. 노래 생성이 완료되었는지 확인하세요.", ) # Song에서 가사(song_prompt) 가져오기 lyrics = song.song_prompt if not lyrics: print(f"[generate_video] Song has no lyrics (song_prompt) - task_id: {task_id}, song_id: {song.id}") raise HTTPException( status_code=400, detail=f"Song(id={song.id})의 가사(song_prompt)가 없습니다.", ) print(f"[generate_video] Song found - song_id: {song.id}, task_id: {task_id}, duration: {song.duration}") print(f"[generate_video] Music URL (from DB): {music_url}, Song duration: {song.duration}, Lyrics length: {len(lyrics)}") # 4. task_id로 Image 조회 (img_order 순서로 정렬) image_result = await session.execute( select(Image) .where(Image.task_id == task_id) .order_by(Image.img_order.asc()) ) images = image_result.scalars().all() if not images: print(f"[generate_video] Image NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 이미지를 찾을 수 없습니다.", ) image_urls = [img.img_url for img in images] print(f"[generate_video] Images found - task_id: {task_id}, count: {len(image_urls)}") # 5. Video 테이블에 초기 데이터 저장 video = Video( project_id=project.id, lyric_id=lyric.id, song_id=song.id, task_id=task_id, creatomate_render_id=None, status="processing", ) session.add(video) await session.flush() # ID 생성을 위해 flush print(f"[generate_video] Video saved (processing) - task_id: {task_id}") # 6. Creatomate API 호출 (POC 패턴 적용) print(f"[generate_video] Creatomate API generation started - task_id: {task_id}") # orientation에 따른 템플릿 선택, duration은 Song에서 가져옴 (없으면 config 기본값 사용) creatomate_service = CreatomateService( orientation=orientation, target_duration=song.duration, # Song의 duration 사용 (None이면 config 기본값) ) print(f"[generate_video] Using template_id: {creatomate_service.template_id}, duration: {creatomate_service.target_duration} (song duration: {song.duration})") # 6-1. 템플릿 조회 (비동기, CreatomateService에서 orientation에 맞는 template_id 사용) template = await creatomate_service.get_one_template_data_async(creatomate_service.template_id) print(f"[generate_video] Template fetched - task_id: {task_id}") # 6-2. elements에서 리소스 매핑 생성 (music_url, lyrics는 DB에서 조회한 값 사용) modifications = creatomate_service.elements_connect_resource_blackbox( elements=template["source"]["elements"], image_url_list=image_urls, lyric=lyrics, music_url=music_url, ) print(f"[generate_video] Modifications created - task_id: {task_id}") # 6-3. elements 수정 new_elements = creatomate_service.modify_element( template["source"]["elements"], modifications, ) template["source"]["elements"] = new_elements print(f"[generate_video] Elements modified - task_id: {task_id}") # 6-4. duration 확장 (target_duration: 영상 길이) final_template = creatomate_service.extend_template_duration( template, creatomate_service.target_duration, ) print(f"[generate_video] Duration extended to {creatomate_service.target_duration}s - task_id: {task_id}") # 6-5. 커스텀 렌더링 요청 (비동기) render_response = await creatomate_service.make_creatomate_custom_call_async( final_template["source"], ) print(f"[generate_video] Creatomate API response - task_id: {task_id}, response: {render_response}") # 렌더 ID 추출 (응답이 리스트인 경우 첫 번째 항목 사용) if isinstance(render_response, list) and len(render_response) > 0: creatomate_render_id = render_response[0].get("id") elif isinstance(render_response, dict): creatomate_render_id = render_response.get("id") else: creatomate_render_id = None # 7. creatomate_render_id 업데이트 video.creatomate_render_id = creatomate_render_id await session.commit() print(f"[generate_video] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}") return GenerateVideoResponse( success=True, task_id=task_id, creatomate_render_id=creatomate_render_id, message="영상 생성 요청이 접수되었습니다. creatomate_render_id로 상태를 조회하세요.", error_message=None, ) except HTTPException: raise except Exception as e: print(f"[generate_video] EXCEPTION - task_id: {task_id}, error: {e}") await session.rollback() return GenerateVideoResponse( success=False, task_id=task_id, creatomate_render_id=None, message="영상 생성 요청에 실패했습니다.", error_message=str(e), ) @router.get( "/status/{creatomate_render_id}", summary="영상 생성 상태 조회", description=""" Creatomate API를 통해 영상 생성 작업의 상태를 조회합니다. succeeded 상태인 경우 백그라운드에서 MP4 파일을 다운로드하고 Video 테이블을 업데이트합니다. ## 경로 파라미터 - **creatomate_render_id**: 영상 생성 시 반환된 Creatomate 렌더 ID (필수) ## 반환 정보 - **success**: 조회 성공 여부 - **status**: 작업 상태 (planned, waiting, rendering, succeeded, failed) - **message**: 상태 메시지 - **render_data**: 렌더링 결과 데이터 (완료 시) - **raw_response**: Creatomate API 원본 응답 ## 사용 예시 ``` GET /video/status/render-id-123... ``` ## 상태 값 - **planned**: 예약됨 - **waiting**: 대기 중 - **transcribing**: 트랜스크립션 중 - **rendering**: 렌더링 중 - **succeeded**: 성공 - **failed**: 실패 ## 참고 - succeeded 시 백그라운드에서 MP4 다운로드 및 DB 업데이트 진행 """, response_model=PollingVideoResponse, responses={ 200: {"description": "상태 조회 성공"}, 500: {"description": "상태 조회 실패"}, }, ) async def get_video_status( creatomate_render_id: str, background_tasks: BackgroundTasks, session: AsyncSession = Depends(get_session), ) -> PollingVideoResponse: """creatomate_render_id로 영상 생성 작업의 상태를 조회합니다. succeeded 상태인 경우 백그라운드에서 MP4 파일을 다운로드하고 Video 테이블의 status를 completed로, result_movie_url을 업데이트합니다. """ print(f"[get_video_status] START - creatomate_render_id: {creatomate_render_id}") try: creatomate_service = CreatomateService() result = await creatomate_service.get_render_status_async(creatomate_render_id) print(f"[get_video_status] Creatomate API response - creatomate_render_id: {creatomate_render_id}, status: {result.get('status')}") status = result.get("status", "unknown") video_url = result.get("url") # 상태별 메시지 설정 status_messages = { "planned": "영상 생성이 예약되었습니다.", "waiting": "영상 생성 대기 중입니다.", "transcribing": "트랜스크립션 진행 중입니다.", "rendering": "영상을 렌더링하고 있습니다.", "succeeded": "영상 생성이 완료되었습니다.", "failed": "영상 생성에 실패했습니다.", } message = status_messages.get(status, f"상태: {status}") # succeeded 상태인 경우 백그라운드 태스크 실행 if status == "succeeded" and video_url: # creatomate_render_id로 Video 조회하여 task_id 가져오기 video_result = await session.execute( select(Video) .where(Video.creatomate_render_id == creatomate_render_id) .order_by(Video.created_at.desc()) .limit(1) ) video = video_result.scalar_one_or_none() if video and video.status != "completed": # 이미 완료된 경우 백그라운드 작업 중복 실행 방지 # task_id로 Project 조회하여 store_name 가져오기 project_result = await session.execute( select(Project).where(Project.id == video.project_id) ) project = project_result.scalar_one_or_none() store_name = project.store_name if project else "video" # 백그라운드 태스크로 MP4 다운로드 → Blob 업로드 → DB 업데이트 → 임시 파일 삭제 print(f"[get_video_status] Background task args - task_id: {video.task_id}, video_url: {video_url}, store_name: {store_name}") background_tasks.add_task( download_and_upload_video_to_blob, task_id=video.task_id, video_url=video_url, store_name=store_name, ) elif video and video.status == "completed": print(f"[get_video_status] SKIPPED - Video already completed, creatomate_render_id: {creatomate_render_id}") render_data = VideoRenderData( id=result.get("id"), status=status, url=video_url, snapshot_url=result.get("snapshot_url"), ) print(f"[get_video_status] SUCCESS - creatomate_render_id: {creatomate_render_id}") return PollingVideoResponse( success=True, status=status, message=message, render_data=render_data, raw_response=result, error_message=None, ) except Exception as e: import traceback print(f"[get_video_status] EXCEPTION - creatomate_render_id: {creatomate_render_id}, error: {e}") return PollingVideoResponse( success=False, status="error", message="상태 조회에 실패했습니다.", render_data=None, raw_response=None, error_message=f"{type(e).__name__}: {e}\n{traceback.format_exc()}", ) @router.get( "/download/{task_id}", summary="영상 생성 URL 조회", description=""" task_id를 기반으로 Video 테이블의 상태를 polling하고, completed인 경우 Project 정보와 영상 URL을 반환합니다. ## 경로 파라미터 - **task_id**: 프로젝트 task_id (필수) ## 반환 정보 - **success**: 조회 성공 여부 - **status**: 처리 상태 (processing, completed, failed) - **message**: 응답 메시지 - **store_name**: 업체명 - **region**: 지역명 - **task_id**: 작업 고유 식별자 - **result_movie_url**: 영상 결과 URL (completed 시) - **created_at**: 생성 일시 ## 사용 예시 ``` GET /video/download/019123ab-cdef-7890-abcd-ef1234567890 ``` ## 참고 - processing 상태인 경우 result_movie_url은 null입니다. - completed 상태인 경우 Project 정보와 함께 result_movie_url을 반환합니다. """, response_model=DownloadVideoResponse, responses={ 200: {"description": "조회 성공"}, 404: {"description": "Video를 찾을 수 없음"}, 500: {"description": "조회 실패"}, }, ) async def download_video( task_id: str, session: AsyncSession = Depends(get_session), ) -> DownloadVideoResponse: """task_id로 Video 상태를 polling하고 completed 시 Project 정보와 영상 URL을 반환합니다.""" print(f"[download_video] START - task_id: {task_id}") try: # task_id로 Video 조회 (여러 개 있을 경우 가장 최근 것 선택) video_result = await session.execute( select(Video) .where(Video.task_id == task_id) .order_by(Video.created_at.desc()) .limit(1) ) video = video_result.scalar_one_or_none() if not video: print(f"[download_video] Video NOT FOUND - task_id: {task_id}") return DownloadVideoResponse( success=False, status="not_found", message=f"task_id '{task_id}'에 해당하는 Video를 찾을 수 없습니다.", error_message="Video not found", ) print(f"[download_video] Video found - task_id: {task_id}, status: {video.status}") # processing 상태인 경우 if video.status == "processing": print(f"[download_video] PROCESSING - task_id: {task_id}") return DownloadVideoResponse( success=True, status="processing", message="영상 생성이 진행 중입니다.", task_id=task_id, ) # failed 상태인 경우 if video.status == "failed": print(f"[download_video] FAILED - task_id: {task_id}") return DownloadVideoResponse( success=False, status="failed", message="영상 생성에 실패했습니다.", task_id=task_id, error_message="Video generation failed", ) # completed 상태인 경우 - Project 정보 조회 project_result = await session.execute( select(Project).where(Project.id == video.project_id) ) project = project_result.scalar_one_or_none() print(f"[download_video] COMPLETED - task_id: {task_id}, result_movie_url: {video.result_movie_url}") return DownloadVideoResponse( success=True, status="completed", message="영상 다운로드가 완료되었습니다.", store_name=project.store_name if project else None, region=project.region if project else None, task_id=task_id, result_movie_url=video.result_movie_url, created_at=video.created_at, ) except Exception as e: print(f"[download_video] EXCEPTION - task_id: {task_id}, error: {e}") return DownloadVideoResponse( success=False, status="error", message="영상 다운로드 조회에 실패했습니다.", error_message=str(e), ) @router.get( "s/", summary="생성된 영상 목록 조회", description=""" 완료된 영상 목록을 페이지네이션하여 조회합니다. ## 쿼리 파라미터 - **page**: 페이지 번호 (1부터 시작, 기본값: 1) - **page_size**: 페이지당 데이터 수 (기본값: 10, 최대: 100) ## 반환 정보 - **items**: 영상 목록 (store_name, region, task_id, result_movie_url, created_at) - **total**: 전체 데이터 수 - **page**: 현재 페이지 - **page_size**: 페이지당 데이터 수 - **total_pages**: 전체 페이지 수 - **has_next**: 다음 페이지 존재 여부 - **has_prev**: 이전 페이지 존재 여부 ## 사용 예시 ``` GET /videos/?page=1&page_size=10 ``` ## 참고 - status가 'completed'인 영상만 반환됩니다. - 동일한 task_id가 있는 경우 가장 최근에 생성된 1개만 반환됩니다. - created_at 기준 내림차순 정렬됩니다. """, response_model=PaginatedResponse[VideoListItem], responses={ 200: {"description": "영상 목록 조회 성공"}, 500: {"description": "조회 실패"}, }, ) async def get_videos( session: AsyncSession = Depends(get_session), pagination: PaginationParams = Depends(get_pagination_params), ) -> PaginatedResponse[VideoListItem]: """완료된 영상 목록을 페이지네이션하여 반환합니다.""" print(f"[get_videos] START - page: {pagination.page}, page_size: {pagination.page_size}") try: offset = (pagination.page - 1) * pagination.page_size # 서브쿼리: task_id별 최신 Video의 id 조회 (completed 상태만) subquery = ( select(func.max(Video.id).label("max_id")) .where(Video.status == "completed") .group_by(Video.task_id) .subquery() ) # 전체 개수 조회 (task_id별 최신 1개만) count_query = select(func.count()).select_from(subquery) total_result = await session.execute(count_query) total = total_result.scalar() or 0 # 데이터 조회 (completed 상태, task_id별 최신 1개만, 최신순) query = ( select(Video) .where(Video.id.in_(select(subquery.c.max_id))) .order_by(Video.created_at.desc()) .offset(offset) .limit(pagination.page_size) ) result = await session.execute(query) videos = result.scalars().all() # Project 정보 일괄 조회 (N+1 문제 해결) project_ids = [v.project_id for v in videos if v.project_id] projects_map: dict = {} if project_ids: projects_result = await session.execute( select(Project).where(Project.id.in_(project_ids)) ) projects_map = {p.id: p for p in projects_result.scalars().all()} # VideoListItem으로 변환 items = [] for video in videos: project = projects_map.get(video.project_id) item = VideoListItem( store_name=project.store_name if project else None, region=project.region if project else None, task_id=video.task_id, result_movie_url=video.result_movie_url, created_at=video.created_at, ) items.append(item) response = PaginatedResponse.create( items=items, total=total, page=pagination.page, page_size=pagination.page_size, ) print( f"[get_videos] SUCCESS - total: {total}, page: {pagination.page}, " f"page_size: {pagination.page_size}, items_count: {len(items)}" ) return response except Exception as e: print(f"[get_videos] EXCEPTION - error: {e}") raise HTTPException( status_code=500, detail=f"영상 목록 조회에 실패했습니다: {str(e)}", )