From 4855d443813337272baeafa2cb73410531593e62 Mon Sep 17 00:00:00 2001
From: Mina Choi
Date: Wed, 27 May 2026 13:27:39 +0900
Subject: [PATCH] =?UTF-8?q?=EC=88=98=EC=A7=91=20=ED=8C=8C=EC=9D=B4?=
 =?UTF-8?q?=ED=94=84=EB=9D=BC=EC=9D=B8=20=ED=86=B5=ED=95=A9=20(enrichment?=
 =?UTF-8?q?=20=EB=B6=84=EB=A6=AC,=20raw=5Fdata=20merge=20=ED=97=AC?=
 =?UTF-8?q?=ED=8D=BC)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- enrichment.py: brand_assets/extra_channels/channel_logos 수집 분리
- db.merge_hospital_raw_data: raw_data read-modify-write 헬퍼
- utils: _run_optional_step·URL 헬퍼 공통화

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 app/common/db.py           |  13 +++
 app/common/utils.py        |  27 ++++++
 app/services/collect.py    |  23 ++++-
 app/services/enrichment.py | 175 +++++++++++++++++++++++++++++++++++++
 app/services/pipeline.py   |   6 +-
 5 files changed, 241 insertions(+), 3 deletions(-)
 create mode 100644 app/services/enrichment.py

diff --git a/app/common/db.py b/app/common/db.py
index 9012579..608d6fb 100644
--- a/app/common/db.py
+++ b/app/common/db.py
@@ -263,6 +263,19 @@ async def save_hospital_raw_data(hospital_id: str, data: dict, analysis_run_id:
     await _insert_hospital_history(hospital_id, analysis_run_id)
 
 
+async def merge_hospital_raw_data(hospital_id: str, patch: dict) -> None:
+    """Read hospital_baseinfo.raw_data, merge patch into its top level, and save it (read-modify-write).
+    Used when enrichment steps append keys to raw_data one after another."""
+    row = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
+    raw = row["raw_data"] if row else None
+    raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
+    raw_data.update(patch)
+    await execute(
+        "UPDATE hospital_baseinfo SET raw_data = %s WHERE hospital_id = %s",
+        (json.dumps(raw_data, ensure_ascii=False), hospital_id),
+    )
+
+
 async def get_market_analysis(analysis_run_id: str) -> dict:
     rows = await fetchall(
         "SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
diff --git a/app/common/utils.py b/app/common/utils.py
index aca1d13..783937d 100644
--- a/app/common/utils.py
+++ b/app/common/utils.py
@@ -1,8 +1,11 @@
 import os
 import asyncio
+import logging
 from http import HTTPMethod
 import httpx
 
+logger = logging.getLogger(__name__)
+
 REQUEST_TIMEOUT = 60
 
 
@@ -37,3 +40,27 @@ async def http_request(
             print(f" [error] {label} → {e}")
             return None
     return None
+
+
+async def _run_optional_step(coro, label: str) -> None:
+    """Run an optional step: swallow any exception and log a warning (with traceback) so the caller's flow keeps going."""
+    try:
+        await coro
+    except Exception as e:
+        logger.warning("%s 실패 (무시하고 진행): %s", label, e, exc_info=True)
+
+
+def _normalize_homepage(url: str) -> str:
+    """Normalize a URL for homepage matching: lowercase, strip scheme, leading www. and trailing slashes."""
+    u = (url or "").strip().lower()
+    for p in ("https://", "http://"):
+        if u.startswith(p):
+            u = u[len(p):]
+    if u.startswith("www."):
+        u = u[4:]
+    return u.rstrip("/")
+
+
+def _with_scheme(u: str | None) -> str | None:
+    """Prefix https:// to a URL that has no scheme (for collector parsing); empty values give None."""
+    return (u if "://" in u else "https://" + u) if u else None
diff --git a/app/services/collect.py b/app/services/collect.py
index 2752c12..6a68aee 100644
--- a/app/services/collect.py
+++ b/app/services/collect.py
@@ -1,7 +1,7 @@
 import asyncio
 import logging
-from common.db import fetchone
 from common.db import (
+    fetchone,
     set_instagram_status, save_instagram_raw_data,
     set_facebook_status, save_facebook_raw_data,
     set_naver_blog_status, save_naver_blog_raw_data,
@@ -9,11 +9,12 @@ from common.db import (
     set_gangnam_unni_status, save_gangnam_unni_raw_data,
     execute, save_hospital_raw_data,
 )
-from common.utils import get_env
+from common.utils import get_env, _run_optional_step
 from integrations.apify import ApifyClient
 from integrations.naver import NaverClient
 from integrations.youtube import YouTubeClient
 from integrations.firecrawl import FirecrawlClient
+from services.enrichment import collect_brand_assets, collect_extra_channels, collect_channel_logos
 
 logger = logging.getLogger(__name__)
 
@@ -74,6 +75,9 @@ async def collect_all(
     naver_blog_id: int | None = None,
     youtube_id: int | None = None,
     gangnam_unni_id: int | None = None,
+    tiktok_url: str | None = None,
+    instagram_en_url: str | None = None,
+    facebook_en_url: str | None = None,
 ) -> None:
     async def _url(table: str, row_id: int) -> str:
         row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,))
@@ -94,3 +98,18 @@ async def collect_all(
         tasks.append(collect_gangnam_unni(analysis_run_id, gangnam_unni_id, await _url("gangnam_unni_data", gangnam_unni_id)))
 
     await asyncio.gather(*tasks, return_exceptions=True)
+
+    # The three steps below all read-modify-write the hospital raw_data, so they run sequentially to avoid races.
+    #   brand_assets  : extract the official logo/hex colors using branding.logoUrl filled in by clinic_info
+    #   extra_channels: collect TikTok / Instagram EN / Facebook EN
+    #   channel_logos : Vision comparison once the official logo (brand_assets) and channel profileImage (extra_channels) are filled in
+    # These are optional extras; the report must still come out if they fail, so each is isolated with _run_optional_step.
+    await _run_optional_step(collect_brand_assets(analysis_run_id, hospital_id), "brand_assets")
+    await _run_optional_step(
+        collect_extra_channels(
+            analysis_run_id, hospital_id,
+            tiktok_url=tiktok_url, instagram_en_url=instagram_en_url, facebook_en_url=facebook_en_url,
+        ),
+        "extra_channels",
+    )
+    await _run_optional_step(collect_channel_logos(analysis_run_id, hospital_id), "channel_logos")
diff --git a/app/services/enrichment.py b/app/services/enrichment.py
new file mode 100644
index 0000000..f0af7c0
--- /dev/null
+++ b/app/services/enrichment.py
@@ -0,0 +1,175 @@
+import asyncio
+import json
+import logging
+import os
+from urllib.parse import urlparse
+from common.db import fetchone, fetch_raw, merge_hospital_raw_data
+from common.utils import get_env
+from integrations.apify import ApifyClient
+from integrations.vision import VisionClient
+from integrations.color_extractor import extract_brand_assets_from_site
+
+logger = logging.getLogger(__name__)
+
+
+async def collect_brand_assets(analysis_run_id: str, hospital_id: str) -> None:
+    """Extract the logo URL and brand hex colors from the homepage and store them in raw_data["brandAssets"].
+    - Logo URL/hex: HTML/CSS regex (color_extractor) — no Vision dependency; the site-wide color system is more accurate.
+    - Qualitative logo description (symbol/wordmark/tone): Gemini Vision (without GEMINI_API_KEY only colors are saved).
+    """
+    logger.info("[brand_assets] start run=%s", analysis_run_id)
+    row = await fetchone(
+        "SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s",
+        (hospital_id,),
+    )
+    if not row:
+        return
+    raw = row["raw_data"]
+    raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
+    branding = raw_data.get("branding") or {}
+    homepage_url = row["url"]
+
+    # 0-1. Extract logo URL + brand hex from a single site fetch (img/background-image/CSS .logo, no Vision dependency)
+    site = await extract_brand_assets_from_site(homepage_url) if homepage_url else {}
+    html_logo_url = site.get("logo_url")
+    css_colors = site.get("colors") or {}
+    if html_logo_url:
+        logger.info("[brand_assets] HTML logo found: %s", html_logo_url)
+    if css_colors:
+        logger.info("[brand_assets] css colors: %s", css_colors.get("brand_colors"))
+
+    # 2. Logo / representative image candidates (in order: logo → og:image → favicon)
+    logo_url = html_logo_url or branding.get("logoUrl")
+    og_image = branding.get("ogImage")
+    favicon = branding.get("faviconUrl")
+    candidates: list[tuple[str, str]] = []
+    if logo_url: candidates.append(("logo", logo_url))
+    if og_image: candidates.append(("og", og_image))
+    if favicon: candidates.append(("favicon", favicon))
+    if homepage_url:
+        parsed = urlparse(homepage_url)
+        if parsed.scheme and parsed.netloc:
+            candidates.append(("favicon", f"{parsed.scheme}://{parsed.netloc}/favicon.ico"))
+
+    if not candidates and not css_colors:
+        logger.info("[brand_assets] skip — no logo/og/favicon candidates and no CSS colors")
+        return
+
+    # 3. Vision only provides the qualitative logo description (CSS extraction is more accurate for hex). Without a key, only colors are saved.
+    result: dict = {}
+    used_kind: str | None = None
+    api_key = os.getenv("GEMINI_API_KEY")
+    if api_key and candidates:
+        vc = VisionClient(api_key)
+        for kind, cand in candidates:
+            result = await vc.analyze_brand_assets(logo_url=cand, homepage_url=homepage_url) or {}  # stay a dict if Vision returns None
+            if result:
+                used_kind = kind
+                break
+        # If only the favicon was analyzed it is not the real logo, so do not record logo URLs (the description is fine)
+        if result and used_kind == "favicon" and result.get("logo_images"):
+            result["logo_images"] = {"circle": None, "horizontal": None, "korean": None}
+    elif not api_key:
+        logger.info("[brand_assets] GEMINI_API_KEY not set — 색상만 저장, Vision 묘사 skip")
+
+    # 4. Prefer brand_colors/palette extracted from CSS over Vision
+    if css_colors:
+        if css_colors.get("brand_colors"):
+            result["brand_colors"] = css_colors["brand_colors"]
+        if css_colors.get("color_palette"):
+            result["color_palette"] = css_colors["color_palette"]
+        result["color_source"] = "html+css"
+    elif result:
+        result["color_source"] = "vision"
+
+    if result:
+        result["logo_source"] = used_kind or "none"
+        await merge_hospital_raw_data(hospital_id, {"brandAssets": result})
+    logger.info("[brand_assets] done keys=%s", list(result.keys()) if result else None)
+
+
+async def collect_extra_channels(
+    analysis_run_id: str,
+    hospital_id: str,
+    tiktok_url: str | None = None,
+    instagram_en_url: str | None = None,
+    facebook_en_url: str | None = None,
+) -> None:
+    """Collect TikTok / Instagram EN / Facebook EN and store them in the hospital raw_data (no separate table).
+    Instagram EN and Facebook EN reuse the existing Apify collectors; TikTok uses a new actor."""
+    apify = ApifyClient(get_env("APIFY_API_TOKEN"))
+    jobs: dict = {}
+    if instagram_en_url:
+        jobs["instagramEn"] = apify.get_instagram_profile(instagram_en_url)
+    if facebook_en_url:
+        jobs["facebookEn"] = apify.get_facebook_page(facebook_en_url)
+    if tiktok_url:
+        jobs["tiktok"] = apify.get_tiktok_profile(tiktok_url)
+    if not jobs:
+        return
+
+    logger.info("[extra_channels] start run=%s channels=%s", analysis_run_id, list(jobs))
+    done = await asyncio.gather(*jobs.values(), return_exceptions=True)
+    results: dict = {}
+    for key, res in zip(jobs.keys(), done):
+        if isinstance(res, Exception):
+            logger.warning("[extra_channels] %s 수집 실패: %s", key, res)
+        elif res:
+            results[key] = res
+    if not results:
+        logger.info("[extra_channels] 수집 결과 없음 run=%s", analysis_run_id)
+        return
+
+    await merge_hospital_raw_data(hospital_id, results)
+    logger.info("[extra_channels] done run=%s keys=%s", analysis_run_id, list(results))
+
+
+async def collect_channel_logos(analysis_run_id: str, hospital_id: str) -> None:
+    """Gather each channel's profile image (logo); Gemini Vision describes them and rates the match with the official logo.
+    Stored in hospital raw_data["channelLogos"]. Skipped when GEMINI_API_KEY is not set.
+    Must run after brand_assets (official logo) and extra_channels (TikTok/EN profileImage)."""
+    api_key = os.getenv("GEMINI_API_KEY")
+    if not api_key:
+        logger.info("[channel_logos] skip — GEMINI_API_KEY 없음")
+        return
+
+    hrow = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
+    raw = hrow["raw_data"] if hrow else None
+    raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
+    official = ((raw_data.get("brandAssets") or {}).get("logo_images") or {}).get("horizontal")
+
+    run = await fetchone(
+        "SELECT instagram_data_id, facebook_data_id, youtube_data_id"
+        " FROM analysis_runs WHERE analysis_run_id = %s",
+        (analysis_run_id,),
+    )
+    logos: list[dict] = []
+    # Channels with dedicated tables (KR)
+    for ch, table, col in [
+        ("Instagram", "instagram_data", "instagram_data_id"),
+        ("Facebook", "facebook_data", "facebook_data_id"),
+        ("YouTube", "youtube_data", "youtube_data_id"),
+    ]:
+        rid = (run or {}).get(col)
+        if rid:
+            d = await fetch_raw(table, rid) or {}
+            if d.get("profileImage"):
+                logos.append({"channel": ch, "url": d["profileImage"]})
+    # Extra channels (hospital raw_data)
+    for ch, key in [("Instagram EN", "instagramEn"), ("Facebook EN", "facebookEn"), ("TikTok", "tiktok")]:
+        img = (raw_data.get(key) or {}).get("profileImage")
+        if img:
+            logos.append({"channel": ch, "url": img})
+
+    if not logos:
+        logger.info("[channel_logos] skip — 채널 프로필 이미지 없음")
+        return
+
+    logger.info("[channel_logos] start run=%s channels=%s official=%s", analysis_run_id,
+                [l["channel"] for l in logos], bool(official))
+    result = await VisionClient(api_key).describe_channel_logos(official, logos)
+    if result:
+        # Keep the url even for channels Vision could not see (the frontend displays the images)
+        result["logos"] = logos
+        await merge_hospital_raw_data(hospital_id, {"channelLogos": result})
+    logger.info("[channel_logos] done run=%s keys=%s", analysis_run_id, list(result.keys()) if result else None)
diff --git a/app/services/pipeline.py b/app/services/pipeline.py
index 35b722f..6db955b 100644
--- a/app/services/pipeline.py
+++ b/app/services/pipeline.py
@@ -8,8 +8,9 @@ from services.analysis import run_report_task, run_plan_task
 logger = logging.getLogger(__name__)
 
 
-async def run_pipeline(analysis_run_id: str) -> None:
+async def run_pipeline(analysis_run_id: str, extra_channels: dict | None = None) -> None:
     logger.info("[pipeline] start run=%s", analysis_run_id)
+    extra_channels = extra_channels or {}
 
     # ── 1. Collect ──────────────────────────────────────────────────────────
     run = await fetchone(
@@ -26,6 +27,9 @@ async def run_pipeline(analysis_run_id: str) -> None:
         naver_blog_id=run["naver_blog_data_id"],
         youtube_id=run["youtube_data_id"],
         gangnam_unni_id=run["gangnam_unni_data_id"],
+        tiktok_url=extra_channels.get("tiktok"),
+        instagram_en_url=extra_channels.get("instagram_en"),
+        facebook_en_url=extra_channels.get("facebook_en"),
     )
 
     # ── 2. Market ────────────────────────────────────────────────────────────