세션 및 비동기 처리 개선

insta
bluebamus 2025-12-29 23:46:17 +09:00
parent 153b9f0ca4
commit 5c99610e00
10 changed files with 4559 additions and 289 deletions

View File

@ -1,9 +1,7 @@
from contextlib import asynccontextmanager
from typing import AsyncGenerator from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.pool import NullPool
from config import db_settings from config import db_settings
@ -12,24 +10,25 @@ class Base(DeclarativeBase):
pass pass
# 데이터베이스 엔진 생성 # =============================================================================
# 메인 엔진 (FastAPI 요청용)
# =============================================================================
engine = create_async_engine( engine = create_async_engine(
url=db_settings.MYSQL_URL, url=db_settings.MYSQL_URL,
echo=False, echo=False,
pool_size=10, pool_size=20, # 기본 풀 크기: 20
max_overflow=10, max_overflow=20, # 추가 연결: 20 (총 최대 40)
pool_timeout=5, pool_timeout=30, # 풀에서 연결 대기 시간 (초)
pool_recycle=3600, pool_recycle=3600, # 1시간마다 연결 재생성
pool_pre_ping=True, pool_pre_ping=True, # 연결 유효성 검사
pool_reset_on_return="rollback", pool_reset_on_return="rollback", # 반환 시 롤백으로 초기화
connect_args={ connect_args={
"connect_timeout": 3, "connect_timeout": 10, # DB 연결 타임아웃
"charset": "utf8mb4", "charset": "utf8mb4",
# "allow_public_key_retrieval": True,
}, },
) )
# Async sessionmaker 생성 # 메인 세션 팩토리 (FastAPI DI용)
AsyncSessionLocal = async_sessionmaker( AsyncSessionLocal = async_sessionmaker(
bind=engine, bind=engine,
class_=AsyncSession, class_=AsyncSession,
@ -38,6 +37,33 @@ AsyncSessionLocal = async_sessionmaker(
) )
# =============================================================================
# 백그라운드 태스크 전용 엔진 (메인 풀과 분리)
# =============================================================================
background_engine = create_async_engine(
url=db_settings.MYSQL_URL,
echo=False,
pool_size=10, # 백그라운드용 풀 크기: 10
max_overflow=10, # 추가 연결: 10 (총 최대 20)
pool_timeout=60, # 백그라운드는 대기 시간 여유있게
pool_recycle=3600,
pool_pre_ping=True,
pool_reset_on_return="rollback",
connect_args={
"connect_timeout": 10,
"charset": "utf8mb4",
},
)
# 백그라운드 세션 팩토리
BackgroundSessionLocal = async_sessionmaker(
bind=background_engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
)
async def create_db_tables(): async def create_db_tables():
import asyncio import asyncio
@ -56,72 +82,24 @@ async def create_db_tables():
# FastAPI 의존성용 세션 제너레이터 # FastAPI 의존성용 세션 제너레이터
async def get_session() -> AsyncGenerator[AsyncSession, None]: async def get_session() -> AsyncGenerator[AsyncSession, None]:
# 커넥션 풀 상태 로깅 (디버깅용)
pool = engine.pool
print(f"[get_session] Pool status - size: {pool.size()}, checked_in: {pool.checkedin()}, checked_out: {pool.checkedout()}, overflow: {pool.overflow()}")
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
try: try:
yield session yield session
# print("Session commited")
# await session.commit()
except Exception as e: except Exception as e:
await session.rollback() await session.rollback()
print(f"Session rollback due to: {e}") print(f"[get_session] Session rollback due to: {e}")
raise e raise e
# async with 종료 시 session.close()가 자동 호출됨 finally:
# 명시적으로 세션 종료 확인
print(f"[get_session] Session closing - Pool checked_out: {pool.checkedout()}")
# 앱 종료 시 엔진 리소스 정리 함수 # 앱 종료 시 엔진 리소스 정리 함수
async def dispose_engine() -> None: async def dispose_engine() -> None:
await engine.dispose() await engine.dispose()
print("Database engine disposed") await background_engine.dispose()
print("Database engines disposed (main + background)")
# =============================================================================
# 백그라운드 태스크용 세션 (별도 이벤트 루프에서 사용)
# =============================================================================
@asynccontextmanager
async def get_worker_session() -> AsyncGenerator[AsyncSession, None]:
"""백그라운드 태스크용 세션 컨텍스트 매니저
asyncio.run()으로 이벤트 루프를 생성하는 백그라운드 태스크에서 사용합니다.
NullPool을 사용하여 연결 풀링을 비활성화하고, 이벤트 루프 충돌을 방지합니다.
get_session()과의 차이점:
- get_session(): FastAPI DI용, 메인 이벤트 루프의 연결 사용
- get_worker_session(): 백그라운드 태스크용, NullPool로 매번 연결 생성
Usage:
async with get_worker_session() as session:
result = await session.execute(select(Model))
await session.commit()
Note:
- 호출마다 엔진을 생성하고 dispose하므로 오버헤드가 있음
- 빈번한 호출이 필요한 경우 방법 1(모듈 레벨 엔진) 고려
"""
worker_engine = create_async_engine(
url=db_settings.MYSQL_URL,
poolclass=NullPool,
connect_args={
"connect_timeout": 3,
"charset": "utf8mb4",
},
)
session_factory = async_sessionmaker(
bind=worker_engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
)
async with session_factory() as session:
try:
yield session
except Exception as e:
await session.rollback()
print(f"Worker session rollback due to: {e}")
raise e
finally:
await session.close()
await worker_engine.dispose()

View File

@ -8,7 +8,7 @@ import aiofiles
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session from app.database.session import get_session, AsyncSessionLocal
from app.home.models import Image from app.home.models import Image
from app.home.schemas.home_schema import ( from app.home.schemas.home_schema import (
CrawlingRequest, CrawlingRequest,
@ -497,13 +497,19 @@ async def upload_images_blob(
files: Optional[list[UploadFile]] = File( files: Optional[list[UploadFile]] = File(
default=None, description="이미지 바이너리 파일 목록" default=None, description="이미지 바이너리 파일 목록"
), ),
session: AsyncSession = Depends(get_session),
) -> ImageUploadResponse: ) -> ImageUploadResponse:
"""이미지 업로드 (URL + Azure Blob Storage)""" """이미지 업로드 (URL + Azure Blob Storage)
3단계로 분리하여 세션 점유 시간 최소화:
- Stage 1: 입력 검증 파일 데이터 준비 (세션 없음)
- Stage 2: Azure Blob 업로드 (세션 없음)
- Stage 3: DB 저장 ( 세션으로 빠르게 처리)
"""
# task_id 생성 # task_id 생성
task_id = await generate_task_id() task_id = await generate_task_id()
print(f"[upload_images_blob] START - task_id: {task_id}")
# 1. 진입 검증 # ========== Stage 1: 입력 검증 및 파일 데이터 준비 (세션 없음) ==========
has_images_json = images_json is not None and images_json.strip() != "" has_images_json = images_json is not None and images_json.strip() != ""
has_files = files is not None and len(files) > 0 has_files = files is not None and len(files) > 0
@ -513,9 +519,9 @@ async def upload_images_blob(
detail="images_json 또는 files 중 하나는 반드시 제공해야 합니다.", detail="images_json 또는 files 중 하나는 반드시 제공해야 합니다.",
) )
# 2. images_json 파싱 # images_json 파싱
url_images: list[ImageUrlItem] = [] url_images: list[ImageUrlItem] = []
if has_images_json: if has_images_json and images_json:
try: try:
parsed = json.loads(images_json) parsed = json.loads(images_json)
if isinstance(parsed, list): if isinstance(parsed, list):
@ -526,8 +532,8 @@ async def upload_images_blob(
detail=f"images_json 파싱 오류: {str(e)}", detail=f"images_json 파싱 오류: {str(e)}",
) )
# 3. 유효한 파일만 필터링 # 유효한 파일만 필터링 및 파일 내용 미리 읽기
valid_files: list[UploadFile] = [] valid_files_data: list[tuple[str, str, bytes]] = [] # (original_name, ext, content)
skipped_files: list[str] = [] skipped_files: list[str] = []
if has_files and files: if has_files and files:
for f in files: for f in files:
@ -536,50 +542,36 @@ async def upload_images_blob(
is_real_file = f.filename and f.filename != "filename" is_real_file = f.filename and f.filename != "filename"
if f and is_real_file and is_valid_ext and is_not_empty: if f and is_real_file and is_valid_ext and is_not_empty:
valid_files.append(f) # 파일 내용을 미리 읽어둠
content = await f.read()
ext = _get_file_extension(f.filename) # type: ignore[arg-type]
valid_files_data.append((f.filename or "image", ext, content))
else: else:
skipped_files.append(f.filename or "unknown") skipped_files.append(f.filename or "unknown")
if not url_images and not valid_files: if not url_images and not valid_files_data:
detail = (
f"유효한 이미지가 없습니다. "
f"지원 확장자: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}. "
f"건너뛴 파일: {skipped_files}"
)
raise HTTPException( raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,
detail=f"유효한 이미지가 없습니다. 지원 확장자: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}. 건너뛴 파일: {skipped_files}", detail=detail,
) )
result_images: list[ImageUploadResultItem] = [] print(f"[upload_images_blob] Stage 1 done - urls: {len(url_images)}, "
img_order = 0 f"files: {len(valid_files_data)}")
# 1. URL 이미지 저장 # ========== Stage 2: Azure Blob 업로드 (세션 없음) ==========
for url_item in url_images: # 업로드 결과를 저장할 리스트 (나중에 DB에 저장)
img_name = url_item.name or _extract_image_name(url_item.url, img_order) blob_upload_results: list[tuple[str, str]] = [] # (img_name, blob_url)
img_order = len(url_images) # URL 이미지 다음 순서부터 시작
image = Image( if valid_files_data:
task_id=task_id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
source="url",
)
)
img_order += 1
# 2. 바이너리 파일을 Azure Blob Storage에 직접 업로드 (media 저장 없음)
if valid_files:
uploader = AzureBlobUploader(task_id=task_id) uploader = AzureBlobUploader(task_id=task_id)
for file in valid_files: for original_name, ext, file_content in valid_files_data:
original_name = file.filename or "image"
ext = _get_file_extension(file.filename) # type: ignore[arg-type]
name_without_ext = ( name_without_ext = (
original_name.rsplit(".", 1)[0] original_name.rsplit(".", 1)[0]
if "." in original_name if "." in original_name
@ -587,49 +579,83 @@ async def upload_images_blob(
) )
filename = f"{name_without_ext}_{img_order:03d}{ext}" filename = f"{name_without_ext}_{img_order:03d}{ext}"
# 파일 내용 읽기
file_content = await file.read()
# Azure Blob Storage에 직접 업로드 # Azure Blob Storage에 직접 업로드
upload_success = await uploader.upload_image_bytes(file_content, filename) upload_success = await uploader.upload_image_bytes(file_content, filename)
if upload_success: if upload_success:
blob_url = uploader.public_url blob_url = uploader.public_url
img_name = file.filename or filename blob_upload_results.append((original_name, blob_url))
image = Image(
task_id=task_id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
source="blob",
)
)
img_order += 1 img_order += 1
else: else:
skipped_files.append(filename) skipped_files.append(filename)
saved_count = len(result_images) print(f"[upload_images_blob] Stage 2 done - blob uploads: "
await session.commit() f"{len(blob_upload_results)}, skipped: {len(skipped_files)}")
# Image 테이블에서 현재 task_id의 이미지 URL 목록 조회 # ========== Stage 3: DB 저장 (새 세션으로 빠르게 처리) ==========
result_images: list[ImageUploadResultItem] = []
img_order = 0
async with AsyncSessionLocal() as session:
# URL 이미지 저장
for url_item in url_images:
img_name = url_item.name or _extract_image_name(url_item.url, img_order)
image = Image(
task_id=task_id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
source="url",
)
)
img_order += 1
# Blob 업로드 결과 저장
for img_name, blob_url in blob_upload_results:
image = Image(
task_id=task_id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
source="blob",
)
)
img_order += 1
await session.commit()
saved_count = len(result_images)
image_urls = [img.img_url for img in result_images] image_urls = [img.img_url for img in result_images]
print(f"[upload_images_blob] SUCCESS - task_id: {task_id}, "
f"total: {saved_count}, returning response...")
return ImageUploadResponse( return ImageUploadResponse(
task_id=task_id, task_id=task_id,
total_count=len(result_images), total_count=len(result_images),
url_count=len(url_images), url_count=len(url_images),
file_count=len(valid_files) - len(skipped_files), file_count=len(blob_upload_results),
saved_count=saved_count, saved_count=saved_count,
images=result_images, images=result_images,
image_urls=image_urls, image_urls=image_urls,

View File

@ -6,7 +6,7 @@ Lyric Background Tasks
from sqlalchemy import select from sqlalchemy import select
from app.database.session import AsyncSessionLocal from app.database.session import BackgroundSessionLocal
from app.lyric.models import Lyric from app.lyric.models import Lyric
from app.utils.chatgpt_prompt import ChatgptService from app.utils.chatgpt_prompt import ChatgptService
@ -55,8 +55,8 @@ async def generate_lyric_background(
pattern.lower() in result.lower() for pattern in failure_patterns pattern.lower() in result.lower() for pattern in failure_patterns
) )
# Lyric 테이블 업데이트 ( 세션 사용) # Lyric 테이블 업데이트 (백그라운드 전용 세션 사용)
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
query_result = await session.execute( query_result = await session.execute(
select(Lyric) select(Lyric)
.where(Lyric.task_id == task_id) .where(Lyric.task_id == task_id)
@ -82,7 +82,7 @@ async def generate_lyric_background(
except Exception as e: except Exception as e:
print(f"[generate_lyric_background] EXCEPTION - task_id: {task_id}, error: {e}") print(f"[generate_lyric_background] EXCEPTION - task_id: {task_id}, error: {e}")
# 실패 시 Lyric 테이블 업데이트 # 실패 시 Lyric 테이블 업데이트
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
query_result = await session.execute( query_result = await session.execute(
select(Lyric) select(Lyric)
.where(Lyric.task_id == task_id) .where(Lyric.task_id == task_id)

View File

@ -11,7 +11,7 @@ import aiofiles
import httpx import httpx
from sqlalchemy import select from sqlalchemy import select
from app.database.session import AsyncSessionLocal from app.database.session import BackgroundSessionLocal
from app.song.models import Song from app.song.models import Song
from app.utils.common import generate_task_id from app.utils.common import generate_task_id
from app.utils.upload_blob_as_request import AzureBlobUploader from app.utils.upload_blob_as_request import AzureBlobUploader
@ -65,7 +65,7 @@ async def download_and_save_song(
print(f"[download_and_save_song] URL generated - task_id: {task_id}, url: {file_url}") print(f"[download_and_save_song] URL generated - task_id: {task_id}, url: {file_url}")
# Song 테이블 업데이트 (새 세션 사용) # Song 테이블 업데이트 (새 세션 사용)
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택 # 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute( result = await session.execute(
select(Song) select(Song)
@ -86,7 +86,7 @@ async def download_and_save_song(
except Exception as e: except Exception as e:
print(f"[download_and_save_song] EXCEPTION - task_id: {task_id}, error: {e}") print(f"[download_and_save_song] EXCEPTION - task_id: {task_id}, error: {e}")
# 실패 시 Song 테이블 업데이트 # 실패 시 Song 테이블 업데이트
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택 # 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute( result = await session.execute(
select(Song) select(Song)
@ -153,7 +153,7 @@ async def download_and_upload_song_to_blob(
print(f"[download_and_upload_song_to_blob] Uploaded to Blob - task_id: {task_id}, url: {blob_url}") print(f"[download_and_upload_song_to_blob] Uploaded to Blob - task_id: {task_id}, url: {blob_url}")
# Song 테이블 업데이트 (새 세션 사용) # Song 테이블 업데이트 (새 세션 사용)
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택 # 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute( result = await session.execute(
select(Song) select(Song)
@ -174,7 +174,7 @@ async def download_and_upload_song_to_blob(
except Exception as e: except Exception as e:
print(f"[download_and_upload_song_to_blob] EXCEPTION - task_id: {task_id}, error: {e}") print(f"[download_and_upload_song_to_blob] EXCEPTION - task_id: {task_id}, error: {e}")
# 실패 시 Song 테이블 업데이트 # 실패 시 Song 테이블 업데이트
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
result = await session.execute( result = await session.execute(
select(Song) select(Song)
.where(Song.task_id == task_id) .where(Song.task_id == task_id)
@ -226,7 +226,7 @@ async def download_and_upload_song_by_suno_task_id(
try: try:
# suno_task_id로 Song 조회하여 task_id 가져오기 # suno_task_id로 Song 조회하여 task_id 가져오기
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
result = await session.execute( result = await session.execute(
select(Song) select(Song)
.where(Song.suno_task_id == suno_task_id) .where(Song.suno_task_id == suno_task_id)
@ -277,7 +277,7 @@ async def download_and_upload_song_by_suno_task_id(
print(f"[download_and_upload_song_by_suno_task_id] Uploaded to Blob - suno_task_id: {suno_task_id}, url: {blob_url}") print(f"[download_and_upload_song_by_suno_task_id] Uploaded to Blob - suno_task_id: {suno_task_id}, url: {blob_url}")
# Song 테이블 업데이트 (새 세션 사용) # Song 테이블 업데이트 (새 세션 사용)
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
result = await session.execute( result = await session.execute(
select(Song) select(Song)
.where(Song.suno_task_id == suno_task_id) .where(Song.suno_task_id == suno_task_id)
@ -300,7 +300,7 @@ async def download_and_upload_song_by_suno_task_id(
print(f"[download_and_upload_song_by_suno_task_id] EXCEPTION - suno_task_id: {suno_task_id}, error: {e}") print(f"[download_and_upload_song_by_suno_task_id] EXCEPTION - suno_task_id: {suno_task_id}, error: {e}")
# 실패 시 Song 테이블 업데이트 # 실패 시 Song 테이블 업데이트
if task_id: if task_id:
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
result = await session.execute( result = await session.execute(
select(Song) select(Song)
.where(Song.suno_task_id == suno_task_id) .where(Song.suno_task_id == suno_task_id)

View File

@ -22,9 +22,15 @@ template = await creatomate.get_one_template_data(template_id)
# 영상 렌더링 요청 (비동기) # 영상 렌더링 요청 (비동기)
response = await creatomate.make_creatomate_call(template_id, modifications) response = await creatomate.make_creatomate_call(template_id, modifications)
``` ```
## 성능 최적화
- 템플릿 캐싱: 템플릿 데이터는 메모리에 캐싱되어 반복 조회 API 호출을 줄입니다.
- HTTP 클라이언트 재사용: 모듈 레벨의 공유 클라이언트로 커넥션 풀을 재사용합니다.
- 캐시 만료: 기본 5 자동 만료 (CACHE_TTL_SECONDS로 조정 가능)
""" """
import copy import copy
import time
from typing import Literal from typing import Literal
import httpx import httpx
@ -35,6 +41,51 @@ from config import apikey_settings, creatomate_settings
# Orientation 타입 정의 # Orientation 타입 정의
OrientationType = Literal["horizontal", "vertical"] OrientationType = Literal["horizontal", "vertical"]
# =============================================================================
# 모듈 레벨 캐시 및 HTTP 클라이언트 (싱글톤 패턴)
# =============================================================================
# 템플릿 캐시: {template_id: {"data": dict, "cached_at": float}}
_template_cache: dict[str, dict] = {}
# 캐시 TTL (초) - 기본 5분
CACHE_TTL_SECONDS = 300
# 모듈 레벨 공유 HTTP 클라이언트 (커넥션 풀 재사용)
_shared_client: httpx.AsyncClient | None = None
async def get_shared_client() -> httpx.AsyncClient:
"""공유 HTTP 클라이언트를 반환합니다. 없으면 생성합니다."""
global _shared_client
if _shared_client is None or _shared_client.is_closed:
_shared_client = httpx.AsyncClient(
timeout=httpx.Timeout(60.0, connect=10.0),
limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
)
return _shared_client
async def close_shared_client() -> None:
"""공유 HTTP 클라이언트를 닫습니다. 앱 종료 시 호출하세요."""
global _shared_client
if _shared_client is not None and not _shared_client.is_closed:
await _shared_client.aclose()
_shared_client = None
print("[CreatomateService] Shared HTTP client closed")
def clear_template_cache() -> None:
"""템플릿 캐시를 전체 삭제합니다."""
global _template_cache
_template_cache.clear()
print("[CreatomateService] Template cache cleared")
def _is_cache_valid(cached_at: float) -> bool:
"""캐시가 유효한지 확인합니다."""
return (time.time() - cached_at) < CACHE_TTL_SECONDS
class CreatomateService: class CreatomateService:
"""Creatomate API를 통한 영상 생성 서비스 """Creatomate API를 통한 영상 생성 서비스
@ -90,18 +141,53 @@ class CreatomateService:
async def get_all_templates_data(self) -> dict: async def get_all_templates_data(self) -> dict:
"""모든 템플릿 정보를 조회합니다.""" """모든 템플릿 정보를 조회합니다."""
url = f"{self.BASE_URL}/v1/templates" url = f"{self.BASE_URL}/v1/templates"
async with httpx.AsyncClient() as client: client = await get_shared_client()
response = await client.get(url, headers=self.headers, timeout=30.0) response = await client.get(url, headers=self.headers, timeout=30.0)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
async def get_one_template_data(self, template_id: str) -> dict: async def get_one_template_data(
"""특정 템플릿 ID로 템플릿 정보를 조회합니다.""" self,
template_id: str,
use_cache: bool = True,
) -> dict:
"""특정 템플릿 ID로 템플릿 정보를 조회합니다.
Args:
template_id: 조회할 템플릿 ID
use_cache: 캐시 사용 여부 (기본: True)
Returns:
템플릿 데이터 (deep copy)
"""
global _template_cache
# 캐시 확인
if use_cache and template_id in _template_cache:
cached = _template_cache[template_id]
if _is_cache_valid(cached["cached_at"]):
print(f"[CreatomateService] Cache HIT - {template_id}")
return copy.deepcopy(cached["data"])
else:
# 만료된 캐시 삭제
del _template_cache[template_id]
print(f"[CreatomateService] Cache EXPIRED - {template_id}")
# API 호출
url = f"{self.BASE_URL}/v1/templates/{template_id}" url = f"{self.BASE_URL}/v1/templates/{template_id}"
async with httpx.AsyncClient() as client: client = await get_shared_client()
response = await client.get(url, headers=self.headers, timeout=30.0) response = await client.get(url, headers=self.headers, timeout=30.0)
response.raise_for_status() response.raise_for_status()
return response.json() data = response.json()
# 캐시 저장
_template_cache[template_id] = {
"data": data,
"cached_at": time.time(),
}
print(f"[CreatomateService] Cache MISS - {template_id} (cached)")
return copy.deepcopy(data)
# 하위 호환성을 위한 별칭 (deprecated) # 하위 호환성을 위한 별칭 (deprecated)
async def get_one_template_data_async(self, template_id: str) -> dict: async def get_one_template_data_async(self, template_id: str) -> dict:
@ -245,12 +331,12 @@ class CreatomateService:
"template_id": template_id, "template_id": template_id,
"modifications": modifications, "modifications": modifications,
} }
async with httpx.AsyncClient() as client: client = await get_shared_client()
response = await client.post( response = await client.post(
url, json=data, headers=self.headers, timeout=60.0 url, json=data, headers=self.headers, timeout=60.0
) )
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
async def make_creatomate_custom_call(self, source: dict) -> dict: async def make_creatomate_custom_call(self, source: dict) -> dict:
"""템플릿 없이 Creatomate에 커스텀 렌더링 요청을 보냅니다. """템플릿 없이 Creatomate에 커스텀 렌더링 요청을 보냅니다.
@ -259,12 +345,12 @@ class CreatomateService:
response에 요청 정보가 있으니 폴링 필요 response에 요청 정보가 있으니 폴링 필요
""" """
url = f"{self.BASE_URL}/v2/renders" url = f"{self.BASE_URL}/v2/renders"
async with httpx.AsyncClient() as client: client = await get_shared_client()
response = await client.post( response = await client.post(
url, json=source, headers=self.headers, timeout=60.0 url, json=source, headers=self.headers, timeout=60.0
) )
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
# 하위 호환성을 위한 별칭 (deprecated) # 하위 호환성을 위한 별칭 (deprecated)
async def make_creatomate_custom_call_async(self, source: dict) -> dict: async def make_creatomate_custom_call_async(self, source: dict) -> dict:
@ -293,10 +379,10 @@ class CreatomateService:
- failed: 실패 - failed: 실패
""" """
url = f"{self.BASE_URL}/v1/renders/{render_id}" url = f"{self.BASE_URL}/v1/renders/{render_id}"
async with httpx.AsyncClient() as client: client = await get_shared_client()
response = await client.get(url, headers=self.headers, timeout=30.0) response = await client.get(url, headers=self.headers, timeout=30.0)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
# 하위 호환성을 위한 별칭 (deprecated) # 하위 호환성을 위한 별칭 (deprecated)
async def get_render_status_async(self, render_id: str) -> dict: async def get_render_status_async(self, render_id: str) -> dict:

View File

@ -96,133 +96,173 @@ async def generate_video(
default="vertical", default="vertical",
description="영상 방향 (horizontal: 가로형, vertical: 세로형)", description="영상 방향 (horizontal: 가로형, vertical: 세로형)",
), ),
session: AsyncSession = Depends(get_session),
) -> GenerateVideoResponse: ) -> GenerateVideoResponse:
"""Creatomate API를 통해 영상을 생성합니다. """Creatomate API를 통해 영상을 생성합니다.
1. task_id로 Project, Lyric, Song, Image 조회 1. task_id로 Project, Lyric, Song, Image 병렬 조회
2. Video 테이블에 초기 데이터 저장 (status: processing) 2. Video 테이블에 초기 데이터 저장 (status: processing)
3. Creatomate API 호출 (orientation에 따른 템플릿 자동 선택) 3. Creatomate API 호출 (orientation에 따른 템플릿 자동 선택)
4. creatomate_render_id 업데이트 응답 반환 4. creatomate_render_id 업데이트 응답 반환
Note: 함수는 Depends(get_session) 사용하지 않고 명시적으로 세션을 관리합니다.
외부 API 호출 DB 커넥션이 유지되지 않도록 하여 커넥션 타임아웃 문제를 방지합니다.
DB 쿼리는 asyncio.gather() 사용하여 병렬로 실행됩니다.
""" """
import asyncio
from app.database.session import AsyncSessionLocal
print(f"[generate_video] START - task_id: {task_id}, orientation: {orientation}") print(f"[generate_video] START - task_id: {task_id}, orientation: {orientation}")
# ==========================================================================
# 1단계: DB 조회 및 초기 데이터 저장 (세션을 명시적으로 열고 닫음)
# ==========================================================================
# 외부 API 호출 전에 필요한 데이터를 저장할 변수들
project_id: int | None = None
lyric_id: int | None = None
song_id: int | None = None
video_id: int | None = None
music_url: str | None = None
song_duration: float | None = None
lyrics: str | None = None
image_urls: list[str] = []
try: try:
# 1. task_id로 Project 조회 (중복 시 최신 것 선택) # 세션을 명시적으로 열고 DB 작업 후 바로 닫음
project_result = await session.execute( async with AsyncSessionLocal() as session:
select(Project) # ===== 병렬 쿼리 실행: Project, Lyric, Song, Image 동시 조회 =====
.where(Project.task_id == task_id) project_query = select(Project).where(
.order_by(Project.created_at.desc()) Project.task_id == task_id
.limit(1) ).order_by(Project.created_at.desc()).limit(1)
)
project = project_result.scalar_one_or_none()
if not project: lyric_query = select(Lyric).where(
print(f"[generate_video] Project NOT FOUND - task_id: {task_id}") Lyric.task_id == task_id
raise HTTPException( ).order_by(Lyric.created_at.desc()).limit(1)
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.", song_query = select(Song).where(
Song.task_id == task_id
).order_by(Song.created_at.desc()).limit(1)
image_query = select(Image).where(
Image.task_id == task_id
).order_by(Image.img_order.asc())
# 4개 쿼리를 병렬로 실행
project_result, lyric_result, song_result, image_result = (
await asyncio.gather(
session.execute(project_query),
session.execute(lyric_query),
session.execute(song_query),
session.execute(image_query),
)
) )
print(f"[generate_video] Project found - project_id: {project.id}, task_id: {task_id}") print(f"[generate_video] Parallel queries completed - task_id: {task_id}")
# 2. task_id로 Lyric 조회 (중복 시 최신 것 선택) # ===== 결과 처리: Project =====
lyric_result = await session.execute( project = project_result.scalar_one_or_none()
select(Lyric) if not project:
.where(Lyric.task_id == task_id) print(f"[generate_video] Project NOT FOUND - task_id: {task_id}")
.order_by(Lyric.created_at.desc()) raise HTTPException(
.limit(1) status_code=404,
) detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.",
lyric = lyric_result.scalar_one_or_none() )
project_id = project.id
if not lyric: # ===== 결과 처리: Lyric =====
print(f"[generate_video] Lyric NOT FOUND - task_id: {task_id}") lyric = lyric_result.scalar_one_or_none()
raise HTTPException( if not lyric:
status_code=404, print(f"[generate_video] Lyric NOT FOUND - task_id: {task_id}")
detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.", raise HTTPException(
) status_code=404,
print(f"[generate_video] Lyric found - lyric_id: {lyric.id}, task_id: {task_id}") detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.",
)
lyric_id = lyric.id
# 3. task_id로 Song 조회 (가장 최근 것) # ===== 결과 처리: Song =====
song_result = await session.execute( song = song_result.scalar_one_or_none()
select(Song) if not song:
.where(Song.task_id == task_id) print(f"[generate_video] Song NOT FOUND - task_id: {task_id}")
.order_by(Song.created_at.desc()) raise HTTPException(
.limit(1) status_code=404,
) detail=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.",
song = song_result.scalar_one_or_none() )
if not song: song_id = song.id
print(f"[generate_video] Song NOT FOUND - task_id: {task_id}") music_url = song.song_result_url
raise HTTPException( song_duration = song.duration
status_code=404, lyrics = song.song_prompt
detail=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.",
if not music_url:
raise HTTPException(
status_code=400,
detail=f"Song(id={song_id})의 음악 URL이 없습니다.",
)
if not lyrics:
raise HTTPException(
status_code=400,
detail=f"Song(id={song_id})의 가사(song_prompt)가 없습니다.",
)
# ===== 결과 처리: Image =====
images = image_result.scalars().all()
if not images:
print(f"[generate_video] Image NOT FOUND - task_id: {task_id}")
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 이미지를 찾을 수 없습니다.",
)
image_urls = [img.img_url for img in images]
print(
f"[generate_video] Data loaded - task_id: {task_id}, "
f"project_id: {project_id}, lyric_id: {lyric_id}, "
f"song_id: {song_id}, images: {len(image_urls)}"
) )
# Song에서 music_url과 duration 가져오기 # ===== Video 테이블에 초기 데이터 저장 및 커밋 =====
music_url = song.song_result_url video = Video(
if not music_url: project_id=project_id,
print(f"[generate_video] Song has no result URL - task_id: {task_id}, song_id: {song.id}") lyric_id=lyric_id,
raise HTTPException( song_id=song_id,
status_code=400, task_id=task_id,
detail=f"Song(id={song.id})의 음악 URL이 없습니다. 노래 생성이 완료되었는지 확인하세요.", creatomate_render_id=None,
status="processing",
) )
session.add(video)
await session.commit()
video_id = video.id
print(f"[generate_video] Video saved - task_id: {task_id}, id: {video_id}")
# 세션이 여기서 자동으로 닫힘 (async with 블록 종료)
# Song에서 가사(song_prompt) 가져오기 except HTTPException:
lyrics = song.song_prompt raise
if not lyrics: except Exception as e:
print(f"[generate_video] Song has no lyrics (song_prompt) - task_id: {task_id}, song_id: {song.id}") print(f"[generate_video] DB EXCEPTION - task_id: {task_id}, error: {e}")
raise HTTPException( return GenerateVideoResponse(
status_code=400, success=False,
detail=f"Song(id={song.id})의 가사(song_prompt)가 없습니다.",
)
print(f"[generate_video] Song found - song_id: {song.id}, task_id: {task_id}, duration: {song.duration}")
print(f"[generate_video] Music URL (from DB): {music_url}, Song duration: {song.duration}, Lyrics length: {len(lyrics)}")
# 4. task_id로 Image 조회 (img_order 순서로 정렬)
image_result = await session.execute(
select(Image)
.where(Image.task_id == task_id)
.order_by(Image.img_order.asc())
)
images = image_result.scalars().all()
if not images:
print(f"[generate_video] Image NOT FOUND - task_id: {task_id}")
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 이미지를 찾을 수 없습니다.",
)
image_urls = [img.img_url for img in images]
print(f"[generate_video] Images found - task_id: {task_id}, count: {len(image_urls)}")
# 5. Video 테이블에 초기 데이터 저장
video = Video(
project_id=project.id,
lyric_id=lyric.id,
song_id=song.id,
task_id=task_id, task_id=task_id,
creatomate_render_id=None, creatomate_render_id=None,
status="processing", message="영상 생성 요청에 실패했습니다.",
error_message=str(e),
) )
session.add(video)
await session.flush() # ID 생성을 위해 flush
print(f"[generate_video] Video saved (processing) - task_id: {task_id}")
# 6. Creatomate API 호출 (POC 패턴 적용) # ==========================================================================
# 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음)
# ==========================================================================
try:
print(f"[generate_video] Creatomate API generation started - task_id: {task_id}") print(f"[generate_video] Creatomate API generation started - task_id: {task_id}")
# orientation에 따른 템플릿 선택, duration은 Song에서 가져옴 (없으면 config 기본값 사용)
creatomate_service = CreatomateService( creatomate_service = CreatomateService(
orientation=orientation, orientation=orientation,
target_duration=song.duration, # Song의 duration 사용 (None이면 config 기본값) target_duration=song_duration,
) )
print(f"[generate_video] Using template_id: {creatomate_service.template_id}, duration: {creatomate_service.target_duration} (song duration: {song.duration})") print(f"[generate_video] Using template_id: {creatomate_service.template_id}, duration: {creatomate_service.target_duration} (song duration: {song_duration})")
# 6-1. 템플릿 조회 (비동기, CreatomateService에서 orientation에 맞는 template_id 사용) # 6-1. 템플릿 조회 (비동기)
template = await creatomate_service.get_one_template_data_async(creatomate_service.template_id) template = await creatomate_service.get_one_template_data_async(creatomate_service.template_id)
print(f"[generate_video] Template fetched - task_id: {task_id}") print(f"[generate_video] Template fetched - task_id: {task_id}")
# 6-2. elements에서 리소스 매핑 생성 (music_url, lyrics는 DB에서 조회한 값 사용) # 6-2. elements에서 리소스 매핑 생성
modifications = creatomate_service.elements_connect_resource_blackbox( modifications = creatomate_service.elements_connect_resource_blackbox(
elements=template["source"]["elements"], elements=template["source"]["elements"],
image_url_list=image_urls, image_url_list=image_urls,
@ -239,7 +279,7 @@ async def generate_video(
template["source"]["elements"] = new_elements template["source"]["elements"] = new_elements
print(f"[generate_video] Elements modified - task_id: {task_id}") print(f"[generate_video] Elements modified - task_id: {task_id}")
# 6-4. duration 확장 (target_duration: 영상 길이) # 6-4. duration 확장
final_template = creatomate_service.extend_template_duration( final_template = creatomate_service.extend_template_duration(
template, template,
creatomate_service.target_duration, creatomate_service.target_duration,
@ -252,7 +292,7 @@ async def generate_video(
) )
print(f"[generate_video] Creatomate API response - task_id: {task_id}, response: {render_response}") print(f"[generate_video] Creatomate API response - task_id: {task_id}, response: {render_response}")
# 렌더 ID 추출 (응답이 리스트인 경우 첫 번째 항목 사용) # 렌더 ID 추출
if isinstance(render_response, list) and len(render_response) > 0: if isinstance(render_response, list) and len(render_response) > 0:
creatomate_render_id = render_response[0].get("id") creatomate_render_id = render_response[0].get("id")
elif isinstance(render_response, dict): elif isinstance(render_response, dict):
@ -260,9 +300,39 @@ async def generate_video(
else: else:
creatomate_render_id = None creatomate_render_id = None
# 7. creatomate_render_id 업데이트 except Exception as e:
video.creatomate_render_id = creatomate_render_id print(f"[generate_video] Creatomate API EXCEPTION - task_id: {task_id}, error: {e}")
await session.commit() # 외부 API 실패 시 Video 상태를 failed로 업데이트
from app.database.session import AsyncSessionLocal
async with AsyncSessionLocal() as update_session:
video_result = await update_session.execute(
select(Video).where(Video.id == video_id)
)
video_to_update = video_result.scalar_one_or_none()
if video_to_update:
video_to_update.status = "failed"
await update_session.commit()
return GenerateVideoResponse(
success=False,
task_id=task_id,
creatomate_render_id=None,
message="영상 생성 요청에 실패했습니다.",
error_message=str(e),
)
# ==========================================================================
# 3단계: creatomate_render_id 업데이트 (새 세션으로 빠르게 처리)
# ==========================================================================
try:
from app.database.session import AsyncSessionLocal
async with AsyncSessionLocal() as update_session:
video_result = await update_session.execute(
select(Video).where(Video.id == video_id)
)
video_to_update = video_result.scalar_one_or_none()
if video_to_update:
video_to_update.creatomate_render_id = creatomate_render_id
await update_session.commit()
print(f"[generate_video] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}") print(f"[generate_video] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}")
return GenerateVideoResponse( return GenerateVideoResponse(
@ -273,16 +343,13 @@ async def generate_video(
error_message=None, error_message=None,
) )
except HTTPException:
raise
except Exception as e: except Exception as e:
print(f"[generate_video] EXCEPTION - task_id: {task_id}, error: {e}") print(f"[generate_video] Update EXCEPTION - task_id: {task_id}, error: {e}")
await session.rollback()
return GenerateVideoResponse( return GenerateVideoResponse(
success=False, success=False,
task_id=task_id, task_id=task_id,
creatomate_render_id=None, creatomate_render_id=creatomate_render_id,
message="영상 생성 요청에 실패했습니다.", message="영상 생성 요청되었으나 DB 업데이트에 실패했습니다.",
error_message=str(e), error_message=str(e),
) )

View File

@ -10,7 +10,7 @@ import aiofiles
import httpx import httpx
from sqlalchemy import select from sqlalchemy import select
from app.database.session import AsyncSessionLocal from app.database.session import BackgroundSessionLocal
from app.video.models import Video from app.video.models import Video
from app.utils.upload_blob_as_request import AzureBlobUploader from app.utils.upload_blob_as_request import AzureBlobUploader
@ -66,7 +66,7 @@ async def download_and_upload_video_to_blob(
print(f"[download_and_upload_video_to_blob] Uploaded to Blob - task_id: {task_id}, url: {blob_url}") print(f"[download_and_upload_video_to_blob] Uploaded to Blob - task_id: {task_id}, url: {blob_url}")
# Video 테이블 업데이트 (새 세션 사용) # Video 테이블 업데이트 (새 세션 사용)
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택 # 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute( result = await session.execute(
select(Video) select(Video)
@ -87,7 +87,7 @@ async def download_and_upload_video_to_blob(
except Exception as e: except Exception as e:
print(f"[download_and_upload_video_to_blob] EXCEPTION - task_id: {task_id}, error: {e}") print(f"[download_and_upload_video_to_blob] EXCEPTION - task_id: {task_id}, error: {e}")
# 실패 시 Video 테이블 업데이트 # 실패 시 Video 테이블 업데이트
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
result = await session.execute( result = await session.execute(
select(Video) select(Video)
.where(Video.task_id == task_id) .where(Video.task_id == task_id)
@ -137,7 +137,7 @@ async def download_and_upload_video_by_creatomate_render_id(
try: try:
# creatomate_render_id로 Video 조회하여 task_id 가져오기 # creatomate_render_id로 Video 조회하여 task_id 가져오기
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
result = await session.execute( result = await session.execute(
select(Video) select(Video)
.where(Video.creatomate_render_id == creatomate_render_id) .where(Video.creatomate_render_id == creatomate_render_id)
@ -188,7 +188,7 @@ async def download_and_upload_video_by_creatomate_render_id(
print(f"[download_and_upload_video_by_creatomate_render_id] Uploaded to Blob - creatomate_render_id: {creatomate_render_id}, url: {blob_url}") print(f"[download_and_upload_video_by_creatomate_render_id] Uploaded to Blob - creatomate_render_id: {creatomate_render_id}, url: {blob_url}")
# Video 테이블 업데이트 (새 세션 사용) # Video 테이블 업데이트 (새 세션 사용)
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
result = await session.execute( result = await session.execute(
select(Video) select(Video)
.where(Video.creatomate_render_id == creatomate_render_id) .where(Video.creatomate_render_id == creatomate_render_id)
@ -209,7 +209,7 @@ async def download_and_upload_video_by_creatomate_render_id(
print(f"[download_and_upload_video_by_creatomate_render_id] EXCEPTION - creatomate_render_id: {creatomate_render_id}, error: {e}") print(f"[download_and_upload_video_by_creatomate_render_id] EXCEPTION - creatomate_render_id: {creatomate_render_id}, error: {e}")
# 실패 시 Video 테이블 업데이트 # 실패 시 Video 테이블 업데이트
if task_id: if task_id:
async with AsyncSessionLocal() as session: async with BackgroundSessionLocal() as session:
result = await session.execute( result = await session.execute(
select(Video) select(Video)
.where(Video.creatomate_render_id == creatomate_render_id) .where(Video.creatomate_render_id == creatomate_render_id)

View File

@ -0,0 +1,844 @@
# DB 쿼리 병렬화 (Query Parallelization) 완벽 가이드
> **목적**: Python asyncio와 SQLAlchemy를 활용한 DB 쿼리 병렬화의 이론부터 실무 적용까지
> **대상**: 비동기 프로그래밍 기초 지식이 있는 백엔드 개발자
> **환경**: Python 3.11+, SQLAlchemy 2.0+, FastAPI
---
## 목차
1. [이론적 배경](#1-이론적-배경)
2. [핵심 개념](#2-핵심-개념)
3. [설계 시 주의사항](#3-설계-시-주의사항)
4. [실무 시나리오 예제](#4-실무-시나리오-예제)
5. [성능 측정 및 모니터링](#5-성능-측정-및-모니터링)
6. [Best Practices](#6-best-practices)
---
## 1. 이론적 배경
### 1.1 동기 vs 비동기 실행
```
[순차 실행 - Sequential]
Query A ──────────▶ (100ms)
Query B ──────────▶ (100ms)
Query C ──────────▶ (100ms)
총 소요시간: 300ms
[병렬 실행 - Parallel]
Query A ──────────▶ (100ms)
Query B ──────────▶ (100ms)
Query C ──────────▶ (100ms)
총 소요시간: ~100ms (가장 느린 쿼리 기준)
```
### 1.2 왜 병렬화가 필요한가?
1. **I/O 바운드 작업의 특성**
- DB 쿼리는 네트워크 I/O가 대부분 (실제 CPU 작업은 짧음)
- 대기 시간 동안 다른 작업을 수행할 수 있음
2. **응답 시간 단축**
- N개의 독립적인 쿼리: `O(sum)``O(max)`
- 사용자 경험 개선
3. **리소스 효율성**
- 커넥션 풀을 효율적으로 활용
- 서버 처리량(throughput) 증가
### 1.3 asyncio.gather()의 동작 원리
```python
import asyncio
async def main():
# gather()는 모든 코루틴을 동시에 스케줄링
results = await asyncio.gather(
coroutine_1(), # Task 1 생성
coroutine_2(), # Task 2 생성
coroutine_3(), # Task 3 생성
)
# 모든 Task가 완료되면 결과를 리스트로 반환
return results
```
**핵심 동작:**
1. `gather()`는 각 코루틴을 Task로 래핑
2. 이벤트 루프가 모든 Task를 동시에 실행
3. I/O 대기 시 다른 Task로 컨텍스트 스위칭
4. 모든 Task 완료 시 결과 반환
---
## 2. 핵심 개념
### 2.1 독립성 판단 기준
병렬화가 가능한 쿼리의 조건:
| 조건 | 설명 | 예시 |
|------|------|------|
| **데이터 독립성** | 쿼리 간 결과 의존성 없음 | User, Product, Order 각각 조회 |
| **트랜잭션 독립성** | 같은 트랜잭션 내 순서 무관 | READ 작업들 |
| **비즈니스 독립성** | 결과 순서가 로직에 영향 없음 | 대시보드 데이터 조회 |
### 2.2 병렬화 불가능한 경우
```python
# ❌ 잘못된 예: 의존성이 있는 쿼리
user = await session.execute(select(User).where(User.id == user_id))
# orders 쿼리는 user.id에 의존 → 병렬화 불가
orders = await session.execute(
select(Order).where(Order.user_id == user.id)
)
```
```python
# ❌ 잘못된 예: 쓰기 후 읽기 (Write-then-Read)
await session.execute(insert(User).values(name="John"))
# 방금 생성된 데이터를 조회 → 순차 실행 필요
new_user = await session.execute(select(User).where(User.name == "John"))
```
### 2.3 SQLAlchemy AsyncSession과 병렬 쿼리
**중요**: 하나의 AsyncSession 내에서 `asyncio.gather()`로 여러 쿼리를 실행할 수 있습니다.
```python
async with AsyncSessionLocal() as session:
# 같은 세션에서 병렬 쿼리 실행 가능
results = await asyncio.gather(
session.execute(query1),
session.execute(query2),
session.execute(query3),
)
```
**단, 주의사항:**
- 같은 세션은 같은 트랜잭션을 공유
- 하나의 쿼리 실패 시 전체 트랜잭션에 영향
- 커넥션 풀 크기 고려 필요
---
## 3. 설계 시 주의사항
### 3.1 커넥션 풀 크기 설정
```python
# SQLAlchemy 엔진 설정
engine = create_async_engine(
url=db_url,
pool_size=20, # 기본 풀 크기
max_overflow=20, # 추가 연결 허용 수
pool_timeout=30, # 풀에서 연결 대기 시간
pool_recycle=3600, # 연결 재생성 주기
pool_pre_ping=True, # 연결 유효성 검사
)
```
**풀 크기 계산 공식:**
```
필요 커넥션 수 = 동시 요청 수 × 요청당 병렬 쿼리 수
```
예: 동시 10개 요청, 각 요청당 4개 병렬 쿼리
→ 최소 40개 커넥션 필요 (pool_size + max_overflow >= 40)
### 3.2 에러 처리 전략
```python
import asyncio
# 방법 1: return_exceptions=True (권장)
results = await asyncio.gather(
session.execute(query1),
session.execute(query2),
session.execute(query3),
return_exceptions=True, # 예외를 결과로 반환
)
# 결과 처리
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Query {i} failed: {result}")
else:
print(f"Query {i} succeeded: {result}")
```
```python
# 방법 2: 개별 try-except 래핑
async def safe_execute(session, query, name: str):
try:
return await session.execute(query)
except Exception as e:
print(f"[{name}] Query failed: {e}")
return None
results = await asyncio.gather(
safe_execute(session, query1, "project"),
safe_execute(session, query2, "song"),
safe_execute(session, query3, "image"),
)
```
### 3.3 타임아웃 설정
```python
import asyncio
async def execute_with_timeout(session, query, timeout_seconds: float):
"""타임아웃이 있는 쿼리 실행"""
try:
return await asyncio.wait_for(
session.execute(query),
timeout=timeout_seconds
)
except asyncio.TimeoutError:
raise Exception(f"Query timed out after {timeout_seconds}s")
# 사용 예
results = await asyncio.gather(
execute_with_timeout(session, query1, 5.0),
execute_with_timeout(session, query2, 5.0),
execute_with_timeout(session, query3, 10.0), # 더 긴 타임아웃
)
```
### 3.4 N+1 문제와 병렬화
```python
# ❌ N+1 문제 발생 코드
videos = await session.execute(select(Video))
for video in videos.scalars():
# N번의 추가 쿼리 발생!
project = await session.execute(
select(Project).where(Project.id == video.project_id)
)
# ✅ 해결 방법 1: JOIN 사용
query = select(Video).options(selectinload(Video.project))
videos = await session.execute(query)
# ✅ 해결 방법 2: IN 절로 배치 조회
video_list = videos.scalars().all()
project_ids = [v.project_id for v in video_list if v.project_id]
projects_result = await session.execute(
select(Project).where(Project.id.in_(project_ids))
)
projects_map = {p.id: p for p in projects_result.scalars().all()}
```
### 3.5 트랜잭션 격리 수준 고려
| 격리 수준 | 병렬 쿼리 안전성 | 설명 |
|-----------|------------------|------|
| READ UNCOMMITTED | ⚠️ 주의 | Dirty Read 가능 |
| READ COMMITTED | ✅ 안전 | 대부분의 경우 적합 |
| REPEATABLE READ | ✅ 안전 | 일관된 스냅샷 |
| SERIALIZABLE | ✅ 안전 | 성능 저하 가능 |
---
## 4. 실무 시나리오 예제
### 4.1 시나리오 1: 대시보드 데이터 조회
**요구사항**: 사용자 대시보드에 필요한 여러 통계 데이터를 한 번에 조회
```python
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
async def get_dashboard_data(
session: AsyncSession,
user_id: int,
) -> dict:
"""
대시보드에 필요한 모든 데이터를 병렬로 조회합니다.
조회 항목:
- 사용자 정보
- 최근 주문 5개
- 총 주문 금액
- 찜한 상품 수
"""
# 1. 쿼리 정의 (아직 실행하지 않음)
user_query = select(User).where(User.id == user_id)
recent_orders_query = (
select(Order)
.where(Order.user_id == user_id)
.order_by(Order.created_at.desc())
.limit(5)
)
total_amount_query = (
select(func.sum(Order.amount))
.where(Order.user_id == user_id)
)
wishlist_count_query = (
select(func.count(Wishlist.id))
.where(Wishlist.user_id == user_id)
)
# 2. 4개 쿼리를 병렬로 실행
user_result, orders_result, amount_result, wishlist_result = (
await asyncio.gather(
session.execute(user_query),
session.execute(recent_orders_query),
session.execute(total_amount_query),
session.execute(wishlist_count_query),
)
)
# 3. 결과 처리
user = user_result.scalar_one_or_none()
if not user:
raise ValueError(f"User {user_id} not found")
return {
"user": {
"id": user.id,
"name": user.name,
"email": user.email,
},
"recent_orders": [
{"id": o.id, "amount": o.amount, "status": o.status}
for o in orders_result.scalars().all()
],
"total_spent": amount_result.scalar() or 0,
"wishlist_count": wishlist_result.scalar() or 0,
}
# 사용 예시 (FastAPI)
@router.get("/dashboard")
async def dashboard(
user_id: int,
session: AsyncSession = Depends(get_session),
):
return await get_dashboard_data(session, user_id)
```
**성능 비교:**
- 순차 실행: ~200ms (50ms × 4)
- 병렬 실행: ~60ms (가장 느린 쿼리 기준)
- **개선율: 약 70%**
---
### 4.2 시나리오 2: 복합 검색 결과 조회
**요구사항**: 검색 결과와 함께 필터 옵션(카테고리 수, 가격 범위 등)을 조회
```python
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
from typing import NamedTuple
class SearchFilters(NamedTuple):
"""검색 필터 결과"""
categories: list[dict]
price_range: dict
brands: list[dict]
class SearchResult(NamedTuple):
"""전체 검색 결과"""
items: list
total_count: int
filters: SearchFilters
async def search_products_with_filters(
session: AsyncSession,
keyword: str,
page: int = 1,
page_size: int = 20,
) -> SearchResult:
"""
상품 검색과 필터 옵션을 병렬로 조회합니다.
병렬 실행 쿼리:
1. 상품 목록 (페이지네이션)
2. 전체 개수
3. 카테고리별 개수
4. 가격 범위 (min, max)
5. 브랜드별 개수
"""
# 기본 검색 조건
base_condition = Product.name.ilike(f"%{keyword}%")
# 쿼리 정의
items_query = (
select(Product)
.where(base_condition)
.order_by(Product.created_at.desc())
.offset((page - 1) * page_size)
.limit(page_size)
)
count_query = (
select(func.count(Product.id))
.where(base_condition)
)
category_stats_query = (
select(
Product.category_id,
Category.name.label("category_name"),
func.count(Product.id).label("count")
)
.join(Category, Product.category_id == Category.id)
.where(base_condition)
.group_by(Product.category_id, Category.name)
)
price_range_query = (
select(
func.min(Product.price).label("min_price"),
func.max(Product.price).label("max_price"),
)
.where(base_condition)
)
brand_stats_query = (
select(
Product.brand,
func.count(Product.id).label("count")
)
.where(and_(base_condition, Product.brand.isnot(None)))
.group_by(Product.brand)
.order_by(func.count(Product.id).desc())
.limit(10)
)
# 5개 쿼리 병렬 실행
(
items_result,
count_result,
category_result,
price_result,
brand_result,
) = await asyncio.gather(
session.execute(items_query),
session.execute(count_query),
session.execute(category_stats_query),
session.execute(price_range_query),
session.execute(brand_stats_query),
)
# 결과 처리
items = items_result.scalars().all()
total_count = count_result.scalar() or 0
categories = [
{"id": row.category_id, "name": row.category_name, "count": row.count}
for row in category_result.all()
]
price_row = price_result.one()
price_range = {
"min": float(price_row.min_price or 0),
"max": float(price_row.max_price or 0),
}
brands = [
{"name": row.brand, "count": row.count}
for row in brand_result.all()
]
return SearchResult(
items=items,
total_count=total_count,
filters=SearchFilters(
categories=categories,
price_range=price_range,
brands=brands,
),
)
# 사용 예시 (FastAPI)
@router.get("/search")
async def search(
keyword: str,
page: int = 1,
session: AsyncSession = Depends(get_session),
):
result = await search_products_with_filters(session, keyword, page)
return {
"items": [item.to_dict() for item in result.items],
"total_count": result.total_count,
"filters": {
"categories": result.filters.categories,
"price_range": result.filters.price_range,
"brands": result.filters.brands,
},
}
```
**성능 비교:**
- 순차 실행: ~350ms (70ms × 5)
- 병렬 실행: ~80ms
- **개선율: 약 77%**
---
### 4.3 시나리오 3: 다중 테이블 데이터 수집 (본 프로젝트 실제 적용 예)
**요구사항**: 영상 생성을 위해 Project, Lyric, Song, Image 데이터를 한 번에 조회
```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
from dataclasses import dataclass
from fastapi import HTTPException
@dataclass
class VideoGenerationData:
"""영상 생성에 필요한 데이터"""
project_id: int
lyric_id: int
song_id: int
music_url: str
song_duration: float
lyrics: str
image_urls: list[str]
async def fetch_video_generation_data(
session: AsyncSession,
task_id: str,
) -> VideoGenerationData:
"""
영상 생성에 필요한 모든 데이터를 병렬로 조회합니다.
이 함수는 4개의 독립적인 테이블을 조회합니다:
- Project: 프로젝트 정보
- Lyric: 가사 정보
- Song: 노래 정보 (음악 URL, 길이, 가사)
- Image: 이미지 목록
각 테이블은 task_id로 연결되어 있으며, 서로 의존성이 없으므로
병렬 조회가 가능합니다.
"""
# ============================================================
# Step 1: 쿼리 객체 생성 (아직 실행하지 않음)
# ============================================================
project_query = (
select(Project)
.where(Project.task_id == task_id)
.order_by(Project.created_at.desc())
.limit(1)
)
lyric_query = (
select(Lyric)
.where(Lyric.task_id == task_id)
.order_by(Lyric.created_at.desc())
.limit(1)
)
song_query = (
select(Song)
.where(Song.task_id == task_id)
.order_by(Song.created_at.desc())
.limit(1)
)
image_query = (
select(Image)
.where(Image.task_id == task_id)
.order_by(Image.img_order.asc())
)
# ============================================================
# Step 2: asyncio.gather()로 4개 쿼리 병렬 실행
# ============================================================
#
# 병렬 실행의 핵심:
# - 각 쿼리는 독립적 (서로의 결과에 의존하지 않음)
# - 같은 세션 내에서 실행 (같은 트랜잭션 공유)
# - 가장 느린 쿼리 시간만큼만 소요됨
#
project_result, lyric_result, song_result, image_result = (
await asyncio.gather(
session.execute(project_query),
session.execute(lyric_query),
session.execute(song_query),
session.execute(image_query),
)
)
# ============================================================
# Step 3: 결과 검증 및 데이터 추출
# ============================================================
# Project 검증
project = project_result.scalar_one_or_none()
if not project:
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.",
)
# Lyric 검증
lyric = lyric_result.scalar_one_or_none()
if not lyric:
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.",
)
# Song 검증 및 데이터 추출
song = song_result.scalar_one_or_none()
if not song:
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.",
)
if not song.song_result_url:
raise HTTPException(
status_code=400,
detail=f"Song(id={song.id})의 음악 URL이 없습니다.",
)
if not song.song_prompt:
raise HTTPException(
status_code=400,
detail=f"Song(id={song.id})의 가사(song_prompt)가 없습니다.",
)
# Image 검증
images = image_result.scalars().all()
if not images:
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 이미지를 찾을 수 없습니다.",
)
# ============================================================
# Step 4: 결과 반환
# ============================================================
return VideoGenerationData(
project_id=project.id,
lyric_id=lyric.id,
song_id=song.id,
music_url=song.song_result_url,
song_duration=song.duration or 60.0,
lyrics=song.song_prompt,
image_urls=[img.img_url for img in images],
)
# 실제 사용 예시
async def generate_video(task_id: str) -> dict:
async with AsyncSessionLocal() as session:
# 병렬 쿼리로 데이터 조회
data = await fetch_video_generation_data(session, task_id)
# Video 레코드 생성
video = Video(
project_id=data.project_id,
lyric_id=data.lyric_id,
song_id=data.song_id,
task_id=task_id,
status="processing",
)
session.add(video)
await session.commit()
# 세션 종료 후 외부 API 호출
# (커넥션 타임아웃 방지)
return await call_creatomate_api(data)
```
**성능 비교:**
- 순차 실행: ~200ms (약 50ms × 4쿼리)
- 병렬 실행: ~55ms
- **개선율: 약 72%**
---
## 5. 성능 측정 및 모니터링
### 5.1 실행 시간 측정 데코레이터
```python
import time
import functools
from typing import Callable, TypeVar
T = TypeVar("T")
def measure_time(func: Callable[..., T]) -> Callable[..., T]:
"""함수 실행 시간을 측정하는 데코레이터"""
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
start = time.perf_counter()
try:
return await func(*args, **kwargs)
finally:
elapsed = (time.perf_counter() - start) * 1000
print(f"[{func.__name__}] Execution time: {elapsed:.2f}ms")
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
start = time.perf_counter()
try:
return func(*args, **kwargs)
finally:
elapsed = (time.perf_counter() - start) * 1000
print(f"[{func.__name__}] Execution time: {elapsed:.2f}ms")
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
# 사용 예
@measure_time
async def fetch_data(session, task_id):
...
```
### 5.2 병렬 쿼리 성능 비교 유틸리티
```python
import asyncio
import time
async def compare_sequential_vs_parallel(
session: AsyncSession,
queries: list,
labels: list[str] | None = None,
) -> dict:
"""순차 실행과 병렬 실행의 성능을 비교합니다."""
labels = labels or [f"Query {i}" for i in range(len(queries))]
# 순차 실행
sequential_start = time.perf_counter()
sequential_results = []
for query in queries:
result = await session.execute(query)
sequential_results.append(result)
sequential_time = (time.perf_counter() - sequential_start) * 1000
# 병렬 실행
parallel_start = time.perf_counter()
parallel_results = await asyncio.gather(
*[session.execute(query) for query in queries]
)
parallel_time = (time.perf_counter() - parallel_start) * 1000
improvement = ((sequential_time - parallel_time) / sequential_time) * 100
return {
"sequential_time_ms": round(sequential_time, 2),
"parallel_time_ms": round(parallel_time, 2),
"improvement_percent": round(improvement, 1),
"query_count": len(queries),
}
```
### 5.3 SQLAlchemy 쿼리 로깅
```python
import logging
# SQLAlchemy 쿼리 로깅 활성화
logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
# 또는 엔진 생성 시 echo=True
engine = create_async_engine(url, echo=True)
```
---
## 6. Best Practices
### 6.1 체크리스트
병렬화 적용 전 확인사항:
- [ ] 쿼리들이 서로 독립적인가? (결과 의존성 없음)
- [ ] 모든 쿼리가 READ 작업인가? (또는 순서 무관한 WRITE)
- [ ] 커넥션 풀 크기가 충분한가?
- [ ] 에러 처리 전략이 수립되어 있는가?
- [ ] 타임아웃 설정이 적절한가?
### 6.2 권장 패턴
```python
# ✅ 권장: 쿼리 정의와 실행 분리
async def fetch_data(session: AsyncSession, task_id: str):
# 1. 쿼리 객체 정의 (명확한 의도 표현)
project_query = select(Project).where(Project.task_id == task_id)
song_query = select(Song).where(Song.task_id == task_id)
# 2. 병렬 실행
results = await asyncio.gather(
session.execute(project_query),
session.execute(song_query),
)
# 3. 결과 처리
return process_results(results)
```
### 6.3 피해야 할 패턴
```python
# ❌ 피하기: 인라인 쿼리 (가독성 저하)
results = await asyncio.gather(
session.execute(select(A).where(A.x == y).order_by(A.z.desc()).limit(1)),
session.execute(select(B).where(B.a == b).order_by(B.c.desc()).limit(1)),
)
# ❌ 피하기: 과도한 병렬화 (커넥션 고갈)
# 100개 쿼리를 동시에 실행하면 커넥션 풀 고갈 위험
results = await asyncio.gather(*[session.execute(q) for q in queries])
# ✅ 해결: 배치 처리
BATCH_SIZE = 10
for i in range(0, len(queries), BATCH_SIZE):
batch = queries[i:i + BATCH_SIZE]
results = await asyncio.gather(*[session.execute(q) for q in batch])
```
### 6.4 성능 최적화 팁
1. **인덱스 확인**: 병렬화해도 인덱스 없으면 느림
2. **쿼리 최적화 우선**: 병렬화 전에 개별 쿼리 최적화
3. **적절한 병렬 수준**: 보통 3-10개가 적절
4. **모니터링 필수**: 실제 개선 효과 측정
---
## 부록: 관련 자료
- [SQLAlchemy 2.0 AsyncIO 문서](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html)
- [Python asyncio 공식 문서](https://docs.python.org/3/library/asyncio.html)
- [FastAPI 비동기 데이터베이스](https://fastapi.tiangolo.com/async/)

File diff suppressed because it is too large Load Diff

1488
docs/analysis/refactoring.md Normal file

File diff suppressed because it is too large Load Diff