# FastAPI + SQLAlchemy ORM 완벽 실무 가이드 > SQLAlchemy 2.0+ / FastAPI 0.100+ / Python 3.10+ 기준 --- ## 목차 1. [기본 설정](#1-기본-설정) 2. [모델 정의](#2-모델-정의) 3. [CRUD 기본 작업](#3-crud-기본-작업) 4. [조회 쿼리 심화](#4-조회-쿼리-심화) 5. [필터링과 조건](#5-필터링과-조건) 6. [정렬, 페이징, 제한](#6-정렬-페이징-제한) 7. [집계 함수 (Aggregation)](#7-집계-함수-aggregation) 8. [JOIN 쿼리](#8-join-쿼리) 9. [서브쿼리 (Subquery)](#9-서브쿼리-subquery) 10. [집합 연산 (Union, Intersect, Except)](#10-집합-연산-union-intersect-except) 11. [고급 표현식](#11-고급-표현식) 12. [Relationship과 Eager Loading](#12-relationship과-eager-loading) 13. [트랜잭션 관리](#13-트랜잭션-관리) 14. [FastAPI 통합 패턴](#14-fastapi-통합-패턴) 15. [성능 최적화](#15-성능-최적화) 16. [실무 레시피](#16-실무-레시피) --- ## 1. 기본 설정 ### 1.1 프로젝트 구조 ``` project/ ├── app/ │ ├── __init__.py │ ├── main.py │ ├── config.py │ ├── database.py │ ├── models/ │ │ ├── __init__.py │ │ ├── base.py │ │ ├── user.py │ │ └── product.py │ ├── schemas/ │ │ ├── __init__.py │ │ ├── user.py │ │ └── product.py │ ├── repositories/ │ │ ├── __init__.py │ │ └── user.py │ ├── services/ │ │ ├── __init__.py │ │ └── user.py │ └── routers/ │ ├── __init__.py │ └── user.py ├── alembic/ ├── tests/ ├── alembic.ini ├── requirements.txt └── .env ``` ### 1.2 Database 설정 ```python # app/config.py from pydantic_settings import BaseSettings from functools import lru_cache class Settings(BaseSettings): DATABASE_URL: str = "mysql+pymysql://user:pass@localhost:3306/dbname" DATABASE_ECHO: bool = False # SQL 로그 출력 DATABASE_POOL_SIZE: int = 5 DATABASE_MAX_OVERFLOW: int = 10 class Config: env_file = ".env" @lru_cache def get_settings() -> Settings: return Settings() ``` ```python # app/database.py from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session from typing import Generator from app.config import get_settings settings = get_settings() # Engine 생성 engine = create_engine( settings.DATABASE_URL, echo=settings.DATABASE_ECHO, pool_size=settings.DATABASE_POOL_SIZE, max_overflow=settings.DATABASE_MAX_OVERFLOW, pool_pre_ping=True, # 연결 유효성 검사 pool_recycle=3600, # 1시간마다 연결 재생성 ) # Session Factory SessionLocal = sessionmaker( bind=engine, autocommit=False, autoflush=False, expire_on_commit=False, # commit 후에도 객체 접근 가능 ) # Dependency def get_db() -> Generator[Session, None, None]: db = SessionLocal() try: yield db finally: db.close() ``` ### 1.3 Base 모델 정의 ```python # app/models/base.py from datetime import datetime from sqlalchemy import func from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column class Base(DeclarativeBase): """모든 모델의 기본 클래스""" pass class TimestampMixin: """생성/수정 시간 믹스인""" created_at: Mapped[datetime] = mapped_column( default=func.now(), nullable=False, ) updated_at: Mapped[datetime] = mapped_column( default=func.now(), onupdate=func.now(), nullable=False, ) class SoftDeleteMixin: """소프트 삭제 믹스인""" is_deleted: Mapped[bool] = mapped_column(default=False) deleted_at: Mapped[datetime | None] = mapped_column(default=None) ``` --- ## 2. 모델 정의 ### 2.1 기본 모델 ```python # app/models/user.py from typing import List, Optional from sqlalchemy import String, Text, Boolean, Integer, ForeignKey, Index from sqlalchemy.orm import Mapped, mapped_column, relationship from app.models.base import Base, TimestampMixin class User(TimestampMixin, Base): __tablename__ = "user" # Primary Key id: Mapped[int] = mapped_column(primary_key=True) # 필수 필드 email: Mapped[str] = mapped_column(String(255), unique=True, index=True) username: Mapped[str] = mapped_column(String(50), unique=True) hashed_password: Mapped[str] = mapped_column(String(255)) # 선택 필드 (nullable) full_name: Mapped[Optional[str]] = mapped_column(String(100)) bio: Mapped[Optional[str]] = mapped_column(Text) # 기본값이 있는 필드 is_active: Mapped[bool] = mapped_column(default=True) is_superuser: Mapped[bool] = mapped_column(default=False) login_count: Mapped[int] = mapped_column(default=0) # Relationships posts: Mapped[List["Post"]] = relationship( "Post", back_populates="author", cascade="all, delete-orphan", lazy="selectin", ) profile: Mapped[Optional["Profile"]] = relationship( "Profile", back_populates="user", uselist=False, cascade="all, delete-orphan", ) # 테이블 설정 __table_args__ = ( Index("idx_user_email_active", "email", "is_active"), {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"}, ) def __repr__(self) -> str: return f"" ``` ### 2.2 관계가 있는 모델들 ```python # app/models/post.py from typing import List, Optional from sqlalchemy import String, Text, ForeignKey, Index from sqlalchemy.orm import Mapped, mapped_column, relationship from app.models.base import Base, TimestampMixin class Post(TimestampMixin, Base): __tablename__ = "post" id: Mapped[int] = mapped_column(primary_key=True) title: Mapped[str] = mapped_column(String(200)) content: Mapped[str] = mapped_column(Text) view_count: Mapped[int] = mapped_column(default=0) is_published: Mapped[bool] = mapped_column(default=False) # Foreign Key author_id: Mapped[int] = mapped_column( ForeignKey("user.id", ondelete="CASCADE"), index=True, ) category_id: Mapped[Optional[int]] = mapped_column( ForeignKey("category.id", ondelete="SET NULL"), nullable=True, ) # Relationships author: Mapped["User"] = relationship("User", back_populates="posts") category: Mapped[Optional["Category"]] = relationship("Category", back_populates="posts") comments: Mapped[List["Comment"]] = relationship( "Comment", back_populates="post", cascade="all, delete-orphan", order_by="Comment.created_at.desc()", ) # N:M 관계 tags: Mapped[List["Tag"]] = relationship( "Tag", secondary="post_tag", back_populates="posts", ) __table_args__ = ( Index("idx_post_author_published", "author_id", "is_published"), ) class Category(Base): __tablename__ = "category" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String(50), unique=True) description: Mapped[Optional[str]] = mapped_column(Text) posts: Mapped[List["Post"]] = relationship("Post", back_populates="category") class Tag(Base): __tablename__ = "tag" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String(30), unique=True) posts: Mapped[List["Post"]] = relationship( "Post", secondary="post_tag", back_populates="tags", ) # N:M 중간 테이블 from sqlalchemy import Table, Column, Integer, ForeignKey from app.models.base import Base post_tag = Table( "post_tag", Base.metadata, Column("post_id", Integer, ForeignKey("post.id", ondelete="CASCADE"), primary_key=True), Column("tag_id", Integer, ForeignKey("tag.id", ondelete="CASCADE"), primary_key=True), ) ``` --- ## 3. CRUD 기본 작업 ### 3.1 Create (생성) ```python from sqlalchemy import select from sqlalchemy.orm import Session from app.models.user import User # ───────────────────────────────────────────────────── # 단일 레코드 생성 # ───────────────────────────────────────────────────── def create_user(db: Session, email: str, username: str, password: str) -> User: user = User( email=email, username=username, hashed_password=password, ) db.add(user) db.commit() db.refresh(user) # DB에서 생성된 값(id, created_at 등) 로드 return user # ───────────────────────────────────────────────────── # 여러 레코드 한번에 생성 # ───────────────────────────────────────────────────── def create_users_bulk(db: Session, users_data: list[dict]) -> list[User]: users = [User(**data) for data in users_data] db.add_all(users) db.commit() # 각 객체 refresh for user in users: db.refresh(user) return users # ───────────────────────────────────────────────────── # 관계와 함께 생성 # ───────────────────────────────────────────────────── def create_post_with_tags( db: Session, title: str, content: str, author_id: int, tag_names: list[str] ) -> Post: # 기존 태그 조회 또는 생성 tags = [] for name in tag_names: tag = db.scalar(select(Tag).where(Tag.name == name)) if not tag: tag = Tag(name=name) tags.append(tag) post = Post( title=title, content=content, author_id=author_id, tags=tags, ) db.add(post) db.commit() db.refresh(post) return post # ───────────────────────────────────────────────────── # Insert ... ON DUPLICATE KEY UPDATE (Upsert) # ───────────────────────────────────────────────────── from sqlalchemy.dialects.mysql import insert as mysql_insert def upsert_user(db: Session, email: str, username: str) -> None: stmt = mysql_insert(User).values( email=email, username=username, ) stmt = stmt.on_duplicate_key_update( username=stmt.inserted.username, updated_at=func.now(), ) db.execute(stmt) db.commit() # PostgreSQL의 경우 from sqlalchemy.dialects.postgresql import insert as pg_insert def upsert_user_pg(db: Session, email: str, username: str) -> None: stmt = pg_insert(User).values(email=email, username=username) stmt = stmt.on_conflict_do_update( index_elements=["email"], set_={"username": username}, ) db.execute(stmt) db.commit() ``` ### 3.2 Read (조회) ```python from sqlalchemy import select from sqlalchemy.orm import Session # ───────────────────────────────────────────────────── # Primary Key로 조회 # ───────────────────────────────────────────────────── def get_user_by_id(db: Session, user_id: int) -> User | None: return db.get(User, user_id) # ───────────────────────────────────────────────────── # 단일 레코드 조회 (조건) # ───────────────────────────────────────────────────── def get_user_by_email(db: Session, email: str) -> User | None: stmt = select(User).where(User.email == email) return db.scalar(stmt) # scalar(): 단일 값 반환 (없으면 None) # scalars(): 여러 값의 ScalarResult 반환 # one(): 정확히 1개 (0개 또는 2개 이상이면 에러) # one_or_none(): 0개면 None, 1개면 반환, 2개 이상이면 에러 # first(): 첫 번째 결과 (없으면 None) # ───────────────────────────────────────────────────── # 여러 레코드 조회 # ───────────────────────────────────────────────────── def get_all_users(db: Session) -> list[User]: stmt = select(User) return list(db.scalars(stmt).all()) def get_active_users(db: Session) -> list[User]: stmt = select(User).where(User.is_active == True) return list(db.scalars(stmt).all()) # ───────────────────────────────────────────────────── # 특정 컬럼만 조회 # ───────────────────────────────────────────────────── def get_user_emails(db: Session) -> list[str]: stmt = select(User.email) return list(db.scalars(stmt).all()) def get_user_summary(db: Session) -> list[tuple]: stmt = select(User.id, User.email, User.username) return list(db.execute(stmt).all()) # ───────────────────────────────────────────────────── # 존재 여부 확인 # ───────────────────────────────────────────────────── def user_exists(db: Session, email: str) -> bool: stmt = select(User.id).where(User.email == email) return db.scalar(stmt) is not None # 또는 exists() 사용 from sqlalchemy import exists def user_exists_v2(db: Session, email: str) -> bool: stmt = select(exists().where(User.email == email)) return db.scalar(stmt) ``` ### 3.3 Update (수정) ```python from sqlalchemy import update from sqlalchemy.orm import Session # ───────────────────────────────────────────────────── # 단일 레코드 수정 (ORM 방식) # ───────────────────────────────────────────────────── def update_user(db: Session, user_id: int, **kwargs) -> User | None: user = db.get(User, user_id) if not user: return None for key, value in kwargs.items(): if hasattr(user, key): setattr(user, key, value) db.commit() db.refresh(user) return user # ───────────────────────────────────────────────────── # Bulk Update (Core 방식) - 더 효율적 # ───────────────────────────────────────────────────── def deactivate_users(db: Session, user_ids: list[int]) -> int: stmt = ( update(User) .where(User.id.in_(user_ids)) .values(is_active=False) ) result = db.execute(stmt) db.commit() return result.rowcount # 영향받은 행 수 # ───────────────────────────────────────────────────── # 조건부 Update # ───────────────────────────────────────────────────── def increment_login_count(db: Session, user_id: int) -> None: stmt = ( update(User) .where(User.id == user_id) .values(login_count=User.login_count + 1) ) db.execute(stmt) db.commit() # ───────────────────────────────────────────────────── # CASE를 사용한 조건부 Update # ───────────────────────────────────────────────────── from sqlalchemy import case def update_user_levels(db: Session) -> None: stmt = ( update(User) .values( level=case( (User.login_count >= 100, "gold"), (User.login_count >= 50, "silver"), else_="bronze", ) ) ) db.execute(stmt) db.commit() ``` ### 3.4 Delete (삭제) ```python from sqlalchemy import delete from sqlalchemy.orm import Session # ───────────────────────────────────────────────────── # 단일 레코드 삭제 (ORM 방식) # ───────────────────────────────────────────────────── def delete_user(db: Session, user_id: int) -> bool: user = db.get(User, user_id) if not user: return False db.delete(user) # cascade 설정에 따라 관련 데이터도 삭제 db.commit() return True # ───────────────────────────────────────────────────── # Bulk Delete (Core 방식) - 더 효율적 # ───────────────────────────────────────────────────── def delete_inactive_users(db: Session) -> int: stmt = delete(User).where(User.is_active == False) result = db.execute(stmt) db.commit() return result.rowcount # ───────────────────────────────────────────────────── # Soft Delete # ───────────────────────────────────────────────────── from datetime import datetime def soft_delete_user(db: Session, user_id: int) -> bool: stmt = ( update(User) .where(User.id == user_id) .values(is_deleted=True, deleted_at=datetime.utcnow()) ) result = db.execute(stmt) db.commit() return result.rowcount > 0 # ───────────────────────────────────────────────────── # 관계 데이터 삭제 # ───────────────────────────────────────────────────── def remove_tag_from_post(db: Session, post_id: int, tag_id: int) -> None: post = db.get(Post, post_id) tag = db.get(Tag, tag_id) if post and tag and tag in post.tags: post.tags.remove(tag) db.commit() ``` --- ## 4. 조회 쿼리 심화 ### 4.1 select() 기본 사용법 ```python from sqlalchemy import select # ───────────────────────────────────────────────────── # 전체 모델 조회 # ───────────────────────────────────────────────────── stmt = select(User) # SELECT user.id, user.email, user.username, ... FROM user # ───────────────────────────────────────────────────── # 특정 컬럼만 조회 # ───────────────────────────────────────────────────── stmt = select(User.id, User.email) # SELECT user.id, user.email FROM user # ───────────────────────────────────────────────────── # 컬럼 별칭 (alias) # ───────────────────────────────────────────────────── stmt = select(User.email.label("user_email")) # SELECT user.email AS user_email FROM user # ───────────────────────────────────────────────────── # DISTINCT # ───────────────────────────────────────────────────── stmt = select(User.category_id).distinct() # SELECT DISTINCT user.category_id FROM user # ───────────────────────────────────────────────────── # 여러 테이블에서 조회 # ───────────────────────────────────────────────────── stmt = select(User, Post).join(Post) # SELECT user.*, post.* FROM user JOIN post ON ... stmt = select(User.email, Post.title).join(Post) # SELECT user.email, post.title FROM user JOIN post ON ... ``` ### 4.2 실행 메서드 비교 ```python from sqlalchemy.orm import Session # ───────────────────────────────────────────────────── # execute() - Row 객체 반환 # ───────────────────────────────────────────────────── stmt = select(User.id, User.email) result = db.execute(stmt) for row in result: print(row.id, row.email) # Row 객체 print(row[0], row[1]) # 인덱스 접근 print(row._mapping) # dict-like 접근 # ───────────────────────────────────────────────────── # scalars() - 첫 번째 컬럼만 반환 # ───────────────────────────────────────────────────── stmt = select(User) users = db.scalars(stmt).all() # list[User] stmt = select(User.email) emails = db.scalars(stmt).all() # list[str] # ───────────────────────────────────────────────────── # scalar() - 단일 값 반환 # ───────────────────────────────────────────────────── stmt = select(User).where(User.id == 1) user = db.scalar(stmt) # User | None stmt = select(func.count(User.id)) count = db.scalar(stmt) # int # ───────────────────────────────────────────────────── # 결과 처리 메서드 # ───────────────────────────────────────────────────── result = db.scalars(stmt) result.all() # 모든 결과를 리스트로 result.first() # 첫 번째 결과 (없으면 None) result.one() # 정확히 1개 (아니면 예외) result.one_or_none()# 0-1개 (2개 이상이면 예외) result.fetchmany(5) # 5개만 가져오기 result.unique() # 중복 제거 (relationship 로딩 시 필요) ``` --- ## 5. 필터링과 조건 ### 5.1 기본 비교 연산자 ```python from sqlalchemy import select, and_, or_, not_ # ───────────────────────────────────────────────────── # 동등 비교 # ───────────────────────────────────────────────────── stmt = select(User).where(User.email == "test@example.com") stmt = select(User).where(User.is_active == True) stmt = select(User).where(User.category_id == None) # IS NULL # ───────────────────────────────────────────────────── # 부등 비교 # ───────────────────────────────────────────────────── stmt = select(User).where(User.age != 30) stmt = select(User).where(User.age > 18) stmt = select(User).where(User.age >= 18) stmt = select(User).where(User.age < 65) stmt = select(User).where(User.age <= 65) # ───────────────────────────────────────────────────── # BETWEEN # ───────────────────────────────────────────────────── stmt = select(User).where(User.age.between(18, 65)) # WHERE age BETWEEN 18 AND 65 # ───────────────────────────────────────────────────── # IN / NOT IN # ───────────────────────────────────────────────────── stmt = select(User).where(User.status.in_(["active", "pending"])) stmt = select(User).where(User.id.in_([1, 2, 3, 4, 5])) stmt = select(User).where(User.status.not_in(["deleted", "banned"])) # ───────────────────────────────────────────────────── # IS NULL / IS NOT NULL # ───────────────────────────────────────────────────── stmt = select(User).where(User.deleted_at.is_(None)) # IS NULL stmt = select(User).where(User.deleted_at.is_not(None)) # IS NOT NULL stmt = select(User).where(User.bio.isnot(None)) # 또 다른 방법 ``` ### 5.2 문자열 연산 ```python # ───────────────────────────────────────────────────── # LIKE / ILIKE # ───────────────────────────────────────────────────── stmt = select(User).where(User.email.like("%@gmail.com")) stmt = select(User).where(User.username.like("john%")) stmt = select(User).where(User.name.like("%홍길%")) # 대소문자 무시 (PostgreSQL) stmt = select(User).where(User.email.ilike("%@GMAIL.COM")) # ───────────────────────────────────────────────────── # CONTAINS (LIKE '%value%') # ───────────────────────────────────────────────────── stmt = select(User).where(User.bio.contains("python")) # WHERE bio LIKE '%python%' # ───────────────────────────────────────────────────── # STARTSWITH / ENDSWITH # ───────────────────────────────────────────────────── stmt = select(User).where(User.email.startswith("admin")) # WHERE email LIKE 'admin%' stmt = select(User).where(User.email.endswith("@company.com")) # WHERE email LIKE '%@company.com' # ───────────────────────────────────────────────────── # 정규표현식 (DB 지원 필요) # ───────────────────────────────────────────────────── stmt = select(User).where(User.email.regexp_match(r"^[a-z]+@")) ``` ### 5.3 논리 연산자 ```python from sqlalchemy import and_, or_, not_ # ───────────────────────────────────────────────────── # AND # ───────────────────────────────────────────────────── # 방법 1: and_() 함수 stmt = select(User).where( and_( User.is_active == True, User.age >= 18, ) ) # 방법 2: where() 체이닝 (암묵적 AND) stmt = select(User).where(User.is_active == True).where(User.age >= 18) # 방법 3: 콤마로 구분 stmt = select(User).where(User.is_active == True, User.age >= 18) # ───────────────────────────────────────────────────── # OR # ───────────────────────────────────────────────────── stmt = select(User).where( or_( User.role == "admin", User.role == "moderator", ) ) # ───────────────────────────────────────────────────── # NOT # ───────────────────────────────────────────────────── stmt = select(User).where(not_(User.is_deleted == True)) stmt = select(User).where(~(User.is_deleted == True)) # ~ 연산자 # ───────────────────────────────────────────────────── # 복합 조건 # ───────────────────────────────────────────────────── stmt = select(User).where( and_( User.is_active == True, or_( User.role == "admin", User.age >= 21, ), not_(User.is_deleted == True), ) ) # WHERE is_active = true # AND (role = 'admin' OR age >= 21) # AND NOT is_deleted = true ``` ### 5.4 동적 필터링 ```python from typing import Optional def search_users( db: Session, email: Optional[str] = None, username: Optional[str] = None, is_active: Optional[bool] = None, min_age: Optional[int] = None, max_age: Optional[int] = None, ) -> list[User]: stmt = select(User) # 동적으로 조건 추가 if email: stmt = stmt.where(User.email.contains(email)) if username: stmt = stmt.where(User.username.like(f"%{username}%")) if is_active is not None: stmt = stmt.where(User.is_active == is_active) if min_age is not None: stmt = stmt.where(User.age >= min_age) if max_age is not None: stmt = stmt.where(User.age <= max_age) return list(db.scalars(stmt).all()) # 사용 users = search_users(db, email="gmail", is_active=True, min_age=18) ``` --- ## 6. 정렬, 페이징, 제한 ### 6.1 정렬 (ORDER BY) ```python from sqlalchemy import select, asc, desc # ───────────────────────────────────────────────────── # 기본 정렬 # ───────────────────────────────────────────────────── # 오름차순 (기본) stmt = select(User).order_by(User.created_at) stmt = select(User).order_by(asc(User.created_at)) # 내림차순 stmt = select(User).order_by(desc(User.created_at)) stmt = select(User).order_by(User.created_at.desc()) # ───────────────────────────────────────────────────── # 다중 정렬 # ───────────────────────────────────────────────────── stmt = select(User).order_by( User.is_active.desc(), User.created_at.desc(), ) # ORDER BY is_active DESC, created_at DESC # ───────────────────────────────────────────────────── # NULL 처리 # ───────────────────────────────────────────────────── stmt = select(User).order_by(User.last_login.desc().nullslast()) # NULL 값을 마지막에 stmt = select(User).order_by(User.last_login.asc().nullsfirst()) # NULL 값을 처음에 ``` ### 6.2 제한과 오프셋 (LIMIT, OFFSET) ```python # ───────────────────────────────────────────────────── # LIMIT # ───────────────────────────────────────────────────── stmt = select(User).limit(10) # SELECT ... FROM user LIMIT 10 # ───────────────────────────────────────────────────── # OFFSET # ───────────────────────────────────────────────────── stmt = select(User).offset(20).limit(10) # SELECT ... FROM user LIMIT 10 OFFSET 20 # ───────────────────────────────────────────────────── # 슬라이스 문법 # ───────────────────────────────────────────────────── stmt = select(User).slice(20, 30) # offset=20, limit=10과 동일 ``` ### 6.3 페이지네이션 구현 ```python from typing import TypeVar, Generic from pydantic import BaseModel from sqlalchemy import select, func from sqlalchemy.orm import Session T = TypeVar("T") class PaginatedResult(BaseModel, Generic[T]): items: list[T] total: int page: int page_size: int total_pages: int has_next: bool has_prev: bool def paginate( db: Session, stmt, page: int = 1, page_size: int = 20, ) -> dict: # 페이지 유효성 검사 page = max(1, page) page_size = min(max(1, page_size), 100) # 최대 100개 # 전체 개수 조회 count_stmt = select(func.count()).select_from(stmt.subquery()) total = db.scalar(count_stmt) # 페이지네이션 적용 offset = (page - 1) * page_size paginated_stmt = stmt.offset(offset).limit(page_size) items = list(db.scalars(paginated_stmt).all()) total_pages = (total + page_size - 1) // page_size return { "items": items, "total": total, "page": page, "page_size": page_size, "total_pages": total_pages, "has_next": page < total_pages, "has_prev": page > 1, } # 사용 예시 def get_users_paginated( db: Session, page: int = 1, page_size: int = 20, is_active: bool | None = None, ) -> dict: stmt = select(User).order_by(User.created_at.desc()) if is_active is not None: stmt = stmt.where(User.is_active == is_active) return paginate(db, stmt, page, page_size) # FastAPI 엔드포인트 @router.get("/users") def list_users( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), is_active: bool | None = None, db: Session = Depends(get_db), ): return get_users_paginated(db, page, page_size, is_active) ``` ### 6.4 커서 기반 페이지네이션 ```python from datetime import datetime from typing import Optional def get_users_cursor( db: Session, limit: int = 20, cursor: Optional[datetime] = None, ) -> dict: stmt = select(User).order_by(User.created_at.desc()) if cursor: stmt = stmt.where(User.created_at < cursor) stmt = stmt.limit(limit + 1) # 다음 페이지 확인용으로 1개 더 items = list(db.scalars(stmt).all()) has_next = len(items) > limit if has_next: items = items[:limit] next_cursor = items[-1].created_at if items and has_next else None return { "items": items, "next_cursor": next_cursor, "has_next": has_next, } # 복합 커서 (동일 시간 처리) def get_users_cursor_v2( db: Session, limit: int = 20, cursor_time: Optional[datetime] = None, cursor_id: Optional[int] = None, ) -> dict: stmt = select(User).order_by( User.created_at.desc(), User.id.desc(), ) if cursor_time and cursor_id: stmt = stmt.where( or_( User.created_at < cursor_time, and_( User.created_at == cursor_time, User.id < cursor_id, ), ) ) stmt = stmt.limit(limit + 1) items = list(db.scalars(stmt).all()) has_next = len(items) > limit if has_next: items = items[:limit] return { "items": items, "next_cursor": { "time": items[-1].created_at, "id": items[-1].id, } if items and has_next else None, "has_next": has_next, } ``` --- ## 7. 집계 함수 (Aggregation) ### 7.1 기본 집계 함수 ```python from sqlalchemy import select, func # ───────────────────────────────────────────────────── # COUNT # ───────────────────────────────────────────────────── # 전체 개수 stmt = select(func.count(User.id)) total = db.scalar(stmt) # 조건부 개수 stmt = select(func.count(User.id)).where(User.is_active == True) active_count = db.scalar(stmt) # COUNT(DISTINCT) stmt = select(func.count(func.distinct(User.category_id))) unique_categories = db.scalar(stmt) # ───────────────────────────────────────────────────── # SUM # ───────────────────────────────────────────────────── stmt = select(func.sum(Order.amount)) total_amount = db.scalar(stmt) stmt = select(func.sum(Order.amount)).where(Order.status == "completed") completed_amount = db.scalar(stmt) # ───────────────────────────────────────────────────── # AVG # ───────────────────────────────────────────────────── stmt = select(func.avg(Product.price)) avg_price = db.scalar(stmt) # 반올림 stmt = select(func.round(func.avg(Product.price), 2)) avg_price_rounded = db.scalar(stmt) # ───────────────────────────────────────────────────── # MIN / MAX # ───────────────────────────────────────────────────── stmt = select(func.min(Product.price), func.max(Product.price)) result = db.execute(stmt).first() min_price, max_price = result # ───────────────────────────────────────────────────── # 여러 집계 함수 함께 사용 # ───────────────────────────────────────────────────── stmt = select( func.count(Order.id).label("order_count"), func.sum(Order.amount).label("total_amount"), func.avg(Order.amount).label("avg_amount"), func.min(Order.amount).label("min_amount"), func.max(Order.amount).label("max_amount"), ) result = db.execute(stmt).first() print(f"주문 수: {result.order_count}") print(f"총 금액: {result.total_amount}") print(f"평균 금액: {result.avg_amount}") ``` ### 7.2 GROUP BY ```python # ───────────────────────────────────────────────────── # 기본 GROUP BY # ───────────────────────────────────────────────────── # 카테고리별 상품 수 stmt = ( select( Product.category_id, func.count(Product.id).label("product_count"), ) .group_by(Product.category_id) ) results = db.execute(stmt).all() for row in results: print(f"Category {row.category_id}: {row.product_count} products") # ───────────────────────────────────────────────────── # JOIN과 함께 GROUP BY # ───────────────────────────────────────────────────── # 카테고리 이름과 함께 stmt = ( select( Category.name, func.count(Product.id).label("product_count"), func.avg(Product.price).label("avg_price"), ) .join(Product, Category.id == Product.category_id) .group_by(Category.id, Category.name) ) # ───────────────────────────────────────────────────── # 날짜별 GROUP BY # ───────────────────────────────────────────────────── # 일별 주문 통계 stmt = ( select( func.date(Order.created_at).label("order_date"), func.count(Order.id).label("order_count"), func.sum(Order.amount).label("total_amount"), ) .group_by(func.date(Order.created_at)) .order_by(func.date(Order.created_at).desc()) ) # 월별 주문 통계 stmt = ( select( func.year(Order.created_at).label("year"), func.month(Order.created_at).label("month"), func.count(Order.id).label("order_count"), func.sum(Order.amount).label("total_amount"), ) .group_by( func.year(Order.created_at), func.month(Order.created_at), ) .order_by( func.year(Order.created_at).desc(), func.month(Order.created_at).desc(), ) ) # ───────────────────────────────────────────────────── # 다중 컬럼 GROUP BY # ───────────────────────────────────────────────────── stmt = ( select( User.country, User.city, func.count(User.id).label("user_count"), ) .group_by(User.country, User.city) .order_by(func.count(User.id).desc()) ) ``` ### 7.3 HAVING ```python # ───────────────────────────────────────────────────── # 집계 결과 필터링 # ───────────────────────────────────────────────────── # 5개 이상 상품이 있는 카테고리만 stmt = ( select( Category.name, func.count(Product.id).label("product_count"), ) .join(Product) .group_by(Category.id, Category.name) .having(func.count(Product.id) >= 5) ) # ───────────────────────────────────────────────────── # 복합 HAVING 조건 # ───────────────────────────────────────────────────── stmt = ( select( User.id, User.username, func.count(Order.id).label("order_count"), func.sum(Order.amount).label("total_spent"), ) .join(Order) .group_by(User.id, User.username) .having( and_( func.count(Order.id) >= 10, func.sum(Order.amount) >= 100000, ) ) .order_by(func.sum(Order.amount).desc()) ) # 10건 이상 주문하고 10만원 이상 사용한 고객 ``` ### 7.4 윈도우 함수 ```python from sqlalchemy import over # ───────────────────────────────────────────────────── # ROW_NUMBER # ───────────────────────────────────────────────────── stmt = ( select( User.id, User.username, User.created_at, func.row_number().over( order_by=User.created_at.desc() ).label("row_num"), ) ) # ───────────────────────────────────────────────────── # 파티션별 순위 # ───────────────────────────────────────────────────── stmt = ( select( Product.id, Product.name, Product.category_id, Product.price, func.rank().over( partition_by=Product.category_id, order_by=Product.price.desc(), ).label("price_rank"), ) ) # 카테고리 내에서 가격 순위 # ───────────────────────────────────────────────────── # 누적 합계 # ───────────────────────────────────────────────────── stmt = ( select( Order.id, Order.created_at, Order.amount, func.sum(Order.amount).over( order_by=Order.created_at, ).label("cumulative_amount"), ) ) # ───────────────────────────────────────────────────── # 이동 평균 # ───────────────────────────────────────────────────── from sqlalchemy import text stmt = ( select( Order.id, Order.created_at, Order.amount, func.avg(Order.amount).over( order_by=Order.created_at, rows=(2, 0), # 현재 행 포함 최근 3개 ).label("moving_avg"), ) ) ``` --- ## 8. JOIN 쿼리 ### 8.1 기본 JOIN ```python from sqlalchemy import select # ───────────────────────────────────────────────────── # INNER JOIN # ───────────────────────────────────────────────────── # 방법 1: relationship 사용 (자동 조건) stmt = select(User, Post).join(User.posts) # SELECT ... FROM user JOIN post ON user.id = post.author_id # 방법 2: 명시적 조건 stmt = select(User, Post).join(Post, User.id == Post.author_id) # 방법 3: 모델만 지정 (FK 자동 감지) stmt = select(User, Post).join(Post) # ───────────────────────────────────────────────────── # LEFT OUTER JOIN # ───────────────────────────────────────────────────── stmt = select(User, Post).join(Post, isouter=True) # SELECT ... FROM user LEFT JOIN post ON ... # 또는 stmt = select(User, Post).outerjoin(Post) # ───────────────────────────────────────────────────── # RIGHT OUTER JOIN # ───────────────────────────────────────────────────── stmt = select(User, Post).join(Post, full=True) # FULL OUTER # RIGHT JOIN은 순서를 바꿔서 구현 stmt = select(Post, User).outerjoin(User) # ───────────────────────────────────────────────────── # 다중 JOIN # ───────────────────────────────────────────────────── stmt = ( select(User, Post, Comment) .join(Post, User.id == Post.author_id) .join(Comment, Post.id == Comment.post_id) ) # relationship 사용 stmt = ( select(User, Post, Comment) .join(User.posts) .join(Post.comments) ) ``` ### 8.2 JOIN 결과 처리 ```python # ───────────────────────────────────────────────────── # 여러 모델 조회 # ───────────────────────────────────────────────────── stmt = select(User, Post).join(Post) results = db.execute(stmt).all() for user, post in results: print(f"Author: {user.username}, Post: {post.title}") # ───────────────────────────────────────────────────── # 특정 컬럼만 조회 # ───────────────────────────────────────────────────── stmt = ( select(User.username, Post.title, Post.created_at) .join(Post) .order_by(Post.created_at.desc()) ) results = db.execute(stmt).all() for row in results: print(f"{row.username}: {row.title}") # ───────────────────────────────────────────────────── # 하나의 모델만 필요할 때 # ───────────────────────────────────────────────────── # 게시글이 있는 사용자만 조회 stmt = ( select(User) .join(Post) .distinct() ) users = db.scalars(stmt).all() ``` ### 8.3 Self JOIN ```python # ───────────────────────────────────────────────────── # 같은 테이블 조인 (별칭 필요) # ───────────────────────────────────────────────────── from sqlalchemy.orm import aliased # 사용자와 그 추천인 Referrer = aliased(User, name="referrer") stmt = ( select(User.username, Referrer.username.label("referrer_name")) .join(Referrer, User.referrer_id == Referrer.id, isouter=True) ) # ───────────────────────────────────────────────────── # 계층 구조 조회 (상위/하위) # ───────────────────────────────────────────────────── Parent = aliased(Category, name="parent") stmt = ( select(Category.name, Parent.name.label("parent_name")) .join(Parent, Category.parent_id == Parent.id, isouter=True) ) # ───────────────────────────────────────────────────── # 같은 테이블 비교 # ───────────────────────────────────────────────────── # 같은 카테고리의 다른 상품 OtherProduct = aliased(Product, name="other") stmt = ( select(Product.name, OtherProduct.name.label("related_product")) .join( OtherProduct, and_( Product.category_id == OtherProduct.category_id, Product.id != OtherProduct.id, ) ) .where(Product.id == 1) ) ``` --- ## 9. 서브쿼리 (Subquery) ### 9.1 스칼라 서브쿼리 ```python from sqlalchemy import select, func # ───────────────────────────────────────────────────── # SELECT 절의 서브쿼리 # ───────────────────────────────────────────────────── # 각 사용자의 게시글 수 post_count_subq = ( select(func.count(Post.id)) .where(Post.author_id == User.id) .correlate(User) .scalar_subquery() ) stmt = select( User.username, post_count_subq.label("post_count"), ) # ───────────────────────────────────────────────────── # 평균과 비교 # ───────────────────────────────────────────────────── avg_price = select(func.avg(Product.price)).scalar_subquery() stmt = select(Product).where(Product.price > avg_price) # 평균보다 비싼 상품 ``` ### 9.2 FROM 절의 서브쿼리 ```python # ───────────────────────────────────────────────────── # 서브쿼리를 테이블처럼 사용 # ───────────────────────────────────────────────────── # 카테고리별 통계 서브쿼리 category_stats = ( select( Product.category_id, func.count(Product.id).label("product_count"), func.avg(Product.price).label("avg_price"), ) .group_by(Product.category_id) .subquery() ) # 메인 쿼리에서 조인 stmt = ( select( Category.name, category_stats.c.product_count, category_stats.c.avg_price, ) .join(category_stats, Category.id == category_stats.c.category_id) ) # ───────────────────────────────────────────────────── # 파생 테이블 # ───────────────────────────────────────────────────── # 최근 7일 활성 사용자 recent_active = ( select( User.id, User.username, func.count(Order.id).label("order_count"), ) .join(Order) .where(Order.created_at >= func.date_sub(func.now(), text("INTERVAL 7 DAY"))) .group_by(User.id) .subquery() ) # 많이 주문한 순으로 stmt = ( select(recent_active) .order_by(recent_active.c.order_count.desc()) .limit(10) ) ``` ### 9.3 WHERE 절의 서브쿼리 ```python from sqlalchemy import exists, any_, all_ # ───────────────────────────────────────────────────── # IN 서브쿼리 # ───────────────────────────────────────────────────── # 주문한 적 있는 사용자 ordered_user_ids = select(Order.user_id).distinct() stmt = select(User).where(User.id.in_(ordered_user_ids)) # ───────────────────────────────────────────────────── # EXISTS # ───────────────────────────────────────────────────── # 게시글이 있는 사용자 has_posts = ( exists() .where(Post.author_id == User.id) ) stmt = select(User).where(has_posts) # NOT EXISTS stmt = select(User).where(~has_posts) # 게시글 없는 사용자 # ───────────────────────────────────────────────────── # ANY / ALL # ───────────────────────────────────────────────────── # 어떤 주문보다 비싼 상품 (ANY) any_order_amount = select(Order.amount) stmt = select(Product).where(Product.price > any_(any_order_amount)) # 모든 주문보다 비싼 상품 (ALL) stmt = select(Product).where(Product.price > all_(any_order_amount)) ``` ### 9.4 Lateral 서브쿼리 (PostgreSQL) ```python from sqlalchemy import lateral # 각 사용자의 최근 3개 주문 recent_orders = ( select(Order) .where(Order.user_id == User.id) .order_by(Order.created_at.desc()) .limit(3) .lateral() ) stmt = ( select(User, recent_orders) .outerjoin(recent_orders, True) ) ``` --- ## 10. 집합 연산 (Union, Intersect, Except) ### 10.1 UNION ```python from sqlalchemy import union, union_all # ───────────────────────────────────────────────────── # UNION (중복 제거) # ───────────────────────────────────────────────────── # 관리자 + 최근 활동 사용자 admins = select(User.id, User.username).where(User.is_admin == True) recent_active = select(User.id, User.username).where( User.last_login >= func.date_sub(func.now(), text("INTERVAL 7 DAY")) ) stmt = union(admins, recent_active) results = db.execute(stmt).all() # ───────────────────────────────────────────────────── # UNION ALL (중복 허용, 더 빠름) # ───────────────────────────────────────────────────── stmt = union_all(admins, recent_active) # ───────────────────────────────────────────────────── # 여러 개 UNION # ───────────────────────────────────────────────────── query1 = select(User.email.label("contact")).where(User.is_active == True) query2 = select(Contact.email.label("contact")).where(Contact.subscribed == True) query3 = select(Lead.email.label("contact")).where(Lead.status == "qualified") stmt = union(query1, query2, query3) # ───────────────────────────────────────────────────── # UNION 결과 정렬/제한 # ───────────────────────────────────────────────────── combined = union(admins, recent_active).subquery() stmt = ( select(combined) .order_by(combined.c.username) .limit(10) ) ``` ### 10.2 INTERSECT ```python from sqlalchemy import intersect, intersect_all # ───────────────────────────────────────────────────── # INTERSECT (교집합) # ───────────────────────────────────────────────────── # 관리자이면서 최근 활동한 사용자 admins = select(User.id).where(User.is_admin == True) recent_active = select(User.id).where( User.last_login >= func.date_sub(func.now(), text("INTERVAL 7 DAY")) ) stmt = intersect(admins, recent_active) # ───────────────────────────────────────────────────── # 여러 조건의 교집합 # ───────────────────────────────────────────────────── # 세 가지 모두 만족하는 사용자 has_orders = select(Order.user_id).distinct() has_reviews = select(Review.user_id).distinct() is_verified = select(User.id).where(User.is_verified == True) stmt = intersect(has_orders, has_reviews, is_verified) ``` ### 10.3 EXCEPT ```python from sqlalchemy import except_, except_all # ───────────────────────────────────────────────────── # EXCEPT (차집합) # ───────────────────────────────────────────────────── # 주문은 했지만 리뷰는 안 쓴 사용자 ordered = select(Order.user_id).distinct() reviewed = select(Review.user_id).distinct() stmt = except_(ordered, reviewed) # ───────────────────────────────────────────────────── # 실용 예: 미처리 항목 찾기 # ───────────────────────────────────────────────────── # 모든 신규 사용자 중 환영 이메일 미발송 대상 all_new_users = select(User.id).where( User.created_at >= func.date_sub(func.now(), text("INTERVAL 24 HOUR")) ) email_sent = select(EmailLog.user_id).where(EmailLog.type == "welcome") stmt = except_(all_new_users, email_sent) ``` --- ## 11. 고급 표현식 ### 11.1 CASE 문 ```python from sqlalchemy import case # ───────────────────────────────────────────────────── # 단순 CASE # ───────────────────────────────────────────────────── status_label = case( (User.status == "active", "활성"), (User.status == "pending", "대기"), (User.status == "suspended", "정지"), else_="알 수 없음", ) stmt = select(User.username, status_label.label("status_label")) # ───────────────────────────────────────────────────── # 조건부 집계 # ───────────────────────────────────────────────────── stmt = select( func.count(case((Order.status == "completed", 1))).label("completed_count"), func.count(case((Order.status == "pending", 1))).label("pending_count"), func.count(case((Order.status == "cancelled", 1))).label("cancelled_count"), func.sum(case((Order.status == "completed", Order.amount), else_=0)).label("completed_amount"), ) # ───────────────────────────────────────────────────── # 정렬에서 CASE # ───────────────────────────────────────────────────── priority_order = case( (User.role == "admin", 1), (User.role == "moderator", 2), else_=3, ) stmt = select(User).order_by(priority_order, User.username) ``` ### 11.2 형변환 (CAST) ```python from sqlalchemy import cast from sqlalchemy.types import String, Integer, Float, Date # ───────────────────────────────────────────────────── # 타입 변환 # ───────────────────────────────────────────────────── stmt = select(cast(User.age, String).label("age_str")) stmt = select(cast(Product.price, Integer).label("price_int")) stmt = select(cast(Order.created_at, Date).label("order_date")) # ───────────────────────────────────────────────────── # 문자열 연결 시 형변환 # ───────────────────────────────────────────────────── stmt = select( (User.username + " (" + cast(User.age, String) + "세)").label("display_name") ) ``` ### 11.3 문자열 함수 ```python from sqlalchemy import func # ───────────────────────────────────────────────────── # 기본 문자열 함수 # ───────────────────────────────────────────────────── stmt = select( func.upper(User.username), func.lower(User.email), func.length(User.bio), func.trim(User.name), func.concat(User.first_name, " ", User.last_name).label("full_name"), ) # ───────────────────────────────────────────────────── # 문자열 추출 # ───────────────────────────────────────────────────── stmt = select( func.substring(User.email, 1, 10), # 처음 10자 func.left(User.username, 5), func.right(User.email, 10), ) # ───────────────────────────────────────────────────── # 문자열 연결 (|| 연산자) # ───────────────────────────────────────────────────── from sqlalchemy import literal_column # PostgreSQL/SQLite full_name = User.first_name + " " + User.last_name # MySQL full_name = func.concat(User.first_name, " ", User.last_name) ``` ### 11.4 날짜/시간 함수 ```python from sqlalchemy import func, extract # ───────────────────────────────────────────────────── # 현재 날짜/시간 # ───────────────────────────────────────────────────── stmt = select( func.now(), func.current_date(), func.current_time(), func.current_timestamp(), ) # ───────────────────────────────────────────────────── # 날짜 추출 # ───────────────────────────────────────────────────── stmt = select( extract("year", Order.created_at).label("year"), extract("month", Order.created_at).label("month"), extract("day", Order.created_at).label("day"), extract("hour", Order.created_at).label("hour"), ) # 또는 stmt = select( func.year(Order.created_at), func.month(Order.created_at), func.day(Order.created_at), ) # ───────────────────────────────────────────────────── # 날짜 연산 # ───────────────────────────────────────────────────── # MySQL from sqlalchemy import text stmt = select(User).where( User.created_at >= func.date_sub(func.now(), text("INTERVAL 30 DAY")) ) # PostgreSQL from datetime import timedelta stmt = select(User).where( User.created_at >= func.now() - timedelta(days=30) ) # ───────────────────────────────────────────────────── # 날짜 차이 # ───────────────────────────────────────────────────── stmt = select( func.datediff(func.now(), User.created_at).label("days_since_signup") ) ``` ### 11.5 NULL 처리 ```python from sqlalchemy import func, coalesce, nullif # ───────────────────────────────────────────────────── # COALESCE (첫 번째 non-null 값) # ───────────────────────────────────────────────────── stmt = select( coalesce(User.nickname, User.username, "Anonymous").label("display_name") ) # ───────────────────────────────────────────────────── # NULLIF (같으면 NULL) # ───────────────────────────────────────────────────── # 0으로 나누기 방지 stmt = select( Order.total / nullif(Order.quantity, 0) ) # ───────────────────────────────────────────────────── # IFNULL / NVL (MySQL / Oracle) # ───────────────────────────────────────────────────── stmt = select( func.ifnull(User.nickname, "No nickname") ) ``` ### 11.6 Raw SQL 사용 ```python from sqlalchemy import text, literal_column # ───────────────────────────────────────────────────── # text() - Raw SQL # ───────────────────────────────────────────────────── # WHERE 절에서 stmt = select(User).where(text("MATCH(bio) AGAINST(:keyword)")).params(keyword="python") # 전체 쿼리 result = db.execute(text("SELECT * FROM user WHERE id = :id"), {"id": 1}) # ───────────────────────────────────────────────────── # literal_column() - 컬럼 표현식 # ───────────────────────────────────────────────────── stmt = select( User.username, literal_column("'active'").label("status"), ) # ───────────────────────────────────────────────────── # literal() - 리터럴 값 # ───────────────────────────────────────────────────── from sqlalchemy import literal stmt = select( User.username, literal(1).label("constant"), literal("active").label("status"), ) ``` --- ## 12. Relationship과 Eager Loading ### 12.1 Lazy Loading (기본) ```python # ───────────────────────────────────────────────────── # N+1 문제 발생 예시 # ───────────────────────────────────────────────────── users = db.scalars(select(User)).all() # 쿼리 1회 for user in users: print(user.posts) # 각 사용자마다 추가 쿼리! (N회) # 총 N+1회 쿼리 발생 ``` ### 12.2 Eager Loading 옵션 ```python from sqlalchemy.orm import selectinload, joinedload, subqueryload, raiseload # ───────────────────────────────────────────────────── # selectinload (권장: 1:N) # ───────────────────────────────────────────────────── stmt = select(User).options(selectinload(User.posts)) users = db.scalars(stmt).all() # 쿼리 1: SELECT * FROM user # 쿼리 2: SELECT * FROM post WHERE user_id IN (1, 2, 3, ...) for user in users: print(user.posts) # 추가 쿼리 없음! # ───────────────────────────────────────────────────── # joinedload (권장: N:1, 1:1) # ───────────────────────────────────────────────────── stmt = select(Post).options(joinedload(Post.author)) posts = db.scalars(stmt).unique().all() # unique() 필요! # 쿼리: SELECT post.*, user.* FROM post JOIN user ON ... # ───────────────────────────────────────────────────── # subqueryload # ───────────────────────────────────────────────────── stmt = select(User).options(subqueryload(User.posts)) # 쿼리 1: SELECT * FROM user # 쿼리 2: SELECT * FROM post WHERE user_id IN (SELECT id FROM user) # ───────────────────────────────────────────────────── # raiseload (로딩 금지) # ───────────────────────────────────────────────────── stmt = select(User).options(raiseload(User.posts)) user = db.scalar(stmt) print(user.posts) # 에러 발생! 명시적 로딩 필요 ``` ### 12.3 중첩 Eager Loading ```python # ───────────────────────────────────────────────────── # 다단계 관계 로딩 # ───────────────────────────────────────────────────── stmt = ( select(User) .options( selectinload(User.posts) .selectinload(Post.comments) .selectinload(Comment.author) ) ) # 쿼리 1: SELECT * FROM user # 쿼리 2: SELECT * FROM post WHERE user_id IN (...) # 쿼리 3: SELECT * FROM comment WHERE post_id IN (...) # 쿼리 4: SELECT * FROM user WHERE id IN (...) # 댓글 작성자 # ───────────────────────────────────────────────────── # 여러 관계 동시 로딩 # ───────────────────────────────────────────────────── stmt = ( select(User) .options( selectinload(User.posts), selectinload(User.comments), joinedload(User.profile), ) ) # ───────────────────────────────────────────────────── # contains_eager (이미 조인된 경우) # ───────────────────────────────────────────────────── from sqlalchemy.orm import contains_eager stmt = ( select(Post) .join(Post.author) .where(User.is_active == True) .options(contains_eager(Post.author)) # 조인 결과 사용 ) ``` ### 12.4 로딩 전략 비교 | 전략 | 쿼리 수 | 적합한 관계 | 장점 | 단점 | |------|--------|------------|------|------| | `selectinload` | 2 | 1:N | 효율적, 간단 | 대량 ID 시 IN 절 길어짐 | | `joinedload` | 1 | N:1, 1:1 | 단일 쿼리 | 1:N에서 중복 발생 | | `subqueryload` | 2 | 1:N | 복잡한 필터 지원 | 서브쿼리 오버헤드 | | `raiseload` | - | - | 실수 방지 | 명시적 로딩 필요 | --- ## 13. 트랜잭션 관리 ### 13.1 기본 트랜잭션 ```python from sqlalchemy.orm import Session # ───────────────────────────────────────────────────── # 명시적 commit/rollback # ───────────────────────────────────────────────────── def create_order(db: Session, user_id: int, items: list) -> Order: try: order = Order(user_id=user_id) db.add(order) db.flush() # ID 생성을 위해 flush for item in items: order_item = OrderItem( order_id=order.id, product_id=item["product_id"], quantity=item["quantity"], ) db.add(order_item) db.commit() db.refresh(order) return order except Exception as e: db.rollback() raise # ───────────────────────────────────────────────────── # context manager 사용 # ───────────────────────────────────────────────────── from sqlalchemy.orm import Session def transfer_money(engine, from_id: int, to_id: int, amount: float): with Session(engine) as session: with session.begin(): # 자동 commit/rollback from_account = session.get(Account, from_id) to_account = session.get(Account, to_id) if from_account.balance < amount: raise ValueError("잔액 부족") from_account.balance -= amount to_account.balance += amount # begin() 블록 종료 시 자동 commit # Session 종료 시 자동 close ``` ### 13.2 Savepoint (중첩 트랜잭션) ```python def complex_operation(db: Session): try: # 메인 작업 user = User(username="test") db.add(user) db.flush() # 선택적 작업 (실패해도 메인은 유지) savepoint = db.begin_nested() try: risky_operation() savepoint.commit() except Exception: savepoint.rollback() # 메인 트랜잭션은 유지됨 db.commit() except Exception: db.rollback() raise ``` ### 13.3 FastAPI에서 트랜잭션 ```python # app/database.py from contextlib import contextmanager def get_db() -> Generator[Session, None, None]: db = SessionLocal() try: yield db except Exception: db.rollback() raise finally: db.close() # 트랜잭션이 필요한 서비스 class OrderService: def __init__(self, db: Session): self.db = db def create_order_with_payment(self, data: OrderCreate) -> Order: # 여러 작업이 하나의 트랜잭션 order = self._create_order(data) self._process_payment(order) self._update_inventory(order) self._send_notification(order) self.db.commit() return order # 라우터 @router.post("/orders") def create_order( data: OrderCreate, db: Session = Depends(get_db), ): service = OrderService(db) return service.create_order_with_payment(data) ``` --- ## 14. FastAPI 통합 패턴 ### 14.1 Repository 패턴 ```python # app/repositories/base.py from typing import TypeVar, Generic, Type, Optional, List from sqlalchemy import select from sqlalchemy.orm import Session from app.models.base import Base ModelType = TypeVar("ModelType", bound=Base) class BaseRepository(Generic[ModelType]): def __init__(self, model: Type[ModelType], db: Session): self.model = model self.db = db def get(self, id: int) -> Optional[ModelType]: return self.db.get(self.model, id) def get_all(self, skip: int = 0, limit: int = 100) -> List[ModelType]: stmt = select(self.model).offset(skip).limit(limit) return list(self.db.scalars(stmt).all()) def create(self, obj: ModelType) -> ModelType: self.db.add(obj) self.db.commit() self.db.refresh(obj) return obj def update(self, obj: ModelType) -> ModelType: self.db.commit() self.db.refresh(obj) return obj def delete(self, obj: ModelType) -> None: self.db.delete(obj) self.db.commit() # app/repositories/user.py from sqlalchemy import select from app.models.user import User class UserRepository(BaseRepository[User]): def __init__(self, db: Session): super().__init__(User, db) def get_by_email(self, email: str) -> Optional[User]: stmt = select(User).where(User.email == email) return self.db.scalar(stmt) def get_active_users(self) -> List[User]: stmt = select(User).where(User.is_active == True) return list(self.db.scalars(stmt).all()) def search(self, keyword: str) -> List[User]: stmt = select(User).where( or_( User.username.contains(keyword), User.email.contains(keyword), ) ) return list(self.db.scalars(stmt).all()) ``` ### 14.2 Service 패턴 ```python # app/services/user.py from typing import Optional, List from fastapi import HTTPException, status from sqlalchemy.orm import Session from app.models.user import User from app.schemas.user import UserCreate, UserUpdate from app.repositories.user import UserRepository from app.core.security import hash_password class UserService: def __init__(self, db: Session): self.db = db self.repository = UserRepository(db) def get_user(self, user_id: int) -> User: user = self.repository.get(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) return user def get_user_by_email(self, email: str) -> Optional[User]: return self.repository.get_by_email(email) def create_user(self, data: UserCreate) -> User: # 이메일 중복 확인 if self.repository.get_by_email(data.email): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered", ) user = User( email=data.email, username=data.username, hashed_password=hash_password(data.password), ) return self.repository.create(user) def update_user(self, user_id: int, data: UserUpdate) -> User: user = self.get_user(user_id) update_data = data.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(user, field, value) return self.repository.update(user) def delete_user(self, user_id: int) -> None: user = self.get_user(user_id) self.repository.delete(user) # Dependency def get_user_service(db: Session = Depends(get_db)) -> UserService: return UserService(db) ``` ### 14.3 Router (Endpoints) ```python # app/routers/user.py from fastapi import APIRouter, Depends, Query, status from sqlalchemy.orm import Session from app.database import get_db from app.schemas.user import UserCreate, UserUpdate, UserResponse, UserListResponse from app.services.user import UserService, get_user_service router = APIRouter(prefix="/users", tags=["users"]) @router.get("/", response_model=UserListResponse) def list_users( skip: int = Query(0, ge=0), limit: int = Query(20, ge=1, le=100), service: UserService = Depends(get_user_service), ): users = service.repository.get_all(skip=skip, limit=limit) return {"items": users, "total": len(users)} @router.get("/{user_id}", response_model=UserResponse) def get_user( user_id: int, service: UserService = Depends(get_user_service), ): return service.get_user(user_id) @router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user( data: UserCreate, service: UserService = Depends(get_user_service), ): return service.create_user(data) @router.patch("/{user_id}", response_model=UserResponse) def update_user( user_id: int, data: UserUpdate, service: UserService = Depends(get_user_service), ): return service.update_user(user_id, data) @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_user( user_id: int, service: UserService = Depends(get_user_service), ): service.delete_user(user_id) ``` ### 14.4 Pydantic 스키마 ```python # app/schemas/user.py from datetime import datetime from typing import Optional, List from pydantic import BaseModel, EmailStr, Field, ConfigDict class UserBase(BaseModel): email: EmailStr username: str = Field(..., min_length=3, max_length=50) full_name: Optional[str] = None class UserCreate(UserBase): password: str = Field(..., min_length=8) class UserUpdate(BaseModel): email: Optional[EmailStr] = None username: Optional[str] = Field(None, min_length=3, max_length=50) full_name: Optional[str] = None is_active: Optional[bool] = None class UserResponse(UserBase): id: int is_active: bool created_at: datetime model_config = ConfigDict(from_attributes=True) class UserListResponse(BaseModel): items: List[UserResponse] total: int ``` --- ## 15. 성능 최적화 ### 15.1 인덱스 최적화 ```python from sqlalchemy import Index class User(Base): __tablename__ = "user" id: Mapped[int] = mapped_column(primary_key=True) email: Mapped[str] = mapped_column(String(255), unique=True) status: Mapped[str] = mapped_column(String(20)) created_at: Mapped[datetime] = mapped_column(default=func.now()) __table_args__ = ( # 단일 인덱스 Index("idx_user_status", "status"), # 복합 인덱스 Index("idx_user_status_created", "status", "created_at"), # 부분 인덱스 (PostgreSQL) Index( "idx_user_active", "email", postgresql_where=text("status = 'active'"), ), ) ``` ### 15.2 쿼리 최적화 ```python # ───────────────────────────────────────────────────── # 1. 필요한 컬럼만 조회 # ───────────────────────────────────────────────────── # Bad users = db.scalars(select(User)).all() emails = [u.email for u in users] # Good emails = db.scalars(select(User.email)).all() # ───────────────────────────────────────────────────── # 2. N+1 문제 해결 # ───────────────────────────────────────────────────── # Bad users = db.scalars(select(User)).all() for user in users: print(user.posts) # N+1 # Good stmt = select(User).options(selectinload(User.posts)) users = db.scalars(stmt).all() # ───────────────────────────────────────────────────── # 3. Bulk 작업 사용 # ───────────────────────────────────────────────────── # Bad - 개별 업데이트 for user in users: user.is_active = False db.commit() # Good - Bulk 업데이트 db.execute( update(User) .where(User.id.in_([u.id for u in users])) .values(is_active=False) ) db.commit() # ───────────────────────────────────────────────────── # 4. 존재 확인은 EXISTS 사용 # ───────────────────────────────────────────────────── # Bad user = db.scalar(select(User).where(User.email == email)) exists = user is not None # Good exists = db.scalar( select(exists().where(User.email == email)) ) # ───────────────────────────────────────────────────── # 5. 카운트는 count() 사용 # ───────────────────────────────────────────────────── # Bad count = len(db.scalars(select(User)).all()) # Good count = db.scalar(select(func.count(User.id))) ``` ### 15.3 연결 풀 설정 ```python engine = create_engine( DATABASE_URL, pool_size=10, # 기본 연결 수 max_overflow=20, # 추가 허용 연결 수 pool_timeout=30, # 연결 대기 시간 pool_recycle=1800, # 연결 재생성 주기 (30분) pool_pre_ping=True, # 연결 유효성 사전 검사 ) ``` ### 15.4 쿼리 실행 계획 확인 ```python from sqlalchemy import explain stmt = select(User).where(User.email == "test@example.com") # 실행 계획 출력 print(db.execute(explain(stmt)).fetchall()) # MySQL EXPLAIN print(db.execute(text(f"EXPLAIN {stmt}")).fetchall()) ``` --- ## 16. 실무 레시피 ### 16.1 Soft Delete 구현 ```python from datetime import datetime from sqlalchemy import event class SoftDeleteMixin: is_deleted: Mapped[bool] = mapped_column(default=False) deleted_at: Mapped[Optional[datetime]] = mapped_column(default=None) def soft_delete(self): self.is_deleted = True self.deleted_at = datetime.utcnow() # 자동 필터링 (Global Filter) @event.listens_for(Session, "do_orm_execute") def _add_filtering_criteria(execute_state): if execute_state.is_select: execute_state.statement = execute_state.statement.options( with_loader_criteria( SoftDeleteMixin, lambda cls: cls.is_deleted == False, include_aliases=True, ) ) ``` ### 16.2 Audit Log ```python from sqlalchemy import event class AuditMixin: created_by: Mapped[Optional[int]] = mapped_column(ForeignKey("user.id")) updated_by: Mapped[Optional[int]] = mapped_column(ForeignKey("user.id")) # 현재 사용자 컨텍스트 from contextvars import ContextVar current_user_id: ContextVar[Optional[int]] = ContextVar("current_user_id", default=None) @event.listens_for(AuditMixin, "before_insert", propagate=True) def set_created_by(mapper, connection, target): if user_id := current_user_id.get(): target.created_by = user_id target.updated_by = user_id @event.listens_for(AuditMixin, "before_update", propagate=True) def set_updated_by(mapper, connection, target): if user_id := current_user_id.get(): target.updated_by = user_id # FastAPI 미들웨어 @app.middleware("http") async def set_current_user(request: Request, call_next): user_id = get_user_id_from_token(request) token = current_user_id.set(user_id) try: response = await call_next(request) return response finally: current_user_id.reset(token) ``` ### 16.3 전체 텍스트 검색 ```python from sqlalchemy import Index, text class Post(Base): __tablename__ = "post" id: Mapped[int] = mapped_column(primary_key=True) title: Mapped[str] = mapped_column(String(200)) content: Mapped[str] = mapped_column(Text) __table_args__ = ( # MySQL FULLTEXT 인덱스 Index( "idx_post_fulltext", "title", "content", mysql_prefix="FULLTEXT", ), ) def search_posts(db: Session, keyword: str) -> list[Post]: stmt = ( select(Post) .where( text("MATCH(title, content) AGAINST(:keyword IN BOOLEAN MODE)") ) .params(keyword=keyword) ) return list(db.scalars(stmt).all()) ``` ### 16.4 슬러그 자동 생성 ```python from sqlalchemy import event import re def slugify(text: str) -> str: text = text.lower() text = re.sub(r'[^\w\s-]', '', text) text = re.sub(r'[-\s]+', '-', text).strip('-') return text class Post(Base): title: Mapped[str] = mapped_column(String(200)) slug: Mapped[str] = mapped_column(String(200), unique=True) @event.listens_for(Post.title, "set") def generate_slug(target, value, oldvalue, initiator): if value and (not target.slug or oldvalue != value): target.slug = slugify(value) ``` ### 16.5 캐싱 패턴 ```python from functools import lru_cache from typing import Optional import json import redis redis_client = redis.Redis() class CachedUserRepository: def __init__(self, db: Session): self.db = db self.cache_ttl = 3600 # 1시간 def _cache_key(self, user_id: int) -> str: return f"user:{user_id}" def get(self, user_id: int) -> Optional[User]: # 캐시 확인 cached = redis_client.get(self._cache_key(user_id)) if cached: data = json.loads(cached) return User(**data) # DB 조회 user = self.db.get(User, user_id) if user: # 캐시 저장 redis_client.setex( self._cache_key(user_id), self.cache_ttl, json.dumps(user.to_dict()), ) return user def invalidate(self, user_id: int) -> None: redis_client.delete(self._cache_key(user_id)) ``` --- ## 부록: Quick Reference ### 자주 사용하는 import ```python from sqlalchemy import ( create_engine, select, insert, update, delete, and_, or_, not_, func, case, cast, exists, text, literal, literal_column, Index, ForeignKey, String, Integer, Text, Boolean, desc, asc, nullsfirst, nullslast, ) from sqlalchemy.orm import ( Session, sessionmaker, relationship, Mapped, mapped_column, selectinload, joinedload, subqueryload, raiseload, contains_eager, aliased, ) from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.dialects.postgresql import insert as pg_insert ``` ### 쿼리 실행 메서드 ```python db.execute(stmt) # Result 반환 db.scalars(stmt) # ScalarResult 반환 db.scalar(stmt) # 단일 값 반환 result.all() # 모든 결과 result.first() # 첫 번째 (없으면 None) result.one() # 정확히 1개 (아니면 예외) result.one_or_none() # 0-1개 (2개 이상 예외) result.unique() # 중복 제거 ``` ### 관계 로딩 전략 ```python selectinload(Model.relation) # 1:N - SELECT ... IN joinedload(Model.relation) # N:1, 1:1 - JOIN subqueryload(Model.relation) # 복잡한 필터 raiseload(Model.relation) # 로딩 금지 ```