Compare commits

...

8 Commits

46 changed files with 2561 additions and 172 deletions

View File

@ -1,48 +1,78 @@
from fastapi import FastAPI from pathlib import Path
from sqladmin import Admin
from fastapi import FastAPI
from app.database.session import engine from sqladmin import Admin
from app.home.api.home_admin import ImageAdmin, ProjectAdmin from sqladmin.authentication import login_required
from app.lyric.api.lyrics_admin import LyricAdmin from starlette.exceptions import HTTPException
from app.song.api.song_admin import SongAdmin from starlette.requests import Request
from app.sns.api.sns_admin import SNSUploadTaskAdmin from starlette.responses import Response
from app.user.api.user_admin import RefreshTokenAdmin, SocialAccountAdmin, UserAdmin from sqlalchemy.ext.asyncio import AsyncEngine
from app.video.api.video_admin import VideoAdmin
from config import prj_settings from app.backoffice.admin.admin_view import AdminAdmin
from app.backoffice.admin.auth import AdminAuthBackend
# https://github.com/aminalaee/sqladmin from app.backoffice.credit_view import CreditChargeRequestAdmin, CreditTransactionAdmin
from app.backoffice.dashboard import get_dashboard_context
from app.user.api.user_admin import SocialAccountAdmin, UserAdmin
def init_admin( from config import prj_settings
app: FastAPI,
db_engine: engine, TEMPLATES_DIR = Path(__file__).parent / "backoffice" / "frontend" / "templates"
base_url: str = prj_settings.ADMIN_BASE_URL,
) -> Admin:
admin = Admin( class DashboardAdmin(Admin):
app, @login_required
db_engine, async def index(self, request: Request) -> Response:
base_url=base_url, ctx = await get_dashboard_context()
) admin_role = request.session.get("admin_role", "viewer")
return await self.templates.TemplateResponse(
# 프로젝트 관리 request,
admin.add_view(ProjectAdmin) "sqladmin/index.html",
admin.add_view(ImageAdmin) {"title": "대시보드", "subtitle": "", "admin_role": admin_role, **ctx},
)
# 가사 관리
admin.add_view(LyricAdmin) @login_required
async def edit(self, request: Request) -> Response:
# 노래 관리 if request.session.get("admin_role") == "viewer":
admin.add_view(SongAdmin) raise HTTPException(status_code=403)
return await super().edit(request)
# 영상 관리
admin.add_view(VideoAdmin) @login_required
async def create(self, request: Request) -> Response:
# 사용자 관리 if request.session.get("admin_role") == "viewer":
admin.add_view(UserAdmin) raise HTTPException(status_code=403)
admin.add_view(RefreshTokenAdmin) return await super().create(request)
admin.add_view(SocialAccountAdmin)
@login_required
# SNS 관리 async def delete(self, request: Request) -> Response:
admin.add_view(SNSUploadTaskAdmin) if request.session.get("admin_role") == "viewer":
raise HTTPException(status_code=403)
return admin return await super().delete(request)
def init_admin(
app: FastAPI,
db_engine: AsyncEngine,
base_url: str = prj_settings.ADMIN_BASE_URL,
) -> Admin:
auth_backend = AdminAuthBackend(secret_key=prj_settings.ADMIN_SESSION_SECRET)
admin = DashboardAdmin(
app,
db_engine,
base_url=base_url,
authentication_backend=auth_backend,
title="ADO2 관리자",
templates_dir=str(TEMPLATES_DIR),
)
# 사용자 관리
admin.add_view(UserAdmin)
admin.add_view(SocialAccountAdmin)
# 크레딧 관리 (superadmin: 전체, viewer: 읽기 전용)
admin.add_view(CreditChargeRequestAdmin)
admin.add_view(CreditTransactionAdmin)
# 백오피스 설정
admin.add_view(AdminAdmin)
return admin

View File

View File

View File

@ -0,0 +1,74 @@
from sqladmin import ModelView
from wtforms import PasswordField, SelectField
from app.backoffice.admin.models import Admin
from app.backoffice.mixins import SuperAdminOnly
class AdminAdmin(SuperAdminOnly, ModelView, model=Admin):
name = "관리자 계정"
name_plural = "관리자 계정 목록"
icon = "fa-solid fa-user-shield"
category = "백오피스 설정"
page_size = 30
column_list = [
"id",
"username",
"name",
"role",
"is_active",
"last_login_at",
"created_at",
]
column_details_list = [
"id",
"username",
"name",
"role",
"is_active",
"last_login_at",
"created_at",
"updated_at",
]
form_columns = ["username", "password", "name", "role", "is_active"]
form_overrides = {
"password": PasswordField,
"role": SelectField,
}
form_args = {
"role": {
"label": "권한",
"choices": [("superadmin", "전체 관리자"), ("viewer", "일반 관리자")],
"default": "viewer",
}
}
column_searchable_list = [Admin.username, Admin.name]
column_default_sort = (Admin.created_at, True)
column_sortable_list = [
Admin.id,
Admin.username,
Admin.is_active,
Admin.last_login_at,
Admin.created_at,
]
column_labels = {
"id": "ID",
"username": "아이디",
"name": "이름",
"role": "권한",
"is_active": "활성화",
"last_login_at": "마지막 로그인",
"created_at": "생성일시",
"updated_at": "수정일시",
}
can_delete = False

View File

@ -0,0 +1,74 @@
import logging
from datetime import datetime
from sqladmin.authentication import AuthenticationBackend
from sqlalchemy import select
from starlette.requests import Request
from starlette.responses import RedirectResponse
from app.backoffice.admin.models import Admin
from app.database.session import AsyncSessionLocal
logger = logging.getLogger(__name__)
class AdminAuthBackend(AuthenticationBackend):
async def login(self, request: Request) -> bool:
form = await request.form()
username = form.get("username", "")
password = form.get("password", "")
if not username or not password:
return False
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Admin).where(
Admin.username == username,
Admin.is_active == True, # noqa: E712
)
)
admin = result.scalar_one_or_none()
if admin is None or admin.password != password:
logger.warning(f"[ADMIN-AUTH] login failed username={username}")
return False
request.session["admin_id"] = admin.id
request.session["admin_role"] = admin.role
request.session["admin_name"] = admin.name or admin.username
logger.info(f"[ADMIN-AUTH] login success admin_id={admin.id} username={username} role={admin.role}")
# 마지막 로그인 시간 갱신
async with AsyncSessionLocal() as session:
result = await session.execute(select(Admin).where(Admin.id == admin.id))
a = result.scalar_one()
a.last_login_at = datetime.now()
await session.commit()
return True
async def logout(self, request: Request) -> bool:
request.session.clear()
return True
async def authenticate(self, request: Request) -> bool:
admin_id = request.session.get("admin_id")
if not admin_id:
return False
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Admin).where(
Admin.id == admin_id,
Admin.is_active == True, # noqa: E712
)
)
admin = result.scalar_one_or_none()
if admin is None:
logger.warning(f"[ADMIN-AUTH] authenticate failed admin_id={admin_id}")
request.session.clear()
return False
return True

View File

@ -0,0 +1,86 @@
from datetime import datetime
from typing import Optional
from sqlalchemy import BigInteger, Boolean, DateTime, Index, String, func
from sqlalchemy.orm import Mapped, mapped_column
from app.database.session import Base
class Admin(Base):
__tablename__ = "admin"
__table_args__ = (
Index("idx_admin_username", "username", unique=True),
Index("idx_admin_is_active", "is_active"),
{
"mysql_engine": "InnoDB",
"mysql_charset": "utf8mb4",
"mysql_collate": "utf8mb4_unicode_ci",
},
)
id: Mapped[int] = mapped_column(
BigInteger,
primary_key=True,
nullable=False,
autoincrement=True,
comment="고유 식별자",
)
username: Mapped[str] = mapped_column(
String(50),
nullable=False,
unique=True,
comment="로그인 ID",
)
password: Mapped[str] = mapped_column(
String(255),
nullable=False,
comment="비밀번호",
)
name: Mapped[Optional[str]] = mapped_column(
String(50),
nullable=True,
comment="표시 이름",
)
role: Mapped[str] = mapped_column(
String(20),
nullable=False,
default="viewer",
server_default="viewer",
comment="권한 (superadmin: 전체, viewer: 조회만)",
)
is_active: Mapped[bool] = mapped_column(
Boolean,
nullable=False,
default=True,
comment="활성화 상태 (비활성화 시 로그인 차단)",
)
last_login_at: Mapped[Optional[datetime]] = mapped_column(
DateTime,
nullable=True,
comment="마지막 로그인 일시",
)
created_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.now(),
comment="생성 일시",
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.now(),
onupdate=func.now(),
comment="수정 일시",
)
def __repr__(self) -> str:
return f"<Admin(id={self.id}, username='{self.username}', is_active={self.is_active})>"

View File

@ -0,0 +1,43 @@
import logging
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.backoffice.admin.models import Admin
logger = logging.getLogger(__name__)
async def create_admin(
*,
session: AsyncSession,
username: str,
password: str,
name: Optional[str] = None,
) -> Admin:
admin = Admin(
username=username,
password=password,
name=name,
)
session.add(admin)
await session.commit()
await session.refresh(admin)
logger.info(f"[ADMIN] created admin username={username}")
return admin
async def change_password(
*,
session: AsyncSession,
admin_id: int,
new_password: str,
) -> None:
result = await session.execute(select(Admin).where(Admin.id == admin_id))
admin = result.scalar_one_or_none()
if admin is None:
raise ValueError(f"Admin id={admin_id} not found")
admin.password = new_password
await session.commit()
logger.info(f"[ADMIN] password changed admin_id={admin_id}")

View File

@ -0,0 +1,205 @@
import logging
from sqlalchemy import select
from sqlalchemy.orm import outerjoin
from sqladmin import ModelView, action
from app.backoffice.mixins import SuperAdminEditable
from starlette.requests import Request
from starlette.responses import RedirectResponse
from app.backoffice.admin.models import Admin
from app.credit.models import CreditChargeRequest, CreditTransaction
from app.credit.services.credit_service import approve_charge_request, reject_charge_request
from app.database.session import AsyncSessionLocal
logger = logging.getLogger(__name__)
class CreditChargeRequestAdmin(SuperAdminEditable, ModelView, model=CreditChargeRequest):
name = "충전 요청"
name_plural = "충전 요청 목록"
icon = "fa-solid fa-coins"
category = "크레딧 관리"
page_size = 30
can_edit = True
can_delete = False
column_list = [
"id",
"user_uuid",
"requested_amount",
"status",
"admin.name",
"processed_at",
"created_at",
]
column_details_list = [
"id",
"user_uuid",
"requested_amount",
"message",
"status",
"admin.name",
"admin_note",
"processed_at",
"created_at",
"updated_at",
]
form_columns = ["admin_note"]
can_create = False
column_searchable_list = [
CreditChargeRequest.user_uuid,
CreditChargeRequest.status,
]
column_default_sort = (CreditChargeRequest.created_at, True)
column_sortable_list = [
CreditChargeRequest.id,
CreditChargeRequest.user_uuid,
CreditChargeRequest.requested_amount,
CreditChargeRequest.status,
CreditChargeRequest.processed_at,
CreditChargeRequest.created_at,
]
column_labels = {
"id": "ID",
"user_uuid": "사용자 UUID",
"requested_amount": "요청 크레딧",
"message": "사용자 메시지",
"status": "상태",
"admin.name": "처리 관리자",
"admin_note": "관리자 메모",
"processed_at": "처리일시",
"created_at": "요청일시",
"updated_at": "수정일시",
}
@action(
name="approve_request",
label="승인",
confirmation_message="선택한 충전 요청을 승인하시겠습니까?",
add_in_detail=True,
add_in_list=True,
)
async def approve_action(self, request: Request) -> RedirectResponse:
admin_id = request.session.get("admin_id")
pks = request.query_params.get("pks", "")
async with AsyncSessionLocal() as session:
for pk in pks.split(","):
if not pk.strip():
continue
try:
await approve_charge_request(
session=session,
request_id=int(pk),
admin_id=admin_id,
)
await session.commit()
except Exception as e:
await session.rollback()
logger.warning(f"[CREDIT-ADMIN] approve failed request_id={pk} error={e}")
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=302)
@action(
name="reject_request",
label="반려",
confirmation_message="선택한 충전 요청을 반려하시겠습니까?",
add_in_detail=True,
add_in_list=True,
)
async def reject_action(self, request: Request) -> RedirectResponse:
admin_id = request.session.get("admin_id")
pks = request.query_params.get("pks", "")
async with AsyncSessionLocal() as session:
for pk in pks.split(","):
if not pk.strip():
continue
try:
await reject_charge_request(
session=session,
request_id=int(pk),
admin_id=admin_id,
)
await session.commit()
except Exception as e:
await session.rollback()
logger.warning(f"[CREDIT-ADMIN] reject failed request_id={pk} error={e}")
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=302)
class CreditTransactionAdmin(SuperAdminEditable, ModelView, model=CreditTransaction):
name = "크레딧 변경"
name_plural = "크레딧 변경 목록"
icon = "fa-solid fa-clock-rotate-left"
category = "크레딧 관리"
page_size = 30
can_create = False
can_edit = False
can_delete = False
column_list = [
"id",
"user_uuid",
"amount",
"balance_after",
"type",
"admin.name",
"related_request_id",
"created_at",
]
column_details_list = [
"id",
"user_uuid",
"amount",
"balance_after",
"type",
"reason",
"admin.name",
"related_request_id",
"created_at",
]
column_searchable_list = [
CreditTransaction.user_uuid,
CreditTransaction.type,
]
column_default_sort = (CreditTransaction.created_at, True)
column_sortable_list = [
CreditTransaction.id,
CreditTransaction.user_uuid,
CreditTransaction.amount,
CreditTransaction.type,
CreditTransaction.created_at,
]
column_labels = {
"id": "ID",
"user_uuid": "사용자 UUID",
"amount": "변경 크레딧",
"balance_after": "변경 후 잔액",
"type": "변경 유형",
"reason": "사유",
"admin.name": "처리 관리자",
"related_request_id": "충전 요청 ID",
"created_at": "변경 일시",
}
def list_query(self, _request: Request):
return (
select(CreditTransaction)
.select_from(outerjoin(CreditTransaction, Admin, CreditTransaction.admin_id == Admin.id))
)

View File

@ -0,0 +1,79 @@
from datetime import datetime
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.requests import Request
from starlette.responses import Response
from app.credit.models import ChargeRequestStatus, CreditChargeRequest, CreditTransaction, CreditTransactionType
from app.database.session import AsyncSessionLocal
from app.user.models import User
from config import TIMEZONE
async def get_dashboard_context() -> dict:
async with AsyncSessionLocal() as session:
today_start = datetime.now(TIMEZONE).replace(hour=0, minute=0, second=0, microsecond=0)
pending_charge_requests_count = (await session.execute(
select(func.count()).select_from(CreditChargeRequest)
.where(CreditChargeRequest.status == ChargeRequestStatus.PENDING)
)).scalar()
today_charge = (await session.execute(
select(func.count()).select_from(CreditTransaction)
.where(
CreditTransaction.type == CreditTransactionType.CHARGE,
CreditTransaction.created_at >= today_start,
)
)).scalar()
today_consume = (await session.execute(
select(func.count()).select_from(CreditTransaction)
.where(
CreditTransaction.type == CreditTransactionType.CONSUME,
CreditTransaction.created_at >= today_start,
)
)).scalar()
month_start = datetime.now(TIMEZONE).replace(day=1, hour=0, minute=0, second=0, microsecond=0)
month_consume = (await session.execute(
select(func.coalesce(func.sum(func.abs(CreditTransaction.amount)), 0))
.select_from(CreditTransaction)
.where(
CreditTransaction.type == CreditTransactionType.CONSUME,
CreditTransaction.created_at >= month_start,
)
)).scalar()
pending_requests = (await session.execute(
select(CreditChargeRequest)
.where(CreditChargeRequest.status == ChargeRequestStatus.PENDING)
.order_by(CreditChargeRequest.created_at.desc())
.limit(10)
)).scalars().all()
recent_transactions = (await session.execute(
select(CreditTransaction)
.order_by(CreditTransaction.created_at.desc())
.limit(10)
)).scalars().all()
recent_users = (await session.execute(
select(User)
.where(User.is_deleted == False)
.order_by(User.created_at.desc())
.limit(10)
)).scalars().all()
return {
"stats": {
"pending_charge_requests": pending_charge_requests_count,
"today_charge": today_charge,
"today_consume": today_consume,
"month_consume": month_consume,
},
"pending_requests": pending_requests,
"recent_transactions": recent_transactions,
"recent_users": recent_users,
}

View File

@ -0,0 +1,106 @@
{% extends "sqladmin/layout.html" %}
{% block content %}
<div class="col-12">
<div class="card">
<div class="card-header">
<h3 class="card-title">
{% for pk in model_view.pk_columns -%}
{{ pk.name }}
{%- if not loop.last %};{% endif -%}
{% endfor %}: {{ get_object_identifier(model) }}</h3>
</div>
<div class="card-body border-bottom py-3">
<div class="table-responsive">
<table class="table card-table table-vcenter text-nowrap table-hover table-bordered">
<thead>
<tr>
<th class="w-1">Column</th>
<th class="w-1">Value</th>
</tr>
</thead>
<tbody>
{% for name in model_view._details_prop_names %}
{% set label = model_view._column_labels.get(name, name) %}
<tr>
<td>{{ label }}</td>
{% set value, formatted_value = model_view.get_detail_value(model, name) %}
{% if name in model_view._relation_names %}
{% if is_list( value ) %}
<td>
{% for elem, formatted_elem in zip(value, formatted_value) %}
{% if model_view.show_compact_lists %}
<a href="{{ model_view._build_url_for('admin:details', request, elem) }}">({{ formatted_elem }})</a>
{% else %}
<a href="{{ model_view._build_url_for('admin:details', request, elem) }}">{{ formatted_elem }}</a><br/>
{% endif %}
{% endfor %}
</td>
{% else %}
<td><a href="{{ model_view._url_for_details_with_prop(request, model, name) }}">{{ formatted_value }}</a>
</td>
{% endif %}
{% else %}
<td>{{ formatted_value }}</td>
{% endif %}
</tr>
{% endfor %}
</tbody>
</table>
</div>
<div class="card-footer container">
<div class="row">
<div class="col-md-1">
<a href="{{ url_for('admin:list', identity=model_view.identity) }}" class="btn">
Go Back
</a>
</div>
{% if model_view.can_delete and request.session.get('admin_role') == 'superadmin' %}
<div class="col-md-1">
<a href="#" data-name="{{ model_view.name }}" data-pk="{{ get_object_identifier(model) }}"
data-url="{{ model_view._url_for_delete(request, model) }}" data-bs-toggle="modal"
data-bs-target="#modal-delete" class="btn btn-danger">
Delete
</a>
</div>
{% endif %}
{% if model_view.can_edit and request.session.get('admin_role') == 'superadmin' %}
<div class="col-md-1">
<a href="{{ model_view._build_url_for('admin:edit', request, model) }}" class="btn btn-primary">
Edit
</a>
</div>
{% endif %}
{% for custom_action,label in model_view._custom_actions_in_detail.items() %}
<div class="col-md-1">
{% if custom_action in model_view._custom_actions_confirmation %}
<a href="#" class="btn btn-secondary" data-bs-toggle="modal"
data-bs-target="#modal-confirmation-{{ custom_action }}">
{{ label }}
</a>
{% else %}
<a href="{{ model_view._url_for_action(request, custom_action) }}?pks={{ get_object_identifier(model) }}"
class="btn btn-secondary">
{{ label }}
</a>
{% endif %}
</div>
{% endfor %}
</div>
</div>
</div>
</div>
</div>
{% if model_view.can_delete %}
{% include 'sqladmin/modals/delete.html' %}
{% endif %}
{% for custom_action in model_view._custom_actions_in_detail %}
{% if custom_action in model_view._custom_actions_confirmation %}
{% with confirmation_message = model_view._custom_actions_confirmation[custom_action], custom_action=custom_action,
url=model_view._url_for_action(request, custom_action) + '?pks=' + (get_object_identifier(model) | string) %}
{% include 'sqladmin/modals/details_action_confirmation.html' %}
{% endwith %}
{% endif %}
{% endfor %}
{% endblock %}

View File

@ -0,0 +1,149 @@
{% extends "sqladmin/layout.html" %}
{% block content %}
<!-- 요약 카드 -->
<div class="col-12">
<div class="row row-deck row-cards mb-4">
<div class="col-sm-6 col-lg-3">
<div class="card">
<div class="card-body">
<div class="subheader">대기 중인 요청</div>
<div class="h1 mb-3 text-warning">{{ stats.pending_charge_requests }}</div>
</div>
</div>
</div>
<div class="col-sm-6 col-lg-3">
<div class="card">
<div class="card-body">
<div class="subheader">오늘 승인한 요청</div>
<div class="h1 mb-3 text-success">{{ stats.today_charge }}</div>
</div>
</div>
</div>
<div class="col-sm-6 col-lg-3">
<div class="card">
<div class="card-body">
<div class="subheader">오늘 소모 크레딧</div>
<div class="h1 mb-3 text-danger">{{ stats.today_consume }}</div>
</div>
</div>
</div>
<div class="col-sm-6 col-lg-3">
<div class="card">
<div class="card-body">
<div class="subheader">이번 달 소모 크레딧</div>
<div class="h1 mb-3 text-danger">{{ stats.month_consume }}</div>
</div>
</div>
</div>
</div>
</div>
<!-- 대기 중인 요청 -->
<div class="col-12 col-lg-6">
<div class="card">
<div class="card-header">
<h3 class="card-title">대기 중인 요청</h3>
<div class="card-options">
<a href="{{ request.url_for('admin:list', identity='credit-charge-request') }}" class="btn btn-sm btn-primary">전체 보기</a>
</div>
</div>
<div class="table-responsive">
<table class="table table-vcenter card-table">
<thead>
<tr>
<th>사용자 UUID</th>
<th>요청 크레딧</th>
<th>요청일시</th>
</tr>
</thead>
<tbody>
{% for req in pending_requests %}
<tr>
<td class="text-truncate" style="max-width:150px;">{{ req.user_uuid }}</td>
<td>{{ req.requested_amount }}</td>
<td>{{ req.created_at.strftime('%Y-%m-%d %H:%M') }}</td>
</tr>
{% else %}
<tr><td colspan="3" class="text-center text-muted">대기 중인 요청 없음</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
<!-- 최근 크레딧 변화 -->
<div class="col-12 col-lg-6">
<div class="card">
<div class="card-header">
<h3 class="card-title">최근 크레딧 변화</h3>
<div class="card-options">
<a href="{{ request.url_for('admin:list', identity='credit-transaction') }}" class="btn btn-sm btn-primary">전체 보기</a>
</div>
</div>
<div class="table-responsive">
<table class="table table-vcenter card-table">
<thead>
<tr>
<th>사용자 UUID</th>
<th>유형</th>
<th>변경</th>
<th>잔액</th>
<th>일시</th>
</tr>
</thead>
<tbody>
{% for tx in recent_transactions %}
<tr>
<td class="text-truncate" style="max-width:120px;">{{ tx.user_uuid }}</td>
<td>{{ tx.type }}</td>
<td class="{{ 'text-success' if tx.amount > 0 else 'text-danger' }}">{{ '%+d' % tx.amount }}</td>
<td>{{ tx.balance_after }}</td>
<td>{{ tx.created_at.strftime('%Y-%m-%d %H:%M') }}</td>
</tr>
{% else %}
<tr><td colspan="5" class="text-center text-muted">이력 없음</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
<!-- 최근 가입 사용자 -->
<div class="col-12">
<div class="card">
<div class="card-header">
<h3 class="card-title">최근 가입 사용자</h3>
<div class="card-options">
<a href="{{ request.url_for('admin:list', identity='user') }}" class="btn btn-sm btn-primary">전체 보기</a>
</div>
</div>
<div class="table-responsive">
<table class="table table-vcenter card-table">
<thead>
<tr>
<th>닉네임</th>
<th>이메일</th>
<th>크레딧</th>
<th>권한</th>
<th>가입일시</th>
</tr>
</thead>
<tbody>
{% for user in recent_users %}
<tr>
<td>{{ user.nickname or '-' }}</td>
<td>{{ user.email or '-' }}</td>
<td>{{ user.credits }}</td>
<td>{{ user.role }}</td>
<td>{{ user.created_at.strftime('%Y-%m-%d %H:%M') }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,65 @@
{% extends "sqladmin/base.html" %}
{% from 'sqladmin/_macros.html' import display_menu %}
{% block body %}
<div class="wrapper">
<aside class="navbar navbar-expand-lg navbar-vertical navbar-expand-md navbar-dark">
<div class="container-fluid">
<h1 class="navbar-brand navbar-brand-autodark">
<a href="{{ url_for('admin:index') }}">
{% if admin.logo_url %}
<img src="{{ admin.logo_url }}" width="64" height="64" alt="Admin" class="navbar-brand-image" />
{% else %}
<h3>{{ admin.title }}</h3>
{% endif %}
</a>
</h1>
<nav class="navbar navbar-expand-sm" id="navbar-menu">
<button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbarSupportedContent"
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
{{ display_menu(admin._menu, request) }}
</div>
</nav>
{% if admin.authentication_backend %}
<div class="mb-2 text-center text-white">
<div class="fw-bold">{{ request.session.get('admin_name', '') }}</div>
<small>
{% if request.session.get('admin_role') == 'superadmin' %}
<span class="badge bg-danger">전체 관리자</span>
{% else %}
<span class="badge bg-warning text-dark">일반 관리자</span>
{% endif %}
</small>
</div>
<a href="{{ request.url_for('admin:logout') }}" class="btn btn-secondary btn-icon">
<i class="fa fa-sign-out"></i>
<span>Logout</span>
</a>
{% endif %}
</div>
</aside>
<div class="page-wrapper">
<div class="container-fluid">
<div class="page-header d-print-none">
{% block content_header %}
<div class="row align-items-center">
<div class="col">
<h2 class="page-title">{{ title }}</h2>
<div class="page-pretitle">{{ subtitle }}</div>
</div>
</div>
{% endblock %}
</div>
</div>
<div class="page-body flex-grow-1">
<div class="container-fluid">
<div class="row row-deck row-cards">
{% block content %} {% endblock %}
</div>
</div>
</div>
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,299 @@
{% extends "sqladmin/layout.html" %}
{% block content %}
<div class="container-fluid">
<div class="row">
<div class="col-12">
<div class="d-flex">
<div class="flex-grow-1 me-2">
<div class="card">
<div class="card-header">
<h3 class="card-title">{{ model_view.name_plural }}</h3>
<div class="ms-auto">
{% if model_view.can_export %}
{% if model_view.export_types | length > 1 %}
<div class="ms-3 d-inline-block dropdown">
<a href="#" class="btn btn-secondary dropdown-toggle" id="dropdownMenuButton1" data-bs-toggle="dropdown"
aria-expanded="false">
Export
</a>
<ul class="dropdown-menu" aria-labelledby="dropdownMenuButton1">
{% for export_type in model_view.export_types %}
<li><a class="dropdown-item"
href="{{ url_for('admin:export', identity=model_view.identity, export_type=export_type) }}">{{
export_type | upper }}</a></li>
{% endfor %}
</ul>
</div>
{% elif model_view.export_types | length == 1 %}
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:export', identity=model_view.identity, export_type=model_view.export_types[0]) }}"
class="btn btn-secondary">
Export
</a>
</div>
{% endif %}
{% endif %}
{% if model_view.can_create %}
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:create', identity=model_view.identity) }}" class="btn btn-primary">
+ New {{ model_view.name }}
</a>
</div>
{% endif %}
</div>
</div>
<div class="card-body border-bottom py-3">
<div class="d-flex justify-content-between">
<div class="dropdown col-4">
<button {% if not model_view.can_delete and not model_view._custom_actions_in_list %} disabled {% endif %}
class="btn btn-light dropdown-toggle" type="button" id="dropdownMenuButton" data-toggle="dropdown"
aria-haspopup="true" aria-expanded="false">
Actions
</button>
{% if model_view.can_delete or model_view._custom_actions_in_list %}
<div class="dropdown-menu" aria-labelledby="dropdownMenuButton">
{% if model_view.can_delete and request.session.get('admin_role') == 'superadmin' %}
<a class="dropdown-item" id="action-delete" href="#" data-name="{{ model_view.name }}"
data-url="{{ url_for('admin:delete', identity=model_view.identity) }}" data-bs-toggle="modal"
data-bs-target="#modal-delete">Delete selected items</a>
{% endif %}
{% for custom_action, label in model_view._custom_actions_in_list.items() %}
{% if custom_action in model_view._custom_actions_confirmation %}
<a class="dropdown-item" id="action-customconfirm-{{ custom_action }}" href="#" data-bs-toggle="modal"
data-bs-target="#modal-confirmation-{{ custom_action }}">
{{ label }}
</a>
{% else %}
<a class="dropdown-item" id="action-custom-{{ custom_action }}" href="#"
data-url="{{ model_view._url_for_action(request, custom_action) }}">
{{ label }}
</a>
{% endif %}
{% endfor %}
</div>
{% endif %}
</div>
{% if model_view.column_searchable_list %}
<div class="col-md-4 text-muted">
<div class="input-group">
<input id="search-input" type="text" class="form-control"
placeholder="Search: {{ model_view.search_placeholder() }}"
value="{{ request.query_params.get('search', '') }}">
<button id="search-button" class="btn" type="button">Search</button>
<button id="search-reset" class="btn" type="button" {% if not request.query_params.get('search')
%}disabled{% endif %}><i class="fa-solid fa-times"></i></button>
</div>
</div>
{% endif %}
</div>
</div>
<div class="table-responsive">
<table class="table card-table table-vcenter text-nowrap">
<thead>
<tr>
<th class="w-1"><input class="form-check-input m-0 align-middle" type="checkbox" aria-label="Select all"
id="select-all"></th>
<th class="w-1"></th>
{% for name in model_view._list_prop_names %}
{% set label = model_view._column_labels.get(name, name) %}
<th>
{% if name in model_view._sort_fields %}
{% if request.query_params.get("sortBy") == name and request.query_params.get("sort") == "asc" %}
<a href="{{ request.url.include_query_params(sort='desc') }}"><i class="fa-solid fa-arrow-up"></i> {{
label }}</a>
{% elif request.query_params.get("sortBy") == name and request.query_params.get("sort") == "desc" %}
<a href="{{ request.url.include_query_params(sort='asc') }}"><i class="fa-solid fa-arrow-down"></i> {{ label
}}</a>
{% else %}
<a href="{{ request.url.include_query_params(sortBy=name, sort='asc') }}">{{ label }}</a>
{% endif %}
{% else %}
{{ label }}
{% endif %}
</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for row in pagination.rows %}
<tr>
<td>
<input type="hidden" value="{{ get_object_identifier(row) }}">
<input class="form-check-input m-0 align-middle select-box" type="checkbox" aria-label="Select item">
</td>
<td class="text-end">
{% if model_view.can_view_details %}
<a href="{{ model_view._build_url_for('admin:details', request, row) }}" data-bs-toggle="tooltip"
data-bs-placement="top" title="View">
<span class="me-1"><i class="fa-solid fa-eye"></i></span>
</a>
{% endif %}
{% if model_view.can_edit and request.session.get('admin_role') == 'superadmin' %}
<a href="{{ model_view._build_url_for('admin:edit', request, row) }}" data-bs-toggle="tooltip"
data-bs-placement="top" title="Edit">
<span class="me-1"><i class="fa-solid fa-pen-to-square"></i></span>
</a>
{% endif %}
{% if model_view.can_delete and request.session.get('admin_role') == 'superadmin' %}
<a href="#" data-name="{{ model_view.name }}" data-pk="{{ get_object_identifier(row) }}"
data-url="{{ model_view._url_for_delete(request, row) }}" data-bs-toggle="modal"
data-bs-target="#modal-delete" title="Delete">
<span class="me-1"><i class="fa-solid fa-trash"></i></span>
</a>
{% endif %}
</td>
{% for name in model_view._list_prop_names %}
{% set value, formatted_value = model_view.get_list_value(row, name) %}
{% if name in model_view._relation_names %}
{% if is_list( value ) %}
<td>
{% for elem, formatted_elem in zip(value, formatted_value) %}
{% if model_view.show_compact_lists %}
<a href="{{ model_view._build_url_for('admin:details', request, elem) }}">({{ formatted_elem }})</a>
{% else %}
<a href="{{ model_view._build_url_for('admin:details', request, elem) }}">{{ formatted_elem }}</a><br/>
{% endif %}
{% endfor %}
</td>
{% else %}
<td><a href="{{ model_view._url_for_details_with_prop(request, row, name) }}">{{ formatted_value }}</a></td>
{% endif %}
{% else %}
<td>{{ formatted_value }}</td>
{% endif %}
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
</div>
<div class="card-footer d-flex justify-content-between align-items-center gap-2">
<p class="m-0 text-muted">Showing <span>{{ ((pagination.page - 1) * pagination.page_size) + 1 }}</span> to
<span>{{ min(pagination.page * pagination.page_size, pagination.count) }}</span> of <span>{{ pagination.count
}}</span> items
</p>
<ul class="pagination m-0 ms-auto">
<li class="page-item {% if not pagination.has_previous %}disabled{% endif %}">
{% if pagination.has_previous %}
<a class="page-link" href="{{ pagination.previous_page.url }}">
{% else %}
<a class="page-link" href="#">
{% endif %}
<i class="fa-solid fa-chevron-left"></i>
prev
</a>
</li>
{% for page_control in pagination.page_controls %}
<li class="page-item {% if page_control.number == pagination.page %}active{% endif %}"><a class="page-link"
href="{{ page_control.url }}">{{ page_control.number }}</a></li>
{% endfor %}
<li class="page-item {% if not pagination.has_next %}disabled{% endif %}">
{% if pagination.has_next %}
<a class="page-link" href="{{ pagination.next_page.url }}">
{% else %}
<a class="page-link" href="#">
{% endif %}
next
<i class="fa-solid fa-chevron-right"></i>
</a>
</li>
</ul>
<div class="dropdown text-muted">
Show
<a href="#" class="btn btn-sm btn-light dropdown-toggle" data-toggle="dropdown" aria-haspopup="true"
aria-expanded="false">
{{ request.query_params.get("pageSize") or model_view.page_size }} / Page
</a>
<div class="dropdown-menu">
{% for page_size_option in model_view.page_size_options %}
<a class="dropdown-item" href="{{ request.url.include_query_params(pageSize=page_size_option, page=pagination.resize(page_size_option).page) }}">
{{ page_size_option }} / Page
</a>
{% endfor %}
</div>
</div>
</div>
</div>
</div>
{% if model_view.get_filters() %}
<div class="col-md-3" style="width: 300px; flex-shrink: 0;">
<div id="filter-sidebar" class="card">
<div class="card-header">
<h3 class="card-title">Filters</h3>
</div>
<div class="card-body">
{% for filter in model_view.get_filters() %}
{% if filter.has_operator %}
<div class="mb-3">
<div class="fw-bold text-truncate">{{ filter.title }}</div>
<div>
<!-- Show current filter if active -->
{% set current_filter = request.query_params.get(filter.parameter_name, '') %}
{% set current_op = request.query_params.get(filter.parameter_name + '_op', '') %}
{% if current_filter %}
<div class="mb-2 text-muted small">
Current: {{ current_op }} {{ current_filter }}
<a href="{{ request.url.remove_query_params(filter.parameter_name).remove_query_params(filter.parameter_name + '_op') }}" class="text-decoration-none">[Clear]</a>
</div>
{% endif %}
<!-- Single form with dropdown for operations -->
<form method="get" class="d-flex flex-column" style="gap: 8px;">
<!-- Preserve existing query parameters -->
{% for key, value in request.query_params.items() %}
{% if key != filter.parameter_name and key != filter.parameter_name + '_op' %}
<input type="hidden" name="{{ key }}" value="{{ value }}">
{% endif %}
{% endfor %}
<!-- Operation dropdown -->
<select name="{{ filter.parameter_name }}_op" class="form-select form-select-sm" required>
<option value="">Select operation...</option>
{% for op_value, op_label in filter.get_operation_options_for_model(model_view.model) %}
<option value="{{ op_value }}" {% if current_op == op_value %}selected{% endif %}>{{ op_label }}</option>
{% endfor %}
</select>
<!-- Value input -->
<input type="text"
name="{{ filter.parameter_name }}"
placeholder="Enter value"
class="form-control form-control-sm"
value="{{ current_filter }}"
required>
<button type="submit" class="btn btn-sm btn-outline-primary">Apply Filter</button>
</form>
</div>
</div>
{% else %}
<!-- Fallback for other filter types -->
<div class="mb-3">
<div class="fw-bold text-truncate">{{ filter.title }}</div>
<div>
{% for lookup in filter.lookups(request, model_view.model, model_view._run_arbitrary_query) %}
<a href="{{ request.url.include_query_params(**{filter.parameter_name: lookup[0]}) }}" class="d-block text-decoration-none text-truncate">
{{ lookup[1] }}
</a>
{% endfor %}
</div>
</div>
{% endif %}
{% endfor %}
</div>
</div>
</div>
{% endif %}
</div>
</div>
</div>
{% if model_view.can_delete %}
{% include 'sqladmin/modals/delete.html' %}
{% endif %}
{% for custom_action in model_view._custom_actions_in_list %}
{% if custom_action in model_view._custom_actions_confirmation %}
{% with confirmation_message = model_view._custom_actions_confirmation[custom_action], custom_action=custom_action,
url=model_view._url_for_action(request, custom_action) %}
{% include 'sqladmin/modals/list_action_confirmation.html' %}
{% endwith %}
{% endif %}
{% endfor %}
</div>
{% endblock %}

41
app/backoffice/mixins.py Normal file
View File

@ -0,0 +1,41 @@
from starlette.requests import Request
class SuperAdminOnly:
"""superadmin만 접근 가능 (편집/삭제/액션 모두 허용)"""
def is_accessible(self, request: Request) -> bool:
return request.session.get("admin_role") == "superadmin"
class ViewerReadOnly:
"""viewer만 접근 가능한 읽기 전용 뷰"""
can_create = False
can_edit = False
can_delete = False
def is_accessible(self, request: Request) -> bool:
return request.session.get("admin_role") == "viewer"
class ViewerAccessible:
"""superadmin + viewer 접근 가능, 읽기 전용"""
can_create = False
can_edit = False
can_delete = False
def is_accessible(self, request: Request) -> bool:
return request.session.get("admin_role") in ("superadmin", "viewer")
class SuperAdminEditable:
"""superadmin + viewer 접근 가능, superadmin만 편집"""
can_create = False
can_edit = False
can_delete = False
def is_accessible(self, request: Request) -> bool:
return request.session.get("admin_role") in ("superadmin", "viewer")

View File

@ -0,0 +1,114 @@
import logging
from typing import Optional
from sqlalchemy import select
from starlette.requests import Request
from starlette.responses import RedirectResponse
from app.credit.models import CreditTransactionType
from app.credit.services.credit_service import charge_credit, deduct_credit
from app.database.session import AsyncSessionLocal
from app.user.models import User
logger = logging.getLogger(__name__)
async def _get_users_by_pks(session, pks: str) -> list[User]:
ids = [int(pk) for pk in pks.split(",") if pk.strip()]
if not ids:
return []
result = await session.execute(select(User).where(User.id.in_(ids)))
return list(result.scalars().all())
async def handle_block_users(request: Request, identity: str, block: bool) -> RedirectResponse:
pks = request.query_params.get("pks", "")
action_str = "차단" if block else "차단 해제"
async with AsyncSessionLocal() as session:
users = await _get_users_by_pks(session, pks)
for user in users:
user.is_active = not block
await session.commit()
logger.info(f"[USER-ADMIN] {action_str} count={len(users)}")
return RedirectResponse(request.url_for("admin:list", identity=identity), status_code=302)
async def handle_set_role(request: Request, identity: str, role: str) -> RedirectResponse:
pks = request.query_params.get("pks", "")
is_admin = role == "admin"
async with AsyncSessionLocal() as session:
users = await _get_users_by_pks(session, pks)
for user in users:
user.role = role
user.is_admin = is_admin
await session.commit()
logger.info(f"[USER-ADMIN] set_role role={role} count={len(users)}")
return RedirectResponse(request.url_for("admin:list", identity=identity), status_code=302)
async def handle_grant_credits(
request: Request,
identity: str,
amount: int,
admin_id: Optional[int],
) -> RedirectResponse:
pks = request.query_params.get("pks", "")
ids = [int(pk) for pk in pks.split(",") if pk.strip()]
async with AsyncSessionLocal() as session:
for user_id in ids:
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
continue
try:
await charge_credit(
session=session,
user_uuid=user.user_uuid,
amount=amount,
type=CreditTransactionType.ADMIN_ADJUST,
reason="관리자 수동 충전",
admin_id=admin_id,
)
await session.commit()
except Exception as e:
await session.rollback()
logger.warning(f"[USER-ADMIN] grant_credits failed user_id={user_id} error={e}")
return RedirectResponse(request.url_for("admin:list", identity=identity), status_code=302)
async def handle_deduct_credits(
request: Request,
identity: str,
amount: int,
admin_id: Optional[int],
) -> RedirectResponse:
pks = request.query_params.get("pks", "")
ids = [int(pk) for pk in pks.split(",") if pk.strip()]
async with AsyncSessionLocal() as session:
for user_id in ids:
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
continue
try:
await deduct_credit(
session=session,
user_uuid=user.user_uuid,
amount=amount,
type=CreditTransactionType.ADMIN_ADJUST,
reason="관리자 수동 차감",
admin_id=admin_id,
)
await session.commit()
except Exception as e:
await session.rollback()
logger.warning(f"[USER-ADMIN] deduct_credits failed user_id={user_id} error={e}")
return RedirectResponse(request.url_for("admin:list", identity=identity), status_code=302)

0
app/credit/__init__.py Normal file
View File

View File

View File

View File

View File

@ -0,0 +1,162 @@
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.credit.exceptions import ChargeRequestForbiddenError, ChargeRequestNotFoundError
from app.credit.models import ChargeRequestStatus, CreditChargeRequest, CreditTransaction
from app.credit.schemas.credit_schema import (
ChargeRequestCreate,
ChargeRequestListResponse,
ChargeRequestResponse,
CreditTransactionListResponse,
CreditTransactionResponse,
)
from app.database.session import get_session
from app.user.dependencies.auth import get_current_user
from app.user.models import User
router = APIRouter(prefix="/credits", tags=["Credits"])
@router.post(
"/charge-requests",
response_model=ChargeRequestResponse,
status_code=201,
summary="크레딧 충전 요청 제출",
)
async def create_charge_request(
body: ChargeRequestCreate,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> ChargeRequestResponse:
charge_request = CreditChargeRequest(
user_uuid=current_user.user_uuid,
requested_amount=body.requested_amount,
message=body.message,
)
session.add(charge_request)
await session.commit()
await session.refresh(charge_request)
return ChargeRequestResponse.model_validate(charge_request)
@router.get(
"/charge-requests",
response_model=ChargeRequestListResponse,
summary="내 충전 요청 목록",
)
async def list_charge_requests(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> ChargeRequestListResponse:
offset = (page - 1) * page_size
total_result = await session.execute(
select(func.count()).where(CreditChargeRequest.user_uuid == current_user.user_uuid)
)
total = total_result.scalar_one()
items_result = await session.execute(
select(CreditChargeRequest)
.where(CreditChargeRequest.user_uuid == current_user.user_uuid)
.order_by(CreditChargeRequest.created_at.desc())
.offset(offset)
.limit(page_size)
)
items = items_result.scalars().all()
return ChargeRequestListResponse(
items=[ChargeRequestResponse.model_validate(i) for i in items],
total=total,
page=page,
page_size=page_size,
)
@router.get(
"/charge-requests/{request_id}",
response_model=ChargeRequestResponse,
summary="내 충전 요청 상세",
)
async def get_charge_request(
request_id: int,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> ChargeRequestResponse:
result = await session.execute(
select(CreditChargeRequest).where(
CreditChargeRequest.id == request_id,
CreditChargeRequest.user_uuid == current_user.user_uuid,
)
)
charge_request = result.scalar_one_or_none()
if charge_request is None:
raise ChargeRequestNotFoundError()
return ChargeRequestResponse.model_validate(charge_request)
@router.delete(
"/charge-requests/{request_id}",
status_code=204,
summary="충전 요청 취소 (pending 상태만)",
)
async def cancel_charge_request(
request_id: int,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> None:
result = await session.execute(
select(CreditChargeRequest).where(CreditChargeRequest.id == request_id)
)
charge_request = result.scalar_one_or_none()
if charge_request is None:
raise ChargeRequestNotFoundError()
if charge_request.user_uuid != current_user.user_uuid:
raise ChargeRequestForbiddenError()
from app.credit.exceptions import InvalidRequestStateError
if charge_request.status != ChargeRequestStatus.PENDING:
raise InvalidRequestStateError("대기 중인 요청만 취소할 수 있습니다.")
charge_request.status = ChargeRequestStatus.CANCELLED
await session.commit()
@router.get(
"/transactions",
response_model=CreditTransactionListResponse,
summary="내 크레딧 거래 이력",
)
async def list_transactions(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> CreditTransactionListResponse:
offset = (page - 1) * page_size
total_result = await session.execute(
select(func.count()).where(CreditTransaction.user_uuid == current_user.user_uuid)
)
total = total_result.scalar_one()
items_result = await session.execute(
select(CreditTransaction)
.where(CreditTransaction.user_uuid == current_user.user_uuid)
.order_by(CreditTransaction.created_at.desc())
.offset(offset)
.limit(page_size)
)
items = items_result.scalars().all()
return CreditTransactionListResponse(
items=[CreditTransactionResponse.model_validate(i) for i in items],
total=total,
page=page,
page_size=page_size,
)

27
app/credit/exceptions.py Normal file
View File

@ -0,0 +1,27 @@
from starlette import status
from app.core.exceptions import FastShipError
class InsufficientCreditError(FastShipError):
"""크레딧이 부족합니다."""
status = status.HTTP_400_BAD_REQUEST
class InvalidRequestStateError(FastShipError):
"""이미 처리된 요청입니다."""
status = status.HTTP_409_CONFLICT
class ChargeRequestNotFoundError(FastShipError):
"""충전 요청을 찾을 수 없습니다."""
status = status.HTTP_404_NOT_FOUND
class ChargeRequestForbiddenError(FastShipError):
"""본인의 충전 요청만 조회할 수 있습니다."""
status = status.HTTP_403_FORBIDDEN

243
app/credit/models.py Normal file
View File

@ -0,0 +1,243 @@
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Optional
from sqlalchemy import BigInteger, DateTime, ForeignKey, Index, Integer, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database.session import Base
if TYPE_CHECKING:
from app.backoffice.admin.models import Admin
from app.user.models import User
class ChargeRequestStatus(str, Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
CANCELLED = "cancelled"
class CreditTransactionType(str, Enum):
CHARGE = "charge"
CONSUME = "consume"
REFUND = "refund"
ADMIN_ADJUST = "admin_adjust"
class CreditChargeRequest(Base):
__tablename__ = "credit_charge_request"
__table_args__ = (
Index("idx_credit_request_user_uuid", "user_uuid"),
Index("idx_credit_request_status", "status"),
Index("idx_credit_request_created_at", "created_at"),
Index("idx_credit_request_status_created", "status", "created_at"),
{
"mysql_engine": "InnoDB",
"mysql_charset": "utf8mb4",
"mysql_collate": "utf8mb4_unicode_ci",
},
)
id: Mapped[int] = mapped_column(
BigInteger,
primary_key=True,
nullable=False,
autoincrement=True,
comment="고유 식별자",
)
user_uuid: Mapped[str] = mapped_column(
String(36),
ForeignKey("user.user_uuid", ondelete="CASCADE"),
nullable=False,
comment="사용자 UUID (user.user_uuid 참조)",
)
requested_amount: Mapped[int] = mapped_column(
Integer,
nullable=False,
comment="요청 크레딧 수량 (양수)",
)
message: Mapped[Optional[str]] = mapped_column(
String(500),
nullable=True,
comment="사용자 요청 메시지",
)
status: Mapped[str] = mapped_column(
String(20),
nullable=False,
default=ChargeRequestStatus.PENDING,
server_default="pending",
comment="처리 상태 (pending/approved/rejected/cancelled)",
)
admin_id: Mapped[Optional[int]] = mapped_column(
BigInteger,
ForeignKey("admin.id", ondelete="SET NULL"),
nullable=True,
comment="처리한 백오피스 관리자 ID",
)
admin_note: Mapped[Optional[str]] = mapped_column(
String(1000),
nullable=True,
comment="관리자 메모",
)
processed_at: Mapped[Optional[datetime]] = mapped_column(
DateTime,
nullable=True,
comment="처리 일시",
)
created_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.now(),
comment="요청 일시",
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.now(),
onupdate=func.now(),
comment="수정 일시",
)
user: Mapped["User"] = relationship(
"User",
foreign_keys=[user_uuid],
primaryjoin="CreditChargeRequest.user_uuid == User.user_uuid",
back_populates="credit_requests",
lazy="noload",
)
transactions: Mapped[list["CreditTransaction"]] = relationship(
"CreditTransaction",
back_populates="charge_request",
lazy="noload",
)
admin: Mapped[Optional["Admin"]] = relationship(
"Admin",
foreign_keys=[admin_id],
primaryjoin="CreditChargeRequest.admin_id == Admin.id",
lazy="selectin",
)
def __repr__(self) -> str:
return (
f"<CreditChargeRequest("
f"id={self.id}, user_uuid='{self.user_uuid}', "
f"amount={self.requested_amount}, status='{self.status}'"
f")>"
)
class CreditTransaction(Base):
__tablename__ = "credit_transaction"
__table_args__ = (
Index("idx_credit_tx_user_uuid", "user_uuid"),
Index("idx_credit_tx_user_uuid_created", "user_uuid", "created_at"),
Index("idx_credit_tx_type", "type"),
Index("idx_credit_tx_related_request", "related_request_id"),
{
"mysql_engine": "InnoDB",
"mysql_charset": "utf8mb4",
"mysql_collate": "utf8mb4_unicode_ci",
},
)
id: Mapped[int] = mapped_column(
BigInteger,
primary_key=True,
nullable=False,
autoincrement=True,
comment="고유 식별자",
)
user_uuid: Mapped[str] = mapped_column(
String(36),
ForeignKey("user.user_uuid", ondelete="CASCADE"),
nullable=False,
comment="사용자 UUID",
)
amount: Mapped[int] = mapped_column(
Integer,
nullable=False,
comment="변경 크레딧 수량 (충전 양수, 차감 음수)",
)
balance_after: Mapped[int] = mapped_column(
Integer,
nullable=False,
comment="변경 직후 잔액",
)
type: Mapped[str] = mapped_column(
String(20),
nullable=False,
comment="변경 유형 (charge/consume/refund/admin_adjust)",
)
reason: Mapped[Optional[str]] = mapped_column(
String(255),
nullable=True,
comment="변경 사유",
)
admin_id: Mapped[Optional[int]] = mapped_column(
BigInteger,
ForeignKey("admin.id", ondelete="SET NULL"),
nullable=True,
comment="처리 관리자 ID (관리자 충전/차감 시)",
)
related_request_id: Mapped[Optional[int]] = mapped_column(
BigInteger,
ForeignKey("credit_charge_request.id", ondelete="SET NULL"),
nullable=True,
comment="연관 충전 요청 ID",
)
created_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.now(),
comment="변경 일시",
)
user: Mapped["User"] = relationship(
"User",
foreign_keys=[user_uuid],
primaryjoin="CreditTransaction.user_uuid == User.user_uuid",
back_populates="credit_transactions",
lazy="noload",
)
charge_request: Mapped[Optional[CreditChargeRequest]] = relationship(
"CreditChargeRequest",
back_populates="transactions",
lazy="noload",
)
admin: Mapped[Optional["Admin"]] = relationship(
"Admin",
foreign_keys=[admin_id],
primaryjoin="CreditTransaction.admin_id == Admin.id",
lazy="selectin",
)
def __repr__(self) -> str:
return (
f"<CreditTransaction("
f"id={self.id}, user_uuid='{self.user_uuid}', "
f"amount={self.amount}, type='{self.type}'"
f")>"
)

View File

View File

@ -0,0 +1,59 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, ConfigDict, Field
class ChargeRequestCreate(BaseModel):
requested_amount: int = Field(..., gt=0, le=10000, description="요청 크레딧 수량")
message: Optional[str] = Field(None, max_length=500, description="요청 메시지")
model_config = {
"json_schema_extra": {
"example": {
"requested_amount": 10,
"message": "크레딧 충전 요청합니다.",
}
}
}
class ChargeRequestResponse(BaseModel):
id: int
user_uuid: str
requested_amount: int
message: Optional[str]
status: str
admin_note: Optional[str]
processed_at: Optional[datetime]
created_at: datetime
updated_at: datetime
model_config = ConfigDict(from_attributes=True)
class ChargeRequestListResponse(BaseModel):
items: list[ChargeRequestResponse]
total: int
page: int
page_size: int
class CreditTransactionResponse(BaseModel):
id: int
user_uuid: str
amount: int
balance_after: int
type: str
reason: Optional[str]
related_request_id: Optional[int]
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class CreditTransactionListResponse(BaseModel):
items: list[CreditTransactionResponse]
total: int
page: int
page_size: int

View File

View File

@ -0,0 +1,195 @@
import logging
from datetime import datetime
from typing import Optional
from config import TIMEZONE
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.credit.exceptions import (
ChargeRequestNotFoundError,
InsufficientCreditError,
InvalidRequestStateError,
)
from app.credit.models import (
ChargeRequestStatus,
CreditChargeRequest,
CreditTransaction,
CreditTransactionType,
)
logger = logging.getLogger(__name__)
async def record_transaction(
*,
session: AsyncSession,
user_uuid: str,
amount: int,
balance_after: int,
type: CreditTransactionType,
reason: Optional[str] = None,
admin_id: Optional[int] = None,
related_request_id: Optional[int] = None,
) -> CreditTransaction:
tx = CreditTransaction(
user_uuid=user_uuid,
amount=amount,
balance_after=balance_after,
type=type,
reason=reason,
admin_id=admin_id,
related_request_id=related_request_id,
)
session.add(tx)
await session.flush()
return tx
async def charge_credit(
*,
session: AsyncSession,
user_uuid: str,
amount: int,
type: CreditTransactionType = CreditTransactionType.CHARGE,
reason: Optional[str] = None,
admin_id: Optional[int] = None,
related_request_id: Optional[int] = None,
) -> CreditTransaction:
from app.user.models import User
result = await session.execute(
select(User).where(User.user_uuid == user_uuid).with_for_update()
)
user = result.scalar_one_or_none()
if user is None:
from app.user.services.auth import UserNotFoundError
raise UserNotFoundError()
user.credits = user.credits + amount
await session.flush()
tx = await record_transaction(
session=session,
user_uuid=user_uuid,
amount=amount,
balance_after=user.credits,
type=type,
reason=reason,
admin_id=admin_id,
related_request_id=related_request_id,
)
logger.info(f"[CREDIT] charge user_uuid={user_uuid} amount=+{amount} balance_after={user.credits}")
return tx
async def deduct_credit(
*,
session: AsyncSession,
user_uuid: str,
amount: int,
type: CreditTransactionType = CreditTransactionType.CONSUME,
reason: Optional[str] = None,
admin_id: Optional[int] = None,
) -> CreditTransaction:
from app.user.models import User
result = await session.execute(
select(User).where(User.user_uuid == user_uuid).with_for_update()
)
user = result.scalar_one_or_none()
if user is None:
from app.user.services.auth import UserNotFoundError
raise UserNotFoundError()
if user.credits < amount:
logger.warning(f"[CREDIT] insufficient credits user_uuid={user_uuid} credits={user.credits} requested={amount}")
raise InsufficientCreditError()
user.credits = user.credits - amount
await session.flush()
tx = await record_transaction(
session=session,
user_uuid=user_uuid,
amount=-amount,
balance_after=user.credits,
type=type,
reason=reason,
admin_id=admin_id,
)
logger.info(f"[CREDIT] deduct user_uuid={user_uuid} amount=-{amount} balance_after={user.credits}")
return tx
async def approve_charge_request(
*,
session: AsyncSession,
request_id: int,
admin_id: int,
admin_note: Optional[str] = None,
) -> CreditChargeRequest:
result = await session.execute(
select(CreditChargeRequest)
.where(CreditChargeRequest.id == request_id)
.with_for_update()
)
charge_request = result.scalar_one_or_none()
if charge_request is None:
raise ChargeRequestNotFoundError()
if charge_request.status != ChargeRequestStatus.PENDING:
logger.warning(f"[CREDIT] approve blocked request_id={request_id} status={charge_request.status}")
raise InvalidRequestStateError()
await charge_credit(
session=session,
user_uuid=charge_request.user_uuid,
amount=charge_request.requested_amount,
type=CreditTransactionType.CHARGE,
reason="충전 요청 승인",
admin_id=admin_id,
related_request_id=request_id,
)
charge_request.status = ChargeRequestStatus.APPROVED
charge_request.admin_id = admin_id
charge_request.admin_note = admin_note
charge_request.processed_at = datetime.now(TIMEZONE)
await session.flush()
logger.info(f"[CREDIT] approved request_id={request_id} admin_id={admin_id} amount={charge_request.requested_amount}")
return charge_request
async def reject_charge_request(
*,
session: AsyncSession,
request_id: int,
admin_id: int,
admin_note: Optional[str] = None,
) -> CreditChargeRequest:
result = await session.execute(
select(CreditChargeRequest)
.where(CreditChargeRequest.id == request_id)
.with_for_update()
)
charge_request = result.scalar_one_or_none()
if charge_request is None:
raise ChargeRequestNotFoundError()
if charge_request.status != ChargeRequestStatus.PENDING:
logger.warning(f"[CREDIT] reject blocked request_id={request_id} status={charge_request.status}")
raise InvalidRequestStateError()
charge_request.status = ChargeRequestStatus.REJECTED
charge_request.admin_id = admin_id
charge_request.admin_note = admin_note
charge_request.processed_at = datetime.now(TIMEZONE)
await session.flush()
logger.info(f"[CREDIT] rejected request_id={request_id} admin_id={admin_id}")
return charge_request

View File

@ -81,8 +81,10 @@ async def create_db_tables():
from app.sns.models import SNSUploadTask # noqa: F401 from app.sns.models import SNSUploadTask # noqa: F401
from app.social.models import SocialUpload # noqa: F401 from app.social.models import SocialUpload # noqa: F401
from app.dashboard.models import Dashboard # noqa: F401 from app.dashboard.models import Dashboard # noqa: F401
from app.backoffice.admin.models import Admin # noqa: F401
from app.credit.models import CreditChargeRequest, CreditTransaction # noqa: F401
# 생성할 테이블 목록 # 생성할 테이블 목록 (FK 순서: 참조 대상 먼저)
tables_to_create = [ tables_to_create = [
User.__table__, User.__table__,
RefreshToken.__table__, RefreshToken.__table__,
@ -98,6 +100,9 @@ async def create_db_tables():
MarketingIntel.__table__, MarketingIntel.__table__,
Dashboard.__table__, Dashboard.__table__,
ImageTag.__table__, ImageTag.__table__,
Admin.__table__,
CreditChargeRequest.__table__,
CreditTransaction.__table__,
] ]
logger.info("Creating database tables...") logger.info("Creating database tables...")

View File

@ -803,6 +803,8 @@ async def tagging_images(
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
for tag, tag_data in zip(null_imts, tag_datas): for tag, tag_data in zip(null_imts, tag_datas):
if isinstance(tag_data, Exception):
continue
tag.img_tag = tag_data.model_dump(mode="json") tag.img_tag = tag_data.model_dump(mode="json")
session.add(tag) session.add(tag)
await session.commit() await session.commit()

View File

@ -239,7 +239,22 @@ async def generate_lyric(
request_start = time.perf_counter() request_start = time.perf_counter()
task_id = request_body.task_id task_id = request_body.task_id
user = (await session.execute(
select(User).where(User.user_uuid == current_user.user_uuid)
)).scalar_one()
if user.credits <= 0:
logger.info(
f"크레딧 부족, user_uuid: {current_user.user_uuid}, credits: {current_user.credits}"
)
return GenerateLyricResponse(
success=False,
task_id=task_id,
lyric=None,
language=request_body.language,
error_message="No credits remaining.",
)
logger.info(f"[generate_lyric] ========== START ==========") logger.info(f"[generate_lyric] ========== START ==========")
logger.info( logger.info(

View File

@ -122,6 +122,7 @@ class SocialUploadHistoryItem(BaseModel):
platform: str = Field(..., description="플랫폼명") platform: str = Field(..., description="플랫폼명")
status: str = Field(..., description="업로드 상태") status: str = Field(..., description="업로드 상태")
title: str = Field(..., description="영상 제목") title: str = Field(..., description="영상 제목")
platform_username: Optional[str] = Field(None, description="플랫폼 채널명")
platform_url: Optional[str] = Field(None, description="플랫폼 영상 URL") platform_url: Optional[str] = Field(None, description="플랫폼 영상 URL")
error_message: Optional[str] = Field(None, description="에러 메시지") error_message: Optional[str] = Field(None, description="에러 메시지")
scheduled_at: Optional[datetime] = Field(None, description="예약 게시 시간") scheduled_at: Optional[datetime] = Field(None, description="예약 게시 시간")

View File

@ -291,6 +291,7 @@ class SocialUploadService:
platform=upload.platform, platform=upload.platform,
status=upload.status, status=upload.status,
title=upload.title, title=upload.title,
platform_username=upload.social_account.platform_data.get("display_name") if upload.social_account and upload.social_account.platform_data else None,
platform_url=upload.platform_url, platform_url=upload.platform_url,
error_message=upload.error_message, error_message=upload.error_message,
scheduled_at=upload.scheduled_at, scheduled_at=upload.scheduled_at,

View File

@ -103,6 +103,22 @@ async def generate_song(
from app.database.session import AsyncSessionLocal from app.database.session import AsyncSessionLocal
request_start = time.perf_counter() request_start = time.perf_counter()
async with AsyncSessionLocal() as session:
user = (await session.execute(
select(User).where(User.user_uuid == current_user.user_uuid)
)).scalar_one()
if user.credits <= 0:
logger.info(f"크레딧 부족, user_uuid: {current_user.user_uuid}, credits: {user.credits}")
return GenerateSongResponse(
success=False,
task_id=task_id,
song_id=None,
message="No credits remaining.",
error_message="No credits remaining.",
)
logger.info( logger.info(
f"[generate_song] START - task_id: {task_id}, " f"[generate_song] START - task_id: {task_id}, "
f"genre: {request_body.genre}, language: {request_body.language}" f"genre: {request_body.genre}, language: {request_body.language}"

View File

@ -23,6 +23,7 @@ logger = logging.getLogger(__name__)
from app.user.dependencies import get_current_user from app.user.dependencies import get_current_user
from app.user.models import RefreshToken, User from app.user.models import RefreshToken, User
from app.user.schemas.user_schema import ( from app.user.schemas.user_schema import (
CreditResponse,
KakaoCodeRequest, KakaoCodeRequest,
KakaoLoginResponse, KakaoLoginResponse,
LoginResponse, LoginResponse,
@ -353,6 +354,22 @@ async def get_me(
return UserResponse.model_validate(current_user) return UserResponse.model_validate(current_user)
@router.get(
"/me/credits",
response_model=CreditResponse,
summary="잔여 크레딧 조회",
description="현재 로그인한 사용자의 잔여 영상 생성 크레딧을 반환합니다.",
responses={
200: {"description": "조회 성공"},
401: {"description": "인증 실패 (토큰 없음/만료)"},
},
)
async def get_my_credits(
current_user: User = Depends(get_current_user),
) -> CreditResponse:
return CreditResponse(credits=current_user.credits)
# ============================================================================= # =============================================================================
# 테스트용 엔드포인트 (DEBUG 모드에서만 main.py에서 라우터가 등록됨) # 테스트용 엔드포인트 (DEBUG 모드에서만 main.py에서 라우터가 등록됨)
# ============================================================================= # =============================================================================

View File

@ -1,20 +1,35 @@
from sqladmin import ModelView import logging
from app.user.models import RefreshToken, SocialAccount, User from sqladmin import ModelView, action
from starlette.requests import Request
from starlette.responses import RedirectResponse
from app.backoffice.mixins import SuperAdminEditable, ViewerAccessible
from app.backoffice.user_view_actions import (
handle_block_users,
handle_deduct_credits,
handle_grant_credits,
)
from app.user.models import SocialAccount, User
logger = logging.getLogger(__name__)
class UserAdmin(ModelView, model=User): class UserAdmin(SuperAdminEditable, ModelView, model=User):
name = "사용자" name = "사용자"
name_plural = "사용자 목록" name_plural = "사용자 목록"
icon = "fa-solid fa-user" icon = "fa-solid fa-user"
category = "사용자 관리" category = "사용자 관리"
page_size = 20 page_size = 30
can_edit = True
can_delete = True
column_list = [ column_list = [
"id", "id",
"kakao_id", "user_uuid",
"email", "email",
"nickname", "nickname",
"credits",
"role", "role",
"is_active", "is_active",
"is_deleted", "is_deleted",
@ -23,7 +38,7 @@ class UserAdmin(ModelView, model=User):
column_details_list = [ column_details_list = [
"id", "id",
"kakao_id", "user_uuid",
"email", "email",
"nickname", "nickname",
"profile_image_url", "profile_image_url",
@ -32,6 +47,7 @@ class UserAdmin(ModelView, model=User):
"name", "name",
"birth_date", "birth_date",
"gender", "gender",
"credits",
"is_active", "is_active",
"is_admin", "is_admin",
"role", "role",
@ -42,16 +58,22 @@ class UserAdmin(ModelView, model=User):
"updated_at", "updated_at",
] ]
form_excluded_columns = [ form_columns = [
"created_at", "nickname",
"updated_at", "email",
"projects", "phone",
"refresh_tokens", "name",
"social_accounts", "birth_date",
"gender",
"credits",
"is_active",
"is_admin",
"role",
"is_deleted",
] ]
column_searchable_list = [ column_searchable_list = [
User.kakao_id, User.user_uuid,
User.email, User.email,
User.nickname, User.nickname,
User.phone, User.phone,
@ -62,9 +84,10 @@ class UserAdmin(ModelView, model=User):
column_sortable_list = [ column_sortable_list = [
User.id, User.id,
User.kakao_id, User.user_uuid,
User.email, User.email,
User.nickname, User.nickname,
User.credits,
User.role, User.role,
User.is_active, User.is_active,
User.is_deleted, User.is_deleted,
@ -73,15 +96,16 @@ class UserAdmin(ModelView, model=User):
column_labels = { column_labels = {
"id": "ID", "id": "ID",
"kakao_id": "카카오 ID", "user_uuid": "UUID",
"email": "이메일", "email": "이메일",
"nickname": "닉네임", "nickname": "닉네임",
"profile_image_url": "프로필 이미지", "profile_image_url": "프로필 이미지",
"thumbnail_image_url": "썸네일 이미지", "thumbnail_image_url": "썸네일 이미지",
"phone": "전화번호", "phone": "전화번호",
"name": "실명", "name": "이름",
"birth_date": "생년월일", "birth_date": "생년월일",
"gender": "성별", "gender": "성별",
"credits": "크레딧",
"is_active": "활성화", "is_active": "활성화",
"is_admin": "관리자", "is_admin": "관리자",
"role": "권한", "role": "권한",
@ -92,71 +116,71 @@ class UserAdmin(ModelView, model=User):
"updated_at": "수정일시", "updated_at": "수정일시",
} }
@action(
name="01_block_user",
label="계정 차단",
confirmation_message="선택한 사용자를 차단하시겠습니까?",
add_in_list=True,
)
async def seq_f_block_user_action(self, request: Request) -> RedirectResponse:
return await handle_block_users(request, self.identity, block=True)
class RefreshTokenAdmin(ModelView, model=RefreshToken): @action(
name = "리프레시 토큰" name="02_unblock_user",
name_plural = "리프레시 토큰 목록" label="차단 해제",
icon = "fa-solid fa-key" confirmation_message="선택한 사용자의 차단을 해제하시겠습니까?",
category = "사용자 관리" add_in_list=True,
page_size = 20 )
async def seq_e_unblock_user_action(self, request: Request) -> RedirectResponse:
return await handle_block_users(request, self.identity, block=False)
column_list = [ @action(
"id", name="03_grant_credits_1",
"user_id", label="크레딧 +1",
"is_revoked", confirmation_message="선택한 사용자에게 크레딧 1개를 충전하시겠습니까?",
"expires_at", add_in_list=True,
"created_at", )
] async def seq_d_grant_credits_1_action(self, request: Request) -> RedirectResponse:
admin_id = request.session.get("admin_id")
return await handle_grant_credits(request, self.identity, amount=1, admin_id=admin_id)
column_details_list = [ @action(
"id", name="04_grant_credits_5",
"user_id", label="크레딧 +5",
"token_hash", confirmation_message="선택한 사용자에게 크레딧 5개를 충전하시겠습니까?",
"expires_at", add_in_list=True,
"is_revoked", )
"created_at", async def seq_c_grant_credits_5_action(self, request: Request) -> RedirectResponse:
"revoked_at", admin_id = request.session.get("admin_id")
"user_agent", return await handle_grant_credits(request, self.identity, amount=5, admin_id=admin_id)
"ip_address",
]
form_excluded_columns = ["created_at", "user"] @action(
name="05_grant_credits_10",
label="크레딧 +10",
confirmation_message="선택한 사용자에게 크레딧 10개를 충전하시겠습니까?",
add_in_list=True,
)
async def seq_b_grant_credits_10_action(self, request: Request) -> RedirectResponse:
admin_id = request.session.get("admin_id")
return await handle_grant_credits(request, self.identity, amount=10, admin_id=admin_id)
column_searchable_list = [ @action(
RefreshToken.user_id, name="06_deduct_credits_1",
RefreshToken.token_hash, label="크레딧 -1",
RefreshToken.ip_address, confirmation_message="선택한 사용자의 크레딧 1개를 차감하시겠습니까?",
] add_in_list=True,
)
column_default_sort = (RefreshToken.created_at, True) async def seq_a_deduct_credits_1_action(self, request: Request) -> RedirectResponse:
admin_id = request.session.get("admin_id")
column_sortable_list = [ return await handle_deduct_credits(request, self.identity, amount=1, admin_id=admin_id)
RefreshToken.id,
RefreshToken.user_id,
RefreshToken.is_revoked,
RefreshToken.expires_at,
RefreshToken.created_at,
]
column_labels = {
"id": "ID",
"user_id": "사용자 ID",
"token_hash": "토큰 해시",
"expires_at": "만료일시",
"is_revoked": "폐기됨",
"created_at": "생성일시",
"revoked_at": "폐기일시",
"user_agent": "User Agent",
"ip_address": "IP 주소",
}
class SocialAccountAdmin(ModelView, model=SocialAccount): class SocialAccountAdmin(ViewerAccessible, ModelView, model=SocialAccount):
name = "소셜 계정" name = "소셜 계정"
name_plural = "소셜 계정 목록" name_plural = "소셜 계정 목록"
icon = "fa-solid fa-share-nodes" icon = "fa-solid fa-share-nodes"
category = "사용자 관리" category = "사용자 관리"
page_size = 20 page_size = 30
column_list = [ column_list = [
"id", "id",
@ -174,8 +198,6 @@ class SocialAccountAdmin(ModelView, model=SocialAccount):
"platform", "platform",
"platform_user_id", "platform_user_id",
"platform_username", "platform_username",
"platform_data",
"scope",
"token_expires_at", "token_expires_at",
"is_active", "is_active",
"is_deleted", "is_deleted",

View File

@ -16,6 +16,7 @@ from app.database.session import Base
if TYPE_CHECKING: if TYPE_CHECKING:
from app.credit.models import CreditChargeRequest, CreditTransaction
from app.home.models import Project from app.home.models import Project
@ -216,6 +217,14 @@ class User(Base):
comment="마지막 로그인 일시", comment="마지막 로그인 일시",
) )
credits: Mapped[int] = mapped_column(
Integer,
nullable=False,
default=3,
server_default="3",
comment="잔여 영상 생성 크레딧",
)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime, DateTime,
nullable=False, nullable=False,
@ -268,6 +277,24 @@ class User(Base):
lazy="selectin", lazy="selectin",
) )
credit_requests: Mapped[List["CreditChargeRequest"]] = relationship(
"CreditChargeRequest",
foreign_keys="CreditChargeRequest.user_uuid",
primaryjoin="User.user_uuid == CreditChargeRequest.user_uuid",
back_populates="user",
cascade="all, delete-orphan",
lazy="noload",
)
credit_transactions: Mapped[List["CreditTransaction"]] = relationship(
"CreditTransaction",
foreign_keys="CreditTransaction.user_uuid",
primaryjoin="User.user_uuid == CreditTransaction.user_uuid",
back_populates="user",
cascade="all, delete-orphan",
lazy="noload",
)
def __repr__(self) -> str: def __repr__(self) -> str:
return ( return (
f"<User(" f"<User("

View File

@ -160,6 +160,22 @@ class LoginResponse(BaseModel):
} }
} }
# =============================================================================
# 크레딧 스키마
# =============================================================================
class CreditResponse(BaseModel):
"""잔여 크레딧 응답"""
credits: int = Field(..., description="영상 생성 크레딧")
model_config = {
"json_schema_extra": {
"example": {
"credits": 3
}
}
}
# ============================================================================= # =============================================================================
# 내부 사용 스키마 (카카오 API 응답 파싱) # 내부 사용 스키마 (카카오 API 응답 파싱)

View File

@ -0,0 +1,24 @@
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from app.credit.exceptions import InsufficientCreditError
from app.credit.models import CreditTransactionType
from app.credit.services.credit_service import deduct_credit
logger = logging.getLogger(__name__)
async def consume_credit(user_uuid: str, session: AsyncSession, *, reason: str = "video generation") -> bool:
"""크레딧 1 차감. 기존 호출처와 시그니처 호환 유지."""
try:
await deduct_credit(
session=session,
user_uuid=user_uuid,
amount=1,
type=CreditTransactionType.CONSUME,
reason=reason,
)
return True
except InsufficientCreditError:
return False

View File

@ -463,6 +463,7 @@ class CreatomateService:
source_elements = template["source"]["elements"] source_elements = template["source"]["elements"]
template_component_data = self.parse_template_component_name(source_elements) template_component_data = self.parse_template_component_name(source_elements)
taged_image_list = [img for img in taged_image_list if img.get("image_tag") is not None]
modifications = {} modifications = {}
for slot_idx, (template_component_name, template_type) in enumerate(template_component_data.items()): for slot_idx, (template_component_name, template_type) in enumerate(template_component_data.items()):
@ -772,7 +773,9 @@ class CreatomateService:
if "animations" not in elem: if "animations" not in elem:
continue continue
for animation in elem["animations"]: for animation in elem["animations"]:
assert animation["time"] == 0 # 0이 아닌 경우 확인 필요 if "reversed" in animation:
continue
assert animation.get("time",0) == 0 # 0이 아닌 경우 확인 필요
if "transition" in animation and animation["transition"]: if "transition" in animation and animation["transition"]:
track_maximum_duration[elem["track"]] -= animation["duration"] track_maximum_duration[elem["track"]] -= animation["duration"]
else: else:

View File

@ -1,4 +1,6 @@
import asyncio import asyncio
import re
from difflib import SequenceMatcher
from playwright.async_api import async_playwright from playwright.async_api import async_playwright
from urllib import parse from urllib import parse
import time import time
@ -95,57 +97,154 @@ patchedGetter.toString();''')
page = self.page page = self.page
await page.goto(url, wait_until=wait_until, timeout=timeout) await page.goto(url, wait_until=wait_until, timeout=timeout)
@staticmethod
def _clean_title(text: str) -> str:
return re.sub(r"<.*?>", "", text).strip()
@staticmethod
def _similarity(a: str, b: str) -> float:
return SequenceMatcher(None, a, b).ratio()
@staticmethod
def _refine_address(address: str) -> str:
"""한국 주소 패턴에서 첫 번째 유효한 주소만 추출한다."""
patterns = [
# 도로명 (정식): 경기도 가평군 운악로 278
re.compile(
r'[가-힣]+(?:특별시|광역시|특별자치시|도|특별자치도|시)\s+'
r'[가-힣\s]+?(?:로|길|대로)\s+\d+(?:-\d+)?'
),
# 지번 (정식): 경기도 가평군 조종면 운악리 278
re.compile(
r'[가-힣]+(?:특별시|광역시|특별자치시|도|특별자치도|시)\s+'
r'[가-힣\s]+?(?:읍|면|동|리|가)\s+\d+(?:-\d+)?'
),
# 도로명 (축약): 경기 가평 운악로 278
re.compile(
r'[가-힣]{1,4}\s+[가-힣]{1,6}\s+'
r'[가-힣\s]+?(?:로|길|대로)\s+\d+(?:-\d+)?'
),
# 지번 (축약): 경기 가평 조종면 운악리 278
re.compile(
r'[가-힣]{1,4}\s+[가-힣]{1,6}\s+'
r'[가-힣\s]+?(?:읍|면|동|리|가)\s+\d+(?:-\d+)?'
),
]
for pattern in patterns:
m = pattern.search(address)
if m:
return m.group().strip()
return address
async def _extract_candidates_from_list_page(self) -> list[dict]:
"""pcmap.place.naver.com iframe HTML에서 place ID와 업체명을 추출한다."""
pcmap_frame = None
for frame in self.page.frames:
if "pcmap.place.naver.com" in frame.url:
pcmap_frame = frame
logger.debug(f"[DEBUG] pcmap frame 발견: {frame.url[:80]}")
break
if not pcmap_frame:
logger.debug("[DEBUG] pcmap frame 없음")
return []
try:
html = await pcmap_frame.content()
except Exception as e:
logger.debug(f"[DEBUG] pcmap frame content 추출 실패: {e}")
return []
# {"id":"11659052","name":"프레지던트 호텔",...} 형태의 JSON 쌍 추출
pair_pattern = re.compile(
r'"id"\s*:\s*"(\d{5,})"[^}]{0,200}?"name"\s*:\s*"([^"]{1,60})"'
r'|"name"\s*:\s*"([^"]{1,60})"[^}]{0,200}?"id"\s*:\s*"(\d{5,})"'
)
seen = {} # place_id → title (순서 보존)
for m in pair_pattern.finditer(html):
if m.group(1): # id 먼저
pid, title = m.group(1), m.group(2)
else: # name 먼저
pid, title = m.group(4), m.group(3)
if pid not in seen:
seen[pid] = title
candidates = [
{"title": title, "place_url": f"https://map.naver.com/p/entry/place/{pid}"}
for pid, title in list(seen.items())[:10]
]
for i, c in enumerate(candidates):
logger.debug(f"[DEBUG] 후보 {i+1}: {c['title']} / {c['place_url']}")
logger.debug(f"[DEBUG] 목록 후보 {len(candidates)}개 추출")
return candidates
async def _try_search(self, address: str, title: str) -> str | None:
"""주어진 주소+업체명으로 검색해서 place URL을 반환한다. 실패 시 None."""
encoded_query = parse.quote(f"{address} {title}".strip())
url = f"https://map.naver.com/p/search/{encoded_query}"
try:
await self.goto_url(url, wait_until="networkidle", timeout=self._timeout * 1000)
except:
if "/place/" in self.page.url:
return self.page.url
logger.error("[ERROR] Can't Finish networkidle")
if "/place/" in self.page.url:
return self.page.url
candidates = await self._extract_candidates_from_list_page()
if candidates:
best = max(
candidates,
key=lambda c: self._similarity(title, self._clean_title(c['title']))
)
best_score = self._similarity(title, self._clean_title(best['title']))
logger.info(
f"[AUTO-SELECT] '{title}''{best['title']}' (score={best_score:.2f}) {best['place_url']}"
)
return best['place_url']
# isCorrectAnswer=true 로 강제 단일결과 재시도 (원본 로직 유지)
correct_url = self.page.url.replace("?", "?isCorrectAnswer=true&")
try:
await self.goto_url(correct_url, wait_until="networkidle", timeout=self._timeout * 1000)
except:
if "/place/" in self.page.url:
return self.page.url
logger.error("[ERROR] Can't Finish networkidle (isCorrectAnswer)")
if "/place/" in self.page.url:
return self.page.url
return None
async def get_place_id_url(self, selected): async def get_place_id_url(self, selected):
count = 0 title = self._clean_title(selected['title'])
get_place_id_url_start = time.perf_counter() address = self._clean_title(selected.get('roadAddress', selected['address']))
while (count <= self._max_retry):
title = selected['title'].replace("<b>", "").replace("</b>", "")
address = selected.get('roadAddress', selected['address']).replace("<b>", "").replace("</b>", "")
encoded_query = parse.quote(f"{address} {title}")
url = f"https://map.naver.com/p/search/{encoded_query}"
wait_first_start = time.perf_counter()
try: # 1차 시도: 원본 주소 + 업체명
await self.goto_url(url, wait_until="networkidle",timeout = self._timeout*1000) logger.debug(f"[DEBUG] 1차 시도 - address: {address}")
except: result = await self._try_search(address, title)
if "/place/" in self.page.url: if result:
return self.page.url return result
logger.error(f"[ERROR] Can't Finish networkidle")
# 2차 시도: 정제 주소 + 업체명
refined = self._refine_address(address)
if refined != address:
logger.info(f"[REFINE] 주소 정제: '{address}''{refined}'")
result = await self._try_search(refined, title)
if result:
return result
wait_first_time = (time.perf_counter() - wait_first_start) * 1000 # 3차 시도: 업체명만으로 검색
logger.info(f"[RETRY] 업체명만으로 재시도: '{title}'")
result = await self._try_search("", title)
if result:
return result
logger.debug(f"[DEBUG] Try {count+1} : Wait for perfect matching : {wait_first_time}ms") logger.error(f"[ERROR] Not found url for {selected}")
return None
if "/place/" in self.page.url:
return self.page.url
logger.debug(f"[DEBUG] Try {count+1} : url place id not found, retry for forced collect answer")
wait_forced_correct_start = time.perf_counter()
url = self.page.url.replace("?","?isCorrectAnswer=true&")
try:
await self.goto_url(url, wait_until="networkidle",timeout = self._timeout*1000)
except:
if "/place/" in self.page.url:
return self.page.url
logger.error(f"[ERROR] Can't Finish networkidle")
wait_forced_correct_time = (time.perf_counter() - wait_forced_correct_start) * 1000
logger.debug(f"[DEBUG] Try {count+1} : Wait for forced isCorrectAnswer flag : {wait_forced_correct_time}ms")
if "/place/" in self.page.url:
return self.page.url
count += 1
logger.error("[ERROR] Not found url for {selected}")
return None # 404
# if (count == self._max_retry / 2):
# raise Exception("Failed to identify place id. loading timeout")
# else:
# raise Exception("Failed to identify place id. item is ambiguous")

View File

@ -851,6 +851,7 @@ async def get_image_tags_by_task_id(task_id: str) -> list[dict]:
.where( .where(
Image.task_id == task_id, Image.task_id == task_id,
Image.is_deleted == False, Image.is_deleted == False,
ImageTag.img_tag.is_not(None),
) )
) )
print(stmt.compile(dialect=mysql.dialect(), compile_kwargs={"literal_binds": True})) print(stmt.compile(dialect=mysql.dialect(), compile_kwargs={"literal_binds": True}))

View File

@ -13,6 +13,7 @@ from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from app.database.session import BackgroundSessionLocal from app.database.session import BackgroundSessionLocal
from app.user.services.credit import consume_credit
from app.video.models import Video from app.video.models import Video
from app.utils.upload_blob_as_request import AzureBlobUploader from app.utils.upload_blob_as_request import AzureBlobUploader
from app.utils.logger import get_logger from app.utils.logger import get_logger
@ -154,6 +155,12 @@ async def download_and_upload_video_to_blob(
# Video 테이블 업데이트 (creatomate_render_id로 특정 Video 식별) # Video 테이블 업데이트 (creatomate_render_id로 특정 Video 식별)
await _update_video_status(task_id, "completed", blob_url, creatomate_render_id) await _update_video_status(task_id, "completed", blob_url, creatomate_render_id)
# 영상 생성 완료 시 크레딧 1 차감 (credits > 0 조건으로 음수 방지)
async with BackgroundSessionLocal() as session:
await consume_credit(user_uuid, session)
await session.commit()
logger.info(f"[download_and_upload_video_to_blob] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}") logger.info(f"[download_and_upload_video_to_blob] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}")
except httpx.HTTPError as e: except httpx.HTTPError as e:

View File

@ -31,6 +31,8 @@ class ProjectSettings(BaseSettings):
VERSION: str = Field(default="0.1.0") VERSION: str = Field(default="0.1.0")
DESCRIPTION: str = Field(default="FastAPI 기반 CastAD 프로젝트") DESCRIPTION: str = Field(default="FastAPI 기반 CastAD 프로젝트")
ADMIN_BASE_URL: str = Field(default="/admin") ADMIN_BASE_URL: str = Field(default="/admin")
ADMIN_SESSION_SECRET: str = Field(default="dev-secret-change-me-in-production")
ADMIN_SESSION_MAX_AGE: int = Field(default=60 * 60 * 8)
DEBUG: bool = Field(default=True) DEBUG: bool = Field(default=True)
TIMEZONE: str = Field( TIMEZONE: str = Field(
default="Asia/Seoul", default="Asia/Seoul",

View File

@ -0,0 +1,8 @@
-- ============================================================
-- Migration: user 테이블에 credits 컬럼 추가
-- Date: 2026-04-28
-- Description: 사용자 크레딧 시스템 도입
-- ============================================================
ALTER TABLE `user`
ADD COLUMN credits INT NOT NULL DEFAULT 3 COMMENT '영상 생성 크레딧';

View File

@ -0,0 +1,21 @@
-- ============================================================
-- Migration: 백오피스 관리자 계정 테이블 추가
-- Date: 2026-04-29
-- Description: SQLAdmin 백오피스 전용 admin 계정 테이블 신설
-- ============================================================
CREATE TABLE IF NOT EXISTS `admin` (
id BIGINT NOT NULL AUTO_INCREMENT COMMENT '고유 식별자',
username VARCHAR(50) NOT NULL COMMENT '로그인 ID',
password VARCHAR(255) NOT NULL COMMENT '비밀번호',
name VARCHAR(50) NULL COMMENT '표시 이름',
role VARCHAR(20) NOT NULL DEFAULT 'superadmin' COMMENT '권한 (superadmin: 전체, viewer: 조회만)',
is_active TINYINT(1) NOT NULL DEFAULT 1 COMMENT '활성화 상태 (0: 비활성화)',
last_login_at DATETIME NULL COMMENT '마지막 로그인 일시',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '생성 일시',
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '수정 일시',
PRIMARY KEY (id),
UNIQUE INDEX idx_admin_username (username),
INDEX idx_admin_is_active (is_active)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='백오피스 관리자 계정';

View File

@ -0,0 +1,49 @@
-- ============================================================
-- Migration: 크레딧 충전 요청 / 거래 이력 테이블 추가
-- Date: 2026-04-29
-- Description: 백오피스 크레딧 워크플로우 도입
-- 선행 조건: migration_2026_04_29_add_admin_table.sql 먼저 실행
-- ============================================================
CREATE TABLE IF NOT EXISTS credit_charge_request (
id BIGINT NOT NULL AUTO_INCREMENT COMMENT '고유 식별자',
user_uuid VARCHAR(36) NOT NULL COMMENT '사용자 UUID (user.user_uuid 참조)',
requested_amount INT NOT NULL COMMENT '요청 크레딧 수량 (양수)',
message VARCHAR(500) NULL COMMENT '사용자 요청 메시지',
status VARCHAR(20) NOT NULL DEFAULT 'pending' COMMENT '처리 상태 (pending/approved/rejected/cancelled)',
admin_id BIGINT NULL COMMENT '처리한 백오피스 관리자 ID',
admin_note VARCHAR(1000) NULL COMMENT '관리자 메모',
processed_at DATETIME NULL COMMENT '처리 일시',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '요청 일시',
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '수정 일시',
PRIMARY KEY (id),
CONSTRAINT fk_charge_request_user FOREIGN KEY (user_uuid) REFERENCES `user`(user_uuid) ON DELETE CASCADE,
CONSTRAINT fk_charge_request_admin FOREIGN KEY (admin_id) REFERENCES `admin`(id) ON DELETE SET NULL,
INDEX idx_credit_request_user_uuid (user_uuid),
INDEX idx_credit_request_status (status),
INDEX idx_credit_request_created_at (created_at),
INDEX idx_credit_request_status_created (status, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='사용자 크레딧 충전 요청';
CREATE TABLE IF NOT EXISTS credit_transaction (
id BIGINT NOT NULL AUTO_INCREMENT COMMENT '고유 식별자',
user_uuid VARCHAR(36) NOT NULL COMMENT '사용자 UUID',
amount INT NOT NULL COMMENT '변경 크레딧 수량 (충전 양수, 차감 음수)',
balance_after INT NOT NULL COMMENT '변경 후 잔액',
type VARCHAR(20) NOT NULL COMMENT '변경 유형 (charge/consume/refund/admin_adjust)',
reason VARCHAR(255) NULL COMMENT '변경 사유',
admin_id BIGINT NULL COMMENT '처리 관리자 ID',
related_request_id BIGINT NULL COMMENT '연관 충전 요청 ID',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '변경 일시',
PRIMARY KEY (id),
CONSTRAINT fk_credit_tx_user FOREIGN KEY (user_uuid) REFERENCES `user`(user_uuid) ON DELETE CASCADE,
CONSTRAINT fk_credit_tx_admin FOREIGN KEY (admin_id) REFERENCES `admin`(id) ON DELETE SET NULL,
CONSTRAINT fk_credit_tx_request FOREIGN KEY (related_request_id) REFERENCES credit_charge_request(id) ON DELETE SET NULL,
INDEX idx_credit_tx_user_uuid (user_uuid),
INDEX idx_credit_tx_user_uuid_created (user_uuid, created_at),
INDEX idx_credit_tx_type (type),
INDEX idx_credit_tx_related_request (related_request_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='크레딧 변경 이력';

View File

@ -24,6 +24,7 @@ from app.social.api.routers.v1.oauth import router as social_oauth_router
from app.social.api.routers.v1.upload import router as social_upload_router from app.social.api.routers.v1.upload import router as social_upload_router
from app.social.api.routers.v1.seo import router as social_seo_router from app.social.api.routers.v1.seo import router as social_seo_router
from app.social.api.routers.v1.internal import router as social_internal_router from app.social.api.routers.v1.internal import router as social_internal_router
from app.credit.api.routers.v1.credit import router as credit_router
from app.utils.cors import CustomCORSMiddleware from app.utils.cors import CustomCORSMiddleware
from config import prj_settings from config import prj_settings
@ -395,6 +396,7 @@ app.include_router(social_seo_router, prefix="/social") # Social Upload 라우
app.include_router(social_internal_router) # 내부 스케줄러 전용 라우터 app.include_router(social_internal_router) # 내부 스케줄러 전용 라우터
app.include_router(sns_router) # SNS API 라우터 추가 app.include_router(sns_router) # SNS API 라우터 추가
app.include_router(dashboard_router) # Dashboard API 라우터 추가 app.include_router(dashboard_router) # Dashboard API 라우터 추가
app.include_router(credit_router, prefix="/user") # Credit API 라우터 추가
# DEBUG 모드에서만 테스트 라우터 등록 # DEBUG 모드에서만 테스트 라우터 등록
if prj_settings.DEBUG: if prj_settings.DEBUG: