Compare commits

..

No commits in common. "01c1cacb84525b75b5c4a771b94b84ba8f88d537" and "395b4dbbfb91541abf57ee3854b8fbbdd7f739ea" have entirely different histories.

9 changed files with 444 additions and 392 deletions

View File

@ -4,22 +4,43 @@ Dashboard API 라우터
YouTube Analytics 기반 대시보드 통계를 제공합니다.
"""
import json
import logging
from datetime import date, datetime, timedelta
from typing import Literal
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dashboard.utils.redis_cache import delete_cache_pattern
from app.dashboard.schemas import (
CacheDeleteResponse,
ConnectedAccountsResponse,
DashboardResponse,
from app.dashboard.exceptions import (
YouTubeAccountNotConnectedError,
YouTubeAccountNotFoundError,
YouTubeAccountSelectionRequiredError,
YouTubeTokenExpiredError,
)
from app.dashboard.schemas import (
AudienceData,
CacheDeleteResponse,
ConnectedAccount,
ConnectedAccountsResponse,
ContentMetric,
DashboardResponse,
TopContent,
)
from app.dashboard.services import DataProcessor, YouTubeAnalyticsService
from app.dashboard.redis_cache import (
delete_cache,
delete_cache_pattern,
get_cache,
set_cache,
)
from app.dashboard.services import DashboardService
from app.database.session import get_session
from app.dashboard.models import Dashboard
from app.social.exceptions import TokenExpiredError
from app.social.services import SocialAccountService
from app.user.dependencies.auth import get_current_user
from app.user.models import User
from app.user.models import SocialAccount, User
logger = logging.getLogger(__name__)
@ -40,8 +61,41 @@ async def get_connected_accounts(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> ConnectedAccountsResponse:
service = DashboardService()
connected = await service.get_connected_accounts(current_user, session)
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
accounts_raw = result.scalars().all()
# platform_user_id 기준
seen_platform_ids: set[str] = set()
connected = []
for acc in sorted(
accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id in seen_platform_ids:
continue
seen_platform_ids.add(acc.platform_user_id)
data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
connected.append(
ConnectedAccount(
id=acc.id,
platform=acc.platform,
platform_username=acc.platform_username,
platform_user_id=acc.platform_user_id,
channel_title=data.get("channel_title"),
connected_at=acc.connected_at,
is_active=acc.is_active,
)
)
logger.info(
f"[ACCOUNTS] YouTube 계정 목록 조회 - "
f"user_uuid={current_user.user_uuid}, count={len(connected)}"
)
return ConnectedAccountsResponse(accounts=connected)
@ -88,8 +142,328 @@ async def get_dashboard_stats(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> DashboardResponse:
service = DashboardService()
return await service.get_stats(mode, platform_user_id, current_user, session)
"""
대시보드 통계 조회
Args:
mode: 조회 모드 (day: 최근 30, month: 최근 12개월)
platform_user_id: 사용할 YouTube 채널 ID (여러 계정 연결 필수, 재연동해도 불변)
current_user: 현재 인증된 사용자
session: 데이터베이스 세션
Returns:
DashboardResponse: 대시보드 통계 데이터
Raises:
YouTubeAccountNotConnectedError: YouTube 계정이 연동되어 있지 않음
YouTubeAccountSelectionRequiredError: 여러 계정이 연결되어 있으나 계정 미선택
YouTubeAccountNotFoundError: 지정한 계정을 찾을 없음
YouTubeTokenExpiredError: YouTube 토큰 만료 (재연동 필요)
YouTubeAPIError: YouTube Analytics API 호출 실패
"""
logger.info(
f"[DASHBOARD] 통계 조회 시작 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
)
# 1. 모드별 날짜 자동 계산
today = date.today()
if mode == "day":
# 48시간 지연 적용: 오늘 기준 -2일을 end로 사용
# ex) 오늘 2/20 → end=2/18, start=1/20
end_dt = today - timedelta(days=2)
kpi_end_dt = end_dt
start_dt = end_dt - timedelta(days=29)
# 이전 30일 (YouTube API day_previous와 동일 기준)
prev_start_dt = start_dt - timedelta(days=30)
prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
period_desc = "최근 30일"
else: # mode == "month"
# 월별 차트: dimensions=month API는 YYYY-MM-01 형식 필요
# ex) 오늘 2/24 → end=2026-02-01, start=2025-03-01 → 2025-03 ~ 2026-02 (12개월)
end_dt = today.replace(day=1)
# KPI 등 집계형 API: 48시간 지연 적용하여 현재 월 전체 데이터 포함
kpi_end_dt = today - timedelta(days=2)
start_month = end_dt.month - 11
if start_month <= 0:
start_month += 12
start_year = end_dt.year - 1
else:
start_year = end_dt.year
start_dt = date(start_year, start_month, 1)
# 이전 12개월 (YouTube API previous와 동일 기준 — 1년 전)
prev_start_dt = start_dt.replace(year=start_dt.year - 1)
try:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
except ValueError: # 윤년 2/29 → 이전 연도 2/28
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
period_desc = "최근 12개월"
start_date = start_dt.strftime("%Y-%m-%d")
end_date = end_dt.strftime("%Y-%m-%d")
kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
logger.debug(
f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}"
)
# 2. YouTube 계정 연동 확인
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
social_accounts_raw = result.scalars().all()
# platform_user_id 기준으로 중복 제거 (가장 최근 연동 계정 우선)
seen_platform_ids_stats: set[str] = set()
social_accounts = []
for acc in sorted(
social_accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id not in seen_platform_ids_stats:
seen_platform_ids_stats.add(acc.platform_user_id)
social_accounts.append(acc)
if not social_accounts:
logger.warning(
f"[NO YOUTUBE ACCOUNT] YouTube 계정 미연동 - "
f"user_uuid={current_user.user_uuid}"
)
raise YouTubeAccountNotConnectedError()
if platform_user_id is not None:
matched = [a for a in social_accounts if a.platform_user_id == platform_user_id]
if not matched:
logger.warning(
f"[ACCOUNT NOT FOUND] 지정 계정 없음 - "
f"user_uuid={current_user.user_uuid}, platform_user_id={platform_user_id}"
)
raise YouTubeAccountNotFoundError()
social_account = matched[0]
elif len(social_accounts) == 1:
social_account = social_accounts[0]
else:
logger.warning(
f"[MULTI ACCOUNT] 계정 선택 필요 - "
f"user_uuid={current_user.user_uuid}, count={len(social_accounts)}"
)
raise YouTubeAccountSelectionRequiredError()
logger.debug(
f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}"
)
# 3. 기간 내 업로드 영상 수 조회
count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= start_dt,
Dashboard.uploaded_at < today + timedelta(days=1),
)
)
period_video_count = count_result.scalar() or 0
# 이전 기간 업로드 영상 수 조회 (trend 계산용)
prev_count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= prev_start_dt,
Dashboard.uploaded_at <= prev_kpi_end_dt,
)
)
prev_period_video_count = prev_count_result.scalar() or 0
logger.debug(
f"[3] 기간 내 업로드 영상 수 - current={period_video_count}, prev={prev_period_video_count}"
)
# 4. Redis 캐시 조회
# platform_user_id 기준 캐시 키: 재연동해도 채널 ID는 불변 → 캐시 유지됨
cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
cached_raw = await get_cache(cache_key)
if cached_raw:
try:
payload = json.loads(cached_raw)
logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
response = DashboardResponse.model_validate(payload["response"])
for metric in response.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
return response
except (json.JSONDecodeError, KeyError):
logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")
# 5. 최근 30개 업로드 영상 조회 (Analytics API 전달용)
# YouTube Analytics API 제약사항:
# - 영상 개수: 20~30개 권장 (최대 50개, 그 이상은 응답 지연 발생)
# - URL 길이: 2000자 제한 (video ID 11자 × 30개 = 330자로 안전)
result = await session.execute(
select(
Dashboard.platform_video_id,
Dashboard.title,
Dashboard.uploaded_at,
)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
)
.order_by(Dashboard.uploaded_at.desc())
.limit(30)
)
rows = result.all()
logger.debug(f"[5] 영상 조회 완료 - count={len(rows)}")
# 6. video_ids + 메타데이터 조회용 dict 구성
video_ids = []
video_lookup: dict[str, tuple[str, datetime]] = {} # {video_id: (title, uploaded_at)}
for row in rows:
platform_video_id, title, uploaded_at = row
video_ids.append(platform_video_id)
video_lookup[platform_video_id] = (title, uploaded_at)
logger.debug(
f"[6] 영상 메타데이터 구성 완료 - count={len(video_ids)}, sample={video_ids[:3]}"
)
# 6.1 업로드 영상 없음 → YouTube API 호출 없이 빈 응답 반환
if not video_ids:
logger.info(
f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - "
f"user_uuid={current_user.user_uuid}"
)
return DashboardResponse(
content_metrics=[
ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"),
ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"),
ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"),
],
monthly_data=[],
daily_data=[],
top_content=[],
audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
has_uploads=False,
)
# 7. 토큰 유효성 확인 및 자동 갱신 (만료 10분 전 갱신)
try:
access_token = await SocialAccountService().ensure_valid_token(
social_account, session
)
except TokenExpiredError:
logger.warning(
f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}"
)
raise YouTubeTokenExpiredError()
logger.debug("[7] 토큰 유효성 확인 완료")
# 8. YouTube Analytics API 호출 (7개 병렬)
youtube_service = YouTubeAnalyticsService()
raw_data = await youtube_service.fetch_all_metrics(
video_ids=video_ids,
start_date=start_date,
end_date=end_date,
kpi_end_date=kpi_end_date,
access_token=access_token,
mode=mode,
)
logger.debug("[8] YouTube Analytics API 호출 완료")
# 9. TopContent 조립 (Analytics top_videos + DB lookup)
processor = DataProcessor()
top_content_rows = raw_data.get("top_videos", {}).get("rows", [])
top_content: list[TopContent] = []
for row in top_content_rows[:4]:
if len(row) < 4:
continue
video_id, views, likes, comments = row[0], row[1], row[2], row[3]
meta = video_lookup.get(video_id)
if not meta:
continue
title, uploaded_at = meta
engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
top_content.append(
TopContent(
id=video_id,
title=title,
thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
platform="youtube",
views=int(views),
engagement=f"{engagement_rate:.1f}%",
date=uploaded_at.strftime("%Y.%m.%d"),
)
)
logger.debug(f"[9] TopContent 조립 완료 - count={len(top_content)}")
# 10. 데이터 가공 (period_video_count=0 — API 무관 DB 집계값, 캐시에 포함하지 않음)
dashboard_data = processor.process(
raw_data, top_content, 0, mode=mode, end_date=end_date
)
logger.debug("[10] 데이터 가공 완료")
# 11. Redis 캐싱 (TTL: 12시간)
# YouTube Analytics는 하루 1회 갱신 (PT 자정, 한국 시간 오후 5~8시)
# 48시간 지연된 데이터이므로 12시간 캐싱으로 API 호출 최소화
# period_video_count는 캐시에 포함하지 않음 (DB 직접 집계, API 미사용)
cache_payload = json.dumps(
{"response": json.loads(dashboard_data.model_dump_json())}
)
cache_success = await set_cache(
cache_key,
cache_payload,
ttl=43200, # 12시간
)
if cache_success:
logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
else:
logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")
# 12. 업로드 영상 수 및 trend 주입 (캐시 저장 후 — 항상 DB에서 직접 집계)
for metric in dashboard_data.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
logger.info(
f"[DASHBOARD] 통계 조회 완료 - "
f"user_uuid={current_user.user_uuid}, "
f"mode={mode}, period={period_desc}, videos={len(video_ids)}"
)
return dashboard_data
@router.delete(
@ -109,7 +483,7 @@ async def get_dashboard_stats(
`dashboard:{user_uuid}:{platform_user_id}:{mode}` (mode: day 또는 month)
## 파라미터
- `user_uuid`: 삭제할 사용자 UUID (필수)
- `user_uuid`: 특정 사용자 캐시만 삭제. 미입력 전체 삭제
- `mode`: day / month / all (기본값: all)
""",
)
@ -118,16 +492,33 @@ async def delete_dashboard_cache(
default="all",
description="삭제할 캐시 모드: day, month, all(기본값, 모두 삭제)",
),
user_uuid: str = Query(
description="대상 사용자 UUID",
user_uuid: str | None = Query(
default=None,
description="대상 사용자 UUID. 미입력 시 전체 사용자 캐시 삭제",
),
) -> CacheDeleteResponse:
"""
대시보드 캐시 삭제
Args:
mode: 삭제할 캐시 모드 (day / month / all)
user_uuid: 대상 사용자 UUID (없으면 전체 삭제)
Returns:
CacheDeleteResponse: 삭제된 캐시 개수 메시지
"""
if user_uuid:
if mode == "all":
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*")
message = f"전체 캐시 삭제 완료 ({deleted}개)"
else:
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*:{mode}")
message = f"{mode} 캐시 삭제 완료 ({deleted}개)"
cache_key = f"dashboard:{user_uuid}:{mode}"
success = await delete_cache(cache_key)
deleted = 1 if success else 0
message = f"{mode} 캐시 삭제 {'완료' if success else '실패 (키 없음)'}"
else:
deleted = await delete_cache_pattern("dashboard:*")
message = f"전체 사용자 캐시 삭제 완료 ({deleted}개)"
logger.info(
f"[CACHE DELETE] user_uuid={user_uuid or 'ALL'}, mode={mode}, deleted={deleted}"

View File

@ -113,7 +113,7 @@ class YouTubeAccountSelectionRequiredError(DashboardException):
def __init__(self):
super().__init__(
message="연결된 YouTube 계정이 여러 개입니다. platform_user_id 파라미터로 사용할 계정을 선택해주세요.",
message="연결된 YouTube 계정이 여러 개입니다. social_account_id 파라미터로 사용할 계정을 선택해주세요.",
status_code=status.HTTP_400_BAD_REQUEST,
code="YOUTUBE_ACCOUNT_SELECTION_REQUIRED",
)

View File

@ -197,6 +197,35 @@ class AudienceData(BaseModel):
)
# class PlatformMetric(BaseModel):
# """플랫폼별 메트릭 (미사용 — platform_data 기능 미구현)"""
#
# id: str
# label: str
# value: str
# unit: Optional[str] = None
# trend: float
# trend_direction: Literal["up", "down", "-"] = Field(alias="trendDirection")
#
# model_config = ConfigDict(
# alias_generator=to_camel,
# populate_by_name=True,
# )
#
#
# class PlatformData(BaseModel):
# """플랫폼별 데이터 (미사용 — platform_data 기능 미구현)"""
#
# platform: Literal["youtube", "instagram"]
# display_name: str = Field(alias="displayName")
# metrics: list[PlatformMetric]
#
# model_config = ConfigDict(
# alias_generator=to_camel,
# populate_by_name=True,
# )
class DashboardResponse(BaseModel):
"""대시보드 전체 응답
@ -226,6 +255,7 @@ class DashboardResponse(BaseModel):
top_content: list[TopContent] = Field(alias="topContent")
audience_data: AudienceData = Field(alias="audienceData")
has_uploads: bool = Field(default=True, alias="hasUploads")
# platform_data: list[PlatformData] = Field(default=[], alias="platformData") # 미사용
model_config = ConfigDict(
alias_generator=to_camel,

View File

@ -4,12 +4,10 @@ Dashboard Services
YouTube Analytics API 연동 데이터 가공 서비스를 제공합니다.
"""
from app.dashboard.services.dashboard_service import DashboardService
from app.dashboard.services.data_processor import DataProcessor
from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService
__all__ = [
"DashboardService",
"YouTubeAnalyticsService",
"DataProcessor",
]

View File

@ -1,358 +0,0 @@
"""
Dashboard Service
대시보드 비즈니스 로직을 담당합니다.
"""
import json
import logging
from datetime import date, datetime, timedelta
from typing import Literal
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dashboard.exceptions import (
YouTubeAccountNotConnectedError,
YouTubeAccountNotFoundError,
YouTubeAccountSelectionRequiredError,
YouTubeTokenExpiredError,
)
from app.dashboard.models import Dashboard
from app.dashboard.utils.redis_cache import get_cache, set_cache
from app.dashboard.schemas import (
AudienceData,
ConnectedAccount,
ContentMetric,
DashboardResponse,
TopContent,
)
from app.dashboard.services.data_processor import DataProcessor
from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService
from app.social.exceptions import TokenExpiredError
from app.social.services import SocialAccountService
from app.user.models import SocialAccount, User
logger = logging.getLogger(__name__)
class DashboardService:
async def get_connected_accounts(
self,
current_user: User,
session: AsyncSession,
) -> list[ConnectedAccount]:
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
accounts_raw = result.scalars().all()
connected = []
for acc in accounts_raw:
data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
connected.append(
ConnectedAccount(
id=acc.id,
platform=acc.platform,
platform_username=acc.platform_username,
platform_user_id=acc.platform_user_id,
channel_title=data.get("channel_title"),
connected_at=acc.connected_at,
is_active=acc.is_active,
)
)
logger.info(
f"[ACCOUNTS] YouTube 계정 목록 조회 - "
f"user_uuid={current_user.user_uuid}, count={len(connected)}"
)
return connected
def calculate_date_range(
self, mode: Literal["day", "month"]
) -> tuple[date, date, date, date, date, str]:
"""모드별 날짜 범위 계산. (start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc) 반환"""
today = date.today()
if mode == "day":
end_dt = today - timedelta(days=2)
kpi_end_dt = end_dt
start_dt = end_dt - timedelta(days=29)
prev_start_dt = start_dt - timedelta(days=30)
prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
period_desc = "최근 30일"
else:
end_dt = today.replace(day=1)
kpi_end_dt = today - timedelta(days=2)
start_month = end_dt.month - 11
if start_month <= 0:
start_month += 12
start_year = end_dt.year - 1
else:
start_year = end_dt.year
start_dt = date(start_year, start_month, 1)
prev_start_dt = start_dt.replace(year=start_dt.year - 1)
try:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
except ValueError:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
period_desc = "최근 12개월"
return start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc
async def resolve_social_account(
self,
current_user: User,
session: AsyncSession,
platform_user_id: str | None,
) -> SocialAccount:
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
social_accounts_raw = result.scalars().all()
social_accounts = list(social_accounts_raw)
if not social_accounts:
raise YouTubeAccountNotConnectedError()
if platform_user_id is not None:
matched = [a for a in social_accounts if a.platform_user_id == platform_user_id]
if not matched:
raise YouTubeAccountNotFoundError()
return matched[0]
elif len(social_accounts) == 1:
return social_accounts[0]
else:
raise YouTubeAccountSelectionRequiredError()
async def get_video_counts(
self,
current_user: User,
session: AsyncSession,
social_account: SocialAccount,
start_dt: date,
prev_start_dt: date,
prev_kpi_end_dt: date,
) -> tuple[int, int]:
today = date.today()
count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= start_dt,
Dashboard.uploaded_at < today + timedelta(days=1),
)
)
period_video_count = count_result.scalar() or 0
prev_count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= prev_start_dt,
Dashboard.uploaded_at <= prev_kpi_end_dt,
)
)
prev_period_video_count = prev_count_result.scalar() or 0
return period_video_count, prev_period_video_count
async def get_video_ids(
self,
current_user: User,
session: AsyncSession,
social_account: SocialAccount,
) -> tuple[list[str], dict[str, tuple[str, datetime]]]:
result = await session.execute(
select(
Dashboard.platform_video_id,
Dashboard.title,
Dashboard.uploaded_at,
)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
)
.order_by(Dashboard.uploaded_at.desc())
.limit(30)
)
rows = result.all()
video_ids = []
video_lookup: dict[str, tuple[str, datetime]] = {}
for row in rows:
platform_video_id, title, uploaded_at = row
video_ids.append(platform_video_id)
video_lookup[platform_video_id] = (title, uploaded_at)
return video_ids, video_lookup
def build_empty_response(self) -> DashboardResponse:
return DashboardResponse(
content_metrics=[
ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"),
ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"),
ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"),
],
monthly_data=[],
daily_data=[],
top_content=[],
audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
has_uploads=False,
)
def inject_video_count(
self,
response: DashboardResponse,
period_video_count: int,
prev_period_video_count: int,
) -> None:
for metric in response.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
async def get_stats(
self,
mode: Literal["day", "month"],
platform_user_id: str | None,
current_user: User,
session: AsyncSession,
) -> DashboardResponse:
logger.info(
f"[DASHBOARD] 통계 조회 시작 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
)
# 1. 날짜 계산
start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc = (
self.calculate_date_range(mode)
)
start_date = start_dt.strftime("%Y-%m-%d")
end_date = end_dt.strftime("%Y-%m-%d")
kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
logger.debug(f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}")
# 2. YouTube 계정 확인
social_account = await self.resolve_social_account(current_user, session, platform_user_id)
logger.debug(f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}")
# 3. 영상 수 조회
period_video_count, prev_period_video_count = await self.get_video_counts(
current_user, session, social_account, start_dt, prev_start_dt, prev_kpi_end_dt
)
logger.debug(f"[3] 영상 수 - current={period_video_count}, prev={prev_period_video_count}")
# 4. 캐시 조회
cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
cached_raw = await get_cache(cache_key)
if cached_raw:
try:
payload = json.loads(cached_raw)
logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
response = DashboardResponse.model_validate(payload["response"])
self.inject_video_count(response, period_video_count, prev_period_video_count)
return response
except (json.JSONDecodeError, KeyError):
logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")
# 5. 업로드 영상 조회
video_ids, video_lookup = await self.get_video_ids(current_user, session, social_account)
logger.debug(f"[5] 영상 조회 완료 - count={len(video_ids)}")
if not video_ids:
logger.info(f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - user_uuid={current_user.user_uuid}")
return self.build_empty_response()
# 6. 토큰 유효성 확인
try:
access_token = await SocialAccountService().ensure_valid_token(social_account, session)
except TokenExpiredError:
logger.warning(f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}")
raise YouTubeTokenExpiredError()
logger.debug("[6] 토큰 유효성 확인 완료")
# 7. YouTube Analytics API 호출
youtube_service = YouTubeAnalyticsService()
raw_data = await youtube_service.fetch_all_metrics(
video_ids=video_ids,
start_date=start_date,
end_date=end_date,
kpi_end_date=kpi_end_date,
access_token=access_token,
mode=mode,
)
logger.debug("[7] YouTube Analytics API 호출 완료")
# 8. TopContent 조립
processor = DataProcessor()
top_content_rows = raw_data.get("top_videos", {}).get("rows", [])
top_content: list[TopContent] = []
for row in top_content_rows[:4]:
if len(row) < 4:
continue
video_id, views, likes, comments = row[0], row[1], row[2], row[3]
meta = video_lookup.get(video_id)
if not meta:
continue
title, uploaded_at = meta
engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
top_content.append(
TopContent(
id=video_id,
title=title,
thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
platform="youtube",
views=int(views),
engagement=f"{engagement_rate:.1f}%",
date=uploaded_at.strftime("%Y.%m.%d"),
)
)
logger.debug(f"[8] TopContent 조립 완료 - count={len(top_content)}")
# 9. 데이터 가공
dashboard_data = processor.process(raw_data, top_content, 0, mode=mode, end_date=end_date)
logger.debug("[9] 데이터 가공 완료")
# 10. 캐시 저장
cache_payload = json.dumps({"response": dashboard_data.model_dump(mode="json")})
cache_success = await set_cache(cache_key, cache_payload, ttl=43200)
if cache_success:
logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
else:
logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")
# 11. 업로드 영상 수 주입
self.inject_video_count(dashboard_data, period_video_count, prev_period_video_count)
logger.info(
f"[DASHBOARD] 통계 조회 완료 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, period={period_desc}, videos={len(video_ids)}"
)
return dashboard_data

View File

@ -143,8 +143,8 @@ class DataProcessor:
monthly_data = []
audience_data = self._build_audience_data(
raw_data.get("demographics") or {},
raw_data.get("region") or {},
raw_data.get("demographics", {}),
raw_data.get("region", {}),
)
logger.debug(
f"[DataProcessor.process] SUCCESS - "

View File

@ -141,9 +141,6 @@ class YouTubeAnalyticsService:
results = await asyncio.gather(*tasks, return_exceptions=True)
# 에러 체크 (YouTubeAuthError, YouTubeQuotaExceededError는 원형 그대로 전파)
# demographics(index 5)는 YouTubeAPIError 시 None으로 허용 (YouTube 서버 간헐적 오류 대응)
OPTIONAL_INDICES = {5, 6} # demographics, region
results = list(results)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(
@ -151,12 +148,6 @@ class YouTubeAnalyticsService:
)
if isinstance(result, (YouTubeAuthError, YouTubeQuotaExceededError)):
raise result
if i in OPTIONAL_INDICES and isinstance(result, YouTubeAPIError):
logger.warning(
f"[YouTubeAnalyticsService] 선택적 API 호출 {i+1}/7 실패, None으로 처리: {result}"
)
results[i] = None
continue
raise YouTubeAPIError(f"데이터 조회 실패: {result.__class__.__name__}")
logger.debug(

View File

@ -26,7 +26,7 @@ Act as a Senior Brand Strategist and Marketing Data Analyst. Your goal is to ana
### 3. target_persona
Generate a list of personas based on the following:
* **`persona`**: Provide a descriptive name and profile for the target group. Must be **20 characters or fewer**.
* **`persona`**: Provide a descriptive name and profile for the target group.
* **`age`**: Set `min_age` and `max_age` (Integer 0-100) that accurately reflects the segment.
* **`favor_target`**: List specific elements or vibes this persona prefers (e.g., "Minimalist interior", "Pet-friendly facilities").
* **`decision_trigger`**: Identify the specific "Hook" or facility that leads this persona to finalize a booking.