207 lines
8.1 KiB
Python
207 lines
8.1 KiB
Python
"""② Higgsfield step — photos + prompt → completed 8s base video.
|
|
|
|
두 가지 백엔드를 지원한다 (cfg.HIGGSFIELD_BACKEND):
|
|
|
|
· "cli" — 기존 검증 경로. `higgsfield` CLI 를 subprocess 로 호출.
|
|
~/.higgsfield 디바이스 로그인 인증 필요(컨테이너 비친화).
|
|
· "api" — 공식 Python SDK(higgsfield-client). HF_KEY 환경변수 인증.
|
|
컨테이너/서버리스 친화. 공개 문서 기준 DoP image2video 사용.
|
|
|
|
dry_run=True 면 양쪽 모두 건너뛰고 번들된 데모 클립을 반환(크레딧 0).
|
|
|
|
API 백엔드는 DoP image2video(POST {app} → job set, GET /v1/job-sets/{id} 폴링,
|
|
jobs[0].results.raw.url 추출)로 실측 검증됨. 단가/오디오 특성이 CLI(marketing_studio,
|
|
tv_spot)와 다르므로 app/variant 는 env(cfg.HIGGSFIELD_API_APP/VARIANT)로 교체 가능.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from app import config as cfg
|
|
|
|
ENGINE = Path(__file__).resolve().parents[3] # engine/higgsfield_shorts
|
|
DEMO_VIDEO = cfg.WEBAPP_DIR / "demo" / "mumum.mp4"
|
|
TMP = ENGINE / "server" / ".tmp"
|
|
|
|
MODEL = cfg.HIGGSFIELD_MODEL
|
|
_URL_RE = re.compile(r'"result_url"\s*:\s*"([^"]+)"')
|
|
_CREDITS_RE = re.compile(r'"credits(?:_exact)?"\s*:\s*([0-9.]+)')
|
|
|
|
|
|
def _download(url: str) -> Path:
|
|
# httpx + 브라우저 UA 로 받음. (CDN이 urllib 기본 UA를 403 차단 — 실측 확인)
|
|
TMP.mkdir(parents=True, exist_ok=True)
|
|
dest = TMP / f"base_{uuid.uuid4().hex[:8]}.mp4"
|
|
with httpx.stream("GET", url, follow_redirects=True, timeout=120.0,
|
|
headers={"User-Agent": "Mozilla/5.0"}) as r:
|
|
r.raise_for_status()
|
|
with dest.open("wb") as f:
|
|
for chunk in r.iter_bytes():
|
|
f.write(chunk)
|
|
return dest
|
|
|
|
|
|
# ============================ CLI 백엔드 ============================
|
|
|
|
def _base_args(prompt: str, image_paths: list[Path], duration: int) -> list[str]:
|
|
args = ["--prompt", prompt]
|
|
for p in image_paths:
|
|
args += ["--image", str(p)]
|
|
args += ["--mode", cfg.HIGGSFIELD_MODE, "--duration", str(duration),
|
|
"--generate_audio", cfg.HIGGSFIELD_GENERATE_AUDIO,
|
|
"--aspect_ratio", cfg.HIGGSFIELD_ASPECT_RATIO]
|
|
return args
|
|
|
|
|
|
def cost(prompt: str, image_paths: list[Path], duration: int = cfg.HIGGSFIELD_DURATION) -> float:
|
|
"""CLI 백엔드 전용 — 생성 전 크레딧 선확인."""
|
|
out = subprocess.run(
|
|
["higgsfield", "generate", "cost", MODEL, *_base_args(prompt, image_paths, duration), "--json"],
|
|
capture_output=True, text=True, check=True,
|
|
)
|
|
m = _CREDITS_RE.search(out.stdout)
|
|
return float(m.group(1)) if m else 0.0
|
|
|
|
|
|
def _generate_cli(
|
|
prompt: str, image_paths: list[Path], duration: int, wait_timeout: str,
|
|
) -> tuple[Path, float]:
|
|
out = subprocess.run(
|
|
["higgsfield", "generate", "create", MODEL,
|
|
*_base_args(prompt, image_paths, duration),
|
|
"--wait", "--wait-timeout", wait_timeout, "--json"],
|
|
capture_output=True, text=True, check=True,
|
|
)
|
|
m = _URL_RE.search(out.stdout)
|
|
if not m:
|
|
raise RuntimeError(f"Higgsfield returned no result_url:\n{out.stdout[-2000:]}")
|
|
credits_m = _CREDITS_RE.search(out.stdout)
|
|
credits = float(credits_m.group(1)) if credits_m else 0.0
|
|
return _download(m.group(1)), credits
|
|
|
|
|
|
# ============================ API 백엔드 ============================
|
|
#
|
|
# DoP image2video 응답은 공식 SDK의 generic submit() 가정(request_id)과 달라
|
|
# 'job set' 구조다(실측 확인). 따라서 SDK 의 저수준 클라이언트(인증·base_url)만
|
|
# 빌려 직접 흐름을 구현한다:
|
|
# POST {app} → {"id": <job_set_id>, "jobs": [{status, results}]}
|
|
# GET /v1/job-sets/{id} → jobs[0].status / jobs[0].results.{raw,min}.url
|
|
#
|
|
_TERMINAL = {"completed", "failed", "nsfw", "canceled", "cancelled"}
|
|
|
|
|
|
def _parse_timeout(value: str) -> float:
|
|
"""'15m' / '900s' / '900' → 초(float)."""
|
|
v = str(value).strip().lower()
|
|
try:
|
|
if v.endswith("m"):
|
|
return float(v[:-1]) * 60
|
|
if v.endswith("s"):
|
|
return float(v[:-1])
|
|
return float(v)
|
|
except ValueError:
|
|
return 900.0
|
|
|
|
|
|
def _extract_url(results) -> str:
|
|
"""job 의 results 에서 영상 URL 추출. raw(풀화질) 우선, 없으면 min."""
|
|
if isinstance(results, dict):
|
|
for key in ("raw", "min", "result", "output"):
|
|
v = results.get(key)
|
|
if isinstance(v, dict) and v.get("url"):
|
|
return v["url"]
|
|
if isinstance(v, str):
|
|
return v
|
|
raise RuntimeError(f"Higgsfield 결과에서 영상 URL을 찾지 못함: {results!r}")
|
|
|
|
|
|
def _generate_api(
|
|
prompt: str, image_paths: list[Path], duration: int, wait_timeout: str,
|
|
) -> tuple[Path, float]:
|
|
try:
|
|
import higgsfield_client as hf # 공식 SDK (업로드/인증)
|
|
from higgsfield_client.http.client import SyncClient
|
|
except ImportError as e:
|
|
raise RuntimeError(
|
|
"higgsfield-client 미설치. `pip install higgsfield-client` 또는 이미지 재빌드 필요."
|
|
) from e
|
|
|
|
if not (os.environ.get("HF_KEY") or
|
|
(os.environ.get("HF_API_KEY") and os.environ.get("HF_API_SECRET"))):
|
|
raise RuntimeError(
|
|
"Higgsfield API 자격증명 없음. HF_KEY='key:secret' 또는 "
|
|
"HIGGSFIELD_API_KEY/HIGGSFIELD_API_SECRET(.env) 설정 필요."
|
|
)
|
|
|
|
# 1) 입력 사진 업로드 → URL.
|
|
image_urls = [hf.upload_file(str(p)) for p in image_paths[:cfg.HIGGSFIELD_API_MAX_IMAGES]]
|
|
if not image_urls:
|
|
raise RuntimeError("업로드된 입력 이미지가 없습니다.")
|
|
|
|
# 2) 생성 요청. 실인자는 'params' 로 감싼다(실측: 미래핑 시 422).
|
|
# 엔드포인트별 이미지 필드 구조가 다름(platform API 실측):
|
|
# · DoP : input_images (복수 배열)
|
|
# · Seedance : input_image (단수 object). model enum=seedance_pro|seedance_lite,
|
|
# aspect_ratio 9:16 네이티브 지원, duration 3~12.
|
|
common = {
|
|
"model": cfg.HIGGSFIELD_API_VARIANT,
|
|
"prompt": prompt,
|
|
"aspect_ratio": cfg.HIGGSFIELD_ASPECT_RATIO,
|
|
"duration": duration,
|
|
}
|
|
if "seedance" in cfg.HIGGSFIELD_API_APP.lower():
|
|
params = {**common, "input_image": {"type": "image_url", "image_url": image_urls[0]}}
|
|
else:
|
|
params = {**common,
|
|
"input_images": [{"type": "image_url", "image_url": u} for u in image_urls]}
|
|
body = {"params": params}
|
|
http = SyncClient()._client # 인증·base_url 설정된 httpx.Client
|
|
resp = http.post(cfg.HIGGSFIELD_API_APP, json=body)
|
|
resp.raise_for_status()
|
|
job_set_id = resp.json()["id"]
|
|
|
|
# 3) 완료까지 폴링
|
|
timeout_s = _parse_timeout(wait_timeout)
|
|
waited, interval = 0.0, 5.0
|
|
while True:
|
|
js = http.get(f"/v1/job-sets/{job_set_id}")
|
|
js.raise_for_status()
|
|
job = js.json()["jobs"][0]
|
|
status = (job.get("status") or "").lower()
|
|
if status == "completed":
|
|
break
|
|
if status in _TERMINAL:
|
|
raise RuntimeError(f"Higgsfield 생성 실패: status={status} (job_set {job_set_id})")
|
|
if waited >= timeout_s:
|
|
raise RuntimeError(f"Higgsfield 생성 타임아웃({wait_timeout}, job_set {job_set_id})")
|
|
time.sleep(interval)
|
|
waited += interval
|
|
|
|
# 4) 결과 영상 다운로드 (credits 는 응답에 없어 0.0)
|
|
return _download(_extract_url(job["results"])), 0.0
|
|
|
|
|
|
# ============================ 디스패처 ============================
|
|
|
|
def generate(
|
|
prompt: str,
|
|
image_paths: list[Path],
|
|
duration: int = cfg.HIGGSFIELD_DURATION,
|
|
dry_run: bool = False,
|
|
wait_timeout: str = cfg.HIGGSFIELD_WAIT_TIMEOUT,
|
|
) -> tuple[Path, float]:
|
|
"""Return (local mp4 path, credits spent). 백엔드는 cfg.HIGGSFIELD_BACKEND."""
|
|
if dry_run:
|
|
return DEMO_VIDEO, 0.0
|
|
if cfg.HIGGSFIELD_BACKEND == "api":
|
|
return _generate_api(prompt, image_paths, duration, wait_timeout)
|
|
return _generate_cli(prompt, image_paths, duration, wait_timeout)
|