469 lines
16 KiB
Python
469 lines
16 KiB
Python
"""
|
|
Azure Blob Storage 업로드 유틸리티
|
|
|
|
Azure Blob Storage에 파일을 업로드하는 클래스를 제공합니다.
|
|
파일 경로 또는 바이트 데이터를 직접 업로드할 수 있습니다.
|
|
|
|
URL 경로 형식:
|
|
- 음악: {BASE_URL}/{task_id}/song/{파일명}
|
|
- 영상: {BASE_URL}/{task_id}/video/{파일명}
|
|
- 이미지: {BASE_URL}/{task_id}/image/{파일명}
|
|
|
|
사용 예시:
|
|
from app.utils.upload_blob_as_request import AzureBlobUploader
|
|
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
|
|
# 파일 경로로 업로드
|
|
success = await uploader.upload_music(file_path="my_song.mp3")
|
|
success = await uploader.upload_video(file_path="my_video.mp4")
|
|
success = await uploader.upload_image(file_path="my_image.png")
|
|
|
|
# 바이트 데이터로 직접 업로드 (media 저장 없이)
|
|
success = await uploader.upload_music_bytes(audio_bytes, "my_song")
|
|
success = await uploader.upload_video_bytes(video_bytes, "my_video")
|
|
success = await uploader.upload_image_bytes(image_bytes, "my_image.png")
|
|
|
|
print(uploader.public_url) # 마지막 업로드의 공개 URL
|
|
|
|
성능 최적화:
|
|
- HTTP 클라이언트 재사용: 모듈 레벨의 공유 클라이언트로 커넥션 풀 재사용
|
|
- 동시 업로드: 공유 클라이언트를 통해 동시 요청 처리가 개선됩니다.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import aiofiles
|
|
import httpx
|
|
|
|
from config import azure_blob_settings
|
|
|
|
# 로거 설정
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# =============================================================================
|
|
# 모듈 레벨 공유 HTTP 클라이언트 (싱글톤 패턴)
|
|
# =============================================================================
|
|
|
|
# 모듈 레벨 공유 HTTP 클라이언트 (커넥션 풀 재사용)
|
|
_shared_blob_client: httpx.AsyncClient | None = None
|
|
|
|
|
|
async def get_shared_blob_client() -> httpx.AsyncClient:
|
|
"""공유 HTTP 클라이언트를 반환합니다. 없으면 생성합니다."""
|
|
global _shared_blob_client
|
|
if _shared_blob_client is None or _shared_blob_client.is_closed:
|
|
print("[AzureBlobUploader] Creating shared HTTP client...")
|
|
_shared_blob_client = httpx.AsyncClient(
|
|
timeout=httpx.Timeout(180.0, connect=10.0),
|
|
limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
|
|
)
|
|
print("[AzureBlobUploader] Shared HTTP client created - "
|
|
"max_connections: 20, max_keepalive: 10")
|
|
return _shared_blob_client
|
|
|
|
|
|
async def close_shared_blob_client() -> None:
|
|
"""공유 HTTP 클라이언트를 닫습니다. 앱 종료 시 호출하세요."""
|
|
global _shared_blob_client
|
|
if _shared_blob_client is not None and not _shared_blob_client.is_closed:
|
|
await _shared_blob_client.aclose()
|
|
_shared_blob_client = None
|
|
print("[AzureBlobUploader] Shared HTTP client closed")
|
|
|
|
|
|
class AzureBlobUploader:
|
|
"""Azure Blob Storage 업로드 클래스
|
|
|
|
Azure Blob Storage에 음악, 영상, 이미지 파일을 업로드합니다.
|
|
URL 형식: {BASE_URL}/{task_id}/{category}/{file_name}?{SAS_TOKEN}
|
|
|
|
카테고리별 경로:
|
|
- 음악: {task_id}/song/{file_name}
|
|
- 영상: {task_id}/video/{file_name}
|
|
- 이미지: {task_id}/image/{file_name}
|
|
|
|
Attributes:
|
|
task_id: 작업 고유 식별자
|
|
"""
|
|
|
|
# Content-Type 매핑
|
|
IMAGE_CONTENT_TYPES = {
|
|
".jpg": "image/jpeg",
|
|
".jpeg": "image/jpeg",
|
|
".png": "image/png",
|
|
".gif": "image/gif",
|
|
".webp": "image/webp",
|
|
".bmp": "image/bmp",
|
|
}
|
|
|
|
def __init__(self, task_id: str):
|
|
"""AzureBlobUploader 초기화
|
|
|
|
Args:
|
|
task_id: 작업 고유 식별자
|
|
"""
|
|
self._task_id = task_id
|
|
self._base_url = azure_blob_settings.AZURE_BLOB_BASE_URL
|
|
self._sas_token = azure_blob_settings.AZURE_BLOB_SAS_TOKEN
|
|
self._last_public_url: str = ""
|
|
|
|
@property
|
|
def task_id(self) -> str:
|
|
"""작업 고유 식별자"""
|
|
return self._task_id
|
|
|
|
@property
|
|
def public_url(self) -> str:
|
|
"""마지막 업로드의 공개 URL (SAS 토큰 제외)"""
|
|
return self._last_public_url
|
|
|
|
def _build_upload_url(self, category: str, file_name: str) -> str:
|
|
"""업로드 URL 생성 (SAS 토큰 포함)"""
|
|
# SAS 토큰 앞뒤의 ?, ', " 제거
|
|
sas_token = self._sas_token.strip("?'\"")
|
|
return (
|
|
f"{self._base_url}/{self._task_id}/{category}/{file_name}?{sas_token}"
|
|
)
|
|
|
|
def _build_public_url(self, category: str, file_name: str) -> str:
|
|
"""공개 URL 생성 (SAS 토큰 제외)"""
|
|
return f"{self._base_url}/{self._task_id}/{category}/{file_name}"
|
|
|
|
async def _upload_bytes(
|
|
self,
|
|
file_content: bytes,
|
|
upload_url: str,
|
|
headers: dict,
|
|
timeout: float,
|
|
log_prefix: str,
|
|
) -> bool:
|
|
"""바이트 데이터를 업로드하는 공통 내부 메서드
|
|
|
|
Args:
|
|
file_content: 업로드할 바이트 데이터
|
|
upload_url: 업로드 URL
|
|
headers: HTTP 헤더
|
|
timeout: 요청 타임아웃 (초)
|
|
log_prefix: 로그 접두사
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
"""
|
|
size = len(file_content)
|
|
start_time = time.perf_counter()
|
|
|
|
try:
|
|
logger.info(f"[{log_prefix}] Starting upload")
|
|
print(f"[{log_prefix}] Getting shared client...")
|
|
|
|
client = await get_shared_blob_client()
|
|
client_time = time.perf_counter()
|
|
elapsed_ms = (client_time - start_time) * 1000
|
|
print(f"[{log_prefix}] Client acquired in {elapsed_ms:.1f}ms")
|
|
|
|
print(f"[{log_prefix}] Starting upload... "
|
|
f"(size: {size} bytes, timeout: {timeout}s)")
|
|
|
|
response = await asyncio.wait_for(
|
|
client.put(upload_url, content=file_content, headers=headers),
|
|
timeout=timeout,
|
|
)
|
|
upload_time = time.perf_counter()
|
|
duration_ms = (upload_time - start_time) * 1000
|
|
|
|
if response.status_code in [200, 201]:
|
|
logger.info(f"[{log_prefix}] SUCCESS - Status: {response.status_code}")
|
|
print(f"[{log_prefix}] SUCCESS - Status: {response.status_code}, "
|
|
f"Duration: {duration_ms:.1f}ms")
|
|
print(f"[{log_prefix}] Public URL: {self._last_public_url}")
|
|
return True
|
|
|
|
# 업로드 실패
|
|
logger.error(f"[{log_prefix}] FAILED - Status: {response.status_code}")
|
|
print(f"[{log_prefix}] FAILED - Status: {response.status_code}, "
|
|
f"Duration: {duration_ms:.1f}ms")
|
|
print(f"[{log_prefix}] Response: {response.text[:500]}")
|
|
return False
|
|
|
|
except asyncio.TimeoutError:
|
|
elapsed = time.perf_counter() - start_time
|
|
logger.error(f"[{log_prefix}] TIMEOUT after {elapsed:.1f}s")
|
|
print(f"[{log_prefix}] TIMEOUT after {elapsed:.1f}s")
|
|
return False
|
|
|
|
except httpx.ConnectError as e:
|
|
elapsed = time.perf_counter() - start_time
|
|
logger.error(f"[{log_prefix}] CONNECT_ERROR: {e}")
|
|
print(f"[{log_prefix}] CONNECT_ERROR after {elapsed:.1f}s - "
|
|
f"{type(e).__name__}: {e}")
|
|
return False
|
|
|
|
except httpx.ReadError as e:
|
|
elapsed = time.perf_counter() - start_time
|
|
logger.error(f"[{log_prefix}] READ_ERROR: {e}")
|
|
print(f"[{log_prefix}] READ_ERROR after {elapsed:.1f}s - "
|
|
f"{type(e).__name__}: {e}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
elapsed = time.perf_counter() - start_time
|
|
logger.error(f"[{log_prefix}] ERROR: {type(e).__name__}: {e}")
|
|
print(f"[{log_prefix}] ERROR after {elapsed:.1f}s - "
|
|
f"{type(e).__name__}: {e}")
|
|
return False
|
|
|
|
async def _upload_file(
|
|
self,
|
|
file_path: str,
|
|
category: str,
|
|
content_type: str,
|
|
timeout: float,
|
|
log_prefix: str,
|
|
) -> bool:
|
|
"""파일을 Azure Blob Storage에 업로드하는 내부 메서드
|
|
|
|
Args:
|
|
file_path: 업로드할 파일 경로
|
|
category: 카테고리 (song, video, image)
|
|
content_type: Content-Type 헤더 값
|
|
timeout: 요청 타임아웃 (초)
|
|
log_prefix: 로그 접두사
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
"""
|
|
# 파일 경로에서 파일명 추출
|
|
file_name = Path(file_path).name
|
|
|
|
upload_url = self._build_upload_url(category, file_name)
|
|
self._last_public_url = self._build_public_url(category, file_name)
|
|
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
|
|
|
|
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
|
|
|
|
async with aiofiles.open(file_path, "rb") as file:
|
|
file_content = await file.read()
|
|
|
|
return await self._upload_bytes(
|
|
file_content=file_content,
|
|
upload_url=upload_url,
|
|
headers=headers,
|
|
timeout=timeout,
|
|
log_prefix=log_prefix,
|
|
)
|
|
|
|
async def upload_music(self, file_path: str) -> bool:
|
|
"""음악 파일을 Azure Blob Storage에 업로드합니다.
|
|
|
|
URL 경로: {task_id}/song/{파일명}
|
|
|
|
Args:
|
|
file_path: 업로드할 파일 경로
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_music(file_path="my_song.mp3")
|
|
print(uploader.public_url)
|
|
"""
|
|
return await self._upload_file(
|
|
file_path=file_path,
|
|
category="song",
|
|
content_type="audio/mpeg",
|
|
timeout=120.0,
|
|
log_prefix="upload_music",
|
|
)
|
|
|
|
async def upload_music_bytes(
|
|
self, file_content: bytes, file_name: str
|
|
) -> bool:
|
|
"""음악 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
|
|
|
|
URL 경로: {task_id}/song/{파일명}
|
|
|
|
Args:
|
|
file_content: 업로드할 파일 바이트 데이터
|
|
file_name: 저장할 파일명 (확장자가 없으면 .mp3 추가)
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_music_bytes(audio_bytes, "my_song")
|
|
print(uploader.public_url)
|
|
"""
|
|
# 확장자가 없으면 .mp3 추가
|
|
if not Path(file_name).suffix:
|
|
file_name = f"{file_name}.mp3"
|
|
|
|
upload_url = self._build_upload_url("song", file_name)
|
|
self._last_public_url = self._build_public_url("song", file_name)
|
|
log_prefix = "upload_music_bytes"
|
|
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
|
|
|
|
headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"}
|
|
|
|
return await self._upload_bytes(
|
|
file_content=file_content,
|
|
upload_url=upload_url,
|
|
headers=headers,
|
|
timeout=120.0,
|
|
log_prefix=log_prefix,
|
|
)
|
|
|
|
async def upload_video(self, file_path: str) -> bool:
|
|
"""영상 파일을 Azure Blob Storage에 업로드합니다.
|
|
|
|
URL 경로: {task_id}/video/{파일명}
|
|
|
|
Args:
|
|
file_path: 업로드할 파일 경로
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_video(file_path="my_video.mp4")
|
|
print(uploader.public_url)
|
|
"""
|
|
return await self._upload_file(
|
|
file_path=file_path,
|
|
category="video",
|
|
content_type="video/mp4",
|
|
timeout=180.0,
|
|
log_prefix="upload_video",
|
|
)
|
|
|
|
async def upload_video_bytes(
|
|
self, file_content: bytes, file_name: str
|
|
) -> bool:
|
|
"""영상 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
|
|
|
|
URL 경로: {task_id}/video/{파일명}
|
|
|
|
Args:
|
|
file_content: 업로드할 파일 바이트 데이터
|
|
file_name: 저장할 파일명 (확장자가 없으면 .mp4 추가)
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_video_bytes(video_bytes, "my_video")
|
|
print(uploader.public_url)
|
|
"""
|
|
# 확장자가 없으면 .mp4 추가
|
|
if not Path(file_name).suffix:
|
|
file_name = f"{file_name}.mp4"
|
|
|
|
upload_url = self._build_upload_url("video", file_name)
|
|
self._last_public_url = self._build_public_url("video", file_name)
|
|
log_prefix = "upload_video_bytes"
|
|
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
|
|
|
|
headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"}
|
|
|
|
return await self._upload_bytes(
|
|
file_content=file_content,
|
|
upload_url=upload_url,
|
|
headers=headers,
|
|
timeout=180.0,
|
|
log_prefix=log_prefix,
|
|
)
|
|
|
|
async def upload_image(self, file_path: str) -> bool:
|
|
"""이미지 파일을 Azure Blob Storage에 업로드합니다.
|
|
|
|
URL 경로: {task_id}/image/{파일명}
|
|
|
|
Args:
|
|
file_path: 업로드할 파일 경로
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_image(file_path="my_image.png")
|
|
print(uploader.public_url)
|
|
"""
|
|
extension = Path(file_path).suffix.lower()
|
|
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
|
|
|
|
return await self._upload_file(
|
|
file_path=file_path,
|
|
category="image",
|
|
content_type=content_type,
|
|
timeout=60.0,
|
|
log_prefix="upload_image",
|
|
)
|
|
|
|
async def upload_image_bytes(
|
|
self, file_content: bytes, file_name: str
|
|
) -> bool:
|
|
"""이미지 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
|
|
|
|
URL 경로: {task_id}/image/{파일명}
|
|
|
|
Args:
|
|
file_content: 업로드할 파일 바이트 데이터
|
|
file_name: 저장할 파일명
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
with open("my_image.png", "rb") as f:
|
|
content = f.read()
|
|
success = await uploader.upload_image_bytes(content, "my_image.png")
|
|
print(uploader.public_url)
|
|
"""
|
|
extension = Path(file_name).suffix.lower()
|
|
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
|
|
|
|
upload_url = self._build_upload_url("image", file_name)
|
|
self._last_public_url = self._build_public_url("image", file_name)
|
|
log_prefix = "upload_image_bytes"
|
|
print(f"[{log_prefix}] URL (without SAS): {self._last_public_url}")
|
|
|
|
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
|
|
|
|
return await self._upload_bytes(
|
|
file_content=file_content,
|
|
upload_url=upload_url,
|
|
headers=headers,
|
|
timeout=60.0,
|
|
log_prefix=log_prefix,
|
|
)
|
|
|
|
|
|
# 사용 예시:
|
|
# import asyncio
|
|
#
|
|
# async def main():
|
|
# uploader = AzureBlobUploader(task_id="task-123")
|
|
#
|
|
# # 음악 업로드 -> {BASE_URL}/task-123/song/my_song.mp3
|
|
# await uploader.upload_music("my_song.mp3")
|
|
# print(uploader.public_url)
|
|
#
|
|
# # 영상 업로드 -> {BASE_URL}/task-123/video/my_video.mp4
|
|
# await uploader.upload_video("my_video.mp4")
|
|
# print(uploader.public_url)
|
|
#
|
|
# # 이미지 업로드 -> {BASE_URL}/task-123/image/my_image.png
|
|
# await uploader.upload_image("my_image.png")
|
|
# print(uploader.public_url)
|
|
#
|
|
# asyncio.run(main())
|