blob 이미지 업로드 완료

insta
bluebamus 2025-12-26 15:25:04 +09:00
parent 6917a76d60
commit 12e6f7357c
8 changed files with 756 additions and 447 deletions

View File

@ -20,6 +20,7 @@ from app.home.schemas.home_schema import (
MarketingAnalysis, MarketingAnalysis,
ProcessedInfo, ProcessedInfo,
) )
from app.utils.upload_blob_as_request import AzureBlobUploader
from app.utils.chatgpt_prompt import ChatgptService from app.utils.chatgpt_prompt import ChatgptService
from app.utils.common import generate_task_id from app.utils.common import generate_task_id
from app.utils.nvMapScraper import NvMapScraper from app.utils.nvMapScraper import NvMapScraper
@ -178,10 +179,11 @@ IMAGES_JSON_EXAMPLE = """[
@router.post( @router.post(
"/image/{task_id}/upload", "/image/upload/server/{task_id}",
include_in_schema=False,
summary="이미지 업로드", summary="이미지 업로드",
description=""" description="""
task_id에 연결된 이미지를 업로드합니다. task_id에 연결된 이미지를 서버에 업로드합니다.
## 요청 방식 ## 요청 방식
multipart/form-data 형식으로 전송합니다. multipart/form-data 형식으로 전송합니다.
@ -218,12 +220,12 @@ jpg, jpeg, png, webp, heic, heif
### 2. cURL로 테스트 ### 2. cURL로 테스트
```bash ```bash
# 바이너리 파일만 업로드 # 바이너리 파일만 업로드
curl -X POST "http://localhost:8000/image/test-task-001/upload" \\ curl -X POST "http://localhost:8000/image/upload/server/test-task-001" \\
-F "files=@/path/to/image1.jpg" \\ -F "files=@/path/to/image1.jpg" \\
-F "files=@/path/to/image2.png" -F "files=@/path/to/image2.png"
# URL + 바이너리 파일 동시 업로드 # URL + 바이너리 파일 동시 업로드
curl -X POST "http://localhost:8000/image/test-task-001/upload" \\ curl -X POST "http://localhost:8000/image/upload/server/test-task-001" \\
-F 'images_json=[{"url":"https://example.com/image.jpg"}]' \\ -F 'images_json=[{"url":"https://example.com/image.jpg"}]' \\
-F "files=@/path/to/local_image.jpg" -F "files=@/path/to/local_image.jpg"
``` ```
@ -232,7 +234,7 @@ curl -X POST "http://localhost:8000/image/test-task-001/upload" \\
```python ```python
import requests import requests
url = "http://localhost:8000/image/test-task-001/upload" url = "http://localhost:8000/image/upload/server/test-task-001"
files = [ files = [
("files", ("image1.jpg", open("image1.jpg", "rb"), "image/jpeg")), ("files", ("image1.jpg", open("image1.jpg", "rb"), "image/jpeg")),
("files", ("image2.png", open("image2.png", "rb"), "image/png")), ("files", ("image2.png", open("image2.png", "rb"), "image/png")),
@ -252,7 +254,7 @@ print(response.json())
- **images**: 업로드된 이미지 목록 - **images**: 업로드된 이미지 목록
## 저장 경로 ## 저장 경로
- 바이너리 파일: /media/{날짜}/{uuid7}/{파일명} - 바이너리 파일: /media/image/{날짜}/{uuid7}/{파일명}
""", """,
response_model=ImageUploadResponse, response_model=ImageUploadResponse,
responses={ responses={
@ -285,7 +287,9 @@ async def upload_images(
if has_files and files: if has_files and files:
for idx, f in enumerate(files): for idx, f in enumerate(files):
print(f"[upload_images] file[{idx}]: filename={f.filename}, size={f.size}, content_type={f.content_type}") print(
f"[upload_images] file[{idx}]: filename={f.filename}, size={f.size}, content_type={f.content_type}"
)
if not has_images_json and not has_files: if not has_images_json and not has_files:
raise HTTPException( raise HTTPException(
@ -312,16 +316,24 @@ async def upload_images(
if has_files and files: if has_files and files:
for f in files: for f in files:
is_valid_ext = _is_valid_image_extension(f.filename) is_valid_ext = _is_valid_image_extension(f.filename)
is_not_empty = f.size is None or f.size > 0 # size가 None이면 아직 읽지 않은 것 is_not_empty = (
is_real_file = f.filename and f.filename != "filename" # Swagger 빈 파일 체크 f.size is None or f.size > 0
print(f"[upload_images] Checking file: {f.filename}, size={f.size}, is_valid_ext={is_valid_ext}, is_real_file={is_real_file}") ) # size가 None이면 아직 읽지 않은 것
is_real_file = (
f.filename and f.filename != "filename"
) # Swagger 빈 파일 체크
print(
f"[upload_images] Checking file: {f.filename}, size={f.size}, is_valid_ext={is_valid_ext}, is_real_file={is_real_file}"
)
if f and is_real_file and is_valid_ext and is_not_empty: if f and is_real_file and is_valid_ext and is_not_empty:
valid_files.append(f) valid_files.append(f)
else: else:
skipped_files.append(f.filename or "unknown") skipped_files.append(f.filename or "unknown")
print(f"[upload_images] valid_files count: {len(valid_files)}, skipped: {skipped_files}") print(
f"[upload_images] valid_files count: {len(valid_files)}, skipped: {skipped_files}"
)
# 유효한 데이터가 하나도 없으면 에러 # 유효한 데이터가 하나도 없으면 에러
if not url_images and not valid_files: if not url_images and not valid_files:
@ -358,7 +370,7 @@ async def upload_images(
) )
img_order += 1 img_order += 1
# 2. 바이너리 파일 저장 # 2. 바이너리 파일을 media에 저장
if valid_files: if valid_files:
today = date.today().strftime("%Y-%m-%d") today = date.today().strftime("%Y-%m-%d")
# 한 번의 요청에서 업로드된 모든 이미지는 같은 폴더에 저장 # 한 번의 요청에서 업로드된 모든 이미지는 같은 폴더에 저장
@ -371,22 +383,27 @@ async def upload_images(
original_name = file.filename or "image" original_name = file.filename or "image"
ext = _get_file_extension(file.filename) # type: ignore[arg-type] ext = _get_file_extension(file.filename) # type: ignore[arg-type]
# 파일명에서 확장자 제거 후 순서 추가 # 파일명에서 확장자 제거 후 순서 추가
name_without_ext = original_name.rsplit(".", 1)[0] if "." in original_name else original_name name_without_ext = (
original_name.rsplit(".", 1)[0]
if "." in original_name
else original_name
)
filename = f"{name_without_ext}_{img_order:03d}{ext}" filename = f"{name_without_ext}_{img_order:03d}{ext}"
save_path = upload_dir / filename save_path = upload_dir / filename
# 파일 저장 # media에 파일 저장
await _save_upload_file(file, save_path) await _save_upload_file(file, save_path)
# 이미지 URL 생성 # media 기준 URL 생성
img_url = f"/media/image/{today}/{batch_uuid}/{filename}" img_url = f"/media/image/{today}/{batch_uuid}/{filename}"
img_name = file.filename or filename img_name = file.filename or filename
print(f"[upload_images] File saved to media - path: {save_path}, url: {img_url}")
image = Image( image = Image(
task_id=task_id, task_id=task_id,
img_name=img_name, img_name=img_name,
img_url=img_url, img_url=img_url, # Media URL을 DB에 저장
img_order=img_order, img_order=img_order,
) )
session.add(image) session.add(image)
@ -403,7 +420,8 @@ async def upload_images(
) )
img_order += 1 img_order += 1
print(f"[upload_images] Committing {len(result_images)} images to database...") saved_count = len(result_images)
print(f"[upload_images] Committing {saved_count} images to database...")
await session.commit() await session.commit()
print("[upload_images] Commit successful!") print("[upload_images] Commit successful!")
@ -412,263 +430,216 @@ async def upload_images(
total_count=len(result_images), total_count=len(result_images),
url_count=len(url_images), url_count=len(url_images),
file_count=len(valid_files), file_count=len(valid_files),
saved_count=saved_count,
images=result_images, images=result_images,
) )
# @router.post( @router.post(
# "/generate", "/image/upload/blob/{task_id}",
# summary="기본 영상 생성 요청", summary="이미지 업로드 (Azure Blob)",
# description=""" description="""
# 고객 정보만 받아 영상 생성 작업을 시작합니다. (이미지 없음) task_id에 연결된 이미지를 Azure Blob Storage에 업로드합니다.
# ## 요청 필드 ## 요청 방식
# - **customer_name**: 고객명/가게명 (필수) multipart/form-data 형식으로 전송합니다.
# - **region**: 지역명 (필수)
# - **detail_region_info**: 상세 지역 정보 (선택)
# - **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood)
# ## 반환 정보 ## 요청 필드
# - **task_id**: 작업 고유 식별자 (UUID7) - **images_json**: 외부 이미지 URL 목록 (JSON 문자열, 선택)
# - **status**: 작업 상태 - **files**: 이미지 바이너리 파일 목록 (선택)
# - **message**: 응답 메시지
# """,
# response_model=GenerateResponse,
# response_description="생성 작업 시작 결과",
# tags=["generate"],
# )
# async def generate(
# request_body: GenerateRequest,
# background_tasks: BackgroundTasks,
# session: AsyncSession = Depends(get_session),
# ):
# """기본 영상 생성 요청 처리 (이미지 없음)"""
# # UUID7 생성 및 중복 검사
# while True:
# task_id = str(uuid7())
# existing = await session.execute(
# select(Project).where(Project.task_id == task_id)
# )
# if existing.scalar_one_or_none() is None:
# break
# # Project 생성 (이미지 없음) **주의**: images_json 또는 files 최소 하나는 반드시 전달해야 합니다.
# project = Project(
# store_name=request_body.customer_name,
# region=request_body.region,
# task_id=task_id,
# detail_region_info=json.dumps(
# {
# "detail": request_body.detail_region_info,
# "attribute": request_body.attribute.model_dump(),
# },
# ensure_ascii=False,
# ),
# )
# session.add(project)
# await session.commit()
# await session.refresh(project)
# background_tasks.add_task(task_process, request_body, task_id, project.id) ## 지원 이미지 확장자
jpg, jpeg, png, webp, heic, heif
# return { ## images_json 예시
# "task_id": task_id, ```json
# "status": "processing", [
# "message": "생성 작업이 시작되었습니다.", {"url": "https://example.com/image1.jpg"},
# } {"url": "https://example.com/image2.jpg", "name": "외관"}
]
```
## 바이너리 파일 업로드 테스트 방법
# @router.post( ### cURL로 테스트
# "/generate/urls", ```bash
# summary="URL 기반 영상 생성 요청", # 바이너리 파일만 업로드
# description=""" curl -X POST "http://localhost:8000/image/upload/blob/test-task-001" \\
# 고객 정보와 이미지 URL을 받아 영상 생성 작업을 시작합니다. -F "files=@/path/to/image1.jpg" \\
-F "files=@/path/to/image2.png"
# ## 요청 필드 # URL + 바이너리 파일 동시 업로드
# - **customer_name**: 고객명/가게명 (필수) curl -X POST "http://localhost:8000/image/upload/blob/test-task-001" \\
# - **region**: 지역명 (필수) -F 'images_json=[{"url":"https://example.com/image.jpg"}]' \\
# - **detail_region_info**: 상세 지역 정보 (선택) -F "files=@/path/to/local_image.jpg"
# - **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood) ```
# - **images**: 이미지 URL 목록 (필수)
# ## 반환 정보 ## 반환 정보
# - **task_id**: 작업 고유 식별자 (UUID7) - **task_id**: 작업 고유 식별자
# - **status**: 작업 상태 - **total_count**: 업로드된 이미지 개수
# - **message**: 응답 메시지 - **url_count**: URL로 등록된 이미지 개수
# """, - **file_count**: 파일로 업로드된 이미지 개수 (Blob에 저장됨)
# response_model=GenerateResponse, - **images**: 업로드된 이미지 목록
# response_description="생성 작업 시작 결과",
# tags=["generate"],
# )
# async def generate_urls(
# request_body: GenerateUrlsRequest,
# session: AsyncSession = Depends(get_session),
# ):
# """URL 기반 영상 생성 요청 처리"""
# # UUID7 생성 및 중복 검사
# while True:
# task_id = str(uuid7())
# existing = await session.execute(
# select(Project).where(Project.task_id == task_id)
# )
# if existing.scalar_one_or_none() is None:
# break
# # Project 생성 (이미지 정보 제외) ## 저장 경로
# project = Project( - 바이너리 파일: Azure Blob Storage ({task_id}/{파일명})
# store_name=request_body.customer_name, """,
# region=request_body.region, response_model=ImageUploadResponse,
# task_id=task_id, responses={
# detail_region_info=json.dumps( 200: {"description": "이미지 업로드 성공"},
# { 400: {"description": "이미지가 제공되지 않음", "model": ErrorResponse},
# "detail": request_body.detail_region_info, },
# "attribute": request_body.attribute.model_dump(), tags=["image"],
# }, )
# ensure_ascii=False, async def upload_images_blob(
# ), task_id: str,
# ) images_json: Optional[str] = Form(
# session.add(project) default=None,
description="외부 이미지 URL 목록 (JSON 문자열)",
example=IMAGES_JSON_EXAMPLE,
),
files: Optional[list[UploadFile]] = File(
default=None, description="이미지 바이너리 파일 목록"
),
session: AsyncSession = Depends(get_session),
) -> ImageUploadResponse:
"""이미지 업로드 (URL + Azure Blob Storage)"""
print(f"[upload_images_blob] START - task_id: {task_id}")
# # Image 레코드 생성 (독립 테이블, task_id로 연결) # 1. 진입 검증
# for idx, img_item in enumerate(request_body.images): has_images_json = images_json is not None and images_json.strip() != ""
# # name이 있으면 사용, 없으면 URL에서 추출 has_files = files is not None and len(files) > 0
# img_name = img_item.name or _extract_image_name(img_item.url, idx)
# image = Image(
# task_id=task_id,
# img_name=img_name,
# img_url=img_item.url,
# img_order=idx,
# )
# session.add(image)
# await session.commit() if not has_images_json and not has_files:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="images_json 또는 files 중 하나는 반드시 제공해야 합니다.",
)
# return { # 2. images_json 파싱
# "task_id": task_id, url_images: list[ImageUrlItem] = []
# "status": "processing", if has_images_json:
# "message": "생성 작업이 시작되었습니다.", try:
# } parsed = json.loads(images_json)
if isinstance(parsed, list):
url_images = [ImageUrlItem(**item) for item in parsed if item]
except (json.JSONDecodeError, TypeError, ValueError) as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"images_json 파싱 오류: {str(e)}",
)
# 3. 유효한 파일만 필터링
valid_files: list[UploadFile] = []
skipped_files: list[str] = []
if has_files and files:
for f in files:
is_valid_ext = _is_valid_image_extension(f.filename)
is_not_empty = f.size is None or f.size > 0
is_real_file = f.filename and f.filename != "filename"
# async def _save_upload_file(file: UploadFile, save_path: Path) -> None: if f and is_real_file and is_valid_ext and is_not_empty:
# """업로드 파일을 지정된 경로에 저장""" valid_files.append(f)
# save_path.parent.mkdir(parents=True, exist_ok=True) else:
# async with aiofiles.open(save_path, "wb") as f: skipped_files.append(f.filename or "unknown")
# content = await file.read()
# await f.write(content)
print(f"[upload_images_blob] valid_files: {len(valid_files)}, url_images: {len(url_images)}")
# def _get_file_extension(filename: str | None) -> str: if not url_images and not valid_files:
# """파일명에서 확장자 추출""" raise HTTPException(
# if not filename: status_code=status.HTTP_400_BAD_REQUEST,
# return ".jpg" detail=f"유효한 이미지가 없습니다. 지원 확장자: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}. 건너뛴 파일: {skipped_files}",
# ext = Path(filename).suffix.lower() )
# return ext if ext else ".jpg"
result_images: list[ImageUploadResultItem] = []
img_order = 0
# @router.post( # 1. URL 이미지 저장
# "/generate/upload", for url_item in url_images:
# summary="파일 업로드 기반 영상 생성 요청", img_name = url_item.name or _extract_image_name(url_item.url, img_order)
# description="""
# 고객 정보와 이미지 파일을 받아 영상 생성 작업을 시작합니다.
# ## 요청 필드 (multipart/form-data) image = Image(
# - **customer_name**: 고객명/가게명 (필수) task_id=task_id,
# - **region**: 지역명 (필수) img_name=img_name,
# - **detail_region_info**: 상세 지역 정보 (선택) img_url=url_item.url,
# - **attribute**: 음악 속성 정보 JSON 문자열 (필수) img_order=img_order,
# - **images**: 이미지 파일 목록 (필수, 복수 파일) )
session.add(image)
await session.flush()
print(f"[upload_images_blob] URL saved - id: {image.id}, img_name: {img_name}")
# ## 반환 정보 result_images.append(
# - **task_id**: 작업 고유 식별자 (UUID7) ImageUploadResultItem(
# - **status**: 작업 상태 id=image.id,
# - **message**: 응답 메시지 img_name=img_name,
# - **uploaded_count**: 업로드된 이미지 개수 img_url=url_item.url,
# """, img_order=img_order,
# response_model=GenerateUploadResponse, source="url",
# response_description="생성 작업 시작 결과", )
# tags=["generate"], )
# ) img_order += 1
# async def generate_upload(
# customer_name: str = Form(..., description="고객명/가게명"),
# region: str = Form(..., description="지역명"),
# attribute: str = Form(..., description="음악 속성 정보 (JSON 문자열)"),
# images: list[UploadFile] = File(..., description="이미지 파일 목록"),
# detail_region_info: str | None = Form(None, description="상세 지역 정보"),
# session: AsyncSession = Depends(get_session),
# ):
# """파일 업로드 기반 영상 생성 요청 처리"""
# # attribute JSON 파싱 및 검증
# try:
# attribute_dict = json.loads(attribute)
# attribute_info = AttributeInfo(**attribute_dict)
# except json.JSONDecodeError:
# raise HTTPException(
# status_code=400, detail="attribute는 유효한 JSON 형식이어야 합니다."
# )
# except Exception as e:
# raise HTTPException(status_code=400, detail=f"attribute 검증 실패: {e}")
# # 이미지 파일 검증 # 2. 바이너리 파일을 Azure Blob Storage에 직접 업로드 (media 저장 없음)
# if not images: if valid_files:
# raise HTTPException( uploader = AzureBlobUploader(task_id=task_id)
# status_code=400, detail="최소 1개 이상의 이미지 파일이 필요합니다."
# )
# # UUID7 생성 및 중복 검사 for file in valid_files:
# while True: original_name = file.filename or "image"
# task_id = str(uuid7()) ext = _get_file_extension(file.filename) # type: ignore[arg-type]
# existing = await session.execute( name_without_ext = (
# select(Project).where(Project.task_id == task_id) original_name.rsplit(".", 1)[0]
# ) if "." in original_name
# if existing.scalar_one_or_none() is None: else original_name
# break )
filename = f"{name_without_ext}_{img_order:03d}{ext}"
# # 저장 경로 생성: media/날짜/task_id/ # 파일 내용 읽기
# today = date.today().strftime("%Y%m%d") file_content = await file.read()
# upload_dir = MEDIA_ROOT / today / task_id print(f"[upload_images_blob] Uploading {filename} ({len(file_content)} bytes) to Blob...")
# # Project 생성 (이미지 정보 제외) # Azure Blob Storage에 직접 업로드
# project = Project( upload_success = await uploader.upload_image_bytes(file_content, filename)
# store_name=customer_name,
# region=region,
# task_id=task_id,
# detail_region_info=json.dumps(
# {
# "detail": detail_region_info,
# "attribute": attribute_info.model_dump(),
# },
# ensure_ascii=False,
# ),
# )
# session.add(project)
# # 이미지 파일 저장 및 Image 레코드 생성 if upload_success:
# for idx, file in enumerate(images): blob_url = uploader.public_url
# # 각 이미지에 고유 UUID7 생성 img_name = file.filename or filename
# img_uuid = str(uuid7())
# ext = _get_file_extension(file.filename)
# filename = f"{img_uuid}{ext}"
# save_path = upload_dir / filename
# # 파일 저장 image = Image(
# await _save_upload_file(file, save_path) task_id=task_id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
)
session.add(image)
await session.flush()
print(f"[upload_images_blob] Blob saved - id: {image.id}, blob_url: {blob_url}")
# # Image 레코드 생성 (독립 테이블, task_id로 연결) result_images.append(
# img_url = f"/media/{today}/{task_id}/{filename}" ImageUploadResultItem(
# image = Image( id=image.id,
# task_id=task_id, img_name=img_name,
# img_name=file.filename or filename, img_url=blob_url,
# img_url=img_url, img_order=img_order,
# img_order=idx, source="blob",
# ) )
# session.add(image) )
img_order += 1
else:
print(f"[upload_images_blob] Failed to upload {filename}")
skipped_files.append(filename)
# await session.commit() saved_count = len(result_images)
print(f"[upload_images_blob] Committing {saved_count} images...")
await session.commit()
print(f"[upload_images_blob] Done! saved_count: {saved_count}")
# return { return ImageUploadResponse(
# "task_id": task_id, task_id=task_id,
# "status": "processing", total_count=len(result_images),
# "message": "생성 작업이 시작되었습니다.", url_count=len(url_images),
# "uploaded_count": len(images), file_count=len(valid_files) - len(skipped_files),
# } saved_count=saved_count,
images=result_images,
)

View File

@ -203,7 +203,9 @@ class ImageUploadResultItem(BaseModel):
img_name: str = Field(..., description="이미지명") img_name: str = Field(..., description="이미지명")
img_url: str = Field(..., description="이미지 URL") img_url: str = Field(..., description="이미지 URL")
img_order: int = Field(..., description="이미지 순서") img_order: int = Field(..., description="이미지 순서")
source: Literal["url", "file"] = Field(..., description="이미지 소스 (url 또는 file)") source: Literal["url", "file", "blob"] = Field(
..., description="이미지 소스 (url: 외부 URL, file: 로컬 서버, blob: Azure Blob)"
)
class ImageUploadResponse(BaseModel): class ImageUploadResponse(BaseModel):
@ -213,4 +215,5 @@ class ImageUploadResponse(BaseModel):
total_count: int = Field(..., description="총 업로드된 이미지 개수") total_count: int = Field(..., description="총 업로드된 이미지 개수")
url_count: int = Field(..., description="URL로 등록된 이미지 개수") url_count: int = Field(..., description="URL로 등록된 이미지 개수")
file_count: int = Field(..., description="파일로 업로드된 이미지 개수") file_count: int = Field(..., description="파일로 업로드된 이미지 개수")
saved_count: int = Field(..., description="Image 테이블에 저장된 row 수")
images: list[ImageUploadResultItem] = Field(..., description="업로드된 이미지 목록") images: list[ImageUploadResultItem] = Field(..., description="업로드된 이미지 목록")

View File

@ -0,0 +1,62 @@
"""
Home Worker 모듈
이미지 업로드 관련 백그라운드 작업을 처리합니다.
"""
from pathlib import Path
import aiofiles
from fastapi import UploadFile
from app.utils.upload_blob_as_request import AzureBlobUploader
MEDIA_ROOT = Path("media")
async def save_upload_file(file: UploadFile, save_path: Path) -> None:
"""업로드 파일을 지정된 경로에 저장"""
save_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(save_path, "wb") as f:
content = await file.read()
await f.write(content)
async def upload_image_to_blob(
task_id: str,
file: UploadFile,
filename: str,
save_dir: Path,
) -> tuple[bool, str, str]:
"""
이미지 파일을 media에 저장하고 Azure Blob Storage에 업로드합니다.
Args:
task_id: 작업 고유 식별자
file: 업로드할 파일 객체
filename: 저장될 파일명
save_dir: media 저장 디렉토리 경로
Returns:
tuple[bool, str, str]: (업로드 성공 여부, blob_url 또는 에러 메시지, media_path)
"""
save_path = save_dir / filename
media_path = str(save_path)
try:
# 1. media에 파일 저장
await save_upload_file(file, save_path)
print(f"[upload_image_to_blob] File saved to media: {save_path}")
# 2. Azure Blob Storage에 업로드
uploader = AzureBlobUploader(task_id=task_id)
upload_success = await uploader.upload_image(file_path=str(save_path))
if upload_success:
return True, uploader.public_url, media_path
else:
return False, f"Failed to upload {filename} to Blob", media_path
except Exception as e:
print(f"[upload_image_to_blob] Error: {e}")
return False, str(e), media_path

View File

@ -1,91 +0,0 @@
import asyncio
from sqlalchemy import select
from app.database.session import get_worker_session
from app.home.schemas.home_schema import GenerateRequest
from app.lyric.models import Lyric
from app.utils.chatgpt_prompt import ChatgptService
async def _save_lyric(task_id: str, project_id: int, lyric_prompt: str) -> int:
"""Lyric 레코드를 DB에 저장 (status=processing, lyric_result=null)"""
async with get_worker_session() as session:
lyric = Lyric(
task_id=task_id,
project_id=project_id,
status="processing",
lyric_prompt=lyric_prompt,
lyric_result=None,
)
session.add(lyric)
await session.commit()
await session.refresh(lyric)
print(f"Lyric saved: id={lyric.id}, task_id={task_id}, status=processing")
return lyric.id
async def _update_lyric_status(lyric_id: int, status: str, lyric_result: str | None = None) -> None:
"""Lyric 레코드의 status와 lyric_result를 업데이트"""
async with get_worker_session() as session:
result = await session.execute(select(Lyric).where(Lyric.id == lyric_id))
lyric = result.scalar_one_or_none()
if lyric:
lyric.status = status
if lyric_result is not None:
lyric.lyric_result = lyric_result
await session.commit()
print(f"Lyric updated: id={lyric_id}, status={status}")
async def lyric_task(
task_id: str,
project_id: int,
customer_name: str,
region: str,
detail_region_info: str,
language: str = "Korean",
) -> None:
"""가사 생성 작업: ChatGPT로 가사 생성 및 Lyric 테이블 저장/업데이트"""
service = ChatgptService(
customer_name=customer_name,
region=region,
detail_region_info=detail_region_info,
language=language,
)
# Lyric 레코드 저장 (status=processing, lyric_result=null)
lyric_prompt = service.build_lyrics_prompt()
lyric_id = await _save_lyric(task_id, project_id, lyric_prompt)
# GPT 호출
result = await service.generate(prompt=lyric_prompt)
print(f"GPT Response:\n{result}")
# 결과에 ERROR가 포함되어 있으면 status를 failed로 업데이트
if "ERROR:" in result:
await _update_lyric_status(lyric_id, "failed", lyric_result=result)
else:
await _update_lyric_status(lyric_id, "completed", lyric_result=result)
async def _task_process_async(request_body: GenerateRequest, task_id: str, project_id: int) -> None:
"""백그라운드 작업 처리 (async 버전)"""
customer_name = request_body.customer_name
region = request_body.region
detail_region_info = request_body.detail_region_info or ""
language = request_body.language
print(f"customer_name: {customer_name}")
print(f"region: {region}")
print(f"detail_region_info: {detail_region_info}")
print(f"language: {language}")
# 가사 생성 작업
await lyric_task(task_id, project_id, customer_name, region, detail_region_info, language)
def task_process(request_body: GenerateRequest, task_id: str, project_id: int) -> None:
"""백그라운드 작업 처리 함수 (sync wrapper)"""
asyncio.run(_task_process_async(request_body, task_id, project_id))

View File

@ -14,6 +14,7 @@ from sqlalchemy import select
from app.database.session import AsyncSessionLocal from app.database.session import AsyncSessionLocal
from app.song.models import Song from app.song.models import Song
from app.utils.common import generate_task_id from app.utils.common import generate_task_id
from app.utils.upload_blob_as_request import AzureBlobUploader
from config import prj_settings from config import prj_settings
@ -99,3 +100,108 @@ async def download_and_save_song(
song.status = "failed" song.status = "failed"
await session.commit() await session.commit()
print(f"[download_and_save_song] FAILED - task_id: {task_id}, status updated to failed") print(f"[download_and_save_song] FAILED - task_id: {task_id}, status updated to failed")
async def download_and_upload_song_to_blob(
task_id: str,
audio_url: str,
store_name: str,
) -> None:
"""백그라운드에서 노래를 다운로드하고 Azure Blob Storage에 업로드한 뒤 Song 테이블을 업데이트합니다.
Args:
task_id: 프로젝트 task_id
audio_url: 다운로드할 오디오 URL
store_name: 저장할 파일명에 사용할 업체명
"""
print(f"[download_and_upload_song_to_blob] START - task_id: {task_id}, store_name: {store_name}")
temp_file_path: Path | None = None
try:
# 파일명에 사용할 수 없는 문자 제거
safe_store_name = "".join(
c for c in store_name if c.isalnum() or c in (" ", "_", "-")
).strip()
safe_store_name = safe_store_name or "song"
file_name = f"{safe_store_name}.mp3"
# 임시 저장 경로 생성
temp_dir = Path("media") / "temp" / task_id
temp_dir.mkdir(parents=True, exist_ok=True)
temp_file_path = temp_dir / file_name
print(f"[download_and_upload_song_to_blob] Temp directory created - path: {temp_file_path}")
# 오디오 파일 다운로드
print(f"[download_and_upload_song_to_blob] Downloading audio - task_id: {task_id}, url: {audio_url}")
async with httpx.AsyncClient() as client:
response = await client.get(audio_url, timeout=60.0)
response.raise_for_status()
async with aiofiles.open(str(temp_file_path), "wb") as f:
await f.write(response.content)
print(f"[download_and_upload_song_to_blob] File downloaded - task_id: {task_id}, path: {temp_file_path}")
# Azure Blob Storage에 업로드
uploader = AzureBlobUploader(task_id=task_id)
upload_success = await uploader.upload_music(file_path=str(temp_file_path))
if not upload_success:
raise Exception("Azure Blob Storage 업로드 실패")
# SAS 토큰이 제외된 public_url 사용
blob_url = uploader.public_url
print(f"[download_and_upload_song_to_blob] Uploaded to Blob - task_id: {task_id}, url: {blob_url}")
# Song 테이블 업데이트 (새 세션 사용)
async with AsyncSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute(
select(Song)
.where(Song.task_id == task_id)
.order_by(Song.created_at.desc())
.limit(1)
)
song = result.scalar_one_or_none()
if song:
song.status = "completed"
song.song_result_url = blob_url
await session.commit()
print(f"[download_and_upload_song_to_blob] SUCCESS - task_id: {task_id}, status: completed")
else:
print(f"[download_and_upload_song_to_blob] Song NOT FOUND in DB - task_id: {task_id}")
except Exception as e:
print(f"[download_and_upload_song_to_blob] EXCEPTION - task_id: {task_id}, error: {e}")
# 실패 시 Song 테이블 업데이트
async with AsyncSessionLocal() as session:
# 여러 개 있을 경우 가장 최근 것 선택
result = await session.execute(
select(Song)
.where(Song.task_id == task_id)
.order_by(Song.created_at.desc())
.limit(1)
)
song = result.scalar_one_or_none()
if song:
song.status = "failed"
await session.commit()
print(f"[download_and_upload_song_to_blob] FAILED - task_id: {task_id}, status updated to failed")
finally:
# 임시 파일 삭제
if temp_file_path and temp_file_path.exists():
try:
temp_file_path.unlink()
print(f"[download_and_upload_song_to_blob] Temp file deleted - path: {temp_file_path}")
except Exception as e:
print(f"[download_and_upload_song_to_blob] Failed to delete temp file: {e}")
# 임시 디렉토리 삭제 시도
temp_dir = Path("media") / "temp" / task_id
if temp_dir.exists():
try:
temp_dir.rmdir()
except Exception:
pass # 디렉토리가 비어있지 않으면 무시

View File

@ -1,7 +1,30 @@
""" """
Azure Blob Storage 업로드 유틸리티 Azure Blob Storage 업로드 유틸리티
Azure Blob Storage에 파일을 업로드하는 비동기 함수들을 제공합니다. Azure Blob Storage에 파일을 업로드하는 클래스를 제공합니다.
파일 경로 또는 바이트 데이터를 직접 업로드할 있습니다.
URL 경로 형식:
- 음악: {BASE_URL}/{task_id}/song/{파일명}
- 영상: {BASE_URL}/{task_id}/video/{파일명}
- 이미지: {BASE_URL}/{task_id}/image/{파일명}
사용 예시:
from app.utils.upload_blob_as_request import AzureBlobUploader
uploader = AzureBlobUploader(task_id="task-123")
# 파일 경로로 업로드
success = await uploader.upload_music(file_path="my_song.mp3")
success = await uploader.upload_video(file_path="my_video.mp4")
success = await uploader.upload_image(file_path="my_image.png")
# 바이트 데이터로 직접 업로드 (media 저장 없이)
success = await uploader.upload_music_bytes(audio_bytes, "my_song") # .mp3 자동 추가
success = await uploader.upload_video_bytes(video_bytes, "my_video") # .mp4 자동 추가
success = await uploader.upload_image_bytes(image_bytes, "my_image.png")
print(uploader.public_url) # 마지막 업로드의 공개 URL
""" """
from pathlib import Path from pathlib import Path
@ -9,87 +32,26 @@ from pathlib import Path
import aiofiles import aiofiles
import httpx import httpx
SAS_TOKEN = "sp=racwdl&st=2025-12-01T00:13:29Z&se=2026-07-31T08:28:29Z&spr=https&sv=2024-11-04&sr=c&sig=7fE2ozVBPu3Gq43%2FZDxEYdEcPLDXyNVfTf16IBasmVQ%3D" from config import azure_blob_settings
async def upload_music_to_azure_blob( class AzureBlobUploader:
file_path: str = "스테이 머뭄_1.mp3", """Azure Blob Storage 업로드 클래스
url: str = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp3",
) -> bool:
"""음악 파일을 Azure Blob Storage에 업로드합니다.
Args: Azure Blob Storage에 음악, 영상, 이미지 파일을 업로드합니다.
file_path: 업로드할 파일 경로 URL 형식: {BASE_URL}/{task_id}/{category}/{file_name}?{SAS_TOKEN}
url: Azure Blob Storage URL
Returns: 카테고리별 경로:
bool: 업로드 성공 여부 - 음악: {task_id}/song/{file_name}
- 영상: {task_id}/video/{file_name}
- 이미지: {task_id}/image/{file_name}
Attributes:
task_id: 작업 고유 식별자
""" """
access_url = f"{url}?{SAS_TOKEN}"
headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"}
async with aiofiles.open(file_path, "rb") as file: # Content-Type 매핑
file_content = await file.read() IMAGE_CONTENT_TYPES = {
async with httpx.AsyncClient() as client:
response = await client.put(access_url, content=file_content, headers=headers, timeout=120.0)
if response.status_code in [200, 201]:
print(f"[upload_music_to_azure_blob] Success - Status Code: {response.status_code}")
return True
else:
print(f"[upload_music_to_azure_blob] Failed - Status Code: {response.status_code}")
print(f"[upload_music_to_azure_blob] Response: {response.text}")
return False
async def upload_video_to_azure_blob(
file_path: str = "스테이 머뭄.mp4",
url: str = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp4",
) -> bool:
"""영상 파일을 Azure Blob Storage에 업로드합니다.
Args:
file_path: 업로드할 파일 경로
url: Azure Blob Storage URL
Returns:
bool: 업로드 성공 여부
"""
access_url = f"{url}?{SAS_TOKEN}"
headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"}
async with aiofiles.open(file_path, "rb") as file:
file_content = await file.read()
async with httpx.AsyncClient() as client:
response = await client.put(access_url, content=file_content, headers=headers, timeout=180.0)
if response.status_code in [200, 201]:
print(f"[upload_video_to_azure_blob] Success - Status Code: {response.status_code}")
return True
else:
print(f"[upload_video_to_azure_blob] Failed - Status Code: {response.status_code}")
print(f"[upload_video_to_azure_blob] Response: {response.text}")
return False
async def upload_image_to_azure_blob(
file_path: str = "스테이 머뭄.png",
url: str = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.png",
) -> bool:
"""이미지 파일을 Azure Blob Storage에 업로드합니다.
Args:
file_path: 업로드할 파일 경로
url: Azure Blob Storage URL
Returns:
bool: 업로드 성공 여부
"""
access_url = f"{url}?{SAS_TOKEN}"
extension = Path(file_path).suffix.lower()
content_types = {
".jpg": "image/jpeg", ".jpg": "image/jpeg",
".jpeg": "image/jpeg", ".jpeg": "image/jpeg",
".png": "image/png", ".png": "image/png",
@ -97,25 +59,300 @@ async def upload_image_to_azure_blob(
".webp": "image/webp", ".webp": "image/webp",
".bmp": "image/bmp", ".bmp": "image/bmp",
} }
content_type = content_types.get(extension, "image/jpeg")
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
async with aiofiles.open(file_path, "rb") as file: def __init__(self, task_id: str):
file_content = await file.read() """AzureBlobUploader 초기화
async with httpx.AsyncClient() as client: Args:
response = await client.put(access_url, content=file_content, headers=headers, timeout=60.0) task_id: 작업 고유 식별자
"""
self._task_id = task_id
self._base_url = azure_blob_settings.AZURE_BLOB_BASE_URL
self._sas_token = azure_blob_settings.AZURE_BLOB_SAS_TOKEN
self._last_public_url: str = ""
if response.status_code in [200, 201]: @property
print(f"[upload_image_to_azure_blob] Success - Status Code: {response.status_code}") def task_id(self) -> str:
return True """작업 고유 식별자"""
else: return self._task_id
print(f"[upload_image_to_azure_blob] Failed - Status Code: {response.status_code}")
print(f"[upload_image_to_azure_blob] Response: {response.text}") @property
return False def public_url(self) -> str:
"""마지막 업로드의 공개 URL (SAS 토큰 제외)"""
return self._last_public_url
def _build_upload_url(self, category: str, file_name: str) -> str:
"""업로드 URL 생성 (SAS 토큰 포함)"""
# SAS 토큰 앞뒤의 ?, ', " 제거
sas_token = self._sas_token.strip("?'\"")
return f"{self._base_url}/{self._task_id}/{category}/{file_name}?{sas_token}"
def _build_public_url(self, category: str, file_name: str) -> str:
"""공개 URL 생성 (SAS 토큰 제외)"""
return f"{self._base_url}/{self._task_id}/{category}/{file_name}"
async def _upload_file(
self,
file_path: str,
category: str,
content_type: str,
timeout: float,
log_prefix: str,
) -> bool:
"""파일을 Azure Blob Storage에 업로드하는 내부 메서드
Args:
file_path: 업로드할 파일 경로
category: 카테고리 (song, video, image)
content_type: Content-Type 헤더
timeout: 요청 타임아웃 ()
log_prefix: 로그 접두사
Returns:
bool: 업로드 성공 여부
"""
# 파일 경로에서 파일명 추출
file_name = Path(file_path).name
upload_url = self._build_upload_url(category, file_name)
self._last_public_url = self._build_public_url(category, file_name)
print(f"[{log_prefix}] Upload URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
async with aiofiles.open(file_path, "rb") as file:
file_content = await file.read()
async with httpx.AsyncClient() as client:
response = await client.put(
upload_url, content=file_content, headers=headers, timeout=timeout
)
if response.status_code in [200, 201]:
print(f"[{log_prefix}] Success - Status Code: {response.status_code}")
print(f"[{log_prefix}] Public URL: {self._last_public_url}")
return True
else:
print(f"[{log_prefix}] Failed - Status Code: {response.status_code}")
print(f"[{log_prefix}] Response: {response.text}")
return False
async def upload_music(self, file_path: str) -> bool:
"""음악 파일을 Azure Blob Storage에 업로드합니다.
URL 경로: {task_id}/song/{파일명}
Args:
file_path: 업로드할 파일 경로
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_music(file_path="my_song.mp3")
print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3
"""
return await self._upload_file(
file_path=file_path,
category="song",
content_type="audio/mpeg",
timeout=120.0,
log_prefix="upload_music",
)
async def upload_music_bytes(self, file_content: bytes, file_name: str) -> bool:
"""음악 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/song/{파일명}
Args:
file_content: 업로드할 파일 바이트 데이터
file_name: 저장할 파일명 (확장자가 없으면 .mp3 추가)
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_music_bytes(audio_bytes, "my_song")
print(uploader.public_url) # {BASE_URL}/task-123/song/my_song.mp3
"""
# 확장자가 없으면 .mp3 추가
if not Path(file_name).suffix:
file_name = f"{file_name}.mp3"
upload_url = self._build_upload_url("song", file_name)
self._last_public_url = self._build_public_url("song", file_name)
print(f"[upload_music_bytes] Upload URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"}
async with httpx.AsyncClient() as client:
response = await client.put(
upload_url, content=file_content, headers=headers, timeout=120.0
)
if response.status_code in [200, 201]:
print(f"[upload_music_bytes] Success - Status Code: {response.status_code}")
print(f"[upload_music_bytes] Public URL: {self._last_public_url}")
return True
else:
print(f"[upload_music_bytes] Failed - Status Code: {response.status_code}")
print(f"[upload_music_bytes] Response: {response.text}")
return False
async def upload_video(self, file_path: str) -> bool:
"""영상 파일을 Azure Blob Storage에 업로드합니다.
URL 경로: {task_id}/video/{파일명}
Args:
file_path: 업로드할 파일 경로
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_video(file_path="my_video.mp4")
print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4
"""
return await self._upload_file(
file_path=file_path,
category="video",
content_type="video/mp4",
timeout=180.0,
log_prefix="upload_video",
)
async def upload_video_bytes(self, file_content: bytes, file_name: str) -> bool:
"""영상 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/video/{파일명}
Args:
file_content: 업로드할 파일 바이트 데이터
file_name: 저장할 파일명 (확장자가 없으면 .mp4 추가)
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_video_bytes(video_bytes, "my_video")
print(uploader.public_url) # {BASE_URL}/task-123/video/my_video.mp4
"""
# 확장자가 없으면 .mp4 추가
if not Path(file_name).suffix:
file_name = f"{file_name}.mp4"
upload_url = self._build_upload_url("video", file_name)
self._last_public_url = self._build_public_url("video", file_name)
print(f"[upload_video_bytes] Upload URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"}
async with httpx.AsyncClient() as client:
response = await client.put(
upload_url, content=file_content, headers=headers, timeout=180.0
)
if response.status_code in [200, 201]:
print(f"[upload_video_bytes] Success - Status Code: {response.status_code}")
print(f"[upload_video_bytes] Public URL: {self._last_public_url}")
return True
else:
print(f"[upload_video_bytes] Failed - Status Code: {response.status_code}")
print(f"[upload_video_bytes] Response: {response.text}")
return False
async def upload_image(self, file_path: str) -> bool:
"""이미지 파일을 Azure Blob Storage에 업로드합니다.
URL 경로: {task_id}/image/{파일명}
Args:
file_path: 업로드할 파일 경로
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
success = await uploader.upload_image(file_path="my_image.png")
print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png
"""
extension = Path(file_path).suffix.lower()
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
return await self._upload_file(
file_path=file_path,
category="image",
content_type=content_type,
timeout=60.0,
log_prefix="upload_image",
)
async def upload_image_bytes(self, file_content: bytes, file_name: str) -> bool:
"""이미지 바이트 데이터를 Azure Blob Storage에 직접 업로드합니다.
URL 경로: {task_id}/image/{파일명}
Args:
file_content: 업로드할 파일 바이트 데이터
file_name: 저장할 파일명
Returns:
bool: 업로드 성공 여부
Example:
uploader = AzureBlobUploader(task_id="task-123")
with open("my_image.png", "rb") as f:
content = f.read()
success = await uploader.upload_image_bytes(content, "my_image.png")
print(uploader.public_url) # {BASE_URL}/task-123/image/my_image.png
"""
extension = Path(file_name).suffix.lower()
content_type = self.IMAGE_CONTENT_TYPES.get(extension, "image/jpeg")
upload_url = self._build_upload_url("image", file_name)
self._last_public_url = self._build_public_url("image", file_name)
print(f"[upload_image_bytes] Upload URL (without SAS): {self._last_public_url}")
headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
async with httpx.AsyncClient() as client:
response = await client.put(
upload_url, content=file_content, headers=headers, timeout=60.0
)
if response.status_code in [200, 201]:
print(f"[upload_image_bytes] Success - Status Code: {response.status_code}")
print(f"[upload_image_bytes] Public URL: {self._last_public_url}")
return True
else:
print(f"[upload_image_bytes] Failed - Status Code: {response.status_code}")
print(f"[upload_image_bytes] Response: {response.text}")
return False
# 사용 예시: # 사용 예시:
# import asyncio # import asyncio
# asyncio.run(upload_video_to_azure_blob()) #
# asyncio.run(upload_image_to_azure_blob()) # async def main():
# uploader = AzureBlobUploader(task_id="task-123")
#
# # 음악 업로드 -> {BASE_URL}/task-123/song/my_song.mp3
# await uploader.upload_music("my_song.mp3")
# print(uploader.public_url)
#
# # 영상 업로드 -> {BASE_URL}/task-123/video/my_video.mp4
# await uploader.upload_video("my_video.mp4")
# print(uploader.public_url)
#
# # 이미지 업로드 -> {BASE_URL}/task-123/image/my_image.png
# await uploader.upload_image("my_image.png")
# print(uploader.public_url)
#
# asyncio.run(main())

View File

@ -127,6 +127,21 @@ class CrawlerSettings(BaseSettings):
model_config = _base_config model_config = _base_config
class AzureBlobSettings(BaseSettings):
"""Azure Blob Storage 설정"""
AZURE_BLOB_SAS_TOKEN: str = Field(
default="",
description="Azure Blob Storage SAS 토큰",
)
AZURE_BLOB_BASE_URL: str = Field(
default="https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original",
description="Azure Blob Storage 기본 URL",
)
model_config = _base_config
prj_settings = ProjectSettings() prj_settings = ProjectSettings()
apikey_settings = APIKeySettings() apikey_settings = APIKeySettings()
db_settings = DatabaseSettings() db_settings = DatabaseSettings()
@ -134,3 +149,4 @@ security_settings = SecuritySettings()
notification_settings = NotificationSettings() notification_settings = NotificationSettings()
cors_settings = CORSSettings() cors_settings = CORSSettings()
crawler_settings = CrawlerSettings() crawler_settings = CrawlerSettings()
azure_blob_settings = AzureBlobSettings()

View File

@ -1,14 +1,18 @@
import requests
from pathlib import Path from pathlib import Path
import requests
SAS_TOKEN = "sp=racwdl&st=2025-12-01T00:13:29Z&se=2026-07-31T08:28:29Z&spr=https&sv=2024-11-04&sr=c&sig=7fE2ozVBPu3Gq43%2FZDxEYdEcPLDXyNVfTf16IBasmVQ%3D" SAS_TOKEN = "sp=racwdl&st=2025-12-01T00:13:29Z&se=2026-07-31T08:28:29Z&spr=https&sv=2024-11-04&sr=c&sig=7fE2ozVBPu3Gq43%2FZDxEYdEcPLDXyNVfTf16IBasmVQ%3D"
def upload_music_to_azure_blob(file_path = "스테이 머뭄_1.mp3", url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp3"): URL = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/"
def upload_music_to_azure_blob(
file_path="스테이 머뭄_1.mp3",
url="https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp3",
):
access_url = f"{url}?{SAS_TOKEN}" access_url = f"{url}?{SAS_TOKEN}"
headers = { headers = {"Content-Type": "audio/mpeg", "x-ms-blob-type": "BlockBlob"}
"Content-Type": "audio/mpeg",
"x-ms-blob-type": "BlockBlob"
}
with open(file_path, "rb") as file: with open(file_path, "rb") as file:
response = requests.put(access_url, data=file, headers=headers) response = requests.put(access_url, data=file, headers=headers)
if response.status_code in [200, 201]: if response.status_code in [200, 201]:
@ -17,12 +21,13 @@ def upload_music_to_azure_blob(file_path = "스테이 머뭄_1.mp3", url = "http
print(f"Failed Status Code: {response.status_code}") print(f"Failed Status Code: {response.status_code}")
print(f"Response: {response.text}") print(f"Response: {response.text}")
def upload_video_to_azure_blob(file_path = "스테이 머뭄.mp4", url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp4"):
def upload_video_to_azure_blob(
file_path="스테이 머뭄.mp4",
url="https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.mp4",
):
access_url = f"{url}?{SAS_TOKEN}" access_url = f"{url}?{SAS_TOKEN}"
headers = { headers = {"Content-Type": "video/mp4", "x-ms-blob-type": "BlockBlob"}
"Content-Type": "video/mp4",
"x-ms-blob-type": "BlockBlob"
}
with open(file_path, "rb") as file: with open(file_path, "rb") as file:
response = requests.put(access_url, data=file, headers=headers) response = requests.put(access_url, data=file, headers=headers)
@ -33,7 +38,10 @@ def upload_video_to_azure_blob(file_path = "스테이 머뭄.mp4", url = "https:
print(f"Response: {response.text}") print(f"Response: {response.text}")
def upload_image_to_azure_blob(file_path = "스테이 머뭄.png", url = "https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.png"): def upload_image_to_azure_blob(
file_path="스테이 머뭄.png",
url="https://ado2mediastoragepublic.blob.core.windows.net/ado2-media-public-access/ado2-media-original/dev-user-idx/dev-task-idx/test_my_mp3_should_now_exist.png",
):
access_url = f"{url}?{SAS_TOKEN}" access_url = f"{url}?{SAS_TOKEN}"
extension = Path(file_path).suffix.lower() extension = Path(file_path).suffix.lower()
content_types = { content_types = {
@ -42,13 +50,10 @@ def upload_image_to_azure_blob(file_path = "스테이 머뭄.png", url = "https:
".png": "image/png", ".png": "image/png",
".gif": "image/gif", ".gif": "image/gif",
".webp": "image/webp", ".webp": "image/webp",
".bmp": "image/bmp" ".bmp": "image/bmp",
} }
content_type = content_types.get(extension, "image/jpeg") content_type = content_types.get(extension, "image/jpeg")
headers = { headers = {"Content-Type": content_type, "x-ms-blob-type": "BlockBlob"}
"Content-Type": content_type,
"x-ms-blob-type": "BlockBlob"
}
with open(file_path, "rb") as file: with open(file_path, "rb") as file:
response = requests.put(access_url, data=file, headers=headers) response = requests.put(access_url, data=file, headers=headers)