Merge branch 'feature-dashboard'

subtitle
김성경 2026-03-06 10:12:26 +09:00
commit d0334a5575
24 changed files with 2928 additions and 1 deletions

2
.gitignore vendored
View File

@ -50,3 +50,5 @@ logs/
*.yml
Dockerfile
.dockerignore
zzz/

View File

@ -24,6 +24,11 @@ async def lifespan(app: FastAPI):
await create_db_tables()
logger.info("Database tables created (DEBUG mode)")
# dashboard 테이블 초기화 및 기존 데이터 마이그레이션 (모든 환경)
from app.dashboard.migration import init_dashboard_table
await init_dashboard_table()
await NvMapPwScraper.initiate_scraper()
except asyncio.TimeoutError:
logger.error("Database initialization timed out")

View File

@ -309,6 +309,20 @@ def add_exception_handlers(app: FastAPI):
content=content,
)
# DashboardException 핸들러 추가
from app.dashboard.exceptions import DashboardException
@app.exception_handler(DashboardException)
def dashboard_exception_handler(request: Request, exc: DashboardException) -> Response:
logger.debug(f"Handled DashboardException: {exc.__class__.__name__} - {exc.message}")
return JSONResponse(
status_code=exc.status_code,
content={
"detail": exc.message,
"code": exc.code,
},
)
@app.exception_handler(status.HTTP_500_INTERNAL_SERVER_ERROR)
def internal_server_error_handler(request, exception):
# 에러 메시지 로깅 (한글 포함 가능)

View File

@ -0,0 +1,5 @@
"""
Dashboard Module
YouTube Analytics API를 활용한 대시보드 기능을 제공합니다.
"""

View File

@ -0,0 +1,3 @@
"""
Dashboard API Module
"""

View File

@ -0,0 +1,3 @@
"""
Dashboard Routers
"""

View File

@ -0,0 +1,7 @@
"""
Dashboard V1 Routers
"""
from app.dashboard.api.routers.v1.dashboard import router
__all__ = ["router"]

View File

@ -0,0 +1,527 @@
"""
Dashboard API 라우터
YouTube Analytics 기반 대시보드 통계를 제공합니다.
"""
import json
import logging
from datetime import date, datetime, timedelta
from typing import Literal
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dashboard.exceptions import (
YouTubeAccountNotConnectedError,
YouTubeAccountNotFoundError,
YouTubeAccountSelectionRequiredError,
YouTubeTokenExpiredError,
)
from app.dashboard.schemas import (
AudienceData,
CacheDeleteResponse,
ConnectedAccount,
ConnectedAccountsResponse,
ContentMetric,
DashboardResponse,
TopContent,
)
from app.dashboard.services import DataProcessor, YouTubeAnalyticsService
from app.dashboard.redis_cache import (
delete_cache,
delete_cache_pattern,
get_cache,
set_cache,
)
from app.database.session import get_session
from app.dashboard.models import Dashboard
from app.social.exceptions import TokenExpiredError
from app.social.services import SocialAccountService
from app.user.dependencies.auth import get_current_user
from app.user.models import SocialAccount, User
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/dashboard", tags=["Dashboard"])
@router.get(
"/accounts",
response_model=ConnectedAccountsResponse,
summary="연결된 소셜 계정 목록 조회",
description="""
연결된 소셜 계정 목록을 반환합니다.
여러 계정이 연결된 경우, 반환된 `platformUserId` 값을 `/dashboard/stats?platform_user_id=<>` 전달하여 계정을 선택합니다.
""",
)
async def get_connected_accounts(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> ConnectedAccountsResponse:
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
accounts_raw = result.scalars().all()
# platform_user_id 기준
seen_platform_ids: set[str] = set()
connected = []
for acc in sorted(
accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id in seen_platform_ids:
continue
seen_platform_ids.add(acc.platform_user_id)
data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
connected.append(
ConnectedAccount(
id=acc.id,
platform=acc.platform,
platform_username=acc.platform_username,
platform_user_id=acc.platform_user_id,
channel_title=data.get("channel_title"),
connected_at=acc.connected_at,
is_active=acc.is_active,
)
)
logger.info(
f"[ACCOUNTS] YouTube 계정 목록 조회 - "
f"user_uuid={current_user.user_uuid}, count={len(connected)}"
)
return ConnectedAccountsResponse(accounts=connected)
@router.get(
"/stats",
response_model=DashboardResponse,
summary="대시보드 통계 조회",
description="""
YouTube Analytics API를 활용한 대시보드 통계를 조회합니다.
## 주요 기능
- 최근 30 업로드 영상 기준 통계 제공
- KPI 지표: 조회수, 시청시간, 평균 시청시간, 신규 구독자, 좋아요, 댓글, 공유, 업로드 영상
- 월별 추이: 최근 12개월 vs 이전 12개월 비교
- 인기 영상 TOP 4
- 시청자 분석: 연령/성별/지역 분포
## 성능 최적화
- 7 YouTube Analytics API를 병렬로 호출
- Redis 캐싱 적용 (TTL: 12시간)
## 사전 조건
- YouTube 계정이 연동되어 있어야 합니다
## 조회 모드
- `day`: 최근 30 통계 (현재 날짜 -2 기준)
- `month`: 최근 12개월 통계 (현재 날짜 -2 기준, 기본값)
## 데이터 특성
- **지연 시간**: 48시간 (2) - 2 14 요청 2 12 자까지 확정
- **업데이트 주기**: 하루 1 (PT 자정, 한국 시간 오후 5~8)
- **실시간 아님**: 전날 데이터가 다음날 확정됩니다
""",
)
async def get_dashboard_stats(
mode: Literal["day", "month"] = Query(
default="month",
description="조회 모드: day(최근 30일), month(최근 12개월)",
),
platform_user_id: str | None = Query(
default=None,
description="사용할 YouTube 채널 ID (platform_user_id)",
),
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> DashboardResponse:
"""
대시보드 통계 조회
Args:
mode: 조회 모드 (day: 최근 30, month: 최근 12개월)
platform_user_id: 사용할 YouTube 채널 ID (여러 계정 연결 필수, 재연동해도 불변)
current_user: 현재 인증된 사용자
session: 데이터베이스 세션
Returns:
DashboardResponse: 대시보드 통계 데이터
Raises:
YouTubeAccountNotConnectedError: YouTube 계정이 연동되어 있지 않음
YouTubeAccountSelectionRequiredError: 여러 계정이 연결되어 있으나 계정 미선택
YouTubeAccountNotFoundError: 지정한 계정을 찾을 없음
YouTubeTokenExpiredError: YouTube 토큰 만료 (재연동 필요)
YouTubeAPIError: YouTube Analytics API 호출 실패
"""
logger.info(
f"[DASHBOARD] 통계 조회 시작 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
)
# 1. 모드별 날짜 자동 계산
today = date.today()
if mode == "day":
# 48시간 지연 적용: 오늘 기준 -2일을 end로 사용
# ex) 오늘 2/20 → end=2/18, start=1/20
end_dt = today - timedelta(days=2)
kpi_end_dt = end_dt
start_dt = end_dt - timedelta(days=29)
# 이전 30일 (YouTube API day_previous와 동일 기준)
prev_start_dt = start_dt - timedelta(days=30)
prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
period_desc = "최근 30일"
else: # mode == "month"
# 월별 차트: dimensions=month API는 YYYY-MM-01 형식 필요
# ex) 오늘 2/24 → end=2026-02-01, start=2025-03-01 → 2025-03 ~ 2026-02 (12개월)
end_dt = today.replace(day=1)
# KPI 등 집계형 API: 48시간 지연 적용하여 현재 월 전체 데이터 포함
kpi_end_dt = today - timedelta(days=2)
start_month = end_dt.month - 11
if start_month <= 0:
start_month += 12
start_year = end_dt.year - 1
else:
start_year = end_dt.year
start_dt = date(start_year, start_month, 1)
# 이전 12개월 (YouTube API previous와 동일 기준 — 1년 전)
prev_start_dt = start_dt.replace(year=start_dt.year - 1)
try:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
except ValueError: # 윤년 2/29 → 이전 연도 2/28
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
period_desc = "최근 12개월"
start_date = start_dt.strftime("%Y-%m-%d")
end_date = end_dt.strftime("%Y-%m-%d")
kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
logger.debug(
f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}"
)
# 2. YouTube 계정 연동 확인
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
social_accounts_raw = result.scalars().all()
# platform_user_id 기준으로 중복 제거 (가장 최근 연동 계정 우선)
seen_platform_ids_stats: set[str] = set()
social_accounts = []
for acc in sorted(
social_accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id not in seen_platform_ids_stats:
seen_platform_ids_stats.add(acc.platform_user_id)
social_accounts.append(acc)
if not social_accounts:
logger.warning(
f"[NO YOUTUBE ACCOUNT] YouTube 계정 미연동 - "
f"user_uuid={current_user.user_uuid}"
)
raise YouTubeAccountNotConnectedError()
if platform_user_id is not None:
matched = [a for a in social_accounts if a.platform_user_id == platform_user_id]
if not matched:
logger.warning(
f"[ACCOUNT NOT FOUND] 지정 계정 없음 - "
f"user_uuid={current_user.user_uuid}, platform_user_id={platform_user_id}"
)
raise YouTubeAccountNotFoundError()
social_account = matched[0]
elif len(social_accounts) == 1:
social_account = social_accounts[0]
else:
logger.warning(
f"[MULTI ACCOUNT] 계정 선택 필요 - "
f"user_uuid={current_user.user_uuid}, count={len(social_accounts)}"
)
raise YouTubeAccountSelectionRequiredError()
logger.debug(
f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}"
)
# 3. 기간 내 업로드 영상 수 조회
count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= start_dt,
Dashboard.uploaded_at < today + timedelta(days=1),
)
)
period_video_count = count_result.scalar() or 0
# 이전 기간 업로드 영상 수 조회 (trend 계산용)
prev_count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= prev_start_dt,
Dashboard.uploaded_at <= prev_kpi_end_dt,
)
)
prev_period_video_count = prev_count_result.scalar() or 0
logger.debug(
f"[3] 기간 내 업로드 영상 수 - current={period_video_count}, prev={prev_period_video_count}"
)
# 4. Redis 캐시 조회
# platform_user_id 기준 캐시 키: 재연동해도 채널 ID는 불변 → 캐시 유지됨
cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
cached_raw = await get_cache(cache_key)
if cached_raw:
try:
payload = json.loads(cached_raw)
logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
response = DashboardResponse.model_validate(payload["response"])
for metric in response.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
return response
except (json.JSONDecodeError, KeyError):
logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")
# 5. 최근 30개 업로드 영상 조회 (Analytics API 전달용)
# YouTube Analytics API 제약사항:
# - 영상 개수: 20~30개 권장 (최대 50개, 그 이상은 응답 지연 발생)
# - URL 길이: 2000자 제한 (video ID 11자 × 30개 = 330자로 안전)
result = await session.execute(
select(
Dashboard.platform_video_id,
Dashboard.title,
Dashboard.uploaded_at,
)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
)
.order_by(Dashboard.uploaded_at.desc())
.limit(30)
)
rows = result.all()
logger.debug(f"[5] 영상 조회 완료 - count={len(rows)}")
# 6. video_ids + 메타데이터 조회용 dict 구성
video_ids = []
video_lookup: dict[str, tuple[str, datetime]] = {} # {video_id: (title, uploaded_at)}
for row in rows:
platform_video_id, title, uploaded_at = row
video_ids.append(platform_video_id)
video_lookup[platform_video_id] = (title, uploaded_at)
logger.debug(
f"[6] 영상 메타데이터 구성 완료 - count={len(video_ids)}, sample={video_ids[:3]}"
)
# 6.1 업로드 영상 없음 → YouTube API 호출 없이 빈 응답 반환
if not video_ids:
logger.info(
f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - "
f"user_uuid={current_user.user_uuid}"
)
return DashboardResponse(
content_metrics=[
ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"),
ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"),
ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"),
],
monthly_data=[],
daily_data=[],
top_content=[],
audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
has_uploads=False,
)
# 7. 토큰 유효성 확인 및 자동 갱신 (만료 10분 전 갱신)
try:
access_token = await SocialAccountService().ensure_valid_token(
social_account, session
)
except TokenExpiredError:
logger.warning(
f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}"
)
raise YouTubeTokenExpiredError()
logger.debug("[7] 토큰 유효성 확인 완료")
# 8. YouTube Analytics API 호출 (7개 병렬)
youtube_service = YouTubeAnalyticsService()
raw_data = await youtube_service.fetch_all_metrics(
video_ids=video_ids,
start_date=start_date,
end_date=end_date,
kpi_end_date=kpi_end_date,
access_token=access_token,
mode=mode,
)
logger.debug("[8] YouTube Analytics API 호출 완료")
# 9. TopContent 조립 (Analytics top_videos + DB lookup)
processor = DataProcessor()
top_content_rows = raw_data.get("top_videos", {}).get("rows", [])
top_content: list[TopContent] = []
for row in top_content_rows[:4]:
if len(row) < 4:
continue
video_id, views, likes, comments = row[0], row[1], row[2], row[3]
meta = video_lookup.get(video_id)
if not meta:
continue
title, uploaded_at = meta
engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
top_content.append(
TopContent(
id=video_id,
title=title,
thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
platform="youtube",
views=int(views),
engagement=f"{engagement_rate:.1f}%",
date=uploaded_at.strftime("%Y.%m.%d"),
)
)
logger.debug(f"[9] TopContent 조립 완료 - count={len(top_content)}")
# 10. 데이터 가공 (period_video_count=0 — API 무관 DB 집계값, 캐시에 포함하지 않음)
dashboard_data = processor.process(
raw_data, top_content, 0, mode=mode, end_date=end_date
)
logger.debug("[10] 데이터 가공 완료")
# 11. Redis 캐싱 (TTL: 12시간)
# YouTube Analytics는 하루 1회 갱신 (PT 자정, 한국 시간 오후 5~8시)
# 48시간 지연된 데이터이므로 12시간 캐싱으로 API 호출 최소화
# period_video_count는 캐시에 포함하지 않음 (DB 직접 집계, API 미사용)
cache_payload = json.dumps(
{"response": json.loads(dashboard_data.model_dump_json())}
)
cache_success = await set_cache(
cache_key,
cache_payload,
ttl=43200, # 12시간
)
if cache_success:
logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
else:
logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")
# 12. 업로드 영상 수 및 trend 주입 (캐시 저장 후 — 항상 DB에서 직접 집계)
for metric in dashboard_data.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
logger.info(
f"[DASHBOARD] 통계 조회 완료 - "
f"user_uuid={current_user.user_uuid}, "
f"mode={mode}, period={period_desc}, videos={len(video_ids)}"
)
return dashboard_data
@router.delete(
"/cache",
response_model=CacheDeleteResponse,
summary="대시보드 캐시 삭제",
description="""
대시보드 Redis 캐시를 삭제합니다. 인증 없이 호출 가능합니다.
삭제 다음 `/stats` 요청 YouTube Analytics API를 새로 호출하여 최신 데이터를 반환합니다.
## 사용 시나리오
- 코드 배포 즉시 최신 데이터 반영이 필요할
- 데이터 이상 발생 캐시 강제 갱신
## 캐시 키 구조
`dashboard:{user_uuid}:{platform_user_id}:{mode}` (mode: day 또는 month)
## 파라미터
- `user_uuid`: 특정 사용자 캐시만 삭제. 미입력 전체 삭제
- `mode`: day / month / all (기본값: all)
""",
)
async def delete_dashboard_cache(
mode: Literal["day", "month", "all"] = Query(
default="all",
description="삭제할 캐시 모드: day, month, all(기본값, 모두 삭제)",
),
user_uuid: str | None = Query(
default=None,
description="대상 사용자 UUID. 미입력 시 전체 사용자 캐시 삭제",
),
) -> CacheDeleteResponse:
"""
대시보드 캐시 삭제
Args:
mode: 삭제할 캐시 모드 (day / month / all)
user_uuid: 대상 사용자 UUID (없으면 전체 삭제)
Returns:
CacheDeleteResponse: 삭제된 캐시 개수 메시지
"""
if user_uuid:
if mode == "all":
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*")
message = f"전체 캐시 삭제 완료 ({deleted}개)"
else:
cache_key = f"dashboard:{user_uuid}:{mode}"
success = await delete_cache(cache_key)
deleted = 1 if success else 0
message = f"{mode} 캐시 삭제 {'완료' if success else '실패 (키 없음)'}"
else:
deleted = await delete_cache_pattern("dashboard:*")
message = f"전체 사용자 캐시 삭제 완료 ({deleted}개)"
logger.info(
f"[CACHE DELETE] user_uuid={user_uuid or 'ALL'}, mode={mode}, deleted={deleted}"
)
return CacheDeleteResponse(deleted_count=deleted, message=message)

195
app/dashboard/exceptions.py Normal file
View File

@ -0,0 +1,195 @@
"""
Dashboard Exceptions
Dashboard API 관련 예외 클래스를 정의합니다.
"""
from fastapi import status
class DashboardException(Exception):
"""Dashboard 기본 예외"""
def __init__(
self,
message: str,
status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR,
code: str = "DASHBOARD_ERROR",
):
self.message = message
self.status_code = status_code
self.code = code
super().__init__(self.message)
# =============================================================================
# YouTube Analytics API 관련 예외
# =============================================================================
class YouTubeAPIException(DashboardException):
"""YouTube Analytics API 관련 예외 기본 클래스"""
def __init__(
self,
message: str = "YouTube Analytics API 호출 중 오류가 발생했습니다.",
status_code: int = status.HTTP_502_BAD_GATEWAY,
code: str = "YOUTUBE_API_ERROR",
):
super().__init__(message, status_code, code)
class YouTubeAPIError(YouTubeAPIException):
"""YouTube Analytics API 일반 오류"""
def __init__(self, detail: str = ""):
error_message = "YouTube Analytics API 호출에 실패했습니다."
if detail:
error_message += f" ({detail})"
super().__init__(
message=error_message,
status_code=status.HTTP_502_BAD_GATEWAY,
code="YOUTUBE_API_FAILED",
)
class YouTubeAuthError(YouTubeAPIException):
"""YouTube 인증 실패"""
def __init__(self, detail: str = ""):
error_message = "YouTube 인증에 실패했습니다. 계정 재연동이 필요합니다."
if detail:
error_message += f" ({detail})"
super().__init__(
message=error_message,
status_code=status.HTTP_401_UNAUTHORIZED,
code="YOUTUBE_AUTH_FAILED",
)
class YouTubeQuotaExceededError(YouTubeAPIException):
"""YouTube API 할당량 초과"""
def __init__(self):
super().__init__(
message="YouTube API 일일 할당량이 초과되었습니다. 내일 다시 시도해주세요.",
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
code="YOUTUBE_QUOTA_EXCEEDED",
)
class YouTubeDataNotFoundError(YouTubeAPIException):
"""YouTube Analytics 데이터 없음"""
def __init__(self, detail: str = ""):
error_message = "YouTube Analytics 데이터를 찾을 수 없습니다."
if detail:
error_message += f" ({detail})"
super().__init__(
message=error_message,
status_code=status.HTTP_404_NOT_FOUND,
code="YOUTUBE_DATA_NOT_FOUND",
)
# =============================================================================
# 계정 연동 관련 예외
# =============================================================================
class YouTubeAccountNotConnectedError(DashboardException):
"""YouTube 계정 미연동"""
def __init__(self):
super().__init__(
message="YouTube 계정이 연동되어 있지 않습니다. 먼저 YouTube 계정을 연동해주세요.",
status_code=status.HTTP_403_FORBIDDEN,
code="YOUTUBE_NOT_CONNECTED",
)
class YouTubeAccountSelectionRequiredError(DashboardException):
"""여러 YouTube 계정이 연동된 경우 계정 선택 필요"""
def __init__(self):
super().__init__(
message="연결된 YouTube 계정이 여러 개입니다. social_account_id 파라미터로 사용할 계정을 선택해주세요.",
status_code=status.HTTP_400_BAD_REQUEST,
code="YOUTUBE_ACCOUNT_SELECTION_REQUIRED",
)
class YouTubeAccountNotFoundError(DashboardException):
"""지정한 YouTube 계정을 찾을 수 없음"""
def __init__(self):
super().__init__(
message="지정한 YouTube 계정을 찾을 수 없습니다.",
status_code=status.HTTP_404_NOT_FOUND,
code="YOUTUBE_ACCOUNT_NOT_FOUND",
)
class YouTubeTokenExpiredError(DashboardException):
"""YouTube 토큰 만료"""
def __init__(self):
super().__init__(
message="YouTube 인증이 만료되었습니다. 계정을 재연동해주세요.",
status_code=status.HTTP_401_UNAUTHORIZED,
code="YOUTUBE_TOKEN_EXPIRED",
)
# =============================================================================
# 데이터 관련 예외
# =============================================================================
class NoVideosFoundError(DashboardException):
"""업로드된 영상 없음"""
def __init__(self):
super().__init__(
message="업로드된 YouTube 영상이 없습니다. 먼저 영상을 업로드해주세요.",
status_code=status.HTTP_404_NOT_FOUND,
code="NO_VIDEOS_FOUND",
)
class DashboardDataError(DashboardException):
"""대시보드 데이터 처리 오류"""
def __init__(self, detail: str = ""):
error_message = "대시보드 데이터 처리 중 오류가 발생했습니다."
if detail:
error_message += f" ({detail})"
super().__init__(
message=error_message,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
code="DASHBOARD_DATA_ERROR",
)
# =============================================================================
# 캐싱 관련 예외 (경고용, 실제로는 raise하지 않고 로깅만 사용)
# =============================================================================
class CacheError(DashboardException):
"""캐시 작업 오류
Note:
예외는 실제로 raise되지 않고,
캐시 실패 로깅만 하고 원본 데이터를 반환합니다.
"""
def __init__(self, operation: str, detail: str = ""):
error_message = f"캐시 {operation} 작업 중 오류가 발생했습니다."
if detail:
error_message += f" ({detail})"
super().__init__(
message=error_message,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
code="CACHE_ERROR",
)

112
app/dashboard/migration.py Normal file
View File

@ -0,0 +1,112 @@
"""
Dashboard Migration
dashboard 테이블 초기화 기존 데이터 마이그레이션을 담당합니다.
서버 기동 create_db_tables() 이후 호출됩니다.
"""
import logging
from sqlalchemy import func, select, text
from sqlalchemy.dialects.mysql import insert
from app.dashboard.models import Dashboard
from app.database.session import AsyncSessionLocal, engine
from app.social.models import SocialUpload
from app.user.models import SocialAccount
logger = logging.getLogger(__name__)
async def _dashboard_table_exists() -> bool:
"""dashboard 테이블 존재 여부 확인"""
async with engine.connect() as conn:
result = await conn.execute(
text(
"SELECT COUNT(*) FROM information_schema.tables "
"WHERE table_schema = DATABASE() AND table_name = 'dashboard'"
)
)
return result.scalar() > 0
async def _dashboard_is_empty() -> bool:
"""dashboard 테이블 데이터 존재 여부 확인"""
async with AsyncSessionLocal() as session:
result = await session.execute(
select(func.count()).select_from(Dashboard)
)
return result.scalar() == 0
async def _migrate_existing_data() -> None:
"""
SocialUpload(status=completed) Dashboard 마이그레이션.
INSERT IGNORE로 중복 안전하게 삽입.
"""
async with AsyncSessionLocal() as session:
result = await session.execute(
select(
SocialUpload.user_uuid,
SocialUpload.platform,
SocialUpload.platform_video_id,
SocialUpload.platform_url,
SocialUpload.title,
SocialUpload.uploaded_at,
SocialAccount.platform_user_id,
)
.join(SocialAccount, SocialUpload.social_account_id == SocialAccount.id)
.where(
SocialUpload.status == "completed",
SocialUpload.platform_video_id.isnot(None),
SocialUpload.uploaded_at.isnot(None),
SocialAccount.platform_user_id.isnot(None),
)
)
rows = result.all()
if not rows:
logger.info("[DASHBOARD_MIGRATE] 마이그레이션 대상 없음")
return
async with AsyncSessionLocal() as session:
for row in rows:
stmt = (
insert(Dashboard)
.values(
user_uuid=row.user_uuid,
platform=row.platform,
platform_user_id=row.platform_user_id,
platform_video_id=row.platform_video_id,
platform_url=row.platform_url,
title=row.title,
uploaded_at=row.uploaded_at,
)
.prefix_with("IGNORE")
)
await session.execute(stmt)
await session.commit()
logger.info(f"[DASHBOARD_MIGRATE] 마이그레이션 완료 - {len(rows)}건 삽입")
async def init_dashboard_table() -> None:
"""
dashboard 테이블 초기화 진입점.
- 테이블이 없으면 생성 마이그레이션
- 테이블이 있지만 비어있으면 마이그레이션 (DEBUG 모드에서 create_db_tables() 테이블 생성한 경우)
- 테이블이 있고 데이터도 있으면 스킵
"""
if not await _dashboard_table_exists():
logger.info("[DASHBOARD_MIGRATE] dashboard 테이블 없음 - 생성 및 마이그레이션 시작")
async with engine.begin() as conn:
await conn.run_sync(
lambda c: Dashboard.__table__.create(c, checkfirst=True)
)
await _migrate_existing_data()
elif await _dashboard_is_empty():
logger.info("[DASHBOARD_MIGRATE] dashboard 테이블 비어있음 - 마이그레이션 시작")
await _migrate_existing_data()
else:
logger.info("[DASHBOARD_MIGRATE] dashboard 테이블 이미 존재 - 스킵")

134
app/dashboard/models.py Normal file
View File

@ -0,0 +1,134 @@
"""
Dashboard Models
대시보드 전용 SQLAlchemy 모델을 정의합니다.
"""
from datetime import datetime
from typing import Optional
from sqlalchemy import BigInteger, DateTime, Index, String, UniqueConstraint, func
from sqlalchemy.orm import Mapped, mapped_column
from app.database.session import Base
class Dashboard(Base):
"""
채널별 영상 업로드 기록 테이블
YouTube 업로드 완료 채널 ID(platform_user_id) 함께 기록합니다.
SocialUpload.social_account_id는 재연동 변경되므로,
테이블로 채널 기준 안정적인 영상 필터링을 제공합니다.
Attributes:
id: 고유 식별자 (자동 증가)
user_uuid: 사용자 UUID (User.user_uuid 참조)
platform: 플랫폼 (youtube/instagram)
platform_user_id: 채널 ID (재연동 후에도 불변)
platform_video_id: 영상 ID
platform_url: 영상 URL
title: 영상 제목
uploaded_at: SocialUpload 완료 시각
created_at: 레코드 생성 시각
"""
__tablename__ = "dashboard"
__table_args__ = (
UniqueConstraint(
"platform_video_id",
"platform_user_id",
name="uq_vcu_video_channel",
),
Index("idx_vcu_user_platform", "user_uuid", "platform_user_id"),
Index("idx_vcu_uploaded_at", "uploaded_at"),
{
"mysql_engine": "InnoDB",
"mysql_charset": "utf8mb4",
"mysql_collate": "utf8mb4_unicode_ci",
},
)
# ==========================================================================
# 기본 식별자
# ==========================================================================
id: Mapped[int] = mapped_column(
BigInteger,
primary_key=True,
nullable=False,
autoincrement=True,
comment="고유 식별자",
)
# ==========================================================================
# 관계 필드
# ==========================================================================
user_uuid: Mapped[str] = mapped_column(
String(36),
nullable=False,
comment="사용자 UUID (User.user_uuid 참조)",
)
# ==========================================================================
# 플랫폼 정보
# ==========================================================================
platform: Mapped[str] = mapped_column(
String(20),
nullable=False,
comment="플랫폼 (youtube/instagram)",
)
platform_user_id: Mapped[str] = mapped_column(
String(100),
nullable=False,
comment="채널 ID (재연동 후에도 불변)",
)
# ==========================================================================
# 플랫폼 결과
# ==========================================================================
platform_video_id: Mapped[str] = mapped_column(
String(100),
nullable=False,
comment="영상 ID",
)
platform_url: Mapped[Optional[str]] = mapped_column(
String(500),
nullable=True,
comment="영상 URL",
)
# ==========================================================================
# 메타데이터
# ==========================================================================
title: Mapped[str] = mapped_column(
String(200),
nullable=False,
comment="영상 제목",
)
# ==========================================================================
# 시간 정보
# ==========================================================================
uploaded_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
comment="SocialUpload 완료 시각",
)
created_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.now(),
comment="레코드 생성 시각",
)
def __repr__(self) -> str:
return (
f"<Dashboard("
f"id={self.id}, "
f"platform_user_id='{self.platform_user_id}', "
f"platform_video_id='{self.platform_video_id}'"
f")>"
)

View File

@ -0,0 +1,173 @@
"""
Redis 캐싱 유틸리티
Dashboard API 성능 최적화를 위한 Redis 캐싱 기능을 제공합니다.
YouTube Analytics API 호출 결과를 캐싱하여 중복 요청을 방지합니다.
"""
from typing import Optional
from redis.asyncio import Redis
from app.utils.logger import get_logger
from config import db_settings
logger = get_logger("redis_cache")
# Dashboard 전용 Redis 클라이언트 (db=3 사용)
_cache_client = Redis(
host=db_settings.REDIS_HOST,
port=db_settings.REDIS_PORT,
db=3,
decode_responses=True,
)
async def get_cache(key: str) -> Optional[str]:
"""
Redis 캐시에서 값을 조회합니다.
Args:
key: 캐시
Returns:
캐시된 (문자열) 또는 None (캐시 미스)
Example:
>>> cached_data = await get_cache("dashboard:user123:2026-01-01:2026-12-31")
>>> if cached_data:
>>> return json.loads(cached_data)
"""
try:
logger.debug(f"[GET_CACHE] 캐시 조회 시작 - key: {key}")
value = await _cache_client.get(key)
if value:
logger.debug(f"[GET_CACHE] 캐시 HIT - key: {key}")
else:
logger.debug(f"[GET_CACHE] 캐시 MISS - key: {key}")
return value
except Exception as e:
logger.error(f"[GET_CACHE] 캐시 조회 실패 - key: {key}, error: {e}")
return None # 캐시 실패 시 None 반환 (원본 데이터 조회하도록 유도)
async def set_cache(key: str, value: str, ttl: int = 43200) -> bool:
"""
Redis 캐시에 값을 저장합니다.
Args:
key: 캐시
value: 저장할 (문자열)
ttl: 캐시 만료 시간 (). 기본값: 43200 (12시간)
Returns:
성공 여부
Example:
>>> import json
>>> data = {"views": 1000, "likes": 50}
>>> await set_cache("dashboard:user123:2026-01-01:2026-12-31", json.dumps(data), ttl=3600)
"""
try:
logger.debug(f"[SET_CACHE] 캐시 저장 시작 - key: {key}, ttl: {ttl}s")
await _cache_client.setex(key, ttl, value)
logger.debug(f"[SET_CACHE] 캐시 저장 성공 - key: {key}")
return True
except Exception as e:
logger.error(f"[SET_CACHE] 캐시 저장 실패 - key: {key}, error: {e}")
return False
async def delete_cache(key: str) -> bool:
"""
Redis 캐시에서 값을 삭제합니다.
Args:
key: 삭제할 캐시
Returns:
성공 여부
Example:
>>> await delete_cache("dashboard:user123:2026-01-01:2026-12-31")
"""
try:
logger.debug(f"[DELETE_CACHE] 캐시 삭제 시작 - key: {key}")
deleted_count = await _cache_client.delete(key)
logger.debug(
f"[DELETE_CACHE] 캐시 삭제 완료 - key: {key}, deleted: {deleted_count}"
)
return deleted_count > 0
except Exception as e:
logger.error(f"[DELETE_CACHE] 캐시 삭제 실패 - key: {key}, error: {e}")
return False
async def delete_cache_pattern(pattern: str) -> int:
"""
패턴에 매칭되는 모든 캐시 키를 삭제합니다.
Args:
pattern: 삭제할 패턴 (: "dashboard:user123:*")
Returns:
삭제된 개수
Example:
>>> # 특정 사용자의 모든 대시보드 캐시 삭제
>>> deleted = await delete_cache_pattern("dashboard:user123:*")
>>> print(f"{deleted}개의 캐시 삭제됨")
Note:
대량의 삭제 성능에 영향을 있으므로 주의해서 사용하세요.
"""
try:
logger.debug(f"[DELETE_CACHE_PATTERN] 패턴 캐시 삭제 시작 - pattern: {pattern}")
# 패턴에 매칭되는 모든 키 조회
keys = []
async for key in _cache_client.scan_iter(match=pattern):
keys.append(key)
if not keys:
logger.debug(f"[DELETE_CACHE_PATTERN] 삭제할 키 없음 - pattern: {pattern}")
return 0
# 모든 키 삭제
deleted_count = await _cache_client.delete(*keys)
logger.debug(
f"[DELETE_CACHE_PATTERN] 패턴 캐시 삭제 완료 - "
f"pattern: {pattern}, deleted: {deleted_count}"
)
return deleted_count
except Exception as e:
logger.error(
f"[DELETE_CACHE_PATTERN] 패턴 캐시 삭제 실패 - pattern: {pattern}, error: {e}"
)
return 0
async def close_cache_client():
"""
Redis 클라이언트 연결을 종료합니다.
애플리케이션 종료 호출되어야 합니다.
main.py의 shutdown 이벤트 핸들러에서 사용하세요.
Example:
>>> # main.py
>>> @app.on_event("shutdown")
>>> async def shutdown_event():
>>> await close_cache_client()
"""
try:
logger.info("[CLOSE_CACHE_CLIENT] Redis 캐시 클라이언트 종료 중...")
await _cache_client.close()
logger.info("[CLOSE_CACHE_CLIENT] Redis 캐시 클라이언트 종료 완료")
except Exception as e:
logger.error(
f"[CLOSE_CACHE_CLIENT] Redis 캐시 클라이언트 종료 실패 - error: {e}"
)

View File

@ -0,0 +1,29 @@
"""
Dashboard Schemas
Dashboard API의 요청/응답 스키마를 정의합니다.
"""
from app.dashboard.schemas.dashboard_schema import (
AudienceData,
CacheDeleteResponse,
ConnectedAccount,
ConnectedAccountsResponse,
ContentMetric,
DailyData,
DashboardResponse,
MonthlyData,
TopContent,
)
__all__ = [
"ConnectedAccount",
"ConnectedAccountsResponse",
"ContentMetric",
"DailyData",
"MonthlyData",
"TopContent",
"AudienceData",
"DashboardResponse",
"CacheDeleteResponse",
]

View File

@ -0,0 +1,313 @@
"""
Dashboard API Schemas
대시보드 API의 요청/응답 Pydantic 스키마를 정의합니다.
YouTube Analytics API 데이터를 프론트엔드에 전달하기 위한 모델입니다.
사용 예시:
from app.dashboard.schemas import DashboardResponse, ContentMetric
# 라우터에서 response_model로 사용
@router.get("/dashboard/stats", response_model=DashboardResponse)
async def get_dashboard_stats():
...
"""
from datetime import datetime
from typing import Any, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field
def to_camel(string: str) -> str:
"""snake_case를 camelCase로 변환
Args:
string: snake_case 문자열 (: "content_metrics")
Returns:
camelCase 문자열 (: "contentMetrics")
Example:
>>> to_camel("content_metrics")
"contentMetrics"
>>> to_camel("this_year")
"thisYear"
"""
components = string.split("_")
return components[0] + "".join(x.capitalize() for x in components[1:])
class ContentMetric(BaseModel):
"""KPI 지표 카드
대시보드 상단에 표시되는 핵심 성과 지표(KPI) 카드입니다.
Attributes:
id: 지표 고유 ID (: "total-views", "total-watch-time", "new-subscribers")
label: 한글 라벨 (: "조회수")
value: 원시 숫자값 (단위: unit 참조, 포맷팅은 프론트에서 처리)
unit: 값의 단위 "count" | "hours" | "minutes"
- count: 조회수, 구독자, 좋아요, 댓글, 공유, 업로드 영상
- hours: 시청시간 (estimatedMinutesWatched / 60)
- minutes: 평균 시청시간 (averageViewDuration / 60)
trend: 이전 기간 대비 증감량 (unit과 동일한 단위)
trend_direction: 증감 방향 ("up": 증가, "down": 감소, "-": 변동 없음)
Example:
>>> metric = ContentMetric(
... id="total-views",
... label="조회수",
... value=1200000.0,
... unit="count",
... trend=3800.0,
... trend_direction="up"
... )
"""
id: str
label: str
value: float
unit: str = "count"
trend: float
trend_direction: Literal["up", "down", "-"] = Field(alias="trendDirection")
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
)
class MonthlyData(BaseModel):
"""월별 추이 데이터
전년 대비 월별 조회수 비교 데이터입니다.
Attributes:
month: 표시 (: "1월", "2월")
this_year: 올해 해당 조회수
last_year: 작년 해당 조회수
Example:
>>> data = MonthlyData(
... month="1월",
... this_year=150000,
... last_year=120000
... )
"""
month: str
this_year: int = Field(alias="thisYear")
last_year: int = Field(alias="lastYear")
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
)
class DailyData(BaseModel):
"""일별 추이 데이터 (mode=day 전용)
최근 30일과 이전 30일의 일별 조회수 비교 데이터입니다.
Attributes:
date: 날짜 표시 (: "1/18", "1/19")
this_period: 최근 30 조회수
last_period: 이전 30 동일 요일 조회수
"""
date: str
this_period: int = Field(alias="thisPeriod")
last_period: int = Field(alias="lastPeriod")
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
)
class TopContent(BaseModel):
"""인기 영상
조회수 기준 상위 인기 영상 정보입니다.
Attributes:
id: YouTube 영상 ID
title: 영상 제목
thumbnail: 썸네일 이미지 URL
platform: 플랫폼 ("youtube" 또는 "instagram")
views: 원시 조회수 정수 (포맷팅은 프론트에서 처리, : 125400)
engagement: 참여율 (: "8.2%")
date: 업로드 날짜 (: "2026.01.15")
Example:
>>> content = TopContent(
... id="video-id-1",
... title="힐링 영상",
... thumbnail="https://i.ytimg.com/...",
... platform="youtube",
... views=125400,
... engagement="8.2%",
... date="2026.01.15"
... )
"""
id: str
title: str
thumbnail: str
platform: Literal["youtube", "instagram"]
views: int
engagement: str
date: str
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
)
class AudienceData(BaseModel):
"""시청자 분석 데이터
시청자의 연령대, 성별, 지역 분포 데이터입니다.
Attributes:
age_groups: 연령대별 시청자 비율 리스트
[{"label": "18-24", "percentage": 35}, ...]
gender: 성별 시청자 비율 (YouTube viewerPercentage 누적값)
{"male": 45, "female": 55}
top_regions: 상위 국가 리스트 (최대 5)
[{"region": "대한민국", "percentage": 42}, ...]
Example:
>>> data = AudienceData(
... age_groups=[{"label": "18-24", "percentage": 35}],
... gender={"male": 45, "female": 55},
... top_regions=[{"region": "대한민국", "percentage": 42}]
... )
"""
age_groups: list[dict[str, Any]] = Field(alias="ageGroups")
gender: dict[str, int]
top_regions: list[dict[str, Any]] = Field(alias="topRegions")
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
)
# class PlatformMetric(BaseModel):
# """플랫폼별 메트릭 (미사용 — platform_data 기능 미구현)"""
#
# id: str
# label: str
# value: str
# unit: Optional[str] = None
# trend: float
# trend_direction: Literal["up", "down", "-"] = Field(alias="trendDirection")
#
# model_config = ConfigDict(
# alias_generator=to_camel,
# populate_by_name=True,
# )
#
#
# class PlatformData(BaseModel):
# """플랫폼별 데이터 (미사용 — platform_data 기능 미구현)"""
#
# platform: Literal["youtube", "instagram"]
# display_name: str = Field(alias="displayName")
# metrics: list[PlatformMetric]
#
# model_config = ConfigDict(
# alias_generator=to_camel,
# populate_by_name=True,
# )
class DashboardResponse(BaseModel):
"""대시보드 전체 응답
GET /dashboard/stats 엔드포인트의 전체 응답 스키마입니다.
Attributes:
content_metrics: KPI 지표 카드 리스트 (8)
monthly_data: 월별 추이 데이터 (mode=month 채움, 최근 12개월 vs 이전 12개월)
daily_data: 일별 추이 데이터 (mode=day 채움, 최근 30 vs 이전 30)
top_content: 조회수 기준 인기 영상 TOP 4
audience_data: 시청자 분석 데이터 (연령/성별/지역)
has_uploads: 업로드 영상 존재 여부 (False 모든 지표가 0, 상태 UI 표시용)
Example:
>>> response = DashboardResponse(
... content_metrics=[...],
... monthly_data=[...],
... top_content=[...],
... audience_data=AudienceData(...),
... )
>>> json_str = response.model_dump_json() # JSON 직렬화
"""
content_metrics: list[ContentMetric] = Field(alias="contentMetrics")
monthly_data: list[MonthlyData] = Field(default=[], alias="monthlyData")
daily_data: list[DailyData] = Field(default=[], alias="dailyData")
top_content: list[TopContent] = Field(alias="topContent")
audience_data: AudienceData = Field(alias="audienceData")
has_uploads: bool = Field(default=True, alias="hasUploads")
# platform_data: list[PlatformData] = Field(default=[], alias="platformData") # 미사용
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
)
class ConnectedAccount(BaseModel):
"""연결된 소셜 계정 정보
Attributes:
id: SocialAccount 테이블 PK
platform: 플랫폼 (: "youtube")
platform_username: 플랫폼 사용자명 (: "@channelname")
platform_user_id: 플랫폼 채널 고유 ID 재연동해도 불변.
/dashboard/stats?platform_user_id=<> 으로 계정 선택에 사용
channel_title: YouTube 채널 제목 (SocialAccount.platform_data JSON에서 추출)
connected_at: 연동 일시
is_active: 활성화 상태
"""
id: int
platform: str
platform_user_id: str
platform_username: Optional[str] = None
channel_title: Optional[str] = None
connected_at: datetime
is_active: bool
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
)
class ConnectedAccountsResponse(BaseModel):
"""연결된 소셜 계정 목록 응답
Attributes:
accounts: 연결된 계정 목록
"""
accounts: list[ConnectedAccount]
class CacheDeleteResponse(BaseModel):
"""캐시 삭제 응답
Attributes:
deleted_count: 삭제된 캐시 개수
message: 처리 결과 메시지
"""
deleted_count: int
message: str

View File

@ -0,0 +1,13 @@
"""
Dashboard Services
YouTube Analytics API 연동 데이터 가공 서비스를 제공합니다.
"""
from app.dashboard.services.data_processor import DataProcessor
from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService
__all__ = [
"YouTubeAnalyticsService",
"DataProcessor",
]

View File

@ -0,0 +1,542 @@
"""
YouTube Analytics 데이터 가공 프로세서
YouTube Analytics API의 원본 데이터를 프론트엔드용 Pydantic 스키마로 변환합니다.
"""
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Literal
from app.dashboard.schemas import (
AudienceData,
ContentMetric,
DailyData,
DashboardResponse,
MonthlyData,
TopContent,
)
from app.utils.logger import get_logger
logger = get_logger("dashboard")
_COUNTRY_CODE_MAP: dict[str, str] = {
"KR": "대한민국",
"US": "미국",
"JP": "일본",
"CN": "중국",
"GB": "영국",
"DE": "독일",
"FR": "프랑스",
"CA": "캐나다",
"AU": "호주",
"IN": "인도",
"ID": "인도네시아",
"TH": "태국",
"VN": "베트남",
"PH": "필리핀",
"MY": "말레이시아",
"SG": "싱가포르",
"TW": "대만",
"HK": "홍콩",
"BR": "브라질",
"MX": "멕시코",
"NL": "네덜란드",
"BE": "벨기에",
"SE": "스웨덴",
"NO": "노르웨이",
"FI": "핀란드",
"DK": "덴마크",
"IE": "아일랜드",
"PL": "폴란드",
"CZ": "체코",
"RO": "루마니아",
"HU": "헝가리",
"SK": "슬로바키아",
"SI": "슬로베니아",
"HR": "크로아티아",
"GR": "그리스",
"PT": "포르투갈",
"ES": "스페인",
"IT": "이탈리아",
}
class DataProcessor:
"""YouTube Analytics 데이터 가공 프로세서
YouTube Analytics API의 원본 JSON 데이터를 DashboardResponse 스키마로 변환합니다.
섹션별로 데이터 가공 로직을 분리하여 유지보수성을 향상시켰습니다.
"""
def process(
self,
raw_data: dict[str, Any],
top_content: list[TopContent],
period_video_count: int = 0,
mode: Literal["day", "month"] = "month",
end_date: str = "",
) -> DashboardResponse:
"""YouTube Analytics API 원본 데이터를 DashboardResponse로 변환
Args:
raw_data: YouTube Analytics API 응답 데이터 (mode에 따라 구성 다름)
공통:
- kpi: KPI 메트릭 (조회수, 좋아요, 댓글, 시청시간 )
- top_videos: 인기 영상 데이터
- demographics: 연령/성별 데이터
- region: 지역별 데이터
mode="month" 추가:
- trend_recent: 최근 12개월 월별 조회수
- trend_previous: 이전 12개월 월별 조회수
mode="day" 추가:
- trend_recent: 최근 30 일별 조회수
- trend_previous: 이전 30 일별 조회수
top_content: TopContent 리스트 (라우터에서 Analytics + DB lookup으로 생성)
period_video_count: 조회 기간 업로드된 영상 (DB에서 집계)
mode: 조회 모드 ("month" | "day")
Returns:
DashboardResponse: 프론트엔드용 대시보드 응답 스키마
- mode="month": monthly_data 채움, daily_data=[]
- mode="day": daily_data 채움, monthly_data=[]
Example:
>>> processor = DataProcessor()
>>> response = processor.process(
... raw_data={
... "kpi": {...},
... "monthly_recent": {...},
... "monthly_previous": {...},
... "top_videos": {...},
... "demographics": {...},
... "region": {...},
... },
... top_content=[TopContent(...)],
... mode="month",
... )
"""
logger.debug(
f"[DataProcessor.process] START - "
f"top_content_count={len(top_content)}"
)
# 각 섹션별 데이터 가공 (안전한 딕셔너리 접근)
content_metrics = self._build_content_metrics(
raw_data.get("kpi", {}),
raw_data.get("kpi_previous", {}),
period_video_count,
)
if mode == "month":
monthly_data = self._merge_monthly_data(
raw_data.get("trend_recent", {}),
raw_data.get("trend_previous", {}),
end_date=end_date,
)
daily_data: list[DailyData] = []
else: # mode == "day"
daily_data = self._build_daily_data(
raw_data.get("trend_recent", {}),
raw_data.get("trend_previous", {}),
end_date=end_date,
)
monthly_data = []
audience_data = self._build_audience_data(
raw_data.get("demographics", {}),
raw_data.get("region", {}),
)
logger.debug(
f"[DataProcessor.process] SUCCESS - "
f"mode={mode}, metrics={len(content_metrics)}, "
f"top_content={len(top_content)}"
)
return DashboardResponse(
content_metrics=content_metrics,
monthly_data=monthly_data,
daily_data=daily_data,
top_content=top_content,
audience_data=audience_data,
)
def _build_content_metrics(
self,
kpi_data: dict[str, Any],
kpi_previous_data: dict[str, Any],
period_video_count: int = 0,
) -> list[ContentMetric]:
"""KPI 데이터를 ContentMetric 리스트로 변환
Args:
kpi_data: 최근 기간 KPI 응답
rows[0] = [views, likes, comments, shares,
estimatedMinutesWatched, averageViewDuration,
subscribersGained]
kpi_previous_data: 이전 기간 KPI 응답 (증감률 계산용)
period_video_count: 조회 기간 업로드된 영상 (DB에서 집계)
Returns:
list[ContentMetric]: KPI 지표 카드 리스트 (8)
순서: 조회수, 시청시간, 평균 시청시간, 신규 구독자, 좋아요, 댓글, 공유, 업로드 영상
"""
logger.info(
f"[DataProcessor._build_content_metrics] START - "
f"kpi_keys={list(kpi_data.keys())}"
)
rows = kpi_data.get("rows", [])
if not rows or not rows[0]:
logger.warning(
f"[DataProcessor._build_content_metrics] NO_DATA - " f"rows={rows}"
)
return []
row = rows[0]
prev_rows = kpi_previous_data.get("rows", [])
prev_row = prev_rows[0] if prev_rows else []
def _get(r: list, i: int, default: float = 0.0) -> float:
return r[i] if len(r) > i else default
def _trend(recent: float, previous: float) -> tuple[float, str]:
pct = recent - previous
if pct > 0:
direction = "up"
elif pct < 0:
direction = "down"
else:
direction = "-"
return pct, direction
# 최근 기간
views = _get(row, 0)
likes = _get(row, 1)
comments = _get(row, 2)
shares = _get(row, 3)
estimated_minutes_watched = _get(row, 4)
average_view_duration = _get(row, 5)
subscribers_gained = _get(row, 6)
# 이전 기간
prev_views = _get(prev_row, 0)
prev_likes = _get(prev_row, 1)
prev_comments = _get(prev_row, 2)
prev_shares = _get(prev_row, 3)
prev_minutes_watched = _get(prev_row, 4)
prev_avg_duration = _get(prev_row, 5)
prev_subscribers = _get(prev_row, 6)
views_trend, views_dir = _trend(views, prev_views)
watch_trend, watch_dir = _trend(estimated_minutes_watched, prev_minutes_watched)
duration_trend, duration_dir = _trend(average_view_duration, prev_avg_duration)
subs_trend, subs_dir = _trend(subscribers_gained, prev_subscribers)
likes_trend, likes_dir = _trend(likes, prev_likes)
comments_trend, comments_dir = _trend(comments, prev_comments)
shares_trend, shares_dir = _trend(shares, prev_shares)
logger.info(
f"[DataProcessor._build_content_metrics] SUCCESS - "
f"views={views}({views_trend:+.1f}), "
f"watch_time={estimated_minutes_watched}min({watch_trend:+.1f}), "
f"subscribers={subscribers_gained}({subs_trend:+.1f})"
)
return [
ContentMetric(
id="total-views",
label="조회수",
value=float(views),
unit="count",
trend=round(float(views_trend), 1),
trend_direction=views_dir,
),
ContentMetric(
id="total-watch-time",
label="시청시간",
value=round(estimated_minutes_watched / 60, 1),
unit="hours",
trend=round(watch_trend / 60, 1),
trend_direction=watch_dir,
),
ContentMetric(
id="avg-view-duration",
label="평균 시청시간",
value=round(average_view_duration / 60, 1),
unit="minutes",
trend=round(duration_trend / 60, 1),
trend_direction=duration_dir,
),
ContentMetric(
id="new-subscribers",
label="신규 구독자",
value=float(subscribers_gained),
unit="count",
trend=subs_trend,
trend_direction=subs_dir,
),
ContentMetric(
id="likes",
label="좋아요",
value=float(likes),
unit="count",
trend=likes_trend,
trend_direction=likes_dir,
),
ContentMetric(
id="comments",
label="댓글",
value=float(comments),
unit="count",
trend=comments_trend,
trend_direction=comments_dir,
),
ContentMetric(
id="shares",
label="공유",
value=float(shares),
unit="count",
trend=shares_trend,
trend_direction=shares_dir,
),
ContentMetric(
id="uploaded-videos",
label="업로드 영상",
value=float(period_video_count),
unit="count",
trend=0.0,
trend_direction="-",
),
]
def _merge_monthly_data(
self,
data_recent: dict[str, Any],
data_previous: dict[str, Any],
end_date: str = "",
) -> list[MonthlyData]:
"""최근 12개월과 이전 12개월의 월별 데이터를 병합
end_date 기준 12개월을 명시 생성하여 API가 반환하지 않은 (당월 ) 0으로 포함합니다.
Args:
data_recent: 최근 12개월 월별 조회수 데이터
rows = [["2026-01", 150000], ["2026-02", 180000], ...]
data_previous: 이전 12개월 월별 조회수 데이터
rows = [["2025-01", 120000], ["2025-02", 140000], ...]
end_date: 기준 종료일 (YYYY-MM-DD). 미전달 오늘 사용
Returns:
list[MonthlyData]: 월별 비교 데이터 (12, API 미반환 월은 0)
"""
logger.debug("[DataProcessor._merge_monthly_data] START")
rows_recent = data_recent.get("rows", [])
rows_previous = data_previous.get("rows", [])
map_recent = {row[0]: row[1] for row in rows_recent if len(row) >= 2}
map_previous = {row[0]: row[1] for row in rows_previous if len(row) >= 2}
# end_date 기준 12개월 명시 생성 (API 미반환 당월도 0으로 포함)
if end_date:
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
else:
end_dt = datetime.today()
result = []
for i in range(11, -1, -1):
m = end_dt.month - i
y = end_dt.year
if m <= 0:
m += 12
y -= 1
month_key = f"{y}-{m:02d}"
result.append(
MonthlyData(
month=f"{m}",
this_year=map_recent.get(month_key, 0),
last_year=map_previous.get(f"{y - 1}-{m:02d}", 0),
)
)
logger.debug(
f"[DataProcessor._merge_monthly_data] SUCCESS - count={len(result)}"
)
return result
def _build_daily_data(
self,
data_recent: dict[str, Any],
data_previous: dict[str, Any],
end_date: str = "",
num_days: int = 30,
) -> list[DailyData]:
"""최근 30일과 이전 30일의 일별 데이터를 병합
end_date 기준 num_days개 날짜를 직접 생성하여 YouTube API 응답에
해당 날짜 row가 없어도 0으로 채웁니다 (X축 누락 방지).
Args:
data_recent: 최근 30 일별 조회수 데이터
rows = [["2026-01-20", 5000], ["2026-01-21", 6200], ...]
data_previous: 이전 30 일별 조회수 데이터
rows = [["2025-12-21", 4500], ["2025-12-22", 5100], ...]
end_date: 최근 기간의 마지막 (YYYY-MM-DD). 미전달 rows 마지막 사용
num_days: 표시할 일수 (기본 30)
Returns:
list[DailyData]: 일별 비교 데이터 (num_days개, 데이터 없는 날은 0)
"""
logger.debug("[DataProcessor._build_daily_data] START")
rows_recent = data_recent.get("rows", [])
rows_previous = data_previous.get("rows", [])
# 날짜 → 조회수 맵
map_recent = {row[0]: row[1] for row in rows_recent if len(row) >= 2}
map_previous = {row[0]: row[1] for row in rows_previous if len(row) >= 2}
# end_date 결정: 전달된 값 우선, 없으면 rows 마지막 날짜 사용
if end_date:
end_dt = datetime.strptime(end_date, "%Y-%m-%d").date()
elif rows_recent:
end_dt = datetime.strptime(rows_recent[-1][0], "%Y-%m-%d").date()
else:
logger.warning(
"[DataProcessor._build_daily_data] NO_DATA - rows_recent 비어있음"
)
return []
start_dt = end_dt - timedelta(days=num_days - 1)
# 날짜 범위를 직접 생성하여 누락된 날짜도 0으로 채움
result = []
current = start_dt
while current <= end_dt:
date_str = current.strftime("%Y-%m-%d")
date_label = f"{current.month}/{current.day}"
this_views = map_recent.get(date_str, 0)
# 이전 기간: 동일 인덱스 날짜 (current - 30일)
prev_date_str = (current - timedelta(days=num_days)).strftime("%Y-%m-%d")
last_views = map_previous.get(prev_date_str, 0)
result.append(
DailyData(
date=date_label,
this_period=int(this_views),
last_period=int(last_views),
)
)
current += timedelta(days=1)
logger.debug(f"[DataProcessor._build_daily_data] SUCCESS - count={len(result)}")
return result
def _build_audience_data(
self,
demographics_data: dict[str, Any],
geography_data: dict[str, Any],
) -> AudienceData:
"""시청자 분석 데이터 생성
연령대별, 성별, 지역별 시청자 분포를 분석합니다.
Args:
demographics_data: 연령/성별 API 응답
rows = [["age18-24", "male", 45000], ["age18-24", "female", 55000], ...]
geography_data: 지역별 API 응답
rows = [["KR", 1000000], ["US", 500000], ...]
Returns:
AudienceData: 시청자 분석 데이터
- age_groups: 연령대별 비율
- gender: 성별 조회수
- top_regions: 상위 지역 (5)
"""
logger.debug("[DataProcessor._build_audience_data] START")
# === 연령/성별 데이터 처리 ===
demo_rows = demographics_data.get("rows", [])
age_map: dict[str, float] = {}
gender_map_f: dict[str, float] = {"male": 0.0, "female": 0.0}
for row in demo_rows:
if len(row) < 3:
continue
age_group = row[0] # "age18-24"
gender = row[1] # "male" or "female"
viewer_pct = row[2] # viewerPercentage (이미 % 값, 예: 45.5)
# 연령대별 집계: 남녀 비율 합산 (age18-24 → 18-24)
age_label = age_group.replace("age", "")
age_map[age_label] = age_map.get(age_label, 0.0) + viewer_pct
# 성별 집계
if gender in gender_map_f:
gender_map_f[gender] += viewer_pct
# 연령대 5개로 통합: 13-17+18-24 → 13-24, 55-64+65- → 55+
merged_age: dict[str, float] = {
"13-24": age_map.get("13-17", 0.0) + age_map.get("18-24", 0.0),
"25-34": age_map.get("25-34", 0.0),
"35-44": age_map.get("35-44", 0.0),
"45-54": age_map.get("45-54", 0.0),
"55+": age_map.get("55-64", 0.0) + age_map.get("65-", 0.0),
}
age_groups = [
{"label": age, "percentage": int(round(pct))}
for age, pct in merged_age.items()
]
gender_map = {k: int(round(v)) for k, v in gender_map_f.items()}
# === 지역 데이터 처리 ===
geo_rows = geography_data.get("rows", [])
total_geo_views = sum(row[1] for row in geo_rows if len(row) >= 2)
merged_geo: defaultdict[str, int] = defaultdict(int)
for row in geo_rows:
if len(row) >= 2:
merged_geo[self._translate_country_code(row[0])] += row[1]
top_regions = [
{
"region": region,
"percentage": int((views / total_geo_views * 100) if total_geo_views > 0 else 0),
}
for region, views in sorted(merged_geo.items(), key=lambda x: x[1], reverse=True)[:5]
]
logger.debug(
f"[DataProcessor._build_audience_data] SUCCESS - "
f"age_groups={len(age_groups)}, regions={len(top_regions)}"
)
return AudienceData(
age_groups=age_groups,
gender=gender_map,
top_regions=top_regions,
)
@staticmethod
def _translate_country_code(code: str) -> str:
"""국가 코드를 한국어로 변환
ISO 3166-1 alpha-2 국가 코드를 한국어 국가명으로 변환합니다.
Args:
code: ISO 3166-1 alpha-2 국가 코드 (: "KR", "US")
Returns:
str: 한국어 국가명 (매핑되지 않은 경우 원본 코드 반환)
Example:
>>> _translate_country_code("KR")
"대한민국"
>>> _translate_country_code("US")
"미국"
"""
return _COUNTRY_CODE_MAP.get(code, "기타")

View File

@ -0,0 +1,494 @@
"""
YouTube Analytics API 서비스
YouTube Analytics API v2를 호출하여 채널 영상 통계를 조회합니다.
"""
import asyncio
from datetime import datetime, timedelta
from typing import Any, Literal
import httpx
from app.dashboard.exceptions import (
YouTubeAPIError,
YouTubeAuthError,
YouTubeQuotaExceededError,
)
from app.utils.logger import get_logger
logger = get_logger("dashboard")
class YouTubeAnalyticsService:
"""YouTube Analytics API 호출 서비스
YouTube Analytics API v2를 사용하여 채널 통계, 영상 성과,
시청자 분석 데이터를 조회합니다.
API 문서:
https://developers.google.com/youtube/analytics/reference
"""
BASE_URL = "https://youtubeanalytics.googleapis.com/v2/reports"
async def fetch_all_metrics(
self,
video_ids: list[str],
start_date: str,
end_date: str,
access_token: str,
mode: Literal["day", "month"] = "month",
kpi_end_date: str = "",
) -> dict[str, Any]:
"""YouTube Analytics API 호출을 병렬로 실행
Args:
video_ids: YouTube 영상 ID 리스트 (최대 30, 리스트 허용)
start_date: 조회 시작일 (YYYY-MM-DD)
end_date: 조회 종료일 (YYYY-MM-DD)
access_token: YouTube OAuth 2.0 액세스 토큰
mode: 조회 모드 ("month" | "day")
kpi_end_date: KPI 집계 종료일 (미전달 end_date와 동일)
Returns:
dict[str, Any]: API 응답 데이터 (7 )
- kpi: 최근 기간 KPI 메트릭 (조회수, 좋아요, 댓글 )
- kpi_previous: 이전 기간 KPI 메트릭 (trend 계산용)
- trend_recent: 최근 기간 추이 (월별 또는 일별 조회수)
- trend_previous: 이전 기간 추이 (전년 또는 이전 30)
- top_videos: 조회수 기준 인기 영상 TOP 4
- demographics: 연령/성별 시청자 분포
- region: 지역별 조회수 TOP 5
Raises:
YouTubeAPIError: API 호출 실패
YouTubeQuotaExceededError: 할당량 초과
YouTubeAuthError: 인증 실패
Example:
>>> service = YouTubeAnalyticsService()
>>> data = await service.fetch_all_metrics(
... video_ids=["dQw4w9WgXcQ", "jNQXAC9IVRw"],
... start_date="2026-01-01",
... end_date="2026-12-31",
... access_token="ya29.a0..."
... )
"""
logger.debug(
f"[1/7] YouTube Analytics API 병렬 호출 시작 - "
f"video_count={len(video_ids)}, period={start_date}~{end_date}, mode={mode}"
)
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
# kpi_end_date: KPI/top_videos/demographics/region 호출에 사용
# month 모드에서는 현재 월 전체 데이터를 포함하기 위해 end_date(YYYY-MM-01)보다 늦은 날짜 사용
# day 모드 또는 미전달 시 end_date와 동일
_kpi_end = kpi_end_date if kpi_end_date else end_date
if mode == "month":
# 월별 차트: 라우터에서 이미 YYYY-MM-01 형식으로 계산된 날짜 그대로 사용
# recent: start_date ~ end_date (ex. 2025-03-01 ~ 2026-02-01)
# previous: 1년 전 동일 기간 (ex. 2024-03-01 ~ 2025-02-01)
recent_start = start_date
recent_end = end_date
previous_start = f"{int(start_date[:4]) - 1}{start_date[4:]}"
previous_end = f"{int(end_date[:4]) - 1}{end_date[4:]}"
# KPI 이전 기간: _kpi_end 기준 1년 전 (ex. 2026-02-22 → 2025-02-22)
previous_kpi_end = f"{int(_kpi_end[:4]) - 1}{_kpi_end[4:]}"
logger.debug(
f"[월별 데이터] 최근 12개월: {recent_start}~{recent_end}, "
f"이전 12개월: {previous_start}~{previous_end}, "
f"KPI 조회 종료일: {_kpi_end}"
)
else:
# 일별 차트: end_date 기준 최근 30일 / 이전 30일
day_recent_end = end_date
day_recent_start = (end_dt - timedelta(days=29)).strftime("%Y-%m-%d")
day_previous_end = (end_dt - timedelta(days=30)).strftime("%Y-%m-%d")
day_previous_start = (end_dt - timedelta(days=59)).strftime("%Y-%m-%d")
logger.debug(
f"[일별 데이터] 최근 30일: {day_recent_start}~{day_recent_end}, "
f"이전 30일: {day_previous_start}~{day_previous_end}"
)
# 7개 API 호출 태스크 생성 (mode별 선택적)
# [0] KPI(최근), [1] KPI(이전), [2] 추이(최근), [3] 추이(이전), [4] 인기영상, [5] 인구통계, [6] 지역
# mode=month: [2][3] = 월별 데이터 (YYYY-MM-01 형식 필요)
# mode=day: [2][3] = 일별 데이터
if mode == "month":
tasks = [
self._fetch_kpi(video_ids, start_date, _kpi_end, access_token),
self._fetch_kpi(video_ids, previous_start, previous_kpi_end, access_token),
self._fetch_monthly_data(video_ids, recent_start, recent_end, access_token),
self._fetch_monthly_data(video_ids, previous_start, previous_end, access_token),
self._fetch_top_videos(video_ids, start_date, _kpi_end, access_token),
self._fetch_demographics(start_date, _kpi_end, access_token),
self._fetch_region(start_date, _kpi_end, access_token),
]
else: # mode == "day"
tasks = [
self._fetch_kpi(video_ids, start_date, end_date, access_token),
self._fetch_kpi(video_ids, day_previous_start, day_previous_end, access_token),
self._fetch_daily_data(video_ids, day_recent_start, day_recent_end, access_token),
self._fetch_daily_data(video_ids, day_previous_start, day_previous_end, access_token),
self._fetch_top_videos(video_ids, start_date, end_date, access_token),
self._fetch_demographics(start_date, end_date, access_token),
self._fetch_region(start_date, end_date, access_token),
]
# 병렬 실행
results = await asyncio.gather(*tasks, return_exceptions=True)
# 에러 체크 (YouTubeAuthError, YouTubeQuotaExceededError는 원형 그대로 전파)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(
f"[YouTubeAnalyticsService] API 호출 {i+1}/7 실패: {result.__class__.__name__}"
)
if isinstance(result, (YouTubeAuthError, YouTubeQuotaExceededError)):
raise result
raise YouTubeAPIError(f"데이터 조회 실패: {result.__class__.__name__}")
logger.debug(
f"[7/7] YouTube Analytics API 병렬 호출 완료 - mode={mode}, 성공률 100%"
)
# 각 API 호출 결과 디버그 로깅
labels = [
"kpi",
"kpi_previous",
"trend_recent",
"trend_previous",
"top_videos",
"demographics",
"region",
]
for label, result in zip(labels, results):
rows = result.get("rows") if isinstance(result, dict) else None
row_count = len(rows) if rows else 0
preview = rows[:2] if rows else []
logger.debug(
f"[fetch_all_metrics] {label}: row_count={row_count}, preview={preview}"
)
return {
"kpi": results[0],
"kpi_previous": results[1],
"trend_recent": results[2],
"trend_previous": results[3],
"top_videos": results[4],
"demographics": results[5],
"region": results[6],
}
async def _fetch_kpi(
self,
video_ids: list[str],
start_date: str,
end_date: str,
access_token: str,
) -> dict[str, Any]:
"""전체 KPI 메트릭 조회 (contentMetrics용)
조회수, 좋아요, 댓글, 공유, 시청 시간, 구독자 증감
핵심 성과 지표를 조회합니다.
Args:
video_ids: YouTube 영상 ID 리스트
start_date: 조회 시작일 (YYYY-MM-DD)
end_date: 조회 종료일 (YYYY-MM-DD)
access_token: OAuth 2.0 액세스 토큰
Returns:
dict[str, Any]: YouTube Analytics API 응답
rows[0] = [views, likes, comments, shares,
estimatedMinutesWatched, averageViewDuration,
subscribersGained]
"""
logger.debug(
f"[YouTubeAnalyticsService._fetch_kpi] START - video_count={len(video_ids)}"
)
params = {
"ids": "channel==MINE",
"startDate": start_date,
"endDate": end_date,
"metrics": "views,likes,comments,shares,estimatedMinutesWatched,averageViewDuration,subscribersGained",
"filters": f"video=={','.join(video_ids)}",
}
result = await self._call_api(params, access_token)
logger.debug("[YouTubeAnalyticsService._fetch_kpi] SUCCESS")
return result
async def _fetch_monthly_data(
self,
video_ids: list[str],
start_date: str,
end_date: str,
access_token: str,
) -> dict[str, Any]:
"""월별 조회수 데이터 조회
지정된 기간의 월별 조회수를 조회합니다.
최근 12개월과 이전 12개월을 각각 조회하여 비교합니다.
Args:
video_ids: YouTube 영상 ID 리스트
start_date: 조회 시작일 (YYYY-MM-DD)
end_date: 조회 종료일 (YYYY-MM-DD)
access_token: OAuth 2.0 액세스 토큰
Returns:
dict[str, Any]: YouTube Analytics API 응답
rows = [["2026-01", 150000], ["2026-02", 180000], ...]
"""
logger.debug(
f"[YouTubeAnalyticsService._fetch_monthly_data] START - "
f"period={start_date}~{end_date}"
)
params = {
"ids": "channel==MINE",
"startDate": start_date,
"endDate": end_date,
"dimensions": "month",
"metrics": "views",
"filters": f"video=={','.join(video_ids)}",
"sort": "month",
}
result = await self._call_api(params, access_token)
logger.debug(
f"[YouTubeAnalyticsService._fetch_monthly_data] SUCCESS - "
f"period={start_date}~{end_date}"
)
return result
async def _fetch_daily_data(
self,
video_ids: list[str],
start_date: str,
end_date: str,
access_token: str,
) -> dict[str, Any]:
"""일별 조회수 데이터 조회
지정된 기간의 일별 조회수를 조회합니다.
최근 30일과 이전 30일을 각각 조회하여 비교합니다.
Args:
video_ids: YouTube 영상 ID 리스트
start_date: 조회 시작일 (YYYY-MM-DD)
end_date: 조회 종료일 (YYYY-MM-DD)
access_token: OAuth 2.0 액세스 토큰
Returns:
dict[str, Any]: YouTube Analytics API 응답
rows = [["2026-01-18", 5000], ["2026-01-19", 6200], ...]
"""
logger.debug(
f"[YouTubeAnalyticsService._fetch_daily_data] START - "
f"period={start_date}~{end_date}"
)
params = {
"ids": "channel==MINE",
"startDate": start_date,
"endDate": end_date,
"dimensions": "day",
"metrics": "views",
"filters": f"video=={','.join(video_ids)}",
"sort": "day",
}
result = await self._call_api(params, access_token)
logger.debug(
f"[YouTubeAnalyticsService._fetch_daily_data] SUCCESS - "
f"period={start_date}~{end_date}"
)
return result
async def _fetch_top_videos(
self,
video_ids: list[str],
start_date: str,
end_date: str,
access_token: str,
) -> dict[str, Any]:
"""영상별 조회수 조회 (topContent용)
조회수 기준 상위 4 영상의 성과 데이터를 조회합니다.
Args:
video_ids: YouTube 영상 ID 리스트
start_date: 조회 시작일 (YYYY-MM-DD)
end_date: 조회 종료일 (YYYY-MM-DD)
access_token: OAuth 2.0 액세스 토큰
Returns:
dict[str, Any]: YouTube Analytics API 응답
rows = [["video_id", views, likes, comments], ...]
조회수 내림차순으로 정렬된 상위 4 영상
"""
logger.debug("[YouTubeAnalyticsService._fetch_top_videos] START")
params = {
"ids": "channel==MINE",
"startDate": start_date,
"endDate": end_date,
"dimensions": "video",
"metrics": "views,likes,comments",
"filters": f"video=={','.join(video_ids)}",
"sort": "-views",
"maxResults": "4",
}
result = await self._call_api(params, access_token)
logger.debug("[YouTubeAnalyticsService._fetch_top_videos] SUCCESS")
return result
async def _fetch_demographics(
self,
start_date: str,
end_date: str,
access_token: str,
) -> dict[str, Any]:
"""연령/성별 분포 조회 (채널 전체 기준)
시청자의 연령대별, 성별 시청 비율을 조회합니다.
Note:
YouTube Analytics API 제약: ageGroup/gender 차원은 video 필터와 혼용 불가.
채널 전체 시청자 기준 데이터를 반환합니다.
Args:
start_date: 조회 시작일 (YYYY-MM-DD)
end_date: 조회 종료일 (YYYY-MM-DD)
access_token: OAuth 2.0 액세스 토큰
Returns:
dict[str, Any]: YouTube Analytics API 응답
rows = [["age18-24", "female", 45.5], ["age18-24", "male", 32.1], ...]
"""
logger.debug("[YouTubeAnalyticsService._fetch_demographics] START")
# Demographics 보고서는 video 필터 미지원 → 채널 전체 기준 데이터
# 지원 filters: country, province, continent, subContinent, liveOrOnDemand, subscribedStatus
params = {
"ids": "channel==MINE",
"startDate": start_date,
"endDate": end_date,
"dimensions": "ageGroup,gender",
"metrics": "viewerPercentage",
}
result = await self._call_api(params, access_token)
logger.debug("[YouTubeAnalyticsService._fetch_demographics] SUCCESS")
return result
async def _fetch_region(
self,
start_date: str,
end_date: str,
access_token: str,
) -> dict[str, Any]:
"""지역별 조회수 조회
지역별 조회수 분포를 조회합니다 (상위 5).
Args:
start_date: 조회 시작일 (YYYY-MM-DD)
end_date: 조회 종료일 (YYYY-MM-DD)
access_token: OAuth 2.0 액세스 토큰
Returns:
dict[str, Any]: YouTube Analytics API 응답
rows = [["KR", 1000000], ["US", 500000], ...]
조회수 내림차순으로 정렬된 상위 5 국가
"""
logger.debug("[YouTubeAnalyticsService._fetch_region] START")
params = {
"ids": "channel==MINE",
"startDate": start_date,
"endDate": end_date,
"dimensions": "country",
"metrics": "views",
"sort": "-views",
}
result = await self._call_api(params, access_token)
logger.debug("[YouTubeAnalyticsService._fetch_region] SUCCESS")
return result
async def _call_api(
self,
params: dict[str, str],
access_token: str,
) -> dict[str, Any]:
"""YouTube Analytics API 호출 공통 로직
모든 API 호출에 공통적으로 사용되는 HTTP 요청 로직입니다.
인증 헤더 추가, 에러 처리, 응답 파싱을 담당합니다.
Args:
params: API 요청 파라미터 (dimensions, metrics, filters )
access_token: OAuth 2.0 액세스 토큰
Returns:
dict[str, Any]: YouTube Analytics API JSON 응답
Raises:
YouTubeQuotaExceededError: 할당량 초과 (429)
YouTubeAuthError: 인증 실패 (401, 403)
YouTubeAPIError: 기타 API 오류
Note:
- 타임아웃: 30
- 할당량 초과 자동으로 YouTubeQuotaExceededError 발생
- 인증 실패 자동으로 YouTubeAuthError 발생
"""
headers = {"Authorization": f"Bearer {access_token}"}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
self.BASE_URL,
params=params,
headers=headers,
)
# 할당량 초과 체크
if response.status_code == 429:
logger.warning("[YouTubeAnalyticsService._call_api] QUOTA_EXCEEDED")
raise YouTubeQuotaExceededError()
# 인증 실패 체크
if response.status_code in (401, 403):
logger.warning(
f"[YouTubeAnalyticsService._call_api] AUTH_FAILED - status={response.status_code}"
)
raise YouTubeAuthError(f"YouTube 인증 실패: {response.status_code}")
# HTTP 에러 체크
response.raise_for_status()
return response.json()
except (YouTubeAuthError, YouTubeQuotaExceededError):
raise # 이미 처리된 예외는 그대로 전파
except httpx.HTTPStatusError as e:
logger.error(
f"[YouTubeAnalyticsService._call_api] HTTP_ERROR - "
f"status={e.response.status_code}, body={e.response.text[:500]}"
)
raise YouTubeAPIError(f"HTTP {e.response.status_code}")
except httpx.RequestError as e:
logger.error(f"[YouTubeAnalyticsService._call_api] REQUEST_ERROR - {e}")
raise YouTubeAPIError(f"네트워크 오류: {e}")
except Exception as e:
logger.error(f"[YouTubeAnalyticsService._call_api] UNEXPECTED_ERROR - {e}")
raise YouTubeAPIError(f"알 수 없는 오류: {e}")

71
app/dashboard/tasks.py Normal file
View File

@ -0,0 +1,71 @@
"""
Dashboard Background Tasks
업로드 완료 Dashboard 테이블에 레코드를 삽입하는 백그라운드 태스크입니다.
"""
import logging
from sqlalchemy import select
from sqlalchemy.dialects.mysql import insert
from app.dashboard.models import Dashboard
from app.database.session import BackgroundSessionLocal
from app.social.models import SocialUpload
from app.user.models import SocialAccount
logger = logging.getLogger(__name__)
async def insert_dashboard(upload_id: int) -> None:
"""
Dashboard 레코드 삽입
SocialUpload(id=upload_id) 완료 데이터를 DB에서 조회하여 Dashboard에 삽입합니다.
UniqueConstraint(platform_video_id, platform_user_id) 충돌 스킵(INSERT IGNORE).
"""
try:
async with BackgroundSessionLocal() as session:
result = await session.execute(
select(
SocialUpload.user_uuid,
SocialUpload.platform,
SocialUpload.platform_video_id,
SocialUpload.platform_url,
SocialUpload.title,
SocialUpload.uploaded_at,
SocialAccount.platform_user_id,
)
.join(SocialAccount, SocialUpload.social_account_id == SocialAccount.id)
.where(SocialUpload.id == upload_id)
)
row = result.one_or_none()
if not row:
logger.warning(f"[dashboard] upload_id={upload_id} 데이터 없음")
return
stmt = (
insert(Dashboard)
.values(
user_uuid=row.user_uuid,
platform=row.platform,
platform_user_id=row.platform_user_id,
platform_video_id=row.platform_video_id,
platform_url=row.platform_url,
title=row.title,
uploaded_at=row.uploaded_at,
)
.prefix_with("IGNORE")
)
await session.execute(stmt)
await session.commit()
logger.info(
f"[dashboard] 삽입 완료 - "
f"upload_id={upload_id}, platform_video_id={row.platform_video_id}"
)
except Exception as e:
logger.error(
f"[dashboard] 삽입 실패 - upload_id={upload_id}, error={e}"
)

View File

@ -79,6 +79,7 @@ async def create_db_tables():
from app.video.models import Video # noqa: F401
from app.sns.models import SNSUploadTask # noqa: F401
from app.social.models import SocialUpload # noqa: F401
from app.dashboard.models import Dashboard # noqa: F401
# 생성할 테이블 목록
tables_to_create = [
@ -94,6 +95,7 @@ async def create_db_tables():
SNSUploadTask.__table__,
SocialUpload.__table__,
MarketingIntel.__table__,
Dashboard.__table__,
]
logger.info("Creating database tables...")

View File

@ -91,6 +91,7 @@ PLATFORM_CONFIG = {
YOUTUBE_SCOPES = [
"https://www.googleapis.com/auth/youtube.upload", # 영상 업로드
"https://www.googleapis.com/auth/youtube.readonly", # 채널 정보 읽기
"https://www.googleapis.com/auth/yt-analytics.readonly", # 대시보드
"https://www.googleapis.com/auth/userinfo.profile", # 사용자 프로필
]

View File

@ -18,6 +18,7 @@ from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from config import social_upload_settings
from app.dashboard.tasks import insert_dashboard
from app.database.session import BackgroundSessionLocal
from app.social.constants import SocialPlatform, UploadStatus
from app.social.exceptions import TokenExpiredError, UploadError, UploadQuotaExceededError
@ -318,6 +319,7 @@ async def process_social_upload(upload_id: int) -> None:
f"platform_video_id: {result.platform_video_id}, "
f"url: {result.platform_url}"
)
await insert_dashboard(upload_id)
else:
retry_count = await _increment_retry_count(upload_id)

View File

@ -0,0 +1,173 @@
# YouTube Analytics API: Core Metrics Dimensions
## 1. Core Metrics (측정항목)
API 요청 시 `metrics` 파라미터에 들어갈 수 있는 값들입니다. 이들은 모두 **숫자**로 반환됩니다.
### A. 시청 및 도달 (Views Reach)
가장 기본이 되는 성과 지표입니다.
| Metric ID | 설명 (한글/영문) | 단위 | 비고 |
| ----------------------------- | --------------- | --------------- | --------------- |
| `**views`** | **조회수** (Views) | 횟수 | 가장 기본 지표 |
| `**estimatedMinutesWatched`** | **예상 시청 시간** | **분 (Minutes)** | 초 단위가 아님에 주의 |
| `**averageViewDuration`** | **평균 시청 지속 시간** | **초 (Seconds)** | 영상 1회당 평균 시청 시간 |
| `**averageViewPercentage`** | **평균 시청 비율** | % (0~100) | 영상 길이 대비 시청 비율 |
### B. 참여 및 반응 (Engagement)
시청자의 능동적인 행동 지표입니다.
| Metric ID | 설명 (한글/영문) | 단위 | 비고 |
| ----------------------- | ------------------- | --- | ---------------- |
| `**likes`** | **좋아요** (Likes) | 횟수 | |
| `**dislikes`** | **싫어요** (Dislikes) | 횟수 | |
| `**comments`** | **댓글 수** (Comments) | 횟수 | |
| `**shares`** | **공유 수** (Shares) | 횟수 | 공유 버튼 클릭 횟수 |
| `**subscribersGained`** | **신규 구독자** | 명 | 해당 영상으로 유입된 구독자 |
| `**subscribersLost`** | **이탈 구독자** | 명 | 해당 영상 시청 후 구독 취소 |
### C. 수익 (Revenue) - *수익 창출 채널 전용(유튜브파트너프로그램(YPP) 사용자만 가능)*
| Metric ID | 설명 (한글/영문) | 단위 | 비고 |
| ------------------------ | ----------- | ---------------- | ------------------ |
| `**estimatedRevenue`** | **총 예상 수익** | 통화 (예: USD, KRW) | 광고 + 유튜브 프리미엄 등 포함 |
| `**estimatedAdRevenue`** | **광고 수익** | 통화 | 순수 광고 수익 |
| `monetizedPlaybacks` | 수익 창출 재생 수 | 횟수 | 광고가 1회 이상 노출된 재생 |
---
## 2. Dimensions (차원)
API 요청 시 `dimensions` 파라미터에 들어갈 수 있는 값들입니다.
### A. 시간 및 영상 기준 (Time Item)
가장 많이 사용되는 필수 차원입니다.
| Dimension ID | 설명 | 사용 예시 (Use Case) | 필수 정렬(Sort) |
| ------------ | ------------------- | ---------------------- | ----------- |
| `**day`** | **일별** (Daily) | 최근 30일 조회수 추이 그래프 | `day` |
| `**month`** | **월별** (Monthly) | 월간 성장 리포트 | `month` |
| `**video`** | **영상별** (Per Video) | 인기 영상 랭킹 (Top Content) | `-views` |
| (없음) | **전체 합계** (Total) | 프로젝트 전체 성과 요약 (KPI) | (없음) |
### B. 시청자 분석 (Demographics)
**주의**: 이 차원들은 대부분 `video` 차원과 함께 사용할 수 없으며, 별도로 호출해야 합니다.
| Dimension ID | 설명 | 사용 예시 | 비고 |
| -------------- | ------- | --------------------------- | -------------- |
| `**ageGroup`** | **연령대** | 시청자 연령 분포 (18-24, 25-34...) | `video`와 혼용 불가 |
| `**gender`** | **성별** | 남녀 성비 (male, female) | `video`와 혼용 불가 |
| `**country`** | **국가** | 국가별 시청자 수 (KR, US...) | 지도 차트용, 채널 전체 기준 |
### C. 유입 및 기기 (Traffic Device)
| Dimension ID | 설명 | 반환 값 예시 |
| ------------------------------ | --------- | -------------------------------------- |
| `**insightTrafficSourceType`** | **유입 경로** | `YT_SEARCH` (검색), `RELATED_VIDEO` (추천) |
| `**deviceType`** | **기기 유형** | `MOBILE`, `DESKTOP`, `TV` |
---
## 3. 현재 사용 중인 API 호출 조합
대시보드에서 실제로 사용하는 7가지 호출 조합입니다. 모두 `ids=channel==MINE`으로 고정합니다.
### 1. KPI 요약 (`_fetch_kpi`) — 현재/이전 기간 각 1회
| 파라미터 | 값 |
| ---------- | ------------------------------------------------------------------------------------- |
| dimensions | (없음) |
| metrics | `views, likes, comments, shares, estimatedMinutesWatched, averageViewDuration, subscribersGained` |
| filters | `video==ID1,ID2,...` (업로드된 영상 ID 최대 30개) |
> 현재/이전 기간을 각각 호출하여 trend(증감률) 계산에 사용.
---
### 2. 월별 추이 차트 (`_fetch_monthly_data`) — 최근 12개월 / 이전 12개월 각 1회
| 파라미터 | 값 |
| ---------- | -------------------- |
| dimensions | `month` |
| metrics | `views` |
| filters | `video==ID1,ID2,...` |
| sort | `month` |
---
### 3. 일별 추이 차트 (`_fetch_daily_data`) — 최근 30일 / 이전 30일 각 1회
| 파라미터 | 값 |
| ---------- | -------------------- |
| dimensions | `day` |
| metrics | `views` |
| filters | `video==ID1,ID2,...` |
| sort | `day` |
---
### 4. 인기 영상 TOP 4 (`_fetch_top_videos`)
| 파라미터 | 값 |
| ---------- | ------------------------ |
| dimensions | `video` |
| metrics | `views, likes, comments` |
| filters | `video==ID1,ID2,...` |
| sort | `-views` |
| maxResults | `4` |
---
### 5. 시청자 연령/성별 분포 (`_fetch_demographics`) — 채널 전체 기준
| 파라미터 | 값 |
| ---------- | ----------------------- |
| dimensions | `ageGroup, gender` |
| metrics | `viewerPercentage` |
> `ageGroup`, `gender` 차원은 `video` 필터와 혼용 불가 → 채널 전체 시청자 기준.
---
### 6. 지역별 조회수 TOP 5 (`_fetch_region`) — 채널 전체 기준
| 파라미터 | 값 |
| ---------- | -------------------- |
| dimensions | `country` |
| metrics | `views` |
| sort | `-views` |
| maxResults | `5` |
---
## 4. API 사용 시 주의사항 및 제약사항
### A. 영상 ID 개수 제한
- **권장**: 최근 생성된 영상 20~30개(최대 50개)를 DB에서 가져오고 해당 목록을 API로 호출
- **Analytics API 공식 한도**: 명시된 개수 제한은 없지만 URL 길이 제한 2000자 (이론상 최대 150개)
- **실질적 제한**: Analytics API는 계산이 복잡하여 ID 150개를 한 번에 던지면, 유튜브 서버 응답 시간(Latency)이 길어지고 **50개 이상일 때 문제가 발생**한다는 StackOverflow의 보고가 있음
- **Data API 비교**: 같은 유튜브의 Data API는 `videos.list` 50개 제한
### B. 데이터 지연 (Latency)
- Analytics API 데이터는 실시간이 아니며 **24~48시간 지연(Latency)** 발생
- 최신 데이터가 필요한 경우 이 점을 고려해야 함

View File

@ -0,0 +1,77 @@
-- ===================================================================
-- dashboard 테이블 생성 마이그레이션
-- 채널별 영상 업로드 기록을 저장하는 테이블
-- SocialUpload.social_account_id는 재연동 시 변경되므로,
-- 채널 ID(platform_user_id) 기준으로 안정적인 영상 필터링을 제공합니다.
-- 생성일: 2026-02-24
-- ===================================================================
-- dashboard 테이블 생성
CREATE TABLE IF NOT EXISTS dashboard (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '고유 식별자',
-- 관계 필드
user_uuid VARCHAR(36) NOT NULL COMMENT '사용자 UUID (user.user_uuid 참조)',
-- 플랫폼 정보
platform VARCHAR(20) NOT NULL COMMENT '플랫폼 (youtube, instagram)',
platform_user_id VARCHAR(100) NOT NULL COMMENT '채널 ID (재연동 후에도 불변)',
-- 플랫폼 결과
platform_video_id VARCHAR(100) NOT NULL COMMENT '플랫폼에서 부여한 영상 ID',
platform_url VARCHAR(500) NULL COMMENT '플랫폼에서의 영상 URL',
-- 메타데이터
title VARCHAR(200) NOT NULL COMMENT '영상 제목',
-- 시간 정보
uploaded_at DATETIME NOT NULL COMMENT 'SocialUpload 완료 시각 (정렬 기준)',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '레코드 생성 시각',
-- 외래키 제약조건
CONSTRAINT fk_dashboard_user FOREIGN KEY (user_uuid) REFERENCES user(user_uuid) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='채널별 영상 업로드 기록 테이블 (대시보드 통계 기준)';
-- 유니크 인덱스 (동일 채널에 동일 영상 중복 삽입 방지)
CREATE UNIQUE INDEX uq_vcu_video_channel ON dashboard(platform_video_id, platform_user_id);
-- 복합 인덱스 (사용자별 채널 필터링)
CREATE INDEX idx_vcu_user_platform ON dashboard(user_uuid, platform_user_id);
-- 인덱스 (날짜 범위 조회)
CREATE INDEX idx_vcu_uploaded_at ON dashboard(uploaded_at);
-- ===================================================================
-- 기존 데이터 마이그레이션
-- social_upload(status=completed) → dashboard INSERT IGNORE
-- 서버 기동 시 init_dashboard_table()에서 자동 실행됩니다.
-- 아래 쿼리는 수동 실행 시 참고용입니다.
-- ===================================================================
/*
INSERT IGNORE INTO dashboard (
user_uuid,
platform,
platform_user_id,
platform_video_id,
platform_url,
title,
uploaded_at
)
SELECT
su.user_uuid,
su.platform,
sa.platform_user_id,
su.platform_video_id,
su.platform_url,
su.title,
su.uploaded_at
FROM social_upload su
JOIN social_account sa ON su.social_account_id = sa.id
WHERE
su.status = 'completed'
AND su.platform_video_id IS NOT NULL
AND su.uploaded_at IS NOT NULL
AND sa.platform_user_id IS NOT NULL;
*/

30
main.py
View File

@ -12,6 +12,7 @@ from app.database.session import engine
from app.user.models import User, RefreshToken # noqa: F401
from app.archive.api.routers.v1.archive import router as archive_router
from app.dashboard.api.routers.v1.dashboard import router as dashboard_router
from app.home.api.routers.v1.home import router as home_router
from app.user.api.routers.v1.auth import router as auth_router, test_router as auth_test_router
from app.user.api.routers.v1.social_account import router as social_account_router
@ -220,6 +221,34 @@ tags_metadata = [
- `processing`: 플랫폼에서 처리
- `completed`: 업로드 완료
- `failed`: 업로드 실패
""",
},
{
"name": "Dashboard",
"description": """YouTube Analytics 대시보드 API
## 주요 기능
- `GET /dashboard/stats` - YouTube 영상 성과 통계 조회
## 제공 데이터
- **KPI 지표**: 조회수, 참여율, 시청 시간, 평균 시청 시간
- **월별 추이**: 최근 12개월 vs 이전 12개월 비교
- **인기 영상**: 조회수 TOP 10
- **시청자 분석**: 연령/성별/지역 분포
- **플랫폼 메트릭**: 구독자 증감
## 성능 최적화
- 6 YouTube Analytics API 병렬 호출
- Redis 캐싱 (TTL: 1시간)
- 최근 30 업로드 영상 기준 분석
## 사전 조건
- YouTube 계정이 연동되어 있어야 합니다
- 최소 1 이상의 영상이 업로드되어 있어야 합니다
""",
},
{
@ -363,6 +392,7 @@ app.include_router(social_oauth_router, prefix="/social") # Social OAuth 라우
app.include_router(social_upload_router, prefix="/social") # Social Upload 라우터 추가
app.include_router(social_seo_router, prefix="/social") # Social Upload 라우터 추가
app.include_router(sns_router) # SNS API 라우터 추가
app.include_router(dashboard_router) # Dashboard API 라우터 추가
# DEBUG 모드에서만 테스트 라우터 등록
if prj_settings.DEBUG: