o2o-castad-backend/app/user/dependencies/auth.py

178 lines
5.2 KiB
Python

"""
인증 의존성 주입
FastAPI 라우터에서 사용할 인증 관련 의존성을 정의합니다.
"""
import logging
from typing import Optional
from fastapi import Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session
from app.user.models import User
from app.user.services.auth import (
AdminRequiredError,
InvalidTokenError,
MissingTokenError,
UserInactiveError,
UserNotFoundError,
)
from app.user.services.jwt import decode_token
logger = logging.getLogger(__name__)
security = HTTPBearer(auto_error=False)
async def get_current_user(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
session: AsyncSession = Depends(get_session),
) -> User:
"""
현재 로그인한 사용자 반환 (필수 인증)
Args:
credentials: HTTP Bearer 토큰
session: DB 세션
Returns:
User: 현재 로그인한 사용자
Raises:
MissingTokenError: 토큰이 없는 경우
InvalidTokenError: 토큰이 유효하지 않은 경우
TokenExpiredError: 토큰이 만료된 경우
UserNotFoundError: 사용자를 찾을 수 없는 경우
UserInactiveError: 비활성화된 계정인 경우
"""
if credentials is None:
logger.info("[AUTH-DEP] 토큰 없음 - MissingTokenError")
raise MissingTokenError()
token = credentials.credentials
logger.debug(f"[AUTH-DEP] Access Token 검증 시작 - token: ...{token[-20:]}")
payload = decode_token(token)
if payload is None:
logger.warning(f"[AUTH-DEP] Access Token 디코딩 실패 - token: ...{token[-20:]}")
raise InvalidTokenError()
# 토큰 타입 확인
if payload.get("type") != "access":
logger.warning(
f"[AUTH-DEP] 토큰 타입 불일치 - expected: access, "
f"got: {payload.get('type')}, sub: {payload.get('sub')}"
)
raise InvalidTokenError("액세스 토큰이 아닙니다.")
user_uuid = payload.get("sub")
if user_uuid is None:
logger.warning(f"[AUTH-DEP] 토큰에 sub 클레임 없음 - token: ...{token[-20:]}")
raise InvalidTokenError()
# 사용자 조회
result = await session.execute(
select(User).where(
User.user_uuid == user_uuid,
User.is_deleted == False, # noqa: E712
)
)
user = result.scalar_one_or_none()
if user is None:
logger.warning(f"[AUTH-DEP] 사용자 미존재 - user_uuid: {user_uuid}")
raise UserNotFoundError()
if not user.is_active:
logger.warning(
f"[AUTH-DEP] 비활성 사용자 접근 - user_uuid: {user_uuid}, user_id: {user.id}"
)
raise UserInactiveError()
logger.debug(
f"[AUTH-DEP] Access Token 검증 성공 - user_uuid: {user_uuid}, user_id: {user.id}"
)
return user
async def get_current_user_optional(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
session: AsyncSession = Depends(get_session),
) -> Optional[User]:
"""
현재 로그인한 사용자 반환 (선택적 인증)
토큰이 없거나 유효하지 않으면 None 반환
Args:
credentials: HTTP Bearer 토큰
session: DB 세션
Returns:
User | None: 로그인한 사용자 또는 None
"""
if credentials is None:
logger.debug("[AUTH-DEP] 선택적 인증 - 토큰 없음")
return None
token = credentials.credentials
payload = decode_token(token)
if payload is None:
logger.debug(f"[AUTH-DEP] 선택적 인증 - 디코딩 실패, token: ...{token[-20:]}")
return None
if payload.get("type") != "access":
logger.debug(
f"[AUTH-DEP] 선택적 인증 - 타입 불일치 (type={payload.get('type')})"
)
return None
user_uuid = payload.get("sub")
if user_uuid is None:
logger.debug("[AUTH-DEP] 선택적 인증 - sub 없음")
return None
result = await session.execute(
select(User).where(
User.user_uuid == user_uuid,
User.is_deleted == False, # noqa: E712
)
)
user = result.scalar_one_or_none()
if user is None or not user.is_active:
logger.debug(
f"[AUTH-DEP] 선택적 인증 - 사용자 미존재 또는 비활성, user_uuid: {user_uuid}"
)
return None
logger.debug(
f"[AUTH-DEP] 선택적 인증 성공 - user_uuid: {user_uuid}, user_id: {user.id}"
)
return user
async def get_current_admin(
current_user: User = Depends(get_current_user),
) -> User:
"""
현재 로그인한 관리자 반환
Args:
current_user: 현재 로그인한 사용자
Returns:
User: 관리자 권한이 있는 사용자
Raises:
AdminRequiredError: 관리자 권한이 없는 경우
"""
if not current_user.is_admin and current_user.role != "admin":
raise AdminRequiredError()
return current_user