diff --git a/app/dashboard/api/routers/v1/dashboard.py b/app/dashboard/api/routers/v1/dashboard.py index fa4b860..ec62ac0 100644 --- a/app/dashboard/api/routers/v1/dashboard.py +++ b/app/dashboard/api/routers/v1/dashboard.py @@ -4,43 +4,22 @@ Dashboard API 라우터 YouTube Analytics 기반 대시보드 통계를 제공합니다. """ -import json import logging -from datetime import date, datetime, timedelta from typing import Literal from fastapi import APIRouter, Depends, Query -from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession -from app.dashboard.exceptions import ( - YouTubeAccountNotConnectedError, - YouTubeAccountNotFoundError, - YouTubeAccountSelectionRequiredError, - YouTubeTokenExpiredError, -) +from app.dashboard.utils.redis_cache import delete_cache_pattern from app.dashboard.schemas import ( - AudienceData, CacheDeleteResponse, - ConnectedAccount, ConnectedAccountsResponse, - ContentMetric, DashboardResponse, - TopContent, -) -from app.dashboard.services import DataProcessor, YouTubeAnalyticsService -from app.dashboard.redis_cache import ( - delete_cache, - delete_cache_pattern, - get_cache, - set_cache, ) +from app.dashboard.services import DashboardService from app.database.session import get_session -from app.dashboard.models import Dashboard -from app.social.exceptions import TokenExpiredError -from app.social.services import SocialAccountService from app.user.dependencies.auth import get_current_user -from app.user.models import SocialAccount, User +from app.user.models import User logger = logging.getLogger(__name__) @@ -61,41 +40,8 @@ async def get_connected_accounts( current_user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ) -> ConnectedAccountsResponse: - result = await session.execute( - select(SocialAccount).where( - SocialAccount.user_uuid == current_user.user_uuid, - SocialAccount.platform == "youtube", - SocialAccount.is_active == True, # noqa: E712 - ) - ) - accounts_raw = result.scalars().all() - - # platform_user_id 기준 - seen_platform_ids: set[str] = set() - connected = [] - for acc in sorted( - accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True - ): - if acc.platform_user_id in seen_platform_ids: - continue - seen_platform_ids.add(acc.platform_user_id) - data = acc.platform_data if isinstance(acc.platform_data, dict) else {} - connected.append( - ConnectedAccount( - id=acc.id, - platform=acc.platform, - platform_username=acc.platform_username, - platform_user_id=acc.platform_user_id, - channel_title=data.get("channel_title"), - connected_at=acc.connected_at, - is_active=acc.is_active, - ) - ) - - logger.info( - f"[ACCOUNTS] YouTube 계정 목록 조회 - " - f"user_uuid={current_user.user_uuid}, count={len(connected)}" - ) + service = DashboardService() + connected = await service.get_connected_accounts(current_user, session) return ConnectedAccountsResponse(accounts=connected) @@ -142,328 +88,8 @@ async def get_dashboard_stats( current_user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ) -> DashboardResponse: - """ - 대시보드 통계 조회 - - Args: - mode: 조회 모드 (day: 최근 30일, month: 최근 12개월) - platform_user_id: 사용할 YouTube 채널 ID (여러 계정 연결 시 필수, 재연동해도 불변) - current_user: 현재 인증된 사용자 - session: 데이터베이스 세션 - - Returns: - DashboardResponse: 대시보드 통계 데이터 - - Raises: - YouTubeAccountNotConnectedError: YouTube 계정이 연동되어 있지 않음 - YouTubeAccountSelectionRequiredError: 여러 계정이 연결되어 있으나 계정 미선택 - YouTubeAccountNotFoundError: 지정한 계정을 찾을 수 없음 - YouTubeTokenExpiredError: YouTube 토큰 만료 (재연동 필요) - YouTubeAPIError: YouTube Analytics API 호출 실패 - """ - logger.info( - f"[DASHBOARD] 통계 조회 시작 - " - f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}" - ) - - # 1. 모드별 날짜 자동 계산 - today = date.today() - - if mode == "day": - # 48시간 지연 적용: 오늘 기준 -2일을 end로 사용 - # ex) 오늘 2/20 → end=2/18, start=1/20 - end_dt = today - timedelta(days=2) - kpi_end_dt = end_dt - start_dt = end_dt - timedelta(days=29) - # 이전 30일 (YouTube API day_previous와 동일 기준) - prev_start_dt = start_dt - timedelta(days=30) - prev_kpi_end_dt = kpi_end_dt - timedelta(days=30) - period_desc = "최근 30일" - else: # mode == "month" - # 월별 차트: dimensions=month API는 YYYY-MM-01 형식 필요 - # ex) 오늘 2/24 → end=2026-02-01, start=2025-03-01 → 2025-03 ~ 2026-02 (12개월) - end_dt = today.replace(day=1) - # KPI 등 집계형 API: 48시간 지연 적용하여 현재 월 전체 데이터 포함 - kpi_end_dt = today - timedelta(days=2) - - start_month = end_dt.month - 11 - if start_month <= 0: - start_month += 12 - start_year = end_dt.year - 1 - else: - start_year = end_dt.year - start_dt = date(start_year, start_month, 1) - # 이전 12개월 (YouTube API previous와 동일 기준 — 1년 전) - prev_start_dt = start_dt.replace(year=start_dt.year - 1) - try: - prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1) - except ValueError: # 윤년 2/29 → 이전 연도 2/28 - prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28) - period_desc = "최근 12개월" - - start_date = start_dt.strftime("%Y-%m-%d") - end_date = end_dt.strftime("%Y-%m-%d") - kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d") - - logger.debug( - f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}" - ) - - # 2. YouTube 계정 연동 확인 - result = await session.execute( - select(SocialAccount).where( - SocialAccount.user_uuid == current_user.user_uuid, - SocialAccount.platform == "youtube", - SocialAccount.is_active == True, # noqa: E712 - ) - ) - social_accounts_raw = result.scalars().all() - - # platform_user_id 기준으로 중복 제거 (가장 최근 연동 계정 우선) - seen_platform_ids_stats: set[str] = set() - social_accounts = [] - for acc in sorted( - social_accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True - ): - if acc.platform_user_id not in seen_platform_ids_stats: - seen_platform_ids_stats.add(acc.platform_user_id) - social_accounts.append(acc) - - if not social_accounts: - logger.warning( - f"[NO YOUTUBE ACCOUNT] YouTube 계정 미연동 - " - f"user_uuid={current_user.user_uuid}" - ) - raise YouTubeAccountNotConnectedError() - - if platform_user_id is not None: - matched = [a for a in social_accounts if a.platform_user_id == platform_user_id] - if not matched: - logger.warning( - f"[ACCOUNT NOT FOUND] 지정 계정 없음 - " - f"user_uuid={current_user.user_uuid}, platform_user_id={platform_user_id}" - ) - raise YouTubeAccountNotFoundError() - social_account = matched[0] - elif len(social_accounts) == 1: - social_account = social_accounts[0] - else: - logger.warning( - f"[MULTI ACCOUNT] 계정 선택 필요 - " - f"user_uuid={current_user.user_uuid}, count={len(social_accounts)}" - ) - raise YouTubeAccountSelectionRequiredError() - - logger.debug( - f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}" - ) - - # 3. 기간 내 업로드 영상 수 조회 - count_result = await session.execute( - select(func.count()) - .select_from(Dashboard) - .where( - Dashboard.user_uuid == current_user.user_uuid, - Dashboard.platform == "youtube", - Dashboard.platform_user_id == social_account.platform_user_id, - Dashboard.uploaded_at >= start_dt, - Dashboard.uploaded_at < today + timedelta(days=1), - ) - ) - period_video_count = count_result.scalar() or 0 - - # 이전 기간 업로드 영상 수 조회 (trend 계산용) - prev_count_result = await session.execute( - select(func.count()) - .select_from(Dashboard) - .where( - Dashboard.user_uuid == current_user.user_uuid, - Dashboard.platform == "youtube", - Dashboard.platform_user_id == social_account.platform_user_id, - Dashboard.uploaded_at >= prev_start_dt, - Dashboard.uploaded_at <= prev_kpi_end_dt, - ) - ) - prev_period_video_count = prev_count_result.scalar() or 0 - logger.debug( - f"[3] 기간 내 업로드 영상 수 - current={period_video_count}, prev={prev_period_video_count}" - ) - - # 4. Redis 캐시 조회 - # platform_user_id 기준 캐시 키: 재연동해도 채널 ID는 불변 → 캐시 유지됨 - cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}" - cached_raw = await get_cache(cache_key) - - if cached_raw: - try: - payload = json.loads(cached_raw) - logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}") - response = DashboardResponse.model_validate(payload["response"]) - for metric in response.content_metrics: - if metric.id == "uploaded-videos": - metric.value = float(period_video_count) - video_trend = float(period_video_count - prev_period_video_count) - metric.trend = video_trend - metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-") - break - return response - except (json.JSONDecodeError, KeyError): - logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}") - - logger.debug("[4] 캐시 MISS - YouTube API 호출 필요") - - # 5. 최근 30개 업로드 영상 조회 (Analytics API 전달용) - # YouTube Analytics API 제약사항: - # - 영상 개수: 20~30개 권장 (최대 50개, 그 이상은 응답 지연 발생) - # - URL 길이: 2000자 제한 (video ID 11자 × 30개 = 330자로 안전) - result = await session.execute( - select( - Dashboard.platform_video_id, - Dashboard.title, - Dashboard.uploaded_at, - ) - .where( - Dashboard.user_uuid == current_user.user_uuid, - Dashboard.platform == "youtube", - Dashboard.platform_user_id == social_account.platform_user_id, - ) - .order_by(Dashboard.uploaded_at.desc()) - .limit(30) - ) - rows = result.all() - logger.debug(f"[5] 영상 조회 완료 - count={len(rows)}") - - # 6. video_ids + 메타데이터 조회용 dict 구성 - video_ids = [] - video_lookup: dict[str, tuple[str, datetime]] = {} # {video_id: (title, uploaded_at)} - - for row in rows: - platform_video_id, title, uploaded_at = row - video_ids.append(platform_video_id) - video_lookup[platform_video_id] = (title, uploaded_at) - - logger.debug( - f"[6] 영상 메타데이터 구성 완료 - count={len(video_ids)}, sample={video_ids[:3]}" - ) - - # 6.1 업로드 영상 없음 → YouTube API 호출 없이 빈 응답 반환 - if not video_ids: - logger.info( - f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - " - f"user_uuid={current_user.user_uuid}" - ) - return DashboardResponse( - content_metrics=[ - ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"), - ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"), - ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"), - ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"), - ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"), - ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"), - ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"), - ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"), - ], - monthly_data=[], - daily_data=[], - top_content=[], - audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]), - has_uploads=False, - ) - - # 7. 토큰 유효성 확인 및 자동 갱신 (만료 10분 전 갱신) - try: - access_token = await SocialAccountService().ensure_valid_token( - social_account, session - ) - except TokenExpiredError: - logger.warning( - f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}" - ) - raise YouTubeTokenExpiredError() - - logger.debug("[7] 토큰 유효성 확인 완료") - - # 8. YouTube Analytics API 호출 (7개 병렬) - youtube_service = YouTubeAnalyticsService() - raw_data = await youtube_service.fetch_all_metrics( - video_ids=video_ids, - start_date=start_date, - end_date=end_date, - kpi_end_date=kpi_end_date, - access_token=access_token, - mode=mode, - ) - - logger.debug("[8] YouTube Analytics API 호출 완료") - - # 9. TopContent 조립 (Analytics top_videos + DB lookup) - processor = DataProcessor() - top_content_rows = raw_data.get("top_videos", {}).get("rows", []) - top_content: list[TopContent] = [] - for row in top_content_rows[:4]: - if len(row) < 4: - continue - video_id, views, likes, comments = row[0], row[1], row[2], row[3] - meta = video_lookup.get(video_id) - if not meta: - continue - title, uploaded_at = meta - engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0 - top_content.append( - TopContent( - id=video_id, - title=title, - thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg", - platform="youtube", - views=int(views), - engagement=f"{engagement_rate:.1f}%", - date=uploaded_at.strftime("%Y.%m.%d"), - ) - ) - - logger.debug(f"[9] TopContent 조립 완료 - count={len(top_content)}") - - # 10. 데이터 가공 (period_video_count=0 — API 무관 DB 집계값, 캐시에 포함하지 않음) - dashboard_data = processor.process( - raw_data, top_content, 0, mode=mode, end_date=end_date - ) - - logger.debug("[10] 데이터 가공 완료") - - # 11. Redis 캐싱 (TTL: 12시간) - # YouTube Analytics는 하루 1회 갱신 (PT 자정, 한국 시간 오후 5~8시) - # 48시간 지연된 데이터이므로 12시간 캐싱으로 API 호출 최소화 - # period_video_count는 캐시에 포함하지 않음 (DB 직접 집계, API 미사용) - cache_payload = json.dumps( - {"response": json.loads(dashboard_data.model_dump_json())} - ) - cache_success = await set_cache( - cache_key, - cache_payload, - ttl=43200, # 12시간 - ) - - if cache_success: - logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}") - else: - logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}") - - # 12. 업로드 영상 수 및 trend 주입 (캐시 저장 후 — 항상 DB에서 직접 집계) - for metric in dashboard_data.content_metrics: - if metric.id == "uploaded-videos": - metric.value = float(period_video_count) - video_trend = float(period_video_count - prev_period_video_count) - metric.trend = video_trend - metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-") - break - - logger.info( - f"[DASHBOARD] 통계 조회 완료 - " - f"user_uuid={current_user.user_uuid}, " - f"mode={mode}, period={period_desc}, videos={len(video_ids)}" - ) - - return dashboard_data + service = DashboardService() + return await service.get_stats(mode, platform_user_id, current_user, session) @router.delete( @@ -483,7 +109,7 @@ async def get_dashboard_stats( `dashboard:{user_uuid}:{platform_user_id}:{mode}` (mode: day 또는 month) ## 파라미터 -- `user_uuid`: 특정 사용자 캐시만 삭제. 미입력 시 전체 삭제 +- `user_uuid`: 삭제할 사용자 UUID (필수) - `mode`: day / month / all (기본값: all) """, ) @@ -492,33 +118,16 @@ async def delete_dashboard_cache( default="all", description="삭제할 캐시 모드: day, month, all(기본값, 모두 삭제)", ), - user_uuid: str | None = Query( - default=None, - description="대상 사용자 UUID. 미입력 시 전체 사용자 캐시 삭제", + user_uuid: str = Query( + description="대상 사용자 UUID", ), ) -> CacheDeleteResponse: - """ - 대시보드 캐시 삭제 - - Args: - mode: 삭제할 캐시 모드 (day / month / all) - user_uuid: 대상 사용자 UUID (없으면 전체 삭제) - - Returns: - CacheDeleteResponse: 삭제된 캐시 키 개수 및 메시지 - """ - if user_uuid: - if mode == "all": - deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*") - message = f"전체 캐시 삭제 완료 ({deleted}개)" - else: - cache_key = f"dashboard:{user_uuid}:{mode}" - success = await delete_cache(cache_key) - deleted = 1 if success else 0 - message = f"{mode} 캐시 삭제 {'완료' if success else '실패 (키 없음)'}" + if mode == "all": + deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*") + message = f"전체 캐시 삭제 완료 ({deleted}개)" else: - deleted = await delete_cache_pattern("dashboard:*") - message = f"전체 사용자 캐시 삭제 완료 ({deleted}개)" + deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*:{mode}") + message = f"{mode} 캐시 삭제 완료 ({deleted}개)" logger.info( f"[CACHE DELETE] user_uuid={user_uuid or 'ALL'}, mode={mode}, deleted={deleted}" diff --git a/app/dashboard/exceptions.py b/app/dashboard/exceptions.py index 950f6f0..42d0959 100644 --- a/app/dashboard/exceptions.py +++ b/app/dashboard/exceptions.py @@ -113,7 +113,7 @@ class YouTubeAccountSelectionRequiredError(DashboardException): def __init__(self): super().__init__( - message="연결된 YouTube 계정이 여러 개입니다. social_account_id 파라미터로 사용할 계정을 선택해주세요.", + message="연결된 YouTube 계정이 여러 개입니다. platform_user_id 파라미터로 사용할 계정을 선택해주세요.", status_code=status.HTTP_400_BAD_REQUEST, code="YOUTUBE_ACCOUNT_SELECTION_REQUIRED", ) diff --git a/app/dashboard/schemas/dashboard_schema.py b/app/dashboard/schemas/dashboard_schema.py index cac1c7b..53fa600 100644 --- a/app/dashboard/schemas/dashboard_schema.py +++ b/app/dashboard/schemas/dashboard_schema.py @@ -197,35 +197,6 @@ class AudienceData(BaseModel): ) -# class PlatformMetric(BaseModel): -# """플랫폼별 메트릭 (미사용 — platform_data 기능 미구현)""" -# -# id: str -# label: str -# value: str -# unit: Optional[str] = None -# trend: float -# trend_direction: Literal["up", "down", "-"] = Field(alias="trendDirection") -# -# model_config = ConfigDict( -# alias_generator=to_camel, -# populate_by_name=True, -# ) -# -# -# class PlatformData(BaseModel): -# """플랫폼별 데이터 (미사용 — platform_data 기능 미구현)""" -# -# platform: Literal["youtube", "instagram"] -# display_name: str = Field(alias="displayName") -# metrics: list[PlatformMetric] -# -# model_config = ConfigDict( -# alias_generator=to_camel, -# populate_by_name=True, -# ) - - class DashboardResponse(BaseModel): """대시보드 전체 응답 @@ -255,7 +226,6 @@ class DashboardResponse(BaseModel): top_content: list[TopContent] = Field(alias="topContent") audience_data: AudienceData = Field(alias="audienceData") has_uploads: bool = Field(default=True, alias="hasUploads") - # platform_data: list[PlatformData] = Field(default=[], alias="platformData") # 미사용 model_config = ConfigDict( alias_generator=to_camel, diff --git a/app/dashboard/services/__init__.py b/app/dashboard/services/__init__.py index 6559906..1581259 100644 --- a/app/dashboard/services/__init__.py +++ b/app/dashboard/services/__init__.py @@ -4,10 +4,12 @@ Dashboard Services YouTube Analytics API 연동 및 데이터 가공 서비스를 제공합니다. """ +from app.dashboard.services.dashboard_service import DashboardService from app.dashboard.services.data_processor import DataProcessor from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService __all__ = [ + "DashboardService", "YouTubeAnalyticsService", "DataProcessor", ] diff --git a/app/dashboard/services/dashboard_service.py b/app/dashboard/services/dashboard_service.py new file mode 100644 index 0000000..64d37cd --- /dev/null +++ b/app/dashboard/services/dashboard_service.py @@ -0,0 +1,358 @@ +""" +Dashboard Service + +대시보드 비즈니스 로직을 담당합니다. +""" + +import json +import logging +from datetime import date, datetime, timedelta +from typing import Literal + +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.dashboard.exceptions import ( + YouTubeAccountNotConnectedError, + YouTubeAccountNotFoundError, + YouTubeAccountSelectionRequiredError, + YouTubeTokenExpiredError, +) +from app.dashboard.models import Dashboard +from app.dashboard.utils.redis_cache import get_cache, set_cache +from app.dashboard.schemas import ( + AudienceData, + ConnectedAccount, + ContentMetric, + DashboardResponse, + TopContent, +) +from app.dashboard.services.data_processor import DataProcessor +from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService +from app.social.exceptions import TokenExpiredError +from app.social.services import SocialAccountService +from app.user.models import SocialAccount, User + +logger = logging.getLogger(__name__) + + +class DashboardService: + async def get_connected_accounts( + self, + current_user: User, + session: AsyncSession, + ) -> list[ConnectedAccount]: + result = await session.execute( + select(SocialAccount).where( + SocialAccount.user_uuid == current_user.user_uuid, + SocialAccount.platform == "youtube", + SocialAccount.is_active == True, # noqa: E712 + ) + ) + accounts_raw = result.scalars().all() + + connected = [] + for acc in accounts_raw: + data = acc.platform_data if isinstance(acc.platform_data, dict) else {} + connected.append( + ConnectedAccount( + id=acc.id, + platform=acc.platform, + platform_username=acc.platform_username, + platform_user_id=acc.platform_user_id, + channel_title=data.get("channel_title"), + connected_at=acc.connected_at, + is_active=acc.is_active, + ) + ) + + logger.info( + f"[ACCOUNTS] YouTube 계정 목록 조회 - " + f"user_uuid={current_user.user_uuid}, count={len(connected)}" + ) + return connected + + def calculate_date_range( + self, mode: Literal["day", "month"] + ) -> tuple[date, date, date, date, date, str]: + """모드별 날짜 범위 계산. (start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc) 반환""" + today = date.today() + + if mode == "day": + end_dt = today - timedelta(days=2) + kpi_end_dt = end_dt + start_dt = end_dt - timedelta(days=29) + prev_start_dt = start_dt - timedelta(days=30) + prev_kpi_end_dt = kpi_end_dt - timedelta(days=30) + period_desc = "최근 30일" + else: + end_dt = today.replace(day=1) + kpi_end_dt = today - timedelta(days=2) + start_month = end_dt.month - 11 + if start_month <= 0: + start_month += 12 + start_year = end_dt.year - 1 + else: + start_year = end_dt.year + start_dt = date(start_year, start_month, 1) + prev_start_dt = start_dt.replace(year=start_dt.year - 1) + try: + prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1) + except ValueError: + prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28) + period_desc = "최근 12개월" + + return start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc + + async def resolve_social_account( + self, + current_user: User, + session: AsyncSession, + platform_user_id: str | None, + ) -> SocialAccount: + result = await session.execute( + select(SocialAccount).where( + SocialAccount.user_uuid == current_user.user_uuid, + SocialAccount.platform == "youtube", + SocialAccount.is_active == True, # noqa: E712 + ) + ) + social_accounts_raw = result.scalars().all() + + social_accounts = list(social_accounts_raw) + + if not social_accounts: + raise YouTubeAccountNotConnectedError() + + if platform_user_id is not None: + matched = [a for a in social_accounts if a.platform_user_id == platform_user_id] + if not matched: + raise YouTubeAccountNotFoundError() + return matched[0] + elif len(social_accounts) == 1: + return social_accounts[0] + else: + raise YouTubeAccountSelectionRequiredError() + + async def get_video_counts( + self, + current_user: User, + session: AsyncSession, + social_account: SocialAccount, + start_dt: date, + prev_start_dt: date, + prev_kpi_end_dt: date, + ) -> tuple[int, int]: + today = date.today() + count_result = await session.execute( + select(func.count()) + .select_from(Dashboard) + .where( + Dashboard.user_uuid == current_user.user_uuid, + Dashboard.platform == "youtube", + Dashboard.platform_user_id == social_account.platform_user_id, + Dashboard.uploaded_at >= start_dt, + Dashboard.uploaded_at < today + timedelta(days=1), + ) + ) + period_video_count = count_result.scalar() or 0 + + prev_count_result = await session.execute( + select(func.count()) + .select_from(Dashboard) + .where( + Dashboard.user_uuid == current_user.user_uuid, + Dashboard.platform == "youtube", + Dashboard.platform_user_id == social_account.platform_user_id, + Dashboard.uploaded_at >= prev_start_dt, + Dashboard.uploaded_at <= prev_kpi_end_dt, + ) + ) + prev_period_video_count = prev_count_result.scalar() or 0 + + return period_video_count, prev_period_video_count + + async def get_video_ids( + self, + current_user: User, + session: AsyncSession, + social_account: SocialAccount, + ) -> tuple[list[str], dict[str, tuple[str, datetime]]]: + result = await session.execute( + select( + Dashboard.platform_video_id, + Dashboard.title, + Dashboard.uploaded_at, + ) + .where( + Dashboard.user_uuid == current_user.user_uuid, + Dashboard.platform == "youtube", + Dashboard.platform_user_id == social_account.platform_user_id, + ) + .order_by(Dashboard.uploaded_at.desc()) + .limit(30) + ) + rows = result.all() + + video_ids = [] + video_lookup: dict[str, tuple[str, datetime]] = {} + for row in rows: + platform_video_id, title, uploaded_at = row + video_ids.append(platform_video_id) + video_lookup[platform_video_id] = (title, uploaded_at) + + return video_ids, video_lookup + + def build_empty_response(self) -> DashboardResponse: + return DashboardResponse( + content_metrics=[ + ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"), + ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"), + ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"), + ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"), + ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"), + ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"), + ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"), + ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"), + ], + monthly_data=[], + daily_data=[], + top_content=[], + audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]), + has_uploads=False, + ) + + def inject_video_count( + self, + response: DashboardResponse, + period_video_count: int, + prev_period_video_count: int, + ) -> None: + for metric in response.content_metrics: + if metric.id == "uploaded-videos": + metric.value = float(period_video_count) + video_trend = float(period_video_count - prev_period_video_count) + metric.trend = video_trend + metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-") + break + + async def get_stats( + self, + mode: Literal["day", "month"], + platform_user_id: str | None, + current_user: User, + session: AsyncSession, + ) -> DashboardResponse: + logger.info( + f"[DASHBOARD] 통계 조회 시작 - " + f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}" + ) + + # 1. 날짜 계산 + start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc = ( + self.calculate_date_range(mode) + ) + start_date = start_dt.strftime("%Y-%m-%d") + end_date = end_dt.strftime("%Y-%m-%d") + kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d") + logger.debug(f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}") + + # 2. YouTube 계정 확인 + social_account = await self.resolve_social_account(current_user, session, platform_user_id) + logger.debug(f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}") + + # 3. 영상 수 조회 + period_video_count, prev_period_video_count = await self.get_video_counts( + current_user, session, social_account, start_dt, prev_start_dt, prev_kpi_end_dt + ) + logger.debug(f"[3] 영상 수 - current={period_video_count}, prev={prev_period_video_count}") + + # 4. 캐시 조회 + cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}" + cached_raw = await get_cache(cache_key) + if cached_raw: + try: + payload = json.loads(cached_raw) + logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}") + response = DashboardResponse.model_validate(payload["response"]) + self.inject_video_count(response, period_video_count, prev_period_video_count) + return response + except (json.JSONDecodeError, KeyError): + logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}") + + logger.debug("[4] 캐시 MISS - YouTube API 호출 필요") + + # 5. 업로드 영상 조회 + video_ids, video_lookup = await self.get_video_ids(current_user, session, social_account) + logger.debug(f"[5] 영상 조회 완료 - count={len(video_ids)}") + + if not video_ids: + logger.info(f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - user_uuid={current_user.user_uuid}") + return self.build_empty_response() + + # 6. 토큰 유효성 확인 + try: + access_token = await SocialAccountService().ensure_valid_token(social_account, session) + except TokenExpiredError: + logger.warning(f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}") + raise YouTubeTokenExpiredError() + logger.debug("[6] 토큰 유효성 확인 완료") + + # 7. YouTube Analytics API 호출 + youtube_service = YouTubeAnalyticsService() + raw_data = await youtube_service.fetch_all_metrics( + video_ids=video_ids, + start_date=start_date, + end_date=end_date, + kpi_end_date=kpi_end_date, + access_token=access_token, + mode=mode, + ) + logger.debug("[7] YouTube Analytics API 호출 완료") + + # 8. TopContent 조립 + processor = DataProcessor() + top_content_rows = raw_data.get("top_videos", {}).get("rows", []) + top_content: list[TopContent] = [] + for row in top_content_rows[:4]: + if len(row) < 4: + continue + video_id, views, likes, comments = row[0], row[1], row[2], row[3] + meta = video_lookup.get(video_id) + if not meta: + continue + title, uploaded_at = meta + engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0 + top_content.append( + TopContent( + id=video_id, + title=title, + thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg", + platform="youtube", + views=int(views), + engagement=f"{engagement_rate:.1f}%", + date=uploaded_at.strftime("%Y.%m.%d"), + ) + ) + logger.debug(f"[8] TopContent 조립 완료 - count={len(top_content)}") + + # 9. 데이터 가공 + dashboard_data = processor.process(raw_data, top_content, 0, mode=mode, end_date=end_date) + logger.debug("[9] 데이터 가공 완료") + + # 10. 캐시 저장 + cache_payload = json.dumps({"response": dashboard_data.model_dump(mode="json")}) + cache_success = await set_cache(cache_key, cache_payload, ttl=43200) + if cache_success: + logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}") + else: + logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}") + + # 11. 업로드 영상 수 주입 + self.inject_video_count(dashboard_data, period_video_count, prev_period_video_count) + + logger.info( + f"[DASHBOARD] 통계 조회 완료 - " + f"user_uuid={current_user.user_uuid}, mode={mode}, period={period_desc}, videos={len(video_ids)}" + ) + return dashboard_data diff --git a/app/dashboard/redis_cache.py b/app/dashboard/utils/redis_cache.py similarity index 100% rename from app/dashboard/redis_cache.py rename to app/dashboard/utils/redis_cache.py diff --git a/app/utils/prompts/templates/marketing_prompt.txt b/app/utils/prompts/templates/marketing_prompt.txt index 3a97061..d4de2e0 100644 --- a/app/utils/prompts/templates/marketing_prompt.txt +++ b/app/utils/prompts/templates/marketing_prompt.txt @@ -26,7 +26,7 @@ Act as a Senior Brand Strategist and Marketing Data Analyst. Your goal is to ana ### 3. target_persona Generate a list of personas based on the following: -* **`persona`**: Provide a descriptive name and profile for the target group. +* **`persona`**: Provide a descriptive name and profile for the target group. Must be **20 characters or fewer**. * **`age`**: Set `min_age` and `max_age` (Integer 0-100) that accurately reflects the segment. * **`favor_target`**: List specific elements or vibes this persona prefers (e.g., "Minimalist interior", "Pet-friendly facilities"). * **`decision_trigger`**: Identify the specific "Hook" or facility that leads this persona to finalize a booking.