o2o-castad-backend/app/user/dependencies/auth.py

145 lines
3.7 KiB
Python

"""
인증 의존성 주입
FastAPI 라우터에서 사용할 인증 관련 의존성을 정의합니다.
"""
from typing import Optional
from fastapi import Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session
from app.user.models import User
from app.user.services.auth import (
AdminRequiredError,
InvalidTokenError,
MissingTokenError,
UserInactiveError,
UserNotFoundError,
)
from app.user.services.jwt import decode_token
security = HTTPBearer(auto_error=False)
async def get_current_user(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
session: AsyncSession = Depends(get_session),
) -> User:
"""
현재 로그인한 사용자 반환 (필수 인증)
Args:
credentials: HTTP Bearer 토큰
session: DB 세션
Returns:
User: 현재 로그인한 사용자
Raises:
MissingTokenError: 토큰이 없는 경우
InvalidTokenError: 토큰이 유효하지 않은 경우
TokenExpiredError: 토큰이 만료된 경우
UserNotFoundError: 사용자를 찾을 수 없는 경우
UserInactiveError: 비활성화된 계정인 경우
"""
if credentials is None:
raise MissingTokenError()
payload = decode_token(credentials.credentials)
if payload is None:
raise InvalidTokenError()
# 토큰 타입 확인
if payload.get("type") != "access":
raise InvalidTokenError("액세스 토큰이 아닙니다.")
user_uuid = payload.get("sub")
if user_uuid is None:
raise InvalidTokenError()
# 사용자 조회
result = await session.execute(
select(User).where(
User.user_uuid == user_uuid,
User.is_deleted == False, # noqa: E712
)
)
user = result.scalar_one_or_none()
if user is None:
raise UserNotFoundError()
if not user.is_active:
raise UserInactiveError()
return user
async def get_current_user_optional(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
session: AsyncSession = Depends(get_session),
) -> Optional[User]:
"""
현재 로그인한 사용자 반환 (선택적 인증)
토큰이 없거나 유효하지 않으면 None 반환
Args:
credentials: HTTP Bearer 토큰
session: DB 세션
Returns:
User | None: 로그인한 사용자 또는 None
"""
if credentials is None:
return None
payload = decode_token(credentials.credentials)
if payload is None:
return None
if payload.get("type") != "access":
return None
user_uuid = payload.get("sub")
if user_uuid is None:
return None
result = await session.execute(
select(User).where(
User.user_uuid == user_uuid,
User.is_deleted == False, # noqa: E712
)
)
user = result.scalar_one_or_none()
if user is None or not user.is_active:
return None
return user
async def get_current_admin(
current_user: User = Depends(get_current_user),
) -> User:
"""
현재 로그인한 관리자 반환
Args:
current_user: 현재 로그인한 사용자
Returns:
User: 관리자 권한이 있는 사용자
Raises:
AdminRequiredError: 관리자 권한이 없는 경우
"""
if not current_user.is_admin and current_user.role != "admin":
raise AdminRequiredError()
return current_user