From 1f45b3e53d5c46db508005e5dc3d061c334b88b9 Mon Sep 17 00:00:00 2001 From: jaehwang Date: Wed, 20 May 2026 09:57:29 +0900 Subject: [PATCH] =?UTF-8?q?fix=20=EC=9C=A0=ED=8A=9C=EB=B8=8C=20=EC=B1=84?= =?UTF-8?q?=EB=84=90=20=EC=88=98=EC=A7=91=20=EC=8B=A4=ED=8C=A8=20=EB=B2=84?= =?UTF-8?q?=EA=B7=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/common/utils.py | 44 ------------------------------ app/integrations/apify.py | 24 ++++++++--------- app/integrations/youtube.py | 42 +++++++++++++++++++---------- app/test_fetch.py | 54 ------------------------------------- 4 files changed, 40 insertions(+), 124 deletions(-) delete mode 100644 app/test_fetch.py diff --git a/app/common/utils.py b/app/common/utils.py index d7c4b21..aca1d13 100644 --- a/app/common/utils.py +++ b/app/common/utils.py @@ -1,5 +1,4 @@ import os -import re import asyncio from http import HTTPMethod import httpx @@ -38,46 +37,3 @@ async def http_request( print(f" [error] {label} → {e}") return None return None - - -_SKIP_IG = {"p", "reel", "stories", "explore", "accounts", "about", "directory"} -_SKIP_FB = {"sharer", "share", "dialog", "plugins", "groups", "events", "watch", "help"} - - -def extract_social_handles(urls: list[str]) -> dict[str, list[str]]: - result: dict[str, list[str]] = {"instagram": [], "youtube": [], "facebook": [], "naver_blog": [], "tiktok": []} - - for url in urls: - if not url: - continue - m = re.search(r"instagram\.com/([a-zA-Z0-9._]+)", url) - if m and m.group(1).lower() not in _SKIP_IG: - result["instagram"].append(m.group(1)) - - m = re.search(r"youtube\.com/(?:@([a-zA-Z0-9._-]+)|channel/(UC[a-zA-Z0-9_-]+)|c/([a-zA-Z0-9._-]+))", url) - if m: - result["youtube"].append(f"@{m.group(1)}" if m.group(1) else (m.group(2) or m.group(3) or "")) - - m = re.search(r"facebook\.com/([a-zA-Z0-9._-]+)", url) - if m and m.group(1).lower() not in _SKIP_FB: - result["facebook"].append(m.group(1)) - - m = re.search(r"blog\.naver\.com/([a-zA-Z0-9_-]+)", url) - if m: - result["naver_blog"].append(m.group(1)) - - m = re.search(r"tiktok\.com/@([a-zA-Z0-9._-]+)", url) - if m: - result["tiktok"].append(m.group(1)) - - return {k: list(set(v)) for k, v in result.items()} - - -def normalize_handle(platform: str, value: str) -> str: - """URL이 들어오면 핸들을 추출하고, 이미 핸들이면 그대로 반환.""" - if not value: - return value - if "://" in value or value.startswith("www."): - handles = extract_social_handles([value]).get(platform, []) - value = handles[0] if handles else value - return value.lstrip("@") if platform != "youtube" else value diff --git a/app/integrations/apify.py b/app/integrations/apify.py index 4eea74e..117318e 100644 --- a/app/integrations/apify.py +++ b/app/integrations/apify.py @@ -38,8 +38,8 @@ class ApifyClient: items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12}) return items[0] if items else None - async def get_instagram_profile(self, handle: str) -> dict | None: - profile = await self.fetch_instagram_profile(handle) + async def get_instagram_profile(self, url: str) -> dict | None: + profile = await self.fetch_instagram_profile(url) if not profile or profile.get("error"): return None return { @@ -62,15 +62,15 @@ class ApifyClient: ], } - async def fetch_instagram_posts(self, handle: str, limit: int = 20) -> list[dict]: - clean = handle.lstrip("@") + async def fetch_instagram_posts(self, url: str, limit: int = 20) -> list[dict]: + username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@") return await self._run_actor("apify~instagram-post-scraper", { - "directUrls": [f"https://www.instagram.com/{clean}/"], + "directUrls": [f"https://www.instagram.com/{username}/"], "resultsLimit": limit, }) - async def get_instagram_posts(self, handle: str, limit: int = 20) -> dict: - items = await self.fetch_instagram_posts(handle, limit) + async def get_instagram_posts(self, url: str, limit: int = 20) -> dict: + items = await self.fetch_instagram_posts(url, limit) posts = [ { "id": p["id"], @@ -92,15 +92,15 @@ class ApifyClient: "avgComments": round(sum(p["commentsCount"] for p in posts) / n), } - async def fetch_instagram_reels(self, handle: str, limit: int = 15) -> list[dict]: - clean = handle.lstrip("@") + async def fetch_instagram_reels(self, url: str, limit: int = 15) -> list[dict]: + username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@") return await self._run_actor("apify~instagram-reel-scraper", { - "directUrls": [f"https://www.instagram.com/{clean}/reels/"], + "directUrls": [f"https://www.instagram.com/{username}/reels/"], "resultsLimit": limit, }) - async def get_instagram_reels(self, handle: str, limit: int = 15) -> dict: - items = await self.fetch_instagram_reels(handle, limit) + async def get_instagram_reels(self, url: str, limit: int = 15) -> dict: + items = await self.fetch_instagram_reels(url, limit) reels = [ { "id": r["id"], diff --git a/app/integrations/youtube.py b/app/integrations/youtube.py index edc1100..734f142 100644 --- a/app/integrations/youtube.py +++ b/app/integrations/youtube.py @@ -1,5 +1,4 @@ from http import HTTPMethod -from urllib.parse import urlparse from common.utils import http_request YT = "https://www.googleapis.com/youtube/v3" @@ -9,25 +8,40 @@ class YouTubeClient: def __init__(self, api_key: str): self.api_key = api_key - async def _resolve_channel_id(self, handle: str) -> str: - h = urlparse(handle).path.strip("/").lstrip("@") if "://" in handle else handle.lstrip("@") - if h.startswith("UC") and len(h) == 24: - return h + async def _resolve_channel_id(self, url: str) -> str: + print("input yt url : ", url) + + # /channel/UCxxxxx → 채널 ID 직접 반환 + if "/channel/" in url: + return url.split("/channel/")[1].split("/")[0] + + if "/@" in url: + val = "@" + url.split("/@")[1].split("/")[0] + elif "/c/" in url: + val = url.split("/c/")[1].split("/")[0] + elif "/user/" in url: + val = url.split("/user/")[1].split("/")[0] + elif url.startswith("UC") and len(url) == 24: + return url + else: + val = url + + print("val : ", val) for param in ("forHandle", "forUsername"): resp = await http_request( HTTPMethod.GET, url=f"{YT}/channels", - params={"part": "id", param: h, "key": self.api_key}, + params={"part": "id", param: val, "key": self.api_key}, label="yt-resolve", ) - if resp and resp.is_success: - items = resp.json().get("items", []) - if items: - return items[0]["id"] + if resp and resp.is_success and (items := resp.json().get("items", [])): + print("items : ", items) + return items[0]["id"] + print("YT NOT FOUND") return "" - async def fetch_channel(self, handle_or_id: str) -> dict | None: - channel_id = await self._resolve_channel_id(handle_or_id) + async def fetch_channel(self, url: str) -> dict | None: + channel_id = await self._resolve_channel_id(url) if not channel_id: return None @@ -67,8 +81,8 @@ class YouTubeClient: return {"channelId": channel_id, "channel": channel, "videos": videos} - async def get_channel(self, handle_or_id: str) -> dict | None: - raw = await self.fetch_channel(handle_or_id) + async def get_channel(self, url: str) -> dict | None: + raw = await self.fetch_channel(url) if not raw: return None ch = raw["channel"] diff --git a/app/test_fetch.py b/app/test_fetch.py deleted file mode 100644 index b51ae6d..0000000 --- a/app/test_fetch.py +++ /dev/null @@ -1,54 +0,0 @@ -import asyncio -import json -import os - -from dotenv import load_dotenv - -load_dotenv("../.env") - -from common.utils import normalize_handle -from integrations.youtube import YouTubeClient -from integrations.apify import ApifyClient -from integrations.naver import NaverClient -from integrations.firecrawl import FirecrawlClient - -INPUT = { - "youtube": "@banobagips", - "instagram": ["@banobagi_ps"], - "facebook": "BanobagiPlasticSurgery", - "naver_blog": "https://blog.naver.com/banobagiprs", - "gangnam_unni": "https://www.gangnamunni.com/hospitals/23", -} - -OUT_DIR = "../test_results" - - -def save(name: str, data) -> None: - os.makedirs(OUT_DIR, exist_ok=True) - path = os.path.join(OUT_DIR, f"{name}.json") - with open(path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False) - print(f"saved → {path}") - - -async def main(): - yt = YouTubeClient(api_key=os.environ["YOUTUBE_API_KEY"]) - apify = ApifyClient(token=os.environ["APIFY_API_TOKEN"]) - naver = NaverClient(client_id=os.environ["NAVER_CLIENT_ID"], client_secret=os.environ["NAVER_CLIENT_SECRET"]) - firecrawl = FirecrawlClient(api_key=os.environ["FIRECRAWL_API_KEY"]) - - yt_handle = normalize_handle("youtube", INPUT["youtube"]) - ig_handle = normalize_handle("instagram", INPUT["instagram"][0]) - fb_handle = normalize_handle("facebook", INPUT["facebook"]) - naver_handle = normalize_handle("naver_blog", INPUT["naver_blog"]) - - save("youtube", await yt.fetch_channel(yt_handle)) - save("instagram_profile", await apify.fetch_instagram_profile(ig_handle)) - # save("instagram_posts", await apify.fetch_instagram_posts(ig_handle)) - # save("instagram_reels", await apify.fetch_instagram_reels(ig_handle)) - save("facebook", await apify.fetch_facebook_page(f"https://www.facebook.com/{fb_handle}")) - save("naver_blog", await naver.fetch_blog_rss(naver_handle)) - save("gangnam_unni", await firecrawl.fetch_gangnam_unni(INPUT["gangnam_unni"])) - - -asyncio.run(main())