# o2o-ado2-short-form/server/app/pipeline/azure_storage.py
# (file-viewer metadata: 113 lines, 3.9 KiB, Python)

"""Azure Blob Storage 연동 — 완성 영상·메타데이터 업로드 + 갤러리 목록 조회.
SDK 없이 httpx 로 Azure Blob REST API 를 직접 호출한다(이미 의존성).
BASE_URL: https://account.blob.core.windows.net/container/prefix/
→ 업로드: PUT BASE_URL{blob}?{sas}
→ 목록: GET https://account.blob.core.windows.net/container
?restype=container&comp=list&prefix={prefix}&{sas}
→ 공개URL: BASE_URL{blob} (SAS 불필요, 공개 컨테이너)
"""
from __future__ import annotations
import json
import xml.etree.ElementTree as ET
from email.utils import parsedate_to_datetime
from pathlib import Path
from urllib.parse import urlparse
import httpx
from app import config as cfg
def _put_url(filename: str) -> str:
    """Build the SAS-signed URL used to upload one blob.

    The result is the configured base URL, followed by ``filename``, a ``?``
    and the configured SAS token; all configuration is read from ``cfg``.
    """
    base_url = cfg.AZURE_BLOB_BASE_URL
    sas_token = cfg.AZURE_BLOB_SAS_TOKEN
    return "{}{}?{}".format(base_url, filename, sas_token)
def _list_url() -> str:
    """Build the SAS-signed container "list" URL, including the prefix filter.

    The container name and blob prefix are both taken from the path of
    ``cfg.AZURE_BLOB_BASE_URL``: the first path segment is the container and
    everything after it (possibly empty) is the prefix.
    """
    base = urlparse(cfg.AZURE_BLOB_BASE_URL)
    # base.path looks like "/container/prefix/": drop the leading slash and
    # cut at the first remaining "/" -> ("container", "/", "prefix/").
    container, _, prefix = base.path.lstrip("/").partition("/")
    endpoint = f"{base.scheme}://{base.netloc}/{container}"
    query = (
        "restype=container&comp=list"
        f"&prefix={prefix}&{cfg.AZURE_BLOB_SAS_TOKEN}"
    )
    return f"{endpoint}?{query}"
def _public_url(filename: str) -> str:
    """Return the URL of ``filename`` under the configured base URL, without a SAS token."""
    base_url = cfg.AZURE_BLOB_BASE_URL
    return "{}{}".format(base_url, filename)
# ──────────────────────────── Upload ────────────────────────────
def upload_mp4(job_id: str, mp4_path: Path) -> str:
    """Upload an mp4 file as ``{job_id}.mp4`` and return its SAS-less URL.

    The file is read fully into memory and sent with a single PUT as a
    ``BlockBlob`` with content type ``video/mp4`` (300 s timeout).
    ``raise_for_status()`` is called on the response, so an error status
    raises instead of returning a URL.
    """
    blob_name = f"{job_id}.mp4"
    with open(mp4_path, "rb") as video_file:
        payload = video_file.read()

    request_headers = {"x-ms-blob-type": "BlockBlob", "Content-Type": "video/mp4"}
    response = httpx.put(
        _put_url(blob_name),
        content=payload,
        headers=request_headers,
        timeout=300.0,
    )
    response.raise_for_status()
    return _public_url(blob_name)
def upload_meta(job_id: str, meta: dict) -> None:
    """Upload ``meta`` as the JSON blob ``{job_id}.json``.

    The dict is serialised with ``ensure_ascii=False`` and encoded as UTF-8,
    then sent with a single PUT as a ``BlockBlob`` (30 s timeout).
    ``raise_for_status()`` is called on the response, so an error status
    raises.
    """
    blob_name = f"{job_id}.json"
    payload = json.dumps(meta, ensure_ascii=False).encode("utf-8")
    request_headers = {
        "x-ms-blob-type": "BlockBlob",
        "Content-Type": "application/json; charset=utf-8",
    }
    response = httpx.put(
        _put_url(blob_name),
        content=payload,
        headers=request_headers,
        timeout=30.0,
    )
    response.raise_for_status()
# ──────────────────────────── Listing ────────────────────────────
def _last_modified_ts(blob: ET.Element) -> float:
    """Return a ``<Blob>`` element's ``Properties/Last-Modified`` as a Unix timestamp.

    Falls back to ``0.0`` when the element is missing, empty, or cannot be
    parsed, so one malformed entry never aborts the whole listing.
    """
    props = blob.find("Properties")
    if props is None:
        return 0.0
    last_modified = props.find("Last-Modified")
    if last_modified is None or not last_modified.text:
        return 0.0
    try:
        return parsedate_to_datetime(last_modified.text).timestamp()
    except (TypeError, ValueError, IndexError, OverflowError):
        # Unparseable or out-of-range date: keep the best-effort default
        # rather than failing the listing.
        return 0.0


def list_videos() -> list[dict]:
    """Return every ``.mp4`` blob in the listing, newest first.

    Each item is ``{"job_id", "video_url", "created_at"}``: ``job_id`` is the
    blob's filename stem, ``video_url`` is the SAS-less URL rebuilt from that
    stem, and ``created_at`` is the blob's Last-Modified time as a Unix
    timestamp (``0.0`` when unavailable).

    The List Blobs operation is paged: when a response carries a non-empty
    ``NextMarker`` the request is repeated with ``marker=<value>`` until the
    listing is exhausted, so large containers are not silently truncated.

    ``raise_for_status()`` is called on every response, so an error status
    raises.
    """
    # Local import: the file's top-level imports only bring in ``urlparse``.
    from urllib.parse import quote

    first_page_url = _list_url()
    items: list[dict] = []
    marker = ""
    while True:
        if marker:
            # The marker is an opaque token; percent-encode it fully.
            url = f"{first_page_url}&marker={quote(marker, safe='')}"
        else:
            url = first_page_url
        resp = httpx.get(url, timeout=30.0)
        resp.raise_for_status()
        root = ET.fromstring(resp.text)

        for blob in root.iter("Blob"):
            name = blob.findtext("Name") or ""
            if not name.endswith(".mp4"):
                continue
            # "prefix/abc123.mp4" -> "abc123"
            # NOTE(review): the URL is rebuilt from the stem only, so a blob
            # nested below the configured prefix would get a URL that does
            # not include its sub-path — confirm blobs are always flat.
            job_id = Path(name).stem
            items.append({
                "job_id": job_id,
                "video_url": _public_url(f"{job_id}.mp4"),
                "created_at": _last_modified_ts(blob),
            })

        next_marker = (root.findtext("NextMarker") or "").strip()
        # Stop on the last page; also stop if the marker does not advance,
        # to avoid looping forever on an unexpected response.
        if not next_marker or next_marker == marker:
            break
        marker = next_marker

    items.sort(key=lambda item: item["created_at"], reverse=True)
    return items