""" Video API Router 이 모듈은 Creatomate API를 통한 영상 생성 관련 API 엔드포인트를 정의합니다. 엔드포인트 목록: - POST /video/generate/{task_id}: 영상 생성 요청 (task_id로 Project/Lyric/Song 연결) - GET /video/status/{creatomate_render_id}: Creatomate API 영상 생성 상태 조회 - GET /video/download/{task_id}: 영상 다운로드 상태 조회 (DB polling) 사용 예시: from app.video.api.routers.v1.video import router app.include_router(router) """ import json from collections import defaultdict from typing import Literal from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query from sqlalchemy import func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.database.session import get_session from app.dependencies.pagination import PaginationParams, get_pagination_params from app.user.dependencies.auth import get_current_user, get_current_user_optional from app.user.models import User from app.utils.pagination import PaginatedResponse from app.home.models import Image, Project, MarketingIntel, ImageTag from app.home.api.routers.v1.home import _extract_region_from_address from app.utils.address_parser import SIDO_CITIES, SIDO_SEARCH_ALIASES from app.lyric.models import Lyric from app.song.models import Song, SongTimestamp from app.utils.creatomate import CreatomateService from app.utils.subtitles import SubtitleContentsGenerator from app.comment.models import Comment from app.database.like_cache import ( backfill_user_set, bulk_is_user_liked, get_like_count, get_like_counts, is_user_liked, is_user_set_exists, mark_dirty, mset_like_counts, set_like_count, toggle_like_atomic, ) from app.utils.logger import get_logger from app.video.models import Video, VideoReaction from app.video.schemas.video_schema import ( DownloadVideoResponse, GenerateVideoResponse, LikeToggleResponse, PollingVideoResponse, VideoDetailResponse, VideoRenderData, VideoThumbnailItem, ) from app.video.worker.video_task import download_and_upload_video_to_blob from app.video.services.video import get_image_tags_by_task_id from config import creatomate_settings logger = get_logger("video") router = APIRouter(prefix="/video", tags=["Video"]) @router.get( "/generate/{task_id}", summary="영상 생성 요청", description=""" Creatomate API를 통해 영상 생성을 요청합니다. ## 인증 **Bearer 토큰 필수** - `Authorization: Bearer {access_token}` 헤더를 포함해야 합니다. ## 경로 파라미터 - **task_id**: Project/Lyric/Song/Image의 task_id (필수) - 연관된 프로젝트, 가사, 노래, 이미지를 조회하는 데 사용 ## 쿼리 파라미터 - **orientation**: 영상 방향 (horizontal: 가로형, vertical: 세로형, 기본값: vertical) - 선택 ## 자동 조회 정보 - **image_urls**: Image 테이블에서 task_id로 조회 (img_order 순서로 정렬) - **music_url**: Song 테이블의 song_result_url 사용 - **duration**: Song 테이블의 duration 사용 - **lyrics**: Song 테이블의 song_prompt (가사) 사용 ## 반환 정보 - **success**: 요청 성공 여부 - **task_id**: 내부 작업 ID (Project task_id) - **creatomate_render_id**: Creatomate 렌더 ID (상태 조회에 사용) - **message**: 응답 메시지 ## 사용 예시 (cURL) ```bash # 세로형 영상 생성 (기본값) curl -X GET "http://localhost:8000/video/generate/0694b716-dbff-7219-8000-d08cb5fce431" \\ -H "Authorization: Bearer {access_token}" # 가로형 영상 생성 curl -X GET "http://localhost:8000/video/generate/0694b716-dbff-7219-8000-d08cb5fce431?orientation=horizontal" \\ -H "Authorization: Bearer {access_token}" ``` ## 참고 - 이미지는 task_id로 Image 테이블에서 자동 조회됩니다 (img_order 순서). - 배경 음악(music_url), 영상 길이(duration), 가사(lyrics)는 task_id로 Song 테이블을 조회하여 자동으로 가져옵니다. - 같은 task_id로 여러 Song이 있을 경우 **가장 최근 생성된 노래**를 사용합니다. - Song의 song_result_url과 song_prompt가 있어야 영상 생성이 가능합니다. - creatomate_render_id를 사용하여 /status/{creatomate_render_id} 엔드포인트에서 생성 상태를 확인할 수 있습니다. - Video 테이블에 데이터가 저장되며, project_id, lyric_id, song_id가 자동으로 연결됩니다. """, response_model=GenerateVideoResponse, responses={ 200: {"description": "영상 생성 요청 성공"}, 400: {"description": "Song의 음악 URL, 가사(song_prompt) 또는 이미지가 없음"}, 401: {"description": "인증 실패 (토큰 없음/만료)"}, 404: {"description": "Project, Lyric, Song 또는 Image를 찾을 수 없음"}, 500: {"description": "영상 생성 요청 실패"}, }, ) async def generate_video( task_id: str, orientation: Literal["horizontal", "vertical"] = Query( default="vertical", description="영상 방향 (horizontal: 가로형, vertical: 세로형)", ), current_user: User = Depends(get_current_user), ) -> GenerateVideoResponse: """Creatomate API를 통해 영상을 생성합니다. 1. task_id로 Project, Lyric, Song, Image 순차 조회 2. Video 테이블에 초기 데이터 저장 (status: processing) 3. Creatomate API 호출 (orientation에 따른 템플릿 자동 선택) 4. creatomate_render_id 업데이트 후 응답 반환 Note: 이 함수는 Depends(get_session)을 사용하지 않고 명시적으로 세션을 관리합니다. 외부 API 호출 중 DB 커넥션이 유지되지 않도록 하여 커넥션 타임아웃 문제를 방지합니다. 중요: SQLAlchemy AsyncSession은 단일 세션에서 동시에 여러 쿼리를 실행하는 것을 지원하지 않습니다. asyncio.gather()로 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다. 따라서 쿼리는 순차적으로 실행합니다. """ import time from app.database.session import AsyncSessionLocal request_start = time.perf_counter() logger.info( f"[generate_video] START - task_id: {task_id}, orientation: {orientation}" ) # ========================================================================== # 1단계: DB 조회 및 초기 데이터 저장 (세션을 명시적으로 열고 닫음) # ========================================================================== # 외부 API 호출 전에 필요한 데이터를 저장할 변수들 project_id: int | None = None lyric_id: int | None = None song_id: int | None = None video_id: int | None = None music_url: str | None = None song_duration: float | None = None lyrics: str | None = None image_urls: list[str] = [] try: # 세션을 명시적으로 열고 DB 작업 후 바로 닫음 async with AsyncSessionLocal() as session: # ===== 순차 쿼리 실행: Project, MarketingIntel, Lyric, Song, Image ===== # Note: AsyncSession은 동일 세션에서 병렬 쿼리를 지원하지 않음 # Project 조회 project_result = await session.execute( select(Project) .where(Project.task_id == task_id) .order_by(Project.created_at.desc()) .limit(1) ) project = project_result.scalar_one_or_none() if not project: logger.warning(f"[generate_video] Project NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.", ) project_id = project.id store_address = project.detail_region_info brand_name = project.store_name region = project.region # MarketingIntel 조회 marketing_result = await session.execute( select(MarketingIntel).where(MarketingIntel.id == project.marketing_intelligence) ) marketing_intelligence: MarketingIntel = marketing_result.scalar_one_or_none() # subtitle 미완료 시 즉시 반환 — Lyric/Song/Image 쿼리 전에 체크하여 불필요한 조회 방지 # 클라이언트가 /lyric/subtitle/status/{task_id} 폴링 후 재시도 if not marketing_intelligence.subtitle: logger.info(f"[generate_video] subtitle pending - task_id: {task_id}") return GenerateVideoResponse( success=False, status="subtitle_pending", task_id=task_id, creatomate_render_id=None, message="자막 생성이 아직 완료되지 않았습니다. /lyric/subtitle/status/{task_id}로 완료 확인 후 재요청하세요.", error_message=None, ) category_definition = marketing_intelligence.intel_result["market_positioning"]["category_definition"] target_keywords = marketing_intelligence.intel_result["target_keywords"] brand_concept = "" for sp in marketing_intelligence.intel_result["selling_points"]: if "concept" in sp["english_category"].lower(): brand_concept = sp["description"] # Lyric 조회 lyric_result = await session.execute( select(Lyric) .where(Lyric.task_id == task_id) .order_by(Lyric.created_at.desc()) .limit(1) ) # Song 조회 song_result = await session.execute( select(Song) .where(Song.task_id == task_id) .order_by(Song.created_at.desc()) .limit(1) ) # Image 조회 image_result = await session.execute( select(Image) .where(Image.task_id == task_id) .order_by(Image.img_order.asc()) ) query_time = time.perf_counter() logger.debug( f"[generate_video] Queries completed - task_id: {task_id}, " f"elapsed: {(query_time - request_start) * 1000:.1f}ms" ) # ===== 결과 처리: Lyric ===== lyric = lyric_result.scalar_one_or_none() if not lyric: logger.warning(f"[generate_video] Lyric NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.", ) lyric_id = lyric.id lyric_language = lyric.language # ===== 결과 처리: Song ===== song = song_result.scalar_one_or_none() if not song: logger.warning(f"[generate_video] Song NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.", ) song_id = song.id music_url = song.song_result_url song_duration = song.duration lyrics = song.song_prompt if not music_url: raise HTTPException( status_code=400, detail=f"Song(id={song_id})의 음악 URL이 없습니다.", ) if not lyrics: raise HTTPException( status_code=400, detail=f"Song(id={song_id})의 가사(song_prompt)가 없습니다.", ) # ===== 결과 처리: Image ===== images = image_result.scalars().all() if not images: logger.warning(f"[generate_video] Image NOT FOUND - task_id: {task_id}") raise HTTPException( status_code=404, detail=f"task_id '{task_id}'에 해당하는 이미지를 찾을 수 없습니다.", ) image_urls = [img.img_url for img in images] # SongTimestamp 조회 (외부 API 호출 전 필요한 데이터이므로 1단계에서 수집) song_timestamp_result = await session.execute( select(SongTimestamp).where( SongTimestamp.suno_audio_id == song.suno_audio_id ) ) song_timestamp_list = song_timestamp_result.scalars().all() logger.info( f"[generate_video] Data loaded - task_id: {task_id}, " f"project_id: {project_id}, lyric_id: {lyric_id}, " f"song_id: {song_id}, images: {len(image_urls)}, " f"timestamps: {len(song_timestamp_list)}" ) # ===== Video 테이블에 초기 데이터 저장 및 커밋 ===== video = Video( project_id=project_id, lyric_id=lyric_id, song_id=song_id, task_id=task_id, creatomate_render_id=None, status="processing", ) session.add(video) await session.commit() video_id = video.id stage1_time = time.perf_counter() logger.info( f"[generate_video] Video saved - task_id: {task_id}, id: {video_id}, " f"stage1_elapsed: {(stage1_time - request_start) * 1000:.1f}ms" ) # 세션이 여기서 자동으로 닫힘 (async with 블록 종료) except HTTPException: raise except Exception as e: logger.error(f"[generate_video] DB EXCEPTION - task_id: {task_id}, error: {e}") return GenerateVideoResponse( success=False, task_id=task_id, creatomate_render_id=None, message="영상 생성 요청에 실패했습니다.", error_message=str(e), ) # ========================================================================== # 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음) # ========================================================================== stage2_start = time.perf_counter() try: logger.info( f"[generate_video] Stage 2 START - Creatomate API - task_id: {task_id}" ) creatomate_service = CreatomateService( orientation=orientation ) logger.debug( f"[generate_video] Using template_id: {creatomate_service.template_id}, (song duration: {song_duration})" ) # 6-1. 템플릿 조회 (비동기) template = await creatomate_service.get_one_template_data( creatomate_service.template_id ) logger.debug(f"[generate_video] Template fetched - task_id: {task_id}") # 6-2. elements에서 리소스 매핑 생성 # modifications = creatomate_service.elements_connect_resource_blackbox( # elements=template["source"]["elements"], # image_url_list=image_urls, # music_url=music_url, # address=store_address taged_image_list = await get_image_tags_by_task_id(task_id) min_image_num = creatomate_service.counting_component( template = template, target_template_type = "image" ) duplicate = bool(len(taged_image_list) < min_image_num) logger.info(f"[generate_video] Duplicate : {duplicate} | length of taged_image {len(taged_image_list)}, min_len {min_image_num},- task_id: {task_id}") modifications = creatomate_service.template_matching_taged_image( template = template, taged_image_list = taged_image_list, music_url = music_url, address = store_address, duplicate = duplicate, ) logger.debug(f"[generate_video] Modifications created - task_id: {task_id}") subtitle_modifications = marketing_intelligence.subtitle modifications.update(subtitle_modifications) # revert thumbnail scene # thumbnail_modifications = creatomate_service.make_thumbnail_modification( # brand_name =brand_name, # region = region, # brand_concept = brand_concept, # category_definition= category_definition, # target_keywords=target_keywords) # modifications.update(thumbnail_modifications) # 6-3. elements 수정 new_elements = creatomate_service.modify_element( template["source"]["elements"], modifications, ) template["source"]["elements"] = new_elements logger.debug(f"[generate_video] Elements modified - task_id: {task_id}") # 6-4. duration 확장 final_template = creatomate_service.extend_template_duration( template, song_duration, ) logger.debug(f"[generate_video] Duration extended - task_id: {task_id}") logger.debug(f"[generate_video] song_timestamp_list count: {len(song_timestamp_list)}") for i, ts in enumerate(song_timestamp_list): logger.debug(f"[generate_video] timestamp[{i}]: lyric_line={ts.lyric_line}, start_time={ts.start_time}, end_time={ts.end_time}") match lyric_language: case "English" : lyric_font = "Noto Sans" # lyric_font = "Pretendard" # 없어요 case _ : lyric_font = "Noto Sans" # LYRIC AUTO 결정부 if (creatomate_settings.LYRIC_SUBTITLE): if (creatomate_settings.DEBUG_AUTO_LYRIC): auto_text_template = creatomate_service.get_auto_text_template() final_template["source"]["elements"].append(creatomate_service.auto_lyric(auto_text_template)) else : text_template = creatomate_service.get_text_template() for idx, aligned in enumerate(song_timestamp_list): caption = creatomate_service.lining_lyric( text_template, idx, aligned.lyric_line, aligned.start_time, aligned.end_time, lyric_font ) final_template["source"]["elements"].append(caption) # END - LYRIC AUTO 결정부 # logger.debug( # f"[generate_video] final_template: {json.dumps(final_template, indent=2, ensure_ascii=False)}" # ) # 6-5. 커스텀 렌더링 요청 (비동기) render_response = await creatomate_service.make_creatomate_custom_call( final_template["source"], ) logger.debug(f"[generate_video] Creatomate API response - task_id: {task_id}, response: {render_response}") # 렌더 ID 추출 if isinstance(render_response, list) and len(render_response) > 0: creatomate_render_id = render_response[0].get("id") elif isinstance(render_response, dict): creatomate_render_id = render_response.get("id") else: creatomate_render_id = None stage2_time = time.perf_counter() logger.info( f"[generate_video] Stage 2 DONE - task_id: {task_id}, " f"render_id: {creatomate_render_id}, " f"stage2_elapsed: {(stage2_time - stage2_start) * 1000:.1f}ms" ) except Exception as e: logger.error( f"[generate_video] Creatomate API EXCEPTION - task_id: {task_id}, error: {e}" ) import traceback logger.error(traceback.format_exc()) # 외부 API 실패 시 Video 상태를 failed로 업데이트 from app.database.session import AsyncSessionLocal async with AsyncSessionLocal() as update_session: video_result = await update_session.execute( select(Video).where(Video.id == video_id) ) video_to_update = video_result.scalar_one_or_none() if video_to_update: video_to_update.status = "failed" await update_session.commit() return GenerateVideoResponse( success=False, task_id=task_id, creatomate_render_id=None, message="영상 생성 요청에 실패했습니다.", error_message=str(e), ) # ========================================================================== # 3단계: creatomate_render_id 업데이트 (새 세션으로 빠르게 처리) # ========================================================================== stage3_start = time.perf_counter() logger.info(f"[generate_video] Stage 3 START - DB update - task_id: {task_id}") try: from app.database.session import AsyncSessionLocal async with AsyncSessionLocal() as update_session: video_result = await update_session.execute( select(Video).where(Video.id == video_id) ) video_to_update = video_result.scalar_one_or_none() if video_to_update: video_to_update.creatomate_render_id = creatomate_render_id await update_session.commit() stage3_time = time.perf_counter() total_time = stage3_time - request_start logger.debug( f"[generate_video] Stage 3 DONE - task_id: {task_id}, " f"stage3_elapsed: {(stage3_time - stage3_start) * 1000:.1f}ms" ) logger.info( f"[generate_video] SUCCESS - task_id: {task_id}, " f"render_id: {creatomate_render_id}, " f"total_time: {total_time * 1000:.1f}ms" ) return GenerateVideoResponse( success=True, task_id=task_id, creatomate_render_id=creatomate_render_id, message="영상 생성 요청이 접수되었습니다. creatomate_render_id로 상태를 조회하세요.", error_message=None, ) except Exception as e: logger.error( f"[generate_video] Update EXCEPTION - task_id: {task_id}, error: {e}" ) return GenerateVideoResponse( success=False, task_id=task_id, creatomate_render_id=creatomate_render_id, message="영상 생성은 요청되었으나 DB 업데이트에 실패했습니다.", error_message=str(e), ) @router.get( "/status/{creatomate_render_id}", summary="영상 생성 상태 조회", description=""" Creatomate API를 통해 영상 생성 작업의 상태를 조회합니다. succeeded 상태인 경우 백그라운드에서 MP4 파일을 다운로드하고 Video 테이블을 업데이트합니다. ## 인증 **Bearer 토큰 필수** - `Authorization: Bearer {access_token}` 헤더를 포함해야 합니다. ## 경로 파라미터 - **creatomate_render_id**: 영상 생성 시 반환된 Creatomate 렌더 ID (필수) ## 반환 정보 - **success**: 조회 성공 여부 - **status**: 작업 상태 (planned, waiting, rendering, succeeded, failed) - **message**: 상태 메시지 - **render_data**: 렌더링 결과 데이터 (완료 시) - **raw_response**: Creatomate API 원본 응답 ## 사용 예시 (cURL) ```bash curl -X GET "http://localhost:8000/video/status/{creatomate_render_id}" \\ -H "Authorization: Bearer {access_token}" ``` ## 상태 값 - **planned**: 예약됨 - **waiting**: 대기 중 - **transcribing**: 트랜스크립션 중 - **rendering**: 렌더링 중 - **succeeded**: 성공 - **failed**: 실패 ## 참고 - succeeded 시 백그라운드에서 MP4 다운로드 및 DB 업데이트 진행 """, response_model=PollingVideoResponse, responses={ 200: {"description": "상태 조회 성공"}, 401: {"description": "인증 실패 (토큰 없음/만료)"}, 500: {"description": "상태 조회 실패"}, }, ) async def get_video_status( creatomate_render_id: str, background_tasks: BackgroundTasks, current_user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ) -> PollingVideoResponse: logger.info( f"[get_video_status] START - creatomate_render_id: {creatomate_render_id}" ) try: creatomate_service = CreatomateService() result = await creatomate_service.get_render_status(creatomate_render_id) logger.debug( f"[get_video_status] Creatomate API response - creatomate_render_id: {creatomate_render_id}, status: {result.get('status')}" ) status = result.get("status", "unknown") video_url = result.get("url") # 상태별 메시지 설정 status_messages = { "planned": "영상 생성이 예약되었습니다.", "waiting": "영상 생성 대기 중입니다.", "transcribing": "트랜스크립션 진행 중입니다.", "rendering": "영상을 렌더링하고 있습니다.", "succeeded": "영상 생성이 완료되었습니다.", "failed": "영상 생성에 실패했습니다.", } message = status_messages.get(status, f"상태: {status}") video_id = None # succeeded 상태인 경우 백그라운드 태스크 실행 if status == "succeeded" and video_url: # creatomate_render_id로 Video 조회하여 task_id 가져오기 video_result = await session.execute( select(Video) .where(Video.creatomate_render_id == creatomate_render_id) .order_by(Video.created_at.desc()) .limit(1) ) video = video_result.scalar_one_or_none() video_id = video.id if video and video.status != "completed": # 이미 완료된 경우 백그라운드 작업 중복 실행 방지 # 백그라운드 태스크로 MP4 다운로드 → Blob 업로드 → DB 업데이트 → 임시 파일 삭제 logger.info( f"[get_video_status] Background task args - task_id: {video.task_id}, video_url: {video_url}, creatomate_render_id: {creatomate_render_id}" ) background_tasks.add_task( download_and_upload_video_to_blob, task_id=video.task_id, video_url=video_url, creatomate_render_id=creatomate_render_id, user_uuid=current_user.user_uuid, ) elif video and video.status == "completed": logger.debug( f"[get_video_status] SKIPPED - Video already completed, creatomate_render_id: {creatomate_render_id}" ) render_data = VideoRenderData( id=result.get("id"), status=status, url=video_url, snapshot_url=result.get("snapshot_url"), video_id = video_id if video_id else None ) logger.info( f"[get_video_status] SUCCESS - creatomate_render_id: {creatomate_render_id}" ) return PollingVideoResponse( success=True, status=status, message=message, render_data=render_data, raw_response=result, error_message=None, ) except Exception as e: import traceback logger.error( f"[get_video_status] EXCEPTION - creatomate_render_id: {creatomate_render_id}, error: {e}\n{traceback.format_exc()}" ) return PollingVideoResponse( success=False, status="error", message="상태 조회에 실패했습니다.", render_data=None, raw_response=None, error_message=f"{type(e).__name__}: {e}", ) @router.get( "/download/{task_id}", summary="영상 생성 URL 조회", description=""" task_id를 기반으로 Video 테이블의 상태를 polling하고, completed인 경우 Project 정보와 영상 URL을 반환합니다. ## 인증 **Bearer 토큰 필수** - `Authorization: Bearer {access_token}` 헤더를 포함해야 합니다. ## 경로 파라미터 - **task_id**: 프로젝트 task_id (필수) ## 반환 정보 - **success**: 조회 성공 여부 - **status**: 처리 상태 (processing, completed, failed) - **message**: 응답 메시지 - **store_name**: 업체명 - **region**: 지역명 - **task_id**: 작업 고유 식별자 - **result_movie_url**: 영상 결과 URL (completed 시) - **created_at**: 생성 일시 ## 사용 예시 (cURL) ```bash curl -X GET "http://localhost:8000/video/download/019123ab-cdef-7890-abcd-ef1234567890" \\ -H "Authorization: Bearer {access_token}" ``` ## 참고 - processing 상태인 경우 result_movie_url은 null입니다. - completed 상태인 경우 Project 정보와 함께 result_movie_url을 반환합니다. """, response_model=DownloadVideoResponse, responses={ 200: {"description": "조회 성공"}, 401: {"description": "인증 실패 (토큰 없음/만료)"}, 404: {"description": "Video를 찾을 수 없음"}, 500: {"description": "조회 실패"}, }, ) async def download_video( task_id: str, current_user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ) -> DownloadVideoResponse: """task_id로 Video 상태를 polling하고 completed 시 Project 정보와 영상 URL을 반환합니다.""" logger.info(f"[download_video] START - task_id: {task_id}") try: # task_id로 Video 조회 (여러 개 있을 경우 가장 최근 것 선택) video_result = await session.execute( select(Video) .where(Video.task_id == task_id) .order_by(Video.created_at.desc()) .limit(1) ) video = video_result.scalar_one_or_none() if not video: logger.warning(f"[download_video] Video NOT FOUND - task_id: {task_id}") return DownloadVideoResponse( success=False, status="not_found", message=f"task_id '{task_id}'에 해당하는 Video를 찾을 수 없습니다.", error_message="Video not found", ) logger.debug( f"[download_video] Video found - task_id: {task_id}, status: {video.status}" ) # processing 상태인 경우 if video.status == "processing": logger.debug(f"[download_video] PROCESSING - task_id: {task_id}") return DownloadVideoResponse( success=True, status="processing", message="영상 생성이 진행 중입니다.", task_id=task_id, ) # failed 상태인 경우 if video.status == "failed": logger.error(f"[download_video] FAILED - task_id: {task_id}") return DownloadVideoResponse( success=False, status="failed", message="영상 생성에 실패했습니다.", task_id=task_id, error_message="Video generation failed", ) # completed 상태인 경우 - Project 정보 조회 project_result = await session.execute( select(Project).where(Project.id == video.project_id) ) project = project_result.scalar_one_or_none() logger.info( f"[download_video] COMPLETED - task_id: {task_id}, result_movie_url: {video.result_movie_url}" ) return DownloadVideoResponse( success=True, status="completed", message="영상 다운로드가 완료되었습니다.", store_name=project.store_name if project else None, region=project.region or _extract_region_from_address(project.detail_region_info) if project else None, task_id=task_id, result_movie_url=video.result_movie_url, created_at=video.created_at, ) except Exception as e: logger.error(f"[download_video] EXCEPTION - task_id: {task_id}, error: {e}") return DownloadVideoResponse( success=False, status="error", message="영상 다운로드 조회에 실패했습니다.", error_message=str(e), ) @router.get( "/all", summary="ADO2 콘텐츠 - 전체 사용자 영상 갤러리", description=""" ## 개요 모든 사용자가 생성 완료한 영상을 페이지네이션하여 반환합니다. ## 쿼리 파라미터 - **page**: 페이지 번호 (1부터 시작, 기본값: 1) - **page_size**: 페이지당 데이터 수 (기본값: 10, 최대: 100) - **sort_by**: 정렬 기준 (created_at: 최신순, like_count: 좋아요순, comment_count: 댓글순, 기본값: created_at) - **order**: 정렬 방향 (desc: 내림차순, asc: 오름차순, 기본값: desc) - **store_name**: 업체명 검색 (부분 일치, 값이 있을 때만 전송) - **region**: 지역명 검색 (부분 일치, 값이 있을 때만 전송) """, response_model=PaginatedResponse[VideoThumbnailItem], responses={ 200: {"description": "갤러리 조회 성공"}, 500: {"description": "조회 실패"}, }, ) async def get_all_videos( current_user: User | None = Depends(get_current_user_optional), session: AsyncSession = Depends(get_session), pagination: PaginationParams = Depends(get_pagination_params), sort_by: str = Query(default="created_at", description="정렬 기준 (created_at, like_count, comment_count)"), order: str = Query(default="desc", description="정렬 방향 (desc, asc)"), store_name: str | None = Query(default=None, description="업체명 검색 (부분 일치)"), region: str | None = Query(default=None, description="지역명 검색 (부분 일치)"), ) -> PaginatedResponse[VideoThumbnailItem]: """전체 사용자의 완료된 영상 갤러리를 반환합니다.""" logger.info( f"[get_all_videos] START - page: {pagination.page}, page_size: {pagination.page_size}, " f"sort_by: {sort_by}, order: {order}, store_name: {store_name}, region: {region}" ) try: offset = (pagination.page - 1) * pagination.page_size where_clauses = [ Video.status == "completed", Video.is_deleted == False, # noqa: E712 Project.is_deleted == False, # noqa: E712 Video.result_movie_url.is_not(None), ] if store_name: where_clauses.append(Project.store_name.ilike(f"%{store_name}%")) if region: cities = SIDO_CITIES.get(region) if cities: aliases = SIDO_SEARCH_ALIASES.get(region, [region]) where_clauses.append( or_( Project.region.in_(cities), *[Project.detail_region_info.ilike(f"%{a}%") for a in aliases], ) ) else: where_clauses.append( or_( Project.region.ilike(f"%{region}%"), Project.detail_region_info.ilike(f"%{region}%"), ) ) count_q = ( select(func.count(Video.id)) .join(Project, Video.project_id == Project.id) .where(*where_clauses) ) total = (await session.execute(count_q)).scalar() or 0 comment_count_subq = ( select(func.count(Comment.id)) .where( Comment.video_id == Video.id, Comment.is_deleted == False, # noqa: E712 ) .correlate(Video) .scalar_subquery() ) # like_count 정렬은 Redis 대신 서브쿼리로 처리 (ORDER BY에만 사용) like_count_subq_for_sort = ( select(func.count(VideoReaction.id)) .where(VideoReaction.video_id == Video.id) .correlate(Video) .scalar_subquery() ) sort_col_map = { "like_count": like_count_subq_for_sort, "comment_count": comment_count_subq, "created_at": Video.created_at, } sort_col = sort_col_map.get(sort_by, Video.created_at) order_clause = sort_col.asc() if order == "asc" else sort_col.desc() list_q = ( select( Video, Project, comment_count_subq.label("comment_count"), ) .join(Project, Video.project_id == Project.id) .where(*where_clauses) .order_by(order_clause) .offset(offset) .limit(pagination.page_size) ) rows = (await session.execute(list_q)).all() video_ids = [v.id for v, p, _ in rows] # Redis mget으로 like_count 일괄 조회 like_count_map = await get_like_counts(video_ids) # 카운트 캐시 미스 보정 missing_ids = [vid for vid, cnt in like_count_map.items() if cnt is None] if missing_ids: db_counts = (await session.execute( select(VideoReaction.video_id, func.count(VideoReaction.id)) .where(VideoReaction.video_id.in_(missing_ids)) .group_by(VideoReaction.video_id) )).all() db_found_ids = set() batch = {} for vid, cnt in db_counts: batch[vid] = cnt like_count_map[vid] = cnt db_found_ids.add(vid) await mset_like_counts(batch) for vid in missing_ids: if vid not in db_found_ids: like_count_map[vid] = 0 # is_liked_by_me: Redis user-set 기준, cold-start 시 DB backfill liked_map: dict[int, bool] = {} if current_user: raw_liked = await bulk_is_user_liked(video_ids, current_user.user_uuid) # user-set이 없는(None) 영상 중 count > 0인 것만 backfill 필요 needs_backfill = [ vid for vid, liked in raw_liked.items() if liked is None and like_count_map.get(vid, 0) > 0 ] if needs_backfill: reaction_rows = (await session.execute( select(VideoReaction.video_id, VideoReaction.user_uuid) .where(VideoReaction.video_id.in_(needs_backfill)) )).all() user_map: dict[int, list[str]] = defaultdict(list) for vid, uuid in reaction_rows: user_map[vid].append(uuid) for vid in needs_backfill: await backfill_user_set(vid, user_map.get(vid, [])) # backfill 후 재조회 updated = await bulk_is_user_liked(needs_backfill, current_user.user_uuid) raw_liked.update(updated) liked_map = {vid: bool(liked) for vid, liked in raw_liked.items()} items = [ VideoThumbnailItem( video_id=v.id, store_name=p.store_name, result_movie_url=v.result_movie_url, created_at=v.created_at, like_count=like_count_map.get(v.id) or 0, is_liked_by_me=liked_map.get(v.id, False), comment_count=comment_count or 0, ) for v, p, comment_count in rows ] response = PaginatedResponse.create( items=items, total=total, page=pagination.page, page_size=pagination.page_size, ) logger.info(f"[get_all_videos] SUCCESS - total: {total}, items: {len(items)}") return response except Exception as e: logger.error(f"[get_all_videos] EXCEPTION - error: {e}") raise HTTPException(status_code=500, detail=f"갤러리 조회에 실패했습니다: {str(e)}") @router.post( "/{video_id}/like", summary="영상 좋아요 토글", description=""" ## 개요 영상에 좋아요를 토글합니다. 로그인 필수. - 처음 호출: 좋아요 추가 (is_liked=true) - 다시 호출: 좋아요 취소 (is_liked=false) """, response_model=LikeToggleResponse, responses={ 200: {"description": "토글 성공"}, 401: {"description": "인증 실패"}, 404: {"description": "영상을 찾을 수 없음"}, }, ) async def toggle_like( video_id: int, current_user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ) -> LikeToggleResponse: """영상 좋아요를 토글합니다. Write-Behind 패턴: 1. Redis user-set / count를 즉시 원자적으로 업데이트 (Lua script) 2. dirty SET에 표시 → 스케줄러가 1분마다 MySQL에 반영 DB write가 없으므로 고트래픽에서도 응답 지연 없음. """ logger.info(f"[toggle_like] START - video_id: {video_id}, user: {current_user.user_uuid}") try: # 영상 존재 확인 (DB read는 유지 — 404 처리 필수) video_result = await session.execute( select(Video).where( Video.id == video_id, Video.status == "completed", Video.is_deleted == False, # noqa: E712 ) ) if video_result.scalar_one_or_none() is None: raise HTTPException(status_code=404, detail="영상을 찾을 수 없습니다.") # Cold-start 보정: Redis에 데이터가 없으면 DB에서 backfill count = await get_like_count(video_id) if count is None: # 카운트와 user-set 모두 없음 → DB에서 전체 복구 user_uuids = (await session.execute( select(VideoReaction.user_uuid) .where(VideoReaction.video_id == video_id) )).scalars().all() await backfill_user_set(video_id, list(user_uuids)) await set_like_count(video_id, len(user_uuids)) elif count > 0: if not await is_user_set_exists(video_id): # 카운트는 있지만 user-set이 증발한 경우 (부분 캐시 미스) user_uuids = (await session.execute( select(VideoReaction.user_uuid) .where(VideoReaction.video_id == video_id) )).scalars().all() await backfill_user_set(video_id, list(user_uuids)) # Lua 스크립트로 원자적 토글 (race condition 방지) is_liked, like_count = await toggle_like_atomic(video_id, current_user.user_uuid) # dirty SET에 표시 → 스케줄러가 DB에 반영 await mark_dirty(video_id, current_user.user_uuid) logger.info( f"[toggle_like] SUCCESS - video_id: {video_id}, " f"is_liked: {is_liked}, count: {like_count}" ) return LikeToggleResponse(video_id=video_id, is_liked=is_liked, like_count=like_count) except HTTPException: raise except Exception as e: logger.error(f"[toggle_like] EXCEPTION - video_id: {video_id}, error: {e}") raise HTTPException(status_code=500, detail=f"좋아요 처리에 실패했습니다: {str(e)}") @router.get( "/{video_id}", summary="단일 영상 상세 조회", description=""" ## 개요 video_id에 해당하는 완료된 영상의 상세 정보를 반환합니다. ## 경로 파라미터 - **video_id**: 조회할 영상의 ID (Video.id) """, response_model=VideoDetailResponse, responses={ 200: {"description": "상세 조회 성공"}, 404: {"description": "영상을 찾을 수 없음"}, 500: {"description": "조회 실패"}, }, ) async def get_video_detail( video_id: int, current_user: User | None = Depends(get_current_user_optional), session: AsyncSession = Depends(get_session), ) -> VideoDetailResponse: """video_id에 해당하는 완료된 영상 상세 정보를 반환합니다.""" logger.info(f"[get_video_detail] START - video_id: {video_id}") try: result = await session.execute( select(Video, Project) .join(Project, Video.project_id == Project.id) .where( Video.id == video_id, Video.status == "completed", Video.is_deleted == False, # noqa: E712 Project.is_deleted == False, # noqa: E712 ) ) row = result.one_or_none() if row is None: logger.warning(f"[get_video_detail] NOT FOUND - video_id: {video_id}") raise HTTPException(status_code=404, detail="영상을 찾을 수 없습니다.") video, project = row # like_count: Redis 조회, 캐시 미스 시 DB backfill like_count = await get_like_count(video_id) if like_count is None: user_uuids = (await session.execute( select(VideoReaction.user_uuid) .where(VideoReaction.video_id == video_id) )).scalars().all() like_count = len(user_uuids) await backfill_user_set(video_id, list(user_uuids)) await set_like_count(video_id, like_count) # is_liked_by_me: Redis user-set 기준, cold-start 시 DB backfill is_liked_by_me = False if current_user: liked = await is_user_liked(video_id, current_user.user_uuid) if liked is None: # user-set 없음 → count key로 cold-start 여부 판별 if like_count > 0: user_uuids = (await session.execute( select(VideoReaction.user_uuid) .where(VideoReaction.video_id == video_id) )).scalars().all() await backfill_user_set(video_id, list(user_uuids)) liked = current_user.user_uuid in set(user_uuids) else: liked = False is_liked_by_me = liked logger.info(f"[get_video_detail] SUCCESS - video_id: {video_id}") return VideoDetailResponse( video_id=video.id, result_movie_url=video.result_movie_url, store_name=project.store_name, region=project.region or _extract_region_from_address(project.detail_region_info), created_at=video.created_at, like_count=like_count, is_liked_by_me=is_liked_by_me, ) except HTTPException: raise except Exception as e: logger.error(f"[get_video_detail] EXCEPTION - video_id: {video_id}, error: {e}") raise HTTPException(status_code=500, detail=f"영상 조회에 실패했습니다: {str(e)}")