428 lines
16 KiB
Python
428 lines
16 KiB
Python
import time
|
|
import logging
|
|
from uuid import UUID
|
|
from typing import Dict, Any, List
|
|
from app.core.celery_app import celery_app
|
|
from app.workers.tasks_service import TaskService
|
|
from app.workers.progress_tracker import ProgressTracker
|
|
from app.core.redis.redis_manager import RedisManager
|
|
from celery.exceptions import SoftTimeLimitExceeded
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_progress_tracker() -> ProgressTracker:
|
|
"""공통 ProgressTracker 인스턴스 생성"""
|
|
redis_manager = RedisManager()
|
|
return ProgressTracker(redis_manager)
|
|
|
|
|
|
def _get_current_step(status: Dict[str, bool]) -> str:
|
|
"""현재 진행 중인 단계 반환"""
|
|
if not status:
|
|
return "preparing"
|
|
|
|
step_order = ["crawling", "lyrics", "music", "images", "video", "cleanup"]
|
|
|
|
for step in step_order:
|
|
if not status.get(step, False):
|
|
return step
|
|
|
|
return "completed"
|
|
|
|
|
|
def get_task_progress_info(root_task_id: str) -> Dict[str, Any]:
|
|
"""
|
|
작업 진행률 정보 조회 (FastAPI 엔드포인트에서 사용)
|
|
|
|
Args:
|
|
root_task_id: 작업 ID
|
|
|
|
Returns:
|
|
Dict: 진행률 정보
|
|
"""
|
|
try:
|
|
progress = _get_progress_tracker()
|
|
status = progress.get_task_status(root_task_id)
|
|
percentage = progress.get_progress_percentage(root_task_id)
|
|
is_completed = progress.is_task_completed(root_task_id)
|
|
order_id = progress.get_order_id(root_task_id)
|
|
|
|
return {
|
|
"task_id": root_task_id,
|
|
"status": status,
|
|
"percentage": percentage,
|
|
"is_completed": is_completed,
|
|
"current_step": _get_current_step(status),
|
|
"total_steps": len(ProgressTracker.DEFAULT_STATUS),
|
|
"order_id": order_id
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"진행률 조회 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
return {
|
|
"task_id": root_task_id,
|
|
"status": {},
|
|
"percentage": 0.0,
|
|
"is_completed": False,
|
|
"error": str(e)
|
|
}
|
|
|
|
|
|
@celery_app.task(bind=True, name="integrated_workflow_task", time_limit=1200, soft_time_limit=1080)
|
|
def task_integrated_workflow(self, user_id: str, url: str, root_task_id: str) -> Dict[str, Any]:
|
|
"""
|
|
통합 워크플로우: 모든 단계를 순차적으로 동기 실행
|
|
"""
|
|
progress = _get_progress_tracker()
|
|
task_service = TaskService()
|
|
|
|
try:
|
|
logger.info(f"🚀 통합 워크플로우 시작 - URL: {url}, Task ID: {root_task_id}")
|
|
|
|
# 1단계: 크롤링
|
|
logger.info(f"📥 1단계: 크롤링 시작 - Task ID: {root_task_id}")
|
|
crawling_result = task_service.crawling_metadata(user_id, url, root_task_id)
|
|
progress.set_task_step_done(root_task_id, "crawling")
|
|
logger.info(f"✅ 1단계: 크롤링 완료 - Task ID: {root_task_id}")
|
|
|
|
order_id = crawling_result.get("order_id")
|
|
|
|
# 2단계: 가사 생성
|
|
logger.info(f"🎵 2단계: 가사 생성 시작 - Task ID: {root_task_id}")
|
|
lyrics_result = task_service.generate_lyrics(root_task_id)
|
|
progress.set_task_step_done(root_task_id, "lyrics")
|
|
logger.info(f"✅ 2단계: 가사 생성 완료 - Task ID: {root_task_id}")
|
|
|
|
# 3단계: 음악 생성
|
|
logger.info(f"🎼 3단계: 음악 생성 시작 - Task ID: {root_task_id}, order_id: {order_id}")
|
|
music_result = task_service.generate_music(lyrics_result, root_task_id, order_id)
|
|
progress.set_task_step_done(root_task_id, "music")
|
|
progress.set_order_id(root_task_id, order_id)
|
|
logger.info(f"✅ 3단계: 음악 생성 완료 - Task ID: {root_task_id}, order_id: {order_id}")
|
|
|
|
# 4단계: 이미지 필터링
|
|
logger.info(f"📸 4단계: 이미지 필터링 시작 - Task ID: {root_task_id}")
|
|
image_result = task_service.image_filtering({}, root_task_id, order_id)
|
|
progress.set_task_step_done(root_task_id, "images")
|
|
logger.info(f"✅ 4단계: 이미지 필터링 완료 - Task ID: {root_task_id}")
|
|
image_filtering_result = image_result.get("image_filtering", "")
|
|
|
|
# 5단계: 비디오 생성
|
|
logger.info(f"🎬 5단계: 비디오 생성 시작 - Task ID: {root_task_id}")
|
|
video_result = task_service.generate_video(
|
|
{},
|
|
root_task_id,
|
|
order_id,
|
|
music_result.get("music_path", ""),
|
|
image_filtering_result
|
|
)
|
|
progress.set_task_step_done(root_task_id, "video")
|
|
logger.info(f"✅ 5단계: 비디오 생성 완료 - Task ID: {root_task_id}")
|
|
|
|
# 전체 결과 반환
|
|
final_result = {
|
|
"success": True,
|
|
"message": "전체 워크플로우 완료",
|
|
"order_id": order_id,
|
|
"task_id": root_task_id,
|
|
"crawling_info": {
|
|
"place_name": crawling_result.get("crawl_data", {}).get("name", ""),
|
|
"file_path": crawling_result.get("file_path", "")
|
|
},
|
|
"lyrics_info": {
|
|
"lyrics": lyrics_result.get("lyrics", ""),
|
|
"name": lyrics_result.get("name", ""),
|
|
"address": lyrics_result.get("address", "")
|
|
},
|
|
"music_info": {
|
|
"music_path": music_result.get("music_path", ""),
|
|
"music_url": music_result.get("music_url", ""),
|
|
"file_size": music_result.get("file_size", 0)
|
|
},
|
|
"image_info": {
|
|
"filtered_images": image_result.get("filtered_images", []),
|
|
"image_count": image_result.get("image_count", 0),
|
|
"image_filtering": image_result.get("image_filtering", "")
|
|
},
|
|
"video_info": {
|
|
"video_path": video_result.get("video_path", ""),
|
|
"video_title": video_result.get("video_title", ""),
|
|
"image_count": video_result.get("image_count", 0),
|
|
"tags": video_result.get("tags", [])
|
|
}
|
|
}
|
|
|
|
logger.info(f"🎉 전체 워크플로우 완료 - Task ID: {root_task_id}")
|
|
return final_result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 통합 워크플로우 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
|
|
# 재시도 로직
|
|
if self.request.retries < 2: # 최대 2회 재시도
|
|
logger.info(f"🔄 워크플로우 재시도 {self.request.retries + 1}/2 - Task ID: {root_task_id}")
|
|
raise self.retry(countdown=30, max_retries=2, exc=e)
|
|
else:
|
|
# 최대 재시도 횟수 초과시 원본 예외 발생
|
|
raise e
|
|
|
|
|
|
@celery_app.task(bind=True, name="crawling_task")
|
|
def task_crawling_metadata(self, user_id: UUID, url: str, root_task_id: str) -> Dict[str, Any]:
|
|
"""
|
|
1단계: 메타데이터 크롤링 및 JSON 파일 저장
|
|
|
|
Args:
|
|
user_id: 사용자 ID
|
|
url: 크롤링할 URL
|
|
root_task_id: 전체 워크플로우 추적 ID
|
|
|
|
Returns:
|
|
Dict: URL, 크롤링된 메타데이터, 파일 경로
|
|
"""
|
|
progress = _get_progress_tracker()
|
|
|
|
try:
|
|
logger.info(f"🚀 크롤링 작업 시작 - URL: {url}, Task ID: {root_task_id}")
|
|
|
|
# 실제 작업 수행
|
|
task_service = TaskService()
|
|
result = task_service.crawling_metadata(user_id, url, root_task_id)
|
|
|
|
# 진행률 업데이트
|
|
progress.set_task_step_done(root_task_id, "crawling")
|
|
|
|
logger.info(f"✅ 크롤링 작업 완료 - Task ID: {root_task_id}")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ [Crawling Task] Error - URL: {url}, Root Task ID: {root_task_id}, Error: {e}")
|
|
|
|
# 재시도 로직
|
|
if self.request.retries < 3:
|
|
raise self.retry(countdown=20, max_retries=3, exc=e)
|
|
else:
|
|
# 최대 재시도 횟수 초과시 원본 예외 발생
|
|
raise e
|
|
|
|
|
|
@celery_app.task(bind=True, name="lyrics_generation_task")
|
|
def task_generate_lyrics(self, root_task_id: str) -> Dict[str, Any]:
|
|
"""
|
|
2단계: JSON 파일에서 데이터 로드 후 가사 생성
|
|
|
|
Args:
|
|
root_task_id: 전체 워크플로우 추적 ID
|
|
|
|
Returns:
|
|
Dict: 생성된 가사 데이터
|
|
"""
|
|
progress = _get_progress_tracker()
|
|
|
|
try:
|
|
logger.info(f"🎵 가사 생성 작업 시작 - Task ID: {root_task_id}")
|
|
|
|
# 크롤링 완료 대기
|
|
timeout = 300 # 5분
|
|
start_time = time.time()
|
|
while not progress.get_task_status(root_task_id).get("crawling", False):
|
|
if time.time() - start_time > timeout:
|
|
raise Exception("크롤링 단계 완료 대기 타임아웃")
|
|
logger.info(f"크롤링 완료 대기 중... - Task ID: {root_task_id}")
|
|
time.sleep(5)
|
|
|
|
# 실제 작업 수행
|
|
task_service = TaskService()
|
|
result = task_service.generate_lyrics(root_task_id)
|
|
|
|
# 진행률 업데이트
|
|
progress.set_task_step_done(root_task_id, "lyrics")
|
|
|
|
logger.info(f"✅ 가사 생성 작업 완료 - Task ID: {root_task_id}")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ [Lyrics Generation Task] Error - Root Task ID: {root_task_id}, Error: {e}")
|
|
|
|
# 재시도 로직
|
|
if self.request.retries < 3:
|
|
raise self.retry(countdown=20, max_retries=3, exc=e)
|
|
else:
|
|
# 최대 재시도 횟수 초과시 원본 예외 발생
|
|
raise e
|
|
|
|
|
|
@celery_app.task(bind=True, name="generate_music_task")
|
|
def task_generate_music(self, data: Dict[str, Any], root_task_id: str, order_id: UUID) -> Dict[str, Any]:
|
|
"""
|
|
3단계: 음악 생성
|
|
|
|
Args:
|
|
data: 이전 단계에서 전달받은 데이터 (가사 포함)
|
|
root_task_id: 전체 워크플로우 추적 ID
|
|
order_id: 주문 ID
|
|
|
|
Returns:
|
|
Dict: 생성된 음악 파일 경로
|
|
"""
|
|
progress = _get_progress_tracker()
|
|
|
|
try:
|
|
logger.info(f"🎼 음악 생성 작업 시작 - Task ID: {root_task_id}")
|
|
|
|
# 가사 생성 완료 대기
|
|
timeout = 300
|
|
start_time = time.time()
|
|
while not progress.get_task_status(root_task_id).get("lyrics", False):
|
|
if time.time() - start_time > timeout:
|
|
raise Exception("가사 생성 단계 완료 대기 타임아웃")
|
|
logger.info(f"가사 생성 완료 대기 중... - Task ID: {root_task_id}")
|
|
time.sleep(5)
|
|
|
|
# 실제 작업 수행
|
|
task_service = TaskService()
|
|
result = task_service.generate_music(data, root_task_id, order_id)
|
|
|
|
# 진행률 업데이트
|
|
progress.set_task_step_done(root_task_id, "music")
|
|
|
|
logger.info(f"✅ 음악 생성 작업 완료 - Task ID: {root_task_id}")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ [Generate Music Task] Error - Root Task ID: {root_task_id}, Error: {e}")
|
|
|
|
# 재시도 로직
|
|
if self.request.retries < 3:
|
|
raise self.retry(countdown=20, max_retries=3, exc=e)
|
|
else:
|
|
# 최대 재시도 횟수 초과시 원본 예외 발생
|
|
raise e
|
|
|
|
|
|
@celery_app.task(bind=True, name="image_filtering_task")
|
|
def task_image_filtering(self, data: Dict[str, Any], root_task_id: str, order_id: UUID) -> Dict[str, Any]:
|
|
"""
|
|
4단계: 이미지 필터링
|
|
|
|
Args:
|
|
data: 이전 단계에서 전달받은 데이터
|
|
root_task_id: 전체 워크플로우 추적 ID
|
|
order_id: 주문 ID
|
|
|
|
Returns:
|
|
Dict: 필터링된 이미지 파일 경로들
|
|
"""
|
|
progress = _get_progress_tracker()
|
|
|
|
try:
|
|
logger.info(f"📸 이미지 필터링 작업 시작 - Task ID: {root_task_id}")
|
|
|
|
# 실제 작업 수행
|
|
task_service = TaskService()
|
|
result = task_service.image_filtering(data, root_task_id, order_id)
|
|
|
|
# 진행률 업데이트
|
|
progress.set_task_step_done(root_task_id, "images")
|
|
|
|
logger.info(f"✅ 이미지 필터링 작업 완료 - Task ID: {root_task_id}")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ [Image Filtering Task] Error - Root Task ID: {root_task_id}, Error: {e}")
|
|
|
|
# 재시도 로직
|
|
if self.request.retries < 3:
|
|
raise self.retry(countdown=20, max_retries=3, exc=e)
|
|
else:
|
|
# 최대 재시도 횟수 초과시 원본 예외 발생
|
|
raise e
|
|
|
|
|
|
@celery_app.task(bind=True, name="generate_video_task", time_limit=1800, soft_time_limit=1500) # 30분 하드, 25분 소프트
|
|
def task_generate_video(self, data: Dict[str, Any], root_task_id: str, order_id: UUID, music_path: str, image_filtering_result: str) -> Dict[str, Any]:
|
|
"""
|
|
5단계: 비디오 생성
|
|
|
|
Args:
|
|
data: 이전 단계에서 전달받은 데이터
|
|
root_task_id: 전체 워크플로우 추적 ID
|
|
order_id: 주문 ID
|
|
music_path: 음악 파일 경로
|
|
|
|
Returns:
|
|
Dict: 생성된 비디오 파일 경로
|
|
"""
|
|
progress = _get_progress_tracker()
|
|
|
|
try:
|
|
logger.info(f"🎬 비디오 생성 작업 시작 - Task ID: {root_task_id}")
|
|
|
|
# 실제 작업 수행
|
|
task_service = TaskService()
|
|
result = task_service.generate_video(data, root_task_id, order_id, music_path)
|
|
|
|
# 진행률 업데이트
|
|
progress.set_task_step_done(root_task_id, "video")
|
|
|
|
logger.info(f"✅ 비디오 생성 작업 완료 - Task ID: {root_task_id}")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ [Generate Video Task] Error - Root Task ID: {root_task_id}, Error: {e}")
|
|
|
|
# 재시도 로직 (시간 제한 예외는 재시도하지 않음)
|
|
if isinstance(e, SoftTimeLimitExceeded):
|
|
logger.error(f"⏰ 비디오 생성 시간 초과 - Task ID: {root_task_id}")
|
|
raise e
|
|
elif self.request.retries < 3:
|
|
raise self.retry(countdown=20, max_retries=3, exc=e)
|
|
else:
|
|
raise e
|
|
|
|
|
|
@celery_app.task(bind=True, name="cleanup_task")
|
|
def task_cleanup(self, root_task_id: str) -> Dict[str, Any]:
|
|
"""
|
|
정리 작업: 임시 파일 및 Redis 데이터 정리
|
|
|
|
Args:
|
|
root_task_id: 정리할 작업 ID
|
|
|
|
Returns:
|
|
Dict: 정리 결과
|
|
"""
|
|
progress = _get_progress_tracker()
|
|
|
|
try:
|
|
logger.info(f"🧹 정리 작업 시작 - Task ID: {root_task_id}")
|
|
|
|
# TODO: 실제 정리 로직 구현
|
|
# - 임시 파일 삭제
|
|
# - Redis 데이터 정리 (진행률 제외)
|
|
# - 로그 아카이브
|
|
|
|
deleted_files = 0
|
|
deleted_redis_keys = 0
|
|
|
|
# 진행률 상태 업데이트
|
|
progress.set_task_step_done(root_task_id, "cleanup")
|
|
|
|
# 작업 완료 후 진행률 데이터 삭제 (선택사항)
|
|
# progress.delete_task_status(root_task_id) # 진행률 기록을 유지하려면 주석 처리
|
|
|
|
result = {
|
|
"task_id": root_task_id,
|
|
"deleted_files": deleted_files,
|
|
"deleted_redis_keys": deleted_redis_keys,
|
|
"status": "completed",
|
|
"progress_percentage": progress.get_progress_percentage(root_task_id)
|
|
}
|
|
|
|
logger.info(f"✅ 정리 작업 완료 - Task ID: {root_task_id}, 진행률: {result['progress_percentage']}%")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ [cleanup] 오류 - Task ID: {root_task_id}, Error: {e}")
|
|
# 정리 작업은 재시도하지 않음 (무한 루프 방지)
|
|
return {"status": "failed", "error": str(e)} |