73 lines
2.3 KiB
Python
73 lines
2.3 KiB
Python
import logging
|
|
from datetime import datetime
|
|
|
|
from sqladmin.authentication import AuthenticationBackend
|
|
from sqlalchemy import select
|
|
from starlette.requests import Request
|
|
from starlette.responses import RedirectResponse
|
|
|
|
from app.backoffice.admin.models import Admin
|
|
from app.database.session import AsyncSessionLocal
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AdminAuthBackend(AuthenticationBackend):
    """Session-based authentication backend for the sqladmin back office.

    ``login`` checks submitted credentials against active ``Admin`` rows and
    stores the matching admin's id in the request session, ``authenticate``
    re-validates that id against the database, and ``logout`` clears the
    session.
    """

    @staticmethod
    def _passwords_match(stored: object, supplied: str) -> bool:
        """Return True when *stored* equals *supplied*, compared in constant time.

        A stored value that is not a ``str`` (for example ``None``) never
        matches, which is the same outcome a plain ``!=`` comparison gave.
        """
        import hmac  # local import keeps this block self-contained

        if not isinstance(stored, str):
            return False
        # compare_digest only accepts ASCII-only str, so compare encoded bytes.
        # "surrogatepass" keeps encoding from raising on unusual form input.
        return hmac.compare_digest(
            stored.encode("utf-8", "surrogatepass"),
            supplied.encode("utf-8", "surrogatepass"),
        )

    async def login(self, request: Request) -> bool:
        """Validate the submitted login form and start an admin session.

        Reads ``username`` and ``password`` from the request form, looks up an
        active ``Admin`` with that username, and compares the password. On
        success the admin's id is stored under ``request.session["admin_id"]``
        and ``last_login_at`` is updated and committed.

        Returns:
            True when the credentials match an active admin, otherwise False.
        """
        form = await request.form()
        username = form.get("username", "")
        password = form.get("password", "")

        # Form values may be uploaded files on multipart requests; only
        # non-empty text fields are acceptable credentials.
        if not isinstance(username, str) or not isinstance(password, str):
            return False
        if not username or not password:
            return False

        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Admin).where(
                    Admin.username == username,
                    Admin.is_active == True,  # noqa: E712
                )
            )
            admin = result.scalar_one_or_none()

            # NOTE(review): the stored value is compared directly with the
            # submitted password, which implies plaintext storage. Consider
            # storing a salted hash and verifying against that instead.
            if admin is None or not self._passwords_match(admin.password, password):
                logger.warning("[ADMIN-AUTH] login failed username=%s", username)
                return False

            # Read the id before committing: a commit may expire loaded
            # attributes, and reloading them lazily is not possible in async code.
            admin_id = admin.id
            request.session["admin_id"] = admin_id
            logger.info(
                "[ADMIN-AUTH] login success admin_id=%s username=%s",
                admin_id,
                username,
            )

            # Update the last-login timestamp in the same session that loaded the row.
            # NOTE(review): datetime.now() is naive local time; confirm this is
            # what the last_login_at column expects.
            admin.last_login_at = datetime.now()
            await session.commit()

        return True

    async def logout(self, request: Request) -> bool:
        """Clear the request session and report success."""
        request.session.clear()
        return True

    async def authenticate(self, request: Request) -> bool:
        """Check that the session refers to an existing, active admin.

        Returns:
            True when ``request.session["admin_id"]`` matches an active
            ``Admin`` row. Returns False when the id is absent or falsy, or
            when no active admin matches; in the latter case the session is
            cleared as well.
        """
        admin_id = request.session.get("admin_id")
        if not admin_id:
            return False

        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Admin).where(
                    Admin.id == admin_id,
                    Admin.is_active == True,  # noqa: E712
                )
            )
            admin = result.scalar_one_or_none()

        if admin is None:
            logger.warning("[ADMIN-AUTH] authenticate failed admin_id=%s", admin_id)
            # Drop the stale session so a removed or deactivated admin is logged out.
            request.session.clear()
            return False

        return True
|