# o2o-infinith-backend/app/services/pipeline.py
#
# 59 lines
# 2.7 KiB
# Python

import logging
from common.db import fetchone, execute
from models.status import AnalysisStatus
from services.collect import collect_all
from services.market import run_market_analysis
from services.analysis import run_report_task, run_plan_task
logger = logging.getLogger(__name__)
async def _set_status(analysis_run_id: str, status: "AnalysisStatus") -> None:
    """Write ``status`` into the ``analysis_runs`` row for ``analysis_run_id``."""
    await execute(
        "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
        (status, analysis_run_id),
    )


async def run_pipeline(analysis_run_id: str, extra_channels: dict | None = None) -> None:
    """Run the analysis pipeline stages for one run, strictly in order.

    Stages, as executed here:

    1. Collect - read the channel data ids stored on the ``analysis_runs``
       row and pass them, plus any extra channel URLs, to ``collect_all``.
    2. Market  - set status to ``AnalysisStatus.ANALYZING``, then call
       ``run_market_analysis``.
    3. Report  - call ``run_report_task`` (no status change is made here).
    4. Plan    - set status to ``AnalysisStatus.PLANNING``, call
       ``run_plan_task``, then set status to ``AnalysisStatus.COMPLETED``.

    Args:
        analysis_run_id: Value matched against ``analysis_runs.analysis_run_id``.
        extra_channels: Optional mapping; the keys ``"tiktok"``,
            ``"instagram_en"``, ``"facebook_en"``, ``"kakao_talk"`` and
            ``"naver_cafe"`` are looked up and forwarded to ``collect_all``.
            Absent keys are forwarded as ``None``. ``None`` or an empty
            mapping is treated as "no extra channels".

    Raises:
        ValueError: If no ``analysis_runs`` row is returned for
            ``analysis_run_id``.

    Any exception raised by a stage propagates unchanged; this function does
    not write a failure status, so the row keeps the last status set above.
    """
    logger.info("[pipeline] start run=%s", analysis_run_id)
    extra_channels = extra_channels or {}

    # ── 1. Collect ──────────────────────────────────────────────────────────
    run = await fetchone(
        "SELECT hospital_id, instagram_data_id, facebook_data_id,"
        " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
        " FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    # NOTE(review): assumes fetchone returns None when no row matches
    # (DB-API convention) - confirm against common.db. Without this guard the
    # subscripting below fails with an opaque TypeError on a missing run.
    if run is None:
        logger.error("[pipeline] run not found run=%s", analysis_run_id)
        raise ValueError(f"analysis run not found: {analysis_run_id}")

    await collect_all(
        analysis_run_id,
        hospital_id=run["hospital_id"],
        instagram_id=run["instagram_data_id"],
        facebook_id=run["facebook_data_id"],
        naver_blog_id=run["naver_blog_data_id"],
        youtube_id=run["youtube_data_id"],
        gangnam_unni_id=run["gangnam_unni_data_id"],
        tiktok_url=extra_channels.get("tiktok"),
        instagram_en_url=extra_channels.get("instagram_en"),
        facebook_en_url=extra_channels.get("facebook_en"),
        kakao_talk_url=extra_channels.get("kakao_talk"),
        naver_cafe_url=extra_channels.get("naver_cafe"),
    )

    # ── 2. Market ────────────────────────────────────────────────────────────
    await _set_status(analysis_run_id, AnalysisStatus.ANALYZING)
    await run_market_analysis(analysis_run_id)

    # ── 3. Report ────────────────────────────────────────────────────────────
    await run_report_task(analysis_run_id)

    # ── 4. Plan ──────────────────────────────────────────────────────────────
    await _set_status(analysis_run_id, AnalysisStatus.PLANNING)
    await run_plan_task(analysis_run_id)

    await _set_status(analysis_run_id, AnalysisStatus.COMPLETED)
    logger.info("[pipeline] done run=%s", analysis_run_id)