o2o-infinith-backend/app/services/collect.py

111 lines
4.8 KiB
Python

import asyncio
import logging
from common.db.hospital import update_hospital_status, update_hospital
from common.db.source import select_run_sources, update_raw_info_status, update_raw_info
from common.utils import get_env
from integrations.apify import ApifyClient
from integrations.naver import NaverClient
from integrations.youtube import YouTubeClient
from integrations.firecrawl import FirecrawlClient
from models.status import SourceType
# Module-level logger named after this module; used by every collector below.
logger = logging.getLogger(__name__)
async def collect_instagram(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect an Instagram profile and record the outcome for one raw-info row.

    Sets the row's status to "processing", fetches the profile through
    ``ApifyClient.get_instagram_profile`` and hands the payload to
    ``update_raw_info``. When the client returns ``None`` the status is set
    to "failed" and nothing is stored.

    Args:
        analysis_run_id: Run identifier; only used in log messages here.
        info_id: Identifier passed to the raw-info update helpers.
        url: Instagram URL handed to the Apify client.

    Raises:
        Exception: Any error raised while building the client or fetching
            is re-raised after the row has been marked "failed".
    """
    logger.info("[instagram] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")
    try:
        client = ApifyClient(get_env("APIFY_API_TOKEN"))
        data = await client.get_instagram_profile(url)
    except Exception:
        # A raised error would otherwise leave the row stuck in "processing".
        logger.exception("[instagram] error run=%s", analysis_run_id)
        await update_raw_info_status(info_id, "failed")
        raise
    if data is None:
        await update_raw_info_status(info_id, "failed")
        logger.warning("[instagram] failed run=%s", analysis_run_id)
        return
    await update_raw_info(info_id, data)
    logger.info("[instagram] done run=%s", analysis_run_id)
async def collect_facebook(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a Facebook page and record the outcome for one raw-info row.

    Sets the row's status to "processing", fetches the page through
    ``ApifyClient.get_facebook_page`` and hands the payload to
    ``update_raw_info``. When the client returns ``None`` the status is set
    to "failed" and nothing is stored.

    Args:
        analysis_run_id: Run identifier; only used in log messages here.
        info_id: Identifier passed to the raw-info update helpers.
        url: Facebook URL handed to the Apify client.

    Raises:
        Exception: Any error raised while building the client or fetching
            is re-raised after the row has been marked "failed".
    """
    logger.info("[facebook] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")
    try:
        client = ApifyClient(get_env("APIFY_API_TOKEN"))
        data = await client.get_facebook_page(url)
    except Exception:
        # A raised error would otherwise leave the row stuck in "processing".
        logger.exception("[facebook] error run=%s", analysis_run_id)
        await update_raw_info_status(info_id, "failed")
        raise
    if data is None:
        await update_raw_info_status(info_id, "failed")
        logger.warning("[facebook] failed run=%s", analysis_run_id)
        return
    await update_raw_info(info_id, data)
    logger.info("[facebook] done run=%s", analysis_run_id)
async def collect_naver_blog(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a Naver blog RSS feed and record the outcome for one raw-info row.

    Sets the row's status to "processing", fetches the feed through
    ``NaverClient.get_blog_rss`` and hands the payload to
    ``update_raw_info``. When the client returns ``None`` the status is set
    to "failed" and nothing is stored.

    Args:
        analysis_run_id: Run identifier; only used in log messages here.
        info_id: Identifier passed to the raw-info update helpers.
        url: Blog URL handed to the Naver client.

    Raises:
        Exception: Any error raised while building the client or fetching
            is re-raised after the row has been marked "failed".
    """
    logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")
    try:
        client = NaverClient(
            get_env("NAVER_CLIENT_ID"),
            get_env("NAVER_CLIENT_SECRET"),
        )
        data = await client.get_blog_rss(url)
    except Exception:
        # A raised error would otherwise leave the row stuck in "processing".
        logger.exception("[naver_blog] error run=%s", analysis_run_id)
        await update_raw_info_status(info_id, "failed")
        raise
    if data is None:
        await update_raw_info_status(info_id, "failed")
        logger.warning("[naver_blog] failed run=%s", analysis_run_id)
        return
    await update_raw_info(info_id, data)
    logger.info("[naver_blog] done run=%s", analysis_run_id)
async def collect_youtube(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a YouTube channel and record the outcome for one raw-info row.

    Sets the row's status to "processing", fetches the channel through
    ``YouTubeClient.get_channel`` and hands the payload to
    ``update_raw_info``. When the client returns ``None`` the status is set
    to "failed" and nothing is stored.

    Args:
        analysis_run_id: Run identifier; only used in log messages here.
        info_id: Identifier passed to the raw-info update helpers.
        url: Channel URL handed to the YouTube client.

    Raises:
        Exception: Any error raised while building the client or fetching
            is re-raised after the row has been marked "failed".
    """
    logger.info("[youtube] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")
    try:
        client = YouTubeClient(get_env("YOUTUBE_API_KEY"))
        data = await client.get_channel(url)
    except Exception:
        # A raised error would otherwise leave the row stuck in "processing".
        logger.exception("[youtube] error run=%s", analysis_run_id)
        await update_raw_info_status(info_id, "failed")
        raise
    if data is None:
        await update_raw_info_status(info_id, "failed")
        logger.warning("[youtube] failed run=%s", analysis_run_id)
        return
    await update_raw_info(info_id, data)
    logger.info("[youtube] done run=%s", analysis_run_id)
async def collect_gangnam_unni(analysis_run_id: str, info_id: int, url: str) -> None:
    """Collect a Gangnam Unni page and record the outcome for one raw-info row.

    Sets the row's status to "processing", fetches the page through
    ``FirecrawlClient.get_gangnam_unni`` and hands the payload to
    ``update_raw_info``. When the client returns ``None`` the status is set
    to "failed" and nothing is stored.

    Args:
        analysis_run_id: Run identifier; only used in log messages here.
        info_id: Identifier passed to the raw-info update helpers.
        url: Page URL handed to the Firecrawl client.

    Raises:
        Exception: Any error raised while building the client or fetching
            is re-raised after the row has been marked "failed".
    """
    logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")
    try:
        client = FirecrawlClient(get_env("FIRECRAWL_API_KEY"))
        data = await client.get_gangnam_unni(url)
    except Exception:
        # A raised error would otherwise leave the row stuck in "processing".
        logger.exception("[gangnam_unni] error run=%s", analysis_run_id)
        await update_raw_info_status(info_id, "failed")
        raise
    if data is None:
        await update_raw_info_status(info_id, "failed")
        logger.warning("[gangnam_unni] failed run=%s", analysis_run_id)
        return
    await update_raw_info(info_id, data)
    logger.info("[gangnam_unni] done run=%s", analysis_run_id)
async def collect_mainpage(analysis_run_id: str, info_id: int, hospital_id: str, url: str) -> None:
    """Collect clinic info from the main page and update both raw-info and hospital.

    Sets the raw-info row and the hospital to "processing", fetches clinic
    info through ``FirecrawlClient.fetch_clinic_info``, then passes the
    payload to ``update_raw_info`` and ``update_hospital``. When the client
    returns ``None`` the raw-info status is set to "failed" and nothing is
    stored.

    Args:
        analysis_run_id: Run identifier; used in log messages and forwarded
            to ``update_hospital``.
        info_id: Identifier passed to the raw-info update helpers.
        hospital_id: Identifier passed to the hospital update helpers.
        url: Main-page URL handed to the Firecrawl client.

    Raises:
        Exception: Any error raised while building the client or fetching
            is re-raised after the raw-info row has been marked "failed".
    """
    logger.info("[mainpage] start run=%s url=%s", analysis_run_id, url)
    await update_raw_info_status(info_id, "processing")
    await update_hospital_status(hospital_id, "processing")
    # NOTE(review): on every failure path below the hospital status is left at
    # "processing"; confirm whether it should be moved to a failure state and
    # which status values update_hospital_status accepts.
    try:
        client = FirecrawlClient(get_env("FIRECRAWL_API_KEY"))
        data = await client.fetch_clinic_info(url)
    except Exception:
        # A raised error would otherwise leave the row stuck in "processing".
        logger.exception("[mainpage] error run=%s", analysis_run_id)
        await update_raw_info_status(info_id, "failed")
        raise
    if data is None:
        await update_raw_info_status(info_id, "failed")
        logger.warning("[mainpage] failed run=%s", analysis_run_id)
        return
    await update_raw_info(info_id, data)
    await update_hospital(hospital_id, data, analysis_run_id=analysis_run_id)
    logger.info("[mainpage] done run=%s", analysis_run_id)
async def collect_all(analysis_run_id: str, hospital_id: str) -> None:
    """Run every supported collector for a run's sources concurrently.

    Loads the rows returned by ``select_run_sources`` and, based on each
    row's ``source_type``, schedules the matching collector. All scheduled
    collectors are awaited together with ``asyncio.gather``; an exception
    in one collector does not prevent the others from finishing and is
    never raised from this function.

    Args:
        analysis_run_id: Run whose sources are collected.
        hospital_id: Hospital identifier forwarded to ``collect_mainpage``.
    """
    rows = await select_run_sources(analysis_run_id)
    collectors = {
        SourceType.INSTAGRAM: collect_instagram,
        SourceType.FACEBOOK: collect_facebook,
        SourceType.NAVER_BLOG: collect_naver_blog,
        SourceType.YOUTUBE: collect_youtube,
        SourceType.GANGNAM_UNNI: collect_gangnam_unni,
    }
    tasks = []
    # Parallel to ``tasks`` so a failure can be attributed to its source row.
    labels = []
    for row in rows:
        info_id = row["info_id"]
        source_type = row["source_type"]
        url = row["url"]
        if source_type == SourceType.MAINPAGE:
            # The main page collector additionally needs the hospital id.
            tasks.append(collect_mainpage(analysis_run_id, info_id, hospital_id, url))
        elif source_type in collectors:
            tasks.append(collectors[source_type](analysis_run_id, info_id, url))
        else:
            # Still skipped, but no longer silently.
            logger.warning(
                "[collect_all] unsupported source_type=%s run=%s info_id=%s",
                source_type,
                analysis_run_id,
                info_id,
            )
            continue
        labels.append((source_type, info_id))

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # return_exceptions=True hands exceptions back as results; surface them
    # in the log instead of discarding them.
    for (source_type, info_id), result in zip(labels, results):
        if isinstance(result, BaseException):
            logger.error(
                "[collect_all] collector raised run=%s source_type=%s info_id=%s",
                analysis_run_id,
                source_type,
                info_id,
                exc_info=result,
            )