import json
import logging
import re
from datetime import datetime
from urllib.parse import urlparse

from common.db.run import select_run, update_run_report, update_run_plan
from common.db.source import select_run_raw_data
from common.db.market import select_market
from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt
from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit
from services.instagram_audit import build_instagram_accounts
from services.facebook_audit import build_facebook_pages
from services.kpi_dashboard import build_kpi_dashboard
from integrations.llm.schemas.plan import PlanOutput

logger = logging.getLogger(__name__)

# Provider name handed to every LLMService instance created in this module.
_LLM_PROVIDER = "perplexity"

# Raw-data keys that generate_report maps to explicitly named prompt inputs;
# every other raw key is forwarded to the prompt under its own name.
_REPORT_EXPLICIT_SOURCES = frozenset(
    {
        "mainpage",
        "branding",
        "tiktok",
        "instagram_en",
        "facebook_en",
        "kakaotalk",
        "naver_cafe",
    }
)

# Videos strictly shorter than this many seconds are labelled "Short".
_SHORT_MAX_SECONDS = 60

# ISO-8601 duration with optional day / hour / minute / second components,
# e.g. "PT4M13S" or "P1DT2H". Deliberately not anchored at the end so that
# trailing characters are ignored.
_ISO_DURATION_RE = re.compile(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?")


def _json_or_none(value) -> str | None:
    """Serialize *value* to a JSON string, or return None when it is falsy."""
    return json.dumps(value, ensure_ascii=False) if value else None


def _clinic_inputs(clinic: dict) -> dict:
    """Return the clinic prompt inputs shared by the report and plan prompts."""
    return {
        "clinic_name": clinic.get("clinicName"),
        "clinic_name_en": clinic.get("clinicNameEn"),
        "address": clinic.get("address"),
        "phone": clinic.get("phone"),
        "slogan": clinic.get("slogan"),
        "services": json.dumps(clinic.get("services", []), ensure_ascii=False),
        "doctors": json.dumps(clinic.get("doctors", []), ensure_ascii=False),
    }


def _market_inputs(market: dict) -> dict:
    """Return the market prompt inputs shared by the report and plan prompts."""
    return {
        "market_competitors": _json_or_none(market.get("competitors")),
        "market_keywords": _json_or_none(market.get("keywords")),
        "market_trend": _json_or_none(market.get("trend")),
        "market_target_audience": _json_or_none(market.get("target_audience")),
    }


async def generate_report(analysis_run_id: str) -> ReportOutput:
    """Build the report prompt inputs for a run and ask the LLM for a report.

    Args:
        analysis_run_id: Identifier passed to the raw-data and market lookups.

    Returns:
        The result of ``LLMService.generate`` for ``report_prompt``.
    """
    # Missing raw/market data is treated as empty instead of raising
    # AttributeError on ``None.get`` (same guard as in _build_overrides).
    raw = (await select_run_raw_data(analysis_run_id)) or {}
    clinic = raw.get("mainpage") or {}
    branding = raw.get("branding") or {}
    market = (await select_market(analysis_run_id)) or {}

    input_data = {
        **_clinic_inputs(clinic),
        **_market_inputs(market),
        # Branding metadata extracted from the main page (logoUrl / ogImage /
        # faviconUrl) plus the Vision/CSS derived assets.
        "branding": _json_or_none(clinic.get("branding")),
        "brand_assets": _json_or_none(branding.get("brandAssets")),
        "channel_logos": _json_or_none(branding.get("channelLogos")),
        # Secondary channels (raw_info entries), read from the raw dict keys as-is.
        "tiktok": _json_or_none(raw.get("tiktok")),
        "instagram_en": _json_or_none(raw.get("instagram_en")),
        "facebook_en": _json_or_none(raw.get("facebook_en")),
        "kakao_talk": _json_or_none(raw.get("kakaotalk")),
        "naver_cafe": _json_or_none(raw.get("naver_cafe")),
        # The remaining (main) channels are spread under their raw keys so they
        # line up with the prompt placeholders.
        **{
            source_type: _json_or_none(data)
            for source_type, data in raw.items()
            if source_type not in _REPORT_EXPLICIT_SOURCES
        },
    }
    return await LLMService(provider=_LLM_PROVIDER).generate(report_prompt, input_data)


async def generate_plan(analysis_run_id: str) -> PlanOutput:
    """Build the plan prompt inputs for a run and ask the LLM for a plan.

    The previously stored report (``run["report_data"]``, either a JSON string
    or an already decoded object) is passed to the prompt alongside the
    collected channel data.

    Args:
        analysis_run_id: Identifier passed to the run, raw-data and market lookups.

    Returns:
        The result of ``LLMService.generate`` for ``plan_prompt``.
    """
    run = await select_run(analysis_run_id)
    raw = (await select_run_raw_data(analysis_run_id)) or {}
    clinic = raw.get("mainpage") or {}
    branding = raw.get("branding") or {}

    report_data = run["report_data"]
    report = json.loads(report_data) if isinstance(report_data, str) else report_data

    market = (await select_market(analysis_run_id)) or {}

    input_data = {
        **_clinic_inputs(clinic),
        "report": _json_or_none(report),
        **_market_inputs(market),
        "tiktok": _json_or_none(raw.get("tiktok")),
        "instagram_en": _json_or_none(raw.get("instagram_en")),
        "facebook_en": _json_or_none(raw.get("facebook_en")),
        "naver_blog": _json_or_none(_naver_blog_summary(raw.get("naver_blog"))),
        "naver_cafe": _json_or_none(raw.get("naver_cafe")),
        "kakao_talk": _json_or_none(raw.get("kakaotalk")),
        "channel_logos": _json_or_none(branding.get("channelLogos")),
        "brand_assets": _json_or_none(branding.get("brandAssets")),
    }
    return await LLMService(provider=_LLM_PROVIDER).generate(plan_prompt, input_data)


def _build_clinic_snapshot(gangnam_unni: dict, mainpage: dict, brand_assets: dict) -> dict:
    """Assemble clinic snapshot fields from collected data.

    Only fields with a truthy collected value are set before validation. The
    lead doctor is the doctor entry with the highest ``reviews`` count.

    Returns:
        ``ClinicSnapshot.model_validate(...).model_dump()`` of the collected fields.
    """
    snapshot: dict = {}

    doctors = gangnam_unni.get("doctors", [])
    # ``or 0`` keeps max() from comparing None with int when a doctor entry
    # carries ``reviews: None``.
    lead = max(doctors, key=lambda d: d.get("reviews") or 0) if doctors else None

    if gangnam_unni.get("name"):
        snapshot["name"] = gangnam_unni["name"]
    if mainpage.get("clinicNameEn"):
        snapshot["name_en"] = mainpage["clinicNameEn"]
    if mainpage.get("phone"):
        snapshot["phone"] = mainpage["phone"]

    # Fall back to the host part of the source URL when no domain was stored.
    domain = mainpage.get("domain") or urlparse(mainpage.get("sourceUrl") or "").netloc
    if domain:
        snapshot["domain"] = domain

    if gangnam_unni.get("rating"):
        snapshot["overall_rating"] = gangnam_unni["rating"]
    if gangnam_unni.get("totalReviews"):
        snapshot["total_reviews"] = gangnam_unni["totalReviews"]
    if gangnam_unni.get("address"):
        snapshot["location"] = gangnam_unni["address"]
    if gangnam_unni.get("badges"):
        snapshot["certifications"] = gangnam_unni["badges"]
    if gangnam_unni.get("totalMajorStaffs"):
        snapshot["staff_count"] = gangnam_unni["totalMajorStaffs"]

    if lead:
        snapshot["lead_doctor"] = {
            "name": lead.get("name"),
            "credentials": lead.get("specialty"),
            "rating": lead.get("rating"),
            "review_count": lead.get("reviews"),
        }

    # Force logo_images / brand_colors from branding.brandAssets so they are
    # present even when the LLM ignores the prompt guard and leaves them null.
    if brand_assets.get("logo_images"):
        snapshot["logo_images"] = brand_assets["logo_images"]
    if brand_assets.get("brand_colors"):
        snapshot["brand_colors"] = brand_assets["brand_colors"]

    # NOTE(review): model_dump() presumably emits every schema field, so fields
    # not set above would come back as schema defaults and be merged over the
    # LLM output as well - confirm this is intended.
    return ClinicSnapshot.model_validate(snapshot).model_dump()


def _naver_blog_summary(blog: dict | None) -> dict | None:
    """Reduce Naver blog data to what the plan prompt needs.

    Only the total post count and the date of the first listed post are kept;
    bodies, links and titles are dropped so they neither inflate the token
    count nor feed the LLM unrelated material.

    Returns:
        None when *blog* is falsy, otherwise a dict with ``totalPosts`` and
        ``latestPostDate``.
    """
    if not blog:
        return None
    posts = blog.get("posts") or []
    return {
        "totalPosts": blog.get("totalResults"),
        "latestPostDate": posts[0].get("postDate") if posts else None,
    }


def _parse_iso_duration_seconds(iso: str) -> int:
    """Convert an ISO-8601 duration such as ``PT1H2M3S`` to whole seconds.

    Day components (``P1DT2H``) are supported. Empty, non-string or
    unparsable input yields 0.
    """
    if not isinstance(iso, str):
        return 0
    m = _ISO_DURATION_RE.match(iso)
    if not m:
        return 0
    days, h, mins, s = (int(x or 0) for x in m.groups())
    return days * 86400 + h * 3600 + mins * 60 + s


def _format_seconds(seconds: int) -> str:
    """Format seconds as hours+minutes when at least one hour, else minutes+seconds."""
    m, s = divmod(seconds, 60)
    h, m = divmod(m, 60)
    return f"{h}시간 {m}분" if h else f"{m}분 {s}초"


def _format_clock(seconds: int) -> str:
    """Format seconds as ``H:MM:SS`` when at least one hour, else ``M:SS``."""
    m, s = divmod(seconds, 60)
    h, m = divmod(m, 60)
    return f"{h}:{m:02d}:{s:02d}" if h else f"{m}:{s:02d}"


def _calc_avg_video_length(videos: list[dict]) -> str:
    """Return the formatted mean duration of videos with a positive duration.

    Returns an empty string when no video has a usable duration.
    """
    durations = [_parse_iso_duration_seconds(v.get("duration", "")) for v in videos]
    durations = [d for d in durations if d > 0]
    if not durations:
        return ""
    return _format_seconds(sum(durations) // len(durations))


def _parse_date(date_str) -> datetime | None:
    """Parse the leading ``YYYY-MM-DD`` of *date_str*; None when missing or invalid."""
    if not date_str or not isinstance(date_str, str):
        return None
    try:
        return datetime.fromisoformat(date_str[:10])
    except ValueError:
        return None


def _relative_date(date_str: str) -> str:
    """Describe how long ago *date_str* was, relative to the local current time.

    Returns an empty string for missing or unparsable dates.
    """
    past = _parse_date(date_str)
    if past is None:
        return ""
    days = (datetime.now() - past).days
    if days < 1:
        return "오늘"
    if days < 30:
        return f"{days}일 전"
    if days < 365:
        return f"{days // 30}개월 전"
    return f"{days // 365}년 전"


def _calc_upload_frequency(videos: list[dict]) -> str:
    """Describe the average gap between consecutive video upload dates.

    Videos whose date is missing or unparsable are skipped rather than
    aborting the whole calculation. Returns an empty string when fewer than
    two usable dates remain.
    """
    dates = sorted(
        (d for d in (_parse_date(v.get("date")) for v in videos) if d is not None),
        reverse=True,
    )
    if len(dates) < 2:
        return ""

    gaps = [(newer - older).days for newer, older in zip(dates, dates[1:])]
    avg_days = sum(gaps) // len(gaps)

    if avg_days <= 7:
        # max() avoids dividing by zero when the uploads share a date.
        return f"주 {7 // max(avg_days, 1)}회"
    if avg_days <= 30:
        return f"월 {30 // avg_days}회"
    return f"{avg_days}일에 1회"


async def _build_youtube_audit(youtube: dict) -> dict:
    """Build the YouTube audit section from collected data plus an LLM diagnosis.

    Channel figures are copied from *youtube* when truthy, per-video rows are
    derived from ``youtube["videos"]``, and ``diagnosis`` comes from an LLM
    call using ``youtube_diagnosis_prompt``.

    Returns:
        ``YouTubeAudit.model_validate(...).model_dump()`` of the assembled data.
    """
    # ``or []`` also covers a stored ``"videos": None``.
    videos = youtube.get("videos") or []

    yt_patch: dict = {
        "weekly_view_growth": {"absolute": 0, "percentage": 0.0},
        "estimated_monthly_revenue": {"min": 0, "max": 0},
        "linked_urls": [],
        "avg_video_length": _calc_avg_video_length(videos),
        "upload_frequency": _calc_upload_frequency(videos),
    }

    if youtube.get("channelName"):
        yt_patch["channel_name"] = youtube["channelName"]
    if youtube.get("handle"):
        yt_patch["handle"] = youtube["handle"]
    if youtube.get("subscribers"):
        yt_patch["subscribers"] = youtube["subscribers"]
    if youtube.get("totalVideos"):
        yt_patch["total_videos"] = youtube["totalVideos"]
    if youtube.get("totalViews"):
        yt_patch["total_views"] = youtube["totalViews"]
    if youtube.get("publishedAt"):
        yt_patch["channel_created_date"] = youtube["publishedAt"][:10]
    if youtube.get("description"):
        yt_patch["channel_description"] = youtube["description"]
    if youtube.get("playlists"):
        yt_patch["playlists"] = youtube["playlists"]

    if videos:
        top_videos = []
        for v in videos:
            seconds = _parse_iso_duration_seconds(v.get("duration", ""))
            top_videos.append(
                {
                    "title": v["title"],
                    "views": v["views"],
                    "duration": _format_clock(seconds),
                    # Classify by parsed length: a substring test for "M"
                    # mislabels hour-only durations such as "PT1H" as Short.
                    "type": "Short" if seconds < _SHORT_MAX_SECONDS else "Long",
                    "uploaded_ago": _relative_date(v.get("date", "")),
                }
            )
        yt_patch["top_videos"] = top_videos

    diagnosis_result = await LLMService(provider=_LLM_PROVIDER).generate(
        youtube_diagnosis_prompt,
        {
            "channel_name": yt_patch.get("channel_name"),
            "subscribers": yt_patch.get("subscribers"),
            "total_videos": yt_patch.get("total_videos"),
            "total_views": yt_patch.get("total_views"),
            "avg_video_length": yt_patch.get("avg_video_length"),
            "upload_frequency": yt_patch.get("upload_frequency"),
            "top_videos": json.dumps(yt_patch.get("top_videos", []), ensure_ascii=False),
            "playlists": json.dumps(yt_patch.get("playlists", []), ensure_ascii=False),
        },
    )
    yt_patch["diagnosis"] = [item.model_dump() for item in diagnosis_result.diagnosis]

    return YouTubeAudit.model_validate(yt_patch).model_dump()


async def _build_overrides(analysis_run_id: str) -> dict:
    """Compute the code-derived report sections that replace LLM output.

    Returns:
        A dict with any of ``clinic_snapshot``, ``instagram_audit``,
        ``facebook_audit``, ``youtube_audit`` and ``kpi_dashboard``; empty
        when the run has no raw data.
    """
    raw = await select_run_raw_data(analysis_run_id)
    if not raw:
        return {}

    mainpage = raw.get("mainpage", {}) or {}
    branding = raw.get("branding", {}) or {}
    instagram = raw.get("instagram", {}) or {}
    facebook = raw.get("facebook", {}) or {}
    youtube = raw.get("youtube", {}) or {}
    gangnam_unni = raw.get("gangnam_unni", {}) or {}
    naver_blog = raw.get("naver_blog", {}) or {}
    instagram_en = raw.get("instagram_en", {}) or {}
    facebook_en = raw.get("facebook_en", {}) or {}
    tiktok = raw.get("tiktok", {}) or {}
    naver_cafe = raw.get("naver_cafe", {}) or {}
    brand_assets = branding.get("brandAssets") or {}
    channel_logos = branding.get("channelLogos") or {}

    snapshot: dict = _build_clinic_snapshot(gangnam_unni, mainpage, brand_assets)
    yt_patch: dict = await _build_youtube_audit(youtube)

    # Instagram: KR and EN accounts are assembled in code and replace the LLM output.
    ig_patch = build_instagram_accounts(instagram, instagram_en, channel_logos)

    # Facebook: KR (raw.facebook) and EN (raw.facebook_en) pages are both
    # produced in code, in [KR, EN] order.
    fb_pages = build_facebook_pages(facebook, facebook_en)

    # KPI dashboard: values are decided by code formulas; the LLM output is
    # ignored. The secondary channels are passed to build_kpi_dashboard in one
    # dict keyed instagramEn / facebookEn / tiktok / naverCafe.
    kpi_extras = {
        "instagramEn": instagram_en,
        "facebookEn": facebook_en,
        "tiktok": tiktok,
        "naverCafe": naver_cafe,
    }
    kpi = build_kpi_dashboard(instagram, facebook, youtube, gangnam_unni, kpi_extras, naver_blog)

    overrides: dict = {}
    if snapshot:
        overrides["clinic_snapshot"] = snapshot
    if ig_patch:
        overrides["instagram_audit"] = {"accounts": ig_patch}
    if fb_pages:
        overrides["facebook_audit"] = {"pages": fb_pages}
    if yt_patch:
        overrides["youtube_audit"] = yt_patch
    if kpi:
        overrides["kpi_dashboard"] = kpi
    return overrides


def _deep_merge(base: dict, overrides: dict) -> dict:
    """Merge *overrides* into *base* in place and return *base*.

    - dict over dict: merged recursively.
    - list over list: merged index by index, but only where both items are
      dicts; override items that are not dicts, or that lie beyond the end of
      the base list, are ignored.
    - anything else: the override value replaces the base value (the override
      object itself is stored, not a copy).
    """
    for k, v in overrides.items():
        if isinstance(v, dict) and isinstance(base.get(k), dict):
            _deep_merge(base[k], v)
        elif isinstance(v, list) and isinstance(base.get(k), list):
            for i, item in enumerate(v):
                if i < len(base[k]) and isinstance(item, dict) and isinstance(base[k][i], dict):
                    _deep_merge(base[k][i], item)
        else:
            base[k] = v
    return base


def _patch_report(result: ReportOutput, overrides: dict) -> ReportOutput:
    """Apply code-computed *overrides* on top of the LLM report.

    Returns:
        A new ``ReportOutput`` built from the merged data.
    """
    merged = _deep_merge(result.model_dump(), overrides)

    # Instagram accounts: the prompt tells the LLM to leave them as [], and the
    # code fills them from collected data (empty list when there is none).
    # A None/non-dict section is replaced first so the assignment cannot fail.
    ig_audit = merged.get("instagram_audit")
    if not isinstance(ig_audit, dict):
        ig_audit = {}
        merged["instagram_audit"] = ig_audit
    ig_audit["accounts"] = (overrides.get("instagram_audit") or {}).get("accounts") or []

    # Facebook pages (KR + EN): the code patches carry only some fields, so
    # inserting them as-is would fail validation (label/logo etc. missing).
    # The first LLM page (usually KR) is copied as a template, then each patch
    # overwrites by index: required fields keep the LLM defaults, collected
    # figures come from code, and the EN page is not dropped.
    fb_pages = (overrides.get("facebook_audit") or {}).get("pages") or []
    if fb_pages:
        fb_audit = merged.get("facebook_audit")
        if not isinstance(fb_audit, dict):
            fb_audit = {}
            merged["facebook_audit"] = fb_audit
        base_pages = fb_audit.get("pages")
        if not isinstance(base_pages, list):
            base_pages = []
            fb_audit["pages"] = base_pages

        template = base_pages[0] if base_pages else None
        while len(base_pages) < len(fb_pages) and template:
            base_pages.append({**template})
        for i, patch in enumerate(fb_pages):
            if i < len(base_pages):
                base_pages[i].update(patch)

    # KPI dashboard is replaced wholesale with the code-computed values.
    if overrides.get("kpi_dashboard"):
        merged["kpi_dashboard"] = overrides["kpi_dashboard"]

    return ReportOutput(**merged)


async def run_report_task(analysis_run_id: str) -> None:
    """Generate the report for a run, apply code overrides, and store it."""
    logger.info("[report] start run=%s", analysis_run_id)
    result = await generate_report(analysis_run_id)
    result = _patch_report(result, await _build_overrides(analysis_run_id))
    await update_run_report(analysis_run_id, result.model_dump())
    logger.info("[report] done run=%s", analysis_run_id)


def _patch_plan(result: PlanOutput, logo_desc: str) -> PlanOutput:
    """Set ``profile_photo`` of every ``brand_guide.channel_branding`` entry.

    The value is written by code (the same *logo_desc* for all channels)
    rather than left to the LLM, so the LLM cannot invent a fallback phrase.
    """
    p = result.model_dump()
    for ch in (p.get("brand_guide") or {}).get("channel_branding") or []:
        ch["profile_photo"] = logo_desc
    return PlanOutput(**p)


async def run_plan_task(analysis_run_id: str) -> None:
    """Generate the plan for a run, patch profile photos, and store it."""
    logger.info("[plan] start run=%s", analysis_run_id)
    result = await generate_plan(analysis_run_id)

    # profile_photo is filled from brand_assets.logo_description by code, which
    # blocks LLM placeholder text such as "(가이드 미보유)".
    raw = (await select_run_raw_data(analysis_run_id)) or {}
    branding = raw.get("branding") or {}
    logo_desc = ((branding.get("brandAssets") or {}).get("logo_description")) or ""
    result = _patch_plan(result, logo_desc)

    await update_run_plan(analysis_run_id, result.model_dump())
    logger.info("[plan] done run=%s", analysis_run_id)