o2o-castad-backend/app/social/worker/upload_task.py

375 lines
13 KiB
Python

"""
Social Upload Background Task
소셜 미디어 영상 업로드 백그라운드 태스크입니다.
"""
import logging
import os
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional
import aiofiles
import httpx
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from config import social_upload_settings
from app.database.session import BackgroundSessionLocal
from app.social.constants import SocialPlatform, UploadStatus
from app.social.exceptions import UploadError, UploadQuotaExceededError
from app.social.models import SocialUpload
from app.social.services import social_account_service
from app.social.uploader import get_uploader
from app.social.uploader.base import UploadMetadata
from app.user.models import SocialAccount
from app.video.models import Video
logger = logging.getLogger(__name__)
async def _update_upload_status(
    upload_id: int,
    status: UploadStatus,
    upload_progress: int = 0,
    platform_video_id: Optional[str] = None,
    platform_url: Optional[str] = None,
    error_message: Optional[str] = None,
) -> bool:
    """
    Write the current upload state to the SocialUpload row.

    The status and progress are always overwritten. The platform video ID,
    platform URL and error message are only written when a truthy value is
    passed, so previously stored values survive calls that omit them.
    ``uploaded_at`` is stamped when the status is COMPLETED.

    Args:
        upload_id: SocialUpload ID
        status: Upload status to store
        upload_progress: Upload progress (0-100)
        platform_video_id: Video ID on the platform
        platform_url: Video URL on the platform
        error_message: Error message

    Returns:
        bool: True when the row was updated and committed; False when the
        row does not exist or a SQLAlchemyError occurred.
    """
    try:
        async with BackgroundSessionLocal() as db:
            lookup = await db.execute(
                select(SocialUpload).where(SocialUpload.id == upload_id)
            )
            record = lookup.scalar_one_or_none()

            # Guard clause: nothing to update when the row is missing.
            if not record:
                logger.warning(f"[SOCIAL_UPLOAD] 업로드 레코드 없음 - upload_id: {upload_id}")
                return False

            record.status = status.value
            record.upload_progress = upload_progress

            # Optional columns are touched only when a truthy value was given.
            optional_columns = (
                ("platform_video_id", platform_video_id),
                ("platform_url", platform_url),
                ("error_message", error_message),
            )
            for column_name, new_value in optional_columns:
                if new_value:
                    setattr(record, column_name, new_value)

            if status == UploadStatus.COMPLETED:
                record.uploaded_at = datetime.now()

            await db.commit()
            logger.info(
                f"[SOCIAL_UPLOAD] 상태 업데이트 - "
                f"upload_id: {upload_id}, status: {status.value}, progress: {upload_progress}%"
            )
            return True
    except SQLAlchemyError as exc:
        logger.error(f"[SOCIAL_UPLOAD] DB 에러 - upload_id: {upload_id}, error: {exc}")
        return False
async def _download_video(video_url: str, upload_id: int) -> bytes:
    """
    Fetch the video file and return its full content.

    Issues a single GET request with a 300 second timeout and keeps the
    whole response body in memory.

    Args:
        video_url: Video URL
        upload_id: Upload ID (used for logging only)

    Returns:
        bytes: Content of the video file

    Raises:
        httpx.HTTPError: The download failed
    """
    logger.info(f"[SOCIAL_UPLOAD] 영상 다운로드 시작 - upload_id: {upload_id}")

    async with httpx.AsyncClient(timeout=300.0) as http:
        reply = await http.get(video_url)
        # Turn an error status code into an exception.
        reply.raise_for_status()

        payload = reply.content
        logger.info(
            f"[SOCIAL_UPLOAD] 영상 다운로드 완료 - "
            f"upload_id: {upload_id}, size: {len(payload)} bytes"
        )
        return payload
async def _increment_retry_count(upload_id: int) -> int:
    """
    Increment the retry counter of a SocialUpload row.

    Args:
        upload_id: SocialUpload ID

    Returns:
        int: The retry count after the increment, or 0 when the row does not
        exist or a SQLAlchemyError occurred.

    NOTE(review): returning 0 on a database failure makes the caller treat the
    upload as retryable; kept for backward compatibility, but the failure is
    now logged instead of being silently discarded.
    """
    try:
        async with BackgroundSessionLocal() as session:
            result = await session.execute(
                select(SocialUpload).where(SocialUpload.id == upload_id)
            )
            upload = result.scalar_one_or_none()
            if not upload:
                logger.warning(f"[SOCIAL_UPLOAD] 업로드 레코드 없음 - upload_id: {upload_id}")
                return 0

            # Compute the new value before committing and return that local
            # value. With SQLAlchemy's default expire_on_commit=True, reading
            # upload.retry_count after commit() would need a refresh, which an
            # AsyncSession cannot do implicitly (BackgroundSessionLocal's
            # configuration is not visible here, so do not rely on it).
            # A NULL counter is treated as 0.
            new_count = (upload.retry_count or 0) + 1
            upload.retry_count = new_count
            await session.commit()
            return new_count
    except SQLAlchemyError as e:
        logger.error(f"[SOCIAL_UPLOAD] DB 에러 - upload_id: {upload_id}, error: {e}")
        return 0
async def process_social_upload(upload_id: int) -> None:
    """
    Upload one video to its social platform as a background task.

    Loads the SocialUpload row together with its Video and SocialAccount,
    marks the upload as UPLOADING, obtains an access token through
    ``social_account_service.ensure_valid_token``, downloads the video into a
    temporary file, hands it to the platform uploader and records the outcome
    on the row: COMPLETED on success, PENDING when a retry is still allowed,
    FAILED otherwise.

    Exceptions raised while processing are logged and stored as a FAILED
    status instead of being re-raised. The temporary file is removed in all
    cases.

    Args:
        upload_id: SocialUpload ID
    """
    # Function-scope import so the module's import block stays untouched.
    import asyncio

    logger.info(f"[SOCIAL_UPLOAD] 업로드 태스크 시작 - upload_id: {upload_id}")
    temp_file_path: Optional[Path] = None

    # Strong references to fire-and-forget progress updates. Without them the
    # event loop only holds weak references and a task may be garbage
    # collected before it finishes.
    progress_tasks = set()

    async def drain_progress_updates() -> None:
        # Wait for in-flight progress writes so that none of them can land
        # after (and overwrite) the terminal status written next.
        pending = list(progress_tasks)
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

    try:
        # 1. Load the upload record
        async with BackgroundSessionLocal() as session:
            result = await session.execute(
                select(SocialUpload).where(SocialUpload.id == upload_id)
            )
            upload = result.scalar_one_or_none()
            if not upload:
                logger.error(f"[SOCIAL_UPLOAD] 업로드 레코드 없음 - upload_id: {upload_id}")
                return

            # 2. Load the Video
            video_result = await session.execute(
                select(Video).where(Video.id == upload.video_id)
            )
            video = video_result.scalar_one_or_none()
            if not video or not video.result_movie_url:
                logger.error(
                    f"[SOCIAL_UPLOAD] 영상 없음 또는 URL 없음 - "
                    f"upload_id: {upload_id}, video_id: {upload.video_id}"
                )
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.FAILED,
                    error_message="영상을 찾을 수 없거나 URL이 없습니다.",
                )
                return

            # 3. Load the SocialAccount
            account_result = await session.execute(
                select(SocialAccount).where(SocialAccount.id == upload.social_account_id)
            )
            account = account_result.scalar_one_or_none()
            if not account or not account.is_active:
                logger.error(
                    f"[SOCIAL_UPLOAD] 소셜 계정 없음 또는 비활성화 - "
                    f"upload_id: {upload_id}, account_id: {upload.social_account_id}"
                )
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.FAILED,
                    error_message="연동된 소셜 계정이 없거나 비활성화 상태입니다.",
                )
                return

            # Copy everything needed later into plain locals while the
            # session is still open, so no ORM attribute is read from a
            # detached instance afterwards.
            video_url = video.result_movie_url
            platform = SocialPlatform(upload.platform)
            social_account_id = upload.social_account_id
            upload_title = upload.title
            upload_description = upload.description
            upload_tags = upload.tags if isinstance(upload.tags, list) else None
            upload_privacy = upload.privacy_status
            upload_options = upload.platform_options

        # 4. Status update: uploading
        await _update_upload_status(
            upload_id=upload_id,
            status=UploadStatus.UPLOADING,
            upload_progress=0,
        )

        # 5. Validate / refresh the access token
        async with BackgroundSessionLocal() as session:
            # Re-query the account because the previous session is closed.
            account_result = await session.execute(
                select(SocialAccount).where(SocialAccount.id == social_account_id)
            )
            account = account_result.scalar_one_or_none()
            if not account:
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.FAILED,
                    error_message="소셜 계정을 찾을 수 없습니다.",
                )
                return
            access_token = await social_account_service.ensure_valid_token(
                account=account,
                session=session,
            )

        # 6. Download the video
        video_content = await _download_video(video_url, upload_id)

        # 7. Store it in a per-upload temporary file
        temp_dir = Path(social_upload_settings.UPLOAD_TEMP_DIR) / str(upload_id)
        temp_dir.mkdir(parents=True, exist_ok=True)
        temp_file_path = temp_dir / "video.mp4"
        async with aiofiles.open(str(temp_file_path), "wb") as f:
            await f.write(video_content)
        logger.info(
            f"[SOCIAL_UPLOAD] 임시 파일 저장 완료 - "
            f"upload_id: {upload_id}, path: {temp_file_path}"
        )

        # 8. Build the metadata
        from app.social.constants import PrivacyStatus

        metadata = UploadMetadata(
            title=upload_title,
            description=upload_description,
            tags=upload_tags,
            privacy_status=PrivacyStatus(upload_privacy),
            platform_options=upload_options,
        )

        # 9. Progress callback. The uploader calls it synchronously, so the
        #    database write is scheduled as a task on the running loop.
        def sync_progress_callback(progress: int) -> None:
            try:
                # get_running_loop() raises before the coroutine object is
                # created, so no un-awaited coroutine is left behind.
                task = asyncio.get_running_loop().create_task(
                    _update_upload_status(
                        upload_id=upload_id,
                        status=UploadStatus.UPLOADING,
                        upload_progress=progress,
                    )
                )
            except RuntimeError:
                # No running event loop in the calling context. Progress
                # reporting is best-effort, so skip this update.
                return
            progress_tasks.add(task)
            task.add_done_callback(progress_tasks.discard)

        # 10. Upload to the platform
        uploader = get_uploader(platform)
        result = await uploader.upload(
            video_path=str(temp_file_path),
            access_token=access_token,
            metadata=metadata,
            progress_callback=sync_progress_callback,
        )

        # Make sure no pending progress write can overwrite the final status.
        await drain_progress_updates()

        # 11. Record the result
        if result.success:
            await _update_upload_status(
                upload_id=upload_id,
                status=UploadStatus.COMPLETED,
                upload_progress=100,
                platform_video_id=result.platform_video_id,
                platform_url=result.platform_url,
            )
            logger.info(
                f"[SOCIAL_UPLOAD] 업로드 완료 - "
                f"upload_id: {upload_id}, "
                f"platform_video_id: {result.platform_video_id}, "
                f"url: {result.platform_url}"
            )
        else:
            retry_count = await _increment_retry_count(upload_id)
            if retry_count < social_upload_settings.UPLOAD_MAX_RETRIES:
                # A retry is still allowed: put the upload back to PENDING.
                logger.warning(
                    f"[SOCIAL_UPLOAD] 업로드 실패, 재시도 예정 - "
                    f"upload_id: {upload_id}, retry: {retry_count}"
                )
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.PENDING,
                    upload_progress=0,
                    error_message=f"업로드 실패 (재시도 {retry_count}/{social_upload_settings.UPLOAD_MAX_RETRIES}): {result.error_message}",
                )
            else:
                # Maximum number of retries exceeded.
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.FAILED,
                    error_message=f"최대 재시도 횟수 초과: {result.error_message}",
                )
                logger.error(
                    f"[SOCIAL_UPLOAD] 업로드 최종 실패 - "
                    f"upload_id: {upload_id}, error: {result.error_message}"
                )
    except UploadQuotaExceededError:
        logger.error(f"[SOCIAL_UPLOAD] API 할당량 초과 - upload_id: {upload_id}")
        await drain_progress_updates()
        await _update_upload_status(
            upload_id=upload_id,
            status=UploadStatus.FAILED,
            error_message="플랫폼 API 일일 할당량이 초과되었습니다. 내일 다시 시도해주세요.",
        )
    except Exception as e:
        # Top-level boundary of the background task: keep the traceback in
        # the log and persist the failure instead of propagating it.
        logger.exception(
            f"[SOCIAL_UPLOAD] 예상치 못한 에러 - "
            f"upload_id: {upload_id}, error: {e}"
        )
        await drain_progress_updates()
        await _update_upload_status(
            upload_id=upload_id,
            status=UploadStatus.FAILED,
            error_message=f"업로드 중 에러 발생: {str(e)}",
        )
    finally:
        # Clean up the temporary file and its per-upload directory. The
        # directory is removed even when the file itself was never created.
        if temp_file_path is not None:
            try:
                if temp_file_path.exists():
                    temp_file_path.unlink()
                    logger.debug(f"[SOCIAL_UPLOAD] 임시 파일 삭제 - path: {temp_file_path}")
                if temp_file_path.parent.exists():
                    temp_file_path.parent.rmdir()
            except OSError as e:
                logger.warning(f"[SOCIAL_UPLOAD] 임시 파일 삭제 실패 - error: {e}")