"""Social upload background task.

Background task that uploads a rendered video to a social media platform.
"""

import asyncio
import logging
import os
import tempfile
from pathlib import Path
from typing import Optional, Set

import aiofiles
import httpx
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError

from config import social_upload_settings
from app.database.session import BackgroundSessionLocal
from app.social.constants import PrivacyStatus, SocialPlatform, UploadStatus
from app.social.exceptions import TokenExpiredError, UploadError, UploadQuotaExceededError
from app.social.models import SocialUpload
from app.social.services import social_account_service
from app.social.uploader import get_uploader
from app.social.uploader.base import UploadMetadata
from app.user.models import SocialAccount
from app.utils.timezone import now
from app.video.models import Video

logger = logging.getLogger(__name__)


async def _update_upload_status(
    upload_id: int,
    status: UploadStatus,
    upload_progress: int = 0,
    platform_video_id: Optional[str] = None,
    platform_url: Optional[str] = None,
    error_message: Optional[str] = None,
) -> bool:
    """Persist the current state of an upload.

    Opens its own session, so it can be called from anywhere in the task.
    ``status`` and ``upload_progress`` are always written; the optional
    fields are only written when a truthy value is supplied, so an earlier
    value is never cleared by omission.

    Args:
        upload_id: SocialUpload ID.
        status: New upload status.
        upload_progress: Upload progress (0-100).
        platform_video_id: Video ID assigned by the platform.
        platform_url: URL of the video on the platform.
        error_message: Error message to store.

    Returns:
        bool: True if the record was found and committed, False if the
        record does not exist or a database error occurred.
    """
    try:
        async with BackgroundSessionLocal() as session:
            result = await session.execute(
                select(SocialUpload).where(SocialUpload.id == upload_id)
            )
            upload = result.scalar_one_or_none()

            if not upload:
                logger.warning(f"[SOCIAL_UPLOAD] 업로드 레코드 없음 - upload_id: {upload_id}")
                return False

            upload.status = status.value
            upload.upload_progress = upload_progress

            if platform_video_id:
                upload.platform_video_id = platform_video_id
            if platform_url:
                upload.platform_url = platform_url
            if error_message:
                upload.error_message = error_message
            if status == UploadStatus.COMPLETED:
                upload.uploaded_at = now()

            await session.commit()
            logger.info(
                f"[SOCIAL_UPLOAD] 상태 업데이트 - "
                f"upload_id: {upload_id}, status: {status.value}, progress: {upload_progress}%"
            )
            return True
    except SQLAlchemyError as e:
        logger.error(f"[SOCIAL_UPLOAD] DB 에러 - upload_id: {upload_id}, error: {e}")
        return False


async def _download_video(video_url: str, upload_id: int) -> bytes:
    """Download the video file.

    The whole response body is buffered in memory before it is returned.

    Args:
        video_url: URL of the video.
        upload_id: Upload ID (used for logging only).

    Returns:
        bytes: Content of the video file.

    Raises:
        httpx.HTTPError: The download failed or returned an error status.
    """
    logger.info(f"[SOCIAL_UPLOAD] 영상 다운로드 시작 - upload_id: {upload_id}")

    async with httpx.AsyncClient(timeout=300.0) as client:
        response = await client.get(video_url)
        response.raise_for_status()

        logger.info(
            f"[SOCIAL_UPLOAD] 영상 다운로드 완료 - "
            f"upload_id: {upload_id}, size: {len(response.content)} bytes"
        )
        return response.content


async def _increment_retry_count(upload_id: int) -> int:
    """Increment the retry counter of an upload.

    Args:
        upload_id: SocialUpload ID.

    Returns:
        int: The retry count after the increment, or 0 if the record does
        not exist or a database error occurred.
    """
    try:
        async with BackgroundSessionLocal() as session:
            result = await session.execute(
                select(SocialUpload).where(SocialUpload.id == upload_id)
            )
            upload = result.scalar_one_or_none()

            if not upload:
                return 0

            # Compute the value before committing: reading the attribute
            # afterwards may hit an expired instance, and the implicit
            # refresh is not possible from an async session.
            new_count = (upload.retry_count or 0) + 1
            upload.retry_count = new_count
            await session.commit()
            return new_count
    except SQLAlchemyError as e:
        logger.error(
            f"[SOCIAL_UPLOAD] DB 에러 (재시도 횟수 증가) - upload_id: {upload_id}, error: {e}"
        )
        return 0


async def _drain_progress_tasks(tasks: Set[asyncio.Task]) -> None:
    """Wait until every scheduled progress update has finished.

    Must run before a final status is written; otherwise a late
    ``UPLOADING`` update could commit after it and overwrite it.
    """
    if tasks:
        # Snapshot the set: done-callbacks remove entries while we wait.
        await asyncio.gather(*list(tasks), return_exceptions=True)


def _cleanup_temp_file(temp_file_path: Optional[Path]) -> None:
    """Best-effort removal of the temporary video file and its directory."""
    if temp_file_path is None:
        return

    try:
        if temp_file_path.exists():
            temp_file_path.unlink()
        # The directory is created before the file is written, so remove it
        # even when the file itself never made it to disk.
        if temp_file_path.parent.exists():
            temp_file_path.parent.rmdir()
        logger.debug(f"[SOCIAL_UPLOAD] 임시 파일 삭제 - path: {temp_file_path}")
    except OSError as e:
        logger.warning(f"[SOCIAL_UPLOAD] 임시 파일 삭제 실패 - error: {e}")


async def process_social_upload(upload_id: int) -> None:
    """Upload a video to a social media platform.

    Runs in the background. Every failure is recorded on the SocialUpload
    record through ``_update_upload_status``; no exception derived from
    ``Exception`` propagates to the caller.

    Args:
        upload_id: SocialUpload ID.
    """
    logger.info(f"[SOCIAL_UPLOAD] 업로드 태스크 시작 - upload_id: {upload_id}")

    temp_file_path: Optional[Path] = None
    # Strong references to in-flight progress updates. The event loop only
    # keeps weak references to tasks, so untracked ones may be collected.
    progress_tasks: Set[asyncio.Task] = set()

    try:
        # 1. Load the upload record.
        async with BackgroundSessionLocal() as session:
            result = await session.execute(
                select(SocialUpload).where(SocialUpload.id == upload_id)
            )
            upload = result.scalar_one_or_none()

            if not upload:
                logger.error(f"[SOCIAL_UPLOAD] 업로드 레코드 없음 - upload_id: {upload_id}")
                return

            # 2. Load the video.
            video_result = await session.execute(
                select(Video).where(Video.id == upload.video_id)
            )
            video = video_result.scalar_one_or_none()

            if not video or not video.result_movie_url:
                logger.error(
                    f"[SOCIAL_UPLOAD] 영상 없음 또는 URL 없음 - "
                    f"upload_id: {upload_id}, video_id: {upload.video_id}"
                )
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.FAILED,
                    error_message="영상을 찾을 수 없거나 URL이 없습니다.",
                )
                return

            # 3. Load the social account.
            account_result = await session.execute(
                select(SocialAccount).where(SocialAccount.id == upload.social_account_id)
            )
            account = account_result.scalar_one_or_none()

            if not account or not account.is_active:
                logger.error(
                    f"[SOCIAL_UPLOAD] 소셜 계정 없음 또는 비활성화 - "
                    f"upload_id: {upload_id}, account_id: {upload.social_account_id}"
                )
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.FAILED,
                    error_message="연동된 소셜 계정이 없거나 비활성화 상태입니다.",
                )
                return

            # Copy everything needed later into plain locals while the
            # session is still open, so no ORM instance is touched after it
            # has been detached.
            social_account_id = upload.social_account_id
            video_url = video.result_movie_url
            platform = SocialPlatform(upload.platform)
            upload_title = upload.title
            upload_description = upload.description
            upload_tags = upload.tags if isinstance(upload.tags, list) else None
            upload_privacy = upload.privacy_status
            upload_options = upload.platform_options

        # 4. Mark the upload as in progress.
        await _update_upload_status(
            upload_id=upload_id,
            status=UploadStatus.UPLOADING,
            upload_progress=0,
        )

        # 5. Validate the token, refreshing it if necessary.
        async with BackgroundSessionLocal() as session:
            # Re-load the account: the previous session has been closed.
            account_result = await session.execute(
                select(SocialAccount).where(SocialAccount.id == social_account_id)
            )
            account = account_result.scalar_one_or_none()

            if not account:
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.FAILED,
                    error_message="소셜 계정을 찾을 수 없습니다.",
                )
                return

            access_token = await social_account_service.ensure_valid_token(
                account=account,
                session=session,
            )

        # 6. Download the video.
        video_content = await _download_video(video_url, upload_id)

        # 7. Store it in a temporary file.
        temp_dir = Path(social_upload_settings.UPLOAD_TEMP_DIR) / str(upload_id)
        temp_dir.mkdir(parents=True, exist_ok=True)
        temp_file_path = temp_dir / "video.mp4"

        async with aiofiles.open(str(temp_file_path), "wb") as f:
            await f.write(video_content)
        # The bytes are on disk now; do not keep them alive during upload.
        del video_content

        logger.info(
            f"[SOCIAL_UPLOAD] 임시 파일 저장 완료 - "
            f"upload_id: {upload_id}, path: {temp_file_path}"
        )

        # 8. Prepare the metadata.
        metadata = UploadMetadata(
            title=upload_title,
            description=upload_description,
            tags=upload_tags,
            privacy_status=PrivacyStatus(upload_privacy),
            platform_options=upload_options,
        )

        # 9. Progress callback. The uploader invokes it synchronously (from
        #    inside the chunked httpx upload), so the DB write is scheduled
        #    as a task rather than awaited.
        def sync_progress_callback(progress: int) -> None:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # Not called from the event loop thread; progress reporting
                # is best-effort, so skip this update.
                logger.debug(
                    f"[SOCIAL_UPLOAD] 진행률 업데이트 생략 (실행 중인 이벤트 루프 없음) - "
                    f"upload_id: {upload_id}"
                )
                return

            task = loop.create_task(
                _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.UPLOADING,
                    upload_progress=progress,
                )
            )
            progress_tasks.add(task)
            task.add_done_callback(progress_tasks.discard)

        # 10. Upload to the platform.
        uploader = get_uploader(platform)

        try:
            upload_result = await uploader.upload(
                video_path=str(temp_file_path),
                access_token=access_token,
                metadata=metadata,
                progress_callback=sync_progress_callback,
            )
        finally:
            # Flush pending progress writes before any final status (success
            # below, or failure in the except handlers) is recorded.
            await _drain_progress_tasks(progress_tasks)

        # 11. Handle the result.
        if upload_result.success:
            await _update_upload_status(
                upload_id=upload_id,
                status=UploadStatus.COMPLETED,
                upload_progress=100,
                platform_video_id=upload_result.platform_video_id,
                platform_url=upload_result.platform_url,
            )
            logger.info(
                f"[SOCIAL_UPLOAD] 업로드 완료 - "
                f"upload_id: {upload_id}, "
                f"platform_video_id: {upload_result.platform_video_id}, "
                f"url: {upload_result.platform_url}"
            )
        else:
            retry_count = await _increment_retry_count(upload_id)

            if retry_count < social_upload_settings.UPLOAD_MAX_RETRIES:
                # Still eligible for a retry.
                logger.warning(
                    f"[SOCIAL_UPLOAD] 업로드 실패, 재시도 예정 - "
                    f"upload_id: {upload_id}, retry: {retry_count}"
                )
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.PENDING,
                    upload_progress=0,
                    error_message=f"업로드 실패 (재시도 {retry_count}/{social_upload_settings.UPLOAD_MAX_RETRIES}): {upload_result.error_message}",
                )
            else:
                # Maximum number of retries exceeded.
                await _update_upload_status(
                    upload_id=upload_id,
                    status=UploadStatus.FAILED,
                    error_message=f"최대 재시도 횟수 초과: {upload_result.error_message}",
                )
                logger.error(
                    f"[SOCIAL_UPLOAD] 업로드 최종 실패 - "
                    f"upload_id: {upload_id}, error: {upload_result.error_message}"
                )

    except UploadQuotaExceededError:
        logger.error(f"[SOCIAL_UPLOAD] API 할당량 초과 - upload_id: {upload_id}")
        await _update_upload_status(
            upload_id=upload_id,
            status=UploadStatus.FAILED,
            error_message="플랫폼 API 일일 할당량이 초과되었습니다. 내일 다시 시도해주세요.",
        )

    except TokenExpiredError as e:
        logger.error(
            f"[SOCIAL_UPLOAD] 토큰 만료, 재연동 필요 - "
            f"upload_id: {upload_id}, platform: {e.platform}"
        )
        await _update_upload_status(
            upload_id=upload_id,
            status=UploadStatus.FAILED,
            error_message=f"{e.platform} 계정 인증이 만료되었습니다. 계정을 다시 연동해주세요.",
        )

    except Exception as e:
        # Top-level boundary of the background task: record the failure and
        # keep the traceback in the log.
        logger.exception(
            f"[SOCIAL_UPLOAD] 예상치 못한 에러 - "
            f"upload_id: {upload_id}, error: {e}"
        )
        await _update_upload_status(
            upload_id=upload_id,
            status=UploadStatus.FAILED,
            error_message=f"업로드 중 에러 발생: {str(e)}",
        )

    finally:
        # Clean up the temporary file.
        _cleanup_temp_file(temp_file_path)