80 lines
2.6 KiB
Python
80 lines
2.6 KiB
Python
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from common.utils import get_env
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# MIME type used when a file extension has no entry in _CONTENT_TYPES.
_DEFAULT_CONTENT_TYPE: str = "application/octet-stream"

# Lower-case file extension (leading dot included) -> MIME type.
_CONTENT_TYPES: dict[str, str] = {
    # Images
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".bmp": "image/bmp",
    # Audio / video
    ".mp3": "audio/mpeg",
    ".mp4": "video/mp4",
    ".mov": "video/quicktime",
    # Documents and data
    ".pdf": "application/pdf",
    ".txt": "text/plain",
    ".json": "application/json",
    ".csv": "text/csv",
}
|
|
|
|
# Cached client handed out by _get_client(); None until the first call.
_shared_client: httpx.AsyncClient | None = None


async def _get_client() -> httpx.AsyncClient:
    """Return the module-wide ``httpx.AsyncClient``, creating it on demand.

    A new client is built when none has been created yet or when the cached
    one reports ``is_closed``; otherwise the cached instance is returned.
    """
    global _shared_client

    cached = _shared_client
    if cached is not None and not cached.is_closed:
        return cached

    request_timeout = httpx.Timeout(180.0, connect=10.0)
    pool_limits = httpx.Limits(max_keepalive_connections=10, max_connections=20)
    _shared_client = httpx.AsyncClient(timeout=request_timeout, limits=pool_limits)
    return _shared_client
|
|
|
|
|
|
def _sanitize_filename(file_name: str) -> str:
    """Return a cleaned blob name derived from *file_name*.

    Only the final path component is used. Every character other than Hangul
    syllables, ASCII letters and digits is removed from both the stem and the
    extension, so characters such as ``?``, ``#``, ``%`` or spaces cannot
    reach the blob URL through the extension.

    Args:
        file_name: Original file name, optionally including directories.

    Returns:
        The cleaned name. A stem that is empty after cleaning becomes
        ``"file"``; an extension that is empty after cleaning is dropped
        together with its dot.
    """
    path = Path(file_name)
    stem = re.sub(r"[^가-힣a-zA-Z0-9]", "", path.stem) or "file"
    # The extension is interpolated into the request URL as well, so it gets
    # the same filter as the stem (the leading dot is re-added below).
    extension = re.sub(r"[^가-힣a-zA-Z0-9]", "", path.suffix)
    return f"{stem}.{extension}" if extension else stem
|
|
|
|
|
|
def _guess_content_type(file_name: str) -> str:
    """Look up the MIME type for *file_name* by its lower-cased extension.

    Falls back to ``_DEFAULT_CONTENT_TYPE`` when the extension has no entry
    in ``_CONTENT_TYPES``.
    """
    extension = Path(file_name).suffix.lower()
    if extension in _CONTENT_TYPES:
        return _CONTENT_TYPES[extension]
    return _DEFAULT_CONTENT_TYPE
|
|
|
|
|
|
class AzureBlobUploader:
|
|
def __init__(self, group: str, category: str = "file"):
|
|
self._group = group
|
|
self._category = category
|
|
self._base_url = get_env("AZURE_BLOB_BASE_URL")
|
|
self._sas_token = get_env("AZURE_BLOB_SAS_TOKEN").strip("?'\"")
|
|
|
|
def _public_url(self, file_name: str) -> str:
|
|
return f"{self._base_url}/{self._group}/{self._category}/{file_name}"
|
|
|
|
def _upload_url(self, file_name: str) -> str:
|
|
return f"{self._public_url(file_name)}?{self._sas_token}"
|
|
|
|
async def upload_bytes(self, content: bytes, file_name: str, content_type: str | None = None) -> str:
|
|
file_name = _sanitize_filename(file_name)
|
|
ct = content_type or _guess_content_type(file_name)
|
|
url = self._upload_url(file_name)
|
|
headers = {"Content-Type": ct, "x-ms-blob-type": "BlockBlob"}
|
|
|
|
client = await _get_client()
|
|
resp = await client.put(url, content=content, headers=headers)
|
|
if resp.status_code not in (200, 201):
|
|
logger.error("Azure Blob upload failed status=%s body=%s", resp.status_code, resp.text[:500])
|
|
raise RuntimeError(f"Azure Blob upload failed: {resp.status_code}")
|
|
|
|
public_url = self._public_url(file_name)
|
|
logger.info("Azure Blob uploaded url=%s size=%s", public_url, len(content))
|
|
return public_url
|