o2o-castad-backend/docs/analysis/db_쿼리_병렬화.md

1042 lines
38 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# DB 쿼리 병렬화 (Query Parallelization) 완벽 가이드
> **목적**: Python asyncio와 SQLAlchemy를 활용한 DB 쿼리 병렬화의 이론부터 실무 적용까지
> **대상**: 비동기 프로그래밍 기초 지식이 있는 백엔드 개발자
> **환경**: Python 3.11+, SQLAlchemy 2.0+, FastAPI
> **최종 수정**: 2024-12 (AsyncSession 병렬 쿼리 제한사항 추가)
---
## 목차
1. [이론적 배경](#1-이론적-배경)
2. [핵심 개념](#2-핵심-개념)
3. [SQLAlchemy AsyncSession 병렬 쿼리 제한사항](#3-sqlalchemy-asyncsession-병렬-쿼리-제한사항) ⚠️ **중요**
4. [설계 시 주의사항](#4-설계-시-주의사항)
5. [실무 시나리오 예제](#5-실무-시나리오-예제)
6. [성능 측정 및 모니터링](#6-성능-측정-및-모니터링)
7. [Best Practices](#7-best-practices)
---
## 1. 이론적 배경
### 1.1 동기 vs 비동기 실행
```
[순차 실행 - Sequential]
Query A ──────────▶ (100ms)
Query B ──────────▶ (100ms)
Query C ──────────▶ (100ms)
총 소요시간: 300ms
[병렬 실행 - Parallel]
Query A ──────────▶ (100ms)
Query B ──────────▶ (100ms)
Query C ──────────▶ (100ms)
총 소요시간: ~100ms (가장 느린 쿼리 기준)
```
### 1.2 왜 병렬화가 필요한가?
1. **I/O 바운드 작업의 특성**
- DB 쿼리는 네트워크 I/O가 대부분 (실제 CPU 작업은 짧음)
- 대기 시간 동안 다른 작업을 수행할 수 있음
2. **응답 시간 단축**
- N개의 독립적인 쿼리: `O(sum)``O(max)`
- 사용자 경험 개선
3. **리소스 효율성**
- 커넥션 풀을 효율적으로 활용
- 서버 처리량(throughput) 증가
### 1.3 asyncio.gather()의 동작 원리
```python
import asyncio
async def main():
# gather()는 모든 코루틴을 동시에 스케줄링
results = await asyncio.gather(
coroutine_1(), # Task 1 생성
coroutine_2(), # Task 2 생성
coroutine_3(), # Task 3 생성
)
# 모든 Task가 완료되면 결과를 리스트로 반환
return results
```
**핵심 동작:**
1. `gather()`는 각 코루틴을 Task로 래핑
2. 이벤트 루프가 모든 Task를 동시에 실행
3. I/O 대기 시 다른 Task로 컨텍스트 스위칭
4. 모든 Task 완료 시 결과 반환
---
## 2. 핵심 개념
### 2.1 독립성 판단 기준
병렬화가 가능한 쿼리의 조건:
| 조건 | 설명 | 예시 |
|------|------|------|
| **데이터 독립성** | 쿼리 간 결과 의존성 없음 | User, Product, Order 각각 조회 |
| **트랜잭션 독립성** | 같은 트랜잭션 내 순서 무관 | READ 작업들 |
| **비즈니스 독립성** | 결과 순서가 로직에 영향 없음 | 대시보드 데이터 조회 |
### 2.2 병렬화 불가능한 경우
```python
# ❌ 잘못된 예: 의존성이 있는 쿼리
user = await session.execute(select(User).where(User.id == user_id))
# orders 쿼리는 user.id에 의존 → 병렬화 불가
orders = await session.execute(
select(Order).where(Order.user_id == user.id)
)
```
```python
# ❌ 잘못된 예: 쓰기 후 읽기 (Write-then-Read)
await session.execute(insert(User).values(name="John"))
# 방금 생성된 데이터를 조회 → 순차 실행 필요
new_user = await session.execute(select(User).where(User.name == "John"))
```
---
## 3. SQLAlchemy AsyncSession 병렬 쿼리 제한사항
### ⚠️ 3.1 중요: 단일 AsyncSession에서 병렬 쿼리는 지원되지 않습니다
**이전에 잘못 알려진 내용:**
```python
# ❌ 이 코드는 실제로 작동하지 않습니다!
async with AsyncSessionLocal() as session:
results = await asyncio.gather(
session.execute(query1),
session.execute(query2),
session.execute(query3),
)
```
### 3.2 실제 발생하는 에러
위 코드를 실행하면 다음과 같은 에러가 발생합니다:
```
sqlalchemy.exc.InvalidRequestError:
Method 'close()' can't be called here; method '_connection_for_bind()'
is already in progress and this would cause an unexpected state change
to <SessionTransactionState.CLOSED: 5>
(Background on this error at: https://sqlalche.me/e/20/isce)
```
### 3.3 에러 발생 원인 분석
```
┌─────────────────────────────────────────────────────────────────────────┐
│ AsyncSession 내부 상태 충돌 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ AsyncSession은 내부적으로 하나의 Connection을 관리합니다. │
│ asyncio.gather()로 여러 쿼리를 동시에 실행하면: │
│ │
│ Time ──────────────────────────────────────────────────────────► │
│ │
│ Task 1: session.execute(query1) │
│ └── _connection_for_bind() 시작 ──► 연결 획득 중... │
│ │
│ Task 2: session.execute(query2) │
│ └── _connection_for_bind() 시작 ──► ⚠️ 충돌! │
│ (이미 Task 1이 연결 작업 중) │
│ │
│ Task 3: session.execute(query3) │
│ └── _connection_for_bind() 시작 ──► ⚠️ 충돌! │
│ │
│ 결과: InvalidRequestError 발생 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 핵심 원인: │
│ 1. AsyncSession은 단일 연결(Connection)을 사용 │
│ 2. 연결 상태 전이(state transition)가 순차적으로만 가능 │
│ 3. 동시에 여러 작업이 상태 전이를 시도하면 충돌 발생 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
### 3.4 SQLAlchemy 공식 문서의 설명
SQLAlchemy 2.0 문서에 따르면:
> The AsyncSession object is a mutable, stateful object which represents
> a single, stateful database transaction in progress. Using concurrent
> tasks with asyncio, with APIs such as asyncio.gather() for example,
> should use a **separate AsyncSession per individual task**.
**번역**: AsyncSession 객체는 진행 중인 단일 데이터베이스 트랜잭션을 나타내는
변경 가능한 상태 객체입니다. asyncio.gather() 같은 API로 동시 작업을 수행할 때는
**각 작업마다 별도의 AsyncSession**을 사용해야 합니다.
### 3.5 본 프로젝트에서 발생한 실제 사례
**문제가 발생한 코드 (video.py):**
```python
async def generate_video(task_id: str, ...):
async with AsyncSessionLocal() as session:
# ❌ 단일 세션에서 asyncio.gather() 사용 - 에러 발생!
project_result, lyric_result, song_result, image_result = (
await asyncio.gather(
session.execute(project_query),
session.execute(lyric_query),
session.execute(song_query),
session.execute(image_query),
)
)
```
**프론트엔드에서 표시된 에러:**
```
Method 'close()' can't be called here; method '_connection_for_bind()'
is already in progress and this would cause an unexpected state change
to <SessionTransactionState.CLOSED: 5>
```
### 3.6 해결 방법
#### 방법 1: 순차 실행 (권장 - 단순하고 안전)
```python
# ✅ 올바른 방법: 순차 실행
async def generate_video(task_id: str, ...):
async with AsyncSessionLocal() as session:
# 순차적으로 쿼리 실행 (가장 안전)
project_result = await session.execute(project_query)
lyric_result = await session.execute(lyric_query)
song_result = await session.execute(song_query)
image_result = await session.execute(image_query)
```
**장점:**
- 구현이 단순함
- 에러 처리가 명확함
- 트랜잭션 관리가 쉬움
**단점:**
- 총 실행 시간 = 각 쿼리 시간의 합
#### 방법 2: 별도 세션으로 병렬 실행 (성능 중요 시)
```python
# ✅ 올바른 방법: 각 쿼리마다 별도 세션 사용
async def fetch_with_separate_sessions(task_id: str):
async def get_project():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Project).where(Project.task_id == task_id)
)
return result.scalar_one_or_none()
async def get_lyric():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Lyric).where(Lyric.task_id == task_id)
)
return result.scalar_one_or_none()
async def get_song():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Song).where(Song.task_id == task_id)
)
return result.scalar_one_or_none()
async def get_images():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Image).where(Image.task_id == task_id)
)
return result.scalars().all()
# 별도 세션이므로 병렬 실행 가능!
project, lyric, song, images = await asyncio.gather(
get_project(),
get_lyric(),
get_song(),
get_images(),
)
return project, lyric, song, images
```
**장점:**
- 진정한 병렬 실행
- 총 실행 시간 = 가장 느린 쿼리 시간
**단점:**
- 커넥션 풀에서 여러 연결을 동시에 사용
- 각 쿼리가 별도 트랜잭션 (일관성 주의)
- 코드가 복잡해짐
#### 방법 3: 유틸리티 함수로 추상화
```python
from typing import TypeVar, Callable, Any
import asyncio
T = TypeVar('T')
async def parallel_queries(
queries: list[tuple[Callable, dict]],
) -> list[Any]:
"""
여러 쿼리를 별도 세션으로 병렬 실행합니다.
Args:
queries: [(query_func, kwargs), ...] 형태의 리스트
Returns:
각 쿼리의 결과 리스트
Example:
results = await parallel_queries([
(get_project, {"task_id": task_id}),
(get_song, {"task_id": task_id}),
])
"""
async def execute_with_session(query_func, kwargs):
async with AsyncSessionLocal() as session:
return await query_func(session, **kwargs)
return await asyncio.gather(*[
execute_with_session(func, kwargs)
for func, kwargs in queries
])
# 사용 예시
async def get_project(session, task_id: str):
result = await session.execute(
select(Project).where(Project.task_id == task_id)
)
return result.scalar_one_or_none()
async def get_song(session, task_id: str):
result = await session.execute(
select(Song).where(Song.task_id == task_id)
)
return result.scalar_one_or_none()
# 병렬 실행
project, song = await parallel_queries([
(get_project, {"task_id": "abc123"}),
(get_song, {"task_id": "abc123"}),
])
```
### 3.7 성능 비교: 순차 vs 별도 세션 병렬
```
┌─────────────────────────────────────────────────────────────────────────┐
│ 성능 비교 (4개 쿼리, 각 50ms) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ [순차 실행 - 단일 세션] │
│ ─────────────────────── │
│ Query 1 ─────▶ (50ms) │
│ Query 2 ─────▶ (50ms) │
│ Query 3 ─────▶ (50ms) │
│ Query 4 ─────▶ (50ms) │
│ 총 소요시간: ~200ms │
│ 커넥션 사용: 1개 │
│ │
│ [병렬 실행 - 별도 세션] │
│ ─────────────────────── │
│ Session 1: Query 1 ─────▶ (50ms) │
│ Session 2: Query 2 ─────▶ (50ms) │
│ Session 3: Query 3 ─────▶ (50ms) │
│ Session 4: Query 4 ─────▶ (50ms) │
│ 총 소요시간: ~55ms │
│ 커넥션 사용: 4개 (동시) │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 결론: │
│ - 성능 개선: 약 72% (200ms → 55ms) │
│ - 대가: 커넥션 풀 사용량 4배 증가 │
│ - 트레이드오프 고려 필요 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
### 3.8 언제 병렬 실행을 선택해야 하는가?
| 상황 | 권장 방식 | 이유 |
|------|----------|------|
| 쿼리 수 ≤ 3개 | 순차 실행 | 복잡도 대비 성능 이득 적음 |
| 쿼리 수 > 3개, 각 쿼리 > 50ms | 병렬 실행 | 유의미한 성능 개선 |
| 트랜잭션 일관성 필요 | 순차 실행 | 별도 세션은 별도 트랜잭션 |
| 커넥션 풀 여유 없음 | 순차 실행 | 풀 고갈 위험 |
| 실시간 응답 중요 (API) | 상황에 따라 | 사용자 경험 우선 |
| 백그라운드 작업 | 순차 실행 | 안정성 우선 |
### 3.9 커넥션 풀 고려사항
```python
# 엔진 설정 시 병렬 쿼리를 고려한 풀 크기 설정
engine = create_async_engine(
url=db_url,
pool_size=20, # 기본 풀 크기
max_overflow=20, # 추가 연결 허용
pool_timeout=30, # 풀 대기 타임아웃
)
# 계산 예시:
# - 동시 API 요청: 10개
# - 요청당 병렬 쿼리: 4개
# - 필요 커넥션: 10 × 4 = 40개
# - 설정: pool_size(20) + max_overflow(20) = 40개 ✅
```
---
## 4. 설계 시 주의사항
### 4.1 커넥션 풀 크기 설정
```python
# SQLAlchemy 엔진 설정
engine = create_async_engine(
url=db_url,
pool_size=20, # 기본 풀 크기
max_overflow=20, # 추가 연결 허용 수
pool_timeout=30, # 풀에서 연결 대기 시간
pool_recycle=3600, # 연결 재생성 주기
pool_pre_ping=True, # 연결 유효성 검사
)
```
**풀 크기 계산 공식:**
```
필요 커넥션 수 = 동시 요청 수 × 요청당 병렬 쿼리 수
```
예: 동시 10개 요청, 각 요청당 4개 병렬 쿼리
→ 최소 40개 커넥션 필요 (pool_size + max_overflow >= 40)
### 4.2 에러 처리 전략
```python
import asyncio
# 방법 1: return_exceptions=True (권장)
results = await asyncio.gather(
fetch_with_session_1(),
fetch_with_session_2(),
fetch_with_session_3(),
return_exceptions=True, # 예외를 결과로 반환
)
# 결과 처리
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Query {i} failed: {result}")
else:
print(f"Query {i} succeeded: {result}")
```
```python
# 방법 2: 개별 try-except 래핑
async def safe_fetch(query_func, **kwargs):
try:
async with AsyncSessionLocal() as session:
return await query_func(session, **kwargs)
except Exception as e:
print(f"Query failed: {e}")
return None
results = await asyncio.gather(
safe_fetch(get_project, task_id=task_id),
safe_fetch(get_song, task_id=task_id),
safe_fetch(get_images, task_id=task_id),
)
```
### 4.3 타임아웃 설정
```python
import asyncio
async def fetch_with_timeout(query_func, timeout_seconds: float, **kwargs):
"""타임아웃이 있는 쿼리 실행"""
try:
return await asyncio.wait_for(
query_func(**kwargs),
timeout=timeout_seconds
)
except asyncio.TimeoutError:
raise Exception(f"Query timed out after {timeout_seconds}s")
# 사용 예
results = await asyncio.gather(
fetch_with_timeout(get_project, 5.0, task_id=task_id),
fetch_with_timeout(get_song, 5.0, task_id=task_id),
fetch_with_timeout(get_images, 10.0, task_id=task_id), # 더 긴 타임아웃
)
```
### 4.4 N+1 문제와 병렬화
```python
# ❌ N+1 문제 발생 코드
videos = await session.execute(select(Video))
for video in videos.scalars():
# N번의 추가 쿼리 발생!
project = await session.execute(
select(Project).where(Project.id == video.project_id)
)
# ✅ 해결 방법 1: JOIN 사용
query = select(Video).options(selectinload(Video.project))
videos = await session.execute(query)
# ✅ 해결 방법 2: IN 절로 배치 조회
video_list = videos.scalars().all()
project_ids = [v.project_id for v in video_list if v.project_id]
projects_result = await session.execute(
select(Project).where(Project.id.in_(project_ids))
)
projects_map = {p.id: p for p in projects_result.scalars().all()}
```
### 4.5 트랜잭션 격리 수준 고려
| 격리 수준 | 병렬 쿼리 안전성 | 설명 |
|-----------|------------------|------|
| READ UNCOMMITTED | ⚠️ 주의 | Dirty Read 가능 |
| READ COMMITTED | ✅ 안전 | 대부분의 경우 적합 |
| REPEATABLE READ | ✅ 안전 | 일관된 스냅샷 |
| SERIALIZABLE | ✅ 안전 | 성능 저하 가능 |
---
## 5. 실무 시나리오 예제
### 5.1 시나리오 1: 대시보드 데이터 조회 (별도 세션 병렬)
**요구사항**: 사용자 대시보드에 필요한 여러 통계 데이터를 한 번에 조회
```python
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
async def get_user(session: AsyncSession, user_id: int):
result = await session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_recent_orders(session: AsyncSession, user_id: int):
result = await session.execute(
select(Order)
.where(Order.user_id == user_id)
.order_by(Order.created_at.desc())
.limit(5)
)
return result.scalars().all()
async def get_total_amount(session: AsyncSession, user_id: int):
result = await session.execute(
select(func.sum(Order.amount))
.where(Order.user_id == user_id)
)
return result.scalar() or 0
async def get_wishlist_count(session: AsyncSession, user_id: int):
result = await session.execute(
select(func.count(Wishlist.id))
.where(Wishlist.user_id == user_id)
)
return result.scalar() or 0
async def get_dashboard_data(user_id: int) -> dict:
"""
대시보드에 필요한 모든 데이터를 병렬로 조회합니다.
각 쿼리는 별도의 세션을 사용합니다.
"""
async def fetch_user():
async with AsyncSessionLocal() as session:
return await get_user(session, user_id)
async def fetch_orders():
async with AsyncSessionLocal() as session:
return await get_recent_orders(session, user_id)
async def fetch_amount():
async with AsyncSessionLocal() as session:
return await get_total_amount(session, user_id)
async def fetch_wishlist():
async with AsyncSessionLocal() as session:
return await get_wishlist_count(session, user_id)
# 4개 쿼리를 별도 세션으로 병렬 실행
user, orders, total_amount, wishlist_count = await asyncio.gather(
fetch_user(),
fetch_orders(),
fetch_amount(),
fetch_wishlist(),
)
if not user:
raise ValueError(f"User {user_id} not found")
return {
"user": {"id": user.id, "name": user.name, "email": user.email},
"recent_orders": [
{"id": o.id, "amount": o.amount, "status": o.status}
for o in orders
],
"total_spent": total_amount,
"wishlist_count": wishlist_count,
}
# 사용 예시 (FastAPI)
@router.get("/dashboard")
async def dashboard(user_id: int):
return await get_dashboard_data(user_id)
```
**성능 비교:**
- 순차 실행: ~200ms (50ms × 4)
- 병렬 실행: ~60ms (가장 느린 쿼리 기준)
- **개선율: 약 70%**
---
### 5.2 시나리오 2: 영상 생성 데이터 조회 (순차 실행 - 권장)
**요구사항**: 영상 생성을 위해 Project, Lyric, Song, Image 데이터를 조회
**본 프로젝트에서 실제로 적용된 패턴입니다.**
```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from dataclasses import dataclass
from fastapi import HTTPException
@dataclass
class VideoGenerationData:
"""영상 생성에 필요한 데이터"""
project_id: int
lyric_id: int
song_id: int
music_url: str
song_duration: float
lyrics: str
image_urls: list[str]
async def generate_video(
task_id: str,
orientation: str = "vertical",
) -> GenerateVideoResponse:
"""
Creatomate API를 통해 영상을 생성합니다.
중요: SQLAlchemy AsyncSession은 단일 세션에서 동시에 여러 쿼리를 실행하는 것을
지원하지 않습니다. asyncio.gather()로 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다.
따라서 쿼리는 순차적으로 실행합니다.
"""
from app.database.session import AsyncSessionLocal
print(f"[generate_video] START - task_id: {task_id}")
# 외부 API 호출 전에 필요한 데이터를 저장할 변수들
project_id: int | None = None
lyric_id: int | None = None
song_id: int | None = None
video_id: int | None = None
music_url: str | None = None
song_duration: float | None = None
lyrics: str | None = None
image_urls: list[str] = []
try:
# 세션을 명시적으로 열고 DB 작업 후 바로 닫음
async with AsyncSessionLocal() as session:
# ===== 순차 쿼리 실행: Project, Lyric, Song, Image =====
# Note: AsyncSession은 동일 세션에서 병렬 쿼리를 지원하지 않음
# Project 조회
project_result = await session.execute(
select(Project)
.where(Project.task_id == task_id)
.order_by(Project.created_at.desc())
.limit(1)
)
# Lyric 조회
lyric_result = await session.execute(
select(Lyric)
.where(Lyric.task_id == task_id)
.order_by(Lyric.created_at.desc())
.limit(1)
)
# Song 조회
song_result = await session.execute(
select(Song)
.where(Song.task_id == task_id)
.order_by(Song.created_at.desc())
.limit(1)
)
# Image 조회
image_result = await session.execute(
select(Image)
.where(Image.task_id == task_id)
.order_by(Image.img_order.asc())
)
print(f"[generate_video] Queries completed - task_id: {task_id}")
# 결과 처리 및 검증
project = project_result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
project_id = project.id
lyric = lyric_result.scalar_one_or_none()
if not lyric:
raise HTTPException(status_code=404, detail="Lyric not found")
lyric_id = lyric.id
song = song_result.scalar_one_or_none()
if not song:
raise HTTPException(status_code=404, detail="Song not found")
song_id = song.id
music_url = song.song_result_url
song_duration = song.duration
lyrics = song.song_prompt
images = image_result.scalars().all()
if not images:
raise HTTPException(status_code=404, detail="Images not found")
image_urls = [img.img_url for img in images]
# Video 레코드 생성 및 커밋
video = Video(
project_id=project_id,
lyric_id=lyric_id,
song_id=song_id,
task_id=task_id,
status="processing",
)
session.add(video)
await session.commit()
video_id = video.id
# 세션 종료 후 외부 API 호출 (커넥션 타임아웃 방지)
# ... Creatomate API 호출 로직 ...
except HTTPException:
raise
except Exception as e:
print(f"[generate_video] EXCEPTION - {e}")
raise
```
**이 패턴을 선택한 이유:**
1. **안정성**: 단일 세션 내에서 모든 쿼리 실행으로 트랜잭션 일관성 보장
2. **단순성**: 코드가 명확하고 디버깅이 쉬움
3. **충분한 성능**: 4개의 간단한 쿼리는 순차 실행해도 ~200ms 이내
4. **에러 방지**: AsyncSession 병렬 쿼리 제한으로 인한 에러 방지
---
### 5.3 시나리오 3: 복합 검색 (트레이드오프 분석)
```python
# 방법 A: 순차 실행 (단순, 안전)
async def search_sequential(session: AsyncSession, keyword: str):
items = await session.execute(items_query)
count = await session.execute(count_query)
categories = await session.execute(category_query)
price_range = await session.execute(price_query)
brands = await session.execute(brand_query)
return items, count, categories, price_range, brands
# 예상 시간: ~350ms (70ms × 5)
# 방법 B: 별도 세션 병렬 실행 (빠름, 복잡)
async def search_parallel(keyword: str):
async def fetch_items():
async with AsyncSessionLocal() as s:
return await s.execute(items_query)
async def fetch_count():
async with AsyncSessionLocal() as s:
return await s.execute(count_query)
# ... 나머지 함수들 ...
return await asyncio.gather(
fetch_items(),
fetch_count(),
fetch_categories(),
fetch_price_range(),
fetch_brands(),
)
# 예상 시간: ~80ms
# 결정 기준:
# - 검색 API가 자주 호출되고 응답 시간이 중요하다면 → 방법 B
# - 안정성이 우선이고 복잡도를 낮추고 싶다면 → 방법 A
# - 커넥션 풀 여유가 없다면 → 방법 A
```
---
## 6. 성능 측정 및 모니터링
### 6.1 실행 시간 측정 데코레이터
```python
import time
import functools
import asyncio
from typing import Callable, TypeVar
T = TypeVar("T")
def measure_time(func: Callable[..., T]) -> Callable[..., T]:
"""함수 실행 시간을 측정하는 데코레이터"""
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
start = time.perf_counter()
try:
return await func(*args, **kwargs)
finally:
elapsed = (time.perf_counter() - start) * 1000
print(f"[{func.__name__}] Execution time: {elapsed:.2f}ms")
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
start = time.perf_counter()
try:
return func(*args, **kwargs)
finally:
elapsed = (time.perf_counter() - start) * 1000
print(f"[{func.__name__}] Execution time: {elapsed:.2f}ms")
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
# 사용 예
@measure_time
async def fetch_data(task_id: str):
...
```
### 6.2 병렬 vs 순차 성능 비교 유틸리티
```python
import asyncio
import time
async def compare_execution_methods(
task_id: str,
query_funcs: list[Callable],
) -> dict:
"""순차 실행과 병렬 실행(별도 세션)의 성능을 비교합니다."""
# 순차 실행 (단일 세션)
sequential_start = time.perf_counter()
async with AsyncSessionLocal() as session:
sequential_results = []
for func in query_funcs:
result = await func(session, task_id)
sequential_results.append(result)
sequential_time = (time.perf_counter() - sequential_start) * 1000
# 병렬 실행 (별도 세션)
parallel_start = time.perf_counter()
async def run_with_session(func):
async with AsyncSessionLocal() as session:
return await func(session, task_id)
parallel_results = await asyncio.gather(*[
run_with_session(func) for func in query_funcs
])
parallel_time = (time.perf_counter() - parallel_start) * 1000
improvement = ((sequential_time - parallel_time) / sequential_time) * 100
return {
"sequential_time_ms": round(sequential_time, 2),
"parallel_time_ms": round(parallel_time, 2),
"improvement_percent": round(improvement, 1),
"query_count": len(query_funcs),
}
```
### 6.3 SQLAlchemy 쿼리 로깅
```python
import logging
# SQLAlchemy 쿼리 로깅 활성화
logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
# 또는 엔진 생성 시 echo=True
engine = create_async_engine(url, echo=True)
```
---
## 7. Best Practices
### 7.1 체크리스트
병렬화 적용 전 확인사항:
- [ ] 쿼리들이 서로 독립적인가? (결과 의존성 없음)
- [ ] 모든 쿼리가 READ 작업인가? (또는 순서 무관한 WRITE)
- [ ] **별도 세션을 사용할 것인가?** (AsyncSession 제한사항)
- [ ] 커넥션 풀 크기가 충분한가?
- [ ] 에러 처리 전략이 수립되어 있는가?
- [ ] 타임아웃 설정이 적절한가?
- [ ] 트랜잭션 일관성이 필요한가?
### 7.2 권장 패턴
```python
# ✅ 패턴 1: 순차 실행 (단순하고 안전)
async def fetch_data(session: AsyncSession, task_id: str):
project = await session.execute(project_query)
song = await session.execute(song_query)
return project, song
# ✅ 패턴 2: 별도 세션으로 병렬 실행 (성능 중요 시)
async def fetch_data_parallel(task_id: str):
async def get_project():
async with AsyncSessionLocal() as s:
return await s.execute(project_query)
async def get_song():
async with AsyncSessionLocal() as s:
return await s.execute(song_query)
return await asyncio.gather(get_project(), get_song())
```
### 7.3 피해야 할 패턴
```python
# ❌ 절대 금지: 단일 세션에서 asyncio.gather()
async with AsyncSessionLocal() as session:
results = await asyncio.gather(
session.execute(query1),
session.execute(query2),
)
# 에러 발생: InvalidRequestError - Method 'close()' can't be called here
# ❌ 피하기: 과도한 병렬화 (커넥션 고갈)
# 100개 쿼리를 동시에 실행하면 커넥션 풀 고갈 위험
results = await asyncio.gather(*[fetch() for _ in range(100)])
# ✅ 해결: 배치 처리
BATCH_SIZE = 10
for i in range(0, len(items), BATCH_SIZE):
batch = items[i:i + BATCH_SIZE]
results = await asyncio.gather(*[fetch(item) for item in batch])
```
### 7.4 결정 가이드
```
┌─────────────────────────────────────────────────────────────────────────┐
│ 병렬화 결정 플로우차트 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 쿼리 개수가 3개 이하인가? │
│ │ │
│ ├── Yes ──► 순차 실행 (복잡도 대비 이득 적음) │
│ │ │
│ └── No ──► 각 쿼리가 50ms 이상 걸리는가? │
│ │ │
│ ├── No ──► 순차 실행 (이득 적음) │
│ │ │
│ └── Yes ──► 트랜잭션 일관성이 필요한가? │
│ │ │
│ ├── Yes ──► 순차 실행 │
│ │ (단일 세션) │
│ │ │
│ └── No ──► 커넥션 풀 여유? │
│ │ │
│ ├── No ──► │
│ │ 순차 실행 │
│ │ │
│ └── Yes ──► │
│ 병렬 실행 │
│ (별도세션) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
### 7.5 성능 최적화 팁
1. **인덱스 확인**: 병렬화해도 인덱스 없으면 느림
2. **쿼리 최적화 우선**: 병렬화 전에 개별 쿼리 최적화
3. **적절한 병렬 수준**: 보통 3-10개가 적절
4. **모니터링 필수**: 실제 개선 효과 측정
5. **커넥션 풀 모니터링**: 병렬 실행 시 풀 사용량 확인
---
## 부록: 관련 자료
- [SQLAlchemy 2.0 AsyncIO 문서](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html)
- [SQLAlchemy AsyncSession 동시성 제한](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#using-asyncio-scoped-session)
- [Python asyncio 공식 문서](https://docs.python.org/3/library/asyncio.html)
- [FastAPI 비동기 데이터베이스](https://fastapi.tiangolo.com/async/)
---
## 변경 이력
| 날짜 | 변경 내용 |
|------|----------|
| 2024-12 | AsyncSession 병렬 쿼리 제한사항 섹션 추가 (실제 프로젝트 에러 사례 포함) |
| 2024-12 | 잘못된 병렬 쿼리 예제 수정 |
| 2024-12 | 결정 플로우차트 추가 |