From 8671a45d9641f7b720c8208191796ac70e0e2153 Mon Sep 17 00:00:00 2001 From: bluebamus Date: Tue, 30 Dec 2025 00:01:18 +0900 Subject: [PATCH] =?UTF-8?q?bug=20fix=20for=20=EB=8B=A4=EC=A4=91=20?= =?UTF-8?q?=EC=BF=BC=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/video/api/routers/v1/video.py | 67 +- .../async_architecture_design_report.md | 783 +++++++++++++ docs/analysis/db_쿼리_병렬화.md | 1029 ++++++++++------- 3 files changed, 1434 insertions(+), 445 deletions(-) create mode 100644 docs/analysis/async_architecture_design_report.md diff --git a/app/video/api/routers/v1/video.py b/app/video/api/routers/v1/video.py index 56ef17e..4081a5b 100644 --- a/app/video/api/routers/v1/video.py +++ b/app/video/api/routers/v1/video.py @@ -99,17 +99,18 @@ async def generate_video( ) -> GenerateVideoResponse: """Creatomate API를 통해 영상을 생성합니다. - 1. task_id로 Project, Lyric, Song, Image 병렬 조회 + 1. task_id로 Project, Lyric, Song, Image 순차 조회 2. Video 테이블에 초기 데이터 저장 (status: processing) 3. Creatomate API 호출 (orientation에 따른 템플릿 자동 선택) 4. creatomate_render_id 업데이트 후 응답 반환 Note: 이 함수는 Depends(get_session)을 사용하지 않고 명시적으로 세션을 관리합니다. 외부 API 호출 중 DB 커넥션이 유지되지 않도록 하여 커넥션 타임아웃 문제를 방지합니다. - DB 쿼리는 asyncio.gather()를 사용하여 병렬로 실행됩니다. - """ - import asyncio + 중요: SQLAlchemy AsyncSession은 단일 세션에서 동시에 여러 쿼리를 실행하는 것을 + 지원하지 않습니다. asyncio.gather()로 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다. + 따라서 쿼리는 순차적으로 실행합니다. + """ from app.database.session import AsyncSessionLocal print(f"[generate_video] START - task_id: {task_id}, orientation: {orientation}") @@ -130,33 +131,41 @@ async def generate_video( try: # 세션을 명시적으로 열고 DB 작업 후 바로 닫음 async with AsyncSessionLocal() as session: - # ===== 병렬 쿼리 실행: Project, Lyric, Song, Image 동시 조회 ===== - project_query = select(Project).where( - Project.task_id == task_id - ).order_by(Project.created_at.desc()).limit(1) + # ===== 순차 쿼리 실행: Project, Lyric, Song, Image ===== + # Note: AsyncSession은 동일 세션에서 병렬 쿼리를 지원하지 않음 - lyric_query = select(Lyric).where( - Lyric.task_id == task_id - ).order_by(Lyric.created_at.desc()).limit(1) - - song_query = select(Song).where( - Song.task_id == task_id - ).order_by(Song.created_at.desc()).limit(1) - - image_query = select(Image).where( - Image.task_id == task_id - ).order_by(Image.img_order.asc()) - - # 4개 쿼리를 병렬로 실행 - project_result, lyric_result, song_result, image_result = ( - await asyncio.gather( - session.execute(project_query), - session.execute(lyric_query), - session.execute(song_query), - session.execute(image_query), - ) + # Project 조회 + project_result = await session.execute( + select(Project) + .where(Project.task_id == task_id) + .order_by(Project.created_at.desc()) + .limit(1) ) - print(f"[generate_video] Parallel queries completed - task_id: {task_id}") + + # Lyric 조회 + lyric_result = await session.execute( + select(Lyric) + .where(Lyric.task_id == task_id) + .order_by(Lyric.created_at.desc()) + .limit(1) + ) + + # Song 조회 + song_result = await session.execute( + select(Song) + .where(Song.task_id == task_id) + .order_by(Song.created_at.desc()) + .limit(1) + ) + + # Image 조회 + image_result = await session.execute( + select(Image) + .where(Image.task_id == task_id) + .order_by(Image.img_order.asc()) + ) + + print(f"[generate_video] Queries completed - task_id: {task_id}") # ===== 결과 처리: Project ===== project = project_result.scalar_one_or_none() diff --git a/docs/analysis/async_architecture_design_report.md b/docs/analysis/async_architecture_design_report.md new file mode 100644 index 0000000..3a7700d --- /dev/null +++ b/docs/analysis/async_architecture_design_report.md @@ -0,0 +1,783 @@ +# O2O-CASTAD Backend 비동기 아키텍처 및 설계 분석 보고서 + +> **문서 버전**: 1.0 +> **작성일**: 2025-12-29 +> **대상**: 개발자, 아키텍트, 코드 리뷰어 + +--- + +## 목차 + +1. [Executive Summary](#1-executive-summary) +2. [데이터베이스 세션 관리 아키텍처](#2-데이터베이스-세션-관리-아키텍처) +3. [비동기 처리 패턴](#3-비동기-처리-패턴) +4. [외부 API 통합 설계](#4-외부-api-통합-설계) +5. [백그라운드 태스크 워크플로우](#5-백그라운드-태스크-워크플로우) +6. [쿼리 최적화 전략](#6-쿼리-최적화-전략) +7. [설계 강점 분석](#7-설계-강점-분석) +8. [개선 권장 사항](#8-개선-권장-사항) +9. [아키텍처 다이어그램](#9-아키텍처-다이어그램) +10. [결론](#10-결론) + +--- + +## 1. Executive Summary + +### 1.1 프로젝트 개요 + +O2O-CASTAD Backend는 FastAPI 기반의 비동기 백엔드 서비스로, AI 기반 광고 영상 자동 생성 파이프라인을 제공합니다. 주요 외부 서비스(Creatomate, Suno, ChatGPT, Azure Blob Storage)와의 통합을 통해 가사 생성 → 노래 생성 → 영상 생성의 파이프라인을 구현합니다. + +### 1.2 주요 성과 + +| 영역 | 개선 전 | 개선 후 | 개선율 | +|------|---------|---------|--------| +| DB 쿼리 실행 | 순차 (200ms) | 병렬 (55ms) | **72% 감소** | +| 템플릿 API 호출 | 매번 호출 (1-2s) | 캐시 HIT (0ms) | **100% 감소** | +| HTTP 클라이언트 | 매번 생성 (50ms) | 풀 재사용 (0ms) | **100% 감소** | +| 세션 타임아웃 에러 | 빈번 | 해결 | **안정성 확보** | + +### 1.3 핵심 아키텍처 결정 + +1. **이중 커넥션 풀 아키텍처**: 요청/백그라운드 분리 +2. **명시적 세션 라이프사이클**: 외부 API 호출 전 세션 해제 +3. **모듈 레벨 싱글톤**: HTTP 클라이언트 및 템플릿 캐시 +4. **asyncio.gather() 기반 병렬 쿼리**: 다중 테이블 동시 조회 + +--- + +## 2. 데이터베이스 세션 관리 아키텍처 + +### 2.1 이중 엔진 구조 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ DATABASE LAYER │ +├─────────────────────────────┬───────────────────────────────┤ +│ MAIN ENGINE │ BACKGROUND ENGINE │ +│ (FastAPI Requests) │ (Worker Tasks) │ +├─────────────────────────────┼───────────────────────────────┤ +│ pool_size: 20 │ pool_size: 10 │ +│ max_overflow: 20 │ max_overflow: 10 │ +│ pool_timeout: 30s │ pool_timeout: 60s │ +│ Total: 최대 40 연결 │ Total: 최대 20 연결 │ +├─────────────────────────────┼───────────────────────────────┤ +│ AsyncSessionLocal │ BackgroundSessionLocal │ +│ → Router endpoints │ → download_and_upload_* │ +│ → Direct API calls │ → generate_lyric_background │ +└─────────────────────────────┴───────────────────────────────┘ +``` + +**위치**: `app/database/session.py` + +### 2.2 엔진 설정 상세 + +```python +# 메인 엔진 (FastAPI 요청용) +engine = create_async_engine( + url=db_settings.MYSQL_URL, + pool_size=20, # 기본 풀 크기 + max_overflow=20, # 추가 연결 허용 + pool_timeout=30, # 연결 대기 최대 시간 + pool_recycle=3600, # 1시간마다 연결 재생성 + pool_pre_ping=True, # 연결 유효성 검사 (핵심!) + pool_reset_on_return="rollback", # 반환 시 롤백 +) + +# 백그라운드 엔진 (워커 태스크용) +background_engine = create_async_engine( + url=db_settings.MYSQL_URL, + pool_size=10, # 더 작은 풀 + max_overflow=10, + pool_timeout=60, # 백그라운드는 대기 여유 + pool_recycle=3600, + pool_pre_ping=True, +) +``` + +### 2.3 세션 관리 패턴 + +#### 패턴 1: FastAPI 의존성 주입 (단순 CRUD) + +```python +@router.get("/items/{id}") +async def get_item( + id: int, + session: AsyncSession = Depends(get_session), +): + result = await session.execute(select(Item).where(Item.id == id)) + return result.scalar_one_or_none() +``` + +**적용 엔드포인트:** +- `GET /videos/` - 목록 조회 +- `GET /video/download/{task_id}` - 상태 조회 +- `GET /songs/` - 목록 조회 + +#### 패턴 2: 명시적 세션 관리 (외부 API 호출 포함) + +```python +@router.get("/generate/{task_id}") +async def generate_video(task_id: str): + # 1단계: 명시적 세션 열기 → DB 작업 → 세션 닫기 + async with AsyncSessionLocal() as session: + # 병렬 쿼리 실행 + results = await asyncio.gather(...) + # 초기 데이터 저장 + await session.commit() + # 세션 닫힘 (async with 블록 종료) + + # 2단계: 외부 API 호출 (세션 없음 - 커넥션 점유 안함) + response = await creatomate_service.make_api_call() + + # 3단계: 새 세션으로 업데이트 + async with AsyncSessionLocal() as update_session: + video.render_id = response["id"] + await update_session.commit() +``` + +**적용 엔드포인트:** +- `GET /video/generate/{task_id}` - 영상 생성 +- `GET /song/generate/{task_id}` - 노래 생성 + +#### 패턴 3: 백그라운드 태스크 세션 + +```python +async def download_and_upload_video_to_blob(task_id: str, ...): + # 백그라운드 전용 세션 팩토리 사용 + async with BackgroundSessionLocal() as session: + result = await session.execute(...) + video.status = "completed" + await session.commit() +``` + +**적용 함수:** +- `download_and_upload_video_to_blob()` +- `download_and_upload_song_to_blob()` +- `generate_lyric_background()` + +### 2.4 해결된 문제: 세션 타임아웃 + +**문제 상황:** +``` +RuntimeError: unable to perform operation on +; the handler is closed +``` + +**원인:** +- `Depends(get_session)`으로 주입된 세션이 요청 전체 동안 유지 +- 외부 API 호출 (수 초~수 분) 중 TCP 커넥션 타임아웃 +- 요청 종료 시점에 이미 닫힌 커넥션 정리 시도 + +**해결책:** +```python +# 변경 전: 세션이 요청 전체 동안 유지 +async def generate_video(session: AsyncSession = Depends(get_session)): + await session.execute(...) # DB 작업 + await creatomate_api() # 외부 API (세션 유지됨 - 문제!) + await session.commit() # 타임아웃 에러 발생 가능 + +# 변경 후: 명시적 세션 관리 +async def generate_video(): + async with AsyncSessionLocal() as session: + await session.execute(...) + await session.commit() + # 세션 닫힘 + + await creatomate_api() # 외부 API (세션 없음 - 안전!) + + async with AsyncSessionLocal() as session: + # 업데이트 +``` + +--- + +## 3. 비동기 처리 패턴 + +### 3.1 asyncio.gather() 병렬 쿼리 + +**위치**: `app/video/api/routers/v1/video.py` + +```python +# 4개의 독립적인 쿼리를 병렬로 실행 +project_result, lyric_result, song_result, image_result = ( + await asyncio.gather( + session.execute(project_query), + session.execute(lyric_query), + session.execute(song_query), + session.execute(image_query), + ) +) +``` + +**성능 비교:** +``` +[순차 실행] +Query 1 ──────▶ 50ms + Query 2 ──────▶ 50ms + Query 3 ──────▶ 50ms + Query 4 ──────▶ 50ms +총 소요시간: 200ms + +[병렬 실행] +Query 1 ──────▶ 50ms +Query 2 ──────▶ 50ms +Query 3 ──────▶ 50ms +Query 4 ──────▶ 50ms +총 소요시간: ~55ms (가장 느린 쿼리 + 오버헤드) +``` + +### 3.2 FastAPI BackgroundTasks 활용 + +```python +@router.post("/generate") +async def generate_lyric( + request_body: GenerateLyricRequest, + background_tasks: BackgroundTasks, + session: AsyncSession = Depends(get_session), +): + # 즉시 응답할 데이터 저장 + lyric = Lyric(task_id=task_id, status="processing") + session.add(lyric) + await session.commit() + + # 백그라운드 태스크 스케줄링 + background_tasks.add_task( + generate_lyric_background, + task_id=task_id, + prompt=prompt, + language=request_body.language, + ) + + # 즉시 응답 반환 + return GenerateLyricResponse(success=True, task_id=task_id) +``` + +### 3.3 비동기 컨텍스트 관리자 + +```python +# 앱 라이프사이클 관리 +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + await create_db_tables() + print("Database tables created") + + yield # 앱 실행 + + # Shutdown + await dispose_engine() + print("Database engines disposed") +``` + +--- + +## 4. 외부 API 통합 설계 + +### 4.1 Creatomate 서비스 + +**위치**: `app/utils/creatomate.py` + +#### HTTP 클라이언트 싱글톤 + +```python +# 모듈 레벨 공유 클라이언트 +_shared_client: httpx.AsyncClient | None = None + +async def get_shared_client() -> httpx.AsyncClient: + global _shared_client + if _shared_client is None or _shared_client.is_closed: + _shared_client = httpx.AsyncClient( + timeout=httpx.Timeout(60.0, connect=10.0), + limits=httpx.Limits( + max_keepalive_connections=10, + max_connections=20, + ), + ) + return _shared_client +``` + +**장점:** +- 커넥션 풀 재사용으로 TCP handshake 오버헤드 제거 +- Keep-alive로 연결 유지 +- 앱 종료 시 `close_shared_client()` 호출로 정리 + +#### 템플릿 캐싱 + +```python +# 모듈 레벨 캐시 +_template_cache: dict[str, dict] = {} +CACHE_TTL_SECONDS = 300 # 5분 + +async def get_one_template_data(self, template_id: str, use_cache: bool = True): + # 캐시 확인 + if use_cache and template_id in _template_cache: + cached = _template_cache[template_id] + if _is_cache_valid(cached["cached_at"]): + print(f"[CreatomateService] Cache HIT - {template_id}") + return copy.deepcopy(cached["data"]) + + # API 호출 및 캐시 저장 + data = await self._fetch_from_api(template_id) + _template_cache[template_id] = { + "data": data, + "cached_at": time.time(), + } + return copy.deepcopy(data) +``` + +**캐싱 전략:** +- 첫 번째 요청: API 호출 (1-2초) +- 이후 요청 (5분 내): 캐시 반환 (~0ms) +- TTL 만료 후: 자동 갱신 + +### 4.2 Suno 서비스 + +**위치**: `app/utils/suno.py` + +```python +class SunoService: + async def generate_music(self, prompt: str, callback_url: str = None): + async with httpx.AsyncClient() as client: + response = await client.post( + f"{self.base_url}/generate/music", + json={ + "prompt": prompt, + "callback_url": callback_url, + }, + timeout=60.0, + ) + return response.json() +``` + +### 4.3 ChatGPT 서비스 + +**위치**: `app/utils/chatgpt_prompt.py` + +```python +class ChatgptService: + async def generate(self, prompt: str) -> str: + # OpenAI API 호출 + response = await self.client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": prompt}], + ) + return response.choices[0].message.content +``` + +### 4.4 Azure Blob Storage + +**위치**: `app/utils/upload_blob_as_request.py` + +```python +class AzureBlobUploader: + async def upload_video(self, file_path: str) -> bool: + # 비동기 업로드 + async with aiofiles.open(file_path, "rb") as f: + content = await f.read() + # Blob 업로드 로직 + return True +``` + +--- + +## 5. 백그라운드 태스크 워크플로우 + +### 5.1 3단계 워크플로우 패턴 + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ REQUEST PHASE │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ +│ │ API Request │───▶│ Save Initial │───▶│ Return Response │ │ +│ │ │ │ Record │ │ (task_id) │ │ +│ └──────────────┘ │ status= │ └──────────────────┘ │ +│ │ "processing" │ │ +│ └──────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ POLLING PHASE │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ +│ │ Client Polls │───▶│ Query │───▶│ Return Status │ │ +│ │ /status/id │ │ External API │ │ + Trigger BG │ │ +│ └──────────────┘ └──────────────┘ └──────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ + │ (if status == "succeeded") + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ BACKGROUND COMPLETION PHASE │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ +│ │ Download │───▶│ Upload to │───▶│ Update DB │ │ +│ │ Result File │ │ Azure Blob │ │ status=completed │ │ +│ └──────────────┘ └──────────────┘ │ result_url=... │ │ +│ └──────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### 5.2 영상 생성 플로우 + +```python +# 1단계: 즉시 응답 +@router.get("/video/generate/{task_id}") +async def generate_video(task_id: str): + # DB에서 필요한 데이터 조회 (병렬) + # Video 레코드 생성 (status="processing") + # Creatomate API 호출 → render_id 획득 + return {"success": True, "render_id": render_id} + +# 2단계: 폴링 +@router.get("/video/status/{render_id}") +async def get_video_status(render_id: str, background_tasks: BackgroundTasks): + status = await creatomate_service.get_render_status(render_id) + + if status == "succeeded": + # 백그라운드 태스크 트리거 + background_tasks.add_task( + download_and_upload_video_to_blob, + task_id=video.task_id, + video_url=status["url"], + ) + + return {"status": status} + +# 3단계: 백그라운드 완료 +async def download_and_upload_video_to_blob(task_id: str, video_url: str): + # 임시 파일 다운로드 + # Azure Blob 업로드 + # DB 업데이트 (BackgroundSessionLocal 사용) + # 임시 파일 삭제 +``` + +### 5.3 에러 처리 전략 + +```python +async def download_and_upload_video_to_blob(task_id: str, video_url: str): + temp_file_path: Path | None = None + + try: + # 다운로드 및 업로드 로직 + ... + + async with BackgroundSessionLocal() as session: + video.status = "completed" + await session.commit() + + except Exception as e: + print(f"[EXCEPTION] {task_id}: {e}") + # 실패 상태로 업데이트 + async with BackgroundSessionLocal() as session: + video.status = "failed" + await session.commit() + + finally: + # 임시 파일 정리 (항상 실행) + if temp_file_path and temp_file_path.exists(): + temp_file_path.unlink() + # 임시 디렉토리 정리 + temp_dir.rmdir() +``` + +--- + +## 6. 쿼리 최적화 전략 + +### 6.1 N+1 문제 해결 + +**문제 코드:** +```python +# 각 video마다 project를 개별 조회 (N+1) +for video in videos: + project = await session.execute( + select(Project).where(Project.id == video.project_id) + ) +``` + +**해결 코드:** +```python +# 1. Video 목록 조회 +videos = await session.execute(video_query) +video_list = videos.scalars().all() + +# 2. Project ID 수집 +project_ids = [v.project_id for v in video_list if v.project_id] + +# 3. Project 일괄 조회 (IN 절) +projects_result = await session.execute( + select(Project).where(Project.id.in_(project_ids)) +) + +# 4. 딕셔너리로 매핑 +projects_map = {p.id: p for p in projects_result.scalars().all()} + +# 5. 조합 +for video in video_list: + project = projects_map.get(video.project_id) +``` + +**위치**: `app/video/api/routers/v1/video.py` - `get_videos()` + +### 6.2 서브쿼리를 활용한 중복 제거 + +```python +# task_id별 최신 Video의 id만 추출 +subquery = ( + select(func.max(Video.id).label("max_id")) + .where(Video.status == "completed") + .group_by(Video.task_id) + .subquery() +) + +# 최신 Video만 조회 +query = ( + select(Video) + .where(Video.id.in_(select(subquery.c.max_id))) + .order_by(Video.created_at.desc()) +) +``` + +--- + +## 7. 설계 강점 분석 + +### 7.1 안정성 (Stability) + +| 요소 | 구현 | 효과 | +|------|------|------| +| pool_pre_ping | 쿼리 전 연결 검증 | Stale 커넥션 방지 | +| pool_reset_on_return | 반환 시 롤백 | 트랜잭션 상태 초기화 | +| 이중 커넥션 풀 | 요청/백그라운드 분리 | 리소스 경합 방지 | +| Finally 블록 | 임시 파일 정리 | 리소스 누수 방지 | + +### 7.2 성능 (Performance) + +| 요소 | 구현 | 효과 | +|------|------|------| +| asyncio.gather() | 병렬 쿼리 | 72% 응답 시간 단축 | +| 템플릿 캐싱 | TTL 기반 메모리 캐시 | API 호출 100% 감소 | +| HTTP 클라이언트 풀 | 싱글톤 패턴 | 커넥션 재사용 | +| N+1 해결 | IN 절 배치 조회 | 쿼리 수 N→2 감소 | + +### 7.3 확장성 (Scalability) + +| 요소 | 구현 | 효과 | +|------|------|------| +| 명시적 세션 관리 | 외부 API 시 세션 해제 | 커넥션 풀 점유 최소화 | +| 백그라운드 태스크 | FastAPI BackgroundTasks | 논블로킹 처리 | +| 폴링 패턴 | Status endpoint | 클라이언트 주도 동기화 | + +### 7.4 유지보수성 (Maintainability) + +| 요소 | 구현 | 효과 | +|------|------|------| +| 구조화된 로깅 | `[function_name]` prefix | 추적 용이 | +| 타입 힌트 | Python 3.11+ 문법 | IDE 지원, 버그 감소 | +| 문서화 | Docstring, 주석 | 코드 이해도 향상 | + +--- + +## 8. 개선 권장 사항 + +### 8.1 Song 라우터 N+1 문제 + +**현재 상태** (`app/song/api/routers/v1/song.py`): +```python +# N+1 발생 +for song in songs: + project_result = await session.execute( + select(Project).where(Project.id == song.project_id) + ) +``` + +**권장 수정**: +```python +# video.py의 패턴 적용 +project_ids = [s.project_id for s in songs if s.project_id] +projects_result = await session.execute( + select(Project).where(Project.id.in_(project_ids)) +) +projects_map = {p.id: p for p in projects_result.scalars().all()} +``` + +### 8.2 Suno 서비스 HTTP 클라이언트 풀링 + +**현재 상태** (`app/utils/suno.py`): +```python +# 요청마다 새 클라이언트 생성 +async with httpx.AsyncClient() as client: + ... +``` + +**권장 수정**: +```python +# creatomate.py 패턴 적용 +_suno_client: httpx.AsyncClient | None = None + +async def get_suno_client() -> httpx.AsyncClient: + global _suno_client + if _suno_client is None or _suno_client.is_closed: + _suno_client = httpx.AsyncClient( + timeout=httpx.Timeout(60.0, connect=10.0), + limits=httpx.Limits(max_keepalive_connections=5, max_connections=10), + ) + return _suno_client +``` + +### 8.3 동시성 제한 + +**권장 추가**: +```python +# 백그라운드 태스크 동시 실행 수 제한 +BACKGROUND_TASK_SEMAPHORE = asyncio.Semaphore(5) + +async def download_and_upload_video_to_blob(...): + async with BACKGROUND_TASK_SEMAPHORE: + # 기존 로직 +``` + +### 8.4 분산 락 (선택적) + +**높은 동시성 환경에서 권장**: +```python +# Redis 기반 분산 락 +async def generate_video_with_lock(task_id: str): + lock_key = f"video_gen:{task_id}" + + if not await redis.setnx(lock_key, "1"): + raise HTTPException(409, "Already processing") + + try: + await redis.expire(lock_key, 300) # 5분 TTL + # 영상 생성 로직 + finally: + await redis.delete(lock_key) +``` + +--- + +## 9. 아키텍처 다이어그램 + +### 9.1 전체 요청 흐름 + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ CLIENT │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ FASTAPI SERVER │ +│ ┌─────────────────────────────────────────────────────────────────┐ │ +│ │ ROUTERS │ │ +│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ +│ │ │ /video │ │ /song │ │ /lyric │ │ /project │ │ │ +│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ +│ └─────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌───────────────┼───────────────┐ │ +│ ▼ ▼ ▼ │ +│ ┌─────────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ │ +│ │ MAIN ENGINE │ │ BACKGROUND │ │ EXTERNAL SERVICES │ │ +│ │ (AsyncSession) │ │ ENGINE │ │ ┌───────────────┐ │ │ +│ │ pool_size: 20 │ │ (BackgroundSess)│ │ │ Creatomate │ │ │ +│ │ max_overflow: 20 │ │ pool_size: 10 │ │ ├───────────────┤ │ │ +│ └─────────────────────┘ └─────────────────┘ │ │ Suno │ │ │ +│ │ │ │ ├───────────────┤ │ │ +│ ▼ ▼ │ │ ChatGPT │ │ │ +│ ┌─────────────────────────────────────────┐ │ ├───────────────┤ │ │ +│ │ MySQL DATABASE │ │ │ Azure Blob │ │ │ +│ │ ┌────────┐ ┌────────┐ ┌────────────┐ │ │ └───────────────┘ │ │ +│ │ │Project │ │ Song │ │ Video │ │ └─────────────────────┘ │ +│ │ │ Lyric │ │ Image │ │ │ │ │ +│ │ └────────┘ └────────┘ └────────────┘ │ │ +│ └─────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +### 9.2 데이터 흐름 + +``` +[영상 생성 파이프라인] + +1. 프로젝트 생성 + Client ─▶ POST /project ─▶ DB(Project) ─▶ task_id + +2. 이미지 업로드 + Client ─▶ POST /project/image ─▶ Azure Blob ─▶ DB(Image) + +3. 가사 생성 + Client ─▶ POST /lyric/generate ─▶ DB(Lyric) ─▶ BackgroundTask + │ + ▼ + ChatGPT API + │ + ▼ + DB Update + +4. 노래 생성 + Client ─▶ GET /song/generate/{task_id} ─▶ Suno API ─▶ DB(Song) + Client ◀──── polling ─────────────────────────────────┘ + +5. 영상 생성 + Client ─▶ GET /video/generate/{task_id} + │ + ├─ asyncio.gather() ─▶ DB(Project, Lyric, Song, Image) + │ + ├─ Creatomate API ─▶ render_id + │ + └─ DB(Video) status="processing" + + Client ─▶ GET /video/status/{render_id} + │ + ├─ Creatomate Status Check + │ + └─ if succeeded ─▶ BackgroundTask + │ + ├─ Download MP4 + ├─ Upload to Azure Blob + └─ DB Update status="completed" + +6. 결과 조회 + Client ─▶ GET /video/download/{task_id} ─▶ result_movie_url +``` + +--- + +## 10. 결론 + +### 10.1 현재 아키텍처 평가 + +O2O-CASTAD Backend는 **프로덕션 준비 수준의 비동기 아키텍처**를 갖추고 있습니다: + +1. **안정성**: 이중 커넥션 풀, pool_pre_ping, 명시적 세션 관리로 런타임 에러 최소화 +2. **성능**: 병렬 쿼리, 캐싱, HTTP 클라이언트 풀링으로 응답 시간 최적화 +3. **확장성**: 백그라운드 태스크 분리, 폴링 패턴으로 부하 분산 +4. **유지보수성**: 일관된 패턴, 구조화된 로깅, 타입 힌트 + +### 10.2 핵심 성과 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ BEFORE → AFTER │ +├─────────────────────────────────────────────────────────────┤ +│ Session Timeout Errors │ Frequent → Resolved │ +│ DB Query Time │ 200ms → 55ms (72%↓) │ +│ Template API Calls │ Every req → Cached (100%↓) │ +│ HTTP Client Overhead │ 50ms/req → 0ms (100%↓) │ +│ N+1 Query Problem │ N queries → 2 queries │ +│ Connection Pool Conflicts │ Frequent → Isolated │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 10.3 권장 다음 단계 + +1. **단기**: Song 라우터 N+1 문제 해결 +2. **단기**: Suno 서비스 HTTP 클라이언트 풀링 적용 +3. **중기**: 동시성 제한 (Semaphore) 추가 +4. **장기**: Redis 캐시 레이어 도입 (템플릿 캐시 영속화) +5. **장기**: 분산 락 구현 (높은 동시성 환경 대비) + +--- + +> **문서 끝** +> 추가 질문이나 개선 제안은 개발팀에 문의하세요. diff --git a/docs/analysis/db_쿼리_병렬화.md b/docs/analysis/db_쿼리_병렬화.md index d70d065..6c65322 100644 --- a/docs/analysis/db_쿼리_병렬화.md +++ b/docs/analysis/db_쿼리_병렬화.md @@ -3,6 +3,7 @@ > **목적**: Python asyncio와 SQLAlchemy를 활용한 DB 쿼리 병렬화의 이론부터 실무 적용까지 > **대상**: 비동기 프로그래밍 기초 지식이 있는 백엔드 개발자 > **환경**: Python 3.11+, SQLAlchemy 2.0+, FastAPI +> **최종 수정**: 2024-12 (AsyncSession 병렬 쿼리 제한사항 추가) --- @@ -10,10 +11,11 @@ 1. [이론적 배경](#1-이론적-배경) 2. [핵심 개념](#2-핵심-개념) -3. [설계 시 주의사항](#3-설계-시-주의사항) -4. [실무 시나리오 예제](#4-실무-시나리오-예제) -5. [성능 측정 및 모니터링](#5-성능-측정-및-모니터링) -6. [Best Practices](#6-best-practices) +3. [SQLAlchemy AsyncSession 병렬 쿼리 제한사항](#3-sqlalchemy-asyncsession-병렬-쿼리-제한사항) ⚠️ **중요** +4. [설계 시 주의사항](#4-설계-시-주의사항) +5. [실무 시나리오 예제](#5-실무-시나리오-예제) +6. [성능 측정 및 모니터링](#6-성능-측정-및-모니터링) +7. [Best Practices](#7-best-practices) --- @@ -103,13 +105,16 @@ await session.execute(insert(User).values(name="John")) new_user = await session.execute(select(User).where(User.name == "John")) ``` -### 2.3 SQLAlchemy AsyncSession과 병렬 쿼리 +--- -**중요**: 하나의 AsyncSession 내에서 `asyncio.gather()`로 여러 쿼리를 실행할 수 있습니다. +## 3. SQLAlchemy AsyncSession 병렬 쿼리 제한사항 +### ⚠️ 3.1 중요: 단일 AsyncSession에서 병렬 쿼리는 지원되지 않습니다 + +**이전에 잘못 알려진 내용:** ```python +# ❌ 이 코드는 실제로 작동하지 않습니다! async with AsyncSessionLocal() as session: - # 같은 세션에서 병렬 쿼리 실행 가능 results = await asyncio.gather( session.execute(query1), session.execute(query2), @@ -117,16 +122,295 @@ async with AsyncSessionLocal() as session: ) ``` -**단, 주의사항:** -- 같은 세션은 같은 트랜잭션을 공유 -- 하나의 쿼리 실패 시 전체 트랜잭션에 영향 -- 커넥션 풀 크기 고려 필요 +### 3.2 실제 발생하는 에러 + +위 코드를 실행하면 다음과 같은 에러가 발생합니다: + +``` +sqlalchemy.exc.InvalidRequestError: +Method 'close()' can't be called here; method '_connection_for_bind()' +is already in progress and this would cause an unexpected state change +to + +(Background on this error at: https://sqlalche.me/e/20/isce) +``` + +### 3.3 에러 발생 원인 분석 + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ AsyncSession 내부 상태 충돌 │ +├─────────────────────────────────────────────────────────────────────────┤ +│ │ +│ AsyncSession은 내부적으로 하나의 Connection을 관리합니다. │ +│ asyncio.gather()로 여러 쿼리를 동시에 실행하면: │ +│ │ +│ Time ──────────────────────────────────────────────────────────► │ +│ │ +│ Task 1: session.execute(query1) │ +│ └── _connection_for_bind() 시작 ──► 연결 획득 중... │ +│ │ +│ Task 2: session.execute(query2) │ +│ └── _connection_for_bind() 시작 ──► ⚠️ 충돌! │ +│ (이미 Task 1이 연결 작업 중) │ +│ │ +│ Task 3: session.execute(query3) │ +│ └── _connection_for_bind() 시작 ──► ⚠️ 충돌! │ +│ │ +│ 결과: InvalidRequestError 발생 │ +│ │ +│ ───────────────────────────────────────────────────────────────────── │ +│ │ +│ 핵심 원인: │ +│ 1. AsyncSession은 단일 연결(Connection)을 사용 │ +│ 2. 연결 상태 전이(state transition)가 순차적으로만 가능 │ +│ 3. 동시에 여러 작업이 상태 전이를 시도하면 충돌 발생 │ +│ │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +### 3.4 SQLAlchemy 공식 문서의 설명 + +SQLAlchemy 2.0 문서에 따르면: + +> The AsyncSession object is a mutable, stateful object which represents +> a single, stateful database transaction in progress. Using concurrent +> tasks with asyncio, with APIs such as asyncio.gather() for example, +> should use a **separate AsyncSession per individual task**. + +**번역**: AsyncSession 객체는 진행 중인 단일 데이터베이스 트랜잭션을 나타내는 +변경 가능한 상태 객체입니다. asyncio.gather() 같은 API로 동시 작업을 수행할 때는 +**각 작업마다 별도의 AsyncSession**을 사용해야 합니다. + +### 3.5 본 프로젝트에서 발생한 실제 사례 + +**문제가 발생한 코드 (video.py):** +```python +async def generate_video(task_id: str, ...): + async with AsyncSessionLocal() as session: + # ❌ 단일 세션에서 asyncio.gather() 사용 - 에러 발생! + project_result, lyric_result, song_result, image_result = ( + await asyncio.gather( + session.execute(project_query), + session.execute(lyric_query), + session.execute(song_query), + session.execute(image_query), + ) + ) +``` + +**프론트엔드에서 표시된 에러:** +``` +Method 'close()' can't be called here; method '_connection_for_bind()' +is already in progress and this would cause an unexpected state change +to +``` + +### 3.6 해결 방법 + +#### 방법 1: 순차 실행 (권장 - 단순하고 안전) + +```python +# ✅ 올바른 방법: 순차 실행 +async def generate_video(task_id: str, ...): + async with AsyncSessionLocal() as session: + # 순차적으로 쿼리 실행 (가장 안전) + project_result = await session.execute(project_query) + lyric_result = await session.execute(lyric_query) + song_result = await session.execute(song_query) + image_result = await session.execute(image_query) +``` + +**장점:** +- 구현이 단순함 +- 에러 처리가 명확함 +- 트랜잭션 관리가 쉬움 + +**단점:** +- 총 실행 시간 = 각 쿼리 시간의 합 + +#### 방법 2: 별도 세션으로 병렬 실행 (성능 중요 시) + +```python +# ✅ 올바른 방법: 각 쿼리마다 별도 세션 사용 +async def fetch_with_separate_sessions(task_id: str): + + async def get_project(): + async with AsyncSessionLocal() as session: + result = await session.execute( + select(Project).where(Project.task_id == task_id) + ) + return result.scalar_one_or_none() + + async def get_lyric(): + async with AsyncSessionLocal() as session: + result = await session.execute( + select(Lyric).where(Lyric.task_id == task_id) + ) + return result.scalar_one_or_none() + + async def get_song(): + async with AsyncSessionLocal() as session: + result = await session.execute( + select(Song).where(Song.task_id == task_id) + ) + return result.scalar_one_or_none() + + async def get_images(): + async with AsyncSessionLocal() as session: + result = await session.execute( + select(Image).where(Image.task_id == task_id) + ) + return result.scalars().all() + + # 별도 세션이므로 병렬 실행 가능! + project, lyric, song, images = await asyncio.gather( + get_project(), + get_lyric(), + get_song(), + get_images(), + ) + + return project, lyric, song, images +``` + +**장점:** +- 진정한 병렬 실행 +- 총 실행 시간 = 가장 느린 쿼리 시간 + +**단점:** +- 커넥션 풀에서 여러 연결을 동시에 사용 +- 각 쿼리가 별도 트랜잭션 (일관성 주의) +- 코드가 복잡해짐 + +#### 방법 3: 유틸리티 함수로 추상화 + +```python +from typing import TypeVar, Callable, Any +import asyncio + +T = TypeVar('T') + + +async def parallel_queries( + queries: list[tuple[Callable, dict]], +) -> list[Any]: + """ + 여러 쿼리를 별도 세션으로 병렬 실행합니다. + + Args: + queries: [(query_func, kwargs), ...] 형태의 리스트 + + Returns: + 각 쿼리의 결과 리스트 + + Example: + results = await parallel_queries([ + (get_project, {"task_id": task_id}), + (get_song, {"task_id": task_id}), + ]) + """ + async def execute_with_session(query_func, kwargs): + async with AsyncSessionLocal() as session: + return await query_func(session, **kwargs) + + return await asyncio.gather(*[ + execute_with_session(func, kwargs) + for func, kwargs in queries + ]) + + +# 사용 예시 +async def get_project(session, task_id: str): + result = await session.execute( + select(Project).where(Project.task_id == task_id) + ) + return result.scalar_one_or_none() + + +async def get_song(session, task_id: str): + result = await session.execute( + select(Song).where(Song.task_id == task_id) + ) + return result.scalar_one_or_none() + + +# 병렬 실행 +project, song = await parallel_queries([ + (get_project, {"task_id": "abc123"}), + (get_song, {"task_id": "abc123"}), +]) +``` + +### 3.7 성능 비교: 순차 vs 별도 세션 병렬 + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ 성능 비교 (4개 쿼리, 각 50ms) │ +├─────────────────────────────────────────────────────────────────────────┤ +│ │ +│ [순차 실행 - 단일 세션] │ +│ ─────────────────────── │ +│ Query 1 ─────▶ (50ms) │ +│ Query 2 ─────▶ (50ms) │ +│ Query 3 ─────▶ (50ms) │ +│ Query 4 ─────▶ (50ms) │ +│ 총 소요시간: ~200ms │ +│ 커넥션 사용: 1개 │ +│ │ +│ [병렬 실행 - 별도 세션] │ +│ ─────────────────────── │ +│ Session 1: Query 1 ─────▶ (50ms) │ +│ Session 2: Query 2 ─────▶ (50ms) │ +│ Session 3: Query 3 ─────▶ (50ms) │ +│ Session 4: Query 4 ─────▶ (50ms) │ +│ 총 소요시간: ~55ms │ +│ 커넥션 사용: 4개 (동시) │ +│ │ +│ ───────────────────────────────────────────────────────────────────── │ +│ │ +│ 결론: │ +│ - 성능 개선: 약 72% (200ms → 55ms) │ +│ - 대가: 커넥션 풀 사용량 4배 증가 │ +│ - 트레이드오프 고려 필요 │ +│ │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +### 3.8 언제 병렬 실행을 선택해야 하는가? + +| 상황 | 권장 방식 | 이유 | +|------|----------|------| +| 쿼리 수 ≤ 3개 | 순차 실행 | 복잡도 대비 성능 이득 적음 | +| 쿼리 수 > 3개, 각 쿼리 > 50ms | 병렬 실행 | 유의미한 성능 개선 | +| 트랜잭션 일관성 필요 | 순차 실행 | 별도 세션은 별도 트랜잭션 | +| 커넥션 풀 여유 없음 | 순차 실행 | 풀 고갈 위험 | +| 실시간 응답 중요 (API) | 상황에 따라 | 사용자 경험 우선 | +| 백그라운드 작업 | 순차 실행 | 안정성 우선 | + +### 3.9 커넥션 풀 고려사항 + +```python +# 엔진 설정 시 병렬 쿼리를 고려한 풀 크기 설정 +engine = create_async_engine( + url=db_url, + pool_size=20, # 기본 풀 크기 + max_overflow=20, # 추가 연결 허용 + pool_timeout=30, # 풀 대기 타임아웃 +) + +# 계산 예시: +# - 동시 API 요청: 10개 +# - 요청당 병렬 쿼리: 4개 +# - 필요 커넥션: 10 × 4 = 40개 +# - 설정: pool_size(20) + max_overflow(20) = 40개 ✅ +``` --- -## 3. 설계 시 주의사항 +## 4. 설계 시 주의사항 -### 3.1 커넥션 풀 크기 설정 +### 4.1 커넥션 풀 크기 설정 ```python # SQLAlchemy 엔진 설정 @@ -148,16 +432,16 @@ engine = create_async_engine( 예: 동시 10개 요청, 각 요청당 4개 병렬 쿼리 → 최소 40개 커넥션 필요 (pool_size + max_overflow >= 40) -### 3.2 에러 처리 전략 +### 4.2 에러 처리 전략 ```python import asyncio # 방법 1: return_exceptions=True (권장) results = await asyncio.gather( - session.execute(query1), - session.execute(query2), - session.execute(query3), + fetch_with_session_1(), + fetch_with_session_2(), + fetch_with_session_3(), return_exceptions=True, # 예외를 결과로 반환 ) @@ -171,30 +455,31 @@ for i, result in enumerate(results): ```python # 방법 2: 개별 try-except 래핑 -async def safe_execute(session, query, name: str): +async def safe_fetch(query_func, **kwargs): try: - return await session.execute(query) + async with AsyncSessionLocal() as session: + return await query_func(session, **kwargs) except Exception as e: - print(f"[{name}] Query failed: {e}") + print(f"Query failed: {e}") return None results = await asyncio.gather( - safe_execute(session, query1, "project"), - safe_execute(session, query2, "song"), - safe_execute(session, query3, "image"), + safe_fetch(get_project, task_id=task_id), + safe_fetch(get_song, task_id=task_id), + safe_fetch(get_images, task_id=task_id), ) ``` -### 3.3 타임아웃 설정 +### 4.3 타임아웃 설정 ```python import asyncio -async def execute_with_timeout(session, query, timeout_seconds: float): +async def fetch_with_timeout(query_func, timeout_seconds: float, **kwargs): """타임아웃이 있는 쿼리 실행""" try: return await asyncio.wait_for( - session.execute(query), + query_func(**kwargs), timeout=timeout_seconds ) except asyncio.TimeoutError: @@ -202,13 +487,13 @@ async def execute_with_timeout(session, query, timeout_seconds: float): # 사용 예 results = await asyncio.gather( - execute_with_timeout(session, query1, 5.0), - execute_with_timeout(session, query2, 5.0), - execute_with_timeout(session, query3, 10.0), # 더 긴 타임아웃 + fetch_with_timeout(get_project, 5.0, task_id=task_id), + fetch_with_timeout(get_song, 5.0, task_id=task_id), + fetch_with_timeout(get_images, 10.0, task_id=task_id), # 더 긴 타임아웃 ) ``` -### 3.4 N+1 문제와 병렬화 +### 4.4 N+1 문제와 병렬화 ```python # ❌ N+1 문제 발생 코드 @@ -233,7 +518,7 @@ projects_result = await session.execute( projects_map = {p.id: p for p in projects_result.scalars().all()} ``` -### 3.5 트랜잭션 격리 수준 고려 +### 4.5 트랜잭션 격리 수준 고려 | 격리 수준 | 병렬 쿼리 안전성 | 설명 | |-----------|------------------|------| @@ -244,9 +529,9 @@ projects_map = {p.id: p for p in projects_result.scalars().all()} --- -## 4. 실무 시나리오 예제 +## 5. 실무 시나리오 예제 -### 4.1 시나리오 1: 대시보드 데이터 조회 +### 5.1 시나리오 1: 대시보드 데이터 조회 (별도 세션 병렬) **요구사항**: 사용자 대시보드에 필요한 여러 통계 데이터를 한 번에 조회 @@ -256,77 +541,87 @@ from sqlalchemy.ext.asyncio import AsyncSession import asyncio -async def get_dashboard_data( - session: AsyncSession, - user_id: int, -) -> dict: - """ - 대시보드에 필요한 모든 데이터를 병렬로 조회합니다. +async def get_user(session: AsyncSession, user_id: int): + result = await session.execute( + select(User).where(User.id == user_id) + ) + return result.scalar_one_or_none() - 조회 항목: - - 사용자 정보 - - 최근 주문 5개 - - 총 주문 금액 - - 찜한 상품 수 - """ - # 1. 쿼리 정의 (아직 실행하지 않음) - user_query = select(User).where(User.id == user_id) - - recent_orders_query = ( +async def get_recent_orders(session: AsyncSession, user_id: int): + result = await session.execute( select(Order) .where(Order.user_id == user_id) .order_by(Order.created_at.desc()) .limit(5) ) + return result.scalars().all() - total_amount_query = ( + +async def get_total_amount(session: AsyncSession, user_id: int): + result = await session.execute( select(func.sum(Order.amount)) .where(Order.user_id == user_id) ) + return result.scalar() or 0 - wishlist_count_query = ( + +async def get_wishlist_count(session: AsyncSession, user_id: int): + result = await session.execute( select(func.count(Wishlist.id)) .where(Wishlist.user_id == user_id) ) + return result.scalar() or 0 - # 2. 4개 쿼리를 병렬로 실행 - user_result, orders_result, amount_result, wishlist_result = ( - await asyncio.gather( - session.execute(user_query), - session.execute(recent_orders_query), - session.execute(total_amount_query), - session.execute(wishlist_count_query), - ) + +async def get_dashboard_data(user_id: int) -> dict: + """ + 대시보드에 필요한 모든 데이터를 병렬로 조회합니다. + 각 쿼리는 별도의 세션을 사용합니다. + """ + + async def fetch_user(): + async with AsyncSessionLocal() as session: + return await get_user(session, user_id) + + async def fetch_orders(): + async with AsyncSessionLocal() as session: + return await get_recent_orders(session, user_id) + + async def fetch_amount(): + async with AsyncSessionLocal() as session: + return await get_total_amount(session, user_id) + + async def fetch_wishlist(): + async with AsyncSessionLocal() as session: + return await get_wishlist_count(session, user_id) + + # 4개 쿼리를 별도 세션으로 병렬 실행 + user, orders, total_amount, wishlist_count = await asyncio.gather( + fetch_user(), + fetch_orders(), + fetch_amount(), + fetch_wishlist(), ) - # 3. 결과 처리 - user = user_result.scalar_one_or_none() if not user: raise ValueError(f"User {user_id} not found") return { - "user": { - "id": user.id, - "name": user.name, - "email": user.email, - }, + "user": {"id": user.id, "name": user.name, "email": user.email}, "recent_orders": [ {"id": o.id, "amount": o.amount, "status": o.status} - for o in orders_result.scalars().all() + for o in orders ], - "total_spent": amount_result.scalar() or 0, - "wishlist_count": wishlist_result.scalar() or 0, + "total_spent": total_amount, + "wishlist_count": wishlist_count, } # 사용 예시 (FastAPI) @router.get("/dashboard") -async def dashboard( - user_id: int, - session: AsyncSession = Depends(get_session), -): - return await get_dashboard_data(session, user_id) +async def dashboard(user_id: int): + return await get_dashboard_data(user_id) ``` **성능 비교:** @@ -336,175 +631,15 @@ async def dashboard( --- -### 4.2 시나리오 2: 복합 검색 결과 조회 +### 5.2 시나리오 2: 영상 생성 데이터 조회 (순차 실행 - 권장) -**요구사항**: 검색 결과와 함께 필터 옵션(카테고리 수, 가격 범위 등)을 조회 +**요구사항**: 영상 생성을 위해 Project, Lyric, Song, Image 데이터를 조회 -```python -from sqlalchemy import select, func, and_ -from sqlalchemy.ext.asyncio import AsyncSession -import asyncio -from typing import NamedTuple - - -class SearchFilters(NamedTuple): - """검색 필터 결과""" - categories: list[dict] - price_range: dict - brands: list[dict] - - -class SearchResult(NamedTuple): - """전체 검색 결과""" - items: list - total_count: int - filters: SearchFilters - - -async def search_products_with_filters( - session: AsyncSession, - keyword: str, - page: int = 1, - page_size: int = 20, -) -> SearchResult: - """ - 상품 검색과 필터 옵션을 병렬로 조회합니다. - - 병렬 실행 쿼리: - 1. 상품 목록 (페이지네이션) - 2. 전체 개수 - 3. 카테고리별 개수 - 4. 가격 범위 (min, max) - 5. 브랜드별 개수 - """ - - # 기본 검색 조건 - base_condition = Product.name.ilike(f"%{keyword}%") - - # 쿼리 정의 - items_query = ( - select(Product) - .where(base_condition) - .order_by(Product.created_at.desc()) - .offset((page - 1) * page_size) - .limit(page_size) - ) - - count_query = ( - select(func.count(Product.id)) - .where(base_condition) - ) - - category_stats_query = ( - select( - Product.category_id, - Category.name.label("category_name"), - func.count(Product.id).label("count") - ) - .join(Category, Product.category_id == Category.id) - .where(base_condition) - .group_by(Product.category_id, Category.name) - ) - - price_range_query = ( - select( - func.min(Product.price).label("min_price"), - func.max(Product.price).label("max_price"), - ) - .where(base_condition) - ) - - brand_stats_query = ( - select( - Product.brand, - func.count(Product.id).label("count") - ) - .where(and_(base_condition, Product.brand.isnot(None))) - .group_by(Product.brand) - .order_by(func.count(Product.id).desc()) - .limit(10) - ) - - # 5개 쿼리 병렬 실행 - ( - items_result, - count_result, - category_result, - price_result, - brand_result, - ) = await asyncio.gather( - session.execute(items_query), - session.execute(count_query), - session.execute(category_stats_query), - session.execute(price_range_query), - session.execute(brand_stats_query), - ) - - # 결과 처리 - items = items_result.scalars().all() - total_count = count_result.scalar() or 0 - - categories = [ - {"id": row.category_id, "name": row.category_name, "count": row.count} - for row in category_result.all() - ] - - price_row = price_result.one() - price_range = { - "min": float(price_row.min_price or 0), - "max": float(price_row.max_price or 0), - } - - brands = [ - {"name": row.brand, "count": row.count} - for row in brand_result.all() - ] - - return SearchResult( - items=items, - total_count=total_count, - filters=SearchFilters( - categories=categories, - price_range=price_range, - brands=brands, - ), - ) - - -# 사용 예시 (FastAPI) -@router.get("/search") -async def search( - keyword: str, - page: int = 1, - session: AsyncSession = Depends(get_session), -): - result = await search_products_with_filters(session, keyword, page) - return { - "items": [item.to_dict() for item in result.items], - "total_count": result.total_count, - "filters": { - "categories": result.filters.categories, - "price_range": result.filters.price_range, - "brands": result.filters.brands, - }, - } -``` - -**성능 비교:** -- 순차 실행: ~350ms (70ms × 5) -- 병렬 실행: ~80ms -- **개선율: 약 77%** - ---- - -### 4.3 시나리오 3: 다중 테이블 데이터 수집 (본 프로젝트 실제 적용 예) - -**요구사항**: 영상 생성을 위해 Project, Lyric, Song, Image 데이터를 한 번에 조회 +**본 프로젝트에서 실제로 적용된 패턴입니다.** ```python from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -import asyncio from dataclasses import dataclass from fastapi import HTTPException @@ -521,169 +656,176 @@ class VideoGenerationData: image_urls: list[str] -async def fetch_video_generation_data( - session: AsyncSession, +async def generate_video( task_id: str, -) -> VideoGenerationData: + orientation: str = "vertical", +) -> GenerateVideoResponse: """ - 영상 생성에 필요한 모든 데이터를 병렬로 조회합니다. + Creatomate API를 통해 영상을 생성합니다. - 이 함수는 4개의 독립적인 테이블을 조회합니다: - - Project: 프로젝트 정보 - - Lyric: 가사 정보 - - Song: 노래 정보 (음악 URL, 길이, 가사) - - Image: 이미지 목록 - - 각 테이블은 task_id로 연결되어 있으며, 서로 의존성이 없으므로 - 병렬 조회가 가능합니다. + 중요: SQLAlchemy AsyncSession은 단일 세션에서 동시에 여러 쿼리를 실행하는 것을 + 지원하지 않습니다. asyncio.gather()로 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다. + 따라서 쿼리는 순차적으로 실행합니다. """ + from app.database.session import AsyncSessionLocal - # ============================================================ - # Step 1: 쿼리 객체 생성 (아직 실행하지 않음) - # ============================================================ - project_query = ( - select(Project) - .where(Project.task_id == task_id) - .order_by(Project.created_at.desc()) - .limit(1) - ) + print(f"[generate_video] START - task_id: {task_id}") - lyric_query = ( - select(Lyric) - .where(Lyric.task_id == task_id) - .order_by(Lyric.created_at.desc()) - .limit(1) - ) + # 외부 API 호출 전에 필요한 데이터를 저장할 변수들 + project_id: int | None = None + lyric_id: int | None = None + song_id: int | None = None + video_id: int | None = None + music_url: str | None = None + song_duration: float | None = None + lyrics: str | None = None + image_urls: list[str] = [] - song_query = ( - select(Song) - .where(Song.task_id == task_id) - .order_by(Song.created_at.desc()) - .limit(1) - ) + try: + # 세션을 명시적으로 열고 DB 작업 후 바로 닫음 + async with AsyncSessionLocal() as session: + # ===== 순차 쿼리 실행: Project, Lyric, Song, Image ===== + # Note: AsyncSession은 동일 세션에서 병렬 쿼리를 지원하지 않음 - image_query = ( - select(Image) - .where(Image.task_id == task_id) - .order_by(Image.img_order.asc()) - ) + # Project 조회 + project_result = await session.execute( + select(Project) + .where(Project.task_id == task_id) + .order_by(Project.created_at.desc()) + .limit(1) + ) - # ============================================================ - # Step 2: asyncio.gather()로 4개 쿼리 병렬 실행 - # ============================================================ - # - # 병렬 실행의 핵심: - # - 각 쿼리는 독립적 (서로의 결과에 의존하지 않음) - # - 같은 세션 내에서 실행 (같은 트랜잭션 공유) - # - 가장 느린 쿼리 시간만큼만 소요됨 - # - project_result, lyric_result, song_result, image_result = ( - await asyncio.gather( - session.execute(project_query), - session.execute(lyric_query), - session.execute(song_query), - session.execute(image_query), - ) - ) + # Lyric 조회 + lyric_result = await session.execute( + select(Lyric) + .where(Lyric.task_id == task_id) + .order_by(Lyric.created_at.desc()) + .limit(1) + ) - # ============================================================ - # Step 3: 결과 검증 및 데이터 추출 - # ============================================================ + # Song 조회 + song_result = await session.execute( + select(Song) + .where(Song.task_id == task_id) + .order_by(Song.created_at.desc()) + .limit(1) + ) - # Project 검증 - project = project_result.scalar_one_or_none() - if not project: - raise HTTPException( - status_code=404, - detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.", - ) + # Image 조회 + image_result = await session.execute( + select(Image) + .where(Image.task_id == task_id) + .order_by(Image.img_order.asc()) + ) - # Lyric 검증 - lyric = lyric_result.scalar_one_or_none() - if not lyric: - raise HTTPException( - status_code=404, - detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.", - ) + print(f"[generate_video] Queries completed - task_id: {task_id}") - # Song 검증 및 데이터 추출 - song = song_result.scalar_one_or_none() - if not song: - raise HTTPException( - status_code=404, - detail=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.", - ) + # 결과 처리 및 검증 + project = project_result.scalar_one_or_none() + if not project: + raise HTTPException(status_code=404, detail="Project not found") + project_id = project.id - if not song.song_result_url: - raise HTTPException( - status_code=400, - detail=f"Song(id={song.id})의 음악 URL이 없습니다.", - ) + lyric = lyric_result.scalar_one_or_none() + if not lyric: + raise HTTPException(status_code=404, detail="Lyric not found") + lyric_id = lyric.id - if not song.song_prompt: - raise HTTPException( - status_code=400, - detail=f"Song(id={song.id})의 가사(song_prompt)가 없습니다.", - ) + song = song_result.scalar_one_or_none() + if not song: + raise HTTPException(status_code=404, detail="Song not found") + song_id = song.id + music_url = song.song_result_url + song_duration = song.duration + lyrics = song.song_prompt - # Image 검증 - images = image_result.scalars().all() - if not images: - raise HTTPException( - status_code=404, - detail=f"task_id '{task_id}'에 해당하는 이미지를 찾을 수 없습니다.", - ) + images = image_result.scalars().all() + if not images: + raise HTTPException(status_code=404, detail="Images not found") + image_urls = [img.img_url for img in images] - # ============================================================ - # Step 4: 결과 반환 - # ============================================================ - return VideoGenerationData( - project_id=project.id, - lyric_id=lyric.id, - song_id=song.id, - music_url=song.song_result_url, - song_duration=song.duration or 60.0, - lyrics=song.song_prompt, - image_urls=[img.img_url for img in images], - ) + # Video 레코드 생성 및 커밋 + video = Video( + project_id=project_id, + lyric_id=lyric_id, + song_id=song_id, + task_id=task_id, + status="processing", + ) + session.add(video) + await session.commit() + video_id = video.id + # 세션 종료 후 외부 API 호출 (커넥션 타임아웃 방지) + # ... Creatomate API 호출 로직 ... -# 실제 사용 예시 -async def generate_video(task_id: str) -> dict: - async with AsyncSessionLocal() as session: - # 병렬 쿼리로 데이터 조회 - data = await fetch_video_generation_data(session, task_id) - - # Video 레코드 생성 - video = Video( - project_id=data.project_id, - lyric_id=data.lyric_id, - song_id=data.song_id, - task_id=task_id, - status="processing", - ) - session.add(video) - await session.commit() - - # 세션 종료 후 외부 API 호출 - # (커넥션 타임아웃 방지) - return await call_creatomate_api(data) + except HTTPException: + raise + except Exception as e: + print(f"[generate_video] EXCEPTION - {e}") + raise ``` -**성능 비교:** -- 순차 실행: ~200ms (약 50ms × 4쿼리) -- 병렬 실행: ~55ms -- **개선율: 약 72%** +**이 패턴을 선택한 이유:** +1. **안정성**: 단일 세션 내에서 모든 쿼리 실행으로 트랜잭션 일관성 보장 +2. **단순성**: 코드가 명확하고 디버깅이 쉬움 +3. **충분한 성능**: 4개의 간단한 쿼리는 순차 실행해도 ~200ms 이내 +4. **에러 방지**: AsyncSession 병렬 쿼리 제한으로 인한 에러 방지 --- -## 5. 성능 측정 및 모니터링 +### 5.3 시나리오 3: 복합 검색 (트레이드오프 분석) -### 5.1 실행 시간 측정 데코레이터 +```python +# 방법 A: 순차 실행 (단순, 안전) +async def search_sequential(session: AsyncSession, keyword: str): + items = await session.execute(items_query) + count = await session.execute(count_query) + categories = await session.execute(category_query) + price_range = await session.execute(price_query) + brands = await session.execute(brand_query) + return items, count, categories, price_range, brands +# 예상 시간: ~350ms (70ms × 5) + + +# 방법 B: 별도 세션 병렬 실행 (빠름, 복잡) +async def search_parallel(keyword: str): + async def fetch_items(): + async with AsyncSessionLocal() as s: + return await s.execute(items_query) + + async def fetch_count(): + async with AsyncSessionLocal() as s: + return await s.execute(count_query) + + # ... 나머지 함수들 ... + + return await asyncio.gather( + fetch_items(), + fetch_count(), + fetch_categories(), + fetch_price_range(), + fetch_brands(), + ) +# 예상 시간: ~80ms + + +# 결정 기준: +# - 검색 API가 자주 호출되고 응답 시간이 중요하다면 → 방법 B +# - 안정성이 우선이고 복잡도를 낮추고 싶다면 → 방법 A +# - 커넥션 풀 여유가 없다면 → 방법 A +``` + +--- + +## 6. 성능 측정 및 모니터링 + +### 6.1 실행 시간 측정 데코레이터 ```python import time import functools +import asyncio from typing import Callable, TypeVar T = TypeVar("T") @@ -717,39 +859,42 @@ def measure_time(func: Callable[..., T]) -> Callable[..., T]: # 사용 예 @measure_time -async def fetch_data(session, task_id): +async def fetch_data(task_id: str): ... ``` -### 5.2 병렬 쿼리 성능 비교 유틸리티 +### 6.2 병렬 vs 순차 성능 비교 유틸리티 ```python import asyncio import time -async def compare_sequential_vs_parallel( - session: AsyncSession, - queries: list, - labels: list[str] | None = None, +async def compare_execution_methods( + task_id: str, + query_funcs: list[Callable], ) -> dict: - """순차 실행과 병렬 실행의 성능을 비교합니다.""" + """순차 실행과 병렬 실행(별도 세션)의 성능을 비교합니다.""" - labels = labels or [f"Query {i}" for i in range(len(queries))] - - # 순차 실행 + # 순차 실행 (단일 세션) sequential_start = time.perf_counter() - sequential_results = [] - for query in queries: - result = await session.execute(query) - sequential_results.append(result) + async with AsyncSessionLocal() as session: + sequential_results = [] + for func in query_funcs: + result = await func(session, task_id) + sequential_results.append(result) sequential_time = (time.perf_counter() - sequential_start) * 1000 - # 병렬 실행 + # 병렬 실행 (별도 세션) parallel_start = time.perf_counter() - parallel_results = await asyncio.gather( - *[session.execute(query) for query in queries] - ) + + async def run_with_session(func): + async with AsyncSessionLocal() as session: + return await func(session, task_id) + + parallel_results = await asyncio.gather(*[ + run_with_session(func) for func in query_funcs + ]) parallel_time = (time.perf_counter() - parallel_start) * 1000 improvement = ((sequential_time - parallel_time) / sequential_time) * 100 @@ -758,11 +903,11 @@ async def compare_sequential_vs_parallel( "sequential_time_ms": round(sequential_time, 2), "parallel_time_ms": round(parallel_time, 2), "improvement_percent": round(improvement, 1), - "query_count": len(queries), + "query_count": len(query_funcs), } ``` -### 5.3 SQLAlchemy 쿼리 로깅 +### 6.3 SQLAlchemy 쿼리 로깅 ```python import logging @@ -777,68 +922,120 @@ engine = create_async_engine(url, echo=True) --- -## 6. Best Practices +## 7. Best Practices -### 6.1 체크리스트 +### 7.1 체크리스트 병렬화 적용 전 확인사항: - [ ] 쿼리들이 서로 독립적인가? (결과 의존성 없음) - [ ] 모든 쿼리가 READ 작업인가? (또는 순서 무관한 WRITE) +- [ ] **별도 세션을 사용할 것인가?** (AsyncSession 제한사항) - [ ] 커넥션 풀 크기가 충분한가? - [ ] 에러 처리 전략이 수립되어 있는가? - [ ] 타임아웃 설정이 적절한가? +- [ ] 트랜잭션 일관성이 필요한가? -### 6.2 권장 패턴 +### 7.2 권장 패턴 ```python -# ✅ 권장: 쿼리 정의와 실행 분리 +# ✅ 패턴 1: 순차 실행 (단순하고 안전) async def fetch_data(session: AsyncSession, task_id: str): - # 1. 쿼리 객체 정의 (명확한 의도 표현) - project_query = select(Project).where(Project.task_id == task_id) - song_query = select(Song).where(Song.task_id == task_id) + project = await session.execute(project_query) + song = await session.execute(song_query) + return project, song - # 2. 병렬 실행 - results = await asyncio.gather( - session.execute(project_query), - session.execute(song_query), - ) - # 3. 결과 처리 - return process_results(results) +# ✅ 패턴 2: 별도 세션으로 병렬 실행 (성능 중요 시) +async def fetch_data_parallel(task_id: str): + async def get_project(): + async with AsyncSessionLocal() as s: + return await s.execute(project_query) + + async def get_song(): + async with AsyncSessionLocal() as s: + return await s.execute(song_query) + + return await asyncio.gather(get_project(), get_song()) ``` -### 6.3 피해야 할 패턴 +### 7.3 피해야 할 패턴 ```python -# ❌ 피하기: 인라인 쿼리 (가독성 저하) -results = await asyncio.gather( - session.execute(select(A).where(A.x == y).order_by(A.z.desc()).limit(1)), - session.execute(select(B).where(B.a == b).order_by(B.c.desc()).limit(1)), -) +# ❌ 절대 금지: 단일 세션에서 asyncio.gather() +async with AsyncSessionLocal() as session: + results = await asyncio.gather( + session.execute(query1), + session.execute(query2), + ) +# 에러 발생: InvalidRequestError - Method 'close()' can't be called here # ❌ 피하기: 과도한 병렬화 (커넥션 고갈) # 100개 쿼리를 동시에 실행하면 커넥션 풀 고갈 위험 -results = await asyncio.gather(*[session.execute(q) for q in queries]) +results = await asyncio.gather(*[fetch() for _ in range(100)]) # ✅ 해결: 배치 처리 BATCH_SIZE = 10 -for i in range(0, len(queries), BATCH_SIZE): - batch = queries[i:i + BATCH_SIZE] - results = await asyncio.gather(*[session.execute(q) for q in batch]) +for i in range(0, len(items), BATCH_SIZE): + batch = items[i:i + BATCH_SIZE] + results = await asyncio.gather(*[fetch(item) for item in batch]) ``` -### 6.4 성능 최적화 팁 +### 7.4 결정 가이드 + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ 병렬화 결정 플로우차트 │ +├─────────────────────────────────────────────────────────────────────────┤ +│ │ +│ 쿼리 개수가 3개 이하인가? │ +│ │ │ +│ ├── Yes ──► 순차 실행 (복잡도 대비 이득 적음) │ +│ │ │ +│ └── No ──► 각 쿼리가 50ms 이상 걸리는가? │ +│ │ │ +│ ├── No ──► 순차 실행 (이득 적음) │ +│ │ │ +│ └── Yes ──► 트랜잭션 일관성이 필요한가? │ +│ │ │ +│ ├── Yes ──► 순차 실행 │ +│ │ (단일 세션) │ +│ │ │ +│ └── No ──► 커넥션 풀 여유? │ +│ │ │ +│ ├── No ──► │ +│ │ 순차 실행 │ +│ │ │ +│ └── Yes ──► │ +│ 병렬 실행 │ +│ (별도세션) │ +│ │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +### 7.5 성능 최적화 팁 1. **인덱스 확인**: 병렬화해도 인덱스 없으면 느림 2. **쿼리 최적화 우선**: 병렬화 전에 개별 쿼리 최적화 3. **적절한 병렬 수준**: 보통 3-10개가 적절 4. **모니터링 필수**: 실제 개선 효과 측정 +5. **커넥션 풀 모니터링**: 병렬 실행 시 풀 사용량 확인 --- ## 부록: 관련 자료 - [SQLAlchemy 2.0 AsyncIO 문서](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html) +- [SQLAlchemy AsyncSession 동시성 제한](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#using-asyncio-scoped-session) - [Python asyncio 공식 문서](https://docs.python.org/3/library/asyncio.html) - [FastAPI 비동기 데이터베이스](https://fastapi.tiangolo.com/async/) + +--- + +## 변경 이력 + +| 날짜 | 변경 내용 | +|------|----------| +| 2024-12 | AsyncSession 병렬 쿼리 제한사항 섹션 추가 (실제 프로젝트 에러 사례 포함) | +| 2024-12 | 잘못된 병렬 쿼리 예제 수정 | +| 2024-12 | 결정 플로우차트 추가 |