92 lines
3.5 KiB
Python
92 lines
3.5 KiB
Python
import re
|
|
from http import HTTPMethod
|
|
from urllib.parse import urlparse
|
|
from common.utils import http_request
|
|
|
|
# Common URL prefix for the Naver Open API search endpoints requested in this
# module (blog.json, webkr.json and local.json are appended to it).
NAVER_BASE = "https://openapi.naver.com/v1/search"
|
|
|
|
|
|
class NaverClient:
|
|
def __init__(self, client_id: str, client_secret: str):
|
|
self.client_id = client_id
|
|
self.client_secret = client_secret
|
|
|
|
def _headers(self) -> dict:
|
|
return {
|
|
"X-Naver-Client-Id": self.client_id,
|
|
"X-Naver-Client-Secret": self.client_secret,
|
|
}
|
|
|
|
async def fetch_blog_search(self, query: str, display: int = 5) -> list[dict]:
|
|
resp = await http_request(
|
|
HTTPMethod.GET,
|
|
url=f"{NAVER_BASE}/blog.json",
|
|
headers=self._headers(),
|
|
params={"query": query, "display": display, "sort": "sim"},
|
|
label="naver-blog",
|
|
)
|
|
if not resp or not resp.is_success:
|
|
return []
|
|
return resp.json().get("items", [])
|
|
|
|
async def fetch_web_search(self, query: str, display: int = 10) -> list[dict]:
|
|
resp = await http_request(
|
|
HTTPMethod.GET,
|
|
url=f"{NAVER_BASE}/webkr.json",
|
|
headers=self._headers(),
|
|
params={"query": query, "display": display},
|
|
label="naver-web",
|
|
)
|
|
if not resp or not resp.is_success:
|
|
return []
|
|
return resp.json().get("items", [])
|
|
|
|
async def fetch_local_search(self, query: str, display: int = 5) -> list[dict]:
|
|
resp = await http_request(
|
|
HTTPMethod.GET,
|
|
url=f"{NAVER_BASE}/local.json",
|
|
headers=self._headers(),
|
|
params={"query": query, "display": display, "sort": "comment"},
|
|
label="naver-local",
|
|
)
|
|
if not resp or not resp.is_success:
|
|
return []
|
|
return resp.json().get("items", [])
|
|
|
|
async def fetch_blog_rss(self, handle: str) -> str | None:
|
|
resp = await http_request(
|
|
HTTPMethod.GET,
|
|
url=f"https://rss.blog.naver.com/{handle}.xml",
|
|
timeout=15,
|
|
label="naver-rss",
|
|
)
|
|
if not resp or not resp.is_success:
|
|
return None
|
|
return resp.text
|
|
|
|
async def get_blog_rss(self, url: str) -> dict | None:
|
|
blog_handle = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url
|
|
xml = await self.fetch_blog_rss(blog_handle)
|
|
if not xml:
|
|
return None
|
|
posts = []
|
|
for m in re.finditer(r"<item>([\s\S]*?)</item>", xml):
|
|
block = m.group(1)
|
|
title = re.search(r"<title><!\[CDATA\[(.*?)\]\]></title>", block) or re.search(r"<title>(.*?)</title>", block)
|
|
link = re.search(r"<link>(.*?)</link>", block)
|
|
date = re.search(r"<pubDate>(.*?)</pubDate>", block)
|
|
desc = re.search(r"<description><!\[CDATA\[(.*?)\]\]></description>", block) or re.search(r"<description>(.*?)</description>", block)
|
|
posts.append({
|
|
"title": title.group(1) if title else "",
|
|
"link": link.group(1) if link else "",
|
|
"postDate": date.group(1) if date else "",
|
|
"description": re.sub(r"<[^>]*>", "", desc.group(1) if desc else "").strip()[:150],
|
|
})
|
|
total_match = re.search(r"<totalCount>(\d+)</totalCount>", xml)
|
|
return {
|
|
"officialBlogUrl": f"https://blog.naver.com/{blog_handle}",
|
|
"officialBlogHandle": blog_handle,
|
|
"totalResults": int(total_match.group(1)) if total_match else len(posts),
|
|
"posts": posts[:10],
|
|
}
|