207 lines
8.9 KiB
Python
207 lines
8.9 KiB
Python
import asyncio
|
|
from http import HTTPMethod
|
|
from urllib.parse import urlparse
|
|
from common.utils import http_request
|
|
|
|
# Root of the Apify REST API (v2); every request URL in this module starts here.
APIFY_BASE = "https://api.apify.com/v2"

# Instagram: the profile actor and the highlights actor are each called directly.
IG_PROFILE_ACTOR = "coderx~instagram-profile-scraper-bio-posts"
IG_HIGHLIGHTS_ACTOR = "igview-owner~instagram-highlights-scraper"

# Facebook: the pages actor and the posts actor are each called directly.
FB_PAGES_ACTOR = "apify~facebook-pages-scraper"
FB_POSTS_ACTOR = "apify~facebook-posts-scraper"
|
|
|
|
|
|
def _ig_username(url: str) -> str:
    """Return the Instagram username named by a profile URL or a bare handle.

    Accepted inputs:
      * full URLs            -- ``https://www.instagram.com/nasa/?hl=en``
      * scheme-less links    -- ``instagram.com/nasa`` / ``www.instagram.com/nasa/``
      * bare handles         -- ``nasa`` / ``@nasa``

    For URLs the first path segment is returned; query strings and deeper
    path segments are ignored. Surrounding whitespace and a leading ``@``
    are stripped in every case.
    """
    handle = url.strip()
    # A handle can never contain "/", so seeing "instagram.com/" without a
    # scheme means a pasted link; add a scheme so urlparse finds the path.
    if "://" not in handle and "instagram.com/" in handle.lower():
        handle = f"https://{handle}"
    if "://" in handle:
        return urlparse(handle).path.strip("/").split("/")[0].lstrip("@")
    return handle.lstrip("@")
|
|
|
|
|
|
class ApifyClient:
|
|
def __init__(self, token: str, wait_for_finish: int = 120):
|
|
self.token = token
|
|
self.wait_for_finish = wait_for_finish
|
|
|
|
async def _run_actor(self, actor_id: str, input_data: dict, limit: int = 20) -> list[dict]:
|
|
resp = await http_request(
|
|
HTTPMethod.POST,
|
|
url=f"{APIFY_BASE}/acts/{actor_id}/runs",
|
|
params={"token": self.token, "waitForFinish": self.wait_for_finish},
|
|
headers={"Content-Type": "application/json"},
|
|
json_body=input_data,
|
|
timeout=self.wait_for_finish + 10,
|
|
label=f"apify:{actor_id.split('~')[-1]}",
|
|
)
|
|
if not resp or not resp.is_success:
|
|
return []
|
|
dataset_id = resp.json()["data"]["defaultDatasetId"]
|
|
items_resp = await http_request(
|
|
HTTPMethod.GET,
|
|
url=f"{APIFY_BASE}/datasets/{dataset_id}/items",
|
|
params={"token": self.token, "limit": limit},
|
|
label=f"apify-dataset-{dataset_id}",
|
|
)
|
|
if not items_resp or not items_resp.is_success:
|
|
return []
|
|
return items_resp.json()
|
|
|
|
async def fetch_instagram_profile(self, username: str) -> dict | None:
|
|
items = await self._run_actor(IG_PROFILE_ACTOR, {"usernames": [username]})
|
|
return items[0] if items else None
|
|
|
|
async def fetch_instagram_highlights(self, username: str) -> list[dict]:
|
|
return await self._run_actor(IG_HIGHLIGHTS_ACTOR, {"usernames": [username]})
|
|
|
|
async def get_instagram_profile(self, url: str) -> dict | None:
|
|
username = _ig_username(url)
|
|
# profile·highlights 두 actor를 병렬 호출 (highlights 실패해도 profile만 있으면 진행)
|
|
profile, highlights = await asyncio.gather(
|
|
self.fetch_instagram_profile(username),
|
|
self.fetch_instagram_highlights(username),
|
|
return_exceptions=True,
|
|
)
|
|
if isinstance(profile, Exception) or not profile or profile.get("error"):
|
|
return None
|
|
if isinstance(highlights, Exception):
|
|
highlights = []
|
|
return {
|
|
"username": profile["username"],
|
|
"profileImage": profile.get("hdProfilePicUrl") or profile.get("profilePicUrl"),
|
|
"followers": profile.get("followersCount", 0),
|
|
"following": profile.get("followsCount", 0),
|
|
"posts": profile.get("postsCount", 0),
|
|
"bio": profile.get("biography", ""),
|
|
"category": profile.get("businessCategoryName") or "",
|
|
"isBusinessAccount": profile.get("isBusinessAccount", False),
|
|
#"externalUrl": profile.get("externalUrl"), LLM에 혼동을 주는 듯 하여 비활성화
|
|
"highlights": [h["highlightTitle"] for h in (highlights or []) if isinstance(h, dict) and h.get("highlightTitle")],
|
|
"latestPosts": [
|
|
{
|
|
"type": p.get("mediaType") or p.get("type"),
|
|
"likes": p.get("likesCount", 0),
|
|
"comments": p.get("commentsCount", 0),
|
|
"caption": (p.get("caption") or "")[:500],
|
|
"timestamp": p.get("timestamp"),
|
|
}
|
|
for p in (profile.get("latestPosts") or [])[:12]
|
|
],
|
|
}
|
|
|
|
# 인스타 post 스크래퍼는 현재 파이프라인 미사용 — 비활성화 (필요 시 복구)
|
|
# async def fetch_instagram_posts(self, url: str, limit: int = 20) -> list[dict]:
|
|
# username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
|
|
# return await self._run_actor("apify~instagram-post-scraper", {
|
|
# "directUrls": [f"https://www.instagram.com/{username}/"],
|
|
# "resultsLimit": limit,
|
|
# })
|
|
#
|
|
# async def get_instagram_posts(self, url: str, limit: int = 20) -> dict:
|
|
# items = await self.fetch_instagram_posts(url, limit)
|
|
# posts = [
|
|
# {
|
|
# "id": p["id"],
|
|
# "type": p.get("type"),
|
|
# "url": p.get("url"),
|
|
# "caption": (p.get("caption") or "")[:500],
|
|
# "hashtags": p.get("hashtags", []),
|
|
# "likesCount": p.get("likesCount", 0),
|
|
# "commentsCount": p.get("commentsCount", 0),
|
|
# "timestamp": p.get("timestamp"),
|
|
# }
|
|
# for p in items
|
|
# ]
|
|
# n = len(posts) or 1
|
|
# return {
|
|
# "posts": posts,
|
|
# "totalPosts": len(posts),
|
|
# "avgLikes": round(sum(p["likesCount"] for p in posts) / n),
|
|
# "avgComments": round(sum(p["commentsCount"] for p in posts) / n),
|
|
# }
|
|
|
|
async def fetch_facebook_page(self, page_url: str) -> dict | None:
|
|
items = await self._run_actor(FB_PAGES_ACTOR, {"startUrls": [{"url": page_url}]})
|
|
return items[0] if items else None
|
|
|
|
async def fetch_facebook_posts(self, page_url: str, limit: int = 20) -> list[dict]:
|
|
return await self._run_actor(
|
|
FB_POSTS_ACTOR, {"startUrls": [{"url": page_url}], "resultsLimit": limit}, limit=limit,
|
|
)
|
|
|
|
async def get_facebook_page(self, page_url: str) -> dict | None:
|
|
# pages·posts 두 task 병렬 호출 (posts 실패해도 page만 있으면 진행)
|
|
page, posts = await asyncio.gather(
|
|
self.fetch_facebook_page(page_url),
|
|
self.fetch_facebook_posts(page_url),
|
|
return_exceptions=True,
|
|
)
|
|
if isinstance(page, Exception) or not page:
|
|
return None
|
|
if isinstance(posts, Exception):
|
|
posts = []
|
|
return {
|
|
"pageName": page.get("title") or page.get("name"),
|
|
"profileImage": page.get("profilePictureUrl") or page.get("profilePhoto") or page.get("profilePic"),
|
|
"pageUrl": page.get("pageUrl", page_url),
|
|
"followers": page.get("followers", 0),
|
|
"following": page.get("followings", 0),
|
|
"reviews": page.get("ratingCount", 0),
|
|
"categories": page.get("categories", []),
|
|
"website": page.get("website") or page.get("websites"),
|
|
"intro": page.get("intro"),
|
|
"latestPosts": [
|
|
{
|
|
"text": (p.get("text") or "")[:160],
|
|
"likes": p.get("likes", 0),
|
|
"reactions": p.get("topReactionsCount", 0),
|
|
"shares": p.get("shares", 0),
|
|
"views": p.get("viewsCount") or 0,
|
|
"isVideo": p.get("isVideo", False),
|
|
"timestamp": p.get("time") or p.get("timestamp"),
|
|
}
|
|
for p in (posts or []) if isinstance(p, dict)
|
|
],
|
|
}
|
|
|
|
async def fetch_tiktok_profile(self, url: str) -> list[dict]:
|
|
user = urlparse(url).path.strip("/").lstrip("@").split("/")[0] if "://" in url else url.lstrip("@")
|
|
return await self._run_actor("clockworks~tiktok-scraper", {
|
|
"profiles": [user],
|
|
"resultsPerPage": 10,
|
|
"profileScrapeSections": ["videos"],
|
|
"profileSorting": "latest",
|
|
"shouldDownloadVideos": False,
|
|
"shouldDownloadCovers": False,
|
|
"shouldDownloadSubtitles": False,
|
|
})
|
|
|
|
async def get_tiktok_profile(self, url: str) -> dict | None:
|
|
items = await self.fetch_tiktok_profile(url)
|
|
if not items:
|
|
return None
|
|
author = (items[0] or {}).get("authorMeta") or {}
|
|
videos = [
|
|
{
|
|
"title": (v.get("text") or "")[:300],
|
|
"playCount": v.get("playCount", 0),
|
|
"diggCount": v.get("diggCount", 0),
|
|
"commentCount": v.get("commentCount", 0),
|
|
"shareCount": v.get("shareCount", 0),
|
|
"createTime": v.get("createTimeISO"),
|
|
"url": v.get("webVideoUrl"),
|
|
}
|
|
for v in items if isinstance(v, dict)
|
|
]
|
|
return {
|
|
"handle": author.get("name"),
|
|
"profileImage": author.get("avatar"),
|
|
"nickname": author.get("nickName"),
|
|
"followers": author.get("fans", 0),
|
|
"following": author.get("following", 0),
|
|
"likes": author.get("heart", 0),
|
|
"videoCount": author.get("video", 0),
|
|
"verified": author.get("verified", False),
|
|
"bio": author.get("signature", ""),
|
|
"recentVideos": videos[:10],
|
|
}
|