""" 소셜 업로드 서비스 업로드 요청, 상태 조회, 이력 조회, 재시도, 취소 관련 비즈니스 로직을 처리합니다. """ import logging from calendar import monthrange from datetime import datetime from typing import Optional from fastapi import BackgroundTasks, HTTPException, status from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from config import TIMEZONE from app.social.constants import SocialPlatform, UploadStatus from app.social.exceptions import SocialAccountNotFoundError, VideoNotFoundError from app.social.models import SocialUpload from app.social.schemas import ( MessageResponse, SocialUploadHistoryItem, SocialUploadHistoryResponse, SocialUploadResponse, SocialUploadStatusResponse, SocialUploadRequest, ) from app.social.services.account_service import SocialAccountService from app.social.worker.upload_task import process_social_upload from app.user.models import User from app.video.models import Video logger = logging.getLogger(__name__) class SocialUploadService: """소셜 업로드 비즈니스 로직 서비스""" def __init__(self, account_service: SocialAccountService): self._account_service = account_service async def request_upload( self, body: SocialUploadRequest, current_user: User, session: AsyncSession, background_tasks: BackgroundTasks, ) -> SocialUploadResponse: """ 소셜 플랫폼 업로드 요청 영상 검증, 계정 확인, 중복 확인 후 업로드 레코드 생성. 즉시 업로드이면 백그라운드 태스크 등록, 예약이면 스케줄러가 처리. """ logger.info( f"[UPLOAD_SERVICE] 업로드 요청 - " f"user_uuid: {current_user.user_uuid}, " f"video_id: {body.video_id}, " f"social_account_id: {body.social_account_id}" ) # 1. 영상 조회 및 검증 video_result = await session.execute( select(Video).where(Video.id == body.video_id) ) video = video_result.scalar_one_or_none() if not video: logger.warning(f"[UPLOAD_SERVICE] 영상 없음 - video_id: {body.video_id}") raise VideoNotFoundError(video_id=body.video_id) if not video.result_movie_url: logger.warning(f"[UPLOAD_SERVICE] 영상 URL 없음 - video_id: {body.video_id}") raise VideoNotFoundError( video_id=body.video_id, detail="영상이 아직 준비되지 않았습니다. 영상 생성이 완료된 후 시도해주세요.", ) # 2. 소셜 계정 조회 및 소유권 검증 account = await self._account_service.get_account_by_id( user_uuid=current_user.user_uuid, account_id=body.social_account_id, session=session, ) if not account: logger.warning( f"[UPLOAD_SERVICE] 연동 계정 없음 - " f"user_uuid: {current_user.user_uuid}, social_account_id: {body.social_account_id}" ) raise SocialAccountNotFoundError() # 3. 진행 중인 업로드 확인 (pending 또는 uploading 상태만) in_progress_result = await session.execute( select(SocialUpload).where( SocialUpload.video_id == body.video_id, SocialUpload.social_account_id == account.id, SocialUpload.status.in_([UploadStatus.PENDING.value, UploadStatus.UPLOADING.value]), ) ) in_progress_upload = in_progress_result.scalar_one_or_none() if in_progress_upload: logger.info( f"[UPLOAD_SERVICE] 진행 중인 업로드 존재 - upload_id: {in_progress_upload.id}" ) return SocialUploadResponse( success=True, upload_id=in_progress_upload.id, platform=account.platform, status=in_progress_upload.status, message="이미 업로드가 진행 중입니다.", ) # 4. 업로드 순번 계산 max_seq_result = await session.execute( select(func.coalesce(func.max(SocialUpload.upload_seq), 0)).where( SocialUpload.video_id == body.video_id, SocialUpload.social_account_id == account.id, ) ) next_seq = (max_seq_result.scalar() or 0) + 1 # 5. 새 업로드 레코드 생성 social_upload = SocialUpload( user_uuid=current_user.user_uuid, video_id=body.video_id, social_account_id=account.id, upload_seq=next_seq, platform=account.platform, status=UploadStatus.PENDING.value, upload_progress=0, title=body.title, description=body.description, tags=body.tags, privacy_status=body.privacy_status.value, scheduled_at=body.scheduled_at, platform_options={ **(body.platform_options or {}), "scheduled_at": body.scheduled_at.isoformat() if body.scheduled_at else None, }, retry_count=0, ) session.add(social_upload) await session.commit() await session.refresh(social_upload) logger.info( f"[UPLOAD_SERVICE] 업로드 레코드 생성 - " f"upload_id: {social_upload.id}, video_id: {body.video_id}, " f"account_id: {account.id}, upload_seq: {next_seq}, platform: {account.platform}" ) # 6. 즉시 업로드이면 백그라운드 태스크 등록 now_kst_naive = datetime.now(TIMEZONE).replace(tzinfo=None) is_scheduled = body.scheduled_at and body.scheduled_at > now_kst_naive if not is_scheduled: background_tasks.add_task(process_social_upload, social_upload.id) message = "예약 업로드가 등록되었습니다." if is_scheduled else "업로드 요청이 접수되었습니다." return SocialUploadResponse( success=True, upload_id=social_upload.id, platform=account.platform, status=social_upload.status, message=message, ) async def get_upload_status( self, upload_id: int, current_user: User, session: AsyncSession, ) -> SocialUploadStatusResponse: """업로드 상태 조회""" logger.info(f"[UPLOAD_SERVICE] 상태 조회 - upload_id: {upload_id}") result = await session.execute( select(SocialUpload).where( SocialUpload.id == upload_id, SocialUpload.user_uuid == current_user.user_uuid, ) ) upload = result.scalar_one_or_none() if not upload: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="업로드 정보를 찾을 수 없습니다.", ) return SocialUploadStatusResponse( upload_id=upload.id, video_id=upload.video_id, social_account_id=upload.social_account_id, upload_seq=upload.upload_seq, platform=upload.platform, status=UploadStatus(upload.status), upload_progress=upload.upload_progress, title=upload.title, platform_video_id=upload.platform_video_id, platform_url=upload.platform_url, error_message=upload.error_message, retry_count=upload.retry_count, scheduled_at=upload.scheduled_at, created_at=upload.created_at, uploaded_at=upload.uploaded_at, ) async def get_upload_history( self, current_user: User, session: AsyncSession, tab: str = "all", platform: Optional[SocialPlatform] = None, year: Optional[int] = None, month: Optional[int] = None, page: int = 1, size: int = 20, ) -> SocialUploadHistoryResponse: """업로드 이력 조회 (탭/년월/플랫폼 필터, 페이지네이션)""" now_kst = datetime.now(TIMEZONE) target_year = year or now_kst.year target_month = month or now_kst.month logger.info( f"[UPLOAD_SERVICE] 이력 조회 - " f"user_uuid: {current_user.user_uuid}, tab: {tab}, " f"year: {target_year}, month: {target_month}, page: {page}, size: {size}" ) # 월 범위 계산 last_day = monthrange(target_year, target_month)[1] month_start = datetime(target_year, target_month, 1, 0, 0, 0) month_end = datetime(target_year, target_month, last_day, 23, 59, 59) # 기본 쿼리 (cancelled 제외) base_conditions = [ SocialUpload.user_uuid == current_user.user_uuid, SocialUpload.created_at >= month_start, SocialUpload.created_at <= month_end, SocialUpload.status != UploadStatus.CANCELLED.value, ] query = select(SocialUpload).where(*base_conditions) count_query = select(func.count(SocialUpload.id)).where(*base_conditions) # 탭 필터 적용 if tab == "completed": query = query.where(SocialUpload.status == UploadStatus.COMPLETED.value) count_query = count_query.where(SocialUpload.status == UploadStatus.COMPLETED.value) elif tab == "scheduled": query = query.where( SocialUpload.status == UploadStatus.PENDING.value, SocialUpload.scheduled_at.isnot(None), ) count_query = count_query.where( SocialUpload.status == UploadStatus.PENDING.value, SocialUpload.scheduled_at.isnot(None), ) elif tab == "failed": query = query.where(SocialUpload.status == UploadStatus.FAILED.value) count_query = count_query.where(SocialUpload.status == UploadStatus.FAILED.value) # 플랫폼 필터 적용 if platform: query = query.where(SocialUpload.platform == platform.value) count_query = count_query.where(SocialUpload.platform == platform.value) # 총 개수 조회 total_result = await session.execute(count_query) total = total_result.scalar() or 0 # 페이지네이션 적용 query = ( query.order_by(SocialUpload.created_at.desc()) .offset((page - 1) * size) .limit(size) ) result = await session.execute(query) uploads = result.scalars().all() items = [ SocialUploadHistoryItem( upload_id=upload.id, video_id=upload.video_id, social_account_id=upload.social_account_id, upload_seq=upload.upload_seq, platform=upload.platform, status=upload.status, title=upload.title, platform_url=upload.platform_url, error_message=upload.error_message, scheduled_at=upload.scheduled_at, created_at=upload.created_at, uploaded_at=upload.uploaded_at, ) for upload in uploads ] return SocialUploadHistoryResponse( items=items, total=total, page=page, size=size, ) async def retry_upload( self, upload_id: int, current_user: User, session: AsyncSession, background_tasks: BackgroundTasks, ) -> SocialUploadResponse: """실패한 업로드 재시도""" logger.info(f"[UPLOAD_SERVICE] 재시도 요청 - upload_id: {upload_id}") result = await session.execute( select(SocialUpload).where( SocialUpload.id == upload_id, SocialUpload.user_uuid == current_user.user_uuid, ) ) upload = result.scalar_one_or_none() if not upload: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="업로드 정보를 찾을 수 없습니다.", ) if upload.status not in [UploadStatus.FAILED.value, UploadStatus.CANCELLED.value]: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="실패하거나 취소된 업로드만 재시도할 수 있습니다.", ) # 상태 초기화 upload.status = UploadStatus.PENDING.value upload.upload_progress = 0 upload.error_message = None await session.commit() background_tasks.add_task(process_social_upload, upload.id) return SocialUploadResponse( success=True, upload_id=upload.id, platform=upload.platform, status=upload.status, message="업로드 재시도가 요청되었습니다.", ) async def cancel_upload( self, upload_id: int, current_user: User, session: AsyncSession, ) -> MessageResponse: """대기 중인 업로드 취소""" logger.info(f"[UPLOAD_SERVICE] 취소 요청 - upload_id: {upload_id}") result = await session.execute( select(SocialUpload).where( SocialUpload.id == upload_id, SocialUpload.user_uuid == current_user.user_uuid, ) ) upload = result.scalar_one_or_none() if not upload: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="업로드 정보를 찾을 수 없습니다.", ) if upload.status != UploadStatus.PENDING.value: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="대기 중인 업로드만 취소할 수 있습니다.", ) upload.status = UploadStatus.CANCELLED.value await session.commit() return MessageResponse( success=True, message="업로드가 취소되었습니다.", )