o2o-castad-backend/app/utils/upload_blob_as_request.py

469 lines
16 KiB
Python

"""
Azure Blob Storage 업로드 유틸리티
Azure Blob Storage에 파일을 업로드하는 클래스를 제공합니다.
파일 경로 또는 바이트 데이터를 직접 업로드할 수 있습니다.
URL 경로 형식:
- 음악: {BASE_URL}/{task_id}/song/{파일명}
- 영상: {BASE_URL}/{task_id}/video/{파일명}
- 이미지: {BASE_URL}/{task_id}/image/{파일명}
사용 예시:
from app.utils.upload_blob_as_request import AzureBlobUploader
uploader = AzureBlobUploader(task_id="task-123")
# 파일 경로로 업로드
success = await uploader.upload_music(file_path="my_song.mp3")
success = await uploader.upload_video(file_path="my_video.mp4")
success = await uploader.upload_image(file_path="my_image.png")
# 바이트 데이터로 직접 업로드 (media 저장 없이)
success = await uploader.upload_music_bytes(audio_bytes, "my_song")
success = await uploader.upload_video_bytes(video_bytes, "my_video")
success = await uploader.upload_image_bytes(image_bytes, "my_image.png")
print(uploader.public_url) # 마지막 업로드의 공개 URL
성능 최적화:
- HTTP 클라이언트 재사용: 모듈 레벨의 공유 클라이언트로 커넥션 풀 재사용
- 동시 업로드: 공유 클라이언트를 통해 동시 요청 처리가 개선됩니다.
"""
import asyncio
import logging
import time
from pathlib import Path
import aiofiles
import httpx
from config import azure_blob_settings
# 로거 설정
logger = logging.getLogger(__name__)
# =============================================================================
# 모듈 레벨 공유 HTTP 클라이언트 (싱글톤 패턴)
# =============================================================================
# 모듈 레벨 공유 HTTP 클라이언트 (커넥션 풀 재사용)
_shared_blob_client: httpx.AsyncClient | None = None
async def get_shared_blob_client() -> httpx.AsyncClient:
"""공유 HTTP 클라이언트를 반환합니다. 없으면 생성합니다."""
global _shared_blob_client
if _shared_blob_client is None or _shared_blob_client.is_closed:
print("[AzureBlobUploader] Creating shared HTTP client...")
_shared_blob_client = httpx.AsyncClient(
timeout=httpx.Timeout(180.0, connect=10.0),
limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
)
print("[AzureBlobUploader] Shared HTTP client created - "
"max_connections: 20, max_keepalive: 10")
return _shared_blob_client
async def close_shared_blob_client() -> None:
"""공유 HTTP 클라이언트를 닫습니다. 앱 종료 시 호출하세요."""
global _shared_blob_client
if _shared_blob_client is not None and not _shared_blob_client.is_closed:
await _shared_blob_client.aclose()
_shared_blob_client = None
print("[AzureBlobUploader] Shared HTTP client closed")
class AzureBlobUploader:
"""Azure Blob Storage 업로드 클래스
Azure Blob Storage에 음악, 영상, 이미지 파일을 업로드합니다.
URL 형식: {BASE_URL}/{task_id}/{category}/{file_name}?{SAS_TOKEN}
카테고리별 경로:
- 음악: {task_id}/song/{file_name}
- 영상: {task_id}/video/{file_name}
- 이미지: {task_id}/image/{file_name}
Attributes:
task_id: 작업 고유 식별자
"""
# Content-Type 매핑
IMAGE_CONTENT_TYPES = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
".bmp": "image/bmp",
}
def __init__(self, task_id: str):
"""AzureBlobUploader 초기화
Args:
task_id: 작업 고유 식별자
"""
self._task_id = task_id
self._base_url = azure_blob_settings.AZURE_BLOB_BASE_URL
self._sas_token = azure_blob_settings.AZURE_BLOB_SAS_TOKEN
self._last_public_url: str = ""
@property
def task_id(self) -> str:
"""작업 고유 식별자"""
return self._task_id
@property
def public_url(self) -> str:
"""마지막 업로드의 공개 URL (SAS 토큰 제외)"""
return self._last_public_url
def _build_upload_url(self, category: str, file_name: str) -> str:
"""업로드 URL 생성 (SAS 토큰 포함)"""
# SAS 토큰 앞뒤의 ?, ', " 제거
sas_token = self._sas_token.strip("?'\"")
return (
f"{self._base_url}/{self._task_id}/{category}/{file_name}?{sas_token}"
)
def _build_public_url(self, category: str, file_name: str) -> str:
"""공개 URL 생성 (SAS 토큰 제외)"""
return f"{self._base_url}/{self._task_id}/{category}/{file_name}"
async def _upload_bytes(
self,
file_content: bytes,
upload_url: str,
headers: dict,
timeout: float,
log_prefix: str,
) -> bool:
"""바이트 데이터를 업로드하는 공통 내부 메서드
Args:
file_content: 업로드할 바이트 데이터
upload_url: 업로드 URL
headers: HTTP 헤더
timeout: 요청 타임아웃 (초)
log_prefix: 로그 접두사
Returns:
bool: 업로드 성공 여부
"""
size = len(file_content)
start_time = time.perf_counter()
try:
logger.info(f"[{log_prefix}] Starting upload")
print(f"[{log_prefix}] Getting shared client...")
client = await get_shared_blob_client()
client_time = time.perf_counter()
elapsed_ms = (client_time - start_time) * 1000
print(f"[{log_prefix}] Client acquired in {elapsed_ms:.1f}ms")
print(f"[{log_prefix}] Starting upload... "
f"(size: {size} bytes, timeout: {timeout}s)")
response = await asyncio.wait_for(
client.put(upload_url, content=file_content, headers=headers),
timeout=timeout,
)
upload_time = time.perf_counter()
duration_ms = (upload_time - start_time) * 1000
if response.status_code in [200, 201]:
logger.info(f"[{log_prefix}] SUCCESS - Status: {response.status_code}")
print(f"[{log_prefix}] SUCCESS - Status: {response.status_code}, "
f"Duration: {duration_ms:.1f}ms")
print(f"[{log_prefix}] Public URL: {self._last_public_url}")
return True
# 업로드 실패
logger.error(f"[{log_prefix}] FAILED - Status: {response.status_code}")
print(f"[{log_prefix}] FAILED - Status: {response.status_code}, "
f"Duration: {duration_ms:.1f}ms")
print(f"[{log_prefix}] Response: {response.text[:500]}")
return False
except asyncio.TimeoutError:
elapsed = time.perf_counter() - start_time
logger.error(f"[{log_prefix}] TIMEOUT after {elapsed:.1f}s")
print(f"[{log_prefix}] TIMEOUT after {elapsed:.1f}s")
return False
except httpx.ConnectError as e:
elapsed = time.perf_counter() - start_time
logger.error(f"[{log_prefix}] CONNECT_ERROR: {e}")
print(f"[{log_prefix}] CONNECT_ERROR after {elapsed:.1f}s - "
f"{type(e).__name__}: {e}")
return False
except httpx.ReadError as e:
elapsed = time.perf_counter() - start_time
logger.error(f"[{log_prefix}] READ_ERROR: {e}")
print(f"[{log_prefix}] READ_ERROR after {elapsed:.1f}s - "
f"{type(e).__name__}: {e}")
return False
except Exception as e:
elapsed = time.perf_counter() - start_time
logger.error(f"[{log_prefix}] ERROR: {type(e).__name__}: {e}")
print(f"[{log_prefix}] ERROR after {elapsed:.1f}s - "
f"{type(e).__name__}: {e}")
return False
async def _upload_file(
self,
file_path: str,
category: str,
content_type: str,
timeout: float,
log_prefix: str,
) -> bool:
"""파일을 Azure Blob Storage에 업로드하는 내부 메서드
Args:
file_path: 업로드할 파일 경로
category: 카테고리 (song, video, image)
content_type: Content-Type 헤더 값
timeout: 요청 타임아웃 (초)
log_prefix: 로그 접두사
Returns:
bool: 업로드 성공 여부
"""
# 파일 경로에서 파일명 추출
file_name = Path(file_path).name
upload_url = self._build_upload_url(category, file_name)
self._last_public_url = self._build_public_url(category, file_name)
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
async with aiofiles.open(file_path, "rb") as file:
file_content = await file.read()
return await self._upload_bytes(
file_content=file_content,
upload_url=upload_url,
headers=headers,
timeout=timeout,
log_prefix=log_prefix,
)
async def upload_music(self, file_path: str) -> bool:
"""음악 파일을 Azure Blob Storage에 업로드합니다.
URL 경로: {task_id}/song/{파일명}
Args:
file_path: 업로드할 파일 경로
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_music(file_path="my_song.mp3")
print(uploader.public_url)
"""
return await self._upload_file(
file_path=file_path,
category="song",
content_type="audio/mpeg",
timeout=120.0,
log_prefix="upload_music",
)
async def upload_music_bytes(
self, file_content: bytes, file_name: str
) -> bool:
"""음악 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/song/{파일명}
Args:
file_content: 업로드할 파일 바이트 데이터
file_name: 저장할 파일명 (확장자가 없으면 .mp3 추가)
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_music_bytes(audio_bytes, "my_song")
print(uploader.public_url)
"""
# 확장자가 없으면 .mp3 추가
if not Path(file_name).suffix:
file_name = f"{file_name}.mp3"
upload_url = self._build_upload_url("song", file_name)
self._last_public_url = self._build_public_url("song", file_name)
log_prefix = "upload_music_bytes"
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"}
return await self._upload_bytes(
file_content=file_content,
upload_url=upload_url,
headers=headers,
timeout=120.0,
log_prefix=log_prefix,
)
async def upload_video(self, file_path: str) -> bool:
"""영상 파일을 Azure Blob Storage에 업로드합니다.
URL 경로: {task_id}/video/{파일명}
Args:
file_path: 업로드할 파일 경로
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_video(file_path="my_video.mp4")
print(uploader.public_url)
"""
return await self._upload_file(
file_path=file_path,
category="video",
content_type="video/mp4",
timeout=180.0,
log_prefix="upload_video",
)
async def upload_video_bytes(
self, file_content: bytes, file_name: str
) -> bool:
"""영상 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/video/{파일명}
Args:
file_content: 업로드할 파일 바이트 데이터
file_name: 저장할 파일명 (확장자가 없으면 .mp4 추가)
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_video_bytes(video_bytes, "my_video")
print(uploader.public_url)
"""
# 확장자가 없으면 .mp4 추가
if not Path(file_name).suffix:
file_name = f"{file_name}.mp4"
upload_url = self._build_upload_url("video", file_name)
self._last_public_url = self._build_public_url("video", file_name)
log_prefix = "upload_video_bytes"
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"}
return await self._upload_bytes(
file_content=file_content,
upload_url=upload_url,
headers=headers,
timeout=180.0,
log_prefix=log_prefix,
)
async def upload_image(self, file_path: str) -> bool:
"""이미지 파일을 Azure Blob Storage에 업로드합니다.
URL 경로: {task_id}/image/{파일명}
Args:
file_path: 업로드할 파일 경로
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_image(file_path="my_image.png")
print(uploader.public_url)
"""
extension = Path(file_path).suffix.lower()
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
return await self._upload_file(
file_path=file_path,
category="image",
content_type=content_type,
timeout=60.0,
log_prefix="upload_image",
)
async def upload_image_bytes(
self, file_content: bytes, file_name: str
) -> bool:
"""이미지 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/image/{파일명}
Args:
file_content: 업로드할 파일 바이트 데이터
file_name: 저장할 파일명
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
with open("my_image.png", "rb") as f:
content = f.read()
success = await uploader.upload_image_bytes(content, "my_image.png")
print(uploader.public_url)
"""
extension = Path(file_name).suffix.lower()
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
upload_url = self._build_upload_url("image", file_name)
self._last_public_url = self._build_public_url("image", file_name)
log_prefix = "upload_image_bytes"
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
return await self._upload_bytes(
file_content=file_content,
upload_url=upload_url,
headers=headers,
timeout=60.0,
log_prefix=log_prefix,
)
# 사용 예시:
# import asyncio
#
# async def main():
# uploader = AzureBlobUploader(task_id="task-123")
#
# # 음악 업로드 -> {BASE_URL}/task-123/song/my_song.mp3
# await uploader.upload_music("my_song.mp3")
# print(uploader.public_url)
#
# # 영상 업로드 -> {BASE_URL}/task-123/video/my_video.mp4
# await uploader.upload_video("my_video.mp4")
# print(uploader.public_url)
#
# # 이미지 업로드 -> {BASE_URL}/task-123/image/my_image.png
# await uploader.upload_image("my_image.png")
# print(uploader.public_url)
#
# asyncio.run(main())