""" 인증 의존성 주입 FastAPI 라우터에서 사용할 인증 관련 의존성을 정의합니다. """ from typing import Optional from fastapi import Depends from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.database.session import get_session from app.user.exceptions import ( AdminRequiredError, InvalidTokenError, MissingTokenError, TokenExpiredError, UserInactiveError, UserNotFoundError, ) from app.user.models import User from app.user.services.jwt import decode_token security = HTTPBearer(auto_error=False) async def get_current_user( credentials: HTTPAuthorizationCredentials | None = Depends(security), session: AsyncSession = Depends(get_session), ) -> User: """ 현재 로그인한 사용자 반환 (필수 인증) Args: credentials: HTTP Bearer 토큰 session: DB 세션 Returns: User: 현재 로그인한 사용자 Raises: MissingTokenError: 토큰이 없는 경우 InvalidTokenError: 토큰이 유효하지 않은 경우 TokenExpiredError: 토큰이 만료된 경우 UserNotFoundError: 사용자를 찾을 수 없는 경우 UserInactiveError: 비활성화된 계정인 경우 """ if credentials is None: raise MissingTokenError() payload = decode_token(credentials.credentials) if payload is None: raise InvalidTokenError() # 토큰 타입 확인 if payload.get("type") != "access": raise InvalidTokenError("액세스 토큰이 아닙니다.") user_uuid = payload.get("sub") if user_uuid is None: raise InvalidTokenError() # 사용자 조회 result = await session.execute( select(User).where( User.user_uuid == user_uuid, User.is_deleted == False, # noqa: E712 ) ) user = result.scalar_one_or_none() if user is None: raise UserNotFoundError() if not user.is_active: raise UserInactiveError() return user async def get_current_user_optional( credentials: HTTPAuthorizationCredentials | None = Depends(security), session: AsyncSession = Depends(get_session), ) -> Optional[User]: """ 현재 로그인한 사용자 반환 (선택적 인증) 토큰이 없거나 유효하지 않으면 None 반환 Args: credentials: HTTP Bearer 토큰 session: DB 세션 Returns: User | None: 로그인한 사용자 또는 None """ if credentials is None: return None payload = decode_token(credentials.credentials) if payload is None: return None if payload.get("type") != "access": return None user_uuid = payload.get("sub") if user_uuid is None: return None result = await session.execute( select(User).where( User.user_uuid == user_uuid, User.is_deleted == False, # noqa: E712 ) ) user = result.scalar_one_or_none() if user is None or not user.is_active: return None return user async def get_current_admin( current_user: User = Depends(get_current_user), ) -> User: """ 현재 로그인한 관리자 반환 Args: current_user: 현재 로그인한 사용자 Returns: User: 관리자 권한이 있는 사용자 Raises: AdminRequiredError: 관리자 권한이 없는 경우 """ if not current_user.is_admin and current_user.role != "admin": raise AdminRequiredError() return current_user