336 lines
15 KiB
Python
336 lines
15 KiB
Python
import json
|
|
import logging
|
|
import re
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
from common.db.run import update_run_report, update_run_plan, select_run_report_data
|
|
from common.db.source import select_run_raw_data, select_mainpage_logo_url
|
|
from common.db.market import select_market
|
|
from integrations.llm.llm_service import LLMService
|
|
from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt
|
|
from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit
|
|
from services.branding import analyze_branding
|
|
from services.instagram_audit import build_instagram_audit
|
|
from services.facebook_audit import build_facebook_audit
|
|
from services.kpi_dashboard import build_kpi_dashboard
|
|
from integrations.llm.schemas.plan import PlanOutput
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def generate_report(analysis_run_id: str) -> ReportOutput:
    """Collect a run's raw data and market data and ask the LLM for a report.

    The raw channel data and market data are flattened into the placeholder
    dict passed to ``report_prompt``. Dict/list values are serialized to JSON
    strings; falsy values are passed as ``None``.

    Args:
        analysis_run_id: Identifier of the analysis run to report on.

    Returns:
        Whatever ``LLMService.generate`` returns for ``report_prompt``
        (annotated as ``ReportOutput``).
    """
    # Either loader may come back empty; fall back to {} so the .get() calls
    # below degrade to None placeholders instead of raising AttributeError.
    raw = (await select_run_raw_data(analysis_run_id)) or {}
    clinic = raw.get("mainpage") or {}
    branding = raw.get("branding") or {}
    market = (await select_market(analysis_run_id)) or {}

    def _json(v) -> str | None:
        """Serialize a truthy value to JSON (non-ASCII kept as-is); falsy -> None."""
        return json.dumps(v, ensure_ascii=False) if v else None

    # Raw keys that already have a dedicated placeholder below; everything
    # else in ``raw`` is spread into the prompt input under its own key.
    dedicated_sources = {
        "mainpage", "branding",
        "tiktok", "instagram_en", "facebook_en", "kakaotalk", "naver_cafe",
    }

    input_data = {
        "clinic_name": clinic.get("clinicName"),
        "clinic_name_en": clinic.get("clinicNameEn"),
        "address": clinic.get("address"),
        "phone": clinic.get("phone"),
        "slogan": clinic.get("slogan"),
        # ``or []`` also covers an explicit None value, which would otherwise
        # be serialized as the string "null".
        "services": json.dumps(clinic.get("services") or [], ensure_ascii=False),
        "doctors": json.dumps(clinic.get("doctors") or [], ensure_ascii=False),
        "market_competitors": _json(market.get("competitors")),
        "market_keywords": _json(market.get("keywords")),
        "market_trend": _json(market.get("trend")),
        "market_target_audience": _json(market.get("target_audience")),
        # Branding metadata firecrawl extracted from the main page
        # (logoUrl/ogImage/faviconUrl) plus the Vision/CSS outputs.
        "branding": _json(clinic.get("branding")),
        "brand_assets": _json(branding.get("brandAssets")),
        "channel_logos": _json(branding.get("channelLogos")),
        # Auxiliary channels (raw_info entries), looked up by the raw dict's own keys.
        "tiktok": _json(raw.get("tiktok")),
        "instagram_en": _json(raw.get("instagram_en")),
        "facebook_en": _json(raw.get("facebook_en")),
        "kakao_talk": _json(raw.get("kakaotalk")),
        "naver_cafe": _json(raw.get("naver_cafe")),
        # The five main channels are spread straight from the raw dict so their
        # keys line up with the prompt placeholders.
        **{
            source_type: _json(data)
            for source_type, data in raw.items()
            if source_type not in dedicated_sources
        },
    }

    return await LLMService(provider="perplexity").generate(report_prompt, input_data)
|
|
|
|
|
|
async def generate_plan(analysis_run_id: str) -> PlanOutput:
    """Collect a run's raw data, stored report and market data and ask the LLM for a plan.

    The inputs are flattened into the placeholder dict passed to
    ``plan_prompt``. Dict/list values are serialized to JSON strings; falsy
    values are passed as ``None``. The Naver blog payload is first reduced
    by ``_naver_blog_summary``.

    Args:
        analysis_run_id: Identifier of the analysis run to plan for.

    Returns:
        Whatever ``LLMService.generate`` returns for ``plan_prompt``
        (annotated as ``PlanOutput``).
    """
    # Either loader may come back empty; fall back to {} so the .get() calls
    # below degrade to None placeholders instead of raising AttributeError.
    raw = (await select_run_raw_data(analysis_run_id)) or {}
    clinic = raw.get("mainpage") or {}
    branding = raw.get("branding") or {}
    report = await select_run_report_data(analysis_run_id)
    market = (await select_market(analysis_run_id)) or {}

    def _json(v) -> str | None:
        """Serialize a truthy value to JSON (non-ASCII kept as-is); falsy -> None."""
        return json.dumps(v, ensure_ascii=False) if v else None

    input_data = {
        "clinic_name": clinic.get("clinicName"),
        "clinic_name_en": clinic.get("clinicNameEn"),
        "address": clinic.get("address"),
        "phone": clinic.get("phone"),
        "slogan": clinic.get("slogan"),
        # ``or []`` also covers an explicit None value, which would otherwise
        # be serialized as the string "null".
        "services": json.dumps(clinic.get("services") or [], ensure_ascii=False),
        "doctors": json.dumps(clinic.get("doctors") or [], ensure_ascii=False),
        "report": _json(report),
        "market_competitors": _json(market.get("competitors")),
        "market_keywords": _json(market.get("keywords")),
        "market_trend": _json(market.get("trend")),
        "market_target_audience": _json(market.get("target_audience")),
        "tiktok": _json(raw.get("tiktok")),
        "instagram_en": _json(raw.get("instagram_en")),
        "facebook_en": _json(raw.get("facebook_en")),
        "naver_blog": _json(_naver_blog_summary(raw.get("naver_blog"))),
        "naver_cafe": _json(raw.get("naver_cafe")),
        "kakao_talk": _json(raw.get("kakaotalk")),
        "channel_logos": _json(branding.get("channelLogos")),
        "brand_assets": _json(branding.get("brandAssets")),
    }

    return await LLMService(provider="perplexity").generate(plan_prompt, input_data)
|
|
|
|
|
|
def _build_clinic_snapshot(gangnam_unni: dict, mainpage: dict, brand_assets: dict, logo_url: str | None) -> dict:
    """Assemble the clinic snapshot from scraped data instead of LLM output.

    Only truthy source values are copied, so absent fields are left for
    ``ClinicSnapshot`` to handle. The doctor with the highest review count
    becomes ``lead_doctor``.

    Args:
        gangnam_unni: Raw Gangnam Unni payload (name, rating, doctors, ...).
        mainpage: Raw main-page payload (clinicNameEn, phone, domain, sourceUrl).
        brand_assets: Brand asset dict; only ``brand_colors`` is read here.
        logo_url: Logo URL to inject, or ``None`` to leave logos untouched.

    Returns:
        The snapshot validated through ``ClinicSnapshot`` and dumped to a dict.
    """
    snapshot: dict = {}

    # ``or []`` covers an explicit None; ``or 0`` in the key covers a doctor
    # entry with "reviews": None, which would otherwise make max() compare
    # None with int and raise TypeError.
    doctors = gangnam_unni.get("doctors") or []
    lead = max(doctors, key=lambda d: d.get("reviews") or 0) if doctors else None

    if gangnam_unni.get("name"):
        snapshot["name"] = gangnam_unni["name"]
    if mainpage.get("clinicNameEn"):
        snapshot["name_en"] = mainpage["clinicNameEn"]
    if mainpage.get("phone"):
        snapshot["phone"] = mainpage["phone"]

    # Prefer the explicit domain; otherwise derive it from the source URL's host.
    domain = mainpage.get("domain") or urlparse(mainpage.get("sourceUrl") or "").netloc
    if domain:
        snapshot["domain"] = domain

    if gangnam_unni.get("rating"):
        snapshot["overall_rating"] = gangnam_unni["rating"]
    if gangnam_unni.get("totalReviews"):
        snapshot["total_reviews"] = gangnam_unni["totalReviews"]
    if gangnam_unni.get("address"):
        snapshot["location"] = gangnam_unni["address"]
    if gangnam_unni.get("badges"):
        snapshot["certifications"] = gangnam_unni["badges"]
    if gangnam_unni.get("totalMajorStaffs"):
        snapshot["staff_count"] = gangnam_unni["totalMajorStaffs"]

    if lead:
        snapshot["lead_doctor"] = {
            "name": lead.get("name"),
            "credentials": lead.get("specialty"),
            "rating": lead.get("rating"),
            "review_count": lead.get("reviews"),
        }

    # Force-inject the logo URL (from the raw_info.logo_url column) and
    # brand_colors (from the JSON) so the LLM cannot null them out.
    if logo_url:
        snapshot["logo_images"] = {"circle": None, "horizontal": logo_url, "korean": None}
    if brand_assets.get("brand_colors"):
        snapshot["brand_colors"] = brand_assets["brand_colors"]

    return ClinicSnapshot.model_validate(snapshot).model_dump()
|
|
|
|
|
|
def _naver_blog_summary(blog: dict | None) -> dict | None:
|
|
"""plan 카드 한 장에 들어가는 건 전체 포스트 수와 최근 활동 시점뿐. 그 외(본문·링크·제목)는
|
|
던져봐야 토큰만 늘고 LLM 이 무관 정보로 hallucinate 함."""
|
|
if not blog:
|
|
return None
|
|
posts = blog.get("posts") or []
|
|
return {
|
|
"totalPosts": blog.get("totalResults"),
|
|
"latestPostDate": posts[0].get("postDate") if posts else None,
|
|
}
|
|
|
|
|
|
def _parse_iso_duration_seconds(iso: str) -> int:
|
|
m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", iso or "")
|
|
if not m:
|
|
return 0
|
|
h, mins, s = (int(x or 0) for x in m.groups())
|
|
return h * 3600 + mins * 60 + s
|
|
|
|
|
|
def _format_seconds(seconds: int) -> str:
|
|
m, s = divmod(seconds, 60)
|
|
h, m = divmod(m, 60)
|
|
return f"{h}시간 {m}분" if h else f"{m}분 {s}초"
|
|
|
|
|
|
def _format_clock(seconds: int) -> str:
|
|
m, s = divmod(seconds, 60)
|
|
h, m = divmod(m, 60)
|
|
return f"{h}:{m:02d}:{s:02d}" if h else f"{m}:{s:02d}"
|
|
|
|
|
|
def _calc_avg_video_length(videos: list[dict]) -> str:
|
|
durations = [_parse_iso_duration_seconds(v.get("duration", "")) for v in videos]
|
|
durations = [d for d in durations if d > 0]
|
|
if not durations:
|
|
return ""
|
|
return _format_seconds(sum(durations) // len(durations))
|
|
|
|
|
|
def _relative_date(date_str: str) -> str:
|
|
if not date_str:
|
|
return ""
|
|
try:
|
|
past = datetime.fromisoformat(date_str[:10])
|
|
except ValueError:
|
|
return ""
|
|
days = (datetime.now() - past).days
|
|
if days < 1:
|
|
return "오늘"
|
|
if days < 30:
|
|
return f"{days}일 전"
|
|
if days < 365:
|
|
return f"{days // 30}개월 전"
|
|
return f"{days // 365}년 전"
|
|
|
|
|
|
def _calc_upload_frequency(videos: list[dict]) -> str:
|
|
dates = sorted(
|
|
[v["date"][:10] for v in videos if v.get("date")],
|
|
reverse=True,
|
|
)
|
|
if len(dates) < 2:
|
|
return ""
|
|
gaps = [
|
|
(datetime.fromisoformat(dates[i]) - datetime.fromisoformat(dates[i + 1])).days
|
|
for i in range(len(dates) - 1)
|
|
]
|
|
avg_days = sum(gaps) // len(gaps)
|
|
if avg_days <= 7:
|
|
return f"주 {7 // max(avg_days, 1)}회"
|
|
if avg_days <= 30:
|
|
return f"월 {30 // avg_days}회"
|
|
return f"{avg_days}일에 1회"
|
|
|
|
|
|
async def _build_youtube_audit(youtube: dict) -> dict:
    """Build the YouTube audit section from scraped channel data plus an LLM diagnosis.

    Channel facts are copied from ``youtube`` (truthy values only), video
    statistics are computed locally, and ``youtube_diagnosis_prompt`` is sent
    to the LLM to obtain the ``diagnosis`` list.

    Args:
        youtube: Raw YouTube payload (channelName, handle, videos, ...).

    Returns:
        The audit validated through ``YouTubeAudit`` and dumped to a dict.
    """
    # ``or []`` also covers an explicit None under "videos".
    videos = youtube.get("videos") or []

    yt_patch: dict = {
        # These three are always emitted with fixed zero/empty values.
        "weekly_view_growth": {"absolute": 0, "percentage": 0.0},
        "estimated_monthly_revenue": {"min": 0, "max": 0},
        "linked_urls": [],
        "avg_video_length": _calc_avg_video_length(videos),
        "upload_frequency": _calc_upload_frequency(videos),
    }

    # Copy simple channel fields only when the source value is truthy.
    passthrough_fields = (
        ("channelName", "channel_name"),
        ("handle", "handle"),
        ("subscribers", "subscribers"),
        ("totalVideos", "total_videos"),
        ("totalViews", "total_views"),
        ("description", "channel_description"),
        ("playlists", "playlists"),
    )
    for source_key, target_key in passthrough_fields:
        if youtube.get(source_key):
            yt_patch[target_key] = youtube[source_key]
    if youtube.get("publishedAt"):
        # Keep only the YYYY-MM-DD prefix of the timestamp.
        yt_patch["channel_created_date"] = youtube["publishedAt"][:10]

    if videos:
        top_videos = []
        for v in videos:
            duration = v.get("duration") or ""
            seconds = _parse_iso_duration_seconds(duration)
            top_videos.append({
                "title": v["title"],
                "views": v["views"],
                "duration": _format_clock(seconds),
                # Long = has a minute component (the original rule) OR parses to
                # a minute or more, so hour-only durations such as "PT1H" are no
                # longer labelled "Short".
                "type": "Long" if seconds >= 60 or "M" in duration else "Short",
                "uploaded_ago": _relative_date(v.get("date", "")),
            })
        yt_patch["top_videos"] = top_videos

    diagnosis_result = await LLMService(provider="perplexity").generate(
        youtube_diagnosis_prompt,
        {
            "channel_name": yt_patch.get("channel_name"),
            "subscribers": yt_patch.get("subscribers"),
            "total_videos": yt_patch.get("total_videos"),
            "total_views": yt_patch.get("total_views"),
            "avg_video_length": yt_patch.get("avg_video_length"),
            "upload_frequency": yt_patch.get("upload_frequency"),
            "top_videos": json.dumps(yt_patch.get("top_videos", []), ensure_ascii=False),
            "playlists": json.dumps(yt_patch.get("playlists", []), ensure_ascii=False),
        },
    )
    yt_patch["diagnosis"] = [item.model_dump() for item in diagnosis_result.diagnosis]

    return YouTubeAudit.model_validate(yt_patch).model_dump()
|
|
|
|
|
|
def _deep_merge(base: dict, overrides: dict) -> dict:
|
|
"""dict 끼리 만나면 재귀로 안쪽까지 합치고, 그 외(list/scalar/None) 는 override 값으로 통째 치환."""
|
|
for k, v in overrides.items():
|
|
if isinstance(v, dict) and isinstance(base.get(k), dict):
|
|
_deep_merge(base[k], v)
|
|
else:
|
|
base[k] = v
|
|
return base
|
|
|
|
|
|
async def _build_overrides(analysis_run_id: str, result: ReportOutput) -> ReportOutput:
    """Overlay code-computed sections on top of the LLM-generated report.

    The run's raw data is reloaded and used to build the clinic snapshot,
    YouTube / Instagram / Facebook audits and KPI dashboard. Each non-empty
    section is deep-merged over the dumped ``result``, and a new
    ``ReportOutput`` is built from the merged dict. When no raw data exists,
    ``result`` is returned unchanged.
    """
    raw = await select_run_raw_data(analysis_run_id)
    if not raw:
        return result

    def section(key: str) -> dict:
        """Return raw[key], or an empty dict if it is missing or falsy."""
        return raw.get(key) or {}

    mainpage = section("mainpage")
    branding = section("branding")
    instagram = section("instagram")
    facebook = section("facebook")
    youtube = section("youtube")
    gangnam_unni = section("gangnam_unni")
    naver_blog = section("naver_blog")
    instagram_en = section("instagram_en")
    facebook_en = section("facebook_en")
    tiktok = section("tiktok")
    naver_cafe = section("naver_cafe")

    brand_assets = branding.get("brandAssets") or {}
    channel_logos = branding.get("channelLogos") or {}
    logo_url = await select_mainpage_logo_url(analysis_run_id)

    # Facebook pages as the LLM produced them; handed to the Facebook audit builder.
    llm_fb_pages = result.model_dump().get("facebook_audit", {}).get("pages", [])

    snapshot: dict = _build_clinic_snapshot(gangnam_unni, mainpage, brand_assets, logo_url)
    yt_patch: dict = await _build_youtube_audit(youtube)
    ig_patch = build_instagram_audit(instagram, instagram_en, channel_logos)
    fb_patch = build_facebook_audit(facebook, facebook_en, llm_fb_pages)
    kpi = build_kpi_dashboard(
        instagram,
        facebook,
        youtube,
        gangnam_unni,
        {
            "instagramEn": instagram_en,
            "facebookEn": facebook_en,
            "tiktok": tiktok,
            "naverCafe": naver_cafe,
        },
        naver_blog,
    )

    # Only non-empty patches take part in the merge.
    candidates = (
        ("clinic_snapshot", snapshot),
        ("instagram_audit", ig_patch),
        ("facebook_audit", fb_patch),
        ("youtube_audit", yt_patch),
        ("kpi_dashboard", kpi),
    )
    overrides: dict = {name: patch for name, patch in candidates if patch}

    return ReportOutput(**_deep_merge(result.model_dump(), overrides))
|
|
|
|
|
|
async def run_report_task(analysis_run_id: str) -> None:
    """Run the report pipeline for one analysis run and persist the result.

    Steps, in order: branding analysis, LLM report generation, overlay of
    code-computed sections, then saving the dumped report. Start and finish
    are logged.
    """
    logger.info("[report] start run=%s", analysis_run_id)

    await analyze_branding(analysis_run_id)
    llm_report = await generate_report(analysis_run_id)
    final_report = await _build_overrides(analysis_run_id, llm_report)
    await update_run_report(analysis_run_id, final_report.model_dump())

    logger.info("[report] done run=%s", analysis_run_id)
|
|
|
|
|
|
def _patch_plan(result: PlanOutput, logo_desc: str) -> PlanOutput:
    """Overwrite every channel's ``profile_photo`` in the plan with ``logo_desc``.

    ``brand_guide.channel_branding[].profile_photo`` is set by code rather
    than left to the LLM: every channel gets the same value
    (brand_assets.logo_description), which stops the LLM from hallucinating
    fallback wording. A new ``PlanOutput`` is built from the patched dump.
    """
    payload = result.model_dump()
    brand_guide = payload.get("brand_guide") or {}
    channels = brand_guide.get("channel_branding") or []
    for channel in channels:
        channel["profile_photo"] = logo_desc
    return PlanOutput(**payload)
|
|
|
|
|
|
async def run_plan_task(analysis_run_id: str) -> None:
    """Run the plan pipeline for one analysis run and persist the result.

    Generates the plan via the LLM, forces each channel's profile photo to
    the stored logo description, then saves the dumped plan. Start and finish
    are logged.
    """
    logger.info("[plan] start run=%s", analysis_run_id)

    plan = await generate_plan(analysis_run_id)

    # profile_photo is set by code from brand_assets.logo_description, which
    # blocks LLM hallucinations such as "(가이드 미보유)".
    raw = await select_run_raw_data(analysis_run_id)
    branding = raw.get("branding") or {}
    brand_assets = branding.get("brandAssets") or {}
    logo_desc = brand_assets.get("logo_description") or ""

    patched_plan = _patch_plan(plan, logo_desc)
    await update_run_plan(analysis_run_id, patched_plan.model_dump())

    logger.info("[plan] done run=%s", analysis_run_id)
|