212 lines
11 KiB
Python
212 lines
11 KiB
Python
import json
|
|
import logging
|
|
from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis
|
|
from integrations.llm.llm_service import LLMService
|
|
from integrations.llm.prompt import report_prompt, plan_prompt
|
|
from integrations.llm.schemas.report import ReportOutput
|
|
from integrations.llm.schemas.plan import PlanOutput
|
|
from models.status import AnalysisStatus
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def generate_report(analysis_run_id: str) -> ReportOutput:
|
|
run = await fetchone(
|
|
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
|
|
(analysis_run_id,),
|
|
)
|
|
clinic_row = await fetchone(
|
|
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
(run["hospital_id"],),
|
|
)
|
|
raw_data = clinic_row["raw_data"] if clinic_row else None
|
|
clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
|
|
raw = await get_analysis_raw_data(analysis_run_id)
|
|
market = await get_market_analysis(analysis_run_id)
|
|
|
|
def _json(v) -> str | None:
|
|
return json.dumps(v, ensure_ascii=False) if v else None
|
|
|
|
input_data = {
|
|
"clinic_name": clinic.get("clinicName"),
|
|
"clinic_name_en": clinic.get("clinicNameEn"),
|
|
"address": clinic.get("address"),
|
|
"phone": clinic.get("phone"),
|
|
"slogan": clinic.get("slogan"),
|
|
"services": json.dumps(clinic.get("services", []), ensure_ascii=False),
|
|
"doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
|
|
"market_competitors": _json(market.get("competitors")),
|
|
"market_keywords": _json(market.get("keywords")),
|
|
"market_trend": _json(market.get("trend")),
|
|
"market_target_audience": _json(market.get("target_audience")),
|
|
**{
|
|
channel: _json(data)
|
|
for channel, data in raw.items()
|
|
},
|
|
}
|
|
|
|
return await LLMService(provider="perplexity").generate(report_prompt, input_data)
|
|
|
|
async def generate_plan(analysis_run_id: str) -> PlanOutput:
|
|
run = await fetchone(
|
|
"SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s",
|
|
(analysis_run_id,),
|
|
)
|
|
clinic_row = await fetchone(
|
|
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
(run["hospital_id"],),
|
|
)
|
|
raw_data = clinic_row["raw_data"] if clinic_row else None
|
|
clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
|
|
report_data = run["report_data"]
|
|
report = json.loads(report_data) if isinstance(report_data, str) else report_data
|
|
market = await get_market_analysis(analysis_run_id)
|
|
|
|
def _json(v) -> str | None:
|
|
return json.dumps(v, ensure_ascii=False) if v else None
|
|
|
|
input_data = {
|
|
"clinic_name": clinic.get("clinicName"),
|
|
"clinic_name_en": clinic.get("clinicNameEn"),
|
|
"address": clinic.get("address"),
|
|
"phone": clinic.get("phone"),
|
|
"slogan": clinic.get("slogan"),
|
|
"services": json.dumps(clinic.get("services", []), ensure_ascii=False),
|
|
"doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
|
|
"report": _json(report),
|
|
"market_competitors": _json(market.get("competitors")),
|
|
"market_keywords": _json(market.get("keywords")),
|
|
"market_trend": _json(market.get("trend")),
|
|
"market_target_audience": _json(market.get("target_audience")),
|
|
}
|
|
|
|
return await LLMService(provider="perplexity").generate(plan_prompt, input_data)
|
|
|
|
|
|
async def _build_overrides(analysis_run_id: str) -> dict:
|
|
run = await fetchone(
|
|
"SELECT hospital_id, instagram_data_id, facebook_data_id,"
|
|
" naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
|
|
" FROM analysis_runs WHERE analysis_run_id = %s",
|
|
(analysis_run_id,),
|
|
)
|
|
if not run:
|
|
return {}
|
|
|
|
hospital_row = await fetchone(
|
|
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
(run["hospital_id"],),
|
|
)
|
|
hospital = json.loads(hospital_row["raw_data"]) if hospital_row and isinstance(hospital_row.get("raw_data"), str) else (hospital_row or {}).get("raw_data") or {}
|
|
instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {}
|
|
facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {}
|
|
naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {}
|
|
youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {}
|
|
gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {}
|
|
|
|
snapshot: dict = {}
|
|
|
|
# ── gangnam_unni ──────────────────────────────────────────────────────────
|
|
doctors = gangnam_unni.get("doctors", [])
|
|
lead = max(doctors, key=lambda d: d.get("reviews", 0)) if doctors else None
|
|
if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"]
|
|
if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"]
|
|
if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"]
|
|
if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"]
|
|
if gangnam_unni.get("badges"): snapshot["certifications"] = gangnam_unni["badges"]
|
|
if doctors: snapshot["staff_count"] = len(doctors)
|
|
if lead:
|
|
snapshot["lead_doctor"] = {
|
|
"name": lead.get("name"),
|
|
"credentials": lead.get("specialty"),
|
|
"rating": lead.get("rating"),
|
|
"review_count": lead.get("reviews"),
|
|
}
|
|
|
|
# ── instagram ─────────────────────────────────────────────────────────────
|
|
ig_patch: dict = {}
|
|
if instagram.get("username"): ig_patch["handle"] = instagram["username"]
|
|
if instagram.get("posts"): ig_patch["posts"] = instagram["posts"]
|
|
if instagram.get("followers"): ig_patch["followers"] = instagram["followers"]
|
|
if instagram.get("following"): ig_patch["following"] = instagram["following"]
|
|
if instagram.get("bio"): ig_patch["bio"] = instagram["bio"]
|
|
if instagram.get("username"): ig_patch["profile_link"] = f"https://www.instagram.com/{instagram['username']}/"
|
|
|
|
# ── facebook ──────────────────────────────────────────────────────────────
|
|
fb_patch: dict = {}
|
|
if facebook.get("pageUrl"): fb_patch["url"] = facebook["pageUrl"]
|
|
if facebook.get("pageUrl"): fb_patch["link"] = facebook["pageUrl"]
|
|
if facebook.get("pageName"): fb_patch["page_name"] = facebook["pageName"]
|
|
if facebook.get("followers"): fb_patch["followers"] = facebook["followers"]
|
|
if facebook.get("intro"): fb_patch["bio"] = facebook["intro"]
|
|
if facebook.get("categories"): fb_patch["category"] = ", ".join(facebook["categories"])
|
|
if facebook.get("website"): fb_patch["linked_domain"] = facebook["website"]
|
|
|
|
# ── youtube ───────────────────────────────────────────────────────────────
|
|
yt_patch: dict = {}
|
|
if youtube.get("channelName"): yt_patch["channel_name"] = youtube["channelName"]
|
|
if youtube.get("handle"): yt_patch["handle"] = youtube["handle"]
|
|
if youtube.get("subscribers"): yt_patch["subscribers"] = youtube["subscribers"]
|
|
if youtube.get("totalVideos"): yt_patch["total_videos"] = youtube["totalVideos"]
|
|
if youtube.get("totalViews"): yt_patch["total_views"] = youtube["totalViews"]
|
|
if youtube.get("publishedAt"): yt_patch["channel_created_date"] = youtube["publishedAt"][:10]
|
|
if youtube.get("description"): yt_patch["channel_description"] = youtube["description"]
|
|
if youtube.get("publishedAt"): snapshot["established"] = youtube["publishedAt"][:4]
|
|
if youtube.get("videos"):
|
|
yt_patch["top_videos"] = [
|
|
{
|
|
"title": v["title"],
|
|
"views": v["views"],
|
|
"duration": v.get("duration"),
|
|
"type": "Short" if "M" not in v.get("duration", "") else "Long",
|
|
"uploaded_ago": v.get("date", "")[:10],
|
|
}
|
|
for v in youtube["videos"]
|
|
]
|
|
|
|
overrides: dict = {}
|
|
if snapshot:
|
|
overrides["clinic_snapshot"] = snapshot
|
|
if ig_patch:
|
|
overrides["instagram_audit"] = {"accounts": [ig_patch]}
|
|
if fb_patch:
|
|
overrides["facebook_audit"] = {"pages": [fb_patch]}
|
|
if yt_patch:
|
|
overrides["youtube_audit"] = yt_patch
|
|
return overrides
|
|
|
|
|
|
def _deep_merge(base: dict, overrides: dict) -> dict:
|
|
for k, v in overrides.items():
|
|
if isinstance(v, dict) and isinstance(base.get(k), dict):
|
|
_deep_merge(base[k], v)
|
|
elif isinstance(v, list) and isinstance(base.get(k), list):
|
|
for i, item in enumerate(v):
|
|
if i < len(base[k]) and isinstance(item, dict) and isinstance(base[k][i], dict):
|
|
_deep_merge(base[k][i], item)
|
|
else:
|
|
base[k] = v
|
|
return base
|
|
|
|
def _patch_report(result: ReportOutput, overrides: dict) -> ReportOutput:
|
|
merged = _deep_merge(result.model_dump(), overrides)
|
|
return ReportOutput(**merged)
|
|
|
|
|
|
async def run_report_task(analysis_run_id: str) -> None:
|
|
logger.info("[report] start run=%s", analysis_run_id)
|
|
result = await generate_report(analysis_run_id)
|
|
result = _patch_report(result, await _build_overrides(analysis_run_id))
|
|
await save_analysis_report(analysis_run_id, result.model_dump())
|
|
logger.info("[report] done run=%s", analysis_run_id)
|
|
|
|
|
|
async def run_plan_task(analysis_run_id: str) -> None:
|
|
logger.info("[plan] start run=%s", analysis_run_id)
|
|
result = await generate_plan(analysis_run_id)
|
|
await execute(
|
|
"UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
|
|
(json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id),
|
|
)
|
|
logger.info("[plan] done run=%s", analysis_run_id)
|