32 KiB
O2O-CASTAD Backend 비동기 아키텍처 및 설계 분석 보고서
문서 버전: 1.0 작성일: 2025-12-29 대상: 개발자, 아키텍트, 코드 리뷰어
목차
- Executive Summary
- 데이터베이스 세션 관리 아키텍처
- 비동기 처리 패턴
- 외부 API 통합 설계
- 백그라운드 태스크 워크플로우
- 쿼리 최적화 전략
- 설계 강점 분석
- 개선 권장 사항
- 아키텍처 다이어그램
- 결론
1. Executive Summary
1.1 프로젝트 개요
O2O-CASTAD Backend는 FastAPI 기반의 비동기 백엔드 서비스로, AI 기반 광고 영상 자동 생성 파이프라인을 제공합니다. 주요 외부 서비스(Creatomate, Suno, ChatGPT, Azure Blob Storage)와의 통합을 통해 가사 생성 → 노래 생성 → 영상 생성의 파이프라인을 구현합니다.
1.2 주요 성과
| 영역 | 개선 전 | 개선 후 | 개선율 |
|---|---|---|---|
| 템플릿 API 호출 | 매번 호출 (1-2s) | 캐시 HIT (0ms) | 100% 감소 |
| HTTP 클라이언트 | 매번 생성 (50ms) | 풀 재사용 (0ms) | 100% 감소 |
| 세션 타임아웃 에러 | 빈번 | 해결 | 안정성 확보 |
1.3 핵심 아키텍처 결정
- 이중 커넥션 풀 아키텍처: 요청/백그라운드 분리
- 명시적 세션 라이프사이클: 외부 API 호출 전 세션 해제
- 모듈 레벨 싱글톤: HTTP 클라이언트 및 템플릿 캐시
- 순차 쿼리 실행: AsyncSession 제약으로 단일 세션 내 병렬 쿼리 불가
2. 데이터베이스 세션 관리 아키텍처
2.1 이중 엔진 구조
┌─────────────────────────────────────────────────────────────┐
│ DATABASE LAYER │
├─────────────────────────────┬───────────────────────────────┤
│ MAIN ENGINE │ BACKGROUND ENGINE │
│ (FastAPI Requests) │ (Worker Tasks) │
├─────────────────────────────┼───────────────────────────────┤
│ pool_size: 20 │ pool_size: 10 │
│ max_overflow: 20 │ max_overflow: 10 │
│ pool_timeout: 30s │ pool_timeout: 60s │
│ Total: 최대 40 연결 │ Total: 최대 20 연결 │
├─────────────────────────────┼───────────────────────────────┤
│ AsyncSessionLocal │ BackgroundSessionLocal │
│ → Router endpoints │ → download_and_upload_* │
│ → Direct API calls │ → generate_lyric_background │
└─────────────────────────────┴───────────────────────────────┘
위치: app/database/session.py
2.2 엔진 설정 상세
# 메인 엔진 (FastAPI 요청용)
engine = create_async_engine(
url=db_settings.MYSQL_URL,
pool_size=20, # 기본 풀 크기
max_overflow=20, # 추가 연결 허용
pool_timeout=30, # 연결 대기 최대 시간
pool_recycle=280, # MySQL wait_timeout 보다 짧게 설정
pool_pre_ping=True, # 연결 유효성 검사 (핵심!)
pool_reset_on_return="rollback", # 반환 시 롤백
)
# 백그라운드 엔진 (워커 태스크용)
background_engine = create_async_engine(
url=db_settings.MYSQL_URL,
pool_size=10, # 더 작은 풀
max_overflow=10,
pool_timeout=60, # 백그라운드는 대기 여유
pool_recycle=280, # MySQL wait_timeout 보다 짧게 설정
pool_pre_ping=True,
pool_reset_on_return="rollback", # 반환 시 롤백
)
2.3 세션 관리 패턴
패턴 1: FastAPI 의존성 주입 (단순 CRUD)
@router.get("/items/{id}")
async def get_item(
id: int,
session: AsyncSession = Depends(get_session),
):
result = await session.execute(select(Item).where(Item.id == id))
return result.scalar_one_or_none()
적용 엔드포인트:
GET /videos/- 목록 조회GET /video/download/{task_id}- 상태 조회
패턴 2: 명시적 세션 관리 (외부 API 호출 포함)
@router.get("/generate/{task_id}")
async def generate_video(task_id: str):
# 1단계: 명시적 세션 열기 → DB 작업 → 세션 닫기
async with AsyncSessionLocal() as session:
# 순차 쿼리 실행 (AsyncSession은 병렬 쿼리 미지원)
project = await session.execute(select(Project).where(...))
lyric = await session.execute(select(Lyric).where(...))
song = await session.execute(select(Song).where(...))
# 초기 데이터 저장
await session.commit()
# 세션 닫힘 (async with 블록 종료)
# 2단계: 외부 API 호출 (세션 없음 - 커넥션 점유 안함)
response = await creatomate_service.make_api_call()
# 3단계: 새 세션으로 업데이트
async with AsyncSessionLocal() as update_session:
video.render_id = response["id"]
await update_session.commit()
적용 엔드포인트:
GET /video/generate/{task_id}- 영상 생성GET /song/generate/{task_id}- 노래 생성
패턴 3: 백그라운드 태스크 세션
async def download_and_upload_video_to_blob(task_id: str, ...):
# 백그라운드 전용 세션 팩토리 사용
async with BackgroundSessionLocal() as session:
result = await session.execute(...)
video.status = "completed"
await session.commit()
적용 함수:
download_and_upload_video_to_blob()download_and_upload_song_to_blob()generate_lyric_background()
2.4 해결된 문제: 세션 타임아웃
문제 상황:
RuntimeError: unable to perform operation on
<TCPTransport closed=True reading=False ...>; the handler is closed
원인:
Depends(get_session)으로 주입된 세션이 요청 전체 동안 유지- 외부 API 호출 (수 초~수 분) 중 TCP 커넥션 타임아웃
- 요청 종료 시점에 이미 닫힌 커넥션 정리 시도
해결책:
# 변경 전: 세션이 요청 전체 동안 유지
async def generate_video(session: AsyncSession = Depends(get_session)):
await session.execute(...) # DB 작업
await creatomate_api() # 외부 API (세션 유지됨 - 문제!)
await session.commit() # 타임아웃 에러 발생 가능
# 변경 후: 명시적 세션 관리
async def generate_video():
async with AsyncSessionLocal() as session:
await session.execute(...)
await session.commit()
# 세션 닫힘
await creatomate_api() # 외부 API (세션 없음 - 안전!)
async with AsyncSessionLocal() as session:
# 업데이트
3. 비동기 처리 패턴
3.1 순차 쿼리 실행 (AsyncSession 제약)
위치: app/video/api/routers/v1/video.py
중요: SQLAlchemy AsyncSession은 단일 세션에서 동시에 여러 쿼리를 실행하는 것을 지원하지 않습니다.
asyncio.gather()로 병렬 쿼리를 실행하면 세션 상태 충돌이 발생합니다.
# 순차 쿼리 실행: Project, Lyric, Song, Image
# Note: AsyncSession은 동일 세션에서 병렬 쿼리를 지원하지 않음
# Project 조회
project_result = await session.execute(
select(Project).where(Project.task_id == task_id)
.order_by(Project.created_at.desc()).limit(1)
)
# Lyric 조회
lyric_result = await session.execute(
select(Lyric).where(Lyric.task_id == task_id)
.order_by(Lyric.created_at.desc()).limit(1)
)
# Song 조회
song_result = await session.execute(
select(Song).where(Song.task_id == task_id)
.order_by(Song.created_at.desc()).limit(1)
)
# Image 조회
image_result = await session.execute(
select(Image).where(Image.task_id == task_id)
.order_by(Image.img_order.asc())
)
AsyncSession 병렬 쿼리 제약 사항:
- 단일 세션 내에서
asyncio.gather()로 여러 쿼리 동시 실행 불가 - 세션 상태 충돌 및 예기치 않은 동작 발생 가능
- 병렬 쿼리가 필요한 경우 별도의 세션을 각각 생성해야 함
3.2 FastAPI BackgroundTasks 활용
@router.post("/generate")
async def generate_lyric(
request_body: GenerateLyricRequest,
background_tasks: BackgroundTasks,
session: AsyncSession = Depends(get_session),
):
# 즉시 응답할 데이터 저장
lyric = Lyric(task_id=task_id, status="processing")
session.add(lyric)
await session.commit()
# 백그라운드 태스크 스케줄링
background_tasks.add_task(
generate_lyric_background,
task_id=task_id,
prompt=prompt,
language=request_body.language,
)
# 즉시 응답 반환
return GenerateLyricResponse(success=True, task_id=task_id)
3.3 비동기 컨텍스트 관리자
# 앱 라이프사이클 관리
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
await create_db_tables()
print("Database tables created")
yield # 앱 실행
# Shutdown
await dispose_engine()
print("Database engines disposed")
4. 외부 API 통합 설계
4.1 Creatomate 서비스
위치: app/utils/creatomate.py
HTTP 클라이언트 싱글톤
# 모듈 레벨 공유 클라이언트
_shared_client: httpx.AsyncClient | None = None
async def get_shared_client() -> httpx.AsyncClient:
global _shared_client
if _shared_client is None or _shared_client.is_closed:
_shared_client = httpx.AsyncClient(
timeout=httpx.Timeout(60.0, connect=10.0),
limits=httpx.Limits(
max_keepalive_connections=10,
max_connections=20,
),
)
return _shared_client
장점:
- 커넥션 풀 재사용으로 TCP handshake 오버헤드 제거
- Keep-alive로 연결 유지
- 앱 종료 시
close_shared_client()호출로 정리
템플릿 캐싱
# 모듈 레벨 캐시
_template_cache: dict[str, dict] = {}
CACHE_TTL_SECONDS = 300 # 5분
async def get_one_template_data(self, template_id: str, use_cache: bool = True):
# 캐시 확인
if use_cache and template_id in _template_cache:
cached = _template_cache[template_id]
if _is_cache_valid(cached["cached_at"]):
print(f"[CreatomateService] Cache HIT - {template_id}")
return copy.deepcopy(cached["data"])
# API 호출 및 캐시 저장
data = await self._fetch_from_api(template_id)
_template_cache[template_id] = {
"data": data,
"cached_at": time.time(),
}
return copy.deepcopy(data)
캐싱 전략:
- 첫 번째 요청: API 호출 (1-2초)
- 이후 요청 (5분 내): 캐시 반환 (~0ms)
- TTL 만료 후: 자동 갱신
4.2 Suno 서비스
위치: app/utils/suno.py
class SunoService:
async def generate_music(self, prompt: str, callback_url: str = None):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/generate/music",
json={
"prompt": prompt,
"callback_url": callback_url,
},
timeout=60.0,
)
return response.json()
4.3 ChatGPT 서비스
위치: app/utils/chatgpt_prompt.py
class ChatgptService:
async def generate(self, prompt: str) -> str:
# OpenAI API 호출
response = await self.client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
)
return response.choices[0].message.content
4.4 Azure Blob Storage
위치: app/utils/upload_blob_as_request.py
class AzureBlobUploader:
async def upload_video(self, file_path: str) -> bool:
# 비동기 업로드
async with aiofiles.open(file_path, "rb") as f:
content = await f.read()
# Blob 업로드 로직
return True
5. 백그라운드 태스크 워크플로우
5.1 3단계 워크플로우 패턴
┌─────────────────────────────────────────────────────────────────┐
│ REQUEST PHASE │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ API Request │───▶│ Save Initial │───▶│ Return Response │ │
│ │ │ │ Record │ │ (task_id) │ │
│ └──────────────┘ │ status= │ └──────────────────┘ │
│ │ "processing" │ │
│ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ POLLING PHASE │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Client Polls │───▶│ Query │───▶│ Return Status │ │
│ │ /status/id │ │ External API │ │ + Trigger BG │ │
│ └──────────────┘ └──────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ (if status == "succeeded")
▼
┌─────────────────────────────────────────────────────────────────┐
│ BACKGROUND COMPLETION PHASE │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Download │───▶│ Upload to │───▶│ Update DB │ │
│ │ Result File │ │ Azure Blob │ │ status=completed │ │
│ └──────────────┘ └──────────────┘ │ result_url=... │ │
│ └──────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
5.2 영상 생성 플로우
# 1단계: 즉시 응답
@router.get("/video/generate/{task_id}")
async def generate_video(task_id: str):
# DB에서 필요한 데이터 조회 (병렬)
# Video 레코드 생성 (status="processing")
# Creatomate API 호출 → render_id 획득
return {"success": True, "render_id": render_id}
# 2단계: 폴링
@router.get("/video/status/{render_id}")
async def get_video_status(render_id: str, background_tasks: BackgroundTasks):
status = await creatomate_service.get_render_status(render_id)
if status == "succeeded":
# 백그라운드 태스크 트리거
background_tasks.add_task(
download_and_upload_video_to_blob,
task_id=video.task_id,
video_url=status["url"],
)
return {"status": status}
# 3단계: 백그라운드 완료
async def download_and_upload_video_to_blob(task_id: str, video_url: str):
# 임시 파일 다운로드
# Azure Blob 업로드
# DB 업데이트 (BackgroundSessionLocal 사용)
# 임시 파일 삭제
5.3 에러 처리 전략
async def download_and_upload_video_to_blob(task_id: str, video_url: str):
temp_file_path: Path | None = None
try:
# 다운로드 및 업로드 로직
...
async with BackgroundSessionLocal() as session:
video.status = "completed"
await session.commit()
except Exception as e:
print(f"[EXCEPTION] {task_id}: {e}")
# 실패 상태로 업데이트
async with BackgroundSessionLocal() as session:
video.status = "failed"
await session.commit()
finally:
# 임시 파일 정리 (항상 실행)
if temp_file_path and temp_file_path.exists():
temp_file_path.unlink()
# 임시 디렉토리 정리
temp_dir.rmdir()
6. 쿼리 최적화 전략
6.1 N+1 문제 해결
문제 코드:
# 각 video마다 project를 개별 조회 (N+1)
for video in videos:
project = await session.execute(
select(Project).where(Project.id == video.project_id)
)
해결 코드:
# 1. Video 목록 조회
videos = await session.execute(video_query)
video_list = videos.scalars().all()
# 2. Project ID 수집
project_ids = [v.project_id for v in video_list if v.project_id]
# 3. Project 일괄 조회 (IN 절)
projects_result = await session.execute(
select(Project).where(Project.id.in_(project_ids))
)
# 4. 딕셔너리로 매핑
projects_map = {p.id: p for p in projects_result.scalars().all()}
# 5. 조합
for video in video_list:
project = projects_map.get(video.project_id)
위치: app/video/api/routers/v1/video.py - get_videos()
6.2 서브쿼리를 활용한 중복 제거
# task_id별 최신 Video의 id만 추출
subquery = (
select(func.max(Video.id).label("max_id"))
.where(Video.status == "completed")
.group_by(Video.task_id)
.subquery()
)
# 최신 Video만 조회
query = (
select(Video)
.where(Video.id.in_(select(subquery.c.max_id)))
.order_by(Video.created_at.desc())
)
7. 설계 강점 분석
7.1 안정성 (Stability)
| 요소 | 구현 | 효과 |
|---|---|---|
| pool_pre_ping | 쿼리 전 연결 검증 | Stale 커넥션 방지 |
| pool_reset_on_return | 반환 시 롤백 | 트랜잭션 상태 초기화 |
| 이중 커넥션 풀 | 요청/백그라운드 분리 | 리소스 경합 방지 |
| Finally 블록 | 임시 파일 정리 | 리소스 누수 방지 |
7.2 성능 (Performance)
| 요소 | 구현 | 효과 |
|---|---|---|
| 템플릿 캐싱 | TTL 기반 메모리 캐시 | API 호출 100% 감소 |
| HTTP 클라이언트 풀 | 싱글톤 패턴 | 커넥션 재사용 |
| N+1 해결 | IN 절 배치 조회 | 쿼리 수 N→2 감소 |
7.3 확장성 (Scalability)
| 요소 | 구현 | 효과 |
|---|---|---|
| 명시적 세션 관리 | 외부 API 시 세션 해제 | 커넥션 풀 점유 최소화 |
| 백그라운드 태스크 | FastAPI BackgroundTasks | 논블로킹 처리 |
| 폴링 패턴 | Status endpoint | 클라이언트 주도 동기화 |
7.4 유지보수성 (Maintainability)
| 요소 | 구현 | 효과 |
|---|---|---|
| 구조화된 로깅 | [function_name] prefix |
추적 용이 |
| 타입 힌트 | Python 3.11+ 문법 | IDE 지원, 버그 감소 |
| 문서화 | Docstring, 주석 | 코드 이해도 향상 |
8. 개선 권장 사항
8.1 Song 라우터 N+1 문제
현재 상태 (app/song/api/routers/v1/song.py):
# N+1 발생
for song in songs:
project_result = await session.execute(
select(Project).where(Project.id == song.project_id)
)
권장 수정:
# video.py의 패턴 적용
project_ids = [s.project_id for s in songs if s.project_id]
projects_result = await session.execute(
select(Project).where(Project.id.in_(project_ids))
)
projects_map = {p.id: p for p in projects_result.scalars().all()}
8.2 Suno 서비스 HTTP 클라이언트 풀링
현재 상태 (app/utils/suno.py):
# 요청마다 새 클라이언트 생성
async with httpx.AsyncClient() as client:
...
권장 수정:
# creatomate.py 패턴 적용
_suno_client: httpx.AsyncClient | None = None
async def get_suno_client() -> httpx.AsyncClient:
global _suno_client
if _suno_client is None or _suno_client.is_closed:
_suno_client = httpx.AsyncClient(
timeout=httpx.Timeout(60.0, connect=10.0),
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10),
)
return _suno_client
8.3 동시성 제한
권장 추가:
# 백그라운드 태스크 동시 실행 수 제한
BACKGROUND_TASK_SEMAPHORE = asyncio.Semaphore(5)
async def download_and_upload_video_to_blob(...):
async with BACKGROUND_TASK_SEMAPHORE:
# 기존 로직
8.4 분산 락 (선택적)
높은 동시성 환경에서 권장:
# Redis 기반 분산 락
async def generate_video_with_lock(task_id: str):
lock_key = f"video_gen:{task_id}"
if not await redis.setnx(lock_key, "1"):
raise HTTPException(409, "Already processing")
try:
await redis.expire(lock_key, 300) # 5분 TTL
# 영상 생성 로직
finally:
await redis.delete(lock_key)
9. 아키텍처 다이어그램
9.1 전체 요청 흐름
┌─────────────────────────────────────────────────────────────────────────┐
│ CLIENT │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ FASTAPI SERVER │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ ROUTERS │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ /video │ │ /song │ │ /lyric │ │ /project │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ │
│ │ MAIN ENGINE │ │ BACKGROUND │ │ EXTERNAL SERVICES │ │
│ │ (AsyncSession) │ │ ENGINE │ │ ┌───────────────┐ │ │
│ │ pool_size: 20 │ │ (BackgroundSess)│ │ │ Creatomate │ │ │
│ │ max_overflow: 20 │ │ pool_size: 10 │ │ ├───────────────┤ │ │
│ └─────────────────────┘ └─────────────────┘ │ │ Suno │ │ │
│ │ │ │ ├───────────────┤ │ │
│ ▼ ▼ │ │ ChatGPT │ │ │
│ ┌─────────────────────────────────────────┐ │ ├───────────────┤ │ │
│ │ MySQL DATABASE │ │ │ Azure Blob │ │ │
│ │ ┌────────┐ ┌────────┐ ┌────────────┐ │ │ └───────────────┘ │ │
│ │ │Project │ │ Song │ │ Video │ │ └─────────────────────┘ │
│ │ │ Lyric │ │ Image │ │ │ │ │
│ │ └────────┘ └────────┘ └────────────┘ │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
9.2 데이터 흐름
[영상 생성 파이프라인]
1. 프로젝트 생성
Client ─▶ POST /project ─▶ DB(Project) ─▶ task_id
2. 이미지 업로드
Client ─▶ POST /project/image ─▶ Azure Blob ─▶ DB(Image)
3. 가사 생성
Client ─▶ POST /lyric/generate ─▶ DB(Lyric) ─▶ BackgroundTask
│
▼
ChatGPT API
│
▼
DB Update
4. 노래 생성
Client ─▶ GET /song/generate/{task_id} ─▶ Suno API ─▶ DB(Song)
Client ◀──── polling ─────────────────────────────────┘
5. 영상 생성
Client ─▶ GET /video/generate/{task_id}
│
├─ 순차 쿼리 ─▶ DB(Project, Lyric, Song, Image)
│
├─ Creatomate API ─▶ render_id
│
└─ DB(Video) status="processing"
Client ─▶ GET /video/status/{render_id}
│
├─ Creatomate Status Check
│
└─ if succeeded ─▶ BackgroundTask
│
├─ Download MP4
├─ Upload to Azure Blob
└─ DB Update status="completed"
6. 결과 조회
Client ─▶ GET /video/download/{task_id} ─▶ result_movie_url
10. 결론
10.1 현재 아키텍처 평가
O2O-CASTAD Backend는 프로덕션 준비 수준의 비동기 아키텍처를 갖추고 있습니다:
- 안정성: 이중 커넥션 풀, pool_pre_ping, 명시적 세션 관리로 런타임 에러 최소화
- 성능: 캐싱, HTTP 클라이언트 풀링으로 응답 시간 최적화
- 확장성: 백그라운드 태스크 분리, 폴링 패턴으로 부하 분산
- 유지보수성: 일관된 패턴, 구조화된 로깅, 타입 힌트
10.2 핵심 성과
┌─────────────────────────────────────────────────────────────┐
│ BEFORE → AFTER │
├─────────────────────────────────────────────────────────────┤
│ Session Timeout Errors │ Frequent → Resolved │
│ Template API Calls │ Every req → Cached (100%↓) │
│ HTTP Client Overhead │ 50ms/req → 0ms (100%↓) │
│ N+1 Query Problem │ N queries → 2 queries │
│ Connection Pool Conflicts │ Frequent → Isolated │
└─────────────────────────────────────────────────────────────┘
10.3 권장 다음 단계
- 단기: Song 라우터 N+1 문제 해결
- 단기: Suno 서비스 HTTP 클라이언트 풀링 적용
- 중기: 동시성 제한 (Semaphore) 추가
- 장기: Redis 캐시 레이어 도입 (템플릿 캐시 영속화)
- 장기: 분산 락 구현 (높은 동시성 환경 대비)
문서 끝 추가 질문이나 개선 제안은 개발팀에 문의하세요.