o2o-infinith-backend/app/services/enrichment.py

179 lines
7.9 KiB
Python

import asyncio
import json
import logging
import os
from urllib.parse import urlparse
from common.db import fetchone, fetch_raw, merge_hospital_raw_data
from common.utils import get_env
from integrations.apify import ApifyClient
from integrations.vision import VisionClient
from integrations.color_extractor import extract_brand_assets_from_site
from services.facebook_audit import transform_for_storage as transform_facebook
logger = logging.getLogger(__name__)
async def collect_brand_assets(analysis_run_id: str, hospital_id: str) -> None:
"""홈페이지에서 로고 URL + brand hex 색상을 뽑아 raw_data["brandAssets"]에 저장.
- 로고 URL/hex: HTML·CSS 정규식 (color_extractor) — Vision 의존 X, 사이트 전체 컬러 시스템이 더 정확.
- 로고 정성 묘사(심볼/워드마크/톤): Gemini Vision (GEMINI_API_KEY 없으면 색상만 저장하고 skip).
"""
logger.info("[brand_assets] start run=%s", analysis_run_id)
row = await fetchone(
"SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row:
return
raw = row["raw_data"]
raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
branding = raw_data.get("branding") or {}
homepage_url = row["url"]
# 0~1. 사이트 1회 fetch로 logo URL + brand hex 동시 추출 (img/background-image/CSS .logo, Vision 의존 X)
site = await extract_brand_assets_from_site(homepage_url) if homepage_url else {}
html_logo_url = site.get("logo_url")
css_colors = site.get("colors") or {}
if html_logo_url:
logger.info("[brand_assets] HTML logo found: %s", html_logo_url)
if css_colors:
logger.info("[brand_assets] css colors: %s", css_colors.get("brand_colors"))
# 2. 로고/대표 이미지 후보 (logo → og:image → favicon 순)
logo_url = html_logo_url or branding.get("logoUrl")
og_image = branding.get("ogImage")
favicon = branding.get("faviconUrl")
candidates: list[tuple[str, str]] = []
if logo_url: candidates.append(("logo", logo_url))
if og_image: candidates.append(("og", og_image))
if favicon: candidates.append(("favicon", favicon))
if homepage_url:
parsed = urlparse(homepage_url)
if parsed.scheme and parsed.netloc:
candidates.append(("favicon", f"{parsed.scheme}://{parsed.netloc}/favicon.ico"))
if not candidates and not css_colors:
logger.info("[brand_assets] skip — no logo/og/favicon candidates and no CSS colors")
return
# 3. Vision은 로고 정성 묘사만 (hex는 CSS 추출이 더 정확). 키 없으면 색상만 저장.
result: dict = {}
used_kind: str | None = None
api_key = os.getenv("GEMINI_API_KEY")
if api_key and candidates:
vc = VisionClient(api_key)
for kind, cand in candidates:
result = await vc.analyze_brand_assets(logo_url=cand, homepage_url=homepage_url)
if result:
used_kind = kind
break
# favicon으로만 분석된 경우 진짜 로고가 아니므로 logo URL은 박지 않음 (묘사는 OK)
if result and used_kind == "favicon" and result.get("logo_images"):
result["logo_images"] = {"circle": None, "horizontal": None, "korean": None}
elif not api_key:
logger.info("[brand_assets] GEMINI_API_KEY not set — 색상만 저장, Vision 묘사 skip")
# 4. CSS에서 추출한 brand_colors/palette를 Vision보다 우선 사용
if css_colors:
if css_colors.get("brand_colors"):
result["brand_colors"] = css_colors["brand_colors"]
if css_colors.get("color_palette"):
result["color_palette"] = css_colors["color_palette"]
result["color_source"] = "html+css"
elif result:
result["color_source"] = "vision"
if result:
result["logo_source"] = used_kind or "none"
await merge_hospital_raw_data(hospital_id, {"brandAssets": result})
logger.info("[brand_assets] done keys=%s", list(result.keys()) if result else None)
async def collect_extra_channels(
analysis_run_id: str,
hospital_id: str,
tiktok_url: str | None = None,
instagram_en_url: str | None = None,
facebook_en_url: str | None = None,
) -> None:
"""틱톡 / 인스타 EN / 페북 EN 수집 → hospital raw_data에 저장 (별도 테이블 없이).
인스타EN·페북EN은 기존 Apify 수집기 재사용, 틱톡은 신규 액터."""
apify = ApifyClient(get_env("APIFY_API_TOKEN"))
jobs: dict = {}
if instagram_en_url:
jobs["instagramEn"] = apify.get_instagram_profile(instagram_en_url)
if facebook_en_url:
jobs["facebookEn"] = apify.get_facebook_page(facebook_en_url)
if tiktok_url:
jobs["tiktok"] = apify.get_tiktok_profile(tiktok_url)
if not jobs:
return
logger.info("[extra_channels] start run=%s channels=%s", analysis_run_id, list(jobs))
done = await asyncio.gather(*jobs.values(), return_exceptions=True)
results: dict = {}
for key, res in zip(jobs.keys(), done):
if isinstance(res, Exception):
logger.warning("[extra_channels] %s 수집 실패: %s", key, res)
elif res:
if key == "facebookEn":
res = transform_facebook(res)
results[key] = res
if not results:
logger.info("[extra_channels] 수집 결과 없음 run=%s", analysis_run_id)
return
await merge_hospital_raw_data(hospital_id, results)
logger.info("[extra_channels] done run=%s keys=%s", analysis_run_id, list(results))
async def collect_channel_logos(analysis_run_id: str, hospital_id: str) -> None:
"""채널별 프로필 이미지(로고)를 모아 Gemini Vision으로 설명 + 공식 로고 일치 여부 평가.
→ hospital raw_data["channelLogos"]에 저장. GEMINI_API_KEY 없으면 skip.
brand_assets(공식 로고)·extra_channels(틱톡/EN profileImage) 다음에 실행돼야 함."""
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
logger.info("[channel_logos] skip — GEMINI_API_KEY 없음")
return
hrow = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
raw = hrow["raw_data"] if hrow else None
raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
official = ((raw_data.get("brandAssets") or {}).get("logo_images") or {}).get("horizontal")
run = await fetchone(
"SELECT instagram_data_id, facebook_data_id, youtube_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
logos: list[dict] = []
# 전용 테이블 채널 (KR)
for ch, table, col in [
("Instagram", "instagram_data", "instagram_data_id"),
("Facebook", "facebook_data", "facebook_data_id"),
("YouTube", "youtube_data", "youtube_data_id"),
]:
rid = (run or {}).get(col)
if rid:
d = await fetch_raw(table, rid) or {}
if d.get("profileImage"):
logos.append({"channel": ch, "url": d["profileImage"]})
# 추가 채널 (hospital raw_data)
for ch, key in [("Instagram EN", "instagramEn"), ("Facebook EN", "facebookEn"), ("TikTok", "tiktok")]:
img = (raw_data.get(key) or {}).get("profileImage")
if img:
logos.append({"channel": ch, "url": img})
if not logos:
logger.info("[channel_logos] skip — 채널 프로필 이미지 없음")
return
logger.info("[channel_logos] start run=%s channels=%s official=%s", analysis_run_id,
[l["channel"] for l in logos], bool(official))
result = await VisionClient(api_key).describe_channel_logos(official, logos)
if result:
# Vision이 못 본 채널도 url은 채워둠 (프론트에서 이미지 표시용)
result["logos"] = logos
await merge_hospital_raw_data(hospital_id, {"channelLogos": result})
logger.info("[channel_logos] done run=%s keys=%s", analysis_run_id, list(result.keys()) if result else None)