o2o-castad-backend/docs/analysis/db_쿼리_병렬화.md

38 KiB
Raw Permalink Blame History

DB 쿼리 병렬화 (Query Parallelization) 완벽 가이드

목적: Python asyncio와 SQLAlchemy를 활용한 DB 쿼리 병렬화의 이론부터 실무 적용까지 대상: 비동기 프로그래밍 기초 지식이 있는 백엔드 개발자 환경: Python 3.11+, SQLAlchemy 2.0+, FastAPI 최종 수정: 2024-12 (AsyncSession 병렬 쿼리 제한사항 추가)


목차

  1. 이론적 배경
  2. 핵심 개념
  3. SQLAlchemy AsyncSession 병렬 쿼리 제한사항 ⚠️ 중요
  4. 설계 시 주의사항
  5. 실무 시나리오 예제
  6. 성능 측정 및 모니터링
  7. Best Practices

1. 이론적 배경

1.1 동기 vs 비동기 실행

[순차 실행 - Sequential]
Query A ──────────▶ (100ms)
                    Query B ──────────▶ (100ms)
                                        Query C ──────────▶ (100ms)
총 소요시간: 300ms

[병렬 실행 - Parallel]
Query A ──────────▶ (100ms)
Query B ──────────▶ (100ms)
Query C ──────────▶ (100ms)
총 소요시간: ~100ms (가장 느린 쿼리 기준)

1.2 왜 병렬화가 필요한가?

  1. I/O 바운드 작업의 특성

    • DB 쿼리는 네트워크 I/O가 대부분 (실제 CPU 작업은 짧음)
    • 대기 시간 동안 다른 작업을 수행할 수 있음
  2. 응답 시간 단축

    • N개의 독립적인 쿼리: O(sum)O(max)
    • 사용자 경험 개선
  3. 리소스 효율성

    • 커넥션 풀을 효율적으로 활용
    • 서버 처리량(throughput) 증가

1.3 asyncio.gather()의 동작 원리

import asyncio

async def main():
    # gather()는 모든 코루틴을 동시에 스케줄링
    results = await asyncio.gather(
        coroutine_1(),  # Task 1 생성
        coroutine_2(),  # Task 2 생성
        coroutine_3(),  # Task 3 생성
    )
    # 모든 Task가 완료되면 결과를 리스트로 반환
    return results

핵심 동작:

  1. gather()는 각 코루틴을 Task로 래핑
  2. 이벤트 루프가 모든 Task를 동시에 실행
  3. I/O 대기 시 다른 Task로 컨텍스트 스위칭
  4. 모든 Task 완료 시 결과 반환

2. 핵심 개념

2.1 독립성 판단 기준

병렬화가 가능한 쿼리의 조건:

조건 설명 예시
데이터 독립성 쿼리 간 결과 의존성 없음 User, Product, Order 각각 조회
트랜잭션 독립성 같은 트랜잭션 내 순서 무관 READ 작업들
비즈니스 독립성 결과 순서가 로직에 영향 없음 대시보드 데이터 조회

2.2 병렬화 불가능한 경우

# ❌ 잘못된 예: 의존성이 있는 쿼리
user = await session.execute(select(User).where(User.id == user_id))
# orders 쿼리는 user.id에 의존 → 병렬화 불가
orders = await session.execute(
    select(Order).where(Order.user_id == user.id)
)
# ❌ 잘못된 예: 쓰기 후 읽기 (Write-then-Read)
await session.execute(insert(User).values(name="John"))
# 방금 생성된 데이터를 조회 → 순차 실행 필요
new_user = await session.execute(select(User).where(User.name == "John"))

3. SQLAlchemy AsyncSession 병렬 쿼리 제한사항

⚠️ 3.1 중요: 단일 AsyncSession에서 병렬 쿼리는 지원되지 않습니다

이전에 잘못 알려진 내용:

# ❌ 이 코드는 실제로 작동하지 않습니다!
async with AsyncSessionLocal() as session:
    results = await asyncio.gather(
        session.execute(query1),
        session.execute(query2),
        session.execute(query3),
    )

3.2 실제 발생하는 에러

위 코드를 실행하면 다음과 같은 에러가 발생합니다:

sqlalchemy.exc.InvalidRequestError:
Method 'close()' can't be called here; method '_connection_for_bind()'
is already in progress and this would cause an unexpected state change
to <SessionTransactionState.CLOSED: 5>

(Background on this error at: https://sqlalche.me/e/20/isce)

3.3 에러 발생 원인 분석

┌─────────────────────────────────────────────────────────────────────────┐
│                    AsyncSession 내부 상태 충돌                            │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  AsyncSession은 내부적으로 하나의 Connection을 관리합니다.                  │
│  asyncio.gather()로 여러 쿼리를 동시에 실행하면:                           │
│                                                                         │
│  Time ──────────────────────────────────────────────────────────►       │
│                                                                         │
│  Task 1: session.execute(query1)                                        │
│          └── _connection_for_bind() 시작 ──► 연결 획득 중...              │
│                                                                         │
│  Task 2: session.execute(query2)                                        │
│          └── _connection_for_bind() 시작 ──► ⚠️ 충돌!                    │
│              (이미 Task 1이 연결 작업 중)                                 │
│                                                                         │
│  Task 3: session.execute(query3)                                        │
│          └── _connection_for_bind() 시작 ──► ⚠️ 충돌!                    │
│                                                                         │
│  결과: InvalidRequestError 발생                                          │
│                                                                         │
│  ─────────────────────────────────────────────────────────────────────  │
│                                                                         │
│  핵심 원인:                                                              │
│  1. AsyncSession은 단일 연결(Connection)을 사용                          │
│  2. 연결 상태 전이(state transition)가 순차적으로만 가능                   │
│  3. 동시에 여러 작업이 상태 전이를 시도하면 충돌 발생                       │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

3.4 SQLAlchemy 공식 문서의 설명

SQLAlchemy 2.0 문서에 따르면:

The AsyncSession object is a mutable, stateful object which represents a single, stateful database transaction in progress. Using concurrent tasks with asyncio, with APIs such as asyncio.gather() for example, should use a separate AsyncSession per individual task.

번역: AsyncSession 객체는 진행 중인 단일 데이터베이스 트랜잭션을 나타내는 변경 가능한 상태 객체입니다. asyncio.gather() 같은 API로 동시 작업을 수행할 때는 각 작업마다 별도의 AsyncSession을 사용해야 합니다.

3.5 본 프로젝트에서 발생한 실제 사례

문제가 발생한 코드 (video.py):

async def generate_video(task_id: str, ...):
    async with AsyncSessionLocal() as session:
        # ❌ 단일 세션에서 asyncio.gather() 사용 - 에러 발생!
        project_result, lyric_result, song_result, image_result = (
            await asyncio.gather(
                session.execute(project_query),
                session.execute(lyric_query),
                session.execute(song_query),
                session.execute(image_query),
            )
        )

프론트엔드에서 표시된 에러:

Method 'close()' can't be called here; method '_connection_for_bind()'
is already in progress and this would cause an unexpected state change
to <SessionTransactionState.CLOSED: 5>

3.6 해결 방법

방법 1: 순차 실행 (권장 - 단순하고 안전)

# ✅ 올바른 방법: 순차 실행
async def generate_video(task_id: str, ...):
    async with AsyncSessionLocal() as session:
        # 순차적으로 쿼리 실행 (가장 안전)
        project_result = await session.execute(project_query)
        lyric_result = await session.execute(lyric_query)
        song_result = await session.execute(song_query)
        image_result = await session.execute(image_query)

장점:

  • 구현이 단순함
  • 에러 처리가 명확함
  • 트랜잭션 관리가 쉬움

단점:

  • 총 실행 시간 = 각 쿼리 시간의 합

방법 2: 별도 세션으로 병렬 실행 (성능 중요 시)

# ✅ 올바른 방법: 각 쿼리마다 별도 세션 사용
async def fetch_with_separate_sessions(task_id: str):

    async def get_project():
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Project).where(Project.task_id == task_id)
            )
            return result.scalar_one_or_none()

    async def get_lyric():
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Lyric).where(Lyric.task_id == task_id)
            )
            return result.scalar_one_or_none()

    async def get_song():
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Song).where(Song.task_id == task_id)
            )
            return result.scalar_one_or_none()

    async def get_images():
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Image).where(Image.task_id == task_id)
            )
            return result.scalars().all()

    # 별도 세션이므로 병렬 실행 가능!
    project, lyric, song, images = await asyncio.gather(
        get_project(),
        get_lyric(),
        get_song(),
        get_images(),
    )

    return project, lyric, song, images

장점:

  • 진정한 병렬 실행
  • 총 실행 시간 = 가장 느린 쿼리 시간

단점:

  • 커넥션 풀에서 여러 연결을 동시에 사용
  • 각 쿼리가 별도 트랜잭션 (일관성 주의)
  • 코드가 복잡해짐

방법 3: 유틸리티 함수로 추상화

from typing import TypeVar, Callable, Any
import asyncio

T = TypeVar('T')


async def parallel_queries(
    queries: list[tuple[Callable, dict]],
) -> list[Any]:
    """
    여러 쿼리를 별도 세션으로 병렬 실행합니다.

    Args:
        queries: [(query_func, kwargs), ...] 형태의 리스트

    Returns:
        각 쿼리의 결과 리스트

    Example:
        results = await parallel_queries([
            (get_project, {"task_id": task_id}),
            (get_song, {"task_id": task_id}),
        ])
    """
    async def execute_with_session(query_func, kwargs):
        async with AsyncSessionLocal() as session:
            return await query_func(session, **kwargs)

    return await asyncio.gather(*[
        execute_with_session(func, kwargs)
        for func, kwargs in queries
    ])


# 사용 예시
async def get_project(session, task_id: str):
    result = await session.execute(
        select(Project).where(Project.task_id == task_id)
    )
    return result.scalar_one_or_none()


async def get_song(session, task_id: str):
    result = await session.execute(
        select(Song).where(Song.task_id == task_id)
    )
    return result.scalar_one_or_none()


# 병렬 실행
project, song = await parallel_queries([
    (get_project, {"task_id": "abc123"}),
    (get_song, {"task_id": "abc123"}),
])

3.7 성능 비교: 순차 vs 별도 세션 병렬

┌─────────────────────────────────────────────────────────────────────────┐
│                    성능 비교 (4개 쿼리, 각 50ms)                          │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  [순차 실행 - 단일 세션]                                                 │
│  ───────────────────────                                                │
│  Query 1 ─────▶ (50ms)                                                  │
│                  Query 2 ─────▶ (50ms)                                  │
│                                  Query 3 ─────▶ (50ms)                  │
│                                                  Query 4 ─────▶ (50ms)  │
│  총 소요시간: ~200ms                                                     │
│  커넥션 사용: 1개                                                        │
│                                                                         │
│  [병렬 실행 - 별도 세션]                                                 │
│  ───────────────────────                                                │
│  Session 1: Query 1 ─────▶ (50ms)                                       │
│  Session 2: Query 2 ─────▶ (50ms)                                       │
│  Session 3: Query 3 ─────▶ (50ms)                                       │
│  Session 4: Query 4 ─────▶ (50ms)                                       │
│  총 소요시간: ~55ms                                                      │
│  커넥션 사용: 4개 (동시)                                                 │
│                                                                         │
│  ─────────────────────────────────────────────────────────────────────  │
│                                                                         │
│  결론:                                                                   │
│  - 성능 개선: 약 72% (200ms → 55ms)                                      │
│  - 대가: 커넥션 풀 사용량 4배 증가                                        │
│  - 트레이드오프 고려 필요                                                 │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

3.8 언제 병렬 실행을 선택해야 하는가?

상황 권장 방식 이유
쿼리 수 ≤ 3개 순차 실행 복잡도 대비 성능 이득 적음
쿼리 수 > 3개, 각 쿼리 > 50ms 병렬 실행 유의미한 성능 개선
트랜잭션 일관성 필요 순차 실행 별도 세션은 별도 트랜잭션
커넥션 풀 여유 없음 순차 실행 풀 고갈 위험
실시간 응답 중요 (API) 상황에 따라 사용자 경험 우선
백그라운드 작업 순차 실행 안정성 우선

3.9 커넥션 풀 고려사항

# 엔진 설정 시 병렬 쿼리를 고려한 풀 크기 설정
engine = create_async_engine(
    url=db_url,
    pool_size=20,           # 기본 풀 크기
    max_overflow=20,        # 추가 연결 허용
    pool_timeout=30,        # 풀 대기 타임아웃
)

# 계산 예시:
# - 동시 API 요청: 10개
# - 요청당 병렬 쿼리: 4개
# - 필요 커넥션: 10 × 4 = 40개
# - 설정: pool_size(20) + max_overflow(20) = 40개 ✅

4. 설계 시 주의사항

4.1 커넥션 풀 크기 설정

# SQLAlchemy 엔진 설정
engine = create_async_engine(
    url=db_url,
    pool_size=20,        # 기본 풀 크기
    max_overflow=20,     # 추가 연결 허용 수
    pool_timeout=30,     # 풀에서 연결 대기 시간
    pool_recycle=3600,   # 연결 재생성 주기
    pool_pre_ping=True,  # 연결 유효성 검사
)

풀 크기 계산 공식:

필요 커넥션 수 = 동시 요청 수 × 요청당 병렬 쿼리 수

예: 동시 10개 요청, 각 요청당 4개 병렬 쿼리 → 최소 40개 커넥션 필요 (pool_size + max_overflow >= 40)

4.2 에러 처리 전략

import asyncio

# 방법 1: return_exceptions=True (권장)
results = await asyncio.gather(
    fetch_with_session_1(),
    fetch_with_session_2(),
    fetch_with_session_3(),
    return_exceptions=True,  # 예외를 결과로 반환
)

# 결과 처리
for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Query {i} failed: {result}")
    else:
        print(f"Query {i} succeeded: {result}")
# 방법 2: 개별 try-except 래핑
async def safe_fetch(query_func, **kwargs):
    try:
        async with AsyncSessionLocal() as session:
            return await query_func(session, **kwargs)
    except Exception as e:
        print(f"Query failed: {e}")
        return None

results = await asyncio.gather(
    safe_fetch(get_project, task_id=task_id),
    safe_fetch(get_song, task_id=task_id),
    safe_fetch(get_images, task_id=task_id),
)

4.3 타임아웃 설정

import asyncio

async def fetch_with_timeout(query_func, timeout_seconds: float, **kwargs):
    """타임아웃이 있는 쿼리 실행"""
    try:
        return await asyncio.wait_for(
            query_func(**kwargs),
            timeout=timeout_seconds
        )
    except asyncio.TimeoutError:
        raise Exception(f"Query timed out after {timeout_seconds}s")

# 사용 예
results = await asyncio.gather(
    fetch_with_timeout(get_project, 5.0, task_id=task_id),
    fetch_with_timeout(get_song, 5.0, task_id=task_id),
    fetch_with_timeout(get_images, 10.0, task_id=task_id),  # 더 긴 타임아웃
)

4.4 N+1 문제와 병렬화

# ❌ N+1 문제 발생 코드
videos = await session.execute(select(Video))
for video in videos.scalars():
    # N번의 추가 쿼리 발생!
    project = await session.execute(
        select(Project).where(Project.id == video.project_id)
    )

# ✅ 해결 방법 1: JOIN 사용
query = select(Video).options(selectinload(Video.project))
videos = await session.execute(query)

# ✅ 해결 방법 2: IN 절로 배치 조회
video_list = videos.scalars().all()
project_ids = [v.project_id for v in video_list if v.project_id]

projects_result = await session.execute(
    select(Project).where(Project.id.in_(project_ids))
)
projects_map = {p.id: p for p in projects_result.scalars().all()}

4.5 트랜잭션 격리 수준 고려

격리 수준 병렬 쿼리 안전성 설명
READ UNCOMMITTED ⚠️ 주의 Dirty Read 가능
READ COMMITTED 안전 대부분의 경우 적합
REPEATABLE READ 안전 일관된 스냅샷
SERIALIZABLE 안전 성능 저하 가능

5. 실무 시나리오 예제

5.1 시나리오 1: 대시보드 데이터 조회 (별도 세션 병렬)

요구사항: 사용자 대시보드에 필요한 여러 통계 데이터를 한 번에 조회

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio


async def get_user(session: AsyncSession, user_id: int):
    result = await session.execute(
        select(User).where(User.id == user_id)
    )
    return result.scalar_one_or_none()


async def get_recent_orders(session: AsyncSession, user_id: int):
    result = await session.execute(
        select(Order)
        .where(Order.user_id == user_id)
        .order_by(Order.created_at.desc())
        .limit(5)
    )
    return result.scalars().all()


async def get_total_amount(session: AsyncSession, user_id: int):
    result = await session.execute(
        select(func.sum(Order.amount))
        .where(Order.user_id == user_id)
    )
    return result.scalar() or 0


async def get_wishlist_count(session: AsyncSession, user_id: int):
    result = await session.execute(
        select(func.count(Wishlist.id))
        .where(Wishlist.user_id == user_id)
    )
    return result.scalar() or 0


async def get_dashboard_data(user_id: int) -> dict:
    """
    대시보드에 필요한 모든 데이터를 병렬로 조회합니다.
    각 쿼리는 별도의 세션을 사용합니다.
    """

    async def fetch_user():
        async with AsyncSessionLocal() as session:
            return await get_user(session, user_id)

    async def fetch_orders():
        async with AsyncSessionLocal() as session:
            return await get_recent_orders(session, user_id)

    async def fetch_amount():
        async with AsyncSessionLocal() as session:
            return await get_total_amount(session, user_id)

    async def fetch_wishlist():
        async with AsyncSessionLocal() as session:
            return await get_wishlist_count(session, user_id)

    # 4개 쿼리를 별도 세션으로 병렬 실행
    user, orders, total_amount, wishlist_count = await asyncio.gather(
        fetch_user(),
        fetch_orders(),
        fetch_amount(),
        fetch_wishlist(),
    )

    if not user:
        raise ValueError(f"User {user_id} not found")

    return {
        "user": {"id": user.id, "name": user.name, "email": user.email},
        "recent_orders": [
            {"id": o.id, "amount": o.amount, "status": o.status}
            for o in orders
        ],
        "total_spent": total_amount,
        "wishlist_count": wishlist_count,
    }


# 사용 예시 (FastAPI)
@router.get("/dashboard")
async def dashboard(user_id: int):
    return await get_dashboard_data(user_id)

성능 비교:

  • 순차 실행: ~200ms (50ms × 4)
  • 병렬 실행: ~60ms (가장 느린 쿼리 기준)
  • 개선율: 약 70%

5.2 시나리오 2: 영상 생성 데이터 조회 (순차 실행 - 권장)

요구사항: 영상 생성을 위해 Project, Lyric, Song, Image 데이터를 조회

본 프로젝트에서 실제로 적용된 패턴입니다.

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from dataclasses import dataclass
from fastapi import HTTPException


@dataclass
class VideoGenerationData:
    """영상 생성에 필요한 데이터"""
    project_id: int
    lyric_id: int
    song_id: int
    music_url: str
    song_duration: float
    lyrics: str
    image_urls: list[str]


async def generate_video(
    task_id: str,
    orientation: str = "vertical",
) -> GenerateVideoResponse:
    """
    Creatomate API를 통해 영상을 생성합니다.

    중요: SQLAlchemy AsyncSession은 단일 세션에서 동시에 여러 쿼리를 실행하는 것을
    지원하지 않습니다. asyncio.gather()로 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다.
    따라서 쿼리는 순차적으로 실행합니다.
    """
    from app.database.session import AsyncSessionLocal

    print(f"[generate_video] START - task_id: {task_id}")

    # 외부 API 호출 전에 필요한 데이터를 저장할 변수들
    project_id: int | None = None
    lyric_id: int | None = None
    song_id: int | None = None
    video_id: int | None = None
    music_url: str | None = None
    song_duration: float | None = None
    lyrics: str | None = None
    image_urls: list[str] = []

    try:
        # 세션을 명시적으로 열고 DB 작업 후 바로 닫음
        async with AsyncSessionLocal() as session:
            # ===== 순차 쿼리 실행: Project, Lyric, Song, Image =====
            # Note: AsyncSession은 동일 세션에서 병렬 쿼리를 지원하지 않음

            # Project 조회
            project_result = await session.execute(
                select(Project)
                .where(Project.task_id == task_id)
                .order_by(Project.created_at.desc())
                .limit(1)
            )

            # Lyric 조회
            lyric_result = await session.execute(
                select(Lyric)
                .where(Lyric.task_id == task_id)
                .order_by(Lyric.created_at.desc())
                .limit(1)
            )

            # Song 조회
            song_result = await session.execute(
                select(Song)
                .where(Song.task_id == task_id)
                .order_by(Song.created_at.desc())
                .limit(1)
            )

            # Image 조회
            image_result = await session.execute(
                select(Image)
                .where(Image.task_id == task_id)
                .order_by(Image.img_order.asc())
            )

            print(f"[generate_video] Queries completed - task_id: {task_id}")

            # 결과 처리 및 검증
            project = project_result.scalar_one_or_none()
            if not project:
                raise HTTPException(status_code=404, detail="Project not found")
            project_id = project.id

            lyric = lyric_result.scalar_one_or_none()
            if not lyric:
                raise HTTPException(status_code=404, detail="Lyric not found")
            lyric_id = lyric.id

            song = song_result.scalar_one_or_none()
            if not song:
                raise HTTPException(status_code=404, detail="Song not found")
            song_id = song.id
            music_url = song.song_result_url
            song_duration = song.duration
            lyrics = song.song_prompt

            images = image_result.scalars().all()
            if not images:
                raise HTTPException(status_code=404, detail="Images not found")
            image_urls = [img.img_url for img in images]

            # Video 레코드 생성 및 커밋
            video = Video(
                project_id=project_id,
                lyric_id=lyric_id,
                song_id=song_id,
                task_id=task_id,
                status="processing",
            )
            session.add(video)
            await session.commit()
            video_id = video.id

        # 세션 종료 후 외부 API 호출 (커넥션 타임아웃 방지)
        # ... Creatomate API 호출 로직 ...

    except HTTPException:
        raise
    except Exception as e:
        print(f"[generate_video] EXCEPTION - {e}")
        raise

이 패턴을 선택한 이유:

  1. 안정성: 단일 세션 내에서 모든 쿼리 실행으로 트랜잭션 일관성 보장
  2. 단순성: 코드가 명확하고 디버깅이 쉬움
  3. 충분한 성능: 4개의 간단한 쿼리는 순차 실행해도 ~200ms 이내
  4. 에러 방지: AsyncSession 병렬 쿼리 제한으로 인한 에러 방지

5.3 시나리오 3: 복합 검색 (트레이드오프 분석)

# 방법 A: 순차 실행 (단순, 안전)
async def search_sequential(session: AsyncSession, keyword: str):
    items = await session.execute(items_query)
    count = await session.execute(count_query)
    categories = await session.execute(category_query)
    price_range = await session.execute(price_query)
    brands = await session.execute(brand_query)
    return items, count, categories, price_range, brands
# 예상 시간: ~350ms (70ms × 5)


# 방법 B: 별도 세션 병렬 실행 (빠름, 복잡)
async def search_parallel(keyword: str):
    async def fetch_items():
        async with AsyncSessionLocal() as s:
            return await s.execute(items_query)

    async def fetch_count():
        async with AsyncSessionLocal() as s:
            return await s.execute(count_query)

    # ... 나머지 함수들 ...

    return await asyncio.gather(
        fetch_items(),
        fetch_count(),
        fetch_categories(),
        fetch_price_range(),
        fetch_brands(),
    )
# 예상 시간: ~80ms


# 결정 기준:
# - 검색 API가 자주 호출되고 응답 시간이 중요하다면 → 방법 B
# - 안정성이 우선이고 복잡도를 낮추고 싶다면 → 방법 A
# - 커넥션 풀 여유가 없다면 → 방법 A

6. 성능 측정 및 모니터링

6.1 실행 시간 측정 데코레이터

import time
import functools
import asyncio
from typing import Callable, TypeVar

T = TypeVar("T")


def measure_time(func: Callable[..., T]) -> Callable[..., T]:
    """함수 실행 시간을 측정하는 데코레이터"""

    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            elapsed = (time.perf_counter() - start) * 1000
            print(f"[{func.__name__}] Execution time: {elapsed:.2f}ms")

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = (time.perf_counter() - start) * 1000
            print(f"[{func.__name__}] Execution time: {elapsed:.2f}ms")

    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    return sync_wrapper


# 사용 예
@measure_time
async def fetch_data(task_id: str):
    ...

6.2 병렬 vs 순차 성능 비교 유틸리티

import asyncio
import time


async def compare_execution_methods(
    task_id: str,
    query_funcs: list[Callable],
) -> dict:
    """순차 실행과 병렬 실행(별도 세션)의 성능을 비교합니다."""

    # 순차 실행 (단일 세션)
    sequential_start = time.perf_counter()
    async with AsyncSessionLocal() as session:
        sequential_results = []
        for func in query_funcs:
            result = await func(session, task_id)
            sequential_results.append(result)
    sequential_time = (time.perf_counter() - sequential_start) * 1000

    # 병렬 실행 (별도 세션)
    parallel_start = time.perf_counter()

    async def run_with_session(func):
        async with AsyncSessionLocal() as session:
            return await func(session, task_id)

    parallel_results = await asyncio.gather(*[
        run_with_session(func) for func in query_funcs
    ])
    parallel_time = (time.perf_counter() - parallel_start) * 1000

    improvement = ((sequential_time - parallel_time) / sequential_time) * 100

    return {
        "sequential_time_ms": round(sequential_time, 2),
        "parallel_time_ms": round(parallel_time, 2),
        "improvement_percent": round(improvement, 1),
        "query_count": len(query_funcs),
    }

6.3 SQLAlchemy 쿼리 로깅

import logging

# SQLAlchemy 쿼리 로깅 활성화
logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# 또는 엔진 생성 시 echo=True
engine = create_async_engine(url, echo=True)

7. Best Practices

7.1 체크리스트

병렬화 적용 전 확인사항:

  • 쿼리들이 서로 독립적인가? (결과 의존성 없음)
  • 모든 쿼리가 READ 작업인가? (또는 순서 무관한 WRITE)
  • 별도 세션을 사용할 것인가? (AsyncSession 제한사항)
  • 커넥션 풀 크기가 충분한가?
  • 에러 처리 전략이 수립되어 있는가?
  • 타임아웃 설정이 적절한가?
  • 트랜잭션 일관성이 필요한가?

7.2 권장 패턴

# ✅ 패턴 1: 순차 실행 (단순하고 안전)
async def fetch_data(session: AsyncSession, task_id: str):
    project = await session.execute(project_query)
    song = await session.execute(song_query)
    return project, song


# ✅ 패턴 2: 별도 세션으로 병렬 실행 (성능 중요 시)
async def fetch_data_parallel(task_id: str):
    async def get_project():
        async with AsyncSessionLocal() as s:
            return await s.execute(project_query)

    async def get_song():
        async with AsyncSessionLocal() as s:
            return await s.execute(song_query)

    return await asyncio.gather(get_project(), get_song())

7.3 피해야 할 패턴

# ❌ 절대 금지: 단일 세션에서 asyncio.gather()
async with AsyncSessionLocal() as session:
    results = await asyncio.gather(
        session.execute(query1),
        session.execute(query2),
    )
# 에러 발생: InvalidRequestError - Method 'close()' can't be called here

# ❌ 피하기: 과도한 병렬화 (커넥션 고갈)
# 100개 쿼리를 동시에 실행하면 커넥션 풀 고갈 위험
results = await asyncio.gather(*[fetch() for _ in range(100)])

# ✅ 해결: 배치 처리
BATCH_SIZE = 10
for i in range(0, len(items), BATCH_SIZE):
    batch = items[i:i + BATCH_SIZE]
    results = await asyncio.gather(*[fetch(item) for item in batch])

7.4 결정 가이드

┌─────────────────────────────────────────────────────────────────────────┐
│                    병렬화 결정 플로우차트                                 │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  쿼리 개수가 3개 이하인가?                                               │
│        │                                                                │
│        ├── Yes ──► 순차 실행 (복잡도 대비 이득 적음)                     │
│        │                                                                │
│        └── No ──► 각 쿼리가 50ms 이상 걸리는가?                         │
│                         │                                               │
│                         ├── No ──► 순차 실행 (이득 적음)                │
│                         │                                               │
│                         └── Yes ──► 트랜잭션 일관성이 필요한가?          │
│                                           │                             │
│                                           ├── Yes ──► 순차 실행         │
│                                           │           (단일 세션)        │
│                                           │                             │
│                                           └── No ──► 커넥션 풀 여유?    │
│                                                           │             │
│                                                           ├── No ──►   │
│                                                           │   순차 실행 │
│                                                           │             │
│                                                           └── Yes ──►  │
│                                                               병렬 실행 │
│                                                               (별도세션) │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

7.5 성능 최적화 팁

  1. 인덱스 확인: 병렬화해도 인덱스 없으면 느림
  2. 쿼리 최적화 우선: 병렬화 전에 개별 쿼리 최적화
  3. 적절한 병렬 수준: 보통 3-10개가 적절
  4. 모니터링 필수: 실제 개선 효과 측정
  5. 커넥션 풀 모니터링: 병렬 실행 시 풀 사용량 확인

부록: 관련 자료


변경 이력

날짜 변경 내용
2024-12 AsyncSession 병렬 쿼리 제한사항 섹션 추가 (실제 프로젝트 에러 사례 포함)
2024-12 잘못된 병렬 쿼리 예제 수정
2024-12 결정 플로우차트 추가