fix: 가사/노래/영상 재생성 시 올바른 레코드 업데이트되도록 수정

feature-youtube-upload
hbyang 2026-01-30 15:19:26 +09:00
parent 7a0d5a6272
commit c92d6e2135
7 changed files with 92 additions and 62 deletions

View File

@ -37,7 +37,7 @@ router = APIRouter(prefix="/archive", tags=["Archive"])
- **page_size**: 페이지당 데이터 (기본값: 10, 최대: 100) - **page_size**: 페이지당 데이터 (기본값: 10, 최대: 100)
## 반환 정보 ## 반환 정보
- **items**: 영상 목록 (store_name, region, task_id, result_movie_url, created_at) - **items**: 영상 목록 (video_id, store_name, region, task_id, result_movie_url, created_at)
- **total**: 전체 데이터 - **total**: 전체 데이터
- **page**: 현재 페이지 - **page**: 현재 페이지
- **page_size**: 페이지당 데이터 - **page_size**: 페이지당 데이터
@ -53,7 +53,7 @@ GET /archive/videos/?page=1&page_size=10
## 참고 ## 참고
- **본인이 소유한 프로젝트의 영상만 반환됩니다.** - **본인이 소유한 프로젝트의 영상만 반환됩니다.**
- status가 'completed' 영상만 반환됩니다. - status가 'completed' 영상만 반환됩니다.
- 동일한 task_id가 있는 경우 가장 최근에 생성된 1개만 반환됩니다. - 재생성된 영상 포함 모든 영상이 반환됩니다.
- created_at 기준 내림차순 정렬됩니다. - created_at 기준 내림차순 정렬됩니다.
""", """,
response_model=PaginatedResponse[VideoListItem], response_model=PaginatedResponse[VideoListItem],
@ -149,35 +149,21 @@ async def get_videos(
Project.is_deleted == False, Project.is_deleted == False,
] ]
# 쿼리 1: 전체 개수 조회 (task_id 기준 고유 개수) # 쿼리 1: 전체 개수 조회 (모든 영상)
count_query = ( count_query = (
select(func.count(func.distinct(Video.task_id))) select(func.count(Video.id))
.join(Project, Video.project_id == Project.id) .join(Project, Video.project_id == Project.id)
.where(*base_conditions) .where(*base_conditions)
) )
total_result = await session.execute(count_query) total_result = await session.execute(count_query)
total = total_result.scalar() or 0 total = total_result.scalar() or 0
logger.debug(f"[get_videos] DEBUG - task_id 기준 고유 개수 (total): {total}") logger.debug(f"[get_videos] DEBUG - 전체 영상 개수 (total): {total}")
# 서브쿼리: task_id별 최신 Video의 id 조회 # 쿼리 2: Video + Project 데이터를 JOIN으로 한 번에 조회 (모든 영상)
subquery = (
select(func.max(Video.id).label("max_id"))
.join(Project, Video.project_id == Project.id)
.where(*base_conditions)
.group_by(Video.task_id)
.subquery()
)
# DEBUG: 서브쿼리 결과 확인
subquery_debug_result = await session.execute(select(subquery.c.max_id))
subquery_ids = [row[0] for row in subquery_debug_result.all()]
logger.debug(f"[get_videos] DEBUG - 서브쿼리 결과 (max_id 목록): {subquery_ids}")
# 쿼리 2: Video + Project 데이터를 JOIN으로 한 번에 조회
query = ( query = (
select(Video, Project) select(Video, Project)
.join(Project, Video.project_id == Project.id) .join(Project, Video.project_id == Project.id)
.where(Video.id.in_(select(subquery.c.max_id))) .where(*base_conditions)
.order_by(Video.created_at.desc()) .order_by(Video.created_at.desc())
.offset(offset) .offset(offset)
.limit(pagination.page_size) .limit(pagination.page_size)
@ -190,6 +176,7 @@ async def get_videos(
items = [] items = []
for video, project in rows: for video, project in rows:
item = VideoListItem( item = VideoListItem(
video_id=video.id,
store_name=project.store_name, store_name=project.store_name,
region=project.region, region=project.region,
task_id=video.task_id, task_id=video.task_id,

View File

@ -292,10 +292,21 @@ async def generate_lyric(
step1_elapsed = (time.perf_counter() - step1_start) * 1000 step1_elapsed = (time.perf_counter() - step1_start) * 1000
#logger.debug(f"[generate_lyric] Step 1 완료 - 프롬프트 {len(prompt)}자 ({step1_elapsed:.1f}ms)") #logger.debug(f"[generate_lyric] Step 1 완료 - 프롬프트 {len(prompt)}자 ({step1_elapsed:.1f}ms)")
# ========== Step 2: Project 테이블에 데이터 저장 ========== # ========== Step 2: Project 조회 또는 생성 ==========
step2_start = time.perf_counter() step2_start = time.perf_counter()
logger.debug(f"[generate_lyric] Step 2: Project 저장...") logger.debug(f"[generate_lyric] Step 2: Project 조회 또는 생성...")
# 기존 Project가 있는지 확인 (재생성 시 재사용)
existing_project_result = await session.execute(
select(Project).where(Project.task_id == task_id).limit(1)
)
project = existing_project_result.scalar_one_or_none()
if project:
# 기존 Project 재사용 (재생성 케이스)
logger.info(f"[generate_lyric] 기존 Project 재사용 - project_id: {project.id}, task_id: {task_id}")
else:
# 새 Project 생성 (최초 생성 케이스)
project = Project( project = Project(
store_name=request_body.customer_name, store_name=request_body.customer_name,
region=request_body.region, region=request_body.region,
@ -307,6 +318,7 @@ async def generate_lyric(
session.add(project) session.add(project)
await session.commit() await session.commit()
await session.refresh(project) await session.refresh(project)
logger.info(f"[generate_lyric] 새 Project 생성 - project_id: {project.id}, task_id: {task_id}")
step2_elapsed = (time.perf_counter() - step2_start) * 1000 step2_elapsed = (time.perf_counter() - step2_start) * 1000
logger.debug(f"[generate_lyric] Step 2 완료 - project_id: {project.id} ({step2_elapsed:.1f}ms)") logger.debug(f"[generate_lyric] Step 2 완료 - project_id: {project.id} ({step2_elapsed:.1f}ms)")
@ -340,6 +352,7 @@ async def generate_lyric(
task_id=task_id, task_id=task_id,
prompt=lyric_prompt, prompt=lyric_prompt,
lyric_input_data=lyric_input_data, lyric_input_data=lyric_input_data,
lyric_id=lyric.id,
) )
step4_elapsed = (time.perf_counter() - step4_start) * 1000 step4_elapsed = (time.perf_counter() - step4_start) * 1000

View File

@ -23,6 +23,7 @@ async def _update_lyric_status(
task_id: str, task_id: str,
status: str, status: str,
result: str | None = None, result: str | None = None,
lyric_id: int | None = None,
) -> bool: ) -> bool:
"""Lyric 테이블의 상태를 업데이트합니다. """Lyric 테이블의 상태를 업데이트합니다.
@ -30,12 +31,20 @@ async def _update_lyric_status(
task_id: 프로젝트 task_id task_id: 프로젝트 task_id
status: 변경할 상태 ("processing", "completed", "failed") status: 변경할 상태 ("processing", "completed", "failed")
result: 가사 결과 또는 에러 메시지 result: 가사 결과 또는 에러 메시지
lyric_id: 특정 Lyric 레코드 ID (재생성 정확한 레코드 식별용)
Returns: Returns:
bool: 업데이트 성공 여부 bool: 업데이트 성공 여부
""" """
try: try:
async with BackgroundSessionLocal() as session: async with BackgroundSessionLocal() as session:
if lyric_id:
# lyric_id로 특정 레코드 조회 (재생성 시에도 정확한 레코드 업데이트)
query_result = await session.execute(
select(Lyric).where(Lyric.id == lyric_id)
)
else:
# 기존 방식: task_id로 최신 레코드 조회
query_result = await session.execute( query_result = await session.execute(
select(Lyric) select(Lyric)
.where(Lyric.task_id == task_id) .where(Lyric.task_id == task_id)
@ -49,17 +58,17 @@ async def _update_lyric_status(
if result is not None: if result is not None:
lyric.lyric_result = result lyric.lyric_result = result
await session.commit() await session.commit()
logger.info(f"[Lyric] Status updated - task_id: {task_id}, status: {status}") logger.info(f"[Lyric] Status updated - task_id: {task_id}, lyric_id: {lyric_id}, status: {status}")
return True return True
else: else:
logger.warning(f"[Lyric] NOT FOUND in DB - task_id: {task_id}") logger.warning(f"[Lyric] NOT FOUND in DB - task_id: {task_id}, lyric_id: {lyric_id}")
return False return False
except SQLAlchemyError as e: except SQLAlchemyError as e:
logger.error(f"[Lyric] DB Error while updating status - task_id: {task_id}, error: {e}") logger.error(f"[Lyric] DB Error while updating status - task_id: {task_id}, lyric_id: {lyric_id}, error: {e}")
return False return False
except Exception as e: except Exception as e:
logger.error(f"[Lyric] Unexpected error while updating status - task_id: {task_id}, error: {e}") logger.error(f"[Lyric] Unexpected error while updating status - task_id: {task_id}, lyric_id: {lyric_id}, error: {e}")
return False return False
@ -67,13 +76,15 @@ async def generate_lyric_background(
task_id: str, task_id: str,
prompt: Prompt, prompt: Prompt,
lyric_input_data: dict, # 프롬프트 메타데이터에서 정의된 Input lyric_input_data: dict, # 프롬프트 메타데이터에서 정의된 Input
lyric_id: int | None = None,
) -> None: ) -> None:
"""백그라운드에서 ChatGPT를 통해 가사를 생성하고 Lyric 테이블을 업데이트합니다. """백그라운드에서 ChatGPT를 통해 가사를 생성하고 Lyric 테이블을 업데이트합니다.
Args: Args:
task_id: 프로젝트 task_id task_id: 프로젝트 task_id
prompt: ChatGPT에 전달할 프롬프트 prompt: ChatGPT에 전달할 프롬프트
language: 가사 언어 lyric_input_data: 프롬프트 입력 데이터
lyric_id: 특정 Lyric 레코드 ID (재생성 정확한 레코드 식별용)
""" """
import time import time
@ -116,7 +127,7 @@ async def generate_lyric_background(
step3_start = time.perf_counter() step3_start = time.perf_counter()
logger.debug(f"[generate_lyric_background] Step 3: DB 상태 업데이트...") logger.debug(f"[generate_lyric_background] Step 3: DB 상태 업데이트...")
await _update_lyric_status(task_id, "completed", result) await _update_lyric_status(task_id, "completed", result, lyric_id)
step3_elapsed = (time.perf_counter() - step3_start) * 1000 step3_elapsed = (time.perf_counter() - step3_start) * 1000
logger.debug(f"[generate_lyric_background] Step 3 완료 ({step3_elapsed:.1f}ms)") logger.debug(f"[generate_lyric_background] Step 3 완료 ({step3_elapsed:.1f}ms)")
@ -136,14 +147,14 @@ async def generate_lyric_background(
f"[generate_lyric_background] ChatGPT ERROR - task_id: {task_id}, " f"[generate_lyric_background] ChatGPT ERROR - task_id: {task_id}, "
f"status: {e.status}, code: {e.error_code}, message: {e.error_message} ({elapsed:.1f}ms)" f"status: {e.status}, code: {e.error_code}, message: {e.error_message} ({elapsed:.1f}ms)"
) )
await _update_lyric_status(task_id, "failed", f"ChatGPT Error: {e.error_message}") await _update_lyric_status(task_id, "failed", f"ChatGPT Error: {e.error_message}", lyric_id)
except SQLAlchemyError as e: except SQLAlchemyError as e:
elapsed = (time.perf_counter() - task_start) * 1000 elapsed = (time.perf_counter() - task_start) * 1000
logger.error(f"[generate_lyric_background] DB ERROR - task_id: {task_id}, error: {e} ({elapsed:.1f}ms)", exc_info=True) logger.error(f"[generate_lyric_background] DB ERROR - task_id: {task_id}, error: {e} ({elapsed:.1f}ms)", exc_info=True)
await _update_lyric_status(task_id, "failed", f"Database Error: {str(e)}") await _update_lyric_status(task_id, "failed", f"Database Error: {str(e)}", lyric_id)
except Exception as e: except Exception as e:
elapsed = (time.perf_counter() - task_start) * 1000 elapsed = (time.perf_counter() - task_start) * 1000
logger.error(f"[generate_lyric_background] EXCEPTION - task_id: {task_id}, error: {e} ({elapsed:.1f}ms)", exc_info=True) logger.error(f"[generate_lyric_background] EXCEPTION - task_id: {task_id}, error: {e} ({elapsed:.1f}ms)", exc_info=True)
await _update_lyric_status(task_id, "failed", f"Error: {str(e)}") await _update_lyric_status(task_id, "failed", f"Error: {str(e)}", lyric_id)

View File

@ -33,6 +33,7 @@ async def _update_song_status(
song_url: str | None = None, song_url: str | None = None,
suno_task_id: str | None = None, suno_task_id: str | None = None,
duration: float | None = None, duration: float | None = None,
song_id: int | None = None,
) -> bool: ) -> bool:
"""Song 테이블의 상태를 업데이트합니다. """Song 테이블의 상태를 업데이트합니다.
@ -42,13 +43,20 @@ async def _update_song_status(
song_url: 노래 URL song_url: 노래 URL
suno_task_id: Suno task ID (선택) suno_task_id: Suno task ID (선택)
duration: 노래 길이 (선택) duration: 노래 길이 (선택)
song_id: 특정 Song 레코드 ID (재생성 정확한 레코드 식별용)
Returns: Returns:
bool: 업데이트 성공 여부 bool: 업데이트 성공 여부
""" """
try: try:
async with BackgroundSessionLocal() as session: async with BackgroundSessionLocal() as session:
if suno_task_id: if song_id:
# song_id로 특정 레코드 조회 (가장 정확한 식별)
query_result = await session.execute(
select(Song).where(Song.id == song_id)
)
elif suno_task_id:
# suno_task_id로 조회 (Suno API 고유 ID)
query_result = await session.execute( query_result = await session.execute(
select(Song) select(Song)
.where(Song.suno_task_id == suno_task_id) .where(Song.suno_task_id == suno_task_id)
@ -56,6 +64,7 @@ async def _update_song_status(
.limit(1) .limit(1)
) )
else: else:
# 기존 방식: task_id로 최신 레코드 조회 (비권장)
query_result = await session.execute( query_result = await session.execute(
select(Song) select(Song)
.where(Song.task_id == task_id) .where(Song.task_id == task_id)
@ -72,17 +81,17 @@ async def _update_song_status(
if duration is not None: if duration is not None:
song.duration = duration song.duration = duration
await session.commit() await session.commit()
logger.info(f"[Song] Status updated - task_id: {task_id}, status: {status}") logger.info(f"[Song] Status updated - task_id: {task_id}, suno_task_id: {suno_task_id}, song_id: {song_id}, status: {status}")
return True return True
else: else:
logger.warning(f"[Song] NOT FOUND in DB - task_id: {task_id}") logger.warning(f"[Song] NOT FOUND in DB - task_id: {task_id}, suno_task_id: {suno_task_id}, song_id: {song_id}")
return False return False
except SQLAlchemyError as e: except SQLAlchemyError as e:
logger.error(f"[Song] DB Error while updating status - task_id: {task_id}, error: {e}") logger.error(f"[Song] DB Error while updating status - task_id: {task_id}, suno_task_id: {suno_task_id}, song_id: {song_id}, error: {e}")
return False return False
except Exception as e: except Exception as e:
logger.error(f"[Song] Unexpected error while updating status - task_id: {task_id}, error: {e}") logger.error(f"[Song] Unexpected error while updating status - task_id: {task_id}, suno_task_id: {suno_task_id}, song_id: {song_id}, error: {e}")
return False return False

View File

@ -561,7 +561,7 @@ async def get_video_status(
# 백그라운드 태스크로 MP4 다운로드 → Blob 업로드 → DB 업데이트 → 임시 파일 삭제 # 백그라운드 태스크로 MP4 다운로드 → Blob 업로드 → DB 업데이트 → 임시 파일 삭제
logger.info( logger.info(
f"[get_video_status] Background task args - task_id: {video.task_id}, video_url: {video_url}, store_name: {store_name}" f"[get_video_status] Background task args - task_id: {video.task_id}, video_url: {video_url}, store_name: {store_name}, creatomate_render_id: {creatomate_render_id}"
) )
background_tasks.add_task( background_tasks.add_task(
download_and_upload_video_to_blob, download_and_upload_video_to_blob,
@ -569,6 +569,7 @@ async def get_video_status(
video_url=video_url, video_url=video_url,
store_name=store_name, store_name=store_name,
user_uuid=current_user.user_uuid, user_uuid=current_user.user_uuid,
creatomate_render_id=creatomate_render_id,
) )
elif video and video.status == "completed": elif video and video.status == "completed":
logger.debug( logger.debug(
@ -823,6 +824,7 @@ async def get_videos(
project = projects_map.get(video.project_id) project = projects_map.get(video.project_id)
item = VideoListItem( item = VideoListItem(
video_id=video.id,
store_name=project.store_name if project else None, store_name=project.store_name if project else None,
region=project.region if project else None, region=project.region if project else None,
task_id=video.task_id, task_id=video.task_id,
@ -850,3 +852,5 @@ async def get_videos(
status_code=500, status_code=500,
detail=f"영상 목록 조회에 실패했습니다: {str(e)}", detail=f"영상 목록 조회에 실패했습니다: {str(e)}",
) )

View File

@ -5,7 +5,7 @@ Video API Schemas
""" """
from datetime import datetime from datetime import datetime
from typing import Any, Dict, Literal, Optional from typing import Any, Dict, Optional
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
@ -141,6 +141,7 @@ class VideoListItem(BaseModel):
Example: Example:
{ {
"video_id": 1,
"store_name": "스테이 머뭄", "store_name": "스테이 머뭄",
"region": "군산", "region": "군산",
"task_id": "019123ab-cdef-7890-abcd-ef1234567890", "task_id": "019123ab-cdef-7890-abcd-ef1234567890",
@ -149,8 +150,11 @@ class VideoListItem(BaseModel):
} }
""" """
video_id: int = Field(..., description="영상 고유 ID")
store_name: Optional[str] = Field(None, description="업체명") store_name: Optional[str] = Field(None, description="업체명")
region: Optional[str] = Field(None, description="지역명") region: Optional[str] = Field(None, description="지역명")
task_id: str = Field(..., description="작업 고유 식별자") task_id: str = Field(..., description="작업 고유 식별자")
result_movie_url: Optional[str] = Field(None, description="영상 결과 URL") result_movie_url: Optional[str] = Field(None, description="영상 결과 URL")
created_at: Optional[datetime] = Field(None, description="생성 일시") created_at: Optional[datetime] = Field(None, description="생성 일시")

View File

@ -107,6 +107,7 @@ async def download_and_upload_video_to_blob(
video_url: str, video_url: str,
store_name: str, store_name: str,
user_uuid: str, user_uuid: str,
creatomate_render_id: str | None = None,
) -> None: ) -> None:
"""백그라운드에서 영상을 다운로드하고 Azure Blob Storage에 업로드한 뒤 Video 테이블을 업데이트합니다. """백그라운드에서 영상을 다운로드하고 Azure Blob Storage에 업로드한 뒤 Video 테이블을 업데이트합니다.
@ -115,6 +116,7 @@ async def download_and_upload_video_to_blob(
video_url: 다운로드할 영상 URL video_url: 다운로드할 영상 URL
store_name: 저장할 파일명에 사용할 업체명 store_name: 저장할 파일명에 사용할 업체명
user_uuid: 사용자 UUID (Azure Blob Storage 경로에 사용) user_uuid: 사용자 UUID (Azure Blob Storage 경로에 사용)
creatomate_render_id: Creatomate 렌더 ID (특정 Video 식별용)
""" """
logger.info(f"[download_and_upload_video_to_blob] START - task_id: {task_id}, store_name: {store_name}") logger.info(f"[download_and_upload_video_to_blob] START - task_id: {task_id}, store_name: {store_name}")
temp_file_path: Path | None = None temp_file_path: Path | None = None
@ -154,21 +156,21 @@ async def download_and_upload_video_to_blob(
blob_url = uploader.public_url blob_url = uploader.public_url
logger.info(f"[download_and_upload_video_to_blob] Uploaded to Blob - task_id: {task_id}, url: {blob_url}") logger.info(f"[download_and_upload_video_to_blob] Uploaded to Blob - task_id: {task_id}, url: {blob_url}")
# Video 테이블 업데이트 # Video 테이블 업데이트 (creatomate_render_id로 특정 Video 식별)
await _update_video_status(task_id, "completed", blob_url) await _update_video_status(task_id, "completed", blob_url, creatomate_render_id)
logger.info(f"[download_and_upload_video_to_blob] SUCCESS - task_id: {task_id}") logger.info(f"[download_and_upload_video_to_blob] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}")
except httpx.HTTPError as e: except httpx.HTTPError as e:
logger.error(f"[download_and_upload_video_to_blob] DOWNLOAD ERROR - task_id: {task_id}, error: {e}", exc_info=True) logger.error(f"[download_and_upload_video_to_blob] DOWNLOAD ERROR - task_id: {task_id}, error: {e}", exc_info=True)
await _update_video_status(task_id, "failed") await _update_video_status(task_id, "failed", creatomate_render_id=creatomate_render_id)
except SQLAlchemyError as e: except SQLAlchemyError as e:
logger.error(f"[download_and_upload_video_to_blob] DB ERROR - task_id: {task_id}, error: {e}", exc_info=True) logger.error(f"[download_and_upload_video_to_blob] DB ERROR - task_id: {task_id}, error: {e}", exc_info=True)
await _update_video_status(task_id, "failed") await _update_video_status(task_id, "failed", creatomate_render_id=creatomate_render_id)
except Exception as e: except Exception as e:
logger.error(f"[download_and_upload_video_to_blob] EXCEPTION - task_id: {task_id}, error: {e}", exc_info=True) logger.error(f"[download_and_upload_video_to_blob] EXCEPTION - task_id: {task_id}, error: {e}", exc_info=True)
await _update_video_status(task_id, "failed") await _update_video_status(task_id, "failed", creatomate_render_id=creatomate_render_id)
finally: finally:
# 임시 파일 삭제 # 임시 파일 삭제