37 KiB
DB 쿼리 병렬화 (Query Parallelization) 완벽 가이드
목적: Python asyncio와 SQLAlchemy를 활용한 DB 쿼리 병렬화의 이론부터 실무 적용까지 대상: 비동기 프로그래밍 기초 지식이 있는 백엔드 개발자 환경: Python 3.11+, SQLAlchemy 2.0+, FastAPI 최종 수정: 2024-12 (AsyncSession 병렬 쿼리 제한사항 추가)
목차
- 이론적 배경
- 핵심 개념
- SQLAlchemy AsyncSession 병렬 쿼리 제한사항 ⚠️ 중요
- 설계 시 주의사항
- 실무 시나리오 예제
- 성능 측정 및 모니터링
- Best Practices
1. 이론적 배경
1.1 동기 vs 비동기 실행
[순차 실행 - Sequential]
Query A ──────────▶ (100ms)
Query B ──────────▶ (100ms)
Query C ──────────▶ (100ms)
총 소요시간: 300ms
[병렬 실행 - Parallel]
Query A ──────────▶ (100ms)
Query B ──────────▶ (100ms)
Query C ──────────▶ (100ms)
총 소요시간: ~100ms (가장 느린 쿼리 기준)
1.2 왜 병렬화가 필요한가?
-
I/O 바운드 작업의 특성
- DB 쿼리는 네트워크 I/O가 대부분 (실제 CPU 작업은 짧음)
- 대기 시간 동안 다른 작업을 수행할 수 있음
-
응답 시간 단축
- N개의 독립적인 쿼리:
O(sum)→O(max) - 사용자 경험 개선
- N개의 독립적인 쿼리:
-
리소스 효율성
- 커넥션 풀을 효율적으로 활용
- 서버 처리량(throughput) 증가
1.3 asyncio.gather()의 동작 원리
import asyncio
async def main():
# gather()는 모든 코루틴을 동시에 스케줄링
results = await asyncio.gather(
coroutine_1(), # Task 1 생성
coroutine_2(), # Task 2 생성
coroutine_3(), # Task 3 생성
)
# 모든 Task가 완료되면 결과를 리스트로 반환
return results
핵심 동작:
gather()는 각 코루틴을 Task로 래핑- 이벤트 루프가 모든 Task를 동시에 실행
- I/O 대기 시 다른 Task로 컨텍스트 스위칭
- 모든 Task 완료 시 결과 반환
2. 핵심 개념
2.1 독립성 판단 기준
병렬화가 가능한 쿼리의 조건:
| 조건 | 설명 | 예시 |
|---|---|---|
| 데이터 독립성 | 쿼리 간 결과 의존성 없음 | User, Product, Order 각각 조회 |
| 트랜잭션 독립성 | 같은 트랜잭션 내 순서 무관 | READ 작업들 |
| 비즈니스 독립성 | 결과 순서가 로직에 영향 없음 | 대시보드 데이터 조회 |
2.2 병렬화 불가능한 경우
# ❌ 잘못된 예: 의존성이 있는 쿼리
user = await session.execute(select(User).where(User.id == user_id))
# orders 쿼리는 user.id에 의존 → 병렬화 불가
orders = await session.execute(
select(Order).where(Order.user_id == user.id)
)
# ❌ 잘못된 예: 쓰기 후 읽기 (Write-then-Read)
await session.execute(insert(User).values(name="John"))
# 방금 생성된 데이터를 조회 → 순차 실행 필요
new_user = await session.execute(select(User).where(User.name == "John"))
3. SQLAlchemy AsyncSession 병렬 쿼리 제한사항
⚠️ 3.1 중요: 단일 AsyncSession에서 병렬 쿼리는 지원되지 않습니다
이전에 잘못 알려진 내용:
# ❌ 이 코드는 실제로 작동하지 않습니다!
async with AsyncSessionLocal() as session:
results = await asyncio.gather(
session.execute(query1),
session.execute(query2),
session.execute(query3),
)
3.2 실제 발생하는 에러
위 코드를 실행하면 다음과 같은 에러가 발생합니다:
sqlalchemy.exc.InvalidRequestError:
Method 'close()' can't be called here; method '_connection_for_bind()'
is already in progress and this would cause an unexpected state change
to <SessionTransactionState.CLOSED: 5>
(Background on this error at: https://sqlalche.me/e/20/isce)
3.3 에러 발생 원인 분석
┌─────────────────────────────────────────────────────────────────────────┐
│ AsyncSession 내부 상태 충돌 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ AsyncSession은 내부적으로 하나의 Connection을 관리합니다. │
│ asyncio.gather()로 여러 쿼리를 동시에 실행하면: │
│ │
│ Time ──────────────────────────────────────────────────────────► │
│ │
│ Task 1: session.execute(query1) │
│ └── _connection_for_bind() 시작 ──► 연결 획득 중... │
│ │
│ Task 2: session.execute(query2) │
│ └── _connection_for_bind() 시작 ──► ⚠️ 충돌! │
│ (이미 Task 1이 연결 작업 중) │
│ │
│ Task 3: session.execute(query3) │
│ └── _connection_for_bind() 시작 ──► ⚠️ 충돌! │
│ │
│ 결과: InvalidRequestError 발생 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 핵심 원인: │
│ 1. AsyncSession은 단일 연결(Connection)을 사용 │
│ 2. 연결 상태 전이(state transition)가 순차적으로만 가능 │
│ 3. 동시에 여러 작업이 상태 전이를 시도하면 충돌 발생 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
3.4 SQLAlchemy 공식 문서의 설명
SQLAlchemy 2.0 문서에 따르면:
The AsyncSession object is a mutable, stateful object which represents a single, stateful database transaction in progress. Using concurrent tasks with asyncio, with APIs such as asyncio.gather() for example, should use a separate AsyncSession per individual task.
번역: AsyncSession 객체는 진행 중인 단일 데이터베이스 트랜잭션을 나타내는 변경 가능한 상태 객체입니다. asyncio.gather() 같은 API로 동시 작업을 수행할 때는 각 작업마다 별도의 AsyncSession을 사용해야 합니다.
3.5 본 프로젝트에서 발생한 실제 사례
문제가 발생한 코드 (video.py):
async def generate_video(task_id: str, ...):
async with AsyncSessionLocal() as session:
# ❌ 단일 세션에서 asyncio.gather() 사용 - 에러 발생!
project_result, lyric_result, song_result, image_result = (
await asyncio.gather(
session.execute(project_query),
session.execute(lyric_query),
session.execute(song_query),
session.execute(image_query),
)
)
프론트엔드에서 표시된 에러:
Method 'close()' can't be called here; method '_connection_for_bind()'
is already in progress and this would cause an unexpected state change
to <SessionTransactionState.CLOSED: 5>
3.6 해결 방법
방법 1: 순차 실행 (권장 - 단순하고 안전)
# ✅ 올바른 방법: 순차 실행
async def generate_video(task_id: str, ...):
async with AsyncSessionLocal() as session:
# 순차적으로 쿼리 실행 (가장 안전)
project_result = await session.execute(project_query)
lyric_result = await session.execute(lyric_query)
song_result = await session.execute(song_query)
image_result = await session.execute(image_query)
장점:
- 구현이 단순함
- 에러 처리가 명확함
- 트랜잭션 관리가 쉬움
단점:
- 총 실행 시간 = 각 쿼리 시간의 합
방법 2: 별도 세션으로 병렬 실행 (성능 중요 시)
# ✅ 올바른 방법: 각 쿼리마다 별도 세션 사용
async def fetch_with_separate_sessions(task_id: str):
async def get_project():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Project).where(Project.task_id == task_id)
)
return result.scalar_one_or_none()
async def get_lyric():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Lyric).where(Lyric.task_id == task_id)
)
return result.scalar_one_or_none()
async def get_song():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Song).where(Song.task_id == task_id)
)
return result.scalar_one_or_none()
async def get_images():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Image).where(Image.task_id == task_id)
)
return result.scalars().all()
# 별도 세션이므로 병렬 실행 가능!
project, lyric, song, images = await asyncio.gather(
get_project(),
get_lyric(),
get_song(),
get_images(),
)
return project, lyric, song, images
장점:
- 진정한 병렬 실행
- 총 실행 시간 = 가장 느린 쿼리 시간
단점:
- 커넥션 풀에서 여러 연결을 동시에 사용
- 각 쿼리가 별도 트랜잭션 (일관성 주의)
- 코드가 복잡해짐
방법 3: 유틸리티 함수로 추상화
from typing import TypeVar, Callable, Any
import asyncio
T = TypeVar('T')
async def parallel_queries(
queries: list[tuple[Callable, dict]],
) -> list[Any]:
"""
여러 쿼리를 별도 세션으로 병렬 실행합니다.
Args:
queries: [(query_func, kwargs), ...] 형태의 리스트
Returns:
각 쿼리의 결과 리스트
Example:
results = await parallel_queries([
(get_project, {"task_id": task_id}),
(get_song, {"task_id": task_id}),
])
"""
async def execute_with_session(query_func, kwargs):
async with AsyncSessionLocal() as session:
return await query_func(session, **kwargs)
return await asyncio.gather(*[
execute_with_session(func, kwargs)
for func, kwargs in queries
])
# 사용 예시
async def get_project(session, task_id: str):
result = await session.execute(
select(Project).where(Project.task_id == task_id)
)
return result.scalar_one_or_none()
async def get_song(session, task_id: str):
result = await session.execute(
select(Song).where(Song.task_id == task_id)
)
return result.scalar_one_or_none()
# 병렬 실행
project, song = await parallel_queries([
(get_project, {"task_id": "abc123"}),
(get_song, {"task_id": "abc123"}),
])
3.7 성능 비교: 순차 vs 별도 세션 병렬
┌─────────────────────────────────────────────────────────────────────────┐
│ 성능 비교 (4개 쿼리, 각 50ms) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ [순차 실행 - 단일 세션] │
│ ─────────────────────── │
│ Query 1 ─────▶ (50ms) │
│ Query 2 ─────▶ (50ms) │
│ Query 3 ─────▶ (50ms) │
│ Query 4 ─────▶ (50ms) │
│ 총 소요시간: ~200ms │
│ 커넥션 사용: 1개 │
│ │
│ [병렬 실행 - 별도 세션] │
│ ─────────────────────── │
│ Session 1: Query 1 ─────▶ (50ms) │
│ Session 2: Query 2 ─────▶ (50ms) │
│ Session 3: Query 3 ─────▶ (50ms) │
│ Session 4: Query 4 ─────▶ (50ms) │
│ 총 소요시간: ~55ms │
│ 커넥션 사용: 4개 (동시) │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 결론: │
│ - 성능 개선: 약 72% (200ms → 55ms) │
│ - 대가: 커넥션 풀 사용량 4배 증가 │
│ - 트레이드오프 고려 필요 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
3.8 언제 병렬 실행을 선택해야 하는가?
| 상황 | 권장 방식 | 이유 |
|---|---|---|
| 쿼리 수 ≤ 3개 | 순차 실행 | 복잡도 대비 성능 이득 적음 |
| 쿼리 수 > 3개, 각 쿼리 > 50ms | 병렬 실행 | 유의미한 성능 개선 |
| 트랜잭션 일관성 필요 | 순차 실행 | 별도 세션은 별도 트랜잭션 |
| 커넥션 풀 여유 없음 | 순차 실행 | 풀 고갈 위험 |
| 실시간 응답 중요 (API) | 상황에 따라 | 사용자 경험 우선 |
| 백그라운드 작업 | 순차 실행 | 안정성 우선 |
3.9 커넥션 풀 고려사항
# 엔진 설정 시 병렬 쿼리를 고려한 풀 크기 설정
engine = create_async_engine(
url=db_url,
pool_size=20, # 기본 풀 크기
max_overflow=20, # 추가 연결 허용
pool_timeout=30, # 풀 대기 타임아웃
)
# 계산 예시:
# - 동시 API 요청: 10개
# - 요청당 병렬 쿼리: 4개
# - 필요 커넥션: 10 × 4 = 40개
# - 설정: pool_size(20) + max_overflow(20) = 40개 ✅
4. 설계 시 주의사항
4.1 커넥션 풀 크기 설정
# SQLAlchemy 엔진 설정
engine = create_async_engine(
url=db_url,
pool_size=20, # 기본 풀 크기
max_overflow=20, # 추가 연결 허용 수
pool_timeout=30, # 풀에서 연결 대기 시간
pool_recycle=3600, # 연결 재생성 주기
pool_pre_ping=True, # 연결 유효성 검사
)
풀 크기 계산 공식:
필요 커넥션 수 = 동시 요청 수 × 요청당 병렬 쿼리 수
예: 동시 10개 요청, 각 요청당 4개 병렬 쿼리 → 최소 40개 커넥션 필요 (pool_size + max_overflow >= 40)
4.2 에러 처리 전략
import asyncio
# 방법 1: return_exceptions=True (권장)
results = await asyncio.gather(
fetch_with_session_1(),
fetch_with_session_2(),
fetch_with_session_3(),
return_exceptions=True, # 예외를 결과로 반환
)
# 결과 처리
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Query {i} failed: {result}")
else:
print(f"Query {i} succeeded: {result}")
# 방법 2: 개별 try-except 래핑
async def safe_fetch(query_func, **kwargs):
try:
async with AsyncSessionLocal() as session:
return await query_func(session, **kwargs)
except Exception as e:
print(f"Query failed: {e}")
return None
results = await asyncio.gather(
safe_fetch(get_project, task_id=task_id),
safe_fetch(get_song, task_id=task_id),
safe_fetch(get_images, task_id=task_id),
)
4.3 타임아웃 설정
import asyncio
async def fetch_with_timeout(query_func, timeout_seconds: float, **kwargs):
"""타임아웃이 있는 쿼리 실행"""
try:
return await asyncio.wait_for(
query_func(**kwargs),
timeout=timeout_seconds
)
except asyncio.TimeoutError:
raise Exception(f"Query timed out after {timeout_seconds}s")
# 사용 예
results = await asyncio.gather(
fetch_with_timeout(get_project, 5.0, task_id=task_id),
fetch_with_timeout(get_song, 5.0, task_id=task_id),
fetch_with_timeout(get_images, 10.0, task_id=task_id), # 더 긴 타임아웃
)
4.4 N+1 문제와 병렬화
# ❌ N+1 문제 발생 코드
videos = await session.execute(select(Video))
for video in videos.scalars():
# N번의 추가 쿼리 발생!
project = await session.execute(
select(Project).where(Project.id == video.project_id)
)
# ✅ 해결 방법 1: JOIN 사용
query = select(Video).options(selectinload(Video.project))
videos = await session.execute(query)
# ✅ 해결 방법 2: IN 절로 배치 조회
video_list = videos.scalars().all()
project_ids = [v.project_id for v in video_list if v.project_id]
projects_result = await session.execute(
select(Project).where(Project.id.in_(project_ids))
)
projects_map = {p.id: p for p in projects_result.scalars().all()}
4.5 트랜잭션 격리 수준 고려
| 격리 수준 | 병렬 쿼리 안전성 | 설명 |
|---|---|---|
| READ UNCOMMITTED | ⚠️ 주의 | Dirty Read 가능 |
| READ COMMITTED | ✅ 안전 | 대부분의 경우 적합 |
| REPEATABLE READ | ✅ 안전 | 일관된 스냅샷 |
| SERIALIZABLE | ✅ 안전 | 성능 저하 가능 |
5. 실무 시나리오 예제
5.1 시나리오 1: 대시보드 데이터 조회 (별도 세션 병렬)
요구사항: 사용자 대시보드에 필요한 여러 통계 데이터를 한 번에 조회
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
async def get_user(session: AsyncSession, user_id: int):
result = await session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_recent_orders(session: AsyncSession, user_id: int):
result = await session.execute(
select(Order)
.where(Order.user_id == user_id)
.order_by(Order.created_at.desc())
.limit(5)
)
return result.scalars().all()
async def get_total_amount(session: AsyncSession, user_id: int):
result = await session.execute(
select(func.sum(Order.amount))
.where(Order.user_id == user_id)
)
return result.scalar() or 0
async def get_wishlist_count(session: AsyncSession, user_id: int):
result = await session.execute(
select(func.count(Wishlist.id))
.where(Wishlist.user_id == user_id)
)
return result.scalar() or 0
async def get_dashboard_data(user_id: int) -> dict:
"""
대시보드에 필요한 모든 데이터를 병렬로 조회합니다.
각 쿼리는 별도의 세션을 사용합니다.
"""
async def fetch_user():
async with AsyncSessionLocal() as session:
return await get_user(session, user_id)
async def fetch_orders():
async with AsyncSessionLocal() as session:
return await get_recent_orders(session, user_id)
async def fetch_amount():
async with AsyncSessionLocal() as session:
return await get_total_amount(session, user_id)
async def fetch_wishlist():
async with AsyncSessionLocal() as session:
return await get_wishlist_count(session, user_id)
# 4개 쿼리를 별도 세션으로 병렬 실행
user, orders, total_amount, wishlist_count = await asyncio.gather(
fetch_user(),
fetch_orders(),
fetch_amount(),
fetch_wishlist(),
)
if not user:
raise ValueError(f"User {user_id} not found")
return {
"user": {"id": user.id, "name": user.name, "email": user.email},
"recent_orders": [
{"id": o.id, "amount": o.amount, "status": o.status}
for o in orders
],
"total_spent": total_amount,
"wishlist_count": wishlist_count,
}
# 사용 예시 (FastAPI)
@router.get("/dashboard")
async def dashboard(user_id: int):
return await get_dashboard_data(user_id)
성능 비교:
- 순차 실행: ~200ms (50ms × 4)
- 병렬 실행: ~60ms (가장 느린 쿼리 기준)
- 개선율: 약 70%
5.2 시나리오 2: 영상 생성 데이터 조회 (순차 실행 - 권장)
요구사항: 영상 생성을 위해 Project, Lyric, Song, Image 데이터를 조회
본 프로젝트에서 실제로 적용된 패턴입니다.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from dataclasses import dataclass
from fastapi import HTTPException
@dataclass
class VideoGenerationData:
"""영상 생성에 필요한 데이터"""
project_id: int
lyric_id: int
song_id: int
music_url: str
song_duration: float
lyrics: str
image_urls: list[str]
async def generate_video(
task_id: str,
orientation: str = "vertical",
) -> GenerateVideoResponse:
"""
Creatomate API를 통해 영상을 생성합니다.
중요: SQLAlchemy AsyncSession은 단일 세션에서 동시에 여러 쿼리를 실행하는 것을
지원하지 않습니다. asyncio.gather()로 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다.
따라서 쿼리는 순차적으로 실행합니다.
"""
from app.database.session import AsyncSessionLocal
print(f"[generate_video] START - task_id: {task_id}")
# 외부 API 호출 전에 필요한 데이터를 저장할 변수들
project_id: int | None = None
lyric_id: int | None = None
song_id: int | None = None
video_id: int | None = None
music_url: str | None = None
song_duration: float | None = None
lyrics: str | None = None
image_urls: list[str] = []
try:
# 세션을 명시적으로 열고 DB 작업 후 바로 닫음
async with AsyncSessionLocal() as session:
# ===== 순차 쿼리 실행: Project, Lyric, Song, Image =====
# Note: AsyncSession은 동일 세션에서 병렬 쿼리를 지원하지 않음
# Project 조회
project_result = await session.execute(
select(Project)
.where(Project.task_id == task_id)
.order_by(Project.created_at.desc())
.limit(1)
)
# Lyric 조회
lyric_result = await session.execute(
select(Lyric)
.where(Lyric.task_id == task_id)
.order_by(Lyric.created_at.desc())
.limit(1)
)
# Song 조회
song_result = await session.execute(
select(Song)
.where(Song.task_id == task_id)
.order_by(Song.created_at.desc())
.limit(1)
)
# Image 조회
image_result = await session.execute(
select(Image)
.where(Image.task_id == task_id)
.order_by(Image.img_order.asc())
)
print(f"[generate_video] Queries completed - task_id: {task_id}")
# 결과 처리 및 검증
project = project_result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
project_id = project.id
lyric = lyric_result.scalar_one_or_none()
if not lyric:
raise HTTPException(status_code=404, detail="Lyric not found")
lyric_id = lyric.id
song = song_result.scalar_one_or_none()
if not song:
raise HTTPException(status_code=404, detail="Song not found")
song_id = song.id
music_url = song.song_result_url
song_duration = song.duration
lyrics = song.song_prompt
images = image_result.scalars().all()
if not images:
raise HTTPException(status_code=404, detail="Images not found")
image_urls = [img.img_url for img in images]
# Video 레코드 생성 및 커밋
video = Video(
project_id=project_id,
lyric_id=lyric_id,
song_id=song_id,
task_id=task_id,
status="processing",
)
session.add(video)
await session.commit()
video_id = video.id
# 세션 종료 후 외부 API 호출 (커넥션 타임아웃 방지)
# ... Creatomate API 호출 로직 ...
except HTTPException:
raise
except Exception as e:
print(f"[generate_video] EXCEPTION - {e}")
raise
이 패턴을 선택한 이유:
- 안정성: 단일 세션 내에서 모든 쿼리 실행으로 트랜잭션 일관성 보장
- 단순성: 코드가 명확하고 디버깅이 쉬움
- 충분한 성능: 4개의 간단한 쿼리는 순차 실행해도 ~200ms 이내
- 에러 방지: AsyncSession 병렬 쿼리 제한으로 인한 에러 방지
5.3 시나리오 3: 복합 검색 (트레이드오프 분석)
# 방법 A: 순차 실행 (단순, 안전)
async def search_sequential(session: AsyncSession, keyword: str):
items = await session.execute(items_query)
count = await session.execute(count_query)
categories = await session.execute(category_query)
price_range = await session.execute(price_query)
brands = await session.execute(brand_query)
return items, count, categories, price_range, brands
# 예상 시간: ~350ms (70ms × 5)
# 방법 B: 별도 세션 병렬 실행 (빠름, 복잡)
async def search_parallel(keyword: str):
async def fetch_items():
async with AsyncSessionLocal() as s:
return await s.execute(items_query)
async def fetch_count():
async with AsyncSessionLocal() as s:
return await s.execute(count_query)
# ... 나머지 함수들 ...
return await asyncio.gather(
fetch_items(),
fetch_count(),
fetch_categories(),
fetch_price_range(),
fetch_brands(),
)
# 예상 시간: ~80ms
# 결정 기준:
# - 검색 API가 자주 호출되고 응답 시간이 중요하다면 → 방법 B
# - 안정성이 우선이고 복잡도를 낮추고 싶다면 → 방법 A
# - 커넥션 풀 여유가 없다면 → 방법 A
6. 성능 측정 및 모니터링
6.1 실행 시간 측정 데코레이터
import time
import functools
import asyncio
from typing import Callable, TypeVar
T = TypeVar("T")
def measure_time(func: Callable[..., T]) -> Callable[..., T]:
"""함수 실행 시간을 측정하는 데코레이터"""
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
start = time.perf_counter()
try:
return await func(*args, **kwargs)
finally:
elapsed = (time.perf_counter() - start) * 1000
print(f"[{func.__name__}] Execution time: {elapsed:.2f}ms")
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
start = time.perf_counter()
try:
return func(*args, **kwargs)
finally:
elapsed = (time.perf_counter() - start) * 1000
print(f"[{func.__name__}] Execution time: {elapsed:.2f}ms")
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
# 사용 예
@measure_time
async def fetch_data(task_id: str):
...
6.2 병렬 vs 순차 성능 비교 유틸리티
import asyncio
import time
async def compare_execution_methods(
task_id: str,
query_funcs: list[Callable],
) -> dict:
"""순차 실행과 병렬 실행(별도 세션)의 성능을 비교합니다."""
# 순차 실행 (단일 세션)
sequential_start = time.perf_counter()
async with AsyncSessionLocal() as session:
sequential_results = []
for func in query_funcs:
result = await func(session, task_id)
sequential_results.append(result)
sequential_time = (time.perf_counter() - sequential_start) * 1000
# 병렬 실행 (별도 세션)
parallel_start = time.perf_counter()
async def run_with_session(func):
async with AsyncSessionLocal() as session:
return await func(session, task_id)
parallel_results = await asyncio.gather(*[
run_with_session(func) for func in query_funcs
])
parallel_time = (time.perf_counter() - parallel_start) * 1000
improvement = ((sequential_time - parallel_time) / sequential_time) * 100
return {
"sequential_time_ms": round(sequential_time, 2),
"parallel_time_ms": round(parallel_time, 2),
"improvement_percent": round(improvement, 1),
"query_count": len(query_funcs),
}
6.3 SQLAlchemy 쿼리 로깅
import logging
# SQLAlchemy 쿼리 로깅 활성화
logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
# 또는 엔진 생성 시 echo=True
engine = create_async_engine(url, echo=True)
7. Best Practices
7.1 체크리스트
병렬화 적용 전 확인사항:
- 쿼리들이 서로 독립적인가? (결과 의존성 없음)
- 모든 쿼리가 READ 작업인가? (또는 순서 무관한 WRITE)
- 별도 세션을 사용할 것인가? (AsyncSession 제한사항)
- 커넥션 풀 크기가 충분한가?
- 에러 처리 전략이 수립되어 있는가?
- 타임아웃 설정이 적절한가?
- 트랜잭션 일관성이 필요한가?
7.2 권장 패턴
# ✅ 패턴 1: 순차 실행 (단순하고 안전)
async def fetch_data(session: AsyncSession, task_id: str):
project = await session.execute(project_query)
song = await session.execute(song_query)
return project, song
# ✅ 패턴 2: 별도 세션으로 병렬 실행 (성능 중요 시)
async def fetch_data_parallel(task_id: str):
async def get_project():
async with AsyncSessionLocal() as s:
return await s.execute(project_query)
async def get_song():
async with AsyncSessionLocal() as s:
return await s.execute(song_query)
return await asyncio.gather(get_project(), get_song())
7.3 피해야 할 패턴
# ❌ 절대 금지: 단일 세션에서 asyncio.gather()
async with AsyncSessionLocal() as session:
results = await asyncio.gather(
session.execute(query1),
session.execute(query2),
)
# 에러 발생: InvalidRequestError - Method 'close()' can't be called here
# ❌ 피하기: 과도한 병렬화 (커넥션 고갈)
# 100개 쿼리를 동시에 실행하면 커넥션 풀 고갈 위험
results = await asyncio.gather(*[fetch() for _ in range(100)])
# ✅ 해결: 배치 처리
BATCH_SIZE = 10
for i in range(0, len(items), BATCH_SIZE):
batch = items[i:i + BATCH_SIZE]
results = await asyncio.gather(*[fetch(item) for item in batch])
7.4 결정 가이드
┌─────────────────────────────────────────────────────────────────────────┐
│ 병렬화 결정 플로우차트 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 쿼리 개수가 3개 이하인가? │
│ │ │
│ ├── Yes ──► 순차 실행 (복잡도 대비 이득 적음) │
│ │ │
│ └── No ──► 각 쿼리가 50ms 이상 걸리는가? │
│ │ │
│ ├── No ──► 순차 실행 (이득 적음) │
│ │ │
│ └── Yes ──► 트랜잭션 일관성이 필요한가? │
│ │ │
│ ├── Yes ──► 순차 실행 │
│ │ (단일 세션) │
│ │ │
│ └── No ──► 커넥션 풀 여유? │
│ │ │
│ ├── No ──► │
│ │ 순차 실행 │
│ │ │
│ └── Yes ──► │
│ 병렬 실행 │
│ (별도세션) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
7.5 성능 최적화 팁
- 인덱스 확인: 병렬화해도 인덱스 없으면 느림
- 쿼리 최적화 우선: 병렬화 전에 개별 쿼리 최적화
- 적절한 병렬 수준: 보통 3-10개가 적절
- 모니터링 필수: 실제 개선 효과 측정
- 커넥션 풀 모니터링: 병렬 실행 시 풀 사용량 확인
부록: 관련 자료
변경 이력
| 날짜 | 변경 내용 |
|---|---|
| 2024-12 | AsyncSession 병렬 쿼리 제한사항 섹션 추가 (실제 프로젝트 에러 사례 포함) |
| 2024-12 | 잘못된 병렬 쿼리 예제 수정 |
| 2024-12 | 결정 플로우차트 추가 |