# o2o-castad-backend/app/dashboard/services/dashboard_service.py
#
# 359 lines
# 14 KiB
# Python

"""
Dashboard Service
대시보드 비즈니스 로직을 담당합니다.
"""
import json
import logging
from datetime import date, datetime, timedelta
from typing import Literal
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dashboard.exceptions import (
YouTubeAccountNotConnectedError,
YouTubeAccountNotFoundError,
YouTubeAccountSelectionRequiredError,
YouTubeTokenExpiredError,
)
from app.dashboard.models import Dashboard
from app.dashboard.utils.redis_cache import get_cache, set_cache
from app.dashboard.schemas import (
AudienceData,
ConnectedAccount,
ContentMetric,
DashboardResponse,
TopContent,
)
from app.dashboard.services.data_processor import DataProcessor
from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService
from app.social.exceptions import TokenExpiredError
from app.social.services import SocialAccountService
from app.user.models import SocialAccount, User
# Module-level logger, named after this module via __name__; used by every
# DashboardService method for its [ACCOUNTS] / [DASHBOARD] / [CACHE ...] logs.
logger = logging.getLogger(__name__)
class DashboardService:
    """Business logic for the YouTube dashboard.

    Resolves the caller's YouTube account, counts uploads for the current and
    previous period, loads the analytics payload (from cache when possible,
    otherwise from the YouTube Analytics service) and assembles a
    ``DashboardResponse``.
    """

    # Platform value used in every SocialAccount / Dashboard filter below.
    _PLATFORM = "youtube"
    # How many of the most recent uploads are looked up per request.
    _RECENT_VIDEO_LIMIT = 30
    # How many rows of the "top_videos" report are turned into TopContent.
    _TOP_CONTENT_LIMIT = 4
    # TTL handed to set_cache. Presumably seconds (43200 s = 12 h) -- confirm
    # against app.dashboard.utils.redis_cache.set_cache.
    _CACHE_TTL = 43200
    # (id, label, unit) of every KPI tile emitted by build_empty_response.
    _EMPTY_METRICS = (
        ("total-views", "조회수", "count"),
        ("total-watch-time", "시청시간", "hours"),
        ("avg-view-duration", "평균 시청시간", "minutes"),
        ("new-subscribers", "신규 구독자", "count"),
        ("likes", "좋아요", "count"),
        ("comments", "댓글", "count"),
        ("shares", "공유", "count"),
        ("uploaded-videos", "업로드 영상", "count"),
    )

    async def _get_active_youtube_accounts(
        self,
        current_user: User,
        session: AsyncSession,
    ) -> list[SocialAccount]:
        """Return every active YouTube ``SocialAccount`` owned by the user."""
        result = await session.execute(
            select(SocialAccount).where(
                SocialAccount.user_uuid == current_user.user_uuid,
                SocialAccount.platform == self._PLATFORM,
                SocialAccount.is_active == True,  # noqa: E712
            )
        )
        return list(result.scalars().all())

    async def get_connected_accounts(
        self,
        current_user: User,
        session: AsyncSession,
    ) -> list[ConnectedAccount]:
        """List the user's active YouTube accounts as ``ConnectedAccount`` items.

        ``channel_title`` is read from ``platform_data`` when that attribute is
        a dict; otherwise it is ``None``.
        """
        accounts = await self._get_active_youtube_accounts(current_user, session)
        connected = []
        for acc in accounts:
            # platform_data may be something other than a dict; fall back to {}
            # so the .get() below is always safe.
            data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
            connected.append(
                ConnectedAccount(
                    id=acc.id,
                    platform=acc.platform,
                    platform_username=acc.platform_username,
                    platform_user_id=acc.platform_user_id,
                    channel_title=data.get("channel_title"),
                    connected_at=acc.connected_at,
                    is_active=acc.is_active,
                )
            )
        logger.info(
            f"[ACCOUNTS] YouTube 계정 목록 조회 - "
            f"user_uuid={current_user.user_uuid}, count={len(connected)}"
        )
        return connected

    def calculate_date_range(
        self,
        mode: Literal["day", "month"],
        today: date | None = None,
    ) -> tuple[date, date, date, date, date, str]:
        """Compute the date windows for the given mode.

        Args:
            mode: ``"day"`` for a 30-day window, anything else is treated as
                the 12-month window.
            today: Reference date; defaults to ``date.today()``. Passing it
                explicitly lets a caller reuse a single "today" for a whole
                request and makes the calculation deterministic.

        Returns:
            ``(start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt,
            period_desc)``.
        """
        if today is None:
            today = date.today()

        if mode == "day":
            # The window ends two days before today -- presumably to allow for
            # analytics reporting lag (confirm) -- and spans 30 days inclusive.
            end_dt = today - timedelta(days=2)
            kpi_end_dt = end_dt
            start_dt = end_dt - timedelta(days=29)
            # The previous period is the same window shifted back by 30 days.
            prev_start_dt = start_dt - timedelta(days=30)
            prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
            period_desc = "최근 30일"
        else:
            # Monthly window: first day of the current month, going back 11
            # months, i.e. 12 month starts in total.
            end_dt = today.replace(day=1)
            kpi_end_dt = today - timedelta(days=2)
            start_month = end_dt.month - 11
            if start_month <= 0:
                start_month += 12
                start_year = end_dt.year - 1
            else:
                start_year = end_dt.year
            start_dt = date(start_year, start_month, 1)
            # The previous period is the same window one year earlier.
            prev_start_dt = start_dt.replace(year=start_dt.year - 1)
            try:
                prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
            except ValueError:
                # Feb 29 has no counterpart in the previous year; use Feb 28.
                prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
            period_desc = "최근 12개월"

        return start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc

    async def resolve_social_account(
        self,
        current_user: User,
        session: AsyncSession,
        platform_user_id: str | None,
    ) -> SocialAccount:
        """Pick the YouTube account the dashboard should be built for.

        Raises:
            YouTubeAccountNotConnectedError: the user has no active YouTube
                account.
            YouTubeAccountNotFoundError: ``platform_user_id`` was given but
                matches none of the user's accounts.
            YouTubeAccountSelectionRequiredError: no ``platform_user_id`` was
                given and the user has more than one account.
        """
        social_accounts = await self._get_active_youtube_accounts(current_user, session)
        if not social_accounts:
            raise YouTubeAccountNotConnectedError()

        if platform_user_id is not None:
            for account in social_accounts:
                if account.platform_user_id == platform_user_id:
                    return account
            raise YouTubeAccountNotFoundError()

        if len(social_accounts) == 1:
            return social_accounts[0]
        raise YouTubeAccountSelectionRequiredError()

    async def _count_uploads(
        self,
        current_user: User,
        session: AsyncSession,
        social_account: SocialAccount,
        start: date,
        end_exclusive: date,
    ) -> int:
        """Count Dashboard rows with ``start <= uploaded_at < end_exclusive``."""
        result = await session.execute(
            select(func.count())
            .select_from(Dashboard)
            .where(
                Dashboard.user_uuid == current_user.user_uuid,
                Dashboard.platform == self._PLATFORM,
                Dashboard.platform_user_id == social_account.platform_user_id,
                Dashboard.uploaded_at >= start,
                Dashboard.uploaded_at < end_exclusive,
            )
        )
        return result.scalar() or 0

    async def get_video_counts(
        self,
        current_user: User,
        session: AsyncSession,
        social_account: SocialAccount,
        start_dt: date,
        prev_start_dt: date,
        prev_kpi_end_dt: date,
        today: date | None = None,
    ) -> tuple[int, int]:
        """Count uploads in the current and in the previous period.

        The current period runs from ``start_dt`` through the end of ``today``;
        the previous period runs from ``prev_start_dt`` through the end of
        ``prev_kpi_end_dt``. Both upper bounds are expressed as
        ``< day + 1`` so that, when ``uploaded_at`` carries a time component,
        uploads made during the final day are still counted.

        Args:
            today: Reference date; defaults to ``date.today()``.

        Returns:
            ``(period_video_count, prev_period_video_count)``.
        """
        if today is None:
            today = date.today()
        one_day = timedelta(days=1)

        period_video_count = await self._count_uploads(
            current_user, session, social_account, start_dt, today + one_day
        )
        prev_period_video_count = await self._count_uploads(
            current_user, session, social_account, prev_start_dt, prev_kpi_end_dt + one_day
        )
        return period_video_count, prev_period_video_count

    async def get_video_ids(
        self,
        current_user: User,
        session: AsyncSession,
        social_account: SocialAccount,
    ) -> tuple[list[str], dict[str, tuple[str, datetime]]]:
        """Fetch the account's most recent uploads (newest first).

        Returns:
            ``(video_ids, video_lookup)`` where ``video_lookup`` maps each
            platform video id to ``(title, uploaded_at)``.
        """
        result = await session.execute(
            select(
                Dashboard.platform_video_id,
                Dashboard.title,
                Dashboard.uploaded_at,
            )
            .where(
                Dashboard.user_uuid == current_user.user_uuid,
                Dashboard.platform == self._PLATFORM,
                Dashboard.platform_user_id == social_account.platform_user_id,
            )
            .order_by(Dashboard.uploaded_at.desc())
            .limit(self._RECENT_VIDEO_LIMIT)
        )
        video_ids: list[str] = []
        video_lookup: dict[str, tuple[str, datetime]] = {}
        for platform_video_id, title, uploaded_at in result.all():
            video_ids.append(platform_video_id)
            video_lookup[platform_video_id] = (title, uploaded_at)
        return video_ids, video_lookup

    def build_empty_response(self) -> DashboardResponse:
        """Build a zero-valued response for an account without uploads."""
        return DashboardResponse(
            content_metrics=[
                ContentMetric(
                    id=metric_id,
                    label=label,
                    value=0.0,
                    unit=unit,
                    trend=0.0,
                    trend_direction="-",
                )
                for metric_id, label, unit in self._EMPTY_METRICS
            ],
            monthly_data=[],
            daily_data=[],
            top_content=[],
            audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
            has_uploads=False,
        )

    def inject_video_count(
        self,
        response: DashboardResponse,
        period_video_count: int,
        prev_period_video_count: int,
    ) -> None:
        """Overwrite the "uploaded-videos" metric of ``response`` in place.

        The trend is the absolute difference between the two counts (not a
        percentage); the direction is "up", "down" or "-" accordingly. Only the
        first metric with that id is touched.
        """
        for metric in response.content_metrics:
            if metric.id != "uploaded-videos":
                continue
            video_trend = float(period_video_count - prev_period_video_count)
            metric.value = float(period_video_count)
            metric.trend = video_trend
            if video_trend > 0:
                metric.trend_direction = "up"
            elif video_trend < 0:
                metric.trend_direction = "down"
            else:
                metric.trend_direction = "-"
            break

    def _build_top_content(
        self,
        raw_data: dict,
        video_lookup: dict[str, tuple[str, datetime]],
    ) -> list[TopContent]:
        """Turn the leading "top_videos" rows of ``raw_data`` into TopContent.

        Rows with fewer than four columns, or whose video id is not present in
        ``video_lookup``, are skipped. Each row is read as
        ``(video_id, views, likes, comments, ...)``.
        """
        # `or` guards: tolerate the keys being present but set to None.
        rows = (raw_data.get("top_videos") or {}).get("rows") or []
        top_content: list[TopContent] = []
        for row in rows[: self._TOP_CONTENT_LIMIT]:
            if len(row) < 4:
                continue
            video_id, views, likes, comments = row[0], row[1], row[2], row[3]
            meta = video_lookup.get(video_id)
            if not meta:
                continue
            title, uploaded_at = meta
            # Guard against division by zero for videos without views.
            engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
            top_content.append(
                TopContent(
                    id=video_id,
                    title=title,
                    thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
                    platform="youtube",
                    views=int(views),
                    engagement=f"{engagement_rate:.1f}%",
                    date=uploaded_at.strftime("%Y.%m.%d"),
                )
            )
        return top_content

    async def get_stats(
        self,
        mode: Literal["day", "month"],
        platform_user_id: str | None,
        current_user: User,
        session: AsyncSession,
    ) -> DashboardResponse:
        """Build the dashboard response for one YouTube account.

        Steps: compute the date windows, resolve the account, count uploads,
        try the cache, and on a miss fetch metrics from the YouTube Analytics
        service, process them and store the result in the cache. The
        "uploaded-videos" metric is always injected from the database counts,
        so it stays current even when the rest of the response is cached.

        Raises:
            YouTubeAccountNotConnectedError, YouTubeAccountNotFoundError,
            YouTubeAccountSelectionRequiredError: see ``resolve_social_account``.
            YouTubeTokenExpiredError: the stored token could not be validated
                (``TokenExpiredError`` from ``ensure_valid_token``).
        """
        logger.info(
            f"[DASHBOARD] 통계 조회 시작 - "
            f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
        )

        # 1. Date windows. A single "today" is shared with the upload counts so
        #    one request cannot straddle midnight.
        today = date.today()
        start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc = (
            self.calculate_date_range(mode, today=today)
        )
        start_date = start_dt.strftime("%Y-%m-%d")
        end_date = end_dt.strftime("%Y-%m-%d")
        kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
        logger.debug(f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}")

        # 2. Resolve the YouTube account.
        social_account = await self.resolve_social_account(current_user, session, platform_user_id)
        logger.debug(f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}")

        # 3. Upload counts (needed on both the cache-hit and cache-miss path).
        period_video_count, prev_period_video_count = await self.get_video_counts(
            current_user,
            session,
            social_account,
            start_dt,
            prev_start_dt,
            prev_kpi_end_dt,
            today=today,
        )
        logger.debug(f"[3] 영상 수 - current={period_video_count}, prev={prev_period_video_count}")

        # 4. Cache lookup.
        cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
        cached_raw = await get_cache(cache_key)
        if cached_raw:
            try:
                payload = json.loads(cached_raw)
                response = DashboardResponse.model_validate(payload["response"])
            except (ValueError, KeyError, TypeError):
                # ValueError covers json.JSONDecodeError and pydantic's
                # ValidationError; TypeError covers a non-dict payload. Any
                # unusable entry is treated as a cache miss instead of failing
                # the request.
                logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
            else:
                logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
                self.inject_video_count(response, period_video_count, prev_period_video_count)
                return response
        logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")

        # 5. Recent uploads.
        video_ids, video_lookup = await self.get_video_ids(current_user, session, social_account)
        logger.debug(f"[5] 영상 조회 완료 - count={len(video_ids)}")
        if not video_ids:
            logger.info(f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - user_uuid={current_user.user_uuid}")
            return self.build_empty_response()

        # 6. Token validation.
        try:
            access_token = await SocialAccountService().ensure_valid_token(social_account, session)
        except TokenExpiredError as exc:
            logger.warning(f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}")
            raise YouTubeTokenExpiredError() from exc
        logger.debug("[6] 토큰 유효성 확인 완료")

        # 7. YouTube Analytics call.
        youtube_service = YouTubeAnalyticsService()
        raw_data = await youtube_service.fetch_all_metrics(
            video_ids=video_ids,
            start_date=start_date,
            end_date=end_date,
            kpi_end_date=kpi_end_date,
            access_token=access_token,
            mode=mode,
        )
        logger.debug("[7] YouTube Analytics API 호출 완료")

        # 8. TopContent assembly.
        top_content = self._build_top_content(raw_data, video_lookup)
        logger.debug(f"[8] TopContent 조립 완료 - count={len(top_content)}")

        # 9. Data processing. The third argument is passed as 0 here; the real
        #    upload count is injected in step 11.
        processor = DataProcessor()
        dashboard_data = processor.process(raw_data, top_content, 0, mode=mode, end_date=end_date)
        logger.debug("[9] 데이터 가공 완료")

        # 10. Cache store (before the upload count is injected, so the cached
        #     payload never carries a stale count).
        cache_payload = json.dumps({"response": dashboard_data.model_dump(mode="json")})
        cache_success = await set_cache(cache_key, cache_payload, ttl=self._CACHE_TTL)
        if cache_success:
            logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
        else:
            logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")

        # 11. Inject the upload count.
        self.inject_video_count(dashboard_data, period_video_count, prev_period_video_count)
        logger.info(
            f"[DASHBOARD] 통계 조회 완료 - "
            f"user_uuid={current_user.user_uuid}, mode={mode}, period={period_desc}, videos={len(video_ids)}"
        )
        return dashboard_data