fix 유튜브 채널 수집 실패 버그

insta-data
jaehwang 2026-05-20 09:57:29 +09:00
parent 18d01357c0
commit 1f45b3e53d
4 changed files with 40 additions and 124 deletions

View File

@ -1,5 +1,4 @@
import os import os
import re
import asyncio import asyncio
from http import HTTPMethod from http import HTTPMethod
import httpx import httpx
@ -38,46 +37,3 @@ async def http_request(
print(f" [error] {label}{e}") print(f" [error] {label}{e}")
return None return None
return None return None
# First path segments on instagram.com / facebook.com that are skipped
# instead of being reported as account handles.
_SKIP_IG = {"p", "reel", "stories", "explore", "accounts", "about", "directory"}
_SKIP_FB = {"sharer", "share", "dialog", "plugins", "groups", "events", "watch", "help"}

# Compiled once at import time instead of on every URL in the loop.
_IG_RE = re.compile(r"instagram\.com/([a-zA-Z0-9._]+)")
_YT_RE = re.compile(r"youtube\.com/(?:@([a-zA-Z0-9._-]+)|channel/(UC[a-zA-Z0-9_-]+)|c/([a-zA-Z0-9._-]+))")
_FB_RE = re.compile(r"facebook\.com/([a-zA-Z0-9._-]+)")
_NAVER_BLOG_RE = re.compile(r"blog\.naver\.com/([a-zA-Z0-9_-]+)")
_TIKTOK_RE = re.compile(r"tiktok\.com/@([a-zA-Z0-9._-]+)")


def extract_social_handles(urls: list[str]) -> dict[str, list[str]]:
    """Collect social-media handles from a list of URLs, grouped by platform.

    Args:
        urls: URLs to scan. Falsy entries (``None``, ``""``) are ignored.

    Returns:
        A dict with the keys ``instagram``, ``youtube``, ``facebook``,
        ``naver_blog`` and ``tiktok``. Each value lists the distinct handles
        found for that platform, in first-seen order. YouTube ``/@name`` URLs
        yield ``"@name"``; ``/channel/UC...`` and ``/c/name`` URLs yield the
        bare segment.
    """
    result: dict[str, list[str]] = {
        "instagram": [],
        "youtube": [],
        "facebook": [],
        "naver_blog": [],
        "tiktok": [],
    }
    for url in urls:
        if not url:
            continue

        m = _IG_RE.search(url)
        if m and m.group(1).lower() not in _SKIP_IG:
            result["instagram"].append(m.group(1))

        m = _YT_RE.search(url)
        if m:
            # Group 1 is an @handle; groups 2 and 3 are a channel ID or a
            # /c/ custom name and are stored without a prefix.
            result["youtube"].append(f"@{m.group(1)}" if m.group(1) else (m.group(2) or m.group(3) or ""))

        m = _FB_RE.search(url)
        if m and m.group(1).lower() not in _SKIP_FB:
            result["facebook"].append(m.group(1))

        m = _NAVER_BLOG_RE.search(url)
        if m:
            result["naver_blog"].append(m.group(1))

        m = _TIKTOK_RE.search(url)
        if m:
            result["tiktok"].append(m.group(1))

    # dict.fromkeys de-duplicates while keeping first-seen order; a set would
    # return the handles in a hash-dependent order that varies between runs.
    return {platform: list(dict.fromkeys(handles)) for platform, handles in result.items()}
def normalize_handle(platform: str, value: str) -> str:
    """Extract the handle when given a URL; return an existing handle as-is.

    A value counts as a URL when it contains ``://`` or starts with ``www.``.
    If no handle for ``platform`` can be extracted from it, the value is kept
    unchanged. A leading ``@`` is stripped for every platform except
    ``youtube``.
    """
    if not value:
        return value

    looks_like_url = value.startswith("www.") or "://" in value
    if looks_like_url:
        found = extract_social_handles([value]).get(platform, [])
        if found:
            value = found[0]

    if platform == "youtube":
        return value
    return value.lstrip("@")

View File

@ -38,8 +38,8 @@ class ApifyClient:
items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12}) items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12})
return items[0] if items else None return items[0] if items else None
async def get_instagram_profile(self, handle: str) -> dict | None: async def get_instagram_profile(self, url: str) -> dict | None:
profile = await self.fetch_instagram_profile(handle) profile = await self.fetch_instagram_profile(url)
if not profile or profile.get("error"): if not profile or profile.get("error"):
return None return None
return { return {
@ -62,15 +62,15 @@ class ApifyClient:
], ],
} }
async def fetch_instagram_posts(self, url: str, limit: int = 20) -> list[dict]:
    """Run the ``apify~instagram-post-scraper`` actor for one profile.

    Args:
        url: A profile URL (anything containing ``://``) or a bare username,
            optionally prefixed with ``@``.
        limit: Value passed to the actor as ``resultsLimit``.

    Returns:
        Whatever ``self._run_actor`` returns for the run.
    """
    if "://" in url:
        # For a full URL the first path segment is taken as the username.
        username = urlparse(url).path.strip("/").split("/")[0]
    else:
        username = url.lstrip("@")

    actor_input = {
        "directUrls": [f"https://www.instagram.com/{username}/"],
        "resultsLimit": limit,
    }
    return await self._run_actor("apify~instagram-post-scraper", actor_input)
async def get_instagram_posts(self, handle: str, limit: int = 20) -> dict: async def get_instagram_posts(self, url: str, limit: int = 20) -> dict:
items = await self.fetch_instagram_posts(handle, limit) items = await self.fetch_instagram_posts(url, limit)
posts = [ posts = [
{ {
"id": p["id"], "id": p["id"],
@ -92,15 +92,15 @@ class ApifyClient:
"avgComments": round(sum(p["commentsCount"] for p in posts) / n), "avgComments": round(sum(p["commentsCount"] for p in posts) / n),
} }
async def fetch_instagram_reels(self, url: str, limit: int = 15) -> list[dict]:
    """Run the ``apify~instagram-reel-scraper`` actor for one profile.

    Args:
        url: A profile URL (anything containing ``://``) or a bare username,
            optionally prefixed with ``@``.
        limit: Value passed to the actor as ``resultsLimit``.

    Returns:
        Whatever ``self._run_actor`` returns for the run.
    """
    if "://" in url:
        # For a full URL the first path segment is taken as the username.
        username = urlparse(url).path.strip("/").split("/")[0]
    else:
        username = url.lstrip("@")

    actor_input = {
        "directUrls": [f"https://www.instagram.com/{username}/reels/"],
        "resultsLimit": limit,
    }
    return await self._run_actor("apify~instagram-reel-scraper", actor_input)
async def get_instagram_reels(self, handle: str, limit: int = 15) -> dict: async def get_instagram_reels(self, url: str, limit: int = 15) -> dict:
items = await self.fetch_instagram_reels(handle, limit) items = await self.fetch_instagram_reels(url, limit)
reels = [ reels = [
{ {
"id": r["id"], "id": r["id"],

View File

@ -1,5 +1,4 @@
from http import HTTPMethod from http import HTTPMethod
from urllib.parse import urlparse
from common.utils import http_request from common.utils import http_request
YT = "https://www.googleapis.com/youtube/v3" YT = "https://www.googleapis.com/youtube/v3"
@ -9,25 +8,40 @@ class YouTubeClient:
def __init__(self, api_key: str): def __init__(self, api_key: str):
self.api_key = api_key self.api_key = api_key
async def _resolve_channel_id(self, url: str) -> str:
    """Resolve a YouTube channel URL, handle, or channel ID to a channel ID.

    Accepted inputs: ``.../channel/<id>``, ``.../@<handle>``, ``.../c/<name>``
    and ``.../user/<name>`` URLs, a bare 24-character ID starting with ``UC``,
    or a bare handle/name.

    Args:
        url: The channel reference to resolve.

    Returns:
        The channel ID, or ``""`` when the input is empty or no lookup
        returns a channel.
    """
    # Share links often carry "?si=..." or a fragment; left in place, that
    # suffix would become part of the extracted segment and never match.
    target = (url or "").strip().split("?", 1)[0].split("#", 1)[0]
    if not target:
        return ""

    def segment_after(marker: str) -> str:
        # Path segment immediately following the first occurrence of marker.
        return target.split(marker, 1)[1].split("/", 1)[0]

    if "/channel/" in target:
        # The URL already contains the channel ID; no API call is needed.
        return segment_after("/channel/")
    if "/@" in target:
        name = segment_after("/@")
        lookups = [("forHandle", f"@{name}"), ("forUsername", name)]
    elif "/c/" in target:
        name = segment_after("/c/")
        lookups = [("forHandle", name), ("forUsername", name)]
    elif "/user/" in target:
        # /user/ URLs carry a legacy username, so that lookup goes first.
        name = segment_after("/user/")
        lookups = [("forUsername", name), ("forHandle", name)]
    elif target.startswith("UC") and len(target) == 24:
        return target
    else:
        # Bare handle or name; the username lookup gets it without the "@".
        lookups = [("forHandle", target), ("forUsername", target.lstrip("@"))]

    for param, value in lookups:
        if not value.strip("@"):
            # Nothing to look up (e.g. the URL ended right after the marker).
            continue
        resp = await http_request(
            HTTPMethod.GET,
            url=f"{YT}/channels",
            params={"part": "id", param: value, "key": self.api_key},
            label="yt-resolve",
        )
        if resp and resp.is_success and (items := resp.json().get("items", [])):
            return items[0]["id"]

    print("YT NOT FOUND")
    return ""
async def fetch_channel(self, handle_or_id: str) -> dict | None: async def fetch_channel(self, url: str) -> dict | None:
channel_id = await self._resolve_channel_id(handle_or_id) channel_id = await self._resolve_channel_id(url)
if not channel_id: if not channel_id:
return None return None
@ -67,8 +81,8 @@ class YouTubeClient:
return {"channelId": channel_id, "channel": channel, "videos": videos} return {"channelId": channel_id, "channel": channel, "videos": videos}
async def get_channel(self, handle_or_id: str) -> dict | None: async def get_channel(self, url: str) -> dict | None:
raw = await self.fetch_channel(handle_or_id) raw = await self.fetch_channel(url)
if not raw: if not raw:
return None return None
ch = raw["channel"] ch = raw["channel"]

View File

@ -1,54 +0,0 @@
import asyncio
import json
import os
from dotenv import load_dotenv
load_dotenv("../.env")
from common.utils import normalize_handle
from integrations.youtube import YouTubeClient
from integrations.apify import ApifyClient
from integrations.naver import NaverClient
from integrations.firecrawl import FirecrawlClient
# Fixed sample identifiers that main() feeds to each client. The forms are
# mixed: "@"-prefixed handles, a bare page name, and full URLs.
INPUT = {
"youtube": "@banobagips",
"instagram": ["@banobagi_ps"],
"facebook": "BanobagiPlasticSurgery",
"naver_blog": "https://blog.naver.com/banobagiprs",
"gangnam_unni": "https://www.gangnamunni.com/hospitals/23",
}
# Directory, given as a relative path, that save() writes its JSON files to.
OUT_DIR = "../test_results"
def save(name: str, data) -> None:
    """Write ``data`` as JSON to ``<OUT_DIR>/<name>.json`` and print the path.

    The output directory is created if it does not exist. Non-ASCII
    characters are written as-is (``ensure_ascii=False``) in UTF-8.
    """
    os.makedirs(OUT_DIR, exist_ok=True)
    target = os.path.join(OUT_DIR, "{}.json".format(name))
    with open(target, mode="w", encoding="utf-8") as out:
        json.dump(data, out, ensure_ascii=False)
    print(f"saved → {target}")
async def main():
    """Fetch each source listed in ``INPUT`` and save every result as JSON.

    Credentials are read from the environment. The fetches run one after
    another, each result being saved before the next fetch starts.
    """
    env = os.environ
    youtube_client = YouTubeClient(api_key=env["YOUTUBE_API_KEY"])
    apify_client = ApifyClient(token=env["APIFY_API_TOKEN"])
    naver_client = NaverClient(
        client_id=env["NAVER_CLIENT_ID"],
        client_secret=env["NAVER_CLIENT_SECRET"],
    )
    firecrawl_client = FirecrawlClient(api_key=env["FIRECRAWL_API_KEY"])

    youtube_handle = normalize_handle("youtube", INPUT["youtube"])
    # Only the first Instagram entry is used.
    instagram_handle = normalize_handle("instagram", INPUT["instagram"][0])
    facebook_handle = normalize_handle("facebook", INPUT["facebook"])
    naver_blog_handle = normalize_handle("naver_blog", INPUT["naver_blog"])

    channel = await youtube_client.fetch_channel(youtube_handle)
    save("youtube", channel)

    profile = await apify_client.fetch_instagram_profile(instagram_handle)
    save("instagram_profile", profile)

    # Instagram posts and reels are not fetched here.

    facebook_page = await apify_client.fetch_facebook_page(f"https://www.facebook.com/{facebook_handle}")
    save("facebook", facebook_page)

    blog_feed = await naver_client.fetch_blog_rss(naver_blog_handle)
    save("naver_blog", blog_feed)

    clinic_page = await firecrawl_client.fetch_gangnam_unni(INPUT["gangnam_unni"])
    save("gangnam_unni", clinic_page)
asyncio.run(main())