# CastAD 백엔드 - LangChain, LangGraph, RAG 적용 설계 보고서 ## 목차 1. [현재 시스템 분석](#1-현재-시스템-분석) 2. [LangChain 적용 설계](#2-langchain-적용-설계) 3. [LangGraph 적용 설계](#3-langgraph-적용-설계) 4. [RAG 적용 설계](#4-rag-적용-설계) 5. [통합 아키텍처](#5-통합-아키텍처) 6. [기대 효과](#6-기대-효과) 7. [구현 로드맵](#7-구현-로드맵) 8. [결론](#8-결론) --- ## 1. 현재 시스템 분석 ### 1.1 프로젝트 개요 CastAD는 **AI 기반 광고 음악 및 영상 자동 생성 서비스**입니다. 네이버 지도에서 수집한 숙박시설 정보를 기반으로 마케팅용 자동 영상을 생성하는 통합 플랫폼입니다. **핵심 파이프라인:** ``` 사용자 입력 → 가사 자동 생성 → 음악 자동 생성 → 영상 자동 생성 ``` ### 1.2 현재 기술 스택 | 구분 | 기술 | |------|------| | Backend Framework | FastAPI (async/await 기반) | | ORM | SQLAlchemy 2.0 (비동기) | | Database | MySQL (asyncmy 드라이버) | | Cache | Redis | | AI/API | OpenAI ChatGPT, Suno AI, Creatomate | | Storage | Azure Blob Storage | ### 1.3 현재 핵심 흐름 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 현재 파이프라인 구조 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ POST /crawling (선택) │ │ │ │ │ ▼ │ │ POST /lyric/generate ──────► ChatGPT API ──────► 가사 저장 │ │ │ │ │ ▼ │ │ POST /song/generate ───────► Suno API ─────────► 음악 저장 │ │ │ │ │ │ │ 클라이언트 폴링 │ │ │ │ │ │ ▼ ▼ │ │ POST /video/generate ──────► Creatomate API ───► 영상 저장 │ │ │ │ │ │ │ 클라이언트 폴링 │ │ ▼ ▼ │ │ GET /video/download ◄──────── 완료 ──────────► Azure Blob │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 1.4 현재 시스템의 한계점 | 문제점 | 설명 | |--------|------| | **분산된 상태 관리** | 각 API 호출마다 독립적인 상태 관리, 전체 파이프라인 추적 어려움 | | **클라이언트 의존적 폴링** | 음악/영상 생성 완료 여부를 클라이언트가 반복 확인해야 함 | | **하드코딩된 프롬프트** | ChatGPT 프롬프트가 코드에 직접 작성, 유연성 부족 | | **에러 복구 제한적** | 단순 실패 패턴 검사만 수행, 자동 복구 메커니즘 없음 | | **과거 데이터 미활용** | 성공한 가사/마케팅 사례 재활용 불가 | | **일관성 없는 품질** | 동일 조건에서도 결과물 품질 편차 존재 | --- ## 2. LangChain 적용 설계 ### 2.1 적용 대상 및 목적 LangChain은 **LLM 애플리케이션 개발을 위한 프레임워크**로, 프롬프트 관리, 체인 구성, 출력 파싱 등을 체계화합니다. **적용 대상:** 1. 가사 생성 서비스 (`ChatgptService`) 2. 마케팅 분석 서비스 3. 다국어 처리 로직 ### 2.2 설계 1: 프롬프트 템플릿 시스템 **현재 문제:** ```python # 현재: chatgpt_prompt.py prompt = f""" [ROLE] You are a marketing expert... [INPUT] Customer: {customer_name}, Region: {region}... """ ``` **개선 설계:** ```python # 개선: langchain 적용 from langchain.prompts import PromptTemplate, ChatPromptTemplate from langchain_openai import ChatOpenAI # 가사 생성 프롬프트 템플릿 LYRIC_PROMPT = ChatPromptTemplate.from_messages([ ("system", """[ROLE] You are a marketing expert and professional lyricist. You specialize in creating catchy, emotional lyrics for travel and accommodation marketing. [LANGUAGE REQUIREMENT] Output MUST be 100% in {language}. No other languages allowed."""), ("human", """[INPUT] Customer Name: {customer_name} Region: {region} Detailed Information: {detail_info} [OUTPUT REQUIREMENTS] - 8-12 lines of lyrics - Focus on: relaxation, healing, beautiful scenery, memorable experiences - Style: warm, inviting, poetic - Include location-specific imagery Generate lyrics now:""") ]) # 체인 구성 lyric_chain = LYRIC_PROMPT | ChatOpenAI(model="gpt-5-mini-mini") | StrOutputParser() # 사용 result = await lyric_chain.ainvoke({ "customer_name": "스테이뫰", "region": "강원도 속초", "detail_info": "해변 근처 펜션", "language": "Korean" }) ``` **이점:** - 프롬프트 버전 관리 용이 - A/B 테스팅 지원 - 입력 변수 명확한 정의 ### 2.3 설계 2: 다단계 마케팅 분석 체인 **목적:** 복잡한 마케팅 분석을 단계별로 수행하여 품질 향상 ```python from langchain.chains import SequentialChain from langchain.prompts import PromptTemplate from langchain_openai import ChatOpenAI # Step 1: 경쟁사 분석 체인 competitor_prompt = PromptTemplate( input_variables=["region", "business_type"], template=""" {region} 지역의 {business_type} 업종에 대해 분석하세요: - 주요 경쟁사 특성 - 차별화 포인트 - 시장 포지셔닝 """ ) competitor_chain = competitor_prompt | ChatOpenAI() | StrOutputParser() # Step 2: 타겟 고객 분석 체인 audience_prompt = PromptTemplate( input_variables=["region", "competitor_analysis"], template=""" 경쟁사 분석 결과: {competitor_analysis} {region} 지역의 주요 타겟 고객층을 분석하세요: - 연령대 및 특성 - 주요 니즈 - 결정 요인 """ ) audience_chain = audience_prompt | ChatOpenAI() | StrOutputParser() # Step 3: 마케팅 전략 종합 체인 strategy_prompt = PromptTemplate( input_variables=["customer_name", "competitor_analysis", "audience_analysis"], template=""" 경쟁사 분석: {competitor_analysis} 타겟 고객: {audience_analysis} {customer_name}을 위한 마케팅 전략을 제안하세요: - 핵심 메시지 - 차별화 전략 - 추천 가사 방향 """ ) strategy_chain = strategy_prompt | ChatOpenAI() | StrOutputParser() # 통합 순차 체인 marketing_analysis_chain = ( {"region": RunnablePassthrough(), "business_type": RunnablePassthrough()} | competitor_chain | {"competitor_analysis": RunnablePassthrough(), "region": RunnablePassthrough()} | audience_chain | {"competitor_analysis": ..., "audience_analysis": RunnablePassthrough(), "customer_name": ...} | strategy_chain ) ``` **이점:** - 분석의 깊이와 체계성 향상 - 각 단계별 결과 추적 가능 - 중간 결과 캐싱 가능 ### 2.4 설계 3: 출력 파싱 및 검증 **목적:** ChatGPT 응답의 구조화 및 자동 검증 ```python from langchain.output_parsers import PydanticOutputParser from langchain_core.output_parsers import OutputFixingParser from pydantic import BaseModel, Field, validator # 가사 출력 스키마 class LyricOutput(BaseModel): title: str = Field(description="가사의 제목 (선택)") lyrics: list[str] = Field(description="가사 각 줄", min_items=8, max_items=12) mood: str = Field(description="가사의 분위기: warm, energetic, romantic 등") @validator('lyrics') def validate_line_count(cls, v): if len(v) < 8: raise ValueError("가사는 최소 8줄 이상이어야 합니다") return v # 파서 생성 parser = PydanticOutputParser(pydantic_object=LyricOutput) # 자동 수정 파서 (파싱 실패 시 LLM으로 재시도) fixing_parser = OutputFixingParser.from_llm( parser=parser, llm=ChatOpenAI(model="gpt-5-mini-mini") ) # 프롬프트에 포맷 지시 추가 prompt_with_format = LYRIC_PROMPT.partial( format_instructions=parser.get_format_instructions() ) ``` **이점:** - 응답 형식 일관성 보장 - 자동 오류 복구 - 타입 안전성 확보 ### 2.5 설계 4: Few-Shot 다국어 프롬프트 **목적:** 각 언어별 고품질 예시 제공으로 번역/생성 품질 향상 ```python from langchain.prompts import FewShotPromptTemplate, PromptTemplate # 언어별 예시 LANGUAGE_EXAMPLES = { "Korean": [ { "input": "강원도 속초 해변 펜션", "output": """푸른 바다 물결 위에 새벽빛이 춤을 추고 당신의 하루를 담아 스테이뫰에서 쉬어가요""" } ], "English": [ { "input": "Sokcho beach pension, Gangwon-do", "output": """Where ocean waves meet morning light A peaceful haven comes in sight Let your worries drift away At Stay Meoum, find your stay""" } ], "Japanese": [ { "input": "江原道束草ビーチペンション", "output": """青い海の波の上に 朝の光が踊る時 あなたの一日を包み込む ステイメウムで休んでいこう""" } ], "Chinese": [...], "Thai": [...], "Vietnamese": [...] } # Few-Shot 프롬프트 생성 def create_multilingual_prompt(language: str): example_prompt = PromptTemplate( input_variables=["input", "output"], template="입력: {input}\n가사:\n{output}" ) return FewShotPromptTemplate( examples=LANGUAGE_EXAMPLES.get(language, LANGUAGE_EXAMPLES["Korean"]), example_prompt=example_prompt, prefix="다음 예시를 참고하여 고품질 가사를 생성하세요:", suffix="입력: {customer_info}\n가사:", input_variables=["customer_info"] ) ``` **이점:** - 언어별 문화적 뉘앙스 반영 - 일관된 스타일 유지 - 번역 품질 대폭 향상 --- ## 3. LangGraph 적용 설계 ### 3.1 적용 대상 및 목적 LangGraph는 **복잡한 다단계 워크플로우를 상태 기계(State Machine)로 관리**하는 프레임워크입니다. **적용 대상:** 1. 전체 영상 생성 파이프라인 (가사 → 음악 → 영상) 2. 비동기 폴링 자동화 3. 에러 처리 및 재시도 로직 ### 3.2 설계 1: 통합 파이프라인 그래프 **핵심 설계:** ```python from langgraph.graph import StateGraph, END from typing import TypedDict, Optional, Literal from datetime import datetime # 파이프라인 상태 정의 class PipelineState(TypedDict): # 입력 task_id: str customer_name: str region: str detail_info: str language: str images: list[str] orientation: Literal["vertical", "horizontal"] # 중간 결과 lyric: Optional[str] lyric_status: Optional[str] song_url: Optional[str] song_task_id: Optional[str] song_status: Optional[str] song_duration: Optional[float] video_url: Optional[str] video_render_id: Optional[str] video_status: Optional[str] # 메타데이터 error: Optional[str] error_step: Optional[str] started_at: datetime completed_at: Optional[datetime] retry_count: int # 그래프 빌더 def build_video_pipeline() -> StateGraph: graph = StateGraph(PipelineState) # ===== 노드 정의 ===== # 1. 가사 생성 노드 async def generate_lyric(state: PipelineState) -> PipelineState: """ChatGPT로 가사 생성 (동기)""" try: lyric_chain = create_lyric_chain() # LangChain 체인 lyric = await lyric_chain.ainvoke({ "customer_name": state["customer_name"], "region": state["region"], "detail_info": state["detail_info"], "language": state["language"] }) # DB 저장 await save_lyric_to_db(state["task_id"], lyric) return { **state, "lyric": lyric, "lyric_status": "completed" } except Exception as e: return { **state, "error": str(e), "error_step": "lyric_generation", "lyric_status": "failed" } # 2. 음악 생성 요청 노드 async def request_song(state: PipelineState) -> PipelineState: """Suno API에 음악 생성 요청""" try: suno = SunoAPIClient() task_id = await suno.generate( prompt=state["lyric"], genre="K-Pop, Emotional" ) # DB 저장 await save_song_request_to_db(state["task_id"], task_id) return { **state, "song_task_id": task_id, "song_status": "processing" } except Exception as e: return { **state, "error": str(e), "error_step": "song_request", "song_status": "failed" } # 3. 음악 폴링 노드 async def poll_song_status(state: PipelineState) -> PipelineState: """Suno 상태 폴링 (최대 5분)""" suno = SunoAPIClient() max_attempts = 60 # 5초 간격 × 60 = 5분 for attempt in range(max_attempts): result = await suno.get_task_status(state["song_task_id"]) if result["status"] == "SUCCESS": audio_url = result["clips"][0]["audio_url"] duration = result["clips"][0]["duration"] # DB 업데이트 await update_song_status( state["task_id"], "completed", audio_url, duration ) return { **state, "song_url": audio_url, "song_duration": duration, "song_status": "completed" } elif result["status"] == "FAILED": return { **state, "error": "Suno generation failed", "error_step": "song_polling", "song_status": "failed" } await asyncio.sleep(5) # 5초 대기 return { **state, "error": "Song generation timeout", "error_step": "song_polling", "song_status": "timeout" } # 4. 영상 생성 요청 노드 async def request_video(state: PipelineState) -> PipelineState: """Creatomate API에 영상 렌더링 요청""" try: creatomate = CreatomateClient() render_id = await creatomate.render( images=state["images"], music_url=state["song_url"], lyrics=state["lyric"], duration=state["song_duration"], orientation=state["orientation"] ) # DB 저장 await save_video_request_to_db(state["task_id"], render_id) return { **state, "video_render_id": render_id, "video_status": "processing" } except Exception as e: return { **state, "error": str(e), "error_step": "video_request", "video_status": "failed" } # 5. 영상 폴링 노드 async def poll_video_status(state: PipelineState) -> PipelineState: """Creatomate 상태 폴링 (최대 10분)""" creatomate = CreatomateClient() max_attempts = 120 # 5초 간격 × 120 = 10분 for attempt in range(max_attempts): result = await creatomate.get_render_status(state["video_render_id"]) if result["status"] == "succeeded": video_url = result["url"] # Azure Blob 업로드 blob_url = await upload_to_azure(video_url, state["task_id"]) # DB 업데이트 await update_video_status(state["task_id"], "completed", blob_url) return { **state, "video_url": blob_url, "video_status": "completed", "completed_at": datetime.now() } elif result["status"] == "failed": return { **state, "error": "Creatomate rendering failed", "error_step": "video_polling", "video_status": "failed" } await asyncio.sleep(5) return { **state, "error": "Video generation timeout", "error_step": "video_polling", "video_status": "timeout" } # 6. 에러 처리 노드 async def handle_error(state: PipelineState) -> PipelineState: """에러 로깅 및 알림""" await log_pipeline_error( task_id=state["task_id"], error=state["error"], step=state["error_step"] ) # 선택: 슬랙/이메일 알림 await send_error_notification(state) return state # ===== 노드 추가 ===== graph.add_node("generate_lyric", generate_lyric) graph.add_node("request_song", request_song) graph.add_node("poll_song", poll_song_status) graph.add_node("request_video", request_video) graph.add_node("poll_video", poll_video_status) graph.add_node("handle_error", handle_error) # ===== 엣지 정의 ===== # 시작점 graph.set_entry_point("generate_lyric") # 조건부 분기: 가사 생성 후 def route_after_lyric(state: PipelineState): if state.get("error"): return "handle_error" return "request_song" graph.add_conditional_edges( "generate_lyric", route_after_lyric, { "request_song": "request_song", "handle_error": "handle_error" } ) # 조건부 분기: 음악 요청 후 def route_after_song_request(state: PipelineState): if state.get("error"): return "handle_error" return "poll_song" graph.add_conditional_edges( "request_song", route_after_song_request ) # 조건부 분기: 음악 폴링 후 def route_after_song_poll(state: PipelineState): if state.get("error") or state["song_status"] in ["failed", "timeout"]: return "handle_error" return "request_video" graph.add_conditional_edges( "poll_song", route_after_song_poll ) # 조건부 분기: 영상 요청 후 graph.add_conditional_edges( "request_video", lambda s: "handle_error" if s.get("error") else "poll_video" ) # 조건부 분기: 영상 폴링 후 graph.add_conditional_edges( "poll_video", lambda s: "handle_error" if s.get("error") else END ) # 에러 핸들러는 항상 종료 graph.add_edge("handle_error", END) return graph.compile() ``` **그래프 시각화:** ``` ┌─────────────────┐ │ generate_lyric │ └────────┬────────┘ │ ┌────────▼────────┐ ┌─────┤ route check ├─────┐ │ └─────────────────┘ │ [error] [success] │ │ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ │ handle_error │ │ request_song │ └────────┬────────┘ └────────┬────────┘ │ │ ▼ ▼ END ┌─────────────────┐ │ poll_song │ └────────┬────────┘ │ ┌────────▼────────┐ ┌─────┤ route check ├─────┐ │ └─────────────────┘ │ [error/timeout] [success] │ │ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ │ handle_error │ │ request_video │ └────────┬────────┘ └────────┬────────┘ │ │ ▼ ▼ END ┌─────────────────┐ │ poll_video │ └────────┬────────┘ │ ┌────────▼────────┐ ┌─────┤ route check ├─────┐ │ └─────────────────┘ │ [error] [success] │ │ ▼ ▼ ┌─────────────────┐ END │ handle_error │ (파이프라인 완료) └────────┬────────┘ │ ▼ END ``` ### 3.3 설계 2: 재시도 및 폴백 메커니즘 ```python from langgraph.graph import StateGraph class RetryState(TypedDict): task_id: str retry_count: int max_retries: int last_error: Optional[str] # ... 기타 필드 def build_retry_aware_pipeline(): graph = StateGraph(RetryState) async def generate_song_with_retry(state: RetryState) -> RetryState: """재시도 로직이 포함된 음악 생성""" try: # 1차 시도: Suno API result = await suno_generate(state["lyric"]) return {**state, "song_url": result, "retry_count": 0} except SunoRateLimitError: # 재시도 1: 딜레이 후 재시도 if state["retry_count"] < state["max_retries"]: await asyncio.sleep(30) # 30초 대기 return { **state, "retry_count": state["retry_count"] + 1, "last_error": "rate_limit" } except SunoAPIError as e: # 재시도 2: 프롬프트 수정 후 재시도 if "invalid lyrics" in str(e) and state["retry_count"] < 2: simplified_lyric = await simplify_lyrics(state["lyric"]) return { **state, "lyric": simplified_lyric, "retry_count": state["retry_count"] + 1 } # 폴백: 대체 서비스 사용 try: result = await alternative_music_service(state["lyric"]) return {**state, "song_url": result, "used_fallback": True} except: pass return { **state, "error": "All music generation attempts failed", "song_status": "failed" } # 조건부 재시도 엣지 def should_retry_song(state: RetryState): if state.get("song_url"): return "next_step" if state["retry_count"] < state["max_retries"]: return "retry_song" return "handle_error" graph.add_conditional_edges( "generate_song", should_retry_song, { "retry_song": "generate_song", # 자기 자신으로 루프 "next_step": "request_video", "handle_error": "handle_error" } ) return graph.compile() ``` ### 3.4 설계 3: 병렬 처리 지원 ```python from langgraph.types import Send from langgraph.graph import StateGraph class ParallelState(TypedDict): task_id: str images: list[str] analyzed_images: list[dict] # 병렬 분석 결과 # ... def build_parallel_pipeline(): graph = StateGraph(ParallelState) # 이미지 분석을 병렬로 수행 async def analyze_single_image(state: dict) -> dict: """단일 이미지 분석""" image_url = state["image_url"] analysis = await vision_model.analyze(image_url) return { "image_url": image_url, "analysis": analysis, "mood": analysis.get("mood"), "colors": analysis.get("dominant_colors") } # 팬아웃: 여러 이미지를 병렬로 분석 def fanout_images(state: ParallelState): return [ Send("analyze_image", {"image_url": img, "task_id": state["task_id"]}) for img in state["images"] ] # 팬인: 분석 결과 수집 async def collect_analyses(state: ParallelState) -> ParallelState: # LangGraph가 자동으로 병렬 결과를 수집 return state graph.add_node("analyze_image", analyze_single_image) graph.add_node("collect", collect_analyses) graph.add_conditional_edges( "start", fanout_images # 여러 Send 반환 → 병렬 실행 ) return graph.compile() ``` ### 3.5 FastAPI 통합 ```python # main.py 또는 video/api/routers/v1/video.py from fastapi import APIRouter, BackgroundTasks from langgraph.graph import StateGraph router = APIRouter() pipeline = build_video_pipeline() @router.post("/video/generate-full") async def generate_full_video( request: FullVideoRequest, background_tasks: BackgroundTasks ): """단일 API 호출로 전체 파이프라인 실행""" initial_state: PipelineState = { "task_id": str(uuid7()), "customer_name": request.customer_name, "region": request.region, "detail_info": request.detail_info, "language": request.language, "images": request.images, "orientation": request.orientation, "lyric": None, "lyric_status": None, "song_url": None, "song_task_id": None, "song_status": None, "song_duration": None, "video_url": None, "video_render_id": None, "video_status": None, "error": None, "error_step": None, "started_at": datetime.now(), "completed_at": None, "retry_count": 0 } # 백그라운드에서 파이프라인 실행 background_tasks.add_task(run_pipeline_async, initial_state) return { "task_id": initial_state["task_id"], "status": "processing", "message": "Pipeline started. Use GET /video/pipeline-status/{task_id} to check progress." } async def run_pipeline_async(initial_state: PipelineState): """백그라운드에서 LangGraph 파이프라인 실행""" try: final_state = await pipeline.ainvoke(initial_state) # 결과 DB 저장 await save_pipeline_result(final_state) # 완료 알림 (웹훅, 이메일 등) if final_state.get("video_url"): await send_completion_notification(final_state) except Exception as e: await log_pipeline_error(initial_state["task_id"], str(e)) @router.get("/video/pipeline-status/{task_id}") async def get_pipeline_status(task_id: str): """파이프라인 진행 상태 조회""" status = await get_status_from_db(task_id) return { "task_id": task_id, "lyric_status": status.lyric_status, "song_status": status.song_status, "video_status": status.video_status, "overall_status": determine_overall_status(status), "video_url": status.video_url if status.video_status == "completed" else None, "error": status.error } ``` --- ## 4. RAG 적용 설계 ### 4.1 적용 대상 및 목적 RAG(Retrieval-Augmented Generation)는 **외부 지식 기반을 검색하여 LLM 응답 품질을 향상**시키는 기법입니다. **적용 대상:** 1. 마케팅 지식베이스 (성공 사례) 2. 지역별/업종별 가사 예시 3. 이미지 메타데이터 활용 4. 프롬프트 최적화 ### 4.2 설계 1: 마케팅 지식베이스 RAG **아키텍처:** ``` ┌─────────────────────────────────────────────────────────────┐ │ 마케팅 지식베이스 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Document Store (벡터 DB) │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Collection: marketing_knowledge │ │ │ │ │ │ │ │ ┌──────────────────────────────────────────────┐ │ │ │ │ │ 문서 1: 강원도 속초 펜션 마케팅 성공 사례 │ │ │ │ │ │ - 가사 예시 │ │ │ │ │ │ - 타겟 고객 분석 │ │ │ │ │ │ - 효과적인 키워드 │ │ │ │ │ │ - 영상 조회수/반응 │ │ │ │ │ └──────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ ┌──────────────────────────────────────────────┐ │ │ │ │ │ 문서 2: 제주도 게스트하우스 마케팅 사례 │ │ │ │ │ │ ... │ │ │ │ │ └──────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ ... (수백 개의 사례) │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ 유사도 검색 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 가사 생성 프롬프트 │ ├─────────────────────────────────────────────────────────────┤ │ "다음 유사 사례를 참고하여 가사를 생성하세요: │ │ [검색된 성공 사례 1] │ │ [검색된 성공 사례 2] │ │ ..." │ └─────────────────────────────────────────────────────────────┘ ``` **구현 코드:** ```python from langchain.embeddings.openai import OpenAIEmbeddings from langchain.vectorstores import Chroma from langchain.document_loaders import JSONLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.schema import Document # 1. 임베딩 모델 설정 embeddings = OpenAIEmbeddings( model="text-embedding-3-small", openai_api_key=settings.CHATGPT_API_KEY ) # 2. 벡터 스토어 초기화 vector_store = Chroma( collection_name="marketing_knowledge", embedding_function=embeddings, persist_directory="./data/chroma_db" ) # 3. 마케팅 사례 문서 구조 class MarketingCase(BaseModel): case_id: str region: str business_type: str # "pension", "guesthouse", "hotel" target_audience: str successful_lyrics: str keywords: list[str] performance_metrics: dict # views, engagement, conversions created_at: datetime # 4. 문서 추가 함수 async def add_marketing_case(case: MarketingCase): """성공한 마케팅 사례를 벡터 스토어에 추가""" # 메타데이터와 함께 문서 생성 document = Document( page_content=f""" 지역: {case.region} 업종: {case.business_type} 타겟 고객: {case.target_audience} 성공 가사: {case.successful_lyrics} 효과적인 키워드: {', '.join(case.keywords)} 성과: 조회수 {case.performance_metrics.get('views', 0)}, 참여율 {case.performance_metrics.get('engagement', 0)}% """, metadata={ "case_id": case.case_id, "region": case.region, "business_type": case.business_type, "created_at": case.created_at.isoformat() } ) vector_store.add_documents([document]) vector_store.persist() # 5. RAG 기반 가사 생성 async def generate_lyrics_with_rag( customer_name: str, region: str, business_type: str, language: str ) -> str: """RAG를 활용한 고품질 가사 생성""" # 유사 사례 검색 query = f"{region} {business_type} 마케팅 가사" similar_cases = vector_store.similarity_search( query, k=3, filter={"business_type": business_type} # 같은 업종만 ) # 검색 결과를 프롬프트에 포함 examples_text = "\n\n".join([ f"### 참고 사례 {i+1}\n{doc.page_content}" for i, doc in enumerate(similar_cases) ]) # LangChain 프롬프트 구성 rag_prompt = ChatPromptTemplate.from_messages([ ("system", """당신은 마케팅 전문가이자 작사가입니다. 다음 성공 사례를 참고하여 새로운 가사를 생성하세요. 참고 사례의 스타일과 키워드를 활용하되, 고유한 내용을 만드세요. {examples}"""), ("human", """ 고객명: {customer_name} 지역: {region} 언어: {language} 위 정보를 바탕으로 8-12줄의 감성적인 마케팅 가사를 생성하세요. """) ]) chain = rag_prompt | ChatOpenAI(model="gpt-5-mini") | StrOutputParser() result = await chain.ainvoke({ "examples": examples_text, "customer_name": customer_name, "region": region, "language": language }) return result ``` ### 4.3 설계 2: 지역별 특화 RAG **목적:** 각 지역의 특성(문화, 관광지, 특산물 등)을 반영한 가사 생성 ```python # 지역 정보 문서 구조 class RegionInfo(BaseModel): region_name: str province: str famous_attractions: list[str] local_foods: list[str] cultural_keywords: list[str] seasonal_events: list[dict] # {"season": "summer", "event": "해수욕장 개장"} atmosphere: list[str] # ["고즈넉한", "활기찬", "낭만적인"] # 지역 정보 벡터 스토어 region_store = Chroma( collection_name="region_knowledge", embedding_function=embeddings, persist_directory="./data/chroma_region" ) # 지역 정보 추가 예시 regions_data = [ RegionInfo( region_name="속초", province="강원도", famous_attractions=["설악산", "속초해변", "영금정", "아바이마을"], local_foods=["오징어순대", "물회", "생선구이"], cultural_keywords=["동해바다", "일출", "산과 바다", "청정자연"], seasonal_events=[ {"season": "summer", "event": "속초해변 피서"}, {"season": "autumn", "event": "설악산 단풍"} ], atmosphere=["시원한", "청량한", "자연친화적", "힐링"] ), # ... 더 많은 지역 ] async def enrich_lyrics_with_region_info( base_lyrics: str, region: str ) -> str: """지역 정보로 가사 보강""" # 지역 정보 검색 region_docs = region_store.similarity_search(region, k=1) if not region_docs: return base_lyrics region_info = region_docs[0].page_content # 가사에 지역 특성 반영 enrichment_prompt = ChatPromptTemplate.from_messages([ ("system", """당신은 가사 편집 전문가입니다. 주어진 기본 가사에 지역의 특성을 자연스럽게 녹여내세요. 지역 정보: {region_info}"""), ("human", """기본 가사: {base_lyrics} 위 가사에 지역의 특성(명소, 분위기, 키워드)을 2-3개 자연스럽게 추가하세요. 원래 가사의 운율과 분위기를 유지하세요.""") ]) chain = enrichment_prompt | ChatOpenAI() | StrOutputParser() return await chain.ainvoke({ "region_info": region_info, "base_lyrics": base_lyrics }) ``` ### 4.4 설계 3: 이미지 메타데이터 RAG **목적:** 업로드된 이미지의 분석 결과를 저장하고, 영상 생성 시 최적의 이미지 순서 결정 ```python from langchain_openai import ChatOpenAI # Vision 모델로 이미지 분석 vision_model = ChatOpenAI(model="gpt-5-mini") # 이미지 분석 문서 구조 class ImageAnalysis(BaseModel): image_url: str task_id: str description: str dominant_colors: list[str] mood: str # "warm", "cool", "neutral" scene_type: str # "interior", "exterior", "nature", "food" suggested_position: str # "opening", "middle", "closing" quality_score: float # 0.0 ~ 1.0 # 이미지 메타데이터 벡터 스토어 image_store = Chroma( collection_name="image_metadata", embedding_function=embeddings, persist_directory="./data/chroma_images" ) async def analyze_and_store_image(image_url: str, task_id: str): """이미지 분석 후 벡터 스토어에 저장""" # gpt-5-mini Vision으로 이미지 분석 analysis_response = await vision_model.ainvoke([ { "type": "text", "text": """이미지를 분석하고 다음 JSON 형식으로 응답하세요: { "description": "이미지 설명 (2-3문장)", "dominant_colors": ["색상1", "색상2"], "mood": "warm/cool/neutral 중 하나", "scene_type": "interior/exterior/nature/food 중 하나", "suggested_position": "opening/middle/closing 중 하나 (영상에서 적합한 위치)", "quality_score": 0.0~1.0 (이미지 품질/선명도) }""" }, { "type": "image_url", "image_url": {"url": image_url} } ]) analysis = json.loads(analysis_response.content) # 문서 생성 및 저장 document = Document( page_content=f""" 이미지 설명: {analysis['description']} 분위기: {analysis['mood']} 장면 유형: {analysis['scene_type']} 추천 위치: {analysis['suggested_position']} 색상: {', '.join(analysis['dominant_colors'])} """, metadata={ "image_url": image_url, "task_id": task_id, **analysis } ) image_store.add_documents([document]) image_store.persist() return ImageAnalysis(image_url=image_url, task_id=task_id, **analysis) async def get_optimal_image_order( task_id: str, music_mood: str, # 음악 분위기 lyrics_theme: str # 가사 주제 ) -> list[str]: """음악과 가사에 맞는 최적의 이미지 순서 결정""" # 해당 task의 모든 이미지 조회 all_images = image_store.get( where={"task_id": task_id} ) # 음악/가사 분위기에 맞는 이미지 우선 검색 query = f"{music_mood} {lyrics_theme} 마케팅 영상" sorted_images = image_store.similarity_search( query, k=len(all_images), filter={"task_id": task_id} ) # 이미지 순서 결정 로직 opening_images = [img for img in sorted_images if img.metadata["suggested_position"] == "opening"] middle_images = [img for img in sorted_images if img.metadata["suggested_position"] == "middle"] closing_images = [img for img in sorted_images if img.metadata["suggested_position"] == "closing"] # 품질 점수로 정렬 opening_images.sort(key=lambda x: x.metadata["quality_score"], reverse=True) closing_images.sort(key=lambda x: x.metadata["quality_score"], reverse=True) # 최종 순서 ordered = ( opening_images[:2] + # 시작 2장 middle_images + # 중간 이미지들 closing_images[:1] # 마무리 1장 ) return [img.metadata["image_url"] for img in ordered] ``` ### 4.5 설계 4: 프롬프트 히스토리 RAG **목적:** 과거 성공/실패한 프롬프트를 학습하여 프롬프트 품질 지속 개선 ```python # 프롬프트 결과 문서 구조 class PromptResult(BaseModel): prompt_id: str prompt_text: str result_text: str success: bool failure_reason: Optional[str] category: str # "lyric", "marketing_analysis", "region_enrichment" metrics: dict # {"length": 10, "contains_region_keyword": True, ...} created_at: datetime # 프롬프트 히스토리 벡터 스토어 prompt_store = Chroma( collection_name="prompt_history", embedding_function=embeddings, persist_directory="./data/chroma_prompts" ) async def log_prompt_result(result: PromptResult): """프롬프트 결과 기록""" document = Document( page_content=f""" 프롬프트: {result.prompt_text} 결과: {result.result_text[:500]}... 성공 여부: {'성공' if result.success else '실패'} 실패 사유: {result.failure_reason or 'N/A'} """, metadata={ "prompt_id": result.prompt_id, "success": result.success, "category": result.category, "created_at": result.created_at.isoformat(), **result.metrics } ) prompt_store.add_documents([document]) async def get_improved_prompt( base_prompt: str, category: str ) -> str: """과거 결과를 기반으로 프롬프트 개선""" # 유사한 성공 프롬프트 검색 successful_prompts = prompt_store.similarity_search( base_prompt, k=3, filter={"success": True, "category": category} ) # 유사한 실패 프롬프트 검색 (피해야 할 패턴) failed_prompts = prompt_store.similarity_search( base_prompt, k=2, filter={"success": False, "category": category} ) # 프롬프트 개선 요청 improvement_prompt = ChatPromptTemplate.from_messages([ ("system", """당신은 프롬프트 엔지니어링 전문가입니다. 다음 성공/실패 사례를 참고하여 주어진 프롬프트를 개선하세요. ### 성공 사례 (참고): {successful_examples} ### 실패 사례 (피할 것): {failed_examples} ### 개선 원칙: 1. 성공 사례의 패턴을 따르세요 2. 실패 사례의 패턴을 피하세요 3. 명확하고 구체적인 지시를 포함하세요 4. 출력 형식을 명시하세요"""), ("human", """개선할 프롬프트: {base_prompt} 위 프롬프트를 개선하세요. 개선된 프롬프트만 출력하세요.""") ]) chain = improvement_prompt | ChatOpenAI() | StrOutputParser() improved = await chain.ainvoke({ "successful_examples": "\n---\n".join([doc.page_content for doc in successful_prompts]), "failed_examples": "\n---\n".join([doc.page_content for doc in failed_prompts]), "base_prompt": base_prompt }) return improved ``` --- ## 5. 통합 아키텍처 ### 5.1 전체 시스템 아키텍처 ``` ┌──────────────────────────────────────────────────────────────────────────┐ │ FastAPI 라우터 │ │ /lyric/generate, /song/generate, /video/generate, /video/generate-full │ └───────────────────────────────────┬──────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────────────┐ │ LangGraph 파이프라인 │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ StateGraph (상태 기계) │ │ │ │ │ │ │ │ [generate_lyric] → [request_song] → [poll_song] │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ [request_video] → [poll_video] → END │ │ │ │ │ │ │ │ │ │ └─────────────────────┴──────────→ [handle_error] │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────┼───────────────┐ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ LangChain │ │ RAG │ │ External │ │ │ │ Components │ │ Vector DBs │ │ APIs │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └──────────────────────────────────────────────────────────────────────────┘ │ │ │ ┌───────────┘ ┌──────────┘ ┌──────────┘ │ │ │ ▼ ▼ ▼ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Prompt │ │ Chroma │ │ OpenAI │ │ Templates │ │ Vector │ │ Suno │ │ Chains │ │ Store │ │ Creatomate │ │ Parsers │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ └────────────────┴───────────────┘ │ ▼ ┌─────────────────────────┐ │ MySQL + Azure Blob │ │ (영구 저장소) │ └─────────────────────────┘ ``` ### 5.2 데이터 흐름 ``` ┌─────────────────────────────────────────────────────────────────────────┐ │ 데이터 흐름 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 1. 사용자 요청 │ │ ├── 고객 정보 (이름, 지역, 상세정보) │ │ └── 이미지 URL 리스트 │ │ │ │ │ ▼ │ │ 2. RAG 검색 (병렬) │ │ ├── 마케팅 지식베이스 → 유사 성공 사례 │ │ ├── 지역 정보베이스 → 지역 특성 │ │ └── 이미지 메타데이터 → 이미지 분석 │ │ │ │ │ ▼ │ │ 3. LangChain 프롬프트 구성 │ │ ├── 기본 템플릿 로드 │ │ ├── RAG 결과 주입 │ │ ├── Few-shot 예시 추가 │ │ └── 출력 형식 지정 │ │ │ │ │ ▼ │ │ 4. LangGraph 파이프라인 실행 │ │ ├── 가사 생성 (ChatGPT) │ │ ├── 음악 생성 (Suno, 폴링 자동화) │ │ └── 영상 생성 (Creatomate, 폴링 자동화) │ │ │ │ │ ▼ │ │ 5. 결과 저장 │ │ ├── MySQL: 메타데이터, 상태 │ │ ├── Azure Blob: 영상 파일 │ │ └── Chroma: 성공 사례 피드백 │ │ │ │ │ ▼ │ │ 6. 사용자 응답 │ │ └── 영상 URL, 상태, 메타데이터 │ │ │ └─────────────────────────────────────────────────────────────────────────┘ ``` ### 5.3 디렉토리 구조 (신규) ``` app/ ├── langchain/ # LangChain 관련 │ ├── __init__.py │ ├── prompts/ │ │ ├── __init__.py │ │ ├── lyric_prompts.py # 가사 생성 프롬프트 │ │ ├── marketing_prompts.py # 마케팅 분석 프롬프트 │ │ └── examples/ # Few-shot 예시 │ │ ├── korean.json │ │ ├── english.json │ │ └── ... │ ├── chains/ │ │ ├── __init__.py │ │ ├── lyric_chain.py # 가사 생성 체인 │ │ └── marketing_chain.py # 마케팅 분석 체인 │ └── parsers/ │ ├── __init__.py │ └── lyric_parser.py # 가사 출력 파서 │ ├── langgraph/ # LangGraph 관련 │ ├── __init__.py │ ├── states/ │ │ ├── __init__.py │ │ └── pipeline_state.py # 파이프라인 상태 정의 │ ├── nodes/ │ │ ├── __init__.py │ │ ├── lyric_node.py # 가사 생성 노드 │ │ ├── song_node.py # 음악 생성 노드 │ │ ├── video_node.py # 영상 생성 노드 │ │ └── error_node.py # 에러 처리 노드 │ └── graphs/ │ ├── __init__.py │ └── video_pipeline.py # 메인 파이프라인 그래프 │ ├── rag/ # RAG 관련 │ ├── __init__.py │ ├── stores/ │ │ ├── __init__.py │ │ ├── marketing_store.py # 마케팅 지식베이스 │ │ ├── region_store.py # 지역 정보베이스 │ │ ├── image_store.py # 이미지 메타데이터 │ │ └── prompt_store.py # 프롬프트 히스토리 │ ├── loaders/ │ │ ├── __init__.py │ │ └── case_loader.py # 사례 데이터 로더 │ └── retrievers/ │ ├── __init__.py │ └── hybrid_retriever.py # 하이브리드 검색 │ └── data/ # 데이터 저장소 └── chroma_db/ # Chroma 벡터 DB ├── marketing_knowledge/ ├── region_knowledge/ ├── image_metadata/ └── prompt_history/ ``` --- ## 6. 기대 효과 ### 6.1 정량적 기대 효과 | 지표 | 현재 | 목표 | 개선율 | |------|------|------|--------| | **가사 생성 품질** | 70% 만족도 | 90% 만족도 | +29% | | **재작업률** | 30% | 10% | -67% | | **파이프라인 실패율** | 15% | 5% | -67% | | **평균 처리 시간** | 10분 (수동 개입 필요) | 8분 (완전 자동) | -20% | | **다국어 품질** | 60% | 85% | +42% | | **프롬프트 튜닝 시간** | 2시간/버전 | 30분/버전 | -75% | ### 6.2 정성적 기대 효과 #### 6.2.1 개발 생산성 향상 | 영역 | 효과 | |------|------| | **코드 유지보수** | 프롬프트와 비즈니스 로직 분리로 수정 용이 | | **테스트 용이성** | 각 체인/노드 단위 테스트 가능 | | **디버깅** | 상태 기계 기반으로 문제 지점 명확히 파악 | | **확장성** | 새로운 AI 서비스 추가 시 노드만 추가하면 됨 | #### 6.2.2 품질 향상 | 영역 | 효과 | |------|------| | **일관성** | 동일 조건에서 일관된 품질의 결과물 생성 | | **지역 맞춤화** | RAG로 지역별 특성 자동 반영 | | **학습 효과** | 성공 사례 축적으로 시간이 지날수록 품질 향상 | | **에러 복구** | 자동 재시도 및 폴백으로 안정성 강화 | #### 6.2.3 운영 효율성 | 영역 | 효과 | |------|------| | **모니터링** | 파이프라인 상태 추적으로 병목 지점 파악 | | **비용 최적화** | 불필요한 API 호출 감소, 캐싱 활용 | | **확장 대응** | 부하 증가 시 노드별 스케일링 가능 | ### 6.3 비즈니스 가치 ``` ┌────────────────────────────────────────────────────────────────┐ │ 비즈니스 가치 │ ├────────────────────────────────────────────────────────────────┤ │ │ │ 1. 고객 만족도 향상 │ │ └── 고품질 가사/영상으로 마케팅 효과 증대 │ │ │ │ 2. 서비스 차별화 │ │ └── 지역 맞춤 콘텐츠로 경쟁사 대비 우위 │ │ │ │ 3. 운영 비용 절감 │ │ └── 자동화로 수동 개입 최소화 │ │ │ │ 4. 확장 가능성 │ │ └── 새로운 지역/업종 추가 시 RAG 학습만으로 대응 │ │ │ │ 5. 데이터 자산화 │ │ └── 축적된 성공 사례가 진입 장벽 역할 │ │ │ └────────────────────────────────────────────────────────────────┘ ``` --- ## 7. 구현 로드맵 ### 7.1 Phase 1: 기초 (1-2주) **목표:** LangChain 기본 구조 구축 | 작업 | 설명 | 우선순위 | |------|------|----------| | 의존성 설치 | langchain, langchain-openai, chromadb | P0 | | 프롬프트 템플릿 작성 | 가사 생성 프롬프트 이관 | P0 | | 기본 체인 구현 | ChatGPT 서비스 LangChain으로 래핑 | P0 | | 출력 파서 구현 | 가사 응답 검증 및 파싱 | P1 | | 테스트 작성 | 체인 단위 테스트 | P1 | **산출물:** - `app/langchain/` 디렉토리 구조 - 가사 생성 LangChain 체인 - 기본 테스트 코드 ### 7.2 Phase 2: 파이프라인 (2-3주) **목표:** LangGraph 파이프라인 구축 | 작업 | 설명 | 우선순위 | |------|------|----------| | 상태 정의 | PipelineState TypedDict 작성 | P0 | | 노드 구현 | 가사, 음악, 영상 생성 노드 | P0 | | 그래프 구성 | 엣지 및 조건부 분기 정의 | P0 | | 폴링 통합 | Suno, Creatomate 폴링 자동화 | P0 | | 에러 처리 | 에러 노드 및 재시도 로직 | P1 | | FastAPI 통합 | 새 엔드포인트 추가 | P1 | **산출물:** - `app/langgraph/` 디렉토리 구조 - 통합 파이프라인 그래프 - `/video/generate-full` 엔드포인트 ### 7.3 Phase 3: RAG (2-3주) **목표:** 지식베이스 구축 및 RAG 통합 | 작업 | 설명 | 우선순위 | |------|------|----------| | Chroma 설정 | 벡터 스토어 초기화 | P0 | | 마케팅 사례 수집 | 기존 성공 사례 데이터화 | P0 | | 지역 정보 구축 | 주요 지역 정보 입력 | P1 | | 검색 통합 | 가사 생성 시 RAG 적용 | P0 | | 이미지 분석 | Vision API 연동 | P2 | | 프롬프트 히스토리 | 자동 학습 파이프라인 | P2 | **산출물:** - `app/rag/` 디렉토리 구조 - 마케팅/지역 지식베이스 - RAG 통합 가사 생성 ### 7.4 Phase 4: 고도화 (2-3주) **목표:** 최적화 및 모니터링 | 작업 | 설명 | 우선순위 | |------|------|----------| | 성능 최적화 | 캐싱, 병렬 처리 개선 | P1 | | 모니터링 | 파이프라인 상태 대시보드 | P1 | | A/B 테스팅 | 프롬프트 버전 비교 | P2 | | 문서화 | API 문서, 운영 가이드 | P1 | | 부하 테스트 | 동시 요청 처리 검증 | P2 | **산출물:** - 최적화된 파이프라인 - 모니터링 대시보드 - 완성된 문서 --- ## 8. 결론 ### 8.1 요약 CastAD 백엔드에 LangChain, LangGraph, RAG를 적용하면: 1. **LangChain**: 프롬프트 관리 체계화, 다단계 체인 구성, 출력 검증 자동화 2. **LangGraph**: 복잡한 파이프라인 상태 관리, 폴링 자동화, 에러 처리 강화 3. **RAG**: 과거 성공 사례 활용, 지역별 맞춤화, 지속적 품질 개선 ### 8.2 핵심 가치 ``` ┌───────────────────────────────────────────────────────────┐ │ │ │ 현재: 각 단계가 독립적 → 상태 관리 어려움 │ │ 개선: 통합 파이프라인 → 자동화된 상태 추적 │ │ │ │ 현재: 하드코딩 프롬프트 → 수정 어려움 │ │ 개선: 템플릿 기반 → 유연한 프롬프트 관리 │ │ │ │ 현재: 과거 데이터 미활용 → 일관성 없는 품질 │ │ 개선: RAG 지식베이스 → 축적된 노하우 활용 │ │ │ └───────────────────────────────────────────────────────────┘ ``` ### 8.3 권장 사항 1. **단계적 도입**: Phase 1(LangChain)부터 시작하여 검증 후 확장 2. **기존 API 유지**: 새 엔드포인트 추가 방식으로 호환성 보장 3. **데이터 축적 우선**: RAG 효과를 위해 초기 사례 데이터 확보 중요 4. **모니터링 병행**: 각 단계별 성과 측정으로 ROI 검증 --- ## 부록 ### A. 필요 의존성 ```toml # pyproject.toml 추가 의존성 [project.dependencies] langchain = ">=0.1.0" langchain-openai = ">=0.0.5" langchain-community = ">=0.0.20" langgraph = ">=0.0.30" chromadb = ">=0.4.22" tiktoken = ">=0.5.2" ``` ### B. 환경 변수 추가 ```env # .env 추가 CHROMA_PERSIST_DIR=./data/chroma_db LANGCHAIN_TRACING_V2=true # 선택: LangSmith 모니터링 LANGCHAIN_API_KEY=xxx # 선택: LangSmith 모니터링 ``` ### C. 참고 자료 - [LangChain Documentation](https://python.langchain.com/) - [LangGraph Documentation](https://langchain-ai.github.io/langgraph/) - [Chroma Documentation](https://docs.trychroma.com/) - [OpenAI Cookbook](https://cookbook.openai.com/) --- *이 보고서는 CastAD 백엔드 프로젝트 분석을 기반으로 작성되었습니다.* *작성일: 2025-12-28*