import json
import logging

from common.db import (
    fetchone,
    execute,
    fetch_raw,
    get_analysis_raw_data,
    save_analysis_report,
    get_market_analysis,
)
from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt, plan_prompt
from integrations.llm.schemas.report import ReportOutput
from services.instagram_audit import build_instagram_accounts
from services.facebook_audit import build_facebook_pages
from integrations.llm.schemas.plan import PlanOutput
from models.status import AnalysisStatus

logger = logging.getLogger(__name__)

# (source key in gangnam_unni data, target key in clinic_snapshot), in output order.
_SNAPSHOT_FIELDS = (
    ("name", "name"),
    ("rating", "overall_rating"),
    ("totalReviews", "total_reviews"),
    ("address", "location"),
    ("badges", "certifications"),
    ("totalMajorStaffs", "staff_count"),
)

# (source key in youtube data, target key in youtube_audit) for values copied verbatim.
_YOUTUBE_FIELDS = (
    ("channelName", "channel_name"),
    ("handle", "handle"),
    ("subscribers", "subscribers"),
    ("totalVideos", "total_videos"),
    ("totalViews", "total_views"),
)


def _json(value) -> str | None:
    """Serialize *value* as JSON (non-ASCII kept as-is); any falsy value becomes None."""
    return json.dumps(value, ensure_ascii=False) if value else None


async def _load_clinic(hospital_id) -> dict:
    """Return the decoded ``hospital_baseinfo.raw_data`` for *hospital_id*.

    ``raw_data`` may come back either as a JSON string or as an already
    decoded value; a missing row or empty value yields ``{}``.
    """
    row = await fetchone(
        "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
        (hospital_id,),
    )
    raw_data = row.get("raw_data") if row else None
    if isinstance(raw_data, str):
        raw_data = json.loads(raw_data)
    return raw_data or {}


def _clinic_basics(clinic: dict) -> dict:
    """Prompt inputs shared by the report and plan prompts (basic clinic info)."""
    return {
        "clinic_name": clinic.get("clinicName"),
        "clinic_name_en": clinic.get("clinicNameEn"),
        "address": clinic.get("address"),
        "phone": clinic.get("phone"),
        "slogan": clinic.get("slogan"),
        # Unlike the _json() fields, these are always serialized (never None).
        "services": json.dumps(clinic.get("services", []), ensure_ascii=False),
        "doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
    }


def _market_fields(market: dict) -> dict:
    """Prompt inputs shared by the report and plan prompts (market analysis)."""
    return {
        "market_competitors": _json(market.get("competitors")),
        "market_keywords": _json(market.get("keywords")),
        "market_trend": _json(market.get("trend")),
        "market_target_audience": _json(market.get("target_audience")),
    }


async def generate_report(analysis_run_id: str) -> ReportOutput:
    """Build the report prompt inputs for a run and ask the LLM for the report.

    Raises:
        TypeError: if no ``analysis_runs`` row exists for *analysis_run_id*
            (the row is subscripted without a guard).
    """
    run = await fetchone(
        "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    clinic = await _load_clinic(run["hospital_id"])
    # `or {}` keeps the .items()/.get() calls below safe if a fetch yields None.
    raw = (await get_analysis_raw_data(analysis_run_id)) or {}
    market = (await get_market_analysis(analysis_run_id)) or {}

    input_data = {
        **_clinic_basics(clinic),
        **_market_fields(market),
        "branding": _json(clinic.get("branding")),
        "brand_assets": _json(clinic.get("brandAssets")),
        "tiktok": _json(clinic.get("tiktok")),
        "instagram_en": _json(clinic.get("instagramEn")),
        "facebook_en": _json(clinic.get("facebookEn")),
        "kakao_talk": _json(clinic.get("kakaoTalk")),
        "naver_cafe": _json(clinic.get("naverCafe")),
        "channel_logos": _json(clinic.get("channelLogos")),
        # Collected per-channel data goes last, so it wins on a key collision.
        **{channel: _json(data) for channel, data in raw.items()},
    }
    return await LLMService(provider="perplexity").generate(report_prompt, input_data)


async def generate_plan(analysis_run_id: str) -> PlanOutput:
    """Build the plan prompt inputs (including the saved report) and ask the LLM.

    Raises:
        TypeError: if no ``analysis_runs`` row exists for *analysis_run_id*
            (the row is subscripted without a guard).
    """
    run = await fetchone(
        "SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    clinic = await _load_clinic(run["hospital_id"])

    report_data = run["report_data"]
    report = json.loads(report_data) if isinstance(report_data, str) else report_data

    market = (await get_market_analysis(analysis_run_id)) or {}
    raw = (await get_analysis_raw_data(analysis_run_id)) or {}

    input_data = {
        **_clinic_basics(clinic),
        "report": _json(report),
        **_market_fields(market),
        "tiktok": _json(clinic.get("tiktok")),
        "instagram_en": _json(clinic.get("instagramEn")),
        "facebook_en": _json(clinic.get("facebookEn")),
        "naver_blog": _json(_naver_blog_summary(raw.get("naver_blog"))),
        "channel_logos": _json(clinic.get("channelLogos")),
        "brand_assets": _json(clinic.get("brandAssets")),
    }
    return await LLMService(provider="perplexity").generate(plan_prompt, input_data)


def _naver_blog_summary(blog: dict | None) -> dict | None:
    """Reduce Naver blog data to the two values a plan card needs.

    A plan card only shows the total post count and the most recent activity
    date. Passing anything else (body, links, titles) just inflates the token
    count and invites the LLM to hallucinate from irrelevant information.
    """
    if not blog:
        return None
    posts = blog.get("posts") or []
    return {
        "totalPosts": blog.get("totalResults"),
        # Takes the first post's date; assumes posts are ordered newest-first
        # -- TODO confirm against the collector.
        "latestPostDate": posts[0].get("postDate") if posts else None,
    }


def _clinic_snapshot(gangnam_unni: dict) -> dict:
    """Build the ``clinic_snapshot`` override from gangnam_unni data (truthy values only)."""
    snapshot: dict = {
        target: gangnam_unni[source]
        for source, target in _SNAPSHOT_FIELDS
        if gangnam_unni.get(source)
    }

    doctors = gangnam_unni.get("doctors") or []
    if doctors:
        # Lead doctor = the one with the most reviews. `or 0` guards against an
        # explicit None, which would make max() raise on comparison.
        lead = max(doctors, key=lambda d: d.get("reviews") or 0)
        if lead:
            snapshot["lead_doctor"] = {
                "name": lead.get("name"),
                "credentials": lead.get("specialty"),
                "rating": lead.get("rating"),
                "review_count": lead.get("reviews"),
            }
    return snapshot


def _youtube_video(video: dict) -> dict:
    """Map one collected video record to a ``top_videos`` entry."""
    duration = video.get("duration")
    return {
        "title": video["title"],
        "views": video["views"],
        "duration": duration,
        # A duration without an "M" is labelled Short; missing/None counts as "".
        # NOTE(review): presumably ISO-8601 durations (e.g. "PT45S") -- if so an
        # exact-hour value such as "PT1H" would also be labelled Short; confirm.
        "type": "Short" if "M" not in (duration or "") else "Long",
        "uploaded_ago": (video.get("date") or "")[:10],
    }


def _youtube_patch(youtube: dict) -> dict:
    """Build the ``youtube_audit`` override from youtube data (truthy values only)."""
    patch: dict = {
        target: youtube[source]
        for source, target in _YOUTUBE_FIELDS
        if youtube.get(source)
    }
    if youtube.get("publishedAt"):
        # Keep the first 10 characters only (presumably the YYYY-MM-DD part).
        patch["channel_created_date"] = youtube["publishedAt"][:10]
    if youtube.get("description"):
        patch["channel_description"] = youtube["description"]
    if youtube.get("videos"):
        patch["top_videos"] = [_youtube_video(v) for v in youtube["videos"]]
    return patch


async def _build_overrides(analysis_run_id: str) -> dict:
    """Collect code-computed values that override parts of the LLM report.

    Returns a dict with any of ``clinic_snapshot``, ``instagram_audit``,
    ``facebook_audit`` and ``youtube_audit``; sections with no data are
    omitted. Returns ``{}`` when the run row does not exist.
    """
    run = await fetchone(
        "SELECT hospital_id, instagram_data_id, facebook_data_id,"
        " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
        " FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if not run:
        return {}

    hospital = await _load_clinic(run["hospital_id"])

    instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {}
    facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {}
    # Fetched as in the original but not used by any override below.
    naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {}
    youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {}
    gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {}

    # -- gangnam_unni --------------------------------------------------------
    snapshot = _clinic_snapshot(gangnam_unni)

    # -- instagram (KR/EN accounts are built in code and replace the LLM output)
    ig_patch = build_instagram_accounts(
        instagram,
        hospital.get("instagramEn") or {},
        hospital.get("channelLogos") or {},
    )

    # -- facebook (KR=facebook_data, EN=hospital.facebookEn; both computed in
    #    code, in [KR, EN] order)
    fb_pages = build_facebook_pages(facebook, hospital.get("facebookEn") or {})

    # -- youtube -------------------------------------------------------------
    yt_patch = _youtube_patch(youtube)

    overrides: dict = {}
    if snapshot:
        overrides["clinic_snapshot"] = snapshot
    if ig_patch:
        overrides["instagram_audit"] = {"accounts": ig_patch}
    if fb_pages:
        overrides["facebook_audit"] = {"pages": fb_pages}
    if yt_patch:
        overrides["youtube_audit"] = yt_patch
    return overrides


def _deep_merge(base: dict, overrides: dict) -> dict:
    """Merge *overrides* into *base* in place and return *base*.

    - dict onto dict: merged recursively.
    - list onto list: dict items are merged position by position into the
      existing dict items; nothing is appended or replaced.
    - anything else: the override value replaces the base value.

    NOTE(review): because of the list rule, a list override never extends or
    replaces an existing list (an empty base list stays empty). Confirm this
    is intended for e.g. ``facebook_audit.pages`` and ``youtube_audit.top_videos``.
    """
    for key, value in overrides.items():
        current = base.get(key)
        if isinstance(value, dict) and isinstance(current, dict):
            _deep_merge(current, value)
        elif isinstance(value, list) and isinstance(current, list):
            # zip() stops at the shorter list, i.e. only indexes present in both.
            for existing, item in zip(current, value):
                if isinstance(item, dict) and isinstance(existing, dict):
                    _deep_merge(existing, item)
        else:
            base[key] = value
    return base


def _patch_report(result: ReportOutput, overrides: dict) -> ReportOutput:
    """Apply code-computed *overrides* to the LLM report and re-validate it."""
    merged = _deep_merge(result.model_dump(), overrides)
    # Instagram accounts: the prompt tells the LLM to leave them as [], and the
    # code fills them from collected data (empty list when there is no data).
    accounts = (overrides.get("instagram_audit") or {}).get("accounts") or []
    merged.setdefault("instagram_audit", {})["accounts"] = accounts
    return ReportOutput(**merged)


async def run_report_task(analysis_run_id: str) -> None:
    """Generate the report for a run, patch it with collected data and save it."""
    logger.info("[report] start run=%s", analysis_run_id)
    result = await generate_report(analysis_run_id)
    result = _patch_report(result, await _build_overrides(analysis_run_id))
    await save_analysis_report(analysis_run_id, result.model_dump())
    logger.info("[report] done run=%s", analysis_run_id)


def _patch_plan(result: PlanOutput, logo_desc: str) -> PlanOutput:
    """Set ``brand_guide.channel_branding[].profile_photo`` from code.

    The value is not left to the LLM: every channel gets the same value
    (``brand_assets.logo_description``), which prevents the LLM from
    hallucinating fallback wording.
    """
    plan = result.model_dump()
    for channel in (plan.get("brand_guide") or {}).get("channel_branding") or []:
        channel["profile_photo"] = logo_desc
    return PlanOutput(**plan)


async def run_plan_task(analysis_run_id: str) -> None:
    """Generate the plan for a run, patch profile photos and store it in ``plan_data``."""
    logger.info("[plan] start run=%s", analysis_run_id)
    result = await generate_plan(analysis_run_id)

    # profile_photo is set by code from brand_assets.logo_description (blocks
    # LLM hallucinations such as a "(guide not available)" placeholder).
    run = await fetchone(
        "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if run:
        clinic = await _load_clinic(run["hospital_id"])
        logo_desc = (clinic.get("brandAssets") or {}).get("logo_description") or ""
        result = _patch_plan(result, logo_desc)

    await execute(
        "UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
        (json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id),
    )
    logger.info("[plan] done run=%s", analysis_run_id)