diff --git a/app/core/common.py b/app/core/common.py index f908214..e317c06 100644 --- a/app/core/common.py +++ b/app/core/common.py @@ -33,10 +33,18 @@ async def lifespan(app: FastAPI): # Shutdown - 애플리케이션 종료 시 print("Shutting down...") - from app.database.session import engine - await engine.dispose() - print("Database engine disposed") + # 공유 HTTP 클라이언트 종료 + from app.utils.creatomate import close_shared_client + from app.utils.upload_blob_as_request import close_shared_blob_client + + await close_shared_client() + await close_shared_blob_client() + + # 데이터베이스 엔진 종료 + from app.database.session import dispose_engine + + await dispose_engine() # FastAPI 앱 생성 (lifespan 적용) diff --git a/app/database/session.py b/app/database/session.py index 5f98036..0951f28 100644 --- a/app/database/session.py +++ b/app/database/session.py @@ -1,3 +1,4 @@ +import time from typing import AsyncGenerator from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine @@ -19,8 +20,8 @@ engine = create_async_engine( pool_size=20, # 기본 풀 크기: 20 max_overflow=20, # 추가 연결: 20 (총 최대 40) pool_timeout=30, # 풀에서 연결 대기 시간 (초) - pool_recycle=3600, # 1시간마다 연결 재생성 - pool_pre_ping=True, # 연결 유효성 검사 + pool_recycle=280, # MySQL wait_timeout(기본 28800s, 클라우드는 보통 300s) 보다 짧게 설정 + pool_pre_ping=True, # 연결 유효성 검사 (죽은 연결 자동 재연결) pool_reset_on_return="rollback", # 반환 시 롤백으로 초기화 connect_args={ "connect_timeout": 10, # DB 연결 타임아웃 @@ -46,8 +47,8 @@ background_engine = create_async_engine( pool_size=10, # 백그라운드용 풀 크기: 10 max_overflow=10, # 추가 연결: 10 (총 최대 20) pool_timeout=60, # 백그라운드는 대기 시간 여유있게 - pool_recycle=3600, - pool_pre_ping=True, + pool_recycle=280, # MySQL wait_timeout 보다 짧게 설정 + pool_pre_ping=True, # 연결 유효성 검사 (죽은 연결 자동 재연결) pool_reset_on_return="rollback", connect_args={ "connect_timeout": 10, @@ -82,24 +83,79 @@ async def create_db_tables(): # FastAPI 의존성용 세션 제너레이터 async def get_session() -> AsyncGenerator[AsyncSession, None]: - # 커넥션 풀 상태 로깅 (디버깅용) + start_time = time.perf_counter() pool = engine.pool - print(f"[get_session] Pool status - size: {pool.size()}, checked_in: {pool.checkedin()}, checked_out: {pool.checkedout()}, overflow: {pool.overflow()}") + + # 커넥션 풀 상태 로깅 (디버깅용) + print( + f"[get_session] ACQUIRE - pool_size: {pool.size()}, " + f"in: {pool.checkedin()}, out: {pool.checkedout()}, " + f"overflow: {pool.overflow()}" + ) async with AsyncSessionLocal() as session: + acquire_time = time.perf_counter() + print( + f"[get_session] Session acquired in " + f"{(acquire_time - start_time)*1000:.1f}ms" + ) try: yield session except Exception as e: await session.rollback() - print(f"[get_session] Session rollback due to: {e}") + print( + f"[get_session] ROLLBACK - error: {type(e).__name__}: {e}, " + f"duration: {(time.perf_counter() - start_time)*1000:.1f}ms" + ) raise e finally: - # 명시적으로 세션 종료 확인 - print(f"[get_session] Session closing - Pool checked_out: {pool.checkedout()}") + total_time = time.perf_counter() - start_time + print( + f"[get_session] RELEASE - duration: {total_time*1000:.1f}ms, " + f"pool_out: {pool.checkedout()}" + ) + + +# 백그라운드 태스크용 세션 제너레이터 +async def get_background_session() -> AsyncGenerator[AsyncSession, None]: + start_time = time.perf_counter() + pool = background_engine.pool + + print( + f"[get_background_session] ACQUIRE - pool_size: {pool.size()}, " + f"in: {pool.checkedin()}, out: {pool.checkedout()}, " + f"overflow: {pool.overflow()}" + ) + + async with BackgroundSessionLocal() as session: + acquire_time = time.perf_counter() + print( + f"[get_background_session] Session acquired in " + f"{(acquire_time - start_time)*1000:.1f}ms" + ) + try: + yield session + except Exception as e: + await session.rollback() + print( + f"[get_background_session] ROLLBACK - " + f"error: {type(e).__name__}: {e}, " + f"duration: {(time.perf_counter() - start_time)*1000:.1f}ms" + ) + raise e + finally: + total_time = time.perf_counter() - start_time + print( + f"[get_background_session] RELEASE - " + f"duration: {total_time*1000:.1f}ms, " + f"pool_out: {pool.checkedout()}" + ) # 앱 종료 시 엔진 리소스 정리 함수 async def dispose_engine() -> None: + print("[dispose_engine] Disposing database engines...") await engine.dispose() + print("[dispose_engine] Main engine disposed") await background_engine.dispose() - print("Database engines disposed (main + background)") + print("[dispose_engine] Background engine disposed - ALL DONE") diff --git a/app/home/api/routers/v1/home.py b/app/home/api/routers/v1/home.py index 427fc5b..42cc159 100644 --- a/app/home/api/routers/v1/home.py +++ b/app/home/api/routers/v1/home.py @@ -505,6 +505,9 @@ async def upload_images_blob( - Stage 2: Azure Blob 업로드 (세션 없음) - Stage 3: DB 저장 (새 세션으로 빠르게 처리) """ + import time + request_start = time.perf_counter() + # task_id 생성 task_id = await generate_task_id() print(f"[upload_images_blob] START - task_id: {task_id}") @@ -560,8 +563,10 @@ async def upload_images_blob( detail=detail, ) + stage1_time = time.perf_counter() print(f"[upload_images_blob] Stage 1 done - urls: {len(url_images)}, " - f"files: {len(valid_files_data)}") + f"files: {len(valid_files_data)}, " + f"elapsed: {(stage1_time - request_start)*1000:.1f}ms") # ========== Stage 2: Azure Blob 업로드 (세션 없음) ========== # 업로드 결과를 저장할 리스트 (나중에 DB에 저장) @@ -570,8 +575,9 @@ async def upload_images_blob( if valid_files_data: uploader = AzureBlobUploader(task_id=task_id) + total_files = len(valid_files_data) - for original_name, ext, file_content in valid_files_data: + for idx, (original_name, ext, file_content) in enumerate(valid_files_data): name_without_ext = ( original_name.rsplit(".", 1)[0] if "." in original_name @@ -579,6 +585,9 @@ async def upload_images_blob( ) filename = f"{name_without_ext}_{img_order:03d}{ext}" + print(f"[upload_images_blob] Uploading file {idx+1}/{total_files}: " + f"{filename} ({len(file_content)} bytes)") + # Azure Blob Storage에 직접 업로드 upload_success = await uploader.upload_image_bytes(file_content, filename) @@ -586,70 +595,88 @@ async def upload_images_blob( blob_url = uploader.public_url blob_upload_results.append((original_name, blob_url)) img_order += 1 + print(f"[upload_images_blob] File {idx+1}/{total_files} SUCCESS") else: skipped_files.append(filename) + print(f"[upload_images_blob] File {idx+1}/{total_files} FAILED") + stage2_time = time.perf_counter() print(f"[upload_images_blob] Stage 2 done - blob uploads: " - f"{len(blob_upload_results)}, skipped: {len(skipped_files)}") + f"{len(blob_upload_results)}, skipped: {len(skipped_files)}, " + f"elapsed: {(stage2_time - stage1_time)*1000:.1f}ms") # ========== Stage 3: DB 저장 (새 세션으로 빠르게 처리) ========== + print("[upload_images_blob] Stage 3 starting - DB save...") result_images: list[ImageUploadResultItem] = [] img_order = 0 - async with AsyncSessionLocal() as session: - # URL 이미지 저장 - for url_item in url_images: - img_name = url_item.name or _extract_image_name(url_item.url, img_order) + try: + async with AsyncSessionLocal() as session: + # URL 이미지 저장 + for url_item in url_images: + img_name = ( + url_item.name or _extract_image_name(url_item.url, img_order) + ) - image = Image( - task_id=task_id, - img_name=img_name, - img_url=url_item.url, - img_order=img_order, - ) - session.add(image) - await session.flush() - - result_images.append( - ImageUploadResultItem( - id=image.id, + image = Image( + task_id=task_id, img_name=img_name, img_url=url_item.url, img_order=img_order, - source="url", ) - ) - img_order += 1 + session.add(image) + await session.flush() - # Blob 업로드 결과 저장 - for img_name, blob_url in blob_upload_results: - image = Image( - task_id=task_id, - img_name=img_name, - img_url=blob_url, - img_order=img_order, - ) - session.add(image) - await session.flush() + result_images.append( + ImageUploadResultItem( + id=image.id, + img_name=img_name, + img_url=url_item.url, + img_order=img_order, + source="url", + ) + ) + img_order += 1 - result_images.append( - ImageUploadResultItem( - id=image.id, + # Blob 업로드 결과 저장 + for img_name, blob_url in blob_upload_results: + image = Image( + task_id=task_id, img_name=img_name, img_url=blob_url, img_order=img_order, - source="blob", ) - ) - img_order += 1 + session.add(image) + await session.flush() - await session.commit() + result_images.append( + ImageUploadResultItem( + id=image.id, + img_name=img_name, + img_url=blob_url, + img_order=img_order, + source="blob", + ) + ) + img_order += 1 + + await session.commit() + stage3_time = time.perf_counter() + print(f"[upload_images_blob] Stage 3 done - " + f"saved: {len(result_images)}, " + f"elapsed: {(stage3_time - stage2_time)*1000:.1f}ms") + + except Exception as e: + print(f"[upload_images_blob] Stage 3 EXCEPTION - " + f"task_id: {task_id}, error: {type(e).__name__}: {e}") + raise saved_count = len(result_images) image_urls = [img.img_url for img in result_images] + total_time = time.perf_counter() - request_start print(f"[upload_images_blob] SUCCESS - task_id: {task_id}, " - f"total: {saved_count}, returning response...") + f"total: {saved_count}, total_time: {total_time*1000:.1f}ms") return ImageUploadResponse( task_id=task_id, diff --git a/app/song/api/routers/v1/song.py b/app/song/api/routers/v1/song.py index b244bd9..91c8ef6 100644 --- a/app/song/api/routers/v1/song.py +++ b/app/song/api/routers/v1/song.py @@ -84,81 +84,189 @@ POST /song/generate/019123ab-cdef-7890-abcd-ef1234567890 async def generate_song( task_id: str, request_body: GenerateSongRequest, - session: AsyncSession = Depends(get_session), ) -> GenerateSongResponse: """가사와 장르를 기반으로 Suno API를 통해 노래를 생성합니다. 1. task_id로 Project와 Lyric 조회 2. Song 테이블에 초기 데이터 저장 (status: processing) - 3. Suno API 호출 + 3. Suno API 호출 (세션 닫힌 상태) 4. suno_task_id 업데이트 후 응답 반환 + + Note: 이 함수는 Depends(get_session)을 사용하지 않고 명시적으로 세션을 관리합니다. + 외부 API 호출 중 DB 커넥션이 유지되지 않도록 하여 커넥션 타임아웃 문제를 방지합니다. """ - print(f"[generate_song] START - task_id: {task_id}, genre: {request_body.genre}, language: {request_body.language}") + import time + from app.database.session import AsyncSessionLocal + + request_start = time.perf_counter() + print( + f"[generate_song] START - task_id: {task_id}, " + f"genre: {request_body.genre}, language: {request_body.language}" + ) + + # 외부 API 호출 전에 필요한 데이터를 저장할 변수들 + project_id: int | None = None + lyric_id: int | None = None + song_id: int | None = None + + # ========================================================================== + # 1단계: DB 조회 및 초기 데이터 저장 (세션을 명시적으로 열고 닫음) + # ========================================================================== try: - # 1. task_id로 Project 조회 (중복 시 최신 것 선택) - project_result = await session.execute( - select(Project) - .where(Project.task_id == task_id) - .order_by(Project.created_at.desc()) - .limit(1) - ) - project = project_result.scalar_one_or_none() - - if not project: - print(f"[generate_song] Project NOT FOUND - task_id: {task_id}") - raise HTTPException( - status_code=404, - detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.", + async with AsyncSessionLocal() as session: + # Project 조회 (중복 시 최신 것 선택) + project_result = await session.execute( + select(Project) + .where(Project.task_id == task_id) + .order_by(Project.created_at.desc()) + .limit(1) ) - print(f"[generate_song] Project found - project_id: {project.id}, task_id: {task_id}") + project = project_result.scalar_one_or_none() - # 2. task_id로 Lyric 조회 (중복 시 최신 것 선택) - lyric_result = await session.execute( - select(Lyric) - .where(Lyric.task_id == task_id) - .order_by(Lyric.created_at.desc()) - .limit(1) - ) - lyric = lyric_result.scalar_one_or_none() + if not project: + print(f"[generate_song] Project NOT FOUND - task_id: {task_id}") + raise HTTPException( + status_code=404, + detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.", + ) + project_id = project.id - if not lyric: - print(f"[generate_song] Lyric NOT FOUND - task_id: {task_id}") - raise HTTPException( - status_code=404, - detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.", + # Lyric 조회 (중복 시 최신 것 선택) + lyric_result = await session.execute( + select(Lyric) + .where(Lyric.task_id == task_id) + .order_by(Lyric.created_at.desc()) + .limit(1) ) - print(f"[generate_song] Lyric found - lyric_id: {lyric.id}, task_id: {task_id}") + lyric = lyric_result.scalar_one_or_none() - # 3. Song 테이블에 초기 데이터 저장 - song_prompt = ( - f"[Lyrics]\n{request_body.lyrics}\n\n[Genre]\n{request_body.genre}" + if not lyric: + print(f"[generate_song] Lyric NOT FOUND - task_id: {task_id}") + raise HTTPException( + status_code=404, + detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.", + ) + lyric_id = lyric.id + + query_time = time.perf_counter() + print( + f"[generate_song] Queries completed - task_id: {task_id}, " + f"project_id: {project_id}, lyric_id: {lyric_id}, " + f"elapsed: {(query_time - request_start)*1000:.1f}ms" + ) + + # Song 테이블에 초기 데이터 저장 + song_prompt = ( + f"[Lyrics]\n{request_body.lyrics}\n\n[Genre]\n{request_body.genre}" + ) + + song = Song( + project_id=project_id, + lyric_id=lyric_id, + task_id=task_id, + suno_task_id=None, + status="processing", + song_prompt=song_prompt, + language=request_body.language, + ) + session.add(song) + await session.commit() + song_id = song.id + + stage1_time = time.perf_counter() + print( + f"[generate_song] Stage 1 DONE - Song saved - " + f"task_id: {task_id}, song_id: {song_id}, " + f"elapsed: {(stage1_time - request_start)*1000:.1f}ms" + ) + # 세션이 여기서 자동으로 닫힘 + + except HTTPException: + raise + except Exception as e: + print( + f"[generate_song] Stage 1 EXCEPTION - " + f"task_id: {task_id}, error: {type(e).__name__}: {e}" ) - - song = Song( - project_id=project.id, - lyric_id=lyric.id, + return GenerateSongResponse( + success=False, task_id=task_id, suno_task_id=None, - status="processing", - song_prompt=song_prompt, - language=request_body.language, + message="노래 생성 요청에 실패했습니다.", + error_message=str(e), ) - session.add(song) - await session.flush() # ID 생성을 위해 flush - print(f"[generate_song] Song saved (processing) - task_id: {task_id}") - # 4. Suno API 호출 - print(f"[generate_song] Suno API generation started - task_id: {task_id}") + # ========================================================================== + # 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음) + # ========================================================================== + stage2_start = time.perf_counter() + suno_task_id: str | None = None + + try: + print(f"[generate_song] Stage 2 START - Suno API - task_id: {task_id}") suno_service = SunoService() suno_task_id = await suno_service.generate( prompt=request_body.lyrics, genre=request_body.genre, ) - # 5. suno_task_id 업데이트 - song.suno_task_id = suno_task_id - await session.commit() - print(f"[generate_song] SUCCESS - task_id: {task_id}, suno_task_id: {suno_task_id}") + stage2_time = time.perf_counter() + print( + f"[generate_song] Stage 2 DONE - task_id: {task_id}, " + f"suno_task_id: {suno_task_id}, " + f"elapsed: {(stage2_time - stage2_start)*1000:.1f}ms" + ) + + except Exception as e: + print( + f"[generate_song] Stage 2 EXCEPTION - Suno API failed - " + f"task_id: {task_id}, error: {type(e).__name__}: {e}" + ) + # 외부 API 실패 시 Song 상태를 failed로 업데이트 + async with AsyncSessionLocal() as update_session: + song_result = await update_session.execute( + select(Song).where(Song.id == song_id) + ) + song_to_update = song_result.scalar_one_or_none() + if song_to_update: + song_to_update.status = "failed" + await update_session.commit() + + return GenerateSongResponse( + success=False, + task_id=task_id, + suno_task_id=None, + message="노래 생성 요청에 실패했습니다.", + error_message=str(e), + ) + + # ========================================================================== + # 3단계: suno_task_id 업데이트 (새 세션으로 빠르게 처리) + # ========================================================================== + stage3_start = time.perf_counter() + print(f"[generate_song] Stage 3 START - DB update - task_id: {task_id}") + + try: + async with AsyncSessionLocal() as update_session: + song_result = await update_session.execute( + select(Song).where(Song.id == song_id) + ) + song_to_update = song_result.scalar_one_or_none() + if song_to_update: + song_to_update.suno_task_id = suno_task_id + await update_session.commit() + + stage3_time = time.perf_counter() + total_time = stage3_time - request_start + print( + f"[generate_song] Stage 3 DONE - task_id: {task_id}, " + f"elapsed: {(stage3_time - stage3_start)*1000:.1f}ms" + ) + print( + f"[generate_song] SUCCESS - task_id: {task_id}, " + f"suno_task_id: {suno_task_id}, " + f"total_time: {total_time*1000:.1f}ms" + ) return GenerateSongResponse( success=True, @@ -168,16 +276,16 @@ async def generate_song( error_message=None, ) - except HTTPException: - raise except Exception as e: - print(f"[generate_song] EXCEPTION - task_id: {task_id}, error: {e}") - await session.rollback() + print( + f"[generate_song] Stage 3 EXCEPTION - " + f"task_id: {task_id}, error: {type(e).__name__}: {e}" + ) return GenerateSongResponse( success=False, task_id=task_id, - suno_task_id=None, - message="노래 생성 요청에 실패했습니다.", + suno_task_id=suno_task_id, + message="노래 생성은 요청되었으나 DB 업데이트에 실패했습니다.", error_message=str(e), ) @@ -483,14 +591,19 @@ async def get_songs( result = await session.execute(query) songs = result.scalars().all() - # Project 정보와 함께 SongListItem으로 변환 + # Project 정보 일괄 조회 (N+1 문제 해결) + project_ids = [s.project_id for s in songs if s.project_id] + projects_map: dict = {} + if project_ids: + projects_result = await session.execute( + select(Project).where(Project.id.in_(project_ids)) + ) + projects_map = {p.id: p for p in projects_result.scalars().all()} + + # SongListItem으로 변환 items = [] for song in songs: - # Project 조회 (song.project_id 직접 사용) - project_result = await session.execute( - select(Project).where(Project.id == song.project_id) - ) - project = project_result.scalar_one_or_none() + project = projects_map.get(song.project_id) item = SongListItem( store_name=project.store_name if project else None, @@ -502,13 +615,6 @@ async def get_songs( ) items.append(item) - # 개별 아이템 로그 - print( - f"[get_songs] Item - store_name: {item.store_name}, region: {item.region}, " - f"task_id: {item.task_id}, language: {item.language}, " - f"song_result_url: {item.song_result_url}, created_at: {item.created_at}" - ) - response = PaginatedResponse.create( items=items, total=total, diff --git a/app/utils/upload_blob_as_request.py b/app/utils/upload_blob_as_request.py index 5753aa8..c4be083 100644 --- a/app/utils/upload_blob_as_request.py +++ b/app/utils/upload_blob_as_request.py @@ -20,13 +20,19 @@ URL 경로 형식: success = await uploader.upload_image(file_path="my_image.png") # 바이트 데이터로 직접 업로드 (media 저장 없이) - success = await uploader.upload_music_bytes(audio_bytes, "my_song") # .mp3 자동 추가 - success = await uploader.upload_video_bytes(video_bytes, "my_video") # .mp4 자동 추가 + success = await uploader.upload_music_bytes(audio_bytes, "my_song") + success = await uploader.upload_video_bytes(video_bytes, "my_video") success = await uploader.upload_image_bytes(image_bytes, "my_image.png") print(uploader.public_url) # 마지막 업로드의 공개 URL + +성능 최적화: + - HTTP 클라이언트 재사용: 모듈 레벨의 공유 클라이언트로 커넥션 풀 재사용 + - 동시 업로드: 공유 클라이언트를 통해 동시 요청 처리가 개선됩니다. """ +import asyncio +import time from pathlib import Path import aiofiles @@ -35,6 +41,37 @@ import httpx from config import azure_blob_settings +# ============================================================================= +# 모듈 레벨 공유 HTTP 클라이언트 (싱글톤 패턴) +# ============================================================================= + +# 모듈 레벨 공유 HTTP 클라이언트 (커넥션 풀 재사용) +_shared_blob_client: httpx.AsyncClient | None = None + + +async def get_shared_blob_client() -> httpx.AsyncClient: + """공유 HTTP 클라이언트를 반환합니다. 없으면 생성합니다.""" + global _shared_blob_client + if _shared_blob_client is None or _shared_blob_client.is_closed: + print("[AzureBlobUploader] Creating shared HTTP client...") + _shared_blob_client = httpx.AsyncClient( + timeout=httpx.Timeout(180.0, connect=10.0), + limits=httpx.Limits(max_keepalive_connections=10, max_connections=20), + ) + print("[AzureBlobUploader] Shared HTTP client created - " + "max_connections: 20, max_keepalive: 10") + return _shared_blob_client + + +async def close_shared_blob_client() -> None: + """공유 HTTP 클라이언트를 닫습니다. 앱 종료 시 호출하세요.""" + global _shared_blob_client + if _shared_blob_client is not None and not _shared_blob_client.is_closed: + await _shared_blob_client.aclose() + _shared_blob_client = None + print("[AzureBlobUploader] Shared HTTP client closed") + + class AzureBlobUploader: """Azure Blob Storage 업로드 클래스 @@ -85,12 +122,75 @@ class AzureBlobUploader: """업로드 URL 생성 (SAS 토큰 포함)""" # SAS 토큰 앞뒤의 ?, ', " 제거 sas_token = self._sas_token.strip("?'\"") - return f"{self._base_url}/{self._task_id}/{category}/{file_name}?{sas_token}" + return ( + f"{self._base_url}/{self._task_id}/{category}/{file_name}?{sas_token}" + ) def _build_public_url(self, category: str, file_name: str) -> str: """공개 URL 생성 (SAS 토큰 제외)""" return f"{self._base_url}/{self._task_id}/{category}/{file_name}" + async def _upload_bytes( + self, + file_content: bytes, + upload_url: str, + headers: dict, + timeout: float, + log_prefix: str, + ) -> bool: + """바이트 데이터를 업로드하는 공통 내부 메서드""" + start_time = time.perf_counter() + + try: + print(f"[{log_prefix}] Getting shared client...") + client = await get_shared_blob_client() + client_time = time.perf_counter() + elapsed_ms = (client_time - start_time) * 1000 + print(f"[{log_prefix}] Client acquired in {elapsed_ms:.1f}ms") + + size = len(file_content) + print(f"[{log_prefix}] Starting upload... " + f"(size: {size} bytes, timeout: {timeout}s)") + + response = await asyncio.wait_for( + client.put(upload_url, content=file_content, headers=headers), + timeout=timeout, + ) + upload_time = time.perf_counter() + duration_ms = (upload_time - start_time) * 1000 + + if response.status_code in [200, 201]: + print(f"[{log_prefix}] SUCCESS - Status: {response.status_code}, " + f"Duration: {duration_ms:.1f}ms") + print(f"[{log_prefix}] Public URL: {self._last_public_url}") + return True + else: + print(f"[{log_prefix}] FAILED - Status: {response.status_code}, " + f"Duration: {duration_ms:.1f}ms") + print(f"[{log_prefix}] Response: {response.text[:500]}") + return False + + except asyncio.TimeoutError: + elapsed = time.perf_counter() - start_time + print(f"[{log_prefix}] TIMEOUT after {elapsed:.1f}s " + f"(limit: {timeout}s)") + return False + except httpx.ConnectError as e: + elapsed = time.perf_counter() - start_time + print(f"[{log_prefix}] CONNECT_ERROR after {elapsed:.1f}s - " + f"{type(e).__name__}: {e}") + return False + except httpx.ReadError as e: + elapsed = time.perf_counter() - start_time + print(f"[{log_prefix}] READ_ERROR after {elapsed:.1f}s - " + f"{type(e).__name__}: {e}") + return False + except Exception as e: + elapsed = time.perf_counter() - start_time + print(f"[{log_prefix}] ERROR after {elapsed:.1f}s - " + f"{type(e).__name__}: {e}") + return False + async def _upload_file( self, file_path: str, @@ -116,26 +216,20 @@ class AzureBlobUploader: upload_url = self._build_upload_url(category, file_name) self._last_public_url = self._build_public_url(category, file_name) - print(f"[{log_prefix}] Upload URL (without SAS): {self._last_public_url}") + print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}") headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"} async with aiofiles.open(file_path, "rb") as file: file_content = await file.read() - async with httpx.AsyncClient() as client: - response = await client.put( - upload_url, content=file_content, headers=headers, timeout=timeout - ) - - if response.status_code in [200, 201]: - print(f"[{log_prefix}] Success - Status Code: {response.status_code}") - print(f"[{log_prefix}] Public URL: {self._last_public_url}") - return True - else: - print(f"[{log_prefix}] Failed - Status Code: {response.status_code}") - print(f"[{log_prefix}] Response: {response.text}") - return False + return await self._upload_bytes( + file_content=file_content, + upload_url=upload_url, + headers=headers, + timeout=timeout, + log_prefix=log_prefix, + ) async def upload_music(self, file_path: str) -> bool: """음악 파일을 Azure Blob Storage에 업로드합니다. @@ -151,7 +245,7 @@ class AzureBlobUploader: Example: uploader = AzureBlobUploader(task_id="task-123") success = await uploader.upload_music(file_path="my_song.mp3") - print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3 + print(uploader.public_url) """ return await self._upload_file( file_path=file_path, @@ -161,7 +255,9 @@ class AzureBlobUploader: log_prefix="upload_music", ) - async def upload_music_bytes(self, file_content: bytes, file_name: str) -> bool: + async def upload_music_bytes( + self, file_content: bytes, file_name: str + ) -> bool: """음악 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다. URL 경로: {task_id}/song/{파일명} @@ -176,7 +272,7 @@ class AzureBlobUploader: Example: uploader = AzureBlobUploader(task_id="task-123") success = await uploader.upload_music_bytes(audio_bytes, "my_song") - print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3 + print(uploader.public_url) """ # 확장자가 없으면 .mp3 추가 if not Path(file_name).suffix: @@ -184,23 +280,18 @@ class AzureBlobUploader: upload_url = self._build_upload_url("song", file_name) self._last_public_url = self._build_public_url("song", file_name) - print(f"[upload_music_bytes] Upload URL (without SAS): {self._last_public_url}") + log_prefix = "upload_music_bytes" + print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}") headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"} - async with httpx.AsyncClient() as client: - response = await client.put( - upload_url, content=file_content, headers=headers, timeout=120.0 - ) - - if response.status_code in [200, 201]: - print(f"[upload_music_bytes] Success - Status Code: {response.status_code}") - print(f"[upload_music_bytes] Public URL: {self._last_public_url}") - return True - else: - print(f"[upload_music_bytes] Failed - Status Code: {response.status_code}") - print(f"[upload_music_bytes] Response: {response.text}") - return False + return await self._upload_bytes( + file_content=file_content, + upload_url=upload_url, + headers=headers, + timeout=120.0, + log_prefix=log_prefix, + ) async def upload_video(self, file_path: str) -> bool: """영상 파일을 Azure Blob Storage에 업로드합니다. @@ -216,7 +307,7 @@ class AzureBlobUploader: Example: uploader = AzureBlobUploader(task_id="task-123") success = await uploader.upload_video(file_path="my_video.mp4") - print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4 + print(uploader.public_url) """ return await self._upload_file( file_path=file_path, @@ -226,7 +317,9 @@ class AzureBlobUploader: log_prefix="upload_video", ) - async def upload_video_bytes(self, file_content: bytes, file_name: str) -> bool: + async def upload_video_bytes( + self, file_content: bytes, file_name: str + ) -> bool: """영상 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다. URL 경로: {task_id}/video/{파일명} @@ -241,7 +334,7 @@ class AzureBlobUploader: Example: uploader = AzureBlobUploader(task_id="task-123") success = await uploader.upload_video_bytes(video_bytes, "my_video") - print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4 + print(uploader.public_url) """ # 확장자가 없으면 .mp4 추가 if not Path(file_name).suffix: @@ -249,23 +342,18 @@ class AzureBlobUploader: upload_url = self._build_upload_url("video", file_name) self._last_public_url = self._build_public_url("video", file_name) - print(f"[upload_video_bytes] Upload URL (without SAS): {self._last_public_url}") + log_prefix = "upload_video_bytes" + print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}") headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"} - async with httpx.AsyncClient() as client: - response = await client.put( - upload_url, content=file_content, headers=headers, timeout=180.0 - ) - - if response.status_code in [200, 201]: - print(f"[upload_video_bytes] Success - Status Code: {response.status_code}") - print(f"[upload_video_bytes] Public URL: {self._last_public_url}") - return True - else: - print(f"[upload_video_bytes] Failed - Status Code: {response.status_code}") - print(f"[upload_video_bytes] Response: {response.text}") - return False + return await self._upload_bytes( + file_content=file_content, + upload_url=upload_url, + headers=headers, + timeout=180.0, + log_prefix=log_prefix, + ) async def upload_image(self, file_path: str) -> bool: """이미지 파일을 Azure Blob Storage에 업로드합니다. @@ -281,7 +369,7 @@ class AzureBlobUploader: Example: uploader = AzureBlobUploader(task_id="task-123") success = await uploader.upload_image(file_path="my_image.png") - print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png + print(uploader.public_url) """ extension = Path(file_path).suffix.lower() content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg") @@ -294,7 +382,9 @@ class AzureBlobUploader: log_prefix="upload_image", ) - async def upload_image_bytes(self, file_content: bytes, file_name: str) -> bool: + async def upload_image_bytes( + self, file_content: bytes, file_name: str + ) -> bool: """이미지 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다. URL 경로: {task_id}/image/{파일명} @@ -311,30 +401,25 @@ class AzureBlobUploader: with open("my_image.png", "rb") as f: content = f.read() success = await uploader.upload_image_bytes(content, "my_image.png") - print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png + print(uploader.public_url) """ extension = Path(file_name).suffix.lower() content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg") upload_url = self._build_upload_url("image", file_name) self._last_public_url = self._build_public_url("image", file_name) - print(f"[upload_image_bytes] Upload URL (without SAS): {self._last_public_url}") + log_prefix = "upload_image_bytes" + print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}") headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"} - async with httpx.AsyncClient() as client: - response = await client.put( - upload_url, content=file_content, headers=headers, timeout=60.0 - ) - - if response.status_code in [200, 201]: - print(f"[upload_image_bytes] Success - Status Code: {response.status_code}") - print(f"[upload_image_bytes] Public URL: {self._last_public_url}") - return True - else: - print(f"[upload_image_bytes] Failed - Status Code: {response.status_code}") - print(f"[upload_image_bytes] Response: {response.text}") - return False + return await self._upload_bytes( + file_content=file_content, + upload_url=upload_url, + headers=headers, + timeout=60.0, + log_prefix=log_prefix, + ) # 사용 예시: diff --git a/app/video/api/routers/v1/video.py b/app/video/api/routers/v1/video.py index 4081a5b..266e3fd 100644 --- a/app/video/api/routers/v1/video.py +++ b/app/video/api/routers/v1/video.py @@ -111,8 +111,10 @@ async def generate_video( 지원하지 않습니다. asyncio.gather()로 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다. 따라서 쿼리는 순차적으로 실행합니다. """ + import time from app.database.session import AsyncSessionLocal + request_start = time.perf_counter() print(f"[generate_video] START - task_id: {task_id}, orientation: {orientation}") # ========================================================================== @@ -165,7 +167,9 @@ async def generate_video( .order_by(Image.img_order.asc()) ) - print(f"[generate_video] Queries completed - task_id: {task_id}") + query_time = time.perf_counter() + print(f"[generate_video] Queries completed - task_id: {task_id}, " + f"elapsed: {(query_time - request_start)*1000:.1f}ms") # ===== 결과 처리: Project ===== project = project_result.scalar_one_or_none() @@ -241,7 +245,9 @@ async def generate_video( session.add(video) await session.commit() video_id = video.id - print(f"[generate_video] Video saved - task_id: {task_id}, id: {video_id}") + stage1_time = time.perf_counter() + print(f"[generate_video] Video saved - task_id: {task_id}, id: {video_id}, " + f"stage1_elapsed: {(stage1_time - request_start)*1000:.1f}ms") # 세션이 여기서 자동으로 닫힘 (async with 블록 종료) except HTTPException: @@ -259,8 +265,9 @@ async def generate_video( # ========================================================================== # 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음) # ========================================================================== + stage2_start = time.perf_counter() try: - print(f"[generate_video] Creatomate API generation started - task_id: {task_id}") + print(f"[generate_video] Stage 2 START - Creatomate API - task_id: {task_id}") creatomate_service = CreatomateService( orientation=orientation, target_duration=song_duration, @@ -309,6 +316,13 @@ async def generate_video( else: creatomate_render_id = None + stage2_time = time.perf_counter() + print( + f"[generate_video] Stage 2 DONE - task_id: {task_id}, " + f"render_id: {creatomate_render_id}, " + f"stage2_elapsed: {(stage2_time - stage2_start)*1000:.1f}ms" + ) + except Exception as e: print(f"[generate_video] Creatomate API EXCEPTION - task_id: {task_id}, error: {e}") # 외부 API 실패 시 Video 상태를 failed로 업데이트 @@ -332,6 +346,8 @@ async def generate_video( # ========================================================================== # 3단계: creatomate_render_id 업데이트 (새 세션으로 빠르게 처리) # ========================================================================== + stage3_start = time.perf_counter() + print(f"[generate_video] Stage 3 START - DB update - task_id: {task_id}") try: from app.database.session import AsyncSessionLocal async with AsyncSessionLocal() as update_session: @@ -342,7 +358,18 @@ async def generate_video( if video_to_update: video_to_update.creatomate_render_id = creatomate_render_id await update_session.commit() - print(f"[generate_video] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}") + + stage3_time = time.perf_counter() + total_time = stage3_time - request_start + print( + f"[generate_video] Stage 3 DONE - task_id: {task_id}, " + f"stage3_elapsed: {(stage3_time - stage3_start)*1000:.1f}ms" + ) + print( + f"[generate_video] SUCCESS - task_id: {task_id}, " + f"render_id: {creatomate_render_id}, " + f"total_time: {total_time*1000:.1f}ms" + ) return GenerateVideoResponse( success=True,