수집 파이프라인 통합 (enrichment 분리, raw_data merge 헬퍼)

- enrichment.py: brand_assets/extra_channels/channel_logos 수집 분리
- db.merge_hospital_raw_data: raw_data read-modify-write 헬퍼
- utils: _run_optional_step·URL 헬퍼 공통화

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
insta-data
Mina Choi 2026-05-27 13:27:39 +09:00
parent 843ccdb806
commit 4855d44381
5 changed files with 241 additions and 3 deletions

View File

@ -263,6 +263,19 @@ async def save_hospital_raw_data(hospital_id: str, data: dict, analysis_run_id:
await _insert_hospital_history(hospital_id, analysis_run_id)
async def merge_hospital_raw_data(hospital_id: str, patch: dict) -> None:
"""hospital_baseinfo.raw_data를 읽어 patch를 top-level 병합 후 저장 (read-modify-write).
부가 수집 단계들이 순차로 raw_data에 키를 덧붙일 사용."""
row = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
raw = row["raw_data"] if row else None
raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
raw_data.update(patch)
await execute(
"UPDATE hospital_baseinfo SET raw_data = %s WHERE hospital_id = %s",
(json.dumps(raw_data, ensure_ascii=False), hospital_id),
)
async def get_market_analysis(analysis_run_id: str) -> dict:
rows = await fetchall(
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",

View File

@ -1,8 +1,11 @@
import os
import asyncio
import logging
from http import HTTPMethod
import httpx
logger = logging.getLogger(__name__)
REQUEST_TIMEOUT = 60
@ -37,3 +40,27 @@ async def http_request(
print(f" [error] {label}{e}")
return None
return None
async def _run_optional_step(coro, label: str) -> None:
"""부가 단계 실행 헬퍼: 예외를 삼키고 경고 로그만 남겨 호출측 흐름이 멈추지 않게 격리."""
try:
await coro
except Exception as e:
logger.warning("%s 실패 (무시하고 진행): %s", label, e)
def _normalize_homepage(url: str) -> str:
"""URL을 scheme/www/끝슬래시 제거 + 소문자로 정규화 (homepage 매칭용)."""
u = (url or "").strip().lower()
for p in ("https://", "http://"):
if u.startswith(p):
u = u[len(p):]
if u.startswith("www."):
u = u[4:]
return u.rstrip("/")
def _with_scheme(u: str | None) -> str | None:
"""scheme 없는 URL에 https:// 보정 (수집기 파싱용). 빈 값은 None."""
return (u if "://" in u else "https://" + u) if u else None

View File

@ -1,7 +1,7 @@
import asyncio
import logging
from common.db import fetchone
from common.db import (
fetchone,
set_instagram_status, save_instagram_raw_data,
set_facebook_status, save_facebook_raw_data,
set_naver_blog_status, save_naver_blog_raw_data,
@ -9,11 +9,12 @@ from common.db import (
set_gangnam_unni_status, save_gangnam_unni_raw_data,
execute, save_hospital_raw_data,
)
from common.utils import get_env
from common.utils import get_env, _run_optional_step
from integrations.apify import ApifyClient
from integrations.naver import NaverClient
from integrations.youtube import YouTubeClient
from integrations.firecrawl import FirecrawlClient
from services.enrichment import collect_brand_assets, collect_extra_channels, collect_channel_logos
logger = logging.getLogger(__name__)
@ -74,6 +75,9 @@ async def collect_all(
naver_blog_id: int | None = None,
youtube_id: int | None = None,
gangnam_unni_id: int | None = None,
tiktok_url: str | None = None,
instagram_en_url: str | None = None,
facebook_en_url: str | None = None,
) -> None:
async def _url(table: str, row_id: int) -> str:
row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,))
@ -94,3 +98,18 @@ async def collect_all(
tasks.append(collect_gangnam_unni(analysis_run_id, gangnam_unni_id, await _url("gangnam_unni_data", gangnam_unni_id)))
await asyncio.gather(*tasks, return_exceptions=True)
# 아래 3단계는 모두 hospital raw_data를 read-modify-write 하므로 race 방지 위해 순차.
# brand_assets : clinic_info가 채운 branding.logoUrl로 공식 로고/hex 추출
# extra_channels: 틱톡/인스타EN/페북EN 수집
# channel_logos : 공식 로고(brand_assets)+채널 profileImage(extra_channels) 채워진 뒤 Vision 비교
# 부가 기능이라 실패해도 리포트는 나와야 하므로 _run_optional_step으로 각각 격리.
await _run_optional_step(collect_brand_assets(analysis_run_id, hospital_id), "brand_assets")
await _run_optional_step(
collect_extra_channels(
analysis_run_id, hospital_id,
tiktok_url=tiktok_url, instagram_en_url=instagram_en_url, facebook_en_url=facebook_en_url,
),
"extra_channels",
)
await _run_optional_step(collect_channel_logos(analysis_run_id, hospital_id), "channel_logos")

175
app/services/enrichment.py Normal file
View File

@ -0,0 +1,175 @@
import asyncio
import json
import logging
import os
from urllib.parse import urlparse
from common.db import fetchone, fetch_raw, merge_hospital_raw_data
from common.utils import get_env
from integrations.apify import ApifyClient
from integrations.vision import VisionClient
from integrations.color_extractor import extract_brand_assets_from_site
logger = logging.getLogger(__name__)
async def collect_brand_assets(analysis_run_id: str, hospital_id: str) -> None:
"""홈페이지에서 로고 URL + brand hex 색상을 뽑아 raw_data["brandAssets"]에 저장.
- 로고 URL/hex: HTML·CSS 정규식 (color_extractor) Vision 의존 X, 사이트 전체 컬러 시스템이 정확.
- 로고 정성 묘사(심볼/워드마크/): Gemini Vision (GEMINI_API_KEY 없으면 색상만 저장하고 skip).
"""
logger.info("[brand_assets] start run=%s", analysis_run_id)
row = await fetchone(
"SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row:
return
raw = row["raw_data"]
raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
branding = raw_data.get("branding") or {}
homepage_url = row["url"]
# 0~1. 사이트 1회 fetch로 logo URL + brand hex 동시 추출 (img/background-image/CSS .logo, Vision 의존 X)
site = await extract_brand_assets_from_site(homepage_url) if homepage_url else {}
html_logo_url = site.get("logo_url")
css_colors = site.get("colors") or {}
if html_logo_url:
logger.info("[brand_assets] HTML logo found: %s", html_logo_url)
if css_colors:
logger.info("[brand_assets] css colors: %s", css_colors.get("brand_colors"))
# 2. 로고/대표 이미지 후보 (logo → og:image → favicon 순)
logo_url = html_logo_url or branding.get("logoUrl")
og_image = branding.get("ogImage")
favicon = branding.get("faviconUrl")
candidates: list[tuple[str, str]] = []
if logo_url: candidates.append(("logo", logo_url))
if og_image: candidates.append(("og", og_image))
if favicon: candidates.append(("favicon", favicon))
if homepage_url:
parsed = urlparse(homepage_url)
if parsed.scheme and parsed.netloc:
candidates.append(("favicon", f"{parsed.scheme}://{parsed.netloc}/favicon.ico"))
if not candidates and not css_colors:
logger.info("[brand_assets] skip — no logo/og/favicon candidates and no CSS colors")
return
# 3. Vision은 로고 정성 묘사만 (hex는 CSS 추출이 더 정확). 키 없으면 색상만 저장.
result: dict = {}
used_kind: str | None = None
api_key = os.getenv("GEMINI_API_KEY")
if api_key and candidates:
vc = VisionClient(api_key)
for kind, cand in candidates:
result = await vc.analyze_brand_assets(logo_url=cand, homepage_url=homepage_url)
if result:
used_kind = kind
break
# favicon으로만 분석된 경우 진짜 로고가 아니므로 logo URL은 박지 않음 (묘사는 OK)
if result and used_kind == "favicon" and result.get("logo_images"):
result["logo_images"] = {"circle": None, "horizontal": None, "korean": None}
elif not api_key:
logger.info("[brand_assets] GEMINI_API_KEY not set — 색상만 저장, Vision 묘사 skip")
# 4. CSS에서 추출한 brand_colors/palette를 Vision보다 우선 사용
if css_colors:
if css_colors.get("brand_colors"):
result["brand_colors"] = css_colors["brand_colors"]
if css_colors.get("color_palette"):
result["color_palette"] = css_colors["color_palette"]
result["color_source"] = "html+css"
elif result:
result["color_source"] = "vision"
if result:
result["logo_source"] = used_kind or "none"
await merge_hospital_raw_data(hospital_id, {"brandAssets": result})
logger.info("[brand_assets] done keys=%s", list(result.keys()) if result else None)
async def collect_extra_channels(
analysis_run_id: str,
hospital_id: str,
tiktok_url: str | None = None,
instagram_en_url: str | None = None,
facebook_en_url: str | None = None,
) -> None:
"""틱톡 / 인스타 EN / 페북 EN 수집 → hospital raw_data에 저장 (별도 테이블 없이).
인스타EN·페북EN은 기존 Apify 수집기 재사용, 틱톡은 신규 액터."""
apify = ApifyClient(get_env("APIFY_API_TOKEN"))
jobs: dict = {}
if instagram_en_url:
jobs["instagramEn"] = apify.get_instagram_profile(instagram_en_url)
if facebook_en_url:
jobs["facebookEn"] = apify.get_facebook_page(facebook_en_url)
if tiktok_url:
jobs["tiktok"] = apify.get_tiktok_profile(tiktok_url)
if not jobs:
return
logger.info("[extra_channels] start run=%s channels=%s", analysis_run_id, list(jobs))
done = await asyncio.gather(*jobs.values(), return_exceptions=True)
results: dict = {}
for key, res in zip(jobs.keys(), done):
if isinstance(res, Exception):
logger.warning("[extra_channels] %s 수집 실패: %s", key, res)
elif res:
results[key] = res
if not results:
logger.info("[extra_channels] 수집 결과 없음 run=%s", analysis_run_id)
return
await merge_hospital_raw_data(hospital_id, results)
logger.info("[extra_channels] done run=%s keys=%s", analysis_run_id, list(results))
async def collect_channel_logos(analysis_run_id: str, hospital_id: str) -> None:
"""채널별 프로필 이미지(로고)를 모아 Gemini Vision으로 설명 + 공식 로고 일치 여부 평가.
hospital raw_data["channelLogos"] 저장. GEMINI_API_KEY 없으면 skip.
brand_assets(공식 로고)·extra_channels(틱톡/EN profileImage) 다음에 실행돼야 ."""
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
logger.info("[channel_logos] skip — GEMINI_API_KEY 없음")
return
hrow = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
raw = hrow["raw_data"] if hrow else None
raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
official = ((raw_data.get("brandAssets") or {}).get("logo_images") or {}).get("horizontal")
run = await fetchone(
"SELECT instagram_data_id, facebook_data_id, youtube_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
logos: list[dict] = []
# 전용 테이블 채널 (KR)
for ch, table, col in [
("Instagram", "instagram_data", "instagram_data_id"),
("Facebook", "facebook_data", "facebook_data_id"),
("YouTube", "youtube_data", "youtube_data_id"),
]:
rid = (run or {}).get(col)
if rid:
d = await fetch_raw(table, rid) or {}
if d.get("profileImage"):
logos.append({"channel": ch, "url": d["profileImage"]})
# 추가 채널 (hospital raw_data)
for ch, key in [("Instagram EN", "instagramEn"), ("Facebook EN", "facebookEn"), ("TikTok", "tiktok")]:
img = (raw_data.get(key) or {}).get("profileImage")
if img:
logos.append({"channel": ch, "url": img})
if not logos:
logger.info("[channel_logos] skip — 채널 프로필 이미지 없음")
return
logger.info("[channel_logos] start run=%s channels=%s official=%s", analysis_run_id,
[l["channel"] for l in logos], bool(official))
result = await VisionClient(api_key).describe_channel_logos(official, logos)
if result:
# Vision이 못 본 채널도 url은 채워둠 (프론트에서 이미지 표시용)
result["logos"] = logos
await merge_hospital_raw_data(hospital_id, {"channelLogos": result})
logger.info("[channel_logos] done run=%s keys=%s", analysis_run_id, list(result.keys()) if result else None)

View File

@ -8,8 +8,9 @@ from services.analysis import run_report_task, run_plan_task
logger = logging.getLogger(__name__)
async def run_pipeline(analysis_run_id: str) -> None:
async def run_pipeline(analysis_run_id: str, extra_channels: dict | None = None) -> None:
logger.info("[pipeline] start run=%s", analysis_run_id)
extra_channels = extra_channels or {}
# ── 1. Collect ──────────────────────────────────────────────────────────
run = await fetchone(
@ -26,6 +27,9 @@ async def run_pipeline(analysis_run_id: str) -> None:
naver_blog_id=run["naver_blog_data_id"],
youtube_id=run["youtube_data_id"],
gangnam_unni_id=run["gangnam_unni_data_id"],
tiktok_url=extra_channels.get("tiktok"),
instagram_en_url=extra_channels.get("instagram_en"),
facebook_en_url=extra_channels.get("facebook_en"),
)
# ── 2. Market ────────────────────────────────────────────────────────────