"""② Higgsfield step — photos + prompt → completed 8s base video. 두 가지 백엔드를 지원한다 (cfg.HIGGSFIELD_BACKEND): · "cli" — 기존 검증 경로. `higgsfield` CLI 를 subprocess 로 호출. ~/.higgsfield 디바이스 로그인 인증 필요(컨테이너 비친화). · "api" — 공식 Python SDK(higgsfield-client). HF_KEY 환경변수 인증. 컨테이너/서버리스 친화. 공개 문서 기준 DoP image2video 사용. dry_run=True 면 양쪽 모두 건너뛰고 번들된 데모 클립을 반환(크레딧 0). API 백엔드는 DoP image2video(POST {app} → job set, GET /v1/job-sets/{id} 폴링, jobs[0].results.raw.url 추출)로 실측 검증됨. 단가/오디오 특성이 CLI(marketing_studio, tv_spot)와 다르므로 app/variant 는 env(cfg.HIGGSFIELD_API_APP/VARIANT)로 교체 가능. """ from __future__ import annotations import os import re import subprocess import time import uuid from pathlib import Path import httpx from app import config as cfg ENGINE = Path(__file__).resolve().parents[3] # engine/higgsfield_shorts DEMO_VIDEO = cfg.WEBAPP_DIR / "demo" / "mumum.mp4" TMP = ENGINE / "server" / ".tmp" MODEL = cfg.HIGGSFIELD_MODEL _URL_RE = re.compile(r'"result_url"\s*:\s*"([^"]+)"') _CREDITS_RE = re.compile(r'"credits(?:_exact)?"\s*:\s*([0-9.]+)') def _download(url: str) -> Path: # httpx + 브라우저 UA 로 받음. (CDN이 urllib 기본 UA를 403 차단 — 실측 확인) TMP.mkdir(parents=True, exist_ok=True) dest = TMP / f"base_{uuid.uuid4().hex[:8]}.mp4" with httpx.stream("GET", url, follow_redirects=True, timeout=120.0, headers={"User-Agent": "Mozilla/5.0"}) as r: r.raise_for_status() with dest.open("wb") as f: for chunk in r.iter_bytes(): f.write(chunk) return dest # ============================ CLI 백엔드 ============================ def _base_args(prompt: str, image_paths: list[Path], duration: int) -> list[str]: args = ["--prompt", prompt] for p in image_paths: args += ["--image", str(p)] args += ["--mode", cfg.HIGGSFIELD_MODE, "--duration", str(duration), "--generate_audio", cfg.HIGGSFIELD_GENERATE_AUDIO, "--aspect_ratio", cfg.HIGGSFIELD_ASPECT_RATIO] return args def cost(prompt: str, image_paths: list[Path], duration: int = cfg.HIGGSFIELD_DURATION) -> float: """CLI 백엔드 전용 — 생성 전 크레딧 선확인.""" out = subprocess.run( ["higgsfield", "generate", "cost", MODEL, *_base_args(prompt, image_paths, duration), "--json"], capture_output=True, text=True, check=True, ) m = _CREDITS_RE.search(out.stdout) return float(m.group(1)) if m else 0.0 def _generate_cli( prompt: str, image_paths: list[Path], duration: int, wait_timeout: str, ) -> tuple[Path, float]: out = subprocess.run( ["higgsfield", "generate", "create", MODEL, *_base_args(prompt, image_paths, duration), "--wait", "--wait-timeout", wait_timeout, "--json"], capture_output=True, text=True, check=True, ) m = _URL_RE.search(out.stdout) if not m: raise RuntimeError(f"Higgsfield returned no result_url:\n{out.stdout[-2000:]}") credits_m = _CREDITS_RE.search(out.stdout) credits = float(credits_m.group(1)) if credits_m else 0.0 return _download(m.group(1)), credits # ============================ API 백엔드 ============================ # # DoP image2video 응답은 공식 SDK의 generic submit() 가정(request_id)과 달라 # 'job set' 구조다(실측 확인). 따라서 SDK 의 저수준 클라이언트(인증·base_url)만 # 빌려 직접 흐름을 구현한다: # POST {app} → {"id": , "jobs": [{status, results}]} # GET /v1/job-sets/{id} → jobs[0].status / jobs[0].results.{raw,min}.url # _TERMINAL = {"completed", "failed", "nsfw", "canceled", "cancelled"} def _parse_timeout(value: str) -> float: """'15m' / '900s' / '900' → 초(float).""" v = str(value).strip().lower() try: if v.endswith("m"): return float(v[:-1]) * 60 if v.endswith("s"): return float(v[:-1]) return float(v) except ValueError: return 900.0 def _extract_url(results) -> str: """job 의 results 에서 영상 URL 추출. raw(풀화질) 우선, 없으면 min.""" if isinstance(results, dict): for key in ("raw", "min", "result", "output"): v = results.get(key) if isinstance(v, dict) and v.get("url"): return v["url"] if isinstance(v, str): return v raise RuntimeError(f"Higgsfield 결과에서 영상 URL을 찾지 못함: {results!r}") def _generate_api( prompt: str, image_paths: list[Path], duration: int, wait_timeout: str, ) -> tuple[Path, float]: try: import higgsfield_client as hf # 공식 SDK (업로드/인증) from higgsfield_client.http.client import SyncClient except ImportError as e: raise RuntimeError( "higgsfield-client 미설치. `pip install higgsfield-client` 또는 이미지 재빌드 필요." ) from e if not (os.environ.get("HF_KEY") or (os.environ.get("HF_API_KEY") and os.environ.get("HF_API_SECRET"))): raise RuntimeError( "Higgsfield API 자격증명 없음. HF_KEY='key:secret' 또는 " "HIGGSFIELD_API_KEY/HIGGSFIELD_API_SECRET(.env) 설정 필요." ) # 1) 입력 사진 업로드 → URL. image_urls = [hf.upload_file(str(p)) for p in image_paths[:cfg.HIGGSFIELD_API_MAX_IMAGES]] if not image_urls: raise RuntimeError("업로드된 입력 이미지가 없습니다.") # 2) 생성 요청. 실인자는 'params' 로 감싼다(실측: 미래핑 시 422). # 엔드포인트별 이미지 필드 구조가 다름(platform API 실측): # · DoP : input_images (복수 배열) # · Seedance : input_image (단수 object). model enum=seedance_pro|seedance_lite, # aspect_ratio 9:16 네이티브 지원, duration 3~12. common = { "model": cfg.HIGGSFIELD_API_VARIANT, "prompt": prompt, "aspect_ratio": cfg.HIGGSFIELD_ASPECT_RATIO, "duration": duration, } if "seedance" in cfg.HIGGSFIELD_API_APP.lower(): params = {**common, "input_image": {"type": "image_url", "image_url": image_urls[0]}} else: params = {**common, "input_images": [{"type": "image_url", "image_url": u} for u in image_urls]} body = {"params": params} http = SyncClient()._client # 인증·base_url 설정된 httpx.Client resp = http.post(cfg.HIGGSFIELD_API_APP, json=body) resp.raise_for_status() job_set_id = resp.json()["id"] # 3) 완료까지 폴링 timeout_s = _parse_timeout(wait_timeout) waited, interval = 0.0, 5.0 while True: js = http.get(f"/v1/job-sets/{job_set_id}") js.raise_for_status() job = js.json()["jobs"][0] status = (job.get("status") or "").lower() if status == "completed": break if status in _TERMINAL: raise RuntimeError(f"Higgsfield 생성 실패: status={status} (job_set {job_set_id})") if waited >= timeout_s: raise RuntimeError(f"Higgsfield 생성 타임아웃({wait_timeout}, job_set {job_set_id})") time.sleep(interval) waited += interval # 4) 결과 영상 다운로드 (credits 는 응답에 없어 0.0) return _download(_extract_url(job["results"])), 0.0 # ============================ 디스패처 ============================ def generate( prompt: str, image_paths: list[Path], duration: int = cfg.HIGGSFIELD_DURATION, dry_run: bool = False, wait_timeout: str = cfg.HIGGSFIELD_WAIT_TIMEOUT, ) -> tuple[Path, float]: """Return (local mp4 path, credits spent). 백엔드는 cfg.HIGGSFIELD_BACKEND.""" if dry_run: return DEMO_VIDEO, 0.0 if cfg.HIGGSFIELD_BACKEND == "api": return _generate_api(prompt, image_paths, duration, wait_timeout) return _generate_cli(prompt, image_paths, duration, wait_timeout)