import asyncio
import re
from html import unescape
from difflib import SequenceMatcher
from playwright.async_api import async_playwright
from urllib import parse
import time
from app.utils.logger import get_logger
# 로거 설정
logger = get_logger("pwscraper")
class NvMapPwScraper():
# cls vars
is_ready = False
_playwright = None
_browser = None
_context = None
_win_width = 1280
_win_height = 720
_max_retry = 3
_timeout = 60 # place id timeout threshold seconds
# instance var
page = None
@classmethod
def default_context_builder(cls):
context_builder_dict = {}
context_builder_dict['viewport'] = {
'width' : cls._win_width,
'height' : cls._win_height
}
context_builder_dict['screen'] = {
'width' : cls._win_width,
'height' : cls._win_height
}
context_builder_dict['user_agent'] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
context_builder_dict['locale'] = 'ko-KR'
context_builder_dict['timezone_id']='Asia/Seoul'
return context_builder_dict
@classmethod
async def initiate_scraper(cls):
if not cls._playwright:
cls._playwright = await async_playwright().start()
if not cls._browser:
cls._browser = await cls._playwright.chromium.launch(headless=True)
if not cls._context:
cls._context = await cls._browser.new_context(**cls.default_context_builder())
cls.is_ready = True
def __init__(self):
if not self.is_ready:
raise Exception("nvMapScraper is not initiated")
async def __aenter__(self):
await self.create_page()
return self
async def __aexit__(self, exc_type, exc, tb):
await self.page.close()
async def create_page(self):
self.page = await self._context.new_page()
await self.page.add_init_script(
'''const defaultGetter = Object.getOwnPropertyDescriptor(
Navigator.prototype,
"webdriver"
).get;
defaultGetter.apply(navigator);
defaultGetter.toString();
Object.defineProperty(Navigator.prototype, "webdriver", {
set: undefined,
enumerable: true,
configurable: true,
get: new Proxy(defaultGetter, {
apply: (target, thisArg, args) => {
Reflect.apply(target, thisArg, args);
return false;
},
}),
});
const patchedGetter = Object.getOwnPropertyDescriptor(
Navigator.prototype,
"webdriver"
).get;
patchedGetter.apply(navigator);
patchedGetter.toString();''')
await self.page.set_extra_http_headers({
'sec-ch-ua': '\"Not?A_Brand\";v=\"99\", \"Chromium\";v=\"130\"'
})
await self.page.goto("http://google.com")
async def goto_url(self, url, wait_until="domcontentloaded", timeout=20000):
page = self.page
await page.goto(url, wait_until=wait_until, timeout=timeout)
@staticmethod
def _clean_title(text: str) -> str:
text = unescape(text) # HTML 엔티티 디코딩 (& → &)
text = re.sub(r"<.*?>", "", text) # HTML 태그 제거
return text.strip()
@staticmethod
def _similarity(a: str, b: str) -> float:
return SequenceMatcher(None, a, b).ratio()
@staticmethod
def _refine_address(address: str) -> str:
"""한국 주소 패턴에서 첫 번째 유효한 주소만 추출한다."""
patterns = [
# 도로명 (정식): 경기도 가평군 운악로 278
re.compile(
r'[가-힣]+(?:특별시|광역시|특별자치시|도|특별자치도|시)\s+'
r'[가-힣\s]+?(?:로|길|대로)\s+\d+(?:-\d+)?'
),
# 지번 (정식): 경기도 가평군 조종면 운악리 278
re.compile(
r'[가-힣]+(?:특별시|광역시|특별자치시|도|특별자치도|시)\s+'
r'[가-힣\s]+?(?:읍|면|동|리|가)\s+\d+(?:-\d+)?'
),
# 도로명 (축약): 경기 가평 운악로 278
re.compile(
r'[가-힣]{1,4}\s+[가-힣]{1,6}\s+'
r'[가-힣\s]+?(?:로|길|대로)\s+\d+(?:-\d+)?'
),
# 지번 (축약): 경기 가평 조종면 운악리 278
re.compile(
r'[가-힣]{1,4}\s+[가-힣]{1,6}\s+'
r'[가-힣\s]+?(?:읍|면|동|리|가)\s+\d+(?:-\d+)?'
),
]
for pattern in patterns:
m = pattern.search(address)
if m:
return m.group().strip()
return address
async def _extract_candidates_from_list_page(self) -> list[dict]:
"""pcmap.place.naver.com iframe HTML에서 place ID와 업체명을 추출한다."""
pcmap_frame = None
for frame in self.page.frames:
if "pcmap.place.naver.com" in frame.url:
pcmap_frame = frame
logger.debug(f"[DEBUG] pcmap frame 발견: {frame.url[:80]}")
break
if not pcmap_frame:
logger.debug("[DEBUG] pcmap frame 없음")
return []
try:
html = await pcmap_frame.content()
except Exception as e:
logger.debug(f"[DEBUG] pcmap frame content 추출 실패: {e}")
return []
# {"id":"11659052","name":"프레지던트 호텔",...} 형태의 JSON 쌍 추출
pair_pattern = re.compile(
r'"id"\s*:\s*"(\d{5,})"[^}]{0,200}?"name"\s*:\s*"([^"]{1,60})"'
r'|"name"\s*:\s*"([^"]{1,60})"[^}]{0,200}?"id"\s*:\s*"(\d{5,})"'
)
seen = {} # place_id → title (순서 보존)
for m in pair_pattern.finditer(html):
if m.group(1): # id 먼저
pid, title = m.group(1), m.group(2)
else: # name 먼저
pid, title = m.group(4), m.group(3)
if pid not in seen:
seen[pid] = title
candidates = [
{"title": title, "place_url": f"https://map.naver.com/p/entry/place/{pid}"}
for pid, title in list(seen.items())[:10]
]
for i, c in enumerate(candidates):
logger.debug(f"[DEBUG] 후보 {i+1}: {c['title']} / {c['place_url']}")
logger.debug(f"[DEBUG] 목록 후보 {len(candidates)}개 추출")
return candidates
async def _try_search(self, address: str, title: str) -> str | None:
"""주어진 주소+업체명으로 검색해서 place URL을 반환한다. 실패 시 None."""
encoded_query = parse.quote(f"{address} {title}".strip())
url = f"https://map.naver.com/p/search/{encoded_query}"
try:
await self.goto_url(url, wait_until="networkidle", timeout=self._timeout * 1000)
except:
if "/place/" in self.page.url:
return self.page.url
logger.error("[ERROR] Can't Finish networkidle")
if "/place/" in self.page.url:
return self.page.url
candidates = await self._extract_candidates_from_list_page()
if candidates:
best = max(
candidates,
key=lambda c: self._similarity(title, self._clean_title(c['title']))
)
best_score = self._similarity(title, self._clean_title(best['title']))
logger.info(
f"[AUTO-SELECT] '{title}' → '{best['title']}' (score={best_score:.2f}) {best['place_url']}"
)
return best['place_url']
# isCorrectAnswer=true 로 강제 단일결과 재시도 (원본 로직 유지)
correct_url = self.page.url.replace("?", "?isCorrectAnswer=true&")
try:
await self.goto_url(correct_url, wait_until="networkidle", timeout=self._timeout * 1000)
except:
if "/place/" in self.page.url:
return self.page.url
logger.error("[ERROR] Can't Finish networkidle (isCorrectAnswer)")
if "/place/" in self.page.url:
return self.page.url
return None
async def get_place_id_url(self, selected):
title = self._clean_title(selected['title'])
address = self._clean_title(selected.get('roadAddress', selected['address']))
# 1차 시도: 원본 주소 + 업체명
logger.debug(f"[DEBUG] 1차 시도 - address: {address}")
result = await self._try_search(address, title)
if result:
return result
# 2차 시도: 정제 주소 + 업체명
refined = self._refine_address(address)
if refined != address:
logger.info(f"[REFINE] 주소 정제: '{address}' → '{refined}'")
result = await self._try_search(refined, title)
if result:
return result
# 3차 시도: 업체명만으로 검색
logger.info(f"[RETRY] 업체명만으로 재시도: '{title}'")
result = await self._try_search("", title)
if result:
return result
logger.error(f"[ERROR] Not found url for {selected}")
return None