import logging import re from pathlib import Path import httpx from common.utils import get_env logger = logging.getLogger(__name__) _DEFAULT_CONTENT_TYPE = "application/octet-stream" _CONTENT_TYPES = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp", ".bmp": "image/bmp", ".mp3": "audio/mpeg", ".mp4": "video/mp4", ".mov": "video/quicktime", ".pdf": "application/pdf", ".txt": "text/plain", ".json": "application/json", ".csv": "text/csv", } _shared_client: httpx.AsyncClient | None = None async def _get_client() -> httpx.AsyncClient: global _shared_client if _shared_client is None or _shared_client.is_closed: _shared_client = httpx.AsyncClient( timeout=httpx.Timeout(180.0, connect=10.0), limits=httpx.Limits(max_keepalive_connections=10, max_connections=20), ) return _shared_client def _sanitize_filename(file_name: str) -> str: stem = Path(file_name).stem suffix = Path(file_name).suffix sanitized = re.sub(r"[^가-힣a-zA-Z0-9]", "", stem) or "file" return f"{sanitized}{suffix}" def _guess_content_type(file_name: str) -> str: return _CONTENT_TYPES.get(Path(file_name).suffix.lower(), _DEFAULT_CONTENT_TYPE) class AzureBlobUploader: def __init__(self, group: str, category: str = "file"): self._group = group self._category = category self._base_url = get_env("AZURE_BLOB_BASE_URL") self._sas_token = get_env("AZURE_BLOB_SAS_TOKEN").strip("?'\"") def _public_url(self, file_name: str) -> str: return f"{self._base_url}/{self._group}/{self._category}/{file_name}" def _upload_url(self, file_name: str) -> str: return f"{self._public_url(file_name)}?{self._sas_token}" async def upload_bytes(self, content: bytes, file_name: str, content_type: str | None = None) -> str: file_name = _sanitize_filename(file_name) ct = content_type or _guess_content_type(file_name) url = self._upload_url(file_name) headers = {"Content-Type": ct, "x-ms-blob-type": "BlockBlob"} client = await _get_client() resp = await client.put(url, content=content, headers=headers) if resp.status_code not in (200, 201): logger.error("Azure Blob upload failed status=%s body=%s", resp.status_code, resp.text[:500]) raise RuntimeError(f"Azure Blob upload failed: {resp.status_code}") public_url = self._public_url(file_name) logger.info("Azure Blob uploaded url=%s size=%s", public_url, len(content)) return public_url