""" YouTube Analytics 데이터 가공 프로세서 YouTube Analytics API의 원본 데이터를 프론트엔드용 Pydantic 스키마로 변환합니다. """ from collections import defaultdict from datetime import datetime, timedelta from typing import Any, Literal from app.dashboard.schemas import ( AudienceData, ContentMetric, DailyData, DashboardResponse, MonthlyData, TopContent, ) from app.utils.logger import get_logger logger = get_logger("dashboard") _COUNTRY_CODE_MAP: dict[str, str] = { "KR": "대한민국", "US": "미국", "JP": "일본", "CN": "중국", "GB": "영국", "DE": "독일", "FR": "프랑스", "CA": "캐나다", "AU": "호주", "IN": "인도", "ID": "인도네시아", "TH": "태국", "VN": "베트남", "PH": "필리핀", "MY": "말레이시아", "SG": "싱가포르", "TW": "대만", "HK": "홍콩", "BR": "브라질", "MX": "멕시코", "NL": "네덜란드", "BE": "벨기에", "SE": "스웨덴", "NO": "노르웨이", "FI": "핀란드", "DK": "덴마크", "IE": "아일랜드", "PL": "폴란드", "CZ": "체코", "RO": "루마니아", "HU": "헝가리", "SK": "슬로바키아", "SI": "슬로베니아", "HR": "크로아티아", "GR": "그리스", "PT": "포르투갈", "ES": "스페인", "IT": "이탈리아", } class DataProcessor: """YouTube Analytics 데이터 가공 프로세서 YouTube Analytics API의 원본 JSON 데이터를 DashboardResponse 스키마로 변환합니다. 각 섹션별로 데이터 가공 로직을 분리하여 유지보수성을 향상시켰습니다. """ def process( self, raw_data: dict[str, Any], top_content: list[TopContent], period_video_count: int = 0, mode: Literal["day", "month"] = "month", end_date: str = "", ) -> DashboardResponse: """YouTube Analytics API 원본 데이터를 DashboardResponse로 변환 Args: raw_data: YouTube Analytics API 응답 데이터 (mode에 따라 키 구성 다름) 공통: - kpi: KPI 메트릭 (조회수, 좋아요, 댓글, 시청시간 등) - top_videos: 인기 영상 데이터 - demographics: 연령/성별 데이터 - region: 지역별 데이터 mode="month" 추가: - trend_recent: 최근 12개월 월별 조회수 - trend_previous: 이전 12개월 월별 조회수 mode="day" 추가: - trend_recent: 최근 30일 일별 조회수 - trend_previous: 이전 30일 일별 조회수 top_content: TopContent 리스트 (라우터에서 Analytics + DB lookup으로 생성) period_video_count: 조회 기간 내 업로드된 영상 수 (DB에서 집계) mode: 조회 모드 ("month" | "day") Returns: DashboardResponse: 프론트엔드용 대시보드 응답 스키마 - mode="month": monthly_data 채움, daily_data=[] - mode="day": daily_data 채움, monthly_data=[] Example: >>> processor = DataProcessor() >>> response = processor.process( ... raw_data={ ... "kpi": {...}, ... "monthly_recent": {...}, ... "monthly_previous": {...}, ... "top_videos": {...}, ... "demographics": {...}, ... "region": {...}, ... }, ... top_content=[TopContent(...)], ... mode="month", ... ) """ logger.debug( f"[DataProcessor.process] START - " f"top_content_count={len(top_content)}" ) # 각 섹션별 데이터 가공 (안전한 딕셔너리 접근) content_metrics = self._build_content_metrics( raw_data.get("kpi", {}), raw_data.get("kpi_previous", {}), period_video_count, ) if mode == "month": monthly_data = self._merge_monthly_data( raw_data.get("trend_recent", {}), raw_data.get("trend_previous", {}), end_date=end_date, ) daily_data: list[DailyData] = [] else: # mode == "day" daily_data = self._build_daily_data( raw_data.get("trend_recent", {}), raw_data.get("trend_previous", {}), end_date=end_date, ) monthly_data = [] audience_data = self._build_audience_data( raw_data.get("demographics") or {}, raw_data.get("region") or {}, ) logger.debug( f"[DataProcessor.process] SUCCESS - " f"mode={mode}, metrics={len(content_metrics)}, " f"top_content={len(top_content)}" ) return DashboardResponse( content_metrics=content_metrics, monthly_data=monthly_data, daily_data=daily_data, top_content=top_content, audience_data=audience_data, ) def _build_content_metrics( self, kpi_data: dict[str, Any], kpi_previous_data: dict[str, Any], period_video_count: int = 0, ) -> list[ContentMetric]: """KPI 데이터를 ContentMetric 리스트로 변환 Args: kpi_data: 최근 기간 KPI 응답 rows[0] = [views, likes, comments, shares, estimatedMinutesWatched, averageViewDuration, subscribersGained] kpi_previous_data: 이전 기간 KPI 응답 (증감률 계산용) period_video_count: 조회 기간 내 업로드된 영상 수 (DB에서 집계) Returns: list[ContentMetric]: KPI 지표 카드 리스트 (8개) 순서: 조회수, 시청시간, 평균 시청시간, 신규 구독자, 좋아요, 댓글, 공유, 업로드 영상 """ logger.info( f"[DataProcessor._build_content_metrics] START - " f"kpi_keys={list(kpi_data.keys())}" ) rows = kpi_data.get("rows", []) if not rows or not rows[0]: logger.warning( f"[DataProcessor._build_content_metrics] NO_DATA - " f"rows={rows}" ) return [] row = rows[0] prev_rows = kpi_previous_data.get("rows", []) prev_row = prev_rows[0] if prev_rows else [] def _get(r: list, i: int, default: float = 0.0) -> float: return r[i] if len(r) > i else default def _trend(recent: float, previous: float) -> tuple[float, str]: pct = recent - previous if pct > 0: direction = "up" elif pct < 0: direction = "down" else: direction = "-" return pct, direction # 최근 기간 views = _get(row, 0) likes = _get(row, 1) comments = _get(row, 2) shares = _get(row, 3) estimated_minutes_watched = _get(row, 4) average_view_duration = _get(row, 5) subscribers_gained = _get(row, 6) # 이전 기간 prev_views = _get(prev_row, 0) prev_likes = _get(prev_row, 1) prev_comments = _get(prev_row, 2) prev_shares = _get(prev_row, 3) prev_minutes_watched = _get(prev_row, 4) prev_avg_duration = _get(prev_row, 5) prev_subscribers = _get(prev_row, 6) views_trend, views_dir = _trend(views, prev_views) watch_trend, watch_dir = _trend(estimated_minutes_watched, prev_minutes_watched) duration_trend, duration_dir = _trend(average_view_duration, prev_avg_duration) subs_trend, subs_dir = _trend(subscribers_gained, prev_subscribers) likes_trend, likes_dir = _trend(likes, prev_likes) comments_trend, comments_dir = _trend(comments, prev_comments) shares_trend, shares_dir = _trend(shares, prev_shares) logger.info( f"[DataProcessor._build_content_metrics] SUCCESS - " f"views={views}({views_trend:+.1f}), " f"watch_time={estimated_minutes_watched}min({watch_trend:+.1f}), " f"subscribers={subscribers_gained}({subs_trend:+.1f})" ) return [ ContentMetric( id="total-views", label="조회수", value=float(views), unit="count", trend=round(float(views_trend), 1), trend_direction=views_dir, ), ContentMetric( id="total-watch-time", label="시청시간", value=round(estimated_minutes_watched / 60, 1), unit="hours", trend=round(watch_trend / 60, 1), trend_direction=watch_dir, ), ContentMetric( id="avg-view-duration", label="평균 시청시간", value=round(average_view_duration / 60, 1), unit="minutes", trend=round(duration_trend / 60, 1), trend_direction=duration_dir, ), ContentMetric( id="new-subscribers", label="신규 구독자", value=float(subscribers_gained), unit="count", trend=subs_trend, trend_direction=subs_dir, ), ContentMetric( id="likes", label="좋아요", value=float(likes), unit="count", trend=likes_trend, trend_direction=likes_dir, ), ContentMetric( id="comments", label="댓글", value=float(comments), unit="count", trend=comments_trend, trend_direction=comments_dir, ), ContentMetric( id="shares", label="공유", value=float(shares), unit="count", trend=shares_trend, trend_direction=shares_dir, ), ContentMetric( id="uploaded-videos", label="업로드 영상", value=float(period_video_count), unit="count", trend=0.0, trend_direction="-", ), ] def _merge_monthly_data( self, data_recent: dict[str, Any], data_previous: dict[str, Any], end_date: str = "", ) -> list[MonthlyData]: """최근 12개월과 이전 12개월의 월별 데이터를 병합 end_date 기준 12개월을 명시 생성하여 API가 반환하지 않은 월(당월 등)도 0으로 포함합니다. Args: data_recent: 최근 12개월 월별 조회수 데이터 rows = [["2026-01", 150000], ["2026-02", 180000], ...] data_previous: 이전 12개월 월별 조회수 데이터 rows = [["2025-01", 120000], ["2025-02", 140000], ...] end_date: 기준 종료일 (YYYY-MM-DD). 미전달 시 오늘 사용 Returns: list[MonthlyData]: 월별 비교 데이터 (12개, API 미반환 월은 0) """ logger.debug("[DataProcessor._merge_monthly_data] START") rows_recent = data_recent.get("rows", []) rows_previous = data_previous.get("rows", []) map_recent = {row[0]: row[1] for row in rows_recent if len(row) >= 2} map_previous = {row[0]: row[1] for row in rows_previous if len(row) >= 2} # end_date 기준 12개월 명시 생성 (API 미반환 당월도 0으로 포함) if end_date: end_dt = datetime.strptime(end_date, "%Y-%m-%d") else: end_dt = datetime.today() result = [] for i in range(11, -1, -1): m = end_dt.month - i y = end_dt.year if m <= 0: m += 12 y -= 1 month_key = f"{y}-{m:02d}" result.append( MonthlyData( month=f"{m}월", this_year=map_recent.get(month_key, 0), last_year=map_previous.get(f"{y - 1}-{m:02d}", 0), ) ) logger.debug( f"[DataProcessor._merge_monthly_data] SUCCESS - count={len(result)}" ) return result def _build_daily_data( self, data_recent: dict[str, Any], data_previous: dict[str, Any], end_date: str = "", num_days: int = 30, ) -> list[DailyData]: """최근 30일과 이전 30일의 일별 데이터를 병합 end_date 기준 num_days개 날짜를 직접 생성하여 YouTube API 응답에 해당 날짜 row가 없어도 0으로 채웁니다 (X축 누락 방지). Args: data_recent: 최근 30일 일별 조회수 데이터 rows = [["2026-01-20", 5000], ["2026-01-21", 6200], ...] data_previous: 이전 30일 일별 조회수 데이터 rows = [["2025-12-21", 4500], ["2025-12-22", 5100], ...] end_date: 최근 기간의 마지막 날 (YYYY-MM-DD). 미전달 시 rows 마지막 날 사용 num_days: 표시할 일수 (기본 30) Returns: list[DailyData]: 일별 비교 데이터 (num_days개, 데이터 없는 날은 0) """ logger.debug("[DataProcessor._build_daily_data] START") rows_recent = data_recent.get("rows", []) rows_previous = data_previous.get("rows", []) # 날짜 → 조회수 맵 map_recent = {row[0]: row[1] for row in rows_recent if len(row) >= 2} map_previous = {row[0]: row[1] for row in rows_previous if len(row) >= 2} # end_date 결정: 전달된 값 우선, 없으면 rows 마지막 날짜 사용 if end_date: end_dt = datetime.strptime(end_date, "%Y-%m-%d").date() elif rows_recent: end_dt = datetime.strptime(rows_recent[-1][0], "%Y-%m-%d").date() else: logger.warning( "[DataProcessor._build_daily_data] NO_DATA - rows_recent 비어있음" ) return [] start_dt = end_dt - timedelta(days=num_days - 1) # 날짜 범위를 직접 생성하여 누락된 날짜도 0으로 채움 result = [] current = start_dt while current <= end_dt: date_str = current.strftime("%Y-%m-%d") date_label = f"{current.month}/{current.day}" this_views = map_recent.get(date_str, 0) # 이전 기간: 동일 인덱스 날짜 (current - 30일) prev_date_str = (current - timedelta(days=num_days)).strftime("%Y-%m-%d") last_views = map_previous.get(prev_date_str, 0) result.append( DailyData( date=date_label, this_period=int(this_views), last_period=int(last_views), ) ) current += timedelta(days=1) logger.debug(f"[DataProcessor._build_daily_data] SUCCESS - count={len(result)}") return result def _build_audience_data( self, demographics_data: dict[str, Any], geography_data: dict[str, Any], ) -> AudienceData: """시청자 분석 데이터 생성 연령대별, 성별, 지역별 시청자 분포를 분석합니다. Args: demographics_data: 연령/성별 API 응답 rows = [["age18-24", "male", 45000], ["age18-24", "female", 55000], ...] geography_data: 지역별 API 응답 rows = [["KR", 1000000], ["US", 500000], ...] Returns: AudienceData: 시청자 분석 데이터 - age_groups: 연령대별 비율 - gender: 성별 조회수 - top_regions: 상위 지역 (5개) """ logger.debug("[DataProcessor._build_audience_data] START") # === 연령/성별 데이터 처리 === demo_rows = demographics_data.get("rows", []) age_map: dict[str, float] = {} gender_map_f: dict[str, float] = {"male": 0.0, "female": 0.0} for row in demo_rows: if len(row) < 3: continue age_group = row[0] # "age18-24" gender = row[1] # "male" or "female" viewer_pct = row[2] # viewerPercentage (이미 % 값, 예: 45.5) # 연령대별 집계: 남녀 비율 합산 (age18-24 → 18-24) age_label = age_group.replace("age", "") age_map[age_label] = age_map.get(age_label, 0.0) + viewer_pct # 성별 집계 if gender in gender_map_f: gender_map_f[gender] += viewer_pct # 연령대 5개로 통합: 13-17+18-24 → 13-24, 55-64+65- → 55+ merged_age: dict[str, float] = { "13-24": age_map.get("13-17", 0.0) + age_map.get("18-24", 0.0), "25-34": age_map.get("25-34", 0.0), "35-44": age_map.get("35-44", 0.0), "45-54": age_map.get("45-54", 0.0), "55+": age_map.get("55-64", 0.0) + age_map.get("65-", 0.0), } age_groups = [ {"label": age, "percentage": int(round(pct))} for age, pct in merged_age.items() ] gender_map = {k: int(round(v)) for k, v in gender_map_f.items()} # === 지역 데이터 처리 === geo_rows = geography_data.get("rows", []) total_geo_views = sum(row[1] for row in geo_rows if len(row) >= 2) merged_geo: defaultdict[str, int] = defaultdict(int) for row in geo_rows: if len(row) >= 2: merged_geo[self._translate_country_code(row[0])] += row[1] top_regions = [ { "region": region, "percentage": int((views / total_geo_views * 100) if total_geo_views > 0 else 0), } for region, views in sorted(merged_geo.items(), key=lambda x: x[1], reverse=True)[:5] ] logger.debug( f"[DataProcessor._build_audience_data] SUCCESS - " f"age_groups={len(age_groups)}, regions={len(top_regions)}" ) return AudienceData( age_groups=age_groups, gender=gender_map, top_regions=top_regions, ) @staticmethod def _translate_country_code(code: str) -> str: """국가 코드를 한국어로 변환 ISO 3166-1 alpha-2 국가 코드를 한국어 국가명으로 변환합니다. Args: code: ISO 3166-1 alpha-2 국가 코드 (예: "KR", "US") Returns: str: 한국어 국가명 (매핑되지 않은 경우 원본 코드 반환) Example: >>> _translate_country_code("KR") "대한민국" >>> _translate_country_code("US") "미국" """ return _COUNTRY_CODE_MAP.get(code, "기타")