o2o-infinith-backend/app/integrations/azure_blob.py

80 lines
2.6 KiB
Python

import logging
import re
from pathlib import Path
import httpx
from common.utils import get_env
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Content type used when a file extension is not listed in _CONTENT_TYPES.
_DEFAULT_CONTENT_TYPE = "application/octet-stream"

# Lower-case file extension (leading dot included) -> MIME type.
_CONTENT_TYPES: dict[str, str] = {
    # Images
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".bmp": "image/bmp",
    # Audio / video
    ".mp3": "audio/mpeg",
    ".mp4": "video/mp4",
    ".mov": "video/quicktime",
    # Documents and data
    ".pdf": "application/pdf",
    ".txt": "text/plain",
    ".json": "application/json",
    ".csv": "text/csv",
}
# Lazily created HTTP client shared by every caller in this module.
_shared_client: httpx.AsyncClient | None = None


async def _get_client() -> httpx.AsyncClient:
    """Return the module-wide ``httpx.AsyncClient``.

    The client is created on first use and re-created if the stored one has
    been closed, so callers always receive a usable instance.
    """
    global _shared_client

    current = _shared_client
    if current is not None and not current.is_closed:
        return current

    # 180 s default timeout with a shorter 10 s connect timeout.
    timeout = httpx.Timeout(180.0, connect=10.0)
    limits = httpx.Limits(max_keepalive_connections=10, max_connections=20)
    _shared_client = httpx.AsyncClient(timeout=timeout, limits=limits)
    return _shared_client
def _sanitize_filename(file_name: str) -> str:
    """Return *file_name* reduced to characters that are safe in a blob URL.

    Only the final path component is used. Every character other than Hangul
    syllables (가-힣), ASCII letters and ASCII digits is removed from both the
    stem and the extension. A stem that ends up empty is replaced by
    ``"file"``.

    The extension is filtered too because the result is interpolated straight
    into the upload URL: a raw suffix such as ``".jpg?x=1"`` would place
    ``?``/``&``/``#`` or spaces in the path and corrupt the query string that
    carries the SAS token.

    Args:
        file_name: Original (possibly user-supplied) file name or path.

    Returns:
        The sanitized name, ``"<stem>.<extension>"`` or just ``"<stem>"`` when
        no usable extension remains.
    """
    path = Path(file_name)
    unsafe = r"[^가-힣a-zA-Z0-9]"
    stem = re.sub(unsafe, "", path.stem) or "file"
    # Filtering the suffix also removes its leading dot; it is re-added below.
    extension = re.sub(unsafe, "", path.suffix)
    return f"{stem}.{extension}" if extension else stem
def _guess_content_type(file_name: str) -> str:
    """Map *file_name*'s extension to a MIME type.

    The extension is compared case-insensitively against ``_CONTENT_TYPES``;
    unknown or missing extensions yield ``_DEFAULT_CONTENT_TYPE``.
    """
    extension = Path(file_name).suffix.lower()
    if extension in _CONTENT_TYPES:
        return _CONTENT_TYPES[extension]
    return _DEFAULT_CONTENT_TYPE
class AzureBlobUploader:
def __init__(self, group: str, category: str = "file"):
self._group = group
self._category = category
self._base_url = get_env("AZURE_BLOB_BASE_URL")
self._sas_token = get_env("AZURE_BLOB_SAS_TOKEN").strip("?'\"")
def _public_url(self, file_name: str) -> str:
return f"{self._base_url}/{self._group}/{self._category}/{file_name}"
def _upload_url(self, file_name: str) -> str:
return f"{self._public_url(file_name)}?{self._sas_token}"
async def upload_bytes(self, content: bytes, file_name: str, content_type: str | None = None) -> str:
file_name = _sanitize_filename(file_name)
ct = content_type or _guess_content_type(file_name)
url = self._upload_url(file_name)
headers = {"Content-Type": ct, "x-ms-blob-type": "BlockBlob"}
client = await _get_client()
resp = await client.put(url, content=content, headers=headers)
if resp.status_code not in (200, 201):
logger.error("Azure Blob upload failed status=%s body=%s", resp.status_code, resp.text[:500])
raise RuntimeError(f"Azure Blob upload failed: {resp.status_code}")
public_url = self._public_url(file_name)
logger.info("Azure Blob uploaded url=%s size=%s", public_url, len(content))
return public_url