866 lines
37 KiB
Python
866 lines
37 KiB
Python
import requests
|
|
import os
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from uuid import UUID
|
|
import json
|
|
from urllib.parse import urlparse
|
|
import logging
|
|
from typing import Dict, Any, List, Optional
|
|
from pydantic import BaseModel, HttpUrl
|
|
from app.services.chatgpt_service import ChatgptService
|
|
from app.infra.crawling_refactor.naver_crawling import NaverCrawling
|
|
from app.infra.crawling_refactor.integrated_service import IntegratedService
|
|
from app.services.mureka_service_fix import MurekaServiceFix
|
|
from app.presentation.schemas.mureka_schema import CreateMusic
|
|
from app.repositories.music_repository import MusicRepository
|
|
from app.repositories.order_repository import OrderRepository
|
|
from app.repositories.item_repository import ItemRepository
|
|
from app.domain.models.item import Item
|
|
from app.domain.models.order import Order
|
|
from sqlalchemy.orm import Session
|
|
from app.core.database import SessionLocal
|
|
from app.domain.models.photo import Photo
|
|
from app.repositories.photo_repository import PhotoRepository
|
|
from app.infra.media.moviepy_service import MoviepyService
|
|
from app.repositories.video_repository import VideoRepository
|
|
from app.repositories.user_repository import UserRepository
|
|
from app.domain.models.video import Video
|
|
from app.core.env_setting import EnvSetting
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# class Song(BaseModel):
|
|
# song_id: str
|
|
# title: str
|
|
# version: str
|
|
# duration_milliseconds: int
|
|
# generate_at: int
|
|
# genres: List[str]
|
|
# moods: List[str]
|
|
# mp3_url: HttpUrl
|
|
# share_key: str
|
|
# machine_audit_state: int
|
|
# credit_type: int
|
|
# cover: HttpUrl
|
|
# stream_key: str
|
|
# description: Optional[str] = None
|
|
# share_link: HttpUrl
|
|
|
|
# class MusicFeed(BaseModel):
|
|
# feed_id: int
|
|
# state: int
|
|
# songs: List[Song]
|
|
# is_accelerated: bool
|
|
# generation_method: int
|
|
# model: str
|
|
# thinking_process_url: Optional[HttpUrl] = None
|
|
# generate_at: int
|
|
|
|
class Word(BaseModel):
|
|
start: int
|
|
end: int
|
|
text: str
|
|
|
|
class Line(BaseModel):
|
|
start: int
|
|
end: int
|
|
text: str
|
|
words: List[Word]
|
|
|
|
class LyricsSection(BaseModel):
|
|
section_type: str
|
|
start: int
|
|
end: int
|
|
lines: Optional[List[Line]] = None
|
|
|
|
class Choice(BaseModel):
|
|
url: str
|
|
flac_url: str
|
|
duration: int
|
|
lyrics_sections: List[LyricsSection]
|
|
id: str
|
|
index: Optional[int] = None
|
|
|
|
class MusicFeed(BaseModel):
|
|
id: str
|
|
created_at: int
|
|
finished_at: int
|
|
model: str
|
|
status: str
|
|
choices: List[Choice]
|
|
trace_id: str
|
|
|
|
|
|
|
|
|
|
|
|
class TaskService:
|
|
"""Celery 작업들의 비즈니스 로직을 처리하는 서비스"""
|
|
|
|
def __init__(self):
|
|
self.chatgpt_service = ChatgptService()
|
|
self.naver_crawling = NaverCrawling()
|
|
self.integrated_service = IntegratedService()
|
|
self.mureka_service = MurekaServiceFix()
|
|
self.moviepy_service = MoviepyService()
|
|
|
|
# 저장 디렉토리 설정
|
|
self.save_dir_path = Path("./uploads")
|
|
self.save_dir_path.mkdir(exist_ok=True, parents=True)
|
|
# 이미지 필터링 모델
|
|
self.save_dir_path_images_json = Path("./ai/request")
|
|
self.save_dir_path_images_json.mkdir(exist_ok=True, parents=True)
|
|
# 비디오 생성 로그 저장 디렉토리
|
|
self.save_dir_path_video_log = Path("./success_log")
|
|
self.save_dir_path_video_log.mkdir(exist_ok=True, parents=True)
|
|
|
|
def _get_order_id_by_task_id(self, root_task_id: str) -> UUID:
|
|
"""Task ID로 order_id 조회"""
|
|
try:
|
|
with SessionLocal() as db:
|
|
order_repo = OrderRepository(db)
|
|
# task_id로 order 검색하는 로직 필요 (DB 스키마에 따라)
|
|
# 또는 크롤링 결과 파일에서 order_id 추출
|
|
file_path = self._find_crawling_file_by_task_id(root_task_id)
|
|
if file_path:
|
|
crawl_data = self.integrated_service.load_metadata_from_file(file_path)
|
|
# 크롤링 결과에 order_id가 저장되어 있다면
|
|
return crawl_data.get("order_id")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"order_id 조회 실패: {e}")
|
|
return None
|
|
|
|
def _get_lyrics_data_by_task_id(self, root_task_id: str) -> Dict[str, Any]:
|
|
"""Task ID로 가사 데이터 조회"""
|
|
try:
|
|
# 가사 생성 결과를 파일이나 DB에서 조회
|
|
file_path = self._find_crawling_file_by_task_id(root_task_id)
|
|
if file_path:
|
|
crawl_data = self.integrated_service.load_metadata_from_file(file_path)
|
|
return {
|
|
"lyrics": "generated_lyrics_here", # 실제 가사 데이터
|
|
"name": crawl_data.get("name", ""),
|
|
"address": crawl_data.get("address", "")
|
|
}
|
|
return {}
|
|
except Exception as e:
|
|
logger.error(f"가사 데이터 조회 실패: {e}")
|
|
return {}
|
|
|
|
|
|
def _validate_music_feed(self, music_feed: MusicFeed) -> str:
|
|
"""MusicFeed 객체 유효성 검사 및 첫 번째 음악 URL 반환"""
|
|
if not music_feed.choices:
|
|
raise ValueError("음악 선택지가 없습니다.")
|
|
|
|
first_choice = music_feed.choices[0]
|
|
if not first_choice.url or not first_choice.url.startswith(('http://', 'https://')):
|
|
raise ValueError(f"잘못된 URL 형식: {first_choice.url}")
|
|
|
|
return first_choice.url
|
|
|
|
def _download_image_from_url(self, url: str, filename: str) -> str:
|
|
"""URL에서 이미지를 다운로드해서 로컬 파일 경로 반환"""
|
|
try:
|
|
response = requests.get(url, timeout=30, stream=True)
|
|
response.raise_for_status()
|
|
|
|
# 임시 파일 경로 생성
|
|
file_extension = os.path.splitext(urlparse(url).path)[1] or '.jpg'
|
|
local_path = self.save_dir_path / f"{filename}{file_extension}"
|
|
|
|
with open(local_path, 'wb') as f:
|
|
for chunk in response.iter_content(chunk_size=8192):
|
|
if chunk:
|
|
f.write(chunk)
|
|
|
|
logger.info(f"이미지 다운로드 완료: {local_path}")
|
|
return str(local_path)
|
|
|
|
except Exception as e:
|
|
logger.error(f"이미지 다운로드 실패 - URL: {url}, Error: {e}")
|
|
raise Exception(f"이미지 다운로드 실패: {e}")
|
|
|
|
def _download_images_for_video(self, photo_urls: List[str], root_task_id: str) -> List[str]:
|
|
"""비디오 생성용 이미지들을 다운로드"""
|
|
local_paths = []
|
|
for i, url in enumerate(photo_urls):
|
|
filename = f"{root_task_id}_image_{i+1}"
|
|
local_path = self._download_image_from_url(url, filename)
|
|
local_paths.append(local_path)
|
|
|
|
logger.info(f"총 {len(local_paths)}개 이미지 다운로드 완료")
|
|
return local_paths
|
|
|
|
def _download_music_file(self, music_url: str, local_path: Path) -> None:
|
|
"""음악 파일 다운로드"""
|
|
try:
|
|
logger.info(f"음악 파일 다운로드 시작: {music_url}")
|
|
|
|
response = requests.get(music_url, timeout=300, stream=True)
|
|
response.raise_for_status()
|
|
|
|
# 파일 크기 확인 (선택사항)
|
|
content_length = response.headers.get('content-length')
|
|
if content_length:
|
|
file_size_mb = int(content_length) / (1024 * 1024)
|
|
logger.info(f"다운로드할 파일 크기: {file_size_mb:.2f}MB")
|
|
|
|
with open(local_path, 'wb') as f:
|
|
for chunk in response.iter_content(chunk_size=8192):
|
|
if chunk: # 빈 청크 필터링
|
|
f.write(chunk)
|
|
|
|
logger.info(f"음악 파일 다운로드 완료: {local_path}")
|
|
|
|
except requests.RequestException as e:
|
|
raise Exception(f"음악 다운로드 실패: {e}")
|
|
except IOError as e:
|
|
raise Exception(f"파일 저장 실패: {e}")
|
|
|
|
def _generate_music_filename(self, root_task_id: str) -> Path:
|
|
"""음악 파일명 생성"""
|
|
now_str = datetime.now().strftime("%y%m%d_%H%M%S")
|
|
filename = f"{root_task_id}_music_{now_str}.mp3"
|
|
return self.save_dir_path / filename
|
|
|
|
def _find_crawling_file_by_task_id(self, root_task_id: str) -> str:
|
|
"""Task ID로 크롤링 파일 검색"""
|
|
try:
|
|
# IntegratedService의 저장 경로에서 파일 검색
|
|
storage_path = self.integrated_service.base_storage_path
|
|
|
|
# 파일명 패턴: task_{root_task_id}_날짜시간.json
|
|
pattern = f"*task_{root_task_id}*.json"
|
|
matching_files = list(storage_path.glob(pattern))
|
|
|
|
if matching_files:
|
|
# 가장 최근 파일 선택 (수정 시간 기준)
|
|
latest_file = max(matching_files, key=lambda f: f.stat().st_mtime)
|
|
logger.info(f"Task ID로 파일 찾기 성공: {latest_file}")
|
|
return str(latest_file)
|
|
|
|
# 대체 검색: 모든 JSON 파일에서 _crawling_info의 task_id 확인
|
|
logger.info(f"패턴 검색 실패, 전체 파일 스캔 시작...")
|
|
return self._scan_all_files_for_task_id(storage_path, root_task_id)
|
|
|
|
except Exception as e:
|
|
logger.error(f"파일 검색 실패: {e}")
|
|
return None
|
|
|
|
def _scan_all_files_for_task_id(self, storage_path: Path, root_task_id: str) -> str:
|
|
"""모든 JSON 파일을 스캔하여 Task ID 매칭"""
|
|
try:
|
|
for json_file in storage_path.glob("*.json"):
|
|
try:
|
|
with open(json_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
|
|
# 파일 내용에서 task_id 확인
|
|
crawling_info = data.get("_crawling_info", {})
|
|
if root_task_id in str(crawling_info) or root_task_id in json_file.name:
|
|
logger.info(f"스캔으로 파일 찾기 성공: {json_file}")
|
|
return str(json_file)
|
|
|
|
except Exception as file_error:
|
|
logger.debug(f"파일 스캔 중 오류 (스킵): {json_file} - {file_error}")
|
|
continue
|
|
|
|
logger.warning(f"Task ID {root_task_id}에 해당하는 파일을 찾을 수 없습니다.")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"전체 파일 스캔 실패: {e}")
|
|
return None
|
|
|
|
def _url_parser(self, url: str) -> str:
|
|
"""네이버 지도 URL을 파싱하여 장소 id를 추출하고, metadata 추출용 url을 반환"""
|
|
if not url.startswith("https://map.naver.com/p/"):
|
|
raise ValueError("유효하지 않은 네이버 지도 URL입니다.")
|
|
|
|
try:
|
|
place_id = url.split('place/')[1].split('?')[0]
|
|
if not place_id:
|
|
raise ValueError("장소 id를 찾을 수 없습니다.")
|
|
|
|
print(f"장소 id 추출 완료: {place_id}")
|
|
new_url = f"https://map.naver.com/p/entry/place/{place_id}"
|
|
print(f"metadata 추출용 url 생성: {new_url}")
|
|
return new_url
|
|
|
|
except (IndexError, AttributeError) as e:
|
|
raise ValueError(f"URL 파싱 실패: {e}")
|
|
|
|
|
|
# 로그 추가
|
|
def _success_generate_video(self, root_task_id: str, order_id: UUID) -> None:
|
|
"""비디오 생성 성공 시 로그 추가"""
|
|
try:
|
|
with SessionLocal() as db:
|
|
order_repo = OrderRepository(db)
|
|
user_repo = UserRepository(db)
|
|
order = order_repo.get_by_id(order_id)
|
|
user = user_repo.get_by_id(order.user_id)
|
|
user_id = order.user_id
|
|
user_name = user.name
|
|
|
|
log_file_path = self.save_dir_path_video_log / "video_success_log.txt"
|
|
log_file_path.parent.mkdir(exist_ok=True, parents=True)
|
|
|
|
now_str = datetime.now().strftime("%y%m%d_%H%M%S")
|
|
logger.info(f"비디오 생성 성공 로그 저장 시작: {log_file_path}")
|
|
|
|
# 로그 파일 생성
|
|
with open(log_file_path, 'a', encoding='utf-8') as f:
|
|
log_entry = f"=== Video Success ===\n"
|
|
log_entry += f"User ID: {user_id}\n"
|
|
log_entry += f"User Name: {user_name}\n"
|
|
log_entry += f"Order ID: {order_id}\n"
|
|
log_entry += f"Created At: {now_str}\n\n"
|
|
f.write(log_entry)
|
|
|
|
logger.info(f"✅ 비디오 성공 로그 저장 완료 - Task ID: {root_task_id}, 파일: {log_file_path}")
|
|
|
|
except IOError as e:
|
|
logger.error(f"❌ 로그 파일 저장 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
except Exception as e:
|
|
logger.error(f"❌ 비디오 성공 로그 처리 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
|
|
def crawling_metadata(self, user_id: UUID, url: str, root_task_id: str) -> Dict[str, Any]:
|
|
"""메타데이터 크롤링 실행 및 item + order 저장"""
|
|
logger.info(f"📥 크롤링 시작 - URL: {url}, Task ID: {root_task_id}")
|
|
|
|
try:
|
|
# 1. 크롤링
|
|
metadata = self.integrated_service.get_metadata(
|
|
url=url,
|
|
save_to_file=True,
|
|
custom_filename=f"task_{root_task_id}"
|
|
)
|
|
|
|
# 2. 썸네일 이미지 처리 (fallback 구조)
|
|
images = metadata.get("images", {})
|
|
photo = images.get("photo_image", [])
|
|
review = images.get("review_blog_image", [])
|
|
thumbnail_url = (photo[0] if photo else None) or (review[0] if review else None)
|
|
|
|
# 3. 필수값 확인
|
|
name = metadata.get("name", "").strip()
|
|
address = metadata.get("address", "").strip()
|
|
item_url = self._url_parser(url)
|
|
|
|
# 태그 만들기
|
|
category = metadata.get("category", "").strip()
|
|
description = metadata.get("description", "").strip()
|
|
tags = self.chatgpt_service.extract_tag(name, address, category, description)
|
|
|
|
item_id = ""
|
|
order_id = ""
|
|
# 4. 세션 시작
|
|
with SessionLocal() as db:
|
|
try:
|
|
# 5. Item 생성 및 저장
|
|
item = Item(
|
|
user_id=user_id,
|
|
name=name,
|
|
address=address,
|
|
url=item_url,
|
|
phone_number=metadata.get("phone_number", "").strip(),
|
|
thumbnail_url=thumbnail_url,
|
|
hashtags=tags,
|
|
description=description
|
|
)
|
|
|
|
item_repo = ItemRepository(db)
|
|
item_repo.create(item)
|
|
db.flush() # item.id 확보용
|
|
|
|
# 6. Order 생성 및 저장
|
|
order = Order(
|
|
user_id=user_id,
|
|
item_id=item.id,
|
|
status="PENDING"
|
|
)
|
|
order_repo = OrderRepository(db)
|
|
order_repo.create(order)
|
|
db.flush() # order.id 확보용
|
|
|
|
item_id = item.id
|
|
order_id = order.id
|
|
|
|
# 7. 커밋
|
|
db.commit()
|
|
logger.info(f"✅ item/order 저장 완료 - task_id={root_task_id}")
|
|
|
|
except Exception as db_error:
|
|
db.rollback()
|
|
logger.error(f"❌ DB 저장 실패 - task_id={root_task_id}, error={db_error}")
|
|
raise db_error
|
|
|
|
|
|
## 이미지 필터링 모델에 사용될 json 파일 생성
|
|
photo_images = photo if isinstance(photo, list) else []
|
|
review_images = review if isinstance(review, list) else []
|
|
images_json = {
|
|
"images" : photo_images + review_images,
|
|
}
|
|
|
|
images_json_file_path = self.save_dir_path_images_json / f"{root_task_id}.json"
|
|
try:
|
|
with open(images_json_file_path, "w", encoding="utf-8") as f:
|
|
json.dump(images_json, f, ensure_ascii=False, indent=4)
|
|
logger.info(f"이미지 필터링 모델에 사용될 json 파일 생성 완료: {images_json_file_path}")
|
|
except Exception as e:
|
|
logger.error(f"이미지 필터링 모델에 사용될 json 파일 생성 실패: {e}")
|
|
|
|
# 8. 결과 반환
|
|
return {
|
|
"url": url,
|
|
"crawl_data": metadata,
|
|
"file_path": metadata.get("_saved_file_path"),
|
|
"task_id": root_task_id,
|
|
"item_id": item_id,
|
|
"order_id": order_id,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 크롤링 실패 - task_id={root_task_id}, error={e}")
|
|
raise e
|
|
|
|
def generate_lyrics(self, root_task_id: str) -> Dict[str, Any]:
|
|
"""가사 생성 실행 - JSON 파일에서 크롤링 데이터 조회"""
|
|
logger.info(f"가사 생성 시작 - Task ID: {root_task_id}")
|
|
|
|
try:
|
|
# Task ID로 크롤링 파일 검색
|
|
file_path = self._find_crawling_file_by_task_id(root_task_id)
|
|
|
|
if not file_path:
|
|
raise FileNotFoundError(f"크롤링 데이터 파일을 찾을 수 없습니다. Task ID: {root_task_id}")
|
|
|
|
# JSON 파일에서 메타데이터 로드
|
|
crawl_data = self.integrated_service.load_metadata_from_file(file_path)
|
|
|
|
if not crawl_data:
|
|
raise ValueError(f"크롤링 데이터를 로드할 수 없습니다. 파일: {file_path}")
|
|
|
|
# 크롤링된 데이터에서 필요한 정보 추출
|
|
name = crawl_data.get("name", "")
|
|
address = crawl_data.get("address", "")
|
|
category = crawl_data.get("category", "")
|
|
description = crawl_data.get("description", "")
|
|
|
|
logger.info(f"크롤링 데이터 로드 완료 - 장소: {name}, 주소: {address}")
|
|
|
|
# 가사 생성
|
|
ai_model = "kt-midm"
|
|
lyrics = self.chatgpt_service.generate_lyrics(name, address, category, description, ai_model)
|
|
|
|
result = {
|
|
"lyrics": lyrics,
|
|
"name": name,
|
|
"address": address,
|
|
"category": category,
|
|
"file_path": file_path,
|
|
"task_id": root_task_id
|
|
}
|
|
|
|
logger.info(f"가사 생성 모델: {ai_model}")
|
|
logger.info(f"가사 생성 결과: {lyrics}")
|
|
logger.info(f"가사 생성 완료 - Task ID: {root_task_id}")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"가사 생성 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
raise e
|
|
|
|
def generate_music(self, data: Dict[str, Any], root_task_id: str, order_id: UUID) -> Dict[str, Any]:
|
|
"""음악 생성 실행"""
|
|
logger.info(f"🎼 음악 생성 시작 - Task ID: {root_task_id}")
|
|
|
|
try:
|
|
# 🔥 order_id가 None이면 조회
|
|
if order_id is None:
|
|
order_id = self._get_order_id_by_task_id(root_task_id)
|
|
if not order_id:
|
|
raise ValueError("order_id를 찾을 수 없습니다")
|
|
|
|
# 🔥 가사 데이터가 없으면 조회
|
|
name = data.get("name", "")
|
|
lyrics = data.get("lyrics", "")
|
|
|
|
if not name or not lyrics:
|
|
lyrics_data = self._get_lyrics_data_by_task_id(root_task_id)
|
|
name = lyrics_data.get("name", "")
|
|
lyrics = lyrics_data.get("lyrics", "")
|
|
|
|
if not name or not lyrics:
|
|
raise ValueError(f"필수 데이터 누락 - name: {bool(name)}, lyrics: {bool(lyrics)}")
|
|
|
|
# 2. 뮤레카 요청 준비
|
|
genre = "pop"
|
|
mood = "happy"
|
|
prompt = ", ".join(filter(None, [genre, mood]))
|
|
|
|
# 3. 뮤레카 API 호출
|
|
logger.info(f"🎹 뮤레카 API 호출 - 제목: {name}")
|
|
music_generate_result = self.mureka_service.generate_music(lyrics, prompt)
|
|
|
|
if music_generate_result.get("id") is None:
|
|
raise ValueError(f"뮤레카 생성 API 실패")
|
|
|
|
# 4. 뮤레카 노래 가져오기
|
|
music_result = self.mureka_service.get_music(music_generate_result.get("id"))
|
|
|
|
# 5. 진행상황 확인 ( 풀링 방식 )
|
|
max_wait_time = 60 * 20 # 3분
|
|
start_time = time.time()
|
|
|
|
while music_result.get("status") != "succeeded":
|
|
time.sleep(3)
|
|
|
|
# 타임아웃 체크
|
|
elapsed_time = time.time() - start_time
|
|
if elapsed_time > max_wait_time:
|
|
raise TimeoutError(f"음악 생성 타임아웃 ({max_wait_time}초 초과)")
|
|
|
|
music_result = self.mureka_service.get_music(music_generate_result.get("id"))
|
|
logger.info(f"뮤레카 진행상황 확인: {music_result.get('status')} (경과시간: {elapsed_time:.1f}초)")
|
|
|
|
# 실패 상태 확인 (running과 succeeded가 아닌 경우)
|
|
if music_result.get("status") not in ["running", "succeeded"]:
|
|
raise ValueError(f"뮤레카 진행상황 확인 실패: {music_result.get('status')}")
|
|
|
|
# 6. 파싱
|
|
try:
|
|
music_feed = MusicFeed(**music_result)
|
|
logger.debug(f"music_feed parsed: {music_feed}")
|
|
except Exception as parse_error:
|
|
logger.error(f"MusicFeed 파싱 실패: {parse_error}")
|
|
raise ValueError(f"뮤레카 응답 파싱 실패: {parse_error}")
|
|
|
|
# 7. 유효성 검사 및 다운로드 URL 확보
|
|
music_url = self._validate_music_feed(music_feed)
|
|
local_path = self._generate_music_filename(root_task_id)
|
|
self._download_music_file(music_url, local_path)
|
|
|
|
# 8. DB 저장 준비 - choices 개수 확인
|
|
music_save_data_list = []
|
|
|
|
for idx, choice in enumerate(music_feed.choices):
|
|
music_save_data = {
|
|
"order_id": order_id,
|
|
"index": idx,
|
|
"is_selected": idx == 0, # 첫 번째만 선택
|
|
"title": f"{music_feed.id}_{idx + 1}_songs",
|
|
"url": choice.url,
|
|
"duration": choice.duration,
|
|
"lyrics": lyrics
|
|
}
|
|
music_save_data_list.append(music_save_data)
|
|
|
|
# 9. 트랜잭션 처리
|
|
with SessionLocal() as db:
|
|
try:
|
|
music_repository = MusicRepository(db)
|
|
for music_data in music_save_data_list:
|
|
music_repository.create(music_data)
|
|
db.commit()
|
|
logger.info(f"🎵 DB 저장 완료 - Task ID: {root_task_id}, 저장된 음악 수: {len(music_save_data_list)}")
|
|
except Exception as db_error:
|
|
db.rollback()
|
|
logger.error(f"DB 저장 실패 - Task ID: {root_task_id}, Error: {db_error}")
|
|
raise db_error
|
|
|
|
# 10. 최종 결과 반환
|
|
result = {
|
|
"music_path": str(local_path),
|
|
"music_url": music_url,
|
|
"name": name,
|
|
"task_id": root_task_id,
|
|
"file_size": local_path.stat().st_size if local_path.exists() else 0,
|
|
"total_choices": len(music_feed.choices)
|
|
}
|
|
|
|
logger.info(f"✅ 음악 생성 완료 - Task ID: {root_task_id}, 파일: {local_path}")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 음악 생성 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
raise e
|
|
|
|
|
|
def temp_image_filtering(self, root_task_id: str) -> List[str]:
|
|
'''이미지 필터링 모델 오류 or 한 개도 없을시 임시 처리'''
|
|
try:
|
|
# 1. Task ID로 크롤링 파일 검색
|
|
file_path = self._find_crawling_file_by_task_id(root_task_id)
|
|
|
|
if not file_path:
|
|
logger.warning(f"크롤링 데이터 파일을 찾을 수 없습니다. Task ID: {root_task_id}")
|
|
return []
|
|
|
|
# 2. JSON 파일에서 메타데이터 로드
|
|
crawl_data = self.integrated_service.load_metadata_from_file(file_path)
|
|
|
|
if not crawl_data:
|
|
logger.warning(f"크롤링 데이터를 로드할 수 없습니다. 파일: {file_path}")
|
|
return []
|
|
|
|
# 3. 이미지 데이터 추출 (상위 5개)
|
|
images = crawl_data.get("images", {})
|
|
photo_images = images.get("photo_image", [])[:5] # 상위 5개만
|
|
|
|
if not photo_images:
|
|
logger.warning(f"photo_image가 없습니다. Task ID: {root_task_id}")
|
|
return []
|
|
|
|
logger.info(f"임시 필터링 이미지 개수: {len(photo_images)}")
|
|
return photo_images
|
|
|
|
except Exception as e:
|
|
logger.error(f"임시 이미지 필터링 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
return []
|
|
|
|
def ai_model_image_filtering(self, root_task_id: str) -> List[str]:
|
|
'''AI 모델 이미지 필터링'''
|
|
try:
|
|
check_folder_path = Path("./ai/response") / f"{root_task_id}"
|
|
if not check_folder_path.exists():
|
|
logger.info(f"이미지 필터링 모델 결과값 폴더가 없습니다. Task ID: {root_task_id}")
|
|
return []
|
|
|
|
logger.info(f"필터링된 이미지 확인 시작")
|
|
|
|
# done.txt 파일 생성까지 대기
|
|
done_file_path = check_folder_path / "done.txt"
|
|
max_wait_time = 60 * 3 # 3분
|
|
check_interval = 3 # 3초마다 확인
|
|
waited_time = 0
|
|
|
|
logger.info(f"🔄 done.txt 파일 생성 대기 중... - {done_file_path}")
|
|
|
|
while not done_file_path.exists():
|
|
time.sleep(check_interval)
|
|
waited_time += check_interval
|
|
|
|
if waited_time >= max_wait_time:
|
|
logger.warning(f"⏰ {max_wait_time}초 타임아웃 - done.txt 파일이 생성되지 않았습니다. Task ID: {root_task_id}")
|
|
return []
|
|
|
|
logger.info(f"✅ done.txt 파일 생성 완료 - Task ID: {root_task_id}")
|
|
|
|
# 필터링된 이미지 파일들 수집
|
|
filter_model_images = []
|
|
for file_path in check_folder_path.glob(f"{root_task_id}_*.jpg"):
|
|
filter_model_images.append(file_path)
|
|
|
|
# 파일명 순서대로 정렬
|
|
filter_model_images.sort(key=lambda x: int(x.stem.split('_')[-1]))
|
|
|
|
# Backend URL로 변환 (실제 파일명 그대로 사용)
|
|
image_urls = []
|
|
settings = EnvSetting()
|
|
current_backend_url = settings.CURRENT_BACKEND_URL
|
|
|
|
for image_file in filter_model_images:
|
|
relative_path = f"response/{root_task_id}/{image_file.name}"
|
|
image_url = f"{current_backend_url}/ai/{relative_path}"
|
|
image_urls.append(image_url)
|
|
|
|
logger.info(f"🖼️ AI 모델 이미지 URL 변환 완료 - {len(image_urls)}개 파일")
|
|
return image_urls
|
|
|
|
except Exception as e:
|
|
logger.warning(f"AI 모델 이미지 필터링 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
return []
|
|
|
|
def image_filtering(self, data: Dict[str, Any], root_task_id: str, order_id: UUID) -> Dict[str, Any]:
|
|
"""이미지 필터링 실행"""
|
|
logger.info(f"📸 이미지 필터링 시작 - Task ID: {root_task_id}")
|
|
|
|
|
|
try:
|
|
image_filtering_result = ""
|
|
# 1. AI 모델 필터링 시도
|
|
photo_images = self.ai_model_image_filtering(root_task_id)
|
|
image_filtering_result = "ai"
|
|
|
|
# 2. AI 모델 실패 시 임시 처리
|
|
if not photo_images:
|
|
logger.info(f"AI 모델 필터링 실패, 임시 처리로 전환 - Task ID: {root_task_id}")
|
|
photo_images = self.temp_image_filtering(root_task_id)
|
|
image_filtering_result = "temp"
|
|
|
|
# 3. 최종적으로도 이미지가 없는 경우
|
|
if not photo_images:
|
|
logger.error(f"필터링할 이미지가 전혀 없습니다 - Task ID: {root_task_id}")
|
|
raise ValueError(f"필터링할 이미지가 없습니다 - Task ID: {root_task_id}")
|
|
|
|
# 4. Photo 데이터 생성
|
|
photo_data_list = []
|
|
for index, image_url in enumerate(photo_images, 1):
|
|
photo_data = {
|
|
"order_id": order_id,
|
|
"name": f"filtered_image_{index}",
|
|
"url": image_url,
|
|
"video_index": index,
|
|
"is_selected": True
|
|
}
|
|
photo_data_list.append(photo_data)
|
|
|
|
# 5. DB 저장 (트랜잭션 처리)
|
|
with SessionLocal() as db:
|
|
try:
|
|
photo_repository = PhotoRepository(db)
|
|
for photo_data in photo_data_list:
|
|
photo_repository.create(photo_data)
|
|
db.commit()
|
|
logger.info(f"📷 사진 DB 저장 완료 - Task ID: {root_task_id}, 저장된 이미지 수: {len(photo_data_list)}")
|
|
except Exception as db_error:
|
|
db.rollback()
|
|
logger.error(f"사진 DB 저장 실패 - Task ID: {root_task_id}, Error: {db_error}")
|
|
raise db_error
|
|
|
|
# 6. 결과 반환
|
|
result = {
|
|
"filtered_images": photo_images,
|
|
"image_count": len(photo_images),
|
|
"task_id": root_task_id,
|
|
"order_id": order_id,
|
|
"image_filtering":image_filtering_result
|
|
}
|
|
|
|
logger.info(f"✅ 이미지 필터링 완료 - Task ID: {root_task_id}")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 이미지 필터링 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
raise e
|
|
|
|
def generate_video(self, data: Dict[str, Any], root_task_id: str, order_id: UUID, music_path: str, image_filtering_result: str) -> Dict[str, Any]:
|
|
"""비디오 생성 실행"""
|
|
logger.info(f"🎬 비디오 생성 시작 - Task ID: {root_task_id}")
|
|
|
|
try:
|
|
|
|
# 1. DB에서 정보 가져오기
|
|
with SessionLocal() as db:
|
|
item_repo = ItemRepository(db)
|
|
photo_repo = PhotoRepository(db)
|
|
|
|
# 업체 정보 조회
|
|
item = item_repo.get_by_order_id(order_id)
|
|
if not item:
|
|
raise ValueError(f"주문 ID {order_id}에 해당하는 업체를 찾을 수 없습니다.")
|
|
|
|
name = item.name
|
|
address = item.address
|
|
description = item.description or ""
|
|
tags = item.hashtags or []
|
|
|
|
# 사진들 조회
|
|
photos = photo_repo.get_by_order_id(order_id)
|
|
if not photos:
|
|
raise ValueError(f"주문 ID {order_id}에 해당하는 사진이 없습니다.")
|
|
|
|
# 사진 URL들 추출
|
|
photo_urls = [photo.url for photo in photos]
|
|
logger.info(f"비디오 생성용 이미지 {len(photo_urls)}개 조회 완료")
|
|
|
|
# 2. 이미지 처리 (AI vs Temp 구분)
|
|
if image_filtering_result == "ai":
|
|
# AI 결과: 내부 파일이므로 URL을 로컬 경로로 변환
|
|
local_image_paths = []
|
|
settings = EnvSetting()
|
|
current_backend_url = settings.CURRENT_BACKEND_URL
|
|
|
|
for photo_url in photo_urls:
|
|
# URL에서 로컬 경로 추출
|
|
# 예: https://domain/ai/response/task_123/task_123_1.jpg -> ./ai/response/task_123/task_123_1.jpg
|
|
if photo_url.startswith(current_backend_url):
|
|
# URL에서 /ai/ 이후 부분 추출
|
|
relative_path = photo_url.replace(f"{current_backend_url}/ai/", "")
|
|
local_path = Path("./ai") / relative_path
|
|
|
|
# 파일이 실제로 존재하는지 확인
|
|
if local_path.exists():
|
|
local_image_paths.append(str(local_path))
|
|
logger.debug(f"AI 이미지 로컬 경로 변환: {photo_url} -> {local_path}")
|
|
else:
|
|
logger.warning(f"AI 이미지 파일이 존재하지 않음: {local_path}")
|
|
else:
|
|
logger.warning(f"예상과 다른 URL 형식: {photo_url}")
|
|
|
|
if not local_image_paths:
|
|
raise ValueError("AI 모델 결과에서 유효한 로컬 이미지 경로를 찾을 수 없습니다.")
|
|
|
|
logger.info(f"AI 모델 로컬 이미지 사용: {len(local_image_paths)}개")
|
|
else:
|
|
# Temp 결과: 외부 URL이므로 다운로드 필요
|
|
local_image_paths = self._download_images_for_video(photo_urls, root_task_id)
|
|
|
|
# 3. 비디오 생성 (로컬 파일 경로 사용)
|
|
output_filename = f"{root_task_id}_generated_video.mp4"
|
|
video_path = self.moviepy_service.create_video_from_existing_images(
|
|
image_paths=local_image_paths,
|
|
music_path=music_path,
|
|
output_filename=output_filename
|
|
)
|
|
|
|
# 4. 비디오 정보 DB 저장
|
|
with SessionLocal() as db:
|
|
try:
|
|
video_repo = VideoRepository(db)
|
|
|
|
# Music ID 조회
|
|
music_repo = MusicRepository(db)
|
|
music = music_repo.get_by_order_id(order_id)
|
|
|
|
if not music:
|
|
raise ValueError(f"주문 ID {order_id}에 해당하는 음악을 찾을 수 없습니다.")
|
|
|
|
video_data = {
|
|
"music_id": music.id,
|
|
"order_id": order_id,
|
|
"title": f"{name} 홍보영상",
|
|
"description": f"{name} ({address}) - {description}",
|
|
"url": video_path,
|
|
"is_uploaded": False,
|
|
"download_count": 0,
|
|
"resolution": "1080p",
|
|
"status": "완료됨",
|
|
"thumbnail_url": photo_urls[0] if photo_urls else None
|
|
}
|
|
|
|
video_repo.create(video_data)
|
|
db.commit()
|
|
logger.info(f"🎥 비디오 DB 저장 완료 - Task ID: {root_task_id}")
|
|
|
|
except Exception as db_error:
|
|
db.rollback()
|
|
logger.error(f"비디오 DB 저장 실패 - Task ID: {root_task_id}, Error: {db_error}")
|
|
raise db_error
|
|
|
|
# 5. 결과 반환
|
|
result = {
|
|
"video_path": video_path,
|
|
"video_title": f"{name} 홍보영상",
|
|
"image_count": len(local_image_paths),
|
|
"task_id": root_task_id,
|
|
"tags": tags
|
|
}
|
|
|
|
logger.info(f"✅ 비디오 생성 완료 - Task ID: {root_task_id}")
|
|
|
|
# 로그 저장 로직 추가
|
|
self._success_generate_video(root_task_id, order_id)
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 비디오 생성 실패 - Task ID: {root_task_id}, Error: {e}")
|
|
raise e |