""" YouTube Analytics API 서비스 YouTube Analytics API v2를 호출하여 채널 및 영상 통계를 조회합니다. """ import asyncio from datetime import datetime, timedelta from typing import Any, Literal import httpx from app.dashboard.exceptions import ( YouTubeAPIError, YouTubeAuthError, YouTubeQuotaExceededError, ) from app.utils.logger import get_logger logger = get_logger("dashboard") class YouTubeAnalyticsService: """YouTube Analytics API 호출 서비스 YouTube Analytics API v2를 사용하여 채널 통계, 영상 성과, 시청자 분석 데이터를 조회합니다. API 문서: https://developers.google.com/youtube/analytics/reference """ BASE_URL = "https://youtubeanalytics.googleapis.com/v2/reports" async def fetch_all_metrics( self, video_ids: list[str], start_date: str, end_date: str, access_token: str, mode: Literal["day", "month"] = "month", kpi_end_date: str = "", ) -> dict[str, Any]: """YouTube Analytics API 호출을 병렬로 실행 Args: video_ids: YouTube 영상 ID 리스트 (최대 30개) start_date: 조회 시작일 (YYYY-MM-DD) end_date: 조회 종료일 (YYYY-MM-DD) access_token: YouTube OAuth 2.0 액세스 토큰 Returns: dict[str, Any]: API 응답 데이터 - kpi: KPI 메트릭 (조회수, 좋아요, 댓글 등) - monthly_recent: 최근 12개월 월별 조회수 - monthly_previous: 이전 12개월 월별 조회수 - top_videos: 인기 영상 TOP 4 - demographics: 연령/성별 분포 - region: 지역별 조회수 Raises: YouTubeAPIError: API 호출 실패 YouTubeQuotaExceededError: 할당량 초과 YouTubeAuthError: 인증 실패 Example: >>> service = YouTubeAnalyticsService() >>> data = await service.fetch_all_metrics( ... video_ids=["dQw4w9WgXcQ", "jNQXAC9IVRw"], ... start_date="2026-01-01", ... end_date="2026-12-31", ... access_token="ya29.a0..." ... ) """ logger.debug( f"[1/6] YouTube Analytics API 병렬 호출 시작 - " f"video_count={len(video_ids)}, period={start_date}~{end_date}, mode={mode}" ) end_dt = datetime.strptime(end_date, "%Y-%m-%d") # kpi_end_date: KPI/top_videos/demographics/region 호출에 사용 # month 모드에서는 현재 월 전체 데이터를 포함하기 위해 end_date(YYYY-MM-01)보다 늦은 날짜 사용 # day 모드 또는 미전달 시 end_date와 동일 _kpi_end = kpi_end_date if kpi_end_date else end_date if mode == "month": # 월별 차트: 라우터에서 이미 YYYY-MM-01 형식으로 계산된 날짜 그대로 사용 # recent: start_date ~ end_date (ex. 2025-03-01 ~ 2026-02-01) # previous: 1년 전 동일 기간 (ex. 2024-03-01 ~ 2025-02-01) recent_start = start_date recent_end = end_date previous_start = f"{int(start_date[:4]) - 1}{start_date[4:]}" previous_end = f"{int(end_date[:4]) - 1}{end_date[4:]}" # KPI 이전 기간: _kpi_end 기준 1년 전 (ex. 2026-02-22 → 2025-02-22) previous_kpi_end = f"{int(_kpi_end[:4]) - 1}{_kpi_end[4:]}" logger.debug( f"[월별 데이터] 최근 12개월: {recent_start}~{recent_end}, " f"이전 12개월: {previous_start}~{previous_end}, " f"KPI 조회 종료일: {_kpi_end}" ) else: # 일별 차트: end_date 기준 최근 30일 / 이전 30일 day_recent_end = end_date day_recent_start = (end_dt - timedelta(days=29)).strftime("%Y-%m-%d") day_previous_end = (end_dt - timedelta(days=30)).strftime("%Y-%m-%d") day_previous_start = (end_dt - timedelta(days=59)).strftime("%Y-%m-%d") logger.debug( f"[일별 데이터] 최근 30일: {day_recent_start}~{day_recent_end}, " f"이전 30일: {day_previous_start}~{day_previous_end}" ) # 7개 API 호출 태스크 생성 (mode별 선택적) # [0] KPI(최근), [1] KPI(이전), [2] 추이(최근), [3] 추이(이전), [4] 인기영상, [5] 인구통계, [6] 지역 # mode=month: [2][3] = 월별 데이터 (YYYY-MM-01 형식 필요) # mode=day: [2][3] = 일별 데이터 if mode == "month": tasks = [ self._fetch_kpi(video_ids, start_date, _kpi_end, access_token), self._fetch_kpi(video_ids, previous_start, previous_kpi_end, access_token), self._fetch_monthly_data(video_ids, recent_start, recent_end, access_token), self._fetch_monthly_data(video_ids, previous_start, previous_end, access_token), self._fetch_top_videos(video_ids, start_date, _kpi_end, access_token), self._fetch_demographics(start_date, _kpi_end, access_token), self._fetch_region(video_ids, start_date, _kpi_end, access_token), ] else: # mode == "day" tasks = [ self._fetch_kpi(video_ids, start_date, end_date, access_token), self._fetch_kpi(video_ids, day_previous_start, day_previous_end, access_token), self._fetch_daily_data(video_ids, day_recent_start, day_recent_end, access_token), self._fetch_daily_data(video_ids, day_previous_start, day_previous_end, access_token), self._fetch_top_videos(video_ids, start_date, end_date, access_token), self._fetch_demographics(start_date, end_date, access_token), self._fetch_region(video_ids, start_date, end_date, access_token), ] # 병렬 실행 results = await asyncio.gather(*tasks, return_exceptions=True) # 에러 체크 (YouTubeAuthError, YouTubeQuotaExceededError는 원형 그대로 전파) for i, result in enumerate(results): if isinstance(result, Exception): logger.error( f"[YouTubeAnalyticsService] API 호출 {i+1}/7 실패: {result.__class__.__name__}" ) if isinstance(result, (YouTubeAuthError, YouTubeQuotaExceededError)): raise result raise YouTubeAPIError(f"데이터 조회 실패: {result.__class__.__name__}") logger.debug( f"[7/7] YouTube Analytics API 병렬 호출 완료 - mode={mode}, 성공률 100%" ) # 각 API 호출 결과 디버그 로깅 labels = [ "kpi", "kpi_previous", "trend_recent", "trend_previous", "top_videos", "demographics", "region", ] for label, result in zip(labels, results): rows = result.get("rows") if isinstance(result, dict) else None row_count = len(rows) if rows else 0 preview = rows[:2] if rows else [] logger.debug( f"[fetch_all_metrics] {label}: row_count={row_count}, preview={preview}" ) return { "kpi": results[0], "kpi_previous": results[1], "trend_recent": results[2], "trend_previous": results[3], "top_videos": results[4], "demographics": results[5], "region": results[6], } async def _fetch_kpi( self, video_ids: list[str], start_date: str, end_date: str, access_token: str, ) -> dict[str, Any]: """전체 KPI 메트릭 조회 (contentMetrics용) 조회수, 좋아요, 댓글, 공유, 시청 시간, 구독자 증감 등 핵심 성과 지표를 조회합니다. Args: video_ids: YouTube 영상 ID 리스트 start_date: 조회 시작일 (YYYY-MM-DD) end_date: 조회 종료일 (YYYY-MM-DD) access_token: OAuth 2.0 액세스 토큰 Returns: dict[str, Any]: YouTube Analytics API 응답 rows[0] = [views, likes, comments, shares, estimatedMinutesWatched, averageViewDuration, subscribersGained] Note: annotationClickThroughRate는 2019년 annotations 기능 제거로 deprecated. """ logger.debug( f"[YouTubeAnalyticsService._fetch_kpi] START - video_count={len(video_ids)}" ) params = { "ids": "channel==MINE", "startDate": start_date, "endDate": end_date, "metrics": "views,likes,comments,shares,estimatedMinutesWatched,averageViewDuration,subscribersGained", # "filters": f"video=={','.join(video_ids)}", } result = await self._call_api(params, access_token) logger.debug("[YouTubeAnalyticsService._fetch_kpi] SUCCESS") return result async def _fetch_monthly_data( self, video_ids: list[str], start_date: str, end_date: str, access_token: str, ) -> dict[str, Any]: """월별 조회수 데이터 조회 지정된 기간의 월별 조회수를 조회합니다. 최근 12개월과 이전 12개월을 각각 조회하여 비교합니다. Args: video_ids: YouTube 영상 ID 리스트 start_date: 조회 시작일 (YYYY-MM-DD) end_date: 조회 종료일 (YYYY-MM-DD) access_token: OAuth 2.0 액세스 토큰 Returns: dict[str, Any]: YouTube Analytics API 응답 rows = [["2026-01", 150000], ["2026-02", 180000], ...] """ logger.debug( f"[YouTubeAnalyticsService._fetch_monthly_data] START - " f"period={start_date}~{end_date}" ) params = { "ids": "channel==MINE", "startDate": start_date, "endDate": end_date, "dimensions": "month", "metrics": "views", # "filters": f"video=={','.join(video_ids)}", "sort": "month", } result = await self._call_api(params, access_token) logger.debug( f"[YouTubeAnalyticsService._fetch_monthly_data] SUCCESS - " f"period={start_date}~{end_date}" ) return result async def _fetch_daily_data( self, video_ids: list[str], start_date: str, end_date: str, access_token: str, ) -> dict[str, Any]: """일별 조회수 데이터 조회 지정된 기간의 일별 조회수를 조회합니다. 최근 30일과 이전 30일을 각각 조회하여 비교합니다. Args: video_ids: YouTube 영상 ID 리스트 start_date: 조회 시작일 (YYYY-MM-DD) end_date: 조회 종료일 (YYYY-MM-DD) access_token: OAuth 2.0 액세스 토큰 Returns: dict[str, Any]: YouTube Analytics API 응답 rows = [["2026-01-18", 5000], ["2026-01-19", 6200], ...] """ logger.debug( f"[YouTubeAnalyticsService._fetch_daily_data] START - " f"period={start_date}~{end_date}" ) params = { "ids": "channel==MINE", "startDate": start_date, "endDate": end_date, "dimensions": "day", "metrics": "views", # "filters": f"video=={','.join(video_ids)}", "sort": "day", } result = await self._call_api(params, access_token) logger.debug( f"[YouTubeAnalyticsService._fetch_daily_data] SUCCESS - " f"period={start_date}~{end_date}" ) return result async def _fetch_top_videos( self, video_ids: list[str], start_date: str, end_date: str, access_token: str, ) -> dict[str, Any]: """영상별 조회수 조회 (topContent용) 조회수 기준 상위 4개 영상의 성과 데이터를 조회합니다. Args: video_ids: YouTube 영상 ID 리스트 start_date: 조회 시작일 (YYYY-MM-DD) end_date: 조회 종료일 (YYYY-MM-DD) access_token: OAuth 2.0 액세스 토큰 Returns: dict[str, Any]: YouTube Analytics API 응답 rows = [["video_id", views, likes, comments], ...] 조회수 내림차순으로 정렬된 상위 4개 영상 """ logger.debug("[YouTubeAnalyticsService._fetch_top_videos] START") params = { "ids": "channel==MINE", "startDate": start_date, "endDate": end_date, "dimensions": "video", "metrics": "views,likes,comments", # "filters": f"video=={','.join(video_ids)}", "sort": "-views", "maxResults": "4", } result = await self._call_api(params, access_token) logger.debug("[YouTubeAnalyticsService._fetch_top_videos] SUCCESS") return result async def _fetch_demographics( self, start_date: str, end_date: str, access_token: str, ) -> dict[str, Any]: """연령/성별 분포 조회 (채널 전체 기준) 시청자의 연령대별, 성별 시청 비율을 조회합니다. Note: YouTube Analytics API 제약: ageGroup/gender 차원은 video 필터와 혼용 불가. 채널 전체 시청자 기준 데이터를 반환합니다. Args: start_date: 조회 시작일 (YYYY-MM-DD) end_date: 조회 종료일 (YYYY-MM-DD) access_token: OAuth 2.0 액세스 토큰 Returns: dict[str, Any]: YouTube Analytics API 응답 rows = [["age18-24", "female", 45.5], ["age18-24", "male", 32.1], ...] """ logger.debug("[YouTubeAnalyticsService._fetch_demographics] START") # Demographics 보고서는 video 필터 미지원 → 채널 전체 기준 데이터 # 지원 filters: country, province, continent, subContinent, liveOrOnDemand, subscribedStatus params = { "ids": "channel==MINE", "startDate": start_date, "endDate": end_date, "dimensions": "ageGroup,gender", "metrics": "viewerPercentage", } result = await self._call_api(params, access_token) logger.debug("[YouTubeAnalyticsService._fetch_demographics] SUCCESS") return result async def _fetch_region( self, video_ids: list[str], start_date: str, end_date: str, access_token: str, ) -> dict[str, Any]: """지역별 조회수 조회 지역별 조회수 분포를 조회합니다 (상위 5개). Args: video_ids: YouTube 영상 ID 리스트 start_date: 조회 시작일 (YYYY-MM-DD) end_date: 조회 종료일 (YYYY-MM-DD) access_token: OAuth 2.0 액세스 토큰 Returns: dict[str, Any]: YouTube Analytics API 응답 rows = [["KR", 1000000], ["US", 500000], ...] 조회수 내림차순으로 정렬된 상위 5개 국가 """ logger.debug("[YouTubeAnalyticsService._fetch_country] START") params = { "ids": "channel==MINE", "startDate": start_date, "endDate": end_date, "dimensions": "country", "metrics": "views", # "filters": f"video=={','.join(video_ids)}", "sort": "-views", "maxResults": "5", } result = await self._call_api(params, access_token) logger.debug("[YouTubeAnalyticsService._fetch_region] SUCCESS") return result async def _call_api( self, params: dict[str, str], access_token: str, ) -> dict[str, Any]: """YouTube Analytics API 호출 공통 로직 모든 API 호출에 공통적으로 사용되는 HTTP 요청 로직입니다. 인증 헤더 추가, 에러 처리, 응답 파싱을 담당합니다. Args: params: API 요청 파라미터 (dimensions, metrics, filters 등) access_token: OAuth 2.0 액세스 토큰 Returns: dict[str, Any]: YouTube Analytics API JSON 응답 Raises: YouTubeQuotaExceededError: 할당량 초과 (429) YouTubeAuthError: 인증 실패 (401, 403) YouTubeAPIError: 기타 API 오류 Note: - 타임아웃: 30초 - 할당량 초과 시 자동으로 YouTubeQuotaExceededError 발생 - 인증 실패 시 자동으로 YouTubeAuthError 발생 """ headers = {"Authorization": f"Bearer {access_token}"} try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( self.BASE_URL, params=params, headers=headers, ) # 할당량 초과 체크 if response.status_code == 429: logger.warning("[YouTubeAnalyticsService._call_api] QUOTA_EXCEEDED") raise YouTubeQuotaExceededError() # 인증 실패 체크 if response.status_code in (401, 403): logger.warning( f"[YouTubeAnalyticsService._call_api] AUTH_FAILED - status={response.status_code}" ) raise YouTubeAuthError(f"YouTube 인증 실패: {response.status_code}") # HTTP 에러 체크 response.raise_for_status() return response.json() except (YouTubeAuthError, YouTubeQuotaExceededError): raise # 이미 처리된 예외는 그대로 전파 except httpx.HTTPStatusError as e: logger.error( f"[YouTubeAnalyticsService._call_api] HTTP_ERROR - " f"status={e.response.status_code}, body={e.response.text[:500]}" ) raise YouTubeAPIError(f"HTTP {e.response.status_code}") except httpx.RequestError as e: logger.error(f"[YouTubeAnalyticsService._call_api] REQUEST_ERROR - {e}") raise YouTubeAPIError(f"네트워크 오류: {e}") except Exception as e: logger.error(f"[YouTubeAnalyticsService._call_api] UNEXPECTED_ERROR - {e}") raise YouTubeAPIError(f"알 수 없는 오류: {e}")