o2o-infinith-backend/app/services/analysis.py

66 lines
2.8 KiB
Python

import asyncio
import json
import logging
from common.db import fetchone, execute, is_done, get_analysis_raw_data, save_analysis_report
from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt
from integrations.llm.schemas.report import ReportOutput
logger = logging.getLogger(__name__)
async def generate_report(analysis_run_id: str) -> ReportOutput:
run = await fetchone(
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
clinic_row = await fetchone(
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
raw_data = clinic_row["raw_data"] if clinic_row else None
clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
raw = await get_analysis_raw_data(analysis_run_id)
input_data = {
"clinic_name": clinic.get("clinicName"),
"clinic_name_en": clinic.get("clinicNameEn"),
"address": clinic.get("address"),
"phone": clinic.get("phone"),
"slogan": clinic.get("slogan"),
"services": json.dumps(clinic.get("services", []), ensure_ascii=False),
"doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
**{
channel: json.dumps(data, ensure_ascii=False) if data else None
for channel, data in raw.items()
},
}
return await LLMService(provider="perplexity").generate(report_prompt, input_data)
async def run_report_task(analysis_run_id: str) -> None:
logger.info("[report] start run=%s", analysis_run_id)
result = await generate_report(analysis_run_id)
await save_analysis_report(analysis_run_id, result.model_dump())
await execute("UPDATE analysis_runs SET status = 'completed' WHERE analysis_run_id = %s", (analysis_run_id,))
logger.info("[report] done run=%s", analysis_run_id)
async def check_and_advance_analysis(analysis_run_id: str) -> None:
run = await fetchone(
"SELECT instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
results = [
await is_done("instagram_data", run["instagram_data_id"]),
await is_done("facebook_data", run["facebook_data_id"]),
await is_done("naver_blog_data", run["naver_blog_data_id"]),
await is_done("youtube_data", run["youtube_data_id"]),
await is_done("gangnam_unni_data", run["gangnam_unni_data_id"]),
]
if all(results):
await execute("UPDATE analysis_runs SET status = 'analyzing' WHERE analysis_run_id = %s", (analysis_run_id,))
asyncio.create_task(run_report_task(analysis_run_id))