""" 인증 의존성 주입 FastAPI 라우터에서 사용할 인증 관련 의존성을 정의합니다. """ import logging from typing import Optional from fastapi import Depends from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.database.session import get_session from app.user.models import User from app.user.services.auth import ( AdminRequiredError, InvalidTokenError, MissingTokenError, UserInactiveError, UserNotFoundError, ) from app.user.services.jwt import decode_token logger = logging.getLogger(__name__) security = HTTPBearer(auto_error=False) async def get_current_user( credentials: HTTPAuthorizationCredentials | None = Depends(security), session: AsyncSession = Depends(get_session), ) -> User: """ 현재 로그인한 사용자 반환 (필수 인증) Args: credentials: HTTP Bearer 토큰 session: DB 세션 Returns: User: 현재 로그인한 사용자 Raises: MissingTokenError: 토큰이 없는 경우 InvalidTokenError: 토큰이 유효하지 않은 경우 TokenExpiredError: 토큰이 만료된 경우 UserNotFoundError: 사용자를 찾을 수 없는 경우 UserInactiveError: 비활성화된 계정인 경우 """ if credentials is None: logger.info("[AUTH-DEP] 토큰 없음 - MissingTokenError") raise MissingTokenError() token = credentials.credentials logger.debug(f"[AUTH-DEP] Access Token 검증 시작 - token: ...{token[-20:]}") payload = decode_token(token) if payload is None: logger.warning(f"[AUTH-DEP] Access Token 디코딩 실패 - token: ...{token[-20:]}") raise InvalidTokenError() # 토큰 타입 확인 if payload.get("type") != "access": logger.warning( f"[AUTH-DEP] 토큰 타입 불일치 - expected: access, " f"got: {payload.get('type')}, sub: {payload.get('sub')}" ) raise InvalidTokenError("액세스 토큰이 아닙니다.") user_uuid = payload.get("sub") if user_uuid is None: logger.warning(f"[AUTH-DEP] 토큰에 sub 클레임 없음 - token: ...{token[-20:]}") raise InvalidTokenError() # 사용자 조회 result = await session.execute( select(User).where( User.user_uuid == user_uuid, User.is_deleted == False, # noqa: E712 ) ) user = result.scalar_one_or_none() if user is None: logger.warning(f"[AUTH-DEP] 사용자 미존재 - user_uuid: {user_uuid}") raise UserNotFoundError() if not user.is_active: logger.warning( f"[AUTH-DEP] 비활성 사용자 접근 - user_uuid: {user_uuid}, user_id: {user.id}" ) raise UserInactiveError() logger.debug( f"[AUTH-DEP] Access Token 검증 성공 - user_uuid: {user_uuid}, user_id: {user.id}" ) return user async def get_current_user_optional( credentials: HTTPAuthorizationCredentials | None = Depends(security), session: AsyncSession = Depends(get_session), ) -> Optional[User]: """ 현재 로그인한 사용자 반환 (선택적 인증) 토큰이 없거나 유효하지 않으면 None 반환 Args: credentials: HTTP Bearer 토큰 session: DB 세션 Returns: User | None: 로그인한 사용자 또는 None """ if credentials is None: logger.debug("[AUTH-DEP] 선택적 인증 - 토큰 없음") return None token = credentials.credentials payload = decode_token(token) if payload is None: logger.debug(f"[AUTH-DEP] 선택적 인증 - 디코딩 실패, token: ...{token[-20:]}") return None if payload.get("type") != "access": logger.debug( f"[AUTH-DEP] 선택적 인증 - 타입 불일치 (type={payload.get('type')})" ) return None user_uuid = payload.get("sub") if user_uuid is None: logger.debug("[AUTH-DEP] 선택적 인증 - sub 없음") return None result = await session.execute( select(User).where( User.user_uuid == user_uuid, User.is_deleted == False, # noqa: E712 ) ) user = result.scalar_one_or_none() if user is None or not user.is_active: logger.debug( f"[AUTH-DEP] 선택적 인증 - 사용자 미존재 또는 비활성, user_uuid: {user_uuid}" ) return None logger.debug( f"[AUTH-DEP] 선택적 인증 성공 - user_uuid: {user_uuid}, user_id: {user.id}" ) return user async def get_current_admin( current_user: User = Depends(get_current_user), ) -> User: """ 현재 로그인한 관리자 반환 Args: current_user: 현재 로그인한 사용자 Returns: User: 관리자 권한이 있는 사용자 Raises: AdminRequiredError: 관리자 권한이 없는 경우 """ if not current_user.is_admin and current_user.role != "admin": raise AdminRequiredError() return current_user