67 KiB
CastAD 백엔드 - LangChain, LangGraph, RAG 적용 설계 보고서
목차
1. 현재 시스템 분석
1.1 프로젝트 개요
CastAD는 AI 기반 광고 음악 및 영상 자동 생성 서비스입니다. 네이버 지도에서 수집한 숙박시설 정보를 기반으로 마케팅용 자동 영상을 생성하는 통합 플랫폼입니다.
핵심 파이프라인:
사용자 입력 → 가사 자동 생성 → 음악 자동 생성 → 영상 자동 생성
1.2 현재 기술 스택
| 구분 | 기술 |
|---|---|
| Backend Framework | FastAPI (async/await 기반) |
| ORM | SQLAlchemy 2.0 (비동기) |
| Database | MySQL (asyncmy 드라이버) |
| Cache | Redis |
| AI/API | OpenAI ChatGPT, Suno AI, Creatomate |
| Storage | Azure Blob Storage |
1.3 현재 핵심 흐름
┌─────────────────────────────────────────────────────────────────┐
│ 현재 파이프라인 구조 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ POST /crawling (선택) │
│ │ │
│ ▼ │
│ POST /lyric/generate ──────► ChatGPT API ──────► 가사 저장 │
│ │ │
│ ▼ │
│ POST /song/generate ───────► Suno API ─────────► 음악 저장 │
│ │ │ │
│ │ 클라이언트 폴링 │
│ │ │ │
│ ▼ ▼ │
│ POST /video/generate ──────► Creatomate API ───► 영상 저장 │
│ │ │ │
│ │ 클라이언트 폴링 │
│ ▼ ▼ │
│ GET /video/download ◄──────── 완료 ──────────► Azure Blob │
│ │
└─────────────────────────────────────────────────────────────────┘
1.4 현재 시스템의 한계점
| 문제점 | 설명 |
|---|---|
| 분산된 상태 관리 | 각 API 호출마다 독립적인 상태 관리, 전체 파이프라인 추적 어려움 |
| 클라이언트 의존적 폴링 | 음악/영상 생성 완료 여부를 클라이언트가 반복 확인해야 함 |
| 하드코딩된 프롬프트 | ChatGPT 프롬프트가 코드에 직접 작성, 유연성 부족 |
| 에러 복구 제한적 | 단순 실패 패턴 검사만 수행, 자동 복구 메커니즘 없음 |
| 과거 데이터 미활용 | 성공한 가사/마케팅 사례 재활용 불가 |
| 일관성 없는 품질 | 동일 조건에서도 결과물 품질 편차 존재 |
2. LangChain 적용 설계
2.1 적용 대상 및 목적
LangChain은 LLM 애플리케이션 개발을 위한 프레임워크로, 프롬프트 관리, 체인 구성, 출력 파싱 등을 체계화합니다.
적용 대상:
- 가사 생성 서비스 (
ChatgptService) - 마케팅 분석 서비스
- 다국어 처리 로직
2.2 설계 1: 프롬프트 템플릿 시스템
현재 문제:
# 현재: chatgpt_prompt.py
prompt = f"""
[ROLE] You are a marketing expert...
[INPUT] Customer: {customer_name}, Region: {region}...
"""
개선 설계:
# 개선: langchain 적용
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain_openai import ChatOpenAI
# 가사 생성 프롬프트 템플릿
LYRIC_PROMPT = ChatPromptTemplate.from_messages([
("system", """[ROLE] You are a marketing expert and professional lyricist.
You specialize in creating catchy, emotional lyrics for travel and accommodation marketing.
[LANGUAGE REQUIREMENT]
Output MUST be 100% in {language}. No other languages allowed."""),
("human", """[INPUT]
Customer Name: {customer_name}
Region: {region}
Detailed Information: {detail_info}
[OUTPUT REQUIREMENTS]
- 8-12 lines of lyrics
- Focus on: relaxation, healing, beautiful scenery, memorable experiences
- Style: warm, inviting, poetic
- Include location-specific imagery
Generate lyrics now:""")
])
# 체인 구성
lyric_chain = LYRIC_PROMPT | ChatOpenAI(model="gpt-5-mini-mini") | StrOutputParser()
# 사용
result = await lyric_chain.ainvoke({
"customer_name": "스테이뫰",
"region": "강원도 속초",
"detail_info": "해변 근처 펜션",
"language": "Korean"
})
이점:
- 프롬프트 버전 관리 용이
- A/B 테스팅 지원
- 입력 변수 명확한 정의
2.3 설계 2: 다단계 마케팅 분석 체인
목적: 복잡한 마케팅 분석을 단계별로 수행하여 품질 향상
from langchain.chains import SequentialChain
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
# Step 1: 경쟁사 분석 체인
competitor_prompt = PromptTemplate(
input_variables=["region", "business_type"],
template="""
{region} 지역의 {business_type} 업종에 대해 분석하세요:
- 주요 경쟁사 특성
- 차별화 포인트
- 시장 포지셔닝
"""
)
competitor_chain = competitor_prompt | ChatOpenAI() | StrOutputParser()
# Step 2: 타겟 고객 분석 체인
audience_prompt = PromptTemplate(
input_variables=["region", "competitor_analysis"],
template="""
경쟁사 분석 결과: {competitor_analysis}
{region} 지역의 주요 타겟 고객층을 분석하세요:
- 연령대 및 특성
- 주요 니즈
- 결정 요인
"""
)
audience_chain = audience_prompt | ChatOpenAI() | StrOutputParser()
# Step 3: 마케팅 전략 종합 체인
strategy_prompt = PromptTemplate(
input_variables=["customer_name", "competitor_analysis", "audience_analysis"],
template="""
경쟁사 분석: {competitor_analysis}
타겟 고객: {audience_analysis}
{customer_name}을 위한 마케팅 전략을 제안하세요:
- 핵심 메시지
- 차별화 전략
- 추천 가사 방향
"""
)
strategy_chain = strategy_prompt | ChatOpenAI() | StrOutputParser()
# 통합 순차 체인
marketing_analysis_chain = (
{"region": RunnablePassthrough(), "business_type": RunnablePassthrough()}
| competitor_chain
| {"competitor_analysis": RunnablePassthrough(), "region": RunnablePassthrough()}
| audience_chain
| {"competitor_analysis": ..., "audience_analysis": RunnablePassthrough(), "customer_name": ...}
| strategy_chain
)
이점:
- 분석의 깊이와 체계성 향상
- 각 단계별 결과 추적 가능
- 중간 결과 캐싱 가능
2.4 설계 3: 출력 파싱 및 검증
목적: ChatGPT 응답의 구조화 및 자동 검증
from langchain.output_parsers import PydanticOutputParser
from langchain_core.output_parsers import OutputFixingParser
from pydantic import BaseModel, Field, validator
# 가사 출력 스키마
class LyricOutput(BaseModel):
title: str = Field(description="가사의 제목 (선택)")
lyrics: list[str] = Field(description="가사 각 줄", min_items=8, max_items=12)
mood: str = Field(description="가사의 분위기: warm, energetic, romantic 등")
@validator('lyrics')
def validate_line_count(cls, v):
if len(v) < 8:
raise ValueError("가사는 최소 8줄 이상이어야 합니다")
return v
# 파서 생성
parser = PydanticOutputParser(pydantic_object=LyricOutput)
# 자동 수정 파서 (파싱 실패 시 LLM으로 재시도)
fixing_parser = OutputFixingParser.from_llm(
parser=parser,
llm=ChatOpenAI(model="gpt-5-mini-mini")
)
# 프롬프트에 포맷 지시 추가
prompt_with_format = LYRIC_PROMPT.partial(
format_instructions=parser.get_format_instructions()
)
이점:
- 응답 형식 일관성 보장
- 자동 오류 복구
- 타입 안전성 확보
2.5 설계 4: Few-Shot 다국어 프롬프트
목적: 각 언어별 고품질 예시 제공으로 번역/생성 품질 향상
from langchain.prompts import FewShotPromptTemplate, PromptTemplate
# 언어별 예시
LANGUAGE_EXAMPLES = {
"Korean": [
{
"input": "강원도 속초 해변 펜션",
"output": """푸른 바다 물결 위에
새벽빛이 춤을 추고
당신의 하루를 담아
스테이뫰에서 쉬어가요"""
}
],
"English": [
{
"input": "Sokcho beach pension, Gangwon-do",
"output": """Where ocean waves meet morning light
A peaceful haven comes in sight
Let your worries drift away
At Stay Meoum, find your stay"""
}
],
"Japanese": [
{
"input": "江原道束草ビーチペンション",
"output": """青い海の波の上に
朝の光が踊る時
あなたの一日を包み込む
ステイメウムで休んでいこう"""
}
],
"Chinese": [...],
"Thai": [...],
"Vietnamese": [...]
}
# Few-Shot 프롬프트 생성
def create_multilingual_prompt(language: str):
example_prompt = PromptTemplate(
input_variables=["input", "output"],
template="입력: {input}\n가사:\n{output}"
)
return FewShotPromptTemplate(
examples=LANGUAGE_EXAMPLES.get(language, LANGUAGE_EXAMPLES["Korean"]),
example_prompt=example_prompt,
prefix="다음 예시를 참고하여 고품질 가사를 생성하세요:",
suffix="입력: {customer_info}\n가사:",
input_variables=["customer_info"]
)
이점:
- 언어별 문화적 뉘앙스 반영
- 일관된 스타일 유지
- 번역 품질 대폭 향상
3. LangGraph 적용 설계
3.1 적용 대상 및 목적
LangGraph는 복잡한 다단계 워크플로우를 상태 기계(State Machine)로 관리하는 프레임워크입니다.
적용 대상:
- 전체 영상 생성 파이프라인 (가사 → 음악 → 영상)
- 비동기 폴링 자동화
- 에러 처리 및 재시도 로직
3.2 설계 1: 통합 파이프라인 그래프
핵심 설계:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Optional, Literal
from datetime import datetime
# 파이프라인 상태 정의
class PipelineState(TypedDict):
# 입력
task_id: str
customer_name: str
region: str
detail_info: str
language: str
images: list[str]
orientation: Literal["vertical", "horizontal"]
# 중간 결과
lyric: Optional[str]
lyric_status: Optional[str]
song_url: Optional[str]
song_task_id: Optional[str]
song_status: Optional[str]
song_duration: Optional[float]
video_url: Optional[str]
video_render_id: Optional[str]
video_status: Optional[str]
# 메타데이터
error: Optional[str]
error_step: Optional[str]
started_at: datetime
completed_at: Optional[datetime]
retry_count: int
# 그래프 빌더
def build_video_pipeline() -> StateGraph:
graph = StateGraph(PipelineState)
# ===== 노드 정의 =====
# 1. 가사 생성 노드
async def generate_lyric(state: PipelineState) -> PipelineState:
"""ChatGPT로 가사 생성 (동기)"""
try:
lyric_chain = create_lyric_chain() # LangChain 체인
lyric = await lyric_chain.ainvoke({
"customer_name": state["customer_name"],
"region": state["region"],
"detail_info": state["detail_info"],
"language": state["language"]
})
# DB 저장
await save_lyric_to_db(state["task_id"], lyric)
return {
**state,
"lyric": lyric,
"lyric_status": "completed"
}
except Exception as e:
return {
**state,
"error": str(e),
"error_step": "lyric_generation",
"lyric_status": "failed"
}
# 2. 음악 생성 요청 노드
async def request_song(state: PipelineState) -> PipelineState:
"""Suno API에 음악 생성 요청"""
try:
suno = SunoAPIClient()
task_id = await suno.generate(
prompt=state["lyric"],
genre="K-Pop, Emotional"
)
# DB 저장
await save_song_request_to_db(state["task_id"], task_id)
return {
**state,
"song_task_id": task_id,
"song_status": "processing"
}
except Exception as e:
return {
**state,
"error": str(e),
"error_step": "song_request",
"song_status": "failed"
}
# 3. 음악 폴링 노드
async def poll_song_status(state: PipelineState) -> PipelineState:
"""Suno 상태 폴링 (최대 5분)"""
suno = SunoAPIClient()
max_attempts = 60 # 5초 간격 × 60 = 5분
for attempt in range(max_attempts):
result = await suno.get_task_status(state["song_task_id"])
if result["status"] == "SUCCESS":
audio_url = result["clips"][0]["audio_url"]
duration = result["clips"][0]["duration"]
# DB 업데이트
await update_song_status(
state["task_id"],
"completed",
audio_url,
duration
)
return {
**state,
"song_url": audio_url,
"song_duration": duration,
"song_status": "completed"
}
elif result["status"] == "FAILED":
return {
**state,
"error": "Suno generation failed",
"error_step": "song_polling",
"song_status": "failed"
}
await asyncio.sleep(5) # 5초 대기
return {
**state,
"error": "Song generation timeout",
"error_step": "song_polling",
"song_status": "timeout"
}
# 4. 영상 생성 요청 노드
async def request_video(state: PipelineState) -> PipelineState:
"""Creatomate API에 영상 렌더링 요청"""
try:
creatomate = CreatomateClient()
render_id = await creatomate.render(
images=state["images"],
music_url=state["song_url"],
lyrics=state["lyric"],
duration=state["song_duration"],
orientation=state["orientation"]
)
# DB 저장
await save_video_request_to_db(state["task_id"], render_id)
return {
**state,
"video_render_id": render_id,
"video_status": "processing"
}
except Exception as e:
return {
**state,
"error": str(e),
"error_step": "video_request",
"video_status": "failed"
}
# 5. 영상 폴링 노드
async def poll_video_status(state: PipelineState) -> PipelineState:
"""Creatomate 상태 폴링 (최대 10분)"""
creatomate = CreatomateClient()
max_attempts = 120 # 5초 간격 × 120 = 10분
for attempt in range(max_attempts):
result = await creatomate.get_render_status(state["video_render_id"])
if result["status"] == "succeeded":
video_url = result["url"]
# Azure Blob 업로드
blob_url = await upload_to_azure(video_url, state["task_id"])
# DB 업데이트
await update_video_status(state["task_id"], "completed", blob_url)
return {
**state,
"video_url": blob_url,
"video_status": "completed",
"completed_at": datetime.now()
}
elif result["status"] == "failed":
return {
**state,
"error": "Creatomate rendering failed",
"error_step": "video_polling",
"video_status": "failed"
}
await asyncio.sleep(5)
return {
**state,
"error": "Video generation timeout",
"error_step": "video_polling",
"video_status": "timeout"
}
# 6. 에러 처리 노드
async def handle_error(state: PipelineState) -> PipelineState:
"""에러 로깅 및 알림"""
await log_pipeline_error(
task_id=state["task_id"],
error=state["error"],
step=state["error_step"]
)
# 선택: 슬랙/이메일 알림
await send_error_notification(state)
return state
# ===== 노드 추가 =====
graph.add_node("generate_lyric", generate_lyric)
graph.add_node("request_song", request_song)
graph.add_node("poll_song", poll_song_status)
graph.add_node("request_video", request_video)
graph.add_node("poll_video", poll_video_status)
graph.add_node("handle_error", handle_error)
# ===== 엣지 정의 =====
# 시작점
graph.set_entry_point("generate_lyric")
# 조건부 분기: 가사 생성 후
def route_after_lyric(state: PipelineState):
if state.get("error"):
return "handle_error"
return "request_song"
graph.add_conditional_edges(
"generate_lyric",
route_after_lyric,
{
"request_song": "request_song",
"handle_error": "handle_error"
}
)
# 조건부 분기: 음악 요청 후
def route_after_song_request(state: PipelineState):
if state.get("error"):
return "handle_error"
return "poll_song"
graph.add_conditional_edges(
"request_song",
route_after_song_request
)
# 조건부 분기: 음악 폴링 후
def route_after_song_poll(state: PipelineState):
if state.get("error") or state["song_status"] in ["failed", "timeout"]:
return "handle_error"
return "request_video"
graph.add_conditional_edges(
"poll_song",
route_after_song_poll
)
# 조건부 분기: 영상 요청 후
graph.add_conditional_edges(
"request_video",
lambda s: "handle_error" if s.get("error") else "poll_video"
)
# 조건부 분기: 영상 폴링 후
graph.add_conditional_edges(
"poll_video",
lambda s: "handle_error" if s.get("error") else END
)
# 에러 핸들러는 항상 종료
graph.add_edge("handle_error", END)
return graph.compile()
그래프 시각화:
┌─────────────────┐
│ generate_lyric │
└────────┬────────┘
│
┌────────▼────────┐
┌─────┤ route check ├─────┐
│ └─────────────────┘ │
[error] [success]
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ handle_error │ │ request_song │
└────────┬────────┘ └────────┬────────┘
│ │
▼ ▼
END ┌─────────────────┐
│ poll_song │
└────────┬────────┘
│
┌────────▼────────┐
┌─────┤ route check ├─────┐
│ └─────────────────┘ │
[error/timeout] [success]
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ handle_error │ │ request_video │
└────────┬────────┘ └────────┬────────┘
│ │
▼ ▼
END ┌─────────────────┐
│ poll_video │
└────────┬────────┘
│
┌────────▼────────┐
┌─────┤ route check ├─────┐
│ └─────────────────┘ │
[error] [success]
│ │
▼ ▼
┌─────────────────┐ END
│ handle_error │ (파이프라인 완료)
└────────┬────────┘
│
▼
END
3.3 설계 2: 재시도 및 폴백 메커니즘
from langgraph.graph import StateGraph
class RetryState(TypedDict):
task_id: str
retry_count: int
max_retries: int
last_error: Optional[str]
# ... 기타 필드
def build_retry_aware_pipeline():
graph = StateGraph(RetryState)
async def generate_song_with_retry(state: RetryState) -> RetryState:
"""재시도 로직이 포함된 음악 생성"""
try:
# 1차 시도: Suno API
result = await suno_generate(state["lyric"])
return {**state, "song_url": result, "retry_count": 0}
except SunoRateLimitError:
# 재시도 1: 딜레이 후 재시도
if state["retry_count"] < state["max_retries"]:
await asyncio.sleep(30) # 30초 대기
return {
**state,
"retry_count": state["retry_count"] + 1,
"last_error": "rate_limit"
}
except SunoAPIError as e:
# 재시도 2: 프롬프트 수정 후 재시도
if "invalid lyrics" in str(e) and state["retry_count"] < 2:
simplified_lyric = await simplify_lyrics(state["lyric"])
return {
**state,
"lyric": simplified_lyric,
"retry_count": state["retry_count"] + 1
}
# 폴백: 대체 서비스 사용
try:
result = await alternative_music_service(state["lyric"])
return {**state, "song_url": result, "used_fallback": True}
except:
pass
return {
**state,
"error": "All music generation attempts failed",
"song_status": "failed"
}
# 조건부 재시도 엣지
def should_retry_song(state: RetryState):
if state.get("song_url"):
return "next_step"
if state["retry_count"] < state["max_retries"]:
return "retry_song"
return "handle_error"
graph.add_conditional_edges(
"generate_song",
should_retry_song,
{
"retry_song": "generate_song", # 자기 자신으로 루프
"next_step": "request_video",
"handle_error": "handle_error"
}
)
return graph.compile()
3.4 설계 3: 병렬 처리 지원
from langgraph.types import Send
from langgraph.graph import StateGraph
class ParallelState(TypedDict):
task_id: str
images: list[str]
analyzed_images: list[dict] # 병렬 분석 결과
# ...
def build_parallel_pipeline():
graph = StateGraph(ParallelState)
# 이미지 분석을 병렬로 수행
async def analyze_single_image(state: dict) -> dict:
"""단일 이미지 분석"""
image_url = state["image_url"]
analysis = await vision_model.analyze(image_url)
return {
"image_url": image_url,
"analysis": analysis,
"mood": analysis.get("mood"),
"colors": analysis.get("dominant_colors")
}
# 팬아웃: 여러 이미지를 병렬로 분석
def fanout_images(state: ParallelState):
return [
Send("analyze_image", {"image_url": img, "task_id": state["task_id"]})
for img in state["images"]
]
# 팬인: 분석 결과 수집
async def collect_analyses(state: ParallelState) -> ParallelState:
# LangGraph가 자동으로 병렬 결과를 수집
return state
graph.add_node("analyze_image", analyze_single_image)
graph.add_node("collect", collect_analyses)
graph.add_conditional_edges(
"start",
fanout_images # 여러 Send 반환 → 병렬 실행
)
return graph.compile()
3.5 FastAPI 통합
# main.py 또는 video/api/routers/v1/video.py
from fastapi import APIRouter, BackgroundTasks
from langgraph.graph import StateGraph
router = APIRouter()
pipeline = build_video_pipeline()
@router.post("/video/generate-full")
async def generate_full_video(
request: FullVideoRequest,
background_tasks: BackgroundTasks
):
"""단일 API 호출로 전체 파이프라인 실행"""
initial_state: PipelineState = {
"task_id": str(uuid7()),
"customer_name": request.customer_name,
"region": request.region,
"detail_info": request.detail_info,
"language": request.language,
"images": request.images,
"orientation": request.orientation,
"lyric": None,
"lyric_status": None,
"song_url": None,
"song_task_id": None,
"song_status": None,
"song_duration": None,
"video_url": None,
"video_render_id": None,
"video_status": None,
"error": None,
"error_step": None,
"started_at": datetime.now(),
"completed_at": None,
"retry_count": 0
}
# 백그라운드에서 파이프라인 실행
background_tasks.add_task(run_pipeline_async, initial_state)
return {
"task_id": initial_state["task_id"],
"status": "processing",
"message": "Pipeline started. Use GET /video/pipeline-status/{task_id} to check progress."
}
async def run_pipeline_async(initial_state: PipelineState):
"""백그라운드에서 LangGraph 파이프라인 실행"""
try:
final_state = await pipeline.ainvoke(initial_state)
# 결과 DB 저장
await save_pipeline_result(final_state)
# 완료 알림 (웹훅, 이메일 등)
if final_state.get("video_url"):
await send_completion_notification(final_state)
except Exception as e:
await log_pipeline_error(initial_state["task_id"], str(e))
@router.get("/video/pipeline-status/{task_id}")
async def get_pipeline_status(task_id: str):
"""파이프라인 진행 상태 조회"""
status = await get_status_from_db(task_id)
return {
"task_id": task_id,
"lyric_status": status.lyric_status,
"song_status": status.song_status,
"video_status": status.video_status,
"overall_status": determine_overall_status(status),
"video_url": status.video_url if status.video_status == "completed" else None,
"error": status.error
}
4. RAG 적용 설계
4.1 적용 대상 및 목적
RAG(Retrieval-Augmented Generation)는 외부 지식 기반을 검색하여 LLM 응답 품질을 향상시키는 기법입니다.
적용 대상:
- 마케팅 지식베이스 (성공 사례)
- 지역별/업종별 가사 예시
- 이미지 메타데이터 활용
- 프롬프트 최적화
4.2 설계 1: 마케팅 지식베이스 RAG
아키텍처:
┌─────────────────────────────────────────────────────────────┐
│ 마케팅 지식베이스 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Document Store (벡터 DB) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Collection: marketing_knowledge │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ 문서 1: 강원도 속초 펜션 마케팅 성공 사례 │ │ │
│ │ │ - 가사 예시 │ │ │
│ │ │ - 타겟 고객 분석 │ │ │
│ │ │ - 효과적인 키워드 │ │ │
│ │ │ - 영상 조회수/반응 │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ 문서 2: 제주도 게스트하우스 마케팅 사례 │ │ │
│ │ │ ... │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ... (수백 개의 사례) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
│
│ 유사도 검색
▼
┌─────────────────────────────────────────────────────────────┐
│ 가사 생성 프롬프트 │
├─────────────────────────────────────────────────────────────┤
│ "다음 유사 사례를 참고하여 가사를 생성하세요: │
│ [검색된 성공 사례 1] │
│ [검색된 성공 사례 2] │
│ ..." │
└─────────────────────────────────────────────────────────────┘
구현 코드:
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.document_loaders import JSONLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
# 1. 임베딩 모델 설정
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key=settings.CHATGPT_API_KEY
)
# 2. 벡터 스토어 초기화
vector_store = Chroma(
collection_name="marketing_knowledge",
embedding_function=embeddings,
persist_directory="./data/chroma_db"
)
# 3. 마케팅 사례 문서 구조
class MarketingCase(BaseModel):
case_id: str
region: str
business_type: str # "pension", "guesthouse", "hotel"
target_audience: str
successful_lyrics: str
keywords: list[str]
performance_metrics: dict # views, engagement, conversions
created_at: datetime
# 4. 문서 추가 함수
async def add_marketing_case(case: MarketingCase):
"""성공한 마케팅 사례를 벡터 스토어에 추가"""
# 메타데이터와 함께 문서 생성
document = Document(
page_content=f"""
지역: {case.region}
업종: {case.business_type}
타겟 고객: {case.target_audience}
성공 가사:
{case.successful_lyrics}
효과적인 키워드: {', '.join(case.keywords)}
성과: 조회수 {case.performance_metrics.get('views', 0)},
참여율 {case.performance_metrics.get('engagement', 0)}%
""",
metadata={
"case_id": case.case_id,
"region": case.region,
"business_type": case.business_type,
"created_at": case.created_at.isoformat()
}
)
vector_store.add_documents([document])
vector_store.persist()
# 5. RAG 기반 가사 생성
async def generate_lyrics_with_rag(
customer_name: str,
region: str,
business_type: str,
language: str
) -> str:
"""RAG를 활용한 고품질 가사 생성"""
# 유사 사례 검색
query = f"{region} {business_type} 마케팅 가사"
similar_cases = vector_store.similarity_search(
query,
k=3,
filter={"business_type": business_type} # 같은 업종만
)
# 검색 결과를 프롬프트에 포함
examples_text = "\n\n".join([
f"### 참고 사례 {i+1}\n{doc.page_content}"
for i, doc in enumerate(similar_cases)
])
# LangChain 프롬프트 구성
rag_prompt = ChatPromptTemplate.from_messages([
("system", """당신은 마케팅 전문가이자 작사가입니다.
다음 성공 사례를 참고하여 새로운 가사를 생성하세요.
참고 사례의 스타일과 키워드를 활용하되, 고유한 내용을 만드세요.
{examples}"""),
("human", """
고객명: {customer_name}
지역: {region}
언어: {language}
위 정보를 바탕으로 8-12줄의 감성적인 마케팅 가사를 생성하세요.
""")
])
chain = rag_prompt | ChatOpenAI(model="gpt-5-mini") | StrOutputParser()
result = await chain.ainvoke({
"examples": examples_text,
"customer_name": customer_name,
"region": region,
"language": language
})
return result
4.3 설계 2: 지역별 특화 RAG
목적: 각 지역의 특성(문화, 관광지, 특산물 등)을 반영한 가사 생성
# 지역 정보 문서 구조
class RegionInfo(BaseModel):
region_name: str
province: str
famous_attractions: list[str]
local_foods: list[str]
cultural_keywords: list[str]
seasonal_events: list[dict] # {"season": "summer", "event": "해수욕장 개장"}
atmosphere: list[str] # ["고즈넉한", "활기찬", "낭만적인"]
# 지역 정보 벡터 스토어
region_store = Chroma(
collection_name="region_knowledge",
embedding_function=embeddings,
persist_directory="./data/chroma_region"
)
# 지역 정보 추가 예시
regions_data = [
RegionInfo(
region_name="속초",
province="강원도",
famous_attractions=["설악산", "속초해변", "영금정", "아바이마을"],
local_foods=["오징어순대", "물회", "생선구이"],
cultural_keywords=["동해바다", "일출", "산과 바다", "청정자연"],
seasonal_events=[
{"season": "summer", "event": "속초해변 피서"},
{"season": "autumn", "event": "설악산 단풍"}
],
atmosphere=["시원한", "청량한", "자연친화적", "힐링"]
),
# ... 더 많은 지역
]
async def enrich_lyrics_with_region_info(
base_lyrics: str,
region: str
) -> str:
"""지역 정보로 가사 보강"""
# 지역 정보 검색
region_docs = region_store.similarity_search(region, k=1)
if not region_docs:
return base_lyrics
region_info = region_docs[0].page_content
# 가사에 지역 특성 반영
enrichment_prompt = ChatPromptTemplate.from_messages([
("system", """당신은 가사 편집 전문가입니다.
주어진 기본 가사에 지역의 특성을 자연스럽게 녹여내세요.
지역 정보:
{region_info}"""),
("human", """기본 가사:
{base_lyrics}
위 가사에 지역의 특성(명소, 분위기, 키워드)을 2-3개 자연스럽게 추가하세요.
원래 가사의 운율과 분위기를 유지하세요.""")
])
chain = enrichment_prompt | ChatOpenAI() | StrOutputParser()
return await chain.ainvoke({
"region_info": region_info,
"base_lyrics": base_lyrics
})
4.4 설계 3: 이미지 메타데이터 RAG
목적: 업로드된 이미지의 분석 결과를 저장하고, 영상 생성 시 최적의 이미지 순서 결정
from langchain_openai import ChatOpenAI
# Vision 모델로 이미지 분석
vision_model = ChatOpenAI(model="gpt-5-mini")
# 이미지 분석 문서 구조
class ImageAnalysis(BaseModel):
image_url: str
task_id: str
description: str
dominant_colors: list[str]
mood: str # "warm", "cool", "neutral"
scene_type: str # "interior", "exterior", "nature", "food"
suggested_position: str # "opening", "middle", "closing"
quality_score: float # 0.0 ~ 1.0
# 이미지 메타데이터 벡터 스토어
image_store = Chroma(
collection_name="image_metadata",
embedding_function=embeddings,
persist_directory="./data/chroma_images"
)
async def analyze_and_store_image(image_url: str, task_id: str):
"""이미지 분석 후 벡터 스토어에 저장"""
# gpt-5-mini Vision으로 이미지 분석
analysis_response = await vision_model.ainvoke([
{
"type": "text",
"text": """이미지를 분석하고 다음 JSON 형식으로 응답하세요:
{
"description": "이미지 설명 (2-3문장)",
"dominant_colors": ["색상1", "색상2"],
"mood": "warm/cool/neutral 중 하나",
"scene_type": "interior/exterior/nature/food 중 하나",
"suggested_position": "opening/middle/closing 중 하나 (영상에서 적합한 위치)",
"quality_score": 0.0~1.0 (이미지 품질/선명도)
}"""
},
{
"type": "image_url",
"image_url": {"url": image_url}
}
])
analysis = json.loads(analysis_response.content)
# 문서 생성 및 저장
document = Document(
page_content=f"""
이미지 설명: {analysis['description']}
분위기: {analysis['mood']}
장면 유형: {analysis['scene_type']}
추천 위치: {analysis['suggested_position']}
색상: {', '.join(analysis['dominant_colors'])}
""",
metadata={
"image_url": image_url,
"task_id": task_id,
**analysis
}
)
image_store.add_documents([document])
image_store.persist()
return ImageAnalysis(image_url=image_url, task_id=task_id, **analysis)
async def get_optimal_image_order(
task_id: str,
music_mood: str, # 음악 분위기
lyrics_theme: str # 가사 주제
) -> list[str]:
"""음악과 가사에 맞는 최적의 이미지 순서 결정"""
# 해당 task의 모든 이미지 조회
all_images = image_store.get(
where={"task_id": task_id}
)
# 음악/가사 분위기에 맞는 이미지 우선 검색
query = f"{music_mood} {lyrics_theme} 마케팅 영상"
sorted_images = image_store.similarity_search(
query,
k=len(all_images),
filter={"task_id": task_id}
)
# 이미지 순서 결정 로직
opening_images = [img for img in sorted_images if img.metadata["suggested_position"] == "opening"]
middle_images = [img for img in sorted_images if img.metadata["suggested_position"] == "middle"]
closing_images = [img for img in sorted_images if img.metadata["suggested_position"] == "closing"]
# 품질 점수로 정렬
opening_images.sort(key=lambda x: x.metadata["quality_score"], reverse=True)
closing_images.sort(key=lambda x: x.metadata["quality_score"], reverse=True)
# 최종 순서
ordered = (
opening_images[:2] + # 시작 2장
middle_images + # 중간 이미지들
closing_images[:1] # 마무리 1장
)
return [img.metadata["image_url"] for img in ordered]
4.5 설계 4: 프롬프트 히스토리 RAG
목적: 과거 성공/실패한 프롬프트를 학습하여 프롬프트 품질 지속 개선
# 프롬프트 결과 문서 구조
class PromptResult(BaseModel):
prompt_id: str
prompt_text: str
result_text: str
success: bool
failure_reason: Optional[str]
category: str # "lyric", "marketing_analysis", "region_enrichment"
metrics: dict # {"length": 10, "contains_region_keyword": True, ...}
created_at: datetime
# 프롬프트 히스토리 벡터 스토어
prompt_store = Chroma(
collection_name="prompt_history",
embedding_function=embeddings,
persist_directory="./data/chroma_prompts"
)
async def log_prompt_result(result: PromptResult):
"""프롬프트 결과 기록"""
document = Document(
page_content=f"""
프롬프트: {result.prompt_text}
결과: {result.result_text[:500]}...
성공 여부: {'성공' if result.success else '실패'}
실패 사유: {result.failure_reason or 'N/A'}
""",
metadata={
"prompt_id": result.prompt_id,
"success": result.success,
"category": result.category,
"created_at": result.created_at.isoformat(),
**result.metrics
}
)
prompt_store.add_documents([document])
async def get_improved_prompt(
base_prompt: str,
category: str
) -> str:
"""과거 결과를 기반으로 프롬프트 개선"""
# 유사한 성공 프롬프트 검색
successful_prompts = prompt_store.similarity_search(
base_prompt,
k=3,
filter={"success": True, "category": category}
)
# 유사한 실패 프롬프트 검색 (피해야 할 패턴)
failed_prompts = prompt_store.similarity_search(
base_prompt,
k=2,
filter={"success": False, "category": category}
)
# 프롬프트 개선 요청
improvement_prompt = ChatPromptTemplate.from_messages([
("system", """당신은 프롬프트 엔지니어링 전문가입니다.
다음 성공/실패 사례를 참고하여 주어진 프롬프트를 개선하세요.
### 성공 사례 (참고):
{successful_examples}
### 실패 사례 (피할 것):
{failed_examples}
### 개선 원칙:
1. 성공 사례의 패턴을 따르세요
2. 실패 사례의 패턴을 피하세요
3. 명확하고 구체적인 지시를 포함하세요
4. 출력 형식을 명시하세요"""),
("human", """개선할 프롬프트:
{base_prompt}
위 프롬프트를 개선하세요. 개선된 프롬프트만 출력하세요.""")
])
chain = improvement_prompt | ChatOpenAI() | StrOutputParser()
improved = await chain.ainvoke({
"successful_examples": "\n---\n".join([doc.page_content for doc in successful_prompts]),
"failed_examples": "\n---\n".join([doc.page_content for doc in failed_prompts]),
"base_prompt": base_prompt
})
return improved
5. 통합 아키텍처
5.1 전체 시스템 아키텍처
┌──────────────────────────────────────────────────────────────────────────┐
│ FastAPI 라우터 │
│ /lyric/generate, /song/generate, /video/generate, /video/generate-full │
└───────────────────────────────────┬──────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ LangGraph 파이프라인 │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ StateGraph (상태 기계) │ │
│ │ │ │
│ │ [generate_lyric] → [request_song] → [poll_song] │ │
│ │ │ │ │ │
│ │ │ ▼ │ │
│ │ │ [request_video] → [poll_video] → END │ │
│ │ │ │ │ │
│ │ └─────────────────────┴──────────→ [handle_error] │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ LangChain │ │ RAG │ │ External │ │
│ │ Components │ │ Vector DBs │ │ APIs │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└──────────────────────────────────────────────────────────────────────────┘
│ │ │
┌───────────┘ ┌──────────┘ ┌──────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Prompt │ │ Chroma │ │ OpenAI │
│ Templates │ │ Vector │ │ Suno │
│ Chains │ │ Store │ │ Creatomate │
│ Parsers │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└────────────────┴───────────────┘
│
▼
┌─────────────────────────┐
│ MySQL + Azure Blob │
│ (영구 저장소) │
└─────────────────────────┘
5.2 데이터 흐름
┌─────────────────────────────────────────────────────────────────────────┐
│ 데이터 흐름 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 사용자 요청 │
│ ├── 고객 정보 (이름, 지역, 상세정보) │
│ └── 이미지 URL 리스트 │
│ │ │
│ ▼ │
│ 2. RAG 검색 (병렬) │
│ ├── 마케팅 지식베이스 → 유사 성공 사례 │
│ ├── 지역 정보베이스 → 지역 특성 │
│ └── 이미지 메타데이터 → 이미지 분석 │
│ │ │
│ ▼ │
│ 3. LangChain 프롬프트 구성 │
│ ├── 기본 템플릿 로드 │
│ ├── RAG 결과 주입 │
│ ├── Few-shot 예시 추가 │
│ └── 출력 형식 지정 │
│ │ │
│ ▼ │
│ 4. LangGraph 파이프라인 실행 │
│ ├── 가사 생성 (ChatGPT) │
│ ├── 음악 생성 (Suno, 폴링 자동화) │
│ └── 영상 생성 (Creatomate, 폴링 자동화) │
│ │ │
│ ▼ │
│ 5. 결과 저장 │
│ ├── MySQL: 메타데이터, 상태 │
│ ├── Azure Blob: 영상 파일 │
│ └── Chroma: 성공 사례 피드백 │
│ │ │
│ ▼ │
│ 6. 사용자 응답 │
│ └── 영상 URL, 상태, 메타데이터 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
5.3 디렉토리 구조 (신규)
app/
├── langchain/ # LangChain 관련
│ ├── __init__.py
│ ├── prompts/
│ │ ├── __init__.py
│ │ ├── lyric_prompts.py # 가사 생성 프롬프트
│ │ ├── marketing_prompts.py # 마케팅 분석 프롬프트
│ │ └── examples/ # Few-shot 예시
│ │ ├── korean.json
│ │ ├── english.json
│ │ └── ...
│ ├── chains/
│ │ ├── __init__.py
│ │ ├── lyric_chain.py # 가사 생성 체인
│ │ └── marketing_chain.py # 마케팅 분석 체인
│ └── parsers/
│ ├── __init__.py
│ └── lyric_parser.py # 가사 출력 파서
│
├── langgraph/ # LangGraph 관련
│ ├── __init__.py
│ ├── states/
│ │ ├── __init__.py
│ │ └── pipeline_state.py # 파이프라인 상태 정의
│ ├── nodes/
│ │ ├── __init__.py
│ │ ├── lyric_node.py # 가사 생성 노드
│ │ ├── song_node.py # 음악 생성 노드
│ │ ├── video_node.py # 영상 생성 노드
│ │ └── error_node.py # 에러 처리 노드
│ └── graphs/
│ ├── __init__.py
│ └── video_pipeline.py # 메인 파이프라인 그래프
│
├── rag/ # RAG 관련
│ ├── __init__.py
│ ├── stores/
│ │ ├── __init__.py
│ │ ├── marketing_store.py # 마케팅 지식베이스
│ │ ├── region_store.py # 지역 정보베이스
│ │ ├── image_store.py # 이미지 메타데이터
│ │ └── prompt_store.py # 프롬프트 히스토리
│ ├── loaders/
│ │ ├── __init__.py
│ │ └── case_loader.py # 사례 데이터 로더
│ └── retrievers/
│ ├── __init__.py
│ └── hybrid_retriever.py # 하이브리드 검색
│
└── data/ # 데이터 저장소
└── chroma_db/ # Chroma 벡터 DB
├── marketing_knowledge/
├── region_knowledge/
├── image_metadata/
└── prompt_history/
6. 기대 효과
6.1 정량적 기대 효과
| 지표 | 현재 | 목표 | 개선율 |
|---|---|---|---|
| 가사 생성 품질 | 70% 만족도 | 90% 만족도 | +29% |
| 재작업률 | 30% | 10% | -67% |
| 파이프라인 실패율 | 15% | 5% | -67% |
| 평균 처리 시간 | 10분 (수동 개입 필요) | 8분 (완전 자동) | -20% |
| 다국어 품질 | 60% | 85% | +42% |
| 프롬프트 튜닝 시간 | 2시간/버전 | 30분/버전 | -75% |
6.2 정성적 기대 효과
6.2.1 개발 생산성 향상
| 영역 | 효과 |
|---|---|
| 코드 유지보수 | 프롬프트와 비즈니스 로직 분리로 수정 용이 |
| 테스트 용이성 | 각 체인/노드 단위 테스트 가능 |
| 디버깅 | 상태 기계 기반으로 문제 지점 명확히 파악 |
| 확장성 | 새로운 AI 서비스 추가 시 노드만 추가하면 됨 |
6.2.2 품질 향상
| 영역 | 효과 |
|---|---|
| 일관성 | 동일 조건에서 일관된 품질의 결과물 생성 |
| 지역 맞춤화 | RAG로 지역별 특성 자동 반영 |
| 학습 효과 | 성공 사례 축적으로 시간이 지날수록 품질 향상 |
| 에러 복구 | 자동 재시도 및 폴백으로 안정성 강화 |
6.2.3 운영 효율성
| 영역 | 효과 |
|---|---|
| 모니터링 | 파이프라인 상태 추적으로 병목 지점 파악 |
| 비용 최적화 | 불필요한 API 호출 감소, 캐싱 활용 |
| 확장 대응 | 부하 증가 시 노드별 스케일링 가능 |
6.3 비즈니스 가치
┌────────────────────────────────────────────────────────────────┐
│ 비즈니스 가치 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 1. 고객 만족도 향상 │
│ └── 고품질 가사/영상으로 마케팅 효과 증대 │
│ │
│ 2. 서비스 차별화 │
│ └── 지역 맞춤 콘텐츠로 경쟁사 대비 우위 │
│ │
│ 3. 운영 비용 절감 │
│ └── 자동화로 수동 개입 최소화 │
│ │
│ 4. 확장 가능성 │
│ └── 새로운 지역/업종 추가 시 RAG 학습만으로 대응 │
│ │
│ 5. 데이터 자산화 │
│ └── 축적된 성공 사례가 진입 장벽 역할 │
│ │
└────────────────────────────────────────────────────────────────┘
7. 구현 로드맵
7.1 Phase 1: 기초 (1-2주)
목표: LangChain 기본 구조 구축
| 작업 | 설명 | 우선순위 |
|---|---|---|
| 의존성 설치 | langchain, langchain-openai, chromadb | P0 |
| 프롬프트 템플릿 작성 | 가사 생성 프롬프트 이관 | P0 |
| 기본 체인 구현 | ChatGPT 서비스 LangChain으로 래핑 | P0 |
| 출력 파서 구현 | 가사 응답 검증 및 파싱 | P1 |
| 테스트 작성 | 체인 단위 테스트 | P1 |
산출물:
app/langchain/디렉토리 구조- 가사 생성 LangChain 체인
- 기본 테스트 코드
7.2 Phase 2: 파이프라인 (2-3주)
목표: LangGraph 파이프라인 구축
| 작업 | 설명 | 우선순위 |
|---|---|---|
| 상태 정의 | PipelineState TypedDict 작성 | P0 |
| 노드 구현 | 가사, 음악, 영상 생성 노드 | P0 |
| 그래프 구성 | 엣지 및 조건부 분기 정의 | P0 |
| 폴링 통합 | Suno, Creatomate 폴링 자동화 | P0 |
| 에러 처리 | 에러 노드 및 재시도 로직 | P1 |
| FastAPI 통합 | 새 엔드포인트 추가 | P1 |
산출물:
app/langgraph/디렉토리 구조- 통합 파이프라인 그래프
/video/generate-full엔드포인트
7.3 Phase 3: RAG (2-3주)
목표: 지식베이스 구축 및 RAG 통합
| 작업 | 설명 | 우선순위 |
|---|---|---|
| Chroma 설정 | 벡터 스토어 초기화 | P0 |
| 마케팅 사례 수집 | 기존 성공 사례 데이터화 | P0 |
| 지역 정보 구축 | 주요 지역 정보 입력 | P1 |
| 검색 통합 | 가사 생성 시 RAG 적용 | P0 |
| 이미지 분석 | Vision API 연동 | P2 |
| 프롬프트 히스토리 | 자동 학습 파이프라인 | P2 |
산출물:
app/rag/디렉토리 구조- 마케팅/지역 지식베이스
- RAG 통합 가사 생성
7.4 Phase 4: 고도화 (2-3주)
목표: 최적화 및 모니터링
| 작업 | 설명 | 우선순위 |
|---|---|---|
| 성능 최적화 | 캐싱, 병렬 처리 개선 | P1 |
| 모니터링 | 파이프라인 상태 대시보드 | P1 |
| A/B 테스팅 | 프롬프트 버전 비교 | P2 |
| 문서화 | API 문서, 운영 가이드 | P1 |
| 부하 테스트 | 동시 요청 처리 검증 | P2 |
산출물:
- 최적화된 파이프라인
- 모니터링 대시보드
- 완성된 문서
8. 결론
8.1 요약
CastAD 백엔드에 LangChain, LangGraph, RAG를 적용하면:
- LangChain: 프롬프트 관리 체계화, 다단계 체인 구성, 출력 검증 자동화
- LangGraph: 복잡한 파이프라인 상태 관리, 폴링 자동화, 에러 처리 강화
- RAG: 과거 성공 사례 활용, 지역별 맞춤화, 지속적 품질 개선
8.2 핵심 가치
┌───────────────────────────────────────────────────────────┐
│ │
│ 현재: 각 단계가 독립적 → 상태 관리 어려움 │
│ 개선: 통합 파이프라인 → 자동화된 상태 추적 │
│ │
│ 현재: 하드코딩 프롬프트 → 수정 어려움 │
│ 개선: 템플릿 기반 → 유연한 프롬프트 관리 │
│ │
│ 현재: 과거 데이터 미활용 → 일관성 없는 품질 │
│ 개선: RAG 지식베이스 → 축적된 노하우 활용 │
│ │
└───────────────────────────────────────────────────────────┘
8.3 권장 사항
- 단계적 도입: Phase 1(LangChain)부터 시작하여 검증 후 확장
- 기존 API 유지: 새 엔드포인트 추가 방식으로 호환성 보장
- 데이터 축적 우선: RAG 효과를 위해 초기 사례 데이터 확보 중요
- 모니터링 병행: 각 단계별 성과 측정으로 ROI 검증
부록
A. 필요 의존성
# pyproject.toml 추가 의존성
[project.dependencies]
langchain = ">=0.1.0"
langchain-openai = ">=0.0.5"
langchain-community = ">=0.0.20"
langgraph = ">=0.0.30"
chromadb = ">=0.4.22"
tiktoken = ">=0.5.2"
B. 환경 변수 추가
# .env 추가
CHROMA_PERSIST_DIR=./data/chroma_db
LANGCHAIN_TRACING_V2=true # 선택: LangSmith 모니터링
LANGCHAIN_API_KEY=xxx # 선택: LangSmith 모니터링
C. 참고 자료
이 보고서는 CastAD 백엔드 프로젝트 분석을 기반으로 작성되었습니다. 작성일: 2025-12-28