528 lines
20 KiB
Python
528 lines
20 KiB
Python
"""
|
||
Dashboard API 라우터
|
||
|
||
YouTube Analytics 기반 대시보드 통계를 제공합니다.
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
from datetime import date, datetime, timedelta
|
||
from typing import Literal
|
||
|
||
from fastapi import APIRouter, Depends, Query
|
||
from sqlalchemy import func, select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.dashboard.exceptions import (
|
||
YouTubeAccountNotConnectedError,
|
||
YouTubeAccountNotFoundError,
|
||
YouTubeAccountSelectionRequiredError,
|
||
YouTubeTokenExpiredError,
|
||
)
|
||
from app.dashboard.schemas import (
|
||
AudienceData,
|
||
CacheDeleteResponse,
|
||
ConnectedAccount,
|
||
ConnectedAccountsResponse,
|
||
ContentMetric,
|
||
DashboardResponse,
|
||
TopContent,
|
||
)
|
||
from app.dashboard.services import DataProcessor, YouTubeAnalyticsService
|
||
from app.dashboard.redis_cache import (
|
||
delete_cache,
|
||
delete_cache_pattern,
|
||
get_cache,
|
||
set_cache,
|
||
)
|
||
from app.database.session import get_session
|
||
from app.dashboard.models import Dashboard
|
||
from app.social.exceptions import TokenExpiredError
|
||
from app.social.services import SocialAccountService
|
||
from app.user.dependencies.auth import get_current_user
|
||
from app.user.models import SocialAccount, User
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
router = APIRouter(prefix="/dashboard", tags=["Dashboard"])
|
||
|
||
|
||
@router.get(
|
||
"/accounts",
|
||
response_model=ConnectedAccountsResponse,
|
||
summary="연결된 소셜 계정 목록 조회",
|
||
description="""
|
||
연결된 소셜 계정 목록을 반환합니다.
|
||
|
||
여러 계정이 연결된 경우, 반환된 `platformUserId` 값을 `/dashboard/stats?platform_user_id=<값>`에 전달하여 계정을 선택합니다.
|
||
""",
|
||
)
|
||
async def get_connected_accounts(
|
||
current_user: User = Depends(get_current_user),
|
||
session: AsyncSession = Depends(get_session),
|
||
) -> ConnectedAccountsResponse:
|
||
result = await session.execute(
|
||
select(SocialAccount).where(
|
||
SocialAccount.user_uuid == current_user.user_uuid,
|
||
SocialAccount.platform == "youtube",
|
||
SocialAccount.is_active == True, # noqa: E712
|
||
)
|
||
)
|
||
accounts_raw = result.scalars().all()
|
||
|
||
# platform_user_id 기준
|
||
seen_platform_ids: set[str] = set()
|
||
connected = []
|
||
for acc in sorted(
|
||
accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
|
||
):
|
||
if acc.platform_user_id in seen_platform_ids:
|
||
continue
|
||
seen_platform_ids.add(acc.platform_user_id)
|
||
data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
|
||
connected.append(
|
||
ConnectedAccount(
|
||
id=acc.id,
|
||
platform=acc.platform,
|
||
platform_username=acc.platform_username,
|
||
platform_user_id=acc.platform_user_id,
|
||
channel_title=data.get("channel_title"),
|
||
connected_at=acc.connected_at,
|
||
is_active=acc.is_active,
|
||
)
|
||
)
|
||
|
||
logger.info(
|
||
f"[ACCOUNTS] YouTube 계정 목록 조회 - "
|
||
f"user_uuid={current_user.user_uuid}, count={len(connected)}"
|
||
)
|
||
return ConnectedAccountsResponse(accounts=connected)
|
||
|
||
|
||
@router.get(
|
||
"/stats",
|
||
response_model=DashboardResponse,
|
||
summary="대시보드 통계 조회",
|
||
description="""
|
||
YouTube Analytics API를 활용한 대시보드 통계를 조회합니다.
|
||
|
||
## 주요 기능
|
||
- 최근 30개 업로드 영상 기준 통계 제공
|
||
- KPI 지표: 조회수, 시청시간, 평균 시청시간, 신규 구독자, 좋아요, 댓글, 공유, 업로드 영상
|
||
- 월별 추이: 최근 12개월 vs 이전 12개월 비교
|
||
- 인기 영상 TOP 4
|
||
- 시청자 분석: 연령/성별/지역 분포
|
||
|
||
## 성능 최적화
|
||
- 7개 YouTube Analytics API를 병렬로 호출
|
||
- Redis 캐싱 적용 (TTL: 12시간)
|
||
|
||
## 사전 조건
|
||
- YouTube 계정이 연동되어 있어야 합니다
|
||
|
||
## 조회 모드
|
||
- `day`: 최근 30일 통계 (현재 날짜 -2일 기준)
|
||
- `month`: 최근 12개월 통계 (현재 날짜 -2일 기준, 기본값)
|
||
|
||
## 데이터 특성
|
||
- **지연 시간**: 48시간 (2일) - 2월 14일 요청 시 2월 12일 자까지 확정
|
||
- **업데이트 주기**: 하루 1회 (PT 자정, 한국 시간 오후 5~8시)
|
||
- **실시간 아님**: 전날 데이터가 다음날 확정됩니다
|
||
""",
|
||
)
|
||
async def get_dashboard_stats(
|
||
mode: Literal["day", "month"] = Query(
|
||
default="month",
|
||
description="조회 모드: day(최근 30일), month(최근 12개월)",
|
||
),
|
||
platform_user_id: str | None = Query(
|
||
default=None,
|
||
description="사용할 YouTube 채널 ID (platform_user_id)",
|
||
),
|
||
current_user: User = Depends(get_current_user),
|
||
session: AsyncSession = Depends(get_session),
|
||
) -> DashboardResponse:
|
||
"""
|
||
대시보드 통계 조회
|
||
|
||
Args:
|
||
mode: 조회 모드 (day: 최근 30일, month: 최근 12개월)
|
||
platform_user_id: 사용할 YouTube 채널 ID (여러 계정 연결 시 필수, 재연동해도 불변)
|
||
current_user: 현재 인증된 사용자
|
||
session: 데이터베이스 세션
|
||
|
||
Returns:
|
||
DashboardResponse: 대시보드 통계 데이터
|
||
|
||
Raises:
|
||
YouTubeAccountNotConnectedError: YouTube 계정이 연동되어 있지 않음
|
||
YouTubeAccountSelectionRequiredError: 여러 계정이 연결되어 있으나 계정 미선택
|
||
YouTubeAccountNotFoundError: 지정한 계정을 찾을 수 없음
|
||
YouTubeTokenExpiredError: YouTube 토큰 만료 (재연동 필요)
|
||
YouTubeAPIError: YouTube Analytics API 호출 실패
|
||
"""
|
||
logger.info(
|
||
f"[DASHBOARD] 통계 조회 시작 - "
|
||
f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
|
||
)
|
||
|
||
# 1. 모드별 날짜 자동 계산
|
||
today = date.today()
|
||
|
||
if mode == "day":
|
||
# 48시간 지연 적용: 오늘 기준 -2일을 end로 사용
|
||
# ex) 오늘 2/20 → end=2/18, start=1/20
|
||
end_dt = today - timedelta(days=2)
|
||
kpi_end_dt = end_dt
|
||
start_dt = end_dt - timedelta(days=29)
|
||
# 이전 30일 (YouTube API day_previous와 동일 기준)
|
||
prev_start_dt = start_dt - timedelta(days=30)
|
||
prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
|
||
period_desc = "최근 30일"
|
||
else: # mode == "month"
|
||
# 월별 차트: dimensions=month API는 YYYY-MM-01 형식 필요
|
||
# ex) 오늘 2/24 → end=2026-02-01, start=2025-03-01 → 2025-03 ~ 2026-02 (12개월)
|
||
end_dt = today.replace(day=1)
|
||
# KPI 등 집계형 API: 48시간 지연 적용하여 현재 월 전체 데이터 포함
|
||
kpi_end_dt = today - timedelta(days=2)
|
||
|
||
start_month = end_dt.month - 11
|
||
if start_month <= 0:
|
||
start_month += 12
|
||
start_year = end_dt.year - 1
|
||
else:
|
||
start_year = end_dt.year
|
||
start_dt = date(start_year, start_month, 1)
|
||
# 이전 12개월 (YouTube API previous와 동일 기준 — 1년 전)
|
||
prev_start_dt = start_dt.replace(year=start_dt.year - 1)
|
||
try:
|
||
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
|
||
except ValueError: # 윤년 2/29 → 이전 연도 2/28
|
||
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
|
||
period_desc = "최근 12개월"
|
||
|
||
start_date = start_dt.strftime("%Y-%m-%d")
|
||
end_date = end_dt.strftime("%Y-%m-%d")
|
||
kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
|
||
|
||
logger.debug(
|
||
f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}"
|
||
)
|
||
|
||
# 2. YouTube 계정 연동 확인
|
||
result = await session.execute(
|
||
select(SocialAccount).where(
|
||
SocialAccount.user_uuid == current_user.user_uuid,
|
||
SocialAccount.platform == "youtube",
|
||
SocialAccount.is_active == True, # noqa: E712
|
||
)
|
||
)
|
||
social_accounts_raw = result.scalars().all()
|
||
|
||
# platform_user_id 기준으로 중복 제거 (가장 최근 연동 계정 우선)
|
||
seen_platform_ids_stats: set[str] = set()
|
||
social_accounts = []
|
||
for acc in sorted(
|
||
social_accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
|
||
):
|
||
if acc.platform_user_id not in seen_platform_ids_stats:
|
||
seen_platform_ids_stats.add(acc.platform_user_id)
|
||
social_accounts.append(acc)
|
||
|
||
if not social_accounts:
|
||
logger.warning(
|
||
f"[NO YOUTUBE ACCOUNT] YouTube 계정 미연동 - "
|
||
f"user_uuid={current_user.user_uuid}"
|
||
)
|
||
raise YouTubeAccountNotConnectedError()
|
||
|
||
if platform_user_id is not None:
|
||
matched = [a for a in social_accounts if a.platform_user_id == platform_user_id]
|
||
if not matched:
|
||
logger.warning(
|
||
f"[ACCOUNT NOT FOUND] 지정 계정 없음 - "
|
||
f"user_uuid={current_user.user_uuid}, platform_user_id={platform_user_id}"
|
||
)
|
||
raise YouTubeAccountNotFoundError()
|
||
social_account = matched[0]
|
||
elif len(social_accounts) == 1:
|
||
social_account = social_accounts[0]
|
||
else:
|
||
logger.warning(
|
||
f"[MULTI ACCOUNT] 계정 선택 필요 - "
|
||
f"user_uuid={current_user.user_uuid}, count={len(social_accounts)}"
|
||
)
|
||
raise YouTubeAccountSelectionRequiredError()
|
||
|
||
logger.debug(
|
||
f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}"
|
||
)
|
||
|
||
# 3. 기간 내 업로드 영상 수 조회
|
||
count_result = await session.execute(
|
||
select(func.count())
|
||
.select_from(Dashboard)
|
||
.where(
|
||
Dashboard.user_uuid == current_user.user_uuid,
|
||
Dashboard.platform == "youtube",
|
||
Dashboard.platform_user_id == social_account.platform_user_id,
|
||
Dashboard.uploaded_at >= start_dt,
|
||
Dashboard.uploaded_at <= kpi_end_dt,
|
||
)
|
||
)
|
||
period_video_count = count_result.scalar() or 0
|
||
|
||
# 이전 기간 업로드 영상 수 조회 (trend 계산용)
|
||
prev_count_result = await session.execute(
|
||
select(func.count())
|
||
.select_from(Dashboard)
|
||
.where(
|
||
Dashboard.user_uuid == current_user.user_uuid,
|
||
Dashboard.platform == "youtube",
|
||
Dashboard.platform_user_id == social_account.platform_user_id,
|
||
Dashboard.uploaded_at >= prev_start_dt,
|
||
Dashboard.uploaded_at <= prev_kpi_end_dt,
|
||
)
|
||
)
|
||
prev_period_video_count = prev_count_result.scalar() or 0
|
||
logger.debug(
|
||
f"[3] 기간 내 업로드 영상 수 - current={period_video_count}, prev={prev_period_video_count}"
|
||
)
|
||
|
||
# 4. Redis 캐시 조회
|
||
# platform_user_id 기준 캐시 키: 재연동해도 채널 ID는 불변 → 캐시 유지됨
|
||
cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
|
||
cached_raw = await get_cache(cache_key)
|
||
|
||
if cached_raw:
|
||
try:
|
||
payload = json.loads(cached_raw)
|
||
logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
|
||
response = DashboardResponse.model_validate(payload["response"])
|
||
for metric in response.content_metrics:
|
||
if metric.id == "uploaded-videos":
|
||
metric.value = float(period_video_count)
|
||
video_trend = float(period_video_count - prev_period_video_count)
|
||
metric.trend = video_trend
|
||
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
|
||
break
|
||
return response
|
||
except (json.JSONDecodeError, KeyError):
|
||
logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
|
||
|
||
logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")
|
||
|
||
# 5. 최근 30개 업로드 영상 조회 (Analytics API 전달용)
|
||
# YouTube Analytics API 제약사항:
|
||
# - 영상 개수: 20~30개 권장 (최대 50개, 그 이상은 응답 지연 발생)
|
||
# - URL 길이: 2000자 제한 (video ID 11자 × 30개 = 330자로 안전)
|
||
result = await session.execute(
|
||
select(
|
||
Dashboard.platform_video_id,
|
||
Dashboard.title,
|
||
Dashboard.uploaded_at,
|
||
)
|
||
.where(
|
||
Dashboard.user_uuid == current_user.user_uuid,
|
||
Dashboard.platform == "youtube",
|
||
Dashboard.platform_user_id == social_account.platform_user_id,
|
||
)
|
||
.order_by(Dashboard.uploaded_at.desc())
|
||
.limit(30)
|
||
)
|
||
rows = result.all()
|
||
logger.debug(f"[5] 영상 조회 완료 - count={len(rows)}")
|
||
|
||
# 6. video_ids + 메타데이터 조회용 dict 구성
|
||
video_ids = []
|
||
video_lookup: dict[str, tuple[str, datetime]] = {} # {video_id: (title, uploaded_at)}
|
||
|
||
for row in rows:
|
||
platform_video_id, title, uploaded_at = row
|
||
video_ids.append(platform_video_id)
|
||
video_lookup[platform_video_id] = (title, uploaded_at)
|
||
|
||
logger.debug(
|
||
f"[6] 영상 메타데이터 구성 완료 - count={len(video_ids)}, sample={video_ids[:3]}"
|
||
)
|
||
|
||
# 6.1 업로드 영상 없음 → YouTube API 호출 없이 빈 응답 반환
|
||
if not video_ids:
|
||
logger.info(
|
||
f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - "
|
||
f"user_uuid={current_user.user_uuid}"
|
||
)
|
||
return DashboardResponse(
|
||
content_metrics=[
|
||
ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"),
|
||
ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"),
|
||
ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"),
|
||
ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"),
|
||
ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"),
|
||
ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"),
|
||
ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"),
|
||
ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"),
|
||
],
|
||
monthly_data=[],
|
||
daily_data=[],
|
||
top_content=[],
|
||
audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
|
||
has_uploads=False,
|
||
)
|
||
|
||
# 7. 토큰 유효성 확인 및 자동 갱신 (만료 10분 전 갱신)
|
||
try:
|
||
access_token = await SocialAccountService().ensure_valid_token(
|
||
social_account, session
|
||
)
|
||
except TokenExpiredError:
|
||
logger.warning(
|
||
f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}"
|
||
)
|
||
raise YouTubeTokenExpiredError()
|
||
|
||
logger.debug("[7] 토큰 유효성 확인 완료")
|
||
|
||
# 8. YouTube Analytics API 호출 (7개 병렬)
|
||
youtube_service = YouTubeAnalyticsService()
|
||
raw_data = await youtube_service.fetch_all_metrics(
|
||
video_ids=video_ids,
|
||
start_date=start_date,
|
||
end_date=end_date,
|
||
kpi_end_date=kpi_end_date,
|
||
access_token=access_token,
|
||
mode=mode,
|
||
)
|
||
|
||
logger.debug("[8] YouTube Analytics API 호출 완료")
|
||
|
||
# 9. TopContent 조립 (Analytics top_videos + DB lookup)
|
||
processor = DataProcessor()
|
||
top_content_rows = raw_data.get("top_videos", {}).get("rows", [])
|
||
top_content: list[TopContent] = []
|
||
for row in top_content_rows[:4]:
|
||
if len(row) < 4:
|
||
continue
|
||
video_id, views, likes, comments = row[0], row[1], row[2], row[3]
|
||
meta = video_lookup.get(video_id)
|
||
if not meta:
|
||
continue
|
||
title, uploaded_at = meta
|
||
engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
|
||
top_content.append(
|
||
TopContent(
|
||
id=video_id,
|
||
title=title,
|
||
thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
|
||
platform="youtube",
|
||
views=int(views),
|
||
engagement=f"{engagement_rate:.1f}%",
|
||
date=uploaded_at.strftime("%Y.%m.%d"),
|
||
)
|
||
)
|
||
|
||
logger.debug(f"[9] TopContent 조립 완료 - count={len(top_content)}")
|
||
|
||
# 10. 데이터 가공 (period_video_count=0 — API 무관 DB 집계값, 캐시에 포함하지 않음)
|
||
dashboard_data = processor.process(
|
||
raw_data, top_content, 0, mode=mode, end_date=end_date
|
||
)
|
||
|
||
logger.debug("[10] 데이터 가공 완료")
|
||
|
||
# 11. Redis 캐싱 (TTL: 12시간)
|
||
# YouTube Analytics는 하루 1회 갱신 (PT 자정, 한국 시간 오후 5~8시)
|
||
# 48시간 지연된 데이터이므로 12시간 캐싱으로 API 호출 최소화
|
||
# period_video_count는 캐시에 포함하지 않음 (DB 직접 집계, API 미사용)
|
||
cache_payload = json.dumps(
|
||
{"response": json.loads(dashboard_data.model_dump_json())}
|
||
)
|
||
cache_success = await set_cache(
|
||
cache_key,
|
||
cache_payload,
|
||
ttl=43200, # 12시간
|
||
)
|
||
|
||
if cache_success:
|
||
logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
|
||
else:
|
||
logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")
|
||
|
||
# 12. 업로드 영상 수 및 trend 주입 (캐시 저장 후 — 항상 DB에서 직접 집계)
|
||
for metric in dashboard_data.content_metrics:
|
||
if metric.id == "uploaded-videos":
|
||
metric.value = str(period_video_count)
|
||
video_trend = float(period_video_count - prev_period_video_count)
|
||
metric.trend = video_trend
|
||
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
|
||
break
|
||
|
||
logger.info(
|
||
f"[DASHBOARD] 통계 조회 완료 - "
|
||
f"user_uuid={current_user.user_uuid}, "
|
||
f"mode={mode}, period={period_desc}, videos={len(video_ids)}"
|
||
)
|
||
|
||
return dashboard_data
|
||
|
||
|
||
@router.delete(
|
||
"/cache",
|
||
response_model=CacheDeleteResponse,
|
||
summary="대시보드 캐시 삭제",
|
||
description="""
|
||
대시보드 Redis 캐시를 삭제합니다. 인증 없이 호출 가능합니다.
|
||
|
||
삭제 후 다음 `/stats` 요청 시 YouTube Analytics API를 새로 호출하여 최신 데이터를 반환합니다.
|
||
|
||
## 사용 시나리오
|
||
- 코드 배포 후 즉시 최신 데이터 반영이 필요할 때
|
||
- 데이터 이상 발생 시 캐시 강제 갱신
|
||
|
||
## 캐시 키 구조
|
||
`dashboard:{user_uuid}:{platform_user_id}:{mode}` (mode: day 또는 month)
|
||
|
||
## 파라미터
|
||
- `user_uuid`: 특정 사용자 캐시만 삭제. 미입력 시 전체 삭제
|
||
- `mode`: day / month / all (기본값: all)
|
||
""",
|
||
)
|
||
async def delete_dashboard_cache(
|
||
mode: Literal["day", "month", "all"] = Query(
|
||
default="all",
|
||
description="삭제할 캐시 모드: day, month, all(기본값, 모두 삭제)",
|
||
),
|
||
user_uuid: str | None = Query(
|
||
default=None,
|
||
description="대상 사용자 UUID. 미입력 시 전체 사용자 캐시 삭제",
|
||
),
|
||
) -> CacheDeleteResponse:
|
||
"""
|
||
대시보드 캐시 삭제
|
||
|
||
Args:
|
||
mode: 삭제할 캐시 모드 (day / month / all)
|
||
user_uuid: 대상 사용자 UUID (없으면 전체 삭제)
|
||
|
||
Returns:
|
||
CacheDeleteResponse: 삭제된 캐시 키 개수 및 메시지
|
||
"""
|
||
if user_uuid:
|
||
if mode == "all":
|
||
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*")
|
||
message = f"전체 캐시 삭제 완료 ({deleted}개)"
|
||
else:
|
||
cache_key = f"dashboard:{user_uuid}:{mode}"
|
||
success = await delete_cache(cache_key)
|
||
deleted = 1 if success else 0
|
||
message = f"{mode} 캐시 삭제 {'완료' if success else '실패 (키 없음)'}"
|
||
else:
|
||
deleted = await delete_cache_pattern("dashboard:*")
|
||
message = f"전체 사용자 캐시 삭제 완료 ({deleted}개)"
|
||
|
||
logger.info(
|
||
f"[CACHE DELETE] user_uuid={user_uuid or 'ALL'}, mode={mode}, deleted={deleted}"
|
||
)
|
||
|
||
return CacheDeleteResponse(deleted_count=deleted, message=message)
|