From eed57729d9ae5c1b97be3592ab0ba76d67050ef0 Mon Sep 17 00:00:00 2001 From: jaehwang Date: Fri, 29 May 2026 16:19:06 +0900 Subject: [PATCH] =?UTF-8?q?clinic=5Foverview=20,=20youtube=20analysis=20?= =?UTF-8?q?=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/integrations/llm/prompt.py | 9 +- app/integrations/llm/schemas/report.py | 24 ++- .../temp-prompt/youtube_diagnosis_prompt.txt | 24 +++ app/integrations/youtube.py | 17 +- app/models/report.py | 7 - app/services/analysis.py | 184 ++++++++++++++---- 6 files changed, 208 insertions(+), 57 deletions(-) create mode 100644 app/integrations/llm/temp-prompt/youtube_diagnosis_prompt.txt diff --git a/app/integrations/llm/prompt.py b/app/integrations/llm/prompt.py index 365b7b0..d4c3b88 100644 --- a/app/integrations/llm/prompt.py +++ b/app/integrations/llm/prompt.py @@ -1,7 +1,7 @@ import os from pydantic import BaseModel from common.utils import get_env -from integrations.llm.schemas.report import ReportInput, ReportOutput +from integrations.llm.schemas.report import ReportInput, ReportOutput, YouTubeDiagnosisInput, YouTubeDiagnosisOutput from integrations.llm.schemas.plan import PlanInput, PlanOutput from integrations.llm.schemas.market import ( MarketCompetitorsInput, MarketCompetitorsOutput, @@ -80,3 +80,10 @@ market_target_audience_prompt = Prompt( input_class=MarketTargetAudienceInput, output_class=MarketTargetAudienceOutput, ) + +youtube_diagnosis_prompt = Prompt( + file_name="youtube_diagnosis_prompt.txt", + prompt_model="REPORT_MODEL", + input_class=YouTubeDiagnosisInput, + output_class=YouTubeDiagnosisOutput, +) diff --git a/app/integrations/llm/schemas/report.py b/app/integrations/llm/schemas/report.py index 2b56110..9f24be3 100644 --- a/app/integrations/llm/schemas/report.py +++ b/app/integrations/llm/schemas/report.py @@ -70,18 +70,12 @@ class RegistryData(BaseModel): class ClinicSnapshot(BaseModel): name: str name_en: str - established: str - years_in_business: int staff_count: int lead_doctor: LeadDoctor overall_rating: float total_reviews: int - price_range: PriceRange certifications: list[str] - media_appearances: list[str] - medical_tourism: list[str] location: str - nearest_station: str phone: str domain: str logo_images: LogoImages | None = None @@ -137,7 +131,6 @@ class YouTubeAudit(BaseModel): avg_video_length: str upload_frequency: str channel_created_date: str - subscriber_rank: str channel_description: str linked_urls: list[LinkedUrl] playlists: list[str] @@ -345,3 +338,20 @@ class MarketingReport(BaseModel): ReportOutput = MarketingReport + + +# --- YouTubeDiagnosis --- + +class YouTubeDiagnosisInput(BaseModel): + channel_name: str | None = None + subscribers: int | None = None + total_videos: int | None = None + total_views: int | None = None + avg_video_length: str | None = None + upload_frequency: str | None = None + top_videos: str | None = None + playlists: str | None = None + + +class YouTubeDiagnosisOutput(BaseModel): + diagnosis: list[DiagnosisItem] diff --git a/app/integrations/llm/temp-prompt/youtube_diagnosis_prompt.txt b/app/integrations/llm/temp-prompt/youtube_diagnosis_prompt.txt new file mode 100644 index 0000000..13e098e --- /dev/null +++ b/app/integrations/llm/temp-prompt/youtube_diagnosis_prompt.txt @@ -0,0 +1,24 @@ +다음은 성형외과/피부과 유튜브 채널 데이터입니다. + +채널명: {channel_name} +구독자 수: {subscribers} +총 영상 수: {total_videos} +총 조회수: {total_views} +평균 영상 길이: {avg_video_length} +업로드 주기: {upload_frequency} +인기 영상 목록: {top_videos} +플레이리스트: {playlists} + +위 데이터를 바탕으로 이 채널의 마케팅 문제점과 개선사항을 진단해줘. +각 항목은 category(진단 카테고리), detail(상세 설명), severity(critical/warning/info) 형식의 JSON 배열로 출력해줘. + +진단 카테고리들은 다음과 같아. : +구독자 대비 조회수 비율, +최근 롱폼 조회수, +Shorts 조회수, +업로드 빈도, +콘텐츠 톤앤매너, +썸네일 디자인, +최고 성과 Shorts + +출처 번호([1], [2] 등)는 굳이 포함하지 마. \ No newline at end of file diff --git a/app/integrations/youtube.py b/app/integrations/youtube.py index 734f142..d76a2b0 100644 --- a/app/integrations/youtube.py +++ b/app/integrations/youtube.py @@ -79,7 +79,17 @@ class YouTubeClient: if resp and resp.is_success: videos = resp.json().get("items", [])[:10] - return {"channelId": channel_id, "channel": channel, "videos": videos} + playlists: list[dict] = [] + resp = await http_request( + HTTPMethod.GET, + url=f"{YT}/playlists", + params={"part": "snippet", "channelId": channel_id, "maxResults": 50, "key": self.api_key}, + label="yt-playlists", + ) + if resp and resp.is_success: + playlists = resp.json().get("items", []) + + return {"channelId": channel_id, "channel": channel, "videos": videos, "playlists": playlists} async def get_channel(self, url: str) -> dict | None: raw = await self.fetch_channel(url) @@ -109,6 +119,11 @@ class YouTubeClient: } for v in raw["videos"] ], + "playlists": [ + p.get("snippet", {}).get("title") + for p in raw["playlists"] + if p.get("snippet", {}).get("title") + ], } async def search_channels(self, query: str, max_results: int = 3) -> list[str]: diff --git a/app/models/report.py b/app/models/report.py index a5c765c..a99f59d 100644 --- a/app/models/report.py +++ b/app/models/report.py @@ -68,18 +68,12 @@ class RegistryData(CamelModel): class ClinicSnapshot(CamelModel): name: str name_en: str - established: str - years_in_business: int staff_count: int lead_doctor: LeadDoctor overall_rating: float total_reviews: int - price_range: PriceRange certifications: list[str] - media_appearances: list[str] - medical_tourism: list[str] location: str - nearest_station: str phone: str domain: str logo_images: LogoImages | None = None @@ -131,7 +125,6 @@ class YouTubeAudit(CamelModel): avg_video_length: str upload_frequency: str channel_created_date: str - subscriber_rank: str channel_description: str linked_urls: list[LinkedUrl] playlists: list[str] diff --git a/app/services/analysis.py b/app/services/analysis.py index 866d195..63fc23e 100644 --- a/app/services/analysis.py +++ b/app/services/analysis.py @@ -1,10 +1,12 @@ import json import logging import os +import re +from datetime import datetime from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis from integrations.llm.llm_service import LLMService -from integrations.llm.prompt import report_prompt, plan_prompt -from integrations.llm.schemas.report import ReportOutput +from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt +from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit from integrations.llm.schemas.plan import PlanOutput from models.status import AnalysisStatus @@ -84,6 +86,140 @@ async def generate_plan(analysis_run_id: str) -> PlanOutput: return await LLMService(provider="perplexity").generate(plan_prompt, input_data) +def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict: + snapshot: dict = {} + doctors = gangnam_unni.get("doctors", []) + lead = max(doctors, key=lambda d: d.get("reviews", 0)) if doctors else None + if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"] + if hospital.get("clinicNameEn"): snapshot["name_en"] = hospital["clinicNameEn"] + if hospital.get("phone"): snapshot["phone"] = hospital["phone"] + if hospital.get("domain"): snapshot["domain"] = hospital["domain"] + if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"] + if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"] + if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"] + if gangnam_unni.get("badges"): snapshot["certifications"] = gangnam_unni["badges"] + if gangnam_unni.get("totalMajorStaffs"): snapshot["staff_count"] = gangnam_unni["totalMajorStaffs"] + if lead: + snapshot["lead_doctor"] = { + "name": lead.get("name"), + "credentials": lead.get("specialty"), + "rating": lead.get("rating"), + "review_count": lead.get("reviews"), + } + return ClinicSnapshot.model_validate(snapshot).model_dump() + + +def _parse_iso_duration_seconds(iso: str) -> int: + m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", iso or "") + if not m: + return 0 + h, mins, s = (int(x or 0) for x in m.groups()) + return h * 3600 + mins * 60 + s + + +def _format_seconds(seconds: int) -> str: + m, s = divmod(seconds, 60) + h, m = divmod(m, 60) + return f"{h}시간 {m}분" if h else f"{m}분 {s}초" + + +def _format_clock(seconds: int) -> str: + m, s = divmod(seconds, 60) + h, m = divmod(m, 60) + return f"{h}:{m:02d}:{s:02d}" if h else f"{m}:{s:02d}" + + +def _calc_avg_video_length(videos: list[dict]) -> str: + durations = [_parse_iso_duration_seconds(v.get("duration", "")) for v in videos] + durations = [d for d in durations if d > 0] + if not durations: + return "" + return _format_seconds(sum(durations) // len(durations)) + + +def _relative_date(date_str: str) -> str: + if not date_str: + return "" + try: + past = datetime.fromisoformat(date_str[:10]) + except ValueError: + return "" + days = (datetime.now() - past).days + if days < 1: + return "오늘" + if days < 30: + return f"{days}일 전" + if days < 365: + return f"{days // 30}개월 전" + return f"{days // 365}년 전" + + +def _calc_upload_frequency(videos: list[dict]) -> str: + dates = sorted( + [v["date"][:10] for v in videos if v.get("date")], + reverse=True, + ) + if len(dates) < 2: + return "" + gaps = [ + (datetime.fromisoformat(dates[i]) - datetime.fromisoformat(dates[i + 1])).days + for i in range(len(dates) - 1) + ] + avg_days = sum(gaps) // len(gaps) + if avg_days <= 7: + return f"주 {7 // max(avg_days, 1)}회" + if avg_days <= 30: + return f"월 {30 // avg_days}회" + return f"{avg_days}일에 1회" + + +async def _build_youtube_audit(youtube: dict) -> dict: + videos = youtube.get("videos", []) + yt_patch: dict = { + "weekly_view_growth": {"absolute": 0, "percentage": 0.0}, + "estimated_monthly_revenue": {"min": 0, "max": 0}, + "linked_urls": [], + "avg_video_length": _calc_avg_video_length(videos), + "upload_frequency": _calc_upload_frequency(videos), + } + if youtube.get("channelName"): yt_patch["channel_name"] = youtube["channelName"] + if youtube.get("handle"): yt_patch["handle"] = youtube["handle"] + if youtube.get("subscribers"): yt_patch["subscribers"] = youtube["subscribers"] + if youtube.get("totalVideos"): yt_patch["total_videos"] = youtube["totalVideos"] + if youtube.get("totalViews"): yt_patch["total_views"] = youtube["totalViews"] + if youtube.get("publishedAt"): yt_patch["channel_created_date"] = youtube["publishedAt"][:10] + if youtube.get("description"): yt_patch["channel_description"] = youtube["description"] + if youtube.get("playlists"): yt_patch["playlists"] = youtube["playlists"] + if videos: + yt_patch["top_videos"] = [ + { + "title": v["title"], + "views": v["views"], + "duration": _format_clock(_parse_iso_duration_seconds(v.get("duration", ""))), + "type": "Short" if "M" not in v.get("duration", "") else "Long", + "uploaded_ago": _relative_date(v.get("date", "")), + } + for v in videos + ] + + diagnosis_result = await LLMService(provider="perplexity").generate( + youtube_diagnosis_prompt, + { + "channel_name": yt_patch.get("channel_name"), + "subscribers": yt_patch.get("subscribers"), + "total_videos": yt_patch.get("total_videos"), + "total_views": yt_patch.get("total_views"), + "avg_video_length": yt_patch.get("avg_video_length"), + "upload_frequency": yt_patch.get("upload_frequency"), + "top_videos": json.dumps(yt_patch.get("top_videos", []), ensure_ascii=False), + "playlists": json.dumps(yt_patch.get("playlists", []), ensure_ascii=False), + }, + ) + yt_patch["diagnosis"] = [item.model_dump() for item in diagnosis_result.diagnosis] + + return YouTubeAudit.model_validate(yt_patch).model_dump() + + async def _build_overrides(analysis_run_id: str) -> dict: run = await fetchone( "SELECT hospital_id, instagram_data_id, facebook_data_id," @@ -95,34 +231,19 @@ async def _build_overrides(analysis_run_id: str) -> dict: return {} hospital_row = await fetchone( - "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", + "SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s", (run["hospital_id"],), ) hospital = json.loads(hospital_row["raw_data"]) if hospital_row and isinstance(hospital_row.get("raw_data"), str) else (hospital_row or {}).get("raw_data") or {} + hospital["domain"] = (hospital_row or {}).get("url") or "" instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {} facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {} naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {} youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {} gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {} - snapshot: dict = {} - - # ── gangnam_unni ────────────────────────────────────────────────────────── - doctors = gangnam_unni.get("doctors", []) - lead = max(doctors, key=lambda d: d.get("reviews", 0)) if doctors else None - if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"] - if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"] - if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"] - if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"] - if gangnam_unni.get("badges"): snapshot["certifications"] = gangnam_unni["badges"] - if gangnam_unni.get("totalMajorStaffs"): snapshot["staff_count"] = gangnam_unni["totalMajorStaffs"] - if lead: - snapshot["lead_doctor"] = { - "name": lead.get("name"), - "credentials": lead.get("specialty"), - "rating": lead.get("rating"), - "review_count": lead.get("reviews"), - } + snapshot: dict = _build_clinic_snapshot(gangnam_unni, hospital) + yt_patch: dict = await _build_youtube_audit(youtube) # ── instagram ───────────────────────────────────────────────────────────── ig_patch: dict = {} @@ -143,26 +264,6 @@ async def _build_overrides(analysis_run_id: str) -> dict: if facebook.get("categories"): fb_patch["category"] = ", ".join(facebook["categories"]) if facebook.get("website"): fb_patch["linked_domain"] = facebook["website"] - # ── youtube ─────────────────────────────────────────────────────────────── - yt_patch: dict = {} - if youtube.get("channelName"): yt_patch["channel_name"] = youtube["channelName"] - if youtube.get("handle"): yt_patch["handle"] = youtube["handle"] - if youtube.get("subscribers"): yt_patch["subscribers"] = youtube["subscribers"] - if youtube.get("totalVideos"): yt_patch["total_videos"] = youtube["totalVideos"] - if youtube.get("totalViews"): yt_patch["total_views"] = youtube["totalViews"] - if youtube.get("publishedAt"): yt_patch["channel_created_date"] = youtube["publishedAt"][:10] - if youtube.get("description"): yt_patch["channel_description"] = youtube["description"] - if youtube.get("videos"): - yt_patch["top_videos"] = [ - { - "title": v["title"], - "views": v["views"], - "duration": v.get("duration"), - "type": "Short" if "M" not in v.get("duration", "") else "Long", - "uploaded_ago": v.get("date", "")[:10], - } - for v in youtube["videos"] - ] overrides: dict = {} if snapshot: @@ -225,6 +326,7 @@ async def run_report_task(analysis_run_id: str) -> None: if await _is_mock(analysis_run_id): logger.info("[report] mock mode run=%s", analysis_run_id) result = _load_mock_report() + result.youtube_audit.linked_urls = [] else: result = await generate_report(analysis_run_id) result = _patch_report(result, await _build_overrides(analysis_run_id))