Compare commits

..

8 Commits

27 changed files with 1732 additions and 1432 deletions

View File

@ -4,43 +4,22 @@ Dashboard API 라우터
YouTube Analytics 기반 대시보드 통계를 제공합니다.
"""
import json
import logging
from datetime import date, datetime, timedelta
from typing import Literal
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dashboard.exceptions import (
YouTubeAccountNotConnectedError,
YouTubeAccountNotFoundError,
YouTubeAccountSelectionRequiredError,
YouTubeTokenExpiredError,
)
from app.dashboard.utils.redis_cache import delete_cache_pattern
from app.dashboard.schemas import (
AudienceData,
CacheDeleteResponse,
ConnectedAccount,
ConnectedAccountsResponse,
ContentMetric,
DashboardResponse,
TopContent,
)
from app.dashboard.services import DataProcessor, YouTubeAnalyticsService
from app.dashboard.redis_cache import (
delete_cache,
delete_cache_pattern,
get_cache,
set_cache,
)
from app.dashboard.services import DashboardService
from app.database.session import get_session
from app.dashboard.models import Dashboard
from app.social.exceptions import TokenExpiredError
from app.social.services import SocialAccountService
from app.user.dependencies.auth import get_current_user
from app.user.models import SocialAccount, User
from app.user.models import User
logger = logging.getLogger(__name__)
@ -61,41 +40,8 @@ async def get_connected_accounts(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> ConnectedAccountsResponse:
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
accounts_raw = result.scalars().all()
# platform_user_id 기준
seen_platform_ids: set[str] = set()
connected = []
for acc in sorted(
accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id in seen_platform_ids:
continue
seen_platform_ids.add(acc.platform_user_id)
data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
connected.append(
ConnectedAccount(
id=acc.id,
platform=acc.platform,
platform_username=acc.platform_username,
platform_user_id=acc.platform_user_id,
channel_title=data.get("channel_title"),
connected_at=acc.connected_at,
is_active=acc.is_active,
)
)
logger.info(
f"[ACCOUNTS] YouTube 계정 목록 조회 - "
f"user_uuid={current_user.user_uuid}, count={len(connected)}"
)
service = DashboardService()
connected = await service.get_connected_accounts(current_user, session)
return ConnectedAccountsResponse(accounts=connected)
@ -142,328 +88,8 @@ async def get_dashboard_stats(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> DashboardResponse:
"""
대시보드 통계 조회
Args:
mode: 조회 모드 (day: 최근 30, month: 최근 12개월)
platform_user_id: 사용할 YouTube 채널 ID (여러 계정 연결 필수, 재연동해도 불변)
current_user: 현재 인증된 사용자
session: 데이터베이스 세션
Returns:
DashboardResponse: 대시보드 통계 데이터
Raises:
YouTubeAccountNotConnectedError: YouTube 계정이 연동되어 있지 않음
YouTubeAccountSelectionRequiredError: 여러 계정이 연결되어 있으나 계정 미선택
YouTubeAccountNotFoundError: 지정한 계정을 찾을 없음
YouTubeTokenExpiredError: YouTube 토큰 만료 (재연동 필요)
YouTubeAPIError: YouTube Analytics API 호출 실패
"""
logger.info(
f"[DASHBOARD] 통계 조회 시작 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
)
# 1. 모드별 날짜 자동 계산
today = date.today()
if mode == "day":
# 48시간 지연 적용: 오늘 기준 -2일을 end로 사용
# ex) 오늘 2/20 → end=2/18, start=1/20
end_dt = today - timedelta(days=2)
kpi_end_dt = end_dt
start_dt = end_dt - timedelta(days=29)
# 이전 30일 (YouTube API day_previous와 동일 기준)
prev_start_dt = start_dt - timedelta(days=30)
prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
period_desc = "최근 30일"
else: # mode == "month"
# 월별 차트: dimensions=month API는 YYYY-MM-01 형식 필요
# ex) 오늘 2/24 → end=2026-02-01, start=2025-03-01 → 2025-03 ~ 2026-02 (12개월)
end_dt = today.replace(day=1)
# KPI 등 집계형 API: 48시간 지연 적용하여 현재 월 전체 데이터 포함
kpi_end_dt = today - timedelta(days=2)
start_month = end_dt.month - 11
if start_month <= 0:
start_month += 12
start_year = end_dt.year - 1
else:
start_year = end_dt.year
start_dt = date(start_year, start_month, 1)
# 이전 12개월 (YouTube API previous와 동일 기준 — 1년 전)
prev_start_dt = start_dt.replace(year=start_dt.year - 1)
try:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
except ValueError: # 윤년 2/29 → 이전 연도 2/28
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
period_desc = "최근 12개월"
start_date = start_dt.strftime("%Y-%m-%d")
end_date = end_dt.strftime("%Y-%m-%d")
kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
logger.debug(
f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}"
)
# 2. YouTube 계정 연동 확인
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
social_accounts_raw = result.scalars().all()
# platform_user_id 기준으로 중복 제거 (가장 최근 연동 계정 우선)
seen_platform_ids_stats: set[str] = set()
social_accounts = []
for acc in sorted(
social_accounts_raw, key=lambda a: a.connected_at or datetime.min, reverse=True
):
if acc.platform_user_id not in seen_platform_ids_stats:
seen_platform_ids_stats.add(acc.platform_user_id)
social_accounts.append(acc)
if not social_accounts:
logger.warning(
f"[NO YOUTUBE ACCOUNT] YouTube 계정 미연동 - "
f"user_uuid={current_user.user_uuid}"
)
raise YouTubeAccountNotConnectedError()
if platform_user_id is not None:
matched = [a for a in social_accounts if a.platform_user_id == platform_user_id]
if not matched:
logger.warning(
f"[ACCOUNT NOT FOUND] 지정 계정 없음 - "
f"user_uuid={current_user.user_uuid}, platform_user_id={platform_user_id}"
)
raise YouTubeAccountNotFoundError()
social_account = matched[0]
elif len(social_accounts) == 1:
social_account = social_accounts[0]
else:
logger.warning(
f"[MULTI ACCOUNT] 계정 선택 필요 - "
f"user_uuid={current_user.user_uuid}, count={len(social_accounts)}"
)
raise YouTubeAccountSelectionRequiredError()
logger.debug(
f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}"
)
# 3. 기간 내 업로드 영상 수 조회
count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= start_dt,
Dashboard.uploaded_at < today + timedelta(days=1),
)
)
period_video_count = count_result.scalar() or 0
# 이전 기간 업로드 영상 수 조회 (trend 계산용)
prev_count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= prev_start_dt,
Dashboard.uploaded_at <= prev_kpi_end_dt,
)
)
prev_period_video_count = prev_count_result.scalar() or 0
logger.debug(
f"[3] 기간 내 업로드 영상 수 - current={period_video_count}, prev={prev_period_video_count}"
)
# 4. Redis 캐시 조회
# platform_user_id 기준 캐시 키: 재연동해도 채널 ID는 불변 → 캐시 유지됨
cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
cached_raw = await get_cache(cache_key)
if cached_raw:
try:
payload = json.loads(cached_raw)
logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
response = DashboardResponse.model_validate(payload["response"])
for metric in response.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
return response
except (json.JSONDecodeError, KeyError):
logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")
# 5. 최근 30개 업로드 영상 조회 (Analytics API 전달용)
# YouTube Analytics API 제약사항:
# - 영상 개수: 20~30개 권장 (최대 50개, 그 이상은 응답 지연 발생)
# - URL 길이: 2000자 제한 (video ID 11자 × 30개 = 330자로 안전)
result = await session.execute(
select(
Dashboard.platform_video_id,
Dashboard.title,
Dashboard.uploaded_at,
)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
)
.order_by(Dashboard.uploaded_at.desc())
.limit(30)
)
rows = result.all()
logger.debug(f"[5] 영상 조회 완료 - count={len(rows)}")
# 6. video_ids + 메타데이터 조회용 dict 구성
video_ids = []
video_lookup: dict[str, tuple[str, datetime]] = {} # {video_id: (title, uploaded_at)}
for row in rows:
platform_video_id, title, uploaded_at = row
video_ids.append(platform_video_id)
video_lookup[platform_video_id] = (title, uploaded_at)
logger.debug(
f"[6] 영상 메타데이터 구성 완료 - count={len(video_ids)}, sample={video_ids[:3]}"
)
# 6.1 업로드 영상 없음 → YouTube API 호출 없이 빈 응답 반환
if not video_ids:
logger.info(
f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - "
f"user_uuid={current_user.user_uuid}"
)
return DashboardResponse(
content_metrics=[
ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"),
ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"),
ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"),
],
monthly_data=[],
daily_data=[],
top_content=[],
audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
has_uploads=False,
)
# 7. 토큰 유효성 확인 및 자동 갱신 (만료 10분 전 갱신)
try:
access_token = await SocialAccountService().ensure_valid_token(
social_account, session
)
except TokenExpiredError:
logger.warning(
f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}"
)
raise YouTubeTokenExpiredError()
logger.debug("[7] 토큰 유효성 확인 완료")
# 8. YouTube Analytics API 호출 (7개 병렬)
youtube_service = YouTubeAnalyticsService()
raw_data = await youtube_service.fetch_all_metrics(
video_ids=video_ids,
start_date=start_date,
end_date=end_date,
kpi_end_date=kpi_end_date,
access_token=access_token,
mode=mode,
)
logger.debug("[8] YouTube Analytics API 호출 완료")
# 9. TopContent 조립 (Analytics top_videos + DB lookup)
processor = DataProcessor()
top_content_rows = raw_data.get("top_videos", {}).get("rows", [])
top_content: list[TopContent] = []
for row in top_content_rows[:4]:
if len(row) < 4:
continue
video_id, views, likes, comments = row[0], row[1], row[2], row[3]
meta = video_lookup.get(video_id)
if not meta:
continue
title, uploaded_at = meta
engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
top_content.append(
TopContent(
id=video_id,
title=title,
thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
platform="youtube",
views=int(views),
engagement=f"{engagement_rate:.1f}%",
date=uploaded_at.strftime("%Y.%m.%d"),
)
)
logger.debug(f"[9] TopContent 조립 완료 - count={len(top_content)}")
# 10. 데이터 가공 (period_video_count=0 — API 무관 DB 집계값, 캐시에 포함하지 않음)
dashboard_data = processor.process(
raw_data, top_content, 0, mode=mode, end_date=end_date
)
logger.debug("[10] 데이터 가공 완료")
# 11. Redis 캐싱 (TTL: 12시간)
# YouTube Analytics는 하루 1회 갱신 (PT 자정, 한국 시간 오후 5~8시)
# 48시간 지연된 데이터이므로 12시간 캐싱으로 API 호출 최소화
# period_video_count는 캐시에 포함하지 않음 (DB 직접 집계, API 미사용)
cache_payload = json.dumps(
{"response": json.loads(dashboard_data.model_dump_json())}
)
cache_success = await set_cache(
cache_key,
cache_payload,
ttl=43200, # 12시간
)
if cache_success:
logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
else:
logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")
# 12. 업로드 영상 수 및 trend 주입 (캐시 저장 후 — 항상 DB에서 직접 집계)
for metric in dashboard_data.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
logger.info(
f"[DASHBOARD] 통계 조회 완료 - "
f"user_uuid={current_user.user_uuid}, "
f"mode={mode}, period={period_desc}, videos={len(video_ids)}"
)
return dashboard_data
service = DashboardService()
return await service.get_stats(mode, platform_user_id, current_user, session)
@router.delete(
@ -483,7 +109,7 @@ async def get_dashboard_stats(
`dashboard:{user_uuid}:{platform_user_id}:{mode}` (mode: day 또는 month)
## 파라미터
- `user_uuid`: 특정 사용자 캐시만 삭제. 미입력 전체 삭제
- `user_uuid`: 삭제할 사용자 UUID (필수)
- `mode`: day / month / all (기본값: all)
""",
)
@ -492,33 +118,16 @@ async def delete_dashboard_cache(
default="all",
description="삭제할 캐시 모드: day, month, all(기본값, 모두 삭제)",
),
user_uuid: str | None = Query(
default=None,
description="대상 사용자 UUID. 미입력 시 전체 사용자 캐시 삭제",
user_uuid: str = Query(
description="대상 사용자 UUID",
),
) -> CacheDeleteResponse:
"""
대시보드 캐시 삭제
Args:
mode: 삭제할 캐시 모드 (day / month / all)
user_uuid: 대상 사용자 UUID (없으면 전체 삭제)
Returns:
CacheDeleteResponse: 삭제된 캐시 개수 메시지
"""
if user_uuid:
if mode == "all":
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*")
message = f"전체 캐시 삭제 완료 ({deleted}개)"
else:
cache_key = f"dashboard:{user_uuid}:{mode}"
success = await delete_cache(cache_key)
deleted = 1 if success else 0
message = f"{mode} 캐시 삭제 {'완료' if success else '실패 (키 없음)'}"
if mode == "all":
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*")
message = f"전체 캐시 삭제 완료 ({deleted}개)"
else:
deleted = await delete_cache_pattern("dashboard:*")
message = f"전체 사용자 캐시 삭제 완료 ({deleted}개)"
deleted = await delete_cache_pattern(f"dashboard:{user_uuid}:*:{mode}")
message = f"{mode} 캐시 삭제 완료 ({deleted}개)"
logger.info(
f"[CACHE DELETE] user_uuid={user_uuid or 'ALL'}, mode={mode}, deleted={deleted}"

View File

@ -113,7 +113,7 @@ class YouTubeAccountSelectionRequiredError(DashboardException):
def __init__(self):
super().__init__(
message="연결된 YouTube 계정이 여러 개입니다. social_account_id 파라미터로 사용할 계정을 선택해주세요.",
message="연결된 YouTube 계정이 여러 개입니다. platform_user_id 파라미터로 사용할 계정을 선택해주세요.",
status_code=status.HTTP_400_BAD_REQUEST,
code="YOUTUBE_ACCOUNT_SELECTION_REQUIRED",
)

View File

@ -197,35 +197,6 @@ class AudienceData(BaseModel):
)
# class PlatformMetric(BaseModel):
# """플랫폼별 메트릭 (미사용 — platform_data 기능 미구현)"""
#
# id: str
# label: str
# value: str
# unit: Optional[str] = None
# trend: float
# trend_direction: Literal["up", "down", "-"] = Field(alias="trendDirection")
#
# model_config = ConfigDict(
# alias_generator=to_camel,
# populate_by_name=True,
# )
#
#
# class PlatformData(BaseModel):
# """플랫폼별 데이터 (미사용 — platform_data 기능 미구현)"""
#
# platform: Literal["youtube", "instagram"]
# display_name: str = Field(alias="displayName")
# metrics: list[PlatformMetric]
#
# model_config = ConfigDict(
# alias_generator=to_camel,
# populate_by_name=True,
# )
class DashboardResponse(BaseModel):
"""대시보드 전체 응답
@ -255,7 +226,6 @@ class DashboardResponse(BaseModel):
top_content: list[TopContent] = Field(alias="topContent")
audience_data: AudienceData = Field(alias="audienceData")
has_uploads: bool = Field(default=True, alias="hasUploads")
# platform_data: list[PlatformData] = Field(default=[], alias="platformData") # 미사용
model_config = ConfigDict(
alias_generator=to_camel,

View File

@ -4,10 +4,12 @@ Dashboard Services
YouTube Analytics API 연동 데이터 가공 서비스를 제공합니다.
"""
from app.dashboard.services.dashboard_service import DashboardService
from app.dashboard.services.data_processor import DataProcessor
from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService
__all__ = [
"DashboardService",
"YouTubeAnalyticsService",
"DataProcessor",
]

View File

@ -0,0 +1,358 @@
"""
Dashboard Service
대시보드 비즈니스 로직을 담당합니다.
"""
import json
import logging
from datetime import date, datetime, timedelta
from typing import Literal
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dashboard.exceptions import (
YouTubeAccountNotConnectedError,
YouTubeAccountNotFoundError,
YouTubeAccountSelectionRequiredError,
YouTubeTokenExpiredError,
)
from app.dashboard.models import Dashboard
from app.dashboard.utils.redis_cache import get_cache, set_cache
from app.dashboard.schemas import (
AudienceData,
ConnectedAccount,
ContentMetric,
DashboardResponse,
TopContent,
)
from app.dashboard.services.data_processor import DataProcessor
from app.dashboard.services.youtube_analytics import YouTubeAnalyticsService
from app.social.exceptions import TokenExpiredError
from app.social.services import SocialAccountService
from app.user.models import SocialAccount, User
logger = logging.getLogger(__name__)
class DashboardService:
async def get_connected_accounts(
self,
current_user: User,
session: AsyncSession,
) -> list[ConnectedAccount]:
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
accounts_raw = result.scalars().all()
connected = []
for acc in accounts_raw:
data = acc.platform_data if isinstance(acc.platform_data, dict) else {}
connected.append(
ConnectedAccount(
id=acc.id,
platform=acc.platform,
platform_username=acc.platform_username,
platform_user_id=acc.platform_user_id,
channel_title=data.get("channel_title"),
connected_at=acc.connected_at,
is_active=acc.is_active,
)
)
logger.info(
f"[ACCOUNTS] YouTube 계정 목록 조회 - "
f"user_uuid={current_user.user_uuid}, count={len(connected)}"
)
return connected
def calculate_date_range(
self, mode: Literal["day", "month"]
) -> tuple[date, date, date, date, date, str]:
"""모드별 날짜 범위 계산. (start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc) 반환"""
today = date.today()
if mode == "day":
end_dt = today - timedelta(days=2)
kpi_end_dt = end_dt
start_dt = end_dt - timedelta(days=29)
prev_start_dt = start_dt - timedelta(days=30)
prev_kpi_end_dt = kpi_end_dt - timedelta(days=30)
period_desc = "최근 30일"
else:
end_dt = today.replace(day=1)
kpi_end_dt = today - timedelta(days=2)
start_month = end_dt.month - 11
if start_month <= 0:
start_month += 12
start_year = end_dt.year - 1
else:
start_year = end_dt.year
start_dt = date(start_year, start_month, 1)
prev_start_dt = start_dt.replace(year=start_dt.year - 1)
try:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1)
except ValueError:
prev_kpi_end_dt = kpi_end_dt.replace(year=kpi_end_dt.year - 1, day=28)
period_desc = "최근 12개월"
return start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc
async def resolve_social_account(
self,
current_user: User,
session: AsyncSession,
platform_user_id: str | None,
) -> SocialAccount:
result = await session.execute(
select(SocialAccount).where(
SocialAccount.user_uuid == current_user.user_uuid,
SocialAccount.platform == "youtube",
SocialAccount.is_active == True, # noqa: E712
)
)
social_accounts_raw = result.scalars().all()
social_accounts = list(social_accounts_raw)
if not social_accounts:
raise YouTubeAccountNotConnectedError()
if platform_user_id is not None:
matched = [a for a in social_accounts if a.platform_user_id == platform_user_id]
if not matched:
raise YouTubeAccountNotFoundError()
return matched[0]
elif len(social_accounts) == 1:
return social_accounts[0]
else:
raise YouTubeAccountSelectionRequiredError()
async def get_video_counts(
self,
current_user: User,
session: AsyncSession,
social_account: SocialAccount,
start_dt: date,
prev_start_dt: date,
prev_kpi_end_dt: date,
) -> tuple[int, int]:
today = date.today()
count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= start_dt,
Dashboard.uploaded_at < today + timedelta(days=1),
)
)
period_video_count = count_result.scalar() or 0
prev_count_result = await session.execute(
select(func.count())
.select_from(Dashboard)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
Dashboard.uploaded_at >= prev_start_dt,
Dashboard.uploaded_at <= prev_kpi_end_dt,
)
)
prev_period_video_count = prev_count_result.scalar() or 0
return period_video_count, prev_period_video_count
async def get_video_ids(
self,
current_user: User,
session: AsyncSession,
social_account: SocialAccount,
) -> tuple[list[str], dict[str, tuple[str, datetime]]]:
result = await session.execute(
select(
Dashboard.platform_video_id,
Dashboard.title,
Dashboard.uploaded_at,
)
.where(
Dashboard.user_uuid == current_user.user_uuid,
Dashboard.platform == "youtube",
Dashboard.platform_user_id == social_account.platform_user_id,
)
.order_by(Dashboard.uploaded_at.desc())
.limit(30)
)
rows = result.all()
video_ids = []
video_lookup: dict[str, tuple[str, datetime]] = {}
for row in rows:
platform_video_id, title, uploaded_at = row
video_ids.append(platform_video_id)
video_lookup[platform_video_id] = (title, uploaded_at)
return video_ids, video_lookup
def build_empty_response(self) -> DashboardResponse:
return DashboardResponse(
content_metrics=[
ContentMetric(id="total-views", label="조회수", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="total-watch-time", label="시청시간", value=0.0, unit="hours", trend=0.0, trend_direction="-"),
ContentMetric(id="avg-view-duration", label="평균 시청시간", value=0.0, unit="minutes", trend=0.0, trend_direction="-"),
ContentMetric(id="new-subscribers", label="신규 구독자", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="likes", label="좋아요", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="comments", label="댓글", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="shares", label="공유", value=0.0, unit="count", trend=0.0, trend_direction="-"),
ContentMetric(id="uploaded-videos", label="업로드 영상", value=0.0, unit="count", trend=0.0, trend_direction="-"),
],
monthly_data=[],
daily_data=[],
top_content=[],
audience_data=AudienceData(age_groups=[], gender={"male": 0, "female": 0}, top_regions=[]),
has_uploads=False,
)
def inject_video_count(
self,
response: DashboardResponse,
period_video_count: int,
prev_period_video_count: int,
) -> None:
for metric in response.content_metrics:
if metric.id == "uploaded-videos":
metric.value = float(period_video_count)
video_trend = float(period_video_count - prev_period_video_count)
metric.trend = video_trend
metric.trend_direction = "up" if video_trend > 0 else ("down" if video_trend < 0 else "-")
break
async def get_stats(
self,
mode: Literal["day", "month"],
platform_user_id: str | None,
current_user: User,
session: AsyncSession,
) -> DashboardResponse:
logger.info(
f"[DASHBOARD] 통계 조회 시작 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, platform_user_id={platform_user_id}"
)
# 1. 날짜 계산
start_dt, end_dt, kpi_end_dt, prev_start_dt, prev_kpi_end_dt, period_desc = (
self.calculate_date_range(mode)
)
start_date = start_dt.strftime("%Y-%m-%d")
end_date = end_dt.strftime("%Y-%m-%d")
kpi_end_date = kpi_end_dt.strftime("%Y-%m-%d")
logger.debug(f"[1] 날짜 계산 완료 - period={period_desc}, start={start_date}, end={end_date}")
# 2. YouTube 계정 확인
social_account = await self.resolve_social_account(current_user, session, platform_user_id)
logger.debug(f"[2] YouTube 계정 확인 완료 - platform_user_id={social_account.platform_user_id}")
# 3. 영상 수 조회
period_video_count, prev_period_video_count = await self.get_video_counts(
current_user, session, social_account, start_dt, prev_start_dt, prev_kpi_end_dt
)
logger.debug(f"[3] 영상 수 - current={period_video_count}, prev={prev_period_video_count}")
# 4. 캐시 조회
cache_key = f"dashboard:{current_user.user_uuid}:{social_account.platform_user_id}:{mode}"
cached_raw = await get_cache(cache_key)
if cached_raw:
try:
payload = json.loads(cached_raw)
logger.info(f"[CACHE HIT] 캐시 반환 - user_uuid={current_user.user_uuid}")
response = DashboardResponse.model_validate(payload["response"])
self.inject_video_count(response, period_video_count, prev_period_video_count)
return response
except (json.JSONDecodeError, KeyError):
logger.warning(f"[CACHE PARSE ERROR] 포맷 오류, 무시 - key={cache_key}")
logger.debug("[4] 캐시 MISS - YouTube API 호출 필요")
# 5. 업로드 영상 조회
video_ids, video_lookup = await self.get_video_ids(current_user, session, social_account)
logger.debug(f"[5] 영상 조회 완료 - count={len(video_ids)}")
if not video_ids:
logger.info(f"[DASHBOARD] 업로드 영상 없음, 빈 응답 반환 - user_uuid={current_user.user_uuid}")
return self.build_empty_response()
# 6. 토큰 유효성 확인
try:
access_token = await SocialAccountService().ensure_valid_token(social_account, session)
except TokenExpiredError:
logger.warning(f"[TOKEN EXPIRED] 재연동 필요 - user_uuid={current_user.user_uuid}")
raise YouTubeTokenExpiredError()
logger.debug("[6] 토큰 유효성 확인 완료")
# 7. YouTube Analytics API 호출
youtube_service = YouTubeAnalyticsService()
raw_data = await youtube_service.fetch_all_metrics(
video_ids=video_ids,
start_date=start_date,
end_date=end_date,
kpi_end_date=kpi_end_date,
access_token=access_token,
mode=mode,
)
logger.debug("[7] YouTube Analytics API 호출 완료")
# 8. TopContent 조립
processor = DataProcessor()
top_content_rows = raw_data.get("top_videos", {}).get("rows", [])
top_content: list[TopContent] = []
for row in top_content_rows[:4]:
if len(row) < 4:
continue
video_id, views, likes, comments = row[0], row[1], row[2], row[3]
meta = video_lookup.get(video_id)
if not meta:
continue
title, uploaded_at = meta
engagement_rate = ((likes + comments) / views * 100) if views > 0 else 0
top_content.append(
TopContent(
id=video_id,
title=title,
thumbnail=f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
platform="youtube",
views=int(views),
engagement=f"{engagement_rate:.1f}%",
date=uploaded_at.strftime("%Y.%m.%d"),
)
)
logger.debug(f"[8] TopContent 조립 완료 - count={len(top_content)}")
# 9. 데이터 가공
dashboard_data = processor.process(raw_data, top_content, 0, mode=mode, end_date=end_date)
logger.debug("[9] 데이터 가공 완료")
# 10. 캐시 저장
cache_payload = json.dumps({"response": dashboard_data.model_dump(mode="json")})
cache_success = await set_cache(cache_key, cache_payload, ttl=43200)
if cache_success:
logger.debug(f"[CACHE SET] 캐시 저장 성공 - key={cache_key}")
else:
logger.warning(f"[CACHE SET] 캐시 저장 실패 - key={cache_key}")
# 11. 업로드 영상 수 주입
self.inject_video_count(dashboard_data, period_video_count, prev_period_video_count)
logger.info(
f"[DASHBOARD] 통계 조회 완료 - "
f"user_uuid={current_user.user_uuid}, mode={mode}, period={period_desc}, videos={len(video_ids)}"
)
return dashboard_data

View File

@ -143,8 +143,8 @@ class DataProcessor:
monthly_data = []
audience_data = self._build_audience_data(
raw_data.get("demographics", {}),
raw_data.get("region", {}),
raw_data.get("demographics") or {},
raw_data.get("region") or {},
)
logger.debug(
f"[DataProcessor.process] SUCCESS - "

View File

@ -141,6 +141,9 @@ class YouTubeAnalyticsService:
results = await asyncio.gather(*tasks, return_exceptions=True)
# 에러 체크 (YouTubeAuthError, YouTubeQuotaExceededError는 원형 그대로 전파)
# demographics(index 5)는 YouTubeAPIError 시 None으로 허용 (YouTube 서버 간헐적 오류 대응)
OPTIONAL_INDICES = {5, 6} # demographics, region
results = list(results)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(
@ -148,6 +151,12 @@ class YouTubeAnalyticsService:
)
if isinstance(result, (YouTubeAuthError, YouTubeQuotaExceededError)):
raise result
if i in OPTIONAL_INDICES and isinstance(result, YouTubeAPIError):
logger.warning(
f"[YouTubeAnalyticsService] 선택적 API 호출 {i+1}/7 실패, None으로 처리: {result}"
)
results[i] = None
continue
raise YouTubeAPIError(f"데이터 조회 실패: {result.__class__.__name__}")
logger.debug(

View File

@ -6,6 +6,7 @@ from sqlalchemy.orm import DeclarativeBase
from app.utils.logger import get_logger
from config import db_settings
import traceback
logger = get_logger("database")
@ -171,6 +172,7 @@ async def get_background_session() -> AsyncGenerator[AsyncSession, None]:
f"error: {type(e).__name__}: {e}, "
f"duration: {(time.perf_counter() - start_time)*1000:.1f}ms"
)
logger.debug(traceback.format_exc())
raise e
finally:
total_time = time.perf_counter() - start_time

View File

@ -787,6 +787,8 @@ async def tag_images_if_not_exist(
if null_tags:
tag_datas = await autotag_images([img.img_url for img in null_tags])
print(tag_datas)
for tag, tag_data in zip(null_tags, tag_datas):
tag.img_tag = tag_data.model_dump(mode="json")

View File

@ -301,6 +301,12 @@ class MarketingIntel(Base):
comment="마케팅 인텔리전스 결과물",
)
subtitle : Mapped[dict[str, Any]] = mapped_column(
JSON,
nullable=True,
comment="자막 정보 생성 결과물",
)
created_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,

View File

@ -41,7 +41,7 @@ from app.lyric.schemas.lyric import (
LyricListItem,
LyricStatusResponse,
)
from app.lyric.worker.lyric_task import generate_lyric_background
from app.lyric.worker.lyric_task import generate_lyric_background, generate_subtitle_background
from app.utils.chatgpt_prompt import ChatgptService
from app.utils.logger import get_logger
from app.utils.pagination import PaginatedResponse, get_paginated
@ -351,7 +351,7 @@ async def generate_lyric(
# ========== Step 4: 백그라운드 태스크 스케줄링 ==========
step4_start = time.perf_counter()
logger.debug(f"[generate_lyric] Step 4: 백그라운드 태스크 스케줄링...")
orientation = request_body.orientation
background_tasks.add_task(
generate_lyric_background,
task_id=task_id,
@ -359,6 +359,12 @@ async def generate_lyric(
lyric_input_data=lyric_input_data,
lyric_id=lyric.id,
)
background_tasks.add_task(
generate_subtitle_background,
orientation = orientation,
task_id=task_id
)
step4_elapsed = (time.perf_counter() - step4_start) * 1000
logger.debug(f"[generate_lyric] Step 4 완료 ({step4_elapsed:.1f}ms)")

View File

@ -23,7 +23,7 @@ Lyric API Schemas
"""
from datetime import datetime
from typing import Optional
from typing import Optional, Literal
from pydantic import BaseModel, ConfigDict, Field
@ -42,7 +42,8 @@ class GenerateLyricRequest(BaseModel):
"region": "군산",
"detail_region_info": "군산 신흥동 말랭이 마을",
"language": "Korean",
"m_id" : 1
"m_id" : 2,
"orientation" : "vertical"
}
"""
@ -54,7 +55,8 @@ class GenerateLyricRequest(BaseModel):
"region": "군산",
"detail_region_info": "군산 신흥동 말랭이 마을",
"language": "Korean",
"m_id" : 1
"m_id" : 1,
"orientation" : "vertical"
}
}
)
@ -68,7 +70,11 @@ class GenerateLyricRequest(BaseModel):
language: str = Field(
default="Korean",
description="가사 출력 언어 (Korean, English, Chinese, Japanese, Thai, Vietnamese)",
)
),
orientation: Literal["horizontal", "vertical"] = Field(
default="vertical",
description="영상 방향 (horizontal: 가로형, vertical: 세로형)",
),
m_id : Optional[int] = Field(None, description="마케팅 인텔리전스 ID 값")

View File

@ -7,11 +7,15 @@ Lyric Background Tasks
import traceback
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError
from app.database.session import BackgroundSessionLocal
from app.home.models import Image, Project, MarketingIntel
from app.lyric.models import Lyric
from app.utils.chatgpt_prompt import ChatgptService, ChatGPTResponseError
from app.utils.subtitles import SubtitleContentsGenerator
from app.utils.creatomate import CreatomateService
from app.utils.prompts.prompts import Prompt
from app.utils.logger import get_logger
@ -158,3 +162,55 @@ async def generate_lyric_background(
elapsed = (time.perf_counter() - task_start) * 1000
logger.error(f"[generate_lyric_background] EXCEPTION - task_id: {task_id}, error: {e} ({elapsed:.1f}ms)", exc_info=True)
await _update_lyric_status(task_id, "failed", f"Error: {str(e)}", lyric_id)
async def generate_subtitle_background(
orientation: str,
task_id: str
) -> None:
logger.info(f"[generate_subtitle_background] task_id: {task_id}, {orientation}")
creatomate_service = CreatomateService(orientation=orientation)
template = await creatomate_service.get_one_template_data(creatomate_service.template_id)
pitchings = creatomate_service.extract_text_format_from_template(template)
subtitle_generator = SubtitleContentsGenerator()
async with BackgroundSessionLocal() as session:
project_result = await session.execute(
select(Project)
.where(Project.task_id == task_id)
.order_by(Project.created_at.desc())
.limit(1)
)
project = project_result.scalar_one_or_none()
marketing_result = await session.execute(
select(MarketingIntel).where(MarketingIntel.id == project.marketing_intelligence)
)
marketing_intelligence = marketing_result.scalar_one_or_none()
store_address = project.detail_region_info
customer_name = project.store_name
logger.info(f"[generate_subtitle_background] customer_name: {customer_name}, {store_address}")
generated_subtitles = await subtitle_generator.generate_subtitle_contents(
marketing_intelligence = marketing_intelligence.intel_result,
pitching_label_list = pitchings,
customer_name = customer_name,
detail_region_info = store_address,
)
pitching_output_list = generated_subtitles.pitching_results
subtitle_modifications = {pitching_output.pitching_tag : pitching_output.pitching_data for pitching_output in pitching_output_list}
logger.info(f"[generate_subtitle_background] subtitle_modifications: {subtitle_modifications}")
async with BackgroundSessionLocal() as session:
marketing_result = await session.execute(
select(MarketingIntel).where(MarketingIntel.id == project.marketing_intelligence)
)
marketing_intelligence = marketing_result.scalar_one_or_none()
marketing_intelligence.subtitle = subtitle_modifications
await session.commit()
logger.info(f"[generate_subtitle_background] task_id: {task_id} DONE")
return

View File

@ -7,7 +7,7 @@ from sqlalchemy import Connection, text
from sqlalchemy.exc import SQLAlchemyError
from app.utils.logger import get_logger
from app.lyrics.schemas.lyrics_schema import (
from app.lyric.schemas.lyrics_schema import (
AttributeData,
PromptTemplateData,
SongFormData,

View File

@ -27,7 +27,20 @@ async def autotag_images(image_url_list : list[str]) -> list[dict]: #tag_list
"motion_recommended" : list(MotionRecommended)
}for image_url in image_url_list]
image_result_tasks = [chatgpt.generate_structured_output(image_autotag_prompt, image_input_data, image_input_data['img_url'], False) for image_input_data in image_input_data_list]
image_result_list = await asyncio.gather(*image_result_tasks)
image_result_tasks = [chatgpt.generate_structured_output(image_autotag_prompt, image_input_data, image_input_data['img_url'], False, silent = True) for image_input_data in image_input_data_list]
image_result_list = await asyncio.gather(*image_result_tasks, return_exceptions=True)
MAX_RETRY = 3 # 하드코딩, 어떻게 처리할지는 나중에
for _ in range(MAX_RETRY):
failed_idx = [i for i, r in enumerate(image_result_list) if isinstance(r, Exception)]
print("Failed", failed_idx)
if not failed_idx:
break
retried = await asyncio.gather(
*[chatgpt.generate_structured_output(image_autotag_prompt, image_input_data_list[i], image_input_data_list[i]['img_url'], False, silent=True) for i in failed],
return_exceptions=True
)
for i, result in zip(failed_idx, retried):
image_result_list[i] = result
print("Failed", failed_idx)
return image_result_list

View File

@ -101,12 +101,14 @@ class ChatgptService:
prompt : Prompt,
input_data : dict,
img_url : Optional[str] = None,
img_detail_high : bool = False
img_detail_high : bool = False,
silent : bool = False
) -> BaseModel:
prompt_text = prompt.build_prompt(input_data)
prompt_text = prompt.build_prompt(input_data, silent)
logger.debug(f"[ChatgptService] Generated Prompt (length: {len(prompt_text)})")
logger.info(f"[ChatgptService] Starting GPT request with structured output with model: {prompt.prompt_model}")
if not silent:
logger.info(f"[ChatgptService] Starting GPT request with structured output with model: {prompt.prompt_model}")
# GPT API 호출
#response = await self._call_structured_output_with_response_gpt_api(prompt_text, prompt.prompt_output, prompt.prompt_model)

View File

@ -31,11 +31,13 @@ response = await creatomate.make_creatomate_call(template_id, modifications)
import copy
import time
from enum import StrEnum
from typing import Literal
import httpx
from app.utils.logger import get_logger
from app.utils.prompts.schemas.image import SpaceType,Subject,Camera,MotionRecommended,NarrativePhase
from config import apikey_settings, creatomate_settings, recovery_settings
# 로거 설정
@ -220,6 +222,28 @@ autotext_template_h_1 = {
"stroke_color": "#333333",
"stroke_width": "0.2 vmin"
}
DVST0001 = "75161273-0422-4771-adeb-816bd7263fb0"
DVST0002 = "c68cf750-bc40-485a-a2c5-3f9fe301e386"
DVST0003 = "e1fb5b00-1f02-4f63-99fa-7524b433ba47"
DHST0001 = "660be601-080a-43ea-bf0f-adcf4596fa98"
DHST0002 = "3f194cc7-464e-4581-9db2-179d42d3e40f"
DHST0003 = "f45df555-2956-4a13-9004-ead047070b3d"
DVST0001T = "fe11aeab-ff29-4bc8-9f75-c695c7e243e6"
HST_LIST = [DHST0001,DHST0002,DHST0003]
VST_LIST = [DVST0001,DVST0002,DVST0003, DVST0001T]
SCENE_TRACK = 1
AUDIO_TRACK = 2
SUBTITLE_TRACK = 3
KEYWORD_TRACK = 4
def select_template(orientation:OrientationType):
if orientation == "horizontal":
return DHST0001
elif orientation == "vertical":
return DVST0001T
else:
raise
async def get_shared_client() -> httpx.AsyncClient:
"""공유 HTTP 클라이언트를 반환합니다. 없으면 생성합니다."""
@ -264,23 +288,10 @@ class CreatomateService:
BASE_URL = "https://api.creatomate.com"
# 템플릿 설정 (config에서 가져옴)
TEMPLATE_CONFIG = {
"horizontal": {
"template_id": creatomate_settings.TEMPLATE_ID_HORIZONTAL,
"duration": creatomate_settings.TEMPLATE_DURATION_HORIZONTAL,
},
"vertical": {
"template_id": creatomate_settings.TEMPLATE_ID_VERTICAL,
"duration": creatomate_settings.TEMPLATE_DURATION_VERTICAL,
},
}
def __init__(
self,
api_key: str | None = None,
orientation: OrientationType = "vertical",
target_duration: float | None = None,
orientation: OrientationType = "vertical"
):
"""
Args:
@ -294,14 +305,7 @@ class CreatomateService:
self.orientation = orientation
# orientation에 따른 템플릿 설정 가져오기
config = self.TEMPLATE_CONFIG.get(
orientation, self.TEMPLATE_CONFIG["vertical"]
)
self.template_id = config["template_id"]
self.target_duration = (
target_duration if target_duration is not None else config["duration"]
)
self.template_id = select_template(orientation)
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
@ -398,14 +402,6 @@ class CreatomateService:
return copy.deepcopy(data)
# 하위 호환성을 위한 별칭 (deprecated)
async def get_one_template_data_async(self, template_id: str) -> dict:
"""특정 템플릿 ID로 템플릿 정보를 조회합니다.
Deprecated: get_one_template_data() 사용하세요.
"""
return await self.get_one_template_data(template_id)
def parse_template_component_name(self, template_source: list) -> dict:
"""템플릿 정보를 파싱하여 리소스 이름을 추출합니다."""
@ -440,75 +436,77 @@ class CreatomateService:
return tag_list
async def template_matching_taged_image(
def template_matching_taged_image(
self,
template_id : str,
taged_image_list : list,
address : str
template : dict,
taged_image_list : list, # [{"image_name" : str , "image_tag" : dict}]
music_url: str,
address : str,
duplicate : bool = False
) -> list:
template_data = await self.get_one_template_data(template_id)
source_elements = template_data["source"]["elements"]
source_elements = template["source"]["elements"]
template_component_data = self.parse_template_component_name(source_elements)
modifications = {}
for idx, (template_component_name, template_type) in enumerate(template_component_data.items()):
for slot_idx, (template_component_name, template_type) in enumerate(template_component_data.items()):
match template_type:
case "image":
# modifications[template_component_name] = somethingtagedimage()
image_score_list = self.calculate_image_slot_score_multi(taged_image_list, template_component_name)
maximum_idx = image_score_list.index(max(image_score_list))
if duplicate:
selected = taged_image_list[maximum_idx]
else:
selected = taged_image_list.pop(maximum_idx)
image_name = selected["image_url"]
modifications[template_component_name] =image_name
pass
case "text":
if "address_input" in template_component_name:
modifications[template_component_name] = address
# modifications["audio-music"] = music_url
async def template_connect_resource_blackbox(
self,
template_id: str,
image_url_list: list[str],
lyric: str,
music_url: str,
address: str = None
) -> dict:
"""템플릿 정보와 이미지/가사/음악 리소스를 매핑합니다.
Note:
- 이미지는 순차적으로 집어넣기
- 가사는 개행마다 텍스트 삽입
- Template에 audio-music 항목이 있어야
"""
template_data = await self.get_one_template_data(template_id)
template_component_data = self.parse_template_component_name(
template_data["source"]["elements"]
)
lyric = lyric.replace("\r", "")
lyric_splited = lyric.split("\n")
modifications = {}
for idx, (template_component_name, template_type) in enumerate(
template_component_data.items()
):
match template_type:
case "image":
modifications[template_component_name] = image_url_list[
idx % len(image_url_list)
]
case "text":
if "address_input" in template_component_name:
modifications[template_component_name] = address
modifications["audio-music"] = music_url
return modifications
def calculate_image_slot_score_multi(self, taged_image_list : list[dict], slot_name : str):
image_tag_list = [taged_image["image_tag"] for taged_image in taged_image_list]
slot_tag_dict = self.parse_slot_name_to_tag(slot_name)
image_score_list = [0] * len(image_tag_list)
for slot_tag_cate, slot_tag_item in slot_tag_dict.items():
if slot_tag_cate == "narrative_preference":
slot_tag_narrative = slot_tag_item
continue
for idx, image_tag in enumerate(image_tag_list):
if slot_tag_item.value in image_tag[slot_tag_cate]: #collect!
image_score_list[idx] += 1 / (len(image_tag) - 1)
for idx, image_tag in enumerate(image_tag_list):
image_narrative_score = image_tag["narrative_preference"][slot_tag_narrative]
image_score_list[idx] = image_score_list[idx] * image_narrative_score
return image_score_list
def parse_slot_name_to_tag(self, slot_name : str) -> dict[str, StrEnum]:
tag_list = slot_name.split("-")
space_type = SpaceType(tag_list[0])
subject = Subject(tag_list[1])
camera = Camera(tag_list[2])
motion = MotionRecommended(tag_list[3])
narrative = NarrativePhase(tag_list[4])
tag_dict = {
"space_type" : space_type,
"subject" : subject,
"camera" : camera,
"motion_recommended" : motion,
"narrative_preference" : narrative,
}
return tag_dict
def elements_connect_resource_blackbox(
self,
elements: list,
image_url_list: list[str],
lyric: str,
music_url: str,
address: str = None
) -> dict:
@ -704,14 +702,6 @@ class CreatomateService:
original_response={"last_error": str(last_error)},
)
# 하위 호환성을 위한 별칭 (deprecated)
async def make_creatomate_custom_call_async(self, source: dict) -> dict:
"""템플릿 없이 Creatomate에 커스텀 렌더링 요청을 보냅니다.
Deprecated: make_creatomate_custom_call() 사용하세요.
"""
return await self.make_creatomate_custom_call(source)
async def get_render_status(self, render_id: str) -> dict:
"""렌더링 작업의 상태를 조회합니다.
@ -735,47 +725,57 @@ class CreatomateService:
response.raise_for_status()
return response.json()
# 하위 호환성을 위한 별칭 (deprecated)
async def get_render_status_async(self, render_id: str) -> dict:
"""렌더링 작업의 상태를 조회합니다.
Deprecated: get_render_status() 사용하세요.
"""
return await self.get_render_status(render_id)
def calc_scene_duration(self, template: dict) -> float:
"""템플릿의 전체 장면 duration을 계산합니다."""
total_template_duration = 0.0
track_maximum_duration = {
SCENE_TRACK : 0,
SUBTITLE_TRACK : 0,
KEYWORD_TRACK : 0
}
for elem in template["source"]["elements"]:
try:
if elem["type"] == "audio":
if elem["track"] not in track_maximum_duration:
continue
total_template_duration += elem["duration"]
if "animations" not in elem:
continue
for animation in elem["animations"]:
assert animation["time"] == 0 # 0이 아닌 경우 확인 필요
if animation["transition"]:
total_template_duration -= animation["duration"]
if elem["time"] == 0: # elem is auto / 만약 마지막 elem이 auto인데 그 앞에 time이 있는 elem 일 시 버그 발생 확률 있음
track_maximum_duration[elem["track"]] += elem["duration"]
if "animations" not in elem:
continue
for animation in elem["animations"]:
assert animation["time"] == 0 # 0이 아닌 경우 확인 필요
if animation["transition"]:
track_maximum_duration[elem["track"]] -= animation["duration"]
else:
track_maximum_duration[elem["track"]] = max(track_maximum_duration[elem["track"]], elem["time"] + elem["duration"])
except Exception as e:
logger.error(f"[calc_scene_duration] Error processing element: {elem}, {e}")
total_template_duration = max(track_maximum_duration.values())
return total_template_duration
def extend_template_duration(self, template: dict, target_duration: float) -> dict:
"""템플릿의 duration을 target_duration으로 확장합니다."""
template["duration"] = target_duration + 0.5 # 늘린것보단 짧게
target_duration += 1 # 수동으로 직접 변경 및 테스트 필요 : 파란박스 생기는것
# template["duration"] = target_duration + 0.5 # 늘린것보단 짧게
# target_duration += 1 # 수동으로 직접 변경 및 테스트 필요 : 파란박스 생기는것
total_template_duration = self.calc_scene_duration(template)
extend_rate = target_duration / total_template_duration
new_template = copy.deepcopy(template)
for elem in new_template["source"]["elements"]:
try:
if elem["type"] == "audio":
# if elem["type"] == "audio":
# continue
if elem["track"] == AUDIO_TRACK : # audio track은 패스
continue
elem["duration"] = elem["duration"] * extend_rate
if "time" in elem:
elem["time"] = elem["time"] * extend_rate
if "duration" in elem:
elem["duration"] = elem["duration"] * extend_rate
if "animations" not in elem:
continue
for animation in elem["animations"]:
@ -816,4 +816,25 @@ class CreatomateService:
return autotext_template_v_1
case "horizontal":
return autotext_template_h_1
def extract_text_format_from_template(self, template:dict):
keyword_list = []
subtitle_list = []
for elem in template["source"]["elements"]:
try: #최상위 내 텍스트만 검사
if elem["type"] == "text":
if elem["track"] == SUBTITLE_TRACK:
subtitle_list.append(elem["name"])
elif elem["track"] == KEYWORD_TRACK:
keyword_list.append(elem["name"])
except Exception as e:
logger.error(
f"[extend_template_duration] Error processing element: {elem}, {e}"
)
try:
assert(len(keyword_list)==len(subtitle_list))
except Exception as E:
logger.error("this template does not have same amount of keyword and subtitle.")
pitching_list = keyword_list + subtitle_list
return pitching_list

View File

@ -3,6 +3,7 @@ from pydantic import BaseModel
from config import prompt_settings
from app.utils.logger import get_logger
from app.utils.prompts.schemas import *
from functools import lru_cache
logger = get_logger("prompt")
@ -30,12 +31,13 @@ class Prompt():
return prompt_template
def build_prompt(self, input_data:dict) -> str:
def build_prompt(self, input_data:dict, silent:bool = False) -> str:
verified_input = self.prompt_input_class(**input_data)
build_template = self.prompt_template
build_template = build_template.format(**verified_input.model_dump())
logger.debug(f"build_template: {build_template}")
logger.debug(f"input_data: {input_data}")
if not silent:
logger.debug(f"build_template: {build_template}")
logger.debug(f"input_data: {input_data}")
return build_template
marketing_prompt = Prompt(
@ -66,6 +68,15 @@ image_autotag_prompt = Prompt(
prompt_model = prompt_settings.IMAGE_TAG_PROMPT_MODEL
)
@lru_cache()
def create_dynamic_subtitle_prompt(length : int) -> Prompt:
prompt_template_path=os.path.join(prompt_settings.PROMPT_FOLDER_ROOT, prompt_settings.SUBTITLE_PROMPT_FILE_NAME)
prompt_input_class = SubtitlePromptInput
prompt_output_class = SubtitlePromptOutput[length]
prompt_model = prompt_settings.SUBTITLE_PROMPT_MODEL
return Prompt(prompt_template_path, prompt_input_class, prompt_output_class, prompt_model)
def reload_all_prompt():
marketing_prompt._reload_prompt()
lyric_prompt._reload_prompt()

View File

@ -1,4 +1,5 @@
from .lyric import LyricPromptInput, LyricPromptOutput
from .marketing import MarketingPromptInput, MarketingPromptOutput
from .youtube import YTUploadPromptInput, YTUploadPromptOutput
from .image import *
from .image import *
from .subtitle import SubtitlePromptInput, SubtitlePromptOutput

View File

@ -0,0 +1,31 @@
from pydantic import BaseModel, create_model, Field
from typing import List, Optional
from functools import lru_cache
# Input 정의
class SubtitlePromptInput(BaseModel):
marketing_intelligence : str = Field(..., description="마케팅 인텔리전스 정보")
pitching_tag_list_string : str = Field(..., description="필요한 피칭 레이블 리스트 stringify")
customer_name : str = Field(..., description = "마케팅 대상 사업체 이름")
detail_region_info : str = Field(..., description = "마케팅 대상 지역 상세")
#subtillecars :
# Output 정의
class PitchingOutput(BaseModel):
pitching_tag: str = Field(..., description="피칭 레이블")
pitching_data: str = Field(..., description = "피칭 내용물")
class SubtitlePromptOutput(BaseModel):
pitching_results: List[PitchingOutput] = Field(..., description = "피칭 리스트")
@classmethod
@lru_cache()
def __class_getitem__(cls, n: int):
return create_model(
cls.__name__,
pitching_results=(
List[PitchingOutput],
Field(..., min_length=n, max_length=n, description="피칭 리스트")
),
)

View File

@ -26,7 +26,7 @@ Act as a Senior Brand Strategist and Marketing Data Analyst. Your goal is to ana
### 3. target_persona
Generate a list of personas based on the following:
* **`persona`**: Provide a descriptive name and profile for the target group.
* **`persona`**: Provide a descriptive name and profile for the target group. Must be **20 characters or fewer**.
* **`age`**: Set `min_age` and `max_age` (Integer 0-100) that accurately reflects the segment.
* **`favor_target`**: List specific elements or vibes this persona prefers (e.g., "Minimalist interior", "Pet-friendly facilities").
* **`decision_trigger`**: Identify the specific "Hook" or facility that leads this persona to finalize a booking.

View File

@ -0,0 +1,87 @@
당신은 숙박 브랜드 숏폼 영상의 자막 콘텐츠를 추출하는 전문가입니다.
입력으로 주어지는 **1) 5가지 기준의 레이어 이름 리스트**와 **2) 마케팅 인텔리전스 분석 결과(JSON)**를 바탕으로, 각 레이어 이름의 의미에 정확히 1:1 매칭되는 텍스트 콘텐츠만을 추출하세요.
분석 결과에 없는 정보는 절대 지어내거나 추론하지 마세요. 오직 제공된 JSON 데이터 내에서만 텍스트를 구성해야 합니다.
---
## 1. 레이어 네이밍 규칙 해석 및 매핑 가이드
입력되는 모든 레이어 이름은 예외 없이 `<track_role>-<narrative_phase>-<content_type>-<tone>-<pair_id>` 의 5단계 구조로 되어 있습니다.
마지막의 3자리 숫자 ID(`-001`, `-002` 등)는 모든 레이어에 필수적으로 부여됩니다.
### [1] track_role (텍스트 형태)
- `subtitle`: 씬 상황을 설명하는 간결한 문장형 텍스트 (1줄 이내)
- `keyword`: 씬을 상징하고 시선을 끄는 단답형/명사형 텍스트 (1~2단어)
### [2] narrative_phase (영상 흐름)
- `intro`: 영상 도입부. 가장 시선을 끄는 정보를 배치.
- `core`: 핵심 매력이나 주요 편의 시설 어필.
- `highlight`: 세부적인 매력 포인트나 공간의 특별한 분위기 묘사.
- `outro`: 영상 마무리. 브랜드 명칭 복기 및 타겟/위치 정보 제공.
### [3] content_type (데이터 매핑 대상)
- `hook_claim` 👉 `selling_points`에서 점수가 가장 높은 1순위 소구점이나 `market_positioning.core_value`를 활용하여 가장 강력한 핵심 세일즈 포인트를 어필. (가장 강력한 셀링포인트를 의미함)
- `selling_point` 👉 `selling_points`의 `description`, `korean_category` 등을 narrative 흐름에 맞춰 순차적으로 추출.
- `brand_name` 👉 JSON의 `store_name`을 추출.
- `location_info` 👉 JSON의 `detail_region_info`를 요약.
- `target_tag` 👉 `target_persona`나 `target_keywords`에서 타겟 고객군 또는 해시태그 추출.
### [4] tone (텍스트 어조)
- `sensory`: 직관적이고 감각적인 단어 사용
- `factual`: 과장 없이 사실 정보를 담백하게 전달
- `empathic`: 고객의 상황에 공감하는 따뜻한 어조
- `aspirational`: 열망을 자극하고 기대감을 주는 느낌
### [5] pair_id (씬 묶음 식별 번호)
- 텍스트 레이어는 `subtitle`과 `keyword`가 하나의 페어(Pair)를 이뤄 하나의 씬(Scene)에서 함께 등장합니다.
- 따라서 **동일한 씬에 속하는 `subtitle`과 `keyword` 레이어는 동일한 3자리 순번 ID(예: `-001`)**를 공유합니다.
- 영상 전반적인 씬 전개 순서에 따라 **다음 씬으로 넘어갈 때마다 ID가 순차적으로 증가**합니다. (예: 씬1은 `-001`, 씬2는 `-002`, 씬3은 `-003`...)
- **중요**: ID가 달라진다는 것은 '새로운 씬' 혹은 '다른 텍스트 쌍'을 의미하므로, **ID가 바뀌면 반드시 JSON 내의 다른 소구점이나 데이터를 추출**하여 내용이 중복되지 않도록 해야 합니다.
---
## 2. 콘텐츠 추출 시 주의사항
1. 각 입력 레이어 이름 1개당 **오직 1개의 텍스트 콘텐츠**만 매핑하여 출력합니다. (레이어명 이름 자체를 수정하거나 새로 만들지 마세요.)
2. `content_type`이 `selling_point`로 동일하더라도, `narrative_phase`(core, highlight)나 `tone`이 달라지면 JSON 내의 2순위, 3순위 세일즈 포인트를 순차적으로 활용하여 내용 겹침을 방지하세요.
3. 같은 씬에 속하는(같은 ID 번호를 가진) keyword는 핵심 단어로, subtitle은 적절한 마케팅 문구가 되어야 하며, 자연스럽게 이어지는 문맥을 형성하도록 구성하세요.
4. keyword가 subtitle에 완전히 포함되는 단어가 되지 않도록 유의하세요.
5. 정보 태그가 같더라도 ID가 다르다면 중복되지 않는 새로운 텍스트를 도출해야 합니다.
6. 콘텐츠 추출 시 마케팅 인텔리전스의 내용을 그대로 사용하기보다는 paraphrase을 수행하세요.
7. keyword는 공백 포함 전각 8자 / 반각 16자내, subtitle은 전각 15자 / 반각 30자 내로 구성하세요.
---
## 3. 출력 결과 포맷 및 예시
입력된 레이어 이름 순서에 맞춰, 매핑된 텍스트 콘텐츠만 작성하세요. (반드시 intro, core, highlight, outro 등 모든 씬 단계가 명확하게 매핑되어야 합니다.)
### 입력 레이어 리스트 예시 및 출력 예시
| Layer Name | Text Content |
|---|---|
| subtitle-intro-hook_claim-aspirational-001 | 반려견과 눈치 없이 온전하게 쉬는 완벽한 휴식 |
| keyword-intro-brand_name-sensory-001 | 스테이펫 홍천 |
| subtitle-core-selling_point-empathic-002 | 우리만의 독립된 공간감이 주는 진정한 쉼 |
| keyword-core-selling_point-factual-002 | 프라이빗 독채 |
| subtitle-highlight-selling_point-sensory-003 | 탁 트인 야외 무드존과 포토 스팟의 감성 컷 |
| keyword-highlight-selling_point-factual-003 | 넓은 정원 |
| subtitle-outro-target_tag-empathic-004 | #강원도애견동반 #주말숏브레이크 |
| keyword-outro-location_info-factual-004 | 강원 홍천군 화촌면 |
# 입력
**입력 1: 레이어 이름 리스트**
{pitching_tag_list_string}
**입력 2: 마케팅 인텔리전스 JSON**
{marketing_intelligence}
**입력 3: 비즈니스 정보 **
Business Name: {customer_name}
Region Details: {detail_region_info}

32
app/utils/subtitles.py Normal file
View File

@ -0,0 +1,32 @@
import copy
import time
import json
from typing import Literal, Any
import httpx
from app.utils.logger import get_logger
from app.utils.chatgpt_prompt import ChatgptService
from app.utils.prompts.schemas import *
from app.utils.prompts.prompts import *
class SubtitleContentsGenerator():
def __init__(self):
self.chatgpt_service = ChatgptService()
async def generate_subtitle_contents(self, marketing_intelligence : dict[str, Any], pitching_label_list : list[Any], customer_name : str, detail_region_info : str) -> SubtitlePromptOutput:
dynamic_subtitle_prompt = create_dynamic_subtitle_prompt(len(pitching_label_list))
pitching_label_string = "\n".join(pitching_label_list)
marketing_intel_string = json.dumps(marketing_intelligence, ensure_ascii=False)
input_data = {
"marketing_intelligence" : marketing_intel_string ,
"pitching_tag_list_string" : pitching_label_string,
"customer_name" : customer_name,
"detail_region_info" : detail_region_info,
}
output_data = await self.chatgpt_service.generate_structured_output(dynamic_subtitle_prompt, input_data)
return output_data

View File

@ -14,6 +14,8 @@ Video API Router
"""
import json
import asyncio
from typing import Literal
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query
@ -23,10 +25,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session
from app.user.dependencies.auth import get_current_user
from app.user.models import User
from app.home.models import Image, Project
from app.home.models import Image, Project, MarketingIntel, ImageTag
from app.lyric.models import Lyric
from app.song.models import Song, SongTimestamp
from app.utils.creatomate import CreatomateService
from app.utils.subtitles import SubtitleContentsGenerator
from app.utils.logger import get_logger
from app.video.models import Video
from app.video.schemas.video_schema import (
@ -36,6 +39,7 @@ from app.video.schemas.video_schema import (
VideoRenderData,
)
from app.video.worker.video_task import download_and_upload_video_to_blob
from app.video.services.video import get_image_tags_by_task_id
from config import creatomate_settings
@ -144,6 +148,34 @@ async def generate_video(
image_urls: list[str] = []
try:
subtitle_done = False
count = 0
async with AsyncSessionLocal() as session:
project_result = await session.execute(
select(Project)
.where(Project.task_id == task_id)
.order_by(Project.created_at.desc())
.limit(1)
)
project = project_result.scalar_one_or_none()
while not subtitle_done:
async with AsyncSessionLocal() as session:
logger.info(f"[generate_video] Checking subtitle- task_id: {task_id}, count : {count}")
marketing_result = await session.execute(
select(MarketingIntel).where(MarketingIntel.id == project.marketing_intelligence)
)
marketing_intelligence = marketing_result.scalar_one_or_none()
subtitle_done = bool(marketing_intelligence.subtitle)
if subtitle_done:
logger.info(f"[generate_video] Check subtitle done task_id: {task_id}")
break
await asyncio.sleep(5)
if count > 60 :
raise Exception("subtitle 결과 생성 실패")
count += 1
# 세션을 명시적으로 열고 DB 작업 후 바로 닫음
async with AsyncSessionLocal() as session:
# ===== 순차 쿼리 실행: Project, Lyric, Song, Image =====
@ -198,6 +230,12 @@ async def generate_video(
)
project_id = project.id
store_address = project.detail_region_info
# customer_name = project.store_name
marketing_result = await session.execute(
select(MarketingIntel).where(MarketingIntel.id == project.marketing_intelligence)
)
marketing_intelligence = marketing_result.scalar_one_or_none()
# ===== 결과 처리: Lyric =====
lyric = lyric_result.scalar_one_or_none()
@ -287,51 +325,61 @@ async def generate_video(
# 2단계: 외부 API 호출 (세션 사용 안함 - 커넥션 풀 점유 없음)
# ==========================================================================
stage2_start = time.perf_counter()
try:
logger.info(
f"[generate_video] Stage 2 START - Creatomate API - task_id: {task_id}"
)
creatomate_service = CreatomateService(
orientation=orientation,
target_duration=song_duration,
orientation=orientation
)
logger.debug(
f"[generate_video] Using template_id: {creatomate_service.template_id}, duration: {creatomate_service.target_duration} (song duration: {song_duration})"
f"[generate_video] Using template_id: {creatomate_service.template_id}, (song duration: {song_duration})"
)
# 6-1. 템플릿 조회 (비동기)
template = await creatomate_service.get_one_template_data_async(
template = await creatomate_service.get_one_template_data(
creatomate_service.template_id
)
logger.debug(f"[generate_video] Template fetched - task_id: {task_id}")
# 6-2. elements에서 리소스 매핑 생성
modifications = creatomate_service.elements_connect_resource_blackbox(
elements=template["source"]["elements"],
image_url_list=image_urls,
lyric=lyrics,
music_url=music_url,
address=store_address
# modifications = creatomate_service.elements_connect_resource_blackbox(
# elements=template["source"]["elements"],
# image_url_list=image_urls,
# music_url=music_url,
# address=store_address
taged_image_list = await get_image_tags_by_task_id(task_id)
modifications = creatomate_service.template_matching_taged_image(
template = template,
taged_image_list = taged_image_list,
music_url = music_url,
address = store_address,
duplicate = True,
)
logger.debug(f"[generate_video] Modifications created - task_id: {task_id}")
subtitle_modifications = marketing_intelligence.subtitle
modifications.update(subtitle_modifications)
# 6-3. elements 수정
new_elements = creatomate_service.modify_element(
template["source"]["elements"],
modifications,
)
template["source"]["elements"] = new_elements
logger.debug(f"[generate_video] Elements modified - task_id: {task_id}")
# 6-4. duration 확장
final_template = creatomate_service.extend_template_duration(
template,
creatomate_service.target_duration,
)
logger.debug(
f"[generate_video] Duration extended to {creatomate_service.target_duration}s - task_id: {task_id}"
song_duration,
)
logger.debug(f"[generate_video] Duration extended - task_id: {task_id}")
song_timestamp_result = await session.execute(
select(SongTimestamp).where(
SongTimestamp.suno_audio_id == song.suno_audio_id
@ -339,13 +387,10 @@ async def generate_video(
)
song_timestamp_list = song_timestamp_result.scalars().all()
logger.debug(
f"[generate_video] song_timestamp_list count: {len(song_timestamp_list)}"
)
logger.debug(f"[generate_video] song_timestamp_list count: {len(song_timestamp_list)}")
for i, ts in enumerate(song_timestamp_list):
logger.debug(
f"[generate_video] timestamp[{i}]: lyric_line={ts.lyric_line}, start_time={ts.start_time}, end_time={ts.end_time}"
)
logger.debug(f"[generate_video] timestamp[{i}]: lyric_line={ts.lyric_line}, start_time={ts.start_time}, end_time={ts.end_time}")
match lyric_language:
case "English" :
@ -355,33 +400,32 @@ async def generate_video(
lyric_font = "Noto Sans"
# LYRIC AUTO 결정부
if (creatomate_settings.DEBUG_AUTO_LYRIC):
auto_text_template = creatomate_service.get_auto_text_template()
final_template["source"]["elements"].append(creatomate_service.auto_lyric(auto_text_template))
else :
text_template = creatomate_service.get_text_template()
for idx, aligned in enumerate(song_timestamp_list):
caption = creatomate_service.lining_lyric(
text_template,
idx,
aligned.lyric_line,
aligned.start_time,
aligned.end_time,
lyric_font
)
final_template["source"]["elements"].append(caption)
if (creatomate_settings.LYRIC_SUBTITLE):
if (creatomate_settings.DEBUG_AUTO_LYRIC):
auto_text_template = creatomate_service.get_auto_text_template()
final_template["source"]["elements"].append(creatomate_service.auto_lyric(auto_text_template))
else :
text_template = creatomate_service.get_text_template()
for idx, aligned in enumerate(song_timestamp_list):
caption = creatomate_service.lining_lyric(
text_template,
idx,
aligned.lyric_line,
aligned.start_time,
aligned.end_time,
lyric_font
)
final_template["source"]["elements"].append(caption)
# END - LYRIC AUTO 결정부
# logger.debug(
# f"[generate_video] final_template: {json.dumps(final_template, indent=2, ensure_ascii=False)}"
# )
# 6-5. 커스텀 렌더링 요청 (비동기)
render_response = await creatomate_service.make_creatomate_custom_call_async(
render_response = await creatomate_service.make_creatomate_custom_call(
final_template["source"],
)
logger.debug(
f"[generate_video] Creatomate API response - task_id: {task_id}, response: {render_response}"
)
logger.debug(f"[generate_video] Creatomate API response - task_id: {task_id}, response: {render_response}")
# 렌더 ID 추출
if isinstance(render_response, list) and len(render_response) > 0:
@ -402,6 +446,8 @@ async def generate_video(
logger.error(
f"[generate_video] Creatomate API EXCEPTION - task_id: {task_id}, error: {e}"
)
import traceback
logger.error(traceback.format_exc())
# 외부 API 실패 시 Video 상태를 failed로 업데이트
from app.database.session import AsyncSessionLocal
@ -521,17 +567,13 @@ async def get_video_status(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> PollingVideoResponse:
"""creatomate_render_id로 영상 생성 작업의 상태를 조회합니다.
succeeded 상태인 경우 백그라운드에서 MP4 파일을 다운로드하고
Video 테이블의 status를 completed로, result_movie_url을 업데이트합니다.
"""
logger.info(
f"[get_video_status] START - creatomate_render_id: {creatomate_render_id}"
)
try:
creatomate_service = CreatomateService()
result = await creatomate_service.get_render_status_async(creatomate_render_id)
result = await creatomate_service.get_render_status(creatomate_render_id)
logger.debug(
f"[get_video_status] Creatomate API response - creatomate_render_id: {creatomate_render_id}, status: {result.get('status')}"
)

File diff suppressed because it is too large Load Diff

View File

@ -170,7 +170,11 @@ class CreatomateSettings(BaseSettings):
)
DEBUG_AUTO_LYRIC: bool = Field(
default=False,
description="Creatomate 자동 가사 생성 기능 사용 여부",
description="Creatomate 자체 자동 가사 생성 기능 사용 여부",
)
LYRIC_SUBTITLE: bool = Field(
default=False,
description="영상 가사 표기 여부"
)
model_config = _base_config
@ -189,6 +193,10 @@ class PromptSettings(BaseSettings):
IMAGE_TAG_PROMPT_FILE_NAME : str = Field(default="yt_upload_prompt.txt")
IMAGE_TAG_PROMPT_MODEL : str = Field(default="gpt-5-mini")
SUBTITLE_PROMPT_FILE_NAME : str = Field(...)
SUBTITLE_PROMPT_MODEL : str = Field(...)
model_config = _base_config