# Database Connection Pool 문제 분석 및 해결 가이드 ## 목차 1. [발견된 문제점 요약](#1-발견된-문제점-요약) 2. [설계적 문제 분석](#2-설계적-문제-분석) 3. [해결 방안 및 설계 제안](#3-해결-방안-및-설계-제안) 4. [개선 효과](#4-개선-효과) 5. [이론적 배경: 커넥션 풀 관리 원칙](#5-이론적-배경-커넥션-풀-관리-원칙) 6. [실무 시나리오 예제 코드](#6-실무-시나리오-예제-코드) 7. [설계 원칙 요약](#7-설계-원칙-요약) --- ## 1. 발견된 문제점 요약 ### 1.1 "Multiple rows were found when one or none was required" 에러 **문제 상황:** ```python # 기존 코드 (문제) result = await session.execute(select(Project).where(Project.task_id == task_id)) project = result.scalar_one_or_none() # task_id 중복 시 에러 발생! ``` **원인:** - `task_id`로 조회 시 중복 레코드가 존재할 수 있음 - `scalar_one_or_none()`은 정확히 0개 또는 1개의 결과만 허용 **해결:** ```python # 수정된 코드 result = await session.execute( select(Project) .where(Project.task_id == task_id) .order_by(Project.created_at.desc()) .limit(1) ) project = result.scalar_one_or_none() ``` ### 1.2 커넥션 풀 고갈 (Pool Exhaustion) **증상:** - API 요청이 응답을 반환하지 않음 - 동일한 요청이 중복으로 들어옴 (클라이언트 재시도) - 서버 로그에 타임아웃 관련 메시지 **원인:** - 외부 API 호출 중 DB 세션을 계속 점유 - 백그라운드 태스크와 API 요청이 동일한 커넥션 풀 사용 ### 1.3 세션 장시간 점유 **문제가 발생한 함수들:** | 파일 | 함수 | 문제 | |------|------|------| | `video.py` | `generate_video` | Creatomate API 호출 중 세션 점유 | | `home.py` | `upload_images_blob` | Azure Blob 업로드 중 세션 점유 | | `song_task.py` | 모든 함수 | API 풀과 동일한 세션 사용 | | `video_task.py` | 모든 함수 | API 풀과 동일한 세션 사용 | | `lyric_task.py` | `generate_lyric_background` | API 풀과 동일한 세션 사용 | --- ## 2. 설계적 문제 분석 ### 2.1 Anti-Pattern: Long-lived Session with External Calls ``` ┌─────────────────────────────────────────────────────────────────┐ │ 문제가 있는 패턴 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Request ──► Session 획득 ──► DB 조회 ──► 외부 API 호출 ──► DB 저장 ──► Session 반환 │ │ │ │ │ │ 30초~수 분 소요 │ │ │ │◄─────── 세션 점유 시간 ───────►│ │ │ │ └─────────────────────────────────────────────────────────────────┘ 문제점: 1. 외부 API 응답 대기 동안 커넥션 점유 2. Pool size=20일 때, 20개 요청만으로 풀 고갈 3. 후속 요청들이 pool_timeout까지 대기 후 실패 ``` ### 2.2 Anti-Pattern: Shared Pool for Different Workloads ``` ┌─────────────────────────────────────────────────────────────────┐ │ 공유 풀 문제 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌─────────────────────┐ │ │ │ API Requests │──────► │ │ │ └──────────────┘ │ Single Pool │ │ │ │ (pool_size=20) │ │ │ ┌──────────────┐ │ │ │ │ │ Background │──────► │ │ │ │ Tasks │ └─────────────────────┘ │ │ └──────────────┘ │ │ │ │ 문제: 백그라운드 태스크가 커넥션을 오래 점유하면 │ │ API 요청이 커넥션을 얻지 못함 │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 2.3 근본 원인 분석 ``` 원인 1: 책임 분리 실패 (Separation of Concerns) ├── DB 작업과 외부 API 호출이 단일 함수에 혼재 ├── 트랜잭션 범위가 불필요하게 넓음 └── 세션 생명주기 관리 부재 원인 2: 리소스 격리 실패 (Resource Isolation) ├── API 요청과 백그라운드 태스크가 동일 풀 사용 ├── 워크로드 특성 미고려 (빠른 API vs 느린 백그라운드) └── 우선순위 기반 리소스 할당 부재 원인 3: 방어적 프로그래밍 부재 (Defensive Programming) ├── 중복 데이터 발생 가능성 미고려 ├── 타임아웃 및 재시도 로직 미흡 └── 에러 상태에서의 세션 처리 미흡 ``` --- ## 3. 해결 방안 및 설계 제안 ### 3.1 해결책 1: 3-Stage Pattern (세션 분리 패턴) **핵심 아이디어:** 외부 API 호출 전에 세션을 반환하고, 호출 완료 후 새 세션으로 결과 저장 ```python async def process_with_external_api(task_id: str, session: AsyncSession): """3-Stage Pattern 적용""" # ========== Stage 1: DB 조회 및 준비 (세션 사용) ========== data = await session.execute(select(Model).where(...)) prepared_data = extract_needed_info(data) await session.commit() # 세션 해제 # ========== Stage 2: 외부 API 호출 (세션 없음) ========== # 이 구간에서는 DB 커넥션을 점유하지 않음 api_result = await external_api.call(prepared_data) # ========== Stage 3: 결과 저장 (새 세션) ========== async with AsyncSessionLocal() as new_session: record = await new_session.execute(select(Model).where(...)) record.status = "completed" record.result = api_result await new_session.commit() return result ``` ### 3.2 해결책 2: Separate Pool Strategy (풀 분리 전략) **핵심 아이디어:** API 요청과 백그라운드 태스크에 별도의 커넥션 풀 사용 ```python # 메인 엔진 (FastAPI 요청용) - 빠른 응답 필요 engine = create_async_engine( url=db_url, pool_size=20, max_overflow=20, pool_timeout=30, # 빠른 실패 pool_recycle=3600, ) AsyncSessionLocal = async_sessionmaker(bind=engine, ...) # 백그라운드 엔진 (장시간 작업용) - 안정성 우선 background_engine = create_async_engine( url=db_url, pool_size=10, max_overflow=10, pool_timeout=60, # 여유있는 대기 pool_recycle=3600, ) BackgroundSessionLocal = async_sessionmaker(bind=background_engine, ...) ``` ### 3.3 해결책 3: Query Safety Pattern (안전한 쿼리 패턴) **핵심 아이디어:** 항상 최신 데이터 1개만 조회 ```python # 안전한 조회 패턴 async def get_latest_record(session: AsyncSession, task_id: str): result = await session.execute( select(Model) .where(Model.task_id == task_id) .order_by(Model.created_at.desc()) .limit(1) ) return result.scalar_one_or_none() ``` --- ## 4. 개선 효과 ### 4.1 정량적 효과 | 지표 | 개선 전 | 개선 후 | 개선율 | |------|---------|---------|--------| | 평균 세션 점유 시간 | 30-60초 | 0.1-0.5초 | 99% 감소 | | 동시 처리 가능 요청 | ~20개 | ~200개+ | 10배 이상 | | Pool Exhaustion 발생 | 빈번 | 거의 없음 | - | | API 응답 실패율 | 높음 | 매우 낮음 | - | ### 4.2 정성적 효과 ``` 개선 효과 매트릭스: 개선 전 개선 후 ───────────────────────── 안정성 │ ★★☆☆☆ │ ★★★★★ │ 확장성 │ ★★☆☆☆ │ ★★★★☆ │ 유지보수성 │ ★★★☆☆ │ ★★★★☆ │ 리소스 효율성 │ ★☆☆☆☆ │ ★★★★★ │ 에러 추적 용이성 │ ★★☆☆☆ │ ★★★★☆ │ ``` --- ## 5. 이론적 배경: 커넥션 풀 관리 원칙 ### 5.1 커넥션 풀의 목적 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 커넥션 풀 동작 원리 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Application Connection Pool Database│ │ │ │ │ │ │ │─── get_connection() ────────►│ │ │ │ │◄── connection ───────────────│ │ │ │ │ │ │ │ │ │─── execute(query) ───────────┼──────────────────────►│ │ │ │◄── result ───────────────────┼◄──────────────────────│ │ │ │ │ │ │ │ │─── release_connection() ────►│ │ │ │ │ │ (connection 재사용) │ │ │ │ │ 장점: │ │ 1. 연결 생성 오버헤드 제거 (TCP handshake, 인증 등) │ │ 2. 동시 연결 수 제한으로 DB 과부하 방지 │ │ 3. 연결 재사용으로 리소스 효율성 향상 │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 5.2 핵심 파라미터 이해 ```python engine = create_async_engine( url=database_url, # pool_size: 풀에서 유지하는 영구 연결 수 # - 너무 작으면: 요청 대기 발생 # - 너무 크면: DB 리소스 낭비 pool_size=20, # max_overflow: pool_size 초과 시 생성 가능한 임시 연결 수 # - 총 최대 연결 = pool_size + max_overflow # - burst traffic 처리용 max_overflow=20, # pool_timeout: 연결 대기 최대 시간 (초) # - 초과 시 TimeoutError 발생 # - API 서버: 짧게 (빠른 실패 선호) # - Background: 길게 (안정성 선호) pool_timeout=30, # pool_recycle: 연결 재생성 주기 (초) # - MySQL wait_timeout보다 짧게 설정 # - "MySQL has gone away" 에러 방지 pool_recycle=3600, # pool_pre_ping: 연결 사용 전 유효성 검사 # - True: SELECT 1 실행하여 연결 확인 # - 약간의 오버헤드, 높은 안정성 pool_pre_ping=True, ) ``` ### 5.3 세션 관리 원칙 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 세션 관리 원칙 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 원칙 1: 최소 점유 시간 (Minimal Hold Time) │ │ ───────────────────────────────────────── │ │ "세션은 DB 작업에만 사용하고, 즉시 반환한다" │ │ │ │ ✗ 나쁜 예: │ │ session.query() → http_call(30s) → session.commit() │ │ │ │ ✓ 좋은 예: │ │ session.query() → session.commit() → http_call() → new_session│ │ │ │ 원칙 2: 범위 명확성 (Clear Scope) │ │ ───────────────────────────────── │ │ "세션의 시작과 끝을 명확히 정의한다" │ │ │ │ ✓ async with AsyncSessionLocal() as session: │ │ # 이 블록 내에서만 세션 사용 │ │ pass │ │ # 블록 종료 시 자동 반환 │ │ │ │ 원칙 3: 단일 책임 (Single Responsibility) │ │ ───────────────────────────────────────── │ │ "하나의 세션 블록은 하나의 트랜잭션 단위만 처리한다" │ │ │ │ 원칙 4: 실패 대비 (Failure Handling) │ │ ─────────────────────────────────── │ │ "예외 발생 시에도 세션이 반환되도록 보장한다" │ │ │ │ async with session: │ │ try: │ │ ... │ │ except Exception: │ │ await session.rollback() │ │ raise │ │ # finally에서 자동 close │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 5.4 워크로드 분리 원칙 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 워크로드 분리 전략 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 워크로드 유형별 특성: │ │ │ │ ┌─────────────────┬─────────────────┬─────────────────┐ │ │ │ API 요청 │ 백그라운드 작업 │ 배치 작업 │ │ │ ├─────────────────┼─────────────────┼─────────────────┤ │ │ │ 짧은 응답 시간 │ 긴 처리 시간 │ 매우 긴 처리 │ │ │ │ 높은 동시성 │ 중간 동시성 │ 낮은 동시성 │ │ │ │ 빠른 실패 선호 │ 재시도 허용 │ 안정성 최우선 │ │ │ └─────────────────┴─────────────────┴─────────────────┘ │ │ │ │ 분리 전략: │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ API Pool │ │ Background │ │ Batch Pool │ │ │ │ size=20 │ │ Pool │ │ size=5 │ │ │ │ timeout=30s │ │ size=10 │ │ timeout=300s│ │ │ └─────────────┘ │ timeout=60s │ └─────────────┘ │ │ └─────────────┘ │ │ │ │ 이점: │ │ 1. 워크로드 간 간섭 방지 │ │ 2. 각 워크로드에 최적화된 설정 적용 │ │ 3. 장애 격리 (한 풀의 문제가 다른 풀에 영향 X) │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` --- ## 6. 실무 시나리오 예제 코드 ### 시나리오 1: 이미지 처리 서비스 실제 프로젝트에서 자주 발생하는 "이미지 업로드 → 외부 처리 → 결과 저장" 패턴 #### 6.1.1 프로젝트 구조 ``` image_processing_service/ ├── app/ │ ├── __init__.py │ ├── main.py │ ├── config.py │ ├── database/ │ │ ├── __init__.py │ │ ├── session.py │ │ └── models.py │ ├── api/ │ │ ├── __init__.py │ │ └── routes/ │ │ ├── __init__.py │ │ └── images.py │ ├── services/ │ │ ├── __init__.py │ │ ├── image_processor.py │ │ └── storage_service.py │ └── workers/ │ ├── __init__.py │ └── image_tasks.py └── requirements.txt ``` #### 6.1.2 코드 구현 **config.py - 설정** ```python """ Configuration module for the image processing service. """ from pydantic_settings import BaseSettings class Settings(BaseSettings): """Application settings""" # Database DATABASE_URL: str = "mysql+asyncmy://user:pass@localhost/imagedb" # API Pool settings (빠른 응답 우선) API_POOL_SIZE: int = 20 API_POOL_MAX_OVERFLOW: int = 20 API_POOL_TIMEOUT: int = 30 # Background Pool settings (안정성 우선) BG_POOL_SIZE: int = 10 BG_POOL_MAX_OVERFLOW: int = 10 BG_POOL_TIMEOUT: int = 60 # External services IMAGE_PROCESSOR_URL: str = "https://api.imageprocessor.com" STORAGE_BUCKET: str = "processed-images" class Config: env_file = ".env" settings = Settings() ``` **database/session.py - 세션 관리** ```python """ Database session management with separate pools for different workloads. 핵심 설계 원칙: 1. API 요청과 백그라운드 작업에 별도 풀 사용 2. 각 풀은 워크로드 특성에 맞게 설정 3. 세션 상태 모니터링을 위한 로깅 """ from contextlib import asynccontextmanager from typing import AsyncGenerator from sqlalchemy.ext.asyncio import ( AsyncSession, async_sessionmaker, create_async_engine, ) from app.config import settings # ============================================================ # 메인 엔진 (FastAPI 요청용) # ============================================================ # 특징: 빠른 응답, 짧은 타임아웃, 빠른 실패 api_engine = create_async_engine( url=settings.DATABASE_URL, pool_size=settings.API_POOL_SIZE, max_overflow=settings.API_POOL_MAX_OVERFLOW, pool_timeout=settings.API_POOL_TIMEOUT, pool_recycle=3600, pool_pre_ping=True, echo=False, # Production에서는 False ) ApiSessionLocal = async_sessionmaker( bind=api_engine, class_=AsyncSession, expire_on_commit=False, autoflush=False, ) # ============================================================ # 백그라운드 엔진 (비동기 작업용) # ============================================================ # 특징: 긴 타임아웃, 안정성 우선 background_engine = create_async_engine( url=settings.DATABASE_URL, pool_size=settings.BG_POOL_SIZE, max_overflow=settings.BG_POOL_MAX_OVERFLOW, pool_timeout=settings.BG_POOL_TIMEOUT, pool_recycle=3600, pool_pre_ping=True, echo=False, ) BackgroundSessionLocal = async_sessionmaker( bind=background_engine, class_=AsyncSession, expire_on_commit=False, autoflush=False, ) # ============================================================ # 세션 제공 함수 # ============================================================ async def get_api_session() -> AsyncGenerator[AsyncSession, None]: """ FastAPI Dependency로 사용할 API 세션 제공자. 사용 예: @router.post("/images") async def upload(session: AsyncSession = Depends(get_api_session)): ... """ # 풀 상태 로깅 (디버깅용) pool = api_engine.pool print( f"[API Pool] size={pool.size()}, " f"checked_out={pool.checkedout()}, " f"overflow={pool.overflow()}" ) async with ApiSessionLocal() as session: try: yield session except Exception: await session.rollback() raise @asynccontextmanager async def get_background_session() -> AsyncGenerator[AsyncSession, None]: """ 백그라운드 작업용 세션 컨텍스트 매니저. 사용 예: async with get_background_session() as session: result = await session.execute(query) """ pool = background_engine.pool print( f"[Background Pool] size={pool.size()}, " f"checked_out={pool.checkedout()}, " f"overflow={pool.overflow()}" ) async with BackgroundSessionLocal() as session: try: yield session except Exception: await session.rollback() raise # ============================================================ # 리소스 정리 # ============================================================ async def dispose_engines() -> None: """애플리케이션 종료 시 모든 엔진 정리""" await api_engine.dispose() await background_engine.dispose() print("[Database] All engines disposed") ``` **database/models.py - 모델** ```python """ Database models for image processing service. """ from datetime import datetime from enum import Enum from sqlalchemy import String, Text, DateTime, Integer, Float from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column class Base(DeclarativeBase): pass class ImageStatus(str, Enum): """이미지 처리 상태""" PENDING = "pending" PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" class Image(Base): """이미지 테이블""" __tablename__ = "images" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) task_id: Mapped[str] = mapped_column(String(50), index=True, nullable=False) original_url: Mapped[str] = mapped_column(Text, nullable=False) processed_url: Mapped[str | None] = mapped_column(Text, nullable=True) status: Mapped[str] = mapped_column( String(20), default=ImageStatus.PENDING.value, nullable=False ) width: Mapped[int | None] = mapped_column(Integer, nullable=True) height: Mapped[int | None] = mapped_column(Integer, nullable=True) file_size: Mapped[int | None] = mapped_column(Integer, nullable=True) processing_time: Mapped[float | None] = mapped_column(Float, nullable=True) error_message: Mapped[str | None] = mapped_column(Text, nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, nullable=False ) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False ) ``` **services/image_processor.py - 외부 API 서비스** ```python """ External image processing service client. """ import httpx from dataclasses import dataclass @dataclass class ProcessingResult: """이미지 처리 결과""" processed_url: str width: int height: int file_size: int processing_time: float class ImageProcessorService: """ 외부 이미지 처리 API 클라이언트. 주의: 이 서비스 호출은 DB 세션 외부에서 수행해야 합니다! """ def __init__(self, base_url: str): self.base_url = base_url self.timeout = httpx.Timeout(60.0, connect=10.0) async def process_image( self, image_url: str, options: dict | None = None ) -> ProcessingResult: """ 외부 API를 통해 이미지 처리. 이 함수는 30초~수 분이 소요될 수 있습니다. 반드시 DB 세션 외부에서 호출하세요! Args: image_url: 처리할 이미지 URL options: 처리 옵션 (resize, filter 등) Returns: ProcessingResult: 처리 결과 """ async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( f"{self.base_url}/process", json={ "image_url": image_url, "options": options or {} } ) response.raise_for_status() data = response.json() return ProcessingResult( processed_url=data["processed_url"], width=data["width"], height=data["height"], file_size=data["file_size"], processing_time=data["processing_time"], ) ``` **api/routes/images.py - API 라우터 (3-Stage Pattern 적용)** ```python """ Image API routes with proper session management. 핵심 패턴: 3-Stage Pattern - Stage 1: DB 작업 (세션 사용) - Stage 2: 외부 API 호출 (세션 없음) - Stage 3: 결과 저장 (새 세션) """ import asyncio from uuid import uuid4 from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from pydantic import BaseModel, HttpUrl from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.database.session import get_api_session, ApiSessionLocal from app.database.models import Image, ImageStatus from app.services.image_processor import ImageProcessorService from app.config import settings router = APIRouter(prefix="/images", tags=["images"]) # 외부 서비스 인스턴스 processor_service = ImageProcessorService(settings.IMAGE_PROCESSOR_URL) class ImageUploadRequest(BaseModel): """이미지 업로드 요청""" url: HttpUrl options: dict | None = None class ImageResponse(BaseModel): """이미지 응답""" task_id: str status: str original_url: str processed_url: str | None = None class Config: from_attributes = True # ============================================================ # 동기적 처리 (짧은 작업용) - 권장하지 않음 # ============================================================ @router.post("/process-sync", response_model=ImageResponse) async def process_image_sync( request: ImageUploadRequest, session: AsyncSession = Depends(get_api_session), ) -> ImageResponse: """ 동기적 이미지 처리 (3-Stage Pattern 적용). 주의: 외부 API 호출이 길어지면 요청 타임아웃 발생 가능. 대부분의 경우 비동기 처리를 권장합니다. """ task_id = str(uuid4()) # ========== Stage 1: 초기 레코드 생성 ========== image = Image( task_id=task_id, original_url=str(request.url), status=ImageStatus.PROCESSING.value, ) session.add(image) await session.commit() image_id = image.id # ID 저장 print(f"[Stage 1] Image record created - task_id: {task_id}") # ========== Stage 2: 외부 API 호출 (세션 없음!) ========== # 이 구간에서는 DB 커넥션을 점유하지 않습니다 try: print(f"[Stage 2] Calling external API - task_id: {task_id}") result = await processor_service.process_image( str(request.url), request.options ) print(f"[Stage 2] External API completed - task_id: {task_id}") except Exception as e: # 실패 시 상태 업데이트 (새 세션 사용) async with ApiSessionLocal() as error_session: stmt = select(Image).where(Image.id == image_id) db_image = (await error_session.execute(stmt)).scalar_one() db_image.status = ImageStatus.FAILED.value db_image.error_message = str(e) await error_session.commit() raise HTTPException(status_code=500, detail=str(e)) # ========== Stage 3: 결과 저장 (새 세션) ========== async with ApiSessionLocal() as update_session: stmt = select(Image).where(Image.id == image_id) db_image = (await update_session.execute(stmt)).scalar_one() db_image.status = ImageStatus.COMPLETED.value db_image.processed_url = result.processed_url db_image.width = result.width db_image.height = result.height db_image.file_size = result.file_size db_image.processing_time = result.processing_time await update_session.commit() print(f"[Stage 3] Result saved - task_id: {task_id}") return ImageResponse( task_id=task_id, status=db_image.status, original_url=db_image.original_url, processed_url=db_image.processed_url, ) # ============================================================ # 비동기 처리 (권장) # ============================================================ @router.post("/process-async", response_model=ImageResponse) async def process_image_async( request: ImageUploadRequest, background_tasks: BackgroundTasks, session: AsyncSession = Depends(get_api_session), ) -> ImageResponse: """ 비동기 이미지 처리 (즉시 응답 반환). 1. 초기 레코드 생성 후 즉시 응답 2. 백그라운드에서 처리 진행 3. 상태는 GET /images/{task_id} 로 조회 """ task_id = str(uuid4()) # DB 작업: 초기 레코드 생성 image = Image( task_id=task_id, original_url=str(request.url), status=ImageStatus.PENDING.value, ) session.add(image) await session.commit() # 백그라운드 태스크 등록 (별도 세션 사용) background_tasks.add_task( process_image_background, task_id=task_id, image_url=str(request.url), options=request.options, ) # 즉시 응답 반환 return ImageResponse( task_id=task_id, status=ImageStatus.PENDING.value, original_url=str(request.url), ) @router.get("/{task_id}", response_model=ImageResponse) async def get_image_status( task_id: str, session: AsyncSession = Depends(get_api_session), ) -> ImageResponse: """이미지 처리 상태 조회""" # 안전한 쿼리 패턴: 최신 레코드 1개만 조회 stmt = ( select(Image) .where(Image.task_id == task_id) .order_by(Image.created_at.desc()) .limit(1) ) result = await session.execute(stmt) image = result.scalar_one_or_none() if not image: raise HTTPException(status_code=404, detail="Image not found") return ImageResponse( task_id=image.task_id, status=image.status, original_url=image.original_url, processed_url=image.processed_url, ) # ============================================================ # 백그라운드 처리 함수 # ============================================================ async def process_image_background( task_id: str, image_url: str, options: dict | None, ) -> None: """ 백그라운드에서 이미지 처리. 중요: 이 함수는 BackgroundSessionLocal을 사용합니다. API 풀과 분리되어 있어 API 응답에 영향을 주지 않습니다. """ from app.database.session import BackgroundSessionLocal print(f"[Background] Starting - task_id: {task_id}") # 상태를 processing으로 업데이트 async with BackgroundSessionLocal() as session: stmt = ( select(Image) .where(Image.task_id == task_id) .order_by(Image.created_at.desc()) .limit(1) ) image = (await session.execute(stmt)).scalar_one_or_none() if image: image.status = ImageStatus.PROCESSING.value await session.commit() # 외부 API 호출 (세션 없음!) try: result = await processor_service.process_image(image_url, options) # 성공: 결과 저장 async with BackgroundSessionLocal() as session: stmt = ( select(Image) .where(Image.task_id == task_id) .order_by(Image.created_at.desc()) .limit(1) ) image = (await session.execute(stmt)).scalar_one_or_none() if image: image.status = ImageStatus.COMPLETED.value image.processed_url = result.processed_url image.width = result.width image.height = result.height image.file_size = result.file_size image.processing_time = result.processing_time await session.commit() print(f"[Background] Completed - task_id: {task_id}") except Exception as e: # 실패: 에러 저장 async with BackgroundSessionLocal() as session: stmt = ( select(Image) .where(Image.task_id == task_id) .order_by(Image.created_at.desc()) .limit(1) ) image = (await session.execute(stmt)).scalar_one_or_none() if image: image.status = ImageStatus.FAILED.value image.error_message = str(e) await session.commit() print(f"[Background] Failed - task_id: {task_id}, error: {e}") ``` **main.py - 애플리케이션 진입점** ```python """ Main application entry point. """ from contextlib import asynccontextmanager from fastapi import FastAPI from app.database.session import dispose_engines from app.api.routes import images @asynccontextmanager async def lifespan(app: FastAPI): """애플리케이션 생명주기 관리""" # Startup print("[App] Starting up...") yield # Shutdown print("[App] Shutting down...") await dispose_engines() app = FastAPI( title="Image Processing Service", description="이미지 처리 서비스 API", version="1.0.0", lifespan=lifespan, ) app.include_router(images.router, prefix="/api/v1") @app.get("/health") async def health_check(): """헬스 체크""" return {"status": "healthy"} ``` --- ### 시나리오 2: 결제 처리 서비스 결제 시스템에서의 "주문 생성 → 결제 처리 → 결과 저장" 패턴 #### 6.2.1 프로젝트 구조 ``` payment_service/ ├── app/ │ ├── __init__.py │ ├── main.py │ ├── config.py │ ├── database/ │ │ ├── __init__.py │ │ ├── session.py │ │ └── models.py │ ├── api/ │ │ ├── __init__.py │ │ └── routes/ │ │ ├── __init__.py │ │ └── payments.py │ ├── services/ │ │ ├── __init__.py │ │ └── payment_gateway.py │ └── workers/ │ ├── __init__.py │ └── payment_tasks.py └── requirements.txt ``` #### 6.2.2 코드 구현 **config.py - 설정** ```python """ Payment service configuration. """ from pydantic_settings import BaseSettings class Settings(BaseSettings): """Application settings""" DATABASE_URL: str = "mysql+asyncmy://user:pass@localhost/paymentdb" # Pool settings - 결제는 더 보수적으로 설정 API_POOL_SIZE: int = 30 # 결제는 트래픽이 많음 API_POOL_MAX_OVERFLOW: int = 20 API_POOL_TIMEOUT: int = 20 # 빠른 실패 BG_POOL_SIZE: int = 5 # 백그라운드는 적게 BG_POOL_MAX_OVERFLOW: int = 5 BG_POOL_TIMEOUT: int = 120 # 결제 검증은 시간이 걸릴 수 있음 # Payment gateway PAYMENT_GATEWAY_URL: str = "https://api.payment-gateway.com" PAYMENT_GATEWAY_KEY: str = "" class Config: env_file = ".env" settings = Settings() ``` **database/session.py - 세션 관리** ```python """ Database session management for payment service. 결제 서비스 특성: 1. 데이터 정합성이 매우 중요 2. 트랜잭션 롤백이 명확해야 함 3. 장애 시에도 데이터 손실 없어야 함 """ from contextlib import asynccontextmanager from typing import AsyncGenerator from sqlalchemy.ext.asyncio import ( AsyncSession, async_sessionmaker, create_async_engine, ) from app.config import settings # ============================================================ # API 엔진 (결제 요청 처리용) # ============================================================ api_engine = create_async_engine( url=settings.DATABASE_URL, pool_size=settings.API_POOL_SIZE, max_overflow=settings.API_POOL_MAX_OVERFLOW, pool_timeout=settings.API_POOL_TIMEOUT, pool_recycle=1800, # 30분 (결제는 더 자주 재생성) pool_pre_ping=True, echo=False, ) ApiSessionLocal = async_sessionmaker( bind=api_engine, class_=AsyncSession, expire_on_commit=False, autoflush=False, ) # ============================================================ # 백그라운드 엔진 (결제 검증, 정산 등) # ============================================================ background_engine = create_async_engine( url=settings.DATABASE_URL, pool_size=settings.BG_POOL_SIZE, max_overflow=settings.BG_POOL_MAX_OVERFLOW, pool_timeout=settings.BG_POOL_TIMEOUT, pool_recycle=1800, pool_pre_ping=True, echo=False, ) BackgroundSessionLocal = async_sessionmaker( bind=background_engine, class_=AsyncSession, expire_on_commit=False, autoflush=False, ) async def get_api_session() -> AsyncGenerator[AsyncSession, None]: """API 세션 제공""" async with ApiSessionLocal() as session: try: yield session except Exception: await session.rollback() raise @asynccontextmanager async def get_background_session() -> AsyncGenerator[AsyncSession, None]: """백그라운드 세션 제공""" async with BackgroundSessionLocal() as session: try: yield session except Exception: await session.rollback() raise async def dispose_engines() -> None: """엔진 정리""" await api_engine.dispose() await background_engine.dispose() ``` **database/models.py - 모델** ```python """ Payment database models. """ from datetime import datetime from decimal import Decimal from enum import Enum from sqlalchemy import String, Text, DateTime, Integer, Numeric, ForeignKey from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship class Base(DeclarativeBase): pass class PaymentStatus(str, Enum): """결제 상태""" PENDING = "pending" # 결제 대기 PROCESSING = "processing" # 처리 중 COMPLETED = "completed" # 완료 FAILED = "failed" # 실패 REFUNDED = "refunded" # 환불됨 CANCELLED = "cancelled" # 취소됨 class Order(Base): """주문 테이블""" __tablename__ = "orders" id: Mapped[int] = mapped_column(Integer, primary_key=True) order_number: Mapped[str] = mapped_column(String(50), unique=True, index=True) customer_id: Mapped[str] = mapped_column(String(50), index=True) total_amount: Mapped[Decimal] = mapped_column(Numeric(10, 2)) status: Mapped[str] = mapped_column(String(20), default="pending") created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow ) # Relationships payment: Mapped["Payment"] = relationship(back_populates="order", uselist=False) class Payment(Base): """결제 테이블""" __tablename__ = "payments" id: Mapped[int] = mapped_column(Integer, primary_key=True) payment_id: Mapped[str] = mapped_column(String(50), unique=True, index=True) order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), index=True) amount: Mapped[Decimal] = mapped_column(Numeric(10, 2)) currency: Mapped[str] = mapped_column(String(3), default="KRW") status: Mapped[str] = mapped_column( String(20), default=PaymentStatus.PENDING.value ) gateway_transaction_id: Mapped[str | None] = mapped_column(String(100)) gateway_response: Mapped[str | None] = mapped_column(Text) error_message: Mapped[str | None] = mapped_column(Text) created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow ) # Relationships order: Mapped["Order"] = relationship(back_populates="payment") ``` **services/payment_gateway.py - 결제 게이트웨이** ```python """ Payment gateway service client. """ import httpx from dataclasses import dataclass from decimal import Decimal @dataclass class PaymentResult: """결제 결과""" transaction_id: str status: str message: str raw_response: dict class PaymentGatewayService: """ 외부 결제 게이트웨이 클라이언트. 주의: 결제 API 호출은 반드시 DB 세션 외부에서! 결제는 3-10초 정도 소요될 수 있습니다. """ def __init__(self, base_url: str, api_key: str): self.base_url = base_url self.api_key = api_key self.timeout = httpx.Timeout(30.0, connect=10.0) async def process_payment( self, payment_id: str, amount: Decimal, currency: str, card_token: str, ) -> PaymentResult: """ 결제 처리. 이 함수는 3-10초가 소요될 수 있습니다. 반드시 DB 세션 외부에서 호출하세요! """ async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( f"{self.base_url}/v1/payments", headers={"Authorization": f"Bearer {self.api_key}"}, json={ "merchant_uid": payment_id, "amount": float(amount), "currency": currency, "card_token": card_token, } ) response.raise_for_status() data = response.json() return PaymentResult( transaction_id=data["transaction_id"], status=data["status"], message=data.get("message", ""), raw_response=data, ) async def verify_payment(self, transaction_id: str) -> PaymentResult: """결제 검증""" async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/v1/payments/{transaction_id}", headers={"Authorization": f"Bearer {self.api_key}"}, ) response.raise_for_status() data = response.json() return PaymentResult( transaction_id=data["transaction_id"], status=data["status"], message=data.get("message", ""), raw_response=data, ) async def refund_payment( self, transaction_id: str, amount: Decimal | None = None ) -> PaymentResult: """환불 처리""" async with httpx.AsyncClient(timeout=self.timeout) as client: payload = {"transaction_id": transaction_id} if amount: payload["amount"] = float(amount) response = await client.post( f"{self.base_url}/v1/refunds", headers={"Authorization": f"Bearer {self.api_key}"}, json=payload, ) response.raise_for_status() data = response.json() return PaymentResult( transaction_id=data["refund_id"], status=data["status"], message=data.get("message", ""), raw_response=data, ) ``` **api/routes/payments.py - 결제 API (3-Stage Pattern)** ```python """ Payment API routes with proper session management. 결제 시스템에서의 3-Stage Pattern: - Stage 1: 주문/결제 레코드 생성 (트랜잭션 보장) - Stage 2: 외부 결제 게이트웨이 호출 (세션 없음) - Stage 3: 결과 업데이트 (새 트랜잭션) 중요: 결제는 멱등성(Idempotency)을 보장해야 합니다! """ import json from decimal import Decimal from uuid import uuid4 from fastapi import APIRouter, Depends, HTTPException, Header from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.database.session import get_api_session, ApiSessionLocal from app.database.models import Order, Payment, PaymentStatus from app.services.payment_gateway import PaymentGatewayService from app.config import settings router = APIRouter(prefix="/payments", tags=["payments"]) # 결제 게이트웨이 서비스 gateway = PaymentGatewayService( settings.PAYMENT_GATEWAY_URL, settings.PAYMENT_GATEWAY_KEY, ) class PaymentRequest(BaseModel): """결제 요청""" order_number: str amount: Decimal currency: str = "KRW" card_token: str class PaymentResponse(BaseModel): """결제 응답""" payment_id: str order_number: str status: str amount: Decimal transaction_id: str | None = None message: str | None = None @router.post("/process", response_model=PaymentResponse) async def process_payment( request: PaymentRequest, idempotency_key: str = Header(..., alias="Idempotency-Key"), session: AsyncSession = Depends(get_api_session), ) -> PaymentResponse: """ 결제 처리 (3-Stage Pattern 적용). 멱등성 보장: - Idempotency-Key 헤더로 중복 결제 방지 - 동일 키로 재요청 시 기존 결과 반환 3-Stage Pattern: - Stage 1: 주문 조회 및 결제 레코드 생성 - Stage 2: 외부 결제 API 호출 (세션 해제) - Stage 3: 결제 결과 업데이트 """ payment_id = f"PAY-{idempotency_key}" # ========== 멱등성 체크 ========== existing = await session.execute( select(Payment).where(Payment.payment_id == payment_id) ) existing_payment = existing.scalar_one_or_none() if existing_payment: # 이미 처리된 결제가 있으면 기존 결과 반환 return PaymentResponse( payment_id=existing_payment.payment_id, order_number=request.order_number, status=existing_payment.status, amount=existing_payment.amount, transaction_id=existing_payment.gateway_transaction_id, message="이미 처리된 결제입니다", ) # ========== Stage 1: 주문 조회 및 결제 레코드 생성 ========== print(f"[Stage 1] Creating payment - payment_id: {payment_id}") # 주문 조회 order_result = await session.execute( select(Order) .where(Order.order_number == request.order_number) .limit(1) ) order = order_result.scalar_one_or_none() if not order: raise HTTPException(status_code=404, detail="주문을 찾을 수 없습니다") if order.total_amount != request.amount: raise HTTPException(status_code=400, detail="결제 금액이 일치하지 않습니다") # 결제 레코드 생성 payment = Payment( payment_id=payment_id, order_id=order.id, amount=request.amount, currency=request.currency, status=PaymentStatus.PROCESSING.value, ) session.add(payment) # 주문 상태 업데이트 order.status = "payment_processing" await session.commit() payment_db_id = payment.id order_id = order.id print(f"[Stage 1] Payment record created - payment_id: {payment_id}") # ========== Stage 2: 외부 결제 API 호출 (세션 없음!) ========== # 이 구간에서는 DB 커넥션을 점유하지 않습니다 print(f"[Stage 2] Calling payment gateway - payment_id: {payment_id}") try: gateway_result = await gateway.process_payment( payment_id=payment_id, amount=request.amount, currency=request.currency, card_token=request.card_token, ) print(f"[Stage 2] Gateway response - status: {gateway_result.status}") except Exception as e: # 게이트웨이 오류 시 실패 처리 print(f"[Stage 2] Gateway error - {e}") async with ApiSessionLocal() as error_session: # 결제 실패 처리 stmt = select(Payment).where(Payment.id == payment_db_id) db_payment = (await error_session.execute(stmt)).scalar_one() db_payment.status = PaymentStatus.FAILED.value db_payment.error_message = str(e) # 주문 상태 복원 order_stmt = select(Order).where(Order.id == order_id) db_order = (await error_session.execute(order_stmt)).scalar_one() db_order.status = "payment_failed" await error_session.commit() raise HTTPException( status_code=500, detail=f"결제 처리 중 오류가 발생했습니다: {str(e)}" ) # ========== Stage 3: 결제 결과 업데이트 (새 세션) ========== print(f"[Stage 3] Updating payment result - payment_id: {payment_id}") async with ApiSessionLocal() as update_session: # 결제 정보 업데이트 stmt = select(Payment).where(Payment.id == payment_db_id) db_payment = (await update_session.execute(stmt)).scalar_one() if gateway_result.status == "success": db_payment.status = PaymentStatus.COMPLETED.value new_order_status = "paid" else: db_payment.status = PaymentStatus.FAILED.value new_order_status = "payment_failed" db_payment.gateway_transaction_id = gateway_result.transaction_id db_payment.gateway_response = json.dumps(gateway_result.raw_response) # 주문 상태 업데이트 order_stmt = select(Order).where(Order.id == order_id) db_order = (await update_session.execute(order_stmt)).scalar_one() db_order.status = new_order_status await update_session.commit() print(f"[Stage 3] Payment completed - status: {db_payment.status}") return PaymentResponse( payment_id=db_payment.payment_id, order_number=request.order_number, status=db_payment.status, amount=db_payment.amount, transaction_id=db_payment.gateway_transaction_id, message=gateway_result.message, ) @router.get("/{payment_id}", response_model=PaymentResponse) async def get_payment_status( payment_id: str, session: AsyncSession = Depends(get_api_session), ) -> PaymentResponse: """결제 상태 조회""" # 안전한 쿼리 패턴 stmt = ( select(Payment) .where(Payment.payment_id == payment_id) .limit(1) ) result = await session.execute(stmt) payment = result.scalar_one_or_none() if not payment: raise HTTPException(status_code=404, detail="결제 정보를 찾을 수 없습니다") # 주문 정보 조회 order_stmt = select(Order).where(Order.id == payment.order_id) order = (await session.execute(order_stmt)).scalar_one() return PaymentResponse( payment_id=payment.payment_id, order_number=order.order_number, status=payment.status, amount=payment.amount, transaction_id=payment.gateway_transaction_id, ) @router.post("/{payment_id}/refund") async def refund_payment( payment_id: str, session: AsyncSession = Depends(get_api_session), ) -> PaymentResponse: """ 환불 처리 (3-Stage Pattern). """ # Stage 1: 결제 정보 조회 stmt = select(Payment).where(Payment.payment_id == payment_id).limit(1) result = await session.execute(stmt) payment = result.scalar_one_or_none() if not payment: raise HTTPException(status_code=404, detail="결제 정보를 찾을 수 없습니다") if payment.status != PaymentStatus.COMPLETED.value: raise HTTPException(status_code=400, detail="환불 가능한 상태가 아닙니다") if not payment.gateway_transaction_id: raise HTTPException(status_code=400, detail="거래 ID가 없습니다") transaction_id = payment.gateway_transaction_id payment_db_id = payment.id order_id = payment.order_id amount = payment.amount # 상태를 processing으로 변경 payment.status = "refund_processing" await session.commit() # Stage 2: 환불 API 호출 (세션 없음) try: refund_result = await gateway.refund_payment(transaction_id, amount) except Exception as e: # 실패 시 상태 복원 async with ApiSessionLocal() as error_session: stmt = select(Payment).where(Payment.id == payment_db_id) db_payment = (await error_session.execute(stmt)).scalar_one() db_payment.status = PaymentStatus.COMPLETED.value # 원래 상태로 await error_session.commit() raise HTTPException(status_code=500, detail=str(e)) # Stage 3: 결과 저장 async with ApiSessionLocal() as update_session: stmt = select(Payment).where(Payment.id == payment_db_id) db_payment = (await update_session.execute(stmt)).scalar_one() if refund_result.status == "success": db_payment.status = PaymentStatus.REFUNDED.value # 주문 상태도 업데이트 order_stmt = select(Order).where(Order.id == order_id) db_order = (await update_session.execute(order_stmt)).scalar_one() db_order.status = "refunded" else: db_payment.status = PaymentStatus.COMPLETED.value # 환불 실패 시 원래 상태 await update_session.commit() return PaymentResponse( payment_id=db_payment.payment_id, order_number="", # 간단히 처리 status=db_payment.status, amount=db_payment.amount, transaction_id=refund_result.transaction_id, message=refund_result.message, ) ``` **main.py - 애플리케이션** ```python """ Payment service main application. """ from contextlib import asynccontextmanager from fastapi import FastAPI from app.database.session import dispose_engines from app.api.routes import payments @asynccontextmanager async def lifespan(app: FastAPI): """생명주기 관리""" print("[Payment Service] Starting...") yield print("[Payment Service] Shutting down...") await dispose_engines() app = FastAPI( title="Payment Service", description="결제 처리 서비스", version="1.0.0", lifespan=lifespan, ) app.include_router(payments.router, prefix="/api/v1") @app.get("/health") async def health(): return {"status": "healthy", "service": "payment"} ``` --- ## 7. 설계 원칙 요약 ### 7.1 핵심 설계 원칙 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 커넥션 풀 설계 원칙 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 1. 최소 점유 원칙 (Minimal Hold Principle) │ │ ───────────────────────────────────────── │ │ "DB 커넥션은 DB 작업에만 사용하고 즉시 반환" │ │ │ │ ✗ session.query() → external_api() → session.commit() │ │ ✓ session.query() → commit() → external_api() → new_session │ │ │ │ 2. 워크로드 분리 원칙 (Workload Isolation) │ │ ───────────────────────────────────────── │ │ "다른 특성의 워크로드는 다른 풀을 사용" │ │ │ │ API 요청 → ApiPool (빠른 응답, 짧은 타임아웃) │ │ 백그라운드 → BackgroundPool (안정성, 긴 타임아웃) │ │ │ │ 3. 안전한 쿼리 원칙 (Safe Query Pattern) │ │ ───────────────────────────────────── │ │ "중복 가능성이 있는 조회는 항상 limit(1) 사용" │ │ │ │ select(Model).where(...).order_by(desc).limit(1) │ │ │ │ 4. 3-Stage 처리 원칙 (3-Stage Processing) │ │ ───────────────────────────────────── │ │ Stage 1: DB 작업 + 커밋 (세션 해제) │ │ Stage 2: 외부 API 호출 (세션 없음) │ │ Stage 3: 결과 저장 (새 세션) │ │ │ │ 5. 명시적 범위 원칙 (Explicit Scope) │ │ ───────────────────────────────────── │ │ "세션 범위를 async with로 명확히 정의" │ │ │ │ async with SessionLocal() as session: │ │ # 이 블록 내에서만 세션 사용 │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 7.2 체크리스트 새로운 API 엔드포인트를 작성할 때 확인해야 할 사항: ``` □ 외부 API 호출이 있는가? → 있다면 3-Stage Pattern 적용 □ 백그라운드 작업이 있는가? → 있다면 BackgroundSessionLocal 사용 □ 중복 데이터가 발생할 수 있는 쿼리가 있는가? → 있다면 order_by().limit(1) 적용 □ 세션이 예외 상황에서도 반환되는가? → async with 또는 try/finally 사용 □ 트랜잭션 범위가 적절한가? → 필요한 작업만 포함, 외부 호출 제외 ``` ### 7.3 Anti-Pattern 회피 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 피해야 할 패턴 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Anti-Pattern 1: Long-lived Session │ │ ─────────────────────────────────── │ │ ✗ async def handler(session): │ │ data = await session.query() │ │ result = await http_client.post() # 30초 소요 │ │ session.add(result) │ │ await session.commit() │ │ │ │ Anti-Pattern 2: Shared Pool for All │ │ ─────────────────────────────────── │ │ ✗ 모든 작업이 단일 풀 사용 │ │ → 백그라운드 작업이 API 응답을 블록킹 │ │ │ │ Anti-Pattern 3: Unsafe Query │ │ ─────────────────────────────── │ │ ✗ scalar_one_or_none() without limit(1) │ │ → 중복 데이터 시 예외 발생 │ │ │ │ Anti-Pattern 4: Missing Error Handling │ │ ─────────────────────────────────────── │ │ ✗ session = SessionLocal() │ │ await session.query() # 예외 발생 시 세션 누수 │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` --- ## 결론 이 문서에서 다룬 문제들은 대부분 **"외부 리소스 접근 중 DB 세션 점유"**라는 공통된 원인에서 발생했습니다. 해결의 핵심은: 1. **책임 분리**: DB 작업과 외부 API 호출을 명확히 분리 2. **리소스 격리**: 워크로드별 별도 커넥션 풀 사용 3. **방어적 프로그래밍**: 중복 데이터, 예외 상황 대비 이러한 원칙을 코드 리뷰 시 체크리스트로 활용하면, 프로덕션 환경에서의 커넥션 풀 관련 장애를 예방할 수 있습니다.