o2o-castad-backend/docs/plan/celery/celery-plan_1-chain-primiti...

58 KiB

설계안 1: Celery Chain Primitive 파이프라인

Celery Canvas의 chain() 원시 타입을 활용한 선언적 파이프라인 설계


목차

  1. 개요 및 핵심 차이점
  2. 아키텍처 설계
  3. 데이터 흐름 상세
  4. 큐 및 태스크 동작 상세
  5. 코드 구현
  6. 상태 관리 및 모니터링
  7. 실패 처리 전략
  8. 설계 및 동작 설명
  9. 기존안과의 비교
  10. 배포 및 운영

1. 개요 및 핵심 차이점

1.1 설계 철학

이 설계안은 Celery Canvas의 chain() 원시 타입을 사용하여 파이프라인을 선언적으로 정의합니다.

기존안(celery-plan.md)에서는 각 태스크가 완료 후 다음 큐에 명시적으로 apply_async()를 호출하는 반면, 이 방식에서는 파이프라인 시작 시점에 전체 실행 순서를 한 번에 선언합니다.

┌─────────────────────────────────────────────────────────────────────────────┐
│                       기존안 vs 이 설계안                                    │
├────────────────────────────────┬────────────────────────────────────────────┤
│        기존안 (명시적 전달)     │       설계안 1 (Chain Primitive)            │
├────────────────────────────────┼────────────────────────────────────────────┤
│ lyric_task 내부에서             │ API에서 전체 체인을 한 번에 선언:           │
│   song_task.apply_async()      │   chain(                                   │
│   호출                         │     lyric.s() | song.s() | video.s()      │
│                                │   ).apply_async()                          │
│                                │                                            │
│ 각 태스크가 다음 단계를 "안다"  │ 각 태스크는 다음 단계를 "모른다"            │
│ (강한 흐름 결합)               │ (Celery가 자동으로 체이닝)                  │
│                                │                                            │
│ 태스크 A → 직접 발행 → 태스크 B│ 태스크 A → 결과 반환 → Celery → 태스크 B   │
└────────────────────────────────┴────────────────────────────────────────────┘

1.2 핵심 원칙

┌─────────────────────────────────────────────────────────────────┐
│                     핵심 설계 원칙                                │
├─────────────────────────────────────────────────────────────────┤
│ 1. 선언적 파이프라인: 실행 순서를 코드 한 줄로 정의              │
│ 2. 결과 전파: 이전 태스크의 반환값이 다음 태스크의 입력          │
│ 3. 순수 함수: 각 태스크는 자신의 다음 단계를 모름               │
│ 4. 단일 진입점: API에서 chain을 생성, 전체 흐름을 제어           │
└─────────────────────────────────────────────────────────────────┘

2. 아키텍처 설계

2.1 전체 아키텍처

┌─────────────────────────────────────────────────────────────────────────────┐
│                   Chain Primitive 파이프라인 아키텍처                         │
└─────────────────────────────────────────────────────────────────────────────┘

                               ┌─────────────┐
                               │   Client    │
                               └──────┬──────┘
                                      │ POST /pipeline/start
                                      ▼
                            ┌────────────────────┐
                            │     FastAPI        │
                            │                    │
                            │  chain(            │
                            │    lyric.s(data)   │◄── 파이프라인을 한 번에 선언
                            │    | song.s()      │
                            │    | video.s()     │
                            │  ).apply_async()   │
                            └─────────┬──────────┘
                                      │
                                      │ chain 메시지 발행
                                      ▼
                            ┌────────────────────┐
                            │   Redis Broker     │
                            │                    │
                            │  chain 메타데이터:  │
                            │  task1 → task2 →   │
                            │  task3 순서 보관    │
                            └─────────┬──────────┘
                                      │
                    ┌─────────────────┼─────────────────┐
                    │                 │                 │
                    ▼                 ▼                 ▼
           ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
           │  lyric_queue  │ │  song_queue   │ │  video_queue  │
           └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
                   │                 │                 │
                   ▼                 ▼                 ▼
           ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
           │ Lyric Worker  │ │  Song Worker  │ │ Video Worker  │
           │               │ │               │ │               │
           │ 입력: data    │ │ 입력: lyric의 │ │ 입력: song의  │
           │ 출력: result  │ │   반환값      │ │   반환값      │
           │  (자동 전달)  │ │ 출력: result  │ │ 출력: result  │
           └───────────────┘ └───────────────┘ └───────────────┘

2.2 chain()의 내부 동작

┌─────────────────────────────────────────────────────────────────────────────┐
│                    chain()이 내부적으로 하는 일                               │
└─────────────────────────────────────────────────────────────────────────────┘

chain(lyric.s(data) | song.s() | video.s()).apply_async()
      │                │           │
      │                │           │
      ▼                ▼           ▼

Celery가 내부적으로 다음과 같이 변환:

1. lyric.apply_async(
       args=(data,),
       link=song.s().set(link=video.s())  ◄── 콜백 체인 설정
   )

2. lyric 완료 시:
   song.apply_async(
       args=(lyric_result,),              ◄── 이전 결과가 첫 번째 인자로
       link=video.s()
   )

3. song 완료 시:
   video.apply_async(
       args=(song_result,),              ◄── 이전 결과가 첫 번째 인자로
   )

핵심: 각 태스크의 "반환값"이 다음 태스크의 "첫 번째 인자"로 자동 전달됨

2.3 큐 설계 (기존안과 동일)

# 큐 구성은 기존안과 동일하게 3개 독립 큐 사용
# 차이점: 메시지 발행 방식만 다름 (chain이 자동으로 라우팅)

CELERY_QUEUES = {
    'lyric_queue': {'routing_key': 'lyric.generate'},
    'song_queue':  {'routing_key': 'song.generate'},
    'video_queue': {'routing_key': 'video.generate'},
}

3. 데이터 흐름 상세

3.1 핵심 차이: 결과 전파 방식

sequenceDiagram
    participant C as Client
    participant API as FastAPI
    participant CE as Celery Engine
    participant LW as Lyric Worker
    participant SW as Song Worker
    participant VW as Video Worker
    participant DB as MySQL

    C->>API: POST /pipeline/start

    Note over API: chain 선언
    API->>CE: chain(lyric.s(data) | song.s() | video.s())

    Note over CE: 1단계: lyric_queue에 발행
    CE->>LW: lyric_task(data)
    LW->>DB: Lyric 생성 + ChatGPT 호출
    LW-->>CE: return {"task_id": "xxx", "lyric_result": "..."}

    Note over CE: 2단계: 자동으로 song_queue에 발행 (lyric 결과 포함)
    CE->>SW: song_task({"task_id": "xxx", "lyric_result": "..."})
    SW->>DB: Song 생성 + Suno API 호출
    SW-->>CE: return {"task_id": "xxx", "song_url": "..."}

    Note over CE: 3단계: 자동으로 video_queue에 발행 (song 결과 포함)
    CE->>VW: video_task({"task_id": "xxx", "song_url": "..."})
    VW->>DB: Video 생성 + Creatomate 호출
    VW-->>CE: return {"task_id": "xxx", "video_url": "..."}

    Note over CE: chain 완료
    CE-->>C: (상태 조회 API로 확인)

3.2 단계별 데이터 형식

# 각 태스크의 입출력 형식 정의

# ─────────────────────────────────────────
# Lyric Task (chain의 첫 번째)
# ─────────────────────────────────────────
# 입력: API에서 전달하는 초기 데이터
lyric_input = {
    "task_id": "0192abc-...",
    "customer_name": "스테이 머뭄",
    "region": "군산",
    "detail_region_info": "군산 신흥동",
    "language": "Korean",
}

# 출력: 다음 태스크(song)의 입력이 됨
lyric_output = {
    "task_id": "0192abc-...",
    "lyric_id": 42,
    "lyric_result": "아침 햇살 가득한 카페 머뭄에서...",
    "language": "Korean",
    "status": "completed",
}

# ─────────────────────────────────────────
# Song Task (chain의 두 번째)
# ─────────────────────────────────────────
# 입력: lyric_output이 자동으로 전달됨
# song_input = lyric_output  (동일 구조)

# 출력: 다음 태스크(video)의 입력이 됨
song_output = {
    "task_id": "0192abc-...",
    "lyric_id": 42,
    "song_id": 15,
    "song_result_url": "https://blob.azure.../song.mp3",
    "duration": 62.5,
    "status": "completed",
}

# ─────────────────────────────────────────
# Video Task (chain의 마지막)
# ─────────────────────────────────────────
# 입력: song_output이 자동으로 전달됨
# video_input = song_output  (동일 구조)

# 출력: 최종 파이프라인 결과
video_output = {
    "task_id": "0192abc-...",
    "video_id": 8,
    "result_movie_url": "https://blob.azure.../video.mp4",
    "status": "completed",
}

3.3 상태 전이 다이어그램

stateDiagram-v2
    [*] --> chain_created: chain().apply_async()

    state "Chain Execution" as ChainExec {
        chain_created --> lyric_running: Celery가 lyric_queue에 발행
        lyric_running --> lyric_done: return lyric_output

        lyric_done --> song_running: Celery가 song_output를 song_queue에 자동 발행
        song_running --> song_done: return song_output

        song_done --> video_running: Celery가 video_queue에 자동 발행
        video_running --> video_done: return video_output
    }

    video_done --> [*]: chain 완료

    lyric_running --> chain_failed: 예외 발생 (max_retries 초과)
    song_running --> chain_failed: 예외 발생
    video_running --> chain_failed: 예외 발생

    chain_failed --> [*]: link_error 콜백 실행

4. 큐 및 태스크 동작 상세

4.1 chain이 큐를 활용하는 방식

┌─────────────────────────────────────────────────────────────────────────────┐
│                  chain이 큐를 활용하는 방식                                   │
└─────────────────────────────────────────────────────────────────────────────┘

chain 생성 시점 (API 서버):
━━━━━━━━━━━━━━━━━━━━━━━━━

pipeline = chain(
    generate_lyric.s(data).set(queue='lyric_queue'),   # ① 큐 지정
    generate_song.s().set(queue='song_queue'),          # ② 큐 지정
    generate_video.s().set(queue='video_queue'),        # ③ 큐 지정
)
pipeline.apply_async()

Celery 내부 처리:
━━━━━━━━━━━━━━━━

[T+0ms] lyric_queue에 메시지 발행
        메시지 내부에 "다음 태스크 정보" 포함:
        {
            "task": "generate_lyric",
            "kwargs": {"task_id": "...", ...},
            "callbacks": [             ◄── chain의 다음 단계 정보
                {
                    "task": "generate_song",
                    "queue": "song_queue",
                    "callbacks": [
                        {
                            "task": "generate_video",
                            "queue": "video_queue"
                        }
                    ]
                }
            ]
        }

[T+5s]  Lyric Worker가 lyric_queue에서 메시지 수신
        generate_lyric 실행
        결과 반환: {"task_id": "...", "lyric_result": "..."}

[T+5s]  Celery Engine이 자동으로:
        1. 결과를 callbacks[0]의 첫 번째 인자로 설정
        2. song_queue에 메시지 발행

[T+10s] Song Worker가 song_queue에서 메시지 수신
        generate_song(lyric_result) 실행
        결과 반환

[T+10s] Celery Engine이 자동으로:
        1. 결과를 다음 callback의 첫 번째 인자로
        2. video_queue에 메시지 발행

[T+20s] Video Worker가 video_queue에서 메시지 수신
        최종 처리

4.2 워커 격리 (기존안과 동일)

# 각 워커는 자신의 큐만 구독 (변경 없음)
celery -A app.celery_app worker -Q lyric_queue -n lyric@%h
celery -A app.celery_app worker -Q song_queue  -n song@%h
celery -A app.celery_app worker -Q video_queue -n video@%h

# chain이 큐를 지정하므로, 워커 격리는 자동으로 보장됨

4.3 chain의 메시지 크기 주의점

┌─────────────────────────────────────────────────────────────────────────────┐
│                  chain 메시지 크기 주의점                                     │
└─────────────────────────────────────────────────────────────────────────────┘

문제:
chain에서 이전 태스크의 반환값이 다음 태스크의 인자로 전달되므로,
반환값이 크면 Redis 메시지 크기가 커집니다.

예시:
  lyric_result가 10KB → song_task 메시지에 10KB 포함
  song_result에 audio_url만 포함 → 작은 크기

대응 전략:
  ✓ 반환값에는 ID와 URL만 포함 (경량 데이터)
  ✓ 대용량 데이터(가사 전문, 오디오)는 DB에 저장
  ✓ 다음 태스크가 DB에서 필요한 데이터를 직접 조회

권장 반환값 크기: < 1KB

5. 코드 구현

5.1 Celery 앱 설정

# app/celery_app.py
"""
Celery 앱 설정 (기존안과 동일한 큐 구조)

Chain에서는 task_routes 설정이 중요합니다.
chain에서 .set(queue=...) 를 생략하면 task_routes로 라우팅됩니다.
"""

from celery import Celery
from kombu import Queue, Exchange
import os

celery_app = Celery(
    'o2o_castad',
    broker=os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
    include=[
        'app.tasks.lyric_tasks',
        'app.tasks.song_tasks',
        'app.tasks.video_tasks',
    ]
)

# 큐 정의 (기존안과 동일)
celery_app.conf.task_queues = (
    Queue('lyric_queue', Exchange('lyric', type='direct'), routing_key='lyric.generate'),
    Queue('song_queue', Exchange('song', type='direct'), routing_key='song.generate'),
    Queue('video_queue', Exchange('video', type='direct'), routing_key='video.generate'),
)

# 태스크 라우팅 - chain에서 자동으로 사용됨
celery_app.conf.task_routes = {
    'app.tasks.lyric_tasks.*': {'queue': 'lyric_queue'},
    'app.tasks.song_tasks.*':  {'queue': 'song_queue'},
    'app.tasks.video_tasks.*': {'queue': 'video_queue'},
}

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Seoul',
    enable_utc=True,
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,
    result_expires=86400,
    result_extended=True,
)

5.2 가사 생성 태스크

# app/tasks/lyric_tasks.py
"""
가사 생성 태스크 (Chain 방식)

핵심 차이점:
- 이 태스크는 다음 단계(song)에 대해 전혀 모릅니다.
- 단순히 자신의 작업을 수행하고 결과를 반환할 뿐입니다.
- chain()이 자동으로 반환값을 다음 태스크에 전달합니다.
- "다음 큐에 apply_async()"하는 코드가 없습니다.
"""

from celery import Task
from sqlalchemy import select
import asyncio
import logging

from app.celery_app import celery_app
from app.tasks.base import BaseTaskWithDB
from app.home.models import Project
from app.lyric.models import Lyric
from app.utils.chatgpt_prompt import ChatgptService
from app.utils.prompts.prompts import Prompt

logger = logging.getLogger(__name__)


@celery_app.task(
    base=BaseTaskWithDB,
    bind=True,
    name='app.tasks.lyric_tasks.generate_lyric',
    max_retries=3,
    default_retry_delay=30,
    acks_late=True,
)
def generate_lyric(self, data: dict) -> dict:
    """
    가사 생성 태스크

    chain()에서 첫 번째로 실행됩니다.
    반환값이 자동으로 다음 태스크(generate_song)의 입력이 됩니다.

    Args:
        data: {
            "task_id": str,
            "customer_name": str,
            "region": str,
            "detail_region_info": str,
            "language": str
        }

    Returns:
        dict: {
            "task_id": str,
            "lyric_id": int,
            "lyric_result": str,    ← 경량화: 가사 전문 대신 DB에서 조회하도록 유도
            "language": str,
            "status": "completed"
        }
        → 이 반환값이 generate_song의 첫 번째 인자로 자동 전달됨

    독립성 보장:
        - 이 태스크는 generate_song, generate_video의 존재를 모릅니다.
        - 자신의 작업(ChatGPT 가사 생성)만 수행합니다.
        - 반환값을 chain이 자동으로 전파합니다.
    """
    task_id = data['task_id']
    customer_name = data['customer_name']
    region = data['region']
    detail_region_info = data['detail_region_info']
    language = data.get('language', 'Korean')

    # 파이프라인 상태 업데이트
    self.update_pipeline_status(
        task_id=task_id,
        stage='lyric',
        status='processing',
        message='가사 생성을 시작합니다.'
    )

    async def _generate():
        # ────────────────────────────────────────────────
        # 1단계: DB 레코드 생성
        # ────────────────────────────────────────────────
        async with self.get_db_session() as session:
            project = await session.scalar(
                select(Project).where(Project.task_id == task_id)
            )
            if not project:
                project = Project(
                    task_id=task_id,
                    customer_name=customer_name,
                    region=region,
                )
                session.add(project)
                await session.flush()

            prompt = Prompt(
                customer_name=customer_name,
                region=region,
                detail_region_info=detail_region_info,
                language=language,
            )
            lyric_prompt = prompt.get_full_prompt()

            lyric = Lyric(
                project_id=project.id,
                task_id=task_id,
                status='processing',
                lyric_prompt=lyric_prompt,
                language=language,
            )
            session.add(lyric)
            await session.commit()
            lyric_id = lyric.id

        # ────────────────────────────────────────────────
        # 2단계: ChatGPT API 호출 (DB 세션 외부)
        # ────────────────────────────────────────────────
        try:
            chatgpt = ChatgptService()
            lyric_result = await chatgpt.generate_lyric(lyric_prompt)

            if not lyric_result or len(lyric_result.strip()) < 50:
                raise ValueError("생성된 가사가 너무 짧습니다.")
        except Exception as e:
            async with self.get_db_session() as session:
                lyric = await session.get(Lyric, lyric_id)
                lyric.status = 'failed'
                lyric.lyric_result = f"Error: {str(e)}"
                await session.commit()
            raise  # chain이 자동으로 에러 처리

        # ────────────────────────────────────────────────
        # 3단계: 결과 저장
        # ────────────────────────────────────────────────
        async with self.get_db_session() as session:
            lyric = await session.get(Lyric, lyric_id)
            lyric.status = 'completed'
            lyric.lyric_result = lyric_result
            await session.commit()

        return {
            'task_id': task_id,
            'lyric_id': lyric_id,
            'lyric_result': lyric_result,
            'language': language,
            'status': 'completed',
        }

    result = self.run_async(_generate())

    # 상태 업데이트
    self.update_pipeline_status(
        task_id=task_id,
        stage='lyric',
        status='completed',
        message='가사 생성 완료'
    )

    # ────────────────────────────────────────────────────────────────
    # 핵심: 여기서 다음 태스크를 직접 호출하지 않습니다!
    # chain()이 이 반환값을 자동으로 generate_song에 전달합니다.
    # ────────────────────────────────────────────────────────────────
    logger.info(f"[Lyric] task_id={task_id} 완료. chain이 자동으로 다음 단계 실행.")

    return result  # ← 이 값이 generate_song(result)로 자동 전달됨

5.3 노래 생성 태스크

# app/tasks/song_tasks.py
"""
노래 생성 태스크 (Chain 방식)

핵심 차이점:
- 첫 번째 인자 `prev_result`는 chain에서 자동으로 전달됩니다.
- generate_lyric의 반환값이 여기의 prev_result가 됩니다.
- 이 태스크도 generate_video의 존재를 모릅니다.
"""

from sqlalchemy import select, desc
import asyncio
import aiohttp
import os
import logging

from app.celery_app import celery_app
from app.tasks.base import BaseTaskWithDB
from app.home.models import Project
from app.lyric.models import Lyric
from app.song.models import Song, SongTimestamp
from app.utils.suno import SunoService
from app.utils.upload_blob_as_request import AzureBlobUploader

logger = logging.getLogger(__name__)

SUNO_POLL_INTERVAL = 10
SUNO_MAX_POLL_TIME = 300


@celery_app.task(
    base=BaseTaskWithDB,
    bind=True,
    name='app.tasks.song_tasks.generate_song',
    max_retries=3,
    default_retry_delay=60,
    acks_late=True,
    soft_time_limit=540,
    time_limit=600,
)
def generate_song(self, prev_result: dict, genre: str = "pop, ambient") -> dict:
    """
    노래 생성 태스크

    Args:
        prev_result: chain에서 자동 전달된 이전 태스크의 반환값
            {
                "task_id": str,
                "lyric_id": int,
                "lyric_result": str,
                "language": str,
            }
        genre: 음악 장르

    Returns:
        dict: 다음 태스크(generate_video)에 자동 전달될 결과
            {
                "task_id": str,
                "lyric_id": int,
                "song_id": int,
                "song_result_url": str,
                "duration": float,
            }

    chain 동작:
        generate_lyric() → [결과] → generate_song([결과]) → [결과] → generate_video([결과])
                                     ^^^^^^^^^^^^^^^^
                                     여기서 실행됨
    """
    # ────────────────────────────────────────────────────────────────
    # chain에서 전달받은 이전 결과에서 필요한 데이터 추출
    # ────────────────────────────────────────────────────────────────
    task_id = prev_result['task_id']
    lyric_id = prev_result['lyric_id']

    logger.info(f"[Song] chain에서 자동 전달 수신: task_id={task_id}")

    self.update_pipeline_status(
        task_id=task_id,
        stage='song',
        status='processing',
        message='노래 생성을 시작합니다.'
    )

    async def _generate():
        # ────────────────────────────────────────────────
        # 1단계: DB에서 상세 데이터 조회
        # ────────────────────────────────────────────────
        # chain이 전달한 데이터는 경량이므로,
        # 실제 작업에 필요한 상세 데이터는 DB에서 조회
        async with self.get_db_session() as session:
            lyric = await session.get(Lyric, lyric_id)
            if not lyric or lyric.status != 'completed':
                raise ValueError(f"Lyric not ready: id={lyric_id}")

            project = await session.get(Project, lyric.project_id)

            song = Song(
                project_id=project.id,
                lyric_id=lyric.id,
                task_id=task_id,
                status='processing',
                song_prompt=f"{lyric.lyric_result}\n\nGenre: {genre}",
                language=lyric.language,
            )
            session.add(song)
            await session.commit()
            song_id = song.id
            lyrics_text = lyric.lyric_result

        # ────────────────────────────────────────────────
        # 2단계: Suno API 호출 + 폴링 (DB 세션 외부)
        # ────────────────────────────────────────────────
        suno = SunoService()
        suno_response = await suno.generate_music(prompt=lyrics_text, style=genre)
        suno_task_id = suno_response.get('task_id')

        # suno_task_id 저장
        async with self.get_db_session() as session:
            song = await session.get(Song, song_id)
            song.suno_task_id = suno_task_id
            await session.commit()

        # 폴링
        elapsed = 0
        audio_url = None
        duration = None
        suno_audio_id = None

        while elapsed < SUNO_MAX_POLL_TIME:
            await asyncio.sleep(SUNO_POLL_INTERVAL)
            elapsed += SUNO_POLL_INTERVAL

            self.update_pipeline_status(
                task_id=task_id,
                stage='song',
                status='processing',
                message=f'Suno 음악 생성 중... ({elapsed}초)'
            )

            status_resp = await suno.get_task_status(suno_task_id)
            if status_resp.get('status') == 'SUCCESS':
                clips = status_resp.get('clips', [])
                if clips:
                    audio_url = clips[0].get('audio_url')
                    duration = clips[0].get('duration')
                    suno_audio_id = clips[0].get('id')
                break
            elif status_resp.get('status') == 'failed':
                raise ValueError("Suno generation failed")

        if not audio_url:
            raise ValueError("Suno generation timed out")

        # ────────────────────────────────────────────────
        # 3단계: 업로드 + 타임스탬프 저장
        # ────────────────────────────────────────────────
        temp_dir = f"media/temp/{task_id}"
        os.makedirs(temp_dir, exist_ok=True)
        temp_file = f"{temp_dir}/song.mp3"

        try:
            async with aiohttp.ClientSession() as http:
                async with http.get(audio_url) as resp:
                    with open(temp_file, 'wb') as f:
                        f.write(await resp.read())

            uploader = AzureBlobUploader()
            blob_url = await uploader.upload_file(
                file_path=temp_file,
                blob_name=f"songs/{task_id}/song.mp3",
                content_type='audio/mpeg',
            )
        finally:
            if os.path.exists(temp_file):
                os.remove(temp_file)

        # 타임스탬프 저장 (실패해도 계속)
        try:
            timestamps = await suno.get_lyric_timestamp(suno_audio_id)
            async with self.get_db_session() as session:
                for idx, ts in enumerate(timestamps):
                    session.add(SongTimestamp(
                        suno_audio_id=suno_audio_id,
                        order_idx=idx,
                        lyric_line=ts.get('text', ''),
                        start_time=ts.get('start_time', 0),
                        end_time=ts.get('end_time', 0),
                    ))
                await session.commit()
        except Exception as e:
            logger.warning(f"Timestamp save failed: {e}")

        # 최종 업데이트
        async with self.get_db_session() as session:
            song = await session.get(Song, song_id)
            song.status = 'completed'
            song.song_result_url = blob_url
            song.suno_audio_id = suno_audio_id
            song.duration = duration
            await session.commit()

        return {
            'task_id': task_id,
            'lyric_id': lyric_id,
            'song_id': song_id,
            'song_result_url': blob_url,
            'duration': duration,
            'status': 'completed',
        }

    result = self.run_async(_generate())

    self.update_pipeline_status(
        task_id=task_id,
        stage='song',
        status='completed',
        message='노래 생성 완료'
    )

    # ────────────────────────────────────────────────────────────────
    # chain이 자동으로 이 결과를 generate_video에 전달합니다.
    # 여기서 video_task를 직접 호출하지 않습니다!
    # ────────────────────────────────────────────────────────────────
    return result

5.4 비디오 생성 태스크

# app/tasks/video_tasks.py
"""
비디오 생성 태스크 (Chain 방식 - 마지막 단계)

chain의 마지막 태스크이므로, 반환값은 chain의 최종 결과가 됩니다.
AsyncResult.get()으로 이 결과를 조회할 수 있습니다.
"""

from sqlalchemy import select, desc
import asyncio
import aiohttp
import os
import logging

from app.celery_app import celery_app
from app.tasks.base import BaseTaskWithDB
from app.home.models import Project, Image
from app.lyric.models import Lyric
from app.song.models import Song, SongTimestamp
from app.video.models import Video
from app.utils.creatomate import CreatomateService
from app.utils.upload_blob_as_request import AzureBlobUploader

logger = logging.getLogger(__name__)

CREATOMATE_POLL_INTERVAL = 15
CREATOMATE_MAX_POLL_TIME = 600
TEMPLATE_ID_VERTICAL = "e8c7b43f-de4b-4ba3-b8eb-5df688569193"
TEMPLATE_ID_HORIZONTAL = "0f092a6a-f526-4ef0-9181-d4ad4426b9e7"


@celery_app.task(
    base=BaseTaskWithDB,
    bind=True,
    name='app.tasks.video_tasks.generate_video',
    max_retries=2,
    default_retry_delay=120,
    acks_late=True,
    soft_time_limit=840,
    time_limit=900,
)
def generate_video(self, prev_result: dict, orientation: str = "vertical") -> dict:
    """
    비디오 생성 태스크 (chain 마지막)

    Args:
        prev_result: chain에서 자동 전달 (generate_song의 반환값)
            {
                "task_id": str,
                "song_id": int,
                "song_result_url": str,
                "duration": float,
            }
        orientation: 비디오 방향

    Returns:
        dict: chain의 최종 결과
            {
                "task_id": str,
                "result_movie_url": str,
                "status": "completed",
            }
    """
    task_id = prev_result['task_id']
    song_id = prev_result['song_id']

    logger.info(f"[Video] chain 마지막 단계: task_id={task_id}")

    self.update_pipeline_status(
        task_id=task_id,
        stage='video',
        status='processing',
        message='비디오 생성을 시작합니다.'
    )

    async def _generate():
        # DB 조회
        async with self.get_db_session() as session:
            song = await session.get(Song, song_id)
            if not song or song.status != 'completed':
                raise ValueError(f"Song not ready: id={song_id}")

            lyric = await session.get(Lyric, song.lyric_id)
            project = await session.get(Project, song.project_id)

            images = await session.scalars(
                select(Image)
                .where(Image.project_id == project.id)
                .where(Image.is_deleted == False)
                .order_by(Image.img_order)
            )
            image_list = list(images)

            timestamps = await session.scalars(
                select(SongTimestamp)
                .where(SongTimestamp.suno_audio_id == song.suno_audio_id)
                .order_by(SongTimestamp.order_idx)
            )
            timestamp_list = list(timestamps)

            video = Video(
                project_id=project.id,
                lyric_id=lyric.id,
                song_id=song.id,
                task_id=task_id,
                status='processing',
            )
            session.add(video)
            await session.commit()
            video_id = video.id

            song_url = song.song_result_url
            song_duration = song.duration
            image_urls = [img.image_url for img in image_list]
            lyric_timestamps = [
                {'text': ts.lyric_line, 'start': ts.start_time, 'end': ts.end_time}
                for ts in timestamp_list
            ]

        # Creatomate 처리
        template_id = TEMPLATE_ID_VERTICAL if orientation == 'vertical' else TEMPLATE_ID_HORIZONTAL
        creatomate = CreatomateService()

        template = await creatomate.get_template(template_id)
        modifications = {
            'music_url': song_url,
            'duration': song_duration,
            **{f'image_{i+1}': url for i, url in enumerate(image_urls[:10])},
            'captions': lyric_timestamps,
        }

        render_response = await creatomate.render(
            template_id=template_id,
            modifications=modifications,
        )
        render_id = render_response.get('id')

        async with self.get_db_session() as session:
            video = await session.get(Video, video_id)
            video.creatomate_render_id = render_id
            await session.commit()

        # 폴링
        elapsed = 0
        video_url = None
        while elapsed < CREATOMATE_MAX_POLL_TIME:
            await asyncio.sleep(CREATOMATE_POLL_INTERVAL)
            elapsed += CREATOMATE_POLL_INTERVAL

            self.update_pipeline_status(
                task_id=task_id, stage='video', status='rendering',
                message=f'렌더링 중... ({elapsed}초)'
            )

            status_resp = await creatomate.get_render_status(render_id)
            if status_resp.get('status') == 'succeeded':
                video_url = status_resp.get('url')
                break
            elif status_resp.get('status') == 'failed':
                raise ValueError(f"Rendering failed: {status_resp.get('error_message')}")

        if not video_url:
            raise ValueError("Rendering timed out")

        # 업로드
        temp_dir = f"media/temp/{task_id}"
        os.makedirs(temp_dir, exist_ok=True)
        temp_file = f"{temp_dir}/video.mp4"

        try:
            async with aiohttp.ClientSession() as http:
                async with http.get(video_url) as resp:
                    with open(temp_file, 'wb') as f:
                        f.write(await resp.read())

            uploader = AzureBlobUploader()
            blob_url = await uploader.upload_file(
                file_path=temp_file,
                blob_name=f"videos/{task_id}/video.mp4",
                content_type='video/mp4',
            )
        finally:
            if os.path.exists(temp_file):
                os.remove(temp_file)

        async with self.get_db_session() as session:
            video = await session.get(Video, video_id)
            video.status = 'completed'
            video.result_movie_url = blob_url
            await session.commit()

        return {
            'task_id': task_id,
            'video_id': video_id,
            'result_movie_url': blob_url,
            'status': 'completed',
        }

    result = self.run_async(_generate())

    self.update_pipeline_status(
        task_id=task_id,
        stage='video',
        status='completed',
        message='파이프라인 완료',
        extra_data={'result_movie_url': result['result_movie_url']}
    )

    # chain의 마지막 태스크 - 최종 결과 반환
    return result

5.5 파이프라인 API (핵심 차이점)

# app/api/routers/v1/pipeline.py
"""
Chain 기반 파이프라인 API

핵심 차이점:
- API에서 chain()을 선언하여 전체 파이프라인을 한 번에 정의합니다.
- 각 태스크는 다음 단계를 모릅니다.
- 파이프라인 제어(순서, 에러 핸들링)가 API 레벨에 집중됩니다.
"""

from celery import chain
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel

from app.dependencies.auth import get_current_user
from app.tasks.lyric_tasks import generate_lyric
from app.tasks.song_tasks import generate_song
from app.tasks.video_tasks import generate_video
from app.celery_app import celery_app

router = APIRouter(prefix="/pipeline", tags=["Pipeline"])


class StartPipelineRequest(BaseModel):
    task_id: str
    customer_name: str
    region: str
    detail_region_info: str
    language: str = "Korean"
    orientation: str = "vertical"
    genre: str = "pop, ambient"


@router.post("/start")
async def start_pipeline(
    request: StartPipelineRequest,
    current_user=Depends(get_current_user)
):
    """
    파이프라인 시작

    chain()으로 전체 파이프라인을 선언적으로 정의합니다.
    """
    # ────────────────────────────────────────────────────────────────
    # 핵심: 전체 파이프라인을 한 줄로 선언
    # ────────────────────────────────────────────────────────────────
    # .s() = signature: 태스크의 지연 호출 선언
    # | = chain 연산자: 왼쪽 태스크의 결과를 오른쪽 태스크의 입력으로 연결
    # .set(queue=...) = 실행될 큐 지정

    initial_data = {
        "task_id": request.task_id,
        "customer_name": request.customer_name,
        "region": request.region,
        "detail_region_info": request.detail_region_info,
        "language": request.language,
    }

    pipeline = chain(
        # 1단계: 가사 생성 → lyric_queue에서 실행
        generate_lyric.s(initial_data).set(queue='lyric_queue'),

        # 2단계: 노래 생성 → song_queue에서 실행
        # generate_lyric의 반환값이 자동으로 첫 번째 인자로 전달
        generate_song.s(genre=request.genre).set(queue='song_queue'),

        # 3단계: 비디오 생성 → video_queue에서 실행
        # generate_song의 반환값이 자동으로 첫 번째 인자로 전달
        generate_video.s(orientation=request.orientation).set(queue='video_queue'),
    )

    # chain 실행 - 전체 파이프라인 시작
    async_result = pipeline.apply_async()

    return {
        'success': True,
        'task_id': request.task_id,
        'chain_id': async_result.id,       # chain 전체 ID
        'message': '파이프라인이 시작되었습니다.',
    }


@router.get("/result/{chain_id}")
async def get_chain_result(
    chain_id: str,
    current_user=Depends(get_current_user)
):
    """
    chain 결과 조회

    chain의 마지막 태스크(video)의 결과를 조회합니다.
    """
    result = celery_app.AsyncResult(chain_id)

    return {
        'chain_id': chain_id,
        'state': result.state,       # PENDING, STARTED, SUCCESS, FAILURE
        'result': result.result if result.ready() else None,
        'ready': result.ready(),
    }


# ────────────────────────────────────────────────────────────────
# chain 에러 핸들링 (link_error)
# ────────────────────────────────────────────────────────────────
# chain에서 에러 발생 시 호출될 콜백 태스크
@celery_app.task(name='app.tasks.error_handler')
def pipeline_error_handler(request, exc, traceback):
    """
    chain 실패 시 호출되는 에러 핸들러

    어떤 단계에서 실패했든, 이 핸들러가 호출됩니다.
    """
    task_id = request.kwargs.get('task_id') or 'unknown'
    logger.error(f"Pipeline failed: task_id={task_id}, error={exc}")
    # 알림 발송 등


# 에러 핸들러 포함 chain 사용 예시:
# pipeline = chain(
#     generate_lyric.s(data),
#     generate_song.s(),
#     generate_video.s(),
# )
# pipeline.apply_async(link_error=pipeline_error_handler.s())

6. 상태 관리 및 모니터링

6.1 chain 상태 추적

┌─────────────────────────────────────────────────────────────────────────────┐
│                    chain 상태 추적 방법                                      │
└─────────────────────────────────────────────────────────────────────────────┘

방법 1: chain_id로 결과 조회
───────────────────────────
async_result = celery_app.AsyncResult(chain_id)
print(async_result.state)   # PENDING, STARTED, SUCCESS, FAILURE
print(async_result.result)  # chain의 마지막 태스크 결과

※ 주의: chain_id로는 마지막 태스크 상태만 볼 수 있음

방법 2: 각 태스크의 ID를 저장하여 개별 추적
───────────────────────────────────────────
pipeline = chain(lyric.s(data), song.s(), video.s())
result = pipeline.apply_async()

# chain의 부모(parent) 추적
print(result.id)           # video 태스크 ID
print(result.parent.id)    # song 태스크 ID
print(result.parent.parent.id)  # lyric 태스크 ID

방법 3: 커스텀 Redis 상태 (기존안과 동일)
──────────────────────────────────────────
각 태스크 내에서 pipeline:{task_id}:status 업데이트
→ 이 방법이 가장 실용적

6.2 모니터링 (기존안과 동일)

Flower, 커스텀 CLI 도구 등은 기존안과 동일합니다.


7. 실패 처리 전략

7.1 chain에서의 실패 처리

┌─────────────────────────────────────────────────────────────────────────────┐
│                    chain 실패 시 동작                                        │
└─────────────────────────────────────────────────────────────────────────────┘

chain(lyric.s() | song.s() | video.s())

[시나리오 1] lyric 실패 (재시도 후 성공)
──────────────────────────────────────
lyric 실행 → 실패 → 재시도 (max_retries=3)
  ├─ 재시도 성공 → song 자동 실행 → video 자동 실행
  └─ 3회 모두 실패 → chain 전체 FAILURE
     └─ link_error 콜백 실행

[시나리오 2] song 실패 (lyric 이미 완료)
──────────────────────────────────────
lyric 완료 ✓ → song 실행 → 실패 → 재시도
  ├─ 재시도 성공 → video 자동 실행
  └─ 모두 실패 → chain 전체 FAILURE
     ※ lyric 결과는 DB에 보존됨 (재사용 가능)

[시나리오 3] 부분 재시작 (chain의 약점)
──────────────────────────────────────
chain은 기본적으로 "전체 또는 없음"입니다.
song에서 실패하면, chain 자체가 실패합니다.
song부터 재시작하려면 새로운 chain을 만들어야 합니다:

# song부터 재시작
resume_chain = chain(
    generate_song.s(prev_lyric_result),
    generate_video.s(),
)
resume_chain.apply_async()
# 전체 chain에 에러 핸들러 연결
pipeline = chain(
    generate_lyric.s(data),
    generate_song.s(),
    generate_video.s(),
)

# link_error: chain의 어떤 단계에서든 실패하면 호출
pipeline.apply_async(
    link_error=pipeline_error_handler.s()
)

@celery_app.task
def pipeline_error_handler(request, exc, traceback):
    """
    실패 알림 및 DLQ 저장
    """
    # Slack 알림
    # DLQ에 실패 정보 저장
    # DB 상태 업데이트 (failed)
    pass

7.3 부분 재시작 API

@router.post("/resume/{task_id}/{from_stage}")
async def resume_pipeline(
    task_id: str,
    from_stage: str,  # "song" 또는 "video"
    current_user=Depends(get_current_user)
):
    """
    실패한 단계부터 chain 재시작

    chain은 전체 재시작만 지원하므로,
    실패한 단계부터 새로운 chain을 생성합니다.
    """
    if from_stage == 'song':
        # DB에서 lyric 결과 조회
        lyric_result = await get_lyric_result_from_db(task_id)

        resume = chain(
            generate_song.s(lyric_result),
            generate_video.s(),
        )
        result = resume.apply_async()

    elif from_stage == 'video':
        song_result = await get_song_result_from_db(task_id)

        # 단일 태스크 (chain 불필요)
        result = generate_video.apply_async(
            args=(song_result,),
            queue='video_queue',
        )

    return {'resumed_from': from_stage, 'chain_id': result.id}

8. 설계 및 동작 설명

8.1 chain이 "독립성"을 보장하는 방법

┌─────────────────────────────────────────────────────────────────────────────┐
│                    chain의 독립성 보장 메커니즘                               │
└─────────────────────────────────────────────────────────────────────────────┘

[기존안]
lyric_tasks.py 내부에 이런 코드가 있음:
  from app.tasks.song_tasks import generate_song   ◄── 직접 의존
  generate_song.apply_async(...)                    ◄── 직접 호출

→ lyric_tasks.py가 song_tasks.py를 "알고 있음"
→ 수정 시 양쪽 파일을 함께 변경해야 할 수 있음

[이 설계안]
lyric_tasks.py는 단순히 결과를 return합니다.
pipeline.py에서 chain을 선언합니다.

→ lyric_tasks.py는 song_tasks.py의 존재를 모름
→ 각 태스크가 완전히 독립적인 "순수 함수"
→ 파이프라인 순서 변경은 pipeline.py만 수정

비유:
  기존안: 선수가 다음 주자를 직접 불러 릴레이 바통 전달
  이 방식: 감독(Celery)이 주자 순서를 관리, 선수는 자기 주로만 달림

8.2 chain vs 명시적 전달 의사결정 매트릭스

┌──────────────────────┬─────────────────┬─────────────────┐
│        기준          │   기존안 (명시적)│ 이 방식 (chain) │
├──────────────────────┼─────────────────┼─────────────────┤
│ 태스크 간 결합도     │     중간         │     낮음        │
│ 파이프라인 가시성    │     분산         │     집중        │
│ 부분 재시작 용이성   │     높음         │     중간        │
│ 코드 복잡도          │     중간         │     낮음        │
│ 유연성 (동적 분기)   │     높음         │     낮음        │
│ 디버깅 난이도        │     중간         │     중간        │
│ Celery 네이티브 지원 │     일부         │     완전        │
│ 추가 코드 필요량     │     많음         │     적음        │
└──────────────────────┴─────────────────┴─────────────────┘

8.3 언제 이 방식을 선택해야 하는가

이 방식이 적합한 경우:
✓ 파이프라인이 항상 고정된 순서 (lyric → song → video)
✓ 각 태스크가 순수 함수처럼 동작
✓ 파이프라인 전체를 한눈에 보고 싶을 때
✓ 코드량을 최소화하고 싶을 때

이 방식이 부적합한 경우:
✗ 동적 분기가 필요한 경우 (조건에 따라 다른 태스크 실행)
✗ 병렬 실행이 필요한 경우 (chord/group 조합 필요)
✗ 부분 재시작을 자주 하는 경우
✗ 태스크 결과가 매우 큰 경우 (메시지 크기 문제)

9. 기존안과의 비교

┌─────────────────────────────────────────────────────────────────────────────┐
│                    기존안 vs 설계안 1 종합 비교                               │
└─────────────────────────────────────────────────────────────────────────────┘

                   기존안                          설계안 1
            ┌─────────────────┐           ┌─────────────────────┐
            │  명시적 큐 전달  │           │  Celery Chain       │
            └────────┬────────┘           └──────────┬──────────┘
                     │                               │
파이프라인    각 태스크가                    API에서 chain()으로
정의 위치     직접 다음 단계 호출            한 번에 선언
                     │                               │
태스크       다음 태스크를                   다음 태스크를
의존성       직접 import                     전혀 모름
                     │                               │
결과 전달    task_id만 전달                  반환값 전체 전달
방식         DB에서 재조회                   (경량 데이터 권장)
                     │                               │
부분         해당 단계부터                   새 chain 생성 필요
재시작       바로 재시작 가능                (약간 번거로움)
                     │                               │
코드량       태스크당 ~10줄 추가             API에 chain 3줄
             (apply_async 호출)              (태스크는 return만)
                     │                               │
적합한       동적 파이프라인                 고정 순서
상황         조건부 분기                     단순한 체인

10. 배포 및 운영

10.1 실행 명령어 (기존안과 동일)

# 워커 실행은 기존안과 동일
celery -A app.celery_app worker -Q lyric_queue -c 2 -n lyric@%h
celery -A app.celery_app worker -Q song_queue  -c 2 -n song@%h
celery -A app.celery_app worker -Q video_queue -c 1 -n video@%h

10.2 Docker Compose (기존안과 동일)

워커 구성, 스케일링 전략은 기존안과 동일합니다. 차이점은 API 서버 코드에서 chain을 사용한다는 것뿐입니다.


문서 버전

버전 날짜 변경 내용
1.0 2024-XX-XX 초안 작성