diff --git a/docs/analysis/lang_report.md b/docs/analysis/lang_report.md new file mode 100644 index 0000000..431f9ae --- /dev/null +++ b/docs/analysis/lang_report.md @@ -0,0 +1,1705 @@ +# CastAD 백엔드 - LangChain, LangGraph, RAG 적용 설계 보고서 + +## 목차 +1. [현재 시스템 분석](#1-현재-시스템-분석) +2. [LangChain 적용 설계](#2-langchain-적용-설계) +3. [LangGraph 적용 설계](#3-langgraph-적용-설계) +4. [RAG 적용 설계](#4-rag-적용-설계) +5. [통합 아키텍처](#5-통합-아키텍처) +6. [기대 효과](#6-기대-효과) +7. [구현 로드맵](#7-구현-로드맵) +8. [결론](#8-결론) + +--- + +## 1. 현재 시스템 분석 + +### 1.1 프로젝트 개요 + +CastAD는 **AI 기반 광고 음악 및 영상 자동 생성 서비스**입니다. 네이버 지도에서 수집한 숙박시설 정보를 기반으로 마케팅용 자동 영상을 생성하는 통합 플랫폼입니다. + +**핵심 파이프라인:** +``` +사용자 입력 → 가사 자동 생성 → 음악 자동 생성 → 영상 자동 생성 +``` + +### 1.2 현재 기술 스택 + +| 구분 | 기술 | +|------|------| +| Backend Framework | FastAPI (async/await 기반) | +| ORM | SQLAlchemy 2.0 (비동기) | +| Database | MySQL (asyncmy 드라이버) | +| Cache | Redis | +| AI/API | OpenAI ChatGPT, Suno AI, Creatomate | +| Storage | Azure Blob Storage | + +### 1.3 현재 핵심 흐름 + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ 현재 파이프라인 구조 │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ POST /crawling (선택) │ +│ │ │ +│ ▼ │ +│ POST /lyric/generate ──────► ChatGPT API ──────► 가사 저장 │ +│ │ │ +│ ▼ │ +│ POST /song/generate ───────► Suno API ─────────► 음악 저장 │ +│ │ │ │ +│ │ 클라이언트 폴링 │ +│ │ │ │ +│ ▼ ▼ │ +│ POST /video/generate ──────► Creatomate API ───► 영상 저장 │ +│ │ │ │ +│ │ 클라이언트 폴링 │ +│ ▼ ▼ │ +│ GET /video/download ◄──────── 완료 ──────────► Azure Blob │ +│ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### 1.4 현재 시스템의 한계점 + +| 문제점 | 설명 | +|--------|------| +| **분산된 상태 관리** | 각 API 호출마다 독립적인 상태 관리, 전체 파이프라인 추적 어려움 | +| **클라이언트 의존적 폴링** | 음악/영상 생성 완료 여부를 클라이언트가 반복 확인해야 함 | +| **하드코딩된 프롬프트** | ChatGPT 프롬프트가 코드에 직접 작성, 유연성 부족 | +| **에러 복구 제한적** | 단순 실패 패턴 검사만 수행, 자동 복구 메커니즘 없음 | +| **과거 데이터 미활용** | 성공한 가사/마케팅 사례 재활용 불가 | +| **일관성 없는 품질** | 동일 조건에서도 결과물 품질 편차 존재 | + +--- + +## 2. LangChain 적용 설계 + +### 2.1 적용 대상 및 목적 + +LangChain은 **LLM 애플리케이션 개발을 위한 프레임워크**로, 프롬프트 관리, 체인 구성, 출력 파싱 등을 체계화합니다. + +**적용 대상:** +1. 가사 생성 서비스 (`ChatgptService`) +2. 마케팅 분석 서비스 +3. 다국어 처리 로직 + +### 2.2 설계 1: 프롬프트 템플릿 시스템 + +**현재 문제:** +```python +# 현재: chatgpt_prompt.py +prompt = f""" +[ROLE] You are a marketing expert... +[INPUT] Customer: {customer_name}, Region: {region}... +""" +``` + +**개선 설계:** +```python +# 개선: langchain 적용 +from langchain.prompts import PromptTemplate, ChatPromptTemplate +from langchain_openai import ChatOpenAI + +# 가사 생성 프롬프트 템플릿 +LYRIC_PROMPT = ChatPromptTemplate.from_messages([ + ("system", """[ROLE] You are a marketing expert and professional lyricist. +You specialize in creating catchy, emotional lyrics for travel and accommodation marketing. + +[LANGUAGE REQUIREMENT] +Output MUST be 100% in {language}. No other languages allowed."""), + + ("human", """[INPUT] +Customer Name: {customer_name} +Region: {region} +Detailed Information: {detail_info} + +[OUTPUT REQUIREMENTS] +- 8-12 lines of lyrics +- Focus on: relaxation, healing, beautiful scenery, memorable experiences +- Style: warm, inviting, poetic +- Include location-specific imagery + +Generate lyrics now:""") +]) + +# 체인 구성 +lyric_chain = LYRIC_PROMPT | ChatOpenAI(model="gpt-4o-mini") | StrOutputParser() + +# 사용 +result = await lyric_chain.ainvoke({ + "customer_name": "스테이뫰", + "region": "강원도 속초", + "detail_info": "해변 근처 펜션", + "language": "Korean" +}) +``` + +**이점:** +- 프롬프트 버전 관리 용이 +- A/B 테스팅 지원 +- 입력 변수 명확한 정의 + +### 2.3 설계 2: 다단계 마케팅 분석 체인 + +**목적:** 복잡한 마케팅 분석을 단계별로 수행하여 품질 향상 + +```python +from langchain.chains import SequentialChain +from langchain.prompts import PromptTemplate +from langchain_openai import ChatOpenAI + +# Step 1: 경쟁사 분석 체인 +competitor_prompt = PromptTemplate( + input_variables=["region", "business_type"], + template=""" + {region} 지역의 {business_type} 업종에 대해 분석하세요: + - 주요 경쟁사 특성 + - 차별화 포인트 + - 시장 포지셔닝 + """ +) +competitor_chain = competitor_prompt | ChatOpenAI() | StrOutputParser() + +# Step 2: 타겟 고객 분석 체인 +audience_prompt = PromptTemplate( + input_variables=["region", "competitor_analysis"], + template=""" + 경쟁사 분석 결과: {competitor_analysis} + + {region} 지역의 주요 타겟 고객층을 분석하세요: + - 연령대 및 특성 + - 주요 니즈 + - 결정 요인 + """ +) +audience_chain = audience_prompt | ChatOpenAI() | StrOutputParser() + +# Step 3: 마케팅 전략 종합 체인 +strategy_prompt = PromptTemplate( + input_variables=["customer_name", "competitor_analysis", "audience_analysis"], + template=""" + 경쟁사 분석: {competitor_analysis} + 타겟 고객: {audience_analysis} + + {customer_name}을 위한 마케팅 전략을 제안하세요: + - 핵심 메시지 + - 차별화 전략 + - 추천 가사 방향 + """ +) +strategy_chain = strategy_prompt | ChatOpenAI() | StrOutputParser() + +# 통합 순차 체인 +marketing_analysis_chain = ( + {"region": RunnablePassthrough(), "business_type": RunnablePassthrough()} + | competitor_chain + | {"competitor_analysis": RunnablePassthrough(), "region": RunnablePassthrough()} + | audience_chain + | {"competitor_analysis": ..., "audience_analysis": RunnablePassthrough(), "customer_name": ...} + | strategy_chain +) +``` + +**이점:** +- 분석의 깊이와 체계성 향상 +- 각 단계별 결과 추적 가능 +- 중간 결과 캐싱 가능 + +### 2.4 설계 3: 출력 파싱 및 검증 + +**목적:** ChatGPT 응답의 구조화 및 자동 검증 + +```python +from langchain.output_parsers import PydanticOutputParser +from langchain_core.output_parsers import OutputFixingParser +from pydantic import BaseModel, Field, validator + +# 가사 출력 스키마 +class LyricOutput(BaseModel): + title: str = Field(description="가사의 제목 (선택)") + lyrics: list[str] = Field(description="가사 각 줄", min_items=8, max_items=12) + mood: str = Field(description="가사의 분위기: warm, energetic, romantic 등") + + @validator('lyrics') + def validate_line_count(cls, v): + if len(v) < 8: + raise ValueError("가사는 최소 8줄 이상이어야 합니다") + return v + +# 파서 생성 +parser = PydanticOutputParser(pydantic_object=LyricOutput) + +# 자동 수정 파서 (파싱 실패 시 LLM으로 재시도) +fixing_parser = OutputFixingParser.from_llm( + parser=parser, + llm=ChatOpenAI(model="gpt-4o-mini") +) + +# 프롬프트에 포맷 지시 추가 +prompt_with_format = LYRIC_PROMPT.partial( + format_instructions=parser.get_format_instructions() +) +``` + +**이점:** +- 응답 형식 일관성 보장 +- 자동 오류 복구 +- 타입 안전성 확보 + +### 2.5 설계 4: Few-Shot 다국어 프롬프트 + +**목적:** 각 언어별 고품질 예시 제공으로 번역/생성 품질 향상 + +```python +from langchain.prompts import FewShotPromptTemplate, PromptTemplate + +# 언어별 예시 +LANGUAGE_EXAMPLES = { + "Korean": [ + { + "input": "강원도 속초 해변 펜션", + "output": """푸른 바다 물결 위에 +새벽빛이 춤을 추고 +당신의 하루를 담아 +스테이뫰에서 쉬어가요""" + } + ], + "English": [ + { + "input": "Sokcho beach pension, Gangwon-do", + "output": """Where ocean waves meet morning light +A peaceful haven comes in sight +Let your worries drift away +At Stay Meoum, find your stay""" + } + ], + "Japanese": [ + { + "input": "江原道束草ビーチペンション", + "output": """青い海の波の上に +朝の光が踊る時 +あなたの一日を包み込む +ステイメウムで休んでいこう""" + } + ], + "Chinese": [...], + "Thai": [...], + "Vietnamese": [...] +} + +# Few-Shot 프롬프트 생성 +def create_multilingual_prompt(language: str): + example_prompt = PromptTemplate( + input_variables=["input", "output"], + template="입력: {input}\n가사:\n{output}" + ) + + return FewShotPromptTemplate( + examples=LANGUAGE_EXAMPLES.get(language, LANGUAGE_EXAMPLES["Korean"]), + example_prompt=example_prompt, + prefix="다음 예시를 참고하여 고품질 가사를 생성하세요:", + suffix="입력: {customer_info}\n가사:", + input_variables=["customer_info"] + ) +``` + +**이점:** +- 언어별 문화적 뉘앙스 반영 +- 일관된 스타일 유지 +- 번역 품질 대폭 향상 + +--- + +## 3. LangGraph 적용 설계 + +### 3.1 적용 대상 및 목적 + +LangGraph는 **복잡한 다단계 워크플로우를 상태 기계(State Machine)로 관리**하는 프레임워크입니다. + +**적용 대상:** +1. 전체 영상 생성 파이프라인 (가사 → 음악 → 영상) +2. 비동기 폴링 자동화 +3. 에러 처리 및 재시도 로직 + +### 3.2 설계 1: 통합 파이프라인 그래프 + +**핵심 설계:** + +```python +from langgraph.graph import StateGraph, END +from typing import TypedDict, Optional, Literal +from datetime import datetime + +# 파이프라인 상태 정의 +class PipelineState(TypedDict): + # 입력 + task_id: str + customer_name: str + region: str + detail_info: str + language: str + images: list[str] + orientation: Literal["vertical", "horizontal"] + + # 중간 결과 + lyric: Optional[str] + lyric_status: Optional[str] + + song_url: Optional[str] + song_task_id: Optional[str] + song_status: Optional[str] + song_duration: Optional[float] + + video_url: Optional[str] + video_render_id: Optional[str] + video_status: Optional[str] + + # 메타데이터 + error: Optional[str] + error_step: Optional[str] + started_at: datetime + completed_at: Optional[datetime] + retry_count: int + +# 그래프 빌더 +def build_video_pipeline() -> StateGraph: + graph = StateGraph(PipelineState) + + # ===== 노드 정의 ===== + + # 1. 가사 생성 노드 + async def generate_lyric(state: PipelineState) -> PipelineState: + """ChatGPT로 가사 생성 (동기)""" + try: + lyric_chain = create_lyric_chain() # LangChain 체인 + lyric = await lyric_chain.ainvoke({ + "customer_name": state["customer_name"], + "region": state["region"], + "detail_info": state["detail_info"], + "language": state["language"] + }) + + # DB 저장 + await save_lyric_to_db(state["task_id"], lyric) + + return { + **state, + "lyric": lyric, + "lyric_status": "completed" + } + except Exception as e: + return { + **state, + "error": str(e), + "error_step": "lyric_generation", + "lyric_status": "failed" + } + + # 2. 음악 생성 요청 노드 + async def request_song(state: PipelineState) -> PipelineState: + """Suno API에 음악 생성 요청""" + try: + suno = SunoAPIClient() + task_id = await suno.generate( + prompt=state["lyric"], + genre="K-Pop, Emotional" + ) + + # DB 저장 + await save_song_request_to_db(state["task_id"], task_id) + + return { + **state, + "song_task_id": task_id, + "song_status": "processing" + } + except Exception as e: + return { + **state, + "error": str(e), + "error_step": "song_request", + "song_status": "failed" + } + + # 3. 음악 폴링 노드 + async def poll_song_status(state: PipelineState) -> PipelineState: + """Suno 상태 폴링 (최대 5분)""" + suno = SunoAPIClient() + max_attempts = 60 # 5초 간격 × 60 = 5분 + + for attempt in range(max_attempts): + result = await suno.get_task_status(state["song_task_id"]) + + if result["status"] == "SUCCESS": + audio_url = result["clips"][0]["audio_url"] + duration = result["clips"][0]["duration"] + + # DB 업데이트 + await update_song_status( + state["task_id"], + "completed", + audio_url, + duration + ) + + return { + **state, + "song_url": audio_url, + "song_duration": duration, + "song_status": "completed" + } + elif result["status"] == "FAILED": + return { + **state, + "error": "Suno generation failed", + "error_step": "song_polling", + "song_status": "failed" + } + + await asyncio.sleep(5) # 5초 대기 + + return { + **state, + "error": "Song generation timeout", + "error_step": "song_polling", + "song_status": "timeout" + } + + # 4. 영상 생성 요청 노드 + async def request_video(state: PipelineState) -> PipelineState: + """Creatomate API에 영상 렌더링 요청""" + try: + creatomate = CreatomateClient() + render_id = await creatomate.render( + images=state["images"], + music_url=state["song_url"], + lyrics=state["lyric"], + duration=state["song_duration"], + orientation=state["orientation"] + ) + + # DB 저장 + await save_video_request_to_db(state["task_id"], render_id) + + return { + **state, + "video_render_id": render_id, + "video_status": "processing" + } + except Exception as e: + return { + **state, + "error": str(e), + "error_step": "video_request", + "video_status": "failed" + } + + # 5. 영상 폴링 노드 + async def poll_video_status(state: PipelineState) -> PipelineState: + """Creatomate 상태 폴링 (최대 10분)""" + creatomate = CreatomateClient() + max_attempts = 120 # 5초 간격 × 120 = 10분 + + for attempt in range(max_attempts): + result = await creatomate.get_render_status(state["video_render_id"]) + + if result["status"] == "succeeded": + video_url = result["url"] + + # Azure Blob 업로드 + blob_url = await upload_to_azure(video_url, state["task_id"]) + + # DB 업데이트 + await update_video_status(state["task_id"], "completed", blob_url) + + return { + **state, + "video_url": blob_url, + "video_status": "completed", + "completed_at": datetime.now() + } + elif result["status"] == "failed": + return { + **state, + "error": "Creatomate rendering failed", + "error_step": "video_polling", + "video_status": "failed" + } + + await asyncio.sleep(5) + + return { + **state, + "error": "Video generation timeout", + "error_step": "video_polling", + "video_status": "timeout" + } + + # 6. 에러 처리 노드 + async def handle_error(state: PipelineState) -> PipelineState: + """에러 로깅 및 알림""" + await log_pipeline_error( + task_id=state["task_id"], + error=state["error"], + step=state["error_step"] + ) + + # 선택: 슬랙/이메일 알림 + await send_error_notification(state) + + return state + + # ===== 노드 추가 ===== + graph.add_node("generate_lyric", generate_lyric) + graph.add_node("request_song", request_song) + graph.add_node("poll_song", poll_song_status) + graph.add_node("request_video", request_video) + graph.add_node("poll_video", poll_video_status) + graph.add_node("handle_error", handle_error) + + # ===== 엣지 정의 ===== + + # 시작점 + graph.set_entry_point("generate_lyric") + + # 조건부 분기: 가사 생성 후 + def route_after_lyric(state: PipelineState): + if state.get("error"): + return "handle_error" + return "request_song" + + graph.add_conditional_edges( + "generate_lyric", + route_after_lyric, + { + "request_song": "request_song", + "handle_error": "handle_error" + } + ) + + # 조건부 분기: 음악 요청 후 + def route_after_song_request(state: PipelineState): + if state.get("error"): + return "handle_error" + return "poll_song" + + graph.add_conditional_edges( + "request_song", + route_after_song_request + ) + + # 조건부 분기: 음악 폴링 후 + def route_after_song_poll(state: PipelineState): + if state.get("error") or state["song_status"] in ["failed", "timeout"]: + return "handle_error" + return "request_video" + + graph.add_conditional_edges( + "poll_song", + route_after_song_poll + ) + + # 조건부 분기: 영상 요청 후 + graph.add_conditional_edges( + "request_video", + lambda s: "handle_error" if s.get("error") else "poll_video" + ) + + # 조건부 분기: 영상 폴링 후 + graph.add_conditional_edges( + "poll_video", + lambda s: "handle_error" if s.get("error") else END + ) + + # 에러 핸들러는 항상 종료 + graph.add_edge("handle_error", END) + + return graph.compile() +``` + +**그래프 시각화:** + +``` + ┌─────────────────┐ + │ generate_lyric │ + └────────┬────────┘ + │ + ┌────────▼────────┐ + ┌─────┤ route check ├─────┐ + │ └─────────────────┘ │ + [error] [success] + │ │ + ▼ ▼ + ┌─────────────────┐ ┌─────────────────┐ + │ handle_error │ │ request_song │ + └────────┬────────┘ └────────┬────────┘ + │ │ + ▼ ▼ + END ┌─────────────────┐ + │ poll_song │ + └────────┬────────┘ + │ + ┌────────▼────────┐ + ┌─────┤ route check ├─────┐ + │ └─────────────────┘ │ + [error/timeout] [success] + │ │ + ▼ ▼ + ┌─────────────────┐ ┌─────────────────┐ + │ handle_error │ │ request_video │ + └────────┬────────┘ └────────┬────────┘ + │ │ + ▼ ▼ + END ┌─────────────────┐ + │ poll_video │ + └────────┬────────┘ + │ + ┌────────▼────────┐ + ┌─────┤ route check ├─────┐ + │ └─────────────────┘ │ + [error] [success] + │ │ + ▼ ▼ + ┌─────────────────┐ END + │ handle_error │ (파이프라인 완료) + └────────┬────────┘ + │ + ▼ + END +``` + +### 3.3 설계 2: 재시도 및 폴백 메커니즘 + +```python +from langgraph.graph import StateGraph + +class RetryState(TypedDict): + task_id: str + retry_count: int + max_retries: int + last_error: Optional[str] + # ... 기타 필드 + +def build_retry_aware_pipeline(): + graph = StateGraph(RetryState) + + async def generate_song_with_retry(state: RetryState) -> RetryState: + """재시도 로직이 포함된 음악 생성""" + try: + # 1차 시도: Suno API + result = await suno_generate(state["lyric"]) + return {**state, "song_url": result, "retry_count": 0} + + except SunoRateLimitError: + # 재시도 1: 딜레이 후 재시도 + if state["retry_count"] < state["max_retries"]: + await asyncio.sleep(30) # 30초 대기 + return { + **state, + "retry_count": state["retry_count"] + 1, + "last_error": "rate_limit" + } + + except SunoAPIError as e: + # 재시도 2: 프롬프트 수정 후 재시도 + if "invalid lyrics" in str(e) and state["retry_count"] < 2: + simplified_lyric = await simplify_lyrics(state["lyric"]) + return { + **state, + "lyric": simplified_lyric, + "retry_count": state["retry_count"] + 1 + } + + # 폴백: 대체 서비스 사용 + try: + result = await alternative_music_service(state["lyric"]) + return {**state, "song_url": result, "used_fallback": True} + except: + pass + + return { + **state, + "error": "All music generation attempts failed", + "song_status": "failed" + } + + # 조건부 재시도 엣지 + def should_retry_song(state: RetryState): + if state.get("song_url"): + return "next_step" + if state["retry_count"] < state["max_retries"]: + return "retry_song" + return "handle_error" + + graph.add_conditional_edges( + "generate_song", + should_retry_song, + { + "retry_song": "generate_song", # 자기 자신으로 루프 + "next_step": "request_video", + "handle_error": "handle_error" + } + ) + + return graph.compile() +``` + +### 3.4 설계 3: 병렬 처리 지원 + +```python +from langgraph.types import Send +from langgraph.graph import StateGraph + +class ParallelState(TypedDict): + task_id: str + images: list[str] + analyzed_images: list[dict] # 병렬 분석 결과 + # ... + +def build_parallel_pipeline(): + graph = StateGraph(ParallelState) + + # 이미지 분석을 병렬로 수행 + async def analyze_single_image(state: dict) -> dict: + """단일 이미지 분석""" + image_url = state["image_url"] + analysis = await vision_model.analyze(image_url) + return { + "image_url": image_url, + "analysis": analysis, + "mood": analysis.get("mood"), + "colors": analysis.get("dominant_colors") + } + + # 팬아웃: 여러 이미지를 병렬로 분석 + def fanout_images(state: ParallelState): + return [ + Send("analyze_image", {"image_url": img, "task_id": state["task_id"]}) + for img in state["images"] + ] + + # 팬인: 분석 결과 수집 + async def collect_analyses(state: ParallelState) -> ParallelState: + # LangGraph가 자동으로 병렬 결과를 수집 + return state + + graph.add_node("analyze_image", analyze_single_image) + graph.add_node("collect", collect_analyses) + + graph.add_conditional_edges( + "start", + fanout_images # 여러 Send 반환 → 병렬 실행 + ) + + return graph.compile() +``` + +### 3.5 FastAPI 통합 + +```python +# main.py 또는 video/api/routers/v1/video.py + +from fastapi import APIRouter, BackgroundTasks +from langgraph.graph import StateGraph + +router = APIRouter() +pipeline = build_video_pipeline() + +@router.post("/video/generate-full") +async def generate_full_video( + request: FullVideoRequest, + background_tasks: BackgroundTasks +): + """단일 API 호출로 전체 파이프라인 실행""" + + initial_state: PipelineState = { + "task_id": str(uuid7()), + "customer_name": request.customer_name, + "region": request.region, + "detail_info": request.detail_info, + "language": request.language, + "images": request.images, + "orientation": request.orientation, + "lyric": None, + "lyric_status": None, + "song_url": None, + "song_task_id": None, + "song_status": None, + "song_duration": None, + "video_url": None, + "video_render_id": None, + "video_status": None, + "error": None, + "error_step": None, + "started_at": datetime.now(), + "completed_at": None, + "retry_count": 0 + } + + # 백그라운드에서 파이프라인 실행 + background_tasks.add_task(run_pipeline_async, initial_state) + + return { + "task_id": initial_state["task_id"], + "status": "processing", + "message": "Pipeline started. Use GET /video/pipeline-status/{task_id} to check progress." + } + +async def run_pipeline_async(initial_state: PipelineState): + """백그라운드에서 LangGraph 파이프라인 실행""" + try: + final_state = await pipeline.ainvoke(initial_state) + + # 결과 DB 저장 + await save_pipeline_result(final_state) + + # 완료 알림 (웹훅, 이메일 등) + if final_state.get("video_url"): + await send_completion_notification(final_state) + + except Exception as e: + await log_pipeline_error(initial_state["task_id"], str(e)) + +@router.get("/video/pipeline-status/{task_id}") +async def get_pipeline_status(task_id: str): + """파이프라인 진행 상태 조회""" + status = await get_status_from_db(task_id) + + return { + "task_id": task_id, + "lyric_status": status.lyric_status, + "song_status": status.song_status, + "video_status": status.video_status, + "overall_status": determine_overall_status(status), + "video_url": status.video_url if status.video_status == "completed" else None, + "error": status.error + } +``` + +--- + +## 4. RAG 적용 설계 + +### 4.1 적용 대상 및 목적 + +RAG(Retrieval-Augmented Generation)는 **외부 지식 기반을 검색하여 LLM 응답 품질을 향상**시키는 기법입니다. + +**적용 대상:** +1. 마케팅 지식베이스 (성공 사례) +2. 지역별/업종별 가사 예시 +3. 이미지 메타데이터 활용 +4. 프롬프트 최적화 + +### 4.2 설계 1: 마케팅 지식베이스 RAG + +**아키텍처:** + +``` +┌─────────────────────────────────────────────────────────────┐ +│ 마케팅 지식베이스 │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ Document Store (벡터 DB) │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ Collection: marketing_knowledge │ │ +│ │ │ │ +│ │ ┌──────────────────────────────────────────────┐ │ │ +│ │ │ 문서 1: 강원도 속초 펜션 마케팅 성공 사례 │ │ │ +│ │ │ - 가사 예시 │ │ │ +│ │ │ - 타겟 고객 분석 │ │ │ +│ │ │ - 효과적인 키워드 │ │ │ +│ │ │ - 영상 조회수/반응 │ │ │ +│ │ └──────────────────────────────────────────────┘ │ │ +│ │ │ │ +│ │ ┌──────────────────────────────────────────────┐ │ │ +│ │ │ 문서 2: 제주도 게스트하우스 마케팅 사례 │ │ │ +│ │ │ ... │ │ │ +│ │ └──────────────────────────────────────────────┘ │ │ +│ │ │ │ +│ │ ... (수백 개의 사례) │ │ +│ └─────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────┘ + │ + │ 유사도 검색 + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ 가사 생성 프롬프트 │ +├─────────────────────────────────────────────────────────────┤ +│ "다음 유사 사례를 참고하여 가사를 생성하세요: │ +│ [검색된 성공 사례 1] │ +│ [검색된 성공 사례 2] │ +│ ..." │ +└─────────────────────────────────────────────────────────────┘ +``` + +**구현 코드:** + +```python +from langchain.embeddings.openai import OpenAIEmbeddings +from langchain.vectorstores import Chroma +from langchain.document_loaders import JSONLoader +from langchain.text_splitter import RecursiveCharacterTextSplitter +from langchain.schema import Document + +# 1. 임베딩 모델 설정 +embeddings = OpenAIEmbeddings( + model="text-embedding-3-small", + openai_api_key=settings.CHATGPT_API_KEY +) + +# 2. 벡터 스토어 초기화 +vector_store = Chroma( + collection_name="marketing_knowledge", + embedding_function=embeddings, + persist_directory="./data/chroma_db" +) + +# 3. 마케팅 사례 문서 구조 +class MarketingCase(BaseModel): + case_id: str + region: str + business_type: str # "pension", "guesthouse", "hotel" + target_audience: str + successful_lyrics: str + keywords: list[str] + performance_metrics: dict # views, engagement, conversions + created_at: datetime + +# 4. 문서 추가 함수 +async def add_marketing_case(case: MarketingCase): + """성공한 마케팅 사례를 벡터 스토어에 추가""" + + # 메타데이터와 함께 문서 생성 + document = Document( + page_content=f""" +지역: {case.region} +업종: {case.business_type} +타겟 고객: {case.target_audience} +성공 가사: +{case.successful_lyrics} +효과적인 키워드: {', '.join(case.keywords)} +성과: 조회수 {case.performance_metrics.get('views', 0)}, + 참여율 {case.performance_metrics.get('engagement', 0)}% + """, + metadata={ + "case_id": case.case_id, + "region": case.region, + "business_type": case.business_type, + "created_at": case.created_at.isoformat() + } + ) + + vector_store.add_documents([document]) + vector_store.persist() + +# 5. RAG 기반 가사 생성 +async def generate_lyrics_with_rag( + customer_name: str, + region: str, + business_type: str, + language: str +) -> str: + """RAG를 활용한 고품질 가사 생성""" + + # 유사 사례 검색 + query = f"{region} {business_type} 마케팅 가사" + similar_cases = vector_store.similarity_search( + query, + k=3, + filter={"business_type": business_type} # 같은 업종만 + ) + + # 검색 결과를 프롬프트에 포함 + examples_text = "\n\n".join([ + f"### 참고 사례 {i+1}\n{doc.page_content}" + for i, doc in enumerate(similar_cases) + ]) + + # LangChain 프롬프트 구성 + rag_prompt = ChatPromptTemplate.from_messages([ + ("system", """당신은 마케팅 전문가이자 작사가입니다. +다음 성공 사례를 참고하여 새로운 가사를 생성하세요. +참고 사례의 스타일과 키워드를 활용하되, 고유한 내용을 만드세요. + +{examples}"""), + ("human", """ +고객명: {customer_name} +지역: {region} +언어: {language} + +위 정보를 바탕으로 8-12줄의 감성적인 마케팅 가사를 생성하세요. +""") + ]) + + chain = rag_prompt | ChatOpenAI(model="gpt-4o") | StrOutputParser() + + result = await chain.ainvoke({ + "examples": examples_text, + "customer_name": customer_name, + "region": region, + "language": language + }) + + return result +``` + +### 4.3 설계 2: 지역별 특화 RAG + +**목적:** 각 지역의 특성(문화, 관광지, 특산물 등)을 반영한 가사 생성 + +```python +# 지역 정보 문서 구조 +class RegionInfo(BaseModel): + region_name: str + province: str + famous_attractions: list[str] + local_foods: list[str] + cultural_keywords: list[str] + seasonal_events: list[dict] # {"season": "summer", "event": "해수욕장 개장"} + atmosphere: list[str] # ["고즈넉한", "활기찬", "낭만적인"] + +# 지역 정보 벡터 스토어 +region_store = Chroma( + collection_name="region_knowledge", + embedding_function=embeddings, + persist_directory="./data/chroma_region" +) + +# 지역 정보 추가 예시 +regions_data = [ + RegionInfo( + region_name="속초", + province="강원도", + famous_attractions=["설악산", "속초해변", "영금정", "아바이마을"], + local_foods=["오징어순대", "물회", "생선구이"], + cultural_keywords=["동해바다", "일출", "산과 바다", "청정자연"], + seasonal_events=[ + {"season": "summer", "event": "속초해변 피서"}, + {"season": "autumn", "event": "설악산 단풍"} + ], + atmosphere=["시원한", "청량한", "자연친화적", "힐링"] + ), + # ... 더 많은 지역 +] + +async def enrich_lyrics_with_region_info( + base_lyrics: str, + region: str +) -> str: + """지역 정보로 가사 보강""" + + # 지역 정보 검색 + region_docs = region_store.similarity_search(region, k=1) + + if not region_docs: + return base_lyrics + + region_info = region_docs[0].page_content + + # 가사에 지역 특성 반영 + enrichment_prompt = ChatPromptTemplate.from_messages([ + ("system", """당신은 가사 편집 전문가입니다. +주어진 기본 가사에 지역의 특성을 자연스럽게 녹여내세요. +지역 정보: +{region_info}"""), + ("human", """기본 가사: +{base_lyrics} + +위 가사에 지역의 특성(명소, 분위기, 키워드)을 2-3개 자연스럽게 추가하세요. +원래 가사의 운율과 분위기를 유지하세요.""") + ]) + + chain = enrichment_prompt | ChatOpenAI() | StrOutputParser() + + return await chain.ainvoke({ + "region_info": region_info, + "base_lyrics": base_lyrics + }) +``` + +### 4.4 설계 3: 이미지 메타데이터 RAG + +**목적:** 업로드된 이미지의 분석 결과를 저장하고, 영상 생성 시 최적의 이미지 순서 결정 + +```python +from langchain_openai import ChatOpenAI + +# Vision 모델로 이미지 분석 +vision_model = ChatOpenAI(model="gpt-4o") + +# 이미지 분석 문서 구조 +class ImageAnalysis(BaseModel): + image_url: str + task_id: str + description: str + dominant_colors: list[str] + mood: str # "warm", "cool", "neutral" + scene_type: str # "interior", "exterior", "nature", "food" + suggested_position: str # "opening", "middle", "closing" + quality_score: float # 0.0 ~ 1.0 + +# 이미지 메타데이터 벡터 스토어 +image_store = Chroma( + collection_name="image_metadata", + embedding_function=embeddings, + persist_directory="./data/chroma_images" +) + +async def analyze_and_store_image(image_url: str, task_id: str): + """이미지 분석 후 벡터 스토어에 저장""" + + # GPT-4o Vision으로 이미지 분석 + analysis_response = await vision_model.ainvoke([ + { + "type": "text", + "text": """이미지를 분석하고 다음 JSON 형식으로 응답하세요: +{ + "description": "이미지 설명 (2-3문장)", + "dominant_colors": ["색상1", "색상2"], + "mood": "warm/cool/neutral 중 하나", + "scene_type": "interior/exterior/nature/food 중 하나", + "suggested_position": "opening/middle/closing 중 하나 (영상에서 적합한 위치)", + "quality_score": 0.0~1.0 (이미지 품질/선명도) +}""" + }, + { + "type": "image_url", + "image_url": {"url": image_url} + } + ]) + + analysis = json.loads(analysis_response.content) + + # 문서 생성 및 저장 + document = Document( + page_content=f""" +이미지 설명: {analysis['description']} +분위기: {analysis['mood']} +장면 유형: {analysis['scene_type']} +추천 위치: {analysis['suggested_position']} +색상: {', '.join(analysis['dominant_colors'])} + """, + metadata={ + "image_url": image_url, + "task_id": task_id, + **analysis + } + ) + + image_store.add_documents([document]) + image_store.persist() + + return ImageAnalysis(image_url=image_url, task_id=task_id, **analysis) + +async def get_optimal_image_order( + task_id: str, + music_mood: str, # 음악 분위기 + lyrics_theme: str # 가사 주제 +) -> list[str]: + """음악과 가사에 맞는 최적의 이미지 순서 결정""" + + # 해당 task의 모든 이미지 조회 + all_images = image_store.get( + where={"task_id": task_id} + ) + + # 음악/가사 분위기에 맞는 이미지 우선 검색 + query = f"{music_mood} {lyrics_theme} 마케팅 영상" + sorted_images = image_store.similarity_search( + query, + k=len(all_images), + filter={"task_id": task_id} + ) + + # 이미지 순서 결정 로직 + opening_images = [img for img in sorted_images if img.metadata["suggested_position"] == "opening"] + middle_images = [img for img in sorted_images if img.metadata["suggested_position"] == "middle"] + closing_images = [img for img in sorted_images if img.metadata["suggested_position"] == "closing"] + + # 품질 점수로 정렬 + opening_images.sort(key=lambda x: x.metadata["quality_score"], reverse=True) + closing_images.sort(key=lambda x: x.metadata["quality_score"], reverse=True) + + # 최종 순서 + ordered = ( + opening_images[:2] + # 시작 2장 + middle_images + # 중간 이미지들 + closing_images[:1] # 마무리 1장 + ) + + return [img.metadata["image_url"] for img in ordered] +``` + +### 4.5 설계 4: 프롬프트 히스토리 RAG + +**목적:** 과거 성공/실패한 프롬프트를 학습하여 프롬프트 품질 지속 개선 + +```python +# 프롬프트 결과 문서 구조 +class PromptResult(BaseModel): + prompt_id: str + prompt_text: str + result_text: str + success: bool + failure_reason: Optional[str] + category: str # "lyric", "marketing_analysis", "region_enrichment" + metrics: dict # {"length": 10, "contains_region_keyword": True, ...} + created_at: datetime + +# 프롬프트 히스토리 벡터 스토어 +prompt_store = Chroma( + collection_name="prompt_history", + embedding_function=embeddings, + persist_directory="./data/chroma_prompts" +) + +async def log_prompt_result(result: PromptResult): + """프롬프트 결과 기록""" + + document = Document( + page_content=f""" +프롬프트: {result.prompt_text} +결과: {result.result_text[:500]}... +성공 여부: {'성공' if result.success else '실패'} +실패 사유: {result.failure_reason or 'N/A'} + """, + metadata={ + "prompt_id": result.prompt_id, + "success": result.success, + "category": result.category, + "created_at": result.created_at.isoformat(), + **result.metrics + } + ) + + prompt_store.add_documents([document]) + +async def get_improved_prompt( + base_prompt: str, + category: str +) -> str: + """과거 결과를 기반으로 프롬프트 개선""" + + # 유사한 성공 프롬프트 검색 + successful_prompts = prompt_store.similarity_search( + base_prompt, + k=3, + filter={"success": True, "category": category} + ) + + # 유사한 실패 프롬프트 검색 (피해야 할 패턴) + failed_prompts = prompt_store.similarity_search( + base_prompt, + k=2, + filter={"success": False, "category": category} + ) + + # 프롬프트 개선 요청 + improvement_prompt = ChatPromptTemplate.from_messages([ + ("system", """당신은 프롬프트 엔지니어링 전문가입니다. + +다음 성공/실패 사례를 참고하여 주어진 프롬프트를 개선하세요. + +### 성공 사례 (참고): +{successful_examples} + +### 실패 사례 (피할 것): +{failed_examples} + +### 개선 원칙: +1. 성공 사례의 패턴을 따르세요 +2. 실패 사례의 패턴을 피하세요 +3. 명확하고 구체적인 지시를 포함하세요 +4. 출력 형식을 명시하세요"""), + ("human", """개선할 프롬프트: +{base_prompt} + +위 프롬프트를 개선하세요. 개선된 프롬프트만 출력하세요.""") + ]) + + chain = improvement_prompt | ChatOpenAI() | StrOutputParser() + + improved = await chain.ainvoke({ + "successful_examples": "\n---\n".join([doc.page_content for doc in successful_prompts]), + "failed_examples": "\n---\n".join([doc.page_content for doc in failed_prompts]), + "base_prompt": base_prompt + }) + + return improved +``` + +--- + +## 5. 통합 아키텍처 + +### 5.1 전체 시스템 아키텍처 + +``` +┌──────────────────────────────────────────────────────────────────────────┐ +│ FastAPI 라우터 │ +│ /lyric/generate, /song/generate, /video/generate, /video/generate-full │ +└───────────────────────────────────┬──────────────────────────────────────┘ + │ + ▼ +┌──────────────────────────────────────────────────────────────────────────┐ +│ LangGraph 파이프라인 │ +│ ┌─────────────────────────────────────────────────────────────────┐ │ +│ │ StateGraph (상태 기계) │ │ +│ │ │ │ +│ │ [generate_lyric] → [request_song] → [poll_song] │ │ +│ │ │ │ │ │ +│ │ │ ▼ │ │ +│ │ │ [request_video] → [poll_video] → END │ │ +│ │ │ │ │ │ +│ │ └─────────────────────┴──────────→ [handle_error] │ │ +│ └─────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌───────────────┼───────────────┐ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ LangChain │ │ RAG │ │ External │ │ +│ │ Components │ │ Vector DBs │ │ APIs │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ │ +└──────────────────────────────────────────────────────────────────────────┘ + │ │ │ + ┌───────────┘ ┌──────────┘ ┌──────────┘ + │ │ │ + ▼ ▼ ▼ +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ Prompt │ │ Chroma │ │ OpenAI │ +│ Templates │ │ Vector │ │ Suno │ +│ Chains │ │ Store │ │ Creatomate │ +│ Parsers │ │ │ │ │ +└─────────────┘ └─────────────┘ └─────────────┘ + │ │ │ + └────────────────┴───────────────┘ + │ + ▼ + ┌─────────────────────────┐ + │ MySQL + Azure Blob │ + │ (영구 저장소) │ + └─────────────────────────┘ +``` + +### 5.2 데이터 흐름 + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ 데이터 흐름 │ +├─────────────────────────────────────────────────────────────────────────┤ +│ │ +│ 1. 사용자 요청 │ +│ ├── 고객 정보 (이름, 지역, 상세정보) │ +│ └── 이미지 URL 리스트 │ +│ │ │ +│ ▼ │ +│ 2. RAG 검색 (병렬) │ +│ ├── 마케팅 지식베이스 → 유사 성공 사례 │ +│ ├── 지역 정보베이스 → 지역 특성 │ +│ └── 이미지 메타데이터 → 이미지 분석 │ +│ │ │ +│ ▼ │ +│ 3. LangChain 프롬프트 구성 │ +│ ├── 기본 템플릿 로드 │ +│ ├── RAG 결과 주입 │ +│ ├── Few-shot 예시 추가 │ +│ └── 출력 형식 지정 │ +│ │ │ +│ ▼ │ +│ 4. LangGraph 파이프라인 실행 │ +│ ├── 가사 생성 (ChatGPT) │ +│ ├── 음악 생성 (Suno, 폴링 자동화) │ +│ └── 영상 생성 (Creatomate, 폴링 자동화) │ +│ │ │ +│ ▼ │ +│ 5. 결과 저장 │ +│ ├── MySQL: 메타데이터, 상태 │ +│ ├── Azure Blob: 영상 파일 │ +│ └── Chroma: 성공 사례 피드백 │ +│ │ │ +│ ▼ │ +│ 6. 사용자 응답 │ +│ └── 영상 URL, 상태, 메타데이터 │ +│ │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +### 5.3 디렉토리 구조 (신규) + +``` +app/ +├── langchain/ # LangChain 관련 +│ ├── __init__.py +│ ├── prompts/ +│ │ ├── __init__.py +│ │ ├── lyric_prompts.py # 가사 생성 프롬프트 +│ │ ├── marketing_prompts.py # 마케팅 분석 프롬프트 +│ │ └── examples/ # Few-shot 예시 +│ │ ├── korean.json +│ │ ├── english.json +│ │ └── ... +│ ├── chains/ +│ │ ├── __init__.py +│ │ ├── lyric_chain.py # 가사 생성 체인 +│ │ └── marketing_chain.py # 마케팅 분석 체인 +│ └── parsers/ +│ ├── __init__.py +│ └── lyric_parser.py # 가사 출력 파서 +│ +├── langgraph/ # LangGraph 관련 +│ ├── __init__.py +│ ├── states/ +│ │ ├── __init__.py +│ │ └── pipeline_state.py # 파이프라인 상태 정의 +│ ├── nodes/ +│ │ ├── __init__.py +│ │ ├── lyric_node.py # 가사 생성 노드 +│ │ ├── song_node.py # 음악 생성 노드 +│ │ ├── video_node.py # 영상 생성 노드 +│ │ └── error_node.py # 에러 처리 노드 +│ └── graphs/ +│ ├── __init__.py +│ └── video_pipeline.py # 메인 파이프라인 그래프 +│ +├── rag/ # RAG 관련 +│ ├── __init__.py +│ ├── stores/ +│ │ ├── __init__.py +│ │ ├── marketing_store.py # 마케팅 지식베이스 +│ │ ├── region_store.py # 지역 정보베이스 +│ │ ├── image_store.py # 이미지 메타데이터 +│ │ └── prompt_store.py # 프롬프트 히스토리 +│ ├── loaders/ +│ │ ├── __init__.py +│ │ └── case_loader.py # 사례 데이터 로더 +│ └── retrievers/ +│ ├── __init__.py +│ └── hybrid_retriever.py # 하이브리드 검색 +│ +└── data/ # 데이터 저장소 + └── chroma_db/ # Chroma 벡터 DB + ├── marketing_knowledge/ + ├── region_knowledge/ + ├── image_metadata/ + └── prompt_history/ +``` + +--- + +## 6. 기대 효과 + +### 6.1 정량적 기대 효과 + +| 지표 | 현재 | 목표 | 개선율 | +|------|------|------|--------| +| **가사 생성 품질** | 70% 만족도 | 90% 만족도 | +29% | +| **재작업률** | 30% | 10% | -67% | +| **파이프라인 실패율** | 15% | 5% | -67% | +| **평균 처리 시간** | 10분 (수동 개입 필요) | 8분 (완전 자동) | -20% | +| **다국어 품질** | 60% | 85% | +42% | +| **프롬프트 튜닝 시간** | 2시간/버전 | 30분/버전 | -75% | + +### 6.2 정성적 기대 효과 + +#### 6.2.1 개발 생산성 향상 + +| 영역 | 효과 | +|------|------| +| **코드 유지보수** | 프롬프트와 비즈니스 로직 분리로 수정 용이 | +| **테스트 용이성** | 각 체인/노드 단위 테스트 가능 | +| **디버깅** | 상태 기계 기반으로 문제 지점 명확히 파악 | +| **확장성** | 새로운 AI 서비스 추가 시 노드만 추가하면 됨 | + +#### 6.2.2 품질 향상 + +| 영역 | 효과 | +|------|------| +| **일관성** | 동일 조건에서 일관된 품질의 결과물 생성 | +| **지역 맞춤화** | RAG로 지역별 특성 자동 반영 | +| **학습 효과** | 성공 사례 축적으로 시간이 지날수록 품질 향상 | +| **에러 복구** | 자동 재시도 및 폴백으로 안정성 강화 | + +#### 6.2.3 운영 효율성 + +| 영역 | 효과 | +|------|------| +| **모니터링** | 파이프라인 상태 추적으로 병목 지점 파악 | +| **비용 최적화** | 불필요한 API 호출 감소, 캐싱 활용 | +| **확장 대응** | 부하 증가 시 노드별 스케일링 가능 | + +### 6.3 비즈니스 가치 + +``` +┌────────────────────────────────────────────────────────────────┐ +│ 비즈니스 가치 │ +├────────────────────────────────────────────────────────────────┤ +│ │ +│ 1. 고객 만족도 향상 │ +│ └── 고품질 가사/영상으로 마케팅 효과 증대 │ +│ │ +│ 2. 서비스 차별화 │ +│ └── 지역 맞춤 콘텐츠로 경쟁사 대비 우위 │ +│ │ +│ 3. 운영 비용 절감 │ +│ └── 자동화로 수동 개입 최소화 │ +│ │ +│ 4. 확장 가능성 │ +│ └── 새로운 지역/업종 추가 시 RAG 학습만으로 대응 │ +│ │ +│ 5. 데이터 자산화 │ +│ └── 축적된 성공 사례가 진입 장벽 역할 │ +│ │ +└────────────────────────────────────────────────────────────────┘ +``` + +--- + +## 7. 구현 로드맵 + +### 7.1 Phase 1: 기초 (1-2주) + +**목표:** LangChain 기본 구조 구축 + +| 작업 | 설명 | 우선순위 | +|------|------|----------| +| 의존성 설치 | langchain, langchain-openai, chromadb | P0 | +| 프롬프트 템플릿 작성 | 가사 생성 프롬프트 이관 | P0 | +| 기본 체인 구현 | ChatGPT 서비스 LangChain으로 래핑 | P0 | +| 출력 파서 구현 | 가사 응답 검증 및 파싱 | P1 | +| 테스트 작성 | 체인 단위 테스트 | P1 | + +**산출물:** +- `app/langchain/` 디렉토리 구조 +- 가사 생성 LangChain 체인 +- 기본 테스트 코드 + +### 7.2 Phase 2: 파이프라인 (2-3주) + +**목표:** LangGraph 파이프라인 구축 + +| 작업 | 설명 | 우선순위 | +|------|------|----------| +| 상태 정의 | PipelineState TypedDict 작성 | P0 | +| 노드 구현 | 가사, 음악, 영상 생성 노드 | P0 | +| 그래프 구성 | 엣지 및 조건부 분기 정의 | P0 | +| 폴링 통합 | Suno, Creatomate 폴링 자동화 | P0 | +| 에러 처리 | 에러 노드 및 재시도 로직 | P1 | +| FastAPI 통합 | 새 엔드포인트 추가 | P1 | + +**산출물:** +- `app/langgraph/` 디렉토리 구조 +- 통합 파이프라인 그래프 +- `/video/generate-full` 엔드포인트 + +### 7.3 Phase 3: RAG (2-3주) + +**목표:** 지식베이스 구축 및 RAG 통합 + +| 작업 | 설명 | 우선순위 | +|------|------|----------| +| Chroma 설정 | 벡터 스토어 초기화 | P0 | +| 마케팅 사례 수집 | 기존 성공 사례 데이터화 | P0 | +| 지역 정보 구축 | 주요 지역 정보 입력 | P1 | +| 검색 통합 | 가사 생성 시 RAG 적용 | P0 | +| 이미지 분석 | Vision API 연동 | P2 | +| 프롬프트 히스토리 | 자동 학습 파이프라인 | P2 | + +**산출물:** +- `app/rag/` 디렉토리 구조 +- 마케팅/지역 지식베이스 +- RAG 통합 가사 생성 + +### 7.4 Phase 4: 고도화 (2-3주) + +**목표:** 최적화 및 모니터링 + +| 작업 | 설명 | 우선순위 | +|------|------|----------| +| 성능 최적화 | 캐싱, 병렬 처리 개선 | P1 | +| 모니터링 | 파이프라인 상태 대시보드 | P1 | +| A/B 테스팅 | 프롬프트 버전 비교 | P2 | +| 문서화 | API 문서, 운영 가이드 | P1 | +| 부하 테스트 | 동시 요청 처리 검증 | P2 | + +**산출물:** +- 최적화된 파이프라인 +- 모니터링 대시보드 +- 완성된 문서 + +--- + +## 8. 결론 + +### 8.1 요약 + +CastAD 백엔드에 LangChain, LangGraph, RAG를 적용하면: + +1. **LangChain**: 프롬프트 관리 체계화, 다단계 체인 구성, 출력 검증 자동화 +2. **LangGraph**: 복잡한 파이프라인 상태 관리, 폴링 자동화, 에러 처리 강화 +3. **RAG**: 과거 성공 사례 활용, 지역별 맞춤화, 지속적 품질 개선 + +### 8.2 핵심 가치 + +``` +┌───────────────────────────────────────────────────────────┐ +│ │ +│ 현재: 각 단계가 독립적 → 상태 관리 어려움 │ +│ 개선: 통합 파이프라인 → 자동화된 상태 추적 │ +│ │ +│ 현재: 하드코딩 프롬프트 → 수정 어려움 │ +│ 개선: 템플릿 기반 → 유연한 프롬프트 관리 │ +│ │ +│ 현재: 과거 데이터 미활용 → 일관성 없는 품질 │ +│ 개선: RAG 지식베이스 → 축적된 노하우 활용 │ +│ │ +└───────────────────────────────────────────────────────────┘ +``` + +### 8.3 권장 사항 + +1. **단계적 도입**: Phase 1(LangChain)부터 시작하여 검증 후 확장 +2. **기존 API 유지**: 새 엔드포인트 추가 방식으로 호환성 보장 +3. **데이터 축적 우선**: RAG 효과를 위해 초기 사례 데이터 확보 중요 +4. **모니터링 병행**: 각 단계별 성과 측정으로 ROI 검증 + +--- + +## 부록 + +### A. 필요 의존성 + +```toml +# pyproject.toml 추가 의존성 +[project.dependencies] +langchain = ">=0.1.0" +langchain-openai = ">=0.0.5" +langchain-community = ">=0.0.20" +langgraph = ">=0.0.30" +chromadb = ">=0.4.22" +tiktoken = ">=0.5.2" +``` + +### B. 환경 변수 추가 + +```env +# .env 추가 +CHROMA_PERSIST_DIR=./data/chroma_db +LANGCHAIN_TRACING_V2=true # 선택: LangSmith 모니터링 +LANGCHAIN_API_KEY=xxx # 선택: LangSmith 모니터링 +``` + +### C. 참고 자료 + +- [LangChain Documentation](https://python.langchain.com/) +- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/) +- [Chroma Documentation](https://docs.trychroma.com/) +- [OpenAI Cookbook](https://cookbook.openai.com/) + +--- + +*이 보고서는 CastAD 백엔드 프로젝트 분석을 기반으로 작성되었습니다.* +*작성일: 2025-12-28* diff --git a/docs/analysis/orm_report.md b/docs/analysis/orm_report.md new file mode 100644 index 0000000..e430040 --- /dev/null +++ b/docs/analysis/orm_report.md @@ -0,0 +1,500 @@ +# ORM 동기식 전환 보고서 + +## 개요 + +현재 프로젝트는 **SQLAlchemy 2.0+ 비동기 방식**으로 구현되어 있습니다. 이 보고서는 동기식으로 전환할 경우 필요한 코드 수정 사항을 정리합니다. + +--- + +## 1. 현재 비동기 구현 현황 + +### 1.1 사용 중인 라이브러리 +- `sqlalchemy[asyncio]>=2.0.45` +- `asyncmy>=0.2.10` (MySQL 비동기 드라이버) +- `aiomysql>=0.3.2` + +### 1.2 주요 비동기 컴포넌트 +| 컴포넌트 | 현재 (비동기) | 변경 후 (동기) | +|---------|-------------|--------------| +| 엔진 | `create_async_engine` | `create_engine` | +| 세션 팩토리 | `async_sessionmaker` | `sessionmaker` | +| 세션 클래스 | `AsyncSession` | `Session` | +| DB 드라이버 | `mysql+asyncmy` | `mysql+pymysql` | + +--- + +## 2. 파일별 수정 사항 + +### 2.1 pyproject.toml - 의존성 변경 + +**파일**: `pyproject.toml` + +```diff +dependencies = [ + "fastapi[standard]>=0.115.8", +- "sqlalchemy[asyncio]>=2.0.45", ++ "sqlalchemy>=2.0.45", +- "asyncmy>=0.2.10", +- "aiomysql>=0.3.2", ++ "pymysql>=1.1.0", + ... +] +``` + +--- + +### 2.2 config.py - 데이터베이스 URL 변경 + +**파일**: `config.py` (라인 74-96) + +```diff +class DatabaseSettings(BaseSettings): + @property + def MYSQL_URL(self) -> str: +- return f"mysql+asyncmy://{self.MYSQL_USER}:{self.MYSQL_PASSWORD}@{self.MYSQL_HOST}:{self.MYSQL_PORT}/{self.MYSQL_DB}" ++ return f"mysql+pymysql://{self.MYSQL_USER}:{self.MYSQL_PASSWORD}@{self.MYSQL_HOST}:{self.MYSQL_PORT}/{self.MYSQL_DB}" +``` + +--- + +### 2.3 app/database/session.py - 세션 설정 전면 수정 + +**파일**: `app/database/session.py` + +#### 현재 코드 (비동기) +```python +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from typing import AsyncGenerator + +engine = create_async_engine( + url=db_settings.MYSQL_URL, + echo=False, + pool_size=10, + max_overflow=10, + pool_timeout=5, + pool_recycle=3600, + pool_pre_ping=True, + pool_reset_on_return="rollback", +) + +AsyncSessionLocal = async_sessionmaker( + bind=engine, + class_=AsyncSession, + expire_on_commit=False, + autoflush=False, +) + +async def get_session() -> AsyncGenerator[AsyncSession, None]: + async with AsyncSessionLocal() as session: + try: + yield session + except Exception as e: + await session.rollback() + raise e +``` + +#### 변경 후 코드 (동기) +```python +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker +from typing import Generator + +engine = create_engine( + url=db_settings.MYSQL_URL, + echo=False, + pool_size=10, + max_overflow=10, + pool_timeout=5, + pool_recycle=3600, + pool_pre_ping=True, + pool_reset_on_return="rollback", +) + +SessionLocal = sessionmaker( + bind=engine, + class_=Session, + expire_on_commit=False, + autoflush=False, +) + +def get_session() -> Generator[Session, None, None]: + with SessionLocal() as session: + try: + yield session + except Exception as e: + session.rollback() + raise e +``` + +#### get_worker_session 함수 변경 + +```diff +- from contextlib import asynccontextmanager ++ from contextlib import contextmanager + +- @asynccontextmanager +- async def get_worker_session() -> AsyncGenerator[AsyncSession, None]: +- worker_engine = create_async_engine( ++ @contextmanager ++ def get_worker_session() -> Generator[Session, None, None]: ++ worker_engine = create_engine( + url=db_settings.MYSQL_URL, + poolclass=NullPool, + ) +- session_factory = async_sessionmaker( +- bind=worker_engine, +- class_=AsyncSession, ++ session_factory = sessionmaker( ++ bind=worker_engine, ++ class_=Session, + expire_on_commit=False, + autoflush=False, + ) + +- async with session_factory() as session: ++ with session_factory() as session: + try: + yield session + finally: +- await session.close() +- await worker_engine.dispose() ++ session.close() ++ worker_engine.dispose() +``` + +--- + +### 2.4 app/*/dependencies.py - 타입 힌트 변경 + +**파일**: `app/song/dependencies.py`, `app/lyric/dependencies.py`, `app/video/dependencies.py` + +```diff +- from sqlalchemy.ext.asyncio import AsyncSession ++ from sqlalchemy.orm import Session + +- SessionDep = Annotated[AsyncSession, Depends(get_session)] ++ SessionDep = Annotated[Session, Depends(get_session)] +``` + +--- + +### 2.5 라우터 파일들 - async/await 제거 + +**영향받는 파일**: +- `app/home/api/routers/v1/home.py` +- `app/lyric/api/routers/v1/lyric.py` +- `app/song/api/routers/v1/song.py` +- `app/video/api/routers/v1/video.py` + +#### 예시: lyric.py (라인 70-90) + +```diff +- async def get_lyric_by_task_id( ++ def get_lyric_by_task_id( + task_id: str, +- session: AsyncSession = Depends(get_session), ++ session: Session = Depends(get_session), + ): +- result = await session.execute(select(Lyric).where(Lyric.task_id == task_id)) ++ result = session.execute(select(Lyric).where(Lyric.task_id == task_id)) + lyric = result.scalar_one_or_none() + ... +``` + +#### 예시: CRUD 작업 (라인 218-260) + +```diff +- async def create_project( ++ def create_project( + request_body: ProjectCreateRequest, +- session: AsyncSession = Depends(get_session), ++ session: Session = Depends(get_session), + ): + project = Project( + store_name=request_body.customer_name, + region=request_body.region, + task_id=task_id, + ) + session.add(project) +- await session.commit() +- await session.refresh(project) ++ session.commit() ++ session.refresh(project) + return project +``` + +#### 예시: 플러시 작업 (home.py 라인 340-350) + +```diff + session.add(image) +- await session.flush() ++ session.flush() + result = image.id +``` + +--- + +### 2.6 서비스 파일들 - Raw SQL 쿼리 변경 + +**영향받는 파일**: +- `app/lyric/services/lyrics.py` +- `app/song/services/song.py` +- `app/video/services/video.py` + +#### 예시: lyrics.py (라인 20-30) + +```diff +- async def get_store_default_info(conn: AsyncConnection): ++ def get_store_default_info(conn: Connection): + query = """SELECT * FROM store_default_info;""" +- result = await conn.execute(text(query)) ++ result = conn.execute(text(query)) + return result.fetchall() +``` + +#### 예시: INSERT 쿼리 (라인 360-400) + +```diff +- async def insert_song_result(conn: AsyncConnection, params: dict): ++ def insert_song_result(conn: Connection, params: dict): + insert_query = """INSERT INTO song_results_all (...) VALUES (...)""" +- await conn.execute(text(insert_query), params) +- await conn.commit() ++ conn.execute(text(insert_query), params) ++ conn.commit() +``` + +--- + +### 2.7 app/home/services/base.py - BaseService 클래스 + +**파일**: `app/home/services/base.py` + +```diff +- from sqlalchemy.ext.asyncio import AsyncSession ++ from sqlalchemy.orm import Session + + class BaseService: +- def __init__(self, model, session: AsyncSession): ++ def __init__(self, model, session: Session): + self.model = model + self.session = session + +- async def _get(self, id: UUID): +- return await self.session.get(self.model, id) ++ def _get(self, id: UUID): ++ return self.session.get(self.model, id) + +- async def _add(self, entity): ++ def _add(self, entity): + self.session.add(entity) +- await self.session.commit() +- await self.session.refresh(entity) ++ self.session.commit() ++ self.session.refresh(entity) + return entity + +- async def _update(self, entity): +- return await self._add(entity) ++ def _update(self, entity): ++ return self._add(entity) + +- async def _delete(self, entity): +- await self.session.delete(entity) ++ def _delete(self, entity): ++ self.session.delete(entity) +``` + +--- + +## 3. 모델 파일 - 변경 불필요 + +다음 모델 파일들은 **변경이 필요 없습니다**: +- `app/home/models.py` +- `app/lyric/models.py` +- `app/song/models.py` +- `app/video/models.py` + +모델 정의 자체는 비동기/동기와 무관하게 동일합니다. `Mapped`, `mapped_column`, `relationship` 등은 그대로 사용 가능합니다. + +단, **관계 로딩 전략**에서 `lazy="selectin"` 설정은 동기 환경에서도 작동하지만, 필요에 따라 `lazy="joined"` 또는 `lazy="subquery"`로 변경할 수 있습니다. + +--- + +## 4. 수정 패턴 요약 + +### 4.1 Import 변경 패턴 + +```diff +# 엔진/세션 관련 +- from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine ++ from sqlalchemy import create_engine ++ from sqlalchemy.orm import Session, sessionmaker + +# 타입 힌트 +- from typing import AsyncGenerator ++ from typing import Generator + +# 컨텍스트 매니저 +- from contextlib import asynccontextmanager ++ from contextlib import contextmanager +``` + +### 4.2 함수 정의 변경 패턴 + +```diff +- async def function_name(...): ++ def function_name(...): +``` + +### 4.3 await 제거 패턴 + +```diff +- result = await session.execute(query) ++ result = session.execute(query) + +- await session.commit() ++ session.commit() + +- await session.refresh(obj) ++ session.refresh(obj) + +- await session.flush() ++ session.flush() + +- await session.rollback() ++ session.rollback() + +- await session.close() ++ session.close() + +- await engine.dispose() ++ engine.dispose() +``` + +### 4.4 컨텍스트 매니저 변경 패턴 + +```diff +- async with SessionLocal() as session: ++ with SessionLocal() as session: +``` + +--- + +## 5. 영향받는 파일 목록 + +### 5.1 반드시 수정해야 하는 파일 + +| 파일 | 수정 범위 | +|-----|---------| +| `pyproject.toml` | 의존성 변경 | +| `config.py` | DB URL 변경 | +| `app/database/session.py` | 전면 수정 | +| `app/database/session-prod.py` | 전면 수정 | +| `app/home/api/routers/v1/home.py` | async/await 제거 | +| `app/lyric/api/routers/v1/lyric.py` | async/await 제거 | +| `app/song/api/routers/v1/song.py` | async/await 제거 | +| `app/video/api/routers/v1/video.py` | async/await 제거 | +| `app/lyric/services/lyrics.py` | async/await 제거 | +| `app/song/services/song.py` | async/await 제거 | +| `app/video/services/video.py` | async/await 제거 | +| `app/home/services/base.py` | async/await 제거 | +| `app/song/dependencies.py` | 타입 힌트 변경 | +| `app/lyric/dependencies.py` | 타입 힌트 변경 | +| `app/video/dependencies.py` | 타입 힌트 변경 | +| `app/dependencies/database.py` | 타입 힌트 변경 | + +### 5.2 수정 불필요한 파일 + +| 파일 | 이유 | +|-----|-----| +| `app/home/models.py` | 모델 정의는 동기/비동기 무관 | +| `app/lyric/models.py` | 모델 정의는 동기/비동기 무관 | +| `app/song/models.py` | 모델 정의는 동기/비동기 무관 | +| `app/video/models.py` | 모델 정의는 동기/비동기 무관 | + +--- + +## 6. 주의사항 + +### 6.1 FastAPI와의 호환성 + +FastAPI는 동기 엔드포인트도 지원합니다. 동기 함수는 스레드 풀에서 실행됩니다: + +```python +# 동기 엔드포인트 - FastAPI가 자동으로 스레드풀에서 실행 +@router.get("/items/{item_id}") +def get_item(item_id: int, session: Session = Depends(get_session)): + return session.get(Item, item_id) +``` + +### 6.2 성능 고려사항 + +동기식으로 전환 시 고려할 점: +- **동시성 감소**: 비동기 I/O의 이점 상실 +- **스레드 풀 의존**: 동시 요청이 많을 경우 스레드 풀 크기 조정 필요 +- **블로킹 I/O**: DB 쿼리 중 다른 요청 처리 불가 + +### 6.3 백그라운드 작업 + +현재 `get_worker_session()`으로 별도 이벤트 루프에서 실행되는 백그라운드 작업이 있습니다. 동기식 전환 시 스레드 기반 백그라운드 작업으로 변경해야 합니다: + +```python +from concurrent.futures import ThreadPoolExecutor + +executor = ThreadPoolExecutor(max_workers=4) + +def background_task(): + with get_worker_session() as session: + # 작업 수행 + pass + +# 실행 +executor.submit(background_task) +``` + +--- + +## 7. 마이그레이션 단계 + +### Step 1: 의존성 변경 +1. `pyproject.toml` 수정 +2. `pip install pymysql` 또는 `uv sync` 실행 + +### Step 2: 설정 파일 수정 +1. `config.py`의 DB URL 변경 +2. `app/database/session.py` 전면 수정 + +### Step 3: 라우터 수정 +1. 각 라우터 파일의 `async def` → `def` 변경 +2. 모든 `await` 키워드 제거 +3. `AsyncSession` → `Session` 타입 힌트 변경 + +### Step 4: 서비스 수정 +1. 서비스 파일들의 async/await 제거 +2. Raw SQL 쿼리 함수들 수정 + +### Step 5: 의존성 수정 +1. `dependencies.py` 파일들의 타입 힌트 변경 + +### Step 6: 테스트 +1. 모든 엔드포인트 기능 테스트 +2. 성능 테스트 (동시 요청 처리 확인) + +--- + +## 8. 결론 + +비동기에서 동기로 전환은 기술적으로 가능하지만, 다음을 고려해야 합니다: + +**장점**: +- 코드 복잡도 감소 (async/await 제거) +- 디버깅 용이 +- 레거시 라이브러리와의 호환성 향상 + +**단점**: +- 동시성 처리 능력 감소 +- I/O 바운드 작업에서 성능 저하 가능 +- FastAPI의 비동기 장점 미활용 + +현재 프로젝트가 FastAPI 기반이고 I/O 작업(DB, 외부 API 호출)이 많다면, **비동기 유지를 권장**합니다. 동기 전환은 특별한 요구사항(레거시 통합, 팀 역량 등)이 있을 때만 고려하시기 바랍니다.