o2o-infinith-backend/app/integrations/apify.py

192 lines
8.0 KiB
Python

from http import HTTPMethod
from urllib.parse import urlparse
from common.utils import http_request
# Root URL of the Apify REST API (v2); endpoint paths are appended to it.
APIFY_BASE = "https://api.apify.com/v2"
class ApifyClient:
def __init__(self, token: str, wait_for_finish: int = 120):
self.token = token
self.wait_for_finish = wait_for_finish
async def _run_actor(self, actor_id: str, input_data: dict) -> list[dict]:
resp = await http_request(
HTTPMethod.POST,
url=f"{APIFY_BASE}/acts/{actor_id}/runs",
params={"token": self.token, "waitForFinish": self.wait_for_finish},
headers={"Content-Type": "application/json"},
json_body=input_data,
timeout=self.wait_for_finish + 10,
label=f"apify:{actor_id.split('~')[-1]}",
)
if not resp or not resp.is_success:
return []
dataset_id = resp.json()["data"]["defaultDatasetId"]
items_resp = await http_request(
HTTPMethod.GET,
url=f"{APIFY_BASE}/datasets/{dataset_id}/items",
params={"token": self.token, "limit": 20},
label=f"apify-dataset-{dataset_id}",
)
if not items_resp or not items_resp.is_success:
return []
return items_resp.json()
async def fetch_instagram_profile(self, url: str) -> dict | None:
username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12})
return items[0] if items else None
async def get_instagram_profile(self, url: str) -> dict | None:
profile = await self.fetch_instagram_profile(url)
if not profile or profile.get("error"):
return None
return {
"username": profile["username"],
"profileImage": profile.get("profilePicUrlHD") or profile.get("profilePicUrl"),
"followers": profile.get("followersCount", 0),
"following": profile.get("followsCount", 0),
"posts": profile.get("postsCount", 0),
"bio": profile.get("biography", ""),
"isBusinessAccount": profile.get("isBusinessAccount", False),
#"externalUrl": profile.get("externalUrl"), LLM에 혼동을 주는 듯 하여 비활성화
"latestPosts": [
{
"type": p.get("type"),
"likes": p.get("likesCount", 0),
"comments": p.get("commentsCount", 0),
"caption": (p.get("caption") or "")[:500],
"timestamp": p.get("timestamp"),
}
for p in (profile.get("latestPosts") or [])[:12]
],
}
async def fetch_instagram_posts(self, url: str, limit: int = 20) -> list[dict]:
username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
return await self._run_actor("apify~instagram-post-scraper", {
"directUrls": [f"https://www.instagram.com/{username}/"],
"resultsLimit": limit,
})
async def get_instagram_posts(self, url: str, limit: int = 20) -> dict:
items = await self.fetch_instagram_posts(url, limit)
posts = [
{
"id": p["id"],
"type": p.get("type"),
"url": p.get("url"),
"caption": (p.get("caption") or "")[:500],
"hashtags": p.get("hashtags", []),
"likesCount": p.get("likesCount", 0),
"commentsCount": p.get("commentsCount", 0),
"timestamp": p.get("timestamp"),
}
for p in items
]
n = len(posts) or 1
return {
"posts": posts,
"totalPosts": len(posts),
"avgLikes": round(sum(p["likesCount"] for p in posts) / n),
"avgComments": round(sum(p["commentsCount"] for p in posts) / n),
}
async def fetch_instagram_reels(self, url: str, limit: int = 15) -> list[dict]:
username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
return await self._run_actor("apify~instagram-reel-scraper", {
"directUrls": [f"https://www.instagram.com/{username}/reels/"],
"resultsLimit": limit,
})
async def get_instagram_reels(self, url: str, limit: int = 15) -> dict:
items = await self.fetch_instagram_reels(url, limit)
reels = [
{
"id": r["id"],
"url": r.get("url"),
"caption": (r.get("caption") or "")[:500],
"hashtags": r.get("hashtags", []),
"likesCount": r.get("likesCount", 0),
"commentsCount": r.get("commentsCount", 0),
"videoViewCount": r.get("videoViewCount", 0),
"videoPlayCount": r.get("videoPlayCount", 0),
"videoDuration": r.get("videoDuration", 0),
"timestamp": r.get("timestamp"),
}
for r in items
]
n = len(reels) or 1
return {
"reels": reels,
"totalReels": len(reels),
"avgViews": round(sum(r["videoViewCount"] for r in reels) / n),
"avgPlays": round(sum(r["videoPlayCount"] for r in reels) / n),
}
async def fetch_facebook_page(self, page_url: str) -> dict | None:
items = await self._run_actor("apify~facebook-pages-scraper", {"startUrls": [{"url": page_url}]})
return items[0] if items else None
async def get_facebook_page(self, page_url: str) -> dict | None:
page = await self.fetch_facebook_page(page_url)
if not page:
return None
return {
"pageName": page.get("title") or page.get("name"),
"profileImage": page.get("profilePictureUrl") or page.get("profilePhoto") or page.get("profilePic"),
"pageUrl": page.get("pageUrl", page_url),
"followers": page.get("followers", 0),
"likes": page.get("likes", 0),
"categories": page.get("categories", []),
"email": page.get("email"),
"phone": page.get("phone"),
"website": page.get("website"),
"address": page.get("address"),
"intro": page.get("intro"),
"rating": page.get("rating"),
}
async def fetch_tiktok_profile(self, url: str) -> list[dict]:
user = urlparse(url).path.strip("/").lstrip("@").split("/")[0] if "://" in url else url.lstrip("@")
return await self._run_actor("clockworks~tiktok-scraper", {
"profiles": [user],
"resultsPerPage": 10,
"profileScrapeSections": ["videos"],
"profileSorting": "latest",
"shouldDownloadVideos": False,
"shouldDownloadCovers": False,
"shouldDownloadSubtitles": False,
})
async def get_tiktok_profile(self, url: str) -> dict | None:
items = await self.fetch_tiktok_profile(url)
if not items:
return None
author = (items[0] or {}).get("authorMeta") or {}
videos = [
{
"title": (v.get("text") or "")[:300],
"playCount": v.get("playCount", 0),
"diggCount": v.get("diggCount", 0),
"commentCount": v.get("commentCount", 0),
"shareCount": v.get("shareCount", 0),
"createTime": v.get("createTimeISO"),
"url": v.get("webVideoUrl"),
}
for v in items if isinstance(v, dict)
]
return {
"handle": author.get("name"),
"profileImage": author.get("avatar"),
"nickname": author.get("nickName"),
"followers": author.get("fans", 0),
"following": author.get("following", 0),
"likes": author.get("heart", 0),
"videoCount": author.get("video", 0),
"verified": author.get("verified", False),
"bio": author.get("signature", ""),
"recentVideos": videos[:10],
}