Compare commits

..

No commits in common. "7426286fa6f5587fd60f0dad7a94e6188513e6aa" and "c705ce40f8651284feae56b868fc8ca142df3b52" have entirely different histories.

26 changed files with 1613 additions and 1402 deletions

View File

@ -4,22 +4,43 @@ Dashboard API 라우터
YouTube Analytics 기반 대시보드 통계를 제공합니다.
"""
import json
import logging
from datetime import date, datetime, timedelta
from typing import Literal
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dashboard.utils.redis_cache import delete_cache_pattern
from app.dashboard.schemas import (
CacheDeleteResponse,
ConnectedAccountsResponse,
DashboardResponse,
from app.dashboard.exceptions import (
YouTubeAccountNotConnectedError,
YouTubeAccountNotFoundError,
YouTubeAccountSelectionRequiredError,
YouTubeTokenExpiredError,
)
from app.dashboard.schemas import (
AudienceData,
CacheDeleteResponse,
ConnectedAccount,
ConnectedAccountsResponse,
ContentMetric,
DashboardResponse,
TopContent,
)
from app.dashboard.services import DataProcessor, YouTubeAnalyticsService
from app.dashboard.redis_cache import (
delete_cache,
delete_cache_pattern,
get_cache,
set_cache,
)
from app.dashboard.services import DashboardService
from app.database.session import get_session
from app.dashboard.models import Dashboard
from app.social.exceptions import TokenExpiredError
from app.social.services import SocialAccountService
from app.user.dependencies.auth import get_current_user
from app.user.models import User
from app.user.models import SocialAccount, User
logger = logging.getLogger(__name__)
@ -40,8 +61,41 @@ async def get_connected_accounts(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> ConnectedAccountsResponse:
service = DashboardService()
connected = await service.get_connected_accounts(current_user, session)
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
accounts_raw = result.scalars().all()
# platform_user_id 기준
seen_platform_ids: set[str] = set()
connected = []
for acc in sorted(
accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id in seen_platform_ids:
continue
seen_platform_ids.add(acc.platform_user_id)
data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
connected.append(
ConnectedAccount(
id=acc.id,
platform=acc.platform,
platform_username=acc.platform_username,
platform_user_id=acc.platform_user_id,
channel_title=data.get("channel_title"),
connected_at=acc.connected_at,
is_active=acc.is_active,
)
)
logger.info(
f"[ACCOUNTS] YouTube 계정 목록 조회 - "
f"user_uuid={current_user.user_uuid}, count={len(connected)}"
)
return ConnectedAccountsResponse(accounts=connected)
@ -88,8 +142,328 @@ async def get_dashboard_stats(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> DashboardResponse:
service = DashboardService()
return await service.get_stats(mode, platform_user_id, current_user, session)
"""
대시보드 통계 조회
Args:
mode: 조회 모드 (day: 최근 30, month: 최근 12개월)
platform_user_id: 사용할 YouTube 채널 ID (여러 계정 연결 필수, 재연동해도 불변)
current_user: 현재 인증된 사용자
session: 데이터베이스 세션
Returns:
DashboardResponse: 대시보드 통계 데이터
Raises:
YouTubeAccountNotConnectedError: YouTube 계정이 연동되어 있지 않음
YouTubeAccountSelectionRequiredError: 여러 계정이 연결되어 있으나 계정 미선택
YouTubeAccountNotFoundError: 지정한 계정을 찾을 없음
YouTubeTokenExpiredError: YouTube 토큰 만료 (재연동 필요)
YouTubeAPIError: YouTube Analytics API 호출 실패
"""
logger.info(
f"[DASHBOARD] 통계 조회 시작 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
)
# 1. 모드별 날짜 자동 계산
today = date.today()
if mode == "day":
# 48시간 지연 적용: 오늘 기준 -2일을 end로 사용
# ex) 오늘 2/20 → end=2/18, start=1/20
end_dt = today - timedelta(days=2)
kpi_end_dt = end_dt
start_dt = end_dt - timedelta(days=29)
# 이전 30일 (YouTube API day_previous와 동일 기준)
prev_start_dt = start_dt - timedelta(days=30)
prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
period_desc = "최근 30일"
else: # mode == "month"
# 월별 차트: dimensions=month API는 YYYY-MM-01 형식 필요
# ex) 오늘 2/24 → end=2026-02-01, start=2025-03-01 → 2025-03 ~ 2026-02 (12개월)
end_dt = today.replace(day=1)
# KPI 등 집계형 API: 48시간 지연 적용하여 현재 월 전체 데이터 포함
kpi_end_dt = today - timedelta(days=2)
start_month = end_dt.month - 11
if start_month <= 0:
start_month += 12
start_year = end_dt.year - 1
else:
start_year = end_dt.year
start_dt = date(start_year, start_month, 1)
# 이전 12개월 (YouTube API previous와 동일 기준 — 1년 전)
prev_start_dt = start_dt.replace(year=start_dt.year - 1)
try:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
except ValueError: # 윤년 2/29 → 이전 연도 2/28
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
period_desc = "최근 12개월"
start_date = start_dt.strftime("%Y-%m-%d")
end_date = end_dt.strftime("%Y-%m-%d")
kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
logger.debug(
f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}"
)
# 2. YouTube 계정 연동 확인
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
social_accounts_raw = result.scalars().all()
# platform_user_id 기준으로 중복 제거 (가장 최근 연동 계정 우선)
seen_platform_ids_stats: set[str] = set()
social_accounts = []
for acc in sorted(
social_accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id not in seen_platform_ids_stats:
seen_platform_ids_stats.add(acc.platform_user_id)
social_accounts.append(acc)
if not social_accounts:
logger.warning(
f"[NO YOUTUBE ACCOUNT] YouTube 계정 미연동 - "
f"user_uuid={current_user.user_uuid}"
)
raise YouTubeAccountNotConnectedError()
if platform_user_id is not None:
matched = [a for a in social_accounts if a.platform_user_id == platform_user_id]
if not matched:
logger.warning(
f"[ACCOUNT NOT FOUND] 지정 계정 없음 - "
f"user_uuid={current_user.user_uuid}, platform_user_id={platform_user_id}"
)
raise YouTubeAccountNotFoundError()
social_account = matched[0]
elif len(social_accounts) == 1:
social_account = social_accounts[0]
else:
logger.warning(
f"[MULTI ACCOUNT] 계정 선택 필요 - "
f"user_uuid={current_user.user_uuid}, count={len(social_accounts)}"
)
raise YouTubeAccountSelectionRequiredError()
logger.debug(
f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}"
)
# 3. 기간 내 업로드 영상 수 조회
count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= start_dt,
Dashboard.uploaded_at < today + timedelta(days=1),
)
)
period_video_count = count_result.scalar() or 0
# 이전 기간 업로드 영상 수 조회 (trend 계산용)
prev_count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= prev_start_dt,
Dashboard.uploaded_at <= prev_kpi_end_dt,
)
)
prev_period_video_count = prev_count_result.scalar() or 0
logger.debug(
f"[3] 기간 내 업로드 영상 수 - current={period_video_count}, prev={prev_period_video_count}"
)
# 4. Redis 캐시 조회
# platform_user_id 기준 캐시 키: 재연동해도 채널 ID는 불변 → 캐시 유지됨
cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
cached_raw = await get_cache(cache_key)
if cached_raw:
try:
payload = json.loads(cached_raw)
logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
response = DashboardResponse.model_validate(payload["response"])
for metric in response.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
return response
except (json.JSONDecodeError, KeyError):
logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")
# 5. 최근 30개 업로드 영상 조회 (Analytics API 전달용)
# YouTube Analytics API 제약사항:
# - 영상 개수: 20~30개 권장 (최대 50개, 그 이상은 응답 지연 발생)
# - URL 길이: 2000자 제한 (video ID 11자 × 30개 = 330자로 안전)
result = await session.execute(
select(
Dashboard.platform_video_id,
Dashboard.title,
Dashboard.uploaded_at,
)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
)
.order_by(Dashboard.uploaded_at.desc())
.limit(30)
)
rows = result.all()
logger.debug(f"[5] 영상 조회 완료 - count={len(rows)}")
# 6. video_ids + 메타데이터 조회용 dict 구성
video_ids = []
video_lookup: dict[str, tuple[str, datetime]] = {} # {video_id: (title, uploaded_at)}
for row in rows:
platform_video_id, title, uploaded_at = row
video_ids.append(platform_video_id)
video_lookup[platform_video_id] = (title, uploaded_at)
logger.debug(
f"[6] 영상 메타데이터 구성 완료 - count={len(video_ids)}, sample={video_ids[:3]}"
)
# 6.1 업로드 영상 없음 → YouTube API 호출 없이 빈 응답 반환
if not video_ids:
logger.info(
f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - "
f"user_uuid={current_user.user_uuid}"
)
return DashboardResponse(
content_metrics=[
ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"),
ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"),
ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"),
],
monthly_data=[],
daily_data=[],
top_content=[],
audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
has_uploads=False,
)
# 7. 토큰 유효성 확인 및 자동 갱신 (만료 10분 전 갱신)
try:
access_token = await SocialAccountService().ensure_valid_token(
social_account, session
)
except TokenExpiredError:
logger.warning(
f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}"
)
raise YouTubeTokenExpiredError()
logger.debug("[7] 토큰 유효성 확인 완료")
# 8. YouTube Analytics API 호출 (7개 병렬)
youtube_service = YouTubeAnalyticsService()
raw_data = await youtube_service.fetch_all_metrics(
video_ids=video_ids,
start_date=start_date,
end_date=end_date,
kpi_end_date=kpi_end_date,
access_token=access_token,
mode=mode,
)
logger.debug("[8] YouTube Analytics API 호출 완료")
# 9. TopContent 조립 (Analytics top_videos + DB lookup)
processor = DataProcessor()
top_content_rows = raw_data.get("top_videos", {}).get("rows", [])
top_content: list[TopContent] = []
for row in top_content_rows[:4]:
if len(row) < 4:
continue
video_id, views, likes, comments = row[0], row[1], row[2], row[3]
meta = video_lookup.get(video_id)
if not meta:
continue
title, uploaded_at = meta
engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
top_content.append(
TopContent(
id=video_id,
title=title,
thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
platform="youtube",
views=int(views),
engagement=f"{engagement_rate:.1f}%",
date=uploaded_at.strftime("%Y.%m.%d"),
)
)
logger.debug(f"[9] TopContent 조립 완료 - count={len(top_content)}")
# 10. 데이터 가공 (period_video_count=0 — API 무관 DB 집계값, 캐시에 포함하지 않음)
dashboard_data = processor.process(
raw_data, top_content, 0, mode=mode, end_date=end_date
)
logger.debug("[10] 데이터 가공 완료")
# 11. Redis 캐싱 (TTL: 12시간)
# YouTube Analytics는 하루 1회 갱신 (PT 자정, 한국 시간 오후 5~8시)
# 48시간 지연된 데이터이므로 12시간 캐싱으로 API 호출 최소화
# period_video_count는 캐시에 포함하지 않음 (DB 직접 집계, API 미사용)
cache_payload = json.dumps(
{"response": json.loads(dashboard_data.model_dump_json())}
)
cache_success = await set_cache(
cache_key,
cache_payload,
ttl=43200, # 12시간
)
if cache_success:
logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
else:
logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")
# 12. 업로드 영상 수 및 trend 주입 (캐시 저장 후 — 항상 DB에서 직접 집계)
for metric in dashboard_data.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
logger.info(
f"[DASHBOARD] 통계 조회 완료 - "
f"user_uuid={current_user.user_uuid}, "
f"mode={mode}, period={period_desc}, videos={len(video_ids)}"
)
return dashboard_data
@router.delete(
@ -109,7 +483,7 @@ async def get_dashboard_stats(
`dashboard:{user_uuid}:{platform_user_id}:{mode}` (mode: day 또는 month)
## 파라미터
- `user_uuid`: 삭제할 사용자 UUID (필수)
- `user_uuid`: 특정 사용자 캐시만 삭제. 미입력 전체 삭제
- `mode`: day / month / all (기본값: all)
""",
)
@ -118,16 +492,33 @@ async def delete_dashboard_cache(
default="all",
description="삭제할 캐시 모드: day, month, all(기본값, 모두 삭제)",
),
user_uuid: str = Query(
description="대상 사용자 UUID",
user_uuid: str | None = Query(
default=None,
description="대상 사용자 UUID. 미입력 시 전체 사용자 캐시 삭제",
),
) -> CacheDeleteResponse:
if mode == "all":
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*")
message = f"전체 캐시 삭제 완료 ({deleted}개)"
"""
대시보드 캐시 삭제
Args:
mode: 삭제할 캐시 모드 (day / month / all)
user_uuid: 대상 사용자 UUID (없으면 전체 삭제)
Returns:
CacheDeleteResponse: 삭제된 캐시 개수 메시지
"""
if user_uuid:
if mode == "all":
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*")
message = f"전체 캐시 삭제 완료 ({deleted}개)"
else:
cache_key = f"dashboard:{user_uuid}:{mode}"
success = await delete_cache(cache_key)
deleted = 1 if success else 0
message = f"{mode} 캐시 삭제 {'완료' if success else '실패 (키 없음)'}"
else:
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*:{mode}")
message = f"{mode} 캐시 삭제 완료 ({deleted}개)"
deleted = await delete_cache_pattern("dashboard:*")
message = f"전체 사용자 캐시 삭제 완료 ({deleted}개)"
logger.info(
f"[CACHE DELETE] user_uuid={user_uuid or 'ALL'}, mode={mode}, deleted={deleted}"

View File

@ -113,7 +113,7 @@ class YouTubeAccountSelectionRequiredError(DashboardException):
def __init__(self):
super().__init__(
message="연결된 YouTube 계정이 여러 개입니다. platform_user_id 파라미터로 사용할 계정을 선택해주세요.",
message="연결된 YouTube 계정이 여러 개입니다. social_account_id 파라미터로 사용할 계정을 선택해주세요.",
status_code=status.HTTP_400_BAD_REQUEST,
code="YOUTUBE_ACCOUNT_SELECTION_REQUIRED",
)

View File

@ -197,6 +197,35 @@ class AudienceData(BaseModel):
)
# class PlatformMetric(BaseModel):
# """플랫폼별 메트릭 (미사용 — platform_data 기능 미구현)"""
#
# id: str
# label: str
# value: str
# unit: Optional[str] = None
# trend: float
# trend_direction: Literal["up", "down", "-"] = Field(alias="trendDirection")
#
# model_config = ConfigDict(
# alias_generator=to_camel,
# populate_by_name=True,
# )
#
#
# class PlatformData(BaseModel):
# """플랫폼별 데이터 (미사용 — platform_data 기능 미구현)"""
#
# platform: Literal["youtube", "instagram"]
# display_name: str = Field(alias="displayName")
# metrics: list[PlatformMetric]
#
# model_config = ConfigDict(
# alias_generator=to_camel,
# populate_by_name=True,
# )
class DashboardResponse(BaseModel):
"""대시보드 전체 응답
@ -226,6 +255,7 @@ class DashboardResponse(BaseModel):
top_content: list[TopContent] = Field(alias="topContent")
audience_data: AudienceData = Field(alias="audienceData")
has_uploads: bool = Field(default=True, alias="hasUploads")
# platform_data: list[PlatformData] = Field(default=[], alias="platformData") # 미사용
model_config = ConfigDict(
alias_generator=to_camel,

View File

@ -4,12 +4,10 @@ Dashboard Services
YouTube Analytics API 연동 데이터 가공 서비스를 제공합니다.
"""
from app.dashboard.services.dashboard_service import DashboardService
from app.dashboard.services.data_processor import DataProcessor
from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService
__all__ = [
"DashboardService",
"YouTubeAnalyticsService",
"DataProcessor",
]

View File

@ -1,358 +0,0 @@
"""
Dashboard Service
대시보드 비즈니스 로직을 담당합니다.
"""
import json
import logging
from datetime import date, datetime, timedelta
from typing import Literal
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dashboard.exceptions import (
YouTubeAccountNotConnectedError,
YouTubeAccountNotFoundError,
YouTubeAccountSelectionRequiredError,
YouTubeTokenExpiredError,
)
from app.dashboard.models import Dashboard
from app.dashboard.utils.redis_cache import get_cache, set_cache
from app.dashboard.schemas import (
AudienceData,
ConnectedAccount,
ContentMetric,
DashboardResponse,
TopContent,
)
from app.dashboard.services.data_processor import DataProcessor
from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService
from app.social.exceptions import TokenExpiredError
from app.social.services import SocialAccountService
from app.user.models import SocialAccount, User
logger = logging.getLogger(__name__)
class DashboardService:
async def get_connected_accounts(
self,
current_user: User,
session: AsyncSession,
) -> list[ConnectedAccount]:
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
accounts_raw = result.scalars().all()
connected = []
for acc in accounts_raw:
data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
connected.append(
ConnectedAccount(
id=acc.id,
platform=acc.platform,
platform_username=acc.platform_username,
platform_user_id=acc.platform_user_id,
channel_title=data.get("channel_title"),
connected_at=acc.connected_at,
is_active=acc.is_active,
)
)
logger.info(
f"[ACCOUNTS] YouTube 계정 목록 조회 - "
f"user_uuid={current_user.user_uuid}, count={len(connected)}"
)
return connected
def calculate_date_range(
self, mode: Literal["day", "month"]
) -> tuple[date, date, date, date, date, str]:
"""모드별 날짜 범위 계산. (start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc) 반환"""
today = date.today()
if mode == "day":
end_dt = today - timedelta(days=2)
kpi_end_dt = end_dt
start_dt = end_dt - timedelta(days=29)
prev_start_dt = start_dt - timedelta(days=30)
prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
period_desc = "최근 30일"
else:
end_dt = today.replace(day=1)
kpi_end_dt = today - timedelta(days=2)
start_month = end_dt.month - 11
if start_month <= 0:
start_month += 12
start_year = end_dt.year - 1
else:
start_year = end_dt.year
start_dt = date(start_year, start_month, 1)
prev_start_dt = start_dt.replace(year=start_dt.year - 1)
try:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
except ValueError:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
period_desc = "최근 12개월"
return start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc
async def resolve_social_account(
self,
current_user: User,
session: AsyncSession,
platform_user_id: str | None,
) -> SocialAccount:
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
social_accounts_raw = result.scalars().all()
social_accounts = list(social_accounts_raw)
if not social_accounts:
raise YouTubeAccountNotConnectedError()
if platform_user_id is not None:
matched = [a for a in social_accounts if a.platform_user_id == platform_user_id]
if not matched:
raise YouTubeAccountNotFoundError()
return matched[0]
elif len(social_accounts) == 1:
return social_accounts[0]
else:
raise YouTubeAccountSelectionRequiredError()
async def get_video_counts(
self,
current_user: User,
session: AsyncSession,
social_account: SocialAccount,
start_dt: date,
prev_start_dt: date,
prev_kpi_end_dt: date,
) -> tuple[int, int]:
today = date.today()
count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= start_dt,
Dashboard.uploaded_at < today + timedelta(days=1),
)
)
period_video_count = count_result.scalar() or 0
prev_count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= prev_start_dt,
Dashboard.uploaded_at <= prev_kpi_end_dt,
)
)
prev_period_video_count = prev_count_result.scalar() or 0
return period_video_count, prev_period_video_count
async def get_video_ids(
self,
current_user: User,
session: AsyncSession,
social_account: SocialAccount,
) -> tuple[list[str], dict[str, tuple[str, datetime]]]:
result = await session.execute(
select(
Dashboard.platform_video_id,
Dashboard.title,
Dashboard.uploaded_at,
)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
)
.order_by(Dashboard.uploaded_at.desc())
.limit(30)
)
rows = result.all()
video_ids = []
video_lookup: dict[str, tuple[str, datetime]] = {}
for row in rows:
platform_video_id, title, uploaded_at = row
video_ids.append(platform_video_id)
video_lookup[platform_video_id] = (title, uploaded_at)
return video_ids, video_lookup
def build_empty_response(self) -> DashboardResponse:
return DashboardResponse(
content_metrics=[
ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"),
ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"),
ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"),
],
monthly_data=[],
daily_data=[],
top_content=[],
audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
has_uploads=False,
)
def inject_video_count(
self,
response: DashboardResponse,
period_video_count: int,
prev_period_video_count: int,
) -> None:
for metric in response.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
async def get_stats(
self,
mode: Literal["day", "month"],
platform_user_id: str | None,
current_user: User,
session: AsyncSession,
) -> DashboardResponse:
logger.info(
f"[DASHBOARD] 통계 조회 시작 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
)
# 1. 날짜 계산
start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc = (
self.calculate_date_range(mode)
)
start_date = start_dt.strftime("%Y-%m-%d")
end_date = end_dt.strftime("%Y-%m-%d")
kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
logger.debug(f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}")
# 2. YouTube 계정 확인
social_account = await self.resolve_social_account(current_user, session, platform_user_id)
logger.debug(f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}")
# 3. 영상 수 조회
period_video_count, prev_period_video_count = await self.get_video_counts(
current_user, session, social_account, start_dt, prev_start_dt, prev_kpi_end_dt
)
logger.debug(f"[3] 영상 수 - current={period_video_count}, prev={prev_period_video_count}")
# 4. 캐시 조회
cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
cached_raw = await get_cache(cache_key)
if cached_raw:
try:
payload = json.loads(cached_raw)
logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
response = DashboardResponse.model_validate(payload["response"])
self.inject_video_count(response, period_video_count, prev_period_video_count)
return response
except (json.JSONDecodeError, KeyError):
logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")
# 5. 업로드 영상 조회
video_ids, video_lookup = await self.get_video_ids(current_user, session, social_account)
logger.debug(f"[5] 영상 조회 완료 - count={len(video_ids)}")
if not video_ids:
logger.info(f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - user_uuid={current_user.user_uuid}")
return self.build_empty_response()
# 6. 토큰 유효성 확인
try:
access_token = await SocialAccountService().ensure_valid_token(social_account, session)
except TokenExpiredError:
logger.warning(f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}")
raise YouTubeTokenExpiredError()
logger.debug("[6] 토큰 유효성 확인 완료")
# 7. YouTube Analytics API 호출
youtube_service = YouTubeAnalyticsService()
raw_data = await youtube_service.fetch_all_metrics(
video_ids=video_ids,
start_date=start_date,
end_date=end_date,
kpi_end_date=kpi_end_date,
access_token=access_token,
mode=mode,
)
logger.debug("[7] YouTube Analytics API 호출 완료")
# 8. TopContent 조립
processor = DataProcessor()
top_content_rows = raw_data.get("top_videos", {}).get("rows", [])
top_content: list[TopContent] = []
for row in top_content_rows[:4]:
if len(row) < 4:
continue
video_id, views, likes, comments = row[0], row[1], row[2], row[3]
meta = video_lookup.get(video_id)
if not meta:
continue
title, uploaded_at = meta
engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
top_content.append(
TopContent(
id=video_id,
title=title,
thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
platform="youtube",
views=int(views),
engagement=f"{engagement_rate:.1f}%",
date=uploaded_at.strftime("%Y.%m.%d"),
)
)
logger.debug(f"[8] TopContent 조립 완료 - count={len(top_content)}")
# 9. 데이터 가공
dashboard_data = processor.process(raw_data, top_content, 0, mode=mode, end_date=end_date)
logger.debug("[9] 데이터 가공 완료")
# 10. 캐시 저장
cache_payload = json.dumps({"response": dashboard_data.model_dump(mode="json")})
cache_success = await set_cache(cache_key, cache_payload, ttl=43200)
if cache_success:
logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
else:
logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")
# 11. 업로드 영상 수 주입
self.inject_video_count(dashboard_data, period_video_count, prev_period_video_count)
logger.info(
f"[DASHBOARD] 통계 조회 완료 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, period={period_desc}, videos={len(video_ids)}"
)
return dashboard_data

View File

@ -1,39 +0,0 @@
"""
내부 전용 소셜 업로드 API
스케줄러 서버에서만 호출하는 내부 엔드포인트입니다.
X-Internal-Secret 헤더로 인증합니다.
"""
import logging
from fastapi import APIRouter, BackgroundTasks, Header, HTTPException, status
from app.social.worker.upload_task import process_social_upload
from config import internal_settings
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/internal/social", tags=["Internal"])
@router.post(
"/upload/{upload_id}",
summary="[내부] 예약 업로드 실행",
description="스케줄러 서버에서 호출하는 내부 전용 엔드포인트입니다.",
)
async def trigger_scheduled_upload(
upload_id: int,
background_tasks: BackgroundTasks,
x_internal_secret: str = Header(...),
) -> dict:
if x_internal_secret != internal_settings.INTERNAL_SECRET_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid internal secret",
)
logger.info(f"[INTERNAL] 예약 업로드 실행 - upload_id: {upload_id}")
background_tasks.add_task(process_social_upload, upload_id)
return {"success": True, "upload_id": upload_id, "message": "업로드 작업이 시작되었습니다."}

View File

@ -238,7 +238,7 @@ async def get_account_by_platform(
raise SocialAccountNotFoundError(platform=platform.value)
return social_account_service.to_response(account)
return social_account_service._to_response(account)
@router.delete(

View File

@ -1,37 +1,131 @@
"""
소셜 SEO API 라우터
SEO 관련 엔드포인트를 제공합니다.
비즈니스 로직은 SeoService에 위임합니다.
"""
import logging, json
import logging
from redis.asyncio import Redis
from fastapi import APIRouter, Depends
from config import social_oauth_settings, db_settings
from app.social.constants import YOUTUBE_SEO_HASH
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.social.schemas import (
YoutubeDescriptionRequest,
YoutubeDescriptionResponse,
)
from app.database.session import get_session
from app.social.schemas import YoutubeDescriptionRequest, YoutubeDescriptionResponse
from app.social.services import seo_service
from app.user.dependencies import get_current_user
from app.user.models import User
from app.home.models import Project, MarketingIntel
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from fastapi import HTTPException, status
from app.utils.prompts.prompts import yt_upload_prompt
from app.utils.chatgpt_prompt import ChatgptService, ChatGPTResponseError
redis_seo_client = Redis(
host=db_settings.REDIS_HOST,
port=db_settings.REDIS_PORT,
db=0,
decode_responses=True,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/seo", tags=["Social SEO"])
@router.post(
"/youtube",
response_model=YoutubeDescriptionResponse,
summary="유튜브 SEO description 생성",
description="유튜브 업로드 시 사용할 description을 SEO 적용하여 생성",
summary="유튜브 SEO descrption 생성",
description="유튜브 업로드 시 사용할 descrption을 SEO 적용하여 생성",
)
async def youtube_seo_description(
request_body: YoutubeDescriptionRequest,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> YoutubeDescriptionResponse:
return await seo_service.get_youtube_seo_description(
request_body.task_id, current_user, session
request_body: YoutubeDescriptionRequest,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> YoutubeDescriptionResponse:
# TODO : 나중에 Session Task_id 검증 미들웨어 만들면 추가해주세요.
logger.info(
f"[youtube_seo_description] Try Cache - user: {current_user.user_uuid} / task_id : {request_body.task_id}"
)
cached = await get_yt_seo_in_redis(request_body.task_id)
if cached: # redis hit
return cached
logger.info(
f"[youtube_seo_description] Cache miss - user: {current_user.user_uuid} "
)
updated_seo = await make_youtube_seo_description(request_body.task_id, current_user, session)
await set_yt_seo_in_redis(request_body.task_id, updated_seo)
return updated_seo
async def make_youtube_seo_description(
task_id: str,
current_user: User,
session: AsyncSession,
) -> YoutubeDescriptionResponse:
logger.info(
f"[make_youtube_seo_description] START - user: {current_user.user_uuid} "
)
try:
project_query = await session.execute(
select(Project)
.where(
Project.task_id == task_id,
Project.user_uuid == current_user.user_uuid)
.order_by(Project.created_at.desc())
.limit(1)
)
project = project_query.scalar_one_or_none()
marketing_query = await session.execute(
select(MarketingIntel)
.where(MarketingIntel.id == project.marketing_intelligence)
)
marketing_intelligence = marketing_query.scalar_one_or_none()
hashtags = marketing_intelligence.intel_result["target_keywords"]
yt_seo_input_data = {
"customer_name" : project.store_name,
"detail_region_info" : project.detail_region_info,
"marketing_intelligence_summary" : json.dumps(marketing_intelligence.intel_result, ensure_ascii=False),
"language" : project.language,
"target_keywords" : hashtags
}
chatgpt = ChatgptService(timeout = 180)
yt_seo_output = await chatgpt.generate_structured_output(yt_upload_prompt, yt_seo_input_data)
result_dict = {
"title" : yt_seo_output.title,
"description" : yt_seo_output.description,
"keywords": hashtags
}
result = YoutubeDescriptionResponse(**result_dict)
return result
except Exception as e:
logger.error(f"[youtube_seo_description] EXCEPTION - error: {e}")
raise HTTPException(
status_code=500,
detail=f"유튜브 SEO 생성에 실패했습니다. : {str(e)}",
)
async def get_yt_seo_in_redis(task_id:str) -> YoutubeDescriptionResponse | None:
field = f"task_id:{task_id}"
yt_seo_info = await redis_seo_client.hget(YOUTUBE_SEO_HASH, field)
if yt_seo_info:
yt_seo = json.loads(yt_seo_info)
else:
return None
return YoutubeDescriptionResponse(**yt_seo)
async def set_yt_seo_in_redis(task_id:str, yt_seo : YoutubeDescriptionResponse) -> None:
field = f"task_id:{task_id}"
yt_seo_info = json.dumps(yt_seo.model_dump(), ensure_ascii=False)
await redis_seo_client.hsetex(YOUTUBE_SEO_HASH, field, yt_seo_info, ex=3600)
return

View File

@ -2,34 +2,38 @@
소셜 업로드 API 라우터
소셜 미디어 영상 업로드 관련 엔드포인트를 제공합니다.
비즈니스 로직은 SocialUploadService에 위임합니다.
"""
import logging
import logging, json
from typing import Optional
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from fastapi import HTTPException, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session
from app.social.constants import SocialPlatform
from app.social.constants import SocialPlatform, UploadStatus
from app.social.exceptions import SocialAccountNotFoundError, VideoNotFoundError
from app.social.models import SocialUpload
from app.social.schemas import (
MessageResponse,
SocialUploadHistoryItem,
SocialUploadHistoryResponse,
SocialUploadRequest,
SocialUploadResponse,
SocialUploadStatusResponse,
)
from app.social.services import SocialUploadService, social_account_service
from app.social.services import social_account_service
from app.social.worker.upload_task import process_social_upload
from app.user.dependencies import get_current_user
from app.user.models import User
from app.video.models import Video
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/upload", tags=["Social Upload"])
upload_service = SocialUploadService(account_service=social_account_service)
@router.post(
"",
@ -66,7 +70,121 @@ async def upload_to_social(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> SocialUploadResponse:
return await upload_service.request_upload(body, current_user, session, background_tasks)
"""
소셜 플랫폼에 영상 업로드 요청
백그라운드에서 영상을 다운로드하고 소셜 플랫폼에 업로드합니다.
"""
logger.info(
f"[UPLOAD_API] 업로드 요청 - "
f"user_uuid: {current_user.user_uuid}, "
f"video_id: {body.video_id}, "
f"social_account_id: {body.social_account_id}"
)
# 1. 영상 조회 및 검증
video_result = await session.execute(
select(Video).where(Video.id == body.video_id)
)
video = video_result.scalar_one_or_none()
if not video:
logger.warning(f"[UPLOAD_API] 영상 없음 - video_id: {body.video_id}")
raise VideoNotFoundError(video_id=body.video_id)
if not video.result_movie_url:
logger.warning(f"[UPLOAD_API] 영상 URL 없음 - video_id: {body.video_id}")
raise VideoNotFoundError(
video_id=body.video_id,
detail="영상이 아직 준비되지 않았습니다. 영상 생성이 완료된 후 시도해주세요.",
)
# 2. 소셜 계정 조회 (social_account_id로 직접 조회, 소유권 검증 포함)
account = await social_account_service.get_account_by_id(
user_uuid=current_user.user_uuid,
account_id=body.social_account_id,
session=session,
)
if not account:
logger.warning(
f"[UPLOAD_API] 연동 계정 없음 - "
f"user_uuid: {current_user.user_uuid}, social_account_id: {body.social_account_id}"
)
raise SocialAccountNotFoundError()
# 3. 진행 중인 업로드 확인 (pending 또는 uploading 상태만)
in_progress_result = await session.execute(
select(SocialUpload).where(
SocialUpload.video_id == body.video_id,
SocialUpload.social_account_id == account.id,
SocialUpload.status.in_([UploadStatus.PENDING.value, UploadStatus.UPLOADING.value]),
)
)
in_progress_upload = in_progress_result.scalar_one_or_none()
if in_progress_upload:
logger.info(
f"[UPLOAD_API] 진행 중인 업로드 존재 - upload_id: {in_progress_upload.id}"
)
return SocialUploadResponse(
success=True,
upload_id=in_progress_upload.id,
platform=account.platform,
status=in_progress_upload.status,
message="이미 업로드가 진행 중입니다.",
)
# 4. 업로드 순번 계산 (동일 video + account 조합에서 최대 순번 + 1)
max_seq_result = await session.execute(
select(func.coalesce(func.max(SocialUpload.upload_seq), 0)).where(
SocialUpload.video_id == body.video_id,
SocialUpload.social_account_id == account.id,
)
)
max_seq = max_seq_result.scalar() or 0
next_seq = max_seq + 1
# 5. 새 업로드 레코드 생성 (항상 새로 생성하여 이력 보존)
social_upload = SocialUpload(
user_uuid=current_user.user_uuid,
video_id=body.video_id,
social_account_id=account.id,
upload_seq=next_seq,
platform=account.platform,
status=UploadStatus.PENDING.value,
upload_progress=0,
title=body.title,
description=body.description,
tags=body.tags,
privacy_status=body.privacy_status.value,
platform_options={
**(body.platform_options or {}),
"scheduled_at": body.scheduled_at.isoformat() if body.scheduled_at else None,
},
retry_count=0,
)
session.add(social_upload)
await session.commit()
await session.refresh(social_upload)
logger.info(
f"[UPLOAD_API] 업로드 레코드 생성 - "
f"upload_id: {social_upload.id}, video_id: {body.video_id}, "
f"account_id: {account.id}, upload_seq: {next_seq}, platform: {account.platform}"
)
# 6. 백그라운드 태스크 등록
background_tasks.add_task(process_social_upload, social_upload.id)
return SocialUploadResponse(
success=True,
upload_id=social_upload.id,
platform=account.platform,
status=social_upload.status,
message="업로드 요청이 접수되었습니다.",
)
@router.get(
@ -80,35 +198,120 @@ async def get_upload_status(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> SocialUploadStatusResponse:
return await upload_service.get_upload_status(upload_id, current_user, session)
"""
업로드 상태 조회
"""
logger.info(f"[UPLOAD_API] 상태 조회 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
from fastapi import HTTPException, status
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
return SocialUploadStatusResponse(
upload_id=upload.id,
video_id=upload.video_id,
social_account_id=upload.social_account_id,
upload_seq=upload.upload_seq,
platform=upload.platform,
status=UploadStatus(upload.status),
upload_progress=upload.upload_progress,
title=upload.title,
platform_video_id=upload.platform_video_id,
platform_url=upload.platform_url,
error_message=upload.error_message,
retry_count=upload.retry_count,
created_at=upload.created_at,
uploaded_at=upload.uploaded_at,
)
@router.get(
"/history",
response_model=SocialUploadHistoryResponse,
summary="업로드 이력 조회",
description="""
사용자의 소셜 미디어 업로드 이력을 조회합니다.
## tab 파라미터
- `all`: 전체 (기본값)
- `completed`: 완료된 업로드
- `scheduled`: 예약 업로드 (pending + scheduled_at 있음)
- `failed`: 실패한 업로드
""",
description="사용자의 소셜 미디어 업로드 이력을 조회합니다.",
)
async def get_upload_history(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
tab: str = Query("all", description="탭 필터 (all/completed/scheduled/failed)"),
platform: Optional[SocialPlatform] = Query(None, description="플랫폼 필터"),
year: Optional[int] = Query(None, description="조회 연도 (없으면 현재 연도)"),
month: Optional[int] = Query(None, ge=1, le=12, description="조회 월 (없으면 현재 월)"),
status: Optional[UploadStatus] = Query(None, description="상태 필터"),
page: int = Query(1, ge=1, description="페이지 번호"),
size: int = Query(20, ge=1, le=100, description="페이지 크기"),
) -> SocialUploadHistoryResponse:
return await upload_service.get_upload_history(
current_user, session, tab, platform, year, month, page, size
"""
업로드 이력 조회
"""
logger.info(
f"[UPLOAD_API] 이력 조회 - "
f"user_uuid: {current_user.user_uuid}, page: {page}, size: {size}"
)
# 기본 쿼리
query = select(SocialUpload).where(
SocialUpload.user_uuid == current_user.user_uuid
)
count_query = select(func.count(SocialUpload.id)).where(
SocialUpload.user_uuid == current_user.user_uuid
)
# 필터 적용
if platform:
query = query.where(SocialUpload.platform == platform.value)
count_query = count_query.where(SocialUpload.platform == platform.value)
if status:
query = query.where(SocialUpload.status == status.value)
count_query = count_query.where(SocialUpload.status == status.value)
# 총 개수 조회
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
# 페이지네이션 적용
query = (
query.order_by(SocialUpload.created_at.desc())
.offset((page - 1) * size)
.limit(size)
)
result = await session.execute(query)
uploads = result.scalars().all()
items = [
SocialUploadHistoryItem(
upload_id=upload.id,
video_id=upload.video_id,
social_account_id=upload.social_account_id,
upload_seq=upload.upload_seq,
platform=upload.platform,
status=upload.status,
title=upload.title,
platform_url=upload.platform_url,
created_at=upload.created_at,
uploaded_at=upload.uploaded_at,
)
for upload in uploads
]
return SocialUploadHistoryResponse(
items=items,
total=total,
page=page,
size=size,
)
@ -124,7 +327,53 @@ async def retry_upload(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> SocialUploadResponse:
return await upload_service.retry_upload(upload_id, current_user, session, background_tasks)
"""
업로드 재시도
실패한 업로드를 다시 시도합니다.
"""
logger.info(f"[UPLOAD_API] 재시도 요청 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
from fastapi import HTTPException, status
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
if upload.status not in [UploadStatus.FAILED.value, UploadStatus.CANCELLED.value]:
from fastapi import HTTPException, status
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="실패하거나 취소된 업로드만 재시도할 수 있습니다.",
)
# 상태 초기화
upload.status = UploadStatus.PENDING.value
upload.upload_progress = 0
upload.error_message = None
await session.commit()
# 백그라운드 태스크 등록
background_tasks.add_task(process_social_upload, upload.id)
return SocialUploadResponse(
success=True,
upload_id=upload.id,
platform=upload.platform,
status=upload.status,
message="업로드 재시도가 요청되었습니다.",
)
@router.delete(
@ -138,4 +387,38 @@ async def cancel_upload(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> MessageResponse:
return await upload_service.cancel_upload(upload_id, current_user, session)
"""
업로드 취소
대기 중인 업로드를 취소합니다.
이미 진행 중이거나 완료된 업로드는 취소할 없습니다.
"""
logger.info(f"[UPLOAD_API] 취소 요청 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
if upload.status != UploadStatus.PENDING.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="대기 중인 업로드만 취소할 수 있습니다.",
)
upload.status = UploadStatus.CANCELLED.value
await session.commit()
return MessageResponse(
success=True,
message="업로드가 취소되었습니다.",
)

View File

@ -190,15 +190,6 @@ class SocialUpload(Base):
comment="플랫폼별 추가 옵션 (JSON)",
)
# ==========================================================================
# 예약 게시 시간
# ==========================================================================
scheduled_at: Mapped[Optional[datetime]] = mapped_column(
DateTime,
nullable=True,
comment="예약 게시 시간 (스케줄러가 이 시간 이후에 업로드 실행)",
)
# ==========================================================================
# 에러 정보
# ==========================================================================

324
app/social/schemas.py Normal file
View File

@ -0,0 +1,324 @@
"""
Social Media Schemas
소셜 미디어 연동 관련 Pydantic 스키마를 정의합니다.
"""
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field
from app.social.constants import PrivacyStatus, SocialPlatform, UploadStatus
# =============================================================================
# OAuth 관련 스키마
# =============================================================================
class SocialConnectResponse(BaseModel):
"""소셜 계정 연동 시작 응답"""
auth_url: str = Field(..., description="OAuth 인증 URL")
state: str = Field(..., description="CSRF 방지용 state 토큰")
platform: str = Field(..., description="플랫폼명")
model_config = ConfigDict(
json_schema_extra={
"example": {
"auth_url": "https://accounts.google.com/o/oauth2/v2/auth?...",
"state": "abc123xyz",
"platform": "youtube",
}
}
)
class SocialAccountResponse(BaseModel):
"""연동된 소셜 계정 정보"""
id: int = Field(..., description="소셜 계정 ID")
platform: str = Field(..., description="플랫폼명")
platform_user_id: str = Field(..., description="플랫폼 내 사용자 ID")
platform_username: Optional[str] = Field(None, description="플랫폼 내 사용자명")
display_name: Optional[str] = Field(None, description="표시 이름")
profile_image_url: Optional[str] = Field(None, description="프로필 이미지 URL")
is_active: bool = Field(..., description="활성화 상태")
connected_at: datetime = Field(..., description="연동 일시")
platform_data: Optional[dict[str, Any]] = Field(
None, description="플랫폼별 추가 정보 (채널ID, 구독자 수 등)"
)
model_config = ConfigDict(
from_attributes=True,
json_schema_extra={
"example": {
"id": 1,
"platform": "youtube",
"platform_user_id": "UC1234567890",
"platform_username": "my_channel",
"display_name": "My Channel",
"profile_image_url": "https://...",
"is_active": True,
"connected_at": "2024-01-15T12:00:00",
"platform_data": {
"channel_id": "UC1234567890",
"channel_title": "My Channel",
"subscriber_count": 1000,
},
}
}
)
class SocialAccountListResponse(BaseModel):
"""연동된 소셜 계정 목록 응답"""
accounts: list[SocialAccountResponse] = Field(..., description="연동 계정 목록")
total: int = Field(..., description="총 연동 계정 수")
model_config = ConfigDict(
json_schema_extra={
"example": {
"accounts": [
{
"id": 1,
"platform": "youtube",
"platform_user_id": "UC1234567890",
"platform_username": "my_channel",
"display_name": "My Channel",
"is_active": True,
"connected_at": "2024-01-15T12:00:00",
}
],
"total": 1,
}
}
)
# =============================================================================
# 내부 사용 스키마 (OAuth 토큰 응답)
# =============================================================================
class OAuthTokenResponse(BaseModel):
"""OAuth 토큰 응답 (내부 사용)"""
access_token: str
refresh_token: Optional[str] = None
expires_in: int
token_type: str = "Bearer"
scope: Optional[str] = None
class PlatformUserInfo(BaseModel):
"""플랫폼 사용자 정보 (내부 사용)"""
platform_user_id: str
username: Optional[str] = None
display_name: Optional[str] = None
profile_image_url: Optional[str] = None
platform_data: dict[str, Any] = Field(default_factory=dict)
# =============================================================================
# 업로드 관련 스키마
# =============================================================================
class SocialUploadRequest(BaseModel):
"""소셜 업로드 요청"""
video_id: int = Field(..., description="업로드할 영상 ID")
social_account_id: int = Field(..., description="업로드할 소셜 계정 ID (연동 계정 목록의 id)")
title: str = Field(..., min_length=1, max_length=100, description="영상 제목")
description: Optional[str] = Field(
None, max_length=5000, description="영상 설명"
)
tags: Optional[list[str]] = Field(None, description="태그 목록 (쉼표로 구분된 문자열도 가능)")
privacy_status: PrivacyStatus = Field(
default=PrivacyStatus.PRIVATE, description="공개 상태 (public, unlisted, private)"
)
scheduled_at: Optional[datetime] = Field(
None, description="예약 게시 시간 (없으면 즉시 게시)"
)
platform_options: Optional[dict[str, Any]] = Field(
None, description="플랫폼별 추가 옵션"
)
model_config = ConfigDict(
json_schema_extra={
"example": {
"video_id": 123,
"social_account_id": 1,
"title": "도그앤조이 애견펜션 2026.02.02",
"description": "영상 설명입니다.",
"tags": ["여행", "vlog", "애견펜션"],
"privacy_status": "public",
"scheduled_at": "2026-02-02T15:00:00",
"platform_options": {
"category_id": "22", # YouTube 카테고리
},
}
}
)
class SocialUploadResponse(BaseModel):
"""소셜 업로드 요청 응답"""
success: bool = Field(..., description="요청 성공 여부")
upload_id: int = Field(..., description="업로드 작업 ID")
platform: str = Field(..., description="플랫폼명")
status: str = Field(..., description="업로드 상태")
message: str = Field(..., description="응답 메시지")
model_config = ConfigDict(
json_schema_extra={
"example": {
"success": True,
"upload_id": 456,
"platform": "youtube",
"status": "pending",
"message": "업로드 요청이 접수되었습니다.",
}
}
)
class SocialUploadStatusResponse(BaseModel):
"""업로드 상태 조회 응답"""
upload_id: int = Field(..., description="업로드 작업 ID")
video_id: int = Field(..., description="영상 ID")
social_account_id: int = Field(..., description="소셜 계정 ID")
upload_seq: int = Field(..., description="업로드 순번 (동일 영상+채널 조합 내 순번)")
platform: str = Field(..., description="플랫폼명")
status: UploadStatus = Field(..., description="업로드 상태")
upload_progress: int = Field(..., description="업로드 진행률 (0-100)")
title: str = Field(..., description="영상 제목")
platform_video_id: Optional[str] = Field(None, description="플랫폼 영상 ID")
platform_url: Optional[str] = Field(None, description="플랫폼 영상 URL")
error_message: Optional[str] = Field(None, description="에러 메시지")
retry_count: int = Field(default=0, description="재시도 횟수")
created_at: datetime = Field(..., description="생성 일시")
uploaded_at: Optional[datetime] = Field(None, description="업로드 완료 일시")
model_config = ConfigDict(
from_attributes=True,
json_schema_extra={
"example": {
"upload_id": 456,
"video_id": 123,
"social_account_id": 1,
"upload_seq": 2,
"platform": "youtube",
"status": "completed",
"upload_progress": 100,
"title": "나의 첫 영상",
"platform_video_id": "dQw4w9WgXcQ",
"platform_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
"error_message": None,
"retry_count": 0,
"created_at": "2024-01-15T12:00:00",
"uploaded_at": "2024-01-15T12:05:00",
}
}
)
class SocialUploadHistoryItem(BaseModel):
"""업로드 이력 아이템"""
upload_id: int = Field(..., description="업로드 작업 ID")
video_id: int = Field(..., description="영상 ID")
social_account_id: int = Field(..., description="소셜 계정 ID")
upload_seq: int = Field(..., description="업로드 순번 (동일 영상+채널 조합 내 순번)")
platform: str = Field(..., description="플랫폼명")
status: str = Field(..., description="업로드 상태")
title: str = Field(..., description="영상 제목")
platform_url: Optional[str] = Field(None, description="플랫폼 영상 URL")
created_at: datetime = Field(..., description="생성 일시")
uploaded_at: Optional[datetime] = Field(None, description="업로드 완료 일시")
model_config = ConfigDict(from_attributes=True)
class SocialUploadHistoryResponse(BaseModel):
"""업로드 이력 목록 응답"""
items: list[SocialUploadHistoryItem] = Field(..., description="업로드 이력 목록")
total: int = Field(..., description="전체 개수")
page: int = Field(..., description="현재 페이지")
size: int = Field(..., description="페이지 크기")
model_config = ConfigDict(
json_schema_extra={
"example": {
"items": [
{
"upload_id": 456,
"video_id": 123,
"platform": "youtube",
"status": "completed",
"title": "나의 첫 영상",
"platform_url": "https://www.youtube.com/watch?v=xxx",
"created_at": "2024-01-15T12:00:00",
"uploaded_at": "2024-01-15T12:05:00",
}
],
"total": 1,
"page": 1,
"size": 20,
}
}
)
class YoutubeDescriptionRequest(BaseModel):
"""유튜브 SEO Description 제안 (자동완성) Request 모델"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"task_id" : "019c739f-65fc-7d15-8c88-b31be00e588e"
}
}
)
task_id: str = Field(..., description="작업 고유 식별자")
class YoutubeDescriptionResponse(BaseModel):
"""유튜브 SEO Description 제안 (자동완성) Response 모델"""
title:str = Field(..., description="유튜브 영상 제목 - SEO/AEO 최적화")
description : str = Field(..., description="제안된 유튜브 SEO Description")
keywords : list[str] = Field(..., description="해시태그 리스트")
model_config = ConfigDict(
json_schema_extra={
"example": {
"title" : "여기에 더미 타이틀",
"description": "여기에 더미 텍스트",
"keywords": ["여기에", "더미", "해시태그"]
}
}
)
# =============================================================================
# 공통 응답 스키마
# =============================================================================
class MessageResponse(BaseModel):
"""단순 메시지 응답"""
success: bool = Field(..., description="성공 여부")
message: str = Field(..., description="응답 메시지")
model_config = ConfigDict(
json_schema_extra={
"example": {
"success": True,
"message": "작업이 완료되었습니다.",
}
}
)

View File

@ -1,35 +0,0 @@
from app.social.schemas.oauth_schema import (
SocialConnectResponse,
SocialAccountResponse,
SocialAccountListResponse,
OAuthTokenResponse,
PlatformUserInfo,
MessageResponse,
)
from app.social.schemas.upload_schema import (
SocialUploadRequest,
SocialUploadResponse,
SocialUploadStatusResponse,
SocialUploadHistoryItem,
SocialUploadHistoryResponse,
)
from app.social.schemas.seo_schema import (
YoutubeDescriptionRequest,
YoutubeDescriptionResponse,
)
__all__ = [
"SocialConnectResponse",
"SocialAccountResponse",
"SocialAccountListResponse",
"OAuthTokenResponse",
"PlatformUserInfo",
"MessageResponse",
"SocialUploadRequest",
"SocialUploadResponse",
"SocialUploadStatusResponse",
"SocialUploadHistoryItem",
"SocialUploadHistoryResponse",
"YoutubeDescriptionRequest",
"YoutubeDescriptionResponse",
]

View File

@ -1,125 +0,0 @@
"""
소셜 OAuth 관련 Pydantic 스키마
"""
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field
class SocialConnectResponse(BaseModel):
"""소셜 계정 연동 시작 응답"""
auth_url: str = Field(..., description="OAuth 인증 URL")
state: str = Field(..., description="CSRF 방지용 state 토큰")
platform: str = Field(..., description="플랫폼명")
model_config = ConfigDict(
json_schema_extra={
"example": {
"auth_url": "https://accounts.google.com/o/oauth2/v2/auth?...",
"state": "abc123xyz",
"platform": "youtube",
}
}
)
class SocialAccountResponse(BaseModel):
"""연동된 소셜 계정 정보"""
id: int = Field(..., description="소셜 계정 ID")
platform: str = Field(..., description="플랫폼명")
platform_user_id: str = Field(..., description="플랫폼 내 사용자 ID")
platform_username: Optional[str] = Field(None, description="플랫폼 내 사용자명")
display_name: Optional[str] = Field(None, description="표시 이름")
profile_image_url: Optional[str] = Field(None, description="프로필 이미지 URL")
is_active: bool = Field(..., description="활성화 상태")
connected_at: datetime = Field(..., description="연동 일시")
platform_data: Optional[dict[str, Any]] = Field(
None, description="플랫폼별 추가 정보 (채널ID, 구독자 수 등)"
)
model_config = ConfigDict(
from_attributes=True,
json_schema_extra={
"example": {
"id": 1,
"platform": "youtube",
"platform_user_id": "UC1234567890",
"platform_username": "my_channel",
"display_name": "My Channel",
"profile_image_url": "https://...",
"is_active": True,
"connected_at": "2024-01-15T12:00:00",
"platform_data": {
"channel_id": "UC1234567890",
"channel_title": "My Channel",
"subscriber_count": 1000,
},
}
}
)
class SocialAccountListResponse(BaseModel):
"""연동된 소셜 계정 목록 응답"""
accounts: list[SocialAccountResponse] = Field(..., description="연동 계정 목록")
total: int = Field(..., description="총 연동 계정 수")
model_config = ConfigDict(
json_schema_extra={
"example": {
"accounts": [
{
"id": 1,
"platform": "youtube",
"platform_user_id": "UC1234567890",
"platform_username": "my_channel",
"display_name": "My Channel",
"is_active": True,
"connected_at": "2024-01-15T12:00:00",
}
],
"total": 1,
}
}
)
class OAuthTokenResponse(BaseModel):
"""OAuth 토큰 응답 (내부 사용)"""
access_token: str
refresh_token: Optional[str] = None
expires_in: int
token_type: str = "Bearer"
scope: Optional[str] = None
class PlatformUserInfo(BaseModel):
"""플랫폼 사용자 정보 (내부 사용)"""
platform_user_id: str
username: Optional[str] = None
display_name: Optional[str] = None
profile_image_url: Optional[str] = None
platform_data: dict[str, Any] = Field(default_factory=dict)
class MessageResponse(BaseModel):
"""단순 메시지 응답"""
success: bool = Field(..., description="성공 여부")
message: str = Field(..., description="응답 메시지")
model_config = ConfigDict(
json_schema_extra={
"example": {
"success": True,
"message": "작업이 완료되었습니다.",
}
}
)

View File

@ -1,37 +0,0 @@
"""
소셜 SEO 관련 Pydantic 스키마
"""
from pydantic import BaseModel, ConfigDict, Field
class YoutubeDescriptionRequest(BaseModel):
"""유튜브 SEO Description 제안 요청"""
task_id: str = Field(..., description="작업 고유 식별자")
model_config = ConfigDict(
json_schema_extra={
"example": {
"task_id": "019c739f-65fc-7d15-8c88-b31be00e588e"
}
}
)
class YoutubeDescriptionResponse(BaseModel):
"""유튜브 SEO Description 제안 응답"""
title: str = Field(..., description="유튜브 영상 제목 - SEO/AEO 최적화")
description: str = Field(..., description="제안된 유튜브 SEO Description")
keywords: list[str] = Field(..., description="해시태그 리스트")
model_config = ConfigDict(
json_schema_extra={
"example": {
"title": "여기에 더미 타이틀",
"description": "여기에 더미 텍스트",
"keywords": ["여기에", "더미", "해시태그"]
}
}
)

View File

@ -1,162 +0,0 @@
"""
소셜 업로드 관련 Pydantic 스키마
"""
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field
from app.social.constants import PrivacyStatus, UploadStatus
class SocialUploadRequest(BaseModel):
"""소셜 업로드 요청"""
video_id: int = Field(..., description="업로드할 영상 ID")
social_account_id: int = Field(..., description="업로드할 소셜 계정 ID (연동 계정 목록의 id)")
title: str = Field(..., min_length=1, max_length=100, description="영상 제목")
description: Optional[str] = Field(
None, max_length=5000, description="영상 설명"
)
tags: Optional[list[str]] = Field(None, description="태그 목록 (쉼표로 구분된 문자열도 가능)")
privacy_status: PrivacyStatus = Field(
default=PrivacyStatus.PRIVATE, description="공개 상태 (public, unlisted, private)"
)
scheduled_at: Optional[datetime] = Field(
None, description="예약 게시 시간 (없으면 즉시 게시)"
)
platform_options: Optional[dict[str, Any]] = Field(
None, description="플랫폼별 추가 옵션"
)
model_config = ConfigDict(
json_schema_extra={
"example": {
"video_id": 123,
"social_account_id": 1,
"title": "도그앤조이 애견펜션 2026.02.02",
"description": "영상 설명입니다.",
"tags": ["여행", "vlog", "애견펜션"],
"privacy_status": "public",
"scheduled_at": "2026-02-02T15:00:00",
"platform_options": {
"category_id": "22",
},
}
}
)
class SocialUploadResponse(BaseModel):
"""소셜 업로드 요청 응답"""
success: bool = Field(..., description="요청 성공 여부")
upload_id: int = Field(..., description="업로드 작업 ID")
platform: str = Field(..., description="플랫폼명")
status: str = Field(..., description="업로드 상태")
message: str = Field(..., description="응답 메시지")
model_config = ConfigDict(
json_schema_extra={
"example": {
"success": True,
"upload_id": 456,
"platform": "youtube",
"status": "pending",
"message": "업로드 요청이 접수되었습니다.",
}
}
)
class SocialUploadStatusResponse(BaseModel):
"""업로드 상태 조회 응답"""
upload_id: int = Field(..., description="업로드 작업 ID")
video_id: int = Field(..., description="영상 ID")
social_account_id: int = Field(..., description="소셜 계정 ID")
upload_seq: int = Field(..., description="업로드 순번 (동일 영상+채널 조합 내 순번)")
platform: str = Field(..., description="플랫폼명")
status: UploadStatus = Field(..., description="업로드 상태")
upload_progress: int = Field(..., description="업로드 진행률 (0-100)")
title: str = Field(..., description="영상 제목")
platform_video_id: Optional[str] = Field(None, description="플랫폼 영상 ID")
platform_url: Optional[str] = Field(None, description="플랫폼 영상 URL")
error_message: Optional[str] = Field(None, description="에러 메시지")
retry_count: int = Field(default=0, description="재시도 횟수")
scheduled_at: Optional[datetime] = Field(None, description="예약 게시 시간 (있으면 예약 업로드)")
created_at: datetime = Field(..., description="생성 일시")
uploaded_at: Optional[datetime] = Field(None, description="업로드 완료 일시")
model_config = ConfigDict(
from_attributes=True,
json_schema_extra={
"example": {
"upload_id": 456,
"video_id": 123,
"social_account_id": 1,
"upload_seq": 2,
"platform": "youtube",
"status": "completed",
"upload_progress": 100,
"title": "나의 첫 영상",
"platform_video_id": "dQw4w9WgXcQ",
"platform_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
"error_message": None,
"retry_count": 0,
"created_at": "2024-01-15T12:00:00",
"uploaded_at": "2024-01-15T12:05:00",
}
}
)
class SocialUploadHistoryItem(BaseModel):
"""업로드 이력 아이템"""
upload_id: int = Field(..., description="업로드 작업 ID")
video_id: int = Field(..., description="영상 ID")
social_account_id: int = Field(..., description="소셜 계정 ID")
upload_seq: int = Field(..., description="업로드 순번 (동일 영상+채널 조합 내 순번)")
platform: str = Field(..., description="플랫폼명")
status: str = Field(..., description="업로드 상태")
title: str = Field(..., description="영상 제목")
platform_url: Optional[str] = Field(None, description="플랫폼 영상 URL")
error_message: Optional[str] = Field(None, description="에러 메시지")
scheduled_at: Optional[datetime] = Field(None, description="예약 게시 시간")
created_at: datetime = Field(..., description="생성 일시")
uploaded_at: Optional[datetime] = Field(None, description="업로드 완료 일시")
model_config = ConfigDict(from_attributes=True)
class SocialUploadHistoryResponse(BaseModel):
"""업로드 이력 목록 응답"""
items: list[SocialUploadHistoryItem] = Field(..., description="업로드 이력 목록")
total: int = Field(..., description="전체 개수")
page: int = Field(..., description="현재 페이지")
size: int = Field(..., description="페이지 크기")
model_config = ConfigDict(
json_schema_extra={
"example": {
"items": [
{
"upload_id": 456,
"video_id": 123,
"platform": "youtube",
"status": "completed",
"title": "나의 첫 영상",
"platform_url": "https://www.youtube.com/watch?v=xxx",
"created_at": "2024-01-15T12:00:00",
"uploaded_at": "2024-01-15T12:05:00",
}
],
"total": 1,
"page": 1,
"size": 20,
}
}
)

View File

@ -188,7 +188,7 @@ class SocialAccountService:
session=session,
update_connected_at=is_reactivation, # 재활성화 시에만 연결 시간 업데이트
)
return self.to_response(existing_account)
return self._to_response(existing_account)
# 5. 새 소셜 계정 저장 (기존 계정이 없는 경우에만)
social_account = await self._create_social_account(
@ -204,7 +204,7 @@ class SocialAccountService:
f"account_id: {social_account.id}, platform: {platform.value}"
)
return self.to_response(social_account)
return self._to_response(social_account)
async def get_connected_accounts(
self,
@ -241,7 +241,7 @@ class SocialAccountService:
for account in accounts:
await self._try_refresh_token(account, session)
return [self.to_response(account) for account in accounts]
return [self._to_response(account) for account in accounts]
async def refresh_all_tokens(
self,
@ -713,7 +713,7 @@ class SocialAccountService:
return account
def to_response(self, account: SocialAccount) -> SocialAccountResponse:
def _to_response(self, account: SocialAccount) -> SocialAccountResponse:
"""
SocialAccount를 SocialAccountResponse로 변환

View File

@ -1,11 +0,0 @@
from app.social.services.account_service import SocialAccountService, social_account_service
from app.social.services.upload_service import SocialUploadService
from app.social.services.seo_service import SeoService, seo_service
__all__ = [
"SocialAccountService",
"social_account_service",
"SocialUploadService",
"SeoService",
"seo_service",
]

View File

@ -1,12 +0,0 @@
"""
소셜 서비스 베이스 클래스
"""
from sqlalchemy.ext.asyncio import AsyncSession
class BaseService:
"""서비스 레이어 베이스 클래스"""
def __init__(self, session: AsyncSession | None = None):
self.session = session

View File

@ -1,129 +0,0 @@
"""
유튜브 SEO 서비스
SEO description 생성 Redis 캐싱 로직을 처리합니다.
"""
import json
import logging
from fastapi import HTTPException
from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from config import db_settings
from app.home.models import MarketingIntel, Project
from app.social.constants import YOUTUBE_SEO_HASH
from app.social.schemas import YoutubeDescriptionResponse
from app.user.models import User
from app.utils.chatgpt_prompt import ChatgptService
from app.utils.prompts.prompts import yt_upload_prompt
logger = logging.getLogger(__name__)
redis_seo_client = Redis(
host=db_settings.REDIS_HOST,
port=db_settings.REDIS_PORT,
db=0,
decode_responses=True,
)
class SeoService:
"""유튜브 SEO 비즈니스 로직 서비스"""
async def get_youtube_seo_description(
self,
task_id: str,
current_user: User,
session: AsyncSession,
) -> YoutubeDescriptionResponse:
"""
유튜브 SEO description 생성
Redis 캐시 확인 miss이면 GPT로 생성하고 캐싱.
"""
logger.info(
f"[SEO_SERVICE] Try Cache - user: {current_user.user_uuid} / task_id: {task_id}"
)
cached = await self._get_from_redis(task_id)
if cached:
return cached
logger.info(f"[SEO_SERVICE] Cache miss - user: {current_user.user_uuid}")
result = await self._generate_seo_description(task_id, current_user, session)
await self._set_to_redis(task_id, result)
return result
async def _generate_seo_description(
self,
task_id: str,
current_user: User,
session: AsyncSession,
) -> YoutubeDescriptionResponse:
"""GPT를 사용하여 SEO description 생성"""
logger.info(f"[SEO_SERVICE] Generating SEO - user: {current_user.user_uuid}")
try:
project_result = await session.execute(
select(Project)
.where(
Project.task_id == task_id,
Project.user_uuid == current_user.user_uuid,
)
.order_by(Project.created_at.desc())
.limit(1)
)
project = project_result.scalar_one_or_none()
marketing_result = await session.execute(
select(MarketingIntel).where(MarketingIntel.id == project.marketing_intelligence)
)
marketing_intelligence = marketing_result.scalar_one_or_none()
hashtags = marketing_intelligence.intel_result["target_keywords"]
yt_seo_input_data = {
"customer_name": project.store_name,
"detail_region_info": project.detail_region_info,
"marketing_intelligence_summary": json.dumps(
marketing_intelligence.intel_result, ensure_ascii=False
),
"language": project.language,
"target_keywords": hashtags,
}
chatgpt = ChatgptService(timeout=180)
yt_seo_output = await chatgpt.generate_structured_output(yt_upload_prompt, yt_seo_input_data)
return YoutubeDescriptionResponse(
title=yt_seo_output.title,
description=yt_seo_output.description,
keywords=hashtags,
)
except Exception as e:
logger.error(f"[SEO_SERVICE] EXCEPTION - error: {e}")
raise HTTPException(
status_code=500,
detail=f"유튜브 SEO 생성에 실패했습니다. : {str(e)}",
)
async def _get_from_redis(self, task_id: str) -> YoutubeDescriptionResponse | None:
field = f"task_id:{task_id}"
yt_seo_info = await redis_seo_client.hget(YOUTUBE_SEO_HASH, field)
if yt_seo_info:
return YoutubeDescriptionResponse(**json.loads(yt_seo_info))
return None
async def _set_to_redis(self, task_id: str, yt_seo: YoutubeDescriptionResponse) -> None:
field = f"task_id:{task_id}"
yt_seo_info = json.dumps(yt_seo.model_dump(), ensure_ascii=False)
await redis_seo_client.hset(YOUTUBE_SEO_HASH, field, yt_seo_info)
await redis_seo_client.expire(YOUTUBE_SEO_HASH, 3600)
seo_service = SeoService()

View File

@ -1,391 +0,0 @@
"""
소셜 업로드 서비스
업로드 요청, 상태 조회, 이력 조회, 재시도, 취소 관련 비즈니스 로직을 처리합니다.
"""
import logging
from calendar import monthrange
from datetime import datetime
from typing import Optional
from fastapi import BackgroundTasks, HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from config import TIMEZONE
from app.social.constants import SocialPlatform, UploadStatus
from app.social.exceptions import SocialAccountNotFoundError, VideoNotFoundError
from app.social.models import SocialUpload
from app.social.schemas import (
MessageResponse,
SocialUploadHistoryItem,
SocialUploadHistoryResponse,
SocialUploadResponse,
SocialUploadStatusResponse,
SocialUploadRequest,
)
from app.social.services.account_service import SocialAccountService
from app.social.worker.upload_task import process_social_upload
from app.user.models import User
from app.video.models import Video
logger = logging.getLogger(__name__)
class SocialUploadService:
"""소셜 업로드 비즈니스 로직 서비스"""
def __init__(self, account_service: SocialAccountService):
self._account_service = account_service
async def request_upload(
self,
body: SocialUploadRequest,
current_user: User,
session: AsyncSession,
background_tasks: BackgroundTasks,
) -> SocialUploadResponse:
"""
소셜 플랫폼 업로드 요청
영상 검증, 계정 확인, 중복 확인 업로드 레코드 생성.
즉시 업로드이면 백그라운드 태스크 등록, 예약이면 스케줄러가 처리.
"""
logger.info(
f"[UPLOAD_SERVICE] 업로드 요청 - "
f"user_uuid: {current_user.user_uuid}, "
f"video_id: {body.video_id}, "
f"social_account_id: {body.social_account_id}"
)
# 1. 영상 조회 및 검증
video_result = await session.execute(
select(Video).where(Video.id == body.video_id)
)
video = video_result.scalar_one_or_none()
if not video:
logger.warning(f"[UPLOAD_SERVICE] 영상 없음 - video_id: {body.video_id}")
raise VideoNotFoundError(video_id=body.video_id)
if not video.result_movie_url:
logger.warning(f"[UPLOAD_SERVICE] 영상 URL 없음 - video_id: {body.video_id}")
raise VideoNotFoundError(
video_id=body.video_id,
detail="영상이 아직 준비되지 않았습니다. 영상 생성이 완료된 후 시도해주세요.",
)
# 2. 소셜 계정 조회 및 소유권 검증
account = await self._account_service.get_account_by_id(
user_uuid=current_user.user_uuid,
account_id=body.social_account_id,
session=session,
)
if not account:
logger.warning(
f"[UPLOAD_SERVICE] 연동 계정 없음 - "
f"user_uuid: {current_user.user_uuid}, social_account_id: {body.social_account_id}"
)
raise SocialAccountNotFoundError()
# 3. 진행 중인 업로드 확인 (pending 또는 uploading 상태만)
in_progress_result = await session.execute(
select(SocialUpload).where(
SocialUpload.video_id == body.video_id,
SocialUpload.social_account_id == account.id,
SocialUpload.status.in_([UploadStatus.PENDING.value, UploadStatus.UPLOADING.value]),
)
)
in_progress_upload = in_progress_result.scalar_one_or_none()
if in_progress_upload:
logger.info(
f"[UPLOAD_SERVICE] 진행 중인 업로드 존재 - upload_id: {in_progress_upload.id}"
)
return SocialUploadResponse(
success=True,
upload_id=in_progress_upload.id,
platform=account.platform,
status=in_progress_upload.status,
message="이미 업로드가 진행 중입니다.",
)
# 4. 업로드 순번 계산
max_seq_result = await session.execute(
select(func.coalesce(func.max(SocialUpload.upload_seq), 0)).where(
SocialUpload.video_id == body.video_id,
SocialUpload.social_account_id == account.id,
)
)
next_seq = (max_seq_result.scalar() or 0) + 1
# 5. 새 업로드 레코드 생성
social_upload = SocialUpload(
user_uuid=current_user.user_uuid,
video_id=body.video_id,
social_account_id=account.id,
upload_seq=next_seq,
platform=account.platform,
status=UploadStatus.PENDING.value,
upload_progress=0,
title=body.title,
description=body.description,
tags=body.tags,
privacy_status=body.privacy_status.value,
scheduled_at=body.scheduled_at,
platform_options={
**(body.platform_options or {}),
"scheduled_at": body.scheduled_at.isoformat() if body.scheduled_at else None,
},
retry_count=0,
)
session.add(social_upload)
await session.commit()
await session.refresh(social_upload)
logger.info(
f"[UPLOAD_SERVICE] 업로드 레코드 생성 - "
f"upload_id: {social_upload.id}, video_id: {body.video_id}, "
f"account_id: {account.id}, upload_seq: {next_seq}, platform: {account.platform}"
)
# 6. 즉시 업로드이면 백그라운드 태스크 등록
now_kst_naive = datetime.now(TIMEZONE).replace(tzinfo=None)
is_scheduled = body.scheduled_at and body.scheduled_at > now_kst_naive
if not is_scheduled:
background_tasks.add_task(process_social_upload, social_upload.id)
message = "예약 업로드가 등록되었습니다." if is_scheduled else "업로드 요청이 접수되었습니다."
return SocialUploadResponse(
success=True,
upload_id=social_upload.id,
platform=account.platform,
status=social_upload.status,
message=message,
)
async def get_upload_status(
self,
upload_id: int,
current_user: User,
session: AsyncSession,
) -> SocialUploadStatusResponse:
"""업로드 상태 조회"""
logger.info(f"[UPLOAD_SERVICE] 상태 조회 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
return SocialUploadStatusResponse(
upload_id=upload.id,
video_id=upload.video_id,
social_account_id=upload.social_account_id,
upload_seq=upload.upload_seq,
platform=upload.platform,
status=UploadStatus(upload.status),
upload_progress=upload.upload_progress,
title=upload.title,
platform_video_id=upload.platform_video_id,
platform_url=upload.platform_url,
error_message=upload.error_message,
retry_count=upload.retry_count,
scheduled_at=upload.scheduled_at,
created_at=upload.created_at,
uploaded_at=upload.uploaded_at,
)
async def get_upload_history(
self,
current_user: User,
session: AsyncSession,
tab: str = "all",
platform: Optional[SocialPlatform] = None,
year: Optional[int] = None,
month: Optional[int] = None,
page: int = 1,
size: int = 20,
) -> SocialUploadHistoryResponse:
"""업로드 이력 조회 (탭/년월/플랫폼 필터, 페이지네이션)"""
now_kst = datetime.now(TIMEZONE)
target_year = year or now_kst.year
target_month = month or now_kst.month
logger.info(
f"[UPLOAD_SERVICE] 이력 조회 - "
f"user_uuid: {current_user.user_uuid}, tab: {tab}, "
f"year: {target_year}, month: {target_month}, page: {page}, size: {size}"
)
# 월 범위 계산
last_day = monthrange(target_year, target_month)[1]
month_start = datetime(target_year, target_month, 1, 0, 0, 0)
month_end = datetime(target_year, target_month, last_day, 23, 59, 59)
# 기본 쿼리 (cancelled 제외)
base_conditions = [
SocialUpload.user_uuid == current_user.user_uuid,
SocialUpload.created_at >= month_start,
SocialUpload.created_at <= month_end,
SocialUpload.status != UploadStatus.CANCELLED.value,
]
query = select(SocialUpload).where(*base_conditions)
count_query = select(func.count(SocialUpload.id)).where(*base_conditions)
# 탭 필터 적용
if tab == "completed":
query = query.where(SocialUpload.status == UploadStatus.COMPLETED.value)
count_query = count_query.where(SocialUpload.status == UploadStatus.COMPLETED.value)
elif tab == "scheduled":
query = query.where(
SocialUpload.status == UploadStatus.PENDING.value,
SocialUpload.scheduled_at.isnot(None),
)
count_query = count_query.where(
SocialUpload.status == UploadStatus.PENDING.value,
SocialUpload.scheduled_at.isnot(None),
)
elif tab == "failed":
query = query.where(SocialUpload.status == UploadStatus.FAILED.value)
count_query = count_query.where(SocialUpload.status == UploadStatus.FAILED.value)
# 플랫폼 필터 적용
if platform:
query = query.where(SocialUpload.platform == platform.value)
count_query = count_query.where(SocialUpload.platform == platform.value)
# 총 개수 조회
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
# 페이지네이션 적용
query = (
query.order_by(SocialUpload.created_at.desc())
.offset((page - 1) * size)
.limit(size)
)
result = await session.execute(query)
uploads = result.scalars().all()
items = [
SocialUploadHistoryItem(
upload_id=upload.id,
video_id=upload.video_id,
social_account_id=upload.social_account_id,
upload_seq=upload.upload_seq,
platform=upload.platform,
status=upload.status,
title=upload.title,
platform_url=upload.platform_url,
error_message=upload.error_message,
scheduled_at=upload.scheduled_at,
created_at=upload.created_at,
uploaded_at=upload.uploaded_at,
)
for upload in uploads
]
return SocialUploadHistoryResponse(
items=items,
total=total,
page=page,
size=size,
)
async def retry_upload(
self,
upload_id: int,
current_user: User,
session: AsyncSession,
background_tasks: BackgroundTasks,
) -> SocialUploadResponse:
"""실패한 업로드 재시도"""
logger.info(f"[UPLOAD_SERVICE] 재시도 요청 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
if upload.status not in [UploadStatus.FAILED.value, UploadStatus.CANCELLED.value]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="실패하거나 취소된 업로드만 재시도할 수 있습니다.",
)
# 상태 초기화
upload.status = UploadStatus.PENDING.value
upload.upload_progress = 0
upload.error_message = None
await session.commit()
background_tasks.add_task(process_social_upload, upload.id)
return SocialUploadResponse(
success=True,
upload_id=upload.id,
platform=upload.platform,
status=upload.status,
message="업로드 재시도가 요청되었습니다.",
)
async def cancel_upload(
self,
upload_id: int,
current_user: User,
session: AsyncSession,
) -> MessageResponse:
"""대기 중인 업로드 취소"""
logger.info(f"[UPLOAD_SERVICE] 취소 요청 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
if upload.status != UploadStatus.PENDING.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="대기 중인 업로드만 취소할 수 있습니다.",
)
upload.status = UploadStatus.CANCELLED.value
await session.commit()
return MessageResponse(
success=True,
message="업로드가 취소되었습니다.",
)

View File

@ -26,7 +26,7 @@ Act as a Senior Brand Strategist and Marketing Data Analyst. Your goal is to ana
### 3. target_persona
Generate a list of personas based on the following:
* **`persona`**: Provide a descriptive name and profile for the target group. Must be **20 characters or fewer**.
* **`persona`**: Provide a descriptive name and profile for the target group.
* **`age`**: Set `min_age` and `max_age` (Integer 0-100) that accurately reflects the segment.
* **`favor_target`**: List specific elements or vibes this persona prefers (e.g., "Minimalist interior", "Pet-friendly facilities").
* **`decision_trigger`**: Identify the specific "Hook" or facility that leads this persona to finalize a booking.

View File

@ -566,17 +566,6 @@ class SocialOAuthSettings(BaseSettings):
model_config = _base_config
class InternalSettings(BaseSettings):
"""내부 서버 간 통신 설정"""
INTERNAL_SECRET_KEY: str = Field(
default="change-me-internal-secret-key",
description="스케줄러 서버 → 백엔드 내부 API 인증 키",
)
model_config = _base_config
class SocialUploadSettings(BaseSettings):
"""소셜 미디어 업로드 설정
@ -624,5 +613,4 @@ kakao_settings = KakaoSettings()
jwt_settings = JWTSettings()
recovery_settings = RecoverySettings()
social_oauth_settings = SocialOAuthSettings()
internal_settings = InternalSettings()
social_upload_settings = SocialUploadSettings()

View File

@ -1,9 +0,0 @@
-- ============================================================
-- Migration: social_upload 테이블에 scheduled_at 컬럼 추가
-- Date: 2026-03-05
-- Description: SNS 예약 업로드 스케줄러를 위한 예약 게시 시간 컬럼 추가
-- ============================================================
ALTER TABLE social_upload
ADD COLUMN scheduled_at DATETIME NULL COMMENT '예약 게시 시간 (스케줄러가 이 시간 이후에 업로드 실행)'
AFTER privacy_status;

View File

@ -23,7 +23,6 @@ from app.video.api.routers.v1.video import router as video_router
from app.social.api.routers.v1.oauth import router as social_oauth_router
from app.social.api.routers.v1.upload import router as social_upload_router
from app.social.api.routers.v1.seo import router as social_seo_router
from app.social.api.routers.v1.internal import router as social_internal_router
from app.utils.cors import CustomCORSMiddleware
from config import prj_settings
@ -392,7 +391,6 @@ app.include_router(archive_router) # Archive API 라우터 추가
app.include_router(social_oauth_router, prefix="/social") # Social OAuth 라우터 추가
app.include_router(social_upload_router, prefix="/social") # Social Upload 라우터 추가
app.include_router(social_seo_router, prefix="/social") # Social Upload 라우터 추가
app.include_router(social_internal_router) # 내부 스케줄러 전용 라우터
app.include_router(sns_router) # SNS API 라우터 추가
app.include_router(dashboard_router) # Dashboard API 라우터 추가

422
token_log_plan.md Normal file
View File

@ -0,0 +1,422 @@
# 서버 JWT 토큰 라이프사이클 로깅 강화 계획
## 1. 토큰 라이프사이클 개요
서버가 직접 발급/관리하는 JWT 토큰의 전체 흐름:
```
[발급] kakao_login() / generate_test_token()
├── Access Token 생성 (sub=user_uuid, type=access, exp=60분)
├── Refresh Token 생성 (sub=user_uuid, type=refresh, exp=7일)
└── Refresh Token DB 저장 (token_hash, user_id, expires_at)
[검증] get_current_user() — 매 요청마다 Access Token 검증
├── Bearer 헤더에서 토큰 추출
├── decode_token() → payload (sub, type, exp)
├── type == "access" 확인
└── user_uuid로 사용자 조회/활성 확인
[갱신] refresh_tokens() — Access Token 만료 시 Refresh Token으로 갱신
├── 기존 Refresh Token 디코딩 → payload
├── token_hash로 DB 조회 → is_revoked / expires_at 확인
├── 기존 Refresh Token 폐기 (is_revoked=True)
├── 새 Access Token + 새 Refresh Token 발급
└── 새 Refresh Token DB 저장
[폐기] logout() / logout_all()
├── 단일: token_hash로 해당 Refresh Token 폐기
└── 전체: user_id로 모든 Refresh Token 폐기
```
---
## 2. 현황 분석 — 로깅 공백 지점
### 발급 단계
| 위치 | 함수 | 현재 로깅 | 부족한 정보 |
|------|------|----------|------------|
| `jwt.py` | `create_access_token()` | 없음 | 발급 대상(user_uuid), 만료시간 |
| `jwt.py` | `create_refresh_token()` | 없음 | 발급 대상(user_uuid), 만료시간 |
| `auth.py` (service) | `_save_refresh_token()` | 없음 | DB 저장 결과(token_hash, expires_at) |
| `auth.py` (service) | `kakao_login()` | `debug`로 토큰 앞 30자 출력 | 충분 (변경 불필요) |
### 검증 단계
| 위치 | 함수 | 현재 로깅 | 부족한 정보 |
|------|------|----------|------------|
| `jwt.py` | `decode_token()` | 없음 | 디코딩 성공 시 payload 내용, 실패 시 원인 |
| `auth.py` (dependency) | `get_current_user()` | 없음 | 검증 각 단계 통과/실패 사유, 토큰 내 정보 |
| `auth.py` (dependency) | `get_current_user_optional()` | 없음 | 위와 동일 |
### 갱신 단계
| 위치 | 함수 | 현재 로깅 | 부족한 정보 |
|------|------|----------|------------|
| `auth.py` (router) | `refresh_token()` | 없음 | 수신 토큰 정보, 갱신 결과 |
| `auth.py` (service) | `refresh_tokens()` | 진입/완료 `info` 1줄씩 | 각 단계 실패 사유, DB 토큰 상태, 신규 토큰 정보 |
### 폐기 단계
| 위치 | 함수 | 현재 로깅 | 부족한 정보 |
|------|------|----------|------------|
| `auth.py` (router) | `logout()`, `logout_all()` | 없음 | 요청 수신, 대상 사용자 |
| `auth.py` (service) | `logout()`, `logout_all()` | 없음 | 폐기 대상, 폐기 결과 |
---
## 3. 수정 대상 파일
| # | 파일 | 수정 내용 |
|---|------|----------|
| 1 | `app/user/services/jwt.py` | 토큰 발급 로그 + `decode_token()` 실패 원인 분류 |
| 2 | `app/user/dependencies/auth.py` | Access Token 검증 과정 로깅 |
| 3 | `app/user/services/auth.py` | `refresh_tokens()`, `_save_refresh_token()`, `logout()`, `logout_all()` 로깅 |
| 4 | `app/user/api/routers/v1/auth.py` | `refresh_token()`, `logout()`, `logout_all()` 라우터 로깅 |
---
## 4. 상세 구현 계획
### 4-1. `jwt.py` — 토큰 발급 로그 + 디코딩 실패 원인 분류
**import 추가:**
```python
import logging
from jose import JWTError, ExpiredSignatureError, JWTClaimsError, jwt
logger = logging.getLogger(__name__)
```
**`create_access_token()` — 발급 로그 추가:**
```python
def create_access_token(user_uuid: str) -> str:
expire = now() + timedelta(minutes=jwt_settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"sub": user_uuid, "exp": expire, "type": "access"}
token = jwt.encode(to_encode, jwt_settings.JWT_SECRET, algorithm=jwt_settings.JWT_ALGORITHM)
logger.debug(f"[JWT] Access Token 발급 - user_uuid: {user_uuid}, expires: {expire}, token: ...{token[-20:]}")
return token
```
**`create_refresh_token()` — 발급 로그 추가:**
```python
def create_refresh_token(user_uuid: str) -> str:
expire = now() + timedelta(days=jwt_settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
to_encode = {"sub": user_uuid, "exp": expire, "type": "refresh"}
token = jwt.encode(to_encode, jwt_settings.JWT_SECRET, algorithm=jwt_settings.JWT_ALGORITHM)
logger.debug(f"[JWT] Refresh Token 발급 - user_uuid: {user_uuid}, expires: {expire}, token: ...{token[-20:]}")
return token
```
**`decode_token()` — 성공/실패 분류 로그:**
```python
def decode_token(token: str) -> Optional[dict]:
try:
payload = jwt.decode(token, jwt_settings.JWT_SECRET, algorithms=[jwt_settings.JWT_ALGORITHM])
logger.debug(
f"[JWT] 토큰 디코딩 성공 - type: {payload.get('type')}, "
f"sub: {payload.get('sub')}, exp: {payload.get('exp')}, "
f"token: ...{token[-20:]}"
)
return payload
except ExpiredSignatureError:
logger.info(f"[JWT] 토큰 만료 - token: ...{token[-20:]}")
return None
except JWTClaimsError as e:
logger.warning(f"[JWT] 클레임 검증 실패 - error: {e}, token: ...{token[-20:]}")
return None
except JWTError as e:
logger.warning(f"[JWT] 토큰 디코딩 실패 - error: {type(e).__name__}: {e}, token: ...{token[-20:]}")
return None
```
### 4-2. `dependencies/auth.py` — Access Token 검증 로깅
**import 추가:**
```python
import logging
logger = logging.getLogger(__name__)
```
**`get_current_user()` — 검증 과정 로그:**
```python
async def get_current_user(...) -> User:
if credentials is None:
logger.info("[AUTH-DEP] 토큰 없음 - MissingTokenError")
raise MissingTokenError()
token = credentials.credentials
logger.debug(f"[AUTH-DEP] Access Token 검증 시작 - token: ...{token[-20:]}")
payload = decode_token(token)
if payload is None:
logger.warning(f"[AUTH-DEP] Access Token 디코딩 실패 - token: ...{token[-20:]}")
raise InvalidTokenError()
if payload.get("type") != "access":
logger.warning(f"[AUTH-DEP] 토큰 타입 불일치 - expected: access, got: {payload.get('type')}, sub: {payload.get('sub')}")
raise InvalidTokenError("액세스 토큰이 아닙니다.")
user_uuid = payload.get("sub")
if user_uuid is None:
logger.warning(f"[AUTH-DEP] 토큰에 sub 클레임 없음 - token: ...{token[-20:]}")
raise InvalidTokenError()
# 사용자 조회
result = await session.execute(...)
user = result.scalar_one_or_none()
if user is None:
logger.warning(f"[AUTH-DEP] 사용자 미존재 - user_uuid: {user_uuid}")
raise UserNotFoundError()
if not user.is_active:
logger.warning(f"[AUTH-DEP] 비활성 사용자 접근 - user_uuid: {user_uuid}, user_id: {user.id}")
raise UserInactiveError()
logger.debug(f"[AUTH-DEP] Access Token 검증 성공 - user_uuid: {user_uuid}, user_id: {user.id}")
return user
```
**`get_current_user_optional()` — 동일 패턴, `debug` 레벨:**
```python
async def get_current_user_optional(...) -> Optional[User]:
if credentials is None:
logger.debug("[AUTH-DEP] 선택적 인증 - 토큰 없음")
return None
token = credentials.credentials
payload = decode_token(token)
if payload is None:
logger.debug(f"[AUTH-DEP] 선택적 인증 - 디코딩 실패, token: ...{token[-20:]}")
return None
if payload.get("type") != "access":
logger.debug(f"[AUTH-DEP] 선택적 인증 - 타입 불일치 (type={payload.get('type')})")
return None
user_uuid = payload.get("sub")
if user_uuid is None:
logger.debug("[AUTH-DEP] 선택적 인증 - sub 없음")
return None
result = await session.execute(...)
user = result.scalar_one_or_none()
if user is None or not user.is_active:
logger.debug(f"[AUTH-DEP] 선택적 인증 - 사용자 미존재 또는 비활성, user_uuid: {user_uuid}")
return None
logger.debug(f"[AUTH-DEP] 선택적 인증 성공 - user_uuid: {user_uuid}, user_id: {user.id}")
return user
```
### 4-3. `services/auth.py` — Refresh Token 갱신/폐기 로깅
**`refresh_tokens()` — 전체 흐름 로그:**
```python
async def refresh_tokens(self, refresh_token: str, session: AsyncSession) -> TokenResponse:
logger.info(f"[AUTH] 토큰 갱신 시작 (Rotation) - token: ...{refresh_token[-20:]}")
# 1. 디코딩
payload = decode_token(refresh_token)
if payload is None:
logger.warning(f"[AUTH] 토큰 갱신 실패 [1/8 디코딩] - token: ...{refresh_token[-20:]}")
raise InvalidTokenError()
if payload.get("type") != "refresh":
logger.warning(f"[AUTH] 토큰 갱신 실패 [1/8 타입] - type={payload.get('type')}, sub: {payload.get('sub')}")
raise InvalidTokenError("리프레시 토큰이 아닙니다.")
logger.debug(f"[AUTH] 토큰 갱신 [1/8] 디코딩 성공 - sub: {payload.get('sub')}, exp: {payload.get('exp')}")
# 2. DB 조회
token_hash = get_token_hash(refresh_token)
db_token = await self._get_refresh_token_by_hash(token_hash, session)
if db_token is None:
logger.warning(f"[AUTH] 토큰 갱신 실패 [2/8 DB조회] - DB에 없음, token_hash: {token_hash[:16]}...")
raise InvalidTokenError()
logger.debug(f"[AUTH] 토큰 갱신 [2/8] DB 조회 성공 - token_hash: {token_hash[:16]}..., user_uuid: {db_token.user_uuid}, is_revoked: {db_token.is_revoked}, expires_at: {db_token.expires_at}")
# 3. 폐기 여부
if db_token.is_revoked:
logger.warning(f"[AUTH] 토큰 갱신 실패 [3/8 폐기됨] - 이미 폐기된 토큰 (replay attack 의심), token_hash: {token_hash[:16]}..., user_uuid: {db_token.user_uuid}, revoked_at: {db_token.revoked_at}")
raise TokenRevokedError()
# 4. 만료 확인
if db_token.expires_at < now().replace(tzinfo=None):
logger.info(f"[AUTH] 토큰 갱신 실패 [4/8 만료] - expires_at: {db_token.expires_at}, user_uuid: {db_token.user_uuid}")
raise TokenExpiredError()
# 5. 사용자 확인
user_uuid = payload.get("sub")
user = await self._get_user_by_uuid(user_uuid, session)
if user is None:
logger.warning(f"[AUTH] 토큰 갱신 실패 [5/8 사용자] - 사용자 미존재, user_uuid: {user_uuid}")
raise UserNotFoundError()
if not user.is_active:
logger.warning(f"[AUTH] 토큰 갱신 실패 [5/8 비활성] - user_uuid: {user_uuid}, user_id: {user.id}")
raise UserInactiveError()
# 6. 기존 토큰 폐기
db_token.is_revoked = True
db_token.revoked_at = now().replace(tzinfo=None)
logger.debug(f"[AUTH] 토큰 갱신 [6/8] 기존 토큰 폐기 - token_hash: {token_hash[:16]}...")
# 7. 새 토큰 발급
new_access_token = create_access_token(user.user_uuid)
new_refresh_token = create_refresh_token(user.user_uuid)
logger.debug(f"[AUTH] 토큰 갱신 [7/8] 새 토큰 발급 - new_access: ...{new_access_token[-20:]}, new_refresh: ...{new_refresh_token[-20:]}")
# 8. 새 Refresh Token DB 저장 + 커밋
await self._save_refresh_token(user_id=user.id, user_uuid=user.user_uuid, token=new_refresh_token, session=session)
await session.commit()
logger.info(f"[AUTH] 토큰 갱신 완료 [8/8] - user_uuid: {user.user_uuid}, user_id: {user.id}, old_hash: {token_hash[:16]}..., new_refresh: ...{new_refresh_token[-20:]}")
return TokenResponse(...)
```
**`_save_refresh_token()` — DB 저장 로그:**
```python
async def _save_refresh_token(self, ...) -> RefreshToken:
token_hash = get_token_hash(token)
expires_at = get_refresh_token_expires_at()
refresh_token = RefreshToken(...)
session.add(refresh_token)
await session.flush()
logger.debug(f"[AUTH] Refresh Token DB 저장 - user_uuid: {user_uuid}, token_hash: {token_hash[:16]}..., expires_at: {expires_at}")
return refresh_token
```
**`logout()` — 단일 로그아웃 로그:**
```python
async def logout(self, user_id: int, refresh_token: str, session: AsyncSession) -> None:
token_hash = get_token_hash(refresh_token)
logger.info(f"[AUTH] 로그아웃 - user_id: {user_id}, token_hash: {token_hash[:16]}..., token: ...{refresh_token[-20:]}")
await self._revoke_refresh_token_by_hash(token_hash, session)
logger.info(f"[AUTH] 로그아웃 완료 - user_id: {user_id}")
```
**`logout_all()` — 전체 로그아웃 로그:**
```python
async def logout_all(self, user_id: int, session: AsyncSession) -> None:
logger.info(f"[AUTH] 전체 로그아웃 - user_id: {user_id}")
await self._revoke_all_user_tokens(user_id, session)
logger.info(f"[AUTH] 전체 로그아웃 완료 - user_id: {user_id}")
```
### 4-4. `routers/v1/auth.py` — 라우터 진입/완료 로깅
```python
# POST /auth/refresh
async def refresh_token(body, session) -> TokenResponse:
logger.info(f"[ROUTER] POST /auth/refresh - token: ...{body.refresh_token[-20:]}")
result = await auth_service.refresh_tokens(refresh_token=body.refresh_token, session=session)
logger.info(f"[ROUTER] POST /auth/refresh 완료 - new_access: ...{result.access_token[-20:]}, new_refresh: ...{result.refresh_token[-20:]}")
return result
# POST /auth/logout
async def logout(body, current_user, session) -> Response:
logger.info(f"[ROUTER] POST /auth/logout - user_id: {current_user.id}, user_uuid: {current_user.user_uuid}, token: ...{body.refresh_token[-20:]}")
await auth_service.logout(user_id=current_user.id, refresh_token=body.refresh_token, session=session)
logger.info(f"[ROUTER] POST /auth/logout 완료 - user_id: {current_user.id}")
return Response(status_code=status.HTTP_204_NO_CONTENT)
# POST /auth/logout/all
async def logout_all(current_user, session) -> Response:
logger.info(f"[ROUTER] POST /auth/logout/all - user_id: {current_user.id}, user_uuid: {current_user.user_uuid}")
await auth_service.logout_all(user_id=current_user.id, session=session)
logger.info(f"[ROUTER] POST /auth/logout/all 완료 - user_id: {current_user.id}")
return Response(status_code=status.HTTP_204_NO_CONTENT)
```
---
## 5. 보안 원칙
| 원칙 | 적용 방법 | 이유 |
|------|----------|------|
| 토큰 전체 노출 금지 | 뒷 20자만: `...{token[-20:]}` | 토큰 탈취 시 세션 하이재킹 가능 |
| 해시값 부분 노출 | 앞 16자만: `{hash[:16]}...` | DB 레코드 식별에 충분 |
| user_uuid 전체 허용 | 전체 출력 | 내부 식별자, 토큰이 아님 |
| 페이로드 내용 출력 | `sub`, `type`, `exp` 출력 | 디버깅에 필수, 민감정보 아님 |
| DB 토큰 상태 출력 | `is_revoked`, `expires_at`, `revoked_at` | 토큰 라이프사이클 추적 |
| 로그 레벨 구분 | 하단 표 참조 | 운영 환경에서 불필요한 로그 억제 |
### 로그 레벨 기준
| 레벨 | 사용 기준 | 예시 |
|------|----------|------|
| `debug` | 정상 처리 과정 상세 (운영환경에서 비활성) | 토큰 발급, 디코딩 성공, 검증 통과 |
| `info` | 주요 이벤트 (운영환경에서 활성) | 갱신 시작/완료, 로그아웃, 만료로 인한 실패 |
| `warning` | 비정상/의심 상황 | 디코딩 실패, 폐기된 토큰 사용, 사용자 미존재 |
---
## 6. 구현 순서
| 순서 | 파일 | 이유 |
|------|------|------|
| 1 | `app/user/services/jwt.py` | 최하위 유틸리티. 토큰 발급/디코딩의 기본 로그 |
| 2 | `app/user/dependencies/auth.py` | 모든 인증 API의 공통 진입점 |
| 3 | `app/user/services/auth.py` | 갱신/폐기 비즈니스 로직 |
| 4 | `app/user/api/routers/v1/auth.py` | 라우터 진입/완료 + 응답 토큰 정보 |
---
## 7. 기대 효과 — 시나리오별 로그 출력 예시
### 시나리오 1: 정상 토큰 갱신
```
[ROUTER] POST /auth/refresh - token: ...7d90-aac8-ecf1385c
[AUTH] 토큰 갱신 시작 (Rotation) - token: ...7d90-aac8-ecf1385c
[JWT] 토큰 디코딩 성공 - type: refresh, sub: 019c5452-b1cf-7d90-aac8-ecf1385c9dc4, exp: 1739450400
[AUTH] 토큰 갱신 [1/8] 디코딩 성공 - sub: 019c5452-..., exp: 1739450400
[AUTH] 토큰 갱신 [2/8] DB 조회 성공 - token_hash: a1b2c3d4e5f6g7h8..., is_revoked: False, expires_at: 2026-02-20 11:46:36
[AUTH] 토큰 갱신 [6/8] 기존 토큰 폐기 - token_hash: a1b2c3d4e5f6g7h8...
[JWT] Access Token 발급 - user_uuid: 019c5452-..., expires: 2026-02-13 12:46:36
[JWT] Refresh Token 발급 - user_uuid: 019c5452-..., expires: 2026-02-20 11:46:36
[AUTH] 토큰 갱신 [7/8] 새 토큰 발급 - new_access: ...xNewAccess12345, new_refresh: ...xNewRefresh6789
[AUTH] Refresh Token DB 저장 - user_uuid: 019c5452-..., token_hash: f8e9d0c1b2a3..., expires_at: 2026-02-20 11:46:36
[AUTH] 토큰 갱신 완료 [8/8] - user_uuid: 019c5452-..., user_id: 42, old_hash: a1b2c3d4e5f6g7h8..., new_refresh: ...xNewRefresh6789
[ROUTER] POST /auth/refresh 완료 - new_access: ...xNewAccess12345, new_refresh: ...xNewRefresh6789
```
### 시나리오 2: 만료된 Refresh Token으로 갱신 시도
```
[ROUTER] POST /auth/refresh - token: ...expiredToken12345
[AUTH] 토큰 갱신 시작 (Rotation) - token: ...expiredToken12345
[JWT] 토큰 만료 - token: ...expiredToken12345
[AUTH] 토큰 갱신 실패 [1/8 디코딩] - token: ...expiredToken12345
→ 401 InvalidTokenError 응답
```
### 시나리오 3: 이미 폐기된 Refresh Token 재사용 (Replay Attack)
```
[ROUTER] POST /auth/refresh - token: ...revokedToken98765
[AUTH] 토큰 갱신 시작 (Rotation) - token: ...revokedToken98765
[JWT] 토큰 디코딩 성공 - type: refresh, sub: 019c5452-..., exp: 1739450400
[AUTH] 토큰 갱신 [2/8] DB 조회 성공 - token_hash: c3d4e5f6..., is_revoked: True, expires_at: 2026-02-20
[AUTH] 토큰 갱신 실패 [3/8 폐기됨] - replay attack 의심, token_hash: c3d4e5f6..., user_uuid: 019c5452-..., revoked_at: 2026-02-13 10:30:00
→ 401 TokenRevokedError 응답
```
### 시나리오 4: Access Token 검증 (매 API 요청)
```
[AUTH-DEP] Access Token 검증 시작 - token: ...validAccess12345
[JWT] 토큰 디코딩 성공 - type: access, sub: 019c5452-..., exp: 1739450400
[AUTH-DEP] Access Token 검증 성공 - user_uuid: 019c5452-..., user_id: 42
```
### 시나리오 5: 로그아웃
```
[ROUTER] POST /auth/logout - user_id: 42, user_uuid: 019c5452-..., token: ...refreshToRevoke99
[AUTH] 로그아웃 - user_id: 42, token_hash: d5e6f7g8..., token: ...refreshToRevoke99
[AUTH] 로그아웃 완료 - user_id: 42
[ROUTER] POST /auth/logout 완료 - user_id: 42
```