o2o-castad-backend/app/utils/upload_blob_as_request.py

359 lines
13 KiB
Python

"""
Azure Blob Storage 업로드 유틸리티
Azure Blob Storage에 파일을 업로드하는 클래스를 제공합니다.
파일 경로 또는 바이트 데이터를 직접 업로드할 수 있습니다.
URL 경로 형식:
- 음악: {BASE_URL}/{task_id}/song/{파일명}
- 영상: {BASE_URL}/{task_id}/video/{파일명}
- 이미지: {BASE_URL}/{task_id}/image/{파일명}
사용 예시:
from app.utils.upload_blob_as_request import AzureBlobUploader
uploader = AzureBlobUploader(task_id="task-123")
# 파일 경로로 업로드
success = await uploader.upload_music(file_path="my_song.mp3")
success = await uploader.upload_video(file_path="my_video.mp4")
success = await uploader.upload_image(file_path="my_image.png")
# 바이트 데이터로 직접 업로드 (media 저장 없이)
success = await uploader.upload_music_bytes(audio_bytes, "my_song") # .mp3 자동 추가
success = await uploader.upload_video_bytes(video_bytes, "my_video") # .mp4 자동 추가
success = await uploader.upload_image_bytes(image_bytes, "my_image.png")
print(uploader.public_url) # 마지막 업로드의 공개 URL
"""
from pathlib import Path
import aiofiles
import httpx
from config import azure_blob_settings
class AzureBlobUploader:
"""Azure Blob Storage 업로드 클래스
Azure Blob Storage에 음악, 영상, 이미지 파일을 업로드합니다.
URL 형식: {BASE_URL}/{task_id}/{category}/{file_name}?{SAS_TOKEN}
카테고리별 경로:
- 음악: {task_id}/song/{file_name}
- 영상: {task_id}/video/{file_name}
- 이미지: {task_id}/image/{file_name}
Attributes:
task_id: 작업 고유 식별자
"""
# Content-Type 매핑
IMAGE_CONTENT_TYPES = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
".bmp": "image/bmp",
}
def __init__(self, task_id: str):
"""AzureBlobUploader 초기화
Args:
task_id: 작업 고유 식별자
"""
self._task_id = task_id
self._base_url = azure_blob_settings.AZURE_BLOB_BASE_URL
self._sas_token = azure_blob_settings.AZURE_BLOB_SAS_TOKEN
self._last_public_url: str = ""
@property
def task_id(self) -> str:
"""작업 고유 식별자"""
return self._task_id
@property
def public_url(self) -> str:
"""마지막 업로드의 공개 URL (SAS 토큰 제외)"""
return self._last_public_url
def _build_upload_url(self, category: str, file_name: str) -> str:
"""업로드 URL 생성 (SAS 토큰 포함)"""
# SAS 토큰 앞뒤의 ?, ', " 제거
sas_token = self._sas_token.strip("?'\"")
return f"{self._base_url}/{self._task_id}/{category}/{file_name}?{sas_token}"
def _build_public_url(self, category: str, file_name: str) -> str:
"""공개 URL 생성 (SAS 토큰 제외)"""
return f"{self._base_url}/{self._task_id}/{category}/{file_name}"
async def _upload_file(
self,
file_path: str,
category: str,
content_type: str,
timeout: float,
log_prefix: str,
) -> bool:
"""파일을 Azure Blob Storage에 업로드하는 내부 메서드
Args:
file_path: 업로드할 파일 경로
category: 카테고리 (song, video, image)
content_type: Content-Type 헤더 값
timeout: 요청 타임아웃 (초)
log_prefix: 로그 접두사
Returns:
bool: 업로드 성공 여부
"""
# 파일 경로에서 파일명 추출
file_name = Path(file_path).name
upload_url = self._build_upload_url(category, file_name)
self._last_public_url = self._build_public_url(category, file_name)
print(f"[{log_prefix}] Upload URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
async with aiofiles.open(file_path, "rb") as file:
file_content = await file.read()
async with httpx.AsyncClient() as client:
response = await client.put(
upload_url, content=file_content, headers=headers, timeout=timeout
)
if response.status_code in [200, 201]:
print(f"[{log_prefix}] Success - Status Code: {response.status_code}")
print(f"[{log_prefix}] Public URL: {self._last_public_url}")
return True
else:
print(f"[{log_prefix}] Failed - Status Code: {response.status_code}")
print(f"[{log_prefix}] Response: {response.text}")
return False
async def upload_music(self, file_path: str) -> bool:
"""음악 파일을 Azure Blob Storage에 업로드합니다.
URL 경로: {task_id}/song/{파일명}
Args:
file_path: 업로드할 파일 경로
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_music(file_path="my_song.mp3")
print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3
"""
return await self._upload_file(
file_path=file_path,
category="song",
content_type="audio/mpeg",
timeout=120.0,
log_prefix="upload_music",
)
async def upload_music_bytes(self, file_content: bytes, file_name: str) -> bool:
"""음악 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/song/{파일명}
Args:
file_content: 업로드할 파일 바이트 데이터
file_name: 저장할 파일명 (확장자가 없으면 .mp3 추가)
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_music_bytes(audio_bytes, "my_song")
print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3
"""
# 확장자가 없으면 .mp3 추가
if not Path(file_name).suffix:
file_name = f"{file_name}.mp3"
upload_url = self._build_upload_url("song", file_name)
self._last_public_url = self._build_public_url("song", file_name)
print(f"[upload_music_bytes] Upload URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"}
async with httpx.AsyncClient() as client:
response = await client.put(
upload_url, content=file_content, headers=headers, timeout=120.0
)
if response.status_code in [200, 201]:
print(f"[upload_music_bytes] Success - Status Code: {response.status_code}")
print(f"[upload_music_bytes] Public URL: {self._last_public_url}")
return True
else:
print(f"[upload_music_bytes] Failed - Status Code: {response.status_code}")
print(f"[upload_music_bytes] Response: {response.text}")
return False
async def upload_video(self, file_path: str) -> bool:
"""영상 파일을 Azure Blob Storage에 업로드합니다.
URL 경로: {task_id}/video/{파일명}
Args:
file_path: 업로드할 파일 경로
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_video(file_path="my_video.mp4")
print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4
"""
return await self._upload_file(
file_path=file_path,
category="video",
content_type="video/mp4",
timeout=180.0,
log_prefix="upload_video",
)
async def upload_video_bytes(self, file_content: bytes, file_name: str) -> bool:
"""영상 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/video/{파일명}
Args:
file_content: 업로드할 파일 바이트 데이터
file_name: 저장할 파일명 (확장자가 없으면 .mp4 추가)
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_video_bytes(video_bytes, "my_video")
print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4
"""
# 확장자가 없으면 .mp4 추가
if not Path(file_name).suffix:
file_name = f"{file_name}.mp4"
upload_url = self._build_upload_url("video", file_name)
self._last_public_url = self._build_public_url("video", file_name)
print(f"[upload_video_bytes] Upload URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"}
async with httpx.AsyncClient() as client:
response = await client.put(
upload_url, content=file_content, headers=headers, timeout=180.0
)
if response.status_code in [200, 201]:
print(f"[upload_video_bytes] Success - Status Code: {response.status_code}")
print(f"[upload_video_bytes] Public URL: {self._last_public_url}")
return True
else:
print(f"[upload_video_bytes] Failed - Status Code: {response.status_code}")
print(f"[upload_video_bytes] Response: {response.text}")
return False
async def upload_image(self, file_path: str) -> bool:
"""이미지 파일을 Azure Blob Storage에 업로드합니다.
URL 경로: {task_id}/image/{파일명}
Args:
file_path: 업로드할 파일 경로
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_image(file_path="my_image.png")
print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png
"""
extension = Path(file_path).suffix.lower()
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
return await self._upload_file(
file_path=file_path,
category="image",
content_type=content_type,
timeout=60.0,
log_prefix="upload_image",
)
async def upload_image_bytes(self, file_content: bytes, file_name: str) -> bool:
"""이미지 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/image/{파일명}
Args:
file_content: 업로드할 파일 바이트 데이터
file_name: 저장할 파일명
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
with open("my_image.png", "rb") as f:
content = f.read()
success = await uploader.upload_image_bytes(content, "my_image.png")
print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png
"""
extension = Path(file_name).suffix.lower()
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
upload_url = self._build_upload_url("image", file_name)
self._last_public_url = self._build_public_url("image", file_name)
print(f"[upload_image_bytes] Upload URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
async with httpx.AsyncClient() as client:
response = await client.put(
upload_url, content=file_content, headers=headers, timeout=60.0
)
if response.status_code in [200, 201]:
print(f"[upload_image_bytes] Success - Status Code: {response.status_code}")
print(f"[upload_image_bytes] Public URL: {self._last_public_url}")
return True
else:
print(f"[upload_image_bytes] Failed - Status Code: {response.status_code}")
print(f"[upload_image_bytes] Response: {response.text}")
return False
# 사용 예시:
# import asyncio
#
# async def main():
# uploader = AzureBlobUploader(task_id="task-123")
#
# # 음악 업로드 -> {BASE_URL}/task-123/song/my_song.mp3
# await uploader.upload_music("my_song.mp3")
# print(uploader.public_url)
#
# # 영상 업로드 -> {BASE_URL}/task-123/video/my_video.mp4
# await uploader.upload_video("my_video.mp4")
# print(uploader.public_url)
#
# # 이미지 업로드 -> {BASE_URL}/task-123/image/my_image.png
# await uploader.upload_image("my_image.png")
# print(uploader.public_url)
#
# asyncio.run(main())