diff --git a/app/utils/nvMapPwScraper.py b/app/utils/nvMapPwScraper.py index 3fc6468..dde73f5 100644 --- a/app/utils/nvMapPwScraper.py +++ b/app/utils/nvMapPwScraper.py @@ -1,4 +1,6 @@ import asyncio +import re +from difflib import SequenceMatcher from playwright.async_api import async_playwright from urllib import parse import time @@ -95,57 +97,105 @@ patchedGetter.toString();''') page = self.page await page.goto(url, wait_until=wait_until, timeout=timeout) + @staticmethod + def _clean_title(text: str) -> str: + return re.sub(r"<.*?>", "", text).strip() + + @staticmethod + def _similarity(a: str, b: str) -> float: + return SequenceMatcher(None, a, b).ratio() + + async def _extract_candidates_from_list_page(self) -> list[dict]: + """pcmap.place.naver.com iframe HTML에서 place ID와 업체명을 추출한다.""" + pcmap_frame = None + for frame in self.page.frames: + if "pcmap.place.naver.com" in frame.url: + pcmap_frame = frame + logger.debug(f"[DEBUG] pcmap frame 발견: {frame.url[:80]}") + break + + if not pcmap_frame: + logger.debug("[DEBUG] pcmap frame 없음") + return [] + + try: + html = await pcmap_frame.content() + except Exception as e: + logger.debug(f"[DEBUG] pcmap frame content 추출 실패: {e}") + return [] + + # {"id":"11659052","name":"프레지던트 호텔",...} 형태의 JSON 쌍 추출 + pair_pattern = re.compile( + r'"id"\s*:\s*"(\d{5,})"[^}]{0,200}?"name"\s*:\s*"([^"]{1,60})"' + r'|"name"\s*:\s*"([^"]{1,60})"[^}]{0,200}?"id"\s*:\s*"(\d{5,})"' + ) + + seen = {} # place_id → title (순서 보존) + for m in pair_pattern.finditer(html): + if m.group(1): # id 먼저 + pid, title = m.group(1), m.group(2) + else: # name 먼저 + pid, title = m.group(4), m.group(3) + if pid not in seen: + seen[pid] = title + + candidates = [ + {"title": title, "place_url": f"https://map.naver.com/p/entry/place/{pid}"} + for pid, title in list(seen.items())[:10] + ] + + for i, c in enumerate(candidates): + logger.debug(f"[DEBUG] 후보 {i+1}: {c['title']} / {c['place_url']}") + + logger.debug(f"[DEBUG] 목록 후보 {len(candidates)}개 추출") + return candidates + async def get_place_id_url(self, selected): count = 0 - get_place_id_url_start = time.perf_counter() - while (count <= self._max_retry): - title = selected['title'].replace("", "").replace("", "") - address = selected.get('roadAddress', selected['address']).replace("", "").replace("", "") - encoded_query = parse.quote(f"{address} {title}") - url = f"https://map.naver.com/p/search/{encoded_query}" - - wait_first_start = time.perf_counter() + title = self._clean_title(selected['title']) + address = self._clean_title(selected.get('roadAddress', selected['address'])) + encoded_query = parse.quote(f"{address} {title}") + url = f"https://map.naver.com/p/search/{encoded_query}" + while count <= self._max_retry: try: - await self.goto_url(url, wait_until="networkidle",timeout = self._timeout*1000) + await self.goto_url(url, wait_until="networkidle", timeout=self._timeout * 1000) except: if "/place/" in self.page.url: return self.page.url - logger.error(f"[ERROR] Can't Finish networkidle") + logger.error("[ERROR] Can't Finish networkidle") - - wait_first_time = (time.perf_counter() - wait_first_start) * 1000 - - logger.debug(f"[DEBUG] Try {count+1} : Wait for perfect matching : {wait_first_time}ms") + logger.debug(f"[DEBUG] Try {count+1} : current url = {self.page.url}") if "/place/" in self.page.url: return self.page.url + # 목록 페이지에 머문 경우 — 후보 추출 후 유사도 선택 + candidates = await self._extract_candidates_from_list_page() + if candidates: + best = max( + candidates, + key=lambda c: self._similarity(title, self._clean_title(c['title'])) + ) + best_score = self._similarity(title, self._clean_title(best['title'])) + logger.info( + f"[AUTO-SELECT] '{title}' → '{best['title']}' (score={best_score:.2f}) {best['place_url']}" + ) + return best['place_url'] - logger.debug(f"[DEBUG] Try {count+1} : url place id not found, retry for forced collect answer") - wait_forced_correct_start = time.perf_counter() - - url = self.page.url.replace("?","?isCorrectAnswer=true&") + # isCorrectAnswer 플래그 재시도 + url = self.page.url.replace("?", "?isCorrectAnswer=true&") try: - await self.goto_url(url, wait_until="networkidle",timeout = self._timeout*1000) + await self.goto_url(url, wait_until="networkidle", timeout=self._timeout * 1000) except: if "/place/" in self.page.url: return self.page.url - logger.error(f"[ERROR] Can't Finish networkidle") + logger.error("[ERROR] Can't Finish networkidle") - wait_forced_correct_time = (time.perf_counter() - wait_forced_correct_start) * 1000 - logger.debug(f"[DEBUG] Try {count+1} : Wait for forced isCorrectAnswer flag : {wait_forced_correct_time}ms") - if "/place/" in self.page.url: return self.page.url + count += 1 - logger.error("[ERROR] Not found url for {selected}") - - return None # 404 - - - # if (count == self._max_retry / 2): - # raise Exception("Failed to identify place id. loading timeout") - # else: - # raise Exception("Failed to identify place id. item is ambiguous") + logger.error(f"[ERROR] Not found url for {selected}") + return None