O2Sound_ver2_final/backend/app/workers/tasks_service.py

866 lines
37 KiB
Python

import requests
import os
import time
from datetime import datetime
from pathlib import Path
from uuid import UUID
import json
from urllib.parse import urlparse
import logging
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, HttpUrl
from app.services.chatgpt_service import ChatgptService
from app.infra.crawling_refactor.naver_crawling import NaverCrawling
from app.infra.crawling_refactor.integrated_service import IntegratedService
from app.services.mureka_service_fix import MurekaServiceFix
from app.presentation.schemas.mureka_schema import CreateMusic
from app.repositories.music_repository import MusicRepository
from app.repositories.order_repository import OrderRepository
from app.repositories.item_repository import ItemRepository
from app.domain.models.item import Item
from app.domain.models.order import Order
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.domain.models.photo import Photo
from app.repositories.photo_repository import PhotoRepository
from app.infra.media.moviepy_service import MoviepyService
from app.repositories.video_repository import VideoRepository
from app.repositories.user_repository import UserRepository
from app.domain.models.video import Video
from app.core.env_setting import EnvSetting
logger = logging.getLogger(__name__)
# class Song(BaseModel):
# song_id: str
# title: str
# version: str
# duration_milliseconds: int
# generate_at: int
# genres: List[str]
# moods: List[str]
# mp3_url: HttpUrl
# share_key: str
# machine_audit_state: int
# credit_type: int
# cover: HttpUrl
# stream_key: str
# description: Optional[str] = None
# share_link: HttpUrl
# class MusicFeed(BaseModel):
# feed_id: int
# state: int
# songs: List[Song]
# is_accelerated: bool
# generation_method: int
# model: str
# thinking_process_url: Optional[HttpUrl] = None
# generate_at: int
class Word(BaseModel):
start: int
end: int
text: str
class Line(BaseModel):
start: int
end: int
text: str
words: List[Word]
class LyricsSection(BaseModel):
section_type: str
start: int
end: int
lines: Optional[List[Line]] = None
class Choice(BaseModel):
url: str
flac_url: str
duration: int
lyrics_sections: List[LyricsSection]
id: str
index: Optional[int] = None
class MusicFeed(BaseModel):
id: str
created_at: int
finished_at: int
model: str
status: str
choices: List[Choice]
trace_id: str
class TaskService:
"""Celery 작업들의 비즈니스 로직을 처리하는 서비스"""
def __init__(self):
self.chatgpt_service = ChatgptService()
self.naver_crawling = NaverCrawling()
self.integrated_service = IntegratedService()
self.mureka_service = MurekaServiceFix()
self.moviepy_service = MoviepyService()
# 저장 디렉토리 설정
self.save_dir_path = Path("./uploads")
self.save_dir_path.mkdir(exist_ok=True, parents=True)
# 이미지 필터링 모델
self.save_dir_path_images_json = Path("./ai/request")
self.save_dir_path_images_json.mkdir(exist_ok=True, parents=True)
# 비디오 생성 로그 저장 디렉토리
self.save_dir_path_video_log = Path("./success_log")
self.save_dir_path_video_log.mkdir(exist_ok=True, parents=True)
def _get_order_id_by_task_id(self, root_task_id: str) -> UUID:
"""Task ID로 order_id 조회"""
try:
with SessionLocal() as db:
order_repo = OrderRepository(db)
# task_id로 order 검색하는 로직 필요 (DB 스키마에 따라)
# 또는 크롤링 결과 파일에서 order_id 추출
file_path = self._find_crawling_file_by_task_id(root_task_id)
if file_path:
crawl_data = self.integrated_service.load_metadata_from_file(file_path)
# 크롤링 결과에 order_id가 저장되어 있다면
return crawl_data.get("order_id")
return None
except Exception as e:
logger.error(f"order_id 조회 실패: {e}")
return None
def _get_lyrics_data_by_task_id(self, root_task_id: str) -> Dict[str, Any]:
"""Task ID로 가사 데이터 조회"""
try:
# 가사 생성 결과를 파일이나 DB에서 조회
file_path = self._find_crawling_file_by_task_id(root_task_id)
if file_path:
crawl_data = self.integrated_service.load_metadata_from_file(file_path)
return {
"lyrics": "generated_lyrics_here", # 실제 가사 데이터
"name": crawl_data.get("name", ""),
"address": crawl_data.get("address", "")
}
return {}
except Exception as e:
logger.error(f"가사 데이터 조회 실패: {e}")
return {}
def _validate_music_feed(self, music_feed: MusicFeed) -> str:
"""MusicFeed 객체 유효성 검사 및 첫 번째 음악 URL 반환"""
if not music_feed.choices:
raise ValueError("음악 선택지가 없습니다.")
first_choice = music_feed.choices[0]
if not first_choice.url or not first_choice.url.startswith(('http://', 'https://')):
raise ValueError(f"잘못된 URL 형식: {first_choice.url}")
return first_choice.url
def _download_image_from_url(self, url: str, filename: str) -> str:
"""URL에서 이미지를 다운로드해서 로컬 파일 경로 반환"""
try:
response = requests.get(url, timeout=30, stream=True)
response.raise_for_status()
# 임시 파일 경로 생성
file_extension = os.path.splitext(urlparse(url).path)[1] or '.jpg'
local_path = self.save_dir_path / f"{filename}{file_extension}"
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
logger.info(f"이미지 다운로드 완료: {local_path}")
return str(local_path)
except Exception as e:
logger.error(f"이미지 다운로드 실패 - URL: {url}, Error: {e}")
raise Exception(f"이미지 다운로드 실패: {e}")
def _download_images_for_video(self, photo_urls: List[str], root_task_id: str) -> List[str]:
"""비디오 생성용 이미지들을 다운로드"""
local_paths = []
for i, url in enumerate(photo_urls):
filename = f"{root_task_id}_image_{i+1}"
local_path = self._download_image_from_url(url, filename)
local_paths.append(local_path)
logger.info(f"{len(local_paths)}개 이미지 다운로드 완료")
return local_paths
def _download_music_file(self, music_url: str, local_path: Path) -> None:
"""음악 파일 다운로드"""
try:
logger.info(f"음악 파일 다운로드 시작: {music_url}")
response = requests.get(music_url, timeout=300, stream=True)
response.raise_for_status()
# 파일 크기 확인 (선택사항)
content_length = response.headers.get('content-length')
if content_length:
file_size_mb = int(content_length) / (1024 * 1024)
logger.info(f"다운로드할 파일 크기: {file_size_mb:.2f}MB")
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk: # 빈 청크 필터링
f.write(chunk)
logger.info(f"음악 파일 다운로드 완료: {local_path}")
except requests.RequestException as e:
raise Exception(f"음악 다운로드 실패: {e}")
except IOError as e:
raise Exception(f"파일 저장 실패: {e}")
def _generate_music_filename(self, root_task_id: str) -> Path:
"""음악 파일명 생성"""
now_str = datetime.now().strftime("%y%m%d_%H%M%S")
filename = f"{root_task_id}_music_{now_str}.mp3"
return self.save_dir_path / filename
def _find_crawling_file_by_task_id(self, root_task_id: str) -> str:
"""Task ID로 크롤링 파일 검색"""
try:
# IntegratedService의 저장 경로에서 파일 검색
storage_path = self.integrated_service.base_storage_path
# 파일명 패턴: task_{root_task_id}_날짜시간.json
pattern = f"*task_{root_task_id}*.json"
matching_files = list(storage_path.glob(pattern))
if matching_files:
# 가장 최근 파일 선택 (수정 시간 기준)
latest_file = max(matching_files, key=lambda f: f.stat().st_mtime)
logger.info(f"Task ID로 파일 찾기 성공: {latest_file}")
return str(latest_file)
# 대체 검색: 모든 JSON 파일에서 _crawling_info의 task_id 확인
logger.info(f"패턴 검색 실패, 전체 파일 스캔 시작...")
return self._scan_all_files_for_task_id(storage_path, root_task_id)
except Exception as e:
logger.error(f"파일 검색 실패: {e}")
return None
def _scan_all_files_for_task_id(self, storage_path: Path, root_task_id: str) -> str:
"""모든 JSON 파일을 스캔하여 Task ID 매칭"""
try:
for json_file in storage_path.glob("*.json"):
try:
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# 파일 내용에서 task_id 확인
crawling_info = data.get("_crawling_info", {})
if root_task_id in str(crawling_info) or root_task_id in json_file.name:
logger.info(f"스캔으로 파일 찾기 성공: {json_file}")
return str(json_file)
except Exception as file_error:
logger.debug(f"파일 스캔 중 오류 (스킵): {json_file} - {file_error}")
continue
logger.warning(f"Task ID {root_task_id}에 해당하는 파일을 찾을 수 없습니다.")
return None
except Exception as e:
logger.error(f"전체 파일 스캔 실패: {e}")
return None
def _url_parser(self, url: str) -> str:
"""네이버 지도 URL을 파싱하여 장소 id를 추출하고, metadata 추출용 url을 반환"""
if not url.startswith("https://map.naver.com/p/"):
raise ValueError("유효하지 않은 네이버 지도 URL입니다.")
try:
place_id = url.split('place/')[1].split('?')[0]
if not place_id:
raise ValueError("장소 id를 찾을 수 없습니다.")
print(f"장소 id 추출 완료: {place_id}")
new_url = f"https://map.naver.com/p/entry/place/{place_id}"
print(f"metadata 추출용 url 생성: {new_url}")
return new_url
except (IndexError, AttributeError) as e:
raise ValueError(f"URL 파싱 실패: {e}")
# 로그 추가
def _success_generate_video(self, root_task_id: str, order_id: UUID) -> None:
"""비디오 생성 성공 시 로그 추가"""
try:
with SessionLocal() as db:
order_repo = OrderRepository(db)
user_repo = UserRepository(db)
order = order_repo.get_by_id(order_id)
user = user_repo.get_by_id(order.user_id)
user_id = order.user_id
user_name = user.name
log_file_path = self.save_dir_path_video_log / "video_success_log.txt"
log_file_path.parent.mkdir(exist_ok=True, parents=True)
now_str = datetime.now().strftime("%y%m%d_%H%M%S")
logger.info(f"비디오 생성 성공 로그 저장 시작: {log_file_path}")
# 로그 파일 생성
with open(log_file_path, 'a', encoding='utf-8') as f:
log_entry = f"=== Video Success ===\n"
log_entry += f"User ID: {user_id}\n"
log_entry += f"User Name: {user_name}\n"
log_entry += f"Order ID: {order_id}\n"
log_entry += f"Created At: {now_str}\n\n"
f.write(log_entry)
logger.info(f"✅ 비디오 성공 로그 저장 완료 - Task ID: {root_task_id}, 파일: {log_file_path}")
except IOError as e:
logger.error(f"❌ 로그 파일 저장 실패 - Task ID: {root_task_id}, Error: {e}")
except Exception as e:
logger.error(f"❌ 비디오 성공 로그 처리 실패 - Task ID: {root_task_id}, Error: {e}")
def crawling_metadata(self, user_id: UUID, url: str, root_task_id: str) -> Dict[str, Any]:
"""메타데이터 크롤링 실행 및 item + order 저장"""
logger.info(f"📥 크롤링 시작 - URL: {url}, Task ID: {root_task_id}")
try:
# 1. 크롤링
metadata = self.integrated_service.get_metadata(
url=url,
save_to_file=True,
custom_filename=f"task_{root_task_id}"
)
# 2. 썸네일 이미지 처리 (fallback 구조)
images = metadata.get("images", {})
photo = images.get("photo_image", [])
review = images.get("review_blog_image", [])
thumbnail_url = (photo[0] if photo else None) or (review[0] if review else None)
# 3. 필수값 확인
name = metadata.get("name", "").strip()
address = metadata.get("address", "").strip()
item_url = self._url_parser(url)
# 태그 만들기
category = metadata.get("category", "").strip()
description = metadata.get("description", "").strip()
tags = self.chatgpt_service.extract_tag(name, address, category, description)
item_id = ""
order_id = ""
# 4. 세션 시작
with SessionLocal() as db:
try:
# 5. Item 생성 및 저장
item = Item(
user_id=user_id,
name=name,
address=address,
url=item_url,
phone_number=metadata.get("phone_number", "").strip(),
thumbnail_url=thumbnail_url,
hashtags=tags,
description=description
)
item_repo = ItemRepository(db)
item_repo.create(item)
db.flush() # item.id 확보용
# 6. Order 생성 및 저장
order = Order(
user_id=user_id,
item_id=item.id,
status="PENDING"
)
order_repo = OrderRepository(db)
order_repo.create(order)
db.flush() # order.id 확보용
item_id = item.id
order_id = order.id
# 7. 커밋
db.commit()
logger.info(f"✅ item/order 저장 완료 - task_id={root_task_id}")
except Exception as db_error:
db.rollback()
logger.error(f"❌ DB 저장 실패 - task_id={root_task_id}, error={db_error}")
raise db_error
## 이미지 필터링 모델에 사용될 json 파일 생성
photo_images = photo if isinstance(photo, list) else []
review_images = review if isinstance(review, list) else []
images_json = {
"images" : photo_images + review_images,
}
images_json_file_path = self.save_dir_path_images_json / f"{root_task_id}.json"
try:
with open(images_json_file_path, "w", encoding="utf-8") as f:
json.dump(images_json, f, ensure_ascii=False, indent=4)
logger.info(f"이미지 필터링 모델에 사용될 json 파일 생성 완료: {images_json_file_path}")
except Exception as e:
logger.error(f"이미지 필터링 모델에 사용될 json 파일 생성 실패: {e}")
# 8. 결과 반환
return {
"url": url,
"crawl_data": metadata,
"file_path": metadata.get("_saved_file_path"),
"task_id": root_task_id,
"item_id": item_id,
"order_id": order_id,
}
except Exception as e:
logger.error(f"❌ 크롤링 실패 - task_id={root_task_id}, error={e}")
raise e
def generate_lyrics(self, root_task_id: str) -> Dict[str, Any]:
"""가사 생성 실행 - JSON 파일에서 크롤링 데이터 조회"""
logger.info(f"가사 생성 시작 - Task ID: {root_task_id}")
try:
# Task ID로 크롤링 파일 검색
file_path = self._find_crawling_file_by_task_id(root_task_id)
if not file_path:
raise FileNotFoundError(f"크롤링 데이터 파일을 찾을 수 없습니다. Task ID: {root_task_id}")
# JSON 파일에서 메타데이터 로드
crawl_data = self.integrated_service.load_metadata_from_file(file_path)
if not crawl_data:
raise ValueError(f"크롤링 데이터를 로드할 수 없습니다. 파일: {file_path}")
# 크롤링된 데이터에서 필요한 정보 추출
name = crawl_data.get("name", "")
address = crawl_data.get("address", "")
category = crawl_data.get("category", "")
description = crawl_data.get("description", "")
logger.info(f"크롤링 데이터 로드 완료 - 장소: {name}, 주소: {address}")
# 가사 생성
ai_model = "kt-midm"
lyrics = self.chatgpt_service.generate_lyrics(name, address, category, description, ai_model)
result = {
"lyrics": lyrics,
"name": name,
"address": address,
"category": category,
"file_path": file_path,
"task_id": root_task_id
}
logger.info(f"가사 생성 모델: {ai_model}")
logger.info(f"가사 생성 결과: {lyrics}")
logger.info(f"가사 생성 완료 - Task ID: {root_task_id}")
return result
except Exception as e:
logger.error(f"가사 생성 실패 - Task ID: {root_task_id}, Error: {e}")
raise e
def generate_music(self, data: Dict[str, Any], root_task_id: str, order_id: UUID) -> Dict[str, Any]:
"""음악 생성 실행"""
logger.info(f"🎼 음악 생성 시작 - Task ID: {root_task_id}")
try:
# 🔥 order_id가 None이면 조회
if order_id is None:
order_id = self._get_order_id_by_task_id(root_task_id)
if not order_id:
raise ValueError("order_id를 찾을 수 없습니다")
# 🔥 가사 데이터가 없으면 조회
name = data.get("name", "")
lyrics = data.get("lyrics", "")
if not name or not lyrics:
lyrics_data = self._get_lyrics_data_by_task_id(root_task_id)
name = lyrics_data.get("name", "")
lyrics = lyrics_data.get("lyrics", "")
if not name or not lyrics:
raise ValueError(f"필수 데이터 누락 - name: {bool(name)}, lyrics: {bool(lyrics)}")
# 2. 뮤레카 요청 준비
genre = "pop"
mood = "happy"
prompt = ", ".join(filter(None, [genre, mood]))
# 3. 뮤레카 API 호출
logger.info(f"🎹 뮤레카 API 호출 - 제목: {name}")
music_generate_result = self.mureka_service.generate_music(lyrics, prompt)
if music_generate_result.get("id") is None:
raise ValueError(f"뮤레카 생성 API 실패")
# 4. 뮤레카 노래 가져오기
music_result = self.mureka_service.get_music(music_generate_result.get("id"))
# 5. 진행상황 확인 ( 풀링 방식 )
max_wait_time = 60 * 20 # 3분
start_time = time.time()
while music_result.get("status") != "succeeded":
time.sleep(3)
# 타임아웃 체크
elapsed_time = time.time() - start_time
if elapsed_time > max_wait_time:
raise TimeoutError(f"음악 생성 타임아웃 ({max_wait_time}초 초과)")
music_result = self.mureka_service.get_music(music_generate_result.get("id"))
logger.info(f"뮤레카 진행상황 확인: {music_result.get('status')} (경과시간: {elapsed_time:.1f}초)")
# 실패 상태 확인 (running과 succeeded가 아닌 경우)
if music_result.get("status") not in ["running", "succeeded"]:
raise ValueError(f"뮤레카 진행상황 확인 실패: {music_result.get('status')}")
# 6. 파싱
try:
music_feed = MusicFeed(**music_result)
logger.debug(f"music_feed parsed: {music_feed}")
except Exception as parse_error:
logger.error(f"MusicFeed 파싱 실패: {parse_error}")
raise ValueError(f"뮤레카 응답 파싱 실패: {parse_error}")
# 7. 유효성 검사 및 다운로드 URL 확보
music_url = self._validate_music_feed(music_feed)
local_path = self._generate_music_filename(root_task_id)
self._download_music_file(music_url, local_path)
# 8. DB 저장 준비 - choices 개수 확인
music_save_data_list = []
for idx, choice in enumerate(music_feed.choices):
music_save_data = {
"order_id": order_id,
"index": idx,
"is_selected": idx == 0, # 첫 번째만 선택
"title": f"{music_feed.id}_{idx + 1}_songs",
"url": choice.url,
"duration": choice.duration,
"lyrics": lyrics
}
music_save_data_list.append(music_save_data)
# 9. 트랜잭션 처리
with SessionLocal() as db:
try:
music_repository = MusicRepository(db)
for music_data in music_save_data_list:
music_repository.create(music_data)
db.commit()
logger.info(f"🎵 DB 저장 완료 - Task ID: {root_task_id}, 저장된 음악 수: {len(music_save_data_list)}")
except Exception as db_error:
db.rollback()
logger.error(f"DB 저장 실패 - Task ID: {root_task_id}, Error: {db_error}")
raise db_error
# 10. 최종 결과 반환
result = {
"music_path": str(local_path),
"music_url": music_url,
"name": name,
"task_id": root_task_id,
"file_size": local_path.stat().st_size if local_path.exists() else 0,
"total_choices": len(music_feed.choices)
}
logger.info(f"✅ 음악 생성 완료 - Task ID: {root_task_id}, 파일: {local_path}")
return result
except Exception as e:
logger.error(f"❌ 음악 생성 실패 - Task ID: {root_task_id}, Error: {e}")
raise e
def temp_image_filtering(self, root_task_id: str) -> List[str]:
'''이미지 필터링 모델 오류 or 한 개도 없을시 임시 처리'''
try:
# 1. Task ID로 크롤링 파일 검색
file_path = self._find_crawling_file_by_task_id(root_task_id)
if not file_path:
logger.warning(f"크롤링 데이터 파일을 찾을 수 없습니다. Task ID: {root_task_id}")
return []
# 2. JSON 파일에서 메타데이터 로드
crawl_data = self.integrated_service.load_metadata_from_file(file_path)
if not crawl_data:
logger.warning(f"크롤링 데이터를 로드할 수 없습니다. 파일: {file_path}")
return []
# 3. 이미지 데이터 추출 (상위 5개)
images = crawl_data.get("images", {})
photo_images = images.get("photo_image", [])[:5] # 상위 5개만
if not photo_images:
logger.warning(f"photo_image가 없습니다. Task ID: {root_task_id}")
return []
logger.info(f"임시 필터링 이미지 개수: {len(photo_images)}")
return photo_images
except Exception as e:
logger.error(f"임시 이미지 필터링 실패 - Task ID: {root_task_id}, Error: {e}")
return []
def ai_model_image_filtering(self, root_task_id: str) -> List[str]:
'''AI 모델 이미지 필터링'''
try:
check_folder_path = Path("./ai/response") / f"{root_task_id}"
if not check_folder_path.exists():
logger.info(f"이미지 필터링 모델 결과값 폴더가 없습니다. Task ID: {root_task_id}")
return []
logger.info(f"필터링된 이미지 확인 시작")
# done.txt 파일 생성까지 대기
done_file_path = check_folder_path / "done.txt"
max_wait_time = 60 * 3 # 3분
check_interval = 3 # 3초마다 확인
waited_time = 0
logger.info(f"🔄 done.txt 파일 생성 대기 중... - {done_file_path}")
while not done_file_path.exists():
time.sleep(check_interval)
waited_time += check_interval
if waited_time >= max_wait_time:
logger.warning(f"{max_wait_time}초 타임아웃 - done.txt 파일이 생성되지 않았습니다. Task ID: {root_task_id}")
return []
logger.info(f"✅ done.txt 파일 생성 완료 - Task ID: {root_task_id}")
# 필터링된 이미지 파일들 수집
filter_model_images = []
for file_path in check_folder_path.glob(f"{root_task_id}_*.jpg"):
filter_model_images.append(file_path)
# 파일명 순서대로 정렬
filter_model_images.sort(key=lambda x: int(x.stem.split('_')[-1]))
# Backend URL로 변환 (실제 파일명 그대로 사용)
image_urls = []
settings = EnvSetting()
current_backend_url = settings.CURRENT_BACKEND_URL
for image_file in filter_model_images:
relative_path = f"response/{root_task_id}/{image_file.name}"
image_url = f"{current_backend_url}/ai/{relative_path}"
image_urls.append(image_url)
logger.info(f"🖼️ AI 모델 이미지 URL 변환 완료 - {len(image_urls)}개 파일")
return image_urls
except Exception as e:
logger.warning(f"AI 모델 이미지 필터링 실패 - Task ID: {root_task_id}, Error: {e}")
return []
def image_filtering(self, data: Dict[str, Any], root_task_id: str, order_id: UUID) -> Dict[str, Any]:
"""이미지 필터링 실행"""
logger.info(f"📸 이미지 필터링 시작 - Task ID: {root_task_id}")
try:
image_filtering_result = ""
# 1. AI 모델 필터링 시도
photo_images = self.ai_model_image_filtering(root_task_id)
image_filtering_result = "ai"
# 2. AI 모델 실패 시 임시 처리
if not photo_images:
logger.info(f"AI 모델 필터링 실패, 임시 처리로 전환 - Task ID: {root_task_id}")
photo_images = self.temp_image_filtering(root_task_id)
image_filtering_result = "temp"
# 3. 최종적으로도 이미지가 없는 경우
if not photo_images:
logger.error(f"필터링할 이미지가 전혀 없습니다 - Task ID: {root_task_id}")
raise ValueError(f"필터링할 이미지가 없습니다 - Task ID: {root_task_id}")
# 4. Photo 데이터 생성
photo_data_list = []
for index, image_url in enumerate(photo_images, 1):
photo_data = {
"order_id": order_id,
"name": f"filtered_image_{index}",
"url": image_url,
"video_index": index,
"is_selected": True
}
photo_data_list.append(photo_data)
# 5. DB 저장 (트랜잭션 처리)
with SessionLocal() as db:
try:
photo_repository = PhotoRepository(db)
for photo_data in photo_data_list:
photo_repository.create(photo_data)
db.commit()
logger.info(f"📷 사진 DB 저장 완료 - Task ID: {root_task_id}, 저장된 이미지 수: {len(photo_data_list)}")
except Exception as db_error:
db.rollback()
logger.error(f"사진 DB 저장 실패 - Task ID: {root_task_id}, Error: {db_error}")
raise db_error
# 6. 결과 반환
result = {
"filtered_images": photo_images,
"image_count": len(photo_images),
"task_id": root_task_id,
"order_id": order_id,
"image_filtering":image_filtering_result
}
logger.info(f"✅ 이미지 필터링 완료 - Task ID: {root_task_id}")
return result
except Exception as e:
logger.error(f"❌ 이미지 필터링 실패 - Task ID: {root_task_id}, Error: {e}")
raise e
def generate_video(self, data: Dict[str, Any], root_task_id: str, order_id: UUID, music_path: str, image_filtering_result: str) -> Dict[str, Any]:
"""비디오 생성 실행"""
logger.info(f"🎬 비디오 생성 시작 - Task ID: {root_task_id}")
try:
# 1. DB에서 정보 가져오기
with SessionLocal() as db:
item_repo = ItemRepository(db)
photo_repo = PhotoRepository(db)
# 업체 정보 조회
item = item_repo.get_by_order_id(order_id)
if not item:
raise ValueError(f"주문 ID {order_id}에 해당하는 업체를 찾을 수 없습니다.")
name = item.name
address = item.address
description = item.description or ""
tags = item.hashtags or []
# 사진들 조회
photos = photo_repo.get_by_order_id(order_id)
if not photos:
raise ValueError(f"주문 ID {order_id}에 해당하는 사진이 없습니다.")
# 사진 URL들 추출
photo_urls = [photo.url for photo in photos]
logger.info(f"비디오 생성용 이미지 {len(photo_urls)}개 조회 완료")
# 2. 이미지 처리 (AI vs Temp 구분)
if image_filtering_result == "ai":
# AI 결과: 내부 파일이므로 URL을 로컬 경로로 변환
local_image_paths = []
settings = EnvSetting()
current_backend_url = settings.CURRENT_BACKEND_URL
for photo_url in photo_urls:
# URL에서 로컬 경로 추출
# 예: https://domain/ai/response/task_123/task_123_1.jpg -> ./ai/response/task_123/task_123_1.jpg
if photo_url.startswith(current_backend_url):
# URL에서 /ai/ 이후 부분 추출
relative_path = photo_url.replace(f"{current_backend_url}/ai/", "")
local_path = Path("./ai") / relative_path
# 파일이 실제로 존재하는지 확인
if local_path.exists():
local_image_paths.append(str(local_path))
logger.debug(f"AI 이미지 로컬 경로 변환: {photo_url} -> {local_path}")
else:
logger.warning(f"AI 이미지 파일이 존재하지 않음: {local_path}")
else:
logger.warning(f"예상과 다른 URL 형식: {photo_url}")
if not local_image_paths:
raise ValueError("AI 모델 결과에서 유효한 로컬 이미지 경로를 찾을 수 없습니다.")
logger.info(f"AI 모델 로컬 이미지 사용: {len(local_image_paths)}")
else:
# Temp 결과: 외부 URL이므로 다운로드 필요
local_image_paths = self._download_images_for_video(photo_urls, root_task_id)
# 3. 비디오 생성 (로컬 파일 경로 사용)
output_filename = f"{root_task_id}_generated_video.mp4"
video_path = self.moviepy_service.create_video_from_existing_images(
image_paths=local_image_paths,
music_path=music_path,
output_filename=output_filename
)
# 4. 비디오 정보 DB 저장
with SessionLocal() as db:
try:
video_repo = VideoRepository(db)
# Music ID 조회
music_repo = MusicRepository(db)
music = music_repo.get_by_order_id(order_id)
if not music:
raise ValueError(f"주문 ID {order_id}에 해당하는 음악을 찾을 수 없습니다.")
video_data = {
"music_id": music.id,
"order_id": order_id,
"title": f"{name} 홍보영상",
"description": f"{name} ({address}) - {description}",
"url": video_path,
"is_uploaded": False,
"download_count": 0,
"resolution": "1080p",
"status": "완료됨",
"thumbnail_url": photo_urls[0] if photo_urls else None
}
video_repo.create(video_data)
db.commit()
logger.info(f"🎥 비디오 DB 저장 완료 - Task ID: {root_task_id}")
except Exception as db_error:
db.rollback()
logger.error(f"비디오 DB 저장 실패 - Task ID: {root_task_id}, Error: {db_error}")
raise db_error
# 5. 결과 반환
result = {
"video_path": video_path,
"video_title": f"{name} 홍보영상",
"image_count": len(local_image_paths),
"task_id": root_task_id,
"tags": tags
}
logger.info(f"✅ 비디오 생성 완료 - Task ID: {root_task_id}")
# 로그 저장 로직 추가
self._success_generate_video(root_task_id, order_id)
return result
except Exception as e:
logger.error(f"❌ 비디오 생성 실패 - Task ID: {root_task_id}, Error: {e}")
raise e