o2o-castad-backend/app/home/api/routers/v1/home.py

393 lines
13 KiB
Python

from pathlib import Path
from fastapi import APIRouter
from app.home.schemas.home import (
CrawlingRequest,
CrawlingResponse,
ErrorResponse,
MarketingAnalysis,
ProcessedInfo,
)
from app.utils.chatgpt_prompt import ChatgptService
from app.utils.nvMapScraper import NvMapScraper
MEDIA_ROOT = Path("media")
# 전국 시 이름 목록 (roadAddress에서 region 추출용)
# fmt: off
KOREAN_CITIES = [
# 특별시/광역시
"서울시", "부산시", "대구시", "인천시", "광주시", "대전시", "울산시", "세종시",
# 경기도
"수원시", "성남시", "고양시", "용인시", "부천시", "안산시", "안양시", "남양주시",
"화성시", "평택시", "의정부시", "시흥시", "파주시", "광명시", "김포시", "군포시",
"광주시", "이천시", "양주시", "오산시", "구리시", "안성시", "포천시", "의왕시",
"하남시", "여주시", "동두천시", "과천시",
# 강원도
"춘천시", "원주시", "강릉시", "동해시", "태백시", "속초시", "삼척시",
# 충청북도
"청주시", "충주시", "제천시",
# 충청남도
"천안시", "공주시", "보령시", "아산시", "서산시", "논산시", "계룡시", "당진시",
# 전라북도
"전주시", "군산시", "익산시", "정읍시", "남원시", "김제시",
# 전라남도
"목포시", "여수시", "순천시", "나주시", "광양시",
# 경상북도
"포항시", "경주시", "김천시", "안동시", "구미시", "영주시", "영천시", "상주시", "문경시", "경산시",
# 경상남도
"창원시", "진주시", "통영시", "사천시", "김해시", "밀양시", "거제시", "양산시",
# 제주도
"제주시", "서귀포시",
]
# fmt: on
router = APIRouter()
def _extract_region_from_address(road_address: str | None) -> str:
"""roadAddress에서 시 이름 추출"""
if not road_address:
return ""
for city in KOREAN_CITIES:
if city in road_address:
return city
return ""
@router.post(
"/crawling",
summary="네이버 지도 크롤링",
description="""
네이버 지도 장소 URL을 입력받아 이미지 목록과 기본 정보를 크롤링합니다.
## 요청 필드
- **url**: 네이버 지도 장소 URL (필수)
## 반환 정보
- **image_list**: 장소 이미지 URL 목록
- **image_count**: 이미지 개수
- **processed_info**: 가공된 장소 정보 (customer_name, region, detail_region_info)
""",
response_model=CrawlingResponse,
response_description="크롤링 결과",
responses={
200: {"description": "크롤링 성공", "model": CrawlingResponse},
400: {
"description": "잘못된 URL",
"model": ErrorResponse,
},
},
tags=["crawling"],
)
async def crawling(request_body: CrawlingRequest):
"""네이버 지도 장소 크롤링"""
scraper = NvMapScraper(request_body.url)
await scraper.scrap()
# 가공된 정보 생성
processed_info = None
marketing_analysis = None
if scraper.base_info:
road_address = scraper.base_info.get("roadAddress", "")
customer_name = scraper.base_info.get("name", "")
region = _extract_region_from_address(road_address)
processed_info = ProcessedInfo(
customer_name=customer_name,
region=region,
detail_region_info=road_address or "",
)
# ChatGPT를 이용한 마케팅 분석
chatgpt_service = ChatgptService(
customer_name=customer_name,
region=region,
detail_region_info=road_address or "",
)
prompt = chatgpt_service.build_market_analysis_prompt()
raw_response = await chatgpt_service.generate(prompt)
parsed = await chatgpt_service.parse_marketing_analysis(raw_response)
marketing_analysis = MarketingAnalysis(**parsed)
return {
"image_list": scraper.image_link_list,
"image_count": len(scraper.image_link_list) if scraper.image_link_list else 0,
"processed_info": processed_info,
"marketing_analysis": marketing_analysis,
}
def _extract_image_name(url: str, index: int) -> str:
"""URL에서 이미지 이름 추출 또는 기본 이름 생성"""
try:
from urllib.parse import unquote, urlparse
path = urlparse(url).path
filename = path.split("/")[-1] if path else ""
if filename:
return unquote(filename)
except Exception:
pass
return f"image_{index + 1:03d}"
# @router.post(
# "/generate",
# summary="기본 영상 생성 요청",
# description="""
# 고객 정보만 받아 영상 생성 작업을 시작합니다. (이미지 없음)
# ## 요청 필드
# - **customer_name**: 고객명/가게명 (필수)
# - **region**: 지역명 (필수)
# - **detail_region_info**: 상세 지역 정보 (선택)
# - **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood)
# ## 반환 정보
# - **task_id**: 작업 고유 식별자 (UUID7)
# - **status**: 작업 상태
# - **message**: 응답 메시지
# """,
# response_model=GenerateResponse,
# response_description="생성 작업 시작 결과",
# tags=["generate"],
# )
# async def generate(
# request_body: GenerateRequest,
# background_tasks: BackgroundTasks,
# session: AsyncSession = Depends(get_session),
# ):
# """기본 영상 생성 요청 처리 (이미지 없음)"""
# # UUID7 생성 및 중복 검사
# while True:
# task_id = str(uuid7())
# existing = await session.execute(
# select(Project).where(Project.task_id == task_id)
# )
# if existing.scalar_one_or_none() is None:
# break
# # Project 생성 (이미지 없음)
# project = Project(
# store_name=request_body.customer_name,
# region=request_body.region,
# task_id=task_id,
# detail_region_info=json.dumps(
# {
# "detail": request_body.detail_region_info,
# "attribute": request_body.attribute.model_dump(),
# },
# ensure_ascii=False,
# ),
# )
# session.add(project)
# await session.commit()
# await session.refresh(project)
# background_tasks.add_task(task_process, request_body, task_id, project.id)
# return {
# "task_id": task_id,
# "status": "processing",
# "message": "생성 작업이 시작되었습니다.",
# }
# @router.post(
# "/generate/urls",
# summary="URL 기반 영상 생성 요청",
# description="""
# 고객 정보와 이미지 URL을 받아 영상 생성 작업을 시작합니다.
# ## 요청 필드
# - **customer_name**: 고객명/가게명 (필수)
# - **region**: 지역명 (필수)
# - **detail_region_info**: 상세 지역 정보 (선택)
# - **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood)
# - **images**: 이미지 URL 목록 (필수)
# ## 반환 정보
# - **task_id**: 작업 고유 식별자 (UUID7)
# - **status**: 작업 상태
# - **message**: 응답 메시지
# """,
# response_model=GenerateResponse,
# response_description="생성 작업 시작 결과",
# tags=["generate"],
# )
# async def generate_urls(
# request_body: GenerateUrlsRequest,
# session: AsyncSession = Depends(get_session),
# ):
# """URL 기반 영상 생성 요청 처리"""
# # UUID7 생성 및 중복 검사
# while True:
# task_id = str(uuid7())
# existing = await session.execute(
# select(Project).where(Project.task_id == task_id)
# )
# if existing.scalar_one_or_none() is None:
# break
# # Project 생성 (이미지 정보 제외)
# project = Project(
# store_name=request_body.customer_name,
# region=request_body.region,
# task_id=task_id,
# detail_region_info=json.dumps(
# {
# "detail": request_body.detail_region_info,
# "attribute": request_body.attribute.model_dump(),
# },
# ensure_ascii=False,
# ),
# )
# session.add(project)
# # Image 레코드 생성 (독립 테이블, task_id로 연결)
# for idx, img_item in enumerate(request_body.images):
# # name이 있으면 사용, 없으면 URL에서 추출
# img_name = img_item.name or _extract_image_name(img_item.url, idx)
# image = Image(
# task_id=task_id,
# img_name=img_name,
# img_url=img_item.url,
# img_order=idx,
# )
# session.add(image)
# await session.commit()
# return {
# "task_id": task_id,
# "status": "processing",
# "message": "생성 작업이 시작되었습니다.",
# }
# async def _save_upload_file(file: UploadFile, save_path: Path) -> None:
# """업로드 파일을 지정된 경로에 저장"""
# save_path.parent.mkdir(parents=True, exist_ok=True)
# async with aiofiles.open(save_path, "wb") as f:
# content = await file.read()
# await f.write(content)
# def _get_file_extension(filename: str | None) -> str:
# """파일명에서 확장자 추출"""
# if not filename:
# return ".jpg"
# ext = Path(filename).suffix.lower()
# return ext if ext else ".jpg"
# @router.post(
# "/generate/upload",
# summary="파일 업로드 기반 영상 생성 요청",
# description="""
# 고객 정보와 이미지 파일을 받아 영상 생성 작업을 시작합니다.
# ## 요청 필드 (multipart/form-data)
# - **customer_name**: 고객명/가게명 (필수)
# - **region**: 지역명 (필수)
# - **detail_region_info**: 상세 지역 정보 (선택)
# - **attribute**: 음악 속성 정보 JSON 문자열 (필수)
# - **images**: 이미지 파일 목록 (필수, 복수 파일)
# ## 반환 정보
# - **task_id**: 작업 고유 식별자 (UUID7)
# - **status**: 작업 상태
# - **message**: 응답 메시지
# - **uploaded_count**: 업로드된 이미지 개수
# """,
# response_model=GenerateUploadResponse,
# response_description="생성 작업 시작 결과",
# tags=["generate"],
# )
# async def generate_upload(
# customer_name: str = Form(..., description="고객명/가게명"),
# region: str = Form(..., description="지역명"),
# attribute: str = Form(..., description="음악 속성 정보 (JSON 문자열)"),
# images: list[UploadFile] = File(..., description="이미지 파일 목록"),
# detail_region_info: str | None = Form(None, description="상세 지역 정보"),
# session: AsyncSession = Depends(get_session),
# ):
# """파일 업로드 기반 영상 생성 요청 처리"""
# # attribute JSON 파싱 및 검증
# try:
# attribute_dict = json.loads(attribute)
# attribute_info = AttributeInfo(**attribute_dict)
# except json.JSONDecodeError:
# raise HTTPException(
# status_code=400, detail="attribute는 유효한 JSON 형식이어야 합니다."
# )
# except Exception as e:
# raise HTTPException(status_code=400, detail=f"attribute 검증 실패: {e}")
# # 이미지 파일 검증
# if not images:
# raise HTTPException(
# status_code=400, detail="최소 1개 이상의 이미지 파일이 필요합니다."
# )
# # UUID7 생성 및 중복 검사
# while True:
# task_id = str(uuid7())
# existing = await session.execute(
# select(Project).where(Project.task_id == task_id)
# )
# if existing.scalar_one_or_none() is None:
# break
# # 저장 경로 생성: media/날짜/task_id/
# today = date.today().strftime("%Y%m%d")
# upload_dir = MEDIA_ROOT / today / task_id
# # Project 생성 (이미지 정보 제외)
# project = Project(
# store_name=customer_name,
# region=region,
# task_id=task_id,
# detail_region_info=json.dumps(
# {
# "detail": detail_region_info,
# "attribute": attribute_info.model_dump(),
# },
# ensure_ascii=False,
# ),
# )
# session.add(project)
# # 이미지 파일 저장 및 Image 레코드 생성
# for idx, file in enumerate(images):
# # 각 이미지에 고유 UUID7 생성
# img_uuid = str(uuid7())
# ext = _get_file_extension(file.filename)
# filename = f"{img_uuid}{ext}"
# save_path = upload_dir / filename
# # 파일 저장
# await _save_upload_file(file, save_path)
# # Image 레코드 생성 (독립 테이블, task_id로 연결)
# img_url = f"/media/{today}/{task_id}/{filename}"
# image = Image(
# task_id=task_id,
# img_name=file.filename or filename,
# img_url=img_url,
# img_order=idx,
# )
# session.add(image)
# await session.commit()
# return {
# "task_id": task_id,
# "status": "processing",
# "message": "생성 작업이 시작되었습니다.",
# "uploaded_count": len(images),
# }