o2o-infinith-backend/app/services/analysis.py

366 lines
17 KiB
Python

import json
import logging
import re
from datetime import datetime
from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis
from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt
from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit
from services.instagram_audit import build_instagram_accounts
from services.facebook_audit import build_facebook_pages
from integrations.llm.schemas.plan import PlanOutput
from models.status import AnalysisStatus
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
async def generate_report(analysis_run_id: str) -> ReportOutput:
    """Generate the LLM analysis report for one analysis run.

    Loads the hospital profile attached to the run, the collected per-channel
    raw data and the market analysis, flattens them into prompt variables and
    sends them to the LLM together with ``report_prompt``.

    Args:
        analysis_run_id: Key of the row in ``analysis_runs``.

    Returns:
        The result of ``LLMService.generate`` for ``report_prompt``.

    Raises:
        ValueError: If no ``analysis_runs`` row exists for ``analysis_run_id``.
    """
    run = await fetchone(
        "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if not run:
        # Fail with a clear message instead of a TypeError on run["hospital_id"].
        raise ValueError(f"analysis run not found: {analysis_run_id}")

    clinic_row = await fetchone(
        "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
        (run["hospital_id"],),
    )
    raw_data = clinic_row["raw_data"] if clinic_row else None
    # raw_data may arrive as a JSON string or as an already-decoded object.
    clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})

    # Tolerate a missing result so the .items()/.get() calls below cannot crash.
    raw = await get_analysis_raw_data(analysis_run_id) or {}
    market = await get_market_analysis(analysis_run_id) or {}

    def _json(v) -> str | None:
        # Falsy values (None, empty containers, 0) are passed to the prompt as None.
        return json.dumps(v, ensure_ascii=False) if v else None

    input_data = {
        "clinic_name": clinic.get("clinicName"),
        "clinic_name_en": clinic.get("clinicNameEn"),
        "address": clinic.get("address"),
        "phone": clinic.get("phone"),
        "slogan": clinic.get("slogan"),
        "services": json.dumps(clinic.get("services", []), ensure_ascii=False),
        "doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
        "market_competitors": _json(market.get("competitors")),
        "market_keywords": _json(market.get("keywords")),
        "market_trend": _json(market.get("trend")),
        "market_target_audience": _json(market.get("target_audience")),
        "branding": _json(clinic.get("branding")),
        "brand_assets": _json(clinic.get("brandAssets")),
        "tiktok": _json(clinic.get("tiktok")),
        "instagram_en": _json(clinic.get("instagramEn")),
        "facebook_en": _json(clinic.get("facebookEn")),
        "kakao_talk": _json(clinic.get("kakaoTalk")),
        "naver_cafe": _json(clinic.get("naverCafe")),
        "channel_logos": _json(clinic.get("channelLogos")),
        # Collected channel data is spread last, so a channel key that collides
        # with one of the names above takes precedence.
        **{channel: _json(data) for channel, data in raw.items()},
    }
    return await LLMService(provider="perplexity").generate(report_prompt, input_data)
async def generate_plan(analysis_run_id: str) -> PlanOutput:
    """Generate the LLM marketing plan for one analysis run.

    Combines the hospital profile, the previously stored report
    (``analysis_runs.report_data``), the market analysis and a trimmed Naver
    blog summary into prompt variables and sends them to the LLM together with
    ``plan_prompt``.

    Args:
        analysis_run_id: Key of the row in ``analysis_runs``.

    Returns:
        The result of ``LLMService.generate`` for ``plan_prompt``.

    Raises:
        ValueError: If no ``analysis_runs`` row exists for ``analysis_run_id``.
    """
    run = await fetchone(
        "SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if not run:
        # Fail with a clear message instead of a TypeError on run["hospital_id"].
        raise ValueError(f"analysis run not found: {analysis_run_id}")

    clinic_row = await fetchone(
        "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
        (run["hospital_id"],),
    )
    raw_data = clinic_row["raw_data"] if clinic_row else None
    # Both raw_data and report_data may arrive as JSON strings or decoded objects.
    clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
    report_data = run["report_data"]
    report = json.loads(report_data) if isinstance(report_data, str) else report_data

    # Tolerate a missing result so the .get() calls below cannot crash.
    market = await get_market_analysis(analysis_run_id) or {}
    raw = await get_analysis_raw_data(analysis_run_id) or {}

    def _json(v) -> str | None:
        # Falsy values (None, empty containers, 0) are passed to the prompt as None.
        return json.dumps(v, ensure_ascii=False) if v else None

    input_data = {
        "clinic_name": clinic.get("clinicName"),
        "clinic_name_en": clinic.get("clinicNameEn"),
        "address": clinic.get("address"),
        "phone": clinic.get("phone"),
        "slogan": clinic.get("slogan"),
        "services": json.dumps(clinic.get("services", []), ensure_ascii=False),
        "doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
        "report": _json(report),
        "market_competitors": _json(market.get("competitors")),
        "market_keywords": _json(market.get("keywords")),
        "market_trend": _json(market.get("trend")),
        "market_target_audience": _json(market.get("target_audience")),
        "tiktok": _json(clinic.get("tiktok")),
        "instagram_en": _json(clinic.get("instagramEn")),
        "facebook_en": _json(clinic.get("facebookEn")),
        # Only a two-field summary of the blog data is sent to the LLM.
        "naver_blog": _json(_naver_blog_summary(raw.get("naver_blog"))),
        "naver_cafe": _json(clinic.get("naverCafe")),
        "kakao_talk": _json(clinic.get("kakaoTalk")),
        "channel_logos": _json(clinic.get("channelLogos")),
        "brand_assets": _json(clinic.get("brandAssets")),
    }
    return await LLMService(provider="perplexity").generate(plan_prompt, input_data)
def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict:
    """Build the ``clinic_snapshot`` override from collected data.

    Only truthy source values are copied, so absent data never produces a key.

    Args:
        gangnam_unni: Collected ``gangnam_unni`` payload (name, rating,
            reviews, address, badges, staff count, doctors).
        hospital: Hospital profile dict (``clinicNameEn``, ``phone``,
            ``domain``, ``brandAssets``).

    Returns:
        The snapshot validated through ``ClinicSnapshot`` and dumped to a dict.
    """
    snapshot: dict = {}

    # (snapshot key, source dict, source key), in the original insertion order.
    field_sources = (
        ("name", gangnam_unni, "name"),
        ("name_en", hospital, "clinicNameEn"),
        ("phone", hospital, "phone"),
        ("domain", hospital, "domain"),
        ("overall_rating", gangnam_unni, "rating"),
        ("total_reviews", gangnam_unni, "totalReviews"),
        ("location", gangnam_unni, "address"),
        ("certifications", gangnam_unni, "badges"),
        ("staff_count", gangnam_unni, "totalMajorStaffs"),
    )
    for target, source, key in field_sources:
        value = source.get(key)
        if value:
            snapshot[target] = value

    # The lead doctor is the one with the most reviews. `or` guards make an
    # explicit None for "doctors" or "reviews" behave like a missing value
    # instead of raising TypeError inside max().
    doctors = gangnam_unni.get("doctors") or []
    lead = max(doctors, key=lambda d: d.get("reviews") or 0) if doctors else None
    if lead:
        snapshot["lead_doctor"] = {
            "name": lead.get("name"),
            "credentials": lead.get("specialty"),
            "rating": lead.get("rating"),
            "review_count": lead.get("reviews"),
        }

    # Force-inject logo_images / brand_colors from brand assets. This blocks the
    # case where the LLM ignores the prompt guard and leaves them null.
    ba = hospital.get("brandAssets") or {}
    if ba.get("logo_images"):
        snapshot["logo_images"] = ba["logo_images"]
    if ba.get("brand_colors"):
        snapshot["brand_colors"] = ba["brand_colors"]

    return ClinicSnapshot.model_validate(snapshot).model_dump()
def _naver_blog_summary(blog: dict | None) -> dict | None:
"""plan 카드 한 장에 들어가는 건 전체 포스트 수와 최근 활동 시점뿐. 그 외(본문·링크·제목)는
던져봐야 토큰만 늘고 LLM이 무관 정보로 hallucinate 함."""
if not blog:
return None
posts = blog.get("posts") or []
return {
"totalPosts": blog.get("totalResults"),
"latestPostDate": posts[0].get("postDate") if posts else None,
}
def _parse_iso_duration_seconds(iso: str) -> int:
m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", iso or "")
if not m:
return 0
h, mins, s = (int(x or 0) for x in m.groups())
return h * 3600 + mins * 60 + s
def _format_seconds(seconds: int) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}시간 {m}" if h else f"{m}{s}"
def _format_clock(seconds: int) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}:{m:02d}:{s:02d}" if h else f"{m}:{s:02d}"
def _calc_avg_video_length(videos: list[dict]) -> str:
durations = [_parse_iso_duration_seconds(v.get("duration", "")) for v in videos]
durations = [d for d in durations if d > 0]
if not durations:
return ""
return _format_seconds(sum(durations) // len(durations))
def _relative_date(date_str: str) -> str:
if not date_str:
return ""
try:
past = datetime.fromisoformat(date_str[:10])
except ValueError:
return ""
days = (datetime.now() - past).days
if days < 1:
return "오늘"
if days < 30:
return f"{days}일 전"
if days < 365:
return f"{days // 30}개월 전"
return f"{days // 365}년 전"
def _calc_upload_frequency(videos: list[dict]) -> str:
dates = sorted(
[v["date"][:10] for v in videos if v.get("date")],
reverse=True,
)
if len(dates) < 2:
return ""
gaps = [
(datetime.fromisoformat(dates[i]) - datetime.fromisoformat(dates[i + 1])).days
for i in range(len(dates) - 1)
]
avg_days = sum(gaps) // len(gaps)
if avg_days <= 7:
return f"{7 // max(avg_days, 1)}"
if avg_days <= 30:
return f"{30 // avg_days}"
return f"{avg_days}일에 1회"
async def _build_youtube_audit(youtube: dict) -> dict:
    """Build the ``youtube_audit`` override from collected channel data.

    Channel facts and per-video rows are computed in code. The LLM is asked
    only for the ``diagnosis`` list, using those computed facts as input.

    Args:
        youtube: Collected YouTube payload. Keys read here: ``videos``,
            ``channelName``, ``handle``, ``subscribers``, ``totalVideos``,
            ``totalViews``, ``publishedAt``, ``description``, ``playlists``.

    Returns:
        The audit validated through ``YouTubeAudit`` and dumped to a dict.
    """
    videos = youtube.get("videos") or []
    yt_patch: dict = {
        # Fixed placeholder values; nothing in this function computes them.
        "weekly_view_growth": {"absolute": 0, "percentage": 0.0},
        "estimated_monthly_revenue": {"min": 0, "max": 0},
        "linked_urls": [],
        "avg_video_length": _calc_avg_video_length(videos),
        "upload_frequency": _calc_upload_frequency(videos),
    }

    # Copy channel facts only when the collected value is truthy.
    for target, key in (
        ("channel_name", "channelName"),
        ("handle", "handle"),
        ("subscribers", "subscribers"),
        ("total_videos", "totalVideos"),
        ("total_views", "totalViews"),
        ("channel_description", "description"),
        ("playlists", "playlists"),
    ):
        if youtube.get(key):
            yt_patch[target] = youtube[key]
    if youtube.get("publishedAt"):
        # Keep only the YYYY-MM-DD prefix of the timestamp.
        yt_patch["channel_created_date"] = youtube["publishedAt"][:10]

    if videos:
        top_videos = []
        for v in videos:
            duration = v.get("duration") or ""
            # A video is "Long" as soon as its duration carries a day, hour or
            # minute designator. Checking only "M" labelled e.g. "PT1H" as Short.
            is_long = any(unit in duration for unit in ("D", "H", "M"))
            top_videos.append(
                {
                    "title": v["title"],
                    "views": v["views"],
                    "duration": _format_clock(_parse_iso_duration_seconds(duration)),
                    "type": "Long" if is_long else "Short",
                    "uploaded_ago": _relative_date(v.get("date", "")),
                }
            )
        yt_patch["top_videos"] = top_videos

    diagnosis_result = await LLMService(provider="perplexity").generate(
        youtube_diagnosis_prompt,
        {
            "channel_name": yt_patch.get("channel_name"),
            "subscribers": yt_patch.get("subscribers"),
            "total_videos": yt_patch.get("total_videos"),
            "total_views": yt_patch.get("total_views"),
            "avg_video_length": yt_patch.get("avg_video_length"),
            "upload_frequency": yt_patch.get("upload_frequency"),
            "top_videos": json.dumps(yt_patch.get("top_videos", []), ensure_ascii=False),
            "playlists": json.dumps(yt_patch.get("playlists", []), ensure_ascii=False),
        },
    )
    yt_patch["diagnosis"] = [item.model_dump() for item in diagnosis_result.diagnosis]
    return YouTubeAudit.model_validate(yt_patch).model_dump()
async def _build_overrides(analysis_run_id: str) -> dict:
    """Collect the code-computed sections that override the LLM report.

    Loads the run's hospital profile and collected channel payloads, then
    builds the clinic snapshot, YouTube audit, Instagram accounts and Facebook
    pages from them.

    Args:
        analysis_run_id: Key of the row in ``analysis_runs``.

    Returns:
        A dict with any of the keys ``clinic_snapshot``, ``instagram_audit``,
        ``facebook_audit`` and ``youtube_audit``; ``{}`` when the run row does
        not exist.
    """
    run = await fetchone(
        "SELECT hospital_id, instagram_data_id, facebook_data_id,"
        " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
        " FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if not run:
        return {}

    hospital_row = await fetchone(
        "SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s",
        (run["hospital_id"],),
    )
    row = hospital_row or {}
    stored = row.get("raw_data")
    # raw_data may arrive as a JSON string or as an already-decoded object.
    hospital = json.loads(stored) if isinstance(stored, str) else (stored or {})
    hospital["domain"] = row.get("url") or ""

    # The Naver blog payload is deliberately not fetched: no override uses it.
    instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {}
    facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {}
    youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {}
    gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {}

    snapshot: dict = _build_clinic_snapshot(gangnam_unni, hospital)
    yt_patch: dict = await _build_youtube_audit(youtube)

    # Instagram: the KR and EN accounts are assembled in code and replace
    # whatever the LLM produced.
    ig_patch = build_instagram_accounts(
        instagram,
        hospital.get("instagramEn") or {},
        hospital.get("channelLogos") or {},
    )

    # Facebook: KR comes from facebook_data and EN from hospital.facebookEn;
    # both are computed in code, in [KR, EN] order.
    fb_pages = build_facebook_pages(facebook, hospital.get("facebookEn") or {})

    overrides: dict = {}
    if snapshot:
        overrides["clinic_snapshot"] = snapshot
    if ig_patch:
        overrides["instagram_audit"] = {"accounts": ig_patch}
    if fb_pages:
        overrides["facebook_audit"] = {"pages": fb_pages}
    if yt_patch:
        overrides["youtube_audit"] = yt_patch
    return overrides
def _deep_merge(base: dict, overrides: dict) -> dict:
for k, v in overrides.items():
if isinstance(v, dict) and isinstance(base.get(k), dict):
_deep_merge(base[k], v)
elif isinstance(v, list) and isinstance(base.get(k), list):
for i, item in enumerate(v):
if i < len(base[k]) and isinstance(item, dict) and isinstance(base[k][i], dict):
_deep_merge(base[k][i], item)
else:
base[k] = v
return base
def _patch_report(result: ReportOutput, overrides: dict) -> ReportOutput:
    """Apply code-computed overrides on top of the LLM-generated report.

    Args:
        result: Report produced by the LLM.
        overrides: Sections built in code, as returned by ``_build_overrides``.

    Returns:
        A new ``ReportOutput`` built from the merged data.
    """
    merged = _deep_merge(result.model_dump(), overrides)

    # The prompt makes the LLM leave Instagram accounts as []; code fills them
    # from collected data (empty list when there is none). A null section is
    # replaced by a fresh dict: setdefault() would hand back the existing None.
    ig_audit = merged.get("instagram_audit")
    if ig_audit is None:
        ig_audit = merged["instagram_audit"] = {}
    ig_audit["accounts"] = (overrides.get("instagram_audit") or {}).get("accounts") or []

    # Facebook pages (KR + EN): the code patch carries only partial fields, so
    # inserting it as-is fails validation (label/logo etc. missing). Copy the
    # first LLM page (usually KR) as a template, then overwrite per index:
    # required fields keep the LLM defaults, collected numbers come from code.
    # This avoids the missing-EN-page bug.
    fb_pages = (overrides.get("facebook_audit") or {}).get("pages") or []
    if fb_pages:
        fb_audit = merged.get("facebook_audit")
        if fb_audit is None:
            fb_audit = merged["facebook_audit"] = {}
        base_pages = fb_audit.get("pages")
        if base_pages is None:
            base_pages = fb_audit["pages"] = []

        template = base_pages[0] if base_pages else None
        if template:
            while len(base_pages) < len(fb_pages):
                base_pages.append(dict(template))
        # With no template page, patches beyond the existing pages are dropped.
        for page, patch in zip(base_pages, fb_pages):
            page.update(patch)

    return ReportOutput(**merged)
async def run_report_task(analysis_run_id: str) -> None:
    """Generate, patch and persist the report for one analysis run.

    The LLM report is generated first, then overlaid with the code-computed
    overrides, and the result is stored via ``save_analysis_report``.
    """
    logger.info("[report] start run=%s", analysis_run_id)

    llm_report = await generate_report(analysis_run_id)
    overrides = await _build_overrides(analysis_run_id)
    patched = _patch_report(llm_report, overrides)

    await save_analysis_report(analysis_run_id, patched.model_dump())
    logger.info("[report] done run=%s", analysis_run_id)
def _patch_plan(result: PlanOutput, logo_desc: str) -> PlanOutput:
    """Overwrite every channel's ``profile_photo`` with ``logo_desc``.

    ``brand_guide.channel_branding[].profile_photo`` is set by code instead of
    the LLM (same value for all channels, taken from
    ``brand_assets.logo_description``), which stops the LLM from inventing a
    fallback phrase.
    """
    plan = result.model_dump()
    guide = plan.get("brand_guide") or {}
    channels = guide.get("channel_branding") or []
    for channel in channels:
        channel["profile_photo"] = logo_desc
    return PlanOutput(**plan)
async def run_plan_task(analysis_run_id: str) -> None:
    """Generate the plan for one analysis run and store it in ``plan_data``.

    When the run row exists, each channel's ``profile_photo`` is overwritten
    in code with ``brandAssets.logo_description`` before saving.
    """
    logger.info("[plan] start run=%s", analysis_run_id)
    result = await generate_plan(analysis_run_id)

    # profile_photo is filled by code from brand_assets.logo_description, which
    # blocks LLM hallucinations such as "(가이드 미보유)".
    run = await fetchone(
        "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if run:
        hr = await fetchone(
            "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
            (run["hospital_id"],),
        )
        stored = (hr or {}).get("raw_data")
        # raw_data may arrive as a JSON string or as an already-decoded object.
        if isinstance(stored, str):
            h = json.loads(stored)
        else:
            h = stored or {}
        brand_assets = h.get("brandAssets") or {}
        logo_desc = brand_assets.get("logo_description") or ""
        result = _patch_plan(result, logo_desc)

    payload = json.dumps(result.model_dump(), ensure_ascii=False)
    await execute(
        "UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
        (payload, analysis_run_id),
    )
    logger.info("[plan] done run=%s", analysis_run_id)