359 lines
13 KiB
Python
359 lines
13 KiB
Python
"""
|
|
Azure Blob Storage 업로드 유틸리티
|
|
|
|
Azure Blob Storage에 파일을 업로드하는 클래스를 제공합니다.
|
|
파일 경로 또는 바이트 데이터를 직접 업로드할 수 있습니다.
|
|
|
|
URL 경로 형식:
|
|
- 음악: {BASE_URL}/{task_id}/song/{파일명}
|
|
- 영상: {BASE_URL}/{task_id}/video/{파일명}
|
|
- 이미지: {BASE_URL}/{task_id}/image/{파일명}
|
|
|
|
사용 예시:
|
|
from app.utils.upload_blob_as_request import AzureBlobUploader
|
|
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
|
|
# 파일 경로로 업로드
|
|
success = await uploader.upload_music(file_path="my_song.mp3")
|
|
success = await uploader.upload_video(file_path="my_video.mp4")
|
|
success = await uploader.upload_image(file_path="my_image.png")
|
|
|
|
# 바이트 데이터로 직접 업로드 (media 저장 없이)
|
|
success = await uploader.upload_music_bytes(audio_bytes, "my_song") # .mp3 자동 추가
|
|
success = await uploader.upload_video_bytes(video_bytes, "my_video") # .mp4 자동 추가
|
|
success = await uploader.upload_image_bytes(image_bytes, "my_image.png")
|
|
|
|
print(uploader.public_url) # 마지막 업로드의 공개 URL
|
|
"""
|
|
|
|
from pathlib import Path
|
|
|
|
import aiofiles
|
|
import httpx
|
|
|
|
from config import azure_blob_settings
|
|
|
|
|
|
class AzureBlobUploader:
|
|
"""Azure Blob Storage 업로드 클래스
|
|
|
|
Azure Blob Storage에 음악, 영상, 이미지 파일을 업로드합니다.
|
|
URL 형식: {BASE_URL}/{task_id}/{category}/{file_name}?{SAS_TOKEN}
|
|
|
|
카테고리별 경로:
|
|
- 음악: {task_id}/song/{file_name}
|
|
- 영상: {task_id}/video/{file_name}
|
|
- 이미지: {task_id}/image/{file_name}
|
|
|
|
Attributes:
|
|
task_id: 작업 고유 식별자
|
|
"""
|
|
|
|
# Content-Type 매핑
|
|
IMAGE_CONTENT_TYPES = {
|
|
".jpg": "image/jpeg",
|
|
".jpeg": "image/jpeg",
|
|
".png": "image/png",
|
|
".gif": "image/gif",
|
|
".webp": "image/webp",
|
|
".bmp": "image/bmp",
|
|
}
|
|
|
|
def __init__(self, task_id: str):
|
|
"""AzureBlobUploader 초기화
|
|
|
|
Args:
|
|
task_id: 작업 고유 식별자
|
|
"""
|
|
self._task_id = task_id
|
|
self._base_url = azure_blob_settings.AZURE_BLOB_BASE_URL
|
|
self._sas_token = azure_blob_settings.AZURE_BLOB_SAS_TOKEN
|
|
self._last_public_url: str = ""
|
|
|
|
@property
|
|
def task_id(self) -> str:
|
|
"""작업 고유 식별자"""
|
|
return self._task_id
|
|
|
|
@property
|
|
def public_url(self) -> str:
|
|
"""마지막 업로드의 공개 URL (SAS 토큰 제외)"""
|
|
return self._last_public_url
|
|
|
|
def _build_upload_url(self, category: str, file_name: str) -> str:
|
|
"""업로드 URL 생성 (SAS 토큰 포함)"""
|
|
# SAS 토큰 앞뒤의 ?, ', " 제거
|
|
sas_token = self._sas_token.strip("?'\"")
|
|
return f"{self._base_url}/{self._task_id}/{category}/{file_name}?{sas_token}"
|
|
|
|
def _build_public_url(self, category: str, file_name: str) -> str:
|
|
"""공개 URL 생성 (SAS 토큰 제외)"""
|
|
return f"{self._base_url}/{self._task_id}/{category}/{file_name}"
|
|
|
|
async def _upload_file(
|
|
self,
|
|
file_path: str,
|
|
category: str,
|
|
content_type: str,
|
|
timeout: float,
|
|
log_prefix: str,
|
|
) -> bool:
|
|
"""파일을 Azure Blob Storage에 업로드하는 내부 메서드
|
|
|
|
Args:
|
|
file_path: 업로드할 파일 경로
|
|
category: 카테고리 (song, video, image)
|
|
content_type: Content-Type 헤더 값
|
|
timeout: 요청 타임아웃 (초)
|
|
log_prefix: 로그 접두사
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
"""
|
|
# 파일 경로에서 파일명 추출
|
|
file_name = Path(file_path).name
|
|
|
|
upload_url = self._build_upload_url(category, file_name)
|
|
self._last_public_url = self._build_public_url(category, file_name)
|
|
print(f"[{log_prefix}] Upload URL (without SAS): {self._last_public_url}")
|
|
|
|
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
|
|
|
|
async with aiofiles.open(file_path, "rb") as file:
|
|
file_content = await file.read()
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.put(
|
|
upload_url, content=file_content, headers=headers, timeout=timeout
|
|
)
|
|
|
|
if response.status_code in [200, 201]:
|
|
print(f"[{log_prefix}] Success - Status Code: {response.status_code}")
|
|
print(f"[{log_prefix}] Public URL: {self._last_public_url}")
|
|
return True
|
|
else:
|
|
print(f"[{log_prefix}] Failed - Status Code: {response.status_code}")
|
|
print(f"[{log_prefix}] Response: {response.text}")
|
|
return False
|
|
|
|
async def upload_music(self, file_path: str) -> bool:
|
|
"""음악 파일을 Azure Blob Storage에 업로드합니다.
|
|
|
|
URL 경로: {task_id}/song/{파일명}
|
|
|
|
Args:
|
|
file_path: 업로드할 파일 경로
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_music(file_path="my_song.mp3")
|
|
print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3
|
|
"""
|
|
return await self._upload_file(
|
|
file_path=file_path,
|
|
category="song",
|
|
content_type="audio/mpeg",
|
|
timeout=120.0,
|
|
log_prefix="upload_music",
|
|
)
|
|
|
|
async def upload_music_bytes(self, file_content: bytes, file_name: str) -> bool:
|
|
"""음악 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
|
|
|
|
URL 경로: {task_id}/song/{파일명}
|
|
|
|
Args:
|
|
file_content: 업로드할 파일 바이트 데이터
|
|
file_name: 저장할 파일명 (확장자가 없으면 .mp3 추가)
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_music_bytes(audio_bytes, "my_song")
|
|
print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3
|
|
"""
|
|
# 확장자가 없으면 .mp3 추가
|
|
if not Path(file_name).suffix:
|
|
file_name = f"{file_name}.mp3"
|
|
|
|
upload_url = self._build_upload_url("song", file_name)
|
|
self._last_public_url = self._build_public_url("song", file_name)
|
|
print(f"[upload_music_bytes] Upload URL (without SAS): {self._last_public_url}")
|
|
|
|
headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.put(
|
|
upload_url, content=file_content, headers=headers, timeout=120.0
|
|
)
|
|
|
|
if response.status_code in [200, 201]:
|
|
print(f"[upload_music_bytes] Success - Status Code: {response.status_code}")
|
|
print(f"[upload_music_bytes] Public URL: {self._last_public_url}")
|
|
return True
|
|
else:
|
|
print(f"[upload_music_bytes] Failed - Status Code: {response.status_code}")
|
|
print(f"[upload_music_bytes] Response: {response.text}")
|
|
return False
|
|
|
|
async def upload_video(self, file_path: str) -> bool:
|
|
"""영상 파일을 Azure Blob Storage에 업로드합니다.
|
|
|
|
URL 경로: {task_id}/video/{파일명}
|
|
|
|
Args:
|
|
file_path: 업로드할 파일 경로
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_video(file_path="my_video.mp4")
|
|
print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4
|
|
"""
|
|
return await self._upload_file(
|
|
file_path=file_path,
|
|
category="video",
|
|
content_type="video/mp4",
|
|
timeout=180.0,
|
|
log_prefix="upload_video",
|
|
)
|
|
|
|
async def upload_video_bytes(self, file_content: bytes, file_name: str) -> bool:
|
|
"""영상 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
|
|
|
|
URL 경로: {task_id}/video/{파일명}
|
|
|
|
Args:
|
|
file_content: 업로드할 파일 바이트 데이터
|
|
file_name: 저장할 파일명 (확장자가 없으면 .mp4 추가)
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_video_bytes(video_bytes, "my_video")
|
|
print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4
|
|
"""
|
|
# 확장자가 없으면 .mp4 추가
|
|
if not Path(file_name).suffix:
|
|
file_name = f"{file_name}.mp4"
|
|
|
|
upload_url = self._build_upload_url("video", file_name)
|
|
self._last_public_url = self._build_public_url("video", file_name)
|
|
print(f"[upload_video_bytes] Upload URL (without SAS): {self._last_public_url}")
|
|
|
|
headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.put(
|
|
upload_url, content=file_content, headers=headers, timeout=180.0
|
|
)
|
|
|
|
if response.status_code in [200, 201]:
|
|
print(f"[upload_video_bytes] Success - Status Code: {response.status_code}")
|
|
print(f"[upload_video_bytes] Public URL: {self._last_public_url}")
|
|
return True
|
|
else:
|
|
print(f"[upload_video_bytes] Failed - Status Code: {response.status_code}")
|
|
print(f"[upload_video_bytes] Response: {response.text}")
|
|
return False
|
|
|
|
async def upload_image(self, file_path: str) -> bool:
|
|
"""이미지 파일을 Azure Blob Storage에 업로드합니다.
|
|
|
|
URL 경로: {task_id}/image/{파일명}
|
|
|
|
Args:
|
|
file_path: 업로드할 파일 경로
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
success = await uploader.upload_image(file_path="my_image.png")
|
|
print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png
|
|
"""
|
|
extension = Path(file_path).suffix.lower()
|
|
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
|
|
|
|
return await self._upload_file(
|
|
file_path=file_path,
|
|
category="image",
|
|
content_type=content_type,
|
|
timeout=60.0,
|
|
log_prefix="upload_image",
|
|
)
|
|
|
|
async def upload_image_bytes(self, file_content: bytes, file_name: str) -> bool:
|
|
"""이미지 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
|
|
|
|
URL 경로: {task_id}/image/{파일명}
|
|
|
|
Args:
|
|
file_content: 업로드할 파일 바이트 데이터
|
|
file_name: 저장할 파일명
|
|
|
|
Returns:
|
|
bool: 업로드 성공 여부
|
|
|
|
Example:
|
|
uploader = AzureBlobUploader(task_id="task-123")
|
|
with open("my_image.png", "rb") as f:
|
|
content = f.read()
|
|
success = await uploader.upload_image_bytes(content, "my_image.png")
|
|
print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png
|
|
"""
|
|
extension = Path(file_name).suffix.lower()
|
|
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
|
|
|
|
upload_url = self._build_upload_url("image", file_name)
|
|
self._last_public_url = self._build_public_url("image", file_name)
|
|
print(f"[upload_image_bytes] Upload URL (without SAS): {self._last_public_url}")
|
|
|
|
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.put(
|
|
upload_url, content=file_content, headers=headers, timeout=60.0
|
|
)
|
|
|
|
if response.status_code in [200, 201]:
|
|
print(f"[upload_image_bytes] Success - Status Code: {response.status_code}")
|
|
print(f"[upload_image_bytes] Public URL: {self._last_public_url}")
|
|
return True
|
|
else:
|
|
print(f"[upload_image_bytes] Failed - Status Code: {response.status_code}")
|
|
print(f"[upload_image_bytes] Response: {response.text}")
|
|
return False
|
|
|
|
|
|
# 사용 예시:
|
|
# import asyncio
|
|
#
|
|
# async def main():
|
|
# uploader = AzureBlobUploader(task_id="task-123")
|
|
#
|
|
# # 음악 업로드 -> {BASE_URL}/task-123/song/my_song.mp3
|
|
# await uploader.upload_music("my_song.mp3")
|
|
# print(uploader.public_url)
|
|
#
|
|
# # 영상 업로드 -> {BASE_URL}/task-123/video/my_video.mp4
|
|
# await uploader.upload_video("my_video.mp4")
|
|
# print(uploader.public_url)
|
|
#
|
|
# # 이미지 업로드 -> {BASE_URL}/task-123/image/my_image.png
|
|
# await uploader.upload_image("my_image.png")
|
|
# print(uploader.public_url)
|
|
#
|
|
# asyncio.run(main())
|