from pathlib import Path from fastapi import APIRouter from app.home.schemas.home import ( CrawlingRequest, CrawlingResponse, ErrorResponse, MarketingAnalysis, ProcessedInfo, ) from app.utils.chatgpt_prompt import ChatgptService from app.utils.nvMapScraper import NvMapScraper MEDIA_ROOT = Path("media") # 전국 시 이름 목록 (roadAddress에서 region 추출용) # fmt: off KOREAN_CITIES = [ # 특별시/광역시 "서울시", "부산시", "대구시", "인천시", "광주시", "대전시", "울산시", "세종시", # 경기도 "수원시", "성남시", "고양시", "용인시", "부천시", "안산시", "안양시", "남양주시", "화성시", "평택시", "의정부시", "시흥시", "파주시", "광명시", "김포시", "군포시", "광주시", "이천시", "양주시", "오산시", "구리시", "안성시", "포천시", "의왕시", "하남시", "여주시", "동두천시", "과천시", # 강원도 "춘천시", "원주시", "강릉시", "동해시", "태백시", "속초시", "삼척시", # 충청북도 "청주시", "충주시", "제천시", # 충청남도 "천안시", "공주시", "보령시", "아산시", "서산시", "논산시", "계룡시", "당진시", # 전라북도 "전주시", "군산시", "익산시", "정읍시", "남원시", "김제시", # 전라남도 "목포시", "여수시", "순천시", "나주시", "광양시", # 경상북도 "포항시", "경주시", "김천시", "안동시", "구미시", "영주시", "영천시", "상주시", "문경시", "경산시", # 경상남도 "창원시", "진주시", "통영시", "사천시", "김해시", "밀양시", "거제시", "양산시", # 제주도 "제주시", "서귀포시", ] # fmt: on router = APIRouter() def _extract_region_from_address(road_address: str | None) -> str: """roadAddress에서 시 이름 추출""" if not road_address: return "" for city in KOREAN_CITIES: if city in road_address: return city return "" @router.post( "/crawling", summary="네이버 지도 크롤링", description=""" 네이버 지도 장소 URL을 입력받아 이미지 목록과 기본 정보를 크롤링합니다. ## 요청 필드 - **url**: 네이버 지도 장소 URL (필수) ## 반환 정보 - **image_list**: 장소 이미지 URL 목록 - **image_count**: 이미지 개수 - **processed_info**: 가공된 장소 정보 (customer_name, region, detail_region_info) """, response_model=CrawlingResponse, response_description="크롤링 결과", responses={ 200: {"description": "크롤링 성공", "model": CrawlingResponse}, 400: { "description": "잘못된 URL", "model": ErrorResponse, }, }, tags=["crawling"], ) async def crawling(request_body: CrawlingRequest): """네이버 지도 장소 크롤링""" scraper = NvMapScraper(request_body.url) await scraper.scrap() # 가공된 정보 생성 processed_info = None marketing_analysis = None if scraper.base_info: road_address = scraper.base_info.get("roadAddress", "") customer_name = scraper.base_info.get("name", "") region = _extract_region_from_address(road_address) processed_info = ProcessedInfo( customer_name=customer_name, region=region, detail_region_info=road_address or "", ) # ChatGPT를 이용한 마케팅 분석 chatgpt_service = ChatgptService( customer_name=customer_name, region=region, detail_region_info=road_address or "", ) prompt = chatgpt_service.build_market_analysis_prompt() raw_response = await chatgpt_service.generate(prompt) parsed = await chatgpt_service.parse_marketing_analysis(raw_response) marketing_analysis = MarketingAnalysis(**parsed) return { "image_list": scraper.image_link_list, "image_count": len(scraper.image_link_list) if scraper.image_link_list else 0, "processed_info": processed_info, "marketing_analysis": marketing_analysis, } def _extract_image_name(url: str, index: int) -> str: """URL에서 이미지 이름 추출 또는 기본 이름 생성""" try: from urllib.parse import unquote, urlparse path = urlparse(url).path filename = path.split("/")[-1] if path else "" if filename: return unquote(filename) except Exception: pass return f"image_{index + 1:03d}" # @router.post( # "/generate", # summary="기본 영상 생성 요청", # description=""" # 고객 정보만 받아 영상 생성 작업을 시작합니다. (이미지 없음) # ## 요청 필드 # - **customer_name**: 고객명/가게명 (필수) # - **region**: 지역명 (필수) # - **detail_region_info**: 상세 지역 정보 (선택) # - **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood) # ## 반환 정보 # - **task_id**: 작업 고유 식별자 (UUID7) # - **status**: 작업 상태 # - **message**: 응답 메시지 # """, # response_model=GenerateResponse, # response_description="생성 작업 시작 결과", # tags=["generate"], # ) # async def generate( # request_body: GenerateRequest, # background_tasks: BackgroundTasks, # session: AsyncSession = Depends(get_session), # ): # """기본 영상 생성 요청 처리 (이미지 없음)""" # # UUID7 생성 및 중복 검사 # while True: # task_id = str(uuid7()) # existing = await session.execute( # select(Project).where(Project.task_id == task_id) # ) # if existing.scalar_one_or_none() is None: # break # # Project 생성 (이미지 없음) # project = Project( # store_name=request_body.customer_name, # region=request_body.region, # task_id=task_id, # detail_region_info=json.dumps( # { # "detail": request_body.detail_region_info, # "attribute": request_body.attribute.model_dump(), # }, # ensure_ascii=False, # ), # ) # session.add(project) # await session.commit() # await session.refresh(project) # background_tasks.add_task(task_process, request_body, task_id, project.id) # return { # "task_id": task_id, # "status": "processing", # "message": "생성 작업이 시작되었습니다.", # } # @router.post( # "/generate/urls", # summary="URL 기반 영상 생성 요청", # description=""" # 고객 정보와 이미지 URL을 받아 영상 생성 작업을 시작합니다. # ## 요청 필드 # - **customer_name**: 고객명/가게명 (필수) # - **region**: 지역명 (필수) # - **detail_region_info**: 상세 지역 정보 (선택) # - **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood) # - **images**: 이미지 URL 목록 (필수) # ## 반환 정보 # - **task_id**: 작업 고유 식별자 (UUID7) # - **status**: 작업 상태 # - **message**: 응답 메시지 # """, # response_model=GenerateResponse, # response_description="생성 작업 시작 결과", # tags=["generate"], # ) # async def generate_urls( # request_body: GenerateUrlsRequest, # session: AsyncSession = Depends(get_session), # ): # """URL 기반 영상 생성 요청 처리""" # # UUID7 생성 및 중복 검사 # while True: # task_id = str(uuid7()) # existing = await session.execute( # select(Project).where(Project.task_id == task_id) # ) # if existing.scalar_one_or_none() is None: # break # # Project 생성 (이미지 정보 제외) # project = Project( # store_name=request_body.customer_name, # region=request_body.region, # task_id=task_id, # detail_region_info=json.dumps( # { # "detail": request_body.detail_region_info, # "attribute": request_body.attribute.model_dump(), # }, # ensure_ascii=False, # ), # ) # session.add(project) # # Image 레코드 생성 (독립 테이블, task_id로 연결) # for idx, img_item in enumerate(request_body.images): # # name이 있으면 사용, 없으면 URL에서 추출 # img_name = img_item.name or _extract_image_name(img_item.url, idx) # image = Image( # task_id=task_id, # img_name=img_name, # img_url=img_item.url, # img_order=idx, # ) # session.add(image) # await session.commit() # return { # "task_id": task_id, # "status": "processing", # "message": "생성 작업이 시작되었습니다.", # } # async def _save_upload_file(file: UploadFile, save_path: Path) -> None: # """업로드 파일을 지정된 경로에 저장""" # save_path.parent.mkdir(parents=True, exist_ok=True) # async with aiofiles.open(save_path, "wb") as f: # content = await file.read() # await f.write(content) # def _get_file_extension(filename: str | None) -> str: # """파일명에서 확장자 추출""" # if not filename: # return ".jpg" # ext = Path(filename).suffix.lower() # return ext if ext else ".jpg" # @router.post( # "/generate/upload", # summary="파일 업로드 기반 영상 생성 요청", # description=""" # 고객 정보와 이미지 파일을 받아 영상 생성 작업을 시작합니다. # ## 요청 필드 (multipart/form-data) # - **customer_name**: 고객명/가게명 (필수) # - **region**: 지역명 (필수) # - **detail_region_info**: 상세 지역 정보 (선택) # - **attribute**: 음악 속성 정보 JSON 문자열 (필수) # - **images**: 이미지 파일 목록 (필수, 복수 파일) # ## 반환 정보 # - **task_id**: 작업 고유 식별자 (UUID7) # - **status**: 작업 상태 # - **message**: 응답 메시지 # - **uploaded_count**: 업로드된 이미지 개수 # """, # response_model=GenerateUploadResponse, # response_description="생성 작업 시작 결과", # tags=["generate"], # ) # async def generate_upload( # customer_name: str = Form(..., description="고객명/가게명"), # region: str = Form(..., description="지역명"), # attribute: str = Form(..., description="음악 속성 정보 (JSON 문자열)"), # images: list[UploadFile] = File(..., description="이미지 파일 목록"), # detail_region_info: str | None = Form(None, description="상세 지역 정보"), # session: AsyncSession = Depends(get_session), # ): # """파일 업로드 기반 영상 생성 요청 처리""" # # attribute JSON 파싱 및 검증 # try: # attribute_dict = json.loads(attribute) # attribute_info = AttributeInfo(**attribute_dict) # except json.JSONDecodeError: # raise HTTPException( # status_code=400, detail="attribute는 유효한 JSON 형식이어야 합니다." # ) # except Exception as e: # raise HTTPException(status_code=400, detail=f"attribute 검증 실패: {e}") # # 이미지 파일 검증 # if not images: # raise HTTPException( # status_code=400, detail="최소 1개 이상의 이미지 파일이 필요합니다." # ) # # UUID7 생성 및 중복 검사 # while True: # task_id = str(uuid7()) # existing = await session.execute( # select(Project).where(Project.task_id == task_id) # ) # if existing.scalar_one_or_none() is None: # break # # 저장 경로 생성: media/날짜/task_id/ # today = date.today().strftime("%Y%m%d") # upload_dir = MEDIA_ROOT / today / task_id # # Project 생성 (이미지 정보 제외) # project = Project( # store_name=customer_name, # region=region, # task_id=task_id, # detail_region_info=json.dumps( # { # "detail": detail_region_info, # "attribute": attribute_info.model_dump(), # }, # ensure_ascii=False, # ), # ) # session.add(project) # # 이미지 파일 저장 및 Image 레코드 생성 # for idx, file in enumerate(images): # # 각 이미지에 고유 UUID7 생성 # img_uuid = str(uuid7()) # ext = _get_file_extension(file.filename) # filename = f"{img_uuid}{ext}" # save_path = upload_dir / filename # # 파일 저장 # await _save_upload_file(file, save_path) # # Image 레코드 생성 (독립 테이블, task_id로 연결) # img_url = f"/media/{today}/{task_id}/{filename}" # image = Image( # task_id=task_id, # img_name=file.filename or filename, # img_url=img_url, # img_order=idx, # ) # session.add(image) # await session.commit() # return { # "task_id": task_id, # "status": "processing", # "message": "생성 작업이 시작되었습니다.", # "uploaded_count": len(images), # }