o2o-castad-backend/app/dashboard/api/routers/v1/dashboard.py

528 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
Dashboard API 라우터
YouTube Analytics 기반 대시보드 통계를 제공합니다.
"""
import json
import logging
from datetime import date, datetime, timedelta
from typing import Literal
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dashboard.exceptions import (
YouTubeAccountNotConnectedError,
YouTubeAccountNotFoundError,
YouTubeAccountSelectionRequiredError,
YouTubeTokenExpiredError,
)
from app.dashboard.schemas import (
AudienceData,
CacheDeleteResponse,
ConnectedAccount,
ConnectedAccountsResponse,
ContentMetric,
DashboardResponse,
TopContent,
)
from app.dashboard.services import DataProcessor, YouTubeAnalyticsService
from app.dashboard.redis_cache import (
delete_cache,
delete_cache_pattern,
get_cache,
set_cache,
)
from app.database.session import get_session
from app.dashboard.models import Dashboard
from app.social.exceptions import TokenExpiredError
from app.social.services import SocialAccountService
from app.user.dependencies.auth import get_current_user
from app.user.models import SocialAccount, User
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/dashboard", tags=["Dashboard"])
@router.get(
"/accounts",
response_model=ConnectedAccountsResponse,
summary="연결된 소셜 계정 목록 조회",
description="""
연결된 소셜 계정 목록을 반환합니다.
여러 계정이 연결된 경우, 반환된 `platformUserId` 값을 `/dashboard/stats?platform_user_id=<값>`에 전달하여 계정을 선택합니다.
""",
)
async def get_connected_accounts(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> ConnectedAccountsResponse:
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
accounts_raw = result.scalars().all()
# platform_user_id 기준
seen_platform_ids: set[str] = set()
connected = []
for acc in sorted(
accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id in seen_platform_ids:
continue
seen_platform_ids.add(acc.platform_user_id)
data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
connected.append(
ConnectedAccount(
id=acc.id,
platform=acc.platform,
platform_username=acc.platform_username,
platform_user_id=acc.platform_user_id,
channel_title=data.get("channel_title"),
connected_at=acc.connected_at,
is_active=acc.is_active,
)
)
logger.info(
f"[ACCOUNTS] YouTube 계정 목록 조회 - "
f"user_uuid={current_user.user_uuid}, count={len(connected)}"
)
return ConnectedAccountsResponse(accounts=connected)
@router.get(
"/stats",
response_model=DashboardResponse,
summary="대시보드 통계 조회",
description="""
YouTube Analytics API를 활용한 대시보드 통계를 조회합니다.
## 주요 기능
- 최근 30개 업로드 영상 기준 통계 제공
- KPI 지표: 조회수, 시청시간, 평균 시청시간, 신규 구독자, 좋아요, 댓글, 공유, 업로드 영상
- 월별 추이: 최근 12개월 vs 이전 12개월 비교
- 인기 영상 TOP 4
- 시청자 분석: 연령/성별/지역 분포
## 성능 최적화
- 7개 YouTube Analytics API를 병렬로 호출
- Redis 캐싱 적용 (TTL: 12시간)
## 사전 조건
- YouTube 계정이 연동되어 있어야 합니다
## 조회 모드
- `day`: 최근 30일 통계 (현재 날짜 -2일 기준)
- `month`: 최근 12개월 통계 (현재 날짜 -2일 기준, 기본값)
## 데이터 특성
- **지연 시간**: 48시간 (2일) - 2월 14일 요청 시 2월 12일 자까지 확정
- **업데이트 주기**: 하루 1회 (PT 자정, 한국 시간 오후 5~8시)
- **실시간 아님**: 전날 데이터가 다음날 확정됩니다
""",
)
async def get_dashboard_stats(
mode: Literal["day", "month"] = Query(
default="month",
description="조회 모드: day(최근 30일), month(최근 12개월)",
),
platform_user_id: str | None = Query(
default=None,
description="사용할 YouTube 채널 ID (platform_user_id)",
),
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> DashboardResponse:
"""
대시보드 통계 조회
Args:
mode: 조회 모드 (day: 최근 30일, month: 최근 12개월)
platform_user_id: 사용할 YouTube 채널 ID (여러 계정 연결 시 필수, 재연동해도 불변)
current_user: 현재 인증된 사용자
session: 데이터베이스 세션
Returns:
DashboardResponse: 대시보드 통계 데이터
Raises:
YouTubeAccountNotConnectedError: YouTube 계정이 연동되어 있지 않음
YouTubeAccountSelectionRequiredError: 여러 계정이 연결되어 있으나 계정 미선택
YouTubeAccountNotFoundError: 지정한 계정을 찾을 수 없음
YouTubeTokenExpiredError: YouTube 토큰 만료 (재연동 필요)
YouTubeAPIError: YouTube Analytics API 호출 실패
"""
logger.info(
f"[DASHBOARD] 통계 조회 시작 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
)
# 1. 모드별 날짜 자동 계산
today = date.today()
if mode == "day":
# 48시간 지연 적용: 오늘 기준 -2일을 end로 사용
# ex) 오늘 2/20 → end=2/18, start=1/20
end_dt = today - timedelta(days=2)
kpi_end_dt = end_dt
start_dt = end_dt - timedelta(days=29)
# 이전 30일 (YouTube API day_previous와 동일 기준)
prev_start_dt = start_dt - timedelta(days=30)
prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
period_desc = "최근 30일"
else: # mode == "month"
# 월별 차트: dimensions=month API는 YYYY-MM-01 형식 필요
# ex) 오늘 2/24 → end=2026-02-01, start=2025-03-01 → 2025-03 ~ 2026-02 (12개월)
end_dt = today.replace(day=1)
# KPI 등 집계형 API: 48시간 지연 적용하여 현재 월 전체 데이터 포함
kpi_end_dt = today - timedelta(days=2)
start_month = end_dt.month - 11
if start_month <= 0:
start_month += 12
start_year = end_dt.year - 1
else:
start_year = end_dt.year
start_dt = date(start_year, start_month, 1)
# 이전 12개월 (YouTube API previous와 동일 기준 — 1년 전)
prev_start_dt = start_dt.replace(year=start_dt.year - 1)
try:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
except ValueError: # 윤년 2/29 → 이전 연도 2/28
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
period_desc = "최근 12개월"
start_date = start_dt.strftime("%Y-%m-%d")
end_date = end_dt.strftime("%Y-%m-%d")
kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
logger.debug(
f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}"
)
# 2. YouTube 계정 연동 확인
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
social_accounts_raw = result.scalars().all()
# platform_user_id 기준으로 중복 제거 (가장 최근 연동 계정 우선)
seen_platform_ids_stats: set[str] = set()
social_accounts = []
for acc in sorted(
social_accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id not in seen_platform_ids_stats:
seen_platform_ids_stats.add(acc.platform_user_id)
social_accounts.append(acc)
if not social_accounts:
logger.warning(
f"[NO YOUTUBE ACCOUNT] YouTube 계정 미연동 - "
f"user_uuid={current_user.user_uuid}"
)
raise YouTubeAccountNotConnectedError()
if platform_user_id is not None:
matched = [a for a in social_accounts if a.platform_user_id == platform_user_id]
if not matched:
logger.warning(
f"[ACCOUNT NOT FOUND] 지정 계정 없음 - "
f"user_uuid={current_user.user_uuid}, platform_user_id={platform_user_id}"
)
raise YouTubeAccountNotFoundError()
social_account = matched[0]
elif len(social_accounts) == 1:
social_account = social_accounts[0]
else:
logger.warning(
f"[MULTI ACCOUNT] 계정 선택 필요 - "
f"user_uuid={current_user.user_uuid}, count={len(social_accounts)}"
)
raise YouTubeAccountSelectionRequiredError()
logger.debug(
f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}"
)
# 3. 기간 내 업로드 영상 수 조회
count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= start_dt,
Dashboard.uploaded_at <= end_dt,
)
)
period_video_count = count_result.scalar() or 0
# 이전 기간 업로드 영상 수 조회 (trend 계산용)
prev_count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= prev_start_dt,
Dashboard.uploaded_at <= prev_kpi_end_dt,
)
)
prev_period_video_count = prev_count_result.scalar() or 0
logger.debug(
f"[3] 기간 내 업로드 영상 수 - current={period_video_count}, prev={prev_period_video_count}"
)
# 4. Redis 캐시 조회
# platform_user_id 기준 캐시 키: 재연동해도 채널 ID는 불변 → 캐시 유지됨
cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
cached_raw = await get_cache(cache_key)
if cached_raw:
try:
payload = json.loads(cached_raw)
logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
response = DashboardResponse.model_validate(payload["response"])
for metric in response.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
return response
except (json.JSONDecodeError, KeyError):
logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")
# 5. 최근 30개 업로드 영상 조회 (Analytics API 전달용)
# YouTube Analytics API 제약사항:
# - 영상 개수: 20~30개 권장 (최대 50개, 그 이상은 응답 지연 발생)
# - URL 길이: 2000자 제한 (video ID 11자 × 30개 = 330자로 안전)
result = await session.execute(
select(
Dashboard.platform_video_id,
Dashboard.title,
Dashboard.uploaded_at,
)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
)
.order_by(Dashboard.uploaded_at.desc())
.limit(30)
)
rows = result.all()
logger.debug(f"[5] 영상 조회 완료 - count={len(rows)}")
# 6. video_ids + 메타데이터 조회용 dict 구성
video_ids = []
video_lookup: dict[str, tuple[str, datetime]] = {} # {video_id: (title, uploaded_at)}
for row in rows:
platform_video_id, title, uploaded_at = row
video_ids.append(platform_video_id)
video_lookup[platform_video_id] = (title, uploaded_at)
logger.debug(
f"[6] 영상 메타데이터 구성 완료 - count={len(video_ids)}, sample={video_ids[:3]}"
)
# 6.1 업로드 영상 없음 → YouTube API 호출 없이 빈 응답 반환
if not video_ids:
logger.info(
f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - "
f"user_uuid={current_user.user_uuid}"
)
return DashboardResponse(
content_metrics=[
ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"),
ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"),
ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"),
],
monthly_data=[],
daily_data=[],
top_content=[],
audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
has_uploads=False,
)
# 7. 토큰 유효성 확인 및 자동 갱신 (만료 10분 전 갱신)
try:
access_token = await SocialAccountService().ensure_valid_token(
social_account, session
)
except TokenExpiredError:
logger.warning(
f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}"
)
raise YouTubeTokenExpiredError()
logger.debug("[7] 토큰 유효성 확인 완료")
# 8. YouTube Analytics API 호출 (7개 병렬)
youtube_service = YouTubeAnalyticsService()
raw_data = await youtube_service.fetch_all_metrics(
video_ids=video_ids,
start_date=start_date,
end_date=end_date,
kpi_end_date=kpi_end_date,
access_token=access_token,
mode=mode,
)
logger.debug("[8] YouTube Analytics API 호출 완료")
# 9. TopContent 조립 (Analytics top_videos + DB lookup)
processor = DataProcessor()
top_content_rows = raw_data.get("top_videos", {}).get("rows", [])
top_content: list[TopContent] = []
for row in top_content_rows[:4]:
if len(row) < 4:
continue
video_id, views, likes, comments = row[0], row[1], row[2], row[3]
meta = video_lookup.get(video_id)
if not meta:
continue
title, uploaded_at = meta
engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
top_content.append(
TopContent(
id=video_id,
title=title,
thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
platform="youtube",
views=int(views),
engagement=f"{engagement_rate:.1f}%",
date=uploaded_at.strftime("%Y.%m.%d"),
)
)
logger.debug(f"[9] TopContent 조립 완료 - count={len(top_content)}")
# 10. 데이터 가공 (period_video_count=0 — API 무관 DB 집계값, 캐시에 포함하지 않음)
dashboard_data = processor.process(
raw_data, top_content, 0, mode=mode, end_date=end_date
)
logger.debug("[10] 데이터 가공 완료")
# 11. Redis 캐싱 (TTL: 12시간)
# YouTube Analytics는 하루 1회 갱신 (PT 자정, 한국 시간 오후 5~8시)
# 48시간 지연된 데이터이므로 12시간 캐싱으로 API 호출 최소화
# period_video_count는 캐시에 포함하지 않음 (DB 직접 집계, API 미사용)
cache_payload = json.dumps(
{"response": json.loads(dashboard_data.model_dump_json())}
)
cache_success = await set_cache(
cache_key,
cache_payload,
ttl=43200, # 12시간
)
if cache_success:
logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
else:
logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")
# 12. 업로드 영상 수 및 trend 주입 (캐시 저장 후 — 항상 DB에서 직접 집계)
for metric in dashboard_data.content_metrics:
if metric.id == "uploaded-videos":
metric.value = str(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
logger.info(
f"[DASHBOARD] 통계 조회 완료 - "
f"user_uuid={current_user.user_uuid}, "
f"mode={mode}, period={period_desc}, videos={len(video_ids)}"
)
return dashboard_data
@router.delete(
"/cache",
response_model=CacheDeleteResponse,
summary="대시보드 캐시 삭제",
description="""
대시보드 Redis 캐시를 삭제합니다. 인증 없이 호출 가능합니다.
삭제 후 다음 `/stats` 요청 시 YouTube Analytics API를 새로 호출하여 최신 데이터를 반환합니다.
## 사용 시나리오
- 코드 배포 후 즉시 최신 데이터 반영이 필요할 때
- 데이터 이상 발생 시 캐시 강제 갱신
## 캐시 키 구조
`dashboard:{user_uuid}:{platform_user_id}:{mode}` (mode: day 또는 month)
## 파라미터
- `user_uuid`: 특정 사용자 캐시만 삭제. 미입력 시 전체 삭제
- `mode`: day / month / all (기본값: all)
""",
)
async def delete_dashboard_cache(
mode: Literal["day", "month", "all"] = Query(
default="all",
description="삭제할 캐시 모드: day, month, all(기본값, 모두 삭제)",
),
user_uuid: str | None = Query(
default=None,
description="대상 사용자 UUID. 미입력 시 전체 사용자 캐시 삭제",
),
) -> CacheDeleteResponse:
"""
대시보드 캐시 삭제
Args:
mode: 삭제할 캐시 모드 (day / month / all)
user_uuid: 대상 사용자 UUID (없으면 전체 삭제)
Returns:
CacheDeleteResponse: 삭제된 캐시 키 개수 및 메시지
"""
if user_uuid:
if mode == "all":
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*")
message = f"전체 캐시 삭제 완료 ({deleted}개)"
else:
cache_key = f"dashboard:{user_uuid}:{mode}"
success = await delete_cache(cache_key)
deleted = 1 if success else 0
message = f"{mode} 캐시 삭제 {'완료' if success else '실패 (키 없음)'}"
else:
deleted = await delete_cache_pattern("dashboard:*")
message = f"전체 사용자 캐시 삭제 완료 ({deleted}개)"
logger.info(
f"[CACHE DELETE] user_uuid={user_uuid or 'ALL'}, mode={mode}, deleted={deleted}"
)
return CacheDeleteResponse(deleted_count=deleted, message=message)