from http import HTTPMethod from common.utils import get_env, http_request FIRECRAWL_BASE = "https://api.firecrawl.dev/v1" class FirecrawlClient: def __init__(self, api_key: str): self.api_key = api_key def _headers(self) -> dict: return {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"} async def scrape(self, url: str, json_options: dict, wait_for: int = 5000) -> dict | None: resp = await http_request( HTTPMethod.POST, url=f"{FIRECRAWL_BASE}/scrape", headers=self._headers(), json_body={"url": url, "formats": ["json", "links"], "jsonOptions": json_options, "waitFor": wait_for}, label="firecrawl-scrape", ) if not resp or not resp.is_success: return None return resp.json().get("data") async def map(self, url: str, limit: int = 50) -> list[str]: resp = await http_request( HTTPMethod.POST, url=f"{FIRECRAWL_BASE}/map", headers=self._headers(), json_body={"url": url, "limit": limit}, label="firecrawl-map", ) if not resp or not resp.is_success: return [] return resp.json().get("links", []) async def search(self, query: str, limit: int = 5) -> list[dict]: resp = await http_request( HTTPMethod.POST, url=f"{FIRECRAWL_BASE}/search", headers=self._headers(), json_body={"query": query, "limit": limit}, label="firecrawl-search", ) if not resp or not resp.is_success: return [] return resp.json().get("data", []) async def fetch_social_buttons(self, url: str) -> list[dict]: data = await self.scrape(url, { "prompt": "Find ALL social media link URLs on this page — header, footer, sidebar, floating buttons. Extract actual href URLs for: Instagram, YouTube, Facebook, TikTok, Naver Blog, KakaoTalk.", "schema": { "type": "object", "properties": { "socialLinks": { "type": "array", "items": { "type": "object", "properties": {"platform": {"type": "string"}, "url": {"type": "string"}}, }, }, }, }, }) if not data: return [] return (data.get("json") or {}).get("socialLinks", []) async def fetch_gangnam_unni(self, hospital_url: str) -> dict | None: resp = await http_request( HTTPMethod.POST, url=f"{FIRECRAWL_BASE}/scrape", headers=self._headers(), json_body={ "url": hospital_url, "formats": ["json"], "jsonOptions": { "prompt": "Extract: hospital name, overall rating (out of 10), total review count, doctors with names/ratings/review counts/specialties, procedures, address, badges", "schema": { "type": "object", "properties": { "hospitalName": {"type": "string"}, "rating": {"type": "number"}, "totalReviews": {"type": "number"}, "doctors": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "rating": {"type": "number"}, "reviews": {"type": "number"}, "specialty": {"type": "string"}, }, }, }, "procedures": {"type": "array", "items": {"type": "string"}}, "address": {"type": "string"}, "badges": {"type": "array", "items": {"type": "string"}}, }, }, }, "waitFor": 5000, }, timeout=60, label="firecrawl-gangnamunni", ) if not resp or not resp.is_success: return None raw = (resp.json().get("data") or {}).get("json") return {"sourceUrl": hospital_url, **raw} if raw else None async def get_gangnam_unni(self, hospital_url: str) -> dict | None: raw = await self.fetch_gangnam_unni(hospital_url) if not raw or not raw.get("hospitalName"): return None return { "name": raw["hospitalName"], "rating": raw.get("rating"), "ratingScale": "/10", "totalReviews": raw.get("totalReviews", 0), "doctors": (raw.get("doctors") or [])[:10], "procedures": raw.get("procedures", []), "address": raw.get("address", ""), "badges": raw.get("badges", []), "sourceUrl": raw["sourceUrl"], }