185 lines
6.4 KiB
Python
185 lines
6.4 KiB
Python
import time
|
|
from typing import AsyncGenerator
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
from sqlalchemy.orm import DeclarativeBase
|
|
|
|
from app.utils.logger import get_logger
|
|
from config import db_settings
|
|
|
|
# Logger for this module, obtained from the project's get_logger helper
# under the name "database".
logger = get_logger("database")
|
|
|
|
|
|
class Base(DeclarativeBase):
    """Declarative base class for this module.

    It adds nothing on top of ``DeclarativeBase``; ``create_db_tables`` uses
    its ``metadata`` to issue the table creation.
    """
|
|
|
|
|
|
# =============================================================================
# Main engine (serves FastAPI requests)
# =============================================================================
engine = create_async_engine(
    db_settings.MYSQL_URL,
    echo=False,
    # Base pool size: 20 connections.
    pool_size=20,
    # Extra connections allowed beyond the pool: 20 (40 in total at most).
    max_overflow=20,
    # Seconds to wait for a connection from the pool.
    pool_timeout=30,
    # Kept shorter than MySQL's wait_timeout
    # (default 28800s; cloud setups commonly use 300s).
    pool_recycle=280,
    # Check connection validity so dead connections are reconnected.
    pool_pre_ping=True,
    # Reset connections with a rollback when they are returned.
    pool_reset_on_return="rollback",
    connect_args={
        # Timeout for establishing the DB connection.
        "connect_timeout": 10,
        "charset": "utf8mb4",
    },
)
|
|
|
|
# Main session factory (used for FastAPI dependency injection)
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    # Autoflush is off; an explicit flush is the recommended usage.
    autoflush=False,
    expire_on_commit=False,
)
|
|
|
|
|
|
# =============================================================================
# Engine dedicated to background tasks (kept separate from the main pool)
# =============================================================================
background_engine = create_async_engine(
    db_settings.MYSQL_URL,
    echo=False,
    # Background pool size: 10 connections.
    pool_size=10,
    # Extra connections allowed beyond the pool: 10 (20 in total at most).
    max_overflow=10,
    # Background work is given a more generous wait for a connection.
    pool_timeout=60,
    # Kept shorter than MySQL's wait_timeout.
    pool_recycle=280,
    # Check connection validity so dead connections are reconnected.
    pool_pre_ping=True,
    pool_reset_on_return="rollback",
    connect_args={
        "connect_timeout": 10,
        "charset": "utf8mb4",
    },
)
|
|
|
|
# Session factory for background tasks
BackgroundSessionLocal = async_sessionmaker(
    background_engine,
    class_=AsyncSession,
    autoflush=False,
    expire_on_commit=False,
)
|
|
|
|
|
|
async def create_db_tables(*, timeout_seconds: float | None = 10) -> None:
    """Create the application's tables using the main engine.

    The model modules are imported inside the function so that their tables
    are available, then ``Base.metadata.create_all`` is run for exactly the
    tables listed below inside a single ``engine.begin()`` block.

    Args:
        timeout_seconds: Upper bound, in seconds, for the whole operation.
            ``None`` disables the limit. Defaults to 10.

    Raises:
        TimeoutError: If the operation does not finish within
            ``timeout_seconds``. The timeout is logged before re-raising.
    """
    import asyncio

    # Model imports (needed so the table metadata is registered).
    from app.user.models import User, RefreshToken, SocialAccount  # noqa: F401
    from app.home.models import Image, Project  # noqa: F401
    from app.lyric.models import Lyric  # noqa: F401
    from app.song.models import Song, SongTimestamp  # noqa: F401
    from app.video.models import Video  # noqa: F401
    from app.sns.models import SNSUploadTask  # noqa: F401
    from app.social.models import SocialUpload  # noqa: F401

    # Tables to create.
    tables_to_create = [
        User.__table__,
        RefreshToken.__table__,
        SocialAccount.__table__,
        Project.__table__,
        Image.__table__,
        Lyric.__table__,
        Song.__table__,
        SongTimestamp.__table__,
        Video.__table__,
        SNSUploadTask.__table__,
        SocialUpload.__table__,
    ]

    logger.info("Creating database tables...")

    try:
        async with asyncio.timeout(timeout_seconds):
            async with engine.begin() as connection:
                # run_sync forwards extra keyword arguments to the callable,
                # so no wrapper lambda is needed.
                await connection.run_sync(
                    Base.metadata.create_all, tables=tables_to_create
                )
    except TimeoutError:
        logger.error(
            f"Timed out while creating database tables "
            f"(limit: {timeout_seconds}s)"
        )
        raise

    logger.info("Database tables created")
|
|
|
|
|
|
# Session generator used as a FastAPI dependency
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session created by ``AsyncSessionLocal``.

    The session lives inside an ``async with`` block, so it is closed when
    the generator finishes. If an exception reaches the generator at the
    ``yield``, the session is rolled back, the error is logged together with
    the elapsed time since the generator started, and the exception is
    re-raised unchanged.

    Yields:
        AsyncSession: The session produced by ``AsyncSessionLocal``.

    Raises:
        Exception: Any exception raised while the session was in use,
            re-raised after the rollback.
    """
    start_time = time.perf_counter()

    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception as e:
            await session.rollback()
            logger.error(
                f"[get_session] ROLLBACK - error: {type(e).__name__}: {e}, "
                f"duration: {(time.perf_counter() - start_time)*1000:.1f}ms"
            )
            # Bare raise keeps the original traceback intact.
            raise
|
|
|
|
|
|
# Session generator for background tasks
async def get_background_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session created by ``BackgroundSessionLocal``.

    The session lives inside an ``async with`` block, so it is closed when
    the generator finishes. If an exception reaches the generator at the
    ``yield``, the session is rolled back, the error is logged together with
    the elapsed time since the generator started, and the exception is
    re-raised unchanged.

    Yields:
        AsyncSession: The session produced by ``BackgroundSessionLocal``.

    Raises:
        Exception: Any exception raised while the session was in use,
            re-raised after the rollback.
    """
    start_time = time.perf_counter()

    async with BackgroundSessionLocal() as session:
        try:
            yield session
        except Exception as e:
            await session.rollback()
            logger.error(
                f"[get_background_session] ROLLBACK - "
                f"error: {type(e).__name__}: {e}, "
                f"duration: {(time.perf_counter() - start_time)*1000:.1f}ms"
            )
            # Bare raise keeps the original traceback intact.
            raise
|
|
|
|
|
|
# Cleans up engine resources when the application shuts down
async def dispose_engine() -> None:
    """Dispose the main engine and the background engine.

    ``background_engine.dispose()`` is called even when disposing the main
    engine raises, so a failure on the first engine does not skip the
    second. The final "ALL DONE" message is logged only when both calls
    succeed.

    Raises:
        Exception: Whatever either ``dispose()`` call raises.
    """
    logger.info("[dispose_engine] Disposing database engines...")
    try:
        await engine.dispose()
        logger.info("[dispose_engine] Main engine disposed")
    finally:
        # Runs regardless of the outcome above so the background engine
        # is never left undisposed.
        await background_engine.dispose()
    logger.info("[dispose_engine] Background engine disposed - ALL DONE")
|