diff --git a/app/archive/api/routers/v1/archive.py b/app/archive/api/routers/v1/archive.py index 678ffbd..9ca7c3e 100644 --- a/app/archive/api/routers/v1/archive.py +++ b/app/archive/api/routers/v1/archive.py @@ -37,7 +37,7 @@ router = APIRouter(prefix="/archive", tags=["Archive"]) - **page_size**: 페이지당 데이터 수 (기본값: 10, 최대: 100) ## 반환 정보 -- **items**: 영상 목록 (store_name, region, task_id, result_movie_url, created_at) +- **items**: 영상 목록 (video_id, store_name, region, task_id, result_movie_url, created_at) - **total**: 전체 데이터 수 - **page**: 현재 페이지 - **page_size**: 페이지당 데이터 수 @@ -53,7 +53,7 @@ GET /archive/videos/?page=1&page_size=10 ## 참고 - **본인이 소유한 프로젝트의 영상만 반환됩니다.** - status가 'completed'인 영상만 반환됩니다. -- 동일한 task_id가 있는 경우 가장 최근에 생성된 1개만 반환됩니다. +- 재생성된 영상 포함 모든 영상이 반환됩니다. - created_at 기준 내림차순 정렬됩니다. """, response_model=PaginatedResponse[VideoListItem], @@ -149,35 +149,21 @@ async def get_videos( Project.is_deleted == False, ] - # 쿼리 1: 전체 개수 조회 (task_id 기준 고유 개수) + # 쿼리 1: 전체 개수 조회 (모든 영상) count_query = ( - select(func.count(func.distinct(Video.task_id))) + select(func.count(Video.id)) .join(Project, Video.project_id == Project.id) .where(*base_conditions) ) total_result = await session.execute(count_query) total = total_result.scalar() or 0 - logger.debug(f"[get_videos] DEBUG - task_id 기준 고유 개수 (total): {total}") + logger.debug(f"[get_videos] DEBUG - 전체 영상 개수 (total): {total}") - # 서브쿼리: task_id별 최신 Video의 id 조회 - subquery = ( - select(func.max(Video.id).label("max_id")) - .join(Project, Video.project_id == Project.id) - .where(*base_conditions) - .group_by(Video.task_id) - .subquery() - ) - - # DEBUG: 서브쿼리 결과 확인 - subquery_debug_result = await session.execute(select(subquery.c.max_id)) - subquery_ids = [row[0] for row in subquery_debug_result.all()] - logger.debug(f"[get_videos] DEBUG - 서브쿼리 결과 (max_id 목록): {subquery_ids}") - - # 쿼리 2: Video + Project 데이터를 JOIN으로 한 번에 조회 + # 쿼리 2: Video + Project 데이터를 JOIN으로 한 번에 조회 (모든 영상) query = ( select(Video, Project) .join(Project, Video.project_id == Project.id) - .where(Video.id.in_(select(subquery.c.max_id))) + .where(*base_conditions) .order_by(Video.created_at.desc()) .offset(offset) .limit(pagination.page_size) @@ -190,6 +176,7 @@ async def get_videos( items = [] for video, project in rows: item = VideoListItem( + video_id=video.id, store_name=project.store_name, region=project.region, task_id=video.task_id, diff --git a/app/lyric/api/routers/v1/lyric.py b/app/lyric/api/routers/v1/lyric.py index 1effee9..860bdc7 100644 --- a/app/lyric/api/routers/v1/lyric.py +++ b/app/lyric/api/routers/v1/lyric.py @@ -292,21 +292,33 @@ async def generate_lyric( step1_elapsed = (time.perf_counter() - step1_start) * 1000 #logger.debug(f"[generate_lyric] Step 1 완료 - 프롬프트 {len(prompt)}자 ({step1_elapsed:.1f}ms)") - # ========== Step 2: Project 테이블에 데이터 저장 ========== + # ========== Step 2: Project 조회 또는 생성 ========== step2_start = time.perf_counter() - logger.debug(f"[generate_lyric] Step 2: Project 저장...") + logger.debug(f"[generate_lyric] Step 2: Project 조회 또는 생성...") - project = Project( - store_name=request_body.customer_name, - region=request_body.region, - task_id=task_id, - detail_region_info=request_body.detail_region_info, - language=request_body.language, - user_uuid=current_user.user_uuid, + # 기존 Project가 있는지 확인 (재생성 시 재사용) + existing_project_result = await session.execute( + select(Project).where(Project.task_id == task_id).limit(1) ) - session.add(project) - await session.commit() - await session.refresh(project) + project = existing_project_result.scalar_one_or_none() + + if project: + # 기존 Project 재사용 (재생성 케이스) + logger.info(f"[generate_lyric] 기존 Project 재사용 - project_id: {project.id}, task_id: {task_id}") + else: + # 새 Project 생성 (최초 생성 케이스) + project = Project( + store_name=request_body.customer_name, + region=request_body.region, + task_id=task_id, + detail_region_info=request_body.detail_region_info, + language=request_body.language, + user_uuid=current_user.user_uuid, + ) + session.add(project) + await session.commit() + await session.refresh(project) + logger.info(f"[generate_lyric] 새 Project 생성 - project_id: {project.id}, task_id: {task_id}") step2_elapsed = (time.perf_counter() - step2_start) * 1000 logger.debug(f"[generate_lyric] Step 2 완료 - project_id: {project.id} ({step2_elapsed:.1f}ms)") @@ -340,6 +352,7 @@ async def generate_lyric( task_id=task_id, prompt=lyric_prompt, lyric_input_data=lyric_input_data, + lyric_id=lyric.id, ) step4_elapsed = (time.perf_counter() - step4_start) * 1000 diff --git a/app/lyric/worker/lyric_task.py b/app/lyric/worker/lyric_task.py index 6d465cc..8c1ce31 100644 --- a/app/lyric/worker/lyric_task.py +++ b/app/lyric/worker/lyric_task.py @@ -23,6 +23,7 @@ async def _update_lyric_status( task_id: str, status: str, result: str | None = None, + lyric_id: int | None = None, ) -> bool: """Lyric 테이블의 상태를 업데이트합니다. @@ -30,18 +31,26 @@ async def _update_lyric_status( task_id: 프로젝트 task_id status: 변경할 상태 ("processing", "completed", "failed") result: 가사 결과 또는 에러 메시지 + lyric_id: 특정 Lyric 레코드 ID (재생성 시 정확한 레코드 식별용) Returns: bool: 업데이트 성공 여부 """ try: async with BackgroundSessionLocal() as session: - query_result = await session.execute( - select(Lyric) - .where(Lyric.task_id == task_id) - .order_by(Lyric.created_at.desc()) - .limit(1) - ) + if lyric_id: + # lyric_id로 특정 레코드 조회 (재생성 시에도 정확한 레코드 업데이트) + query_result = await session.execute( + select(Lyric).where(Lyric.id == lyric_id) + ) + else: + # 기존 방식: task_id로 최신 레코드 조회 + query_result = await session.execute( + select(Lyric) + .where(Lyric.task_id == task_id) + .order_by(Lyric.created_at.desc()) + .limit(1) + ) lyric = query_result.scalar_one_or_none() if lyric: @@ -49,31 +58,33 @@ async def _update_lyric_status( if result is not None: lyric.lyric_result = result await session.commit() - logger.info(f"[Lyric] Status updated - task_id: {task_id}, status: {status}") + logger.info(f"[Lyric] Status updated - task_id: {task_id}, lyric_id: {lyric_id}, status: {status}") return True else: - logger.warning(f"[Lyric] NOT FOUND in DB - task_id: {task_id}") + logger.warning(f"[Lyric] NOT FOUND in DB - task_id: {task_id}, lyric_id: {lyric_id}") return False except SQLAlchemyError as e: - logger.error(f"[Lyric] DB Error while updating status - task_id: {task_id}, error: {e}") + logger.error(f"[Lyric] DB Error while updating status - task_id: {task_id}, lyric_id: {lyric_id}, error: {e}") return False except Exception as e: - logger.error(f"[Lyric] Unexpected error while updating status - task_id: {task_id}, error: {e}") + logger.error(f"[Lyric] Unexpected error while updating status - task_id: {task_id}, lyric_id: {lyric_id}, error: {e}") return False async def generate_lyric_background( task_id: str, prompt: Prompt, - lyric_input_data: dict, # 프롬프트 메타데이터에서 정의된 Input + lyric_input_data: dict, # 프롬프트 메타데이터에서 정의된 Input + lyric_id: int | None = None, ) -> None: """백그라운드에서 ChatGPT를 통해 가사를 생성하고 Lyric 테이블을 업데이트합니다. Args: task_id: 프로젝트 task_id prompt: ChatGPT에 전달할 프롬프트 - language: 가사 언어 + lyric_input_data: 프롬프트 입력 데이터 + lyric_id: 특정 Lyric 레코드 ID (재생성 시 정확한 레코드 식별용) """ import time @@ -116,7 +127,7 @@ async def generate_lyric_background( step3_start = time.perf_counter() logger.debug(f"[generate_lyric_background] Step 3: DB 상태 업데이트...") - await _update_lyric_status(task_id, "completed", result) + await _update_lyric_status(task_id, "completed", result, lyric_id) step3_elapsed = (time.perf_counter() - step3_start) * 1000 logger.debug(f"[generate_lyric_background] Step 3 완료 ({step3_elapsed:.1f}ms)") @@ -136,14 +147,14 @@ async def generate_lyric_background( f"[generate_lyric_background] ChatGPT ERROR - task_id: {task_id}, " f"status: {e.status}, code: {e.error_code}, message: {e.error_message} ({elapsed:.1f}ms)" ) - await _update_lyric_status(task_id, "failed", f"ChatGPT Error: {e.error_message}") + await _update_lyric_status(task_id, "failed", f"ChatGPT Error: {e.error_message}", lyric_id) except SQLAlchemyError as e: elapsed = (time.perf_counter() - task_start) * 1000 logger.error(f"[generate_lyric_background] DB ERROR - task_id: {task_id}, error: {e} ({elapsed:.1f}ms)", exc_info=True) - await _update_lyric_status(task_id, "failed", f"Database Error: {str(e)}") + await _update_lyric_status(task_id, "failed", f"Database Error: {str(e)}", lyric_id) except Exception as e: elapsed = (time.perf_counter() - task_start) * 1000 logger.error(f"[generate_lyric_background] EXCEPTION - task_id: {task_id}, error: {e} ({elapsed:.1f}ms)", exc_info=True) - await _update_lyric_status(task_id, "failed", f"Error: {str(e)}") + await _update_lyric_status(task_id, "failed", f"Error: {str(e)}", lyric_id) diff --git a/app/song/worker/song_task.py b/app/song/worker/song_task.py index f743669..617ea4d 100644 --- a/app/song/worker/song_task.py +++ b/app/song/worker/song_task.py @@ -33,6 +33,7 @@ async def _update_song_status( song_url: str | None = None, suno_task_id: str | None = None, duration: float | None = None, + song_id: int | None = None, ) -> bool: """Song 테이블의 상태를 업데이트합니다. @@ -42,13 +43,20 @@ async def _update_song_status( song_url: 노래 URL suno_task_id: Suno task ID (선택) duration: 노래 길이 (선택) + song_id: 특정 Song 레코드 ID (재생성 시 정확한 레코드 식별용) Returns: bool: 업데이트 성공 여부 """ try: async with BackgroundSessionLocal() as session: - if suno_task_id: + if song_id: + # song_id로 특정 레코드 조회 (가장 정확한 식별) + query_result = await session.execute( + select(Song).where(Song.id == song_id) + ) + elif suno_task_id: + # suno_task_id로 조회 (Suno API 고유 ID) query_result = await session.execute( select(Song) .where(Song.suno_task_id == suno_task_id) @@ -56,6 +64,7 @@ async def _update_song_status( .limit(1) ) else: + # 기존 방식: task_id로 최신 레코드 조회 (비권장) query_result = await session.execute( select(Song) .where(Song.task_id == task_id) @@ -72,17 +81,17 @@ async def _update_song_status( if duration is not None: song.duration = duration await session.commit() - logger.info(f"[Song] Status updated - task_id: {task_id}, status: {status}") + logger.info(f"[Song] Status updated - task_id: {task_id}, suno_task_id: {suno_task_id}, song_id: {song_id}, status: {status}") return True else: - logger.warning(f"[Song] NOT FOUND in DB - task_id: {task_id}") + logger.warning(f"[Song] NOT FOUND in DB - task_id: {task_id}, suno_task_id: {suno_task_id}, song_id: {song_id}") return False except SQLAlchemyError as e: - logger.error(f"[Song] DB Error while updating status - task_id: {task_id}, error: {e}") + logger.error(f"[Song] DB Error while updating status - task_id: {task_id}, suno_task_id: {suno_task_id}, song_id: {song_id}, error: {e}") return False except Exception as e: - logger.error(f"[Song] Unexpected error while updating status - task_id: {task_id}, error: {e}") + logger.error(f"[Song] Unexpected error while updating status - task_id: {task_id}, suno_task_id: {suno_task_id}, song_id: {song_id}, error: {e}") return False diff --git a/app/video/api/routers/v1/video.py b/app/video/api/routers/v1/video.py index 9f268b7..d2184f6 100644 --- a/app/video/api/routers/v1/video.py +++ b/app/video/api/routers/v1/video.py @@ -561,7 +561,7 @@ async def get_video_status( # 백그라운드 태스크로 MP4 다운로드 → Blob 업로드 → DB 업데이트 → 임시 파일 삭제 logger.info( - f"[get_video_status] Background task args - task_id: {video.task_id}, video_url: {video_url}, store_name: {store_name}" + f"[get_video_status] Background task args - task_id: {video.task_id}, video_url: {video_url}, store_name: {store_name}, creatomate_render_id: {creatomate_render_id}" ) background_tasks.add_task( download_and_upload_video_to_blob, @@ -569,6 +569,7 @@ async def get_video_status( video_url=video_url, store_name=store_name, user_uuid=current_user.user_uuid, + creatomate_render_id=creatomate_render_id, ) elif video and video.status == "completed": logger.debug( @@ -823,6 +824,7 @@ async def get_videos( project = projects_map.get(video.project_id) item = VideoListItem( + video_id=video.id, store_name=project.store_name if project else None, region=project.region if project else None, task_id=video.task_id, @@ -850,3 +852,5 @@ async def get_videos( status_code=500, detail=f"영상 목록 조회에 실패했습니다: {str(e)}", ) + + diff --git a/app/video/schemas/video_schema.py b/app/video/schemas/video_schema.py index 9f1259e..1b570be 100644 --- a/app/video/schemas/video_schema.py +++ b/app/video/schemas/video_schema.py @@ -5,7 +5,7 @@ Video API Schemas """ from datetime import datetime -from typing import Any, Dict, Literal, Optional +from typing import Any, Dict, Optional from pydantic import BaseModel, ConfigDict, Field @@ -141,6 +141,7 @@ class VideoListItem(BaseModel): Example: { + "video_id": 1, "store_name": "스테이 머뭄", "region": "군산", "task_id": "019123ab-cdef-7890-abcd-ef1234567890", @@ -149,8 +150,11 @@ class VideoListItem(BaseModel): } """ + video_id: int = Field(..., description="영상 고유 ID") store_name: Optional[str] = Field(None, description="업체명") region: Optional[str] = Field(None, description="지역명") task_id: str = Field(..., description="작업 고유 식별자") result_movie_url: Optional[str] = Field(None, description="영상 결과 URL") created_at: Optional[datetime] = Field(None, description="생성 일시") + + diff --git a/app/video/worker/video_task.py b/app/video/worker/video_task.py index 4680a32..a9f544e 100644 --- a/app/video/worker/video_task.py +++ b/app/video/worker/video_task.py @@ -107,6 +107,7 @@ async def download_and_upload_video_to_blob( video_url: str, store_name: str, user_uuid: str, + creatomate_render_id: str | None = None, ) -> None: """백그라운드에서 영상을 다운로드하고 Azure Blob Storage에 업로드한 뒤 Video 테이블을 업데이트합니다. @@ -115,6 +116,7 @@ async def download_and_upload_video_to_blob( video_url: 다운로드할 영상 URL store_name: 저장할 파일명에 사용할 업체명 user_uuid: 사용자 UUID (Azure Blob Storage 경로에 사용) + creatomate_render_id: Creatomate 렌더 ID (특정 Video 식별용) """ logger.info(f"[download_and_upload_video_to_blob] START - task_id: {task_id}, store_name: {store_name}") temp_file_path: Path | None = None @@ -154,21 +156,21 @@ async def download_and_upload_video_to_blob( blob_url = uploader.public_url logger.info(f"[download_and_upload_video_to_blob] Uploaded to Blob - task_id: {task_id}, url: {blob_url}") - # Video 테이블 업데이트 - await _update_video_status(task_id, "completed", blob_url) - logger.info(f"[download_and_upload_video_to_blob] SUCCESS - task_id: {task_id}") + # Video 테이블 업데이트 (creatomate_render_id로 특정 Video 식별) + await _update_video_status(task_id, "completed", blob_url, creatomate_render_id) + logger.info(f"[download_and_upload_video_to_blob] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}") except httpx.HTTPError as e: logger.error(f"[download_and_upload_video_to_blob] DOWNLOAD ERROR - task_id: {task_id}, error: {e}", exc_info=True) - await _update_video_status(task_id, "failed") + await _update_video_status(task_id, "failed", creatomate_render_id=creatomate_render_id) except SQLAlchemyError as e: logger.error(f"[download_and_upload_video_to_blob] DB ERROR - task_id: {task_id}, error: {e}", exc_info=True) - await _update_video_status(task_id, "failed") + await _update_video_status(task_id, "failed", creatomate_render_id=creatomate_render_id) except Exception as e: logger.error(f"[download_and_upload_video_to_blob] EXCEPTION - task_id: {task_id}, error: {e}", exc_info=True) - await _update_video_status(task_id, "failed") + await _update_video_status(task_id, "failed", creatomate_render_id=creatomate_render_id) finally: # 임시 파일 삭제