1706 lines
67 KiB
Markdown
1706 lines
67 KiB
Markdown
# CastAD 백엔드 - LangChain, LangGraph, RAG 적용 설계 보고서
|
||
|
||
## 목차
|
||
1. [현재 시스템 분석](#1-현재-시스템-분석)
|
||
2. [LangChain 적용 설계](#2-langchain-적용-설계)
|
||
3. [LangGraph 적용 설계](#3-langgraph-적용-설계)
|
||
4. [RAG 적용 설계](#4-rag-적용-설계)
|
||
5. [통합 아키텍처](#5-통합-아키텍처)
|
||
6. [기대 효과](#6-기대-효과)
|
||
7. [구현 로드맵](#7-구현-로드맵)
|
||
8. [결론](#8-결론)
|
||
|
||
---
|
||
|
||
## 1. 현재 시스템 분석
|
||
|
||
### 1.1 프로젝트 개요
|
||
|
||
CastAD는 **AI 기반 광고 음악 및 영상 자동 생성 서비스**입니다. 네이버 지도에서 수집한 숙박시설 정보를 기반으로 마케팅용 자동 영상을 생성하는 통합 플랫폼입니다.
|
||
|
||
**핵심 파이프라인:**
|
||
```
|
||
사용자 입력 → 가사 자동 생성 → 음악 자동 생성 → 영상 자동 생성
|
||
```
|
||
|
||
### 1.2 현재 기술 스택
|
||
|
||
| 구분 | 기술 |
|
||
|------|------|
|
||
| Backend Framework | FastAPI (async/await 기반) |
|
||
| ORM | SQLAlchemy 2.0 (비동기) |
|
||
| Database | MySQL (asyncmy 드라이버) |
|
||
| Cache | Redis |
|
||
| AI/API | OpenAI ChatGPT, Suno AI, Creatomate |
|
||
| Storage | Azure Blob Storage |
|
||
|
||
### 1.3 현재 핵심 흐름
|
||
|
||
```
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 현재 파이프라인 구조 │
|
||
├─────────────────────────────────────────────────────────────────┤
|
||
│ │
|
||
│ POST /crawling (선택) │
|
||
│ │ │
|
||
│ ▼ │
|
||
│ POST /lyric/generate ──────► ChatGPT API ──────► 가사 저장 │
|
||
│ │ │
|
||
│ ▼ │
|
||
│ POST /song/generate ───────► Suno API ─────────► 음악 저장 │
|
||
│ │ │ │
|
||
│ │ 클라이언트 폴링 │
|
||
│ │ │ │
|
||
│ ▼ ▼ │
|
||
│ POST /video/generate ──────► Creatomate API ───► 영상 저장 │
|
||
│ │ │ │
|
||
│ │ 클라이언트 폴링 │
|
||
│ ▼ ▼ │
|
||
│ GET /video/download ◄──────── 완료 ──────────► Azure Blob │
|
||
│ │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
### 1.4 현재 시스템의 한계점
|
||
|
||
| 문제점 | 설명 |
|
||
|--------|------|
|
||
| **분산된 상태 관리** | 각 API 호출마다 독립적인 상태 관리, 전체 파이프라인 추적 어려움 |
|
||
| **클라이언트 의존적 폴링** | 음악/영상 생성 완료 여부를 클라이언트가 반복 확인해야 함 |
|
||
| **하드코딩된 프롬프트** | ChatGPT 프롬프트가 코드에 직접 작성, 유연성 부족 |
|
||
| **에러 복구 제한적** | 단순 실패 패턴 검사만 수행, 자동 복구 메커니즘 없음 |
|
||
| **과거 데이터 미활용** | 성공한 가사/마케팅 사례 재활용 불가 |
|
||
| **일관성 없는 품질** | 동일 조건에서도 결과물 품질 편차 존재 |
|
||
|
||
---
|
||
|
||
## 2. LangChain 적용 설계
|
||
|
||
### 2.1 적용 대상 및 목적
|
||
|
||
LangChain은 **LLM 애플리케이션 개발을 위한 프레임워크**로, 프롬프트 관리, 체인 구성, 출력 파싱 등을 체계화합니다.
|
||
|
||
**적용 대상:**
|
||
1. 가사 생성 서비스 (`ChatgptService`)
|
||
2. 마케팅 분석 서비스
|
||
3. 다국어 처리 로직
|
||
|
||
### 2.2 설계 1: 프롬프트 템플릿 시스템
|
||
|
||
**현재 문제:**
|
||
```python
|
||
# 현재: chatgpt_prompt.py
|
||
prompt = f"""
|
||
[ROLE] You are a marketing expert...
|
||
[INPUT] Customer: {customer_name}, Region: {region}...
|
||
"""
|
||
```
|
||
|
||
**개선 설계:**
|
||
```python
|
||
# 개선: langchain 적용
|
||
from langchain.prompts import PromptTemplate, ChatPromptTemplate
|
||
from langchain_openai import ChatOpenAI
|
||
|
||
# 가사 생성 프롬프트 템플릿
|
||
LYRIC_PROMPT = ChatPromptTemplate.from_messages([
|
||
("system", """[ROLE] You are a marketing expert and professional lyricist.
|
||
You specialize in creating catchy, emotional lyrics for travel and accommodation marketing.
|
||
|
||
[LANGUAGE REQUIREMENT]
|
||
Output MUST be 100% in {language}. No other languages allowed."""),
|
||
|
||
("human", """[INPUT]
|
||
Customer Name: {customer_name}
|
||
Region: {region}
|
||
Detailed Information: {detail_info}
|
||
|
||
[OUTPUT REQUIREMENTS]
|
||
- 8-12 lines of lyrics
|
||
- Focus on: relaxation, healing, beautiful scenery, memorable experiences
|
||
- Style: warm, inviting, poetic
|
||
- Include location-specific imagery
|
||
|
||
Generate lyrics now:""")
|
||
])
|
||
|
||
# 체인 구성
|
||
lyric_chain = LYRIC_PROMPT | ChatOpenAI(model="gpt-5-mini-mini") | StrOutputParser()
|
||
|
||
# 사용
|
||
result = await lyric_chain.ainvoke({
|
||
"customer_name": "스테이뫰",
|
||
"region": "강원도 속초",
|
||
"detail_info": "해변 근처 펜션",
|
||
"language": "Korean"
|
||
})
|
||
```
|
||
|
||
**이점:**
|
||
- 프롬프트 버전 관리 용이
|
||
- A/B 테스팅 지원
|
||
- 입력 변수 명확한 정의
|
||
|
||
### 2.3 설계 2: 다단계 마케팅 분석 체인
|
||
|
||
**목적:** 복잡한 마케팅 분석을 단계별로 수행하여 품질 향상
|
||
|
||
```python
|
||
from langchain.chains import SequentialChain
|
||
from langchain.prompts import PromptTemplate
|
||
from langchain_openai import ChatOpenAI
|
||
|
||
# Step 1: 경쟁사 분석 체인
|
||
competitor_prompt = PromptTemplate(
|
||
input_variables=["region", "business_type"],
|
||
template="""
|
||
{region} 지역의 {business_type} 업종에 대해 분석하세요:
|
||
- 주요 경쟁사 특성
|
||
- 차별화 포인트
|
||
- 시장 포지셔닝
|
||
"""
|
||
)
|
||
competitor_chain = competitor_prompt | ChatOpenAI() | StrOutputParser()
|
||
|
||
# Step 2: 타겟 고객 분석 체인
|
||
audience_prompt = PromptTemplate(
|
||
input_variables=["region", "competitor_analysis"],
|
||
template="""
|
||
경쟁사 분석 결과: {competitor_analysis}
|
||
|
||
{region} 지역의 주요 타겟 고객층을 분석하세요:
|
||
- 연령대 및 특성
|
||
- 주요 니즈
|
||
- 결정 요인
|
||
"""
|
||
)
|
||
audience_chain = audience_prompt | ChatOpenAI() | StrOutputParser()
|
||
|
||
# Step 3: 마케팅 전략 종합 체인
|
||
strategy_prompt = PromptTemplate(
|
||
input_variables=["customer_name", "competitor_analysis", "audience_analysis"],
|
||
template="""
|
||
경쟁사 분석: {competitor_analysis}
|
||
타겟 고객: {audience_analysis}
|
||
|
||
{customer_name}을 위한 마케팅 전략을 제안하세요:
|
||
- 핵심 메시지
|
||
- 차별화 전략
|
||
- 추천 가사 방향
|
||
"""
|
||
)
|
||
strategy_chain = strategy_prompt | ChatOpenAI() | StrOutputParser()
|
||
|
||
# 통합 순차 체인
|
||
marketing_analysis_chain = (
|
||
{"region": RunnablePassthrough(), "business_type": RunnablePassthrough()}
|
||
| competitor_chain
|
||
| {"competitor_analysis": RunnablePassthrough(), "region": RunnablePassthrough()}
|
||
| audience_chain
|
||
| {"competitor_analysis": ..., "audience_analysis": RunnablePassthrough(), "customer_name": ...}
|
||
| strategy_chain
|
||
)
|
||
```
|
||
|
||
**이점:**
|
||
- 분석의 깊이와 체계성 향상
|
||
- 각 단계별 결과 추적 가능
|
||
- 중간 결과 캐싱 가능
|
||
|
||
### 2.4 설계 3: 출력 파싱 및 검증
|
||
|
||
**목적:** ChatGPT 응답의 구조화 및 자동 검증
|
||
|
||
```python
|
||
from langchain.output_parsers import PydanticOutputParser
|
||
from langchain_core.output_parsers import OutputFixingParser
|
||
from pydantic import BaseModel, Field, validator
|
||
|
||
# 가사 출력 스키마
|
||
class LyricOutput(BaseModel):
|
||
title: str = Field(description="가사의 제목 (선택)")
|
||
lyrics: list[str] = Field(description="가사 각 줄", min_items=8, max_items=12)
|
||
mood: str = Field(description="가사의 분위기: warm, energetic, romantic 등")
|
||
|
||
@validator('lyrics')
|
||
def validate_line_count(cls, v):
|
||
if len(v) < 8:
|
||
raise ValueError("가사는 최소 8줄 이상이어야 합니다")
|
||
return v
|
||
|
||
# 파서 생성
|
||
parser = PydanticOutputParser(pydantic_object=LyricOutput)
|
||
|
||
# 자동 수정 파서 (파싱 실패 시 LLM으로 재시도)
|
||
fixing_parser = OutputFixingParser.from_llm(
|
||
parser=parser,
|
||
llm=ChatOpenAI(model="gpt-5-mini-mini")
|
||
)
|
||
|
||
# 프롬프트에 포맷 지시 추가
|
||
prompt_with_format = LYRIC_PROMPT.partial(
|
||
format_instructions=parser.get_format_instructions()
|
||
)
|
||
```
|
||
|
||
**이점:**
|
||
- 응답 형식 일관성 보장
|
||
- 자동 오류 복구
|
||
- 타입 안전성 확보
|
||
|
||
### 2.5 설계 4: Few-Shot 다국어 프롬프트
|
||
|
||
**목적:** 각 언어별 고품질 예시 제공으로 번역/생성 품질 향상
|
||
|
||
```python
|
||
from langchain.prompts import FewShotPromptTemplate, PromptTemplate
|
||
|
||
# 언어별 예시
|
||
LANGUAGE_EXAMPLES = {
|
||
"Korean": [
|
||
{
|
||
"input": "강원도 속초 해변 펜션",
|
||
"output": """푸른 바다 물결 위에
|
||
새벽빛이 춤을 추고
|
||
당신의 하루를 담아
|
||
스테이뫰에서 쉬어가요"""
|
||
}
|
||
],
|
||
"English": [
|
||
{
|
||
"input": "Sokcho beach pension, Gangwon-do",
|
||
"output": """Where ocean waves meet morning light
|
||
A peaceful haven comes in sight
|
||
Let your worries drift away
|
||
At Stay Meoum, find your stay"""
|
||
}
|
||
],
|
||
"Japanese": [
|
||
{
|
||
"input": "江原道束草ビーチペンション",
|
||
"output": """青い海の波の上に
|
||
朝の光が踊る時
|
||
あなたの一日を包み込む
|
||
ステイメウムで休んでいこう"""
|
||
}
|
||
],
|
||
"Chinese": [...],
|
||
"Thai": [...],
|
||
"Vietnamese": [...]
|
||
}
|
||
|
||
# Few-Shot 프롬프트 생성
|
||
def create_multilingual_prompt(language: str):
|
||
example_prompt = PromptTemplate(
|
||
input_variables=["input", "output"],
|
||
template="입력: {input}\n가사:\n{output}"
|
||
)
|
||
|
||
return FewShotPromptTemplate(
|
||
examples=LANGUAGE_EXAMPLES.get(language, LANGUAGE_EXAMPLES["Korean"]),
|
||
example_prompt=example_prompt,
|
||
prefix="다음 예시를 참고하여 고품질 가사를 생성하세요:",
|
||
suffix="입력: {customer_info}\n가사:",
|
||
input_variables=["customer_info"]
|
||
)
|
||
```
|
||
|
||
**이점:**
|
||
- 언어별 문화적 뉘앙스 반영
|
||
- 일관된 스타일 유지
|
||
- 번역 품질 대폭 향상
|
||
|
||
---
|
||
|
||
## 3. LangGraph 적용 설계
|
||
|
||
### 3.1 적용 대상 및 목적
|
||
|
||
LangGraph는 **복잡한 다단계 워크플로우를 상태 기계(State Machine)로 관리**하는 프레임워크입니다.
|
||
|
||
**적용 대상:**
|
||
1. 전체 영상 생성 파이프라인 (가사 → 음악 → 영상)
|
||
2. 비동기 폴링 자동화
|
||
3. 에러 처리 및 재시도 로직
|
||
|
||
### 3.2 설계 1: 통합 파이프라인 그래프
|
||
|
||
**핵심 설계:**
|
||
|
||
```python
|
||
from langgraph.graph import StateGraph, END
|
||
from typing import TypedDict, Optional, Literal
|
||
from datetime import datetime
|
||
|
||
# 파이프라인 상태 정의
|
||
class PipelineState(TypedDict):
|
||
# 입력
|
||
task_id: str
|
||
customer_name: str
|
||
region: str
|
||
detail_info: str
|
||
language: str
|
||
images: list[str]
|
||
orientation: Literal["vertical", "horizontal"]
|
||
|
||
# 중간 결과
|
||
lyric: Optional[str]
|
||
lyric_status: Optional[str]
|
||
|
||
song_url: Optional[str]
|
||
song_task_id: Optional[str]
|
||
song_status: Optional[str]
|
||
song_duration: Optional[float]
|
||
|
||
video_url: Optional[str]
|
||
video_render_id: Optional[str]
|
||
video_status: Optional[str]
|
||
|
||
# 메타데이터
|
||
error: Optional[str]
|
||
error_step: Optional[str]
|
||
started_at: datetime
|
||
completed_at: Optional[datetime]
|
||
retry_count: int
|
||
|
||
# 그래프 빌더
|
||
def build_video_pipeline() -> StateGraph:
|
||
graph = StateGraph(PipelineState)
|
||
|
||
# ===== 노드 정의 =====
|
||
|
||
# 1. 가사 생성 노드
|
||
async def generate_lyric(state: PipelineState) -> PipelineState:
|
||
"""ChatGPT로 가사 생성 (동기)"""
|
||
try:
|
||
lyric_chain = create_lyric_chain() # LangChain 체인
|
||
lyric = await lyric_chain.ainvoke({
|
||
"customer_name": state["customer_name"],
|
||
"region": state["region"],
|
||
"detail_info": state["detail_info"],
|
||
"language": state["language"]
|
||
})
|
||
|
||
# DB 저장
|
||
await save_lyric_to_db(state["task_id"], lyric)
|
||
|
||
return {
|
||
**state,
|
||
"lyric": lyric,
|
||
"lyric_status": "completed"
|
||
}
|
||
except Exception as e:
|
||
return {
|
||
**state,
|
||
"error": str(e),
|
||
"error_step": "lyric_generation",
|
||
"lyric_status": "failed"
|
||
}
|
||
|
||
# 2. 음악 생성 요청 노드
|
||
async def request_song(state: PipelineState) -> PipelineState:
|
||
"""Suno API에 음악 생성 요청"""
|
||
try:
|
||
suno = SunoAPIClient()
|
||
task_id = await suno.generate(
|
||
prompt=state["lyric"],
|
||
genre="K-Pop, Emotional"
|
||
)
|
||
|
||
# DB 저장
|
||
await save_song_request_to_db(state["task_id"], task_id)
|
||
|
||
return {
|
||
**state,
|
||
"song_task_id": task_id,
|
||
"song_status": "processing"
|
||
}
|
||
except Exception as e:
|
||
return {
|
||
**state,
|
||
"error": str(e),
|
||
"error_step": "song_request",
|
||
"song_status": "failed"
|
||
}
|
||
|
||
# 3. 음악 폴링 노드
|
||
async def poll_song_status(state: PipelineState) -> PipelineState:
|
||
"""Suno 상태 폴링 (최대 5분)"""
|
||
suno = SunoAPIClient()
|
||
max_attempts = 60 # 5초 간격 × 60 = 5분
|
||
|
||
for attempt in range(max_attempts):
|
||
result = await suno.get_task_status(state["song_task_id"])
|
||
|
||
if result["status"] == "SUCCESS":
|
||
audio_url = result["clips"][0]["audio_url"]
|
||
duration = result["clips"][0]["duration"]
|
||
|
||
# DB 업데이트
|
||
await update_song_status(
|
||
state["task_id"],
|
||
"completed",
|
||
audio_url,
|
||
duration
|
||
)
|
||
|
||
return {
|
||
**state,
|
||
"song_url": audio_url,
|
||
"song_duration": duration,
|
||
"song_status": "completed"
|
||
}
|
||
elif result["status"] == "FAILED":
|
||
return {
|
||
**state,
|
||
"error": "Suno generation failed",
|
||
"error_step": "song_polling",
|
||
"song_status": "failed"
|
||
}
|
||
|
||
await asyncio.sleep(5) # 5초 대기
|
||
|
||
return {
|
||
**state,
|
||
"error": "Song generation timeout",
|
||
"error_step": "song_polling",
|
||
"song_status": "timeout"
|
||
}
|
||
|
||
# 4. 영상 생성 요청 노드
|
||
async def request_video(state: PipelineState) -> PipelineState:
|
||
"""Creatomate API에 영상 렌더링 요청"""
|
||
try:
|
||
creatomate = CreatomateClient()
|
||
render_id = await creatomate.render(
|
||
images=state["images"],
|
||
music_url=state["song_url"],
|
||
lyrics=state["lyric"],
|
||
duration=state["song_duration"],
|
||
orientation=state["orientation"]
|
||
)
|
||
|
||
# DB 저장
|
||
await save_video_request_to_db(state["task_id"], render_id)
|
||
|
||
return {
|
||
**state,
|
||
"video_render_id": render_id,
|
||
"video_status": "processing"
|
||
}
|
||
except Exception as e:
|
||
return {
|
||
**state,
|
||
"error": str(e),
|
||
"error_step": "video_request",
|
||
"video_status": "failed"
|
||
}
|
||
|
||
# 5. 영상 폴링 노드
|
||
async def poll_video_status(state: PipelineState) -> PipelineState:
|
||
"""Creatomate 상태 폴링 (최대 10분)"""
|
||
creatomate = CreatomateClient()
|
||
max_attempts = 120 # 5초 간격 × 120 = 10분
|
||
|
||
for attempt in range(max_attempts):
|
||
result = await creatomate.get_render_status(state["video_render_id"])
|
||
|
||
if result["status"] == "succeeded":
|
||
video_url = result["url"]
|
||
|
||
# Azure Blob 업로드
|
||
blob_url = await upload_to_azure(video_url, state["task_id"])
|
||
|
||
# DB 업데이트
|
||
await update_video_status(state["task_id"], "completed", blob_url)
|
||
|
||
return {
|
||
**state,
|
||
"video_url": blob_url,
|
||
"video_status": "completed",
|
||
"completed_at": datetime.now()
|
||
}
|
||
elif result["status"] == "failed":
|
||
return {
|
||
**state,
|
||
"error": "Creatomate rendering failed",
|
||
"error_step": "video_polling",
|
||
"video_status": "failed"
|
||
}
|
||
|
||
await asyncio.sleep(5)
|
||
|
||
return {
|
||
**state,
|
||
"error": "Video generation timeout",
|
||
"error_step": "video_polling",
|
||
"video_status": "timeout"
|
||
}
|
||
|
||
# 6. 에러 처리 노드
|
||
async def handle_error(state: PipelineState) -> PipelineState:
|
||
"""에러 로깅 및 알림"""
|
||
await log_pipeline_error(
|
||
task_id=state["task_id"],
|
||
error=state["error"],
|
||
step=state["error_step"]
|
||
)
|
||
|
||
# 선택: 슬랙/이메일 알림
|
||
await send_error_notification(state)
|
||
|
||
return state
|
||
|
||
# ===== 노드 추가 =====
|
||
graph.add_node("generate_lyric", generate_lyric)
|
||
graph.add_node("request_song", request_song)
|
||
graph.add_node("poll_song", poll_song_status)
|
||
graph.add_node("request_video", request_video)
|
||
graph.add_node("poll_video", poll_video_status)
|
||
graph.add_node("handle_error", handle_error)
|
||
|
||
# ===== 엣지 정의 =====
|
||
|
||
# 시작점
|
||
graph.set_entry_point("generate_lyric")
|
||
|
||
# 조건부 분기: 가사 생성 후
|
||
def route_after_lyric(state: PipelineState):
|
||
if state.get("error"):
|
||
return "handle_error"
|
||
return "request_song"
|
||
|
||
graph.add_conditional_edges(
|
||
"generate_lyric",
|
||
route_after_lyric,
|
||
{
|
||
"request_song": "request_song",
|
||
"handle_error": "handle_error"
|
||
}
|
||
)
|
||
|
||
# 조건부 분기: 음악 요청 후
|
||
def route_after_song_request(state: PipelineState):
|
||
if state.get("error"):
|
||
return "handle_error"
|
||
return "poll_song"
|
||
|
||
graph.add_conditional_edges(
|
||
"request_song",
|
||
route_after_song_request
|
||
)
|
||
|
||
# 조건부 분기: 음악 폴링 후
|
||
def route_after_song_poll(state: PipelineState):
|
||
if state.get("error") or state["song_status"] in ["failed", "timeout"]:
|
||
return "handle_error"
|
||
return "request_video"
|
||
|
||
graph.add_conditional_edges(
|
||
"poll_song",
|
||
route_after_song_poll
|
||
)
|
||
|
||
# 조건부 분기: 영상 요청 후
|
||
graph.add_conditional_edges(
|
||
"request_video",
|
||
lambda s: "handle_error" if s.get("error") else "poll_video"
|
||
)
|
||
|
||
# 조건부 분기: 영상 폴링 후
|
||
graph.add_conditional_edges(
|
||
"poll_video",
|
||
lambda s: "handle_error" if s.get("error") else END
|
||
)
|
||
|
||
# 에러 핸들러는 항상 종료
|
||
graph.add_edge("handle_error", END)
|
||
|
||
return graph.compile()
|
||
```
|
||
|
||
**그래프 시각화:**
|
||
|
||
```
|
||
┌─────────────────┐
|
||
│ generate_lyric │
|
||
└────────┬────────┘
|
||
│
|
||
┌────────▼────────┐
|
||
┌─────┤ route check ├─────┐
|
||
│ └─────────────────┘ │
|
||
[error] [success]
|
||
│ │
|
||
▼ ▼
|
||
┌─────────────────┐ ┌─────────────────┐
|
||
│ handle_error │ │ request_song │
|
||
└────────┬────────┘ └────────┬────────┘
|
||
│ │
|
||
▼ ▼
|
||
END ┌─────────────────┐
|
||
│ poll_song │
|
||
└────────┬────────┘
|
||
│
|
||
┌────────▼────────┐
|
||
┌─────┤ route check ├─────┐
|
||
│ └─────────────────┘ │
|
||
[error/timeout] [success]
|
||
│ │
|
||
▼ ▼
|
||
┌─────────────────┐ ┌─────────────────┐
|
||
│ handle_error │ │ request_video │
|
||
└────────┬────────┘ └────────┬────────┘
|
||
│ │
|
||
▼ ▼
|
||
END ┌─────────────────┐
|
||
│ poll_video │
|
||
└────────┬────────┘
|
||
│
|
||
┌────────▼────────┐
|
||
┌─────┤ route check ├─────┐
|
||
│ └─────────────────┘ │
|
||
[error] [success]
|
||
│ │
|
||
▼ ▼
|
||
┌─────────────────┐ END
|
||
│ handle_error │ (파이프라인 완료)
|
||
└────────┬────────┘
|
||
│
|
||
▼
|
||
END
|
||
```
|
||
|
||
### 3.3 설계 2: 재시도 및 폴백 메커니즘
|
||
|
||
```python
|
||
from langgraph.graph import StateGraph
|
||
|
||
class RetryState(TypedDict):
|
||
task_id: str
|
||
retry_count: int
|
||
max_retries: int
|
||
last_error: Optional[str]
|
||
# ... 기타 필드
|
||
|
||
def build_retry_aware_pipeline():
|
||
graph = StateGraph(RetryState)
|
||
|
||
async def generate_song_with_retry(state: RetryState) -> RetryState:
|
||
"""재시도 로직이 포함된 음악 생성"""
|
||
try:
|
||
# 1차 시도: Suno API
|
||
result = await suno_generate(state["lyric"])
|
||
return {**state, "song_url": result, "retry_count": 0}
|
||
|
||
except SunoRateLimitError:
|
||
# 재시도 1: 딜레이 후 재시도
|
||
if state["retry_count"] < state["max_retries"]:
|
||
await asyncio.sleep(30) # 30초 대기
|
||
return {
|
||
**state,
|
||
"retry_count": state["retry_count"] + 1,
|
||
"last_error": "rate_limit"
|
||
}
|
||
|
||
except SunoAPIError as e:
|
||
# 재시도 2: 프롬프트 수정 후 재시도
|
||
if "invalid lyrics" in str(e) and state["retry_count"] < 2:
|
||
simplified_lyric = await simplify_lyrics(state["lyric"])
|
||
return {
|
||
**state,
|
||
"lyric": simplified_lyric,
|
||
"retry_count": state["retry_count"] + 1
|
||
}
|
||
|
||
# 폴백: 대체 서비스 사용
|
||
try:
|
||
result = await alternative_music_service(state["lyric"])
|
||
return {**state, "song_url": result, "used_fallback": True}
|
||
except:
|
||
pass
|
||
|
||
return {
|
||
**state,
|
||
"error": "All music generation attempts failed",
|
||
"song_status": "failed"
|
||
}
|
||
|
||
# 조건부 재시도 엣지
|
||
def should_retry_song(state: RetryState):
|
||
if state.get("song_url"):
|
||
return "next_step"
|
||
if state["retry_count"] < state["max_retries"]:
|
||
return "retry_song"
|
||
return "handle_error"
|
||
|
||
graph.add_conditional_edges(
|
||
"generate_song",
|
||
should_retry_song,
|
||
{
|
||
"retry_song": "generate_song", # 자기 자신으로 루프
|
||
"next_step": "request_video",
|
||
"handle_error": "handle_error"
|
||
}
|
||
)
|
||
|
||
return graph.compile()
|
||
```
|
||
|
||
### 3.4 설계 3: 병렬 처리 지원
|
||
|
||
```python
|
||
from langgraph.types import Send
|
||
from langgraph.graph import StateGraph
|
||
|
||
class ParallelState(TypedDict):
|
||
task_id: str
|
||
images: list[str]
|
||
analyzed_images: list[dict] # 병렬 분석 결과
|
||
# ...
|
||
|
||
def build_parallel_pipeline():
|
||
graph = StateGraph(ParallelState)
|
||
|
||
# 이미지 분석을 병렬로 수행
|
||
async def analyze_single_image(state: dict) -> dict:
|
||
"""단일 이미지 분석"""
|
||
image_url = state["image_url"]
|
||
analysis = await vision_model.analyze(image_url)
|
||
return {
|
||
"image_url": image_url,
|
||
"analysis": analysis,
|
||
"mood": analysis.get("mood"),
|
||
"colors": analysis.get("dominant_colors")
|
||
}
|
||
|
||
# 팬아웃: 여러 이미지를 병렬로 분석
|
||
def fanout_images(state: ParallelState):
|
||
return [
|
||
Send("analyze_image", {"image_url": img, "task_id": state["task_id"]})
|
||
for img in state["images"]
|
||
]
|
||
|
||
# 팬인: 분석 결과 수집
|
||
async def collect_analyses(state: ParallelState) -> ParallelState:
|
||
# LangGraph가 자동으로 병렬 결과를 수집
|
||
return state
|
||
|
||
graph.add_node("analyze_image", analyze_single_image)
|
||
graph.add_node("collect", collect_analyses)
|
||
|
||
graph.add_conditional_edges(
|
||
"start",
|
||
fanout_images # 여러 Send 반환 → 병렬 실행
|
||
)
|
||
|
||
return graph.compile()
|
||
```
|
||
|
||
### 3.5 FastAPI 통합
|
||
|
||
```python
|
||
# main.py 또는 video/api/routers/v1/video.py
|
||
|
||
from fastapi import APIRouter, BackgroundTasks
|
||
from langgraph.graph import StateGraph
|
||
|
||
router = APIRouter()
|
||
pipeline = build_video_pipeline()
|
||
|
||
@router.post("/video/generate-full")
|
||
async def generate_full_video(
|
||
request: FullVideoRequest,
|
||
background_tasks: BackgroundTasks
|
||
):
|
||
"""단일 API 호출로 전체 파이프라인 실행"""
|
||
|
||
initial_state: PipelineState = {
|
||
"task_id": str(uuid7()),
|
||
"customer_name": request.customer_name,
|
||
"region": request.region,
|
||
"detail_info": request.detail_info,
|
||
"language": request.language,
|
||
"images": request.images,
|
||
"orientation": request.orientation,
|
||
"lyric": None,
|
||
"lyric_status": None,
|
||
"song_url": None,
|
||
"song_task_id": None,
|
||
"song_status": None,
|
||
"song_duration": None,
|
||
"video_url": None,
|
||
"video_render_id": None,
|
||
"video_status": None,
|
||
"error": None,
|
||
"error_step": None,
|
||
"started_at": datetime.now(),
|
||
"completed_at": None,
|
||
"retry_count": 0
|
||
}
|
||
|
||
# 백그라운드에서 파이프라인 실행
|
||
background_tasks.add_task(run_pipeline_async, initial_state)
|
||
|
||
return {
|
||
"task_id": initial_state["task_id"],
|
||
"status": "processing",
|
||
"message": "Pipeline started. Use GET /video/pipeline-status/{task_id} to check progress."
|
||
}
|
||
|
||
async def run_pipeline_async(initial_state: PipelineState):
|
||
"""백그라운드에서 LangGraph 파이프라인 실행"""
|
||
try:
|
||
final_state = await pipeline.ainvoke(initial_state)
|
||
|
||
# 결과 DB 저장
|
||
await save_pipeline_result(final_state)
|
||
|
||
# 완료 알림 (웹훅, 이메일 등)
|
||
if final_state.get("video_url"):
|
||
await send_completion_notification(final_state)
|
||
|
||
except Exception as e:
|
||
await log_pipeline_error(initial_state["task_id"], str(e))
|
||
|
||
@router.get("/video/pipeline-status/{task_id}")
|
||
async def get_pipeline_status(task_id: str):
|
||
"""파이프라인 진행 상태 조회"""
|
||
status = await get_status_from_db(task_id)
|
||
|
||
return {
|
||
"task_id": task_id,
|
||
"lyric_status": status.lyric_status,
|
||
"song_status": status.song_status,
|
||
"video_status": status.video_status,
|
||
"overall_status": determine_overall_status(status),
|
||
"video_url": status.video_url if status.video_status == "completed" else None,
|
||
"error": status.error
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 4. RAG 적용 설계
|
||
|
||
### 4.1 적용 대상 및 목적
|
||
|
||
RAG(Retrieval-Augmented Generation)는 **외부 지식 기반을 검색하여 LLM 응답 품질을 향상**시키는 기법입니다.
|
||
|
||
**적용 대상:**
|
||
1. 마케팅 지식베이스 (성공 사례)
|
||
2. 지역별/업종별 가사 예시
|
||
3. 이미지 메타데이터 활용
|
||
4. 프롬프트 최적화
|
||
|
||
### 4.2 설계 1: 마케팅 지식베이스 RAG
|
||
|
||
**아키텍처:**
|
||
|
||
```
|
||
┌─────────────────────────────────────────────────────────────┐
|
||
│ 마케팅 지식베이스 │
|
||
├─────────────────────────────────────────────────────────────┤
|
||
│ │
|
||
│ Document Store (벡터 DB) │
|
||
│ ┌─────────────────────────────────────────────────────┐ │
|
||
│ │ Collection: marketing_knowledge │ │
|
||
│ │ │ │
|
||
│ │ ┌──────────────────────────────────────────────┐ │ │
|
||
│ │ │ 문서 1: 강원도 속초 펜션 마케팅 성공 사례 │ │ │
|
||
│ │ │ - 가사 예시 │ │ │
|
||
│ │ │ - 타겟 고객 분석 │ │ │
|
||
│ │ │ - 효과적인 키워드 │ │ │
|
||
│ │ │ - 영상 조회수/반응 │ │ │
|
||
│ │ └──────────────────────────────────────────────┘ │ │
|
||
│ │ │ │
|
||
│ │ ┌──────────────────────────────────────────────┐ │ │
|
||
│ │ │ 문서 2: 제주도 게스트하우스 마케팅 사례 │ │ │
|
||
│ │ │ ... │ │ │
|
||
│ │ └──────────────────────────────────────────────┘ │ │
|
||
│ │ │ │
|
||
│ │ ... (수백 개의 사례) │ │
|
||
│ └─────────────────────────────────────────────────────┘ │
|
||
│ │
|
||
└─────────────────────────────────────────────────────────────┘
|
||
│
|
||
│ 유사도 검색
|
||
▼
|
||
┌─────────────────────────────────────────────────────────────┐
|
||
│ 가사 생성 프롬프트 │
|
||
├─────────────────────────────────────────────────────────────┤
|
||
│ "다음 유사 사례를 참고하여 가사를 생성하세요: │
|
||
│ [검색된 성공 사례 1] │
|
||
│ [검색된 성공 사례 2] │
|
||
│ ..." │
|
||
└─────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
**구현 코드:**
|
||
|
||
```python
|
||
from langchain.embeddings.openai import OpenAIEmbeddings
|
||
from langchain.vectorstores import Chroma
|
||
from langchain.document_loaders import JSONLoader
|
||
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
||
from langchain.schema import Document
|
||
|
||
# 1. 임베딩 모델 설정
|
||
embeddings = OpenAIEmbeddings(
|
||
model="text-embedding-3-small",
|
||
openai_api_key=settings.CHATGPT_API_KEY
|
||
)
|
||
|
||
# 2. 벡터 스토어 초기화
|
||
vector_store = Chroma(
|
||
collection_name="marketing_knowledge",
|
||
embedding_function=embeddings,
|
||
persist_directory="./data/chroma_db"
|
||
)
|
||
|
||
# 3. 마케팅 사례 문서 구조
|
||
class MarketingCase(BaseModel):
|
||
case_id: str
|
||
region: str
|
||
business_type: str # "pension", "guesthouse", "hotel"
|
||
target_audience: str
|
||
successful_lyrics: str
|
||
keywords: list[str]
|
||
performance_metrics: dict # views, engagement, conversions
|
||
created_at: datetime
|
||
|
||
# 4. 문서 추가 함수
|
||
async def add_marketing_case(case: MarketingCase):
|
||
"""성공한 마케팅 사례를 벡터 스토어에 추가"""
|
||
|
||
# 메타데이터와 함께 문서 생성
|
||
document = Document(
|
||
page_content=f"""
|
||
지역: {case.region}
|
||
업종: {case.business_type}
|
||
타겟 고객: {case.target_audience}
|
||
성공 가사:
|
||
{case.successful_lyrics}
|
||
효과적인 키워드: {', '.join(case.keywords)}
|
||
성과: 조회수 {case.performance_metrics.get('views', 0)},
|
||
참여율 {case.performance_metrics.get('engagement', 0)}%
|
||
""",
|
||
metadata={
|
||
"case_id": case.case_id,
|
||
"region": case.region,
|
||
"business_type": case.business_type,
|
||
"created_at": case.created_at.isoformat()
|
||
}
|
||
)
|
||
|
||
vector_store.add_documents([document])
|
||
vector_store.persist()
|
||
|
||
# 5. RAG 기반 가사 생성
|
||
async def generate_lyrics_with_rag(
|
||
customer_name: str,
|
||
region: str,
|
||
business_type: str,
|
||
language: str
|
||
) -> str:
|
||
"""RAG를 활용한 고품질 가사 생성"""
|
||
|
||
# 유사 사례 검색
|
||
query = f"{region} {business_type} 마케팅 가사"
|
||
similar_cases = vector_store.similarity_search(
|
||
query,
|
||
k=3,
|
||
filter={"business_type": business_type} # 같은 업종만
|
||
)
|
||
|
||
# 검색 결과를 프롬프트에 포함
|
||
examples_text = "\n\n".join([
|
||
f"### 참고 사례 {i+1}\n{doc.page_content}"
|
||
for i, doc in enumerate(similar_cases)
|
||
])
|
||
|
||
# LangChain 프롬프트 구성
|
||
rag_prompt = ChatPromptTemplate.from_messages([
|
||
("system", """당신은 마케팅 전문가이자 작사가입니다.
|
||
다음 성공 사례를 참고하여 새로운 가사를 생성하세요.
|
||
참고 사례의 스타일과 키워드를 활용하되, 고유한 내용을 만드세요.
|
||
|
||
{examples}"""),
|
||
("human", """
|
||
고객명: {customer_name}
|
||
지역: {region}
|
||
언어: {language}
|
||
|
||
위 정보를 바탕으로 8-12줄의 감성적인 마케팅 가사를 생성하세요.
|
||
""")
|
||
])
|
||
|
||
chain = rag_prompt | ChatOpenAI(model="gpt-5-mini") | StrOutputParser()
|
||
|
||
result = await chain.ainvoke({
|
||
"examples": examples_text,
|
||
"customer_name": customer_name,
|
||
"region": region,
|
||
"language": language
|
||
})
|
||
|
||
return result
|
||
```
|
||
|
||
### 4.3 설계 2: 지역별 특화 RAG
|
||
|
||
**목적:** 각 지역의 특성(문화, 관광지, 특산물 등)을 반영한 가사 생성
|
||
|
||
```python
|
||
# 지역 정보 문서 구조
|
||
class RegionInfo(BaseModel):
|
||
region_name: str
|
||
province: str
|
||
famous_attractions: list[str]
|
||
local_foods: list[str]
|
||
cultural_keywords: list[str]
|
||
seasonal_events: list[dict] # {"season": "summer", "event": "해수욕장 개장"}
|
||
atmosphere: list[str] # ["고즈넉한", "활기찬", "낭만적인"]
|
||
|
||
# 지역 정보 벡터 스토어
|
||
region_store = Chroma(
|
||
collection_name="region_knowledge",
|
||
embedding_function=embeddings,
|
||
persist_directory="./data/chroma_region"
|
||
)
|
||
|
||
# 지역 정보 추가 예시
|
||
regions_data = [
|
||
RegionInfo(
|
||
region_name="속초",
|
||
province="강원도",
|
||
famous_attractions=["설악산", "속초해변", "영금정", "아바이마을"],
|
||
local_foods=["오징어순대", "물회", "생선구이"],
|
||
cultural_keywords=["동해바다", "일출", "산과 바다", "청정자연"],
|
||
seasonal_events=[
|
||
{"season": "summer", "event": "속초해변 피서"},
|
||
{"season": "autumn", "event": "설악산 단풍"}
|
||
],
|
||
atmosphere=["시원한", "청량한", "자연친화적", "힐링"]
|
||
),
|
||
# ... 더 많은 지역
|
||
]
|
||
|
||
async def enrich_lyrics_with_region_info(
|
||
base_lyrics: str,
|
||
region: str
|
||
) -> str:
|
||
"""지역 정보로 가사 보강"""
|
||
|
||
# 지역 정보 검색
|
||
region_docs = region_store.similarity_search(region, k=1)
|
||
|
||
if not region_docs:
|
||
return base_lyrics
|
||
|
||
region_info = region_docs[0].page_content
|
||
|
||
# 가사에 지역 특성 반영
|
||
enrichment_prompt = ChatPromptTemplate.from_messages([
|
||
("system", """당신은 가사 편집 전문가입니다.
|
||
주어진 기본 가사에 지역의 특성을 자연스럽게 녹여내세요.
|
||
지역 정보:
|
||
{region_info}"""),
|
||
("human", """기본 가사:
|
||
{base_lyrics}
|
||
|
||
위 가사에 지역의 특성(명소, 분위기, 키워드)을 2-3개 자연스럽게 추가하세요.
|
||
원래 가사의 운율과 분위기를 유지하세요.""")
|
||
])
|
||
|
||
chain = enrichment_prompt | ChatOpenAI() | StrOutputParser()
|
||
|
||
return await chain.ainvoke({
|
||
"region_info": region_info,
|
||
"base_lyrics": base_lyrics
|
||
})
|
||
```
|
||
|
||
### 4.4 설계 3: 이미지 메타데이터 RAG
|
||
|
||
**목적:** 업로드된 이미지의 분석 결과를 저장하고, 영상 생성 시 최적의 이미지 순서 결정
|
||
|
||
```python
|
||
from langchain_openai import ChatOpenAI
|
||
|
||
# Vision 모델로 이미지 분석
|
||
vision_model = ChatOpenAI(model="gpt-5-mini")
|
||
|
||
# 이미지 분석 문서 구조
|
||
class ImageAnalysis(BaseModel):
|
||
image_url: str
|
||
task_id: str
|
||
description: str
|
||
dominant_colors: list[str]
|
||
mood: str # "warm", "cool", "neutral"
|
||
scene_type: str # "interior", "exterior", "nature", "food"
|
||
suggested_position: str # "opening", "middle", "closing"
|
||
quality_score: float # 0.0 ~ 1.0
|
||
|
||
# 이미지 메타데이터 벡터 스토어
|
||
image_store = Chroma(
|
||
collection_name="image_metadata",
|
||
embedding_function=embeddings,
|
||
persist_directory="./data/chroma_images"
|
||
)
|
||
|
||
async def analyze_and_store_image(image_url: str, task_id: str):
|
||
"""이미지 분석 후 벡터 스토어에 저장"""
|
||
|
||
# gpt-5-mini Vision으로 이미지 분석
|
||
analysis_response = await vision_model.ainvoke([
|
||
{
|
||
"type": "text",
|
||
"text": """이미지를 분석하고 다음 JSON 형식으로 응답하세요:
|
||
{
|
||
"description": "이미지 설명 (2-3문장)",
|
||
"dominant_colors": ["색상1", "색상2"],
|
||
"mood": "warm/cool/neutral 중 하나",
|
||
"scene_type": "interior/exterior/nature/food 중 하나",
|
||
"suggested_position": "opening/middle/closing 중 하나 (영상에서 적합한 위치)",
|
||
"quality_score": 0.0~1.0 (이미지 품질/선명도)
|
||
}"""
|
||
},
|
||
{
|
||
"type": "image_url",
|
||
"image_url": {"url": image_url}
|
||
}
|
||
])
|
||
|
||
analysis = json.loads(analysis_response.content)
|
||
|
||
# 문서 생성 및 저장
|
||
document = Document(
|
||
page_content=f"""
|
||
이미지 설명: {analysis['description']}
|
||
분위기: {analysis['mood']}
|
||
장면 유형: {analysis['scene_type']}
|
||
추천 위치: {analysis['suggested_position']}
|
||
색상: {', '.join(analysis['dominant_colors'])}
|
||
""",
|
||
metadata={
|
||
"image_url": image_url,
|
||
"task_id": task_id,
|
||
**analysis
|
||
}
|
||
)
|
||
|
||
image_store.add_documents([document])
|
||
image_store.persist()
|
||
|
||
return ImageAnalysis(image_url=image_url, task_id=task_id, **analysis)
|
||
|
||
async def get_optimal_image_order(
|
||
task_id: str,
|
||
music_mood: str, # 음악 분위기
|
||
lyrics_theme: str # 가사 주제
|
||
) -> list[str]:
|
||
"""음악과 가사에 맞는 최적의 이미지 순서 결정"""
|
||
|
||
# 해당 task의 모든 이미지 조회
|
||
all_images = image_store.get(
|
||
where={"task_id": task_id}
|
||
)
|
||
|
||
# 음악/가사 분위기에 맞는 이미지 우선 검색
|
||
query = f"{music_mood} {lyrics_theme} 마케팅 영상"
|
||
sorted_images = image_store.similarity_search(
|
||
query,
|
||
k=len(all_images),
|
||
filter={"task_id": task_id}
|
||
)
|
||
|
||
# 이미지 순서 결정 로직
|
||
opening_images = [img for img in sorted_images if img.metadata["suggested_position"] == "opening"]
|
||
middle_images = [img for img in sorted_images if img.metadata["suggested_position"] == "middle"]
|
||
closing_images = [img for img in sorted_images if img.metadata["suggested_position"] == "closing"]
|
||
|
||
# 품질 점수로 정렬
|
||
opening_images.sort(key=lambda x: x.metadata["quality_score"], reverse=True)
|
||
closing_images.sort(key=lambda x: x.metadata["quality_score"], reverse=True)
|
||
|
||
# 최종 순서
|
||
ordered = (
|
||
opening_images[:2] + # 시작 2장
|
||
middle_images + # 중간 이미지들
|
||
closing_images[:1] # 마무리 1장
|
||
)
|
||
|
||
return [img.metadata["image_url"] for img in ordered]
|
||
```
|
||
|
||
### 4.5 설계 4: 프롬프트 히스토리 RAG
|
||
|
||
**목적:** 과거 성공/실패한 프롬프트를 학습하여 프롬프트 품질 지속 개선
|
||
|
||
```python
|
||
# 프롬프트 결과 문서 구조
|
||
class PromptResult(BaseModel):
|
||
prompt_id: str
|
||
prompt_text: str
|
||
result_text: str
|
||
success: bool
|
||
failure_reason: Optional[str]
|
||
category: str # "lyric", "marketing_analysis", "region_enrichment"
|
||
metrics: dict # {"length": 10, "contains_region_keyword": True, ...}
|
||
created_at: datetime
|
||
|
||
# 프롬프트 히스토리 벡터 스토어
|
||
prompt_store = Chroma(
|
||
collection_name="prompt_history",
|
||
embedding_function=embeddings,
|
||
persist_directory="./data/chroma_prompts"
|
||
)
|
||
|
||
async def log_prompt_result(result: PromptResult):
|
||
"""프롬프트 결과 기록"""
|
||
|
||
document = Document(
|
||
page_content=f"""
|
||
프롬프트: {result.prompt_text}
|
||
결과: {result.result_text[:500]}...
|
||
성공 여부: {'성공' if result.success else '실패'}
|
||
실패 사유: {result.failure_reason or 'N/A'}
|
||
""",
|
||
metadata={
|
||
"prompt_id": result.prompt_id,
|
||
"success": result.success,
|
||
"category": result.category,
|
||
"created_at": result.created_at.isoformat(),
|
||
**result.metrics
|
||
}
|
||
)
|
||
|
||
prompt_store.add_documents([document])
|
||
|
||
async def get_improved_prompt(
|
||
base_prompt: str,
|
||
category: str
|
||
) -> str:
|
||
"""과거 결과를 기반으로 프롬프트 개선"""
|
||
|
||
# 유사한 성공 프롬프트 검색
|
||
successful_prompts = prompt_store.similarity_search(
|
||
base_prompt,
|
||
k=3,
|
||
filter={"success": True, "category": category}
|
||
)
|
||
|
||
# 유사한 실패 프롬프트 검색 (피해야 할 패턴)
|
||
failed_prompts = prompt_store.similarity_search(
|
||
base_prompt,
|
||
k=2,
|
||
filter={"success": False, "category": category}
|
||
)
|
||
|
||
# 프롬프트 개선 요청
|
||
improvement_prompt = ChatPromptTemplate.from_messages([
|
||
("system", """당신은 프롬프트 엔지니어링 전문가입니다.
|
||
|
||
다음 성공/실패 사례를 참고하여 주어진 프롬프트를 개선하세요.
|
||
|
||
### 성공 사례 (참고):
|
||
{successful_examples}
|
||
|
||
### 실패 사례 (피할 것):
|
||
{failed_examples}
|
||
|
||
### 개선 원칙:
|
||
1. 성공 사례의 패턴을 따르세요
|
||
2. 실패 사례의 패턴을 피하세요
|
||
3. 명확하고 구체적인 지시를 포함하세요
|
||
4. 출력 형식을 명시하세요"""),
|
||
("human", """개선할 프롬프트:
|
||
{base_prompt}
|
||
|
||
위 프롬프트를 개선하세요. 개선된 프롬프트만 출력하세요.""")
|
||
])
|
||
|
||
chain = improvement_prompt | ChatOpenAI() | StrOutputParser()
|
||
|
||
improved = await chain.ainvoke({
|
||
"successful_examples": "\n---\n".join([doc.page_content for doc in successful_prompts]),
|
||
"failed_examples": "\n---\n".join([doc.page_content for doc in failed_prompts]),
|
||
"base_prompt": base_prompt
|
||
})
|
||
|
||
return improved
|
||
```
|
||
|
||
---
|
||
|
||
## 5. 통합 아키텍처
|
||
|
||
### 5.1 전체 시스템 아키텍처
|
||
|
||
```
|
||
┌──────────────────────────────────────────────────────────────────────────┐
|
||
│ FastAPI 라우터 │
|
||
│ /lyric/generate, /song/generate, /video/generate, /video/generate-full │
|
||
└───────────────────────────────────┬──────────────────────────────────────┘
|
||
│
|
||
▼
|
||
┌──────────────────────────────────────────────────────────────────────────┐
|
||
│ LangGraph 파이프라인 │
|
||
│ ┌─────────────────────────────────────────────────────────────────┐ │
|
||
│ │ StateGraph (상태 기계) │ │
|
||
│ │ │ │
|
||
│ │ [generate_lyric] → [request_song] → [poll_song] │ │
|
||
│ │ │ │ │ │
|
||
│ │ │ ▼ │ │
|
||
│ │ │ [request_video] → [poll_video] → END │ │
|
||
│ │ │ │ │ │
|
||
│ │ └─────────────────────┴──────────→ [handle_error] │ │
|
||
│ └─────────────────────────────────────────────────────────────────┘ │
|
||
│ │ │
|
||
│ ┌───────────────┼───────────────┐ │
|
||
│ │ │ │ │
|
||
│ ▼ ▼ ▼ │
|
||
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
|
||
│ │ LangChain │ │ RAG │ │ External │ │
|
||
│ │ Components │ │ Vector DBs │ │ APIs │ │
|
||
│ └─────────────┘ └─────────────┘ └─────────────┘ │
|
||
└──────────────────────────────────────────────────────────────────────────┘
|
||
│ │ │
|
||
┌───────────┘ ┌──────────┘ ┌──────────┘
|
||
│ │ │
|
||
▼ ▼ ▼
|
||
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
||
│ Prompt │ │ Chroma │ │ OpenAI │
|
||
│ Templates │ │ Vector │ │ Suno │
|
||
│ Chains │ │ Store │ │ Creatomate │
|
||
│ Parsers │ │ │ │ │
|
||
└─────────────┘ └─────────────┘ └─────────────┘
|
||
│ │ │
|
||
└────────────────┴───────────────┘
|
||
│
|
||
▼
|
||
┌─────────────────────────┐
|
||
│ MySQL + Azure Blob │
|
||
│ (영구 저장소) │
|
||
└─────────────────────────┘
|
||
```
|
||
|
||
### 5.2 데이터 흐름
|
||
|
||
```
|
||
┌─────────────────────────────────────────────────────────────────────────┐
|
||
│ 데이터 흐름 │
|
||
├─────────────────────────────────────────────────────────────────────────┤
|
||
│ │
|
||
│ 1. 사용자 요청 │
|
||
│ ├── 고객 정보 (이름, 지역, 상세정보) │
|
||
│ └── 이미지 URL 리스트 │
|
||
│ │ │
|
||
│ ▼ │
|
||
│ 2. RAG 검색 (병렬) │
|
||
│ ├── 마케팅 지식베이스 → 유사 성공 사례 │
|
||
│ ├── 지역 정보베이스 → 지역 특성 │
|
||
│ └── 이미지 메타데이터 → 이미지 분석 │
|
||
│ │ │
|
||
│ ▼ │
|
||
│ 3. LangChain 프롬프트 구성 │
|
||
│ ├── 기본 템플릿 로드 │
|
||
│ ├── RAG 결과 주입 │
|
||
│ ├── Few-shot 예시 추가 │
|
||
│ └── 출력 형식 지정 │
|
||
│ │ │
|
||
│ ▼ │
|
||
│ 4. LangGraph 파이프라인 실행 │
|
||
│ ├── 가사 생성 (ChatGPT) │
|
||
│ ├── 음악 생성 (Suno, 폴링 자동화) │
|
||
│ └── 영상 생성 (Creatomate, 폴링 자동화) │
|
||
│ │ │
|
||
│ ▼ │
|
||
│ 5. 결과 저장 │
|
||
│ ├── MySQL: 메타데이터, 상태 │
|
||
│ ├── Azure Blob: 영상 파일 │
|
||
│ └── Chroma: 성공 사례 피드백 │
|
||
│ │ │
|
||
│ ▼ │
|
||
│ 6. 사용자 응답 │
|
||
│ └── 영상 URL, 상태, 메타데이터 │
|
||
│ │
|
||
└─────────────────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
### 5.3 디렉토리 구조 (신규)
|
||
|
||
```
|
||
app/
|
||
├── langchain/ # LangChain 관련
|
||
│ ├── __init__.py
|
||
│ ├── prompts/
|
||
│ │ ├── __init__.py
|
||
│ │ ├── lyric_prompts.py # 가사 생성 프롬프트
|
||
│ │ ├── marketing_prompts.py # 마케팅 분석 프롬프트
|
||
│ │ └── examples/ # Few-shot 예시
|
||
│ │ ├── korean.json
|
||
│ │ ├── english.json
|
||
│ │ └── ...
|
||
│ ├── chains/
|
||
│ │ ├── __init__.py
|
||
│ │ ├── lyric_chain.py # 가사 생성 체인
|
||
│ │ └── marketing_chain.py # 마케팅 분석 체인
|
||
│ └── parsers/
|
||
│ ├── __init__.py
|
||
│ └── lyric_parser.py # 가사 출력 파서
|
||
│
|
||
├── langgraph/ # LangGraph 관련
|
||
│ ├── __init__.py
|
||
│ ├── states/
|
||
│ │ ├── __init__.py
|
||
│ │ └── pipeline_state.py # 파이프라인 상태 정의
|
||
│ ├── nodes/
|
||
│ │ ├── __init__.py
|
||
│ │ ├── lyric_node.py # 가사 생성 노드
|
||
│ │ ├── song_node.py # 음악 생성 노드
|
||
│ │ ├── video_node.py # 영상 생성 노드
|
||
│ │ └── error_node.py # 에러 처리 노드
|
||
│ └── graphs/
|
||
│ ├── __init__.py
|
||
│ └── video_pipeline.py # 메인 파이프라인 그래프
|
||
│
|
||
├── rag/ # RAG 관련
|
||
│ ├── __init__.py
|
||
│ ├── stores/
|
||
│ │ ├── __init__.py
|
||
│ │ ├── marketing_store.py # 마케팅 지식베이스
|
||
│ │ ├── region_store.py # 지역 정보베이스
|
||
│ │ ├── image_store.py # 이미지 메타데이터
|
||
│ │ └── prompt_store.py # 프롬프트 히스토리
|
||
│ ├── loaders/
|
||
│ │ ├── __init__.py
|
||
│ │ └── case_loader.py # 사례 데이터 로더
|
||
│ └── retrievers/
|
||
│ ├── __init__.py
|
||
│ └── hybrid_retriever.py # 하이브리드 검색
|
||
│
|
||
└── data/ # 데이터 저장소
|
||
└── chroma_db/ # Chroma 벡터 DB
|
||
├── marketing_knowledge/
|
||
├── region_knowledge/
|
||
├── image_metadata/
|
||
└── prompt_history/
|
||
```
|
||
|
||
---
|
||
|
||
## 6. 기대 효과
|
||
|
||
### 6.1 정량적 기대 효과
|
||
|
||
| 지표 | 현재 | 목표 | 개선율 |
|
||
|------|------|------|--------|
|
||
| **가사 생성 품질** | 70% 만족도 | 90% 만족도 | +29% |
|
||
| **재작업률** | 30% | 10% | -67% |
|
||
| **파이프라인 실패율** | 15% | 5% | -67% |
|
||
| **평균 처리 시간** | 10분 (수동 개입 필요) | 8분 (완전 자동) | -20% |
|
||
| **다국어 품질** | 60% | 85% | +42% |
|
||
| **프롬프트 튜닝 시간** | 2시간/버전 | 30분/버전 | -75% |
|
||
|
||
### 6.2 정성적 기대 효과
|
||
|
||
#### 6.2.1 개발 생산성 향상
|
||
|
||
| 영역 | 효과 |
|
||
|------|------|
|
||
| **코드 유지보수** | 프롬프트와 비즈니스 로직 분리로 수정 용이 |
|
||
| **테스트 용이성** | 각 체인/노드 단위 테스트 가능 |
|
||
| **디버깅** | 상태 기계 기반으로 문제 지점 명확히 파악 |
|
||
| **확장성** | 새로운 AI 서비스 추가 시 노드만 추가하면 됨 |
|
||
|
||
#### 6.2.2 품질 향상
|
||
|
||
| 영역 | 효과 |
|
||
|------|------|
|
||
| **일관성** | 동일 조건에서 일관된 품질의 결과물 생성 |
|
||
| **지역 맞춤화** | RAG로 지역별 특성 자동 반영 |
|
||
| **학습 효과** | 성공 사례 축적으로 시간이 지날수록 품질 향상 |
|
||
| **에러 복구** | 자동 재시도 및 폴백으로 안정성 강화 |
|
||
|
||
#### 6.2.3 운영 효율성
|
||
|
||
| 영역 | 효과 |
|
||
|------|------|
|
||
| **모니터링** | 파이프라인 상태 추적으로 병목 지점 파악 |
|
||
| **비용 최적화** | 불필요한 API 호출 감소, 캐싱 활용 |
|
||
| **확장 대응** | 부하 증가 시 노드별 스케일링 가능 |
|
||
|
||
### 6.3 비즈니스 가치
|
||
|
||
```
|
||
┌────────────────────────────────────────────────────────────────┐
|
||
│ 비즈니스 가치 │
|
||
├────────────────────────────────────────────────────────────────┤
|
||
│ │
|
||
│ 1. 고객 만족도 향상 │
|
||
│ └── 고품질 가사/영상으로 마케팅 효과 증대 │
|
||
│ │
|
||
│ 2. 서비스 차별화 │
|
||
│ └── 지역 맞춤 콘텐츠로 경쟁사 대비 우위 │
|
||
│ │
|
||
│ 3. 운영 비용 절감 │
|
||
│ └── 자동화로 수동 개입 최소화 │
|
||
│ │
|
||
│ 4. 확장 가능성 │
|
||
│ └── 새로운 지역/업종 추가 시 RAG 학습만으로 대응 │
|
||
│ │
|
||
│ 5. 데이터 자산화 │
|
||
│ └── 축적된 성공 사례가 진입 장벽 역할 │
|
||
│ │
|
||
└────────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
---
|
||
|
||
## 7. 구현 로드맵
|
||
|
||
### 7.1 Phase 1: 기초 (1-2주)
|
||
|
||
**목표:** LangChain 기본 구조 구축
|
||
|
||
| 작업 | 설명 | 우선순위 |
|
||
|------|------|----------|
|
||
| 의존성 설치 | langchain, langchain-openai, chromadb | P0 |
|
||
| 프롬프트 템플릿 작성 | 가사 생성 프롬프트 이관 | P0 |
|
||
| 기본 체인 구현 | ChatGPT 서비스 LangChain으로 래핑 | P0 |
|
||
| 출력 파서 구현 | 가사 응답 검증 및 파싱 | P1 |
|
||
| 테스트 작성 | 체인 단위 테스트 | P1 |
|
||
|
||
**산출물:**
|
||
- `app/langchain/` 디렉토리 구조
|
||
- 가사 생성 LangChain 체인
|
||
- 기본 테스트 코드
|
||
|
||
### 7.2 Phase 2: 파이프라인 (2-3주)
|
||
|
||
**목표:** LangGraph 파이프라인 구축
|
||
|
||
| 작업 | 설명 | 우선순위 |
|
||
|------|------|----------|
|
||
| 상태 정의 | PipelineState TypedDict 작성 | P0 |
|
||
| 노드 구현 | 가사, 음악, 영상 생성 노드 | P0 |
|
||
| 그래프 구성 | 엣지 및 조건부 분기 정의 | P0 |
|
||
| 폴링 통합 | Suno, Creatomate 폴링 자동화 | P0 |
|
||
| 에러 처리 | 에러 노드 및 재시도 로직 | P1 |
|
||
| FastAPI 통합 | 새 엔드포인트 추가 | P1 |
|
||
|
||
**산출물:**
|
||
- `app/langgraph/` 디렉토리 구조
|
||
- 통합 파이프라인 그래프
|
||
- `/video/generate-full` 엔드포인트
|
||
|
||
### 7.3 Phase 3: RAG (2-3주)
|
||
|
||
**목표:** 지식베이스 구축 및 RAG 통합
|
||
|
||
| 작업 | 설명 | 우선순위 |
|
||
|------|------|----------|
|
||
| Chroma 설정 | 벡터 스토어 초기화 | P0 |
|
||
| 마케팅 사례 수집 | 기존 성공 사례 데이터화 | P0 |
|
||
| 지역 정보 구축 | 주요 지역 정보 입력 | P1 |
|
||
| 검색 통합 | 가사 생성 시 RAG 적용 | P0 |
|
||
| 이미지 분석 | Vision API 연동 | P2 |
|
||
| 프롬프트 히스토리 | 자동 학습 파이프라인 | P2 |
|
||
|
||
**산출물:**
|
||
- `app/rag/` 디렉토리 구조
|
||
- 마케팅/지역 지식베이스
|
||
- RAG 통합 가사 생성
|
||
|
||
### 7.4 Phase 4: 고도화 (2-3주)
|
||
|
||
**목표:** 최적화 및 모니터링
|
||
|
||
| 작업 | 설명 | 우선순위 |
|
||
|------|------|----------|
|
||
| 성능 최적화 | 캐싱, 병렬 처리 개선 | P1 |
|
||
| 모니터링 | 파이프라인 상태 대시보드 | P1 |
|
||
| A/B 테스팅 | 프롬프트 버전 비교 | P2 |
|
||
| 문서화 | API 문서, 운영 가이드 | P1 |
|
||
| 부하 테스트 | 동시 요청 처리 검증 | P2 |
|
||
|
||
**산출물:**
|
||
- 최적화된 파이프라인
|
||
- 모니터링 대시보드
|
||
- 완성된 문서
|
||
|
||
---
|
||
|
||
## 8. 결론
|
||
|
||
### 8.1 요약
|
||
|
||
CastAD 백엔드에 LangChain, LangGraph, RAG를 적용하면:
|
||
|
||
1. **LangChain**: 프롬프트 관리 체계화, 다단계 체인 구성, 출력 검증 자동화
|
||
2. **LangGraph**: 복잡한 파이프라인 상태 관리, 폴링 자동화, 에러 처리 강화
|
||
3. **RAG**: 과거 성공 사례 활용, 지역별 맞춤화, 지속적 품질 개선
|
||
|
||
### 8.2 핵심 가치
|
||
|
||
```
|
||
┌───────────────────────────────────────────────────────────┐
|
||
│ │
|
||
│ 현재: 각 단계가 독립적 → 상태 관리 어려움 │
|
||
│ 개선: 통합 파이프라인 → 자동화된 상태 추적 │
|
||
│ │
|
||
│ 현재: 하드코딩 프롬프트 → 수정 어려움 │
|
||
│ 개선: 템플릿 기반 → 유연한 프롬프트 관리 │
|
||
│ │
|
||
│ 현재: 과거 데이터 미활용 → 일관성 없는 품질 │
|
||
│ 개선: RAG 지식베이스 → 축적된 노하우 활용 │
|
||
│ │
|
||
└───────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
### 8.3 권장 사항
|
||
|
||
1. **단계적 도입**: Phase 1(LangChain)부터 시작하여 검증 후 확장
|
||
2. **기존 API 유지**: 새 엔드포인트 추가 방식으로 호환성 보장
|
||
3. **데이터 축적 우선**: RAG 효과를 위해 초기 사례 데이터 확보 중요
|
||
4. **모니터링 병행**: 각 단계별 성과 측정으로 ROI 검증
|
||
|
||
---
|
||
|
||
## 부록
|
||
|
||
### A. 필요 의존성
|
||
|
||
```toml
|
||
# pyproject.toml 추가 의존성
|
||
[project.dependencies]
|
||
langchain = ">=0.1.0"
|
||
langchain-openai = ">=0.0.5"
|
||
langchain-community = ">=0.0.20"
|
||
langgraph = ">=0.0.30"
|
||
chromadb = ">=0.4.22"
|
||
tiktoken = ">=0.5.2"
|
||
```
|
||
|
||
### B. 환경 변수 추가
|
||
|
||
```env
|
||
# .env 추가
|
||
CHROMA_PERSIST_DIR=./data/chroma_db
|
||
LANGCHAIN_TRACING_V2=true # 선택: LangSmith 모니터링
|
||
LANGCHAIN_API_KEY=xxx # 선택: LangSmith 모니터링
|
||
```
|
||
|
||
### C. 참고 자료
|
||
|
||
- [LangChain Documentation](https://python.langchain.com/)
|
||
- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)
|
||
- [Chroma Documentation](https://docs.trychroma.com/)
|
||
- [OpenAI Cookbook](https://cookbook.openai.com/)
|
||
|
||
---
|
||
|
||
*이 보고서는 CastAD 백엔드 프로젝트 분석을 기반으로 작성되었습니다.*
|
||
*작성일: 2025-12-28*
|