o2o-castad-backend/docs/reference/fastapi_sqlalchemy_guide.md

2966 lines
97 KiB
Markdown

# FastAPI + SQLAlchemy ORM 완벽 실무 가이드
> SQLAlchemy 2.0+ / FastAPI 0.100+ / Python 3.10+ 기준
---
## 목차
1. [기본 설정](#1-기본-설정)
2. [모델 정의](#2-모델-정의)
3. [CRUD 기본 작업](#3-crud-기본-작업)
4. [조회 쿼리 심화](#4-조회-쿼리-심화)
5. [필터링과 조건](#5-필터링과-조건)
6. [정렬, 페이징, 제한](#6-정렬-페이징-제한)
7. [집계 함수 (Aggregation)](#7-집계-함수-aggregation)
8. [JOIN 쿼리](#8-join-쿼리)
9. [서브쿼리 (Subquery)](#9-서브쿼리-subquery)
10. [집합 연산 (Union, Intersect, Except)](#10-집합-연산-union-intersect-except)
11. [고급 표현식](#11-고급-표현식)
12. [Relationship과 Eager Loading](#12-relationship과-eager-loading)
13. [트랜잭션 관리](#13-트랜잭션-관리)
14. [FastAPI 통합 패턴](#14-fastapi-통합-패턴)
15. [성능 최적화](#15-성능-최적화)
16. [실무 레시피](#16-실무-레시피)
---
## 1. 기본 설정
### 1.1 프로젝트 구조
```
project/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── database.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── user.py
│ │ └── product.py
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── product.py
│ ├── repositories/
│ │ ├── __init__.py
│ │ └── user.py
│ ├── services/
│ │ ├── __init__.py
│ │ └── user.py
│ └── routers/
│ ├── __init__.py
│ └── user.py
├── alembic/
├── tests/
├── alembic.ini
├── requirements.txt
└── .env
```
### 1.2 Database 설정
```python
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
DATABASE_URL: str = "mysql+pymysql://user:pass@localhost:3306/dbname"
DATABASE_ECHO: bool = False # SQL 로그 출력
DATABASE_POOL_SIZE: int = 5
DATABASE_MAX_OVERFLOW: int = 10
class Config:
env_file = ".env"
@lru_cache
def get_settings() -> Settings:
return Settings()
```
```python
# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from typing import Generator
from app.config import get_settings
settings = get_settings()
# Engine 생성
engine = create_engine(
settings.DATABASE_URL,
echo=settings.DATABASE_ECHO,
pool_size=settings.DATABASE_POOL_SIZE,
max_overflow=settings.DATABASE_MAX_OVERFLOW,
pool_pre_ping=True, # 연결 유효성 검사
pool_recycle=3600, # 1시간마다 연결 재생성
)
# Session Factory
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False,
expire_on_commit=False, # commit 후에도 객체 접근 가능
)
# Dependency
def get_db() -> Generator[Session, None, None]:
db = SessionLocal()
try:
yield db
finally:
db.close()
```
### 1.3 Base 모델 정의
```python
# app/models/base.py
from datetime import datetime
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
"""모든 모델의 기본 클래스"""
pass
class TimestampMixin:
"""생성/수정 시간 믹스인"""
created_at: Mapped[datetime] = mapped_column(
default=func.now(),
nullable=False,
)
updated_at: Mapped[datetime] = mapped_column(
default=func.now(),
onupdate=func.now(),
nullable=False,
)
class SoftDeleteMixin:
"""소프트 삭제 믹스인"""
is_deleted: Mapped[bool] = mapped_column(default=False)
deleted_at: Mapped[datetime | None] = mapped_column(default=None)
```
---
## 2. 모델 정의
### 2.1 기본 모델
```python
# app/models/user.py
from typing import List, Optional
from sqlalchemy import String, Text, Boolean, Integer, ForeignKey, Index
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.models.base import Base, TimestampMixin
class User(TimestampMixin, Base):
__tablename__ = "user"
# Primary Key
id: Mapped[int] = mapped_column(primary_key=True)
# 필수 필드
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
hashed_password: Mapped[str] = mapped_column(String(255))
# 선택 필드 (nullable)
full_name: Mapped[Optional[str]] = mapped_column(String(100))
bio: Mapped[Optional[str]] = mapped_column(Text)
# 기본값이 있는 필드
is_active: Mapped[bool] = mapped_column(default=True)
is_superuser: Mapped[bool] = mapped_column(default=False)
login_count: Mapped[int] = mapped_column(default=0)
# Relationships
posts: Mapped[List["Post"]] = relationship(
"Post",
back_populates="author",
cascade="all, delete-orphan",
lazy="selectin",
)
profile: Mapped[Optional["Profile"]] = relationship(
"Profile",
back_populates="user",
uselist=False,
cascade="all, delete-orphan",
)
# 테이블 설정
__table_args__ = (
Index("idx_user_email_active", "email", "is_active"),
{"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"},
)
def __repr__(self) -> str:
return f"<User(id={self.id}, email='{self.email}')>"
```
### 2.2 관계가 있는 모델들
```python
# app/models/post.py
from typing import List, Optional
from sqlalchemy import String, Text, ForeignKey, Index
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.models.base import Base, TimestampMixin
class Post(TimestampMixin, Base):
__tablename__ = "post"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(Text)
view_count: Mapped[int] = mapped_column(default=0)
is_published: Mapped[bool] = mapped_column(default=False)
# Foreign Key
author_id: Mapped[int] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"),
index=True,
)
category_id: Mapped[Optional[int]] = mapped_column(
ForeignKey("category.id", ondelete="SET NULL"),
nullable=True,
)
# Relationships
author: Mapped["User"] = relationship("User", back_populates="posts")
category: Mapped[Optional["Category"]] = relationship("Category", back_populates="posts")
comments: Mapped[List["Comment"]] = relationship(
"Comment",
back_populates="post",
cascade="all, delete-orphan",
order_by="Comment.created_at.desc()",
)
# N:M 관계
tags: Mapped[List["Tag"]] = relationship(
"Tag",
secondary="post_tag",
back_populates="posts",
)
__table_args__ = (
Index("idx_post_author_published", "author_id", "is_published"),
)
class Category(Base):
__tablename__ = "category"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
description: Mapped[Optional[str]] = mapped_column(Text)
posts: Mapped[List["Post"]] = relationship("Post", back_populates="category")
class Tag(Base):
__tablename__ = "tag"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30), unique=True)
posts: Mapped[List["Post"]] = relationship(
"Post",
secondary="post_tag",
back_populates="tags",
)
# N:M 중간 테이블
from sqlalchemy import Table, Column, Integer, ForeignKey
from app.models.base import Base
post_tag = Table(
"post_tag",
Base.metadata,
Column("post_id", Integer, ForeignKey("post.id", ondelete="CASCADE"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tag.id", ondelete="CASCADE"), primary_key=True),
)
```
---
## 3. CRUD 기본 작업
### 3.1 Create (생성)
```python
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models.user import User
# ─────────────────────────────────────────────────────
# 단일 레코드 생성
# ─────────────────────────────────────────────────────
def create_user(db: Session, email: str, username: str, password: str) -> User:
user = User(
email=email,
username=username,
hashed_password=password,
)
db.add(user)
db.commit()
db.refresh(user) # DB에서 생성된 값(id, created_at 등) 로드
return user
# ─────────────────────────────────────────────────────
# 여러 레코드 한번에 생성
# ─────────────────────────────────────────────────────
def create_users_bulk(db: Session, users_data: list[dict]) -> list[User]:
users = [User(**data) for data in users_data]
db.add_all(users)
db.commit()
# 각 객체 refresh
for user in users:
db.refresh(user)
return users
# ─────────────────────────────────────────────────────
# 관계와 함께 생성
# ─────────────────────────────────────────────────────
def create_post_with_tags(
db: Session,
title: str,
content: str,
author_id: int,
tag_names: list[str]
) -> Post:
# 기존 태그 조회 또는 생성
tags = []
for name in tag_names:
tag = db.scalar(select(Tag).where(Tag.name == name))
if not tag:
tag = Tag(name=name)
tags.append(tag)
post = Post(
title=title,
content=content,
author_id=author_id,
tags=tags,
)
db.add(post)
db.commit()
db.refresh(post)
return post
# ─────────────────────────────────────────────────────
# Insert ... ON DUPLICATE KEY UPDATE (Upsert)
# ─────────────────────────────────────────────────────
from sqlalchemy.dialects.mysql import insert as mysql_insert
def upsert_user(db: Session, email: str, username: str) -> None:
stmt = mysql_insert(User).values(
email=email,
username=username,
)
stmt = stmt.on_duplicate_key_update(
username=stmt.inserted.username,
updated_at=func.now(),
)
db.execute(stmt)
db.commit()
# PostgreSQL의 경우
from sqlalchemy.dialects.postgresql import insert as pg_insert
def upsert_user_pg(db: Session, email: str, username: str) -> None:
stmt = pg_insert(User).values(email=email, username=username)
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={"username": username},
)
db.execute(stmt)
db.commit()
```
### 3.2 Read (조회)
```python
from sqlalchemy import select
from sqlalchemy.orm import Session
# ─────────────────────────────────────────────────────
# Primary Key로 조회
# ─────────────────────────────────────────────────────
def get_user_by_id(db: Session, user_id: int) -> User | None:
return db.get(User, user_id)
# ─────────────────────────────────────────────────────
# 단일 레코드 조회 (조건)
# ─────────────────────────────────────────────────────
def get_user_by_email(db: Session, email: str) -> User | None:
stmt = select(User).where(User.email == email)
return db.scalar(stmt)
# scalar(): 단일 값 반환 (없으면 None)
# scalars(): 여러 값의 ScalarResult 반환
# one(): 정확히 1개 (0개 또는 2개 이상이면 에러)
# one_or_none(): 0개면 None, 1개면 반환, 2개 이상이면 에러
# first(): 첫 번째 결과 (없으면 None)
# ─────────────────────────────────────────────────────
# 여러 레코드 조회
# ─────────────────────────────────────────────────────
def get_all_users(db: Session) -> list[User]:
stmt = select(User)
return list(db.scalars(stmt).all())
def get_active_users(db: Session) -> list[User]:
stmt = select(User).where(User.is_active == True)
return list(db.scalars(stmt).all())
# ─────────────────────────────────────────────────────
# 특정 컬럼만 조회
# ─────────────────────────────────────────────────────
def get_user_emails(db: Session) -> list[str]:
stmt = select(User.email)
return list(db.scalars(stmt).all())
def get_user_summary(db: Session) -> list[tuple]:
stmt = select(User.id, User.email, User.username)
return list(db.execute(stmt).all())
# ─────────────────────────────────────────────────────
# 존재 여부 확인
# ─────────────────────────────────────────────────────
def user_exists(db: Session, email: str) -> bool:
stmt = select(User.id).where(User.email == email)
return db.scalar(stmt) is not None
# 또는 exists() 사용
from sqlalchemy import exists
def user_exists_v2(db: Session, email: str) -> bool:
stmt = select(exists().where(User.email == email))
return db.scalar(stmt)
```
### 3.3 Update (수정)
```python
from sqlalchemy import update
from sqlalchemy.orm import Session
# ─────────────────────────────────────────────────────
# 단일 레코드 수정 (ORM 방식)
# ─────────────────────────────────────────────────────
def update_user(db: Session, user_id: int, **kwargs) -> User | None:
user = db.get(User, user_id)
if not user:
return None
for key, value in kwargs.items():
if hasattr(user, key):
setattr(user, key, value)
db.commit()
db.refresh(user)
return user
# ─────────────────────────────────────────────────────
# Bulk Update (Core 방식) - 더 효율적
# ─────────────────────────────────────────────────────
def deactivate_users(db: Session, user_ids: list[int]) -> int:
stmt = (
update(User)
.where(User.id.in_(user_ids))
.values(is_active=False)
)
result = db.execute(stmt)
db.commit()
return result.rowcount # 영향받은 행 수
# ─────────────────────────────────────────────────────
# 조건부 Update
# ─────────────────────────────────────────────────────
def increment_login_count(db: Session, user_id: int) -> None:
stmt = (
update(User)
.where(User.id == user_id)
.values(login_count=User.login_count + 1)
)
db.execute(stmt)
db.commit()
# ─────────────────────────────────────────────────────
# CASE를 사용한 조건부 Update
# ─────────────────────────────────────────────────────
from sqlalchemy import case
def update_user_levels(db: Session) -> None:
stmt = (
update(User)
.values(
level=case(
(User.login_count >= 100, "gold"),
(User.login_count >= 50, "silver"),
else_="bronze",
)
)
)
db.execute(stmt)
db.commit()
```
### 3.4 Delete (삭제)
```python
from sqlalchemy import delete
from sqlalchemy.orm import Session
# ─────────────────────────────────────────────────────
# 단일 레코드 삭제 (ORM 방식)
# ─────────────────────────────────────────────────────
def delete_user(db: Session, user_id: int) -> bool:
user = db.get(User, user_id)
if not user:
return False
db.delete(user) # cascade 설정에 따라 관련 데이터도 삭제
db.commit()
return True
# ─────────────────────────────────────────────────────
# Bulk Delete (Core 방식) - 더 효율적
# ─────────────────────────────────────────────────────
def delete_inactive_users(db: Session) -> int:
stmt = delete(User).where(User.is_active == False)
result = db.execute(stmt)
db.commit()
return result.rowcount
# ─────────────────────────────────────────────────────
# Soft Delete
# ─────────────────────────────────────────────────────
from datetime import datetime
def soft_delete_user(db: Session, user_id: int) -> bool:
stmt = (
update(User)
.where(User.id == user_id)
.values(is_deleted=True, deleted_at=datetime.utcnow())
)
result = db.execute(stmt)
db.commit()
return result.rowcount > 0
# ─────────────────────────────────────────────────────
# 관계 데이터 삭제
# ─────────────────────────────────────────────────────
def remove_tag_from_post(db: Session, post_id: int, tag_id: int) -> None:
post = db.get(Post, post_id)
tag = db.get(Tag, tag_id)
if post and tag and tag in post.tags:
post.tags.remove(tag)
db.commit()
```
---
## 4. 조회 쿼리 심화
### 4.1 select() 기본 사용법
```python
from sqlalchemy import select
# ─────────────────────────────────────────────────────
# 전체 모델 조회
# ─────────────────────────────────────────────────────
stmt = select(User)
# SELECT user.id, user.email, user.username, ... FROM user
# ─────────────────────────────────────────────────────
# 특정 컬럼만 조회
# ─────────────────────────────────────────────────────
stmt = select(User.id, User.email)
# SELECT user.id, user.email FROM user
# ─────────────────────────────────────────────────────
# 컬럼 별칭 (alias)
# ─────────────────────────────────────────────────────
stmt = select(User.email.label("user_email"))
# SELECT user.email AS user_email FROM user
# ─────────────────────────────────────────────────────
# DISTINCT
# ─────────────────────────────────────────────────────
stmt = select(User.category_id).distinct()
# SELECT DISTINCT user.category_id FROM user
# ─────────────────────────────────────────────────────
# 여러 테이블에서 조회
# ─────────────────────────────────────────────────────
stmt = select(User, Post).join(Post)
# SELECT user.*, post.* FROM user JOIN post ON ...
stmt = select(User.email, Post.title).join(Post)
# SELECT user.email, post.title FROM user JOIN post ON ...
```
### 4.2 실행 메서드 비교
```python
from sqlalchemy.orm import Session
# ─────────────────────────────────────────────────────
# execute() - Row 객체 반환
# ─────────────────────────────────────────────────────
stmt = select(User.id, User.email)
result = db.execute(stmt)
for row in result:
print(row.id, row.email) # Row 객체
print(row[0], row[1]) # 인덱스 접근
print(row._mapping) # dict-like 접근
# ─────────────────────────────────────────────────────
# scalars() - 첫 번째 컬럼만 반환
# ─────────────────────────────────────────────────────
stmt = select(User)
users = db.scalars(stmt).all() # list[User]
stmt = select(User.email)
emails = db.scalars(stmt).all() # list[str]
# ─────────────────────────────────────────────────────
# scalar() - 단일 값 반환
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.id == 1)
user = db.scalar(stmt) # User | None
stmt = select(func.count(User.id))
count = db.scalar(stmt) # int
# ─────────────────────────────────────────────────────
# 결과 처리 메서드
# ─────────────────────────────────────────────────────
result = db.scalars(stmt)
result.all() # 모든 결과를 리스트로
result.first() # 첫 번째 결과 (없으면 None)
result.one() # 정확히 1개 (아니면 예외)
result.one_or_none()# 0-1개 (2개 이상이면 예외)
result.fetchmany(5) # 5개만 가져오기
result.unique() # 중복 제거 (relationship 로딩 시 필요)
```
---
## 5. 필터링과 조건
### 5.1 기본 비교 연산자
```python
from sqlalchemy import select, and_, or_, not_
# ─────────────────────────────────────────────────────
# 동등 비교
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.email == "test@example.com")
stmt = select(User).where(User.is_active == True)
stmt = select(User).where(User.category_id == None) # IS NULL
# ─────────────────────────────────────────────────────
# 부등 비교
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.age != 30)
stmt = select(User).where(User.age > 18)
stmt = select(User).where(User.age >= 18)
stmt = select(User).where(User.age < 65)
stmt = select(User).where(User.age <= 65)
# ─────────────────────────────────────────────────────
# BETWEEN
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.age.between(18, 65))
# WHERE age BETWEEN 18 AND 65
# ─────────────────────────────────────────────────────
# IN / NOT IN
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.status.in_(["active", "pending"]))
stmt = select(User).where(User.id.in_([1, 2, 3, 4, 5]))
stmt = select(User).where(User.status.not_in(["deleted", "banned"]))
# ─────────────────────────────────────────────────────
# IS NULL / IS NOT NULL
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.deleted_at.is_(None)) # IS NULL
stmt = select(User).where(User.deleted_at.is_not(None)) # IS NOT NULL
stmt = select(User).where(User.bio.isnot(None)) # 또 다른 방법
```
### 5.2 문자열 연산
```python
# ─────────────────────────────────────────────────────
# LIKE / ILIKE
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.email.like("%@gmail.com"))
stmt = select(User).where(User.username.like("john%"))
stmt = select(User).where(User.name.like("%홍길%"))
# 대소문자 무시 (PostgreSQL)
stmt = select(User).where(User.email.ilike("%@GMAIL.COM"))
# ─────────────────────────────────────────────────────
# CONTAINS (LIKE '%value%')
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.bio.contains("python"))
# WHERE bio LIKE '%python%'
# ─────────────────────────────────────────────────────
# STARTSWITH / ENDSWITH
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.email.startswith("admin"))
# WHERE email LIKE 'admin%'
stmt = select(User).where(User.email.endswith("@company.com"))
# WHERE email LIKE '%@company.com'
# ─────────────────────────────────────────────────────
# 정규표현식 (DB 지원 필요)
# ─────────────────────────────────────────────────────
stmt = select(User).where(User.email.regexp_match(r"^[a-z]+@"))
```
### 5.3 논리 연산자
```python
from sqlalchemy import and_, or_, not_
# ─────────────────────────────────────────────────────
# AND
# ─────────────────────────────────────────────────────
# 방법 1: and_() 함수
stmt = select(User).where(
and_(
User.is_active == True,
User.age >= 18,
)
)
# 방법 2: where() 체이닝 (암묵적 AND)
stmt = select(User).where(User.is_active == True).where(User.age >= 18)
# 방법 3: 콤마로 구분
stmt = select(User).where(User.is_active == True, User.age >= 18)
# ─────────────────────────────────────────────────────
# OR
# ─────────────────────────────────────────────────────
stmt = select(User).where(
or_(
User.role == "admin",
User.role == "moderator",
)
)
# ─────────────────────────────────────────────────────
# NOT
# ─────────────────────────────────────────────────────
stmt = select(User).where(not_(User.is_deleted == True))
stmt = select(User).where(~(User.is_deleted == True)) # ~ 연산자
# ─────────────────────────────────────────────────────
# 복합 조건
# ─────────────────────────────────────────────────────
stmt = select(User).where(
and_(
User.is_active == True,
or_(
User.role == "admin",
User.age >= 21,
),
not_(User.is_deleted == True),
)
)
# WHERE is_active = true
# AND (role = 'admin' OR age >= 21)
# AND NOT is_deleted = true
```
### 5.4 동적 필터링
```python
from typing import Optional
def search_users(
db: Session,
email: Optional[str] = None,
username: Optional[str] = None,
is_active: Optional[bool] = None,
min_age: Optional[int] = None,
max_age: Optional[int] = None,
) -> list[User]:
stmt = select(User)
# 동적으로 조건 추가
if email:
stmt = stmt.where(User.email.contains(email))
if username:
stmt = stmt.where(User.username.like(f"%{username}%"))
if is_active is not None:
stmt = stmt.where(User.is_active == is_active)
if min_age is not None:
stmt = stmt.where(User.age >= min_age)
if max_age is not None:
stmt = stmt.where(User.age <= max_age)
return list(db.scalars(stmt).all())
# 사용
users = search_users(db, email="gmail", is_active=True, min_age=18)
```
---
## 6. 정렬, 페이징, 제한
### 6.1 정렬 (ORDER BY)
```python
from sqlalchemy import select, asc, desc
# ─────────────────────────────────────────────────────
# 기본 정렬
# ─────────────────────────────────────────────────────
# 오름차순 (기본)
stmt = select(User).order_by(User.created_at)
stmt = select(User).order_by(asc(User.created_at))
# 내림차순
stmt = select(User).order_by(desc(User.created_at))
stmt = select(User).order_by(User.created_at.desc())
# ─────────────────────────────────────────────────────
# 다중 정렬
# ─────────────────────────────────────────────────────
stmt = select(User).order_by(
User.is_active.desc(),
User.created_at.desc(),
)
# ORDER BY is_active DESC, created_at DESC
# ─────────────────────────────────────────────────────
# NULL 처리
# ─────────────────────────────────────────────────────
stmt = select(User).order_by(User.last_login.desc().nullslast())
# NULL 값을 마지막에
stmt = select(User).order_by(User.last_login.asc().nullsfirst())
# NULL 값을 처음에
```
### 6.2 제한과 오프셋 (LIMIT, OFFSET)
```python
# ─────────────────────────────────────────────────────
# LIMIT
# ─────────────────────────────────────────────────────
stmt = select(User).limit(10)
# SELECT ... FROM user LIMIT 10
# ─────────────────────────────────────────────────────
# OFFSET
# ─────────────────────────────────────────────────────
stmt = select(User).offset(20).limit(10)
# SELECT ... FROM user LIMIT 10 OFFSET 20
# ─────────────────────────────────────────────────────
# 슬라이스 문법
# ─────────────────────────────────────────────────────
stmt = select(User).slice(20, 30) # offset=20, limit=10과 동일
```
### 6.3 페이지네이션 구현
```python
from typing import TypeVar, Generic
from pydantic import BaseModel
from sqlalchemy import select, func
from sqlalchemy.orm import Session
T = TypeVar("T")
class PaginatedResult(BaseModel, Generic[T]):
items: list[T]
total: int
page: int
page_size: int
total_pages: int
has_next: bool
has_prev: bool
def paginate(
db: Session,
stmt,
page: int = 1,
page_size: int = 20,
) -> dict:
# 페이지 유효성 검사
page = max(1, page)
page_size = min(max(1, page_size), 100) # 최대 100개
# 전체 개수 조회
count_stmt = select(func.count()).select_from(stmt.subquery())
total = db.scalar(count_stmt)
# 페이지네이션 적용
offset = (page - 1) * page_size
paginated_stmt = stmt.offset(offset).limit(page_size)
items = list(db.scalars(paginated_stmt).all())
total_pages = (total + page_size - 1) // page_size
return {
"items": items,
"total": total,
"page": page,
"page_size": page_size,
"total_pages": total_pages,
"has_next": page < total_pages,
"has_prev": page > 1,
}
# 사용 예시
def get_users_paginated(
db: Session,
page: int = 1,
page_size: int = 20,
is_active: bool | None = None,
) -> dict:
stmt = select(User).order_by(User.created_at.desc())
if is_active is not None:
stmt = stmt.where(User.is_active == is_active)
return paginate(db, stmt, page, page_size)
# FastAPI 엔드포인트
@router.get("/users")
def list_users(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
is_active: bool | None = None,
db: Session = Depends(get_db),
):
return get_users_paginated(db, page, page_size, is_active)
```
### 6.4 커서 기반 페이지네이션
```python
from datetime import datetime
from typing import Optional
def get_users_cursor(
db: Session,
limit: int = 20,
cursor: Optional[datetime] = None,
) -> dict:
stmt = select(User).order_by(User.created_at.desc())
if cursor:
stmt = stmt.where(User.created_at < cursor)
stmt = stmt.limit(limit + 1) # 다음 페이지 확인용으로 1개 더
items = list(db.scalars(stmt).all())
has_next = len(items) > limit
if has_next:
items = items[:limit]
next_cursor = items[-1].created_at if items and has_next else None
return {
"items": items,
"next_cursor": next_cursor,
"has_next": has_next,
}
# 복합 커서 (동일 시간 처리)
def get_users_cursor_v2(
db: Session,
limit: int = 20,
cursor_time: Optional[datetime] = None,
cursor_id: Optional[int] = None,
) -> dict:
stmt = select(User).order_by(
User.created_at.desc(),
User.id.desc(),
)
if cursor_time and cursor_id:
stmt = stmt.where(
or_(
User.created_at < cursor_time,
and_(
User.created_at == cursor_time,
User.id < cursor_id,
),
)
)
stmt = stmt.limit(limit + 1)
items = list(db.scalars(stmt).all())
has_next = len(items) > limit
if has_next:
items = items[:limit]
return {
"items": items,
"next_cursor": {
"time": items[-1].created_at,
"id": items[-1].id,
} if items and has_next else None,
"has_next": has_next,
}
```
---
## 7. 집계 함수 (Aggregation)
### 7.1 기본 집계 함수
```python
from sqlalchemy import select, func
# ─────────────────────────────────────────────────────
# COUNT
# ─────────────────────────────────────────────────────
# 전체 개수
stmt = select(func.count(User.id))
total = db.scalar(stmt)
# 조건부 개수
stmt = select(func.count(User.id)).where(User.is_active == True)
active_count = db.scalar(stmt)
# COUNT(DISTINCT)
stmt = select(func.count(func.distinct(User.category_id)))
unique_categories = db.scalar(stmt)
# ─────────────────────────────────────────────────────
# SUM
# ─────────────────────────────────────────────────────
stmt = select(func.sum(Order.amount))
total_amount = db.scalar(stmt)
stmt = select(func.sum(Order.amount)).where(Order.status == "completed")
completed_amount = db.scalar(stmt)
# ─────────────────────────────────────────────────────
# AVG
# ─────────────────────────────────────────────────────
stmt = select(func.avg(Product.price))
avg_price = db.scalar(stmt)
# 반올림
stmt = select(func.round(func.avg(Product.price), 2))
avg_price_rounded = db.scalar(stmt)
# ─────────────────────────────────────────────────────
# MIN / MAX
# ─────────────────────────────────────────────────────
stmt = select(func.min(Product.price), func.max(Product.price))
result = db.execute(stmt).first()
min_price, max_price = result
# ─────────────────────────────────────────────────────
# 여러 집계 함수 함께 사용
# ─────────────────────────────────────────────────────
stmt = select(
func.count(Order.id).label("order_count"),
func.sum(Order.amount).label("total_amount"),
func.avg(Order.amount).label("avg_amount"),
func.min(Order.amount).label("min_amount"),
func.max(Order.amount).label("max_amount"),
)
result = db.execute(stmt).first()
print(f"주문 수: {result.order_count}")
print(f"총 금액: {result.total_amount}")
print(f"평균 금액: {result.avg_amount}")
```
### 7.2 GROUP BY
```python
# ─────────────────────────────────────────────────────
# 기본 GROUP BY
# ─────────────────────────────────────────────────────
# 카테고리별 상품 수
stmt = (
select(
Product.category_id,
func.count(Product.id).label("product_count"),
)
.group_by(Product.category_id)
)
results = db.execute(stmt).all()
for row in results:
print(f"Category {row.category_id}: {row.product_count} products")
# ─────────────────────────────────────────────────────
# JOIN과 함께 GROUP BY
# ─────────────────────────────────────────────────────
# 카테고리 이름과 함께
stmt = (
select(
Category.name,
func.count(Product.id).label("product_count"),
func.avg(Product.price).label("avg_price"),
)
.join(Product, Category.id == Product.category_id)
.group_by(Category.id, Category.name)
)
# ─────────────────────────────────────────────────────
# 날짜별 GROUP BY
# ─────────────────────────────────────────────────────
# 일별 주문 통계
stmt = (
select(
func.date(Order.created_at).label("order_date"),
func.count(Order.id).label("order_count"),
func.sum(Order.amount).label("total_amount"),
)
.group_by(func.date(Order.created_at))
.order_by(func.date(Order.created_at).desc())
)
# 월별 주문 통계
stmt = (
select(
func.year(Order.created_at).label("year"),
func.month(Order.created_at).label("month"),
func.count(Order.id).label("order_count"),
func.sum(Order.amount).label("total_amount"),
)
.group_by(
func.year(Order.created_at),
func.month(Order.created_at),
)
.order_by(
func.year(Order.created_at).desc(),
func.month(Order.created_at).desc(),
)
)
# ─────────────────────────────────────────────────────
# 다중 컬럼 GROUP BY
# ─────────────────────────────────────────────────────
stmt = (
select(
User.country,
User.city,
func.count(User.id).label("user_count"),
)
.group_by(User.country, User.city)
.order_by(func.count(User.id).desc())
)
```
### 7.3 HAVING
```python
# ─────────────────────────────────────────────────────
# 집계 결과 필터링
# ─────────────────────────────────────────────────────
# 5개 이상 상품이 있는 카테고리만
stmt = (
select(
Category.name,
func.count(Product.id).label("product_count"),
)
.join(Product)
.group_by(Category.id, Category.name)
.having(func.count(Product.id) >= 5)
)
# ─────────────────────────────────────────────────────
# 복합 HAVING 조건
# ─────────────────────────────────────────────────────
stmt = (
select(
User.id,
User.username,
func.count(Order.id).label("order_count"),
func.sum(Order.amount).label("total_spent"),
)
.join(Order)
.group_by(User.id, User.username)
.having(
and_(
func.count(Order.id) >= 10,
func.sum(Order.amount) >= 100000,
)
)
.order_by(func.sum(Order.amount).desc())
)
# 10건 이상 주문하고 10만원 이상 사용한 고객
```
### 7.4 윈도우 함수
```python
from sqlalchemy import over
# ─────────────────────────────────────────────────────
# ROW_NUMBER
# ─────────────────────────────────────────────────────
stmt = (
select(
User.id,
User.username,
User.created_at,
func.row_number().over(
order_by=User.created_at.desc()
).label("row_num"),
)
)
# ─────────────────────────────────────────────────────
# 파티션별 순위
# ─────────────────────────────────────────────────────
stmt = (
select(
Product.id,
Product.name,
Product.category_id,
Product.price,
func.rank().over(
partition_by=Product.category_id,
order_by=Product.price.desc(),
).label("price_rank"),
)
)
# 카테고리 내에서 가격 순위
# ─────────────────────────────────────────────────────
# 누적 합계
# ─────────────────────────────────────────────────────
stmt = (
select(
Order.id,
Order.created_at,
Order.amount,
func.sum(Order.amount).over(
order_by=Order.created_at,
).label("cumulative_amount"),
)
)
# ─────────────────────────────────────────────────────
# 이동 평균
# ─────────────────────────────────────────────────────
from sqlalchemy import text
stmt = (
select(
Order.id,
Order.created_at,
Order.amount,
func.avg(Order.amount).over(
order_by=Order.created_at,
rows=(2, 0), # 현재 행 포함 최근 3개
).label("moving_avg"),
)
)
```
---
## 8. JOIN 쿼리
### 8.1 기본 JOIN
```python
from sqlalchemy import select
# ─────────────────────────────────────────────────────
# INNER JOIN
# ─────────────────────────────────────────────────────
# 방법 1: relationship 사용 (자동 조건)
stmt = select(User, Post).join(User.posts)
# SELECT ... FROM user JOIN post ON user.id = post.author_id
# 방법 2: 명시적 조건
stmt = select(User, Post).join(Post, User.id == Post.author_id)
# 방법 3: 모델만 지정 (FK 자동 감지)
stmt = select(User, Post).join(Post)
# ─────────────────────────────────────────────────────
# LEFT OUTER JOIN
# ─────────────────────────────────────────────────────
stmt = select(User, Post).join(Post, isouter=True)
# SELECT ... FROM user LEFT JOIN post ON ...
# 또는
stmt = select(User, Post).outerjoin(Post)
# ─────────────────────────────────────────────────────
# RIGHT OUTER JOIN
# ─────────────────────────────────────────────────────
stmt = select(User, Post).join(Post, full=True) # FULL OUTER
# RIGHT JOIN은 순서를 바꿔서 구현
stmt = select(Post, User).outerjoin(User)
# ─────────────────────────────────────────────────────
# 다중 JOIN
# ─────────────────────────────────────────────────────
stmt = (
select(User, Post, Comment)
.join(Post, User.id == Post.author_id)
.join(Comment, Post.id == Comment.post_id)
)
# relationship 사용
stmt = (
select(User, Post, Comment)
.join(User.posts)
.join(Post.comments)
)
```
### 8.2 JOIN 결과 처리
```python
# ─────────────────────────────────────────────────────
# 여러 모델 조회
# ─────────────────────────────────────────────────────
stmt = select(User, Post).join(Post)
results = db.execute(stmt).all()
for user, post in results:
print(f"Author: {user.username}, Post: {post.title}")
# ─────────────────────────────────────────────────────
# 특정 컬럼만 조회
# ─────────────────────────────────────────────────────
stmt = (
select(User.username, Post.title, Post.created_at)
.join(Post)
.order_by(Post.created_at.desc())
)
results = db.execute(stmt).all()
for row in results:
print(f"{row.username}: {row.title}")
# ─────────────────────────────────────────────────────
# 하나의 모델만 필요할 때
# ─────────────────────────────────────────────────────
# 게시글이 있는 사용자만 조회
stmt = (
select(User)
.join(Post)
.distinct()
)
users = db.scalars(stmt).all()
```
### 8.3 Self JOIN
```python
# ─────────────────────────────────────────────────────
# 같은 테이블 조인 (별칭 필요)
# ─────────────────────────────────────────────────────
from sqlalchemy.orm import aliased
# 사용자와 그 추천인
Referrer = aliased(User, name="referrer")
stmt = (
select(User.username, Referrer.username.label("referrer_name"))
.join(Referrer, User.referrer_id == Referrer.id, isouter=True)
)
# ─────────────────────────────────────────────────────
# 계층 구조 조회 (상위/하위)
# ─────────────────────────────────────────────────────
Parent = aliased(Category, name="parent")
stmt = (
select(Category.name, Parent.name.label("parent_name"))
.join(Parent, Category.parent_id == Parent.id, isouter=True)
)
# ─────────────────────────────────────────────────────
# 같은 테이블 비교
# ─────────────────────────────────────────────────────
# 같은 카테고리의 다른 상품
OtherProduct = aliased(Product, name="other")
stmt = (
select(Product.name, OtherProduct.name.label("related_product"))
.join(
OtherProduct,
and_(
Product.category_id == OtherProduct.category_id,
Product.id != OtherProduct.id,
)
)
.where(Product.id == 1)
)
```
---
## 9. 서브쿼리 (Subquery)
### 9.1 스칼라 서브쿼리
```python
from sqlalchemy import select, func
# ─────────────────────────────────────────────────────
# SELECT 절의 서브쿼리
# ─────────────────────────────────────────────────────
# 각 사용자의 게시글 수
post_count_subq = (
select(func.count(Post.id))
.where(Post.author_id == User.id)
.correlate(User)
.scalar_subquery()
)
stmt = select(
User.username,
post_count_subq.label("post_count"),
)
# ─────────────────────────────────────────────────────
# 평균과 비교
# ─────────────────────────────────────────────────────
avg_price = select(func.avg(Product.price)).scalar_subquery()
stmt = select(Product).where(Product.price > avg_price)
# 평균보다 비싼 상품
```
### 9.2 FROM 절의 서브쿼리
```python
# ─────────────────────────────────────────────────────
# 서브쿼리를 테이블처럼 사용
# ─────────────────────────────────────────────────────
# 카테고리별 통계 서브쿼리
category_stats = (
select(
Product.category_id,
func.count(Product.id).label("product_count"),
func.avg(Product.price).label("avg_price"),
)
.group_by(Product.category_id)
.subquery()
)
# 메인 쿼리에서 조인
stmt = (
select(
Category.name,
category_stats.c.product_count,
category_stats.c.avg_price,
)
.join(category_stats, Category.id == category_stats.c.category_id)
)
# ─────────────────────────────────────────────────────
# 파생 테이블
# ─────────────────────────────────────────────────────
# 최근 7일 활성 사용자
recent_active = (
select(
User.id,
User.username,
func.count(Order.id).label("order_count"),
)
.join(Order)
.where(Order.created_at >= func.date_sub(func.now(), text("INTERVAL 7 DAY")))
.group_by(User.id)
.subquery()
)
# 많이 주문한 순으로
stmt = (
select(recent_active)
.order_by(recent_active.c.order_count.desc())
.limit(10)
)
```
### 9.3 WHERE 절의 서브쿼리
```python
from sqlalchemy import exists, any_, all_
# ─────────────────────────────────────────────────────
# IN 서브쿼리
# ─────────────────────────────────────────────────────
# 주문한 적 있는 사용자
ordered_user_ids = select(Order.user_id).distinct()
stmt = select(User).where(User.id.in_(ordered_user_ids))
# ─────────────────────────────────────────────────────
# EXISTS
# ─────────────────────────────────────────────────────
# 게시글이 있는 사용자
has_posts = (
exists()
.where(Post.author_id == User.id)
)
stmt = select(User).where(has_posts)
# NOT EXISTS
stmt = select(User).where(~has_posts) # 게시글 없는 사용자
# ─────────────────────────────────────────────────────
# ANY / ALL
# ─────────────────────────────────────────────────────
# 어떤 주문보다 비싼 상품 (ANY)
any_order_amount = select(Order.amount)
stmt = select(Product).where(Product.price > any_(any_order_amount))
# 모든 주문보다 비싼 상품 (ALL)
stmt = select(Product).where(Product.price > all_(any_order_amount))
```
### 9.4 Lateral 서브쿼리 (PostgreSQL)
```python
from sqlalchemy import lateral
# 각 사용자의 최근 3개 주문
recent_orders = (
select(Order)
.where(Order.user_id == User.id)
.order_by(Order.created_at.desc())
.limit(3)
.lateral()
)
stmt = (
select(User, recent_orders)
.outerjoin(recent_orders, True)
)
```
---
## 10. 집합 연산 (Union, Intersect, Except)
### 10.1 UNION
```python
from sqlalchemy import union, union_all
# ─────────────────────────────────────────────────────
# UNION (중복 제거)
# ─────────────────────────────────────────────────────
# 관리자 + 최근 활동 사용자
admins = select(User.id, User.username).where(User.is_admin == True)
recent_active = select(User.id, User.username).where(
User.last_login >= func.date_sub(func.now(), text("INTERVAL 7 DAY"))
)
stmt = union(admins, recent_active)
results = db.execute(stmt).all()
# ─────────────────────────────────────────────────────
# UNION ALL (중복 허용, 더 빠름)
# ─────────────────────────────────────────────────────
stmt = union_all(admins, recent_active)
# ─────────────────────────────────────────────────────
# 여러 개 UNION
# ─────────────────────────────────────────────────────
query1 = select(User.email.label("contact")).where(User.is_active == True)
query2 = select(Contact.email.label("contact")).where(Contact.subscribed == True)
query3 = select(Lead.email.label("contact")).where(Lead.status == "qualified")
stmt = union(query1, query2, query3)
# ─────────────────────────────────────────────────────
# UNION 결과 정렬/제한
# ─────────────────────────────────────────────────────
combined = union(admins, recent_active).subquery()
stmt = (
select(combined)
.order_by(combined.c.username)
.limit(10)
)
```
### 10.2 INTERSECT
```python
from sqlalchemy import intersect, intersect_all
# ─────────────────────────────────────────────────────
# INTERSECT (교집합)
# ─────────────────────────────────────────────────────
# 관리자이면서 최근 활동한 사용자
admins = select(User.id).where(User.is_admin == True)
recent_active = select(User.id).where(
User.last_login >= func.date_sub(func.now(), text("INTERVAL 7 DAY"))
)
stmt = intersect(admins, recent_active)
# ─────────────────────────────────────────────────────
# 여러 조건의 교집합
# ─────────────────────────────────────────────────────
# 세 가지 모두 만족하는 사용자
has_orders = select(Order.user_id).distinct()
has_reviews = select(Review.user_id).distinct()
is_verified = select(User.id).where(User.is_verified == True)
stmt = intersect(has_orders, has_reviews, is_verified)
```
### 10.3 EXCEPT
```python
from sqlalchemy import except_, except_all
# ─────────────────────────────────────────────────────
# EXCEPT (차집합)
# ─────────────────────────────────────────────────────
# 주문은 했지만 리뷰는 안 쓴 사용자
ordered = select(Order.user_id).distinct()
reviewed = select(Review.user_id).distinct()
stmt = except_(ordered, reviewed)
# ─────────────────────────────────────────────────────
# 실용 예: 미처리 항목 찾기
# ─────────────────────────────────────────────────────
# 모든 신규 사용자 중 환영 이메일 미발송 대상
all_new_users = select(User.id).where(
User.created_at >= func.date_sub(func.now(), text("INTERVAL 24 HOUR"))
)
email_sent = select(EmailLog.user_id).where(EmailLog.type == "welcome")
stmt = except_(all_new_users, email_sent)
```
---
## 11. 고급 표현식
### 11.1 CASE 문
```python
from sqlalchemy import case
# ─────────────────────────────────────────────────────
# 단순 CASE
# ─────────────────────────────────────────────────────
status_label = case(
(User.status == "active", "활성"),
(User.status == "pending", "대기"),
(User.status == "suspended", "정지"),
else_="알 수 없음",
)
stmt = select(User.username, status_label.label("status_label"))
# ─────────────────────────────────────────────────────
# 조건부 집계
# ─────────────────────────────────────────────────────
stmt = select(
func.count(case((Order.status == "completed", 1))).label("completed_count"),
func.count(case((Order.status == "pending", 1))).label("pending_count"),
func.count(case((Order.status == "cancelled", 1))).label("cancelled_count"),
func.sum(case((Order.status == "completed", Order.amount), else_=0)).label("completed_amount"),
)
# ─────────────────────────────────────────────────────
# 정렬에서 CASE
# ─────────────────────────────────────────────────────
priority_order = case(
(User.role == "admin", 1),
(User.role == "moderator", 2),
else_=3,
)
stmt = select(User).order_by(priority_order, User.username)
```
### 11.2 형변환 (CAST)
```python
from sqlalchemy import cast
from sqlalchemy.types import String, Integer, Float, Date
# ─────────────────────────────────────────────────────
# 타입 변환
# ─────────────────────────────────────────────────────
stmt = select(cast(User.age, String).label("age_str"))
stmt = select(cast(Product.price, Integer).label("price_int"))
stmt = select(cast(Order.created_at, Date).label("order_date"))
# ─────────────────────────────────────────────────────
# 문자열 연결 시 형변환
# ─────────────────────────────────────────────────────
stmt = select(
(User.username + " (" + cast(User.age, String) + "세)").label("display_name")
)
```
### 11.3 문자열 함수
```python
from sqlalchemy import func
# ─────────────────────────────────────────────────────
# 기본 문자열 함수
# ─────────────────────────────────────────────────────
stmt = select(
func.upper(User.username),
func.lower(User.email),
func.length(User.bio),
func.trim(User.name),
func.concat(User.first_name, " ", User.last_name).label("full_name"),
)
# ─────────────────────────────────────────────────────
# 문자열 추출
# ─────────────────────────────────────────────────────
stmt = select(
func.substring(User.email, 1, 10), # 처음 10자
func.left(User.username, 5),
func.right(User.email, 10),
)
# ─────────────────────────────────────────────────────
# 문자열 연결 (|| 연산자)
# ─────────────────────────────────────────────────────
from sqlalchemy import literal_column
# PostgreSQL/SQLite
full_name = User.first_name + " " + User.last_name
# MySQL
full_name = func.concat(User.first_name, " ", User.last_name)
```
### 11.4 날짜/시간 함수
```python
from sqlalchemy import func, extract
# ─────────────────────────────────────────────────────
# 현재 날짜/시간
# ─────────────────────────────────────────────────────
stmt = select(
func.now(),
func.current_date(),
func.current_time(),
func.current_timestamp(),
)
# ─────────────────────────────────────────────────────
# 날짜 추출
# ─────────────────────────────────────────────────────
stmt = select(
extract("year", Order.created_at).label("year"),
extract("month", Order.created_at).label("month"),
extract("day", Order.created_at).label("day"),
extract("hour", Order.created_at).label("hour"),
)
# 또는
stmt = select(
func.year(Order.created_at),
func.month(Order.created_at),
func.day(Order.created_at),
)
# ─────────────────────────────────────────────────────
# 날짜 연산
# ─────────────────────────────────────────────────────
# MySQL
from sqlalchemy import text
stmt = select(User).where(
User.created_at >= func.date_sub(func.now(), text("INTERVAL 30 DAY"))
)
# PostgreSQL
from datetime import timedelta
stmt = select(User).where(
User.created_at >= func.now() - timedelta(days=30)
)
# ─────────────────────────────────────────────────────
# 날짜 차이
# ─────────────────────────────────────────────────────
stmt = select(
func.datediff(func.now(), User.created_at).label("days_since_signup")
)
```
### 11.5 NULL 처리
```python
from sqlalchemy import func, coalesce, nullif
# ─────────────────────────────────────────────────────
# COALESCE (첫 번째 non-null 값)
# ─────────────────────────────────────────────────────
stmt = select(
coalesce(User.nickname, User.username, "Anonymous").label("display_name")
)
# ─────────────────────────────────────────────────────
# NULLIF (같으면 NULL)
# ─────────────────────────────────────────────────────
# 0으로 나누기 방지
stmt = select(
Order.total / nullif(Order.quantity, 0)
)
# ─────────────────────────────────────────────────────
# IFNULL / NVL (MySQL / Oracle)
# ─────────────────────────────────────────────────────
stmt = select(
func.ifnull(User.nickname, "No nickname")
)
```
### 11.6 Raw SQL 사용
```python
from sqlalchemy import text, literal_column
# ─────────────────────────────────────────────────────
# text() - Raw SQL
# ─────────────────────────────────────────────────────
# WHERE 절에서
stmt = select(User).where(text("MATCH(bio) AGAINST(:keyword)")).params(keyword="python")
# 전체 쿼리
result = db.execute(text("SELECT * FROM user WHERE id = :id"), {"id": 1})
# ─────────────────────────────────────────────────────
# literal_column() - 컬럼 표현식
# ─────────────────────────────────────────────────────
stmt = select(
User.username,
literal_column("'active'").label("status"),
)
# ─────────────────────────────────────────────────────
# literal() - 리터럴 값
# ─────────────────────────────────────────────────────
from sqlalchemy import literal
stmt = select(
User.username,
literal(1).label("constant"),
literal("active").label("status"),
)
```
---
## 12. Relationship과 Eager Loading
### 12.1 Lazy Loading (기본)
```python
# ─────────────────────────────────────────────────────
# N+1 문제 발생 예시
# ─────────────────────────────────────────────────────
users = db.scalars(select(User)).all() # 쿼리 1회
for user in users:
print(user.posts) # 각 사용자마다 추가 쿼리! (N회)
# 총 N+1회 쿼리 발생
```
### 12.2 Eager Loading 옵션
```python
from sqlalchemy.orm import selectinload, joinedload, subqueryload, raiseload
# ─────────────────────────────────────────────────────
# selectinload (권장: 1:N)
# ─────────────────────────────────────────────────────
stmt = select(User).options(selectinload(User.posts))
users = db.scalars(stmt).all()
# 쿼리 1: SELECT * FROM user
# 쿼리 2: SELECT * FROM post WHERE user_id IN (1, 2, 3, ...)
for user in users:
print(user.posts) # 추가 쿼리 없음!
# ─────────────────────────────────────────────────────
# joinedload (권장: N:1, 1:1)
# ─────────────────────────────────────────────────────
stmt = select(Post).options(joinedload(Post.author))
posts = db.scalars(stmt).unique().all() # unique() 필요!
# 쿼리: SELECT post.*, user.* FROM post JOIN user ON ...
# ─────────────────────────────────────────────────────
# subqueryload
# ─────────────────────────────────────────────────────
stmt = select(User).options(subqueryload(User.posts))
# 쿼리 1: SELECT * FROM user
# 쿼리 2: SELECT * FROM post WHERE user_id IN (SELECT id FROM user)
# ─────────────────────────────────────────────────────
# raiseload (로딩 금지)
# ─────────────────────────────────────────────────────
stmt = select(User).options(raiseload(User.posts))
user = db.scalar(stmt)
print(user.posts) # 에러 발생! 명시적 로딩 필요
```
### 12.3 중첩 Eager Loading
```python
# ─────────────────────────────────────────────────────
# 다단계 관계 로딩
# ─────────────────────────────────────────────────────
stmt = (
select(User)
.options(
selectinload(User.posts)
.selectinload(Post.comments)
.selectinload(Comment.author)
)
)
# 쿼리 1: SELECT * FROM user
# 쿼리 2: SELECT * FROM post WHERE user_id IN (...)
# 쿼리 3: SELECT * FROM comment WHERE post_id IN (...)
# 쿼리 4: SELECT * FROM user WHERE id IN (...) # 댓글 작성자
# ─────────────────────────────────────────────────────
# 여러 관계 동시 로딩
# ─────────────────────────────────────────────────────
stmt = (
select(User)
.options(
selectinload(User.posts),
selectinload(User.comments),
joinedload(User.profile),
)
)
# ─────────────────────────────────────────────────────
# contains_eager (이미 조인된 경우)
# ─────────────────────────────────────────────────────
from sqlalchemy.orm import contains_eager
stmt = (
select(Post)
.join(Post.author)
.where(User.is_active == True)
.options(contains_eager(Post.author)) # 조인 결과 사용
)
```
### 12.4 로딩 전략 비교
| 전략 | 쿼리 수 | 적합한 관계 | 장점 | 단점 |
|------|--------|------------|------|------|
| `selectinload` | 2 | 1:N | 효율적, 간단 | 대량 ID 시 IN 절 길어짐 |
| `joinedload` | 1 | N:1, 1:1 | 단일 쿼리 | 1:N에서 중복 발생 |
| `subqueryload` | 2 | 1:N | 복잡한 필터 지원 | 서브쿼리 오버헤드 |
| `raiseload` | - | - | 실수 방지 | 명시적 로딩 필요 |
---
## 13. 트랜잭션 관리
### 13.1 기본 트랜잭션
```python
from sqlalchemy.orm import Session
# ─────────────────────────────────────────────────────
# 명시적 commit/rollback
# ─────────────────────────────────────────────────────
def create_order(db: Session, user_id: int, items: list) -> Order:
try:
order = Order(user_id=user_id)
db.add(order)
db.flush() # ID 생성을 위해 flush
for item in items:
order_item = OrderItem(
order_id=order.id,
product_id=item["product_id"],
quantity=item["quantity"],
)
db.add(order_item)
db.commit()
db.refresh(order)
return order
except Exception as e:
db.rollback()
raise
# ─────────────────────────────────────────────────────
# context manager 사용
# ─────────────────────────────────────────────────────
from sqlalchemy.orm import Session
def transfer_money(engine, from_id: int, to_id: int, amount: float):
with Session(engine) as session:
with session.begin(): # 자동 commit/rollback
from_account = session.get(Account, from_id)
to_account = session.get(Account, to_id)
if from_account.balance < amount:
raise ValueError("잔액 부족")
from_account.balance -= amount
to_account.balance += amount
# begin() 블록 종료 시 자동 commit
# Session 종료 시 자동 close
```
### 13.2 Savepoint (중첩 트랜잭션)
```python
def complex_operation(db: Session):
try:
# 메인 작업
user = User(username="test")
db.add(user)
db.flush()
# 선택적 작업 (실패해도 메인은 유지)
savepoint = db.begin_nested()
try:
risky_operation()
savepoint.commit()
except Exception:
savepoint.rollback()
# 메인 트랜잭션은 유지됨
db.commit()
except Exception:
db.rollback()
raise
```
### 13.3 FastAPI에서 트랜잭션
```python
# app/database.py
from contextlib import contextmanager
def get_db() -> Generator[Session, None, None]:
db = SessionLocal()
try:
yield db
except Exception:
db.rollback()
raise
finally:
db.close()
# 트랜잭션이 필요한 서비스
class OrderService:
def __init__(self, db: Session):
self.db = db
def create_order_with_payment(self, data: OrderCreate) -> Order:
# 여러 작업이 하나의 트랜잭션
order = self._create_order(data)
self._process_payment(order)
self._update_inventory(order)
self._send_notification(order)
self.db.commit()
return order
# 라우터
@router.post("/orders")
def create_order(
data: OrderCreate,
db: Session = Depends(get_db),
):
service = OrderService(db)
return service.create_order_with_payment(data)
```
---
## 14. FastAPI 통합 패턴
### 14.1 Repository 패턴
```python
# app/repositories/base.py
from typing import TypeVar, Generic, Type, Optional, List
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models.base import Base
ModelType = TypeVar("ModelType", bound=Base)
class BaseRepository(Generic[ModelType]):
def __init__(self, model: Type[ModelType], db: Session):
self.model = model
self.db = db
def get(self, id: int) -> Optional[ModelType]:
return self.db.get(self.model, id)
def get_all(self, skip: int = 0, limit: int = 100) -> List[ModelType]:
stmt = select(self.model).offset(skip).limit(limit)
return list(self.db.scalars(stmt).all())
def create(self, obj: ModelType) -> ModelType:
self.db.add(obj)
self.db.commit()
self.db.refresh(obj)
return obj
def update(self, obj: ModelType) -> ModelType:
self.db.commit()
self.db.refresh(obj)
return obj
def delete(self, obj: ModelType) -> None:
self.db.delete(obj)
self.db.commit()
# app/repositories/user.py
from sqlalchemy import select
from app.models.user import User
class UserRepository(BaseRepository[User]):
def __init__(self, db: Session):
super().__init__(User, db)
def get_by_email(self, email: str) -> Optional[User]:
stmt = select(User).where(User.email == email)
return self.db.scalar(stmt)
def get_active_users(self) -> List[User]:
stmt = select(User).where(User.is_active == True)
return list(self.db.scalars(stmt).all())
def search(self, keyword: str) -> List[User]:
stmt = select(User).where(
or_(
User.username.contains(keyword),
User.email.contains(keyword),
)
)
return list(self.db.scalars(stmt).all())
```
### 14.2 Service 패턴
```python
# app/services/user.py
from typing import Optional, List
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.repositories.user import UserRepository
from app.core.security import hash_password
class UserService:
def __init__(self, db: Session):
self.db = db
self.repository = UserRepository(db)
def get_user(self, user_id: int) -> User:
user = self.repository.get(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return user
def get_user_by_email(self, email: str) -> Optional[User]:
return self.repository.get_by_email(email)
def create_user(self, data: UserCreate) -> User:
# 이메일 중복 확인
if self.repository.get_by_email(data.email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
user = User(
email=data.email,
username=data.username,
hashed_password=hash_password(data.password),
)
return self.repository.create(user)
def update_user(self, user_id: int, data: UserUpdate) -> User:
user = self.get_user(user_id)
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(user, field, value)
return self.repository.update(user)
def delete_user(self, user_id: int) -> None:
user = self.get_user(user_id)
self.repository.delete(user)
# Dependency
def get_user_service(db: Session = Depends(get_db)) -> UserService:
return UserService(db)
```
### 14.3 Router (Endpoints)
```python
# app/routers/user.py
from fastapi import APIRouter, Depends, Query, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.schemas.user import UserCreate, UserUpdate, UserResponse, UserListResponse
from app.services.user import UserService, get_user_service
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/", response_model=UserListResponse)
def list_users(
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
service: UserService = Depends(get_user_service),
):
users = service.repository.get_all(skip=skip, limit=limit)
return {"items": users, "total": len(users)}
@router.get("/{user_id}", response_model=UserResponse)
def get_user(
user_id: int,
service: UserService = Depends(get_user_service),
):
return service.get_user(user_id)
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
data: UserCreate,
service: UserService = Depends(get_user_service),
):
return service.create_user(data)
@router.patch("/{user_id}", response_model=UserResponse)
def update_user(
user_id: int,
data: UserUpdate,
service: UserService = Depends(get_user_service),
):
return service.update_user(user_id, data)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
user_id: int,
service: UserService = Depends(get_user_service),
):
service.delete_user(user_id)
```
### 14.4 Pydantic 스키마
```python
# app/schemas/user.py
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, EmailStr, Field, ConfigDict
class UserBase(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
full_name: Optional[str] = None
class UserCreate(UserBase):
password: str = Field(..., min_length=8)
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = Field(None, min_length=3, max_length=50)
full_name: Optional[str] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class UserListResponse(BaseModel):
items: List[UserResponse]
total: int
```
---
## 15. 성능 최적화
### 15.1 인덱스 최적화
```python
from sqlalchemy import Index
class User(Base):
__tablename__ = "user"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True)
status: Mapped[str] = mapped_column(String(20))
created_at: Mapped[datetime] = mapped_column(default=func.now())
__table_args__ = (
# 단일 인덱스
Index("idx_user_status", "status"),
# 복합 인덱스
Index("idx_user_status_created", "status", "created_at"),
# 부분 인덱스 (PostgreSQL)
Index(
"idx_user_active",
"email",
postgresql_where=text("status = 'active'"),
),
)
```
### 15.2 쿼리 최적화
```python
# ─────────────────────────────────────────────────────
# 1. 필요한 컬럼만 조회
# ─────────────────────────────────────────────────────
# Bad
users = db.scalars(select(User)).all()
emails = [u.email for u in users]
# Good
emails = db.scalars(select(User.email)).all()
# ─────────────────────────────────────────────────────
# 2. N+1 문제 해결
# ─────────────────────────────────────────────────────
# Bad
users = db.scalars(select(User)).all()
for user in users:
print(user.posts) # N+1
# Good
stmt = select(User).options(selectinload(User.posts))
users = db.scalars(stmt).all()
# ─────────────────────────────────────────────────────
# 3. Bulk 작업 사용
# ─────────────────────────────────────────────────────
# Bad - 개별 업데이트
for user in users:
user.is_active = False
db.commit()
# Good - Bulk 업데이트
db.execute(
update(User)
.where(User.id.in_([u.id for u in users]))
.values(is_active=False)
)
db.commit()
# ─────────────────────────────────────────────────────
# 4. 존재 확인은 EXISTS 사용
# ─────────────────────────────────────────────────────
# Bad
user = db.scalar(select(User).where(User.email == email))
exists = user is not None
# Good
exists = db.scalar(
select(exists().where(User.email == email))
)
# ─────────────────────────────────────────────────────
# 5. 카운트는 count() 사용
# ─────────────────────────────────────────────────────
# Bad
count = len(db.scalars(select(User)).all())
# Good
count = db.scalar(select(func.count(User.id)))
```
### 15.3 연결 풀 설정
```python
engine = create_engine(
DATABASE_URL,
pool_size=10, # 기본 연결 수
max_overflow=20, # 추가 허용 연결 수
pool_timeout=30, # 연결 대기 시간
pool_recycle=1800, # 연결 재생성 주기 (30분)
pool_pre_ping=True, # 연결 유효성 사전 검사
)
```
### 15.4 쿼리 실행 계획 확인
```python
from sqlalchemy import explain
stmt = select(User).where(User.email == "test@example.com")
# 실행 계획 출력
print(db.execute(explain(stmt)).fetchall())
# MySQL EXPLAIN
print(db.execute(text(f"EXPLAIN {stmt}")).fetchall())
```
---
## 16. 실무 레시피
### 16.1 Soft Delete 구현
```python
from datetime import datetime
from sqlalchemy import event
class SoftDeleteMixin:
is_deleted: Mapped[bool] = mapped_column(default=False)
deleted_at: Mapped[Optional[datetime]] = mapped_column(default=None)
def soft_delete(self):
self.is_deleted = True
self.deleted_at = datetime.utcnow()
# 자동 필터링 (Global Filter)
@event.listens_for(Session, "do_orm_execute")
def _add_filtering_criteria(execute_state):
if execute_state.is_select:
execute_state.statement = execute_state.statement.options(
with_loader_criteria(
SoftDeleteMixin,
lambda cls: cls.is_deleted == False,
include_aliases=True,
)
)
```
### 16.2 Audit Log
```python
from sqlalchemy import event
class AuditMixin:
created_by: Mapped[Optional[int]] = mapped_column(ForeignKey("user.id"))
updated_by: Mapped[Optional[int]] = mapped_column(ForeignKey("user.id"))
# 현재 사용자 컨텍스트
from contextvars import ContextVar
current_user_id: ContextVar[Optional[int]] = ContextVar("current_user_id", default=None)
@event.listens_for(AuditMixin, "before_insert", propagate=True)
def set_created_by(mapper, connection, target):
if user_id := current_user_id.get():
target.created_by = user_id
target.updated_by = user_id
@event.listens_for(AuditMixin, "before_update", propagate=True)
def set_updated_by(mapper, connection, target):
if user_id := current_user_id.get():
target.updated_by = user_id
# FastAPI 미들웨어
@app.middleware("http")
async def set_current_user(request: Request, call_next):
user_id = get_user_id_from_token(request)
token = current_user_id.set(user_id)
try:
response = await call_next(request)
return response
finally:
current_user_id.reset(token)
```
### 16.3 전체 텍스트 검색
```python
from sqlalchemy import Index, text
class Post(Base):
__tablename__ = "post"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(Text)
__table_args__ = (
# MySQL FULLTEXT 인덱스
Index(
"idx_post_fulltext",
"title", "content",
mysql_prefix="FULLTEXT",
),
)
def search_posts(db: Session, keyword: str) -> list[Post]:
stmt = (
select(Post)
.where(
text("MATCH(title, content) AGAINST(:keyword IN BOOLEAN MODE)")
)
.params(keyword=keyword)
)
return list(db.scalars(stmt).all())
```
### 16.4 슬러그 자동 생성
```python
from sqlalchemy import event
import re
def slugify(text: str) -> str:
text = text.lower()
text = re.sub(r'[^\w\s-]', '', text)
text = re.sub(r'[-\s]+', '-', text).strip('-')
return text
class Post(Base):
title: Mapped[str] = mapped_column(String(200))
slug: Mapped[str] = mapped_column(String(200), unique=True)
@event.listens_for(Post.title, "set")
def generate_slug(target, value, oldvalue, initiator):
if value and (not target.slug or oldvalue != value):
target.slug = slugify(value)
```
### 16.5 캐싱 패턴
```python
from functools import lru_cache
from typing import Optional
import json
import redis
redis_client = redis.Redis()
class CachedUserRepository:
def __init__(self, db: Session):
self.db = db
self.cache_ttl = 3600 # 1시간
def _cache_key(self, user_id: int) -> str:
return f"user:{user_id}"
def get(self, user_id: int) -> Optional[User]:
# 캐시 확인
cached = redis_client.get(self._cache_key(user_id))
if cached:
data = json.loads(cached)
return User(**data)
# DB 조회
user = self.db.get(User, user_id)
if user:
# 캐시 저장
redis_client.setex(
self._cache_key(user_id),
self.cache_ttl,
json.dumps(user.to_dict()),
)
return user
def invalidate(self, user_id: int) -> None:
redis_client.delete(self._cache_key(user_id))
```
---
## 부록: Quick Reference
### 자주 사용하는 import
```python
from sqlalchemy import (
create_engine, select, insert, update, delete,
and_, or_, not_, func, case, cast, exists,
text, literal, literal_column,
Index, ForeignKey, String, Integer, Text, Boolean,
desc, asc, nullsfirst, nullslast,
)
from sqlalchemy.orm import (
Session, sessionmaker, relationship,
Mapped, mapped_column,
selectinload, joinedload, subqueryload, raiseload,
contains_eager, aliased,
)
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as pg_insert
```
### 쿼리 실행 메서드
```python
db.execute(stmt) # Result 반환
db.scalars(stmt) # ScalarResult 반환
db.scalar(stmt) # 단일 값 반환
result.all() # 모든 결과
result.first() # 첫 번째 (없으면 None)
result.one() # 정확히 1개 (아니면 예외)
result.one_or_none() # 0-1개 (2개 이상 예외)
result.unique() # 중복 제거
```
### 관계 로딩 전략
```python
selectinload(Model.relation) # 1:N - SELECT ... IN
joinedload(Model.relation) # N:1, 1:1 - JOIN
subqueryload(Model.relation) # 복잡한 필터
raiseload(Model.relation) # 로딩 금지
```