import json
import logging

from common.db import (
    fetchone,
    execute,
    get_analysis_raw_data,
    save_analysis_report,
    get_market_analysis,
)
from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt, plan_prompt
from integrations.llm.schemas.report import ReportOutput
from integrations.llm.schemas.plan import PlanOutput
from models.status import AnalysisStatus

logger = logging.getLogger(__name__)


def _to_json(value) -> str | None:
    """Return *value* as a JSON string, or ``None`` when *value* is falsy.

    Non-ASCII characters are written as-is (``ensure_ascii=False``).
    """
    return json.dumps(value, ensure_ascii=False) if value else None


def _decode_json_column(value):
    """Decode *value* when it is a JSON string; return it unchanged otherwise."""
    return json.loads(value) if isinstance(value, str) else value


async def _fetch_run(query: str, analysis_run_id: str):
    """Fetch the ``analysis_runs`` row selected by *query* for *analysis_run_id*.

    Raises:
        LookupError: if no row comes back. Without this check the callers
            would fail on ``run["hospital_id"]`` with an error that does not
            mention which run was missing.
    """
    run = await fetchone(query, (analysis_run_id,))
    if not run:
        raise LookupError(f"analysis run not found: {analysis_run_id}")
    return run


async def _load_clinic(hospital_id) -> dict:
    """Load the ``raw_data`` stored in ``hospital_baseinfo`` for *hospital_id*.

    Returns an empty dict when there is no row or the stored value is empty,
    so callers can always use ``.get``.
    """
    clinic_row = await fetchone(
        "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
        (hospital_id,),
    )
    raw_data = clinic_row["raw_data"] if clinic_row else None
    # ``or {}`` is applied after decoding as well, so a stored JSON ``null``
    # is treated like a missing row instead of breaking the ``.get`` calls.
    return _decode_json_column(raw_data) or {}


def _clinic_inputs(clinic: dict) -> dict:
    """Build the clinic-related prompt fields shared by the report and plan."""
    return {
        "clinic_name": clinic.get("clinicName"),
        "clinic_name_en": clinic.get("clinicNameEn"),
        "address": clinic.get("address"),
        "phone": clinic.get("phone"),
        "slogan": clinic.get("slogan"),
        # Unlike the market fields, these are always serialized: a missing
        # list becomes the string "[]" rather than None.
        "services": json.dumps(clinic.get("services", []), ensure_ascii=False),
        "doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
    }


def _market_inputs(market) -> dict:
    """Build the market-analysis prompt fields shared by the report and plan."""
    return {
        "market_competitors": _to_json(market.get("competitors")),
        "market_keywords": _to_json(market.get("keywords")),
        "market_trend": _to_json(market.get("trend")),
        "market_target_audience": _to_json(market.get("target_audience")),
    }


async def generate_report(analysis_run_id: str) -> ReportOutput:
    """Generate the analysis report for *analysis_run_id* via the LLM service.

    Collects the clinic base info, the per-channel raw analysis data and the
    market analysis for the run, and passes them with ``report_prompt`` to
    ``LLMService(provider="perplexity").generate``.

    Args:
        analysis_run_id: Identifier of the row in ``analysis_runs``.

    Returns:
        Whatever ``LLMService.generate`` returns for ``report_prompt``
        (annotated as ``ReportOutput``).

    Raises:
        LookupError: if the analysis run does not exist.
    """
    run = await _fetch_run(
        "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
        analysis_run_id,
    )
    clinic = await _load_clinic(run["hospital_id"])
    raw = await get_analysis_raw_data(analysis_run_id)
    market = await get_market_analysis(analysis_run_id)

    input_data = {
        **_clinic_inputs(clinic),
        **_market_inputs(market),
        # Spread last: a channel whose name equals one of the keys above
        # replaces that entry.
        **{channel: _to_json(data) for channel, data in raw.items()},
    }
    return await LLMService(provider="perplexity").generate(report_prompt, input_data)


async def generate_plan(analysis_run_id: str) -> PlanOutput:
    """Generate the plan for *analysis_run_id* via the LLM service.

    Uses the clinic base info, the report already stored in
    ``analysis_runs.report_data`` and the market analysis for the run, and
    passes them with ``plan_prompt`` to
    ``LLMService(provider="perplexity").generate``.

    Args:
        analysis_run_id: Identifier of the row in ``analysis_runs``.

    Returns:
        Whatever ``LLMService.generate`` returns for ``plan_prompt``
        (annotated as ``PlanOutput``).

    Raises:
        LookupError: if the analysis run does not exist.
    """
    run = await _fetch_run(
        "SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s",
        analysis_run_id,
    )
    clinic = await _load_clinic(run["hospital_id"])
    # report_data may arrive as a JSON string or as an already-decoded value.
    report = _decode_json_column(run["report_data"])
    market = await get_market_analysis(analysis_run_id)

    input_data = {
        **_clinic_inputs(clinic),
        "report": _to_json(report),
        **_market_inputs(market),
    }
    return await LLMService(provider="perplexity").generate(plan_prompt, input_data)


async def run_report_task(analysis_run_id: str) -> None:
    """Generate the report for *analysis_run_id* and save it.

    The result is stored through ``save_analysis_report`` as the dict
    produced by ``model_dump()``. Start and completion are logged.
    """
    logger.info("[report] start run=%s", analysis_run_id)
    result = await generate_report(analysis_run_id)
    await save_analysis_report(analysis_run_id, result.model_dump())
    logger.info("[report] done run=%s", analysis_run_id)


async def run_plan_task(analysis_run_id: str) -> None:
    """Generate the plan for *analysis_run_id* and store it in ``plan_data``.

    The ``model_dump()`` of the result is serialized to JSON and written to
    ``analysis_runs.plan_data``. Start and completion are logged.
    """
    logger.info("[plan] start run=%s", analysis_run_id)
    result = await generate_plan(analysis_run_id)
    await execute(
        "UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
        (json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id),
    )
    logger.info("[plan] done run=%s", analysis_run_id)