"""Thin async client for the Apify actors used to scrape social profiles."""

import asyncio
from http import HTTPMethod
from urllib.parse import urlparse

from common.utils import http_request

APIFY_BASE = "https://api.apify.com/v2"

# Instagram: the profile and highlights actors are called directly.
IG_PROFILE_ACTOR = "coderx~instagram-profile-scraper-bio-posts"
IG_HIGHLIGHTS_ACTOR = "igview-owner~instagram-highlights-scraper"
FB_PAGE_ACTOR = "apify~facebook-pages-scraper"
TIKTOK_ACTOR = "clockworks~tiktok-scraper"


def _handle_from_url(url: str) -> str:
    """Return the account handle from a profile URL or a bare handle.

    Accepts full URLs (``https://host/handle/...``), scheme-less URLs
    (``host/handle``) and bare handles with or without a leading ``@``.
    """
    raw = url.strip()
    if "://" in raw:
        path = urlparse(raw).path
    elif "/" in raw.strip("/"):
        # A handle cannot contain "/", so an interior slash means a URL that
        # is missing its scheme; the "//" prefix makes urlparse see the host.
        path = urlparse(f"//{raw.lstrip('/')}").path
    else:
        path = raw
    return path.strip("/").split("/")[0].lstrip("@")


def _ig_username(url: str) -> str:
    """Return the Instagram username for a profile URL or ``@handle``."""
    return _handle_from_url(url)


def _only_dicts(value) -> list[dict]:
    """Return the dict elements of *value* when it is a list, else ``[]``."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


class ApifyClient:
    """Runs Apify actors and reshapes their output into compact dicts."""

    def __init__(self, token: str, wait_for_finish: int = 120):
        """Store the API token and the run wait time in seconds.

        NOTE(review): Apify's API docs describe a 60 second maximum for
        ``waitForFinish``; confirm whether longer values are clamped, in
        which case a slow run may be read before it has finished.
        """
        self.token = token
        self.wait_for_finish = wait_for_finish

    async def _run_actor(self, actor_id: str, input_data: dict) -> list[dict]:
        """Start an actor run and return up to 20 items of its default dataset.

        Returns an empty list when either HTTP call fails or when a response
        body does not have the expected shape, so callers never see a raw
        parsing error.
        """
        resp = await http_request(
            HTTPMethod.POST,
            url=f"{APIFY_BASE}/acts/{actor_id}/runs",
            params={"token": self.token, "waitForFinish": self.wait_for_finish},
            headers={"Content-Type": "application/json"},
            json_body=input_data,
            # Give the HTTP call a little longer than the server-side wait.
            timeout=self.wait_for_finish + 10,
            label=f"apify:{actor_id.split('~')[-1]}",
        )
        if not resp or not resp.is_success:
            return []

        try:
            dataset_id = resp.json()["data"]["defaultDatasetId"]
        except (KeyError, TypeError, ValueError):
            # Malformed or unexpected run payload: treat as "no results".
            return []
        if not dataset_id:
            return []

        items_resp = await http_request(
            HTTPMethod.GET,
            url=f"{APIFY_BASE}/datasets/{dataset_id}/items",
            params={"token": self.token, "limit": 20},
            label=f"apify-dataset-{dataset_id}",
        )
        if not items_resp or not items_resp.is_success:
            return []

        try:
            items = items_resp.json()
        except ValueError:
            return []
        # Callers index and iterate the result, so only ever hand back a list.
        return items if isinstance(items, list) else []

    async def fetch_instagram_profile(self, username: str) -> dict | None:
        """Return the first raw item of the Instagram profile actor, or None."""
        items = await self._run_actor(IG_PROFILE_ACTOR, {"usernames": [username]})
        return items[0] if items else None

    async def fetch_instagram_highlights(self, username: str) -> list[dict]:
        """Return the raw items of the Instagram highlights actor."""
        return await self._run_actor(IG_HIGHLIGHTS_ACTOR, {"usernames": [username]})

    async def get_instagram_profile(self, url: str) -> dict | None:
        """Return a normalized Instagram profile summary for *url*.

        Returns None when the profile actor fails, yields nothing, or reports
        an ``error`` field. A failure of the highlights actor only results in
        an empty ``highlights`` list.
        """
        username = _ig_username(url)
        # Call the profile and highlights actors in parallel (a highlights
        # failure is tolerated as long as the profile is available).
        profile, highlights = await asyncio.gather(
            self.fetch_instagram_profile(username),
            self.fetch_instagram_highlights(username),
            return_exceptions=True,
        )
        # gather() can hand back BaseException instances (e.g. CancelledError
        # of a child task), so anything that is not a usable dict is a failure.
        if not isinstance(profile, dict) or not profile or profile.get("error"):
            return None

        return {
            # Fall back to the requested handle if the actor omits the field.
            "username": profile.get("username") or username,
            "profileImage": profile.get("hdProfilePicUrl") or profile.get("profilePicUrl"),
            "followers": profile.get("followersCount", 0),
            "following": profile.get("followsCount", 0),
            "posts": profile.get("postsCount", 0),
            "bio": profile.get("biography", ""),
            "category": profile.get("businessCategoryName") or "",
            "isBusinessAccount": profile.get("isBusinessAccount", False),
            # "externalUrl": profile.get("externalUrl"),  # disabled: it seemed to confuse the LLM
            "highlights": [
                h["highlightTitle"]
                for h in _only_dicts(highlights)
                if h.get("highlightTitle")
            ],
            "latestPosts": [
                {
                    "type": p.get("mediaType") or p.get("type"),
                    "likes": p.get("likesCount", 0),
                    "comments": p.get("commentsCount", 0),
                    "caption": (p.get("caption") or "")[:500],
                    "timestamp": p.get("timestamp"),
                }
                for p in _only_dicts(profile.get("latestPosts"))[:12]
            ],
        }

    # The Instagram post scraper is currently unused by the pipeline, so it is
    # disabled (restore if needed).
    # async def fetch_instagram_posts(self, url: str, limit: int = 20) -> list[dict]:
    #     username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
    #     return await self._run_actor("apify~instagram-post-scraper", {
    #         "directUrls": [f"https://www.instagram.com/{username}/"],
    #         "resultsLimit": limit,
    #     })
    #
    # async def get_instagram_posts(self, url: str, limit: int = 20) -> dict:
    #     items = await self.fetch_instagram_posts(url, limit)
    #     posts = [
    #         {
    #             "id": p["id"],
    #             "type": p.get("type"),
    #             "url": p.get("url"),
    #             "caption": (p.get("caption") or "")[:500],
    #             "hashtags": p.get("hashtags", []),
    #             "likesCount": p.get("likesCount", 0),
    #             "commentsCount": p.get("commentsCount", 0),
    #             "timestamp": p.get("timestamp"),
    #         }
    #         for p in items
    #     ]
    #     n = len(posts) or 1
    #     return {
    #         "posts": posts,
    #         "totalPosts": len(posts),
    #         "avgLikes": round(sum(p["likesCount"] for p in posts) / n),
    #         "avgComments": round(sum(p["commentsCount"] for p in posts) / n),
    #     }

    async def fetch_facebook_page(self, page_url: str) -> dict | None:
        """Return the first raw item of the Facebook pages actor, or None."""
        items = await self._run_actor(FB_PAGE_ACTOR, {"startUrls": [{"url": page_url}]})
        return items[0] if items else None

    async def get_facebook_page(self, page_url: str) -> dict | None:
        """Return a normalized Facebook page summary, or None if unavailable."""
        page = await self.fetch_facebook_page(page_url)
        if not isinstance(page, dict) or not page:
            return None
        return {
            "pageName": page.get("title") or page.get("name"),
            "profileImage": (
                page.get("profilePictureUrl")
                or page.get("profilePhoto")
                or page.get("profilePic")
            ),
            "pageUrl": page.get("pageUrl", page_url),
            "followers": page.get("followers", 0),
            "likes": page.get("likes", 0),
            "categories": page.get("categories", []),
            "email": page.get("email"),
            "phone": page.get("phone"),
            "website": page.get("website"),
            "address": page.get("address"),
            "intro": page.get("intro"),
            "rating": page.get("rating"),
        }

    async def fetch_tiktok_profile(self, url: str) -> list[dict]:
        """Return raw TikTok actor items (latest videos) for a profile URL or handle."""
        user = _handle_from_url(url)
        return await self._run_actor(TIKTOK_ACTOR, {
            "profiles": [user],
            "resultsPerPage": 10,
            "profileScrapeSections": ["videos"],
            "profileSorting": "latest",
            "shouldDownloadVideos": False,
            "shouldDownloadCovers": False,
            "shouldDownloadSubtitles": False,
        })

    async def get_tiktok_profile(self, url: str) -> dict | None:
        """Return a normalized TikTok profile summary with up to 10 recent videos.

        Author details are read from the ``authorMeta`` of the first usable
        item. Returns None when the actor yields no usable items.
        """
        items = _only_dicts(await self.fetch_tiktok_profile(url))
        if not items:
            return None

        author = items[0].get("authorMeta")
        if not isinstance(author, dict):
            author = {}

        videos = [
            {
                "title": (v.get("text") or "")[:300],
                "playCount": v.get("playCount", 0),
                "diggCount": v.get("diggCount", 0),
                "commentCount": v.get("commentCount", 0),
                "shareCount": v.get("shareCount", 0),
                "createTime": v.get("createTimeISO"),
                "url": v.get("webVideoUrl"),
            }
            for v in items
        ]
        return {
            "handle": author.get("name"),
            "profileImage": author.get("avatar"),
            "nickname": author.get("nickName"),
            "followers": author.get("fans", 0),
            "following": author.get("following", 0),
            "likes": author.get("heart", 0),
            "videoCount": author.get("video", 0),
            "verified": author.get("verified", False),
            "bio": author.get("signature", ""),
            "recentVideos": videos[:10],
        }