o2o-castad-backend/app/home/api/routers/v1/home.py

825 lines
33 KiB
Python

import json
import logging
import traceback
from datetime import date
from pathlib import Path
from typing import Optional
from urllib.parse import unquote, urlparse
import aiofiles
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session, AsyncSessionLocal
from app.home.models import Image
from app.home.schemas.home_schema import (
CrawlingRequest,
CrawlingResponse,
ErrorResponse,
ImageUploadResponse,
ImageUploadResultItem,
ImageUrlItem,
MarketingAnalysis,
ProcessedInfo,
)
from app.utils.upload_blob_as_request import AzureBlobUploader
from app.utils.chatgpt_prompt import ChatgptService
from app.utils.common import generate_task_id
from app.utils.nvMapScraper import NvMapScraper, GraphQLException
from app.utils.prompts.prompts import marketing_prompt
# 로거 설정
logger = logging.getLogger(__name__)
MEDIA_ROOT = Path("media")
# 전국 시 이름 목록 (roadAddress에서 region 추출용)
# fmt: off
KOREAN_CITIES = [
# 특별시/광역시
"서울시", "부산시", "대구시", "인천시", "광주시", "대전시", "울산시", "세종시",
# 경기도
"수원시", "성남시", "고양시", "용인시", "부천시", "안산시", "안양시", "남양주시",
"화성시", "평택시", "의정부시", "시흥시", "파주시", "광명시", "김포시", "군포시",
"광주시", "이천시", "양주시", "오산시", "구리시", "안성시", "포천시", "의왕시",
"하남시", "여주시", "동두천시", "과천시",
# 강원도
"춘천시", "원주시", "강릉시", "동해시", "태백시", "속초시", "삼척시",
# 충청북도
"청주시", "충주시", "제천시",
# 충청남도
"천안시", "공주시", "보령시", "아산시", "서산시", "논산시", "계룡시", "당진시",
# 전라북도
"전주시", "군산시", "익산시", "정읍시", "남원시", "김제시",
# 전라남도
"목포시", "여수시", "순천시", "나주시", "광양시",
# 경상북도
"포항시", "경주시", "김천시", "안동시", "구미시", "영주시", "영천시", "상주시", "문경시", "경산시",
# 경상남도
"창원시", "진주시", "통영시", "사천시", "김해시", "밀양시", "거제시", "양산시",
# 제주도
"제주시", "서귀포시",
]
# fmt: on
router = APIRouter()
def _extract_region_from_address(road_address: str | None) -> str:
"""roadAddress에서 시 이름 추출"""
if not road_address:
return ""
for city in KOREAN_CITIES:
if city in road_address:
return city
return ""
@router.post(
"/crawling",
summary="네이버 지도 크롤링",
description="""
네이버 지도 장소 URL을 입력받아 이미지 목록과 기본 정보를 크롤링합니다.
## 요청 필드
- **url**: 네이버 지도 장소 URL (필수)
## 반환 정보
- **image_list**: 장소 이미지 URL 목록
- **image_count**: 이미지 개수
- **processed_info**: 가공된 장소 정보 (customer_name, region, detail_region_info)
""",
response_model=CrawlingResponse,
response_description="크롤링 결과",
responses={
200: {"description": "크롤링 성공", "model": CrawlingResponse},
400: {
"description": "잘못된 URL",
"model": ErrorResponse,
},
502: {
"description": "크롤링 실패",
"model": ErrorResponse,
},
},
tags=["crawling"],
)
async def crawling(request_body: CrawlingRequest):
"""네이버 지도 장소 크롤링"""
import time
request_start = time.perf_counter()
logger.info(f"[crawling] START - url: {request_body.url[:80]}...")
print(f"[crawling] ========== START ==========")
print(f"[crawling] URL: {request_body.url[:80]}...")
# ========== Step 1: 네이버 지도 크롤링 ==========
step1_start = time.perf_counter()
print(f"[crawling] Step 1: 네이버 지도 크롤링 시작...")
try:
scraper = NvMapScraper(request_body.url)
await scraper.scrap()
except GraphQLException as e:
step1_elapsed = (time.perf_counter() - step1_start) * 1000
logger.error(f"[crawling] Step 1 FAILED - GraphQL 크롤링 실패: {e} ({step1_elapsed:.1f}ms)")
print(f"[crawling] Step 1 FAILED - {e} ({step1_elapsed:.1f}ms)")
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"네이버 지도 크롤링에 실패했습니다: {e}",
)
except Exception as e:
step1_elapsed = (time.perf_counter() - step1_start) * 1000
logger.error(f"[crawling] Step 1 FAILED - 크롤링 중 예기치 않은 오류: {e} ({step1_elapsed:.1f}ms)")
print(f"[crawling] Step 1 FAILED - {e} ({step1_elapsed:.1f}ms)")
traceback.print_exc()
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="네이버 지도 크롤링 중 오류가 발생했습니다.",
)
step1_elapsed = (time.perf_counter() - step1_start) * 1000
image_count = len(scraper.image_link_list) if scraper.image_link_list else 0
logger.info(f"[crawling] Step 1 완료 - 이미지 {image_count}개 ({step1_elapsed:.1f}ms)")
print(f"[crawling] Step 1 완료 - 이미지 {image_count}개 ({step1_elapsed:.1f}ms)")
# ========== Step 2: 정보 가공 ==========
step2_start = time.perf_counter()
print(f"[crawling] Step 2: 정보 가공 시작...")
processed_info = None
marketing_analysis = None
if scraper.base_info:
road_address = scraper.base_info.get("roadAddress", "")
customer_name = scraper.base_info.get("name", "")
region = _extract_region_from_address(road_address)
processed_info = ProcessedInfo(
customer_name=customer_name,
region=region,
detail_region_info=road_address or "",
)
step2_elapsed = (time.perf_counter() - step2_start) * 1000
logger.info(f"[crawling] Step 2 완료 - {customer_name}, {region} ({step2_elapsed:.1f}ms)")
print(f"[crawling] Step 2 완료 - {customer_name}, {region} ({step2_elapsed:.1f}ms)")
# ========== Step 3: ChatGPT 마케팅 분석 ==========
step3_start = time.perf_counter()
print(f"[crawling] Step 3: ChatGPT 마케팅 분석 시작...")
try:
# Step 3-1: ChatGPT 서비스 초기화
step3_1_start = time.perf_counter()
chatgpt_service = ChatgptService()
step3_1_elapsed = (time.perf_counter() - step3_1_start) * 1000
print(f"[crawling] Step 3-1: 서비스 초기화 완료 ({step3_1_elapsed:.1f}ms)")
# Step 3-2: 프롬프트 생성
# step3_2_start = time.perf_counter()
input_marketing_data = {
"customer_name" : customer_name,
"region" : region,
"detail_region_info" : road_address or ""
}
# prompt = chatgpt_service.build_market_analysis_prompt()
# prompt1 = marketing_prompt.build_prompt(input_marketing_data)
# step3_2_elapsed = (time.perf_counter() - step3_2_start) * 1000
# print(f"[crawling] Step 3-2: 프롬프트 생성 완료 - ({step3_2_elapsed:.1f}ms)")
# Step 3-3: GPT API 호출
step3_3_start = time.perf_counter()
structured_report = await chatgpt_service.generate_structured_output(marketing_prompt, input_marketing_data)
step3_3_elapsed = (time.perf_counter() - step3_3_start) * 1000
logger.info(f"[crawling] Step 3-3: GPT API 호출 완료 - ({step3_3_elapsed:.1f}ms)")
print(f"[crawling] Step 3-3: GPT API 호출 완료 - ({step3_3_elapsed:.1f}ms)")
# Step 3-4: 응답 파싱 (크롤링에서 가져온 facility_info 전달)
step3_4_start = time.perf_counter()
print(f"[crawling] Step 3-4: 응답 파싱 시작 - facility_info: {scraper.facility_info}")
# 요약 Deprecated / 20250115 / Selling points를 첫 prompt에서 추출 중
# parsed = await chatgpt_service.parse_marketing_analysis(
# structured_report, facility_info=scraper.facility_info
# )
# marketing_analysis = MarketingAnalysis(**parsed)
marketing_analysis = MarketingAnalysis(
report=structured_report["report"],
tags=structured_report["tags"],
facilities = list([sp['keywords'] for sp in structured_report["selling_points"]])# [json.dumps(structured_report["selling_points"])] # 나중에 Selling Points로 변수와 데이터구조 변경할 것
)
# Selling Points 구조
# print(sp['category'])
# print(sp['keywords'])
# print(sp['description'])
step3_4_elapsed = (time.perf_counter() - step3_4_start) * 1000
print(f"[crawling] Step 3-4: 응답 파싱 완료 ({step3_4_elapsed:.1f}ms)")
step3_elapsed = (time.perf_counter() - step3_start) * 1000
logger.info(f"[crawling] Step 3 완료 - 마케팅 분석 성공 ({step3_elapsed:.1f}ms)")
print(f"[crawling] Step 3 완료 - 마케팅 분석 성공 ({step3_elapsed:.1f}ms)")
except Exception as e:
step3_elapsed = (time.perf_counter() - step3_start) * 1000
logger.error(f"[crawling] Step 3 FAILED - GPT 마케팅 분석 중 오류: {e} ({step3_elapsed:.1f}ms)")
print(f"[crawling] Step 3 FAILED - {e} ({step3_elapsed:.1f}ms)")
traceback.print_exc()
# GPT 실패 시에도 크롤링 결과는 반환
marketing_analysis = None
else:
step2_elapsed = (time.perf_counter() - step2_start) * 1000
logger.warning(f"[crawling] Step 2 - base_info 없음 ({step2_elapsed:.1f}ms)")
print(f"[crawling] Step 2 - base_info 없음, 마케팅 분석 스킵 ({step2_elapsed:.1f}ms)")
# ========== 완료 ==========
total_elapsed = (time.perf_counter() - request_start) * 1000
logger.info(f"[crawling] COMPLETE - 총 소요시간: {total_elapsed:.1f}ms")
print(f"[crawling] ========== COMPLETE ==========")
print(f"[crawling] 총 소요시간: {total_elapsed:.1f}ms")
print(f"[crawling] - Step 1 (크롤링): {step1_elapsed:.1f}ms")
if scraper.base_info:
print(f"[crawling] - Step 2 (정보가공): {step2_elapsed:.1f}ms")
if 'step3_elapsed' in locals():
print(f"[crawling] - Step 3 (GPT 분석): {step3_elapsed:.1f}ms")
if 'step3_3_elapsed' in locals():
print(f"[crawling] - GPT API 호출: {step3_3_elapsed:.1f}ms")
return {
"image_list": scraper.image_link_list,
"image_count": len(scraper.image_link_list) if scraper.image_link_list else 0,
"processed_info": processed_info,
"marketing_analysis": marketing_analysis
}
def _extract_image_name(url: str, index: int) -> str:
"""URL에서 이미지 이름 추출 또는 기본 이름 생성"""
try:
path = urlparse(url).path
filename = path.split("/")[-1] if path else ""
if filename:
return unquote(filename)
except Exception:
pass
return f"image_{index + 1:03d}"
ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif"}
def _is_valid_image_extension(filename: str | None) -> bool:
"""파일명의 확장자가 유효한 이미지 확장자인지 확인"""
if not filename:
return False
ext = Path(filename).suffix.lower()
return ext in ALLOWED_IMAGE_EXTENSIONS
def _get_file_extension(filename: str) -> str:
"""파일명에서 확장자 추출 (소문자)"""
return Path(filename).suffix.lower()
async def _save_upload_file(file: UploadFile, save_path: Path) -> None:
"""업로드 파일을 지정된 경로에 저장"""
save_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(save_path, "wb") as f:
content = await file.read()
await f.write(content)
IMAGES_JSON_EXAMPLE = """[
{"url": "https://naverbooking-phinf.pstatic.net/20240514_189/1715688030436xT14o_JPEG/1.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_48/1715688030574wTtQd_JPEG/2.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_92/17156880307484bvpH_JPEG/3.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_7/1715688031000y8Y5q_JPEG/4.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_259/17156880311809wCnY_JPEG/5.jpg", "name": "외관"}
]"""
@router.post(
"/image/upload/server",
include_in_schema=False,
summary="이미지 업로드 (로컬 서버)",
description="""
이미지를 로컬 서버(media 폴더)에 업로드하고 새로운 task_id를 생성합니다.
## 요청 방식
multipart/form-data 형식으로 전송합니다.
## 요청 필드
- **images_json**: 외부 이미지 URL 목록 (JSON 문자열, 선택)
- **files**: 이미지 바이너리 파일 목록 (선택)
**주의**: images_json 또는 files 중 최소 하나는 반드시 전달해야 합니다.
## 지원 이미지 확장자
jpg, jpeg, png, webp, heic, heif
## images_json 예시
```json
[
{"url": "https://naverbooking-phinf.pstatic.net/20240514_189/1715688030436xT14o_JPEG/1.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_48/1715688030574wTtQd_JPEG/2.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_92/17156880307484bvpH_JPEG/3.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_7/1715688031000y8Y5q_JPEG/4.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_259/17156880311809wCnY_JPEG/5.jpg", "name": "외관"}
]
```
## 바이너리 파일 업로드 테스트 방법
### 1. Swagger UI에서 테스트
1. 이 엔드포인트의 "Try it out" 버튼 클릭
2. task_id 입력 (예: test-task-001)
3. files 항목에서 "Add item" 클릭하여 로컬 이미지 파일 선택
4. (선택) images_json에 URL 목록 JSON 입력
5. "Execute" 버튼 클릭
### 2. cURL로 테스트
```bash
# 바이너리 파일만 업로드
curl -X POST "http://localhost:8000/image/upload/server/test-task-001" \\
-F "files=@/path/to/image1.jpg" \\
-F "files=@/path/to/image2.png"
# URL + 바이너리 파일 동시 업로드
curl -X POST "http://localhost:8000/image/upload/server/test-task-001" \\
-F 'images_json=[{"url":"https://example.com/image.jpg"}]' \\
-F "files=@/path/to/local_image.jpg"
```
### 3. Python requests로 테스트
```python
import requests
url = "http://localhost:8000/image/upload/server/test-task-001"
files = [
("files", ("image1.jpg", open("image1.jpg", "rb"), "image/jpeg")),
("files", ("image2.png", open("image2.png", "rb"), "image/png")),
]
data = {
"images_json": '[{"url": "https://example.com/image.jpg"}]'
}
response = requests.post(url, files=files, data=data)
print(response.json())
```
## 반환 정보
- **task_id**: 작업 고유 식별자
- **total_count**: 총 업로드된 이미지 개수
- **url_count**: URL로 등록된 이미지 개수 (Image 테이블에 외부 URL 그대로 저장)
- **file_count**: 파일로 업로드된 이미지 개수 (media 폴더에 저장)
- **saved_count**: Image 테이블에 저장된 row 수
- **images**: 업로드된 이미지 목록
- **source**: "url" (외부 URL) 또는 "file" (로컬 서버 저장)
## 저장 경로
- 바이너리 파일: /media/image/{날짜}/{uuid7}/{파일명}
- URL 이미지: 외부 URL 그대로 Image 테이블에 저장
## 반환 정보
- **task_id**: 새로 생성된 작업 고유 식별자
- **image_urls**: Image 테이블에 저장된 현재 task_id의 이미지 URL 목록
""",
response_model=ImageUploadResponse,
responses={
200: {"description": "이미지 업로드 성공"},
400: {"description": "이미지가 제공되지 않음", "model": ErrorResponse},
},
tags=["image"],
)
async def upload_images(
images_json: Optional[str] = Form(
default=None,
description="외부 이미지 URL 목록 (JSON 문자열)",
examples=[IMAGES_JSON_EXAMPLE],
),
files: Optional[list[UploadFile]] = File(
default=None, description="이미지 바이너리 파일 목록"
),
session: AsyncSession = Depends(get_session),
) -> ImageUploadResponse:
"""이미지 업로드 (URL + 바이너리 파일)"""
# task_id 생성
task_id = await generate_task_id()
# 1. 진입 검증: images_json 또는 files 중 하나는 반드시 있어야 함
has_images_json = images_json is not None and images_json.strip() != ""
has_files = files is not None and len(files) > 0
if not has_images_json and not has_files:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="images_json 또는 files 중 하나는 반드시 제공해야 합니다.",
)
# 2. images_json 파싱 (있는 경우만)
url_images: list[ImageUrlItem] = []
if has_images_json:
try:
parsed = json.loads(images_json)
if isinstance(parsed, list):
url_images = [ImageUrlItem(**item) for item in parsed if item]
except (json.JSONDecodeError, TypeError, ValueError) as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"images_json 파싱 오류: {str(e)}",
)
# 3. 유효한 파일만 필터링 (빈 파일, 유효한 이미지 확장자가 아닌 경우 제외)
valid_files: list[UploadFile] = []
skipped_files: list[str] = []
if has_files and files:
for f in files:
is_valid_ext = _is_valid_image_extension(f.filename)
is_not_empty = (
f.size is None or f.size > 0
) # size가 None이면 아직 읽지 않은 것
is_real_file = (
f.filename and f.filename != "filename"
) # Swagger 빈 파일 체크
if f and is_real_file and is_valid_ext and is_not_empty:
valid_files.append(f)
else:
skipped_files.append(f.filename or "unknown")
# 유효한 데이터가 하나도 없으면 에러
if not url_images and not valid_files:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"유효한 이미지가 없습니다. 지원 확장자: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}. 건너뛴 파일: {skipped_files}",
)
result_images: list[ImageUploadResultItem] = []
img_order = 0
# 1. URL 이미지 저장
for url_item in url_images:
img_name = url_item.name or _extract_image_name(url_item.url, img_order)
image = Image(
task_id=task_id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
)
session.add(image)
await session.flush() # ID 생성을 위해 flush
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
source="url",
)
)
img_order += 1
# 2. 바이너리 파일을 media에 저장
if valid_files:
today = date.today().strftime("%Y-%m-%d")
# 한 번의 요청에서 업로드된 모든 이미지는 같은 폴더에 저장
batch_uuid = await generate_task_id()
upload_dir = MEDIA_ROOT / "image" / today / batch_uuid
upload_dir.mkdir(parents=True, exist_ok=True)
for file in valid_files:
# 파일명: 원본 파일명 사용 (중복 방지를 위해 순서 추가)
original_name = file.filename or "image"
ext = _get_file_extension(file.filename) # type: ignore[arg-type]
# 파일명에서 확장자 제거 후 순서 추가
name_without_ext = (
original_name.rsplit(".", 1)[0]
if "." in original_name
else original_name
)
filename = f"{name_without_ext}_{img_order:03d}{ext}"
save_path = upload_dir / filename
# media에 파일 저장
await _save_upload_file(file, save_path)
# media 기준 URL 생성
img_url = f"/media/image/{today}/{batch_uuid}/{filename}"
img_name = file.filename or filename
image = Image(
task_id=task_id,
img_name=img_name,
img_url=img_url, # Media URL을 DB에 저장
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=img_url,
img_order=img_order,
source="file",
)
)
img_order += 1
saved_count = len(result_images)
await session.commit()
# Image 테이블에서 현재 task_id의 이미지 URL 목록 조회
image_urls = [img.img_url for img in result_images]
return ImageUploadResponse(
task_id=task_id,
total_count=len(result_images),
url_count=len(url_images),
file_count=len(valid_files),
saved_count=saved_count,
images=result_images,
image_urls=image_urls,
)
@router.post(
"/image/upload/blob",
summary="이미지 업로드 (Azure Blob Storage)",
description="""
이미지를 Azure Blob Storage에 업로드하고 새로운 task_id를 생성합니다.
바이너리 파일은 로컬 서버에 저장하지 않고 Azure Blob에 직접 업로드됩니다.
## 요청 방식
multipart/form-data 형식으로 전송합니다.
## 요청 필드
- **images_json**: 외부 이미지 URL 목록 (JSON 문자열, 선택)
- **files**: 이미지 바이너리 파일 목록 (선택)
**주의**: images_json 또는 files 중 최소 하나는 반드시 전달해야 합니다.
## 지원 이미지 확장자
jpg, jpeg, png, webp, heic, heif
## images_json 예시
```json
[
{"url": "https://example.com/image1.jpg"},
{"url": "https://example.com/image2.jpg", "name": "외관"}
]
```
## 바이너리 파일 업로드 테스트 방법
### cURL로 테스트
```bash
# 바이너리 파일만 업로드
curl -X POST "http://localhost:8000/image/upload/blob" \\
-F "files=@/path/to/image1.jpg" \\
-F "files=@/path/to/image2.png"
# URL + 바이너리 파일 동시 업로드
curl -X POST "http://localhost:8000/image/upload/blob" \\
-F 'images_json=[{"url":"https://example.com/image.jpg"}]' \\
-F "files=@/path/to/local_image.jpg"
```
## 반환 정보
- **task_id**: 새로 생성된 작업 고유 식별자
- **total_count**: 총 업로드된 이미지 개수
- **url_count**: URL로 등록된 이미지 개수 (Image 테이블에 외부 URL 그대로 저장)
- **file_count**: 파일로 업로드된 이미지 개수 (Azure Blob Storage에 저장)
- **saved_count**: Image 테이블에 저장된 row 수
- **images**: 업로드된 이미지 목록
- **source**: "url" (외부 URL) 또는 "blob" (Azure Blob Storage)
- **image_urls**: Image 테이블에 저장된 현재 task_id의 이미지 URL 목록
## 저장 경로
- 바이너리 파일: Azure Blob Storage ({BASE_URL}/{task_id}/image/{파일명})
- URL 이미지: 외부 URL 그대로 Image 테이블에 저장
""",
response_model=ImageUploadResponse,
responses={
200: {"description": "이미지 업로드 성공"},
400: {"description": "이미지가 제공되지 않음", "model": ErrorResponse},
},
tags=["image"],
)
async def upload_images_blob(
images_json: Optional[str] = Form(
default=None,
description="외부 이미지 URL 목록 (JSON 문자열)",
examples=[IMAGES_JSON_EXAMPLE],
),
files: Optional[list[UploadFile]] = File(
default=None, description="이미지 바이너리 파일 목록"
),
) -> ImageUploadResponse:
"""이미지 업로드 (URL + Azure Blob Storage)
3단계로 분리하여 세션 점유 시간 최소화:
- Stage 1: 입력 검증 및 파일 데이터 준비 (세션 없음)
- Stage 2: Azure Blob 업로드 (세션 없음)
- Stage 3: DB 저장 (새 세션으로 빠르게 처리)
"""
import time
request_start = time.perf_counter()
# task_id 생성
task_id = await generate_task_id()
print(f"[upload_images_blob] START - task_id: {task_id}")
# ========== Stage 1: 입력 검증 및 파일 데이터 준비 (세션 없음) ==========
has_images_json = images_json is not None and images_json.strip() != ""
has_files = files is not None and len(files) > 0
if not has_images_json and not has_files:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="images_json 또는 files 중 하나는 반드시 제공해야 합니다.",
)
# images_json 파싱
url_images: list[ImageUrlItem] = []
if has_images_json and images_json:
try:
parsed = json.loads(images_json)
if isinstance(parsed, list):
url_images = [ImageUrlItem(**item) for item in parsed if item]
except (json.JSONDecodeError, TypeError, ValueError) as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"images_json 파싱 오류: {str(e)}",
)
# 유효한 파일만 필터링 및 파일 내용 미리 읽기
valid_files_data: list[tuple[str, str, bytes]] = [] # (original_name, ext, content)
skipped_files: list[str] = []
if has_files and files:
for f in files:
is_valid_ext = _is_valid_image_extension(f.filename)
is_not_empty = f.size is None or f.size > 0
is_real_file = f.filename and f.filename != "filename"
if f and is_real_file and is_valid_ext and is_not_empty:
# 파일 내용을 미리 읽어둠
content = await f.read()
ext = _get_file_extension(f.filename) # type: ignore[arg-type]
valid_files_data.append((f.filename or "image", ext, content))
else:
skipped_files.append(f.filename or "unknown")
if not url_images and not valid_files_data:
detail = (
f"유효한 이미지가 없습니다. "
f"지원 확장자: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}. "
f"건너뛴 파일: {skipped_files}"
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=detail,
)
stage1_time = time.perf_counter()
print(f"[upload_images_blob] Stage 1 done - urls: {len(url_images)}, "
f"files: {len(valid_files_data)}, "
f"elapsed: {(stage1_time - request_start)*1000:.1f}ms")
# ========== Stage 2: Azure Blob 업로드 (세션 없음) ==========
# 업로드 결과를 저장할 리스트 (나중에 DB에 저장)
blob_upload_results: list[tuple[str, str]] = [] # (img_name, blob_url)
img_order = len(url_images) # URL 이미지 다음 순서부터 시작
if valid_files_data:
uploader = AzureBlobUploader(task_id=task_id)
total_files = len(valid_files_data)
for idx, (original_name, ext, file_content) in enumerate(valid_files_data):
name_without_ext = (
original_name.rsplit(".", 1)[0]
if "." in original_name
else original_name
)
filename = f"{name_without_ext}_{img_order:03d}{ext}"
print(f"[upload_images_blob] Uploading file {idx+1}/{total_files}: "
f"{filename} ({len(file_content)} bytes)")
# Azure Blob Storage에 직접 업로드
upload_success = await uploader.upload_image_bytes(file_content, filename)
if upload_success:
blob_url = uploader.public_url
blob_upload_results.append((original_name, blob_url))
img_order += 1
print(f"[upload_images_blob] File {idx+1}/{total_files} SUCCESS")
else:
skipped_files.append(filename)
print(f"[upload_images_blob] File {idx+1}/{total_files} FAILED")
stage2_time = time.perf_counter()
print(f"[upload_images_blob] Stage 2 done - blob uploads: "
f"{len(blob_upload_results)}, skipped: {len(skipped_files)}, "
f"elapsed: {(stage2_time - stage1_time)*1000:.1f}ms")
# ========== Stage 3: DB 저장 (새 세션으로 빠르게 처리) ==========
print("[upload_images_blob] Stage 3 starting - DB save...")
result_images: list[ImageUploadResultItem] = []
img_order = 0
try:
async with AsyncSessionLocal() as session:
# URL 이미지 저장
for url_item in url_images:
img_name = (
url_item.name or _extract_image_name(url_item.url, img_order)
)
image = Image(
task_id=task_id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
source="url",
)
)
img_order += 1
# Blob 업로드 결과 저장
for img_name, blob_url in blob_upload_results:
image = Image(
task_id=task_id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
source="blob",
)
)
img_order += 1
await session.commit()
stage3_time = time.perf_counter()
print(f"[upload_images_blob] Stage 3 done - "
f"saved: {len(result_images)}, "
f"elapsed: {(stage3_time - stage2_time)*1000:.1f}ms")
except SQLAlchemyError as e:
logger.error(f"[upload_images_blob] DB Error - task_id: {task_id}, error: {e}")
traceback.print_exc()
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="이미지 저장 중 데이터베이스 오류가 발생했습니다.",
)
except Exception as e:
logger.error(f"[upload_images_blob] Stage 3 EXCEPTION - "
f"task_id: {task_id}, error: {type(e).__name__}: {e}")
traceback.print_exc()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="이미지 업로드 중 오류가 발생했습니다.",
)
saved_count = len(result_images)
image_urls = [img.img_url for img in result_images]
total_time = time.perf_counter() - request_start
print(f"[upload_images_blob] SUCCESS - task_id: {task_id}, "
f"total: {saved_count}, total_time: {total_time*1000:.1f}ms")
return ImageUploadResponse(
task_id=task_id,
total_count=len(result_images),
url_count=len(url_images),
file_count=len(blob_upload_results),
saved_count=saved_count,
images=result_images,
image_urls=image_urls,
)