o2o-infinith-backend/app/services/collect.py

199 lines
9.4 KiB
Python

import asyncio
import logging
from common.db.hospital import update_hospital_status, update_hospital
from common.db.source import select_run_sources, update_raw_info_status, update_raw_info
from common.utils import get_env, _run_optional_step
from integrations.apify import ApifyClient
from integrations.naver import NaverClient
from integrations.youtube import YouTubeClient
from integrations.firecrawl import FirecrawlClient
from models.status import SourceType
from integrations.site_fetcher import fetch_html_and_css
from services.brand_parser import find_logo_url_in_html, extract_brand_colors_from_text
from common.db.source import update_raw_info_merge, update_raw_info_logo_url, select_run_raw_data
from common.db.base import fetchone
from services.facebook_audit import transform_for_storage as transform_facebook
logger = logging.getLogger(__name__)
async def _save_with_logo(info_id: int, data: dict) -> None:
    """Persist collected data and, when present, its profile image as the logo URL.

    Args:
        info_id: Identifier passed through to the raw_info update helpers.
        data: Collected payload. A truthy ``profileImage`` entry is additionally
            stored via ``update_raw_info_logo_url``.
    """
    await update_raw_info(info_id, data)

    profile_image = data.get("profileImage")
    if not profile_image:
        # Nothing usable as a logo; the data itself has already been saved.
        return
    await update_raw_info_logo_url(info_id, profile_image)
async def collect_instagram(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect an Instagram profile through the Apify client and store it.

    Marks the raw_info entry as "processing" first. When the client returns
    ``None`` the entry is marked "failed"; otherwise the result is saved with
    ``_save_with_logo``.

    Args:
        analysis_run_id: Run identifier, used for logging only.
        info_id: Identifier of the raw_info entry to update.
        url: Instagram profile URL handed to the client.
    """
    logger.info("[instagram] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")

    client = ApifyClient(get_env("APIFY_API_TOKEN"))
    profile = await client.get_instagram_profile(url)
    if profile is not None:
        await _save_with_logo(info_id, profile)
        logger.info("[instagram] done run=%s", analysis_run_id)
        return

    await update_raw_info_status(info_id, "failed")
    logger.warning("[instagram] failed run=%s", analysis_run_id)
async def collect_facebook(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a Facebook page through the Apify client, transform and store it.

    Marks the raw_info entry as "processing" first. When the client returns
    ``None`` the entry is marked "failed"; otherwise the result is passed
    through ``transform_facebook`` and saved with ``_save_with_logo``.

    Args:
        analysis_run_id: Run identifier, used for logging only.
        info_id: Identifier of the raw_info entry to update.
        url: Facebook page URL handed to the client.
    """
    logger.info("[facebook] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")

    client = ApifyClient(get_env("APIFY_API_TOKEN"))
    page = await client.get_facebook_page(url)
    if page is not None:
        storable = transform_facebook(page)
        await _save_with_logo(info_id, storable)
        logger.info("[facebook] done run=%s", analysis_run_id)
        return

    await update_raw_info_status(info_id, "failed")
    logger.warning("[facebook] failed run=%s", analysis_run_id)
async def collect_naver_blog(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a Naver blog's RSS data through the Naver client and store it.

    Marks the raw_info entry as "processing" first. When the client returns
    ``None`` the entry is marked "failed"; otherwise the result is written
    with ``update_raw_info``.

    Args:
        analysis_run_id: Run identifier, used for logging only.
        info_id: Identifier of the raw_info entry to update.
        url: Naver blog URL handed to the client.
    """
    logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")

    client_id = get_env("NAVER_CLIENT_ID")
    client_secret = get_env("NAVER_CLIENT_SECRET")
    feed = await NaverClient(client_id, client_secret).get_blog_rss(url)
    if feed is not None:
        await update_raw_info(info_id, feed)
        logger.info("[naver_blog] done run=%s", analysis_run_id)
        return

    await update_raw_info_status(info_id, "failed")
    logger.warning("[naver_blog] failed run=%s", analysis_run_id)
async def collect_youtube(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a YouTube channel through the YouTube client and store it.

    Marks the raw_info entry as "processing" first. When the client returns
    ``None`` the entry is marked "failed"; otherwise the result is saved with
    ``_save_with_logo``.

    Args:
        analysis_run_id: Run identifier, used for logging only.
        info_id: Identifier of the raw_info entry to update.
        url: YouTube channel URL handed to the client.
    """
    logger.info("[youtube] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")

    client = YouTubeClient(get_env("YOUTUBE_API_KEY"))
    channel = await client.get_channel(url)
    if channel is not None:
        await _save_with_logo(info_id, channel)
        logger.info("[youtube] done run=%s", analysis_run_id)
        return

    await update_raw_info_status(info_id, "failed")
    logger.warning("[youtube] failed run=%s", analysis_run_id)
async def collect_gangnam_unni(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect Gangnam Unni data through the Firecrawl client and store it.

    Marks the raw_info entry as "processing" first. When the client returns
    ``None`` the entry is marked "failed"; otherwise the result is written
    with ``update_raw_info``.

    Args:
        analysis_run_id: Run identifier, used for logging only.
        info_id: Identifier of the raw_info entry to update.
        url: Gangnam Unni URL handed to the client.
    """
    logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")

    client = FirecrawlClient(get_env("FIRECRAWL_API_KEY"))
    listing = await client.get_gangnam_unni(url)
    if listing is not None:
        await update_raw_info(info_id, listing)
        logger.info("[gangnam_unni] done run=%s", analysis_run_id)
        return

    await update_raw_info_status(info_id, "failed")
    logger.warning("[gangnam_unni] failed run=%s", analysis_run_id)
async def collect_mainpage(analysis_run_id: str, info_id: int, hospital_id: str, url: str) -> None:
    """Collect clinic info from the homepage and store it on raw_info and the hospital.

    Marks both the raw_info entry and the hospital as "processing", then asks
    the Firecrawl client for the clinic info. When the client returns ``None``
    the raw_info entry is marked "failed". Otherwise the result, extended with
    a ``sourceUrl`` key, is written to raw_info and to the hospital record.

    Args:
        analysis_run_id: Run identifier; logged and forwarded to ``update_hospital``.
        info_id: Identifier of the raw_info entry to update.
        hospital_id: Identifier of the hospital record to update.
        url: Homepage URL handed to the client and stored as ``sourceUrl``.
    """
    logger.info("[mainpage] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")
    await update_hospital_status(hospital_id, "processing")

    client = FirecrawlClient(get_env("FIRECRAWL_API_KEY"))
    clinic_info = await client.fetch_clinic_info(url)
    if clinic_info is None:
        # NOTE(review): only the raw_info status is set to "failed" here; the
        # hospital status set above is left untouched. Confirm a caller resets it.
        await update_raw_info_status(info_id, "failed")
        logger.warning("[mainpage] failed run=%s", analysis_run_id)
        return

    # Keep the homepage URL inside raw_data as well, so the brand_assets and
    # analysis stages can use it without looking up the mainpage URL again.
    enriched = dict(clinic_info)
    enriched["sourceUrl"] = url

    await update_raw_info(info_id, enriched)
    await update_hospital(hospital_id, enriched, analysis_run_id=analysis_run_id)
    logger.info("[mainpage] done run=%s", analysis_run_id)
async def collect_tiktok(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a TikTok profile through the Apify client and store it.

    Marks the raw_info entry as "processing" first. When the client returns
    ``None`` the entry is marked "failed"; otherwise the result is saved with
    ``_save_with_logo``.

    Args:
        analysis_run_id: Run identifier, used for logging only.
        info_id: Identifier of the raw_info entry to update.
        url: TikTok profile URL handed to the client.
    """
    logger.info("[tiktok] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")

    client = ApifyClient(get_env("APIFY_API_TOKEN"))
    profile = await client.get_tiktok_profile(url)
    if profile is not None:
        await _save_with_logo(info_id, profile)
        logger.info("[tiktok] done run=%s", analysis_run_id)
        return

    await update_raw_info_status(info_id, "failed")
    logger.warning("[tiktok] failed run=%s", analysis_run_id)
async def collect_naver_cafe(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect Naver cafe signals through the Naver client and store them.

    Cafes require a login, so post bodies cannot be read. Only URL liveness,
    the cafeId and the number of name mentions are collected as signals.

    Marks the raw_info entry as "processing" first. When the client returns
    ``None`` the entry is marked "failed"; otherwise the result is written
    with ``update_raw_info``.

    Args:
        analysis_run_id: Run identifier, used for logging only.
        info_id: Identifier of the raw_info entry to update.
        url: Naver cafe URL handed to the client.
    """
    logger.info("[naver_cafe] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")

    client_id = get_env("NAVER_CLIENT_ID")
    client_secret = get_env("NAVER_CLIENT_SECRET")
    cafe_info = await NaverClient(client_id, client_secret).get_cafe_info(url)
    if cafe_info is not None:
        await update_raw_info(info_id, cafe_info)
        logger.info("[naver_cafe] done run=%s", analysis_run_id)
        return

    await update_raw_info_status(info_id, "failed")
    logger.warning("[naver_cafe] failed run=%s", analysis_run_id)
async def collect_kakaotalk(analysis_run_id: str, info_id: int, url: str) -> None:
    """Store the KakaoTalk channel URL without collecting anything.

    KakaoTalk is not scraped; only the URL is kept. The LLM uses it solely as
    a signal that the channel exists.

    Args:
        analysis_run_id: Run identifier, used for logging only.
        info_id: Identifier of the raw_info entry to update.
        url: KakaoTalk channel URL stored under the ``url`` key.
    """
    logger.info("[kakaotalk] url-only run=%s url=%s", analysis_run_id, url)
    url_only_payload = {"url": url}
    await update_raw_info(info_id, url_only_payload)
async def collect_brand_basics(analysis_run_id: str, info_id: int) -> None:
    """Derive a logo URL and brand colors from the run's collected mainpage data.

    Reads the run's raw data, fetches the homepage HTML/CSS when the mainpage
    entry carries a ``sourceUrl``, and then:

    * stores the first available logo candidate (logo found in the HTML, then
      ``branding.logoUrl``, then ``branding.ogImage``) on the run's mainpage
      raw_info row, and
    * merges extracted colors into ``brandAssets`` on the ``info_id`` row.

    ``brandAssets`` is only written when at least one of ``brand_colors`` or
    ``color_palette`` was actually extracted, so a bare ``color_source`` tag
    is never stored and the final log line reflects whether colors were found.

    Args:
        analysis_run_id: Run whose raw data is read and whose mainpage row
            receives the logo URL.
        info_id: Identifier of the raw_info entry that receives ``brandAssets``.
    """
    logger.info("[brand_basics] start run=%s info=%s", analysis_run_id, info_id)

    # Treat a missing result as "nothing collected" instead of failing on .get().
    raw = (await select_run_raw_data(analysis_run_id)) or {}
    mainpage = raw.get("mainpage") or {}
    homepage_url = mainpage.get("sourceUrl") or ""
    branding_meta = mainpage.get("branding") or {}

    if homepage_url:
        html, css_texts = await fetch_html_and_css(homepage_url)
    else:
        html, css_texts = "", []

    html_logo_url = find_logo_url_in_html(html, homepage_url, css_texts) if html else None
    css_colors = (extract_brand_colors_from_text(html, css_texts, homepage_url) if html else None) or {}

    # Prefer the logo discovered in the page itself, then fall back to metadata.
    logo_url = html_logo_url or branding_meta.get("logoUrl") or branding_meta.get("ogImage")
    if logo_url:
        mainpage_row = await fetchone(
            "SELECT ri.info_id FROM raw_info ri JOIN remote_source rs USING (source_id)"
            " WHERE ri.analysis_run_id = %s AND rs.source_type = 'mainpage' LIMIT 1",
            (analysis_run_id,),
        )
        if mainpage_row:
            await update_raw_info_logo_url(mainpage_row["info_id"], logo_url)

    # Copy over only the color fields that actually hold a value.
    payload: dict = {
        key: css_colors[key]
        for key in ("brand_colors", "color_palette")
        if css_colors.get(key)
    }
    if payload:
        # Tag the origin only when there are colors to attribute it to.
        payload["color_source"] = "html+css"
        await update_raw_info_merge(info_id, {"brandAssets": payload})

    logger.info("[brand_basics] done logo_url=%s colors=%s", bool(logo_url), bool(payload))
async def collect_all(analysis_run_id: str, hospital_id: str) -> None:
    """Run every collector for a run's sources, then the branding step.

    Stage 1 starts one collector per source row concurrently. Rows whose
    source type has no collector are skipped. A collector that raises does
    not abort the others: its exception is logged with its traceback and the
    remaining collectors still complete.

    Stage 2 runs ``collect_brand_basics`` afterwards when a branding row
    exists, wrapped in ``_run_optional_step``.

    Args:
        analysis_run_id: Run whose sources are collected.
        hospital_id: Hospital record updated by the mainpage collector.
    """
    rows = await select_run_sources(analysis_run_id)

    # source_type -> collector. KR/EN variants are identical from the
    # collector's point of view; only the language column differs.
    _collectors = {
        SourceType.INSTAGRAM: collect_instagram,
        SourceType.FACEBOOK: collect_facebook,
        SourceType.NAVER_BLOG: collect_naver_blog,
        SourceType.YOUTUBE: collect_youtube,
        SourceType.GANGNAM_UNNI: collect_gangnam_unni,
        SourceType.TIKTOK: collect_tiktok,
        SourceType.NAVER_CAFE: collect_naver_cafe,
        SourceType.KAKAOTALK: collect_kakaotalk,
    }

    tasks = []
    # Parallel to ``tasks``: (source_type, info_id) used to label failures.
    task_labels = []
    branding_info_id: int | None = None
    for row in rows:
        info_id = row["info_id"]
        source_type = row["source_type"]
        url = row["url"]
        if source_type == SourceType.BRANDING:
            # Used in stage 2, after mainpage and channel collection finish.
            branding_info_id = info_id
            continue
        if source_type == SourceType.MAINPAGE:
            tasks.append(collect_mainpage(analysis_run_id, info_id, hospital_id, url))
        elif source_type in _collectors:
            tasks.append(_collectors[source_type](analysis_run_id, info_id, url))
        else:
            continue
        task_labels.append((source_type, info_id))

    # return_exceptions=True keeps one failing collector from cancelling the
    # rest; the outcomes are inspected so failures are not silently dropped.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for (source_type, info_id), outcome in zip(task_labels, results):
        if isinstance(outcome, BaseException):
            logger.error(
                "[collect_all] collector raised run=%s source=%s info=%s",
                analysis_run_id,
                source_type,
                info_id,
                exc_info=outcome,
            )

    # Stage 2: branding (brandAssets -> channelLogos merged into one raw_info).
    # It depends on the mainpage and channel raw_data, so it runs sequentially.
    # It is an auxiliary feature and the report must still be produced when it
    # fails, so it is isolated with _run_optional_step.
    if branding_info_id is not None:
        await _run_optional_step(collect_brand_basics(analysis_run_id, branding_info_id), "brand_basics")