o2o-castad-backend/app/video/worker/video_task.py

102 lines
3.9 KiB
Python

"""
Video Background Tasks
영상 생성 관련 백그라운드 태스크를 정의합니다.
"""
from datetime import date
from pathlib import Path
import aiofiles
import httpx
from sqlalchemy import select
from app.database.session import AsyncSessionLocal
from app.video.models import Video
from app.utils.common import generate_task_id
from config import prj_settings
async def download_and_save_video(
task_id: str,
video_url: str,
store_name: str,
) -> None:
"""백그라운드에서 영상을 다운로드하고 Video 테이블을 업데이트합니다.
Args:
task_id: 프로젝트 task_id
video_url: 다운로드할 영상 URL
store_name: 저장할 파일명에 사용할 업체명
"""
print(f"[download_and_save_video] START - task_id: {task_id}, store_name: {store_name}")
try:
# 저장 경로 생성: media/{날짜}/{uuid7}/{store_name}.mp4
today = date.today().isoformat()
unique_id = await generate_task_id()
# 파일명에 사용할 수 없는 문자 제거
safe_store_name = "".join(
c for c in store_name if c.isalnum() or c in (" ", "_", "-")
).strip()
safe_store_name = safe_store_name or "video"
file_name = f"{safe_store_name}.mp4"
# 절대 경로 생성
media_dir = Path("media") / today / unique_id
media_dir.mkdir(parents=True, exist_ok=True)
file_path = media_dir / file_name
print(f"[download_and_save_video] Directory created - path: {file_path}")
# 영상 파일 다운로드
print(f"[download_and_save_video] Downloading video - task_id: {task_id}, url: {video_url}")
async with httpx.AsyncClient() as client:
response = await client.get(video_url, timeout=120.0) # 영상은 더 큰 timeout
response.raise_for_status()
async with aiofiles.open(str(file_path), "wb") as f:
await f.write(response.content)
print(f"[download_and_save_video] File saved - task_id: {task_id}, path: {file_path}")
# 프론트엔드에서 접근 가능한 URL 생성
relative_path = f"/media/{today}/{unique_id}/{file_name}"
base_url = f"http://{prj_settings.PROJECT_DOMAIN}"
file_url = f"{base_url}{relative_path}"
print(f"[download_and_save_video] URL generated - task_id: {task_id}, url: {file_url}")
# Video 테이블 업데이트 (새 세션 사용)
async with AsyncSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute(
select(Video)
.where(Video.task_id == task_id)
.order_by(Video.created_at.desc())
.limit(1)
)
video = result.scalar_one_or_none()
if video:
video.status = "completed"
video.result_movie_url = file_url
await session.commit()
print(f"[download_and_save_video] SUCCESS - task_id: {task_id}, status: completed")
else:
print(f"[download_and_save_video] Video NOT FOUND in DB - task_id: {task_id}")
except Exception as e:
print(f"[download_and_save_video] EXCEPTION - task_id: {task_id}, error: {e}")
# 실패 시 Video 테이블 업데이트
async with AsyncSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute(
select(Video)
.where(Video.task_id == task_id)
.order_by(Video.created_at.desc())
.limit(1)
)
video = result.scalar_one_or_none()
if video:
video.status = "failed"
await session.commit()
print(f"[download_and_save_video] FAILED - task_id: {task_id}, status updated to failed")