import logging from datetime import datetime from typing import Optional from config import TIMEZONE from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.credit.exceptions import ( ChargeRequestNotFoundError, InsufficientCreditError, InvalidRequestStateError, ) from app.credit.models import ( ChargeRequestStatus, CreditChargeRequest, CreditTransaction, CreditTransactionType, ) logger = logging.getLogger(__name__) async def record_transaction( *, session: AsyncSession, user_uuid: str, amount: int, balance_after: int, type: CreditTransactionType, reason: Optional[str] = None, admin_id: Optional[int] = None, related_request_id: Optional[int] = None, ) -> CreditTransaction: tx = CreditTransaction( user_uuid=user_uuid, amount=amount, balance_after=balance_after, type=type, reason=reason, admin_id=admin_id, related_request_id=related_request_id, ) session.add(tx) await session.flush() return tx async def charge_credit( *, session: AsyncSession, user_uuid: str, amount: int, type: CreditTransactionType = CreditTransactionType.CHARGE, reason: Optional[str] = None, admin_id: Optional[int] = None, related_request_id: Optional[int] = None, ) -> CreditTransaction: from app.user.models import User result = await session.execute( select(User).where(User.user_uuid == user_uuid).with_for_update() ) user = result.scalar_one_or_none() if user is None: from app.user.services.auth import UserNotFoundError raise UserNotFoundError() user.credits = user.credits + amount await session.flush() tx = await record_transaction( session=session, user_uuid=user_uuid, amount=amount, balance_after=user.credits, type=type, reason=reason, admin_id=admin_id, related_request_id=related_request_id, ) logger.info(f"[CREDIT] charge user_uuid={user_uuid} amount=+{amount} balance_after={user.credits}") return tx async def deduct_credit( *, session: AsyncSession, user_uuid: str, amount: int, type: CreditTransactionType = CreditTransactionType.CONSUME, reason: Optional[str] = None, admin_id: Optional[int] = None, ) -> CreditTransaction: from app.user.models import User result = await session.execute( select(User).where(User.user_uuid == user_uuid).with_for_update() ) user = result.scalar_one_or_none() if user is None: from app.user.services.auth import UserNotFoundError raise UserNotFoundError() if user.credits < amount: logger.warning(f"[CREDIT] insufficient credits user_uuid={user_uuid} credits={user.credits} requested={amount}") raise InsufficientCreditError() user.credits = user.credits - amount await session.flush() tx = await record_transaction( session=session, user_uuid=user_uuid, amount=-amount, balance_after=user.credits, type=type, reason=reason, admin_id=admin_id, ) logger.info(f"[CREDIT] deduct user_uuid={user_uuid} amount=-{amount} balance_after={user.credits}") return tx async def approve_charge_request( *, session: AsyncSession, request_id: int, admin_id: int, admin_note: Optional[str] = None, ) -> CreditChargeRequest: result = await session.execute( select(CreditChargeRequest) .where(CreditChargeRequest.id == request_id) .with_for_update() ) charge_request = result.scalar_one_or_none() if charge_request is None: raise ChargeRequestNotFoundError() if charge_request.status != ChargeRequestStatus.PENDING: logger.warning(f"[CREDIT] approve blocked request_id={request_id} status={charge_request.status}") raise InvalidRequestStateError() await charge_credit( session=session, user_uuid=charge_request.user_uuid, amount=charge_request.requested_amount, type=CreditTransactionType.CHARGE, reason="충전 요청 승인", admin_id=admin_id, related_request_id=request_id, ) charge_request.status = ChargeRequestStatus.APPROVED charge_request.admin_id = admin_id charge_request.admin_note = admin_note charge_request.processed_at = datetime.now(TIMEZONE) await session.flush() logger.info(f"[CREDIT] approved request_id={request_id} admin_id={admin_id} amount={charge_request.requested_amount}") return charge_request async def reject_charge_request( *, session: AsyncSession, request_id: int, admin_id: int, admin_note: Optional[str] = None, ) -> CreditChargeRequest: result = await session.execute( select(CreditChargeRequest) .where(CreditChargeRequest.id == request_id) .with_for_update() ) charge_request = result.scalar_one_or_none() if charge_request is None: raise ChargeRequestNotFoundError() if charge_request.status != ChargeRequestStatus.PENDING: logger.warning(f"[CREDIT] reject blocked request_id={request_id} status={charge_request.status}") raise InvalidRequestStateError() charge_request.status = ChargeRequestStatus.REJECTED charge_request.admin_id = admin_id charge_request.admin_note = admin_note charge_request.processed_at = datetime.now(TIMEZONE) await session.flush() logger.info(f"[CREDIT] rejected request_id={request_id} admin_id={admin_id}") return charge_request