o2o-infinith-backend/app/services/analysis.py

281 lines
13 KiB
Python

import json
import logging
from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis
from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt, plan_prompt
from integrations.llm.schemas.report import ReportOutput
from integrations.llm.schemas.plan import PlanOutput
from models.status import AnalysisStatus
logger = logging.getLogger(__name__)
async def generate_report(analysis_run_id: str) -> ReportOutput:
    """Assemble the report prompt input for one analysis run and call the LLM.

    Reads the run's hospital id, the hospital's stored ``raw_data`` (a JSON
    string or an already-decoded mapping), the per-channel raw data and the
    market analysis, then flattens them into the ``input_data`` mapping that
    is passed to ``report_prompt``.

    Args:
        analysis_run_id: Identifier of the row in ``analysis_runs``.

    Returns:
        Whatever ``LLMService.generate`` yields for ``report_prompt``
        (annotated as ``ReportOutput``).

    Raises:
        ValueError: If no ``analysis_runs`` row exists for the given id.
    """
    run = await fetchone(
        "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if not run:
        # Fail with a clear message instead of an opaque TypeError from
        # subscripting None a few lines further down.
        raise ValueError(f"analysis run not found: {analysis_run_id}")

    clinic_row = await fetchone(
        "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
        (run["hospital_id"],),
    )
    raw_data = clinic_row["raw_data"] if clinic_row else None
    # raw_data may come back as a JSON string or as an already-decoded value.
    clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})

    # Treat "nothing stored" (None) the same as an empty mapping so the
    # lookups below do not crash.
    raw = (await get_analysis_raw_data(analysis_run_id)) or {}
    market = (await get_market_analysis(analysis_run_id)) or {}

    def _json(v) -> str | None:
        # Falsy values (None, {}, [], 0, "") are passed to the prompt as None.
        return json.dumps(v, ensure_ascii=False) if v else None

    input_data = {
        "clinic_name": clinic.get("clinicName"),
        "clinic_name_en": clinic.get("clinicNameEn"),
        "address": clinic.get("address"),
        "phone": clinic.get("phone"),
        "slogan": clinic.get("slogan"),
        # services/doctors are always serialised, defaulting to "[]" when absent.
        "services": json.dumps(clinic.get("services", []), ensure_ascii=False),
        "doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
        "market_competitors": _json(market.get("competitors")),
        "market_keywords": _json(market.get("keywords")),
        "market_trend": _json(market.get("trend")),
        "market_target_audience": _json(market.get("target_audience")),
        "branding": _json(clinic.get("branding")),
        "brand_assets": _json(clinic.get("brandAssets")),
        "tiktok": _json(clinic.get("tiktok")),
        "instagram_en": _json(clinic.get("instagramEn")),
        "facebook_en": _json(clinic.get("facebookEn")),
        "channel_logos": _json(clinic.get("channelLogos")),
        # Per-channel raw data is spread last, so a channel whose name equals
        # one of the keys above replaces that entry.
        **{channel: _json(data) for channel, data in raw.items()},
    }
    return await LLMService(provider="perplexity").generate(report_prompt, input_data)
async def generate_plan(analysis_run_id: str) -> PlanOutput:
    """Assemble the plan prompt input for one analysis run and call the LLM.

    Reads the run's hospital id and stored ``report_data``, the hospital's
    ``raw_data`` and the market analysis, then flattens them into the
    ``input_data`` mapping that is passed to ``plan_prompt``.

    Args:
        analysis_run_id: Identifier of the row in ``analysis_runs``.

    Returns:
        Whatever ``LLMService.generate`` yields for ``plan_prompt``
        (annotated as ``PlanOutput``).

    Raises:
        ValueError: If no ``analysis_runs`` row exists for the given id.
    """
    run = await fetchone(
        "SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if not run:
        # Fail with a clear message instead of an opaque TypeError from
        # subscripting None a few lines further down.
        raise ValueError(f"analysis run not found: {analysis_run_id}")

    clinic_row = await fetchone(
        "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
        (run["hospital_id"],),
    )
    raw_data = clinic_row["raw_data"] if clinic_row else None
    # Both stored blobs may come back as a JSON string or already decoded.
    clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
    report_data = run["report_data"]
    report = json.loads(report_data) if isinstance(report_data, str) else report_data

    # Treat "nothing stored" (None) the same as an empty mapping so the
    # lookups below do not crash.
    market = (await get_market_analysis(analysis_run_id)) or {}

    def _json(v) -> str | None:
        # Falsy values (None, {}, [], 0, "") are passed to the prompt as None.
        return json.dumps(v, ensure_ascii=False) if v else None

    input_data = {
        "clinic_name": clinic.get("clinicName"),
        "clinic_name_en": clinic.get("clinicNameEn"),
        "address": clinic.get("address"),
        "phone": clinic.get("phone"),
        "slogan": clinic.get("slogan"),
        # services/doctors are always serialised, defaulting to "[]" when absent.
        "services": json.dumps(clinic.get("services", []), ensure_ascii=False),
        "doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
        "report": _json(report),
        "market_competitors": _json(market.get("competitors")),
        "market_keywords": _json(market.get("keywords")),
        "market_trend": _json(market.get("trend")),
        "market_target_audience": _json(market.get("target_audience")),
        "tiktok": _json(clinic.get("tiktok")),
        "instagram_en": _json(clinic.get("instagramEn")),
        "facebook_en": _json(clinic.get("facebookEn")),
        "channel_logos": _json(clinic.get("channelLogos")),
        "brand_assets": _json(clinic.get("brandAssets")),
    }
    return await LLMService(provider="perplexity").generate(plan_prompt, input_data)
def _en_instagram_account(d: dict) -> dict:
"""영문 인스타 raw_data → InstagramAccount dict (factual 값 + 빈 정성필드). audit 보강용."""
return {
"handle": d["username"], "language": "EN", "label": "인스타그램 EN",
"posts": d.get("posts") or 0, "followers": d.get("followers") or 0,
"following": d.get("following") or 0, "category": "",
"profile_link": f"https://www.instagram.com/{d['username']}/",
"highlights": [], "reels_count": 0, "content_format": "", "profile_photo": "",
"bio": d.get("bio") or "",
}
def _en_facebook_page(d: dict) -> dict:
"""영문 페북 raw_data → FacebookPage dict (factual 값 + 빈 정성필드). audit 보강용."""
url = d.get("pageUrl") or ""
return {
"url": url, "page_name": d.get("pageName") or "", "language": "EN", "label": "페이스북 EN",
"followers": d.get("followers") or 0, "following": 0,
"category": ", ".join(d.get("categories") or []), "bio": d.get("intro") or "",
"logo": "", "logo_description": "", "link": url, "linked_domain": d.get("website") or "",
"reviews": 0, "recent_post_age": "", "has_whatsapp": False,
}
def _clinic_snapshot(brand_assets: dict, g: dict) -> dict:
"""brandAssets(색·로고) + 강남언니(평점/리뷰/대표의) → clinic_snapshot 정확값."""
snap: dict = {}
if brand_assets.get("brand_colors"): snap["brand_colors"] = brand_assets["brand_colors"]
if brand_assets.get("logo_images"): snap["logo_images"] = brand_assets["logo_images"]
if g.get("name"): snap["name"] = g["name"]
if g.get("rating"): snap["overall_rating"] = g["rating"]
if g.get("totalReviews"): snap["total_reviews"] = g["totalReviews"]
if g.get("address"): snap["location"] = g["address"]
if g.get("badges"): snap["certifications"] = g["badges"]
if g.get("totalMajorStaffs"): snap["staff_count"] = g["totalMajorStaffs"]
doctors = g.get("doctors", [])
if doctors:
lead = max(doctors, key=lambda d: d.get("reviews", 0))
snap["lead_doctor"] = {
"name": lead.get("name"), "credentials": lead.get("specialty"),
"rating": lead.get("rating"), "review_count": lead.get("reviews"),
}
return snap
def _instagram_patch(ig: dict) -> dict:
"""instagram_data(KR) → instagram_audit.accounts factual 덮어쓰기 값."""
p: dict = {}
if ig.get("username"):
p["handle"] = ig["username"]
p["profile_link"] = f"https://www.instagram.com/{ig['username']}/"
if ig.get("posts"): p["posts"] = ig["posts"]
if ig.get("followers"): p["followers"] = ig["followers"]
if ig.get("following"): p["following"] = ig["following"]
if ig.get("bio"): p["bio"] = ig["bio"]
return p
def _facebook_patch(fb: dict) -> dict:
"""facebook_data(KR) → facebook_audit.pages factual 덮어쓰기 값."""
p: dict = {}
if fb.get("pageUrl"):
p["url"] = fb["pageUrl"]
p["link"] = fb["pageUrl"]
if fb.get("pageName"): p["page_name"] = fb["pageName"]
if fb.get("followers"): p["followers"] = fb["followers"]
if fb.get("intro"): p["bio"] = fb["intro"]
if fb.get("categories"): p["category"] = ", ".join(fb["categories"])
if fb.get("website"): p["linked_domain"] = fb["website"]
return p
def _youtube_patch(yt: dict) -> dict:
"""youtube_data → youtube_audit factual 덮어쓰기 값."""
p: dict = {}
if yt.get("channelName"): p["channel_name"] = yt["channelName"]
if yt.get("handle"): p["handle"] = yt["handle"]
if yt.get("subscribers"): p["subscribers"] = yt["subscribers"]
if yt.get("totalVideos"): p["total_videos"] = yt["totalVideos"]
if yt.get("totalViews"): p["total_views"] = yt["totalViews"]
if yt.get("publishedAt"): p["channel_created_date"] = yt["publishedAt"][:10]
if yt.get("description"): p["channel_description"] = yt["description"]
if yt.get("videos"):
p["top_videos"] = [
{
"title": v["title"], "views": v["views"], "duration": v.get("duration"),
"type": "Short" if "M" not in v.get("duration", "") else "Long",
"uploaded_ago": v.get("date", "")[:10],
}
for v in yt["videos"]
]
return p
async def _build_overrides(analysis_run_id: str) -> dict:
    """Collect factual values that should overwrite the LLM-generated report.

    Loads the run's linked channel rows and the hospital's ``raw_data``, then
    returns a dict with any of ``clinic_snapshot``, ``instagram_audit``,
    ``facebook_audit``, ``youtube_audit`` plus the private ``_en_ig_account``
    / ``_en_fb_page`` entries. Returns ``{}`` when the run does not exist.
    """
    run = await fetchone(
        "SELECT hospital_id, instagram_data_id, facebook_data_id,"
        " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
        " FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if not run:
        return {}

    hospital_row = await fetchone(
        "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
        (run["hospital_id"],),
    )
    # raw_data is decoded when stored as a JSON string; otherwise it is used
    # as-is, with a missing row / empty value falling back to {}.
    if hospital_row and isinstance(hospital_row.get("raw_data"), str):
        hospital = json.loads(hospital_row["raw_data"])
    else:
        hospital = (hospital_row or {}).get("raw_data") or {}

    # naver_blog_data_id is selected above but not used here.
    instagram = (await fetch_raw("instagram_data", run["instagram_data_id"])) or {}
    facebook = (await fetch_raw("facebook_data", run["facebook_data_id"])) or {}
    youtube = (await fetch_raw("youtube_data", run["youtube_data_id"])) or {}
    gangnam_unni = (await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"])) or {}

    brand_assets = hospital.get("brandAssets") or {}
    snapshot = _clinic_snapshot(brand_assets, gangnam_unni)
    ig_patch = _instagram_patch(instagram)
    fb_patch = _facebook_patch(facebook)
    yt_patch = _youtube_patch(youtube)
    ig_en = hospital.get("instagramEn") or {}
    fb_en = hospital.get("facebookEn") or {}

    # Only non-empty patches become overrides.
    overrides: dict = {}
    if snapshot:
        overrides["clinic_snapshot"] = snapshot
    if ig_patch:
        overrides["instagram_audit"] = {"accounts": [ig_patch]}
    if fb_patch:
        overrides["facebook_audit"] = {"pages": [fb_patch]}
    if yt_patch:
        overrides["youtube_audit"] = yt_patch

    # English accounts travel under private "_en_*" keys.
    if ig_en.get("username"):
        overrides["_en_ig_account"] = _en_instagram_account(ig_en)
    has_en_page = fb_en.get("pageUrl") or fb_en.get("pageName")
    if has_en_page:
        overrides["_en_fb_page"] = _en_facebook_page(fb_en)
    return overrides
def _deep_merge(base: dict, overrides: dict) -> dict:
for k, v in overrides.items():
if isinstance(v, dict) and isinstance(base.get(k), dict):
_deep_merge(base[k], v)
elif isinstance(v, list) and isinstance(base.get(k), list):
for i, item in enumerate(v):
if i < len(base[k]) and isinstance(item, dict) and isinstance(base[k][i], dict):
_deep_merge(base[k][i], item)
else:
base[k] = v
return base
def _ensure_en_entry(audit: dict, list_key: str, en_entry: dict | None) -> None:
"""audit 리스트(accounts/pages)에 EN 항목이 없으면 추가 — LLM 누락 대비, 중복 방지."""
if not en_entry:
return
items = audit.setdefault(list_key, [])
if not any(it.get("language") == "EN" for it in items):
items.append(en_entry)
def _patch_report(result: ReportOutput, overrides: dict) -> ReportOutput:
    """Overlay factual ``overrides`` on an LLM report and guarantee EN entries.

    Args:
        result: Report produced by the LLM.
        overrides: Factual values to merge in. The private ``_en_ig_account``
            / ``_en_fb_page`` entries are not merged; they are appended to the
            audit lists when no EN item exists yet.

    Returns:
        A new ``ReportOutput`` built from the merged data.
    """
    # Work on a shallow copy so removing the private "_en_*" keys does not
    # mutate the caller's dict (a second call would otherwise lose them).
    pending = dict(overrides)
    en_ig = pending.pop("_en_ig_account", None)
    en_fb = pending.pop("_en_fb_page", None)
    merged = _deep_merge(result.model_dump(), pending)
    # Always guarantee both KR and EN entries, even if the LLM left the
    # English account out of the audit.
    _ensure_en_entry(merged.setdefault("instagram_audit", {}), "accounts", en_ig)
    _ensure_en_entry(merged.setdefault("facebook_audit", {}), "pages", en_fb)
    return ReportOutput(**merged)
async def run_report_task(analysis_run_id: str) -> None:
    """Generate the report for a run, patch in factual values and persist it."""
    logger.info("[report] start run=%s", analysis_run_id)
    draft = await generate_report(analysis_run_id)
    overrides = await _build_overrides(analysis_run_id)
    patched = _patch_report(draft, overrides)
    await save_analysis_report(analysis_run_id, patched.model_dump())
    logger.info("[report] done run=%s", analysis_run_id)
async def run_plan_task(analysis_run_id: str) -> None:
    """Generate the plan for a run and store it as JSON in ``analysis_runs.plan_data``."""
    logger.info("[plan] start run=%s", analysis_run_id)
    plan = await generate_plan(analysis_run_id)
    # ensure_ascii=False keeps non-ASCII text unescaped in the stored JSON.
    payload = json.dumps(plan.model_dump(), ensure_ascii=False)
    await execute(
        "UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
        (payload, analysis_run_id),
    )
    logger.info("[plan] done run=%s", analysis_run_id)