업체명 검색시 동일주소에 2개 이상이 존재 하는 경우의 선택 로직 추가

feature-credit
김성경 2026-05-06 16:42:50 +09:00
parent 408744ad07
commit 593d042156
1 changed files with 82 additions and 32 deletions

View File

@ -1,4 +1,6 @@
import asyncio import asyncio
import re
from difflib import SequenceMatcher
from playwright.async_api import async_playwright from playwright.async_api import async_playwright
from urllib import parse from urllib import parse
import time import time
@ -95,57 +97,105 @@ patchedGetter.toString();''')
page = self.page page = self.page
await page.goto(url, wait_until=wait_until, timeout=timeout) await page.goto(url, wait_until=wait_until, timeout=timeout)
@staticmethod
def _clean_title(text: str) -> str:
return re.sub(r"<.*?>", "", text).strip()
@staticmethod
def _similarity(a: str, b: str) -> float:
return SequenceMatcher(None, a, b).ratio()
async def _extract_candidates_from_list_page(self) -> list[dict]:
"""pcmap.place.naver.com iframe HTML에서 place ID와 업체명을 추출한다."""
pcmap_frame = None
for frame in self.page.frames:
if "pcmap.place.naver.com" in frame.url:
pcmap_frame = frame
logger.debug(f"[DEBUG] pcmap frame 발견: {frame.url[:80]}")
break
if not pcmap_frame:
logger.debug("[DEBUG] pcmap frame 없음")
return []
try:
html = await pcmap_frame.content()
except Exception as e:
logger.debug(f"[DEBUG] pcmap frame content 추출 실패: {e}")
return []
# {"id":"11659052","name":"프레지던트 호텔",...} 형태의 JSON 쌍 추출
pair_pattern = re.compile(
r'"id"\s*:\s*"(\d{5,})"[^}]{0,200}?"name"\s*:\s*"([^"]{1,60})"'
r'|"name"\s*:\s*"([^"]{1,60})"[^}]{0,200}?"id"\s*:\s*"(\d{5,})"'
)
seen = {} # place_id → title (순서 보존)
for m in pair_pattern.finditer(html):
if m.group(1): # id 먼저
pid, title = m.group(1), m.group(2)
else: # name 먼저
pid, title = m.group(4), m.group(3)
if pid not in seen:
seen[pid] = title
candidates = [
{"title": title, "place_url": f"https://map.naver.com/p/entry/place/{pid}"}
for pid, title in list(seen.items())[:10]
]
for i, c in enumerate(candidates):
logger.debug(f"[DEBUG] 후보 {i+1}: {c['title']} / {c['place_url']}")
logger.debug(f"[DEBUG] 목록 후보 {len(candidates)}개 추출")
return candidates
async def get_place_id_url(self, selected): async def get_place_id_url(self, selected):
count = 0 count = 0
get_place_id_url_start = time.perf_counter() title = self._clean_title(selected['title'])
while (count <= self._max_retry): address = self._clean_title(selected.get('roadAddress', selected['address']))
title = selected['title'].replace("<b>", "").replace("</b>", "")
address = selected.get('roadAddress', selected['address']).replace("<b>", "").replace("</b>", "")
encoded_query = parse.quote(f"{address} {title}") encoded_query = parse.quote(f"{address} {title}")
url = f"https://map.naver.com/p/search/{encoded_query}" url = f"https://map.naver.com/p/search/{encoded_query}"
wait_first_start = time.perf_counter() while count <= self._max_retry:
try: try:
await self.goto_url(url, wait_until="networkidle",timeout = self._timeout*1000) await self.goto_url(url, wait_until="networkidle", timeout=self._timeout * 1000)
except: except:
if "/place/" in self.page.url: if "/place/" in self.page.url:
return self.page.url return self.page.url
logger.error(f"[ERROR] Can't Finish networkidle") logger.error("[ERROR] Can't Finish networkidle")
logger.debug(f"[DEBUG] Try {count+1} : current url = {self.page.url}")
wait_first_time = (time.perf_counter() - wait_first_start) * 1000
logger.debug(f"[DEBUG] Try {count+1} : Wait for perfect matching : {wait_first_time}ms")
if "/place/" in self.page.url: if "/place/" in self.page.url:
return self.page.url return self.page.url
# 목록 페이지에 머문 경우 — 후보 추출 후 유사도 선택
candidates = await self._extract_candidates_from_list_page()
if candidates:
best = max(
candidates,
key=lambda c: self._similarity(title, self._clean_title(c['title']))
)
best_score = self._similarity(title, self._clean_title(best['title']))
logger.info(
f"[AUTO-SELECT] '{title}''{best['title']}' (score={best_score:.2f}) {best['place_url']}"
)
return best['place_url']
logger.debug(f"[DEBUG] Try {count+1} : url place id not found, retry for forced collect answer") # isCorrectAnswer 플래그 재시도
wait_forced_correct_start = time.perf_counter() url = self.page.url.replace("?", "?isCorrectAnswer=true&")
url = self.page.url.replace("?","?isCorrectAnswer=true&")
try: try:
await self.goto_url(url, wait_until="networkidle",timeout = self._timeout*1000) await self.goto_url(url, wait_until="networkidle", timeout=self._timeout * 1000)
except: except:
if "/place/" in self.page.url: if "/place/" in self.page.url:
return self.page.url return self.page.url
logger.error(f"[ERROR] Can't Finish networkidle") logger.error("[ERROR] Can't Finish networkidle")
wait_forced_correct_time = (time.perf_counter() - wait_forced_correct_start) * 1000
logger.debug(f"[DEBUG] Try {count+1} : Wait for forced isCorrectAnswer flag : {wait_forced_correct_time}ms")
if "/place/" in self.page.url: if "/place/" in self.page.url:
return self.page.url return self.page.url
count += 1 count += 1
logger.error("[ERROR] Not found url for {selected}") logger.error(f"[ERROR] Not found url for {selected}")
return None
return None # 404
# if (count == self._max_retry / 2):
# raise Exception("Failed to identify place id. loading timeout")
# else:
# raise Exception("Failed to identify place id. item is ambiguous")