finished upload images

insta
bluebamus 2025-12-26 13:27:24 +09:00
parent 03a80afc71
commit 6917a76d60
14 changed files with 1359 additions and 208 deletions

View File

@ -1,15 +1,27 @@
import json
from datetime import date
from pathlib import Path from pathlib import Path
from typing import Optional
from urllib.parse import unquote, urlparse
from fastapi import APIRouter import aiofiles
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.home.schemas.home import ( from app.database.session import get_session
from app.home.models import Image
from app.home.schemas.home_schema import (
CrawlingRequest, CrawlingRequest,
CrawlingResponse, CrawlingResponse,
ErrorResponse, ErrorResponse,
ImageUploadResponse,
ImageUploadResultItem,
ImageUrlItem,
MarketingAnalysis, MarketingAnalysis,
ProcessedInfo, ProcessedInfo,
) )
from app.utils.chatgpt_prompt import ChatgptService from app.utils.chatgpt_prompt import ChatgptService
from app.utils.common import generate_task_id
from app.utils.nvMapScraper import NvMapScraper from app.utils.nvMapScraper import NvMapScraper
MEDIA_ROOT = Path("media") MEDIA_ROOT = Path("media")
@ -123,8 +135,6 @@ async def crawling(request_body: CrawlingRequest):
def _extract_image_name(url: str, index: int) -> str: def _extract_image_name(url: str, index: int) -> str:
"""URL에서 이미지 이름 추출 또는 기본 이름 생성""" """URL에서 이미지 이름 추출 또는 기본 이름 생성"""
try: try:
from urllib.parse import unquote, urlparse
path = urlparse(url).path path = urlparse(url).path
filename = path.split("/")[-1] if path else "" filename = path.split("/")[-1] if path else ""
if filename: if filename:
@ -134,6 +144,278 @@ def _extract_image_name(url: str, index: int) -> str:
return f"image_{index + 1:03d}" return f"image_{index + 1:03d}"
ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif"}
def _is_valid_image_extension(filename: str | None) -> bool:
"""파일명의 확장자가 유효한 이미지 확장자인지 확인"""
if not filename:
return False
ext = Path(filename).suffix.lower()
return ext in ALLOWED_IMAGE_EXTENSIONS
def _get_file_extension(filename: str) -> str:
"""파일명에서 확장자 추출 (소문자)"""
return Path(filename).suffix.lower()
async def _save_upload_file(file: UploadFile, save_path: Path) -> None:
"""업로드 파일을 지정된 경로에 저장"""
save_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(save_path, "wb") as f:
content = await file.read()
await f.write(content)
IMAGES_JSON_EXAMPLE = """[
{"url": "https://naverbooking-phinf.pstatic.net/20240514_189/1715688030436xT14o_JPEG/1.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_48/1715688030574wTtQd_JPEG/2.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_92/17156880307484bvpH_JPEG/3.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_7/1715688031000y8Y5q_JPEG/4.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_259/17156880311809wCnY_JPEG/5.jpg", "name": "외관"}
]"""
@router.post(
"/image/{task_id}/upload",
summary="이미지 업로드",
description="""
task_id에 연결된 이미지를 업로드합니다.
## 요청 방식
multipart/form-data 형식으로 전송합니다.
## 요청 필드
- **images_json**: 외부 이미지 URL 목록 (JSON 문자열, 선택)
- **files**: 이미지 바이너리 파일 목록 (선택)
**주의**: images_json 또는 files 최소 하나는 반드시 전달해야 합니다.
## 지원 이미지 확장자
jpg, jpeg, png, webp, heic, heif
## images_json 예시
```json
[
{"url": "https://naverbooking-phinf.pstatic.net/20240514_189/1715688030436xT14o_JPEG/1.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_48/1715688030574wTtQd_JPEG/2.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_92/17156880307484bvpH_JPEG/3.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_7/1715688031000y8Y5q_JPEG/4.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_259/17156880311809wCnY_JPEG/5.jpg", "name": "외관"}
]
```
## 바이너리 파일 업로드 테스트 방법
### 1. Swagger UI에서 테스트
1. 엔드포인트의 "Try it out" 버튼 클릭
2. task_id 입력 (: test-task-001)
3. files 항목에서 "Add item" 클릭하여 로컬 이미지 파일 선택
4. (선택) images_json에 URL 목록 JSON 입력
5. "Execute" 버튼 클릭
### 2. cURL로 테스트
```bash
# 바이너리 파일만 업로드
curl -X POST "http://localhost:8000/image/test-task-001/upload" \\
-F "files=@/path/to/image1.jpg" \\
-F "files=@/path/to/image2.png"
# URL + 바이너리 파일 동시 업로드
curl -X POST "http://localhost:8000/image/test-task-001/upload" \\
-F 'images_json=[{"url":"https://example.com/image.jpg"}]' \\
-F "files=@/path/to/local_image.jpg"
```
### 3. Python requests로 테스트
```python
import requests
url = "http://localhost:8000/image/test-task-001/upload"
files = [
("files", ("image1.jpg", open("image1.jpg", "rb"), "image/jpeg")),
("files", ("image2.png", open("image2.png", "rb"), "image/png")),
]
data = {
"images_json": '[{"url": "https://example.com/image.jpg"}]'
}
response = requests.post(url, files=files, data=data)
print(response.json())
```
## 반환 정보
- **task_id**: 작업 고유 식별자
- **total_count**: 업로드된 이미지 개수
- **url_count**: URL로 등록된 이미지 개수
- **file_count**: 파일로 업로드된 이미지 개수
- **images**: 업로드된 이미지 목록
## 저장 경로
- 바이너리 파일: /media/{날짜}/{uuid7}/{파일명}
""",
response_model=ImageUploadResponse,
responses={
200: {"description": "이미지 업로드 성공"},
400: {"description": "이미지가 제공되지 않음", "model": ErrorResponse},
},
tags=["image"],
)
async def upload_images(
task_id: str,
images_json: Optional[str] = Form(
default=None,
description="외부 이미지 URL 목록 (JSON 문자열)",
example=IMAGES_JSON_EXAMPLE,
),
files: Optional[list[UploadFile]] = File(
default=None, description="이미지 바이너리 파일 목록"
),
session: AsyncSession = Depends(get_session),
) -> ImageUploadResponse:
"""이미지 업로드 (URL + 바이너리 파일)"""
print(f"[upload_images] START - task_id: {task_id}")
print(f"[upload_images] images_json: {images_json}")
print(f"[upload_images] files: {files}")
# 1. 진입 검증: images_json 또는 files 중 하나는 반드시 있어야 함
has_images_json = images_json is not None and images_json.strip() != ""
has_files = files is not None and len(files) > 0
print(f"[upload_images] has_images_json: {has_images_json}, has_files: {has_files}")
if has_files and files:
for idx, f in enumerate(files):
print(f"[upload_images] file[{idx}]: filename={f.filename}, size={f.size}, content_type={f.content_type}")
if not has_images_json and not has_files:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="images_json 또는 files 중 하나는 반드시 제공해야 합니다.",
)
# 2. images_json 파싱 (있는 경우만)
url_images: list[ImageUrlItem] = []
if has_images_json:
try:
parsed = json.loads(images_json)
if isinstance(parsed, list):
url_images = [ImageUrlItem(**item) for item in parsed if item]
except (json.JSONDecodeError, TypeError, ValueError) as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"images_json 파싱 오류: {str(e)}",
)
# 3. 유효한 파일만 필터링 (빈 파일, 유효한 이미지 확장자가 아닌 경우 제외)
valid_files: list[UploadFile] = []
skipped_files: list[str] = []
if has_files and files:
for f in files:
is_valid_ext = _is_valid_image_extension(f.filename)
is_not_empty = f.size is None or f.size > 0 # size가 None이면 아직 읽지 않은 것
is_real_file = f.filename and f.filename != "filename" # Swagger 빈 파일 체크
print(f"[upload_images] Checking file: {f.filename}, size={f.size}, is_valid_ext={is_valid_ext}, is_real_file={is_real_file}")
if f and is_real_file and is_valid_ext and is_not_empty:
valid_files.append(f)
else:
skipped_files.append(f.filename or "unknown")
print(f"[upload_images] valid_files count: {len(valid_files)}, skipped: {skipped_files}")
# 유효한 데이터가 하나도 없으면 에러
if not url_images and not valid_files:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"유효한 이미지가 없습니다. 지원 확장자: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}. 건너뛴 파일: {skipped_files}",
)
result_images: list[ImageUploadResultItem] = []
img_order = 0
# 1. URL 이미지 저장
for url_item in url_images:
img_name = url_item.name or _extract_image_name(url_item.url, img_order)
image = Image(
task_id=task_id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
)
session.add(image)
await session.flush() # ID 생성을 위해 flush
print(f"[upload_images] URL image saved - id: {image.id}, img_name: {img_name}")
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
source="url",
)
)
img_order += 1
# 2. 바이너리 파일 저장
if valid_files:
today = date.today().strftime("%Y-%m-%d")
# 한 번의 요청에서 업로드된 모든 이미지는 같은 폴더에 저장
batch_uuid = await generate_task_id()
upload_dir = MEDIA_ROOT / "image" / today / batch_uuid
upload_dir.mkdir(parents=True, exist_ok=True)
for file in valid_files:
# 파일명: 원본 파일명 사용 (중복 방지를 위해 순서 추가)
original_name = file.filename or "image"
ext = _get_file_extension(file.filename) # type: ignore[arg-type]
# 파일명에서 확장자 제거 후 순서 추가
name_without_ext = original_name.rsplit(".", 1)[0] if "." in original_name else original_name
filename = f"{name_without_ext}_{img_order:03d}{ext}"
save_path = upload_dir / filename
# 파일 저장
await _save_upload_file(file, save_path)
# 이미지 URL 생성
img_url = f"/media/image/{today}/{batch_uuid}/{filename}"
img_name = file.filename or filename
image = Image(
task_id=task_id,
img_name=img_name,
img_url=img_url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=img_url,
img_order=img_order,
source="file",
)
)
img_order += 1
print(f"[upload_images] Committing {len(result_images)} images to database...")
await session.commit()
print("[upload_images] Commit successful!")
return ImageUploadResponse(
task_id=task_id,
total_count=len(result_images),
url_count=len(url_images),
file_count=len(valid_files),
images=result_images,
)
# @router.post( # @router.post(
# "/generate", # "/generate",
# summary="기본 영상 생성 요청", # summary="기본 영상 생성 요청",

View File

@ -159,3 +159,58 @@ class ErrorResponse(BaseModel):
error_code: str = Field(..., description="에러 코드") error_code: str = Field(..., description="에러 코드")
message: str = Field(..., description="에러 메시지") message: str = Field(..., description="에러 메시지")
detail: Optional[str] = Field(None, description="상세 에러 정보") detail: Optional[str] = Field(None, description="상세 에러 정보")
# =============================================================================
# Image Upload Schemas
# =============================================================================
class ImageUrlItem(BaseModel):
"""이미지 URL 아이템 스키마"""
url: str = Field(..., description="외부 이미지 URL")
name: Optional[str] = Field(None, description="이미지명 (없으면 URL에서 추출)")
class ImageUploadRequest(BaseModel):
"""이미지 업로드 요청 스키마 (JSON body 부분)
URL 이미지 목록을 전달합니다.
바이너리 파일은 multipart/form-data로 별도 전달됩니다.
"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"images": [
{"url": "https://example.com/images/image_001.jpg"},
{"url": "https://example.com/images/image_002.jpg", "name": "외관"},
]
}
}
)
images: Optional[list[ImageUrlItem]] = Field(
None, description="외부 이미지 URL 목록"
)
class ImageUploadResultItem(BaseModel):
"""업로드된 이미지 결과 아이템"""
id: int = Field(..., description="이미지 ID")
img_name: str = Field(..., description="이미지명")
img_url: str = Field(..., description="이미지 URL")
img_order: int = Field(..., description="이미지 순서")
source: Literal["url", "file"] = Field(..., description="이미지 소스 (url 또는 file)")
class ImageUploadResponse(BaseModel):
"""이미지 업로드 응답 스키마"""
task_id: str = Field(..., description="작업 고유 식별자")
total_count: int = Field(..., description="총 업로드된 이미지 개수")
url_count: int = Field(..., description="URL로 등록된 이미지 개수")
file_count: int = Field(..., description="파일로 업로드된 이미지 개수")
images: list[ImageUploadResultItem] = Field(..., description="업로드된 이미지 목록")

View File

@ -3,7 +3,7 @@ import asyncio
from sqlalchemy import select from sqlalchemy import select
from app.database.session import get_worker_session from app.database.session import get_worker_session
from app.home.schemas.home import GenerateRequest from app.home.schemas.home_schema import GenerateRequest
from app.lyric.models import Lyric from app.lyric.models import Lyric
from app.utils.chatgpt_prompt import ChatgptService from app.utils.chatgpt_prompt import ChatgptService

View File

@ -31,8 +31,8 @@ async def download_and_save_song(
""" """
print(f"[download_and_save_song] START - task_id: {task_id}, store_name: {store_name}") print(f"[download_and_save_song] START - task_id: {task_id}, store_name: {store_name}")
try: try:
# 저장 경로 생성: media/{날짜}/{uuid7}/{store_name}.mp3 # 저장 경로 생성: media/song/{날짜}/{uuid7}/{store_name}.mp3
today = date.today().isoformat() today = date.today().strftime("%Y-%m-%d")
unique_id = await generate_task_id() unique_id = await generate_task_id()
# 파일명에 사용할 수 없는 문자 제거 # 파일명에 사용할 수 없는 문자 제거
safe_store_name = "".join( safe_store_name = "".join(
@ -42,7 +42,7 @@ async def download_and_save_song(
file_name = f"{safe_store_name}.mp3" file_name = f"{safe_store_name}.mp3"
# 절대 경로 생성 # 절대 경로 생성
media_dir = Path("media") / today / unique_id media_dir = Path("media") / "song" / today / unique_id
media_dir.mkdir(parents=True, exist_ok=True) media_dir.mkdir(parents=True, exist_ok=True)
file_path = media_dir / file_name file_path = media_dir / file_name
print(f"[download_and_save_song] Directory created - path: {file_path}") print(f"[download_and_save_song] Directory created - path: {file_path}")
@ -58,7 +58,7 @@ async def download_and_save_song(
print(f"[download_and_save_song] File saved - task_id: {task_id}, path: {file_path}") print(f"[download_and_save_song] File saved - task_id: {task_id}, path: {file_path}")
# 프론트엔드에서 접근 가능한 URL 생성 # 프론트엔드에서 접근 가능한 URL 생성
relative_path = f"/media/{today}/{unique_id}/{file_name}" relative_path = f"/media/song/{today}/{unique_id}/{file_name}"
base_url = f"http://{prj_settings.PROJECT_DOMAIN}" base_url = f"http://{prj_settings.PROJECT_DOMAIN}"
file_url = f"{base_url}{relative_path}" file_url = f"{base_url}{relative_path}"
print(f"[download_and_save_song] URL generated - task_id: {task_id}, url: {file_url}") print(f"[download_and_save_song] URL generated - task_id: {task_id}, url: {file_url}")

View File

@ -209,6 +209,29 @@ class CreatomateService:
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def get_render_status(self, render_id: str) -> dict:
"""렌더링 작업의 상태를 조회합니다.
Args:
render_id: Creatomate 렌더 ID
Returns:
렌더링 상태 정보
Note:
상태 :
- planned: 예약됨
- waiting: 대기
- transcribing: 트랜스크립션
- rendering: 렌더링
- succeeded: 성공
- failed: 실패
"""
url = f"{self.BASE_URL}/v1/renders/{render_id}"
response = httpx.get(url, headers=self.headers, timeout=30.0)
response.raise_for_status()
return response.json()
def calc_scene_duration(self, template: dict) -> float: def calc_scene_duration(self, template: dict) -> float:
"""템플릿의 전체 장면 duration을 계산합니다.""" """템플릿의 전체 장면 duration을 계산합니다."""
total_template_duration = 0.0 total_template_duration = 0.0

View File

@ -1,39 +1,92 @@
import requests """
Azure Blob Storage 업로드 유틸리티
Azure Blob Storage에 파일을 업로드하는 비동기 함수들을 제공합니다.
"""
from pathlib import Path from pathlib import Path
import aiofiles
import httpx
SAS_TOKEN = "sp=racwdl&st=2025-12-01T00:13:29Z&se=2026-07-31T08:28:29Z&spr=https&sv=2024-11-04&sr=c&sig=7fE2ozVBPu3Gq43%2FZDxEYdEcPLDXyNVfTf16IBasmVQ%3D" SAS_TOKEN = "sp=racwdl&st=2025-12-01T00:13:29Z&se=2026-07-31T08:28:29Z&spr=https&sv=2024-11-04&sr=c&sig=7fE2ozVBPu3Gq43%2FZDxEYdEcPLDXyNVfTf16IBasmVQ%3D"
def upload_music_to_azure_blob(file_path = "스테이 머뭄_1.mp3", url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp3"):
access_url = f"{url}?{SAS_TOKEN}"
headers = {
"Content-Type": "audio/mpeg",
"x-ms-blob-type": "BlockBlob"
}
with open(file_path, "rb") as file:
response = requests.put(access_url, data=file, headers=headers)
if response.status_code in [200, 201]:
print(f"Success Status Code: {response.status_code}")
else:
print(f"Failed Status Code: {response.status_code}")
print(f"Response: {response.text}")
def upload_video_to_azure_blob(file_path = "스테이 머뭄.mp4", url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp4"): async def upload_music_to_azure_blob(
file_path: str = "스테이 머뭄_1.mp3",
url: str = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp3",
) -> bool:
"""음악 파일을 Azure Blob Storage에 업로드합니다.
Args:
file_path: 업로드할 파일 경로
url: Azure Blob Storage URL
Returns:
bool: 업로드 성공 여부
"""
access_url = f"{url}?{SAS_TOKEN}" access_url = f"{url}?{SAS_TOKEN}"
headers = { headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"}
"Content-Type": "video/mp4",
"x-ms-blob-type": "BlockBlob" async with aiofiles.open(file_path, "rb") as file:
} file_content = await file.read()
with open(file_path, "rb") as file:
response = requests.put(access_url, data=file, headers=headers) async with httpx.AsyncClient() as client:
response = await client.put(access_url, content=file_content, headers=headers, timeout=120.0)
if response.status_code in [200, 201]: if response.status_code in [200, 201]:
print(f"Success Status Code: {response.status_code}") print(f"[upload_music_to_azure_blob] Success - Status Code: {response.status_code}")
return True
else: else:
print(f"Failed Status Code: {response.status_code}") print(f"[upload_music_to_azure_blob] Failed - Status Code: {response.status_code}")
print(f"Response: {response.text}") print(f"[upload_music_to_azure_blob] Response: {response.text}")
return False
def upload_image_to_azure_blob(file_path = "스테이 머뭄.png", url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.png"): async def upload_video_to_azure_blob(
file_path: str = "스테이 머뭄.mp4",
url: str = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp4",
) -> bool:
"""영상 파일을 Azure Blob Storage에 업로드합니다.
Args:
file_path: 업로드할 파일 경로
url: Azure Blob Storage URL
Returns:
bool: 업로드 성공 여부
"""
access_url = f"{url}?{SAS_TOKEN}"
headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"}
async with aiofiles.open(file_path, "rb") as file:
file_content = await file.read()
async with httpx.AsyncClient() as client:
response = await client.put(access_url, content=file_content, headers=headers, timeout=180.0)
if response.status_code in [200, 201]:
print(f"[upload_video_to_azure_blob] Success - Status Code: {response.status_code}")
return True
else:
print(f"[upload_video_to_azure_blob] Failed - Status Code: {response.status_code}")
print(f"[upload_video_to_azure_blob] Response: {response.text}")
return False
async def upload_image_to_azure_blob(
file_path: str = "스테이 머뭄.png",
url: str = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.png",
) -> bool:
"""이미지 파일을 Azure Blob Storage에 업로드합니다.
Args:
file_path: 업로드할 파일 경로
url: Azure Blob Storage URL
Returns:
bool: 업로드 성공 여부
"""
access_url = f"{url}?{SAS_TOKEN}" access_url = f"{url}?{SAS_TOKEN}"
extension = Path(file_path).suffix.lower() extension = Path(file_path).suffix.lower()
content_types = { content_types = {
@ -42,23 +95,27 @@ def upload_image_to_azure_blob(file_path = "스테이 머뭄.png", url = "https:
".png": "image/png", ".png": "image/png",
".gif": "image/gif", ".gif": "image/gif",
".webp": "image/webp", ".webp": "image/webp",
".bmp": "image/bmp" ".bmp": "image/bmp",
} }
content_type = content_types.get(extension, "image/jpeg") content_type = content_types.get(extension, "image/jpeg")
headers = { headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
"Content-Type": content_type,
"x-ms-blob-type": "BlockBlob" async with aiofiles.open(file_path, "rb") as file:
} file_content = await file.read()
with open(file_path, "rb") as file:
response = requests.put(access_url, data=file, headers=headers) async with httpx.AsyncClient() as client:
response = await client.put(access_url, content=file_content, headers=headers, timeout=60.0)
if response.status_code in [200, 201]: if response.status_code in [200, 201]:
print(f"Success Status Code: {response.status_code}") print(f"[upload_image_to_azure_blob] Success - Status Code: {response.status_code}")
return True
else: else:
print(f"Failed Status Code: {response.status_code}") print(f"[upload_image_to_azure_blob] Failed - Status Code: {response.status_code}")
print(f"Response: {response.text}") print(f"[upload_image_to_azure_blob] Response: {response.text}")
return False
upload_video_to_azure_blob() # 사용 예시:
# import asyncio
upload_image_to_azure_blob() # asyncio.run(upload_video_to_azure_blob())
# asyncio.run(upload_image_to_azure_blob())

View File

@ -1,108 +1,568 @@
""" """
Video API Endpoints (Test) Video API Router
프론트엔드 개발을 위한 테스트용 엔드포인트입니다. 모듈은 Creatomate API를 통한 영상 생성 관련 API 엔드포인트를 정의합니다.
엔드포인트 목록:
- POST /video/generate/{task_id}: 영상 생성 요청 (task_id로 Project/Lyric/Song 연결)
- GET /video/status/{creatomate_render_id}: Creatomate API 영상 생성 상태 조회
- GET /video/download/{task_id}: 영상 다운로드 상태 조회 (DB polling)
- GET /videos/: 완료된 영상 목록 조회 (페이지네이션)
사용 예시:
from app.video.api.routers.v1.video import router
app.include_router(router, prefix="/api/v1")
""" """
from datetime import datetime, timedelta from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from typing import Optional from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session
from app.dependencies.pagination import (
PaginationParams,
get_pagination_params,
)
from app.home.models import Project
from app.lyric.models import Lyric
from app.song.models import Song
from app.video.models import Video
from app.video.schemas.video_schema import (
DownloadVideoResponse,
GenerateVideoRequest,
GenerateVideoResponse,
PollingVideoResponse,
VideoListItem,
VideoRenderData,
)
from app.video.worker.video_task import download_and_save_video
from app.utils.creatomate import CreatomateService
from app.utils.pagination import PaginatedResponse
from fastapi import APIRouter
from pydantic import BaseModel, Field
router = APIRouter(prefix="/video", tags=["video"]) router = APIRouter(prefix="/video", tags=["video"])
# =============================================================================
# Schemas
# =============================================================================
class VideoGenerateResponse(BaseModel):
"""영상 생성 응답 스키마"""
success: bool = Field(..., description="성공 여부")
task_id: str = Field(..., description="작업 고유 식별자")
message: str = Field(..., description="응답 메시지")
error_message: Optional[str] = Field(None, description="에러 메시지")
class VideoStatusResponse(BaseModel):
"""영상 상태 조회 응답 스키마"""
task_id: str = Field(..., description="작업 고유 식별자")
status: str = Field(..., description="처리 상태 (processing, completed, failed)")
video_url: Optional[str] = Field(None, description="영상 URL")
class VideoItem(BaseModel):
"""영상 아이템 스키마"""
task_id: str = Field(..., description="작업 고유 식별자")
video_url: str = Field(..., description="영상 URL")
created_at: datetime = Field(..., description="생성 일시")
class VideoListResponse(BaseModel):
"""영상 목록 응답 스키마"""
videos: list[VideoItem] = Field(..., description="영상 목록")
total: int = Field(..., description="전체 개수")
# =============================================================================
# Test Endpoints
# =============================================================================
TEST_VIDEO_URL = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/1a584e86-6a74-417d-8cff-270ef60c8646.mp4"
@router.post( @router.post(
"/generate/{task_id}", "/generate/{task_id}",
summary="영상 생성 요청 (테스트)", summary="영상 생성 요청",
response_model=VideoGenerateResponse, description="""
Creatomate API를 통해 영상 생성을 요청합니다.
## 경로 파라미터
- **task_id**: Project/Lyric/Song의 task_id (필수) - 연관된 프로젝트, 가사, 노래를 조회하는 사용
## 요청 필드
- **template_id**: Creatomate 템플릿 ID (필수)
- **image_urls**: 영상에 사용할 이미지 URL 목록 (필수)
- **lyrics**: 영상에 표시할 가사 (필수)
- **music_url**: 배경 음악 URL (필수)
## 반환 정보
- **success**: 요청 성공 여부
- **task_id**: 내부 작업 ID (Project task_id)
- **creatomate_render_id**: Creatomate 렌더 ID (상태 조회에 사용)
- **message**: 응답 메시지
## 사용 예시
```
POST /video/generate/019123ab-cdef-7890-abcd-ef1234567890
{
"template_id": "abc123...",
"image_urls": ["https://...", "https://..."],
"lyrics": "가사 내용...",
"music_url": "https://..."
}
```
## 참고
- creatomate_render_id를 사용하여 /status/{creatomate_render_id} 엔드포인트에서 생성 상태를 확인할 있습니다.
- Video 테이블에 데이터가 저장되며, project_id, lyric_id, song_id가 자동으로 연결됩니다.
""",
response_model=GenerateVideoResponse,
responses={
200: {"description": "영상 생성 요청 성공"},
404: {"description": "Project, Lyric 또는 Song을 찾을 수 없음"},
500: {"description": "영상 생성 요청 실패"},
},
) )
async def generate_video(task_id: str) -> VideoGenerateResponse: async def generate_video(
"""영상 생성 요청 테스트 엔드포인트""" task_id: str,
return VideoGenerateResponse( request_body: GenerateVideoRequest,
session: AsyncSession = Depends(get_session),
) -> GenerateVideoResponse:
"""Creatomate API를 통해 영상을 생성합니다.
1. task_id로 Project, Lyric, Song 조회
2. Video 테이블에 초기 데이터 저장 (status: processing)
3. Creatomate API 호출
4. creatomate_render_id 업데이트 응답 반환
"""
print(f"[generate_video] START - task_id: {task_id}, template_id: {request_body.template_id}")
try:
# 1. task_id로 Project 조회
project_result = await session.execute(
select(Project).where(Project.task_id == task_id)
)
project = project_result.scalar_one_or_none()
if not project:
print(f"[generate_video] Project NOT FOUND - task_id: {task_id}")
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Project를 찾을 수 없습니다.",
)
print(f"[generate_video] Project found - project_id: {project.id}, task_id: {task_id}")
# 2. task_id로 Lyric 조회
lyric_result = await session.execute(
select(Lyric).where(Lyric.task_id == task_id)
)
lyric = lyric_result.scalar_one_or_none()
if not lyric:
print(f"[generate_video] Lyric NOT FOUND - task_id: {task_id}")
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Lyric을 찾을 수 없습니다.",
)
print(f"[generate_video] Lyric found - lyric_id: {lyric.id}, task_id: {task_id}")
# 3. task_id로 Song 조회 (가장 최근 것)
song_result = await session.execute(
select(Song)
.where(Song.task_id == task_id)
.order_by(Song.created_at.desc())
.limit(1)
)
song = song_result.scalar_one_or_none()
if not song:
print(f"[generate_video] Song NOT FOUND - task_id: {task_id}")
raise HTTPException(
status_code=404,
detail=f"task_id '{task_id}'에 해당하는 Song을 찾을 수 없습니다.",
)
print(f"[generate_video] Song found - song_id: {song.id}, task_id: {task_id}")
# 4. Video 테이블에 초기 데이터 저장
video = Video(
project_id=project.id,
lyric_id=lyric.id,
song_id=song.id,
task_id=task_id,
creatomate_render_id=None,
status="processing",
)
session.add(video)
await session.flush() # ID 생성을 위해 flush
print(f"[generate_video] Video saved (processing) - task_id: {task_id}")
# 5. Creatomate API 호출
print(f"[generate_video] Creatomate API generation started - task_id: {task_id}")
creatomate_service = CreatomateService()
# 템플릿에 리소스 매핑
modifications = creatomate_service.template_connect_resource_blackbox(
template_id=request_body.template_id,
image_url_list=request_body.image_urls,
lyric=request_body.lyrics,
music_url=request_body.music_url,
)
print(f"[generate_video] Modifications created - task_id: {task_id}")
# 렌더링 요청
render_response = creatomate_service.make_creatomate_call(
template_id=request_body.template_id,
modifications=modifications,
)
print(f"[generate_video] Creatomate API response - task_id: {task_id}, response: {render_response}")
# 렌더 ID 추출 (응답이 리스트인 경우 첫 번째 항목 사용)
if isinstance(render_response, list) and len(render_response) > 0:
creatomate_render_id = render_response[0].get("id")
elif isinstance(render_response, dict):
creatomate_render_id = render_response.get("id")
else:
creatomate_render_id = None
# 6. creatomate_render_id 업데이트
video.creatomate_render_id = creatomate_render_id
await session.commit()
print(f"[generate_video] SUCCESS - task_id: {task_id}, creatomate_render_id: {creatomate_render_id}")
return GenerateVideoResponse(
success=True, success=True,
task_id=task_id, task_id=task_id,
message="영상 생성 요청 성공", creatomate_render_id=creatomate_render_id,
message="영상 생성 요청이 접수되었습니다. creatomate_render_id로 상태를 조회하세요.",
error_message=None, error_message=None,
) )
except HTTPException:
raise
except Exception as e:
print(f"[generate_video] EXCEPTION - task_id: {task_id}, error: {e}")
await session.rollback()
return GenerateVideoResponse(
success=False,
task_id=task_id,
creatomate_render_id=None,
message="영상 생성 요청에 실패했습니다.",
error_message=str(e),
)
@router.get( @router.get(
"/status/{task_id}", "/status/{creatomate_render_id}",
summary="영상 상태 조회 (테스트)", summary="영상 생성 상태 조회",
response_model=VideoStatusResponse, description="""
Creatomate API를 통해 영상 생성 작업의 상태를 조회합니다.
succeeded 상태인 경우 백그라운드에서 MP4 파일을 다운로드하고 Video 테이블을 업데이트합니다.
## 경로 파라미터
- **creatomate_render_id**: 영상 생성 반환된 Creatomate 렌더 ID (필수)
## 반환 정보
- **success**: 조회 성공 여부
- **status**: 작업 상태 (planned, waiting, rendering, succeeded, failed)
- **message**: 상태 메시지
- **render_data**: 렌더링 결과 데이터 (완료 )
- **raw_response**: Creatomate API 원본 응답
## 사용 예시
```
GET /video/status/render-id-123...
```
## 상태 값
- **planned**: 예약됨
- **waiting**: 대기
- **transcribing**: 트랜스크립션
- **rendering**: 렌더링
- **succeeded**: 성공
- **failed**: 실패
## 참고
- succeeded 백그라운드에서 MP4 다운로드 DB 업데이트 진행
""",
response_model=PollingVideoResponse,
responses={
200: {"description": "상태 조회 성공"},
500: {"description": "상태 조회 실패"},
},
) )
async def get_video_status(task_id: str) -> VideoStatusResponse: async def get_video_status(
"""영상 상태 조회 테스트 엔드포인트""" creatomate_render_id: str,
return VideoStatusResponse( background_tasks: BackgroundTasks,
session: AsyncSession = Depends(get_session),
) -> PollingVideoResponse:
"""creatomate_render_id로 영상 생성 작업의 상태를 조회합니다.
succeeded 상태인 경우 백그라운드에서 MP4 파일을 다운로드하고
Video 테이블의 status를 completed로, result_movie_url을 업데이트합니다.
"""
print(f"[get_video_status] START - creatomate_render_id: {creatomate_render_id}")
try:
creatomate_service = CreatomateService()
result = creatomate_service.get_render_status(creatomate_render_id)
print(f"[get_video_status] Creatomate API response - creatomate_render_id: {creatomate_render_id}, status: {result.get('status')}")
status = result.get("status", "unknown")
video_url = result.get("url")
# 상태별 메시지 설정
status_messages = {
"planned": "영상 생성이 예약되었습니다.",
"waiting": "영상 생성 대기 중입니다.",
"transcribing": "트랜스크립션 진행 중입니다.",
"rendering": "영상을 렌더링하고 있습니다.",
"succeeded": "영상 생성이 완료되었습니다.",
"failed": "영상 생성에 실패했습니다.",
}
message = status_messages.get(status, f"상태: {status}")
# succeeded 상태인 경우 백그라운드 태스크 실행
if status == "succeeded" and video_url:
# creatomate_render_id로 Video 조회하여 task_id 가져오기
video_result = await session.execute(
select(Video)
.where(Video.creatomate_render_id == creatomate_render_id)
.order_by(Video.created_at.desc())
.limit(1)
)
video = video_result.scalar_one_or_none()
if video:
# task_id로 Project 조회하여 store_name 가져오기
project_result = await session.execute(
select(Project).where(Project.id == video.project_id)
)
project = project_result.scalar_one_or_none()
store_name = project.store_name if project else "video"
# 백그라운드 태스크로 MP4 다운로드 및 DB 업데이트
print(f"[get_video_status] Background task args - task_id: {video.task_id}, video_url: {video_url}, store_name: {store_name}")
background_tasks.add_task(
download_and_save_video,
task_id=video.task_id,
video_url=video_url,
store_name=store_name,
)
render_data = VideoRenderData(
id=result.get("id"),
status=status,
url=video_url,
snapshot_url=result.get("snapshot_url"),
)
print(f"[get_video_status] SUCCESS - creatomate_render_id: {creatomate_render_id}")
return PollingVideoResponse(
success=True,
status=status,
message=message,
render_data=render_data,
raw_response=result,
error_message=None,
)
except Exception as e:
import traceback
print(f"[get_video_status] EXCEPTION - creatomate_render_id: {creatomate_render_id}, error: {e}")
return PollingVideoResponse(
success=False,
status="error",
message="상태 조회에 실패했습니다.",
render_data=None,
raw_response=None,
error_message=f"{type(e).__name__}: {e}\n{traceback.format_exc()}",
)
@router.get(
"/download/{task_id}",
summary="영상 다운로드 상태 조회",
description="""
task_id를 기반으로 Video 테이블의 상태를 polling하고,
completed인 경우 Project 정보와 영상 URL을 반환합니다.
## 경로 파라미터
- **task_id**: 프로젝트 task_id (필수)
## 반환 정보
- **success**: 조회 성공 여부
- **status**: 처리 상태 (processing, completed, failed)
- **message**: 응답 메시지
- **store_name**: 업체명
- **region**: 지역명
- **task_id**: 작업 고유 식별자
- **result_movie_url**: 영상 결과 URL (completed )
- **created_at**: 생성 일시
## 사용 예시
```
GET /video/download/019123ab-cdef-7890-abcd-ef1234567890
```
## 참고
- processing 상태인 경우 result_movie_url은 null입니다.
- completed 상태인 경우 Project 정보와 함께 result_movie_url을 반환합니다.
""",
response_model=DownloadVideoResponse,
responses={
200: {"description": "조회 성공"},
404: {"description": "Video를 찾을 수 없음"},
500: {"description": "조회 실패"},
},
)
async def download_video(
task_id: str,
session: AsyncSession = Depends(get_session),
) -> DownloadVideoResponse:
"""task_id로 Video 상태를 polling하고 completed 시 Project 정보와 영상 URL을 반환합니다."""
print(f"[download_video] START - task_id: {task_id}")
try:
# task_id로 Video 조회 (여러 개 있을 경우 가장 최근 것 선택)
video_result = await session.execute(
select(Video)
.where(Video.task_id == task_id)
.order_by(Video.created_at.desc())
.limit(1)
)
video = video_result.scalar_one_or_none()
if not video:
print(f"[download_video] Video NOT FOUND - task_id: {task_id}")
return DownloadVideoResponse(
success=False,
status="not_found",
message=f"task_id '{task_id}'에 해당하는 Video를 찾을 수 없습니다.",
error_message="Video not found",
)
print(f"[download_video] Video found - task_id: {task_id}, status: {video.status}")
# processing 상태인 경우
if video.status == "processing":
print(f"[download_video] PROCESSING - task_id: {task_id}")
return DownloadVideoResponse(
success=True,
status="processing",
message="영상 생성이 진행 중입니다.",
task_id=task_id, task_id=task_id,
)
# failed 상태인 경우
if video.status == "failed":
print(f"[download_video] FAILED - task_id: {task_id}")
return DownloadVideoResponse(
success=False,
status="failed",
message="영상 생성에 실패했습니다.",
task_id=task_id,
error_message="Video generation failed",
)
# completed 상태인 경우 - Project 정보 조회
project_result = await session.execute(
select(Project).where(Project.id == video.project_id)
)
project = project_result.scalar_one_or_none()
print(f"[download_video] COMPLETED - task_id: {task_id}, result_movie_url: {video.result_movie_url}")
return DownloadVideoResponse(
success=True,
status="completed", status="completed",
video_url=TEST_VIDEO_URL, message="영상 다운로드가 완료되었습니다.",
store_name=project.store_name if project else None,
region=project.region if project else None,
task_id=task_id,
result_movie_url=video.result_movie_url,
created_at=video.created_at,
)
except Exception as e:
print(f"[download_video] EXCEPTION - task_id: {task_id}, error: {e}")
return DownloadVideoResponse(
success=False,
status="error",
message="영상 다운로드 조회에 실패했습니다.",
error_message=str(e),
) )
@router.get( @router.get(
"s/", "s/",
summary="영상 목록 조회 (테스트)", summary="생성된 영상 목록 조회",
response_model=VideoListResponse, description="""
) 완료된 영상 목록을 페이지네이션하여 조회합니다.
async def get_videos() -> VideoListResponse:
"""영상 목록 조회 테스트 엔드포인트"""
now = datetime.now()
videos = [
VideoItem(
task_id=f"test-task-id-{i:03d}",
video_url=TEST_VIDEO_URL,
created_at=now - timedelta(hours=i),
)
for i in range(10)
]
return VideoListResponse( ## 쿼리 파라미터
videos=videos, - **page**: 페이지 번호 (1부터 시작, 기본값: 1)
total=len(videos), - **page_size**: 페이지당 데이터 (기본값: 10, 최대: 100)
## 반환 정보
- **items**: 영상 목록 (store_name, region, task_id, result_movie_url, created_at)
- **total**: 전체 데이터
- **page**: 현재 페이지
- **page_size**: 페이지당 데이터
- **total_pages**: 전체 페이지
- **has_next**: 다음 페이지 존재 여부
- **has_prev**: 이전 페이지 존재 여부
## 사용 예시
```
GET /videos/?page=1&page_size=10
```
## 참고
- status가 'completed' 영상만 반환됩니다.
- 동일한 task_id가 있는 경우 가장 최근에 생성된 1개만 반환됩니다.
- created_at 기준 내림차순 정렬됩니다.
""",
response_model=PaginatedResponse[VideoListItem],
responses={
200: {"description": "영상 목록 조회 성공"},
500: {"description": "조회 실패"},
},
)
async def get_videos(
session: AsyncSession = Depends(get_session),
pagination: PaginationParams = Depends(get_pagination_params),
) -> PaginatedResponse[VideoListItem]:
"""완료된 영상 목록을 페이지네이션하여 반환합니다."""
print(f"[get_videos] START - page: {pagination.page}, page_size: {pagination.page_size}")
try:
offset = (pagination.page - 1) * pagination.page_size
# 서브쿼리: task_id별 최신 Video의 id 조회 (completed 상태만)
subquery = (
select(func.max(Video.id).label("max_id"))
.where(Video.status == "completed")
.group_by(Video.task_id)
.subquery()
)
# 전체 개수 조회 (task_id별 최신 1개만)
count_query = select(func.count()).select_from(subquery)
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
# 데이터 조회 (completed 상태, task_id별 최신 1개만, 최신순)
query = (
select(Video)
.where(Video.id.in_(select(subquery.c.max_id)))
.order_by(Video.created_at.desc())
.offset(offset)
.limit(pagination.page_size)
)
result = await session.execute(query)
videos = result.scalars().all()
# Project 정보와 함께 VideoListItem으로 변환
items = []
for video in videos:
# Project 조회 (video.project_id 직접 사용)
project_result = await session.execute(
select(Project).where(Project.id == video.project_id)
)
project = project_result.scalar_one_or_none()
item = VideoListItem(
store_name=project.store_name if project else None,
region=project.region if project else None,
task_id=video.task_id,
result_movie_url=video.result_movie_url,
created_at=video.created_at,
)
items.append(item)
# 개별 아이템 로그
print(
f"[get_videos] Item - store_name: {item.store_name}, region: {item.region}, "
f"task_id: {item.task_id}, result_movie_url: {item.result_movie_url}, "
f"created_at: {item.created_at}"
)
response = PaginatedResponse.create(
items=items,
total=total,
page=pagination.page,
page_size=pagination.page_size,
)
print(
f"[get_videos] SUCCESS - total: {total}, page: {pagination.page}, "
f"page_size: {pagination.page_size}, items_count: {len(items)}"
)
return response
except Exception as e:
print(f"[get_videos] EXCEPTION - error: {e}")
raise HTTPException(
status_code=500,
detail=f"영상 목록 조회에 실패했습니다: {str(e)}",
) )

View File

@ -83,6 +83,12 @@ class Video(Base):
comment="영상 생성 작업 고유 식별자 (UUID)", comment="영상 생성 작업 고유 식별자 (UUID)",
) )
creatomate_render_id: Mapped[Optional[str]] = mapped_column(
String(64),
nullable=True,
comment="Creatomate API 렌더 ID",
)
status: Mapped[str] = mapped_column( status: Mapped[str] = mapped_column(
String(50), String(50),
nullable=False, nullable=False,

View File

@ -1,91 +1,194 @@
from dataclasses import dataclass, field """
Video API Schemas
영상 생성 관련 Pydantic 스키마를 정의합니다.
"""
from datetime import datetime from datetime import datetime
from typing import Dict, List from typing import Any, Dict, List, Optional
from fastapi import Request from pydantic import BaseModel, Field
@dataclass # =============================================================================
class StoreData: # Request Schemas
id: int # =============================================================================
created_at: datetime
store_name: str
store_category: str | None = None
store_region: str | None = None
store_address: str | None = None
store_phone_number: str | None = None
store_info: str | None = None
@dataclass class GenerateVideoRequest(BaseModel):
class AttributeData: """영상 생성 요청 스키마
id: int
attr_category: str Usage:
attr_value: str POST /video/generate/{task_id}
created_at: datetime Request body for generating a video via Creatomate API.
Example Request:
{
"template_id": "abc123...",
"image_urls": ["https://...", "https://..."],
"lyrics": "가사 내용...",
"music_url": "https://..."
}
"""
model_config = {
"json_schema_extra": {
"example": {
"template_id": "abc123-template-id",
"image_urls": [
"https://example.com/image1.jpg",
"https://example.com/image2.jpg",
],
"lyrics": "인스타 감성의 스테이 머뭄, 머물러봐요\n군산 신흥동 말랭이 마을의 마음 힐링",
"music_url": "https://example.com/song.mp3",
}
}
}
template_id: str = Field(..., description="Creatomate 템플릿 ID")
image_urls: List[str] = Field(..., description="영상에 사용할 이미지 URL 목록")
lyrics: str = Field(..., description="영상에 표시할 가사")
music_url: str = Field(..., description="배경 음악 URL")
@dataclass # =============================================================================
class SongSampleData: # Response Schemas
id: int # =============================================================================
ai: str
ai_model: str
sample_song: str
season: str | None = None
num_of_people: int | None = None
people_category: str | None = None
genre: str | None = None
@dataclass class GenerateVideoResponse(BaseModel):
class PromptTemplateData: """영상 생성 응답 스키마
id: int
prompt: str Usage:
description: str | None = None POST /video/generate/{task_id}
Returns the task IDs for tracking video generation.
Example Response (Success):
{
"success": true,
"task_id": "019123ab-cdef-7890-abcd-ef1234567890",
"creatomate_render_id": "render-id-123",
"message": "영상 생성 요청이 접수되었습니다.",
"error_message": null
}
"""
success: bool = Field(..., description="요청 성공 여부")
task_id: Optional[str] = Field(None, description="내부 작업 ID (Project task_id)")
creatomate_render_id: Optional[str] = Field(None, description="Creatomate 렌더 ID")
message: str = Field(..., description="응답 메시지")
error_message: Optional[str] = Field(None, description="에러 메시지 (실패 시)")
@dataclass class VideoRenderData(BaseModel):
class SongFormData: """Creatomate 렌더링 결과 데이터"""
store_name: str
store_id: str
prompts: str
attributes: Dict[str, str] = field(default_factory=dict)
attributes_str: str = ""
lyrics_ids: List[int] = field(default_factory=list)
llm_model: str = "gpt-4o"
@classmethod id: Optional[str] = Field(None, description="렌더 ID")
async def from_form(cls, request: Request): status: Optional[str] = Field(None, description="렌더 상태")
"""Request의 form 데이터로부터 dataclass 인스턴스 생성""" url: Optional[str] = Field(None, description="영상 URL")
form_data = await request.form() snapshot_url: Optional[str] = Field(None, description="스냅샷 URL")
# 고정 필드명들
fixed_keys = {"store_info_name", "store_id", "llm_model", "prompts"}
# lyrics-{id} 형태의 모든 키를 찾아서 ID 추출 class PollingVideoResponse(BaseModel):
lyrics_ids = [] """영상 생성 상태 조회 응답 스키마
attributes = {}
for key, value in form_data.items(): Usage:
if key.startswith("lyrics-"): GET /video/status/{creatomate_render_id}
lyrics_id = key.split("-")[1] Creatomate API 작업 상태를 조회합니다.
lyrics_ids.append(int(lyrics_id))
elif key not in fixed_keys:
attributes[key] = value
# attributes를 문자열로 변환 Note:
attributes_str = ( 상태 :
"\r\n\r\n".join([f"{key} : {value}" for key, value in attributes.items()]) - planned: 예약됨
if attributes - waiting: 대기
else "" - transcribing: 트랜스크립션
- rendering: 렌더링
- succeeded: 성공
- failed: 실패
Example Response (Success):
{
"success": true,
"status": "succeeded",
"message": "영상 생성이 완료되었습니다.",
"render_data": {
"id": "render-id",
"status": "succeeded",
"url": "https://...",
"snapshot_url": "https://..."
},
"raw_response": {...},
"error_message": null
}
"""
success: bool = Field(..., description="조회 성공 여부")
status: Optional[str] = Field(
None, description="작업 상태 (planned, waiting, rendering, succeeded, failed)"
) )
message: str = Field(..., description="상태 메시지")
render_data: Optional[VideoRenderData] = Field(None, description="렌더링 결과 데이터")
raw_response: Optional[Dict[str, Any]] = Field(None, description="Creatomate API 원본 응답")
error_message: Optional[str] = Field(None, description="에러 메시지 (실패 시)")
return cls(
store_name=form_data.get("store_info_name", ""), class DownloadVideoResponse(BaseModel):
store_id=form_data.get("store_id", ""), """영상 다운로드 응답 스키마
attributes=attributes,
attributes_str=attributes_str, Usage:
lyrics_ids=lyrics_ids, GET /video/download/{task_id}
llm_model=form_data.get("llm_model", "gpt-4o"), Polls for video completion and returns project info with video URL.
prompts=form_data.get("prompts", ""),
) Note:
상태 :
- processing: 영상 생성 진행 (result_movie_url은 null)
- completed: 영상 생성 완료 (result_movie_url 포함)
- failed: 영상 생성 실패
- not_found: task_id에 해당하는 Video 없음
- error: 조회 오류 발생
Example Response (Completed):
{
"success": true,
"status": "completed",
"message": "영상 다운로드가 완료되었습니다.",
"store_name": "스테이 머뭄",
"region": "군산",
"task_id": "019123ab-cdef-7890-abcd-ef1234567890",
"result_movie_url": "http://localhost:8000/media/2025-01-15/video.mp4",
"created_at": "2025-01-15T12:00:00",
"error_message": null
}
"""
success: bool = Field(..., description="다운로드 성공 여부")
status: str = Field(..., description="처리 상태 (processing, completed, failed, not_found, error)")
message: str = Field(..., description="응답 메시지")
store_name: Optional[str] = Field(None, description="업체명")
region: Optional[str] = Field(None, description="지역명")
task_id: Optional[str] = Field(None, description="작업 고유 식별자")
result_movie_url: Optional[str] = Field(None, description="영상 결과 URL")
created_at: Optional[datetime] = Field(None, description="생성 일시")
error_message: Optional[str] = Field(None, description="에러 메시지 (실패 시)")
class VideoListItem(BaseModel):
"""영상 목록 아이템 스키마
Usage:
GET /videos 응답의 개별 영상 정보
Example:
{
"store_name": "스테이 머뭄",
"region": "군산",
"task_id": "019123ab-cdef-7890-abcd-ef1234567890",
"result_movie_url": "http://localhost:8000/media/2025-01-15/video.mp4",
"created_at": "2025-01-15T12:00:00"
}
"""
store_name: Optional[str] = Field(None, description="업체명")
region: Optional[str] = Field(None, description="지역명")
task_id: str = Field(..., description="작업 고유 식별자")
result_movie_url: Optional[str] = Field(None, description="영상 결과 URL")
created_at: Optional[datetime] = Field(None, description="생성 일시")

View File

@ -0,0 +1,101 @@
"""
Video Background Tasks
영상 생성 관련 백그라운드 태스크를 정의합니다.
"""
from datetime import date
from pathlib import Path
import aiofiles
import httpx
from sqlalchemy import select
from app.database.session import AsyncSessionLocal
from app.video.models import Video
from app.utils.common import generate_task_id
from config import prj_settings
async def download_and_save_video(
task_id: str,
video_url: str,
store_name: str,
) -> None:
"""백그라운드에서 영상을 다운로드하고 Video 테이블을 업데이트합니다.
Args:
task_id: 프로젝트 task_id
video_url: 다운로드할 영상 URL
store_name: 저장할 파일명에 사용할 업체명
"""
print(f"[download_and_save_video] START - task_id: {task_id}, store_name: {store_name}")
try:
# 저장 경로 생성: media/{날짜}/{uuid7}/{store_name}.mp4
today = date.today().isoformat()
unique_id = await generate_task_id()
# 파일명에 사용할 수 없는 문자 제거
safe_store_name = "".join(
c for c in store_name if c.isalnum() or c in (" ", "_", "-")
).strip()
safe_store_name = safe_store_name or "video"
file_name = f"{safe_store_name}.mp4"
# 절대 경로 생성
media_dir = Path("media") / today / unique_id
media_dir.mkdir(parents=True, exist_ok=True)
file_path = media_dir / file_name
print(f"[download_and_save_video] Directory created - path: {file_path}")
# 영상 파일 다운로드
print(f"[download_and_save_video] Downloading video - task_id: {task_id}, url: {video_url}")
async with httpx.AsyncClient() as client:
response = await client.get(video_url, timeout=120.0) # 영상은 더 큰 timeout
response.raise_for_status()
async with aiofiles.open(str(file_path), "wb") as f:
await f.write(response.content)
print(f"[download_and_save_video] File saved - task_id: {task_id}, path: {file_path}")
# 프론트엔드에서 접근 가능한 URL 생성
relative_path = f"/media/{today}/{unique_id}/{file_name}"
base_url = f"http://{prj_settings.PROJECT_DOMAIN}"
file_url = f"{base_url}{relative_path}"
print(f"[download_and_save_video] URL generated - task_id: {task_id}, url: {file_url}")
# Video 테이블 업데이트 (새 세션 사용)
async with AsyncSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute(
select(Video)
.where(Video.task_id == task_id)
.order_by(Video.created_at.desc())
.limit(1)
)
video = result.scalar_one_or_none()
if video:
video.status = "completed"
video.result_movie_url = file_url
await session.commit()
print(f"[download_and_save_video] SUCCESS - task_id: {task_id}, status: completed")
else:
print(f"[download_and_save_video] Video NOT FOUND in DB - task_id: {task_id}")
except Exception as e:
print(f"[download_and_save_video] EXCEPTION - task_id: {task_id}, error: {e}")
# 실패 시 Video 테이블 업데이트
async with AsyncSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute(
select(Video)
.where(Video.task_id == task_id)
.order_by(Video.created_at.desc())
.limit(1)
)
video = result.scalar_one_or_none()
if video:
video.status = "failed"
await session.commit()
print(f"[download_and_save_video] FAILED - task_id: {task_id}, status updated to failed")

View File

@ -0,0 +1,64 @@
import requests
from pathlib import Path
SAS_TOKEN = "sp=racwdl&st=2025-12-01T00:13:29Z&se=2026-07-31T08:28:29Z&spr=https&sv=2024-11-04&sr=c&sig=7fE2ozVBPu3Gq43%2FZDxEYdEcPLDXyNVfTf16IBasmVQ%3D"
def upload_music_to_azure_blob(file_path = "스테이 머뭄_1.mp3", url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp3"):
access_url = f"{url}?{SAS_TOKEN}"
headers = {
"Content-Type": "audio/mpeg",
"x-ms-blob-type": "BlockBlob"
}
with open(file_path, "rb") as file:
response = requests.put(access_url, data=file, headers=headers)
if response.status_code in [200, 201]:
print(f"Success Status Code: {response.status_code}")
else:
print(f"Failed Status Code: {response.status_code}")
print(f"Response: {response.text}")
def upload_video_to_azure_blob(file_path = "스테이 머뭄.mp4", url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp4"):
access_url = f"{url}?{SAS_TOKEN}"
headers = {
"Content-Type": "video/mp4",
"x-ms-blob-type": "BlockBlob"
}
with open(file_path, "rb") as file:
response = requests.put(access_url, data=file, headers=headers)
if response.status_code in [200, 201]:
print(f"Success Status Code: {response.status_code}")
else:
print(f"Failed Status Code: {response.status_code}")
print(f"Response: {response.text}")
def upload_image_to_azure_blob(file_path = "스테이 머뭄.png", url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.png"):
access_url = f"{url}?{SAS_TOKEN}"
extension = Path(file_path).suffix.lower()
content_types = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
".bmp": "image/bmp"
}
content_type = content_types.get(extension, "image/jpeg")
headers = {
"Content-Type": content_type,
"x-ms-blob-type": "BlockBlob"
}
with open(file_path, "rb") as file:
response = requests.put(access_url, data=file, headers=headers)
if response.status_code in [200, 201]:
print(f"Success Status Code: {response.status_code}")
else:
print(f"Failed Status Code: {response.status_code}")
print(f"Response: {response.text}")
upload_video_to_azure_blob()
upload_image_to_azure_blob()