개선된 pool 관리

insta
bluebamus 2025-12-30 01:01:04 +09:00
parent 8671a45d96
commit efddee217a
6 changed files with 504 additions and 195 deletions

View File

@ -33,10 +33,18 @@ async def lifespan(app: FastAPI):
# Shutdown - 애플리케이션 종료 시 # Shutdown - 애플리케이션 종료 시
print("Shutting down...") print("Shutting down...")
from app.database.session import engine
await engine.dispose() # 공유 HTTP 클라이언트 종료
print("Database engine disposed") from app.utils.creatomate import close_shared_client
from app.utils.upload_blob_as_request import close_shared_blob_client
await close_shared_client()
await close_shared_blob_client()
# 데이터베이스 엔진 종료
from app.database.session import dispose_engine
await dispose_engine()
# FastAPI 앱 생성 (lifespan 적용) # FastAPI 앱 생성 (lifespan 적용)

View File

@ -1,3 +1,4 @@
import time
from typing import AsyncGenerator from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
@ -19,8 +20,8 @@ engine = create_async_engine(
pool_size=20, # 기본 풀 크기: 20 pool_size=20, # 기본 풀 크기: 20
max_overflow=20, # 추가 연결: 20 (총 최대 40) max_overflow=20, # 추가 연결: 20 (총 최대 40)
pool_timeout=30, # 풀에서 연결 대기 시간 (초) pool_timeout=30, # 풀에서 연결 대기 시간 (초)
pool_recycle=3600, # 1시간마다 연결 재생성 pool_recycle=280, # MySQL wait_timeout(기본 28800s, 클라우드는 보통 300s) 보다 짧게 설정
pool_pre_ping=True, # 연결 유효성 검사 pool_pre_ping=True, # 연결 유효성 검사 (죽은 연결 자동 재연결)
pool_reset_on_return="rollback", # 반환 시 롤백으로 초기화 pool_reset_on_return="rollback", # 반환 시 롤백으로 초기화
connect_args={ connect_args={
"connect_timeout": 10, # DB 연결 타임아웃 "connect_timeout": 10, # DB 연결 타임아웃
@ -46,8 +47,8 @@ background_engine = create_async_engine(
pool_size=10, # 백그라운드용 풀 크기: 10 pool_size=10, # 백그라운드용 풀 크기: 10
max_overflow=10, # 추가 연결: 10 (총 최대 20) max_overflow=10, # 추가 연결: 10 (총 최대 20)
pool_timeout=60, # 백그라운드는 대기 시간 여유있게 pool_timeout=60, # 백그라운드는 대기 시간 여유있게
pool_recycle=3600, pool_recycle=280, # MySQL wait_timeout 보다 짧게 설정
pool_pre_ping=True, pool_pre_ping=True, # 연결 유효성 검사 (죽은 연결 자동 재연결)
pool_reset_on_return="rollback", pool_reset_on_return="rollback",
connect_args={ connect_args={
"connect_timeout": 10, "connect_timeout": 10,
@ -82,24 +83,79 @@ async def create_db_tables():
# FastAPI 의존성용 세션 제너레이터 # FastAPI 의존성용 세션 제너레이터
async def get_session() -> AsyncGenerator[AsyncSession, None]: async def get_session() -> AsyncGenerator[AsyncSession, None]:
# 커넥션 풀 상태 로깅 (디버깅용) start_time = time.perf_counter()
pool = engine.pool pool = engine.pool
print(f"[get_session] Pool status - size: {pool.size()}, checked_in: {pool.checkedin()}, checked_out: {pool.checkedout()}, overflow: {pool.overflow()}")
# 커넥션 풀 상태 로깅 (디버깅용)
print(
f"[get_session] ACQUIRE - pool_size: {pool.size()}, "
f"in: {pool.checkedin()}, out: {pool.checkedout()}, "
f"overflow: {pool.overflow()}"
)
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
acquire_time = time.perf_counter()
print(
f"[get_session] Session acquired in "
f"{(acquire_time - start_time)*1000:.1f}ms"
)
try: try:
yield session yield session
except Exception as e: except Exception as e:
await session.rollback() await session.rollback()
print(f"[get_session] Session rollback due to: {e}") print(
f"[get_session] ROLLBACK - error: {type(e).__name__}: {e}, "
f"duration: {(time.perf_counter() - start_time)*1000:.1f}ms"
)
raise e raise e
finally: finally:
# 명시적으로 세션 종료 확인 total_time = time.perf_counter() - start_time
print(f"[get_session] Session closing - Pool checked_out: {pool.checkedout()}") print(
f"[get_session] RELEASE - duration: {total_time*1000:.1f}ms, "
f"pool_out: {pool.checkedout()}"
)
# 백그라운드 태스크용 세션 제너레이터
async def get_background_session() -> AsyncGenerator[AsyncSession, None]:
start_time = time.perf_counter()
pool = background_engine.pool
print(
f"[get_background_session] ACQUIRE - pool_size: {pool.size()}, "
f"in: {pool.checkedin()}, out: {pool.checkedout()}, "
f"overflow: {pool.overflow()}"
)
async with BackgroundSessionLocal() as session:
acquire_time = time.perf_counter()
print(
f"[get_background_session] Session acquired in "
f"{(acquire_time - start_time)*1000:.1f}ms"
)
try:
yield session
except Exception as e:
await session.rollback()
print(
f"[get_background_session] ROLLBACK - "
f"error: {type(e).__name__}: {e}, "
f"duration: {(time.perf_counter() - start_time)*1000:.1f}ms"
)
raise e
finally:
total_time = time.perf_counter() - start_time
print(
f"[get_background_session] RELEASE - "
f"duration: {total_time*1000:.1f}ms, "
f"pool_out: {pool.checkedout()}"
)
# 앱 종료 시 엔진 리소스 정리 함수 # 앱 종료 시 엔진 리소스 정리 함수
async def dispose_engine() -> None: async def dispose_engine() -> None:
print("[dispose_engine] Disposing database engines...")
await engine.dispose() await engine.dispose()
print("[dispose_engine] Main engine disposed")
await background_engine.dispose() await background_engine.dispose()
print("Database engines disposed (main + background)") print("[dispose_engine] Background engine disposed - ALL DONE")

View File

@ -505,6 +505,9 @@ async def upload_images_blob(
- Stage 2: Azure Blob 업로드 (세션 없음) - Stage 2: Azure Blob 업로드 (세션 없음)
- Stage 3: DB 저장 ( 세션으로 빠르게 처리) - Stage 3: DB 저장 ( 세션으로 빠르게 처리)
""" """
import time
request_start = time.perf_counter()
# task_id 생성 # task_id 생성
task_id = await generate_task_id() task_id = await generate_task_id()
print(f"[upload_images_blob] START - task_id: {task_id}") print(f"[upload_images_blob] START - task_id: {task_id}")
@ -560,8 +563,10 @@ async def upload_images_blob(
detail=detail, detail=detail,
) )
stage1_time = time.perf_counter()
print(f"[upload_images_blob] Stage 1 done - urls: {len(url_images)}, " print(f"[upload_images_blob] Stage 1 done - urls: {len(url_images)}, "
f"files: {len(valid_files_data)}") f"files: {len(valid_files_data)}, "
f"elapsed: {(stage1_time - request_start)*1000:.1f}ms")
# ========== Stage 2: Azure Blob 업로드 (세션 없음) ========== # ========== Stage 2: Azure Blob 업로드 (세션 없음) ==========
# 업로드 결과를 저장할 리스트 (나중에 DB에 저장) # 업로드 결과를 저장할 리스트 (나중에 DB에 저장)
@ -570,8 +575,9 @@ async def upload_images_blob(
if valid_files_data: if valid_files_data:
uploader = AzureBlobUploader(task_id=task_id) uploader = AzureBlobUploader(task_id=task_id)
total_files = len(valid_files_data)
for original_name, ext, file_content in valid_files_data: for idx, (original_name, ext, file_content) in enumerate(valid_files_data):
name_without_ext = ( name_without_ext = (
original_name.rsplit(".", 1)[0] original_name.rsplit(".", 1)[0]
if "." in original_name if "." in original_name
@ -579,6 +585,9 @@ async def upload_images_blob(
) )
filename = f"{name_without_ext}_{img_order:03d}{ext}" filename = f"{name_without_ext}_{img_order:03d}{ext}"
print(f"[upload_images_blob] Uploading file {idx+1}/{total_files}: "
f"{filename} ({len(file_content)} bytes)")
# Azure Blob Storage에 직접 업로드 # Azure Blob Storage에 직접 업로드
upload_success = await uploader.upload_image_bytes(file_content, filename) upload_success = await uploader.upload_image_bytes(file_content, filename)
@ -586,70 +595,88 @@ async def upload_images_blob(
blob_url = uploader.public_url blob_url = uploader.public_url
blob_upload_results.append((original_name, blob_url)) blob_upload_results.append((original_name, blob_url))
img_order += 1 img_order += 1
print(f"[upload_images_blob] File {idx+1}/{total_files} SUCCESS")
else: else:
skipped_files.append(filename) skipped_files.append(filename)
print(f"[upload_images_blob] File {idx+1}/{total_files} FAILED")
stage2_time = time.perf_counter()
print(f"[upload_images_blob] Stage 2 done - blob uploads: " print(f"[upload_images_blob] Stage 2 done - blob uploads: "
f"{len(blob_upload_results)}, skipped: {len(skipped_files)}") f"{len(blob_upload_results)}, skipped: {len(skipped_files)}, "
f"elapsed: {(stage2_time - stage1_time)*1000:.1f}ms")
# ========== Stage 3: DB 저장 (새 세션으로 빠르게 처리) ========== # ========== Stage 3: DB 저장 (새 세션으로 빠르게 처리) ==========
print("[upload_images_blob] Stage 3 starting - DB save...")
result_images: list[ImageUploadResultItem] = [] result_images: list[ImageUploadResultItem] = []
img_order = 0 img_order = 0
async with AsyncSessionLocal() as session: try:
# URL 이미지 저장 async with AsyncSessionLocal() as session:
for url_item in url_images: # URL 이미지 저장
img_name = url_item.name or _extract_image_name(url_item.url, img_order) for url_item in url_images:
img_name = (
url_item.name or _extract_image_name(url_item.url, img_order)
)
image = Image( image = Image(
task_id=task_id, task_id=task_id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name, img_name=img_name,
img_url=url_item.url, img_url=url_item.url,
img_order=img_order, img_order=img_order,
source="url",
) )
) session.add(image)
img_order += 1 await session.flush()
# Blob 업로드 결과 저장 result_images.append(
for img_name, blob_url in blob_upload_results: ImageUploadResultItem(
image = Image( id=image.id,
task_id=task_id, img_name=img_name,
img_name=img_name, img_url=url_item.url,
img_url=blob_url, img_order=img_order,
img_order=img_order, source="url",
) )
session.add(image) )
await session.flush() img_order += 1
result_images.append( # Blob 업로드 결과 저장
ImageUploadResultItem( for img_name, blob_url in blob_upload_results:
id=image.id, image = Image(
task_id=task_id,
img_name=img_name, img_name=img_name,
img_url=blob_url, img_url=blob_url,
img_order=img_order, img_order=img_order,
source="blob",
) )
) session.add(image)
img_order += 1 await session.flush()
await session.commit() result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
source="blob",
)
)
img_order += 1
await session.commit()
stage3_time = time.perf_counter()
print(f"[upload_images_blob] Stage 3 done - "
f"saved: {len(result_images)}, "
f"elapsed: {(stage3_time - stage2_time)*1000:.1f}ms")
except Exception as e:
print(f"[upload_images_blob] Stage 3 EXCEPTION - "
f"task_id: {task_id}, error: {type(e).__name__}: {e}")
raise
saved_count = len(result_images) saved_count = len(result_images)
image_urls = [img.img_url for img in result_images] image_urls = [img.img_url for img in result_images]
total_time = time.perf_counter() - request_start
print(f"[upload_images_blob] SUCCESS - task_id: {task_id}, " print(f"[upload_images_blob] SUCCESS - task_id: {task_id}, "
f"total: {saved_count}, returning response...") f"total: {saved_count}, total_time: {total_time*1000:.1f}ms")
return ImageUploadResponse( return ImageUploadResponse(
task_id=task_id, task_id=task_id,

View File

@ -84,81 +84,189 @@ POST /song/generate/019123ab-cdef-7890-abcd-ef1234567890
async def generate_song( async def generate_song(
task_id: str, task_id: str,
request_body: GenerateSongRequest, request_body: GenerateSongRequest,
session: AsyncSession = Depends(get_session),
) -> GenerateSongResponse: ) -> GenerateSongResponse:
"""가사와 장르를 기반으로 Suno API를 통해 노래를 생성합니다. """가사와 장르를 기반으로 Suno API를 통해 노래를 생성합니다.
1. task_id로 Project와 Lyric 조회 1. task_id로 Project와 Lyric 조회
2. Song 테이블에 초기 데이터 저장 (status: processing) 2. Song 테이블에 초기 데이터 저장 (status: processing)
3. Suno API 호출 3. Suno API 호출 (세션 닫힌 상태)
4. suno_task_id 업데이트 응답 반환 4. suno_task_id 업데이트 응답 반환
Note: 함수는 Depends(get_session) 사용하지 않고 명시적으로 세션을 관리합니다.
외부 API 호출 DB 커넥션이 유지되지 않도록 하여 커넥션 타임아웃 문제를 방지합니다.
""" """
print(f"[generate_song] START - task_id: {task_id}, genre: {request_body.genre}, language: {request_body.language}") import time
from app.database.session import AsyncSessionLocal
request_start = time.perf_counter()
print(
f"[generate_song] START - task_id: {task_id}, "
f"genre: {request_body.genre}, language: {request_body.language}"
)
# 외부 API 호출 전에 필요한 데이터를 저장할 변수들
project_id: int | None = None
lyric_id: int | None = None
song_id: int | None = None
# ==========================================================================
# 1단계: DB 조회 및 초기 데이터 저장 (세션을 명시적으로 열고 닫음)
# ==========================================================================
try: try:
# 1. task_id로 Project 조회 (중복 시 최신 것 선택) async with AsyncSessionLocal() as session:
project_result = await session.execute( # Project 조회 (중복 시 최신 것 선택)
select(Project) project_result = await session.execute(
.where(Project.task_id == task_id) select(Project)
.order_by(Project.created_at.desc()) .where(Project.task_id == task_id)
.limit(1) .order_by(Project.created_at.desc())
) .limit(1)
project = project_result.scalar_one_or_none()
if not project:
print(f"[generate_song] Project NOT FOUND - task_id: {task_id}")
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.",
) )
print(f"[generate_song] Project found - project_id: {project.id}, task_id: {task_id}") project = project_result.scalar_one_or_none()
# 2. task_id로 Lyric 조회 (중복 시 최신 것 선택) if not project:
lyric_result = await session.execute( print(f"[generate_song] Project NOT FOUND - task_id: {task_id}")
select(Lyric) raise HTTPException(
.where(Lyric.task_id == task_id) status_code=404,
.order_by(Lyric.created_at.desc()) detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.",
.limit(1) )
) project_id = project.id
lyric = lyric_result.scalar_one_or_none()
if not lyric: # Lyric 조회 (중복 시 최신 것 선택)
print(f"[generate_song] Lyric NOT FOUND - task_id: {task_id}") lyric_result = await session.execute(
raise HTTPException( select(Lyric)
status_code=404, .where(Lyric.task_id == task_id)
detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.", .order_by(Lyric.created_at.desc())
.limit(1)
) )
print(f"[generate_song] Lyric found - lyric_id: {lyric.id}, task_id: {task_id}") lyric = lyric_result.scalar_one_or_none()
# 3. Song 테이블에 초기 데이터 저장 if not lyric:
song_prompt = ( print(f"[generate_song] Lyric NOT FOUND - task_id: {task_id}")
f"[Lyrics]\n{request_body.lyrics}\n\n[Genre]\n{request_body.genre}" raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.",
)
lyric_id = lyric.id
query_time = time.perf_counter()
print(
f"[generate_song] Queries completed - task_id: {task_id}, "
f"project_id: {project_id}, lyric_id: {lyric_id}, "
f"elapsed: {(query_time - request_start)*1000:.1f}ms"
)
# Song 테이블에 초기 데이터 저장
song_prompt = (
f"[Lyrics]\n{request_body.lyrics}\n\n[Genre]\n{request_body.genre}"
)
song = Song(
project_id=project_id,
lyric_id=lyric_id,
task_id=task_id,
suno_task_id=None,
status="processing",
song_prompt=song_prompt,
language=request_body.language,
)
session.add(song)
await session.commit()
song_id = song.id
stage1_time = time.perf_counter()
print(
f"[generate_song] Stage 1 DONE - Song saved - "
f"task_id: {task_id}, song_id: {song_id}, "
f"elapsed: {(stage1_time - request_start)*1000:.1f}ms"
)
# 세션이 여기서 자동으로 닫힘
except HTTPException:
raise
except Exception as e:
print(
f"[generate_song] Stage 1 EXCEPTION - "
f"task_id: {task_id}, error: {type(e).__name__}: {e}"
) )
return GenerateSongResponse(
song = Song( success=False,
project_id=project.id,
lyric_id=lyric.id,
task_id=task_id, task_id=task_id,
suno_task_id=None, suno_task_id=None,
status="processing", message="노래 생성 요청에 실패했습니다.",
song_prompt=song_prompt, error_message=str(e),
language=request_body.language,
) )
session.add(song)
await session.flush() # ID 생성을 위해 flush
print(f"[generate_song] Song saved (processing) - task_id: {task_id}")
# 4. Suno API 호출 # ==========================================================================
print(f"[generate_song] Suno API generation started - task_id: {task_id}") # 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음)
# ==========================================================================
stage2_start = time.perf_counter()
suno_task_id: str | None = None
try:
print(f"[generate_song] Stage 2 START - Suno API - task_id: {task_id}")
suno_service = SunoService() suno_service = SunoService()
suno_task_id = await suno_service.generate( suno_task_id = await suno_service.generate(
prompt=request_body.lyrics, prompt=request_body.lyrics,
genre=request_body.genre, genre=request_body.genre,
) )
# 5. suno_task_id 업데이트 stage2_time = time.perf_counter()
song.suno_task_id = suno_task_id print(
await session.commit() f"[generate_song] Stage 2 DONE - task_id: {task_id}, "
print(f"[generate_song] SUCCESS - task_id: {task_id}, suno_task_id: {suno_task_id}") f"suno_task_id: {suno_task_id}, "
f"elapsed: {(stage2_time - stage2_start)*1000:.1f}ms"
)
except Exception as e:
print(
f"[generate_song] Stage 2 EXCEPTION - Suno API failed - "
f"task_id: {task_id}, error: {type(e).__name__}: {e}"
)
# 외부 API 실패 시 Song 상태를 failed로 업데이트
async with AsyncSessionLocal() as update_session:
song_result = await update_session.execute(
select(Song).where(Song.id == song_id)
)
song_to_update = song_result.scalar_one_or_none()
if song_to_update:
song_to_update.status = "failed"
await update_session.commit()
return GenerateSongResponse(
success=False,
task_id=task_id,
suno_task_id=None,
message="노래 생성 요청에 실패했습니다.",
error_message=str(e),
)
# ==========================================================================
# 3단계: suno_task_id 업데이트 (새 세션으로 빠르게 처리)
# ==========================================================================
stage3_start = time.perf_counter()
print(f"[generate_song] Stage 3 START - DB update - task_id: {task_id}")
try:
async with AsyncSessionLocal() as update_session:
song_result = await update_session.execute(
select(Song).where(Song.id == song_id)
)
song_to_update = song_result.scalar_one_or_none()
if song_to_update:
song_to_update.suno_task_id = suno_task_id
await update_session.commit()
stage3_time = time.perf_counter()
total_time = stage3_time - request_start
print(
f"[generate_song] Stage 3 DONE - task_id: {task_id}, "
f"elapsed: {(stage3_time - stage3_start)*1000:.1f}ms"
)
print(
f"[generate_song] SUCCESS - task_id: {task_id}, "
f"suno_task_id: {suno_task_id}, "
f"total_time: {total_time*1000:.1f}ms"
)
return GenerateSongResponse( return GenerateSongResponse(
success=True, success=True,
@ -168,16 +276,16 @@ async def generate_song(
error_message=None, error_message=None,
) )
except HTTPException:
raise
except Exception as e: except Exception as e:
print(f"[generate_song] EXCEPTION - task_id: {task_id}, error: {e}") print(
await session.rollback() f"[generate_song] Stage 3 EXCEPTION - "
f"task_id: {task_id}, error: {type(e).__name__}: {e}"
)
return GenerateSongResponse( return GenerateSongResponse(
success=False, success=False,
task_id=task_id, task_id=task_id,
suno_task_id=None, suno_task_id=suno_task_id,
message="노래 생성 요청에 실패했습니다.", message="노래 생성 요청되었으나 DB 업데이트에 실패했습니다.",
error_message=str(e), error_message=str(e),
) )
@ -483,14 +591,19 @@ async def get_songs(
result = await session.execute(query) result = await session.execute(query)
songs = result.scalars().all() songs = result.scalars().all()
# Project 정보와 함께 SongListItem으로 변환 # Project 정보 일괄 조회 (N+1 문제 해결)
project_ids = [s.project_id for s in songs if s.project_id]
projects_map: dict = {}
if project_ids:
projects_result = await session.execute(
select(Project).where(Project.id.in_(project_ids))
)
projects_map = {p.id: p for p in projects_result.scalars().all()}
# SongListItem으로 변환
items = [] items = []
for song in songs: for song in songs:
# Project 조회 (song.project_id 직접 사용) project = projects_map.get(song.project_id)
project_result = await session.execute(
select(Project).where(Project.id == song.project_id)
)
project = project_result.scalar_one_or_none()
item = SongListItem( item = SongListItem(
store_name=project.store_name if project else None, store_name=project.store_name if project else None,
@ -502,13 +615,6 @@ async def get_songs(
) )
items.append(item) items.append(item)
# 개별 아이템 로그
print(
f"[get_songs] Item - store_name: {item.store_name}, region: {item.region}, "
f"task_id: {item.task_id}, language: {item.language}, "
f"song_result_url: {item.song_result_url}, created_at: {item.created_at}"
)
response = PaginatedResponse.create( response = PaginatedResponse.create(
items=items, items=items,
total=total, total=total,

View File

@ -20,13 +20,19 @@ URL 경로 형식:
success = await uploader.upload_image(file_path="my_image.png") success = await uploader.upload_image(file_path="my_image.png")
# 바이트 데이터로 직접 업로드 (media 저장 없이) # 바이트 데이터로 직접 업로드 (media 저장 없이)
success = await uploader.upload_music_bytes(audio_bytes, "my_song") # .mp3 자동 추가 success = await uploader.upload_music_bytes(audio_bytes, "my_song")
success = await uploader.upload_video_bytes(video_bytes, "my_video") # .mp4 자동 추가 success = await uploader.upload_video_bytes(video_bytes, "my_video")
success = await uploader.upload_image_bytes(image_bytes, "my_image.png") success = await uploader.upload_image_bytes(image_bytes, "my_image.png")
print(uploader.public_url) # 마지막 업로드의 공개 URL print(uploader.public_url) # 마지막 업로드의 공개 URL
성능 최적화:
- HTTP 클라이언트 재사용: 모듈 레벨의 공유 클라이언트로 커넥션 재사용
- 동시 업로드: 공유 클라이언트를 통해 동시 요청 처리가 개선됩니다.
""" """
import asyncio
import time
from pathlib import Path from pathlib import Path
import aiofiles import aiofiles
@ -35,6 +41,37 @@ import httpx
from config import azure_blob_settings from config import azure_blob_settings
# =============================================================================
# 모듈 레벨 공유 HTTP 클라이언트 (싱글톤 패턴)
# =============================================================================
# 모듈 레벨 공유 HTTP 클라이언트 (커넥션 풀 재사용)
_shared_blob_client: httpx.AsyncClient | None = None
async def get_shared_blob_client() -> httpx.AsyncClient:
"""공유 HTTP 클라이언트를 반환합니다. 없으면 생성합니다."""
global _shared_blob_client
if _shared_blob_client is None or _shared_blob_client.is_closed:
print("[AzureBlobUploader] Creating shared HTTP client...")
_shared_blob_client = httpx.AsyncClient(
timeout=httpx.Timeout(180.0, connect=10.0),
limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
)
print("[AzureBlobUploader] Shared HTTP client created - "
"max_connections: 20, max_keepalive: 10")
return _shared_blob_client
async def close_shared_blob_client() -> None:
"""공유 HTTP 클라이언트를 닫습니다. 앱 종료 시 호출하세요."""
global _shared_blob_client
if _shared_blob_client is not None and not _shared_blob_client.is_closed:
await _shared_blob_client.aclose()
_shared_blob_client = None
print("[AzureBlobUploader] Shared HTTP client closed")
class AzureBlobUploader: class AzureBlobUploader:
"""Azure Blob Storage 업로드 클래스 """Azure Blob Storage 업로드 클래스
@ -85,12 +122,75 @@ class AzureBlobUploader:
"""업로드 URL 생성 (SAS 토큰 포함)""" """업로드 URL 생성 (SAS 토큰 포함)"""
# SAS 토큰 앞뒤의 ?, ', " 제거 # SAS 토큰 앞뒤의 ?, ', " 제거
sas_token = self._sas_token.strip("?'\"") sas_token = self._sas_token.strip("?'\"")
return f"{self._base_url}/{self._task_id}/{category}/{file_name}?{sas_token}" return (
f"{self._base_url}/{self._task_id}/{category}/{file_name}?{sas_token}"
)
def _build_public_url(self, category: str, file_name: str) -> str: def _build_public_url(self, category: str, file_name: str) -> str:
"""공개 URL 생성 (SAS 토큰 제외)""" """공개 URL 생성 (SAS 토큰 제외)"""
return f"{self._base_url}/{self._task_id}/{category}/{file_name}" return f"{self._base_url}/{self._task_id}/{category}/{file_name}"
async def _upload_bytes(
self,
file_content: bytes,
upload_url: str,
headers: dict,
timeout: float,
log_prefix: str,
) -> bool:
"""바이트 데이터를 업로드하는 공통 내부 메서드"""
start_time = time.perf_counter()
try:
print(f"[{log_prefix}] Getting shared client...")
client = await get_shared_blob_client()
client_time = time.perf_counter()
elapsed_ms = (client_time - start_time) * 1000
print(f"[{log_prefix}] Client acquired in {elapsed_ms:.1f}ms")
size = len(file_content)
print(f"[{log_prefix}] Starting upload... "
f"(size: {size} bytes, timeout: {timeout}s)")
response = await asyncio.wait_for(
client.put(upload_url, content=file_content, headers=headers),
timeout=timeout,
)
upload_time = time.perf_counter()
duration_ms = (upload_time - start_time) * 1000
if response.status_code in [200, 201]:
print(f"[{log_prefix}] SUCCESS - Status: {response.status_code}, "
f"Duration: {duration_ms:.1f}ms")
print(f"[{log_prefix}] Public URL: {self._last_public_url}")
return True
else:
print(f"[{log_prefix}] FAILED - Status: {response.status_code}, "
f"Duration: {duration_ms:.1f}ms")
print(f"[{log_prefix}] Response: {response.text[:500]}")
return False
except asyncio.TimeoutError:
elapsed = time.perf_counter() - start_time
print(f"[{log_prefix}] TIMEOUT after {elapsed:.1f}s "
f"(limit: {timeout}s)")
return False
except httpx.ConnectError as e:
elapsed = time.perf_counter() - start_time
print(f"[{log_prefix}] CONNECT_ERROR after {elapsed:.1f}s - "
f"{type(e).__name__}: {e}")
return False
except httpx.ReadError as e:
elapsed = time.perf_counter() - start_time
print(f"[{log_prefix}] READ_ERROR after {elapsed:.1f}s - "
f"{type(e).__name__}: {e}")
return False
except Exception as e:
elapsed = time.perf_counter() - start_time
print(f"[{log_prefix}] ERROR after {elapsed:.1f}s - "
f"{type(e).__name__}: {e}")
return False
async def _upload_file( async def _upload_file(
self, self,
file_path: str, file_path: str,
@ -116,26 +216,20 @@ class AzureBlobUploader:
upload_url = self._build_upload_url(category, file_name) upload_url = self._build_upload_url(category, file_name)
self._last_public_url = self._build_public_url(category, file_name) self._last_public_url = self._build_public_url(category, file_name)
print(f"[{log_prefix}] Upload URL (without SAS): {self._last_public_url}") print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"} headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
async with aiofiles.open(file_path, "rb") as file: async with aiofiles.open(file_path, "rb") as file:
file_content = await file.read() file_content = await file.read()
async with httpx.AsyncClient() as client: return await self._upload_bytes(
response = await client.put( file_content=file_content,
upload_url, content=file_content, headers=headers, timeout=timeout upload_url=upload_url,
) headers=headers,
timeout=timeout,
if response.status_code in [200, 201]: log_prefix=log_prefix,
print(f"[{log_prefix}] Success - Status Code: {response.status_code}") )
print(f"[{log_prefix}] Public URL: {self._last_public_url}")
return True
else:
print(f"[{log_prefix}] Failed - Status Code: {response.status_code}")
print(f"[{log_prefix}] Response: {response.text}")
return False
async def upload_music(self, file_path: str) -> bool: async def upload_music(self, file_path: str) -> bool:
"""음악 파일을 Azure Blob Storage에 업로드합니다. """음악 파일을 Azure Blob Storage에 업로드합니다.
@ -151,7 +245,7 @@ class AzureBlobUploader:
Example: Example:
uploader = AzureBlobUploader(task_id="task-123") uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_music(file_path="my_song.mp3") success = await uploader.upload_music(file_path="my_song.mp3")
print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3 print(uploader.public_url)
""" """
return await self._upload_file( return await self._upload_file(
file_path=file_path, file_path=file_path,
@ -161,7 +255,9 @@ class AzureBlobUploader:
log_prefix="upload_music", log_prefix="upload_music",
) )
async def upload_music_bytes(self, file_content: bytes, file_name: str) -> bool: async def upload_music_bytes(
self, file_content: bytes, file_name: str
) -> bool:
"""음악 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다. """음악 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/song/{파일명} URL 경로: {task_id}/song/{파일명}
@ -176,7 +272,7 @@ class AzureBlobUploader:
Example: Example:
uploader = AzureBlobUploader(task_id="task-123") uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_music_bytes(audio_bytes, "my_song") success = await uploader.upload_music_bytes(audio_bytes, "my_song")
print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3 print(uploader.public_url)
""" """
# 확장자가 없으면 .mp3 추가 # 확장자가 없으면 .mp3 추가
if not Path(file_name).suffix: if not Path(file_name).suffix:
@ -184,23 +280,18 @@ class AzureBlobUploader:
upload_url = self._build_upload_url("song", file_name) upload_url = self._build_upload_url("song", file_name)
self._last_public_url = self._build_public_url("song", file_name) self._last_public_url = self._build_public_url("song", file_name)
print(f"[upload_music_bytes] Upload URL (without SAS): {self._last_public_url}") log_prefix = "upload_music_bytes"
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"} headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"}
async with httpx.AsyncClient() as client: return await self._upload_bytes(
response = await client.put( file_content=file_content,
upload_url, content=file_content, headers=headers, timeout=120.0 upload_url=upload_url,
) headers=headers,
timeout=120.0,
if response.status_code in [200, 201]: log_prefix=log_prefix,
print(f"[upload_music_bytes] Success - Status Code: {response.status_code}") )
print(f"[upload_music_bytes] Public URL: {self._last_public_url}")
return True
else:
print(f"[upload_music_bytes] Failed - Status Code: {response.status_code}")
print(f"[upload_music_bytes] Response: {response.text}")
return False
async def upload_video(self, file_path: str) -> bool: async def upload_video(self, file_path: str) -> bool:
"""영상 파일을 Azure Blob Storage에 업로드합니다. """영상 파일을 Azure Blob Storage에 업로드합니다.
@ -216,7 +307,7 @@ class AzureBlobUploader:
Example: Example:
uploader = AzureBlobUploader(task_id="task-123") uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_video(file_path="my_video.mp4") success = await uploader.upload_video(file_path="my_video.mp4")
print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4 print(uploader.public_url)
""" """
return await self._upload_file( return await self._upload_file(
file_path=file_path, file_path=file_path,
@ -226,7 +317,9 @@ class AzureBlobUploader:
log_prefix="upload_video", log_prefix="upload_video",
) )
async def upload_video_bytes(self, file_content: bytes, file_name: str) -> bool: async def upload_video_bytes(
self, file_content: bytes, file_name: str
) -> bool:
"""영상 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다. """영상 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/video/{파일명} URL 경로: {task_id}/video/{파일명}
@ -241,7 +334,7 @@ class AzureBlobUploader:
Example: Example:
uploader = AzureBlobUploader(task_id="task-123") uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_video_bytes(video_bytes, "my_video") success = await uploader.upload_video_bytes(video_bytes, "my_video")
print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4 print(uploader.public_url)
""" """
# 확장자가 없으면 .mp4 추가 # 확장자가 없으면 .mp4 추가
if not Path(file_name).suffix: if not Path(file_name).suffix:
@ -249,23 +342,18 @@ class AzureBlobUploader:
upload_url = self._build_upload_url("video", file_name) upload_url = self._build_upload_url("video", file_name)
self._last_public_url = self._build_public_url("video", file_name) self._last_public_url = self._build_public_url("video", file_name)
print(f"[upload_video_bytes] Upload URL (without SAS): {self._last_public_url}") log_prefix = "upload_video_bytes"
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"} headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"}
async with httpx.AsyncClient() as client: return await self._upload_bytes(
response = await client.put( file_content=file_content,
upload_url, content=file_content, headers=headers, timeout=180.0 upload_url=upload_url,
) headers=headers,
timeout=180.0,
if response.status_code in [200, 201]: log_prefix=log_prefix,
print(f"[upload_video_bytes] Success - Status Code: {response.status_code}") )
print(f"[upload_video_bytes] Public URL: {self._last_public_url}")
return True
else:
print(f"[upload_video_bytes] Failed - Status Code: {response.status_code}")
print(f"[upload_video_bytes] Response: {response.text}")
return False
async def upload_image(self, file_path: str) -> bool: async def upload_image(self, file_path: str) -> bool:
"""이미지 파일을 Azure Blob Storage에 업로드합니다. """이미지 파일을 Azure Blob Storage에 업로드합니다.
@ -281,7 +369,7 @@ class AzureBlobUploader:
Example: Example:
uploader = AzureBlobUploader(task_id="task-123") uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_image(file_path="my_image.png") success = await uploader.upload_image(file_path="my_image.png")
print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png print(uploader.public_url)
""" """
extension = Path(file_path).suffix.lower() extension = Path(file_path).suffix.lower()
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg") content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
@ -294,7 +382,9 @@ class AzureBlobUploader:
log_prefix="upload_image", log_prefix="upload_image",
) )
async def upload_image_bytes(self, file_content: bytes, file_name: str) -> bool: async def upload_image_bytes(
self, file_content: bytes, file_name: str
) -> bool:
"""이미지 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다. """이미지 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/image/{파일명} URL 경로: {task_id}/image/{파일명}
@ -311,30 +401,25 @@ class AzureBlobUploader:
with open("my_image.png", "rb") as f: with open("my_image.png", "rb") as f:
content = f.read() content = f.read()
success = await uploader.upload_image_bytes(content, "my_image.png") success = await uploader.upload_image_bytes(content, "my_image.png")
print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png print(uploader.public_url)
""" """
extension = Path(file_name).suffix.lower() extension = Path(file_name).suffix.lower()
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg") content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
upload_url = self._build_upload_url("image", file_name) upload_url = self._build_upload_url("image", file_name)
self._last_public_url = self._build_public_url("image", file_name) self._last_public_url = self._build_public_url("image", file_name)
print(f"[upload_image_bytes] Upload URL (without SAS): {self._last_public_url}") log_prefix = "upload_image_bytes"
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"} headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
async with httpx.AsyncClient() as client: return await self._upload_bytes(
response = await client.put( file_content=file_content,
upload_url, content=file_content, headers=headers, timeout=60.0 upload_url=upload_url,
) headers=headers,
timeout=60.0,
if response.status_code in [200, 201]: log_prefix=log_prefix,
print(f"[upload_image_bytes] Success - Status Code: {response.status_code}") )
print(f"[upload_image_bytes] Public URL: {self._last_public_url}")
return True
else:
print(f"[upload_image_bytes] Failed - Status Code: {response.status_code}")
print(f"[upload_image_bytes] Response: {response.text}")
return False
# 사용 예시: # 사용 예시:

View File

@ -111,8 +111,10 @@ async def generate_video(
지원하지 않습니다. asyncio.gather() 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다. 지원하지 않습니다. asyncio.gather() 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다.
따라서 쿼리는 순차적으로 실행합니다. 따라서 쿼리는 순차적으로 실행합니다.
""" """
import time
from app.database.session import AsyncSessionLocal from app.database.session import AsyncSessionLocal
request_start = time.perf_counter()
print(f"[generate_video] START - task_id: {task_id}, orientation: {orientation}") print(f"[generate_video] START - task_id: {task_id}, orientation: {orientation}")
# ========================================================================== # ==========================================================================
@ -165,7 +167,9 @@ async def generate_video(
.order_by(Image.img_order.asc()) .order_by(Image.img_order.asc())
) )
print(f"[generate_video] Queries completed - task_id: {task_id}") query_time = time.perf_counter()
print(f"[generate_video] Queries completed - task_id: {task_id}, "
f"elapsed: {(query_time - request_start)*1000:.1f}ms")
# ===== 결과 처리: Project ===== # ===== 결과 처리: Project =====
project = project_result.scalar_one_or_none() project = project_result.scalar_one_or_none()
@ -241,7 +245,9 @@ async def generate_video(
session.add(video) session.add(video)
await session.commit() await session.commit()
video_id = video.id video_id = video.id
print(f"[generate_video] Video saved - task_id: {task_id}, id: {video_id}") stage1_time = time.perf_counter()
print(f"[generate_video] Video saved - task_id: {task_id}, id: {video_id}, "
f"stage1_elapsed: {(stage1_time - request_start)*1000:.1f}ms")
# 세션이 여기서 자동으로 닫힘 (async with 블록 종료) # 세션이 여기서 자동으로 닫힘 (async with 블록 종료)
except HTTPException: except HTTPException:
@ -259,8 +265,9 @@ async def generate_video(
# ========================================================================== # ==========================================================================
# 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음) # 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음)
# ========================================================================== # ==========================================================================
stage2_start = time.perf_counter()
try: try:
print(f"[generate_video] Creatomate API generation started - task_id: {task_id}") print(f"[generate_video] Stage 2 START - Creatomate API - task_id: {task_id}")
creatomate_service = CreatomateService( creatomate_service = CreatomateService(
orientation=orientation, orientation=orientation,
target_duration=song_duration, target_duration=song_duration,
@ -309,6 +316,13 @@ async def generate_video(
else: else:
creatomate_render_id = None creatomate_render_id = None
stage2_time = time.perf_counter()
print(
f"[generate_video] Stage 2 DONE - task_id: {task_id}, "
f"render_id: {creatomate_render_id}, "
f"stage2_elapsed: {(stage2_time - stage2_start)*1000:.1f}ms"
)
except Exception as e: except Exception as e:
print(f"[generate_video] Creatomate API EXCEPTION - task_id: {task_id}, error: {e}") print(f"[generate_video] Creatomate API EXCEPTION - task_id: {task_id}, error: {e}")
# 외부 API 실패 시 Video 상태를 failed로 업데이트 # 외부 API 실패 시 Video 상태를 failed로 업데이트
@ -332,6 +346,8 @@ async def generate_video(
# ========================================================================== # ==========================================================================
# 3단계: creatomate_render_id 업데이트 (새 세션으로 빠르게 처리) # 3단계: creatomate_render_id 업데이트 (새 세션으로 빠르게 처리)
# ========================================================================== # ==========================================================================
stage3_start = time.perf_counter()
print(f"[generate_video] Stage 3 START - DB update - task_id: {task_id}")
try: try:
from app.database.session import AsyncSessionLocal from app.database.session import AsyncSessionLocal
async with AsyncSessionLocal() as update_session: async with AsyncSessionLocal() as update_session:
@ -342,7 +358,18 @@ async def generate_video(
if video_to_update: if video_to_update:
video_to_update.creatomate_render_id = creatomate_render_id video_to_update.creatomate_render_id = creatomate_render_id
await update_session.commit() await update_session.commit()
print(f"[generate_video] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}")
stage3_time = time.perf_counter()
total_time = stage3_time - request_start
print(
f"[generate_video] Stage 3 DONE - task_id: {task_id}, "
f"stage3_elapsed: {(stage3_time - stage3_start)*1000:.1f}ms"
)
print(
f"[generate_video] SUCCESS - task_id: {task_id}, "
f"render_id: {creatomate_render_id}, "
f"total_time: {total_time*1000:.1f}ms"
)
return GenerateVideoResponse( return GenerateVideoResponse(
success=True, success=True,