integration 1차 데이터 및 DB 정의, 테스트

upload
jaehwang 2026-04-24 14:19:29 +09:00
parent 23e859217b
commit d930679e90
16 changed files with 883 additions and 0 deletions

2
.gitignore vendored
View File

@ -42,3 +42,5 @@ Thumbs.db
# Alembic # Alembic
alembic/versions/*.pyc alembic/versions/*.pyc
test_results/

View File

@ -0,0 +1,15 @@
# o2o-infinith-backend
## 설치
**Docker**
```bash
curl -fsSL https://get.docker.com | sh
```
## 실행
```bash
docker compose up -d
```

100
SQL/db_create.sql Normal file
View File

@ -0,0 +1,100 @@
-- 테이블 순서는 관계를 고려하여 한 번에 실행해도 에러가 발생하지 않게 정렬되었습니다.
-- instagram_data: CREATE TABLE statement.
-- Each row holds a URL and an optional raw JSON payload tagged with a hospital_id.
-- NOTE(review): hospital_id presumably refers to hospital_baseinfo.hospital_id,
-- but no FOREIGN KEY constraint is declared here - confirm whether that is intended.
CREATE TABLE instagram_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` INT NOT NULL,
`url` VARCHAR(500) NOT NULL,
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Secondary index on instagram_data(hospital_id).
CREATE INDEX IX_instagram_data_1
ON instagram_data(hospital_id);
-- facebook_data: CREATE TABLE statement.
-- Each row holds a URL and an optional raw JSON payload tagged with a hospital_id.
-- NOTE(review): hospital_id presumably refers to hospital_baseinfo.hospital_id,
-- but no FOREIGN KEY constraint is declared here - confirm whether that is intended.
CREATE TABLE facebook_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` INT NOT NULL,
`url` VARCHAR(500) NOT NULL,
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Secondary index on facebook_data(hospital_id).
CREATE INDEX IX_facebook_data_1
ON facebook_data(hospital_id);
-- naver_blog_data: CREATE TABLE statement.
-- Each row holds a URL and an optional raw JSON payload tagged with a hospital_id.
-- NOTE(review): hospital_id presumably refers to hospital_baseinfo.hospital_id,
-- but no FOREIGN KEY constraint is declared here - confirm whether that is intended.
CREATE TABLE naver_blog_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` INT NOT NULL,
`url` VARCHAR(500) NOT NULL,
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Secondary index on naver_blog_data(hospital_id).
CREATE INDEX IX_naver_blog_data_1
ON naver_blog_data(hospital_id);
-- hospital_baseinfo: CREATE TABLE statement.
-- One row per hospital: name, brn, optional road/site addresses, and the owning user id.
-- updated_at is refreshed automatically on every UPDATE (ON UPDATE CURRENT_TIMESTAMP).
-- NOTE(review): owner_user_id presumably refers to user_info.user_id (that table is
-- created after this one); no FOREIGN KEY constraint is declared - confirm.
-- NOTE(review): brn looks like a business registration number - confirm.
CREATE TABLE hospital_baseinfo
(
`hospital_id` INT NOT NULL AUTO_INCREMENT,
`owner_user_id` INT NOT NULL,
`hospital_name` VARCHAR(50) NOT NULL,
`brn` VARCHAR(50) NOT NULL,
`road_address` VARCHAR(100) NULL,
`site_address` VARCHAR(100) NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (hospital_id)
);
-- Secondary index on hospital_baseinfo(owner_user_id).
CREATE INDEX IX_hospital_baseinfo_1
ON hospital_baseinfo(owner_user_id);
-- user_info: CREATE TABLE statement.
-- One row per account (username + password); updated_at is refreshed automatically
-- on every UPDATE (ON UPDATE CURRENT_TIMESTAMP).
CREATE TABLE user_info
(
    `user_id` INT NOT NULL AUTO_INCREMENT,
    `username` VARCHAR(50) NOT NULL,
    -- Widened from VARCHAR(50): a bcrypt hash is 60 characters long and other
    -- password-hash formats are longer still, so 50 columns cannot store a
    -- hashed password (the project depends on passlib[bcrypt]).
    `password` VARCHAR(255) NOT NULL,
    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id)
);
-- youtube_data: CREATE TABLE statement.
-- Each row holds a URL and an optional raw JSON payload tagged with a hospital_id.
-- NOTE(review): hospital_id presumably refers to hospital_baseinfo.hospital_id,
-- but no FOREIGN KEY constraint is declared here - confirm whether that is intended.
CREATE TABLE youtube_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` INT NOT NULL,
`url` VARCHAR(500) NOT NULL,
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Secondary index on youtube_data(hospital_id).
CREATE INDEX IX_youtube_data_1
ON youtube_data(hospital_id);

82
app/common/utils.py Normal file
View File

@ -0,0 +1,82 @@
import os
import re
import asyncio
from http import HTTPMethod
import httpx
# Default value of http_request()'s `timeout` argument, which is forwarded to httpx
# (httpx interprets a bare number as seconds).
REQUEST_TIMEOUT = 60
def get_env(key: str) -> str:
    """Return the value of environment variable *key*.

    Raises:
        EnvironmentError: if the variable is unset or set to an empty string.
    """
    value = os.getenv(key)
    if value:
        return value
    raise EnvironmentError(f"Missing env: {key}")
async def http_request(
    method: HTTPMethod,
    url: str,
    *,
    label: str,
    headers: dict | None = None,
    params: dict | None = None,
    json_body: dict | None = None,
    timeout: int = REQUEST_TIMEOUT,
    max_retries: int = 0,
) -> httpx.Response | None:
    """Send one HTTP request with httpx, retrying on transport errors.

    Args:
        method: HTTP method to use.
        url: Target URL.
        label: Short tag included in the retry/error messages printed to stdout.
        headers: Optional request headers.
        params: Optional query-string parameters.
        json_body: Optional JSON request body.
        timeout: Timeout forwarded to httpx for each attempt.
        max_retries: Number of additional attempts after the first failure.

    Returns:
        The response of the first attempt that completes (whatever its status
        code), or None once every attempt has raised ``httpx.RequestError``.
    """
    total_attempts = max_retries + 1
    async with httpx.AsyncClient() as client:
        for attempt in range(total_attempts):
            try:
                return await client.request(
                    method,
                    url,
                    headers=headers,
                    params=params,
                    json=json_body,
                    timeout=timeout,
                )
            except httpx.RequestError as exc:
                if attempt >= max_retries:
                    # Out of retries: report and give up.
                    print(f" [error] {label}{exc}")
                    return None
                print(f" [retry] {label}{exc}, attempt {attempt + 1}")
                # Linear back-off: 2s, 4s, 6s, ...
                await asyncio.sleep((attempt + 1) * 2)
    return None
# First path segments on instagram.com / facebook.com that are site features,
# not account handles.
_SKIP_IG = {"p", "reel", "stories", "explore", "accounts", "about", "directory"}
_SKIP_FB = {"sharer", "share", "dialog", "plugins", "groups", "events", "watch", "help"}


def extract_social_handles(urls: list[str]) -> dict[str, list[str]]:
    """Extract social-media account handles from a list of URLs.

    Args:
        urls: URLs to scan; empty / None entries are skipped.

    Returns:
        A dict with the keys ``instagram``, ``youtube``, ``facebook``,
        ``naver_blog`` and ``tiktok``. Each value lists the unique handles found
        for that platform, in first-seen order. YouTube ``@handle`` URLs keep
        their leading ``@``; ``channel/UC...`` and ``c/...`` URLs yield the bare
        id / name.
    """
    result: dict[str, list[str]] = {"instagram": [], "youtube": [], "facebook": [], "naver_blog": [], "tiktok": []}
    for url in urls:
        if not url:
            continue
        m = re.search(r"instagram\.com/([a-zA-Z0-9._]+)", url)
        if m and m.group(1).lower() not in _SKIP_IG:
            result["instagram"].append(m.group(1))
        m = re.search(r"youtube\.com/(?:@([a-zA-Z0-9._-]+)|channel/(UC[a-zA-Z0-9_-]+)|c/([a-zA-Z0-9._-]+))", url)
        if m:
            result["youtube"].append(f"@{m.group(1)}" if m.group(1) else (m.group(2) or m.group(3) or ""))
        m = re.search(r"facebook\.com/([a-zA-Z0-9._-]+)", url)
        if m and m.group(1).lower() not in _SKIP_FB:
            result["facebook"].append(m.group(1))
        m = re.search(r"blog\.naver\.com/([a-zA-Z0-9_-]+)", url)
        if m:
            result["naver_blog"].append(m.group(1))
        m = re.search(r"tiktok\.com/@([a-zA-Z0-9._-]+)", url)
        if m:
            result["tiktok"].append(m.group(1))
    # dict.fromkeys de-duplicates while keeping first-seen order. A plain set()
    # would make the order (and therefore the "first" handle a caller picks)
    # vary with the interpreter's string-hash seed.
    return {platform: list(dict.fromkeys(handles)) for platform, handles in result.items()}
def normalize_handle(platform: str, value: str) -> str:
    """Return a handle for *platform*: extract it when *value* is a URL, else pass it through.

    A value counts as a URL when it contains ``://`` or starts with ``www.``.
    If no handle for *platform* can be extracted from the URL, the URL itself
    is kept. A leading ``@`` is stripped for every platform except ``youtube``.
    Falsy input is returned unchanged.
    """
    if not value:
        return value
    looks_like_url = value.startswith("www.") or "://" in value
    if looks_like_url:
        candidates = extract_social_handles([value]).get(platform, [])
        if candidates:
            value = candidates[0]
    if platform == "youtube":
        return value
    return value.lstrip("@")

View File

145
app/integrations/apify.py Normal file
View File

@ -0,0 +1,145 @@
from http import HTTPMethod
from common.utils import http_request
APIFY_BASE = "https://api.apify.com/v2"
class ApifyClient:
def __init__(self, token: str, wait_for_finish: int = 120):
self.token = token
self.wait_for_finish = wait_for_finish
async def _run_actor(self, actor_id: str, input_data: dict) -> list[dict]:
resp = await http_request(
HTTPMethod.POST,
url=f"{APIFY_BASE}/acts/{actor_id}/runs",
params={"token": self.token, "waitForFinish": self.wait_for_finish},
headers={"Content-Type": "application/json"},
json_body=input_data,
timeout=self.wait_for_finish + 10,
label=f"apify:{actor_id.split('~')[-1]}",
)
if not resp or not resp.is_success:
return []
dataset_id = resp.json()["data"]["defaultDatasetId"]
items_resp = await http_request(
HTTPMethod.GET,
url=f"{APIFY_BASE}/datasets/{dataset_id}/items",
params={"token": self.token, "limit": 20},
label=f"apify-dataset-{dataset_id}",
)
if not items_resp or not items_resp.is_success:
return []
return items_resp.json()
async def fetch_instagram_profile(self, handle: str) -> dict | None:
items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [handle], "resultsLimit": 12})
return items[0] if items else None
async def get_instagram_profile(self, handle: str) -> dict | None:
profile = await self.fetch_instagram_profile(handle)
if not profile or profile.get("error"):
return None
return {
"username": profile["username"],
"followers": profile.get("followersCount", 0),
"following": profile.get("followsCount", 0),
"posts": profile.get("postsCount", 0),
"bio": profile.get("biography", ""),
"isBusinessAccount": profile.get("isBusinessAccount", False),
"externalUrl": profile.get("externalUrl"),
"latestPosts": [
{
"type": p.get("type"),
"likes": p.get("likesCount", 0),
"comments": p.get("commentsCount", 0),
"caption": (p.get("caption") or "")[:500],
"timestamp": p.get("timestamp"),
}
for p in (profile.get("latestPosts") or [])[:12]
],
}
async def fetch_instagram_posts(self, handle: str, limit: int = 20) -> list[dict]:
clean = handle.lstrip("@")
return await self._run_actor("apify~instagram-post-scraper", {
"directUrls": [f"https://www.instagram.com/{clean}/"],
"resultsLimit": limit,
})
async def get_instagram_posts(self, handle: str, limit: int = 20) -> dict:
items = await self.fetch_instagram_posts(handle, limit)
posts = [
{
"id": p["id"],
"type": p.get("type"),
"url": p.get("url"),
"caption": (p.get("caption") or "")[:500],
"hashtags": p.get("hashtags", []),
"likesCount": p.get("likesCount", 0),
"commentsCount": p.get("commentsCount", 0),
"timestamp": p.get("timestamp"),
}
for p in items
]
n = len(posts) or 1
return {
"posts": posts,
"totalPosts": len(posts),
"avgLikes": round(sum(p["likesCount"] for p in posts) / n),
"avgComments": round(sum(p["commentsCount"] for p in posts) / n),
}
async def fetch_instagram_reels(self, handle: str, limit: int = 15) -> list[dict]:
clean = handle.lstrip("@")
return await self._run_actor("apify~instagram-reel-scraper", {
"directUrls": [f"https://www.instagram.com/{clean}/reels/"],
"resultsLimit": limit,
})
async def get_instagram_reels(self, handle: str, limit: int = 15) -> dict:
items = await self.fetch_instagram_reels(handle, limit)
reels = [
{
"id": r["id"],
"url": r.get("url"),
"caption": (r.get("caption") or "")[:500],
"hashtags": r.get("hashtags", []),
"likesCount": r.get("likesCount", 0),
"commentsCount": r.get("commentsCount", 0),
"videoViewCount": r.get("videoViewCount", 0),
"videoPlayCount": r.get("videoPlayCount", 0),
"videoDuration": r.get("videoDuration", 0),
"timestamp": r.get("timestamp"),
}
for r in items
]
n = len(reels) or 1
return {
"reels": reels,
"totalReels": len(reels),
"avgViews": round(sum(r["videoViewCount"] for r in reels) / n),
"avgPlays": round(sum(r["videoPlayCount"] for r in reels) / n),
}
async def fetch_facebook_page(self, page_url: str) -> dict | None:
items = await self._run_actor("apify~facebook-pages-scraper", {"startUrls": [{"url": page_url}]})
return items[0] if items else None
async def get_facebook_page(self, page_url: str) -> dict | None:
page = await self.fetch_facebook_page(page_url)
if not page:
return None
return {
"pageName": page["title"],
"pageUrl": page.get("pageUrl", page_url),
"followers": page.get("followers", 0),
"likes": page.get("likes", 0),
"categories": page.get("categories", []),
"email": page.get("email"),
"phone": page.get("phone"),
"website": page.get("website"),
"address": page.get("address"),
"intro": page.get("intro"),
"rating": page.get("rating"),
}

View File

@ -0,0 +1,128 @@
from http import HTTPMethod
from common.utils import get_env, http_request
FIRECRAWL_BASE = "https://api.firecrawl.dev/v1"
class FirecrawlClient:
def __init__(self, api_key: str):
self.api_key = api_key
def _headers(self) -> dict:
return {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
async def scrape(self, url: str, json_options: dict, wait_for: int = 5000) -> dict | None:
resp = await http_request(
HTTPMethod.POST,
url=f"{FIRECRAWL_BASE}/scrape",
headers=self._headers(),
json_body={"url": url, "formats": ["json", "links"], "jsonOptions": json_options, "waitFor": wait_for},
label="firecrawl-scrape",
)
if not resp or not resp.is_success:
return None
return resp.json().get("data")
async def map(self, url: str, limit: int = 50) -> list[str]:
resp = await http_request(
HTTPMethod.POST,
url=f"{FIRECRAWL_BASE}/map",
headers=self._headers(),
json_body={"url": url, "limit": limit},
label="firecrawl-map",
)
if not resp or not resp.is_success:
return []
return resp.json().get("links", [])
async def search(self, query: str, limit: int = 5) -> list[dict]:
resp = await http_request(
HTTPMethod.POST,
url=f"{FIRECRAWL_BASE}/search",
headers=self._headers(),
json_body={"query": query, "limit": limit},
label="firecrawl-search",
)
if not resp or not resp.is_success:
return []
return resp.json().get("data", [])
async def fetch_social_buttons(self, url: str) -> list[dict]:
data = await self.scrape(url, {
"prompt": "Find ALL social media link URLs on this page — header, footer, sidebar, floating buttons. Extract actual href URLs for: Instagram, YouTube, Facebook, TikTok, Naver Blog, KakaoTalk.",
"schema": {
"type": "object",
"properties": {
"socialLinks": {
"type": "array",
"items": {
"type": "object",
"properties": {"platform": {"type": "string"}, "url": {"type": "string"}},
},
},
},
},
})
if not data:
return []
return (data.get("json") or {}).get("socialLinks", [])
async def fetch_gangnam_unni(self, hospital_url: str) -> dict | None:
resp = await http_request(
HTTPMethod.POST,
url=f"{FIRECRAWL_BASE}/scrape",
headers=self._headers(),
json_body={
"url": hospital_url,
"formats": ["json"],
"jsonOptions": {
"prompt": "Extract: hospital name, overall rating (out of 10), total review count, doctors with names/ratings/review counts/specialties, procedures, address, badges",
"schema": {
"type": "object",
"properties": {
"hospitalName": {"type": "string"},
"rating": {"type": "number"},
"totalReviews": {"type": "number"},
"doctors": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"rating": {"type": "number"},
"reviews": {"type": "number"},
"specialty": {"type": "string"},
},
},
},
"procedures": {"type": "array", "items": {"type": "string"}},
"address": {"type": "string"},
"badges": {"type": "array", "items": {"type": "string"}},
},
},
},
"waitFor": 5000,
},
timeout=60,
label="firecrawl-gangnamunni",
)
if not resp or not resp.is_success:
return None
raw = (resp.json().get("data") or {}).get("json")
return {"sourceUrl": hospital_url, **raw} if raw else None
async def get_gangnam_unni(self, hospital_url: str) -> dict | None:
raw = await self.fetch_gangnam_unni(hospital_url)
if not raw or not raw.get("hospitalName"):
return None
return {
"name": raw["hospitalName"],
"rating": raw.get("rating"),
"ratingScale": "/10",
"totalReviews": raw.get("totalReviews", 0),
"doctors": (raw.get("doctors") or [])[:10],
"procedures": raw.get("procedures", []),
"address": raw.get("address", ""),
"badges": raw.get("badges", []),
"sourceUrl": raw["sourceUrl"],
}

View File

@ -0,0 +1,61 @@
from http import HTTPMethod
from common.utils import http_request
PLACES_BASE = "https://places.googleapis.com/v1"
FIELD_MASK = ",".join([
"places.id", "places.displayName", "places.formattedAddress",
"places.rating", "places.userRatingCount",
"places.internationalPhoneNumber", "places.websiteUri",
"places.googleMapsUri", "places.primaryTypeDisplayName",
"places.regularOpeningHours", "places.reviews",
])
class GooglePlacesClient:
def __init__(self, api_key: str):
self.api_key = api_key
def _headers(self) -> dict:
return {
"Content-Type": "application/json",
"X-Goog-Api-Key": self.api_key,
"X-Goog-FieldMask": FIELD_MASK,
}
async def fetch_place(self, query: str) -> dict | None:
resp = await http_request(
HTTPMethod.POST,
url=f"{PLACES_BASE}/places:searchText",
headers=self._headers(),
json_body={"textQuery": query, "languageCode": "ko", "regionCode": "KR", "maxResultCount": 3},
timeout=15,
label="google-places",
)
if not resp or not resp.is_success:
return None
places = resp.json().get("places", [])
return places[0] if places else None
async def get_place(self, query: str) -> dict | None:
p = await self.fetch_place(query)
if not p:
return None
return {
"name": (p.get("displayName") or {}).get("text", ""),
"rating": p.get("rating"),
"reviewCount": p.get("userRatingCount", 0),
"address": p.get("formattedAddress", ""),
"phone": p.get("internationalPhoneNumber", ""),
"clinicWebsite": p.get("websiteUri", ""),
"mapsUrl": p.get("googleMapsUri", ""),
"placeId": p.get("id", ""),
"category": (p.get("primaryTypeDisplayName") or {}).get("text", ""),
"topReviews": [
{
"stars": r.get("rating", 0),
"text": ((r.get("text") or {}).get("text", ""))[:500],
"date": r.get("publishTime", ""),
}
for r in (p.get("reviews") or [])[:10]
],
}

View File

@ -0,0 +1,3 @@
from .service import LLMService
from .prompt import Prompt

View File

@ -0,0 +1,19 @@
from pydantic import BaseModel
class Prompt:
    """A prompt template bundled with its model name and input/output schemas."""

    def __init__(
        self,
        template: str,
        model: str,
        input_class: type[BaseModel],
        output_class: type[BaseModel],
    ):
        """Store the template, the model name and the pydantic input/output classes."""
        self.template, self.model = template, model
        self.input_class, self.output_class = input_class, output_class

    def build(self, input_data: dict) -> str:
        """Validate *input_data* with ``input_class`` and render the template.

        The validated model is dumped to a dict and its fields are substituted
        into the template via ``str.format``.
        """
        validated = self.input_class(**input_data)
        fields = validated.model_dump()
        return self.template.format(**fields)

View File

View File

@ -0,0 +1,61 @@
from pydantic import BaseModel
from openai import AsyncOpenAI
from common.utils import get_env
from .prompt import Prompt
class LLMResponseError(Exception):
    """Raised when an LLM response did not finish with a usable result.

    Attributes are plain copies of the constructor arguments: ``status``,
    ``code`` and ``message``.
    """

    def __init__(self, status: str, code: str = None, message: str = None):
        summary = f"LLM response failed: status={status}, code={code}, message={message}"
        super().__init__(summary)
        self.status = status
        self.code = code
        self.message = message
class LLMService:
def __init__(self, provider: str = "openai", max_retries: int = 2):
self.max_retries = max_retries
match provider:
case "openai":
self.client = AsyncOpenAI(api_key=get_env("OPENAI_API_KEY"))
case "perplexity":
self.client = AsyncOpenAI(
api_key=get_env("PERPLEXITY_API_KEY"),
base_url="https://api.perplexity.ai",
)
case "gemini":
self.client = AsyncOpenAI(
api_key=get_env("GEMINI_API_KEY"),
base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
)
case _:
raise NotImplementedError(f"Unknown provider: {provider}")
async def generate(
self,
prompt: Prompt,
input_data: dict,
) -> BaseModel:
prompt_text = prompt.build(input_data)
last_error = None
for attempt in range(self.max_retries + 1):
response = await self.client.beta.chat.completions.parse(
model=prompt.model,
messages=[{"role": "user", "content": prompt_text}],
response_format=prompt.output_class,
)
choice = response.choices[0]
finish_reason = choice.finish_reason
if finish_reason == "stop":
return choice.message.parsed
if finish_reason == "length":
last_error = LLMResponseError("incomplete", finish_reason, "max tokens reached")
elif finish_reason == "content_filter":
last_error = LLMResponseError("failed", finish_reason, "blocked by content filter")
else:
last_error = LLMResponseError("failed", finish_reason, f"unexpected finish_reason: {finish_reason}")
raise last_error

89
app/integrations/naver.py Normal file
View File

@ -0,0 +1,89 @@
import re
from http import HTTPMethod
from common.utils import http_request
NAVER_BASE = "https://openapi.naver.com/v1/search"
class NaverClient:
def __init__(self, client_id: str, client_secret: str):
self.client_id = client_id
self.client_secret = client_secret
def _headers(self) -> dict:
return {
"X-Naver-Client-Id": self.client_id,
"X-Naver-Client-Secret": self.client_secret,
}
async def fetch_blog_search(self, query: str, display: int = 5) -> list[dict]:
resp = await http_request(
HTTPMethod.GET,
url=f"{NAVER_BASE}/blog.json",
headers=self._headers(),
params={"query": query, "display": display, "sort": "sim"},
label="naver-blog",
)
if not resp or not resp.is_success:
return []
return resp.json().get("items", [])
async def fetch_web_search(self, query: str, display: int = 10) -> list[dict]:
resp = await http_request(
HTTPMethod.GET,
url=f"{NAVER_BASE}/webkr.json",
headers=self._headers(),
params={"query": query, "display": display},
label="naver-web",
)
if not resp or not resp.is_success:
return []
return resp.json().get("items", [])
async def fetch_local_search(self, query: str, display: int = 5) -> list[dict]:
resp = await http_request(
HTTPMethod.GET,
url=f"{NAVER_BASE}/local.json",
headers=self._headers(),
params={"query": query, "display": display, "sort": "comment"},
label="naver-local",
)
if not resp or not resp.is_success:
return []
return resp.json().get("items", [])
async def fetch_blog_rss(self, blog_handle: str) -> str | None:
resp = await http_request(
HTTPMethod.GET,
url=f"https://rss.blog.naver.com/{blog_handle}.xml",
timeout=15,
label="naver-rss",
)
if not resp or not resp.is_success:
return None
return resp.text
async def get_blog_rss(self, blog_handle: str) -> dict | None:
xml = await self.fetch_blog_rss(blog_handle)
if not xml:
return None
posts = []
for m in re.finditer(r"<item>([\s\S]*?)</item>", xml):
block = m.group(1)
title = re.search(r"<title><!\[CDATA\[(.*?)\]\]></title>", block) or re.search(r"<title>(.*?)</title>", block)
link = re.search(r"<link>(.*?)</link>", block)
date = re.search(r"<pubDate>(.*?)</pubDate>", block)
desc = re.search(r"<description><!\[CDATA\[(.*?)\]\]></description>", block) or re.search(r"<description>(.*?)</description>", block)
posts.append({
"title": title.group(1) if title else "",
"link": link.group(1) if link else "",
"postDate": date.group(1) if date else "",
"description": re.sub(r"<[^>]*>", "", desc.group(1) if desc else "").strip()[:150],
})
total_match = re.search(r"<totalCount>(\d+)</totalCount>", xml)
return {
"officialBlogUrl": f"https://blog.naver.com/{blog_handle}",
"officialBlogHandle": blog_handle,
"totalResults": int(total_match.group(1)) if total_match else len(posts),
"posts": posts[:10],
}

123
app/integrations/youtube.py Normal file
View File

@ -0,0 +1,123 @@
from http import HTTPMethod
from common.utils import http_request
YT = "https://www.googleapis.com/youtube/v3"
class YouTubeClient:
def __init__(self, api_key: str):
self.api_key = api_key
async def _resolve_channel_id(self, handle: str) -> str:
h = handle.lstrip("@")
if h.startswith("UC") and len(h) == 24:
return h
for param in ("forHandle", "forUsername"):
resp = await http_request(
HTTPMethod.GET,
url=f"{YT}/channels",
params={"part": "id", param: h, "key": self.api_key},
label="yt-resolve",
)
if resp and resp.is_success:
items = resp.json().get("items", [])
if items:
return items[0]["id"]
return ""
async def fetch_channel(self, handle_or_id: str) -> dict | None:
channel_id = await self._resolve_channel_id(handle_or_id)
if not channel_id:
return None
resp = await http_request(
HTTPMethod.GET,
url=f"{YT}/channels",
params={"part": "snippet,statistics", "id": channel_id, "key": self.api_key},
label="yt-channel",
)
if not resp or not resp.is_success:
return None
items = resp.json().get("items", [])
if not items:
return None
channel = items[0]
video_ids: list[str] = []
resp = await http_request(
HTTPMethod.GET,
url=f"{YT}/search",
params={
"part": "snippet",
"channelId": channel_id,
"order": "viewCount",
"type": "video",
"maxResults": 10,
"key": self.api_key,
},
label="yt-search",
)
if resp and resp.is_success:
video_ids = [i["id"]["videoId"] for i in resp.json().get("items", []) if i.get("id", {}).get("videoId")]
videos: list[dict] = []
if video_ids:
resp = await http_request(
HTTPMethod.GET,
url=f"{YT}/videos",
params={
"part": "snippet,statistics,contentDetails",
"id": ",".join(video_ids),
"key": self.api_key,
},
label="yt-videos",
)
if resp and resp.is_success:
videos = resp.json().get("items", [])[:10]
return {"channelId": channel_id, "channel": channel, "videos": videos}
async def get_channel(self, handle_or_id: str) -> dict | None:
raw = await self.fetch_channel(handle_or_id)
if not raw:
return None
ch = raw["channel"]
stats = ch.get("statistics", {})
snippet = ch.get("snippet", {})
return {
"channelId": raw["channelId"],
"channelName": snippet.get("title"),
"handle": snippet.get("customUrl"),
"description": snippet.get("description", ""),
"publishedAt": snippet.get("publishedAt"),
"subscribers": int(stats.get("subscriberCount", 0)),
"totalViews": int(stats.get("viewCount", 0)),
"totalVideos": int(stats.get("videoCount", 0)),
"videos": [
{
"title": v.get("snippet", {}).get("title"),
"views": int(v.get("statistics", {}).get("viewCount", 0)),
"likes": int(v.get("statistics", {}).get("likeCount", 0)),
"comments": int(v.get("statistics", {}).get("commentCount", 0)),
"date": v.get("snippet", {}).get("publishedAt"),
"duration": v.get("contentDetails", {}).get("duration"),
"url": f"https://www.youtube.com/watch?v={v['id']}",
}
for v in raw["videos"]
],
}
async def search_channels(self, query: str, max_results: int = 3) -> list[str]:
resp = await http_request(
HTTPMethod.GET,
url=f"{YT}/search",
params={"part": "snippet", "type": "channel", "q": query, "maxResults": max_results, "key": self.api_key},
label="yt-search-channels",
)
if not resp or not resp.is_success:
return []
return [
i.get("snippet", {}).get("channelId") or i.get("id", {}).get("channelId")
for i in resp.json().get("items", [])
if i.get("snippet", {}).get("channelId") or i.get("id", {}).get("channelId")
]

54
app/test_fetch.py Normal file
View File

@ -0,0 +1,54 @@
import asyncio
import json
import os
from dotenv import load_dotenv
load_dotenv("../.env")
from common.utils import normalize_handle
from integrations.youtube import YouTubeClient
from integrations.apify import ApifyClient
from integrations.naver import NaverClient
from integrations.firecrawl import FirecrawlClient
# Sample identifiers (handles or URLs) per platform, used by main() below.
INPUT = {
"youtube": "@banobagips",
"instagram": ["@banobagi_ps"],
"facebook": "BanobagiPlasticSurgery",
"naver_blog": "https://blog.naver.com/banobagiprs",
"gangnam_unni": "https://www.gangnamunni.com/hospitals/23",
}
# Directory save() writes its JSON files into (relative to the working directory).
OUT_DIR = "../test_results"
def save(name: str, data) -> None:
    """Write *data* as UTF-8 JSON to ``OUT_DIR/<name>.json`` and print the path.

    The output directory is created if it does not exist; non-ASCII
    characters are written unescaped.
    """
    target = os.path.join(OUT_DIR, f"{name}.json")
    os.makedirs(OUT_DIR, exist_ok=True)
    with open(target, "w", encoding="utf-8") as out_file:
        json.dump(data, out_file, ensure_ascii=False)
    print(f"saved → {target}")
async def main():
    """Fetch raw data for every platform in INPUT and save each result as JSON.

    API credentials are read from the environment; a missing variable raises
    KeyError before any request is made.
    """
    yt = YouTubeClient(api_key=os.environ["YOUTUBE_API_KEY"])
    apify = ApifyClient(token=os.environ["APIFY_API_TOKEN"])
    naver = NaverClient(client_id=os.environ["NAVER_CLIENT_ID"], client_secret=os.environ["NAVER_CLIENT_SECRET"])
    firecrawl = FirecrawlClient(api_key=os.environ["FIRECRAWL_API_KEY"])

    # Inputs may be handles or full URLs; reduce them to handles first.
    yt_handle = normalize_handle("youtube", INPUT["youtube"])
    ig_handle = normalize_handle("instagram", INPUT["instagram"][0])
    fb_handle = normalize_handle("facebook", INPUT["facebook"])
    naver_handle = normalize_handle("naver_blog", INPUT["naver_blog"])

    save("youtube", await yt.fetch_channel(yt_handle))
    save("instagram_profile", await apify.fetch_instagram_profile(ig_handle))
    # Left disabled, as in the original script:
    # save("instagram_posts", await apify.fetch_instagram_posts(ig_handle))
    # save("instagram_reels", await apify.fetch_instagram_reels(ig_handle))
    save("facebook", await apify.fetch_facebook_page(f"https://www.facebook.com/{fb_handle}"))
    save("naver_blog", await naver.fetch_blog_rss(naver_handle))
    save("gangnam_unni", await firecrawl.fetch_gangnam_unni(INPUT["gangnam_unni"]))


# Guarded so that merely importing this module (for example when pytest
# collects files named test_*.py) does not trigger live API calls.
if __name__ == "__main__":
    asyncio.run(main())

View File

@ -4,6 +4,7 @@ pydantic==2.13.2
python-dotenv==1.2.2 python-dotenv==1.2.2
redis==7.4.0 redis==7.4.0
httpx==0.28.1 httpx==0.28.1
openai==2.32.0
python-jose[cryptography]==3.5.0 python-jose[cryptography]==3.5.0
passlib[bcrypt]==1.7.4 passlib[bcrypt]==1.7.4
python-multipart==0.0.26 python-multipart==0.0.26