# Source: o2o-castad-backend/app/utils/nvMapPwScraper.py (141 lines, 4.7 KiB, Python)

import asyncio
from playwright.async_api import async_playwright
from urllib import parse
import time
from app.utils.logger import get_logger
# Logger setup
logger = get_logger("pwscraper")
class NvMapPwScraper():
    """Resolve Naver Map place URLs through a shared headless Chromium browser.

    The Playwright driver, the browser and the browser context are class-level
    and are created once by :meth:`initiate_scraper`.  Every instance opens its
    own page inside that shared context.

    Typical use::

        await NvMapPwScraper.initiate_scraper()
        async with NvMapPwScraper() as scraper:
            place_url = await scraper.get_place_id_url(item)
    """

    # --- class-level state shared by every instance ---
    is_ready = False  # flipped to True by initiate_scraper()
    _playwright = None
    _browser = None
    _context = None
    _win_width = 1280
    _win_height = 720
    # Time budget in seconds; each navigation in get_place_id_url() gets half
    # of it (converted to milliseconds) as its timeout.
    _max_retry = 60

    # Per-instance page. The class-level None is only the default value until
    # create_page() assigns the real page on the instance.
    page = None

    @classmethod
    def default_context_builder(cls) -> dict:
        """Return the keyword arguments used to create the browser context.

        Viewport and screen share the same size (``_win_width`` x
        ``_win_height``) but are returned as two independent dicts.
        """
        return {
            'viewport': {'width': cls._win_width, 'height': cls._win_height},
            'screen': {'width': cls._win_width, 'height': cls._win_height},
            'user_agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
            'locale': 'ko-KR',
            'timezone_id': 'Asia/Seoul',
        }

    @classmethod
    async def initiate_scraper(cls):
        """Start Playwright, launch headless Chromium and create the context.

        Each resource is only created when it does not exist yet, so calling
        this again after a successful start is a no-op apart from setting
        ``is_ready``.
        """
        if cls._playwright is None:
            cls._playwright = await async_playwright().start()
        if cls._browser is None:
            cls._browser = await cls._playwright.chromium.launch(headless=True)
        if cls._context is None:
            cls._context = await cls._browser.new_context(**cls.default_context_builder())
        cls.is_ready = True

    def __init__(self):
        """Create a scraper handle; requires :meth:`initiate_scraper` first.

        Raises:
            RuntimeError: if the shared browser has not been initiated.
        """
        if not self.is_ready:
            # RuntimeError is still caught by callers using `except Exception`.
            raise RuntimeError("nvMapScraper is not initiated")
        self.page = None

    async def __aenter__(self):
        """Open this instance's page and return the instance."""
        await self.create_page()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close this instance's page; exceptions from the body propagate."""
        if self.page is not None:
            await self.page.close()

    async def create_page(self):
        """Open a new page in the shared context and prepare it for scraping.

        The init script replaces the ``Navigator.prototype.webdriver`` getter
        with a proxy that always returns ``false`` (presumably to avoid
        automation detection), and a ``sec-ch-ua`` header is set on the page.

        If any setup step fails, the freshly opened page is closed before the
        error is re-raised, so it does not linger in the shared context.
        """
        page = await self._context.new_page()
        try:
            await page.add_init_script(
                '''const defaultGetter = Object.getOwnPropertyDescriptor(
Navigator.prototype,
"webdriver"
).get;
defaultGetter.apply(navigator);
defaultGetter.toString();
Object.defineProperty(Navigator.prototype, "webdriver", {
set: undefined,
enumerable: true,
configurable: true,
get: new Proxy(defaultGetter, {
apply: (target, thisArg, args) => {
Reflect.apply(target, thisArg, args);
return false;
},
}),
});
const patchedGetter = Object.getOwnPropertyDescriptor(
Navigator.prototype,
"webdriver"
).get;
patchedGetter.apply(navigator);
patchedGetter.toString();''')
            await page.set_extra_http_headers({
                'sec-ch-ua': '\"Not?A_Brand\";v=\"99\", \"Chromium\";v=\"130\"'
            })
            # NOTE(review): the reason for this first navigation is not evident
            # from the code; presumably a warm-up request - confirm.
            await page.goto("http://google.com")
        except BaseException:
            # __aexit__ is never called when __aenter__ fails, so clean up here.
            await page.close()
            raise
        self.page = page

    async def goto_url(self, url, wait_until="domcontentloaded", timeout=20000):
        """Navigate this instance's page to ``url``.

        Args:
            url: Address to open.
            wait_until: Load state passed through to ``page.goto``.
            timeout: Navigation timeout in milliseconds.
        """
        await self.page.goto(url, wait_until=wait_until, timeout=timeout)

    @staticmethod
    def _strip_bold_tags(text: str) -> str:
        """Remove the ``<b>``/``</b>`` highlight markers from ``text``."""
        return text.replace("<b>", "").replace("</b>", "")

    @classmethod
    def _build_search_url(cls, selected: dict) -> str:
        """Build the Naver Map search URL for ``"<address> <title>"``.

        ``roadAddress`` is preferred; a missing or empty value falls back to
        ``address``.
        """
        title = cls._strip_bold_tags(selected['title'])
        address = cls._strip_bold_tags(selected.get('roadAddress') or selected['address'])
        encoded_query = parse.quote(f"{address} {title}")
        return f"https://map.naver.com/p/search/{encoded_query}"

    @staticmethod
    def _with_correct_answer_flag(url: str) -> str:
        """Return ``url`` with ``isCorrectAnswer=true`` as its first query parameter.

        Works whether or not the URL already has a query string, and leaves
        any later ``?`` characters inside the query untouched.
        """
        parts = parse.urlsplit(url)
        query = "isCorrectAnswer=true"
        if parts.query:
            query = f"{query}&{parts.query}"
        return parse.urlunsplit(parts._replace(query=query))

    async def get_place_id_url(self, selected):
        """Find the Naver Map place URL for a search result item.

        Makes up to two rounds.  Each round opens the search URL and, if the
        browser did not end up on a ``/place/`` URL, reloads the current URL
        with the ``isCorrectAnswer=true`` query flag and checks again.

        Args:
            selected: Mapping with ``title`` and ``address`` and/or
                ``roadAddress``; ``<b>`` tags in the values are stripped.
                (Presumably an item from the Naver local-search API - confirm
                against the caller.)

        Returns:
            The page URL containing ``/place/``, or ``None`` when no round
            reached one.

        Raises:
            KeyError: if ``title`` is missing, or neither address field is usable.
            Errors raised by ``page.goto`` (for example a navigation timeout)
            are not caught here and propagate to the caller.
        """
        # Loop-invariant values: computed once instead of on every round.
        search_url = self._build_search_url(selected)
        nav_timeout_ms = self._max_retry / 2 * 1000

        for attempt in (1, 2):
            wait_first_start = time.perf_counter()
            await self.goto_url(search_url, wait_until="networkidle", timeout=nav_timeout_ms)
            wait_first_time = (time.perf_counter() - wait_first_start) * 1000
            logger.debug(f"[DEBUG] Try {attempt} : Wait for perfect matching : {wait_first_time}ms")
            if "/place/" in self.page.url:
                return self.page.url

            logger.debug(f"[DEBUG] Try {attempt} : url place id not found, retry for forced collect answer")
            wait_forced_correct_start = time.perf_counter()
            forced_url = self._with_correct_answer_flag(self.page.url)
            await self.goto_url(forced_url, wait_until="networkidle", timeout=nav_timeout_ms)
            wait_forced_correct_time = (time.perf_counter() - wait_forced_correct_start) * 1000
            logger.debug(f"[DEBUG] Try {attempt} : Wait for forced isCorrectAnswer flag : {wait_forced_correct_time}ms")
            if "/place/" in self.page.url:
                return self.page.url

        logger.error(f"[ERROR] Not found url for {selected}")
        # NOTE: an earlier revision had commented-out code that raised here
        # (timeout vs. ambiguous item); the current contract is to return None.
        return None  # not found ("404")