import json import time from datetime import date from pathlib import Path from typing import Optional from urllib.parse import unquote, urlparse import aiofiles from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import func, select from app.database.session import get_session, AsyncSessionLocal from app.home.models import Image, MarketingIntel, ImageTag from app.user.dependencies.auth import get_current_user from app.user.models import User from app.home.schemas.home_schema import ( AutoCompleteRequest, AccommodationSearchItem, AccommodationSearchResponse, CrawlingRequest, CrawlingResponse, ErrorResponse, ImageUploadResponse, ImageUploadResultItem, ImageUrlItem, ManualMarketingRequest, ProcessedInfo, # MarketingAnalysis, ) from app.home.services.naver_search import naver_search_client from app.utils.upload_blob_as_request import AzureBlobUploader from app.utils.prompts.chatgpt_prompt import ChatgptService, ChatGPTResponseError from app.utils.common import generate_task_id from app.utils.logger import get_logger from app.utils.nvMapScraper import NvMapScraper, GraphQLException, URLNotFoundException from app.utils.nvMapPwScraper import NvMapPwScraper from app.utils.prompts.prompts import marketing_prompt from app.utils.autotag import autotag_images from config import MEDIA_ROOT # 로거 설정 logger = get_logger("home") # 전국 시/군 이름 목록 (roadAddress에서 region 추출용) # fmt: off KOREAN_CITIES = [ # 특별시/광역시 "서울시", "부산시", "대구시", "인천시", "광주시", "대전시", "울산시", "세종시", # 경기도 "수원시", "성남시", "고양시", "용인시", "부천시", "안산시", "안양시", "남양주시", "화성시", "평택시", "의정부시", "시흥시", "파주시", "김포시", "광주시", "광명시", "군포시", "하남시", "오산시", "이천시", "안성시", "구리시", "양주시", "포천시", "여주시", "동두천시", "과천시", "가평군", "양평군", "연천군", # 강원특별자치도 "춘천시", "원주시", "강릉시", "동해시", "태백시", "속초시", "삼척시", "홍천군", "횡성군", "영월군", "평창군", "정선군", "철원군", "화천군", "양구군", "인제군", "고성군", "양양군", # 충청북도 "청주시", "충주시", "제천시", "보은군", "옥천군", "영동군", "증평군", "진천군", "괴산군", "음성군", "단양군", # 충청남도 "천안시", "공주시", "보령시", "아산시", "서산시", "논산시", "계룡시", "당진시", "금산군", "부여군", "서천군", "청양군", "홍성군", "예산군", "태안군", # 전북특별자치도 "전주시", "군산시", "익산시", "정읍시", "남원시", "김제시", "완주군", "진안군", "무주군", "장수군", "임실군", "순창군", "고창군", "부안군", # 전라남도 "목포시", "여수시", "순천시", "나주시", "광양시", "담양군", "곡성군", "구례군", "고흥군", "보성군", "화순군", "장흥군", "강진군", "해남군", "영암군", "무안군", "함평군", "영광군", "장성군", "완도군", "진도군", "신안군", # 경상북도 "포항시", "경주시", "김천시", "안동시", "구미시", "영주시", "영천시", "상주시", "문경시", "경산시", "의성군", "청송군", "영양군", "영덕군", "청도군", "고령군", "성주군", "칠곡군", "예천군", "봉화군", "울진군", "울릉군", # 경상남도 "창원시", "진주시", "통영시", "사천시", "김해시", "밀양시", "거제시", "양산시", "의령군", "함안군", "창녕군", "고성군", "남해군", "하동군", "산청군", "함양군", "거창군", "합천군", # 제주특별자치도 "제주시", "서귀포시", ] # fmt: on # router = APIRouter(tags=["Home"]) router = APIRouter() @router.get( "/search/accommodation", summary="숙박/펜션 자동완성 검색", description=""" 네이버 지역 검색 API를 이용한 숙박/펜션 자동완성 검색입니다. ## 요청 파라미터 - **query**: 검색어 (필수) ## 반환 정보 - **query**: 검색어 - **count**: 검색 결과 수 (최대 10개) - **items**: 검색 결과 목록 - **title**: 숙소명 (HTML 태그 포함 가능) - **address**: 지번 주소 - **roadAddress**: 도로명 주소 """, response_model=AccommodationSearchResponse, responses={ 200: {"description": "검색 성공", "model": AccommodationSearchResponse}, }, tags=["Search"], ) async def search_accommodation( query: str, ) -> AccommodationSearchResponse: """숙박/펜션 자동완성 검색""" results = await naver_search_client.search_accommodation( query=query, display=10, ) items = [AccommodationSearchItem(**item) for item in results] return AccommodationSearchResponse( query=query, count=len(items), items=items, ) METRO_CITY_MAP = { "서울": "서울시", "부산": "부산시", "대구": "대구시", "인천": "인천시", "광주": "광주시", "대전": "대전시", "울산": "울산시", "세종": "세종시", } def _extract_region_from_address(road_address: str | None, jibun_address: str | None = None) -> str: """주소에서 시/군 이름 추출 — 도로명 우선, 실패 시 지번으로 재시도""" def _parse(address: str) -> str: token_set = set(address.split()) for city in KOREAN_CITIES: if city in token_set: # 완전 일치 (토큰 단위) return city if city[:-1] in token_set: # 접미사 생략 일치 (토큰 단위) return city tokens = address.split() if len(tokens) >= 2: second = tokens[1] if second.endswith("시") or second.endswith("군"): return second if second.endswith("구") or second.endswith("동"): return METRO_CITY_MAP.get(tokens[0], "") return "" if road_address: result = _parse(road_address) if result: return result if jibun_address: return _parse(jibun_address) return "" @router.post( "/crawling", summary="네이버 지도 크롤링", description=""" 네이버 지도 장소 URL을 입력받아 이미지 목록과 기본 정보를 크롤링합니다. ## 요청 필드 - **url**: 네이버 지도 장소 URL (필수) ## 반환 정보 - **image_list**: 장소 이미지 URL 목록 - **image_count**: 이미지 개수 - **processed_info**: 가공된 장소 정보 (customer_name, region, detail_region_info) """, response_model=CrawlingResponse, response_description="크롤링 결과", responses={ 200: {"description": "크롤링 성공", "model": CrawlingResponse}, 400: { "description": "잘못된 URL", "model": ErrorResponse, }, 502: { "description": "크롤링 실패", "model": ErrorResponse, }, }, tags=["Crawling"], ) async def crawling( request_body: CrawlingRequest, session: AsyncSession = Depends(get_session)): return await _crawling_logic(request_body.url, session) @router.post( "/autocomplete", summary="네이버 자동완성 크롤링", description=""" 네이버 검색 API 정보를 활용하여 Place ID를 추출한 뒤 자동으로 크롤링합니다. ## 요청 필드 - **title**: 네이버 검색 API Place 결과물 title (필수) - **address**: 네이버 검색 API Place 결과물 지번주소 (필수) - **roadAddress**:네이버 검색 API Place 결과물 도로명주소 ## 반환 정보 - **image_list**: 장소 이미지 URL 목록 - **image_count**: 이미지 개수 - **processed_info**: 가공된 장소 정보 (customer_name, region, detail_region_info) """, response_model=CrawlingResponse, response_description="크롤링 결과", responses={ 200: {"description": "크롤링 성공", "model": CrawlingResponse}, 400: { "description": "잘못된 URL", "model": ErrorResponse, }, 502: { "description": "크롤링 실패", "model": ErrorResponse, }, }, tags=["Crawling"], ) async def autocomplete_crawling( request_body: AutoCompleteRequest, session: AsyncSession = Depends(get_session)): url = await _autocomplete_logic(request_body.model_dump()) return await _crawling_logic(url, session) async def _crawling_logic( url:str, session: AsyncSession): request_start = time.perf_counter() logger.info("[crawling] ========== START ==========") logger.info(f"[crawling] URL: {url[:80]}...") # ========== Step 1: 네이버 지도 크롤링 ========== step1_start = time.perf_counter() logger.info("[crawling] Step 1: 네이버 지도 크롤링 시작...") try: scraper = NvMapScraper(url) await scraper.scrap() except GraphQLException as e: step1_elapsed = (time.perf_counter() - step1_start) * 1000 logger.error( f"[crawling] Step 1 FAILED - GraphQL 크롤링 실패: {e} ({step1_elapsed:.1f}ms)" ) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"네이버 지도 크롤링에 실패했습니다: {e}", ) except URLNotFoundException as e: step1_elapsed = (time.perf_counter() - step1_start) * 1000 logger.error( f"[crawling] Step 1 FAILED - 크롤링 실패: {e} ({step1_elapsed:.1f}ms)" ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Place ID를 확인할 수 없습니다. URL을 확인하세요. : {e}", ) except Exception as e: step1_elapsed = (time.perf_counter() - step1_start) * 1000 logger.error( f"[crawling] Step 1 FAILED - 크롤링 중 예기치 않은 오류: {e} ({step1_elapsed:.1f}ms)" ) logger.exception("[crawling] Step 1 상세 오류:") raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail="네이버 지도 크롤링 중 오류가 발생했습니다.", ) step1_elapsed = (time.perf_counter() - step1_start) * 1000 image_count = len(scraper.image_link_list) if scraper.image_link_list else 0 logger.info( f"[crawling] Step 1 완료 - 이미지 {image_count}개 ({step1_elapsed:.1f}ms)" ) # ========== Step 2: 정보 가공 ========== step2_start = time.perf_counter() logger.info("[crawling] Step 2: 정보 가공 시작...") processed_info = None marketing_analysis = None if scraper.base_info: road_address = scraper.base_info.get("roadAddress", "") jibun_address = scraper.base_info.get("address", "") customer_name = scraper.base_info.get("name", "") region = _extract_region_from_address(road_address, jibun_address) processed_info = ProcessedInfo( customer_name=customer_name, region=region, detail_region_info=road_address or jibun_address or "", ) step2_elapsed = (time.perf_counter() - step2_start) * 1000 logger.info( f"[crawling] Step 2 완료 - {customer_name}, {region} ({step2_elapsed:.1f}ms)" ) # ========== Step 3: ChatGPT 마케팅 분석 ========== step3_start = time.perf_counter() logger.info("[crawling] Step 3: ChatGPT 마케팅 분석 시작...") try: # Step 3-1: ChatGPT 서비스 초기화 및 입력 데이터 구성 chatgpt_service = ChatgptService() input_marketing_data = { "customer_name": customer_name, "region": region, "detail_region_info": road_address or "", } # Step 3-2: GPT API 호출 → 구조화된 마케팅 분석 결과 반환 marketing_analysis = await chatgpt_service.generate_structured_output( marketing_prompt, input_marketing_data ) # Step 3-3: 분석 결과 DB 저장 marketing_intel = MarketingIntel( place_id=scraper.place_id, intel_result=marketing_analysis.model_dump(), ) session.add(marketing_intel) await session.commit() await session.refresh(marketing_intel) m_id = marketing_intel.id logger.debug(f"[MarketingPrompt] INSERT place_id={marketing_intel.place_id} id={marketing_intel.id}") step3_elapsed = (time.perf_counter() - step3_start) * 1000 logger.info( f"[crawling] Step 3 완료 - 마케팅 분석 성공 ({step3_elapsed:.1f}ms)" ) except ChatGPTResponseError as e: step3_elapsed = (time.perf_counter() - step3_start) * 1000 logger.error( f"[crawling] Step 3 FAILED - ChatGPT Error: status={e.status}, " f"code={e.error_code}, message={e.error_message} ({step3_elapsed:.1f}ms)" ) marketing_analysis = None gpt_status = "failed" except Exception as e: step3_elapsed = (time.perf_counter() - step3_start) * 1000 logger.error( f"[crawling] Step 3 FAILED - GPT 마케팅 분석 중 오류: {e} ({step3_elapsed:.1f}ms)" ) logger.exception("[crawling] Step 3 상세 오류:") marketing_analysis = None gpt_status = "failed" else: step2_elapsed = (time.perf_counter() - step2_start) * 1000 logger.warning( f"[crawling] Step 2 - base_info 없음, 마케팅 분석 스킵 ({step2_elapsed:.1f}ms)" ) # ========== 완료 ========== total_elapsed = (time.perf_counter() - request_start) * 1000 logger.info("[crawling] ========== COMPLETE ==========") logger.info(f"[crawling] 총 소요시간: {total_elapsed:.1f}ms") logger.info(f"[crawling] - Step 1 (크롤링): {step1_elapsed:.1f}ms") if scraper.base_info: logger.info(f"[crawling] - Step 2 (정보가공): {step2_elapsed:.1f}ms") if "step3_elapsed" in locals(): logger.info(f"[crawling] - Step 3 (GPT 분석): {step3_elapsed:.1f}ms") return { "status": gpt_status if 'gpt_status' in locals() else "completed", "image_list": scraper.image_link_list, "image_count": len(scraper.image_link_list) if scraper.image_link_list else 0, "processed_info": processed_info, "marketing_analysis": marketing_analysis, "m_id" : m_id } @router.post( "/marketing", summary="업체명+주소 직접 입력 마케팅 분석", description=""" 네이버 크롤링 없이 업체명과 주소를 직접 입력받아 마케팅 분석을 수행합니다. ## 요청 필드 - **customer_name**: 업체명 / 브랜드명 (필수) - **address**: 도로명 또는 지번 주소 (필수) ## 반환 정보 - **processed_info**: 가공된 장소 정보 (customer_name, region, detail_region_info) - **marketing_analysis**: ChatGPT 마케팅 분석 결과 - **m_id**: 마케팅 분석 결과 ID (이후 영상생성 파이프라인에 사용) """, response_model=CrawlingResponse, tags=["Marketing"], ) async def manual_marketing( request_body: ManualMarketingRequest, session: AsyncSession = Depends(get_session), ): # Step 1: 주소에서 지역명 추출 및 processed_info 구성 region = _extract_region_from_address(request_body.address) processed_info = ProcessedInfo( customer_name=request_body.store_name, region=region, detail_region_info=request_body.address, ) try: # Step 2: GPT API 호출 → 마케팅 분석 결과 생성 # place_id 없이 업체명+주소만으로 분석 (크롤링 없이 직접 입력된 경우) chatgpt_service = ChatgptService() input_marketing_data = { "customer_name": request_body.store_name, "region": region, "detail_region_info": request_body.address, } marketing_analysis = await chatgpt_service.generate_structured_output( marketing_prompt, input_marketing_data ) # Step 3: 분석 결과 DB 저장 (place_id=None — 네이버 장소와 연결되지 않음) marketing_intel = MarketingIntel( place_id=None, intel_result=marketing_analysis.model_dump(), ) session.add(marketing_intel) await session.commit() await session.refresh(marketing_intel) m_id = marketing_intel.id logger.debug(f"[MarketingPrompt] INSERT id={marketing_intel.id}") except ChatGPTResponseError as e: logger.error( f"[marketing] ChatGPT Error: status={e.status}, " f"code={e.error_code}, message={e.error_message}" ) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"마케팅 분석 중 ChatGPT 오류가 발생했습니다: {e.error_message}", ) except Exception as e: logger.error(f"[marketing] 마케팅 분석 중 오류: {e}") logger.exception("[marketing] 상세 오류:") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="마케팅 분석 중 오류가 발생했습니다.", ) return CrawlingResponse( status="completed", processed_info=processed_info, marketing_analysis=marketing_analysis, m_id=m_id, ) async def _autocomplete_logic(autocomplete_item:dict): step1_start = time.perf_counter() try: async with NvMapPwScraper() as pw_scraper: new_url = await pw_scraper.get_place_id_url(autocomplete_item) except Exception as e: step1_elapsed = (time.perf_counter() - step1_start) * 1000 logger.error( f"[crawling] Autocomplete FAILED - 자동완성 예기치 않은 오류: {e} ({step1_elapsed:.1f}ms)" ) logger.exception("[crawling] Autocomplete 상세 오류:") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="자동완성 place id 추출 실패", ) if not new_url: step1_elapsed = (time.perf_counter() - step1_start) * 1000 logger.error( f"[crawling] Autocomplete FAILED - URL을 찾을 수 없음 ({step1_elapsed:.1f}ms)" ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="해당 장소의 네이버 지도 URL을 찾을 수 없습니다.", ) return new_url def _extract_image_name(url: str, index: int) -> str: """URL에서 이미지 이름 추출 또는 기본 이름 생성""" try: path = urlparse(url).path filename = path.split("/")[-1] if path else "" if filename: return unquote(filename) except Exception: pass return f"image_{index + 1:03d}" ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif"} def _is_valid_image_extension(filename: str | None) -> bool: """파일명의 확장자가 유효한 이미지 확장자인지 확인""" if not filename: return False ext = Path(filename).suffix.lower() return ext in ALLOWED_IMAGE_EXTENSIONS def _get_file_extension(filename: str) -> str: """파일명에서 확장자 추출 (소문자)""" return Path(filename).suffix.lower() async def _save_upload_file(file: UploadFile, save_path: Path) -> None: """업로드 파일을 지정된 경로에 저장""" save_path.parent.mkdir(parents=True, exist_ok=True) async with aiofiles.open(save_path, "wb") as f: content = await file.read() await f.write(content) IMAGES_JSON_EXAMPLE = """[ {"url": "https://naverbooking-phinf.pstatic.net/20240514_189/1715688030436xT14o_JPEG/1.jpg"}, {"url": "https://naverbooking-phinf.pstatic.net/20240514_48/1715688030574wTtQd_JPEG/2.jpg"}, {"url": "https://naverbooking-phinf.pstatic.net/20240514_92/17156880307484bvpH_JPEG/3.jpg"}, {"url": "https://naverbooking-phinf.pstatic.net/20240514_7/1715688031000y8Y5q_JPEG/4.jpg"}, {"url": "https://naverbooking-phinf.pstatic.net/20240514_259/17156880311809wCnY_JPEG/5.jpg", "name": "외관"} ]""" @router.post( "/image/upload/blob", summary="이미지 업로드 (Azure Blob Storage)", description=""" 이미지를 Azure Blob Storage에 업로드하고 새로운 task_id를 생성합니다. 바이너리 파일은 로컬 서버에 저장하지 않고 Azure Blob에 직접 업로드됩니다. ## 인증 **Bearer 토큰 필수** - `Authorization: Bearer {access_token}` 헤더를 포함해야 합니다. ## 요청 방식 multipart/form-data 형식으로 전송합니다. ## 요청 필드 - **images_json**: 외부 이미지 URL 목록 (JSON 문자열, 선택) - **files**: 이미지 바이너리 파일 목록 (선택) **주의**: images_json 또는 files 중 최소 하나는 반드시 전달해야 합니다. ## 지원 이미지 확장자 jpg, jpeg, png, webp, heic, heif ## images_json 예시 ```json [ {"url": "https://example.com/image1.jpg"}, {"url": "https://example.com/image2.jpg", "name": "외관"} ] ``` ## 바이너리 파일 업로드 테스트 방법 ### cURL로 테스트 ```bash # 바이너리 파일만 업로드 curl -X POST "http://localhost:8000/image/upload/blob" \\ -H "Authorization: Bearer {access_token}" \\ -F "files=@/path/to/image1.jpg" \\ -F "files=@/path/to/image2.png" # URL + 바이너리 파일 동시 업로드 curl -X POST "http://localhost:8000/image/upload/blob" \\ -H "Authorization: Bearer {access_token}" \\ -F 'images_json=[{"url":"https://example.com/image.jpg"}]' \\ -F "files=@/path/to/local_image.jpg" ``` ## 반환 정보 - **task_id**: 새로 생성된 작업 고유 식별자 - **total_count**: 총 업로드된 이미지 개수 - **url_count**: URL로 등록된 이미지 개수 (Image 테이블에 외부 URL 그대로 저장) - **file_count**: 파일로 업로드된 이미지 개수 (Azure Blob Storage에 저장) - **saved_count**: Image 테이블에 저장된 row 수 - **images**: 업로드된 이미지 목록 - **source**: "url" (외부 URL) 또는 "blob" (Azure Blob Storage) - **image_urls**: Image 테이블에 저장된 현재 task_id의 이미지 URL 목록 ## 저장 경로 - 바이너리 파일: Azure Blob Storage ({BASE_URL}/{task_id}/image/{파일명}) - URL 이미지: 외부 URL 그대로 Image 테이블에 저장 """, response_model=ImageUploadResponse, responses={ 200: {"description": "이미지 업로드 성공"}, 400: {"description": "이미지가 제공되지 않음", "model": ErrorResponse}, 401: {"description": "인증 실패 (토큰 없음/만료)"}, }, tags=["Image-Blob"], openapi_extra={ "requestBody": { "content": { "multipart/form-data": { "encoding": {"files": {"contentType": "application/octet-stream"}} } } } }, ) async def upload_images_blob( images_json: Optional[str] = Form( default=None, description="외부 이미지 URL 목록 (JSON 문자열)", examples=[IMAGES_JSON_EXAMPLE], ), files: Optional[list[UploadFile]] = File( default=None, description="이미지 바이너리 파일 목록", ), current_user: User = Depends(get_current_user), ) -> ImageUploadResponse: """이미지 업로드 (URL + Azure Blob Storage) 3단계로 분리하여 세션 점유 시간 최소화: - Stage 1: 입력 검증 및 파일 데이터 준비 (세션 없음) - Stage 2: Azure Blob 업로드 (세션 없음) - Stage 3: DB 저장 (새 세션으로 빠르게 처리) """ request_start = time.perf_counter() # task_id 생성 task_id = await generate_task_id() logger.info(f"[upload_images_blob] START - task_id: {task_id}") # ========== Stage 1: 입력 검증 및 파일 데이터 준비 (세션 없음) ========== has_images_json = images_json is not None and images_json.strip() != "" has_files = files is not None and len(files) > 0 if not has_images_json and not has_files: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="images_json 또는 files 중 하나는 반드시 제공해야 합니다.", ) # images_json 파싱 url_images: list[ImageUrlItem] = [] if has_images_json and images_json: try: parsed = json.loads(images_json) if isinstance(parsed, list): url_images = [ImageUrlItem(**item) for item in parsed if item] except (json.JSONDecodeError, TypeError, ValueError) as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"images_json 파싱 오류: {str(e)}", ) # 유효한 파일만 필터링 및 파일 내용 미리 읽기 valid_files_data: list[tuple[str, str, bytes]] = [] # (original_name, ext, content) skipped_files: list[str] = [] if has_files and files: for f in files: is_valid_ext = _is_valid_image_extension(f.filename) is_not_empty = f.size is None or f.size > 0 is_real_file = f.filename and f.filename != "filename" if f and is_real_file and is_valid_ext and is_not_empty: # 파일 내용을 미리 읽어둠 content = await f.read() ext = _get_file_extension(f.filename) # type: ignore[arg-type] valid_files_data.append((f.filename or "image", ext, content)) else: skipped_files.append(f.filename or "unknown") if not url_images and not valid_files_data: detail = ( f"유효한 이미지가 없습니다. " f"지원 확장자: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}. " f"건너뛴 파일: {skipped_files}" ) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=detail, ) stage1_time = time.perf_counter() logger.info( f"[upload_images_blob] Stage 1 done - urls: {len(url_images)}, " f"files: {len(valid_files_data)}, " f"elapsed: {(stage1_time - request_start) * 1000:.1f}ms" ) # ========== Stage 2: Azure Blob 업로드 (세션 없음) ========== # 업로드 결과를 저장할 리스트 (나중에 DB에 저장) blob_upload_results: list[tuple[str, str]] = [] # (img_name, blob_url) img_order = len(url_images) # URL 이미지 다음 순서부터 시작 if valid_files_data: uploader = AzureBlobUploader(user_uuid=current_user.user_uuid, task_id=task_id) total_files = len(valid_files_data) for idx, (original_name, ext, file_content) in enumerate(valid_files_data): name_without_ext = ( original_name.rsplit(".", 1)[0] if "." in original_name else original_name ) filename = f"{name_without_ext}_{img_order:03d}{ext}" logger.debug( f"[upload_images_blob] Uploading file {idx + 1}/{total_files}: " f"{filename} ({len(file_content)} bytes)" ) # Azure Blob Storage에 직접 업로드 upload_success = await uploader.upload_image_bytes(file_content, filename) if upload_success: blob_url = uploader.public_url blob_upload_results.append((original_name, blob_url)) img_order += 1 logger.debug( f"[upload_images_blob] File {idx + 1}/{total_files} SUCCESS" ) else: skipped_files.append(filename) logger.warning( f"[upload_images_blob] File {idx + 1}/{total_files} FAILED" ) stage2_time = time.perf_counter() logger.info( f"[upload_images_blob] Stage 2 done - blob uploads: " f"{len(blob_upload_results)}, skipped: {len(skipped_files)}, " f"elapsed: {(stage2_time - stage1_time) * 1000:.1f}ms" ) # ========== Stage 3: DB 저장 (새 세션으로 빠르게 처리) ========== logger.info("[upload_images_blob] Stage 3 starting - DB save...") result_images: list[ImageUploadResultItem] = [] img_order = 0 try: async with AsyncSessionLocal() as session: # URL 이미지 저장 for url_item in url_images: img_name = url_item.name or _extract_image_name(url_item.url, img_order) image = Image( task_id=task_id, img_name=img_name, img_url=url_item.url, img_order=img_order, ) session.add(image) await session.flush() result_images.append( ImageUploadResultItem( id=image.id, img_name=img_name, img_url=url_item.url, img_order=img_order, source="url", ) ) img_order += 1 # Blob 업로드 결과 저장 for img_name, blob_url in blob_upload_results: image = Image( task_id=task_id, img_name=img_name, img_url=blob_url, img_order=img_order, ) session.add(image) await session.flush() result_images.append( ImageUploadResultItem( id=image.id, img_name=img_name, img_url=blob_url, img_order=img_order, source="blob", ) ) img_order += 1 await session.commit() stage3_time = time.perf_counter() logger.info( f"[upload_images_blob] Stage 3 done - " f"saved: {len(result_images)}, " f"elapsed: {(stage3_time - stage2_time) * 1000:.1f}ms" ) except SQLAlchemyError as e: logger.error(f"[upload_images_blob] DB Error - task_id: {task_id}, error: {e}") logger.exception("[upload_images_blob] DB 상세 오류:") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="이미지 저장 중 데이터베이스 오류가 발생했습니다.", ) except Exception as e: logger.error( f"[upload_images_blob] Stage 3 EXCEPTION - " f"task_id: {task_id}, error: {type(e).__name__}: {e}" ) logger.exception("[upload_images_blob] Stage 3 상세 오류:") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="이미지 업로드 중 오류가 발생했습니다.", ) saved_count = len(result_images) image_urls = [img.img_url for img in result_images] logger.info(f"[image_tagging] START - task_id: {task_id}") await tagging_images(image_urls, clear_old_tags=True) logger.info(f"[image_tagging] Done - task_id: {task_id}") total_time = time.perf_counter() - request_start logger.info( f"[upload_images_blob] SUCCESS - task_id: {task_id}, " f"total: {saved_count}, total_time: {total_time * 1000:.1f}ms" ) return ImageUploadResponse( task_id=task_id, total_count=len(result_images), url_count=len(url_images), file_count=len(blob_upload_results), saved_count=saved_count, images=result_images, image_urls=image_urls, ) async def tagging_images( image_urls : list[str], clear_old_tags : bool = False ) -> None: # 1. 조회 async with AsyncSessionLocal() as session: stmt = ( select(ImageTag) .where(ImageTag.img_url_hash.in_([func.crc32(url) for url in image_urls])) .where(ImageTag.img_url.in_(image_urls)) ) image_tags_query_results = await session.execute(stmt) image_tags = image_tags_query_results.scalars().all() existing_urls = {tag.img_url for tag in image_tags} new_imt = [ ImageTag(img_url=url, img_tag=None) for url in image_urls if url not in existing_urls ] if clear_old_tags: for tag in image_tags: tag.img_tag = None session.add_all(new_imt) null_imts = [imt for imt in image_tags if imt.img_tag is None] + new_imt await session.commit() if null_imts: tag_datas = await autotag_images([img.img_url for img in null_imts]) print(tag_datas) async with AsyncSessionLocal() as session: for tag, tag_data in zip(null_imts, tag_datas): if isinstance(tag_data, Exception): continue tag.img_tag = tag_data.model_dump(mode="json") session.add(tag) await session.commit()