o2o-castad-backend/app/backoffice/user_view_actions.py

115 lines
4.0 KiB
Python

import logging
from typing import Optional
from sqlalchemy import select
from starlette.requests import Request
from starlette.responses import RedirectResponse
from app.credit.models import CreditTransactionType
from app.credit.services.credit_service import charge_credit, deduct_credit
from app.database.session import AsyncSessionLocal
from app.user.models import User
logger = logging.getLogger(__name__)
async def _get_users_by_pks(session, pks: str) -> "list[User]":
    """Load the ``User`` rows whose ids appear in a comma-separated string.

    Args:
        session: Object whose awaitable ``execute`` runs the SELECT; the
            handlers in this module pass a session created by
            ``AsyncSessionLocal()``.
        pks: Comma-separated primary keys, e.g. ``"1, 2,3"``. Blank tokens
            are skipped. Tokens that are not valid integers are logged and
            skipped rather than raising ``ValueError``, because the value
            comes straight from the request query string.

    Returns:
        The matching users. An empty list is returned without issuing a
        query when no usable id remains.
    """
    ids: list[int] = []
    for token in pks.split(","):
        token = token.strip()
        if not token:
            continue
        try:
            ids.append(int(token))
        except ValueError:
            # One bad token must not abort the whole bulk action.
            logging.getLogger(__name__).warning(
                "[USER-ADMIN] ignoring malformed user pk %r", token
            )

    if not ids:
        return []

    result = await session.execute(select(User).where(User.id.in_(ids)))
    return list(result.scalars().all())
async def handle_block_users(request: Request, identity: str, block: bool) -> RedirectResponse:
    """Block or unblock the users selected in the admin list view.

    The target ids are read from the ``pks`` query parameter. Every matching
    user gets ``is_active`` set to ``not block``; the session is then
    committed and the number of affected users is logged.

    Args:
        request: Incoming request; supplies ``pks`` and builds the redirect URL.
        identity: Value passed as ``identity`` when resolving ``admin:list``.
        block: ``True`` to block (``is_active = False``), ``False`` to unblock.

    Returns:
        A 302 redirect to the ``admin:list`` route.
    """
    action_str = "차단 해제" if not block else "차단"
    active_flag = not block
    raw_pks = request.query_params.get("pks", "")

    async with AsyncSessionLocal() as db:
        users = await _get_users_by_pks(db, raw_pks)
        for target in users:
            target.is_active = active_flag
        await db.commit()

    logger.info(f"[USER-ADMIN] {action_str} count={len(users)}")
    list_url = request.url_for("admin:list", identity=identity)
    return RedirectResponse(list_url, status_code=302)
async def handle_set_role(request: Request, identity: str, role: str) -> RedirectResponse:
    """Assign ``role`` to the users selected in the admin list view.

    The target ids are read from the ``pks`` query parameter. Each matching
    user gets ``role`` stored as given and ``is_admin`` set to whether the
    role equals ``"admin"``; the session is then committed and the number of
    affected users is logged.

    Args:
        request: Incoming request; supplies ``pks`` and builds the redirect URL.
        identity: Value passed as ``identity`` when resolving ``admin:list``.
        role: Role string to store on each selected user.

    Returns:
        A 302 redirect to the ``admin:list`` route.
    """
    raw_pks = request.query_params.get("pks", "")
    grants_admin = role == "admin"

    async with AsyncSessionLocal() as db:
        users = await _get_users_by_pks(db, raw_pks)
        for target in users:
            # Keep the boolean flag in step with the role string.
            target.role = role
            target.is_admin = grants_admin
        await db.commit()

    logger.info(f"[USER-ADMIN] set_role role={role} count={len(users)}")
    list_url = request.url_for("admin:list", identity=identity)
    return RedirectResponse(list_url, status_code=302)
async def handle_grant_credits(
    request: Request,
    identity: str,
    amount: int,
    admin_id: Optional[int],
) -> RedirectResponse:
    """Charge ``amount`` credits to every user selected in the admin list view.

    The target ids are read from the ``pks`` query parameter and resolved
    with one query through ``_get_users_by_pks``, so a pk that is repeated in
    the parameter is charged only once. Each charge is committed on its own;
    a failure for one user is rolled back, logged with its traceback, and
    does not stop the remaining users.

    Args:
        request: Incoming request; supplies ``pks`` and builds the redirect URL.
        identity: Value passed as ``identity`` when resolving ``admin:list``.
        amount: Credit amount forwarded to ``charge_credit``.
        admin_id: Id of the acting admin, forwarded to ``charge_credit``.

    Returns:
        A 302 redirect to the ``admin:list`` route, whether or not every
        charge succeeded.
    """
    pks = request.query_params.get("pks", "")
    succeeded = 0
    failed = 0

    async with AsyncSessionLocal() as session:
        users = await _get_users_by_pks(session, pks)
        # Copy plain values out before the first commit/rollback: either call
        # can expire loaded ORM objects, and reading an expired attribute
        # afterwards would require another database round trip.
        targets = [(user.id, user.user_uuid) for user in users]

        for user_id, user_uuid in targets:
            try:
                await charge_credit(
                    session=session,
                    user_uuid=user_uuid,
                    amount=amount,
                    type=CreditTransactionType.ADMIN_ADJUST,
                    reason="관리자 수동 충전",
                    admin_id=admin_id,
                )
                await session.commit()
            except Exception as e:
                # Deliberately broad: this is a best-effort bulk action, so one
                # user's failure is recorded and the loop moves on.
                await session.rollback()
                failed += 1
                logger.warning(
                    "[USER-ADMIN] grant_credits failed user_id=%s error=%s",
                    user_id,
                    e,
                    exc_info=True,
                )
            else:
                succeeded += 1

    logger.info(
        "[USER-ADMIN] grant_credits amount=%s succeeded=%s failed=%s",
        amount,
        succeeded,
        failed,
    )
    return RedirectResponse(request.url_for("admin:list", identity=identity), status_code=302)
async def handle_deduct_credits(
    request: Request,
    identity: str,
    amount: int,
    admin_id: Optional[int],
) -> RedirectResponse:
    """Deduct ``amount`` credits from every user selected in the admin list view.

    The target ids are read from the ``pks`` query parameter and resolved
    with one query through ``_get_users_by_pks``, so a pk that is repeated in
    the parameter is deducted only once. Each deduction is committed on its
    own; a failure for one user is rolled back, logged with its traceback,
    and does not stop the remaining users.

    Args:
        request: Incoming request; supplies ``pks`` and builds the redirect URL.
        identity: Value passed as ``identity`` when resolving ``admin:list``.
        amount: Credit amount forwarded to ``deduct_credit``.
        admin_id: Id of the acting admin, forwarded to ``deduct_credit``.

    Returns:
        A 302 redirect to the ``admin:list`` route, whether or not every
        deduction succeeded.
    """
    pks = request.query_params.get("pks", "")
    succeeded = 0
    failed = 0

    async with AsyncSessionLocal() as session:
        users = await _get_users_by_pks(session, pks)
        # Copy plain values out before the first commit/rollback: either call
        # can expire loaded ORM objects, and reading an expired attribute
        # afterwards would require another database round trip.
        targets = [(user.id, user.user_uuid) for user in users]

        for user_id, user_uuid in targets:
            try:
                await deduct_credit(
                    session=session,
                    user_uuid=user_uuid,
                    amount=amount,
                    type=CreditTransactionType.ADMIN_ADJUST,
                    reason="관리자 수동 차감",
                    admin_id=admin_id,
                )
                await session.commit()
            except Exception as e:
                # Deliberately broad: this is a best-effort bulk action, so one
                # user's failure is recorded and the loop moves on.
                await session.rollback()
                failed += 1
                logger.warning(
                    "[USER-ADMIN] deduct_credits failed user_id=%s error=%s",
                    user_id,
                    e,
                    exc_info=True,
                )
            else:
                succeeded += 1

    logger.info(
        "[USER-ADMIN] deduct_credits amount=%s succeeded=%s failed=%s",
        amount,
        succeeded,
        failed,
    )
    return RedirectResponse(request.url_for("admin:list", identity=identity), status_code=302)