import asyncio
import logging

from common.db import (
    fetchone,
    set_instagram_status,
    save_instagram_raw_data,
    set_facebook_status,
    save_facebook_raw_data,
    set_naver_blog_status,
    save_naver_blog_raw_data,
    set_youtube_status,
    save_youtube_raw_data,
    set_gangnam_unni_status,
    save_gangnam_unni_raw_data,
    execute,
    save_hospital_raw_data,
)
from common.utils import get_env, _run_optional_step
from integrations.apify import ApifyClient
from integrations.naver import NaverClient
from integrations.youtube import YouTubeClient
from integrations.firecrawl import FirecrawlClient
from services.collect_extras import collect_brand_assets, collect_extra_channels, collect_channel_logos
from services.facebook_audit import transform_for_storage as transform_facebook

logger = logging.getLogger(__name__)


async def collect_instagram(analysis_run_id: str, row_id: int, url: str) -> None:
    """Fetch an Instagram profile via Apify and store it as raw data for ``row_id``.

    Marks the row "processing" first. Exceptions propagate to the caller; this
    function does not change the status again on failure.
    """
    logger.info("[instagram] start run=%s url=%s", analysis_run_id, url)
    await set_instagram_status(row_id, "processing")
    data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url)
    await save_instagram_raw_data(row_id, data)
    logger.info("[instagram] done run=%s", analysis_run_id)


async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None:
    """Fetch a Facebook page via Apify and store it as raw data for ``row_id``.

    The page data is passed through ``transform_facebook`` before saving.
    Marks the row "processing" first. Exceptions propagate to the caller.
    """
    logger.info("[facebook] start run=%s url=%s", analysis_run_id, url)
    await set_facebook_status(row_id, "processing")
    data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url)
    data = transform_facebook(data)
    await save_facebook_raw_data(row_id, data)
    logger.info("[facebook] done run=%s", analysis_run_id)


async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> None:
    """Fetch a Naver blog RSS feed and store it as raw data for ``row_id``.

    Marks the row "processing" first. Exceptions propagate to the caller.
    """
    logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url)
    await set_naver_blog_status(row_id, "processing")
    client = NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET"))
    data = await client.get_blog_rss(url)
    await save_naver_blog_raw_data(row_id, data)
    logger.info("[naver_blog] done run=%s", analysis_run_id)


async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None:
    """Fetch YouTube channel data and store it as raw data for ``row_id``.

    Marks the row "processing" first. Exceptions propagate to the caller.
    """
    logger.info("[youtube] start run=%s url=%s", analysis_run_id, url)
    await set_youtube_status(row_id, "processing")
    data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url)
    await save_youtube_raw_data(row_id, data)
    logger.info("[youtube] done run=%s", analysis_run_id)


async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> None:
    """Fetch a Gangnam Unni page via Firecrawl and store it as raw data for ``row_id``.

    Marks the row "processing" first. Exceptions propagate to the caller.
    """
    logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url)
    await set_gangnam_unni_status(row_id, "processing")
    data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url)
    await save_gangnam_unni_raw_data(row_id, data)
    logger.info("[gangnam_unni] done run=%s", analysis_run_id)


async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str) -> None:
    """Fetch clinic info via Firecrawl and store it as the hospital's raw data.

    Sets ``hospital_baseinfo.status`` to "processing" first. ``analysis_run_id``
    is forwarded to ``save_hospital_raw_data``. Exceptions propagate to the caller.
    """
    logger.info("[clinic] start run=%s url=%s", analysis_run_id, url)
    await execute(
        "UPDATE hospital_baseinfo SET status = 'processing' WHERE hospital_id = %s",
        (hospital_id,),
    )
    data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url)
    await save_hospital_raw_data(hospital_id, data, analysis_run_id=analysis_run_id)
    logger.info("[clinic] done run=%s", analysis_run_id)


async def collect_all(
    analysis_run_id: str,
    hospital_id: str,
    instagram_id: int | None = None,
    facebook_id: int | None = None,
    naver_blog_id: int | None = None,
    youtube_id: int | None = None,
    gangnam_unni_id: int | None = None,
    tiktok_url: str | None = None,
    instagram_en_url: str | None = None,
    facebook_en_url: str | None = None,
    kakao_talk_url: str | None = None,
    naver_cafe_url: str | None = None,
) -> None:
    """Run every data collector for one analysis run.

    Phase 1 runs two kinds of collector concurrently:
      * the clinic-info collector;
      * each channel collector whose row id is given.
    A failure in any of them is logged and does not abort the others.

    Phase 2 runs the brand-assets, extra-channels and channel-logo steps one
    after another. Each step is wrapped in ``_run_optional_step``.

    Args:
        analysis_run_id: Identifier of the run, forwarded to every collector.
        hospital_id: Key into ``hospital_baseinfo``. Its ``url`` feeds
            ``collect_clinic_info``.
        instagram_id, facebook_id, naver_blog_id, youtube_id, gangnam_unni_id:
            Row ids in the matching ``*_data`` tables. A falsy id skips that
            channel. So does a row whose url is missing or empty.
        tiktok_url, instagram_en_url, facebook_en_url, kakao_talk_url,
        naver_cafe_url: Forwarded unchanged to ``collect_extra_channels``.

    Raises:
        ValueError: If ``hospital_baseinfo`` has no row for ``hospital_id``.
    """

    async def _url(table: str, row_id: int) -> str:
        # `table` only ever comes from the hard-coded `channels` tuple below,
        # never from caller input, so interpolating it into SQL is safe.
        row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,))
        return row["url"] if row else ""

    hospital = await fetchone(
        "SELECT url FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,)
    )
    if hospital is None:
        raise ValueError(f"hospital_baseinfo row not found: hospital_id={hospital_id}")

    # (log label, row id, table holding the url, collector)
    channels = (
        ("instagram", instagram_id, "instagram_data", collect_instagram),
        ("facebook", facebook_id, "facebook_data", collect_facebook),
        ("naver_blog", naver_blog_id, "naver_blog_data", collect_naver_blog),
        ("youtube", youtube_id, "youtube_data", collect_youtube),
        ("gangnam_unni", gangnam_unni_id, "gangnam_unni_data", collect_gangnam_unni),
    )

    # Resolve every URL before creating any coroutine. Otherwise a failing
    # lookup would leave already-created coroutines that are never awaited.
    planned = []
    for label, row_id, table, collector in channels:
        if not row_id:
            continue
        url = await _url(table, row_id)
        if not url:
            # Calling the collector with an empty url would only hit the
            # external API with a bogus request, so skip it instead.
            logger.warning(
                "[%s] no url for id=%s run=%s; skipping", label, row_id, analysis_run_id
            )
            continue
        planned.append((label, collector, row_id, url))

    labels = ["clinic"] + [label for label, *_ in planned]
    tasks = [collect_clinic_info(analysis_run_id, hospital_id, hospital["url"])]
    tasks += [collector(analysis_run_id, row_id, url) for _, collector, row_id, url in planned]

    results = await asyncio.gather(*tasks, return_exceptions=True)
    for label, result in zip(labels, results):
        if isinstance(result, BaseException):
            # TODO(review): the failed row's status stays "processing".
            # Consider setting a failure status once the valid status values
            # are confirmed.
            logger.error("[%s] failed run=%s", label, analysis_run_id, exc_info=result)

    # The three steps below all read-modify-write the hospital raw_data.
    # They run sequentially to avoid races.
    #   brand_assets:   extract the official logo/hex colors from the
    #                   branding.logoUrl populated by clinic_info.
    #   extra_channels: collect TikTok / Instagram EN / Facebook EN
    #                   (plus the Kakao Talk / Naver Cafe urls passed in).
    #   channel_logos:  once the official logo (brand_assets) and channel
    #                   profileImage (extra_channels) are populated,
    #                   compare them with Vision.
    # These are auxiliary features and the report must still be produced if
    # they fail, so each one is isolated with _run_optional_step.
    await _run_optional_step(collect_brand_assets(analysis_run_id, hospital_id), "brand_assets")
    await _run_optional_step(
        collect_extra_channels(
            analysis_run_id,
            hospital_id,
            tiktok_url=tiktok_url,
            instagram_en_url=instagram_en_url,
            facebook_en_url=facebook_en_url,
            kakao_talk_url=kakao_talk_url,
            naver_cafe_url=naver_cafe_url,
        ),
        "extra_channels",
    )
    await _run_optional_step(collect_channel_logos(analysis_run_id, hospital_id), "channel_logos")