543 lines
19 KiB
Python
543 lines
19 KiB
Python
"""
|
|
YouTube Analytics 데이터 가공 프로세서
|
|
|
|
YouTube Analytics API의 원본 데이터를 프론트엔드용 Pydantic 스키마로 변환합니다.
|
|
"""
|
|
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Literal
|
|
|
|
from app.dashboard.schemas import (
|
|
AudienceData,
|
|
ContentMetric,
|
|
DailyData,
|
|
DashboardResponse,
|
|
MonthlyData,
|
|
TopContent,
|
|
)
|
|
from app.utils.logger import get_logger
|
|
|
|
logger = get_logger("dashboard")
|
|
|
|
_COUNTRY_CODE_MAP: dict[str, str] = {
|
|
"KR": "대한민국",
|
|
"US": "미국",
|
|
"JP": "일본",
|
|
"CN": "중국",
|
|
"GB": "영국",
|
|
"DE": "독일",
|
|
"FR": "프랑스",
|
|
"CA": "캐나다",
|
|
"AU": "호주",
|
|
"IN": "인도",
|
|
"ID": "인도네시아",
|
|
"TH": "태국",
|
|
"VN": "베트남",
|
|
"PH": "필리핀",
|
|
"MY": "말레이시아",
|
|
"SG": "싱가포르",
|
|
"TW": "대만",
|
|
"HK": "홍콩",
|
|
"BR": "브라질",
|
|
"MX": "멕시코",
|
|
"NL": "네덜란드",
|
|
"BE": "벨기에",
|
|
"SE": "스웨덴",
|
|
"NO": "노르웨이",
|
|
"FI": "핀란드",
|
|
"DK": "덴마크",
|
|
"IE": "아일랜드",
|
|
"PL": "폴란드",
|
|
"CZ": "체코",
|
|
"RO": "루마니아",
|
|
"HU": "헝가리",
|
|
"SK": "슬로바키아",
|
|
"SI": "슬로베니아",
|
|
"HR": "크로아티아",
|
|
"GR": "그리스",
|
|
"PT": "포르투갈",
|
|
"ES": "스페인",
|
|
"IT": "이탈리아",
|
|
}
|
|
|
|
class DataProcessor:
|
|
"""YouTube Analytics 데이터 가공 프로세서
|
|
|
|
YouTube Analytics API의 원본 JSON 데이터를 DashboardResponse 스키마로 변환합니다.
|
|
각 섹션별로 데이터 가공 로직을 분리하여 유지보수성을 향상시켰습니다.
|
|
"""
|
|
|
|
def process(
|
|
self,
|
|
raw_data: dict[str, Any],
|
|
top_content: list[TopContent],
|
|
period_video_count: int = 0,
|
|
mode: Literal["day", "month"] = "month",
|
|
end_date: str = "",
|
|
) -> DashboardResponse:
|
|
"""YouTube Analytics API 원본 데이터를 DashboardResponse로 변환
|
|
|
|
Args:
|
|
raw_data: YouTube Analytics API 응답 데이터 (mode에 따라 키 구성 다름)
|
|
공통:
|
|
- kpi: KPI 메트릭 (조회수, 좋아요, 댓글, 시청시간 등)
|
|
- top_videos: 인기 영상 데이터
|
|
- demographics: 연령/성별 데이터
|
|
- region: 지역별 데이터
|
|
mode="month" 추가:
|
|
- trend_recent: 최근 12개월 월별 조회수
|
|
- trend_previous: 이전 12개월 월별 조회수
|
|
mode="day" 추가:
|
|
- trend_recent: 최근 30일 일별 조회수
|
|
- trend_previous: 이전 30일 일별 조회수
|
|
top_content: TopContent 리스트 (라우터에서 Analytics + DB lookup으로 생성)
|
|
period_video_count: 조회 기간 내 업로드된 영상 수 (DB에서 집계)
|
|
mode: 조회 모드 ("month" | "day")
|
|
|
|
Returns:
|
|
DashboardResponse: 프론트엔드용 대시보드 응답 스키마
|
|
- mode="month": monthly_data 채움, daily_data=[]
|
|
- mode="day": daily_data 채움, monthly_data=[]
|
|
|
|
Example:
|
|
>>> processor = DataProcessor()
|
|
>>> response = processor.process(
|
|
... raw_data={
|
|
... "kpi": {...},
|
|
... "monthly_recent": {...},
|
|
... "monthly_previous": {...},
|
|
... "top_videos": {...},
|
|
... "demographics": {...},
|
|
... "region": {...},
|
|
... },
|
|
... top_content=[TopContent(...)],
|
|
... mode="month",
|
|
... )
|
|
"""
|
|
logger.debug(
|
|
f"[DataProcessor.process] START - "
|
|
f"top_content_count={len(top_content)}"
|
|
)
|
|
|
|
# 각 섹션별 데이터 가공 (안전한 딕셔너리 접근)
|
|
content_metrics = self._build_content_metrics(
|
|
raw_data.get("kpi", {}),
|
|
raw_data.get("kpi_previous", {}),
|
|
period_video_count,
|
|
)
|
|
|
|
if mode == "month":
|
|
monthly_data = self._merge_monthly_data(
|
|
raw_data.get("trend_recent", {}),
|
|
raw_data.get("trend_previous", {}),
|
|
end_date=end_date,
|
|
)
|
|
daily_data: list[DailyData] = []
|
|
else: # mode == "day"
|
|
daily_data = self._build_daily_data(
|
|
raw_data.get("trend_recent", {}),
|
|
raw_data.get("trend_previous", {}),
|
|
end_date=end_date,
|
|
)
|
|
monthly_data = []
|
|
|
|
audience_data = self._build_audience_data(
|
|
raw_data.get("demographics") or {},
|
|
raw_data.get("region") or {},
|
|
)
|
|
logger.debug(
|
|
f"[DataProcessor.process] SUCCESS - "
|
|
f"mode={mode}, metrics={len(content_metrics)}, "
|
|
f"top_content={len(top_content)}"
|
|
)
|
|
|
|
return DashboardResponse(
|
|
content_metrics=content_metrics,
|
|
monthly_data=monthly_data,
|
|
daily_data=daily_data,
|
|
top_content=top_content,
|
|
audience_data=audience_data,
|
|
)
|
|
|
|
def _build_content_metrics(
|
|
self,
|
|
kpi_data: dict[str, Any],
|
|
kpi_previous_data: dict[str, Any],
|
|
period_video_count: int = 0,
|
|
) -> list[ContentMetric]:
|
|
"""KPI 데이터를 ContentMetric 리스트로 변환
|
|
|
|
Args:
|
|
kpi_data: 최근 기간 KPI 응답
|
|
rows[0] = [views, likes, comments, shares,
|
|
estimatedMinutesWatched, averageViewDuration,
|
|
subscribersGained]
|
|
kpi_previous_data: 이전 기간 KPI 응답 (증감률 계산용)
|
|
period_video_count: 조회 기간 내 업로드된 영상 수 (DB에서 집계)
|
|
|
|
Returns:
|
|
list[ContentMetric]: KPI 지표 카드 리스트 (8개)
|
|
순서: 조회수, 시청시간, 평균 시청시간, 신규 구독자, 좋아요, 댓글, 공유, 업로드 영상
|
|
"""
|
|
logger.info(
|
|
f"[DataProcessor._build_content_metrics] START - "
|
|
f"kpi_keys={list(kpi_data.keys())}"
|
|
)
|
|
|
|
rows = kpi_data.get("rows", [])
|
|
if not rows or not rows[0]:
|
|
logger.warning(
|
|
f"[DataProcessor._build_content_metrics] NO_DATA - " f"rows={rows}"
|
|
)
|
|
return []
|
|
|
|
row = rows[0]
|
|
prev_rows = kpi_previous_data.get("rows", [])
|
|
prev_row = prev_rows[0] if prev_rows else []
|
|
|
|
def _get(r: list, i: int, default: float = 0.0) -> float:
|
|
return r[i] if len(r) > i else default
|
|
|
|
def _trend(recent: float, previous: float) -> tuple[float, str]:
|
|
pct = recent - previous
|
|
if pct > 0:
|
|
direction = "up"
|
|
elif pct < 0:
|
|
direction = "down"
|
|
else:
|
|
direction = "-"
|
|
return pct, direction
|
|
|
|
# 최근 기간
|
|
views = _get(row, 0)
|
|
likes = _get(row, 1)
|
|
comments = _get(row, 2)
|
|
shares = _get(row, 3)
|
|
estimated_minutes_watched = _get(row, 4)
|
|
average_view_duration = _get(row, 5)
|
|
subscribers_gained = _get(row, 6)
|
|
|
|
# 이전 기간
|
|
prev_views = _get(prev_row, 0)
|
|
prev_likes = _get(prev_row, 1)
|
|
prev_comments = _get(prev_row, 2)
|
|
prev_shares = _get(prev_row, 3)
|
|
prev_minutes_watched = _get(prev_row, 4)
|
|
prev_avg_duration = _get(prev_row, 5)
|
|
prev_subscribers = _get(prev_row, 6)
|
|
|
|
views_trend, views_dir = _trend(views, prev_views)
|
|
watch_trend, watch_dir = _trend(estimated_minutes_watched, prev_minutes_watched)
|
|
duration_trend, duration_dir = _trend(average_view_duration, prev_avg_duration)
|
|
subs_trend, subs_dir = _trend(subscribers_gained, prev_subscribers)
|
|
likes_trend, likes_dir = _trend(likes, prev_likes)
|
|
comments_trend, comments_dir = _trend(comments, prev_comments)
|
|
shares_trend, shares_dir = _trend(shares, prev_shares)
|
|
|
|
logger.info(
|
|
f"[DataProcessor._build_content_metrics] SUCCESS - "
|
|
f"views={views}({views_trend:+.1f}), "
|
|
f"watch_time={estimated_minutes_watched}min({watch_trend:+.1f}), "
|
|
f"subscribers={subscribers_gained}({subs_trend:+.1f})"
|
|
)
|
|
|
|
return [
|
|
ContentMetric(
|
|
id="total-views",
|
|
label="조회수",
|
|
value=float(views),
|
|
unit="count",
|
|
trend=round(float(views_trend), 1),
|
|
trend_direction=views_dir,
|
|
),
|
|
ContentMetric(
|
|
id="total-watch-time",
|
|
label="시청시간",
|
|
value=round(estimated_minutes_watched / 60, 1),
|
|
unit="hours",
|
|
trend=round(watch_trend / 60, 1),
|
|
trend_direction=watch_dir,
|
|
),
|
|
ContentMetric(
|
|
id="avg-view-duration",
|
|
label="평균 시청시간",
|
|
value=round(average_view_duration / 60, 1),
|
|
unit="minutes",
|
|
trend=round(duration_trend / 60, 1),
|
|
trend_direction=duration_dir,
|
|
),
|
|
ContentMetric(
|
|
id="new-subscribers",
|
|
label="신규 구독자",
|
|
value=float(subscribers_gained),
|
|
unit="count",
|
|
trend=subs_trend,
|
|
trend_direction=subs_dir,
|
|
),
|
|
ContentMetric(
|
|
id="likes",
|
|
label="좋아요",
|
|
value=float(likes),
|
|
unit="count",
|
|
trend=likes_trend,
|
|
trend_direction=likes_dir,
|
|
),
|
|
ContentMetric(
|
|
id="comments",
|
|
label="댓글",
|
|
value=float(comments),
|
|
unit="count",
|
|
trend=comments_trend,
|
|
trend_direction=comments_dir,
|
|
),
|
|
ContentMetric(
|
|
id="shares",
|
|
label="공유",
|
|
value=float(shares),
|
|
unit="count",
|
|
trend=shares_trend,
|
|
trend_direction=shares_dir,
|
|
),
|
|
ContentMetric(
|
|
id="uploaded-videos",
|
|
label="업로드 영상",
|
|
value=float(period_video_count),
|
|
unit="count",
|
|
trend=0.0,
|
|
trend_direction="-",
|
|
),
|
|
]
|
|
|
|
def _merge_monthly_data(
|
|
self,
|
|
data_recent: dict[str, Any],
|
|
data_previous: dict[str, Any],
|
|
end_date: str = "",
|
|
) -> list[MonthlyData]:
|
|
"""최근 12개월과 이전 12개월의 월별 데이터를 병합
|
|
|
|
end_date 기준 12개월을 명시 생성하여 API가 반환하지 않은 월(당월 등)도 0으로 포함합니다.
|
|
|
|
Args:
|
|
data_recent: 최근 12개월 월별 조회수 데이터
|
|
rows = [["2026-01", 150000], ["2026-02", 180000], ...]
|
|
data_previous: 이전 12개월 월별 조회수 데이터
|
|
rows = [["2025-01", 120000], ["2025-02", 140000], ...]
|
|
end_date: 기준 종료일 (YYYY-MM-DD). 미전달 시 오늘 사용
|
|
|
|
Returns:
|
|
list[MonthlyData]: 월별 비교 데이터 (12개, API 미반환 월은 0)
|
|
"""
|
|
logger.debug("[DataProcessor._merge_monthly_data] START")
|
|
|
|
rows_recent = data_recent.get("rows", [])
|
|
rows_previous = data_previous.get("rows", [])
|
|
|
|
map_recent = {row[0]: row[1] for row in rows_recent if len(row) >= 2}
|
|
map_previous = {row[0]: row[1] for row in rows_previous if len(row) >= 2}
|
|
|
|
# end_date 기준 12개월 명시 생성 (API 미반환 당월도 0으로 포함)
|
|
if end_date:
|
|
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
|
|
else:
|
|
end_dt = datetime.today()
|
|
|
|
result = []
|
|
for i in range(11, -1, -1):
|
|
m = end_dt.month - i
|
|
y = end_dt.year
|
|
if m <= 0:
|
|
m += 12
|
|
y -= 1
|
|
month_key = f"{y}-{m:02d}"
|
|
result.append(
|
|
MonthlyData(
|
|
month=f"{m}월",
|
|
this_year=map_recent.get(month_key, 0),
|
|
last_year=map_previous.get(f"{y - 1}-{m:02d}", 0),
|
|
)
|
|
)
|
|
|
|
logger.debug(
|
|
f"[DataProcessor._merge_monthly_data] SUCCESS - count={len(result)}"
|
|
)
|
|
return result
|
|
|
|
def _build_daily_data(
|
|
self,
|
|
data_recent: dict[str, Any],
|
|
data_previous: dict[str, Any],
|
|
end_date: str = "",
|
|
num_days: int = 30,
|
|
) -> list[DailyData]:
|
|
"""최근 30일과 이전 30일의 일별 데이터를 병합
|
|
|
|
end_date 기준 num_days개 날짜를 직접 생성하여 YouTube API 응답에
|
|
해당 날짜 row가 없어도 0으로 채웁니다 (X축 누락 방지).
|
|
|
|
Args:
|
|
data_recent: 최근 30일 일별 조회수 데이터
|
|
rows = [["2026-01-20", 5000], ["2026-01-21", 6200], ...]
|
|
data_previous: 이전 30일 일별 조회수 데이터
|
|
rows = [["2025-12-21", 4500], ["2025-12-22", 5100], ...]
|
|
end_date: 최근 기간의 마지막 날 (YYYY-MM-DD). 미전달 시 rows 마지막 날 사용
|
|
num_days: 표시할 일수 (기본 30)
|
|
|
|
Returns:
|
|
list[DailyData]: 일별 비교 데이터 (num_days개, 데이터 없는 날은 0)
|
|
"""
|
|
logger.debug("[DataProcessor._build_daily_data] START")
|
|
|
|
rows_recent = data_recent.get("rows", [])
|
|
rows_previous = data_previous.get("rows", [])
|
|
|
|
# 날짜 → 조회수 맵
|
|
map_recent = {row[0]: row[1] for row in rows_recent if len(row) >= 2}
|
|
map_previous = {row[0]: row[1] for row in rows_previous if len(row) >= 2}
|
|
|
|
# end_date 결정: 전달된 값 우선, 없으면 rows 마지막 날짜 사용
|
|
if end_date:
|
|
end_dt = datetime.strptime(end_date, "%Y-%m-%d").date()
|
|
elif rows_recent:
|
|
end_dt = datetime.strptime(rows_recent[-1][0], "%Y-%m-%d").date()
|
|
else:
|
|
logger.warning(
|
|
"[DataProcessor._build_daily_data] NO_DATA - rows_recent 비어있음"
|
|
)
|
|
return []
|
|
|
|
start_dt = end_dt - timedelta(days=num_days - 1)
|
|
|
|
# 날짜 범위를 직접 생성하여 누락된 날짜도 0으로 채움
|
|
result = []
|
|
current = start_dt
|
|
while current <= end_dt:
|
|
date_str = current.strftime("%Y-%m-%d")
|
|
date_label = f"{current.month}/{current.day}"
|
|
|
|
this_views = map_recent.get(date_str, 0)
|
|
|
|
# 이전 기간: 동일 인덱스 날짜 (current - 30일)
|
|
prev_date_str = (current - timedelta(days=num_days)).strftime("%Y-%m-%d")
|
|
last_views = map_previous.get(prev_date_str, 0)
|
|
|
|
result.append(
|
|
DailyData(
|
|
date=date_label,
|
|
this_period=int(this_views),
|
|
last_period=int(last_views),
|
|
)
|
|
)
|
|
current += timedelta(days=1)
|
|
|
|
logger.debug(f"[DataProcessor._build_daily_data] SUCCESS - count={len(result)}")
|
|
return result
|
|
|
|
def _build_audience_data(
|
|
self,
|
|
demographics_data: dict[str, Any],
|
|
geography_data: dict[str, Any],
|
|
) -> AudienceData:
|
|
"""시청자 분석 데이터 생성
|
|
|
|
연령대별, 성별, 지역별 시청자 분포를 분석합니다.
|
|
|
|
Args:
|
|
demographics_data: 연령/성별 API 응답
|
|
rows = [["age18-24", "male", 45000], ["age18-24", "female", 55000], ...]
|
|
geography_data: 지역별 API 응답
|
|
rows = [["KR", 1000000], ["US", 500000], ...]
|
|
|
|
Returns:
|
|
AudienceData: 시청자 분석 데이터
|
|
- age_groups: 연령대별 비율
|
|
- gender: 성별 조회수
|
|
- top_regions: 상위 지역 (5개)
|
|
"""
|
|
logger.debug("[DataProcessor._build_audience_data] START")
|
|
|
|
# === 연령/성별 데이터 처리 ===
|
|
demo_rows = demographics_data.get("rows", [])
|
|
|
|
age_map: dict[str, float] = {}
|
|
gender_map_f: dict[str, float] = {"male": 0.0, "female": 0.0}
|
|
|
|
for row in demo_rows:
|
|
if len(row) < 3:
|
|
continue
|
|
|
|
age_group = row[0] # "age18-24"
|
|
gender = row[1] # "male" or "female"
|
|
viewer_pct = row[2] # viewerPercentage (이미 % 값, 예: 45.5)
|
|
|
|
# 연령대별 집계: 남녀 비율 합산 (age18-24 → 18-24)
|
|
age_label = age_group.replace("age", "")
|
|
age_map[age_label] = age_map.get(age_label, 0.0) + viewer_pct
|
|
|
|
# 성별 집계
|
|
if gender in gender_map_f:
|
|
gender_map_f[gender] += viewer_pct
|
|
|
|
# 연령대 5개로 통합: 13-17+18-24 → 13-24, 55-64+65- → 55+
|
|
merged_age: dict[str, float] = {
|
|
"13-24": age_map.get("13-17", 0.0) + age_map.get("18-24", 0.0),
|
|
"25-34": age_map.get("25-34", 0.0),
|
|
"35-44": age_map.get("35-44", 0.0),
|
|
"45-54": age_map.get("45-54", 0.0),
|
|
"55+": age_map.get("55-64", 0.0) + age_map.get("65-", 0.0),
|
|
}
|
|
age_groups = [
|
|
{"label": age, "percentage": int(round(pct))}
|
|
for age, pct in merged_age.items()
|
|
]
|
|
gender_map = {k: int(round(v)) for k, v in gender_map_f.items()}
|
|
|
|
# === 지역 데이터 처리 ===
|
|
geo_rows = geography_data.get("rows", [])
|
|
total_geo_views = sum(row[1] for row in geo_rows if len(row) >= 2)
|
|
|
|
merged_geo: defaultdict[str, int] = defaultdict(int)
|
|
for row in geo_rows:
|
|
if len(row) >= 2:
|
|
merged_geo[self._translate_country_code(row[0])] += row[1]
|
|
|
|
top_regions = [
|
|
{
|
|
"region": region,
|
|
"percentage": int((views / total_geo_views * 100) if total_geo_views > 0 else 0),
|
|
}
|
|
for region, views in sorted(merged_geo.items(), key=lambda x: x[1], reverse=True)[:5]
|
|
]
|
|
|
|
logger.debug(
|
|
f"[DataProcessor._build_audience_data] SUCCESS - "
|
|
f"age_groups={len(age_groups)}, regions={len(top_regions)}"
|
|
)
|
|
|
|
return AudienceData(
|
|
age_groups=age_groups,
|
|
gender=gender_map,
|
|
top_regions=top_regions,
|
|
)
|
|
|
|
@staticmethod
|
|
def _translate_country_code(code: str) -> str:
|
|
"""국가 코드를 한국어로 변환
|
|
|
|
ISO 3166-1 alpha-2 국가 코드를 한국어 국가명으로 변환합니다.
|
|
|
|
Args:
|
|
code: ISO 3166-1 alpha-2 국가 코드 (예: "KR", "US")
|
|
|
|
Returns:
|
|
str: 한국어 국가명 (매핑되지 않은 경우 원본 코드 반환)
|
|
|
|
Example:
|
|
>>> _translate_country_code("KR")
|
|
"대한민국"
|
|
>>> _translate_country_code("US")
|
|
"미국"
|
|
"""
|
|
return _COUNTRY_CODE_MAP.get(code, "기타")
|