From cda518c0275c90ca9d61cab988de0578b78b7ec4 Mon Sep 17 00:00:00 2001 From: jaehwang Date: Tue, 19 May 2026 15:22:34 +0900 Subject: [PATCH] =?UTF-8?q?=EC=8B=9C=EC=9E=A5=20=EC=A1=B0=EC=82=AC=20llm?= =?UTF-8?q?=20=EC=B6=94=EA=B0=80=20=EB=B0=8F=20=ED=8C=8C=EC=9D=B4=ED=94=84?= =?UTF-8?q?=EB=9D=BC=EC=9D=B8=20=EC=A0=95=EB=A6=AC,=20db=20=EC=BB=A4?= =?UTF-8?q?=EB=84=A5=EC=85=98=20=ED=92=80=20=EB=AC=B8=EC=A0=9C=20=EC=B2=98?= =?UTF-8?q?=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- SQL/db_create.sql | 18 ++++ app/api/analysis.py | 21 ++--- app/common/db.py | 34 ++++++-- app/integrations/llm/prompt.py | 34 ++++++++ app/integrations/llm/schemas/market.py | 61 ++++++++++++++ .../temp-prompt/market_competitors_prompt.txt | 2 + .../temp-prompt/market_keywords_prompt.txt | 2 + .../market_target_audience_prompt.txt | 2 + .../llm/temp-prompt/market_trend_prompt.txt | 2 + app/services/analysis.py | 42 ++-------- app/services/collect.py | 39 +++++++-- app/services/market.py | 84 +++++++++++++++++++ app/services/pipeline.py | 52 ++++++++++++ 13 files changed, 328 insertions(+), 65 deletions(-) create mode 100644 app/integrations/llm/schemas/market.py create mode 100644 app/integrations/llm/temp-prompt/market_competitors_prompt.txt create mode 100644 app/integrations/llm/temp-prompt/market_keywords_prompt.txt create mode 100644 app/integrations/llm/temp-prompt/market_target_audience_prompt.txt create mode 100644 app/integrations/llm/temp-prompt/market_trend_prompt.txt create mode 100644 app/services/market.py create mode 100644 app/services/pipeline.py diff --git a/SQL/db_create.sql b/SQL/db_create.sql index 2e8555e..a314918 100644 --- a/SQL/db_create.sql +++ b/SQL/db_create.sql @@ -179,3 +179,21 @@ CREATE INDEX IX_hospital_history_1 CREATE INDEX IX_hospital_history_2 ON hospital_history(analysis_run_id); + +-- market_analysis Table Create SQL +CREATE TABLE market_analysis +( + `id` INT NOT NULL AUTO_INCREMENT, + `analysis_run_id` CHAR(36) NOT NULL, + `analysis_type` VARCHAR(50) NOT NULL, + `status` VARCHAR(20) NOT NULL DEFAULT 'start', + `data` JSON NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type) +); + +-- Index 설정 SQL - market_analysis(analysis_run_id) +CREATE INDEX IX_market_analysis_1 + ON market_analysis(analysis_run_id); + diff --git a/app/api/analysis.py b/app/api/analysis.py index 7805d55..7b7bd6e 100644 --- a/app/api/analysis.py +++ b/app/api/analysis.py @@ -5,7 +5,7 @@ from common.deps import verify_api_key from common.db import fetchone, insert_instagram_row, insert_facebook_row, insert_naver_blog_row, insert_youtube_row, insert_gangnam_unni_row, insert_analysis_run from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse from models.status import AnalysisStatus -from services.collect import collect_instagram, collect_facebook, collect_naver_blog, collect_youtube, collect_gangnam_unni, collect_clinic_info +from services.pipeline import run_pipeline router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)]) logger = logging.getLogger(__name__) @@ -24,7 +24,6 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks ) if not hospital: raise HTTPException(status_code=409, detail="Clinic not found") - owner_user_id = hospital["owner_user_id"] if hospital else 0 ig_id = await insert_instagram_row(hospital_id, body.channels.instagram) if body.channels.instagram else None fb_id = await insert_facebook_row(hospital_id, body.channels.facebook) if body.channels.facebook else None @@ -32,20 +31,12 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks yt_id = await insert_youtube_row(hospital_id, body.channels.youtube) if body.channels.youtube else None gu_id = await insert_gangnam_unni_row(hospital_id, body.channels.gangnam_unni) if body.channels.gangnam_unni else None - analysis_run_id = await insert_analysis_run(analysis_run_id, hospital_id, owner_user_id, ig_id, fb_id, nb_id, yt_id, gu_id) + analysis_run_id = await insert_analysis_run( + analysis_run_id, hospital_id, hospital["owner_user_id"], + ig_id, fb_id, nb_id, yt_id, gu_id, + ) - background_tasks.add_task(collect_clinic_info, analysis_run_id, hospital_id, hospital["url"]) - - if ig_id: - background_tasks.add_task(collect_instagram, analysis_run_id, ig_id, body.channels.instagram) - if fb_id: - background_tasks.add_task(collect_facebook, analysis_run_id, fb_id, body.channels.facebook) - if nb_id: - background_tasks.add_task(collect_naver_blog, analysis_run_id, nb_id, body.channels.naver_blog) - if yt_id: - background_tasks.add_task(collect_youtube, analysis_run_id, yt_id, body.channels.youtube) - if gu_id: - background_tasks.add_task(collect_gangnam_unni, analysis_run_id, gu_id, body.channels.gangnam_unni) + background_tasks.add_task(run_pipeline, analysis_run_id) return AnalysisStartResponse( analysis_run_id=analysis_run_id, diff --git a/app/common/db.py b/app/common/db.py index fea6634..a6cfda7 100644 --- a/app/common/db.py +++ b/app/common/db.py @@ -26,22 +26,38 @@ async def get_pool() -> aiomysql.Pool: # 쓰기 (INSERT/UPDATE/DELETE) async def execute(sql: str, args: tuple = ()) -> int: pool = await get_pool() + print( + f"[Pool] size={pool.size} " + f"free={pool.freesize} " + f"used={pool.size - pool.freesize} " + f"max={pool.maxsize}" + ) async with pool.acquire() as conn: - await conn.ping(reconnect=True) - async with conn.cursor() as cur: - await cur.execute(sql, args) - await conn.commit() - return cur.lastrowid + try: + async with conn.cursor() as cur: + await cur.execute(sql, args) + await conn.commit() + return cur.lastrowid + finally: + conn.close() # 읽기 (SELECT) async def fetchone(sql: str, args: tuple = ()) -> dict | None: pool = await get_pool() + print( + f"[Pool] size={pool.size} " + f"free={pool.freesize} " + f"used={pool.size - pool.freesize} " + f"max={pool.maxsize}" + ) async with pool.acquire() as conn: - await conn.ping(reconnect=True) - async with conn.cursor(aiomysql.DictCursor) as cur: - await cur.execute(sql, args) - return await cur.fetchone() + try: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, args) + return await cur.fetchone() + finally: + conn.close() async def insert_instagram_row(hospital_id: str, url: str) -> int: return await execute("INSERT INTO instagram_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) diff --git a/app/integrations/llm/prompt.py b/app/integrations/llm/prompt.py index c860459..365b7b0 100644 --- a/app/integrations/llm/prompt.py +++ b/app/integrations/llm/prompt.py @@ -3,6 +3,12 @@ from pydantic import BaseModel from common.utils import get_env from integrations.llm.schemas.report import ReportInput, ReportOutput from integrations.llm.schemas.plan import PlanInput, PlanOutput +from integrations.llm.schemas.market import ( + MarketCompetitorsInput, MarketCompetitorsOutput, + MarketKeywordsInput, MarketKeywordsOutput, + MarketTrendInput, MarketTrendOutput, + MarketTargetAudienceInput, MarketTargetAudienceOutput, +) _PROMPT_DIR = os.path.join(os.path.dirname(__file__), "temp-prompt") @@ -46,3 +52,31 @@ plan_prompt = Prompt( input_class=PlanInput, output_class=PlanOutput, ) + +market_competitors_prompt = Prompt( + file_name="market_competitors_prompt.txt", + prompt_model="MARKET_MODEL", + input_class=MarketCompetitorsInput, + output_class=MarketCompetitorsOutput, +) + +market_keywords_prompt = Prompt( + file_name="market_keywords_prompt.txt", + prompt_model="MARKET_MODEL", + input_class=MarketKeywordsInput, + output_class=MarketKeywordsOutput, +) + +market_trend_prompt = Prompt( + file_name="market_trend_prompt.txt", + prompt_model="MARKET_MODEL", + input_class=MarketTrendInput, + output_class=MarketTrendOutput, +) + +market_target_audience_prompt = Prompt( + file_name="market_target_audience_prompt.txt", + prompt_model="MARKET_MODEL", + input_class=MarketTargetAudienceInput, + output_class=MarketTargetAudienceOutput, +) diff --git a/app/integrations/llm/schemas/market.py b/app/integrations/llm/schemas/market.py new file mode 100644 index 0000000..4bf6084 --- /dev/null +++ b/app/integrations/llm/schemas/market.py @@ -0,0 +1,61 @@ +from pydantic import BaseModel + + +class MarketCompetitorsInput(BaseModel): + address: str + services: str + + +class MarketKeywordsInput(BaseModel): + services: str + + +class MarketTrendInput(BaseModel): + service: str + + +class MarketTargetAudienceInput(BaseModel): + clinic_name: str + + +# --- Output --- + +class Competitor(BaseModel): + name: str + procedures: list[str] + reputation: str + marketing_channels: list[str] + + +class MarketCompetitorsOutput(BaseModel): + competitors: list[Competitor] + + +class Keyword(BaseModel): + keyword: str + monthly_volume: str + competition: str + + +class MarketKeywordsOutput(BaseModel): + keywords: list[Keyword] + long_tail_keywords: list[str] + + +class MarketTrendOutput(BaseModel): + market_size: str + growth_rate: str + key_trends: list[str] + channel_effectiveness: list[str] + + +class AudienceSegment(BaseModel): + age_group: str + gender: str + interested_procedures: list[str] + info_channels: list[str] + decision_factors: list[str] + + +class MarketTargetAudienceOutput(BaseModel): + segments: list[AudienceSegment] diff --git a/app/integrations/llm/temp-prompt/market_competitors_prompt.txt b/app/integrations/llm/temp-prompt/market_competitors_prompt.txt new file mode 100644 index 0000000..5b48b97 --- /dev/null +++ b/app/integrations/llm/temp-prompt/market_competitors_prompt.txt @@ -0,0 +1,2 @@ +{address} 근처 {services} 전문 성형외과/피부과 경쟁 병원 5곳을 분석해줘. +각 병원의 이름, 주요 시술, 온라인 평판, 마케팅 채널을 JSON 형식으로 제공해줘. diff --git a/app/integrations/llm/temp-prompt/market_keywords_prompt.txt b/app/integrations/llm/temp-prompt/market_keywords_prompt.txt new file mode 100644 index 0000000..80c96d0 --- /dev/null +++ b/app/integrations/llm/temp-prompt/market_keywords_prompt.txt @@ -0,0 +1,2 @@ +한국 {services} 관련 검색 키워드 트렌드. +네이버와 구글에서 월간 검색량이 높은 키워드 20개, 경쟁 강도, 추천 롱테일 키워드를 JSON 형식으로 제공해줘. diff --git a/app/integrations/llm/temp-prompt/market_target_audience_prompt.txt b/app/integrations/llm/temp-prompt/market_target_audience_prompt.txt new file mode 100644 index 0000000..aecd45e --- /dev/null +++ b/app/integrations/llm/temp-prompt/market_target_audience_prompt.txt @@ -0,0 +1,2 @@ +{clinic_name}의 잠재 고객 분석. +연령대별, 성별, 관심 시술, 정보 탐색 채널, 의사결정 요인을 JSON 형식으로 제공해줘. diff --git a/app/integrations/llm/temp-prompt/market_trend_prompt.txt b/app/integrations/llm/temp-prompt/market_trend_prompt.txt new file mode 100644 index 0000000..08f693a --- /dev/null +++ b/app/integrations/llm/temp-prompt/market_trend_prompt.txt @@ -0,0 +1,2 @@ +한국 {service} 시장 트렌드 2025-2026. +시장 규모, 성장률, 주요 트렌드, 마케팅 채널별 효과를 JSON 형식으로 제공해줘. diff --git a/app/services/analysis.py b/app/services/analysis.py index 1e9828b..d27d2ae 100644 --- a/app/services/analysis.py +++ b/app/services/analysis.py @@ -1,7 +1,6 @@ -import asyncio import json import logging -from common.db import fetchone, execute, is_done, get_analysis_raw_data, save_analysis_report +from common.db import fetchone, execute, get_analysis_raw_data, save_analysis_report from integrations.llm.llm_service import LLMService from integrations.llm.prompt import report_prompt, plan_prompt from integrations.llm.schemas.report import ReportOutput @@ -69,43 +68,18 @@ async def generate_plan(analysis_run_id: str) -> PlanOutput: return await LLMService(provider="perplexity").generate(plan_prompt, input_data) -async def run_plan_task(analysis_run_id: str) -> None: - logger.info("[plan] start run=%s", analysis_run_id) - result = await generate_plan(analysis_run_id) - await execute( - "UPDATE analysis_runs SET plan_data = %s, status = %s WHERE analysis_run_id = %s", - (json.dumps(result.model_dump(), ensure_ascii=False), AnalysisStatus.COMPLETED, analysis_run_id), - ) - logger.info("[plan] done run=%s", analysis_run_id) - - async def run_report_task(analysis_run_id: str) -> None: logger.info("[report] start run=%s", analysis_run_id) result = await generate_report(analysis_run_id) await save_analysis_report(analysis_run_id, result.model_dump()) - await execute("UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", (AnalysisStatus.PLANNING, analysis_run_id)) logger.info("[report] done run=%s", analysis_run_id) - asyncio.create_task(run_plan_task(analysis_run_id)) -async def check_and_advance_analysis(analysis_run_id: str) -> None: - run = await fetchone( - "SELECT hospital_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id" - " FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), +async def run_plan_task(analysis_run_id: str) -> None: + logger.info("[plan] start run=%s", analysis_run_id) + result = await generate_plan(analysis_run_id) + await execute( + "UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s", + (json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id), ) - clinic_row = await fetchone( - "SELECT status FROM hospital_baseinfo WHERE hospital_id = %s", - (run["hospital_id"],), - ) - results = [ - clinic_row["status"] == "done" if clinic_row else False, - await is_done("instagram_data", run["instagram_data_id"]), - await is_done("facebook_data", run["facebook_data_id"]), - await is_done("naver_blog_data", run["naver_blog_data_id"]), - await is_done("youtube_data", run["youtube_data_id"]), - await is_done("gangnam_unni_data", run["gangnam_unni_data_id"]), - ] - if all(results): - await execute("UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", (AnalysisStatus.ANALYZING, analysis_run_id)) - asyncio.create_task(run_report_task(analysis_run_id)) + logger.info("[plan] done run=%s", analysis_run_id) diff --git a/app/services/collect.py b/app/services/collect.py index 6e25ff0..2752c12 100644 --- a/app/services/collect.py +++ b/app/services/collect.py @@ -1,4 +1,6 @@ +import asyncio import logging +from common.db import fetchone from common.db import ( set_instagram_status, save_instagram_raw_data, set_facebook_status, save_facebook_raw_data, @@ -8,7 +10,6 @@ from common.db import ( execute, save_hospital_raw_data, ) from common.utils import get_env -from services.analysis import check_and_advance_analysis from integrations.apify import ApifyClient from integrations.naver import NaverClient from integrations.youtube import YouTubeClient @@ -23,7 +24,6 @@ async def collect_instagram(analysis_run_id: str, row_id: int, url: str) -> None data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url) await save_instagram_raw_data(row_id, data) logger.info("[instagram] done run=%s", analysis_run_id) - await check_and_advance_analysis(analysis_run_id) async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None: @@ -32,7 +32,6 @@ async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None: data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url) await save_facebook_raw_data(row_id, data) logger.info("[facebook] done run=%s", analysis_run_id) - await check_and_advance_analysis(analysis_run_id) async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> None: @@ -41,7 +40,6 @@ async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> Non data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url) await save_naver_blog_raw_data(row_id, data) logger.info("[naver_blog] done run=%s", analysis_run_id) - await check_and_advance_analysis(analysis_run_id) async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None: @@ -50,7 +48,6 @@ async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None: data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url) await save_youtube_raw_data(row_id, data) logger.info("[youtube] done run=%s", analysis_run_id) - await check_and_advance_analysis(analysis_run_id) async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> None: @@ -59,7 +56,6 @@ async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> N data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url) await save_gangnam_unni_raw_data(row_id, data) logger.info("[gangnam_unni] done run=%s", analysis_run_id) - await check_and_advance_analysis(analysis_run_id) async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str) -> None: @@ -68,4 +64,33 @@ async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str) data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url) await save_hospital_raw_data(hospital_id, data, analysis_run_id=analysis_run_id) logger.info("[clinic] done run=%s", analysis_run_id) - await check_and_advance_analysis(analysis_run_id) + + +async def collect_all( + analysis_run_id: str, + hospital_id: str, + instagram_id: int | None = None, + facebook_id: int | None = None, + naver_blog_id: int | None = None, + youtube_id: int | None = None, + gangnam_unni_id: int | None = None, +) -> None: + async def _url(table: str, row_id: int) -> str: + row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,)) + return row["url"] if row else "" + + hospital = await fetchone("SELECT url FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,)) + tasks = [collect_clinic_info(analysis_run_id, hospital_id, hospital["url"])] + + if instagram_id: + tasks.append(collect_instagram(analysis_run_id, instagram_id, await _url("instagram_data", instagram_id))) + if facebook_id: + tasks.append(collect_facebook(analysis_run_id, facebook_id, await _url("facebook_data", facebook_id))) + if naver_blog_id: + tasks.append(collect_naver_blog(analysis_run_id, naver_blog_id, await _url("naver_blog_data", naver_blog_id))) + if youtube_id: + tasks.append(collect_youtube(analysis_run_id, youtube_id, await _url("youtube_data", youtube_id))) + if gangnam_unni_id: + tasks.append(collect_gangnam_unni(analysis_run_id, gangnam_unni_id, await _url("gangnam_unni_data", gangnam_unni_id))) + + await asyncio.gather(*tasks, return_exceptions=True) diff --git a/app/services/market.py b/app/services/market.py new file mode 100644 index 0000000..8cfa193 --- /dev/null +++ b/app/services/market.py @@ -0,0 +1,84 @@ +import asyncio +import json +import logging +from common.db import fetchone, execute +from integrations.llm.llm_service import LLMService +from integrations.llm.prompt import ( + market_competitors_prompt, + market_keywords_prompt, + market_trend_prompt, + market_target_audience_prompt, +) + +logger = logging.getLogger(__name__) + +_TYPES = ["competitors", "keywords", "trend", "target_audience"] + + +async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None: + if exc: + logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc) + await execute( + "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)" + " VALUES (%s, %s, 'failed')" + " ON DUPLICATE KEY UPDATE status = 'failed'", + (analysis_run_id, analysis_type), + ) + else: + await execute( + "INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)" + " VALUES (%s, %s, 'done', %s)" + " ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)", + (analysis_run_id, analysis_type, json.dumps(result.model_dump(), ensure_ascii=False)), + ) + + +async def run_market_analysis(analysis_run_id: str) -> None: + logger.info("[market] start run=%s", analysis_run_id) + + run = await fetchone( + "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", + (analysis_run_id,), + ) + clinic = await fetchone( + "SELECT hospital_name, road_address, raw_data FROM hospital_baseinfo WHERE hospital_id = %s", + (run["hospital_id"],), + ) + + raw_data = clinic["raw_data"] + clinic_data = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {}) + + clinic_name = clinic["hospital_name"] or "" + address = clinic["road_address"] or "" + services = clinic_data.get("services", []) + services_str = ", ".join(services[:3]) + primary_service = services[0] if services else "" + + for analysis_type in _TYPES: + await execute( + "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)" + " VALUES (%s, %s, 'processing')" + " ON DUPLICATE KEY UPDATE status = 'processing'", + (analysis_run_id, analysis_type), + ) + + llm = LLMService(provider="perplexity") + results = await asyncio.gather( + llm.generate(market_competitors_prompt, {"address": address, "services": services_str}), + llm.generate(market_keywords_prompt, {"services": services_str}), + llm.generate(market_trend_prompt, {"service": primary_service}), + llm.generate(market_target_audience_prompt, {"clinic_name": clinic_name}), + return_exceptions=True, + ) + # 그다지 좋은 방식은 아님 + await asyncio.gather(*[ + _save( + analysis_run_id, + analysis_type, + result=None if isinstance(r, Exception) else r, + exc=r if isinstance(r, Exception) else None, + ) + for analysis_type, r in zip(_TYPES, results) + ]) + + logger.info("[market] done run=%s", analysis_run_id) diff --git a/app/services/pipeline.py b/app/services/pipeline.py new file mode 100644 index 0000000..35b722f --- /dev/null +++ b/app/services/pipeline.py @@ -0,0 +1,52 @@ +import logging +from common.db import fetchone, execute +from models.status import AnalysisStatus +from services.collect import collect_all +from services.market import run_market_analysis +from services.analysis import run_report_task, run_plan_task + +logger = logging.getLogger(__name__) + + +async def run_pipeline(analysis_run_id: str) -> None: + logger.info("[pipeline] start run=%s", analysis_run_id) + + # ── 1. Collect ────────────────────────────────────────────────────────── + run = await fetchone( + "SELECT hospital_id, instagram_data_id, facebook_data_id," + " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id" + " FROM analysis_runs WHERE analysis_run_id = %s", + (analysis_run_id,), + ) + await collect_all( + analysis_run_id, + hospital_id=run["hospital_id"], + instagram_id=run["instagram_data_id"], + facebook_id=run["facebook_data_id"], + naver_blog_id=run["naver_blog_data_id"], + youtube_id=run["youtube_data_id"], + gangnam_unni_id=run["gangnam_unni_data_id"], + ) + + # ── 2. Market ──────────────────────────────────────────────────────────── + await execute( + "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", + (AnalysisStatus.ANALYZING, analysis_run_id), + ) + await run_market_analysis(analysis_run_id) + + # ── 3. Report ──────────────────────────────────────────────────────────── + await run_report_task(analysis_run_id) + + # ── 4. Plan ────────────────────────────────────────────────────────────── + await execute( + "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", + (AnalysisStatus.PLANNING, analysis_run_id), + ) + await run_plan_task(analysis_run_id) + + await execute( + "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", + (AnalysisStatus.COMPLETED, analysis_run_id), + ) + logger.info("[pipeline] done run=%s", analysis_run_id)