o2o-castad-backend/app/social/uploader/youtube.py

421 lines
14 KiB
Python

"""
YouTube Uploader
YouTube Data API v3를 사용한 영상 업로더입니다.
Resumable Upload를 지원합니다.
"""
import json
import logging
import os
from typing import Any, Callable, Optional
import httpx
from config import social_upload_settings
from app.social.constants import PrivacyStatus, SocialPlatform
from app.social.exceptions import UploadError, UploadQuotaExceededError
from app.social.uploader.base import BaseSocialUploader, UploadMetadata, UploadResult
logger = logging.getLogger(__name__)
class YouTubeUploader(BaseSocialUploader):
"""
YouTube 영상 업로더
YouTube Data API v3의 Resumable Upload를 사용하여
대용량 영상을 안정적으로 업로드합니다.
"""
platform = SocialPlatform.YOUTUBE
# YouTube API 엔드포인트
UPLOAD_URL = "https://www.googleapis.com/upload/youtube/v3/videos"
VIDEOS_URL = "https://www.googleapis.com/youtube/v3/videos"
# 청크 크기 (5MB - YouTube 권장)
CHUNK_SIZE = 5 * 1024 * 1024
def __init__(self) -> None:
self.timeout = social_upload_settings.UPLOAD_TIMEOUT_SECONDS
async def upload(
self,
video_path: str,
access_token: str,
metadata: UploadMetadata,
progress_callback: Optional[Callable[[int], None]] = None,
) -> UploadResult:
"""
YouTube에 영상 업로드 (Resumable Upload)
Args:
video_path: 업로드할 영상 파일 경로
access_token: OAuth 액세스 토큰
metadata: 업로드 메타데이터
progress_callback: 진행률 콜백 함수 (0-100)
Returns:
UploadResult: 업로드 결과
"""
logger.info(f"[YOUTUBE_UPLOAD] 업로드 시작 - video_path: {video_path}")
# 1. 메타데이터 유효성 검증
self.validate_metadata(metadata)
# 2. 파일 크기 확인
if not os.path.exists(video_path):
logger.error(f"[YOUTUBE_UPLOAD] 파일 없음 - path: {video_path}")
return UploadResult(
success=False,
error_message=f"파일을 찾을 수 없습니다: {video_path}",
)
file_size = os.path.getsize(video_path)
logger.info(f"[YOUTUBE_UPLOAD] 파일 크기: {file_size / (1024*1024):.2f} MB")
try:
# 3. Resumable upload 세션 시작
upload_url = await self._init_resumable_upload(
access_token=access_token,
metadata=metadata,
file_size=file_size,
)
# 4. 파일 업로드
video_id = await self._upload_file(
upload_url=upload_url,
video_path=video_path,
file_size=file_size,
progress_callback=progress_callback,
)
video_url = self.get_video_url(video_id)
logger.info(
f"[YOUTUBE_UPLOAD] 업로드 성공 - video_id: {video_id}, url: {video_url}"
)
return UploadResult(
success=True,
platform_video_id=video_id,
platform_url=video_url,
)
except UploadQuotaExceededError:
raise
except UploadError as e:
logger.error(f"[YOUTUBE_UPLOAD] 업로드 실패 - error: {e}")
return UploadResult(
success=False,
error_message=str(e),
)
except Exception as e:
logger.error(f"[YOUTUBE_UPLOAD] 예상치 못한 에러 - error: {e}")
return UploadResult(
success=False,
error_message=f"업로드 중 에러 발생: {str(e)}",
)
async def _init_resumable_upload(
self,
access_token: str,
metadata: UploadMetadata,
file_size: int,
) -> str:
"""
Resumable upload 세션 시작
Args:
access_token: OAuth 액세스 토큰
metadata: 업로드 메타데이터
file_size: 파일 크기
Returns:
str: 업로드 URL
Raises:
UploadError: 세션 시작 실패
"""
logger.debug("[YOUTUBE_UPLOAD] Resumable upload 세션 시작")
# YouTube API 요청 본문
body = {
"snippet": {
"title": metadata.title,
"description": metadata.description or "",
"tags": metadata.tags or [],
"categoryId": self._get_category_id(metadata),
},
"status": {
"privacyStatus": self._convert_privacy_status(metadata.privacy_status),
"selfDeclaredMadeForKids": False,
},
}
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json; charset=utf-8",
"X-Upload-Content-Type": "video/*",
"X-Upload-Content-Length": str(file_size),
}
params = {
"uploadType": "resumable",
"part": "snippet,status",
}
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
self.UPLOAD_URL,
params=params,
headers=headers,
json=body,
)
if response.status_code == 200:
upload_url = response.headers.get("location")
if upload_url:
logger.debug(
f"[YOUTUBE_UPLOAD] 세션 시작 성공 - upload_url: {upload_url[:50]}..."
)
return upload_url
# 에러 처리
error_data = response.json() if response.content else {}
error_reason = (
error_data.get("error", {}).get("errors", [{}])[0].get("reason", "")
)
if error_reason == "quotaExceeded":
logger.error("[YOUTUBE_UPLOAD] API 할당량 초과")
raise UploadQuotaExceededError(platform=self.platform.value)
error_message = error_data.get("error", {}).get(
"message", f"HTTP {response.status_code}"
)
logger.error(f"[YOUTUBE_UPLOAD] 세션 시작 실패 - error: {error_message}")
raise UploadError(
platform=self.platform.value,
detail=f"Resumable upload 세션 시작 실패: {error_message}",
)
async def _upload_file(
self,
upload_url: str,
video_path: str,
file_size: int,
progress_callback: Optional[Callable[[int], None]] = None,
) -> str:
"""
파일 청크 업로드
Args:
upload_url: Resumable upload URL
video_path: 영상 파일 경로
file_size: 파일 크기
progress_callback: 진행률 콜백
Returns:
str: YouTube 영상 ID
Raises:
UploadError: 업로드 실패
"""
uploaded_bytes = 0
async with httpx.AsyncClient(timeout=self.timeout) as client:
with open(video_path, "rb") as video_file:
while uploaded_bytes < file_size:
# 청크 읽기
chunk = video_file.read(self.CHUNK_SIZE)
chunk_size = len(chunk)
end_byte = uploaded_bytes + chunk_size - 1
headers = {
"Content-Type": "video/*",
"Content-Length": str(chunk_size),
"Content-Range": f"bytes {uploaded_bytes}-{end_byte}/{file_size}",
}
response = await client.put(
upload_url,
headers=headers,
content=chunk,
)
if response.status_code == 200 or response.status_code == 201:
# 업로드 완료
result = response.json()
video_id = result.get("id")
if video_id:
return video_id
raise UploadError(
platform=self.platform.value,
detail="응답에서 video ID를 찾을 수 없습니다.",
)
elif response.status_code == 308:
# 청크 업로드 성공, 계속 진행
uploaded_bytes += chunk_size
progress = int((uploaded_bytes / file_size) * 100)
if progress_callback:
progress_callback(progress)
logger.debug(
f"[YOUTUBE_UPLOAD] 청크 업로드 완료 - "
f"progress: {progress}%, "
f"uploaded: {uploaded_bytes}/{file_size}"
)
else:
# 에러
error_data = response.json() if response.content else {}
error_message = error_data.get("error", {}).get(
"message", f"HTTP {response.status_code}"
)
logger.error(
f"[YOUTUBE_UPLOAD] 청크 업로드 실패 - error: {error_message}"
)
raise UploadError(
platform=self.platform.value,
detail=f"청크 업로드 실패: {error_message}",
)
raise UploadError(
platform=self.platform.value,
detail="업로드가 완료되지 않았습니다.",
)
async def get_upload_status(
self,
platform_video_id: str,
access_token: str,
) -> dict[str, Any]:
"""
업로드 상태 조회
Args:
platform_video_id: YouTube 영상 ID
access_token: OAuth 액세스 토큰
Returns:
dict: 업로드 상태 정보
"""
logger.info(f"[YOUTUBE_UPLOAD] 상태 조회 - video_id: {platform_video_id}")
headers = {"Authorization": f"Bearer {access_token}"}
params = {
"part": "status,processingDetails",
"id": platform_video_id,
}
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
self.VIDEOS_URL,
headers=headers,
params=params,
)
if response.status_code == 200:
data = response.json()
items = data.get("items", [])
if items:
item = items[0]
status = item.get("status", {})
processing = item.get("processingDetails", {})
return {
"upload_status": status.get("uploadStatus"),
"privacy_status": status.get("privacyStatus"),
"processing_status": processing.get(
"processingStatus", "processing"
),
"processing_progress": processing.get(
"processingProgress", {}
),
}
return {"error": "영상을 찾을 수 없습니다."}
return {"error": f"상태 조회 실패: HTTP {response.status_code}"}
async def delete_video(
self,
platform_video_id: str,
access_token: str,
) -> bool:
"""
업로드된 영상 삭제
Args:
platform_video_id: YouTube 영상 ID
access_token: OAuth 액세스 토큰
Returns:
bool: 삭제 성공 여부
"""
logger.info(f"[YOUTUBE_UPLOAD] 영상 삭제 - video_id: {platform_video_id}")
headers = {"Authorization": f"Bearer {access_token}"}
params = {"id": platform_video_id}
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.delete(
self.VIDEOS_URL,
headers=headers,
params=params,
)
if response.status_code == 204:
logger.info(f"[YOUTUBE_UPLOAD] 영상 삭제 성공 - video_id: {platform_video_id}")
return True
else:
logger.warning(
f"[YOUTUBE_UPLOAD] 영상 삭제 실패 - "
f"video_id: {platform_video_id}, status: {response.status_code}"
)
return False
def _convert_privacy_status(self, privacy_status: PrivacyStatus) -> str:
"""
PrivacyStatus를 YouTube API 형식으로 변환
Args:
privacy_status: 공개 상태
Returns:
str: YouTube API 공개 상태
"""
mapping = {
PrivacyStatus.PUBLIC: "public",
PrivacyStatus.UNLISTED: "unlisted",
PrivacyStatus.PRIVATE: "private",
}
return mapping.get(privacy_status, "private")
def _get_category_id(self, metadata: UploadMetadata) -> str:
"""
카테고리 ID 추출
platform_options에서 category_id를 추출하거나 기본값 반환
Args:
metadata: 업로드 메타데이터
Returns:
str: YouTube 카테고리 ID
"""
if metadata.platform_options and "category_id" in metadata.platform_options:
return str(metadata.platform_options["category_id"])
# 기본값: "22" (People & Blogs)
return "22"
# 싱글톤 인스턴스
youtube_uploader = YouTubeUploader()