o2o-ado2-short-form/server/app/jobs.py

182 lines
6.7 KiB
Python

"""비동기 작업 큐 — 인메모리 레지스트리 + 스레드풀.
`/api/generate` 는 더 이상 파이프라인을 동기 블로킹하지 않는다. 대신:
submit(req, photo_paths, dry_run) → job_id 즉시 반환
ThreadPoolExecutor(max_workers=cfg.MAX_CONCURRENT_JOBS) 위에서 _run() 실행
get(job_id) → 현재 상태(JobStatus dict) — 프론트가 폴링
동시성: 워커 수만큼 동시에 생성하고, 초과분은 풀의 큐에서 status="queued" 로 대기한다.
영속성: 인메모리이므로 단일 프로세스 한정. 서버 재시작 시 진행 중 작업은 사라진다.
(단일 컨테이너·단일 uvicorn 배포 전제. 멀티프로세스/스케일아웃이 필요해지면
Redis 등 외부 스토어로 교체.)
"""
from __future__ import annotations
import shutil
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from app import config as cfg
from app.schemas import GenerateRequest
from app.pipeline import spec_builder, higgsfield_client, fal_client, remotion_render
OUTPUTS = cfg.OUTPUTS_DIR
# 단계 → (stage_index, 진입 시 표시할 대략 진행률). 프론트 진행바 3개와 1:1 매핑.
_STAGE_PROGRESS = {"spec": (0, 10), "higgsfield": (1, 40), "remotion": (2, 80)}
_executor = ThreadPoolExecutor(
max_workers=cfg.MAX_CONCURRENT_JOBS, thread_name_prefix="gen"
)
_jobs: dict[str, dict] = {}
_lock = threading.Lock()
def _now() -> float:
return time.time()
def _set(job_id: str, **fields) -> None:
"""레지스트리의 작업 레코드를 스레드 안전하게 부분 갱신."""
with _lock:
job = _jobs.get(job_id)
if job is not None:
job.update(fields, updated_at=_now())
def _stage(job_id: str, stage: str) -> None:
idx, progress = _STAGE_PROGRESS[stage]
_set(job_id, status="running", stage=stage, stage_index=idx, progress=progress)
def _purge_expired() -> None:
"""완료/실패 후 보존시간이 지난 레코드를 청소(메모리 누수 방지)."""
cutoff = _now() - cfg.JOB_RETENTION_SECONDS
with _lock:
stale = [
jid for jid, j in _jobs.items()
if j["status"] in ("done", "error") and j["updated_at"] < cutoff
]
for jid in stale:
_jobs.pop(jid, None)
def _cleanup(
upload_dir: Path | None,
base_video: Path | None,
remotion_out: Path | None,
remotion_temps: list[Path],
) -> None:
"""작업 완료·실패 후 임시 파일 전부 삭제. 오류는 무시(정리 실패가 메인 흐름에 영향 없게)."""
# 업로드 사진 폴더 통째로
if upload_dir and upload_dir.exists():
shutil.rmtree(upload_dir, ignore_errors=True)
# Higgsfield가 받아둔 베이스 영상 (.tmp/). dry_run 데모 파일은 제외
if base_video and base_video != higgsfield_client.DEMO_VIDEO:
try:
base_video.unlink(missing_ok=True)
except OSError:
pass
# Remotion 중간 파일들 (remotion/public/ 복사본, props JSON)
for p in remotion_temps:
try:
p.unlink(missing_ok=True)
except OSError:
pass
# Remotion 최종 출력 (outputs/ 로 복사 완료된 뒤 원본 삭제)
if remotion_out:
try:
remotion_out.unlink(missing_ok=True)
except OSError:
pass
def _run(job_id: str, req: GenerateRequest, photo_paths: list[Path], dry_run: bool) -> None:
"""워커 스레드 본체 — 기존 동기 파이프라인을 단계별 상태 갱신과 함께 실행."""
upload_dir = photo_paths[0].parent if photo_paths else None
base_video: Path | None = None
remotion_out: Path | None = None
remotion_temps: list[Path] = []
try:
# 1. LLM → spec
_stage(job_id, "spec")
spec = spec_builder.build_spec(req)
_set(job_id, caption=spec.caption, profile=spec.profile)
# 2. 비디오 생성 → 베이스 영상 (백엔드 선택)
# fal: Seedance 2.0 멀티이미지(9장) / higgsfield: 단일 이미지
_stage(job_id, "higgsfield")
backend = fal_client if cfg.VIDEO_BACKEND == "fal" else higgsfield_client
base_video, credits = backend.generate(
spec.higgsfield_prompt, photo_paths, dry_run=dry_run
)
_set(job_id, cost_credits=credits)
# Higgsfield 완료 즉시 outputs/ 에 저장.
# Remotion 이 실패해도 베이스 영상은 접근 가능하고,
# Remotion 이 성공하면 같은 경로에 최종본을 덮어쓴다.
OUTPUTS.mkdir(parents=True, exist_ok=True)
served = OUTPUTS / f"{job_id}.mp4"
if base_video != higgsfield_client.DEMO_VIDEO:
shutil.copy(base_video, served)
else:
shutil.copy(base_video, served) # dry_run 데모도 동일하게 복사
_set(job_id, video_url=f"/outputs/{job_id}.mp4")
# 3. Remotion → 자막 합성
_stage(job_id, "remotion")
remotion_out, remotion_temps = remotion_render.render(spec, base_video)
# Remotion 성공 → 자막 합성본으로 덮어쓰기
shutil.copy(remotion_out, served)
_set(
job_id, status="done", stage=None, stage_index=2, progress=100,
video_url=f"/outputs/{job_id}.mp4",
)
except Exception as e:
# 어떤 단계에서 왜 실패했는지 uvicorn 콘솔에 전체 스택을 남긴다(디버깅 필수).
print(f"[job {job_id}] FAILED at stage={get(job_id).get('stage') if get(job_id) else '?'}: {e}")
traceback.print_exc()
_set(job_id, status="error", error=str(e))
finally:
# 완료·실패 어느 쪽이든 임시 파일 정리 (outputs/ 의 최종본은 건드리지 않음)
_cleanup(upload_dir, base_video, remotion_out, remotion_temps)
def submit(req: GenerateRequest, photo_paths: list[Path], dry_run: bool) -> str:
"""작업을 큐에 등록하고 job_id 를 즉시 반환(논블로킹)."""
_purge_expired()
job_id = uuid.uuid4().hex[:8]
now = _now()
with _lock:
_jobs[job_id] = {
"job_id": job_id,
"status": "queued",
"stage": None,
"stage_index": 0,
"progress": 0,
"video_url": None,
"caption": None,
"profile": None,
"cost_credits": 0.0,
"error": None,
"created_at": now,
"updated_at": now,
}
_executor.submit(_run, job_id, req, photo_paths, dry_run)
return job_id
def get(job_id: str) -> dict | None:
"""작업 상태 스냅샷(복사본) 반환. 없으면 None."""
with _lock:
job = _jobs.get(job_id)
return dict(job) if job is not None else None