o2o-ado2-short-form/server/app/pipeline/higgsfield_client.py

207 lines
8.1 KiB
Python

"""② Higgsfield step — photos + prompt → completed 8s base video.
두 가지 백엔드를 지원한다 (cfg.HIGGSFIELD_BACKEND):
· "cli" — 기존 검증 경로. `higgsfield` CLI 를 subprocess 로 호출.
~/.higgsfield 디바이스 로그인 인증 필요(컨테이너 비친화).
· "api" — 공식 Python SDK(higgsfield-client). HF_KEY 환경변수 인증.
컨테이너/서버리스 친화. 공개 문서 기준 DoP image2video 사용.
dry_run=True 면 양쪽 모두 건너뛰고 번들된 데모 클립을 반환(크레딧 0).
API 백엔드는 DoP image2video(POST {app} → job set, GET /v1/job-sets/{id} 폴링,
jobs[0].results.raw.url 추출)로 실측 검증됨. 단가/오디오 특성이 CLI(marketing_studio,
tv_spot)와 다르므로 app/variant 는 env(cfg.HIGGSFIELD_API_APP/VARIANT)로 교체 가능.
"""
from __future__ import annotations
import os
import re
import subprocess
import time
import uuid
from pathlib import Path
import httpx
from app import config as cfg
ENGINE = Path(__file__).resolve().parents[3] # engine/higgsfield_shorts
DEMO_VIDEO = cfg.WEBAPP_DIR / "demo" / "mumum.mp4"
TMP = ENGINE / "server" / ".tmp"
MODEL = cfg.HIGGSFIELD_MODEL
_URL_RE = re.compile(r'"result_url"\s*:\s*"([^"]+)"')
_CREDITS_RE = re.compile(r'"credits(?:_exact)?"\s*:\s*([0-9.]+)')
def _download(url: str) -> Path:
# httpx + 브라우저 UA 로 받음. (CDN이 urllib 기본 UA를 403 차단 — 실측 확인)
TMP.mkdir(parents=True, exist_ok=True)
dest = TMP / f"base_{uuid.uuid4().hex[:8]}.mp4"
with httpx.stream("GET", url, follow_redirects=True, timeout=120.0,
headers={"User-Agent": "Mozilla/5.0"}) as r:
r.raise_for_status()
with dest.open("wb") as f:
for chunk in r.iter_bytes():
f.write(chunk)
return dest
# ============================ CLI 백엔드 ============================
def _base_args(prompt: str, image_paths: list[Path], duration: int) -> list[str]:
args = ["--prompt", prompt]
for p in image_paths:
args += ["--image", str(p)]
args += ["--mode", cfg.HIGGSFIELD_MODE, "--duration", str(duration),
"--generate_audio", cfg.HIGGSFIELD_GENERATE_AUDIO,
"--aspect_ratio", cfg.HIGGSFIELD_ASPECT_RATIO]
return args
def cost(prompt: str, image_paths: list[Path], duration: int = cfg.HIGGSFIELD_DURATION) -> float:
"""CLI 백엔드 전용 — 생성 전 크레딧 선확인."""
out = subprocess.run(
["higgsfield", "generate", "cost", MODEL, *_base_args(prompt, image_paths, duration), "--json"],
capture_output=True, text=True, check=True,
)
m = _CREDITS_RE.search(out.stdout)
return float(m.group(1)) if m else 0.0
def _generate_cli(
prompt: str, image_paths: list[Path], duration: int, wait_timeout: str,
) -> tuple[Path, float]:
out = subprocess.run(
["higgsfield", "generate", "create", MODEL,
*_base_args(prompt, image_paths, duration),
"--wait", "--wait-timeout", wait_timeout, "--json"],
capture_output=True, text=True, check=True,
)
m = _URL_RE.search(out.stdout)
if not m:
raise RuntimeError(f"Higgsfield returned no result_url:\n{out.stdout[-2000:]}")
credits_m = _CREDITS_RE.search(out.stdout)
credits = float(credits_m.group(1)) if credits_m else 0.0
return _download(m.group(1)), credits
# ============================ API 백엔드 ============================
#
# DoP image2video 응답은 공식 SDK의 generic submit() 가정(request_id)과 달라
# 'job set' 구조다(실측 확인). 따라서 SDK 의 저수준 클라이언트(인증·base_url)만
# 빌려 직접 흐름을 구현한다:
# POST {app} → {"id": <job_set_id>, "jobs": [{status, results}]}
# GET /v1/job-sets/{id} → jobs[0].status / jobs[0].results.{raw,min}.url
#
_TERMINAL = {"completed", "failed", "nsfw", "canceled", "cancelled"}
def _parse_timeout(value: str) -> float:
"""'15m' / '900s' / '900' → 초(float)."""
v = str(value).strip().lower()
try:
if v.endswith("m"):
return float(v[:-1]) * 60
if v.endswith("s"):
return float(v[:-1])
return float(v)
except ValueError:
return 900.0
def _extract_url(results) -> str:
"""job 의 results 에서 영상 URL 추출. raw(풀화질) 우선, 없으면 min."""
if isinstance(results, dict):
for key in ("raw", "min", "result", "output"):
v = results.get(key)
if isinstance(v, dict) and v.get("url"):
return v["url"]
if isinstance(v, str):
return v
raise RuntimeError(f"Higgsfield 결과에서 영상 URL을 찾지 못함: {results!r}")
def _generate_api(
prompt: str, image_paths: list[Path], duration: int, wait_timeout: str,
) -> tuple[Path, float]:
try:
import higgsfield_client as hf # 공식 SDK (업로드/인증)
from higgsfield_client.http.client import SyncClient
except ImportError as e:
raise RuntimeError(
"higgsfield-client 미설치. `pip install higgsfield-client` 또는 이미지 재빌드 필요."
) from e
if not (os.environ.get("HF_KEY") or
(os.environ.get("HF_API_KEY") and os.environ.get("HF_API_SECRET"))):
raise RuntimeError(
"Higgsfield API 자격증명 없음. HF_KEY='key:secret' 또는 "
"HIGGSFIELD_API_KEY/HIGGSFIELD_API_SECRET(.env) 설정 필요."
)
# 1) 입력 사진 업로드 → URL.
image_urls = [hf.upload_file(str(p)) for p in image_paths[:cfg.HIGGSFIELD_API_MAX_IMAGES]]
if not image_urls:
raise RuntimeError("업로드된 입력 이미지가 없습니다.")
# 2) 생성 요청. 실인자는 'params' 로 감싼다(실측: 미래핑 시 422).
# 엔드포인트별 이미지 필드 구조가 다름(platform API 실측):
# · DoP : input_images (복수 배열)
# · Seedance : input_image (단수 object). model enum=seedance_pro|seedance_lite,
# aspect_ratio 9:16 네이티브 지원, duration 3~12.
common = {
"model": cfg.HIGGSFIELD_API_VARIANT,
"prompt": prompt,
"aspect_ratio": cfg.HIGGSFIELD_ASPECT_RATIO,
"duration": duration,
}
if "seedance" in cfg.HIGGSFIELD_API_APP.lower():
params = {**common, "input_image": {"type": "image_url", "image_url": image_urls[0]}}
else:
params = {**common,
"input_images": [{"type": "image_url", "image_url": u} for u in image_urls]}
body = {"params": params}
http = SyncClient()._client # 인증·base_url 설정된 httpx.Client
resp = http.post(cfg.HIGGSFIELD_API_APP, json=body)
resp.raise_for_status()
job_set_id = resp.json()["id"]
# 3) 완료까지 폴링
timeout_s = _parse_timeout(wait_timeout)
waited, interval = 0.0, 5.0
while True:
js = http.get(f"/v1/job-sets/{job_set_id}")
js.raise_for_status()
job = js.json()["jobs"][0]
status = (job.get("status") or "").lower()
if status == "completed":
break
if status in _TERMINAL:
raise RuntimeError(f"Higgsfield 생성 실패: status={status} (job_set {job_set_id})")
if waited >= timeout_s:
raise RuntimeError(f"Higgsfield 생성 타임아웃({wait_timeout}, job_set {job_set_id})")
time.sleep(interval)
waited += interval
# 4) 결과 영상 다운로드 (credits 는 응답에 없어 0.0)
return _download(_extract_url(job["results"])), 0.0
# ============================ 디스패처 ============================
def generate(
prompt: str,
image_paths: list[Path],
duration: int = cfg.HIGGSFIELD_DURATION,
dry_run: bool = False,
wait_timeout: str = cfg.HIGGSFIELD_WAIT_TIMEOUT,
) -> tuple[Path, float]:
"""Return (local mp4 path, credits spent). 백엔드는 cfg.HIGGSFIELD_BACKEND."""
if dry_run:
return DEMO_VIDEO, 0.0
if cfg.HIGGSFIELD_BACKEND == "api":
return _generate_api(prompt, image_paths, duration, wait_timeout)
return _generate_cli(prompt, image_paths, duration, wait_timeout)