o2o-castad-backend/app/backoffice/admin/service.py

44 lines
1.0 KiB
Python

import logging
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.backoffice.admin.models import Admin
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
async def create_admin(
    *,
    session: AsyncSession,
    username: str,
    password: str,
    name: Optional[str] = None,
) -> Admin:
    """Create an ``Admin`` row, commit it, and return the refreshed instance.

    Args:
        session: Async SQLAlchemy session used to add and commit the row.
        username: Value assigned to the new admin's ``username``.
        password: Value assigned to the new admin's ``password``. It is
            handed to the ``Admin`` constructor exactly as received.
        name: Optional value for the admin's ``name``; defaults to ``None``.

    Returns:
        The ``Admin`` instance after it has been committed and refreshed.

    Any exception raised by the session while committing or refreshing
    propagates to the caller unchanged.
    """
    # NOTE(review): this function does no hashing of its own -- confirm that
    # the Admin model or the caller hashes the password before it is stored.
    new_admin = Admin(username=username, password=password, name=name)
    session.add(new_admin)

    # Refresh after the commit so the returned object is reloaded from the
    # database rather than left expired.
    await session.commit()
    await session.refresh(new_admin)

    logger.info(f"[ADMIN] created admin username={username}")
    return new_admin
async def change_password(
    *,
    session: AsyncSession,
    admin_id: int,
    new_password: str,
) -> None:
    """Replace the stored password of the admin identified by ``admin_id``.

    Args:
        session: Async SQLAlchemy session used to load the row and commit.
        admin_id: Value matched against ``Admin.id`` to find the row.
        new_password: Value assigned to the admin's ``password`` exactly as
            received.

    Raises:
        ValueError: If no ``Admin`` row matches ``admin_id``.
    """
    lookup = select(Admin).where(Admin.id == admin_id)
    target = (await session.execute(lookup)).scalar_one_or_none()
    if target is None:
        raise ValueError(f"Admin id={admin_id} not found")

    # NOTE(review): this function does no hashing of its own -- confirm that
    # the Admin model or the caller hashes the password before it is stored.
    target.password = new_password
    await session.commit()
    logger.info(f"[ADMIN] password changed admin_id={admin_id}")