""" YouTube Analytics 데이터 가공 프로세서 YouTube Analytics API의 원본 데이터를 프론트엔드용 Pydantic 스키마로 변환합니다. """ from datetime import datetime, timedelta from typing import Any, Literal from app.dashboard.schemas import ( AudienceData, ContentMetric, DailyData, DashboardResponse, MonthlyData, TopContent, ) from app.utils.logger import get_logger logger = get_logger("dashboard") class DataProcessor: """YouTube Analytics 데이터 가공 프로세서 YouTube Analytics API의 원본 JSON 데이터를 DashboardResponse 스키마로 변환합니다. 각 섹션별로 데이터 가공 로직을 분리하여 유지보수성을 향상시켰습니다. """ def process( self, raw_data: dict[str, Any], top_content: list[TopContent], period_video_count: int = 0, mode: Literal["day", "month"] = "month", end_date: str = "", ) -> DashboardResponse: """YouTube Analytics API 원본 데이터를 DashboardResponse로 변환 Args: raw_data: YouTube Analytics API 응답 데이터 (mode에 따라 키 구성 다름) 공통: - kpi: KPI 메트릭 (조회수, 좋아요, 댓글, 시청시간 등) - top_videos: 인기 영상 데이터 - demographics: 연령/성별 데이터 - region: 지역별 데이터 mode="month" 추가: - trend_recent: 최근 12개월 월별 조회수 - trend_previous: 이전 12개월 월별 조회수 mode="day" 추가: - trend_recent: 최근 30일 일별 조회수 - trend_previous: 이전 30일 일별 조회수 top_content: TopContent 리스트 (라우터에서 Analytics + DB lookup으로 생성) period_video_count: 조회 기간 내 업로드된 영상 수 (DB에서 집계) mode: 조회 모드 ("month" | "day") Returns: DashboardResponse: 프론트엔드용 대시보드 응답 스키마 - mode="month": monthly_data 채움, daily_data=[] - mode="day": daily_data 채움, monthly_data=[] Example: >>> processor = DataProcessor() >>> response = processor.process( ... raw_data={ ... "kpi": {...}, ... "monthly_recent": {...}, ... "monthly_previous": {...}, ... "top_videos": {...}, ... "demographics": {...}, ... "region": {...}, ... }, ... top_content=[TopContent(...)], ... mode="month", ... ) """ logger.debug( f"[DataProcessor.process] START - " f"top_content_count={len(top_content)}" ) # 각 섹션별 데이터 가공 (안전한 딕셔너리 접근) content_metrics = self._build_content_metrics( raw_data.get("kpi", {}), raw_data.get("kpi_previous", {}), period_video_count, ) if mode == "month": monthly_data = self._merge_monthly_data( raw_data.get("trend_recent", {}), raw_data.get("trend_previous", {}), ) daily_data: list[DailyData] = [] else: # mode == "day" daily_data = self._build_daily_data( raw_data.get("trend_recent", {}), raw_data.get("trend_previous", {}), end_date=end_date, ) monthly_data = [] audience_data = self._build_audience_data( raw_data.get("demographics", {}), raw_data.get("region", {}), ) logger.debug( f"[DataProcessor.process] SUCCESS - " f"mode={mode}, metrics={len(content_metrics)}, " f"top_content={len(top_content)}" ) return DashboardResponse( content_metrics=content_metrics, monthly_data=monthly_data, daily_data=daily_data, top_content=top_content, audience_data=audience_data, ) def _build_content_metrics( self, kpi_data: dict[str, Any], kpi_previous_data: dict[str, Any], period_video_count: int = 0, ) -> list[ContentMetric]: """KPI 데이터를 ContentMetric 리스트로 변환 Args: kpi_data: 최근 기간 KPI 응답 rows[0] = [views, likes, comments, shares, estimatedMinutesWatched, averageViewDuration, subscribersGained] kpi_previous_data: 이전 기간 KPI 응답 (증감률 계산용) period_video_count: 조회 기간 내 업로드된 영상 수 (DB에서 집계) Returns: list[ContentMetric]: KPI 지표 카드 리스트 (8개) 순서: 조회수, 시청시간, 평균 시청시간, 신규 구독자, 좋아요, 댓글, 공유, 업로드 영상 """ logger.info( f"[DataProcessor._build_content_metrics] START - " f"kpi_keys={list(kpi_data.keys())}" ) rows = kpi_data.get("rows", []) if not rows or not rows[0]: logger.warning( f"[DataProcessor._build_content_metrics] NO_DATA - " f"rows={rows}" ) return [] row = rows[0] prev_rows = kpi_previous_data.get("rows", []) prev_row = prev_rows[0] if prev_rows else [] def _get(r: list, i: int, default: float = 0.0) -> float: return r[i] if len(r) > i else default def _trend(recent: float, previous: float) -> tuple[float, str]: pct = recent - previous if pct > 0: direction = "up" elif pct < 0: direction = "down" else: direction = "-" return pct, direction # 최근 기간 views = _get(row, 0) likes = _get(row, 1) comments = _get(row, 2) shares = _get(row, 3) estimated_minutes_watched = _get(row, 4) average_view_duration = _get(row, 5) subscribers_gained = _get(row, 6) # 이전 기간 prev_views = _get(prev_row, 0) prev_likes = _get(prev_row, 1) prev_comments = _get(prev_row, 2) prev_shares = _get(prev_row, 3) prev_minutes_watched = _get(prev_row, 4) prev_avg_duration = _get(prev_row, 5) prev_subscribers = _get(prev_row, 6) views_trend, views_dir = _trend(views, prev_views) watch_trend, watch_dir = _trend(estimated_minutes_watched, prev_minutes_watched) duration_trend, duration_dir = _trend(average_view_duration, prev_avg_duration) subs_trend, subs_dir = _trend(subscribers_gained, prev_subscribers) likes_trend, likes_dir = _trend(likes, prev_likes) comments_trend, comments_dir = _trend(comments, prev_comments) shares_trend, shares_dir = _trend(shares, prev_shares) logger.info( f"[DataProcessor._build_content_metrics] SUCCESS - " f"views={views}({views_trend:+.1f}%), " f"watch_time={estimated_minutes_watched}min({watch_trend:+.1f}%), " f"subscribers={subscribers_gained}({subs_trend:+.1f}%)" ) return [ ContentMetric( id="total-views", label="조회수", label_en="Total Views", value=self._format_number(int(views)), trend=views_trend, trend_direction=views_dir, ), ContentMetric( id="total-watch-time", label="시청시간", label_en="Watch Time", value=f"{round(estimated_minutes_watched / 60, 1)}시간", trend=watch_trend, trend_direction=watch_dir, ), ContentMetric( id="avg-view-duration", label="평균 시청시간", label_en="Avg. View Duration", value=f"{round(average_view_duration / 60)}분", trend=duration_trend, trend_direction=duration_dir, ), ContentMetric( id="new-subscribers", label="신규 구독자", label_en="New Subscribers", value=self._format_number(int(subscribers_gained)), trend=subs_trend, trend_direction=subs_dir, ), ContentMetric( id="likes", label="좋아요", label_en="Likes", value=self._format_number(int(likes)), trend=likes_trend, trend_direction=likes_dir, ), ContentMetric( id="comments", label="댓글", label_en="Comments", value=self._format_number(int(comments)), trend=comments_trend, trend_direction=comments_dir, ), ContentMetric( id="shares", label="공유", label_en="Shares", value=self._format_number(int(shares)), trend=shares_trend, trend_direction=shares_dir, ), ContentMetric( id="uploaded-videos", label="업로드 영상", label_en="Uploaded Videos", value=str(period_video_count), trend=0.0, trend_direction="up", ), ] def _merge_monthly_data( self, data_recent: dict[str, Any], data_previous: dict[str, Any], ) -> list[MonthlyData]: """최근 12개월과 이전 12개월의 월별 데이터를 병합 최근 12개월 대비 이전 12개월의 월별 조회수 비교 차트를 위한 데이터를 생성합니다. 실제 API 응답의 월 데이터를 기준으로 매핑합니다. Args: data_recent: 최근 12개월 월별 조회수 데이터 rows = [["2026-01", 150000], ["2026-02", 180000], ...] data_previous: 이전 12개월 월별 조회수 데이터 rows = [["2025-01", 120000], ["2025-02", 140000], ...] Returns: list[MonthlyData]: 월별 비교 데이터 (최대 12개) """ logger.debug("[DataProcessor._merge_monthly_data] START") rows_recent = data_recent.get("rows", []) rows_previous = data_previous.get("rows", []) # 월별 맵 생성: {"2025-02": 150000, "2025-03": 180000} map_recent = {row[0]: row[1] for row in rows_recent if len(row) >= 2} map_previous = {row[0]: row[1] for row in rows_previous if len(row) >= 2} # 최근 기간의 월 키만 기준으로 정렬 (24개 합집합 방지) # 각 월의 이전 연도 키는 1년 전으로 계산: "2025-02" → "2024-02" recent_months = sorted(map_recent.keys()) # 월별 데이터 생성 result = [] for month_key in recent_months: year, month = month_key.split("-") month_num = int(month) month_label = f"{month_num}월" # 이전 연도 동일 월: "2025-02" → "2024-02" prev_year_key = f"{int(year) - 1}-{month}" result.append( MonthlyData( month=month_label, this_year=map_recent.get(month_key, 0), last_year=map_previous.get(prev_year_key, 0), ) ) logger.debug( f"[DataProcessor._merge_monthly_data] SUCCESS - count={len(result)}" ) return result def _build_daily_data( self, data_recent: dict[str, Any], data_previous: dict[str, Any], end_date: str = "", num_days: int = 30, ) -> list[DailyData]: """최근 30일과 이전 30일의 일별 데이터를 병합 end_date 기준 num_days개 날짜를 직접 생성하여 YouTube API 응답에 해당 날짜 row가 없어도 0으로 채웁니다 (X축 누락 방지). Args: data_recent: 최근 30일 일별 조회수 데이터 rows = [["2026-01-20", 5000], ["2026-01-21", 6200], ...] data_previous: 이전 30일 일별 조회수 데이터 rows = [["2025-12-21", 4500], ["2025-12-22", 5100], ...] end_date: 최근 기간의 마지막 날 (YYYY-MM-DD). 미전달 시 rows 마지막 날 사용 num_days: 표시할 일수 (기본 30) Returns: list[DailyData]: 일별 비교 데이터 (num_days개, 데이터 없는 날은 0) """ logger.debug("[DataProcessor._build_daily_data] START") rows_recent = data_recent.get("rows", []) rows_previous = data_previous.get("rows", []) # 날짜 → 조회수 맵 map_recent = {row[0]: row[1] for row in rows_recent if len(row) >= 2} map_previous = {row[0]: row[1] for row in rows_previous if len(row) >= 2} # end_date 결정: 전달된 값 우선, 없으면 rows 마지막 날짜 사용 if end_date: end_dt = datetime.strptime(end_date, "%Y-%m-%d").date() elif rows_recent: end_dt = datetime.strptime(rows_recent[-1][0], "%Y-%m-%d").date() else: logger.warning( "[DataProcessor._build_daily_data] NO_DATA - rows_recent 비어있음" ) return [] start_dt = end_dt - timedelta(days=num_days - 1) # 날짜 범위를 직접 생성하여 누락된 날짜도 0으로 채움 result = [] current = start_dt while current <= end_dt: date_str = current.strftime("%Y-%m-%d") date_label = f"{current.month}/{current.day}" this_views = map_recent.get(date_str, 0) # 이전 기간: 동일 인덱스 날짜 (current - 30일) prev_date_str = (current - timedelta(days=num_days)).strftime("%Y-%m-%d") last_views = map_previous.get(prev_date_str, 0) result.append( DailyData( date=date_label, this_period=int(this_views), last_period=int(last_views), ) ) current += timedelta(days=1) logger.debug(f"[DataProcessor._build_daily_data] SUCCESS - count={len(result)}") return result def _build_audience_data( self, demographics_data: dict[str, Any], geography_data: dict[str, Any], ) -> AudienceData: """시청자 분석 데이터 생성 연령대별, 성별, 지역별 시청자 분포를 분석합니다. Args: demographics_data: 연령/성별 API 응답 rows = [["age18-24", "male", 45000], ["age18-24", "female", 55000], ...] geography_data: 지역별 API 응답 rows = [["KR", 1000000], ["US", 500000], ...] Returns: AudienceData: 시청자 분석 데이터 - age_groups: 연령대별 비율 - gender: 성별 조회수 - top_regions: 상위 지역 (5개) """ logger.debug("[DataProcessor._build_audience_data] START") # === 연령/성별 데이터 처리 === demo_rows = demographics_data.get("rows", []) age_map: dict[str, int] = {} gender_map = {"male": 0, "female": 0} for row in demo_rows: if len(row) < 3: continue age_group = row[0] # "age18-24" gender = row[1] # "male" or "female" views = row[2] # 연령대별 집계 (age18-24 → 18-24) age_label = age_group.replace("age", "") age_map[age_label] = age_map.get(age_label, 0) + views # 성별 집계 if gender in gender_map: gender_map[gender] += views # 연령대별 비율 계산 total_demo_views = sum(age_map.values()) age_groups = [ { "label": age, "percentage": int( (count / total_demo_views * 100) if total_demo_views > 0 else 0 ), } for age, count in sorted(age_map.items()) ] # === 지역 데이터 처리 === geo_rows = geography_data.get("rows", []) total_geo_views = sum(row[1] for row in geo_rows if len(row) >= 2) top_regions = [ { "region": self._translate_country_code(row[0]), "percentage": int( (row[1] / total_geo_views * 100) if total_geo_views > 0 else 0 ), } for row in geo_rows[:5] # 상위 5개 if len(row) >= 2 ] logger.debug( f"[DataProcessor._build_audience_data] SUCCESS - " f"age_groups={len(age_groups)}, regions={len(top_regions)}" ) return AudienceData( age_groups=age_groups, gender=gender_map, top_regions=top_regions, ) @staticmethod def _format_number(num: int) -> str: """숫자 포맷팅 (1234567 → "1.2M") 조회수, 구독자 수 등 큰 숫자를 읽기 쉽게 포맷팅합니다. Args: num: 원본 숫자 Returns: str: 포맷팅된 문자열 - 1,000,000 이상: "1.2M" - 1,000 이상: "12.5K" - 1,000 미만: "123" Example: >>> _format_number(1234567) "1.2M" >>> _format_number(12345) "12.3K" >>> _format_number(123) "123" """ if num >= 1_000_000: return f"{num / 1_000_000:.1f}M" elif num >= 1_000: return f"{num / 1_000:.1f}K" else: return str(num) @staticmethod def _format_duration(seconds: int) -> str: """초를 M:SS 형식으로 변환 (평균 시청 시간 표시용) Args: seconds: 초 단위 시간 Returns: str: M:SS 형식 문자열 - 204초 → "3:24" - 65초 → "1:05" - 45초 → "0:45" """ minutes = seconds // 60 secs = seconds % 60 return f"{minutes}:{secs:02d}" @staticmethod def _translate_country_code(code: str) -> str: """국가 코드를 한국어로 변환 ISO 3166-1 alpha-2 국가 코드를 한국어 국가명으로 변환합니다. Args: code: ISO 3166-1 alpha-2 국가 코드 (예: "KR", "US") Returns: str: 한국어 국가명 (매핑되지 않은 경우 원본 코드 반환) Example: >>> _translate_country_code("KR") "대한민국" >>> _translate_country_code("US") "미국" """ country_map = { "KR": "대한민국", "US": "미국", "JP": "일본", "CN": "중국", "GB": "영국", "DE": "독일", "FR": "프랑스", "CA": "캐나다", "AU": "호주", "IN": "인도", "ID": "인도네시아", "TH": "태국", "VN": "베트남", "PH": "필리핀", "MY": "말레이시아", "SG": "싱가포르", "TW": "대만", "HK": "홍콩", "BR": "브라질", "MX": "멕시코", } return country_map.get(code, code)