마케팅 분석 결과 500자 요약 기능 추가

insta
bluebamus 2025-12-23 17:50:49 +09:00
parent 39db84c797
commit 83a72aaa46
3 changed files with 301 additions and 732 deletions

View File

@ -1,30 +1,15 @@
import json
import re
from datetime import date
from pathlib import Path
import aiofiles
from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from uuid_extensions import uuid7
from fastapi import APIRouter
from app.database.session import get_session
from app.home.models import Image, Project
from app.home.schemas.home import (
AttributeInfo,
CrawlingRequest,
CrawlingResponse,
ErrorResponse,
GenerateRequest,
GenerateResponse,
GenerateUploadResponse,
GenerateUrlsRequest,
MarketingAnalysis,
ProcessedInfo,
)
from app.utils.chatgpt_prompt import ChatgptService
from app.home.worker.main_task import task_process
from app.utils.nvMapScraper import NvMapScraper
MEDIA_ROOT = Path("media")
@ -71,32 +56,6 @@ def _extract_region_from_address(road_address: str | None) -> str:
return ""
def _parse_marketing_analysis(raw_response: str) -> MarketingAnalysis:
"""ChatGPT 마케팅 분석 응답을 파싱하여 MarketingAnalysis 객체로 변환"""
tags: list[str] = []
facilities: list[str] = []
report = raw_response
# JSON 블록 추출 시도
json_match = re.search(r"```json\s*(\{.*?\})\s*```", raw_response, re.DOTALL)
if json_match:
try:
json_data = json.loads(json_match.group(1))
tags = json_data.get("tags", [])
facilities = json_data.get("facilities", [])
# JSON 블록을 제외한 리포트 부분 추출
report = raw_response[: json_match.start()].strip()
# --- 구분자 제거
if report.startswith("---"):
report = report[3:].strip()
if report.endswith("---"):
report = report[:-3].strip()
except json.JSONDecodeError:
pass
return MarketingAnalysis(report=report, tags=tags, facilities=facilities)
@router.post(
"/crawling",
summary="네이버 지도 크롤링",
@ -150,7 +109,8 @@ async def crawling(request_body: CrawlingRequest):
)
prompt = chatgpt_service.build_market_analysis_prompt()
raw_response = await chatgpt_service.generate(prompt)
marketing_analysis = _parse_marketing_analysis(raw_response)
parsed = await chatgpt_service.parse_marketing_analysis(raw_response)
marketing_analysis = MarketingAnalysis(**parsed)
return {
"image_list": scraper.image_link_list,
@ -174,259 +134,259 @@ def _extract_image_name(url: str, index: int) -> str:
return f"image_{index + 1:03d}"
@router.post(
"/generate",
summary="기본 영상 생성 요청",
description="""
고객 정보만 받아 영상 생성 작업을 시작합니다. (이미지 없음)
# @router.post(
# "/generate",
# summary="기본 영상 생성 요청",
# description="""
# 고객 정보만 받아 영상 생성 작업을 시작합니다. (이미지 없음)
## 요청 필드
- **customer_name**: 고객명/가게명 (필수)
- **region**: 지역명 (필수)
- **detail_region_info**: 상세 지역 정보 (선택)
- **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood)
# ## 요청 필드
# - **customer_name**: 고객명/가게명 (필수)
# - **region**: 지역명 (필수)
# - **detail_region_info**: 상세 지역 정보 (선택)
# - **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood)
## 반환 정보
- **task_id**: 작업 고유 식별자 (UUID7)
- **status**: 작업 상태
- **message**: 응답 메시지
""",
response_model=GenerateResponse,
response_description="생성 작업 시작 결과",
tags=["generate"],
)
async def generate(
request_body: GenerateRequest,
background_tasks: BackgroundTasks,
session: AsyncSession = Depends(get_session),
):
"""기본 영상 생성 요청 처리 (이미지 없음)"""
# UUID7 생성 및 중복 검사
while True:
task_id = str(uuid7())
existing = await session.execute(
select(Project).where(Project.task_id == task_id)
)
if existing.scalar_one_or_none() is None:
break
# ## 반환 정보
# - **task_id**: 작업 고유 식별자 (UUID7)
# - **status**: 작업 상태
# - **message**: 응답 메시지
# """,
# response_model=GenerateResponse,
# response_description="생성 작업 시작 결과",
# tags=["generate"],
# )
# async def generate(
# request_body: GenerateRequest,
# background_tasks: BackgroundTasks,
# session: AsyncSession = Depends(get_session),
# ):
# """기본 영상 생성 요청 처리 (이미지 없음)"""
# # UUID7 생성 및 중복 검사
# while True:
# task_id = str(uuid7())
# existing = await session.execute(
# select(Project).where(Project.task_id == task_id)
# )
# if existing.scalar_one_or_none() is None:
# break
# Project 생성 (이미지 없음)
project = Project(
store_name=request_body.customer_name,
region=request_body.region,
task_id=task_id,
detail_region_info=json.dumps(
{
"detail": request_body.detail_region_info,
"attribute": request_body.attribute.model_dump(),
},
ensure_ascii=False,
),
)
session.add(project)
await session.commit()
await session.refresh(project)
# # Project 생성 (이미지 없음)
# project = Project(
# store_name=request_body.customer_name,
# region=request_body.region,
# task_id=task_id,
# detail_region_info=json.dumps(
# {
# "detail": request_body.detail_region_info,
# "attribute": request_body.attribute.model_dump(),
# },
# ensure_ascii=False,
# ),
# )
# session.add(project)
# await session.commit()
# await session.refresh(project)
background_tasks.add_task(task_process, request_body, task_id, project.id)
# background_tasks.add_task(task_process, request_body, task_id, project.id)
return {
"task_id": task_id,
"status": "processing",
"message": "생성 작업이 시작되었습니다.",
}
# return {
# "task_id": task_id,
# "status": "processing",
# "message": "생성 작업이 시작되었습니다.",
# }
@router.post(
"/generate/urls",
summary="URL 기반 영상 생성 요청",
description="""
고객 정보와 이미지 URL을 받아 영상 생성 작업을 시작합니다.
# @router.post(
# "/generate/urls",
# summary="URL 기반 영상 생성 요청",
# description="""
# 고객 정보와 이미지 URL을 받아 영상 생성 작업을 시작합니다.
## 요청 필드
- **customer_name**: 고객명/가게명 (필수)
- **region**: 지역명 (필수)
- **detail_region_info**: 상세 지역 정보 (선택)
- **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood)
- **images**: 이미지 URL 목록 (필수)
# ## 요청 필드
# - **customer_name**: 고객명/가게명 (필수)
# - **region**: 지역명 (필수)
# - **detail_region_info**: 상세 지역 정보 (선택)
# - **attribute**: 음악 속성 정보 (genre, vocal, tempo, mood)
# - **images**: 이미지 URL 목록 (필수)
## 반환 정보
- **task_id**: 작업 고유 식별자 (UUID7)
- **status**: 작업 상태
- **message**: 응답 메시지
""",
response_model=GenerateResponse,
response_description="생성 작업 시작 결과",
tags=["generate"],
)
async def generate_urls(
request_body: GenerateUrlsRequest,
session: AsyncSession = Depends(get_session),
):
"""URL 기반 영상 생성 요청 처리"""
# UUID7 생성 및 중복 검사
while True:
task_id = str(uuid7())
existing = await session.execute(
select(Project).where(Project.task_id == task_id)
)
if existing.scalar_one_or_none() is None:
break
# ## 반환 정보
# - **task_id**: 작업 고유 식별자 (UUID7)
# - **status**: 작업 상태
# - **message**: 응답 메시지
# """,
# response_model=GenerateResponse,
# response_description="생성 작업 시작 결과",
# tags=["generate"],
# )
# async def generate_urls(
# request_body: GenerateUrlsRequest,
# session: AsyncSession = Depends(get_session),
# ):
# """URL 기반 영상 생성 요청 처리"""
# # UUID7 생성 및 중복 검사
# while True:
# task_id = str(uuid7())
# existing = await session.execute(
# select(Project).where(Project.task_id == task_id)
# )
# if existing.scalar_one_or_none() is None:
# break
# Project 생성 (이미지 정보 제외)
project = Project(
store_name=request_body.customer_name,
region=request_body.region,
task_id=task_id,
detail_region_info=json.dumps(
{
"detail": request_body.detail_region_info,
"attribute": request_body.attribute.model_dump(),
},
ensure_ascii=False,
),
)
session.add(project)
# # Project 생성 (이미지 정보 제외)
# project = Project(
# store_name=request_body.customer_name,
# region=request_body.region,
# task_id=task_id,
# detail_region_info=json.dumps(
# {
# "detail": request_body.detail_region_info,
# "attribute": request_body.attribute.model_dump(),
# },
# ensure_ascii=False,
# ),
# )
# session.add(project)
# Image 레코드 생성 (독립 테이블, task_id로 연결)
for idx, img_item in enumerate(request_body.images):
# name이 있으면 사용, 없으면 URL에서 추출
img_name = img_item.name or _extract_image_name(img_item.url, idx)
image = Image(
task_id=task_id,
img_name=img_name,
img_url=img_item.url,
img_order=idx,
)
session.add(image)
# # Image 레코드 생성 (독립 테이블, task_id로 연결)
# for idx, img_item in enumerate(request_body.images):
# # name이 있으면 사용, 없으면 URL에서 추출
# img_name = img_item.name or _extract_image_name(img_item.url, idx)
# image = Image(
# task_id=task_id,
# img_name=img_name,
# img_url=img_item.url,
# img_order=idx,
# )
# session.add(image)
await session.commit()
# await session.commit()
return {
"task_id": task_id,
"status": "processing",
"message": "생성 작업이 시작되었습니다.",
}
# return {
# "task_id": task_id,
# "status": "processing",
# "message": "생성 작업이 시작되었습니다.",
# }
async def _save_upload_file(file: UploadFile, save_path: Path) -> None:
"""업로드 파일을 지정된 경로에 저장"""
save_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(save_path, "wb") as f:
content = await file.read()
await f.write(content)
# async def _save_upload_file(file: UploadFile, save_path: Path) -> None:
# """업로드 파일을 지정된 경로에 저장"""
# save_path.parent.mkdir(parents=True, exist_ok=True)
# async with aiofiles.open(save_path, "wb") as f:
# content = await file.read()
# await f.write(content)
def _get_file_extension(filename: str | None) -> str:
"""파일명에서 확장자 추출"""
if not filename:
return ".jpg"
ext = Path(filename).suffix.lower()
return ext if ext else ".jpg"
# def _get_file_extension(filename: str | None) -> str:
# """파일명에서 확장자 추출"""
# if not filename:
# return ".jpg"
# ext = Path(filename).suffix.lower()
# return ext if ext else ".jpg"
@router.post(
"/generate/upload",
summary="파일 업로드 기반 영상 생성 요청",
description="""
고객 정보와 이미지 파일을 받아 영상 생성 작업을 시작합니다.
# @router.post(
# "/generate/upload",
# summary="파일 업로드 기반 영상 생성 요청",
# description="""
# 고객 정보와 이미지 파일을 받아 영상 생성 작업을 시작합니다.
## 요청 필드 (multipart/form-data)
- **customer_name**: 고객명/가게명 (필수)
- **region**: 지역명 (필수)
- **detail_region_info**: 상세 지역 정보 (선택)
- **attribute**: 음악 속성 정보 JSON 문자열 (필수)
- **images**: 이미지 파일 목록 (필수, 복수 파일)
# ## 요청 필드 (multipart/form-data)
# - **customer_name**: 고객명/가게명 (필수)
# - **region**: 지역명 (필수)
# - **detail_region_info**: 상세 지역 정보 (선택)
# - **attribute**: 음악 속성 정보 JSON 문자열 (필수)
# - **images**: 이미지 파일 목록 (필수, 복수 파일)
## 반환 정보
- **task_id**: 작업 고유 식별자 (UUID7)
- **status**: 작업 상태
- **message**: 응답 메시지
- **uploaded_count**: 업로드된 이미지 개수
""",
response_model=GenerateUploadResponse,
response_description="생성 작업 시작 결과",
tags=["generate"],
)
async def generate_upload(
customer_name: str = Form(..., description="고객명/가게명"),
region: str = Form(..., description="지역명"),
attribute: str = Form(..., description="음악 속성 정보 (JSON 문자열)"),
images: list[UploadFile] = File(..., description="이미지 파일 목록"),
detail_region_info: str | None = Form(None, description="상세 지역 정보"),
session: AsyncSession = Depends(get_session),
):
"""파일 업로드 기반 영상 생성 요청 처리"""
# attribute JSON 파싱 및 검증
try:
attribute_dict = json.loads(attribute)
attribute_info = AttributeInfo(**attribute_dict)
except json.JSONDecodeError:
raise HTTPException(
status_code=400, detail="attribute는 유효한 JSON 형식이어야 합니다."
)
except Exception as e:
raise HTTPException(status_code=400, detail=f"attribute 검증 실패: {e}")
# ## 반환 정보
# - **task_id**: 작업 고유 식별자 (UUID7)
# - **status**: 작업 상태
# - **message**: 응답 메시지
# - **uploaded_count**: 업로드된 이미지 개수
# """,
# response_model=GenerateUploadResponse,
# response_description="생성 작업 시작 결과",
# tags=["generate"],
# )
# async def generate_upload(
# customer_name: str = Form(..., description="고객명/가게명"),
# region: str = Form(..., description="지역명"),
# attribute: str = Form(..., description="음악 속성 정보 (JSON 문자열)"),
# images: list[UploadFile] = File(..., description="이미지 파일 목록"),
# detail_region_info: str | None = Form(None, description="상세 지역 정보"),
# session: AsyncSession = Depends(get_session),
# ):
# """파일 업로드 기반 영상 생성 요청 처리"""
# # attribute JSON 파싱 및 검증
# try:
# attribute_dict = json.loads(attribute)
# attribute_info = AttributeInfo(**attribute_dict)
# except json.JSONDecodeError:
# raise HTTPException(
# status_code=400, detail="attribute는 유효한 JSON 형식이어야 합니다."
# )
# except Exception as e:
# raise HTTPException(status_code=400, detail=f"attribute 검증 실패: {e}")
# 이미지 파일 검증
if not images:
raise HTTPException(
status_code=400, detail="최소 1개 이상의 이미지 파일이 필요합니다."
)
# # 이미지 파일 검증
# if not images:
# raise HTTPException(
# status_code=400, detail="최소 1개 이상의 이미지 파일이 필요합니다."
# )
# UUID7 생성 및 중복 검사
while True:
task_id = str(uuid7())
existing = await session.execute(
select(Project).where(Project.task_id == task_id)
)
if existing.scalar_one_or_none() is None:
break
# # UUID7 생성 및 중복 검사
# while True:
# task_id = str(uuid7())
# existing = await session.execute(
# select(Project).where(Project.task_id == task_id)
# )
# if existing.scalar_one_or_none() is None:
# break
# 저장 경로 생성: media/날짜/task_id/
today = date.today().strftime("%Y%m%d")
upload_dir = MEDIA_ROOT / today / task_id
# # 저장 경로 생성: media/날짜/task_id/
# today = date.today().strftime("%Y%m%d")
# upload_dir = MEDIA_ROOT / today / task_id
# Project 생성 (이미지 정보 제외)
project = Project(
store_name=customer_name,
region=region,
task_id=task_id,
detail_region_info=json.dumps(
{
"detail": detail_region_info,
"attribute": attribute_info.model_dump(),
},
ensure_ascii=False,
),
)
session.add(project)
# # Project 생성 (이미지 정보 제외)
# project = Project(
# store_name=customer_name,
# region=region,
# task_id=task_id,
# detail_region_info=json.dumps(
# {
# "detail": detail_region_info,
# "attribute": attribute_info.model_dump(),
# },
# ensure_ascii=False,
# ),
# )
# session.add(project)
# 이미지 파일 저장 및 Image 레코드 생성
for idx, file in enumerate(images):
# 각 이미지에 고유 UUID7 생성
img_uuid = str(uuid7())
ext = _get_file_extension(file.filename)
filename = f"{img_uuid}{ext}"
save_path = upload_dir / filename
# # 이미지 파일 저장 및 Image 레코드 생성
# for idx, file in enumerate(images):
# # 각 이미지에 고유 UUID7 생성
# img_uuid = str(uuid7())
# ext = _get_file_extension(file.filename)
# filename = f"{img_uuid}{ext}"
# save_path = upload_dir / filename
# 파일 저장
await _save_upload_file(file, save_path)
# # 파일 저장
# await _save_upload_file(file, save_path)
# Image 레코드 생성 (독립 테이블, task_id로 연결)
img_url = f"/media/{today}/{task_id}/{filename}"
image = Image(
task_id=task_id,
img_name=file.filename or filename,
img_url=img_url,
img_order=idx,
)
session.add(image)
# # Image 레코드 생성 (독립 테이블, task_id로 연결)
# img_url = f"/media/{today}/{task_id}/{filename}"
# image = Image(
# task_id=task_id,
# img_name=file.filename or filename,
# img_url=img_url,
# img_order=idx,
# )
# session.add(image)
await session.commit()
# await session.commit()
return {
"task_id": task_id,
"status": "processing",
"message": "생성 작업이 시작되었습니다.",
"uploaded_count": len(images),
}
# return {
# "task_id": task_id,
# "status": "processing",
# "message": "생성 작업이 시작되었습니다.",
# "uploaded_count": len(images),
# }

View File

@ -1,463 +0,0 @@
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Body, Depends, status
from pydantic import BaseModel, EmailStr, Field
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session
router = APIRouter()
# ============================================================
# Pydantic Models for Request/Response
# ============================================================
class SigninRequest(BaseModel):
"""회원가입 요청 스키마"""
email: EmailStr = Field(
..., description="사용자 이메일 주소", example="user@example.com"
)
password: str = Field(
..., min_length=8, description="비밀번호 (최소 8자)", example="password123"
)
name: str = Field(
..., min_length=2, max_length=50, description="사용자 이름", example="홍길동"
)
phone: Optional[str] = Field(
None,
pattern=r"^\d{3}-\d{4}-\d{4}$",
description="전화번호 (형식: 010-1234-5678)",
example="010-1234-5678",
)
class SigninResponse(BaseModel):
"""회원가입 응답 스키마"""
success: bool = Field(..., description="요청 성공 여부")
message: str = Field(..., description="응답 메시지")
user_id: int = Field(..., description="생성된 사용자 ID")
email: EmailStr = Field(..., description="등록된 이메일")
created_at: datetime = Field(..., description="계정 생성 시간")
class LoginRequest(BaseModel):
"""로그인 요청 스키마"""
email: EmailStr = Field(
..., description="사용자 이메일 주소", example="user@example.com"
)
password: str = Field(..., description="비밀번호", example="password123")
class LoginResponse(BaseModel):
"""로그인 응답 스키마"""
success: bool = Field(..., description="로그인 성공 여부")
message: str = Field(..., description="응답 메시지")
access_token: str = Field(..., description="JWT 액세스 토큰")
refresh_token: str = Field(..., description="JWT 리프레시 토큰")
token_type: str = Field(default="bearer", description="토큰 타입")
expires_in: int = Field(..., description="토큰 만료 시간 (초)")
class LogoutResponse(BaseModel):
"""로그아웃 응답 스키마"""
success: bool = Field(..., description="로그아웃 성공 여부")
message: str = Field(..., description="응답 메시지")
class ProfileResponse(BaseModel):
"""프로필 조회 응답 스키마"""
user_id: int = Field(..., description="사용자 ID")
email: EmailStr = Field(..., description="이메일 주소")
name: str = Field(..., description="사용자 이름")
phone: Optional[str] = Field(None, description="전화번호")
profile_image: Optional[str] = Field(None, description="프로필 이미지 URL")
created_at: datetime = Field(..., description="계정 생성 시간")
last_login: Optional[datetime] = Field(None, description="마지막 로그인 시간")
class HomeResponse(BaseModel):
"""홈 응답 스키마"""
message: str = Field(..., description="환영 메시지")
version: str = Field(..., description="API 버전")
status: str = Field(..., description="서비스 상태")
timestamp: datetime = Field(..., description="응답 시간")
class ErrorResponse(BaseModel):
"""에러 응답 스키마"""
success: bool = Field(default=False, description="요청 성공 여부")
error_code: str = Field(..., description="에러 코드")
message: str = Field(..., description="에러 메시지")
detail: Optional[str] = Field(None, description="상세 에러 정보")
# ============================================================
# Dummy Data
# ============================================================
DUMMY_USER = {
"user_id": 1,
"email": "user@example.com",
"name": "홍길동",
"phone": "010-1234-5678",
"profile_image": "https://example.com/images/profile/default.png",
"created_at": datetime(2024, 1, 15, 10, 30, 0),
"last_login": datetime(2024, 12, 18, 9, 0, 0),
}
DUMMY_TOKENS = {
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZXhwIjoxNzM0NTAwMDAwfQ.dummy_signature",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZXhwIjoxNzM1MDAwMDAwfQ.dummy_refresh",
"token_type": "bearer",
"expires_in": 3600,
}
# ============================================================
# Endpoints
# ============================================================
@router.get(
"/db",
summary="데이터베이스 상태 확인",
description="데이터베이스 연결 상태를 확인합니다. 간단한 쿼리를 실행하여 DB 연결이 정상인지 테스트합니다.",
response_description="데이터베이스 연결 상태 정보",
responses={
200: {
"description": "데이터베이스 연결 정상",
"content": {
"application/json": {
"example": {
"status": "healthy",
"database": "connected",
"test_query": 1,
}
}
},
},
500: {"description": "데이터베이스 연결 실패", "model": ErrorResponse},
},
tags=["health"],
)
async def db_health_check(session: AsyncSession = Depends(get_session)):
"""DB 연결 상태 확인"""
try:
result = await session.execute(text("SELECT 1"))
return {
"status": "healthy",
"database": "connected",
"test_query": result.scalar(),
}
except Exception as e:
return {"status": "unhealthy", "database": "disconnected", "error": str(e)}
@router.get(
"/",
summary="홈 엔드포인트",
description="API 서비스의 기본 정보를 반환합니다. 서비스 상태, 버전, 현재 시간 등의 정보를 확인할 수 있습니다.",
response_model=HomeResponse,
response_description="서비스 기본 정보",
responses={
200: {
"description": "성공적으로 홈 정보 반환",
"content": {
"application/json": {
"example": {
"message": "CASTAD API 서비스에 오신 것을 환영합니다.",
"version": "0.1.0",
"status": "running",
"timestamp": "2024-12-18T10:00:00",
}
}
},
}
},
tags=["home"],
)
async def home() -> HomeResponse:
"""홈 페이지 - API 기본 정보 반환"""
return HomeResponse(
message="CASTAD API 서비스에 오신 것을 환영합니다.",
version="0.1.0",
status="running",
timestamp=datetime.now(),
)
@router.post(
"/signin",
summary="회원가입",
description="""
새로운 사용자 계정을 생성합니다.
## 요청 필드
- **email**: 유효한 이메일 주소 (필수)
- **password**: 최소 8 이상의 비밀번호 (필수)
- **name**: 2~50 사이의 사용자 이름 (필수)
- **phone**: 전화번호 (선택, 형식: 010-1234-5678)
## 비밀번호 정책
- 최소 8 이상
- 영문, 숫자 조합 권장
""",
response_model=SigninResponse,
response_description="회원가입 결과",
status_code=status.HTTP_201_CREATED,
responses={
201: {"description": "회원가입 성공", "model": SigninResponse},
400: {
"description": "잘못된 요청 (유효성 검사 실패)",
"model": ErrorResponse,
"content": {
"application/json": {
"example": {
"success": False,
"error_code": "VALIDATION_ERROR",
"message": "입력값이 유효하지 않습니다.",
"detail": "이메일 형식이 올바르지 않습니다.",
}
}
},
},
409: {
"description": "이미 존재하는 이메일",
"model": ErrorResponse,
"content": {
"application/json": {
"example": {
"success": False,
"error_code": "EMAIL_EXISTS",
"message": "이미 등록된 이메일입니다.",
"detail": None,
}
}
},
},
},
tags=["auth"],
)
async def signin(
request_body: SigninRequest = Body(
...,
description="회원가입에 필요한 사용자 정보",
openapi_examples={
"기본 예시": {
"summary": "필수 필드만 입력",
"description": "이메일, 비밀번호, 이름만 입력하는 경우",
"value": {
"email": "newuser@example.com",
"password": "securepass123",
"name": "김철수",
},
},
"전체 필드 예시": {
"summary": "모든 필드 입력",
"description": "선택 필드를 포함한 전체 입력",
"value": {
"email": "newuser@example.com",
"password": "securepass123",
"name": "김철수",
"phone": "010-9876-5432",
},
},
},
),
) -> SigninResponse:
"""새로운 사용자 회원가입 처리"""
return SigninResponse(
success=True,
message="회원가입이 완료되었습니다.",
user_id=2,
email=request_body.email,
created_at=datetime.now(),
)
@router.post(
"/login",
summary="로그인",
description="""
사용자 인증을 수행하고 JWT 토큰을 발급합니다.
## 인증 방식
이메일과 비밀번호를 사용한 기본 인증을 수행합니다.
인증 성공 액세스 토큰과 리프레시 토큰이 발급됩니다.
## 토큰 정보
- **access_token**: API 요청 사용 (유효기간: 1시간)
- **refresh_token**: 액세스 토큰 갱신 사용 (유효기간: 7)
""",
response_model=LoginResponse,
response_description="로그인 결과 및 토큰 정보",
responses={
200: {"description": "로그인 성공", "model": LoginResponse},
401: {
"description": "인증 실패",
"model": ErrorResponse,
"content": {
"application/json": {
"example": {
"success": False,
"error_code": "INVALID_CREDENTIALS",
"message": "이메일 또는 비밀번호가 올바르지 않습니다.",
"detail": None,
}
}
},
},
403: {
"description": "계정 비활성화",
"model": ErrorResponse,
"content": {
"application/json": {
"example": {
"success": False,
"error_code": "ACCOUNT_DISABLED",
"message": "비활성화된 계정입니다.",
"detail": "관리자에게 문의하세요.",
}
}
},
},
},
tags=["auth"],
)
async def login(
request_body: LoginRequest = Body(
...,
description="로그인 인증 정보",
openapi_examples={
"로그인 예시": {
"summary": "일반 로그인",
"description": "이메일과 비밀번호로 로그인",
"value": {"email": "user@example.com", "password": "password123"},
}
},
),
) -> LoginResponse:
"""사용자 로그인 및 토큰 발급"""
return LoginResponse(
success=True,
message="로그인에 성공했습니다.",
access_token=DUMMY_TOKENS["access_token"],
refresh_token=DUMMY_TOKENS["refresh_token"],
token_type=DUMMY_TOKENS["token_type"],
expires_in=DUMMY_TOKENS["expires_in"],
)
@router.post(
"/logout",
summary="로그아웃",
description="""
현재 세션을 종료하고 토큰을 무효화합니다.
## 동작 방식
- 서버 토큰 블랙리스트에 현재 토큰 등록
- 클라이언트 토큰 삭제 권장
## 주의사항
로그아웃 후에는 동일한 토큰으로 API 요청이 불가능합니다.
""",
response_model=LogoutResponse,
response_description="로그아웃 결과",
responses={
200: {"description": "로그아웃 성공", "model": LogoutResponse},
401: {
"description": "인증되지 않은 요청",
"model": ErrorResponse,
"content": {
"application/json": {
"example": {
"success": False,
"error_code": "UNAUTHORIZED",
"message": "인증이 필요합니다.",
"detail": "유효한 토큰을 제공해주세요.",
}
}
},
},
},
tags=["auth"],
)
async def logout() -> LogoutResponse:
"""사용자 로그아웃 처리"""
return LogoutResponse(success=True, message="로그아웃되었습니다.")
@router.get(
"/profile",
summary="프로필 조회",
description="""
현재 로그인한 사용자의 프로필 정보를 조회합니다.
## 반환 정보
- 기본 정보: 사용자 ID, 이메일, 이름
- 연락처 정보: 전화번호
- 프로필 이미지 URL
- 계정 정보: 생성일, 마지막 로그인 시간
## 인증 필요
엔드포인트는 유효한 액세스 토큰이 필요합니다.
Authorization 헤더에 Bearer 토큰을 포함해주세요.
""",
response_model=ProfileResponse,
response_description="사용자 프로필 정보",
responses={
200: {"description": "프로필 조회 성공", "model": ProfileResponse},
401: {
"description": "인증되지 않은 요청",
"model": ErrorResponse,
"content": {
"application/json": {
"example": {
"success": False,
"error_code": "UNAUTHORIZED",
"message": "인증이 필요합니다.",
"detail": "유효한 토큰을 제공해주세요.",
}
}
},
},
404: {
"description": "사용자를 찾을 수 없음",
"model": ErrorResponse,
"content": {
"application/json": {
"example": {
"success": False,
"error_code": "USER_NOT_FOUND",
"message": "사용자를 찾을 수 없습니다.",
"detail": None,
}
}
},
},
},
tags=["user"],
)
async def profile() -> ProfileResponse:
"""현재 사용자 프로필 조회"""
return ProfileResponse(
user_id=DUMMY_USER["user_id"],
email=DUMMY_USER["email"],
name=DUMMY_USER["name"],
phone=DUMMY_USER["phone"],
profile_image=DUMMY_USER["profile_image"],
created_at=DUMMY_USER["created_at"],
last_login=DUMMY_USER["last_login"],
)

View File

@ -1,3 +1,6 @@
import json
import re
from openai import AsyncOpenAI
from config import apikey_settings
@ -255,3 +258,72 @@ class ChatgptService:
)
message = completion.choices[0].message.content
return message or ""
async def summarize_marketing(self, text: str) -> str:
"""마케팅 텍스트를 항목으로 구분하여 500자로 요약 정리"""
prompt = f"""[ROLE]
마케팅 콘텐츠 요약 전문가
[INPUT]
{text}
[TASK]
텍스트를 분석하여 핵심 내용을 항목별로 구분하여 500 이내로 요약해주세요.
[OUTPUT REQUIREMENTS]
- 항목별로 구분하여 정리 (: 타겟 고객, 차별점, 지역 특성 )
- 500 이내로 요약
- 핵심 정보만 간결하게 포함
- 한국어로 작성
[OUTPUT FORMAT]
---
[항목별로 구분된 500 이내 요약]
---
"""
completion = await self.client.chat.completions.create(
model=self.model, messages=[{"role": "user", "content": prompt}]
)
message = completion.choices[0].message.content
result = message or ""
# --- 구분자 제거
if result.startswith("---"):
result = result[3:].strip()
if result.endswith("---"):
result = result[:-3].strip()
return result
async def parse_marketing_analysis(self, raw_response: str) -> dict:
"""ChatGPT 마케팅 분석 응답을 파싱하고 요약하여 딕셔너리로 반환
Returns:
dict: {"report": str, "tags": list[str], "facilities": list[str]}
"""
tags: list[str] = []
facilities: list[str] = []
report = raw_response
# JSON 블록 추출 시도
json_match = re.search(r"```json\s*(\{.*?\})\s*```", raw_response, re.DOTALL)
if json_match:
try:
json_data = json.loads(json_match.group(1))
tags = json_data.get("tags", [])
facilities = json_data.get("facilities", [])
# JSON 블록을 제외한 리포트 부분 추출
report = raw_response[: json_match.start()].strip()
# --- 구분자 제거
if report.startswith("---"):
report = report[3:].strip()
if report.endswith("---"):
report = report[:-3].strip()
except json.JSONDecodeError:
pass
# 리포트 내용을 500자로 요약
if report:
report = await self.summarize_marketing(report)
return {"report": report, "tags": tags, "facilities": facilities}