"""Per-source raw data collectors and the ``collect_all`` orchestrator for an analysis run."""

import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any

from common.db.hospital import update_hospital_status, update_hospital
from common.db.source import select_run_sources, update_raw_info_status, update_raw_info
from common.utils import get_env, _run_optional_step
from integrations.apify import ApifyClient
from integrations.naver import NaverClient
from integrations.youtube import YouTubeClient
from integrations.firecrawl import FirecrawlClient
from models.status import SourceType
from services.collect_extras import collect_brand_assets, collect_channel_logos
from services.facebook_audit import transform_for_storage as transform_facebook

logger = logging.getLogger(__name__)


async def _fetch_or_mark_failed(
    tag: str,
    analysis_run_id: str,
    info_id: int,
    fetch: Callable[[], Awaitable[Any]],
) -> Any:
    """Run ``fetch`` and return its result, marking the raw_info row failed on failure.

    A failure is either a ``None`` result or an exception raised while building
    or awaiting the fetch (for example a missing env var or a client error).
    In both cases the row is set to ``"failed"`` and ``None`` is returned, so
    the row does not stay at ``"processing"``.

    Args:
        tag: Short source label used in log messages (e.g. ``"instagram"``).
        analysis_run_id: Run identifier, used for logging only.
        info_id: raw_info row whose status is updated on failure.
        fetch: Zero-argument factory returning the awaitable that fetches the
            data. It is a factory so client construction happens only after
            the caller has marked the row as processing.

    Returns:
        The fetched data, or ``None`` if the fetch failed.
    """
    try:
        data = await fetch()
    except Exception:
        # Task boundary: one source failing must not take down the others.
        logger.exception("[%s] error run=%s", tag, analysis_run_id)
        data = None

    if data is None:
        await update_raw_info_status(info_id, "failed")
        logger.warning("[%s] failed run=%s", tag, analysis_run_id)
    return data


async def _collect_source(
    tag: str,
    analysis_run_id: str,
    info_id: int,
    url: str,
    fetch: Callable[[], Awaitable[Any]],
    transform: Callable[[Any], Any] | None = None,
) -> None:
    """Shared flow for single-URL collectors.

    Marks the row as processing, fetches, optionally transforms, then stores
    the result via ``update_raw_info``. On fetch failure the row is marked
    failed and nothing is stored.
    """
    logger.info("[%s] start run=%s url=%s", tag, analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")

    data = await _fetch_or_mark_failed(tag, analysis_run_id, info_id, fetch)
    if data is None:
        return

    if transform is not None:
        data = transform(data)
    await update_raw_info(info_id, data)
    logger.info("[%s] done run=%s", tag, analysis_run_id)


async def collect_instagram(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect an Instagram profile through Apify and store it on the raw_info row."""
    await _collect_source(
        "instagram",
        analysis_run_id,
        info_id,
        url,
        lambda: ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url),
    )


async def collect_facebook(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a Facebook page through Apify, transform it for storage, and store it."""
    await _collect_source(
        "facebook",
        analysis_run_id,
        info_id,
        url,
        lambda: ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url),
        transform=transform_facebook,
    )


async def collect_naver_blog(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a Naver blog RSS feed and store it on the raw_info row."""
    await _collect_source(
        "naver_blog",
        analysis_run_id,
        info_id,
        url,
        lambda: NaverClient(
            get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")
        ).get_blog_rss(url),
    )


async def collect_youtube(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a YouTube channel and store it on the raw_info row."""
    await _collect_source(
        "youtube",
        analysis_run_id,
        info_id,
        url,
        lambda: YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url),
    )


async def collect_gangnam_unni(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a Gangnam Unni page through Firecrawl and store it on the raw_info row."""
    await _collect_source(
        "gangnam_unni",
        analysis_run_id,
        info_id,
        url,
        lambda: FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url),
    )


async def collect_mainpage(analysis_run_id: str, info_id: int, hospital_id: str, url: str) -> None:
    """Collect clinic info from the hospital homepage and store it.

    The result is written to both the raw_info row and the hospital record.

    Args:
        analysis_run_id: Run identifier (logging, and passed to ``update_hospital``).
        info_id: raw_info row for the mainpage source.
        hospital_id: Hospital whose status and data are updated.
        url: Homepage URL to fetch.
    """
    logger.info("[mainpage] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")
    await update_hospital_status(hospital_id, "processing")

    data = await _fetch_or_mark_failed(
        "mainpage",
        analysis_run_id,
        info_id,
        lambda: FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url),
    )
    if data is None:
        # NOTE(review): the hospital status is left at "processing" on this
        # path (unchanged from the original). Confirm whether it should be
        # moved to a failure state here or is resolved elsewhere.
        return

    # Embed the homepage URL itself in raw_data so the brand_assets / analysis
    # stages can use it without looking up the mainpage URL again.
    data = {**data, "sourceUrl": url}
    await update_raw_info(info_id, data)
    await update_hospital(hospital_id, data, analysis_run_id=analysis_run_id)
    logger.info("[mainpage] done run=%s", analysis_run_id)


async def collect_tiktok(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a TikTok profile through Apify and store it on the raw_info row."""
    await _collect_source(
        "tiktok",
        analysis_run_id,
        info_id,
        url,
        lambda: ApifyClient(get_env("APIFY_API_TOKEN")).get_tiktok_profile(url),
    )


async def collect_naver_cafe(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect Naver cafe signals.

    Cafes require login, so post bodies cannot be read. Only URL liveness,
    cafeId, and the name-mention count are collected as signals.
    """
    await _collect_source(
        "naver_cafe",
        analysis_run_id,
        info_id,
        url,
        lambda: NaverClient(
            get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")
        ).get_cafe_info(url),
    )


async def collect_kakaotalk(analysis_run_id: str, info_id: int, url: str) -> None:
    """Store the KakaoTalk URL only; nothing is fetched.

    The LLM uses it purely as a signal that the channel exists.
    """
    logger.info("[kakaotalk] url-only run=%s url=%s", analysis_run_id, url)
    await update_raw_info(info_id, {"url": url})


async def collect_all(analysis_run_id: str, hospital_id: str) -> None:
    """Run every collector for the run's sources, then the optional branding step.

    Stage 1 runs the mainpage and channel collectors concurrently. Stage 2
    runs the branding steps sequentially, and only if a branding source row
    exists.

    Args:
        analysis_run_id: Run whose sources are selected and collected.
        hospital_id: Hospital updated by the mainpage collector.
    """
    rows = await select_run_sources(analysis_run_id)

    # source_type -> collector. KR/EN variants are identical from the
    # collector's point of view; only the language column differs.
    # Built per call so the collector names are resolved at call time.
    collectors = {
        SourceType.INSTAGRAM: collect_instagram,
        SourceType.FACEBOOK: collect_facebook,
        SourceType.NAVER_BLOG: collect_naver_blog,
        SourceType.YOUTUBE: collect_youtube,
        SourceType.GANGNAM_UNNI: collect_gangnam_unni,
        SourceType.TIKTOK: collect_tiktok,
        SourceType.NAVER_CAFE: collect_naver_cafe,
        SourceType.KAKAOTALK: collect_kakaotalk,
    }

    tasks = []
    scheduled: list[tuple[Any, int]] = []  # (source_type, info_id), parallel to tasks
    branding_info_id: int | None = None

    for row in rows:
        info_id = row["info_id"]
        source_type = row["source_type"]
        url = row["url"]

        if source_type == SourceType.BRANDING:
            # Used in stage 2, after mainpage/channel collection has finished.
            branding_info_id = info_id
        elif source_type == SourceType.MAINPAGE:
            tasks.append(collect_mainpage(analysis_run_id, info_id, hospital_id, url))
            scheduled.append((source_type, info_id))
        elif source_type in collectors:
            tasks.append(collectors[source_type](analysis_run_id, info_id, url))
            scheduled.append((source_type, info_id))

    # return_exceptions=True keeps one failing collector from cancelling the
    # rest. The results are inspected so those failures are still logged.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for (source_type, info_id), result in zip(scheduled, results):
        if isinstance(result, BaseException):
            logger.error(
                "[collect_all] collector crashed run=%s source_type=%s info_id=%s",
                analysis_run_id,
                source_type,
                info_id,
                exc_info=result,
            )

    # Stage 2: branding (brandAssets -> channelLogos, merged into one raw_info).
    # It depends on the mainpage/channel raw_data, so it runs sequentially.
    # It is an optional extra and the report must still be produced if it
    # fails, so each step is isolated with _run_optional_step.
    if branding_info_id is not None:
        await _run_optional_step(collect_brand_assets(analysis_run_id, branding_info_id), "brand_assets")
        await _run_optional_step(collect_channel_logos(analysis_run_id, branding_info_id), "channel_logos")