53 lines
2.4 KiB
Python
53 lines
2.4 KiB
Python
import logging
|
|
from common.db import fetchone, execute
|
|
from models.status import AnalysisStatus
|
|
from services.collect import collect_all
|
|
from services.market import run_market_analysis
|
|
from services.analysis import run_report_task, run_plan_task
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def run_pipeline(analysis_run_id: str) -> None:
    """Run the full analysis pipeline for one analysis run.

    Stages are awaited strictly in order:

    1. Collect -- load the run's source ids from ``analysis_runs`` and pass
       them to ``collect_all``.
    2. Market  -- set status to ``AnalysisStatus.ANALYZING`` and run
       ``run_market_analysis``.
    3. Report  -- run ``run_report_task`` (status is still ``ANALYZING``).
    4. Plan    -- set status to ``AnalysisStatus.PLANNING`` and run
       ``run_plan_task``; finally set status to ``AnalysisStatus.COMPLETED``.

    Args:
        analysis_run_id: Value matched against ``analysis_runs.analysis_run_id``.

    Raises:
        LookupError: If no ``analysis_runs`` row is returned for
            ``analysis_run_id``.

    Any exception raised by a stage propagates to the caller; the status
    column is left at whatever value was written last.
    """
    logger.info("[pipeline] start run=%s", analysis_run_id)

    # ── 1. Collect ──────────────────────────────────────────────────────────
    run = await fetchone(
        "SELECT hospital_id, instagram_data_id, facebook_data_id,"
        " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
        " FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    # NOTE(review): assumes fetchone() yields None when no row matches
    # (DB-API convention) -- confirm against common.db. Without this guard a
    # missing run surfaces as an opaque TypeError on the subscripts below.
    if run is None:
        raise LookupError(f"analysis run {analysis_run_id!r} not found")

    await collect_all(
        analysis_run_id,
        hospital_id=run["hospital_id"],
        instagram_id=run["instagram_data_id"],
        facebook_id=run["facebook_data_id"],
        naver_blog_id=run["naver_blog_data_id"],
        youtube_id=run["youtube_data_id"],
        gangnam_unni_id=run["gangnam_unni_data_id"],
    )

    # ── 2. Market ────────────────────────────────────────────────────────────
    await _set_status(analysis_run_id, AnalysisStatus.ANALYZING)
    await run_market_analysis(analysis_run_id)

    # ── 3. Report ────────────────────────────────────────────────────────────
    await run_report_task(analysis_run_id)

    # ── 4. Plan ──────────────────────────────────────────────────────────────
    await _set_status(analysis_run_id, AnalysisStatus.PLANNING)
    await run_plan_task(analysis_run_id)

    await _set_status(analysis_run_id, AnalysisStatus.COMPLETED)
    logger.info("[pipeline] done run=%s", analysis_run_id)


async def _set_status(analysis_run_id: str, status: "AnalysisStatus") -> None:
    """Write ``status`` to the ``analysis_runs`` row for ``analysis_run_id``."""
    await execute(
        "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
        (status, analysis_run_id),
    )
|