clinic_overview , youtube analysis 정리

db-migration
jaehwang 2026-05-29 16:19:06 +09:00
parent d1293f9188
commit eed57729d9
6 changed files with 208 additions and 57 deletions

View File

@ -1,7 +1,7 @@
import os
from pydantic import BaseModel
from common.utils import get_env
from integrations.llm.schemas.report import ReportInput, ReportOutput
from integrations.llm.schemas.report import ReportInput, ReportOutput, YouTubeDiagnosisInput, YouTubeDiagnosisOutput
from integrations.llm.schemas.plan import PlanInput, PlanOutput
from integrations.llm.schemas.market import (
MarketCompetitorsInput, MarketCompetitorsOutput,
@ -80,3 +80,10 @@ market_target_audience_prompt = Prompt(
input_class=MarketTargetAudienceInput,
output_class=MarketTargetAudienceOutput,
)
# Prompt definition for the YouTube channel diagnosis step.
# The template text is loaded from youtube_diagnosis_prompt.txt; the request is
# described by YouTubeDiagnosisInput and the reply by YouTubeDiagnosisOutput.
# NOTE(review): "REPORT_MODEL" is the same key the report prompt uses; it is
# presumably resolved to a concrete model name inside Prompt -- confirm there.
youtube_diagnosis_prompt = Prompt(
    file_name="youtube_diagnosis_prompt.txt",
    prompt_model="REPORT_MODEL",
    input_class=YouTubeDiagnosisInput,
    output_class=YouTubeDiagnosisOutput,
)

View File

@ -70,18 +70,12 @@ class RegistryData(BaseModel):
class ClinicSnapshot(BaseModel):
name: str
name_en: str
established: str
years_in_business: int
staff_count: int
lead_doctor: LeadDoctor
overall_rating: float
total_reviews: int
price_range: PriceRange
certifications: list[str]
media_appearances: list[str]
medical_tourism: list[str]
location: str
nearest_station: str
phone: str
domain: str
logo_images: LogoImages | None = None
@ -137,7 +131,6 @@ class YouTubeAudit(BaseModel):
avg_video_length: str
upload_frequency: str
channel_created_date: str
subscriber_rank: str
channel_description: str
linked_urls: list[LinkedUrl]
playlists: list[str]
@ -345,3 +338,20 @@ class MarketingReport(BaseModel):
ReportOutput = MarketingReport
# --- YouTubeDiagnosis ---
class YouTubeDiagnosisInput(BaseModel):
    """Input payload for the YouTube diagnosis prompt.

    Every field is optional and defaults to ``None``. The field names match
    the ``{placeholders}`` used in ``youtube_diagnosis_prompt.txt``.
    """

    # Channel-level figures.
    channel_name: str | None = None
    subscribers: int | None = None
    total_videos: int | None = None
    total_views: int | None = None
    # Pre-formatted, human-readable strings (not raw numbers).
    avg_video_length: str | None = None
    upload_frequency: str | None = None
    # Typed as strings, not lists: the report service passes JSON-serialised
    # text (json.dumps output) for these two fields.
    top_videos: str | None = None
    playlists: str | None = None
class YouTubeDiagnosisOutput(BaseModel):
    """Structured reply of the YouTube diagnosis prompt."""

    # One entry per diagnosed item; DiagnosisItem is declared elsewhere in
    # this schema module (not visible in this hunk).
    diagnosis: list[DiagnosisItem]

View File

@ -0,0 +1,24 @@
다음은 성형외과/피부과 유튜브 채널 데이터입니다.
채널명: {channel_name}
구독자 수: {subscribers}
총 영상 수: {total_videos}
총 조회수: {total_views}
평균 영상 길이: {avg_video_length}
업로드 주기: {upload_frequency}
인기 영상 목록: {top_videos}
플레이리스트: {playlists}
위 데이터를 바탕으로 이 채널의 마케팅 문제점과 개선사항을 진단해줘.
각 항목은 category(진단 카테고리), detail(상세 설명), severity(critical/warning/info) 형식의 JSON 배열로 출력해줘.
진단 카테고리는 다음과 같아:
구독자 대비 조회수 비율,
최근 롱폼 조회수,
Shorts 조회수,
업로드 빈도,
콘텐츠 톤앤매너,
썸네일 디자인,
최고 성과 Shorts
출처 번호([1], [2] 등)는 굳이 포함하지 마.

View File

@ -79,7 +79,17 @@ class YouTubeClient:
if resp and resp.is_success:
videos = resp.json().get("items", [])[:10]
return {"channelId": channel_id, "channel": channel, "videos": videos}
playlists: list[dict] = []
resp = await http_request(
HTTPMethod.GET,
url=f"{YT}/playlists",
params={"part": "snippet", "channelId": channel_id, "maxResults": 50, "key": self.api_key},
label="yt-playlists",
)
if resp and resp.is_success:
playlists = resp.json().get("items", [])
return {"channelId": channel_id, "channel": channel, "videos": videos, "playlists": playlists}
async def get_channel(self, url: str) -> dict | None:
raw = await self.fetch_channel(url)
@ -109,6 +119,11 @@ class YouTubeClient:
}
for v in raw["videos"]
],
"playlists": [
p.get("snippet", {}).get("title")
for p in raw["playlists"]
if p.get("snippet", {}).get("title")
],
}
async def search_channels(self, query: str, max_results: int = 3) -> list[str]:

View File

@ -68,18 +68,12 @@ class RegistryData(CamelModel):
class ClinicSnapshot(CamelModel):
name: str
name_en: str
established: str
years_in_business: int
staff_count: int
lead_doctor: LeadDoctor
overall_rating: float
total_reviews: int
price_range: PriceRange
certifications: list[str]
media_appearances: list[str]
medical_tourism: list[str]
location: str
nearest_station: str
phone: str
domain: str
logo_images: LogoImages | None = None
@ -131,7 +125,6 @@ class YouTubeAudit(CamelModel):
avg_video_length: str
upload_frequency: str
channel_created_date: str
subscriber_rank: str
channel_description: str
linked_urls: list[LinkedUrl]
playlists: list[str]

View File

@ -1,10 +1,12 @@
import json
import logging
import os
import re
from datetime import datetime
from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis
from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt, plan_prompt
from integrations.llm.schemas.report import ReportOutput
from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt
from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit
from integrations.llm.schemas.plan import PlanOutput
from models.status import AnalysisStatus
@ -84,33 +86,14 @@ async def generate_plan(analysis_run_id: str) -> PlanOutput:
return await LLMService(provider="perplexity").generate(plan_prompt, input_data)
async def _build_overrides(analysis_run_id: str) -> dict:
run = await fetchone(
"SELECT hospital_id, instagram_data_id, facebook_data_id,"
" naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
if not run:
return {}
hospital_row = await fetchone(
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
hospital = json.loads(hospital_row["raw_data"]) if hospital_row and isinstance(hospital_row.get("raw_data"), str) else (hospital_row or {}).get("raw_data") or {}
instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {}
facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {}
naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {}
youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {}
gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {}
def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict:
snapshot: dict = {}
# ── gangnam_unni ──────────────────────────────────────────────────────────
doctors = gangnam_unni.get("doctors", [])
lead = max(doctors, key=lambda d: d.get("reviews", 0)) if doctors else None
if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"]
if hospital.get("clinicNameEn"): snapshot["name_en"] = hospital["clinicNameEn"]
if hospital.get("phone"): snapshot["phone"] = hospital["phone"]
if hospital.get("domain"): snapshot["domain"] = hospital["domain"]
if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"]
if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"]
if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"]
@ -123,6 +106,144 @@ async def _build_overrides(analysis_run_id: str) -> dict:
"rating": lead.get("rating"),
"review_count": lead.get("reviews"),
}
return ClinicSnapshot.model_validate(snapshot).model_dump()
def _parse_iso_duration_seconds(iso: str) -> int:
m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", iso or "")
if not m:
return 0
h, mins, s = (int(x or 0) for x in m.groups())
return h * 3600 + mins * 60 + s
def _format_seconds(seconds: int) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}시간 {m}" if h else f"{m}{s}"
def _format_clock(seconds: int) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}:{m:02d}:{s:02d}" if h else f"{m}:{s:02d}"
def _calc_avg_video_length(videos: list[dict]) -> str:
durations = [_parse_iso_duration_seconds(v.get("duration", "")) for v in videos]
durations = [d for d in durations if d > 0]
if not durations:
return ""
return _format_seconds(sum(durations) // len(durations))
def _relative_date(date_str: str) -> str:
if not date_str:
return ""
try:
past = datetime.fromisoformat(date_str[:10])
except ValueError:
return ""
days = (datetime.now() - past).days
if days < 1:
return "오늘"
if days < 30:
return f"{days}일 전"
if days < 365:
return f"{days // 30}개월 전"
return f"{days // 365}년 전"
def _calc_upload_frequency(videos: list[dict]) -> str:
dates = sorted(
[v["date"][:10] for v in videos if v.get("date")],
reverse=True,
)
if len(dates) < 2:
return ""
gaps = [
(datetime.fromisoformat(dates[i]) - datetime.fromisoformat(dates[i + 1])).days
for i in range(len(dates) - 1)
]
avg_days = sum(gaps) // len(gaps)
if avg_days <= 7:
return f"{7 // max(avg_days, 1)}"
if avg_days <= 30:
return f"{30 // avg_days}"
return f"{avg_days}일에 1회"
async def _build_youtube_audit(youtube: dict) -> dict:
    """Build the ``youtube_audit`` override section from raw YouTube data.

    Copies the channel-level fields that are present, derives the average
    video length, upload frequency and per-video summaries, asks the LLM for
    a diagnosis of the channel, and validates the result against
    ``YouTubeAudit``.

    Args:
        youtube: Raw channel data. Keys read here: ``videos``, ``channelName``,
            ``handle``, ``subscribers``, ``totalVideos``, ``totalViews``,
            ``publishedAt``, ``description`` and ``playlists``. Each video is
            expected to carry ``title`` and ``views`` and may carry
            ``duration`` and ``date``.

    Returns:
        The ``model_dump()`` of the validated ``YouTubeAudit``.

    Raises:
        KeyError: If a video lacks ``title`` or ``views``.
        Whatever ``LLMService.generate`` or ``YouTubeAudit.model_validate``
        raise is propagated unchanged.
    """
    videos = youtube.get("videos", [])

    # Fields with no data source are filled with neutral placeholders so that
    # validation can succeed.
    yt_patch: dict = {
        "weekly_view_growth": {"absolute": 0, "percentage": 0.0},
        "estimated_monthly_revenue": {"min": 0, "max": 0},
        "linked_urls": [],
        "avg_video_length": _calc_avg_video_length(videos),
        "upload_frequency": _calc_upload_frequency(videos),
    }

    if youtube.get("channelName"):
        yt_patch["channel_name"] = youtube["channelName"]
    if youtube.get("handle"):
        yt_patch["handle"] = youtube["handle"]
    if youtube.get("subscribers"):
        yt_patch["subscribers"] = youtube["subscribers"]
    if youtube.get("totalVideos"):
        yt_patch["total_videos"] = youtube["totalVideos"]
    if youtube.get("totalViews"):
        yt_patch["total_views"] = youtube["totalViews"]
    if youtube.get("publishedAt"):
        # Keep only the YYYY-MM-DD part of the timestamp.
        yt_patch["channel_created_date"] = youtube["publishedAt"][:10]
    if youtube.get("description"):
        yt_patch["channel_description"] = youtube["description"]
    if youtube.get("playlists"):
        yt_patch["playlists"] = youtube["playlists"]

    if videos:
        top_videos = []
        for v in videos:
            duration_seconds = _parse_iso_duration_seconds(v.get("duration", ""))
            top_videos.append(
                {
                    "title": v["title"],
                    "views": v["views"],
                    "duration": _format_clock(duration_seconds),
                    # Classify by parsed length rather than by the presence of
                    # an "M" in the raw text: "PT1H" has no minute designator
                    # but is certainly not a Short.
                    "type": "Short" if duration_seconds < 60 else "Long",
                    "uploaded_ago": _relative_date(v.get("date", "")),
                }
            )
        yt_patch["top_videos"] = top_videos

    # The diagnosis input schema takes the list-valued fields as JSON text.
    diagnosis_result = await LLMService(provider="perplexity").generate(
        youtube_diagnosis_prompt,
        {
            "channel_name": yt_patch.get("channel_name"),
            "subscribers": yt_patch.get("subscribers"),
            "total_videos": yt_patch.get("total_videos"),
            "total_views": yt_patch.get("total_views"),
            "avg_video_length": yt_patch.get("avg_video_length"),
            "upload_frequency": yt_patch.get("upload_frequency"),
            "top_videos": json.dumps(yt_patch.get("top_videos", []), ensure_ascii=False),
            "playlists": json.dumps(yt_patch.get("playlists", []), ensure_ascii=False),
        },
    )
    yt_patch["diagnosis"] = [item.model_dump() for item in diagnosis_result.diagnosis]

    return YouTubeAudit.model_validate(yt_patch).model_dump()
async def _build_overrides(analysis_run_id: str) -> dict:
run = await fetchone(
"SELECT hospital_id, instagram_data_id, facebook_data_id,"
" naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
if not run:
return {}
hospital_row = await fetchone(
"SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
hospital = json.loads(hospital_row["raw_data"]) if hospital_row and isinstance(hospital_row.get("raw_data"), str) else (hospital_row or {}).get("raw_data") or {}
hospital["domain"] = (hospital_row or {}).get("url") or ""
instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {}
facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {}
naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {}
youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {}
gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {}
snapshot: dict = _build_clinic_snapshot(gangnam_unni, hospital)
yt_patch: dict = await _build_youtube_audit(youtube)
# ── instagram ─────────────────────────────────────────────────────────────
ig_patch: dict = {}
@ -143,26 +264,6 @@ async def _build_overrides(analysis_run_id: str) -> dict:
if facebook.get("categories"): fb_patch["category"] = ", ".join(facebook["categories"])
if facebook.get("website"): fb_patch["linked_domain"] = facebook["website"]
# ── youtube ───────────────────────────────────────────────────────────────
yt_patch: dict = {}
if youtube.get("channelName"): yt_patch["channel_name"] = youtube["channelName"]
if youtube.get("handle"): yt_patch["handle"] = youtube["handle"]
if youtube.get("subscribers"): yt_patch["subscribers"] = youtube["subscribers"]
if youtube.get("totalVideos"): yt_patch["total_videos"] = youtube["totalVideos"]
if youtube.get("totalViews"): yt_patch["total_views"] = youtube["totalViews"]
if youtube.get("publishedAt"): yt_patch["channel_created_date"] = youtube["publishedAt"][:10]
if youtube.get("description"): yt_patch["channel_description"] = youtube["description"]
if youtube.get("videos"):
yt_patch["top_videos"] = [
{
"title": v["title"],
"views": v["views"],
"duration": v.get("duration"),
"type": "Short" if "M" not in v.get("duration", "") else "Long",
"uploaded_ago": v.get("date", "")[:10],
}
for v in youtube["videos"]
]
overrides: dict = {}
if snapshot:
@ -225,6 +326,7 @@ async def run_report_task(analysis_run_id: str) -> None:
if await _is_mock(analysis_run_id):
logger.info("[report] mock mode run=%s", analysis_run_id)
result = _load_mock_report()
result.youtube_audit.linked_urls = []
else:
result = await generate_report(analysis_run_id)
result = _patch_report(result, await _build_overrides(analysis_run_id))