diff --git a/app/api/analysis.py b/app/api/analysis.py
index 7e2691c..2abbe32 100644
--- a/app/api/analysis.py
+++ b/app/api/analysis.py
@@ -8,10 +8,38 @@ from models.file import FileListItem, FileType, FileUploadResponse
from models.status import AnalysisStatus
from services.pipeline import run_pipeline
from services.file import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file
+from mock_urls import MOCK_CLINICS
+from common.utils import _normalize_homepage, _with_scheme
router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)])
logger = logging.getLogger(__name__)
+# Channels that will later be mapped per clinic in the DB — for now they are
+# filled in from mock_urls by matching on the homepage.
+# Covers both the main channels (IG/FB/YT/Naver Blog/Gangnam Unni) and the extra
+# channels (TikTok/English IG & FB/Kakao/Naver Cafe).
+# When the client sends only some channels, or empty values, the same hospital is
+# looked up in the mock data to fill in the rest.
+def _channels_from_mockurls(homepage_url: str) -> dict:
+    """Return the channel URLs of the mock clinic whose homepage matches.
+
+    Homepages are compared after ``_normalize_homepage``. The result maps the
+    snake_case channel names to the clinic's URLs passed through
+    ``_with_scheme``; an empty dict is returned when ``homepage_url``
+    normalizes to an empty string or no clinic matches.
+    """
+    wanted = _normalize_homepage(homepage_url)
+    if not wanted:
+        return {}
+    # (result key, mock_urls key) pairs: main channels first, then extra ones.
+    key_pairs = (
+        ("instagram", "instagram"),
+        ("facebook", "facebook"),
+        ("naver_blog", "naverBlog"),
+        ("youtube", "youtube"),
+        ("gangnam_unni", "gangnamUnni"),
+        ("tiktok", "tiktok"),
+        ("instagram_en", "instagramEn"),
+        ("facebook_en", "facebookEn"),
+        ("kakao_talk", "kakaoTalk"),
+        ("naver_cafe", "naverCafe"),
+    )
+    for clinic in MOCK_CLINICS:
+        clinic_urls = clinic["urls"]
+        if _normalize_homepage(clinic_urls.get("homepage", "")) != wanted:
+            continue
+        return {
+            result_key: _with_scheme(clinic_urls.get(mock_key))
+            for result_key, mock_key in key_pairs
+        }
+    return {}
+
@router.post("", status_code=status.HTTP_202_ACCEPTED, response_model=AnalysisStartResponse)
async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks):
@@ -27,18 +55,36 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks
if not hospital:
raise HTTPException(status_code=409, detail="Clinic not found")
- ig_id = await insert_instagram_row(hospital_id, body.channels.instagram) if body.channels.instagram else None
- fb_id = await insert_facebook_row(hospital_id, body.channels.facebook) if body.channels.facebook else None
- nb_id = await insert_naver_blog_row(hospital_id, body.channels.naver_blog) if body.channels.naver_blog else None
- yt_id = await insert_youtube_row(hospital_id, body.channels.youtube) if body.channels.youtube else None
- gu_id = await insert_gangnam_unni_row(hospital_id, body.channels.gangnam_unni) if body.channels.gangnam_unni else None
+ # 클라가 안 보낸 채널은 mock_urls에서 homepage 매칭으로 보충 (main + extra 동일 규칙)
+ mock = _channels_from_mockurls(hospital["url"])
+
+ # 사용자가 'gangnamunni.com/...' 같이 scheme/www 없이 줘도 _with_scheme이 https://www. 보강.
+ ig_url = _with_scheme(body.channels.instagram) or mock.get("instagram")
+ fb_url = _with_scheme(body.channels.facebook) or mock.get("facebook")
+ nb_url = _with_scheme(body.channels.naver_blog) or mock.get("naver_blog")
+ yt_url = _with_scheme(body.channels.youtube) or mock.get("youtube")
+ gu_url = _with_scheme(body.channels.gangnam_unni) or mock.get("gangnam_unni")
+
+ ig_id = await insert_instagram_row(hospital_id, ig_url) if ig_url else None
+ fb_id = await insert_facebook_row(hospital_id, fb_url) if fb_url else None
+ nb_id = await insert_naver_blog_row(hospital_id, nb_url) if nb_url else None
+ yt_id = await insert_youtube_row(hospital_id, yt_url) if yt_url else None
+ gu_id = await insert_gangnam_unni_row(hospital_id, gu_url) if gu_url else None
analysis_run_id = await insert_analysis_run(
analysis_run_id, hospital_id, hospital["owner_user_id"],
ig_id, fb_id, nb_id, yt_id, gu_id,
)
- background_tasks.add_task(run_pipeline, analysis_run_id)
+ extra_channels = {
+ "tiktok": body.channels.tiktok or mock.get("tiktok"),
+ "instagram_en": body.channels.instagram_en or mock.get("instagram_en"),
+ "facebook_en": body.channels.facebook_en or mock.get("facebook_en"),
+ "kakao_talk": body.channels.kakao_talk or mock.get("kakao_talk"),
+ "naver_cafe": body.channels.naver_cafe or mock.get("naver_cafe"),
+ }
+ logger.info("[analysis] main+extra channels resolved (mock_matched=%s)", bool(mock))
+ background_tasks.add_task(run_pipeline, analysis_run_id, extra_channels)
return AnalysisStartResponse(
analysis_run_id=analysis_run_id,
diff --git a/app/api/plan.py b/app/api/plan.py
index ba59702..41065f6 100644
--- a/app/api/plan.py
+++ b/app/api/plan.py
@@ -1,8 +1,9 @@
import json
import logging
from fastapi import APIRouter, Depends, HTTPException, Response
-from common.db import fetchone
+from common.db import fetchone, fetch_raw
from common.deps import verify_api_key
+from common.utils import _with_scheme
from integrations.llm.schemas.plan import PlanOutput
from models.plan import PlanApiResponse
@@ -14,7 +15,7 @@ logger = logging.getLogger(__name__)
async def get_plan(run_id: str):
logger.info("GET /api/plan/%s", run_id)
row = await fetchone(
- "SELECT ar.plan_data, ar.created_at, h.hospital_name, h.hospital_name_en, h.url"
+ "SELECT ar.plan_data, ar.created_at, ar.gangnam_unni_data_id, h.hospital_name, h.hospital_name_en, h.url"
" FROM analysis_runs ar"
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
" WHERE ar.analysis_run_id = %s",
@@ -26,11 +27,13 @@ async def get_plan(run_id: str):
return Response(status_code=204)
data = json.loads(row["plan_data"]) if isinstance(row["plan_data"], str) else row["plan_data"]
plan = PlanOutput(**data)
+ gangnam_unni = await fetch_raw("gangnam_unni_data", row["gangnam_unni_data_id"]) or {}
+ clinic_name = gangnam_unni.get("name") or row["hospital_name"]
return PlanApiResponse(
id=run_id,
- clinic_name=row["hospital_name"],
+ clinic_name=clinic_name,
clinic_name_en=row["hospital_name_en"],
created_at=str(row["created_at"]),
- target_url=row["url"],
+ target_url=_with_scheme(row["url"]),
**plan.model_dump(),
)
diff --git a/app/api/report.py b/app/api/report.py
index 15f93a6..f39d8ce 100644
--- a/app/api/report.py
+++ b/app/api/report.py
@@ -3,6 +3,7 @@ import logging
from fastapi import APIRouter, Depends, HTTPException, Response
from common.db import fetchone
from common.deps import verify_api_key
+from common.utils import _with_scheme
from integrations.llm.schemas.report import ReportOutput
from models.report import MarketingReportResponse
@@ -31,6 +32,6 @@ async def get_report(run_id: str):
clinic_name=row["hospital_name"],
clinic_name_en=row["hospital_name_en"],
created_at=str(row["created_at"]),
- target_url=row["url"],
+ target_url=_with_scheme(row["url"]),
**llm_output.model_dump(exclude={"id", "created_at", "target_url"}),
)
diff --git a/app/common/db.py b/app/common/db.py
index 9012579..608d6fb 100644
--- a/app/common/db.py
+++ b/app/common/db.py
@@ -263,6 +263,19 @@ async def save_hospital_raw_data(hospital_id: str, data: dict, analysis_run_id:
await _insert_hospital_history(hospital_id, analysis_run_id)
+async def merge_hospital_raw_data(hospital_id: str, patch: dict) -> None:
+    """Merge ``patch`` into ``hospital_baseinfo.raw_data`` at the top level and save it.
+
+    Read-modify-write: the stored ``raw_data`` is selected, the keys of ``patch``
+    are laid over it (existing keys are overwritten) and the result is written
+    back. Used when the extra collection steps append keys to ``raw_data`` one
+    after another.
+
+    NOTE(review): the SELECT and the UPDATE are separate statements, so two
+    concurrent callers for the same hospital could overwrite each other's
+    keys — confirm callers really run sequentially.
+    """
+    row = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
+    stored = row["raw_data"] if row else None
+    decoded = json.loads(stored) if isinstance(stored, str) else stored
+    # A missing row, SQL NULL and a stored JSON ``null`` all start from an empty
+    # object; any other non-mapping value still fails loudly here.
+    raw_data = {**(decoded or {}), **patch}
+    await execute(
+        "UPDATE hospital_baseinfo SET raw_data = %s WHERE hospital_id = %s",
+        (json.dumps(raw_data, ensure_ascii=False), hospital_id),
+    )
+
+
async def get_market_analysis(analysis_run_id: str) -> dict:
rows = await fetchall(
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
diff --git a/app/common/utils.py b/app/common/utils.py
index aca1d13..5d64de6 100644
--- a/app/common/utils.py
+++ b/app/common/utils.py
@@ -1,8 +1,11 @@
import os
import asyncio
+import logging
from http import HTTPMethod
import httpx
+logger = logging.getLogger(__name__)
+
REQUEST_TIMEOUT = 60
@@ -37,3 +40,48 @@ async def http_request(
print(f" [error] {label} → {e}")
return None
return None
+
+
+async def _run_optional_step(coro, label: str) -> None:
+    """Await an optional step without letting its failure stop the caller.
+
+    Any ``Exception`` raised while awaiting ``coro`` is swallowed; only a
+    warning containing ``label`` and the error is logged.
+    """
+    try:
+        await coro
+    except Exception as exc:  # deliberate best-effort isolation
+        logger.warning("%s 실패 (무시하고 진행): %s", label, exc)
+    return None
+
+
+def _normalize_homepage(url: str) -> str:
+    """Normalize a URL so that homepages can be compared.
+
+    The value is trimmed and lowercased, then a leading ``https://`` /
+    ``http://``, a leading ``www.`` and any trailing slashes are removed.
+    Falsy input yields an empty string.
+    """
+    cleaned = (url or "").strip().lower()
+    cleaned = cleaned.removeprefix("https://").removeprefix("http://")
+    return cleaned.removeprefix("www.").rstrip("/")
+
+
+# Domains whose SSL certificate is only valid for www.* — with the bare domain
+# the browser shows an SSL warning when the user clicks the link.
+_WWW_REQUIRED = ("gangnamunni.com", "facebook.com", "instagram.com", "toxnfill.com")
+
+
+def _with_scheme(u: str | None) -> str | None:
+    """Normalize a URL for collectors and link display.
+
+    - Empty or whitespace-only input returns None.
+    - A value without a scheme gets ``https://`` prepended.
+    - When another ``http(s)://`` is embedded in the value (LLMs sometimes
+      produce ``https://www.X/https://Y``), only the part starting at the last
+      scheme is kept.
+    - When the host is exactly one of ``_WWW_REQUIRED``, ``www.`` is inserted
+      after the scheme.
+    """
+    if not u:
+        return None
+    u = u.strip()
+    if not u:
+        # whitespace-only input must not turn into a bare "https://"
+        return None
+    # nested URL such as 'https://www.facebook.com/https://facebook.com/X':
+    # cut from the last 'http(s)://' onwards
+    last = max(u.rfind("https://"), u.rfind("http://"))
+    if last > 0:
+        u = u[last:]
+    if "://" not in u:
+        u = "https://" + u
+    scheme, sep, rest = u.partition("://")
+    if scheme in ("https", "http"):
+        # Compare the whole host, not a prefix, so look-alike hosts such as
+        # "instagram.community" are left untouched.
+        host = rest
+        for delimiter in "/?#:":
+            host = host.split(delimiter, 1)[0]
+        if host in _WWW_REQUIRED:
+            u = f"{scheme}{sep}www.{rest}"
+    return u
diff --git a/app/integrations/apify.py b/app/integrations/apify.py
index 9914fa6..f1b77fb 100644
--- a/app/integrations/apify.py
+++ b/app/integrations/apify.py
@@ -1,16 +1,32 @@
+import asyncio
from http import HTTPMethod
from urllib.parse import urlparse
from common.utils import http_request
APIFY_BASE = "https://api.apify.com/v2"
+# Instagram: the profile and highlights actors are each called directly.
+IG_PROFILE_ACTOR = "coderx~instagram-profile-scraper-bio-posts"
+IG_HIGHLIGHTS_ACTOR = "igview-owner~instagram-highlights-scraper"
+
+# Facebook: the pages and posts actors are each called directly.
+FB_PAGES_ACTOR = "apify~facebook-pages-scraper"
+FB_POSTS_ACTOR = "apify~facebook-posts-scraper"
+
+# TikTok
+TIKTOK_ACTOR = "clockworks~tiktok-scraper"
+
+
+def _ig_username(url: str) -> str:
+    """Extract the Instagram username from a profile URL or a handle.
+
+    Accepts a full URL (``https://www.instagram.com/name/...``), a URL without
+    a scheme (``instagram.com/name``) or a bare handle (``name`` / ``@name``).
+    For URLs the first path segment is returned.
+    """
+    value = url.strip()
+    if "://" not in value:
+        bare = value.strip("/")
+        if "/" not in bare:
+            # plain handle, optionally written with a leading "@"
+            return bare.lstrip("@")
+        # scheme-less URL: add a scheme so urlparse splits host and path
+        value = "https://" + bare
+    return urlparse(value).path.strip("/").split("/")[0]
+
class ApifyClient:
def __init__(self, token: str, wait_for_finish: int = 120):
self.token = token
self.wait_for_finish = wait_for_finish
- async def _run_actor(self, actor_id: str, input_data: dict) -> list[dict]:
+ async def _run_actor(self, actor_id: str, input_data: dict, limit: int = 20) -> list[dict]:
resp = await http_request(
HTTPMethod.POST,
url=f"{APIFY_BASE}/acts/{actor_id}/runs",
@@ -26,33 +42,53 @@ class ApifyClient:
items_resp = await http_request(
HTTPMethod.GET,
url=f"{APIFY_BASE}/datasets/{dataset_id}/items",
- params={"token": self.token, "limit": 20},
+ params={"token": self.token, "limit": limit},
label=f"apify-dataset-{dataset_id}",
)
if not items_resp or not items_resp.is_success:
return []
return items_resp.json()
- async def fetch_instagram_profile(self, url: str) -> dict | None:
- username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
- items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12})
+ async def fetch_instagram_profile(self, username: str) -> dict | None:
+ items = await self._run_actor(IG_PROFILE_ACTOR, {"usernames": [username]})
return items[0] if items else None
+ async def fetch_instagram_highlights(self, username: str) -> list[dict]:
+ """Run the Instagram highlights actor for ``username`` and return the items from ``_run_actor``."""
+ return await self._run_actor(IG_HIGHLIGHTS_ACTOR, {"usernames": [username]})
+
async def get_instagram_profile(self, url: str) -> dict | None:
- profile = await self.fetch_instagram_profile(url)
- if not profile or profile.get("error"):
+ username = _ig_username(url)
+ # profile·highlights 두 actor를 병렬 호출 (highlights 실패해도 profile만 있으면 진행)
+ profile, highlights = await asyncio.gather(
+ self.fetch_instagram_profile(username),
+ self.fetch_instagram_highlights(username),
+ return_exceptions=True,
+ )
+ if isinstance(profile, Exception) or not profile or profile.get("error"):
return None
+ if isinstance(highlights, Exception):
+ highlights = []
+ # 프로필상 하이라이트가 있다고 하면(highlight_reel_count>0) 빈 결과일 때 최대 2회 재시도.
+ if not highlights and (profile.get("highlight_reel_count", 0) or profile.get("highlightReelCount", 0)) > 0:
+ for _ in range(2):
+ retry = await self.fetch_instagram_highlights(username)
+ if retry:
+ highlights = retry
+ break
return {
"username": profile["username"],
+ "profileImage": profile.get("hdProfilePicUrl") or profile.get("profilePicUrl"),
"followers": profile.get("followersCount", 0),
"following": profile.get("followsCount", 0),
"posts": profile.get("postsCount", 0),
"bio": profile.get("biography", ""),
+ "category": profile.get("businessCategoryName") or "",
"isBusinessAccount": profile.get("isBusinessAccount", False),
- #"externalUrl": profile.get("externalUrl"), LLM에 혼동을 주는 듯 하여 비활성화
+ #"externalUrl": profile.get("externalUrl"), LLM에 혼동을 주는 듯 하여 비활성화
+ "highlights": [h["highlightTitle"] for h in (highlights or []) if isinstance(h, dict) and h.get("highlightTitle")],
"latestPosts": [
{
- "type": p.get("type"),
+ "type": p.get("mediaType") or p.get("type"),
"likes": p.get("likesCount", 0),
"comments": p.get("commentsCount", 0),
"caption": (p.get("caption") or "")[:500],
@@ -62,87 +98,119 @@ class ApifyClient:
],
}
- async def fetch_instagram_posts(self, url: str, limit: int = 20) -> list[dict]:
- username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
- return await self._run_actor("apify~instagram-post-scraper", {
- "username": [f"https://www.instagram.com/{username}/"],
- "resultsLimit": limit,
- })
-
- async def get_instagram_posts(self, url: str, limit: int = 20) -> dict:
- items = await self.fetch_instagram_posts(url, limit)
- posts = [
- {
- "id": p["id"],
- "type": p.get("type"),
- "url": p.get("url"),
- "caption": (p.get("caption") or "")[:500],
- "hashtags": p.get("hashtags", []),
- "likesCount": p.get("likesCount", 0),
- "commentsCount": p.get("commentsCount", 0),
- "timestamp": p.get("timestamp"),
- }
- for p in items
- ]
- n = len(posts) or 1
- return {
- "posts": posts,
- "totalPosts": len(posts),
- "avgLikes": round(sum(p["likesCount"] for p in posts) / n),
- "avgComments": round(sum(p["commentsCount"] for p in posts) / n),
- }
-
- async def fetch_instagram_reels(self, url: str, limit: int = 15) -> list[dict]:
- username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
- print(username)
- return await self._run_actor("apify~instagram-reel-scraper", {
- "username": [f"https://www.instagram.com/{username}/reels/"],
- "resultsLimit": limit,
- })
-
- async def get_instagram_reels(self, url: str, limit: int = 15) -> dict:
- items = await self.fetch_instagram_reels(url, limit)
- reels = [
- {
- "id": r["id"],
- "url": r.get("url"),
- "caption": (r.get("caption") or "")[:500],
- "hashtags": r.get("hashtags", []),
- "likesCount": r.get("likesCount", 0),
- "commentsCount": r.get("commentsCount", 0),
- "videoViewCount": r.get("videoViewCount", 0),
- "videoPlayCount": r.get("videoPlayCount", 0),
- "videoDuration": r.get("videoDuration", 0),
- "timestamp": r.get("timestamp"),
- }
- for r in items
- ]
- n = len(reels) or 1
- return {
- "reels": reels,
- "totalReels": len(reels),
- "avgViews": round(sum(r["videoViewCount"] for r in reels) / n),
- "avgPlays": round(sum(r["videoPlayCount"] for r in reels) / n),
- }
+ # The Instagram post scraper is currently not used by the pipeline — disabled (restore if needed)
+ # async def fetch_instagram_posts(self, url: str, limit: int = 20) -> list[dict]:
+ # username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
+ # return await self._run_actor("apify~instagram-post-scraper", {
+ # "directUrls": [f"https://www.instagram.com/{username}/"],
+ # "resultsLimit": limit,
+ # })
+ #
+ # async def get_instagram_posts(self, url: str, limit: int = 20) -> dict:
+ # items = await self.fetch_instagram_posts(url, limit)
+ # posts = [
+ # {
+ # "id": p["id"],
+ # "type": p.get("type"),
+ # "url": p.get("url"),
+ # "caption": (p.get("caption") or "")[:500],
+ # "hashtags": p.get("hashtags", []),
+ # "likesCount": p.get("likesCount", 0),
+ # "commentsCount": p.get("commentsCount", 0),
+ # "timestamp": p.get("timestamp"),
+ # }
+ # for p in items
+ # ]
+ # n = len(posts) or 1
+ # return {
+ # "posts": posts,
+ # "totalPosts": len(posts),
+ # "avgLikes": round(sum(p["likesCount"] for p in posts) / n),
+ # "avgComments": round(sum(p["commentsCount"] for p in posts) / n),
+ # }
async def fetch_facebook_page(self, page_url: str) -> dict | None:
- items = await self._run_actor("apify~facebook-pages-scraper", {"startUrls": [{"url": page_url}]})
+ items = await self._run_actor(FB_PAGES_ACTOR, {"startUrls": [{"url": page_url}]})
return items[0] if items else None
+ async def fetch_facebook_posts(self, page_url: str, limit: int = 20) -> list[dict]:
+ """Run the Facebook posts actor for ``page_url`` and return the items from ``_run_actor``.
+
+ ``limit`` is sent to the actor as ``resultsLimit`` and is also passed to
+ ``_run_actor`` as its ``limit`` argument.
+ """
+ return await self._run_actor(
+ FB_POSTS_ACTOR, {"startUrls": [{"url": page_url}], "resultsLimit": limit}, limit=limit,
+ )
+
async def get_facebook_page(self, page_url: str) -> dict | None:
- page = await self.fetch_facebook_page(page_url)
- if not page:
+ # pages·posts 두 task 병렬 호출 (posts 실패해도 page만 있으면 진행)
+ page, posts = await asyncio.gather(
+ self.fetch_facebook_page(page_url),
+ self.fetch_facebook_posts(page_url),
+ return_exceptions=True,
+ )
+ if isinstance(page, Exception) or not page:
return None
+ if isinstance(posts, Exception):
+ posts = []
return {
"pageName": page.get("title") or page.get("name"),
+ "profileImage": page.get("profilePictureUrl") or page.get("profilePhoto") or page.get("profilePic"),
"pageUrl": page.get("pageUrl", page_url),
"followers": page.get("followers", 0),
- "likes": page.get("likes", 0),
+ "following": page.get("followings", 0),
+ "reviews": page.get("ratingCount", 0),
"categories": page.get("categories", []),
- "email": page.get("email"),
- "phone": page.get("phone"),
- "website": page.get("website"),
- "address": page.get("address"),
+ "website": page.get("website") or page.get("websites"),
"intro": page.get("intro"),
- "rating": page.get("rating"),
+ "latestPosts": [
+ {
+ "text": (p.get("text") or "")[:160],
+ "likes": p.get("likes", 0),
+ "reactions": p.get("topReactionsCount", 0),
+ "shares": p.get("shares", 0),
+ "views": p.get("viewsCount") or 0,
+ "isVideo": p.get("isVideo", False),
+ "timestamp": p.get("time") or p.get("timestamp"),
+ }
+ for p in (posts or []) if isinstance(p, dict)
+ ],
+ }
+
+ async def fetch_tiktok_profile(self, url: str) -> list[dict]:
+     """Run the TikTok actor for the profile named by ``url`` and return its items.
+
+     ``url`` may be a full profile URL (the first path segment is used, without
+     a leading ``@``) or a handle with or without ``@``.
+     """
+     if "://" in url:
+         first_segment = urlparse(url).path.strip("/").lstrip("@")
+         handle = first_segment.split("/")[0]
+     else:
+         handle = url.lstrip("@")
+     actor_input = {
+         "profiles": [handle],
+         "resultsPerPage": 10,
+         "profileScrapeSections": ["videos"],
+         "profileSorting": "latest",
+         "shouldDownloadVideos": False,
+         "shouldDownloadCovers": False,
+         "shouldDownloadSubtitles": False,
+     }
+     return await self._run_actor(TIKTOK_ACTOR, actor_input)
+
+ async def get_tiktok_profile(self, url: str) -> dict | None:
+ """Fetch a TikTok profile and reduce it to a summary dict.
+
+ Returns None when ``fetch_tiktok_profile`` yields no items. Otherwise the
+ account fields are read from the ``authorMeta`` of the first item and
+ ``recentVideos`` holds at most 10 per-video summaries.
+ """
+ items = await self.fetch_tiktok_profile(url)
+ if not items:
+ return None
+ # Account-level fields come from the first item; a missing or empty
+ # ``authorMeta`` falls back to an empty dict so the ``.get`` calls below are safe.
+ author = (items[0] or {}).get("authorMeta") or {}
+ # One summary per dict item; the video text is truncated to 300 characters.
+ videos = [
+ {
+ "title": (v.get("text") or "")[:300],
+ "playCount": v.get("playCount", 0),
+ "diggCount": v.get("diggCount", 0),
+ "commentCount": v.get("commentCount", 0),
+ "shareCount": v.get("shareCount", 0),
+ "createTime": v.get("createTimeISO"),
+ "url": v.get("webVideoUrl"),
+ }
+ for v in items if isinstance(v, dict)
+ ]
+ return {
+ "handle": author.get("name"),
+ "profileImage": author.get("avatar"),
+ "nickname": author.get("nickName"),
+ "followers": author.get("fans", 0),
+ "following": author.get("following", 0),
+ "likes": author.get("heart", 0),
+ "videoCount": author.get("video", 0),
+ "verified": author.get("verified", False),
+ "bio": author.get("signature", ""),
+ # only the first 10 summaries are returned
+ "recentVideos": videos[:10],
}
diff --git a/app/integrations/color_extractor.py b/app/integrations/color_extractor.py
new file mode 100644
index 0000000..6419061
--- /dev/null
+++ b/app/integrations/color_extractor.py
@@ -0,0 +1,275 @@
+"""홈페이지 HTML/CSS에서 hex 색상 직접 추출 + 빈도 기반 brand palette 산출.
+
+Vision LLM에 의존하지 않고 페이지의 실제 CSS 값을 정규식으로 잡음.
+로고만 분석하는 Vision보다 사이트 전체 컬러 시스템 (primary/secondary/background/text)을 더 정확히 추출.
+"""
+import logging
+import re
+import ssl
+from collections import Counter
+from urllib.parse import urljoin, urlparse
+import httpx
+
+logger = logging.getLogger(__name__)
+
+
+def _make_ssl_context() -> ssl.SSLContext:
+    """Build an SSL context that tolerates weak server crypto.
+
+    Works around old Korean medical sites being rejected with errors such as
+    SSL DH_KEY_TOO_SMALL or weak ciphers: the OpenSSL security level is
+    lowered to 1 while certificate verification stays enabled (the context
+    comes from ``ssl.create_default_context``).
+    """
+    ctx = ssl.create_default_context()
+    try:
+        ctx.set_ciphers("DEFAULT@SECLEVEL=1")
+    except ssl.SSLError as e:
+        # Best effort: keep the default ciphers, but leave a trace instead of
+        # swallowing the failure silently.
+        logger.debug("[fetch] could not lower SSL security level: %s", e)
+    return ctx
+
+
+async def _fetch_html(url: str, timeout: float = 20.0) -> tuple[int, str]:
+    """Fetch ``url`` and return ``(status_code, text)``, relaxing SSL step by step.
+
+    Copes with old sites (e.g. Grand / Tox&Fill) in three attempts:
+
+    1. standard certificate verification,
+    2. the weak-cipher context from ``_make_ssl_context``,
+    3. certificate verification disabled.
+
+    A connection/read/SSL error moves on to the next attempt. Any other error
+    (for example a timeout), or a failure of the last attempt, is logged as a
+    warning and reported as ``(0, "")`` — the function does not raise for
+    request failures.
+    """
+    headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}
+
+    async def _get(verify) -> tuple[int, str]:
+        # One GET with the given ``verify`` setting (bool or SSLContext).
+        async with httpx.AsyncClient(
+            timeout=timeout, follow_redirects=True, headers=headers, verify=verify,
+        ) as client:
+            response = await client.get(url)
+            return response.status_code, response.text
+
+    # (factory for the ``verify`` setting, message logged before falling back)
+    guarded_attempts = (
+        (lambda: True, "[fetch] %s standard SSL failed: %s — fallback to weak cipher"),
+        (_make_ssl_context, "[fetch] %s weak cipher failed: %s — fallback to verify=False"),
+    )
+    for make_verify, fallback_message in guarded_attempts:
+        try:
+            return await _get(make_verify())
+        except (httpx.ConnectError, httpx.ReadError, ssl.SSLError) as e:
+            logger.info(fallback_message, url, e)
+        except Exception as e:
+            # Not an SSL/connection problem (e.g. timeout): weaker SSL will not
+            # help, so report failure the same way the last attempt does.
+            logger.warning("[fetch] %s failed: %s", url, e)
+            return 0, ""
+
+    # NOTE(security): the last resort disables certificate verification
+    # (host mismatch etc.); the response must be treated as untrusted content.
+    try:
+        return await _get(False)
+    except Exception as e:
+        logger.warning("[fetch] %s all fallbacks failed: %s", url, e)
+        return 0, ""
+
+LOGO_IMG_PATTERNS = [
+ # 1)
+ re.compile(r'
]*\bclass=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE),
+ # 2)
+ re.compile(r'
]*\bsrc=["\']([^"\']+)["\'][^>]*\bclass=["\'][^"\']*\blogo\b[^"\']*["\']', re.IGNORECASE),
+ # 3)
+ re.compile(r'
]*\bid=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE),
+ # 4)
+ re.compile(r'
]*\balt=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE),
+ # 5) <...nested...>
+ re.compile(r'<(?:a|h[1-6]|div|span)[^>]*\b(?:class|id)=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*>(?:[^<]|<(?!img))*
]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE | re.DOTALL),
+ # 6) inline background-image:
+ re.compile(r'<(?:a|div|span|h[1-6])[^>]*\b(?:class|id)=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bstyle=["\'][^"\']*background(?:-image)?\s*:\s*url\(\s*["\']?([^"\')\s]+)', re.IGNORECASE),
+ # 7) inline background-image: (속성 순서 반대)
+ re.compile(r'<(?:a|div|span|h[1-6])[^>]*\bstyle=["\'][^"\']*background(?:-image)?\s*:\s*url\(\s*["\']?([^"\')\s]+)[^"\']*["\'][^>]*\b(?:class|id)=["\'][^"\']*\blogo\b', re.IGNORECASE),
+ # 8) src 자체에 "logo" 포함 (header_logo.png, brand-logo.svg 등)
+ re.compile(r'
]*\bsrc=["\']([^"\']*\blogo\b[^"\']*\.(?:png|svg|jpe?g|webp)[^"\']*)["\']', re.IGNORECASE),
+ # 9) ...
(헤더 영역 첫 img)
+ re.compile(r']*>(?:[^<]|<(?!img))*
]*\bsrc=["\']([^"\']+\.(?:png|svg|jpe?g|webp)[^"\']*)["\']', re.IGNORECASE | re.DOTALL),
+ # 10)