# 설계안 3: Celery Beat + 상태 머신 스케줄러 > **DB 상태 기반 폴링 스케줄러로 태스크를 디스패치하는 이벤트 소싱 설계** --- ## 목차 1. [개요 및 핵심 차이점](#1-개요-및-핵심-차이점) 2. [아키텍처 설계](#2-아키텍처-설계) 3. [데이터 흐름 상세](#3-데이터-흐름-상세) 4. [큐 및 태스크 동작 상세](#4-큐-및-태스크-동작-상세) 5. [코드 구현](#5-코드-구현) 6. [상태 관리 및 모니터링](#6-상태-관리-및-모니터링) 7. [실패 처리 전략](#7-실패-처리-전략) 8. [설계 및 동작 설명](#8-설계-및-동작-설명) 9. [기존안과의 비교](#9-기존안과의-비교) 10. [배포 및 운영](#10-배포-및-운영) --- ## 1. 개요 및 핵심 차이점 ### 1.1 설계 철학 이 설계안은 **태스크 간 직접 연결을 완전히 제거**합니다. 기존안, 설계안 1, 2는 모두 태스크 완료 시 "다음 태스크를 발행"합니다(방법만 다를 뿐). 이 방식은 **Celery Beat 스케줄러가 주기적으로 DB를 폴링**하여, "다음 단계를 실행할 준비가 된" 레코드를 찾아 태스크를 디스패치합니다. ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 4가지 방식 비교 │ ├─────────────────┬─────────────┬─────────────┬──────────────────────────────┤ │ 기존안 │ 설계안 1 │ 설계안 2 │ 설계안 3 (이 문서) │ │ (명시적 전달) │ (chain) │ (link) │ (Beat 스케줄러) │ ├─────────────────┼─────────────┼─────────────┼──────────────────────────────┤ │ │ │ │ │ │ Task A 완료 → │ Celery가 │ link 콜백이 │ Task A 완료 → DB 상태만 변경 │ │ Task B 직접 │ 자동으로 │ 자동으로 │ Beat가 주기적으로 DB 폴링 │ │ 호출 │ 전달 │ 다음 실행 │ "준비된" 레코드 발견 시 │ │ │ │ │ Task B 디스패치 │ │ │ │ │ │ │ 태스크가 다음 │ 태스크가 │ 태스크가 │ 태스크가 다음 단계를 │ │ 단계를 안다 │ 모른다 │ 모른다 │ 완전히 모른다. │ │ │ │ │ 심지어 "다음 단계가 있다"는 │ │ │ │ │ 것도 모른다. │ └─────────────────┴─────────────┴─────────────┴──────────────────────────────┘ ``` ### 1.2 비유: 공장 라인 vs 주문 시스템 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 비유로 이해하기 │ └─────────────────────────────────────────────────────────────────────────────┘ [기존안/설계안 1,2: 공장 컨베이어 벨트] ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 공정 A → 벨트 → 공정 B → 벨트 → 공정 C 각 공정이 완료되면 벨트가 다음 공정으로 자동 이동. [설계안 3: 식당 주문 시스템] ━━━━━━━━━━━━━━━━━━━━━━━━━━━ 주문 접수 → 주문표에 기록 ↓ 감독(Beat)이 주기적으로 주문표 확인: "가사 생성이 완료된 주문이 있나?" → 있으면 노래 생성 워커에게 전달 "노래 생성이 완료된 주문이 있나?" → 있으면 비디오 생성 워커에게 전달 각 워커는 자신이 맡은 작업만 수행하고, 결과를 주문표(DB)에 기록하고 종료. "다음에 뭘 해야 하는지"는 감독(Beat)이 결정. ``` ### 1.3 핵심 원칙 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 핵심 설계 원칙 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. 완전한 분리: 태스크가 다음 단계의 존재 자체를 모름 │ │ 2. DB가 진실의 원천: 모든 상태 전이는 DB 기록 │ │ 3. Beat 스케줄러: 주기적 폴링으로 다음 단계 디스패치 │ │ 4. 이벤트 소싱: 상태 변경 이력이 자연스럽게 DB에 축적 │ └─────────────────────────────────────────────────────────────────┘ ``` --- ## 2. 아키텍처 설계 ### 2.1 전체 아키텍처 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Beat + 상태 머신 아키텍처 │ └─────────────────────────────────────────────────────────────────────────────┘ ┌─────────────┐ │ Client │ └──────┬──────┘ │ ▼ ┌────────────────────┐ │ FastAPI │ │ │ │ Pipeline 레코드 │ │ 생성 (pending) │ └─────────┬──────────┘ │ ▼ ┌────────────────────┐ │ MySQL │ │ │ │ Pipeline 테이블: │ │ status = "pending" │◄────────────────────┐ └─────────┬──────────┘ │ │ │ │ 주기적 │ 상태 업데이트 │ 폴링 (10초) │ │ │ ┌─────────▼──────────┐ │ │ Celery Beat │ │ │ (스케줄러) │ │ │ │ │ │ "다음 단계 준비된 │ │ │ 레코드가 있나?" │ │ └─────────┬──────────┘ │ │ │ ┌────────────────┼────────────────┐ │ │ │ │ │ ▼ ▼ ▼ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │ lyric_queue │ │ song_queue │ │ video_queue │ │ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ │ │ │ │ │ ▼ ▼ ▼ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │ Lyric Worker │ │ Song Worker │ │ Video Worker │ │ │ │ │ │ │ │ │ │ 자신의 작업만 │ │ 자신의 작업만 │ │ 자신의 작업만 │ │ │ 수행 후 │ │ 수행 후 │ │ 수행 후 │ │ │ DB 상태 변경 │─┤ DB 상태 변경 │─┤ DB 상태 변경 │─────┘ └───────────────┘ └───────────────┘ └───────────────┘ ``` ### 2.2 Pipeline 모델 (상태 머신) ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Pipeline 상태 머신 │ └─────────────────────────────────────────────────────────────────────────────┘ Pipeline 테이블 (새로 추가): ━━━━━━━━━━━━━━━━━━━━━━━━━━ id │ task_id │ stage │ status │ retry_count │ dispatched_at ────┼─────────────┼─────────┼────────────┼─────────────┼────────────── 1 │ 0192abc-... │ lyric │ pending │ 0 │ NULL 2 │ 0193def-... │ song │ processing │ 0 │ 2024-01-01 3 │ 0194ghi-... │ video │ completed │ 0 │ 2024-01-01 4 │ 0195jkl-... │ lyric │ failed │ 3 │ 2024-01-01 상태 전이 규칙: ━━━━━━━━━━━━━━ pending ──── Beat가 감지 ────→ dispatched dispatched ── Worker가 시작 ──→ processing processing ── Worker가 완료 ──→ stage_completed stage_completed ── Beat가 감지 ──→ 다음 stage의 pending 또는 pipeline_completed (video 완료 시) processing ── Worker가 실패 ──→ failed failed ── Beat가 재시도 판단 ──→ pending (retry_count < max) 또는 dead ``` ### 2.3 상태 전이 다이어그램 ```mermaid stateDiagram-v2 [*] --> lyric_pending: API 요청 state "Lyric Phase" as LP { lyric_pending --> lyric_dispatched: Beat 감지 lyric_dispatched --> lyric_processing: Worker 시작 lyric_processing --> lyric_completed: Worker 완료 lyric_processing --> lyric_failed: Worker 실패 lyric_failed --> lyric_pending: Beat 재시도 (count < 3) lyric_failed --> dead: Beat 포기 (count >= 3) } state "Song Phase" as SP { lyric_completed --> song_pending: Beat가 다음 단계 생성 song_pending --> song_dispatched: Beat 감지 song_dispatched --> song_processing: Worker 시작 song_processing --> song_completed: Worker 완료 song_processing --> song_failed: Worker 실패 song_failed --> song_pending: Beat 재시도 } state "Video Phase" as VP { song_completed --> video_pending: Beat가 다음 단계 생성 video_pending --> video_dispatched: Beat 감지 video_dispatched --> video_processing: Worker 시작 video_processing --> video_completed: Worker 완료 video_processing --> video_failed: Worker 실패 video_failed --> video_pending: Beat 재시도 } video_completed --> [*]: 파이프라인 완료 dead --> [*]: DLQ로 이동 ``` --- ## 3. 데이터 흐름 상세 ### 3.1 전체 시퀀스 ```mermaid sequenceDiagram participant C as Client participant API as FastAPI participant DB as MySQL participant Beat as Celery Beat participant LQ as lyric_queue participant LW as Lyric Worker participant SQ as song_queue participant SW as Song Worker C->>API: POST /pipeline/start API->>DB: Pipeline(stage=lyric, status=pending) API-->>C: {"task_id": "xxx"} Note over Beat,DB: Beat는 10초마다 DB 폴링 loop 매 10초 Beat->>DB: SELECT * FROM pipeline WHERE status='pending' end Beat->>DB: Pipeline 발견! status='dispatched'로 변경 Beat->>LQ: lyric_task.apply_async(pipeline_id) LQ->>LW: 태스크 수신 LW->>DB: status = 'processing' LW->>LW: ChatGPT 호출 LW->>DB: status = 'lyric_completed' Note over Beat,DB: Beat가 다음 폴링 사이클에서 감지 Beat->>DB: lyric_completed 발견! Beat->>DB: 새 Pipeline(stage=song, status=pending) 생성 Beat->>SQ: song_task.apply_async(pipeline_id) Note over SW: 동일한 패턴으로 song → video 진행 ``` ### 3.2 Beat 스케줄러의 폴링 사이클 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Beat 스케줄러 폴링 사이클 │ └─────────────────────────────────────────────────────────────────────────────┘ 매 10초마다 Beat가 실행하는 로직: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ [Step 1] pending 상태 레코드 검색 ────────────────────────────────── SELECT * FROM pipeline WHERE status = 'pending' AND dispatched_at IS NULL ORDER BY created_at ASC LIMIT 10; → 찾으면: status='dispatched', dispatched_at=NOW() 로 업데이트 → 해당 큐에 태스크 발행 [Step 2] completed 상태 레코드 검색 (다음 단계 생성) ──────────────────────────────────────────────────── SELECT * FROM pipeline WHERE status IN ('lyric_completed', 'song_completed') AND next_stage_created = FALSE ORDER BY updated_at ASC LIMIT 10; → lyric_completed → 새 Pipeline(stage=song, status=pending) 생성 → song_completed → 새 Pipeline(stage=video, status=pending) 생성 → next_stage_created = TRUE 로 업데이트 [Step 3] 실패 레코드 재시도 검색 ────────────────────────────────── SELECT * FROM pipeline WHERE status = 'failed' AND retry_count < max_retries AND last_failed_at < NOW() - INTERVAL retry_delay SECOND ORDER BY last_failed_at ASC LIMIT 5; → retry_count += 1, status='pending' 으로 변경 [Step 4] 타임아웃 레코드 검색 (stuck 감지) ────────────────────────────────────────── SELECT * FROM pipeline WHERE status = 'dispatched' AND dispatched_at < NOW() - INTERVAL 15 MINUTE; → 15분 이상 dispatched 상태 → 워커가 죽었을 수 있음 → status='pending'으로 되돌림 (재디스패치) ``` --- ## 4. 큐 및 태스크 동작 상세 ### 4.1 태스크의 완전한 독립성 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 태스크의 완전한 독립성 │ └─────────────────────────────────────────────────────────────────────────────┘ 다른 모든 설계안에서는 태스크가 "반환값을 통해" 다음 단계와 연결됩니다. 이 설계안에서는 태스크가 DB 상태만 변경하고 종료합니다. [다른 설계안의 태스크] def generate_lyric(data): # 작업 수행 result = do_work() return result ← 이 반환값이 다음 태스크의 입력이 됨 [이 설계안의 태스크] def generate_lyric(pipeline_id): # 작업 수행 do_work() # DB 상태만 변경 pipeline.status = 'lyric_completed' db.commit() # return 없음! 아무것도 반환하지 않음 → 태스크가 "다음에 뭘 해야 하는지" 전혀 모름 → 태스크가 "결과를 누가 사용하는지" 전혀 모름 → Beat 스케줄러가 DB를 보고 다음 단계를 결정 ``` ### 4.2 각 단계의 작업 범위 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 각 태스크의 작업 범위 │ └─────────────────────────────────────────────────────────────────────────────┘ Lyric Worker: ┌────────────────────────────────────┐ │ 1. pipeline_id로 Pipeline 조회 │ │ 2. Pipeline.status = 'processing' │ │ 3. ChatGPT API 호출 │ │ 4. Lyric 레코드에 결과 저장 │ │ 5. Pipeline.status = 'lyric_completed' │ │ 6. 끝. 다음 단계? 모름. │ └────────────────────────────────────┘ Song Worker: ┌────────────────────────────────────┐ │ 1. pipeline_id로 Pipeline 조회 │ │ 2. Pipeline.status = 'processing' │ │ 3. DB에서 Lyric 결과 조회 │ │ 4. Suno API 호출 + 폴링 │ │ 5. Song 레코드에 결과 저장 │ │ 6. Pipeline.status = 'song_completed' │ │ 7. 끝. 다음 단계? 모름. │ └────────────────────────────────────┘ Video Worker: ┌────────────────────────────────────┐ │ 1. pipeline_id로 Pipeline 조회 │ │ 2. Pipeline.status = 'processing' │ │ 3. DB에서 Song, Image 등 조회 │ │ 4. Creatomate API 호출 + 폴링 │ │ 5. Video 레코드에 결과 저장 │ │ 6. Pipeline.status = 'video_completed' │ │ 7. 끝. 파이프라인 완료? 모름. │ └────────────────────────────────────┘ Beat Scheduler: ┌────────────────────────────────────────────────────────────┐ │ 매 10초마다: │ │ 1. pending 레코드 → 해당 큐에 디스패치 │ │ 2. lyric_completed → song stage 생성 (pending) │ │ 3. song_completed → video stage 생성 (pending) │ │ 4. video_completed → 파이프라인 완료 마킹 │ │ 5. failed + 재시도 가능 → pending으로 변경 │ │ 6. stuck 감지 → 재디스패치 │ └────────────────────────────────────────────────────────────┘ ``` --- ## 5. 코드 구현 ### 5.1 Pipeline 모델 (새로 추가) ```python # app/pipeline/models.py """ Pipeline 상태 머신 모델 파이프라인의 각 단계를 DB 레코드로 관리합니다. Beat 스케줄러가 이 테이블을 폴링하여 다음 단계를 결정합니다. """ from sqlalchemy import ( Column, Integer, String, DateTime, Boolean, ForeignKey, Enum as SQLEnum, func ) from sqlalchemy.orm import relationship from app.database.session import Base import enum class PipelineStage(str, enum.Enum): """파이프라인 단계""" LYRIC = "lyric" SONG = "song" VIDEO = "video" class PipelineStatus(str, enum.Enum): """파이프라인 상태""" PENDING = "pending" # 대기 중 (Beat가 디스패치 대기) DISPATCHED = "dispatched" # 큐에 발행됨 (워커 수신 대기) PROCESSING = "processing" # 워커가 처리 중 STAGE_COMPLETED = "stage_completed" # 현재 단계 완료 PIPELINE_COMPLETED = "pipeline_completed" # 전체 파이프라인 완료 FAILED = "failed" # 실패 DEAD = "dead" # 최대 재시도 초과, DLQ class Pipeline(Base): """ 파이프라인 상태 추적 테이블 각 행이 파이프라인의 한 "단계"를 나타냅니다. task_id가 동일한 여러 행이 존재할 수 있습니다 (lyric, song, video). """ __tablename__ = 'pipelines' id = Column(Integer, primary_key=True, autoincrement=True) task_id = Column(String(255), index=True, nullable=False) # 현재 단계 (lyric / song / video) stage = Column(SQLEnum(PipelineStage), nullable=False) # 상태 status = Column(SQLEnum(PipelineStatus), default=PipelineStatus.PENDING) # 재시도 관리 retry_count = Column(Integer, default=0) max_retries = Column(Integer, default=3) # Celery 태스크 ID (디스패치된 태스크 추적용) celery_task_id = Column(String(255), nullable=True) # 다음 단계 생성 여부 (중복 생성 방지) next_stage_created = Column(Boolean, default=False) # 타임스탬프 created_at = Column(DateTime, server_default=func.now()) updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now()) dispatched_at = Column(DateTime, nullable=True) completed_at = Column(DateTime, nullable=True) last_failed_at = Column(DateTime, nullable=True) # 에러 정보 error_message = Column(String(2000), nullable=True) # 파이프라인 설정 (최초 요청 데이터) config_json = Column(String(4000), nullable=True) # JSON 문자열 def __repr__(self): return f"" ``` ### 5.2 Beat 스케줄러 (디스패처) ```python # app/tasks/pipeline_dispatcher.py """ 파이프라인 디스패처 (Beat 스케줄러에서 주기적으로 호출) 이 모듈이 이 설계안의 핵심입니다. Beat가 10초마다 이 함수를 호출하고, DB에서 "처리 준비된" 레코드를 찾아 적절한 큐에 디스패치합니다. 태스크 간 연결 로직이 모두 여기에 집중됩니다. 개별 태스크는 자신의 작업만 수행하고 DB 상태만 변경합니다. """ from celery import Celery from sqlalchemy import select, and_ from datetime import datetime, timedelta import asyncio import json import logging from app.celery_app import celery_app from app.tasks.base import BaseTaskWithDB from app.pipeline.models import Pipeline, PipelineStage, PipelineStatus logger = logging.getLogger(__name__) # ============================================================================ # 스테이지별 다음 단계 매핑 # ============================================================================ NEXT_STAGE_MAP = { PipelineStage.LYRIC: PipelineStage.SONG, PipelineStage.SONG: PipelineStage.VIDEO, PipelineStage.VIDEO: None, # 마지막 단계 } # 스테이지별 큐 매핑 STAGE_QUEUE_MAP = { PipelineStage.LYRIC: 'lyric_queue', PipelineStage.SONG: 'song_queue', PipelineStage.VIDEO: 'video_queue', } # 스테이지별 태스크 매핑 STAGE_TASK_MAP = { PipelineStage.LYRIC: 'app.tasks.lyric_tasks.generate_lyric', PipelineStage.SONG: 'app.tasks.song_tasks.generate_song', PipelineStage.VIDEO: 'app.tasks.video_tasks.generate_video', } # 재시도 간격 (초) RETRY_DELAYS = { PipelineStage.LYRIC: 30, PipelineStage.SONG: 60, PipelineStage.VIDEO: 120, } # stuck 감지 타임아웃 (분) STUCK_TIMEOUT_MINUTES = 15 @celery_app.task( name='app.tasks.pipeline_dispatcher.dispatch_pipelines', ignore_result=True, # 스케줄러이므로 결과 저장 불필요 ) def dispatch_pipelines(): """ 파이프라인 디스패처 (Beat에서 10초마다 호출) 4가지 동작을 순서대로 수행합니다: 1. pending 레코드 → 해당 큐에 디스패치 2. stage_completed 레코드 → 다음 stage 생성 3. failed 레코드 → 재시도 판단 4. stuck 레코드 → 재디스패치 """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(_dispatch_all()) finally: loop.close() async def _dispatch_all(): """비동기 디스패처 메인 로직""" from app.database.session import BackgroundSessionLocal async with BackgroundSessionLocal() as session: # ──────────────────────────────────────────────── # Step 1: pending 레코드 디스패치 # ──────────────────────────────────────────────── # "아직 큐에 발행되지 않은 대기 중인 레코드"를 찾아 # 적절한 큐에 태스크를 발행합니다. pending_pipelines = await session.scalars( select(Pipeline) .where(Pipeline.status == PipelineStatus.PENDING) .where(Pipeline.dispatched_at.is_(None)) .order_by(Pipeline.created_at.asc()) .limit(10) # 한 사이클에 최대 10개 ) for pipeline in pending_pipelines: task_name = STAGE_TASK_MAP[pipeline.stage] queue_name = STAGE_QUEUE_MAP[pipeline.stage] # Celery 태스크 발행 result = celery_app.send_task( task_name, kwargs={'pipeline_id': pipeline.id}, queue=queue_name, ) # 상태 업데이트 pipeline.status = PipelineStatus.DISPATCHED pipeline.dispatched_at = datetime.utcnow() pipeline.celery_task_id = result.id logger.info( f"[Dispatch] pipeline_id={pipeline.id}, " f"stage={pipeline.stage}, queue={queue_name}" ) # ──────────────────────────────────────────────── # Step 2: stage_completed → 다음 stage 생성 # ──────────────────────────────────────────────── # "현재 단계가 완료되었고, 다음 단계가 아직 생성되지 않은" # 레코드를 찾아 다음 단계의 Pipeline 레코드를 생성합니다. completed_pipelines = await session.scalars( select(Pipeline) .where(Pipeline.status == PipelineStatus.STAGE_COMPLETED) .where(Pipeline.next_stage_created == False) .order_by(Pipeline.updated_at.asc()) .limit(10) ) for pipeline in completed_pipelines: next_stage = NEXT_STAGE_MAP.get(pipeline.stage) if next_stage is None: # 마지막 단계 (video) → 파이프라인 완료 pipeline.status = PipelineStatus.PIPELINE_COMPLETED pipeline.next_stage_created = True logger.info( f"[Complete] task_id={pipeline.task_id} 파이프라인 완료" ) else: # 다음 단계 레코드 생성 next_pipeline = Pipeline( task_id=pipeline.task_id, stage=next_stage, status=PipelineStatus.PENDING, config_json=pipeline.config_json, # 설정 전파 ) session.add(next_pipeline) pipeline.next_stage_created = True logger.info( f"[NextStage] task_id={pipeline.task_id}, " f"{pipeline.stage} → {next_stage}" ) # ──────────────────────────────────────────────── # Step 3: failed 레코드 재시도 # ──────────────────────────────────────────────── # "실패했지만 재시도 횟수가 남아있는" 레코드를 찾아 # 적절한 대기 후 pending으로 변경합니다. failed_pipelines = await session.scalars( select(Pipeline) .where(Pipeline.status == PipelineStatus.FAILED) .where(Pipeline.retry_count < Pipeline.max_retries) .order_by(Pipeline.last_failed_at.asc()) .limit(5) ) for pipeline in failed_pipelines: # 재시도 간격 확인 retry_delay = RETRY_DELAYS.get(pipeline.stage, 60) min_retry_time = datetime.utcnow() - timedelta(seconds=retry_delay) if pipeline.last_failed_at and pipeline.last_failed_at > min_retry_time: continue # 아직 대기 시간이 안 됨 pipeline.status = PipelineStatus.PENDING pipeline.dispatched_at = None # 재디스패치 허용 pipeline.retry_count += 1 logger.info( f"[Retry] pipeline_id={pipeline.id}, " f"retry #{pipeline.retry_count}/{pipeline.max_retries}" ) # ──────────────────────────────────────────────── # Step 4: stuck 레코드 감지 (타임아웃) # ──────────────────────────────────────────────── # "dispatched 상태에서 오래 머물러 있는" 레코드를 찾아 # 재디스패치합니다 (워커가 죽었을 가능성). stuck_threshold = datetime.utcnow() - timedelta(minutes=STUCK_TIMEOUT_MINUTES) stuck_pipelines = await session.scalars( select(Pipeline) .where(Pipeline.status == PipelineStatus.DISPATCHED) .where(Pipeline.dispatched_at < stuck_threshold) .limit(5) ) for pipeline in stuck_pipelines: pipeline.status = PipelineStatus.PENDING pipeline.dispatched_at = None logger.warning( f"[Stuck] pipeline_id={pipeline.id}, " f"dispatched_at={pipeline.dispatched_at}, 재디스패치" ) # 모든 변경사항 커밋 await session.commit() ``` ### 5.3 Celery Beat 설정 ```python # app/celery_app.py (Beat 스케줄 추가) """ Beat 스케줄 설정 dispatch_pipelines 태스크를 10초마다 실행합니다. """ from celery.schedules import crontab from datetime import timedelta celery_app.conf.beat_schedule = { # ──────────────────────────────────────────────── # 핵심: 10초마다 파이프라인 디스패처 실행 # ──────────────────────────────────────────────── 'dispatch-pipelines-every-10s': { 'task': 'app.tasks.pipeline_dispatcher.dispatch_pipelines', 'schedule': timedelta(seconds=10), 'options': { 'queue': 'scheduler_queue', # 별도 큐 (워커와 분리) 'expires': 9, # 다음 사이클 전에 만료 (중복 실행 방지) }, }, # 선택적: 1시간마다 DLQ 정리 'cleanup-dead-pipelines-hourly': { 'task': 'app.tasks.pipeline_dispatcher.cleanup_dead_pipelines', 'schedule': crontab(minute=0), # 매 정시 'options': {'queue': 'scheduler_queue'}, }, } # 스케줄러 전용 큐 추가 from kombu import Queue, Exchange celery_app.conf.task_queues += ( Queue('scheduler_queue', Exchange('scheduler', type='direct')), ) ``` ### 5.4 태스크 구현 (단순화) ```python # app/tasks/lyric_tasks.py """ 가사 생성 태스크 (Beat + 상태 머신 방식) 핵심 차이점: - pipeline_id를 받아 DB에서 모든 데이터를 조회합니다. - 결과를 반환하지 않습니다 (return 없음). - DB 상태만 변경하고 종료합니다. - "다음 단계가 있다"는 것조차 모릅니다. """ from sqlalchemy import select import asyncio import json import logging from datetime import datetime from app.celery_app import celery_app from app.tasks.base import BaseTaskWithDB from app.pipeline.models import Pipeline, PipelineStatus from app.home.models import Project from app.lyric.models import Lyric from app.utils.chatgpt_prompt import ChatgptService from app.utils.prompts.prompts import Prompt logger = logging.getLogger(__name__) @celery_app.task( base=BaseTaskWithDB, bind=True, name='app.tasks.lyric_tasks.generate_lyric', queue='lyric_queue', acks_late=True, # ──────────────────────────────────────────────── # 재시도는 Beat가 관리하므로, Celery 자체 재시도 비활성화 # ──────────────────────────────────────────────── max_retries=0, # Celery 재시도 없음 (Beat가 관리) ) def generate_lyric(self, pipeline_id: int): """ 가사 생성 태스크 Args: pipeline_id: Pipeline 테이블의 PK Returns: None - 결과를 반환하지 않음. DB 상태만 변경. 독립성: - 이 태스크는 Pipeline 모델과 Lyric 모델만 안다 - Song, Video의 존재를 모른다 - "다음 단계"가 있다는 것도 모른다 - Beat 스케줄러가 DB 상태를 보고 다음 단계를 결정한다 """ async def _generate(): # ──────────────────────────────────────────────── # 1단계: Pipeline 레코드 조회 및 상태 변경 # ──────────────────────────────────────────────── async with self.get_db_session() as session: pipeline = await session.get(Pipeline, pipeline_id) if not pipeline: raise ValueError(f"Pipeline not found: id={pipeline_id}") # 상태를 processing으로 변경 pipeline.status = PipelineStatus.PROCESSING await session.commit() # 설정 데이터 파싱 config = json.loads(pipeline.config_json) if pipeline.config_json else {} task_id = pipeline.task_id # ──────────────────────────────────────────────── # 2단계: 가사 생성 (기존 비즈니스 로직과 동일) # ──────────────────────────────────────────────── async with self.get_db_session() as session: project = await session.scalar( select(Project).where(Project.task_id == task_id) ) if not project: project = Project( task_id=task_id, customer_name=config.get('customer_name', ''), region=config.get('region', ''), ) session.add(project) await session.flush() prompt = Prompt( customer_name=config.get('customer_name', ''), region=config.get('region', ''), detail_region_info=config.get('detail_region_info', ''), language=config.get('language', 'Korean'), ) lyric_prompt = prompt.get_full_prompt() lyric = Lyric( project_id=project.id, task_id=task_id, status='processing', lyric_prompt=lyric_prompt, language=config.get('language', 'Korean'), ) session.add(lyric) await session.commit() lyric_id = lyric.id # ChatGPT 호출 (DB 세션 외부) try: chatgpt = ChatgptService() lyric_result = await chatgpt.generate_lyric(lyric_prompt) if not lyric_result or len(lyric_result.strip()) < 50: raise ValueError("가사가 너무 짧습니다.") except Exception as e: # 실패: DB 상태를 failed로 변경 async with self.get_db_session() as session: lyric = await session.get(Lyric, lyric_id) lyric.status = 'failed' lyric.lyric_result = f"Error: {str(e)}" pipeline = await session.get(Pipeline, pipeline_id) pipeline.status = PipelineStatus.FAILED pipeline.error_message = str(e) pipeline.last_failed_at = datetime.utcnow() await session.commit() # 여기서 raise하면 Celery가 FAILURE로 기록 # Beat가 나중에 재시도를 판단함 raise # ──────────────────────────────────────────────── # 3단계: 결과 저장 + Pipeline 상태 완료 # ──────────────────────────────────────────────── async with self.get_db_session() as session: lyric = await session.get(Lyric, lyric_id) lyric.status = 'completed' lyric.lyric_result = lyric_result pipeline = await session.get(Pipeline, pipeline_id) pipeline.status = PipelineStatus.STAGE_COMPLETED pipeline.completed_at = datetime.utcnow() await session.commit() # ──────────────────────────────────────────────── # 끝! 다음 단계는 Beat가 처리합니다. # 이 태스크는 아무것도 반환하지 않습니다. # ──────────────────────────────────────────────── logger.info( f"[Lyric] pipeline_id={pipeline_id} 완료. " f"Beat가 다음 단계를 자동으로 감지합니다." ) self.run_async(_generate()) # return 없음 ``` ### 5.5 Song/Video 태스크 (동일 패턴) Song, Video 태스크도 동일한 패턴을 따릅니다: 1. `pipeline_id`를 받아 DB에서 데이터 조회 2. 비즈니스 로직 수행 3. DB 상태만 변경 (`STAGE_COMPLETED` 또는 `FAILED`) 4. 아무것도 반환하지 않음 ### 5.6 파이프라인 API ```python # app/api/routers/v1/pipeline.py """ 파이프라인 API (Beat + 상태 머신 방식) 단순히 Pipeline 레코드를 생성하면 됩니다. Beat가 자동으로 감지하여 처리를 시작합니다. """ from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy import select, and_ import json from app.dependencies.auth import get_current_user from app.database.session import AsyncSessionLocal from app.pipeline.models import Pipeline, PipelineStage, PipelineStatus router = APIRouter(prefix="/pipeline", tags=["Pipeline"]) class StartPipelineRequest(BaseModel): task_id: str customer_name: str region: str detail_region_info: str language: str = "Korean" orientation: str = "vertical" genre: str = "pop, ambient" @router.post("/start") async def start_pipeline( request: StartPipelineRequest, current_user=Depends(get_current_user) ): """ 파이프라인 시작 Pipeline 레코드를 생성하기만 합니다. Beat 스케줄러가 자동으로 감지하여 첫 번째 단계(lyric)를 디스패치합니다. 이 방식의 가장 큰 장점: - API가 Celery 태스크를 직접 호출하지 않음 - DB에 레코드 생성만으로 파이프라인 시작 - Beat가 자동으로 처리를 시작 """ async with AsyncSessionLocal() as session: # 파이프라인 설정을 JSON으로 저장 config = { "customer_name": request.customer_name, "region": request.region, "detail_region_info": request.detail_region_info, "language": request.language, "orientation": request.orientation, "genre": request.genre, } # Pipeline 레코드 생성 (첫 번째 단계: lyric) pipeline = Pipeline( task_id=request.task_id, stage=PipelineStage.LYRIC, status=PipelineStatus.PENDING, config_json=json.dumps(config), ) session.add(pipeline) await session.commit() return { 'success': True, 'task_id': request.task_id, 'message': '파이프라인이 생성되었습니다. Beat가 곧 처리를 시작합니다.', } @router.get("/status/{task_id}") async def get_pipeline_status( task_id: str, current_user=Depends(get_current_user) ): """ 파이프라인 상태 조회 DB의 Pipeline 레코드를 조회하여 전체 상태를 반환합니다. Redis가 아닌 DB가 진실의 원천(source of truth)입니다. """ async with AsyncSessionLocal() as session: pipelines = await session.scalars( select(Pipeline) .where(Pipeline.task_id == task_id) .order_by(Pipeline.created_at.asc()) ) pipeline_list = list(pipelines) if not pipeline_list: raise HTTPException(404, "Pipeline not found") stages = {} overall_status = 'processing' for p in pipeline_list: stages[p.stage.value] = { 'status': p.status.value, 'retry_count': p.retry_count, 'error': p.error_message, 'completed_at': str(p.completed_at) if p.completed_at else None, } if p.status == PipelineStatus.PIPELINE_COMPLETED: overall_status = 'completed' elif p.status == PipelineStatus.DEAD: overall_status = 'dead' elif p.status == PipelineStatus.FAILED: overall_status = 'failed' return { 'task_id': task_id, 'overall_status': overall_status, 'stages': stages, 'total_stages': len(pipeline_list), } @router.post("/retry/{task_id}") async def retry_pipeline( task_id: str, current_user=Depends(get_current_user) ): """ 실패한 파이프라인 재시도 가장 최근 실패한 단계를 pending으로 변경합니다. Beat가 자동으로 재디스패치합니다. """ async with AsyncSessionLocal() as session: failed = await session.scalar( select(Pipeline) .where(Pipeline.task_id == task_id) .where(Pipeline.status.in_([PipelineStatus.FAILED, PipelineStatus.DEAD])) .order_by(Pipeline.created_at.desc()) ) if not failed: raise HTTPException(404, "No failed pipeline found") # pending으로 변경 → Beat가 자동으로 디스패치 failed.status = PipelineStatus.PENDING failed.dispatched_at = None failed.retry_count = 0 # 리셋 await session.commit() return { 'success': True, 'task_id': task_id, 'stage': failed.stage.value, 'message': 'Beat가 곧 재시도합니다.', } ``` --- ## 6. 상태 관리 및 모니터링 ### 6.1 DB가 진실의 원천 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 상태 관리: DB가 진실의 원천 │ └─────────────────────────────────────────────────────────────────────────────┘ [기존안/설계안 1,2] ━━━━━━━━━━━━━━━━━ 상태 저장소: MySQL + Redis (이중 관리) MySQL: 영구 보존, 감사 로그 Redis: 실시간 조회, Celery 상태 문제: • MySQL과 Redis 상태가 불일치할 수 있음 • Redis 장애 시 상태 추적 불가 • 상태 동기화 로직 필요 [이 설계안] ━━━━━━━━━━ 상태 저장소: MySQL만 (단일 원천) Pipeline 테이블이 모든 상태를 관리 Redis는 Celery 브로커로만 사용 장점: ✓ 상태 불일치 없음 ✓ 트랜잭션 보장 (ACID) ✓ 상태 이력이 자연스럽게 DB에 축적 ✓ SQL로 복잡한 상태 조회 가능 ✓ Redis 장애가 상태 추적에 영향 없음 단점: ✗ 실시간 폴링에 DB 부하 ✗ Beat 폴링 간격(10초)만큼 지연 ``` ### 6.2 모니터링 쿼리 ```sql -- 현재 진행 중인 파이프라인 수 SELECT COUNT(DISTINCT task_id) FROM pipelines WHERE status NOT IN ('pipeline_completed', 'dead'); -- 단계별 대기 중인 태스크 수 SELECT stage, COUNT(*) as count FROM pipelines WHERE status = 'pending' GROUP BY stage; -- 실패율 (최근 1시간) SELECT stage, COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed, COUNT(CASE WHEN status = 'stage_completed' THEN 1 END) as completed, ROUND( COUNT(CASE WHEN status = 'failed' THEN 1 END) * 100.0 / NULLIF(COUNT(*), 0), 2 ) as failure_rate FROM pipelines WHERE updated_at > NOW() - INTERVAL 1 HOUR GROUP BY stage; -- 평균 처리 시간 SELECT stage, AVG(TIMESTAMPDIFF(SECOND, dispatched_at, completed_at)) as avg_seconds FROM pipelines WHERE status = 'stage_completed' AND completed_at IS NOT NULL GROUP BY stage; -- stuck 레코드 (15분 이상 dispatched 상태) SELECT * FROM pipelines WHERE status = 'dispatched' AND dispatched_at < NOW() - INTERVAL 15 MINUTE; ``` --- ## 7. 실패 처리 전략 ### 7.1 Beat 기반 재시도의 장점 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Beat 기반 재시도 vs Celery 내장 재시도 │ └─────────────────────────────────────────────────────────────────────────────┘ [Celery 내장 재시도] (기존안, 설계안 1,2) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ • 워커 프로세스 내에서 재시도 • 워커가 죽으면 재시도 정보도 소실 • 재시도 횟수/간격이 코드에 하드코딩 • 재시도 중인 태스크를 외부에서 제어 불가 [Beat 기반 재시도] (이 설계안) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ • DB에 실패 상태 기록 → Beat가 폴링하여 재시도 • 워커가 죽어도 DB 레코드는 남아있음 → 자동 복구 • retry_count, max_retries를 DB에서 관리 → 런타임 변경 가능 • 관리자가 DB를 수정하여 재시도 제어 가능 실제 운영에서의 장점: • "이 태스크의 재시도를 중지하고 싶다" → UPDATE pipelines SET status='dead' • "재시도 횟수를 늘리고 싶다" → UPDATE pipelines SET max_retries=5 • "실패 원인을 확인하고 수동 재시도" → UPDATE pipelines SET status='pending' ``` ### 7.2 Stuck 감지 (자동 복구) ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Stuck 감지 시나리오 │ └─────────────────────────────────────────────────────────────────────────────┘ 시나리오: 워커가 태스크 처리 중 OOM으로 죽음 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ [기존안] acks_late=True + reject_on_worker_lost=True → Celery가 메시지를 재큐 → 다른 워커가 처리 ※ 하지만 DB 상태가 "processing"으로 남아있을 수 있음 [이 설계안] Beat의 Stuck 감지 T+0: Pipeline.status = 'dispatched', dispatched_at = 12:00 T+5: Worker 시작 → status = 'processing' T+7: Worker OOM으로 죽음 T+15: Beat 폴링: "processing 상태가 15분 이상? stuck!" → status = 'pending' 으로 되돌림 T+25: Beat 폴링: pending 발견 → 재디스패치 → 다른 워커가 처리 이 방식은 "워커 사망" 시나리오를 DB 레벨에서 자동 복구합니다. Celery의 acks_late 메커니즘과 이중으로 보호됩니다. ``` --- ## 8. 설계 및 동작 설명 ### 8.1 이벤트 소싱 패턴 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 이벤트 소싱 관점에서의 설계 │ └─────────────────────────────────────────────────────────────────────────────┘ 이 설계안은 "이벤트 소싱"과 유사한 패턴을 따릅니다. 이벤트 소싱이란: 시스템의 상태를 "현재 값" 대신 "이벤트(상태 변경) 시퀀스"로 저장하는 패턴 이 설계에서: Pipeline 테이블의 각 행 = 하나의 상태 변경 이벤트 task_id = "abc"의 이벤트 시퀀스: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ id │ stage │ status │ created_at │ 의미 ────┼───────┼─────────────────┼────────────┼──────────── 1 │ lyric │ pending │ 12:00:00 │ 파이프라인 생성 1 │ lyric │ dispatched │ 12:00:10 │ Beat가 디스패치 1 │ lyric │ processing │ 12:00:11 │ Worker 시작 1 │ lyric │ stage_completed │ 12:00:16 │ 가사 생성 완료 2 │ song │ pending │ 12:00:20 │ Beat가 다음 단계 생성 2 │ song │ dispatched │ 12:00:20 │ Beat가 디스패치 2 │ song │ processing │ 12:00:21 │ Worker 시작 2 │ song │ failed │ 12:01:21 │ Suno 실패 2 │ song │ pending │ 12:02:30 │ Beat 재시도 2 │ song │ dispatched │ 12:02:30 │ Beat가 재디스패치 2 │ song │ processing │ 12:02:31 │ Worker 재시작 2 │ song │ stage_completed │ 12:03:31 │ 노래 생성 완료 3 │ video │ pending │ 12:03:40 │ Beat가 다음 단계 생성 ... ``` ### 8.2 장단점 분석 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 장단점 분석 │ └─────────────────────────────────────────────────────────────────────────────┘ 장점: ━━━━ ✓ 완벽한 독립성: 태스크가 다음 단계를 전혀 모름 ✓ 자동 복구: stuck, 워커 사망 등을 Beat가 자동 감지 ✓ 상태 이력: DB에 모든 상태 변경이 기록됨 ✓ 런타임 제어: DB 수정으로 재시도 횟수, 상태 등 변경 가능 ✓ 디버깅 용이: DB 쿼리로 전체 파이프라인 이력 조회 ✓ 단일 원천: DB가 유일한 상태 저장소 (불일치 없음) 단점: ━━━━ ✗ 지연: Beat 폴링 간격(10초)만큼 다음 단계 시작 지연 ✗ DB 부하: 주기적 폴링 쿼리가 DB에 부하 ✗ 복잡도: Pipeline 모델 + Beat 디스패처 코드 필요 ✗ Beat 단일 장애점: Beat 프로세스가 죽으면 디스패치 중단 ✗ 추가 테이블: Pipeline 테이블 마이그레이션 필요 ``` ### 8.3 Beat 폴링 간격과 지연 트레이드오프 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 폴링 간격 트레이드오프 │ └─────────────────────────────────────────────────────────────────────────────┘ 폴링 간격: 1초 5초 10초 30초 60초 ──────────────────────────────────────────────── DB 부하: 매우높음 높음 중간 낮음 매우낮음 지연: 1초 5초 10초 30초 60초 응답성: 최고 높음 적정 낮음 매우낮음 ▲ ▲ │ │ 높은 QPS에 배치 처리에 부적합 적합 이 프로젝트 권장: 10초 ────────────────────── • 가사/노래/비디오 생성은 각각 수 초~수 분이 걸림 • 10초 지연은 사용자 경험에 거의 영향 없음 • 인덱스된 쿼리로 DB 부하 최소화 가능 ``` --- ## 9. 기존안과의 비교 ``` ┌──────────────────────┬─────────────┬──────────┬──────────┬──────────────┐ │ 기준 │ 기존안 │ 설계안 1 │ 설계안 2 │ 설계안 3 │ │ │(명시적 전달)│ (chain) │ (link) │(Beat+상태머신)│ ├──────────────────────┼─────────────┼──────────┼──────────┼──────────────┤ │ 태스크 독립성 │ 중간 │ 높음 │ 높음 │ 최고 │ │ 실시간성 │ 즉시 │ 즉시 │ 즉시 │ ~10초 지연 │ │ 상태 관리 │ MySQL+Redis │ Redis │ Redis │ MySQL만 │ │ 자동 복구 │ 제한적 │ 제한적 │ 제한적 │ 강력 │ │ 런타임 제어 │ 코드 변경 │ 코드 변경│ 코드 변경│ DB 수정 가능 │ │ 추가 인프라 │ 없음 │ 없음 │ 없음 │ Beat 프로세스│ │ DB 부하 │ 낮음 │ 낮음 │ 낮음 │ 중간 │ │ 코드 복잡도 │ 낮음 │ 낮음 │ 중간 │ 중간 │ │ 디버깅 │ 중간 │ 중간 │ 중간 │ 용이 │ │ 파이프라인 이력 │ 수동 구현 │ 수동 │ 수동 │ 자동 축적 │ │ 적합한 상황 │ 빠른 반응 │ 단순 체인│ 에러격리 │ 복잡한 워크플로│ └──────────────────────┴─────────────┴──────────┴──────────┴──────────────┘ ``` --- ## 10. 배포 및 운영 ### 10.1 실행 명령어 ```bash # 1. Redis + DB 실행 # 2. DB 마이그레이션 (Pipeline 테이블 추가) uv run alembic upgrade head # 3. FastAPI 서버 uv run uvicorn main:app --reload # 4. Celery Beat (스케줄러) - 반드시 1개만 실행! uv run celery -A app.celery_app beat --loglevel=info # 5. Celery 워커 (각 큐별) uv run celery -A app.celery_app worker -Q lyric_queue -c 2 -n lyric@%h uv run celery -A app.celery_app worker -Q song_queue -c 2 -n song@%h uv run celery -A app.celery_app worker -Q video_queue -c 1 -n video@%h # 6. 스케줄러 전용 워커 uv run celery -A app.celery_app worker -Q scheduler_queue -c 1 -n scheduler@%h ``` ### 10.2 Beat 고가용성 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Beat 고가용성 전략 │ └─────────────────────────────────────────────────────────────────────────────┘ 문제: Beat는 반드시 1개만 실행해야 함 (중복 디스패치 방지) → Beat가 죽으면 파이프라인 진행 중단 해결 방법: [방법 1] 프로세스 관리자 (Supervisor) ───────────────────────────────────── [program:celery-beat] command=celery -A app.celery_app beat autorestart=true startretries=10 [방법 2] Kubernetes Deployment (replicas=1) ─────────────────────────────────────────── apiVersion: apps/v1 kind: Deployment metadata: name: celery-beat spec: replicas: 1 # 반드시 1 strategy: type: Recreate # 롤링 업데이트 대신 재생성 template: spec: containers: - name: beat command: ["celery", "-A", "app.celery_app", "beat"] [방법 3] django-celery-beat (DB 스케줄러) ─────────────────────────────────────────── • DB에 스케줄을 저장하여 Beat 재시작 시에도 스케줄 유지 • 분산 잠금으로 중복 실행 방지 가능 ``` --- ## 문서 버전 | 버전 | 날짜 | 변경 내용 | |------|------|-----------| | 1.0 | 2024-XX-XX | 초안 작성 |