386 lines
14 KiB
Python
386 lines
14 KiB
Python
"""
|
|
Social Upload Background Task
|
|
|
|
소셜 미디어 영상 업로드 백그라운드 태스크입니다.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import aiofiles
|
|
import httpx
|
|
from sqlalchemy import select
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from config import social_upload_settings
|
|
from app.database.session import BackgroundSessionLocal
|
|
from app.social.constants import SocialPlatform, UploadStatus
|
|
from app.social.exceptions import TokenExpiredError, UploadError, UploadQuotaExceededError
|
|
from app.social.models import SocialUpload
|
|
from app.social.services import social_account_service
|
|
from app.social.uploader import get_uploader
|
|
from app.social.uploader.base import UploadMetadata
|
|
from app.user.models import SocialAccount
|
|
from app.video.models import Video
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def _update_upload_status(
|
|
upload_id: int,
|
|
status: UploadStatus,
|
|
upload_progress: int = 0,
|
|
platform_video_id: Optional[str] = None,
|
|
platform_url: Optional[str] = None,
|
|
error_message: Optional[str] = None,
|
|
) -> bool:
|
|
"""
|
|
업로드 상태 업데이트
|
|
|
|
Args:
|
|
upload_id: SocialUpload ID
|
|
status: 업로드 상태
|
|
upload_progress: 업로드 진행률 (0-100)
|
|
platform_video_id: 플랫폼 영상 ID
|
|
platform_url: 플랫폼 영상 URL
|
|
error_message: 에러 메시지
|
|
|
|
Returns:
|
|
bool: 업데이트 성공 여부
|
|
"""
|
|
try:
|
|
async with BackgroundSessionLocal() as session:
|
|
result = await session.execute(
|
|
select(SocialUpload).where(SocialUpload.id == upload_id)
|
|
)
|
|
upload = result.scalar_one_or_none()
|
|
|
|
if upload:
|
|
upload.status = status.value
|
|
upload.upload_progress = upload_progress
|
|
|
|
if platform_video_id:
|
|
upload.platform_video_id = platform_video_id
|
|
if platform_url:
|
|
upload.platform_url = platform_url
|
|
if error_message:
|
|
upload.error_message = error_message
|
|
if status == UploadStatus.COMPLETED:
|
|
upload.uploaded_at = datetime.now()
|
|
|
|
await session.commit()
|
|
logger.info(
|
|
f"[SOCIAL_UPLOAD] 상태 업데이트 - "
|
|
f"upload_id: {upload_id}, status: {status.value}, progress: {upload_progress}%"
|
|
)
|
|
return True
|
|
else:
|
|
logger.warning(f"[SOCIAL_UPLOAD] 업로드 레코드 없음 - upload_id: {upload_id}")
|
|
return False
|
|
|
|
except SQLAlchemyError as e:
|
|
logger.error(f"[SOCIAL_UPLOAD] DB 에러 - upload_id: {upload_id}, error: {e}")
|
|
return False
|
|
|
|
|
|
async def _download_video(video_url: str, upload_id: int) -> bytes:
|
|
"""
|
|
영상 파일 다운로드
|
|
|
|
Args:
|
|
video_url: 영상 URL
|
|
upload_id: 업로드 ID (로그용)
|
|
|
|
Returns:
|
|
bytes: 영상 파일 내용
|
|
|
|
Raises:
|
|
httpx.HTTPError: 다운로드 실패
|
|
"""
|
|
logger.info(f"[SOCIAL_UPLOAD] 영상 다운로드 시작 - upload_id: {upload_id}")
|
|
|
|
async with httpx.AsyncClient(timeout=300.0) as client:
|
|
response = await client.get(video_url)
|
|
response.raise_for_status()
|
|
|
|
logger.info(
|
|
f"[SOCIAL_UPLOAD] 영상 다운로드 완료 - "
|
|
f"upload_id: {upload_id}, size: {len(response.content)} bytes"
|
|
)
|
|
return response.content
|
|
|
|
|
|
async def _increment_retry_count(upload_id: int) -> int:
|
|
"""
|
|
재시도 횟수 증가
|
|
|
|
Args:
|
|
upload_id: SocialUpload ID
|
|
|
|
Returns:
|
|
int: 현재 재시도 횟수
|
|
"""
|
|
try:
|
|
async with BackgroundSessionLocal() as session:
|
|
result = await session.execute(
|
|
select(SocialUpload).where(SocialUpload.id == upload_id)
|
|
)
|
|
upload = result.scalar_one_or_none()
|
|
|
|
if upload:
|
|
upload.retry_count += 1
|
|
await session.commit()
|
|
return upload.retry_count
|
|
|
|
return 0
|
|
|
|
except SQLAlchemyError:
|
|
return 0
|
|
|
|
|
|
async def process_social_upload(upload_id: int) -> None:
|
|
"""
|
|
소셜 미디어 업로드 처리
|
|
|
|
백그라운드에서 실행되며, 영상을 소셜 플랫폼에 업로드합니다.
|
|
|
|
Args:
|
|
upload_id: SocialUpload ID
|
|
"""
|
|
logger.info(f"[SOCIAL_UPLOAD] 업로드 태스크 시작 - upload_id: {upload_id}")
|
|
|
|
temp_file_path: Optional[Path] = None
|
|
|
|
try:
|
|
# 1. 업로드 정보 조회
|
|
async with BackgroundSessionLocal() as session:
|
|
result = await session.execute(
|
|
select(SocialUpload).where(SocialUpload.id == upload_id)
|
|
)
|
|
upload = result.scalar_one_or_none()
|
|
|
|
if not upload:
|
|
logger.error(f"[SOCIAL_UPLOAD] 업로드 레코드 없음 - upload_id: {upload_id}")
|
|
return
|
|
|
|
# 2. Video 정보 조회
|
|
video_result = await session.execute(
|
|
select(Video).where(Video.id == upload.video_id)
|
|
)
|
|
video = video_result.scalar_one_or_none()
|
|
|
|
if not video or not video.result_movie_url:
|
|
logger.error(
|
|
f"[SOCIAL_UPLOAD] 영상 없음 또는 URL 없음 - "
|
|
f"upload_id: {upload_id}, video_id: {upload.video_id}"
|
|
)
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.FAILED,
|
|
error_message="영상을 찾을 수 없거나 URL이 없습니다.",
|
|
)
|
|
return
|
|
|
|
# 3. SocialAccount 정보 조회
|
|
account_result = await session.execute(
|
|
select(SocialAccount).where(SocialAccount.id == upload.social_account_id)
|
|
)
|
|
account = account_result.scalar_one_or_none()
|
|
|
|
if not account or not account.is_active:
|
|
logger.error(
|
|
f"[SOCIAL_UPLOAD] 소셜 계정 없음 또는 비활성화 - "
|
|
f"upload_id: {upload_id}, account_id: {upload.social_account_id}"
|
|
)
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.FAILED,
|
|
error_message="연동된 소셜 계정이 없거나 비활성화 상태입니다.",
|
|
)
|
|
return
|
|
|
|
# 필요한 정보 저장
|
|
video_url = video.result_movie_url
|
|
platform = SocialPlatform(upload.platform)
|
|
upload_title = upload.title
|
|
upload_description = upload.description
|
|
upload_tags = upload.tags if isinstance(upload.tags, list) else None
|
|
upload_privacy = upload.privacy_status
|
|
upload_options = upload.platform_options
|
|
|
|
# 4. 상태 업데이트: uploading
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.UPLOADING,
|
|
upload_progress=0,
|
|
)
|
|
|
|
# 5. 토큰 유효성 확인 및 갱신
|
|
async with BackgroundSessionLocal() as session:
|
|
# account 다시 조회 (세션이 닫혔으므로)
|
|
account_result = await session.execute(
|
|
select(SocialAccount).where(SocialAccount.id == upload.social_account_id)
|
|
)
|
|
account = account_result.scalar_one_or_none()
|
|
|
|
if not account:
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.FAILED,
|
|
error_message="소셜 계정을 찾을 수 없습니다.",
|
|
)
|
|
return
|
|
|
|
access_token = await social_account_service.ensure_valid_token(
|
|
account=account,
|
|
session=session,
|
|
)
|
|
|
|
# 6. 영상 다운로드
|
|
video_content = await _download_video(video_url, upload_id)
|
|
|
|
# 7. 임시 파일 저장
|
|
temp_dir = Path(social_upload_settings.UPLOAD_TEMP_DIR) / str(upload_id)
|
|
temp_dir.mkdir(parents=True, exist_ok=True)
|
|
temp_file_path = temp_dir / "video.mp4"
|
|
|
|
async with aiofiles.open(str(temp_file_path), "wb") as f:
|
|
await f.write(video_content)
|
|
|
|
logger.info(
|
|
f"[SOCIAL_UPLOAD] 임시 파일 저장 완료 - "
|
|
f"upload_id: {upload_id}, path: {temp_file_path}"
|
|
)
|
|
|
|
# 8. 메타데이터 준비
|
|
from app.social.constants import PrivacyStatus
|
|
|
|
metadata = UploadMetadata(
|
|
title=upload_title,
|
|
description=upload_description,
|
|
tags=upload_tags,
|
|
privacy_status=PrivacyStatus(upload_privacy),
|
|
platform_options=upload_options,
|
|
)
|
|
|
|
# 9. 진행률 콜백 함수
|
|
async def progress_callback(progress: int) -> None:
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.UPLOADING,
|
|
upload_progress=progress,
|
|
)
|
|
|
|
# 10. 플랫폼에 업로드
|
|
uploader = get_uploader(platform)
|
|
|
|
# 동기 콜백으로 변환 (httpx 청크 업로드 내에서 호출되므로)
|
|
def sync_progress_callback(progress: int) -> None:
|
|
import asyncio
|
|
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
if loop.is_running():
|
|
asyncio.create_task(
|
|
_update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.UPLOADING,
|
|
upload_progress=progress,
|
|
)
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
result = await uploader.upload(
|
|
video_path=str(temp_file_path),
|
|
access_token=access_token,
|
|
metadata=metadata,
|
|
progress_callback=sync_progress_callback,
|
|
)
|
|
|
|
# 11. 결과 처리
|
|
if result.success:
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.COMPLETED,
|
|
upload_progress=100,
|
|
platform_video_id=result.platform_video_id,
|
|
platform_url=result.platform_url,
|
|
)
|
|
logger.info(
|
|
f"[SOCIAL_UPLOAD] 업로드 완료 - "
|
|
f"upload_id: {upload_id}, "
|
|
f"platform_video_id: {result.platform_video_id}, "
|
|
f"url: {result.platform_url}"
|
|
)
|
|
else:
|
|
retry_count = await _increment_retry_count(upload_id)
|
|
|
|
if retry_count < social_upload_settings.UPLOAD_MAX_RETRIES:
|
|
# 재시도 가능
|
|
logger.warning(
|
|
f"[SOCIAL_UPLOAD] 업로드 실패, 재시도 예정 - "
|
|
f"upload_id: {upload_id}, retry: {retry_count}"
|
|
)
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.PENDING,
|
|
upload_progress=0,
|
|
error_message=f"업로드 실패 (재시도 {retry_count}/{social_upload_settings.UPLOAD_MAX_RETRIES}): {result.error_message}",
|
|
)
|
|
else:
|
|
# 최대 재시도 초과
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.FAILED,
|
|
error_message=f"최대 재시도 횟수 초과: {result.error_message}",
|
|
)
|
|
logger.error(
|
|
f"[SOCIAL_UPLOAD] 업로드 최종 실패 - "
|
|
f"upload_id: {upload_id}, error: {result.error_message}"
|
|
)
|
|
|
|
except UploadQuotaExceededError as e:
|
|
logger.error(f"[SOCIAL_UPLOAD] API 할당량 초과 - upload_id: {upload_id}")
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.FAILED,
|
|
error_message="플랫폼 API 일일 할당량이 초과되었습니다. 내일 다시 시도해주세요.",
|
|
)
|
|
|
|
except TokenExpiredError as e:
|
|
logger.error(
|
|
f"[SOCIAL_UPLOAD] 토큰 만료, 재연동 필요 - "
|
|
f"upload_id: {upload_id}, platform: {e.platform}"
|
|
)
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.FAILED,
|
|
error_message=f"{e.platform} 계정 인증이 만료되었습니다. 계정을 다시 연동해주세요.",
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"[SOCIAL_UPLOAD] 예상치 못한 에러 - "
|
|
f"upload_id: {upload_id}, error: {e}"
|
|
)
|
|
await _update_upload_status(
|
|
upload_id=upload_id,
|
|
status=UploadStatus.FAILED,
|
|
error_message=f"업로드 중 에러 발생: {str(e)}",
|
|
)
|
|
|
|
finally:
|
|
# 임시 파일 정리
|
|
if temp_file_path and temp_file_path.exists():
|
|
try:
|
|
temp_file_path.unlink()
|
|
temp_file_path.parent.rmdir()
|
|
logger.debug(f"[SOCIAL_UPLOAD] 임시 파일 삭제 - path: {temp_file_path}")
|
|
except Exception as e:
|
|
logger.warning(f"[SOCIAL_UPLOAD] 임시 파일 삭제 실패 - error: {e}")
|