import asyncio
import re
import time
from difflib import SequenceMatcher
from urllib import parse

from playwright.async_api import async_playwright

from app.utils.logger import get_logger

# Logger setup
logger = get_logger("pwscraper")

# Init script injected into every page: re-defines the `navigator.webdriver`
# getter as a Proxy around the native getter whose `apply` trap returns false.
_WEBDRIVER_PATCH_SCRIPT = '''
const defaultGetter = Object.getOwnPropertyDescriptor(
  Navigator.prototype,
  "webdriver"
).get;
defaultGetter.apply(navigator);
defaultGetter.toString();
Object.defineProperty(Navigator.prototype, "webdriver", {
  set: undefined,
  enumerable: true,
  configurable: true,
  get: new Proxy(defaultGetter, {
    apply: (target, thisArg, args) => {
      Reflect.apply(target, thisArg, args);
      return false;
    },
  }),
});
const patchedGetter = Object.getOwnPropertyDescriptor(
  Navigator.prototype,
  "webdriver"
).get;
patchedGetter.apply(navigator);
patchedGetter.toString();
'''

# Korean address patterns, tried in order; compiled once at import time.
_ADDRESS_PATTERNS = (
    # Road-name address, full form, e.g. "경기도 가평군 운악로 278"
    re.compile(
        r'[가-힣]+(?:특별시|광역시|특별자치시|도|특별자치도|시)\s+'
        r'[가-힣\s]+?(?:로|길|대로)\s+\d+(?:-\d+)?'
    ),
    # Lot-number address, full form, e.g. "경기도 가평군 조종면 운악리 278"
    re.compile(
        r'[가-힣]+(?:특별시|광역시|특별자치시|도|특별자치도|시)\s+'
        r'[가-힣\s]+?(?:읍|면|동|리|가)\s+\d+(?:-\d+)?'
    ),
    # Road-name address, abbreviated, e.g. "경기 가평 운악로 278"
    re.compile(
        r'[가-힣]{1,4}\s+[가-힣]{1,6}\s+'
        r'[가-힣\s]+?(?:로|길|대로)\s+\d+(?:-\d+)?'
    ),
    # Lot-number address, abbreviated, e.g. "경기 가평 조종면 운악리 278"
    re.compile(
        r'[가-힣]{1,4}\s+[가-힣]{1,6}\s+'
        r'[가-힣\s]+?(?:읍|면|동|리|가)\s+\d+(?:-\d+)?'
    ),
)

# Matches JSON fragments such as {"id":"11659052","name":"프레지던트 호텔",...}
# with the "id" and "name" keys in either order.
_PLACE_PAIR_PATTERN = re.compile(
    r'"id"\s*:\s*"(\d{5,})"[^}]{0,200}?"name"\s*:\s*"([^"]{1,60})"'
    r'|"name"\s*:\s*"([^"]{1,60})"[^}]{0,200}?"id"\s*:\s*"(\d{5,})"'
)


class NvMapPwScraper:
    """Resolve a Naver Map place URL for a business via a headless browser.

    The Playwright driver, browser and browser context are stored on the
    class and shared by every instance; call ``initiate_scraper()`` once
    before constructing instances. Each instance owns a single page, which
    is created and closed by using the instance as an async context manager.
    """

    # cls vars
    is_ready = False
    _playwright = None
    _browser = None
    _context = None
    _win_width = 1280
    _win_height = 720
    _max_retry = 3
    _timeout = 60  # place id timeout threshold seconds

    # instance var
    page = None

    @classmethod
    def default_context_builder(cls):
        """Return the keyword arguments used to create the browser context."""
        size = {'width': cls._win_width, 'height': cls._win_height}
        return {
            'viewport': dict(size),
            'screen': dict(size),
            'user_agent': (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/130.0.0.0 Safari/537.36"
            ),
            'locale': 'ko-KR',
            'timezone_id': 'Asia/Seoul',
        }

    @classmethod
    async def initiate_scraper(cls):
        """Start Playwright, launch headless Chromium and open the context.

        Each resource is only created when it does not exist yet, so calling
        this again after a successful start re-uses the existing objects.
        """
        if not cls._playwright:
            cls._playwright = await async_playwright().start()
        if not cls._browser:
            cls._browser = await cls._playwright.chromium.launch(headless=True)
        if not cls._context:
            cls._context = await cls._browser.new_context(
                **cls.default_context_builder()
            )
        cls.is_ready = True

    def __init__(self):
        """Raise if ``initiate_scraper()`` has not completed yet."""
        if not self.is_ready:
            raise Exception("nvMapScraper is not initiated")

    async def __aenter__(self):
        """Create this instance's page and return the instance."""
        try:
            await self.create_page()
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so a page that
            # was created before the failure has to be released here.
            await self._close_page()
            raise
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close this instance's page, if one was created."""
        await self._close_page()

    async def _close_page(self):
        """Close and forget the current page; no-op when there is none."""
        page, self.page = self.page, None
        if page is not None:
            await page.close()

    async def create_page(self):
        """Open a page in the shared context and prepare it for scraping.

        Installs the ``navigator.webdriver`` patch script, sets the
        ``sec-ch-ua`` header and loads google.com once.
        """
        self.page = await self._context.new_page()
        await self.page.add_init_script(_WEBDRIVER_PATCH_SCRIPT)
        await self.page.set_extra_http_headers({
            'sec-ch-ua': '"Not?A_Brand";v="99", "Chromium";v="130"'
        })
        await self.page.goto("http://google.com")

    async def goto_url(self, url, wait_until="domcontentloaded", timeout=20000):
        """Navigate the page to ``url``.

        Args:
            url: Address to load.
            wait_until: Playwright load state to wait for.
            timeout: Navigation timeout in milliseconds.
        """
        await self.page.goto(url, wait_until=wait_until, timeout=timeout)

    @staticmethod
    def _clean_title(text: str) -> str:
        """Strip ``<...>`` tags and surrounding whitespace from ``text``."""
        return re.sub(r"<.*?>", "", text).strip()

    @staticmethod
    def _similarity(a: str, b: str) -> float:
        """Return the ``SequenceMatcher`` ratio (0.0-1.0) of two strings."""
        return SequenceMatcher(None, a, b).ratio()

    @staticmethod
    def _refine_address(address: str) -> str:
        """Extract the first valid-looking Korean address from ``address``.

        The patterns are tried in priority order (full road-name, full
        lot-number, abbreviated road-name, abbreviated lot-number). The
        input is returned unchanged when none of them matches.
        """
        for pattern in _ADDRESS_PATTERNS:
            found = pattern.search(address)
            if found:
                return found.group().strip()
        return address

    @staticmethod
    def _with_correct_answer_flag(url: str) -> str:
        """Return ``url`` with ``isCorrectAnswer=true`` as first query param.

        Unlike a plain ``str.replace("?", ...)`` this also works when the URL
        has no query string yet and leaves any later ``?`` untouched.
        """
        parts = parse.urlsplit(url)
        query = "isCorrectAnswer=true"
        if parts.query:
            query = f"{query}&{parts.query}"
        return parse.urlunsplit(parts._replace(query=query))

    async def _extract_candidates_from_list_page(self) -> list[dict]:
        """Extract place IDs and names from the pcmap.place.naver.com iframe.

        Returns:
            Up to 10 dicts with ``title`` and ``place_url`` keys, in order of
            first appearance in the frame HTML. Empty when the frame is
            missing or its content cannot be read.
        """
        pcmap_frame = None
        for frame in self.page.frames:
            if "pcmap.place.naver.com" in frame.url:
                pcmap_frame = frame
                logger.debug(f"[DEBUG] pcmap frame 발견: {frame.url[:80]}")
                break

        if not pcmap_frame:
            logger.debug("[DEBUG] pcmap frame 없음")
            return []

        try:
            html = await pcmap_frame.content()
        except Exception as e:
            logger.debug(f"[DEBUG] pcmap frame content 추출 실패: {e}")
            return []

        seen = {}  # place_id -> title (insertion order preserved)
        for m in _PLACE_PAIR_PATTERN.finditer(html):
            if m.group(1):  # "id" came first
                pid, title = m.group(1), m.group(2)
            else:  # "name" came first
                pid, title = m.group(4), m.group(3)
            # Keep the first title seen for each place id.
            seen.setdefault(pid, title)

        candidates = [
            {"title": title, "place_url": f"https://map.naver.com/p/entry/place/{pid}"}
            for pid, title in list(seen.items())[:10]
        ]

        for i, c in enumerate(candidates):
            logger.debug(f"[DEBUG] 후보 {i+1}: {c['title']} / {c['place_url']}")
        logger.debug(f"[DEBUG] 목록 후보 {len(candidates)}개 추출")
        return candidates

    async def _goto_and_detect_place(self, url: str, failure_message: str) -> str | None:
        """Navigate to ``url`` and return the page URL if it is a place URL.

        A navigation failure (for example a ``networkidle`` timeout) is
        tolerated: it is logged with ``failure_message`` unless the page
        already landed on a ``/place/`` URL. Only ``Exception`` is caught so
        that task cancellation still propagates.
        """
        try:
            await self.goto_url(
                url, wait_until="networkidle", timeout=self._timeout * 1000
            )
        except Exception:
            if "/place/" not in self.page.url:
                logger.error(failure_message)

        current_url = self.page.url
        return current_url if "/place/" in current_url else None

    async def _try_search(self, address: str, title: str) -> str | None:
        """Search for ``address`` + ``title`` and return a place URL.

        Steps: load the search URL; if the page ends up on a ``/place/`` URL
        return it; otherwise pick the list candidate whose name is most
        similar to ``title``; otherwise retry once with
        ``isCorrectAnswer=true`` added to the current URL.

        Returns:
            The place URL, or ``None`` when nothing was found.
        """
        encoded_query = parse.quote(f"{address} {title}".strip())
        search_url = f"https://map.naver.com/p/search/{encoded_query}"

        place_url = await self._goto_and_detect_place(
            search_url, "[ERROR] Can't Finish networkidle"
        )
        if place_url:
            return place_url

        candidates = await self._extract_candidates_from_list_page()
        if candidates:
            # max() keeps the first candidate on ties, i.e. list order wins.
            best_score, best = max(
                (
                    (self._similarity(title, self._clean_title(c['title'])), c)
                    for c in candidates
                ),
                key=lambda scored: scored[0],
            )
            logger.info(
                f"[AUTO-SELECT] '{title}' → '{best['title']}' (score={best_score:.2f}) {best['place_url']}"
            )
            return best['place_url']

        # Retry with isCorrectAnswer=true to force a single result.
        correct_url = self._with_correct_answer_flag(self.page.url)
        return await self._goto_and_detect_place(
            correct_url, "[ERROR] Can't Finish networkidle (isCorrectAnswer)"
        )

    async def get_place_id_url(self, selected):
        """Resolve the Naver Map place URL for one search result item.

        Args:
            selected: Mapping with a ``title`` key and ``roadAddress`` and/or
                ``address`` keys (presumably a Naver local-search item;
                verify against caller). Tags in the values are stripped.

        Returns:
            The place URL, or ``None`` when all attempts fail.
        """
        title = self._clean_title(selected['title'])
        # Prefer the road-name address; fall back to the lot-number address
        # when it is missing or empty, without requiring both keys to exist.
        raw_address = selected.get('roadAddress') or selected.get('address') or ""
        address = self._clean_title(raw_address)

        # Attempt 1: original address + business name
        logger.debug(f"[DEBUG] 1차 시도 - address: {address}")
        result = await self._try_search(address, title)
        if result:
            return result

        # Attempt 2: refined address + business name
        refined = self._refine_address(address)
        if refined != address:
            logger.info(f"[REFINE] 주소 정제: '{address}' → '{refined}'")
            result = await self._try_search(refined, title)
            if result:
                return result

        # Attempt 3: business name only
        logger.info(f"[RETRY] 업체명만으로 재시도: '{title}'")
        result = await self._try_search("", title)
        if result:
            return result

        logger.error(f"[ERROR] Not found url for {selected}")
        return None