import asyncio
from pathlib import Path
from typing import Optional

import aiohttp


class BlobUploader:
    """Upload media files to a fixed Azure Blob Storage location using a SAS token.

    Each upload is an HTTP PUT of a block blob to
    ``<root_url>/<user_idx>/<task_idx>/<file_name>``. The upload methods
    return the blob URL (without the SAS token) on success and ``None`` on
    failure.
    """

    # Base URL that every blob path is appended to.
    # NOTE(review): this value ends with "/" and the upload path is joined
    # with another "/", so resulting blob URLs contain "//". That is kept
    # unchanged on purpose because altering it would change where new blobs
    # are stored — confirm whether the double slash is intended.
    root_url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/"

    # Lower-cased file extension -> Content-Type sent for image uploads.
    image_content_types = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".bmp": "image/bmp",
    }

    def __init__(self, sas_token):
        """Store the SAS token that is appended as the query string of upload requests."""
        self.sas_token = sas_token

    async def _upload(
        self,
        file_data: bytes,
        user_idx: str,
        task_idx: str,
        file_name: str,
        content_type: str,
    ) -> Optional[str]:
        """PUT ``file_data`` as a block blob and return its URL, or ``None`` on failure.

        Args:
            file_data: Raw bytes to upload.
            user_idx: First path segment under ``root_url``.
            task_idx: Second path segment under ``root_url``.
            file_name: Blob file name (last path segment).
            content_type: Value sent in the ``Content-Type`` header.

        Returns:
            The blob URL without the SAS token when the response status is
            200 or 201; ``None`` for any other status or when the request
            itself fails (connection error, timeout, ...).
        """
        url = f"{self.root_url}/{user_idx}/{task_idx}/{file_name}"
        # Tolerate a token supplied with its leading "?", which would
        # otherwise yield "...??sv=..." and a rejected request.
        token = str(self.sas_token).lstrip("?")
        access_url = f"{url}?{token}"
        headers = {
            "Content-Type": content_type,
            "x-ms-blob-type": "BlockBlob",
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.put(access_url, data=file_data, headers=headers) as resp:
                    if resp.status in (200, 201):
                        return url
                    text = await resp.text()
                    print(f"Failed Status Code: {resp.status}")
                    print(f"Response: {text}")
                    return None
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            # Transport-level failures are reported the same way as HTTP
            # failures: by returning None. Only the exception type is printed
            # because some aiohttp error messages embed the request URL,
            # which carries the SAS token.
            print(f"Upload request failed: {type(exc).__name__}")
            return None

    async def upload_music_to_azure_blob(
        self, file_data: bytes, user_idx: str, task_idx: str, music_file_name: str
    ) -> Optional[str]:
        """Upload a music file to ``user_idx/task_idx/music_file_name``.

        The file is sent with Content-Type ``audio/mpeg``.

        Returns:
            The uploaded blob URL, or ``None`` if the upload failed.
        """
        return await self._upload(file_data, user_idx, task_idx, music_file_name, "audio/mpeg")

    async def upload_video_to_azure_blob(
        self, file_data: bytes, user_idx: str, task_idx: str, video_file_name: str
    ) -> Optional[str]:
        """Upload a video file to ``user_idx/task_idx/video_file_name``.

        The file is sent with Content-Type ``video/mp4``.

        Returns:
            The uploaded blob URL, or ``None`` if the upload failed.
        """
        return await self._upload(file_data, user_idx, task_idx, video_file_name, "video/mp4")

    async def upload_image_to_azure_blob(
        self, file_data: bytes, user_idx: str, task_idx: str, image_file_name: str
    ) -> Optional[str]:
        """Upload an image file to ``user_idx/task_idx/image_file_name``.

        The Content-Type is looked up from the file extension
        (case-insensitive) in ``image_content_types``; unknown extensions
        fall back to ``image/jpeg``.

        Returns:
            The uploaded blob URL, or ``None`` if the upload failed.
        """
        extension = Path(image_file_name).suffix.lower()
        content_type = self.image_content_types.get(extension, "image/jpeg")
        return await self._upload(file_data, user_idx, task_idx, image_file_name, content_type)