""" YouTube Uploader YouTube Data API v3를 사용한 영상 업로더입니다. Resumable Upload를 지원합니다. """ import json import logging import os from typing import Any, Callable, Optional import httpx from config import social_upload_settings from app.social.constants import PrivacyStatus, SocialPlatform from app.social.exceptions import UploadError, UploadQuotaExceededError from app.social.uploader.base import BaseSocialUploader, UploadMetadata, UploadResult logger = logging.getLogger(__name__) class YouTubeUploader(BaseSocialUploader): """ YouTube 영상 업로더 YouTube Data API v3의 Resumable Upload를 사용하여 대용량 영상을 안정적으로 업로드합니다. """ platform = SocialPlatform.YOUTUBE # YouTube API 엔드포인트 UPLOAD_URL = "https://www.googleapis.com/upload/youtube/v3/videos" VIDEOS_URL = "https://www.googleapis.com/youtube/v3/videos" # 청크 크기 (5MB - YouTube 권장) CHUNK_SIZE = 5 * 1024 * 1024 def __init__(self) -> None: self.timeout = social_upload_settings.UPLOAD_TIMEOUT_SECONDS async def upload( self, video_path: str, access_token: str, metadata: UploadMetadata, progress_callback: Optional[Callable[[int], None]] = None, ) -> UploadResult: """ YouTube에 영상 업로드 (Resumable Upload) Args: video_path: 업로드할 영상 파일 경로 access_token: OAuth 액세스 토큰 metadata: 업로드 메타데이터 progress_callback: 진행률 콜백 함수 (0-100) Returns: UploadResult: 업로드 결과 """ logger.info(f"[YOUTUBE_UPLOAD] 업로드 시작 - video_path: {video_path}") # 1. 메타데이터 유효성 검증 self.validate_metadata(metadata) # 2. 파일 크기 확인 if not os.path.exists(video_path): logger.error(f"[YOUTUBE_UPLOAD] 파일 없음 - path: {video_path}") return UploadResult( success=False, error_message=f"파일을 찾을 수 없습니다: {video_path}", ) file_size = os.path.getsize(video_path) logger.info(f"[YOUTUBE_UPLOAD] 파일 크기: {file_size / (1024*1024):.2f} MB") try: # 3. Resumable upload 세션 시작 upload_url = await self._init_resumable_upload( access_token=access_token, metadata=metadata, file_size=file_size, ) # 4. 파일 업로드 video_id = await self._upload_file( upload_url=upload_url, video_path=video_path, file_size=file_size, progress_callback=progress_callback, ) video_url = self.get_video_url(video_id) logger.info( f"[YOUTUBE_UPLOAD] 업로드 성공 - video_id: {video_id}, url: {video_url}" ) return UploadResult( success=True, platform_video_id=video_id, platform_url=video_url, ) except UploadQuotaExceededError: raise except UploadError as e: logger.error(f"[YOUTUBE_UPLOAD] 업로드 실패 - error: {e}") return UploadResult( success=False, error_message=str(e), ) except Exception as e: logger.error(f"[YOUTUBE_UPLOAD] 예상치 못한 에러 - error: {e}") return UploadResult( success=False, error_message=f"업로드 중 에러 발생: {str(e)}", ) async def _init_resumable_upload( self, access_token: str, metadata: UploadMetadata, file_size: int, ) -> str: """ Resumable upload 세션 시작 Args: access_token: OAuth 액세스 토큰 metadata: 업로드 메타데이터 file_size: 파일 크기 Returns: str: 업로드 URL Raises: UploadError: 세션 시작 실패 """ logger.debug("[YOUTUBE_UPLOAD] Resumable upload 세션 시작") # YouTube API 요청 본문 body = { "snippet": { "title": metadata.title, "description": metadata.description or "", "tags": metadata.tags or [], "categoryId": self._get_category_id(metadata), }, "status": { "privacyStatus": self._convert_privacy_status(metadata.privacy_status), "selfDeclaredMadeForKids": False, }, } headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json; charset=utf-8", "X-Upload-Content-Type": "video/*", "X-Upload-Content-Length": str(file_size), } params = { "uploadType": "resumable", "part": "snippet,status", } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( self.UPLOAD_URL, params=params, headers=headers, json=body, ) if response.status_code == 200: upload_url = response.headers.get("location") if upload_url: logger.debug( f"[YOUTUBE_UPLOAD] 세션 시작 성공 - upload_url: {upload_url[:50]}..." ) return upload_url # 에러 처리 error_data = response.json() if response.content else {} error_reason = ( error_data.get("error", {}).get("errors", [{}])[0].get("reason", "") ) if error_reason == "quotaExceeded": logger.error("[YOUTUBE_UPLOAD] API 할당량 초과") raise UploadQuotaExceededError(platform=self.platform.value) error_message = error_data.get("error", {}).get( "message", f"HTTP {response.status_code}" ) logger.error(f"[YOUTUBE_UPLOAD] 세션 시작 실패 - error: {error_message}") raise UploadError( platform=self.platform.value, detail=f"Resumable upload 세션 시작 실패: {error_message}", ) async def _upload_file( self, upload_url: str, video_path: str, file_size: int, progress_callback: Optional[Callable[[int], None]] = None, ) -> str: """ 파일 청크 업로드 Args: upload_url: Resumable upload URL video_path: 영상 파일 경로 file_size: 파일 크기 progress_callback: 진행률 콜백 Returns: str: YouTube 영상 ID Raises: UploadError: 업로드 실패 """ uploaded_bytes = 0 async with httpx.AsyncClient(timeout=self.timeout) as client: with open(video_path, "rb") as video_file: while uploaded_bytes < file_size: # 청크 읽기 chunk = video_file.read(self.CHUNK_SIZE) chunk_size = len(chunk) end_byte = uploaded_bytes + chunk_size - 1 headers = { "Content-Type": "video/*", "Content-Length": str(chunk_size), "Content-Range": f"bytes {uploaded_bytes}-{end_byte}/{file_size}", } response = await client.put( upload_url, headers=headers, content=chunk, ) if response.status_code == 200 or response.status_code == 201: # 업로드 완료 result = response.json() video_id = result.get("id") if video_id: return video_id raise UploadError( platform=self.platform.value, detail="응답에서 video ID를 찾을 수 없습니다.", ) elif response.status_code == 308: # 청크 업로드 성공, 계속 진행 uploaded_bytes += chunk_size progress = int((uploaded_bytes / file_size) * 100) if progress_callback: progress_callback(progress) logger.debug( f"[YOUTUBE_UPLOAD] 청크 업로드 완료 - " f"progress: {progress}%, " f"uploaded: {uploaded_bytes}/{file_size}" ) else: # 에러 error_data = response.json() if response.content else {} error_message = error_data.get("error", {}).get( "message", f"HTTP {response.status_code}" ) logger.error( f"[YOUTUBE_UPLOAD] 청크 업로드 실패 - error: {error_message}" ) raise UploadError( platform=self.platform.value, detail=f"청크 업로드 실패: {error_message}", ) raise UploadError( platform=self.platform.value, detail="업로드가 완료되지 않았습니다.", ) async def get_upload_status( self, platform_video_id: str, access_token: str, ) -> dict[str, Any]: """ 업로드 상태 조회 Args: platform_video_id: YouTube 영상 ID access_token: OAuth 액세스 토큰 Returns: dict: 업로드 상태 정보 """ logger.info(f"[YOUTUBE_UPLOAD] 상태 조회 - video_id: {platform_video_id}") headers = {"Authorization": f"Bearer {access_token}"} params = { "part": "status,processingDetails", "id": platform_video_id, } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( self.VIDEOS_URL, headers=headers, params=params, ) if response.status_code == 200: data = response.json() items = data.get("items", []) if items: item = items[0] status = item.get("status", {}) processing = item.get("processingDetails", {}) return { "upload_status": status.get("uploadStatus"), "privacy_status": status.get("privacyStatus"), "processing_status": processing.get( "processingStatus", "processing" ), "processing_progress": processing.get( "processingProgress", {} ), } return {"error": "영상을 찾을 수 없습니다."} return {"error": f"상태 조회 실패: HTTP {response.status_code}"} async def delete_video( self, platform_video_id: str, access_token: str, ) -> bool: """ 업로드된 영상 삭제 Args: platform_video_id: YouTube 영상 ID access_token: OAuth 액세스 토큰 Returns: bool: 삭제 성공 여부 """ logger.info(f"[YOUTUBE_UPLOAD] 영상 삭제 - video_id: {platform_video_id}") headers = {"Authorization": f"Bearer {access_token}"} params = {"id": platform_video_id} async with httpx.AsyncClient(timeout=30.0) as client: response = await client.delete( self.VIDEOS_URL, headers=headers, params=params, ) if response.status_code == 204: logger.info(f"[YOUTUBE_UPLOAD] 영상 삭제 성공 - video_id: {platform_video_id}") return True else: logger.warning( f"[YOUTUBE_UPLOAD] 영상 삭제 실패 - " f"video_id: {platform_video_id}, status: {response.status_code}" ) return False def _convert_privacy_status(self, privacy_status: PrivacyStatus) -> str: """ PrivacyStatus를 YouTube API 형식으로 변환 Args: privacy_status: 공개 상태 Returns: str: YouTube API 공개 상태 """ mapping = { PrivacyStatus.PUBLIC: "public", PrivacyStatus.UNLISTED: "unlisted", PrivacyStatus.PRIVATE: "private", } return mapping.get(privacy_status, "private") def _get_category_id(self, metadata: UploadMetadata) -> str: """ 카테고리 ID 추출 platform_options에서 category_id를 추출하거나 기본값 반환 Args: metadata: 업로드 메타데이터 Returns: str: YouTube 카테고리 ID """ if metadata.platform_options and "category_id" in metadata.platform_options: return str(metadata.platform_options["category_id"]) # 기본값: "22" (People & Blogs) return "22" # 싱글톤 인스턴스 youtube_uploader = YouTubeUploader()