o2o-castad-backend/app/home/api/routers/v1/home.py

876 lines
33 KiB
Python

import json
import time
from datetime import date
from pathlib import Path
from typing import Optional
from urllib.parse import unquote, urlparse
import aiofiles
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import func, select
from app.database.session import get_session, AsyncSessionLocal
from app.home.models import Image, MarketingIntel, ImageTag
from app.user.dependencies.auth import get_current_user
from app.user.models import User
from app.home.schemas.home_schema import (
AutoCompleteRequest,
AccommodationSearchItem,
AccommodationSearchResponse,
CrawlingRequest,
CrawlingResponse,
ErrorResponse,
ImageUploadResponse,
ImageUploadResultItem,
ImageUrlItem,
ManualMarketingRequest,
MarketingAnalysisResponse,
ProcessedInfo,
# MarketingAnalysis,
)
from app.home.services.naver_search import naver_search_client
from app.utils.upload_blob_as_request import AzureBlobUploader
from app.utils.prompts.chatgpt_prompt import ChatgptService, ChatGPTResponseError
from app.utils.common import generate_task_id
from app.utils.logger import get_logger
from app.utils.nvMapScraper import NvMapScraper, GraphQLException, URLNotFoundException
from app.utils.nvMapPwScraper import NvMapPwScraper
from app.utils.prompts.prompts import marketing_prompt
from app.utils.autotag import autotag_images
from config import MEDIA_ROOT
# 로거 설정
logger = get_logger("home")
# 전국 시/군 이름 목록 (roadAddress에서 region 추출용)
# fmt: off
KOREAN_CITIES = [
# 특별시/광역시
"서울시", "부산시", "대구시", "인천시", "광주시", "대전시", "울산시", "세종시",
# 경기도
"수원시", "성남시", "고양시", "용인시", "부천시", "안산시", "안양시", "남양주시",
"화성시", "평택시", "의정부시", "시흥시", "파주시", "김포시", "광주시", "광명시",
"군포시", "하남시", "오산시", "이천시", "안성시", "구리시", "양주시", "포천시",
"여주시", "동두천시", "과천시", "가평군", "양평군", "연천군",
# 강원특별자치도
"춘천시", "원주시", "강릉시", "동해시", "태백시", "속초시", "삼척시",
"홍천군", "횡성군", "영월군", "평창군", "정선군", "철원군", "화천군",
"양구군", "인제군", "고성군", "양양군",
# 충청북도
"청주시", "충주시", "제천시",
"보은군", "옥천군", "영동군", "증평군", "진천군", "괴산군", "음성군", "단양군",
# 충청남도
"천안시", "공주시", "보령시", "아산시", "서산시", "논산시", "계룡시", "당진시",
"금산군", "부여군", "서천군", "청양군", "홍성군", "예산군", "태안군",
# 전북특별자치도
"전주시", "군산시", "익산시", "정읍시", "남원시", "김제시",
"완주군", "진안군", "무주군", "장수군", "임실군", "순창군", "고창군", "부안군",
# 전라남도
"목포시", "여수시", "순천시", "나주시", "광양시",
"담양군", "곡성군", "구례군", "고흥군", "보성군", "화순군", "장흥군", "강진군",
"해남군", "영암군", "무안군", "함평군", "영광군", "장성군", "완도군", "진도군", "신안군",
# 경상북도
"포항시", "경주시", "김천시", "안동시", "구미시", "영주시", "영천시", "상주시", "문경시", "경산시",
"의성군", "청송군", "영양군", "영덕군", "청도군", "고령군", "성주군", "칠곡군",
"예천군", "봉화군", "울진군", "울릉군",
# 경상남도
"창원시", "진주시", "통영시", "사천시", "김해시", "밀양시", "거제시", "양산시",
"의령군", "함안군", "창녕군", "고성군", "남해군", "하동군", "산청군", "함양군", "거창군", "합천군",
# 제주특별자치도
"제주시", "서귀포시",
]
# fmt: on
# router = APIRouter(tags=["Home"])
router = APIRouter()
@router.get(
"/search/accommodation",
summary="숙박/펜션 자동완성 검색",
description="""
네이버 지역 검색 API를 이용한 숙박/펜션 자동완성 검색입니다.
## 요청 파라미터
- **query**: 검색어 (필수)
## 반환 정보
- **query**: 검색어
- **count**: 검색 결과 수 (최대 10개)
- **items**: 검색 결과 목록
- **title**: 숙소명 (HTML 태그 포함 가능)
- **address**: 지번 주소
- **roadAddress**: 도로명 주소
""",
response_model=AccommodationSearchResponse,
responses={
200: {"description": "검색 성공", "model": AccommodationSearchResponse},
},
tags=["Search"],
)
async def search_accommodation(
query: str,
) -> AccommodationSearchResponse:
"""숙박/펜션 자동완성 검색"""
results = await naver_search_client.search_accommodation(
query=query,
display=10,
)
items = [AccommodationSearchItem(**item) for item in results]
return AccommodationSearchResponse(
query=query,
count=len(items),
items=items,
)
METRO_CITY_MAP = {
"서울": "서울시", "부산": "부산시", "대구": "대구시",
"인천": "인천시", "광주": "광주시", "대전": "대전시",
"울산": "울산시", "세종": "세종시",
}
def _extract_region_from_address(road_address: str | None) -> str:
"""roadAddress에서 시/군 이름 추출
매칭 우선순위:
1. KOREAN_CITIES 직접 매칭 (시/군 접미사 포함)
2. KOREAN_CITIES 접미사 생략 매칭
3. 주소 두 번째 토큰이 시/군으로 끝나는 경우 (예: "전북 군산시 ...")
4. 주소 두 번째 토큰이 구/동인 경우 → 첫 번째 토큰으로 광역시 매핑 (예: "서울 강남구 ...")
"""
if not road_address:
return ""
for city in KOREAN_CITIES:
if city in road_address:
return city
if city[:-1] in road_address:
return city
tokens = road_address.split()
if len(tokens) >= 2:
second = tokens[1]
if second.endswith("") or second.endswith(""):
return second
if second.endswith("") or second.endswith(""):
return METRO_CITY_MAP.get(tokens[0], "")
return ""
@router.post(
"/crawling",
summary="네이버 지도 크롤링",
description="""
네이버 지도 장소 URL을 입력받아 이미지 목록과 기본 정보를 크롤링합니다.
## 요청 필드
- **url**: 네이버 지도 장소 URL (필수)
## 반환 정보
- **image_list**: 장소 이미지 URL 목록
- **image_count**: 이미지 개수
- **processed_info**: 가공된 장소 정보 (customer_name, region, detail_region_info)
""",
response_model=CrawlingResponse,
response_description="크롤링 결과",
responses={
200: {"description": "크롤링 성공", "model": CrawlingResponse},
400: {
"description": "잘못된 URL",
"model": ErrorResponse,
},
502: {
"description": "크롤링 실패",
"model": ErrorResponse,
},
},
tags=["Crawling"],
)
async def crawling(
request_body: CrawlingRequest,
session: AsyncSession = Depends(get_session)):
return await _crawling_logic(request_body.url, session)
@router.post(
"/autocomplete",
summary="네이버 자동완성 크롤링",
description="""
네이버 검색 API 정보를 활용하여 Place ID를 추출한 뒤 자동으로 크롤링합니다.
## 요청 필드
- **title**: 네이버 검색 API Place 결과물 title (필수)
- **address**: 네이버 검색 API Place 결과물 지번주소 (필수)
- **roadAddress**:네이버 검색 API Place 결과물 도로명주소
## 반환 정보
- **image_list**: 장소 이미지 URL 목록
- **image_count**: 이미지 개수
- **processed_info**: 가공된 장소 정보 (customer_name, region, detail_region_info)
""",
response_model=CrawlingResponse,
response_description="크롤링 결과",
responses={
200: {"description": "크롤링 성공", "model": CrawlingResponse},
400: {
"description": "잘못된 URL",
"model": ErrorResponse,
},
502: {
"description": "크롤링 실패",
"model": ErrorResponse,
},
},
tags=["Crawling"],
)
async def autocomplete_crawling(
request_body: AutoCompleteRequest,
session: AsyncSession = Depends(get_session)):
url = await _autocomplete_logic(request_body.model_dump())
return await _crawling_logic(url, session)
async def _crawling_logic(
url:str,
session: AsyncSession):
request_start = time.perf_counter()
logger.info("[crawling] ========== START ==========")
logger.info(f"[crawling] URL: {url[:80]}...")
# ========== Step 1: 네이버 지도 크롤링 ==========
step1_start = time.perf_counter()
logger.info("[crawling] Step 1: 네이버 지도 크롤링 시작...")
try:
scraper = NvMapScraper(url)
await scraper.scrap()
except GraphQLException as e:
step1_elapsed = (time.perf_counter() - step1_start) * 1000
logger.error(
f"[crawling] Step 1 FAILED - GraphQL 크롤링 실패: {e} ({step1_elapsed:.1f}ms)"
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"네이버 지도 크롤링에 실패했습니다: {e}",
)
except URLNotFoundException as e:
step1_elapsed = (time.perf_counter() - step1_start) * 1000
logger.error(
f"[crawling] Step 1 FAILED - 크롤링 실패: {e} ({step1_elapsed:.1f}ms)"
)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Place ID를 확인할 수 없습니다. URL을 확인하세요. : {e}",
)
except Exception as e:
step1_elapsed = (time.perf_counter() - step1_start) * 1000
logger.error(
f"[crawling] Step 1 FAILED - 크롤링 중 예기치 않은 오류: {e} ({step1_elapsed:.1f}ms)"
)
logger.exception("[crawling] Step 1 상세 오류:")
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="네이버 지도 크롤링 중 오류가 발생했습니다.",
)
step1_elapsed = (time.perf_counter() - step1_start) * 1000
image_count = len(scraper.image_link_list) if scraper.image_link_list else 0
logger.info(
f"[crawling] Step 1 완료 - 이미지 {image_count}개 ({step1_elapsed:.1f}ms)"
)
# ========== Step 2: 정보 가공 ==========
step2_start = time.perf_counter()
logger.info("[crawling] Step 2: 정보 가공 시작...")
processed_info = None
marketing_analysis = None
if scraper.base_info:
road_address = scraper.base_info.get("roadAddress", "") or scraper.base_info.get("address", "")
customer_name = scraper.base_info.get("name", "")
region = _extract_region_from_address(road_address)
processed_info = ProcessedInfo(
customer_name=customer_name,
region=region,
detail_region_info=road_address or "",
)
step2_elapsed = (time.perf_counter() - step2_start) * 1000
logger.info(
f"[crawling] Step 2 완료 - {customer_name}, {region} ({step2_elapsed:.1f}ms)"
)
# ========== Step 3: ChatGPT 마케팅 분석 ==========
step3_start = time.perf_counter()
logger.info("[crawling] Step 3: ChatGPT 마케팅 분석 시작...")
try:
marketing_analysis, m_id = await _run_marketing_analysis(
customer_name=customer_name,
region=region,
detail_region_info=road_address or "",
place_id=scraper.place_id,
session=session,
)
step3_elapsed = (time.perf_counter() - step3_start) * 1000
logger.info(
f"[crawling] Step 3 완료 - 마케팅 분석 성공 ({step3_elapsed:.1f}ms)"
)
except ChatGPTResponseError as e:
step3_elapsed = (time.perf_counter() - step3_start) * 1000
logger.error(
f"[crawling] Step 3 FAILED - ChatGPT Error: status={e.status}, "
f"code={e.error_code}, message={e.error_message} ({step3_elapsed:.1f}ms)"
)
marketing_analysis = None
gpt_status = "failed"
except Exception as e:
step3_elapsed = (time.perf_counter() - step3_start) * 1000
logger.error(
f"[crawling] Step 3 FAILED - GPT 마케팅 분석 중 오류: {e} ({step3_elapsed:.1f}ms)"
)
logger.exception("[crawling] Step 3 상세 오류:")
marketing_analysis = None
gpt_status = "failed"
else:
step2_elapsed = (time.perf_counter() - step2_start) * 1000
logger.warning(
f"[crawling] Step 2 - base_info 없음, 마케팅 분석 스킵 ({step2_elapsed:.1f}ms)"
)
# ========== 완료 ==========
total_elapsed = (time.perf_counter() - request_start) * 1000
logger.info("[crawling] ========== COMPLETE ==========")
logger.info(f"[crawling] 총 소요시간: {total_elapsed:.1f}ms")
logger.info(f"[crawling] - Step 1 (크롤링): {step1_elapsed:.1f}ms")
if scraper.base_info:
logger.info(f"[crawling] - Step 2 (정보가공): {step2_elapsed:.1f}ms")
if "step3_elapsed" in locals():
logger.info(f"[crawling] - Step 3 (GPT 분석): {step3_elapsed:.1f}ms")
return {
"status": gpt_status if 'gpt_status' in locals() else "completed",
"image_list": scraper.image_link_list,
"image_count": len(scraper.image_link_list) if scraper.image_link_list else 0,
"processed_info": processed_info,
"marketing_analysis": marketing_analysis,
"m_id" : m_id
}
async def _run_marketing_analysis(
customer_name: str,
region: str,
detail_region_info: str,
place_id: Optional[str],
session: AsyncSession,
):
"""ChatGPT 마케팅 분석 실행 → MarketingIntel 저장 → (MarketingPromptOutput, m_id) 반환"""
chatgpt_service = ChatgptService()
input_marketing_data = {
"customer_name": customer_name,
"region": region,
"detail_region_info": detail_region_info,
}
structured_report = await chatgpt_service.generate_structured_output(
marketing_prompt, input_marketing_data
)
marketing_intel = MarketingIntel(
place_id=place_id,
intel_result=structured_report.model_dump(),
)
session.add(marketing_intel)
await session.commit()
await session.refresh(marketing_intel)
logger.debug(f"[MarketingPrompt] INSERT place_id={marketing_intel.place_id} id={marketing_intel.id}")
return structured_report, marketing_intel.id
@router.post(
"/marketing",
summary="업체명+주소 직접 입력 마케팅 분석",
description="""
네이버 크롤링 없이 업체명과 주소를 직접 입력받아 마케팅 분석을 수행합니다.
## 요청 필드
- **customer_name**: 업체명 / 브랜드명 (필수)
- **address**: 도로명 또는 지번 주소 (필수)
## 반환 정보
- **processed_info**: 가공된 장소 정보 (customer_name, region, detail_region_info)
- **marketing_analysis**: ChatGPT 마케팅 분석 결과
- **m_id**: 마케팅 분석 결과 ID (이후 영상생성 파이프라인에 사용)
""",
response_model=MarketingAnalysisResponse,
tags=["Marketing"],
)
async def manual_marketing(
request_body: ManualMarketingRequest,
session: AsyncSession = Depends(get_session),
):
region = _extract_region_from_address(request_body.address)
processed_info = ProcessedInfo(
customer_name=request_body.store_name,
region=region,
detail_region_info=request_body.address,
)
try:
marketing_analysis, m_id = await _run_marketing_analysis(
customer_name=request_body.store_name,
region=region,
detail_region_info=request_body.address,
place_id=None,
session=session,
)
except ChatGPTResponseError as e:
logger.error(
f"[marketing] ChatGPT Error: status={e.status}, "
f"code={e.error_code}, message={e.error_message}"
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"마케팅 분석 중 ChatGPT 오류가 발생했습니다: {e.error_message}",
)
except Exception as e:
logger.error(f"[marketing] 마케팅 분석 중 오류: {e}")
logger.exception("[marketing] 상세 오류:")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="마케팅 분석 중 오류가 발생했습니다.",
)
return MarketingAnalysisResponse(
status="completed",
processed_info=processed_info,
marketing_analysis=marketing_analysis,
m_id=m_id,
)
async def _autocomplete_logic(autocomplete_item:dict):
step1_start = time.perf_counter()
try:
async with NvMapPwScraper() as pw_scraper:
new_url = await pw_scraper.get_place_id_url(autocomplete_item)
except Exception as e:
step1_elapsed = (time.perf_counter() - step1_start) * 1000
logger.error(
f"[crawling] Autocomplete FAILED - 자동완성 예기치 않은 오류: {e} ({step1_elapsed:.1f}ms)"
)
logger.exception("[crawling] Autocomplete 상세 오류:")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="자동완성 place id 추출 실패",
)
if not new_url:
step1_elapsed = (time.perf_counter() - step1_start) * 1000
logger.error(
f"[crawling] Autocomplete FAILED - URL을 찾을 수 없음 ({step1_elapsed:.1f}ms)"
)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="해당 장소의 네이버 지도 URL을 찾을 수 없습니다.",
)
return new_url
def _extract_image_name(url: str, index: int) -> str:
"""URL에서 이미지 이름 추출 또는 기본 이름 생성"""
try:
path = urlparse(url).path
filename = path.split("/")[-1] if path else ""
if filename:
return unquote(filename)
except Exception:
pass
return f"image_{index + 1:03d}"
ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif"}
def _is_valid_image_extension(filename: str | None) -> bool:
"""파일명의 확장자가 유효한 이미지 확장자인지 확인"""
if not filename:
return False
ext = Path(filename).suffix.lower()
return ext in ALLOWED_IMAGE_EXTENSIONS
def _get_file_extension(filename: str) -> str:
"""파일명에서 확장자 추출 (소문자)"""
return Path(filename).suffix.lower()
async def _save_upload_file(file: UploadFile, save_path: Path) -> None:
"""업로드 파일을 지정된 경로에 저장"""
save_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(save_path, "wb") as f:
content = await file.read()
await f.write(content)
IMAGES_JSON_EXAMPLE = """[
{"url": "https://naverbooking-phinf.pstatic.net/20240514_189/1715688030436xT14o_JPEG/1.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_48/1715688030574wTtQd_JPEG/2.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_92/17156880307484bvpH_JPEG/3.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_7/1715688031000y8Y5q_JPEG/4.jpg"},
{"url": "https://naverbooking-phinf.pstatic.net/20240514_259/17156880311809wCnY_JPEG/5.jpg", "name": "외관"}
]"""
@router.post(
"/image/upload/blob",
summary="이미지 업로드 (Azure Blob Storage)",
description="""
이미지를 Azure Blob Storage에 업로드하고 새로운 task_id를 생성합니다.
바이너리 파일은 로컬 서버에 저장하지 않고 Azure Blob에 직접 업로드됩니다.
## 인증
**Bearer 토큰 필수** - `Authorization: Bearer {access_token}` 헤더를 포함해야 합니다.
## 요청 방식
multipart/form-data 형식으로 전송합니다.
## 요청 필드
- **images_json**: 외부 이미지 URL 목록 (JSON 문자열, 선택)
- **files**: 이미지 바이너리 파일 목록 (선택)
**주의**: images_json 또는 files 중 최소 하나는 반드시 전달해야 합니다.
## 지원 이미지 확장자
jpg, jpeg, png, webp, heic, heif
## images_json 예시
```json
[
{"url": "https://example.com/image1.jpg"},
{"url": "https://example.com/image2.jpg", "name": "외관"}
]
```
## 바이너리 파일 업로드 테스트 방법
### cURL로 테스트
```bash
# 바이너리 파일만 업로드
curl -X POST "http://localhost:8000/image/upload/blob" \\
-H "Authorization: Bearer {access_token}" \\
-F "files=@/path/to/image1.jpg" \\
-F "files=@/path/to/image2.png"
# URL + 바이너리 파일 동시 업로드
curl -X POST "http://localhost:8000/image/upload/blob" \\
-H "Authorization: Bearer {access_token}" \\
-F 'images_json=[{"url":"https://example.com/image.jpg"}]' \\
-F "files=@/path/to/local_image.jpg"
```
## 반환 정보
- **task_id**: 새로 생성된 작업 고유 식별자
- **total_count**: 총 업로드된 이미지 개수
- **url_count**: URL로 등록된 이미지 개수 (Image 테이블에 외부 URL 그대로 저장)
- **file_count**: 파일로 업로드된 이미지 개수 (Azure Blob Storage에 저장)
- **saved_count**: Image 테이블에 저장된 row 수
- **images**: 업로드된 이미지 목록
- **source**: "url" (외부 URL) 또는 "blob" (Azure Blob Storage)
- **image_urls**: Image 테이블에 저장된 현재 task_id의 이미지 URL 목록
## 저장 경로
- 바이너리 파일: Azure Blob Storage ({BASE_URL}/{task_id}/image/{파일명})
- URL 이미지: 외부 URL 그대로 Image 테이블에 저장
""",
response_model=ImageUploadResponse,
responses={
200: {"description": "이미지 업로드 성공"},
400: {"description": "이미지가 제공되지 않음", "model": ErrorResponse},
401: {"description": "인증 실패 (토큰 없음/만료)"},
},
tags=["Image-Blob"],
openapi_extra={
"requestBody": {
"content": {
"multipart/form-data": {
"encoding": {"files": {"contentType": "application/octet-stream"}}
}
}
}
},
)
async def upload_images_blob(
images_json: Optional[str] = Form(
default=None,
description="외부 이미지 URL 목록 (JSON 문자열)",
examples=[IMAGES_JSON_EXAMPLE],
),
files: Optional[list[UploadFile]] = File(
default=None,
description="이미지 바이너리 파일 목록",
),
current_user: User = Depends(get_current_user),
) -> ImageUploadResponse:
"""이미지 업로드 (URL + Azure Blob Storage)
3단계로 분리하여 세션 점유 시간 최소화:
- Stage 1: 입력 검증 및 파일 데이터 준비 (세션 없음)
- Stage 2: Azure Blob 업로드 (세션 없음)
- Stage 3: DB 저장 (새 세션으로 빠르게 처리)
"""
request_start = time.perf_counter()
# task_id 생성
task_id = await generate_task_id()
logger.info(f"[upload_images_blob] START - task_id: {task_id}")
# ========== Stage 1: 입력 검증 및 파일 데이터 준비 (세션 없음) ==========
has_images_json = images_json is not None and images_json.strip() != ""
has_files = files is not None and len(files) > 0
if not has_images_json and not has_files:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="images_json 또는 files 중 하나는 반드시 제공해야 합니다.",
)
# images_json 파싱
url_images: list[ImageUrlItem] = []
if has_images_json and images_json:
try:
parsed = json.loads(images_json)
if isinstance(parsed, list):
url_images = [ImageUrlItem(**item) for item in parsed if item]
except (json.JSONDecodeError, TypeError, ValueError) as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"images_json 파싱 오류: {str(e)}",
)
# 유효한 파일만 필터링 및 파일 내용 미리 읽기
valid_files_data: list[tuple[str, str, bytes]] = [] # (original_name, ext, content)
skipped_files: list[str] = []
if has_files and files:
for f in files:
is_valid_ext = _is_valid_image_extension(f.filename)
is_not_empty = f.size is None or f.size > 0
is_real_file = f.filename and f.filename != "filename"
if f and is_real_file and is_valid_ext and is_not_empty:
# 파일 내용을 미리 읽어둠
content = await f.read()
ext = _get_file_extension(f.filename) # type: ignore[arg-type]
valid_files_data.append((f.filename or "image", ext, content))
else:
skipped_files.append(f.filename or "unknown")
if not url_images and not valid_files_data:
detail = (
f"유효한 이미지가 없습니다. "
f"지원 확장자: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}. "
f"건너뛴 파일: {skipped_files}"
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=detail,
)
stage1_time = time.perf_counter()
logger.info(
f"[upload_images_blob] Stage 1 done - urls: {len(url_images)}, "
f"files: {len(valid_files_data)}, "
f"elapsed: {(stage1_time - request_start) * 1000:.1f}ms"
)
# ========== Stage 2: Azure Blob 업로드 (세션 없음) ==========
# 업로드 결과를 저장할 리스트 (나중에 DB에 저장)
blob_upload_results: list[tuple[str, str]] = [] # (img_name, blob_url)
img_order = len(url_images) # URL 이미지 다음 순서부터 시작
if valid_files_data:
uploader = AzureBlobUploader(user_uuid=current_user.user_uuid, task_id=task_id)
total_files = len(valid_files_data)
for idx, (original_name, ext, file_content) in enumerate(valid_files_data):
name_without_ext = (
original_name.rsplit(".", 1)[0]
if "." in original_name
else original_name
)
filename = f"{name_without_ext}_{img_order:03d}{ext}"
logger.debug(
f"[upload_images_blob] Uploading file {idx + 1}/{total_files}: "
f"{filename} ({len(file_content)} bytes)"
)
# Azure Blob Storage에 직접 업로드
upload_success = await uploader.upload_image_bytes(file_content, filename)
if upload_success:
blob_url = uploader.public_url
blob_upload_results.append((original_name, blob_url))
img_order += 1
logger.debug(
f"[upload_images_blob] File {idx + 1}/{total_files} SUCCESS"
)
else:
skipped_files.append(filename)
logger.warning(
f"[upload_images_blob] File {idx + 1}/{total_files} FAILED"
)
stage2_time = time.perf_counter()
logger.info(
f"[upload_images_blob] Stage 2 done - blob uploads: "
f"{len(blob_upload_results)}, skipped: {len(skipped_files)}, "
f"elapsed: {(stage2_time - stage1_time) * 1000:.1f}ms"
)
# ========== Stage 3: DB 저장 (새 세션으로 빠르게 처리) ==========
logger.info("[upload_images_blob] Stage 3 starting - DB save...")
result_images: list[ImageUploadResultItem] = []
img_order = 0
try:
async with AsyncSessionLocal() as session:
# URL 이미지 저장
for url_item in url_images:
img_name = url_item.name or _extract_image_name(url_item.url, img_order)
image = Image(
task_id=task_id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=url_item.url,
img_order=img_order,
source="url",
)
)
img_order += 1
# Blob 업로드 결과 저장
for img_name, blob_url in blob_upload_results:
image = Image(
task_id=task_id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
)
session.add(image)
await session.flush()
result_images.append(
ImageUploadResultItem(
id=image.id,
img_name=img_name,
img_url=blob_url,
img_order=img_order,
source="blob",
)
)
img_order += 1
await session.commit()
stage3_time = time.perf_counter()
logger.info(
f"[upload_images_blob] Stage 3 done - "
f"saved: {len(result_images)}, "
f"elapsed: {(stage3_time - stage2_time) * 1000:.1f}ms"
)
except SQLAlchemyError as e:
logger.error(f"[upload_images_blob] DB Error - task_id: {task_id}, error: {e}")
logger.exception("[upload_images_blob] DB 상세 오류:")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="이미지 저장 중 데이터베이스 오류가 발생했습니다.",
)
except Exception as e:
logger.error(
f"[upload_images_blob] Stage 3 EXCEPTION - "
f"task_id: {task_id}, error: {type(e).__name__}: {e}"
)
logger.exception("[upload_images_blob] Stage 3 상세 오류:")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="이미지 업로드 중 오류가 발생했습니다.",
)
saved_count = len(result_images)
image_urls = [img.img_url for img in result_images]
logger.info(f"[image_tagging] START - task_id: {task_id}")
await tagging_images(image_urls, clear_old_tags=True)
logger.info(f"[image_tagging] Done - task_id: {task_id}")
total_time = time.perf_counter() - request_start
logger.info(
f"[upload_images_blob] SUCCESS - task_id: {task_id}, "
f"total: {saved_count}, total_time: {total_time * 1000:.1f}ms"
)
return ImageUploadResponse(
task_id=task_id,
total_count=len(result_images),
url_count=len(url_images),
file_count=len(blob_upload_results),
saved_count=saved_count,
images=result_images,
image_urls=image_urls,
)
async def tagging_images(
image_urls : list[str],
clear_old_tags : bool = False
) -> None:
# 1. 조회
async with AsyncSessionLocal() as session:
stmt = (
select(ImageTag)
.where(ImageTag.img_url_hash.in_([func.crc32(url) for url in image_urls]))
.where(ImageTag.img_url.in_(image_urls))
)
image_tags_query_results = await session.execute(stmt)
image_tags = image_tags_query_results.scalars().all()
existing_urls = {tag.img_url for tag in image_tags}
new_imt = [
ImageTag(img_url=url, img_tag=None)
for url in image_urls
if url not in existing_urls
]
if clear_old_tags:
for tag in image_tags:
tag.img_tag = None
session.add_all(new_imt)
null_imts = [imt for imt in image_tags if imt.img_tag is None] + new_imt
await session.commit()
if null_imts:
tag_datas = await autotag_images([img.img_url for img in null_imts])
print(tag_datas)
async with AsyncSessionLocal() as session:
for tag, tag_data in zip(null_imts, tag_datas):
if isinstance(tag_data, Exception):
continue
tag.img_tag = tag_data.model_dump(mode="json")
session.add(tag)
await session.commit()