ADO2VMCrawler/selenium_crawler/naver_map_crawler.py

574 lines
21 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
네이버 지도 크롤러 모듈
지도 검색 및 장소 정보 수집 기능
"""
import os
import time
import re
from datetime import datetime
from typing import List, Dict, Optional
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from .logger_utils import LoggerMixin, log_execution_time, log_exception, config
class NaverMapCrawler(LoggerMixin):
"""네이버 지도 크롤러"""
def __init__(self, driver: webdriver.Chrome, wait: WebDriverWait, project_dir: str):
"""
초기화
Args:
driver: 웹드라이버
wait: WebDriverWait 객체
project_dir: 프로젝트 디렉토리
"""
self.driver = driver
self.wait = wait
self.project_dir = project_dir
self.current_frame = None
# 로거 설정
self.setup_logger(log_prefix='map_crawler')
@log_execution_time()
def search_place(self, query: str) -> bool:
"""
장소 검색
Args:
query: 검색어
Returns:
성공 여부
"""
try:
# 로딩 지연 시간 설정 읽기
loading_delays = config.get('loading_delays', {})
self.log_info(f"네이버 지도 접속 및 '{query}' 검색 시작...")
self.driver.get("https://map.naver.com/")
# 초기 페이지 로드 대기 시간 증가
initial_wait = loading_delays.get('initial_page_load', 10)
self.log_info(f"페이지 로드 대기: {initial_wait}")
time.sleep(initial_wait)
self._save_screenshot("01_main_page")
# 검색창 찾기 (재시도 로직 추가)
search_box = None
for attempt in range(3):
search_box = self._find_search_box()
if search_box:
break
self.log_warning(f"검색창 찾기 시도 {attempt + 1}/3 실패, 재시도...")
time.sleep(3)
if not search_box:
self.log_error("검색창을 찾을 수 없습니다")
return False
# 검색 수행
self.log_info(f"검색어 입력: {query}")
search_box.clear()
time.sleep(1)
search_box.send_keys(query)
time.sleep(1)
search_box.send_keys(Keys.RETURN)
# 검색 후 대기 시간 증가
after_search_wait = loading_delays.get('after_search', 15)
self.log_info(f"검색 결과 로드 대기: {after_search_wait}")
time.sleep(after_search_wait)
self._save_screenshot("02_search_result")
# searchIframe 로드 대기 (시간 증가)
try:
iframe_wait = loading_delays.get('iframe_wait', 5)
self.wait.until(EC.presence_of_element_located((By.ID, "searchIframe")))
self.log_info("searchIframe 로드 확인")
time.sleep(iframe_wait)
except:
self.log_warning("searchIframe을 찾을 수 없음")
# 검색 결과 클릭
self.log_info("검색 결과 클릭 시도...")
result_wait = loading_delays.get('search_result_wait', 10)
time.sleep(result_wait)
if self._click_search_result():
# entryIframe이 나타날 때까지 대기 (최대 10초)
try:
self.driver.switch_to.default_content()
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.ID, "entryIframe"))
)
self.log_info("entryIframe 로드 확인")
# iframe으로 전환하고 장소명 요소 대기
self.driver.switch_to.frame("entryIframe")
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.CSS_SELECTOR, ".GHAhO, .place_name, h2.place_name"))
)
self.driver.switch_to.default_content()
self.log_info("장소 상세 페이지 로드 완료")
except TimeoutException:
self.log_warning("장소 상세 페이지 로드 타임아웃")
self._save_screenshot("03_place_detail")
return True
else:
self.log_error("검색 결과 클릭 실패")
return False
except Exception as e:
self.log_error(f"검색 중 오류: {e}", exc_info=True)
return False
@log_exception()
def collect_place_info(self) -> Dict:
"""
장소 기본 정보 수집
Returns:
장소 정보 딕셔너리
"""
place_info = {
'name': '',
'address': '',
'tel': '',
'category': '',
'business_hours': '',
'homepage': '',
'description': '',
'collected_at': datetime.now().isoformat()
}
try:
# entryIframe으로 전환
self._ensure_correct_frame("entryIframe")
# 동적 대기로 변경됨
# 정보 수집 매핑
info_mappings = {
'name': [".GHAhO", ".place_name", "h2.place_name", "._3XamX"],
'address': ["span.LDgIH", ".address", "._2yqUQ", "span[class*='addr']"],
'category': [".DJJvD", ".category", "._3ocDE"]
}
# 각 정보 수집
for field, selectors in info_mappings.items():
for selector in selectors:
try:
elem = self.driver.find_element(By.CSS_SELECTOR, selector)
place_info[field] = elem.text.strip()
if place_info[field]:
break
except:
continue
# 전화번호 수집 (특별 처리)
place_info['tel'] = self._extract_phone_number()
# 영업시간
try:
hours_elem = self.driver.find_element(By.CSS_SELECTOR, "._1APRQ")
place_info['business_hours'] = hours_elem.text.strip()
except:
pass
# 홈페이지
try:
homepage_elem = self.driver.find_element(By.CSS_SELECTOR, "a[class*='homepage']")
place_info['homepage'] = homepage_elem.get_attribute('href')
except:
pass
# 설명
try:
desc_elem = self.driver.find_element(By.CSS_SELECTOR, ".T8RMe")
place_info['description'] = desc_elem.text.strip()
except:
pass
self.log_info(f"장소 정보 수집 완료: {place_info['name']} - {place_info['address']}")
except Exception as e:
self.log_error(f"장소 정보 수집 중 오류: {e}")
return place_info
def collect_photo_urls(self) -> List[str]:
"""
사진 탭에서 사진 URL 수집
Returns:
사진 URL 리스트
"""
photo_urls = []
try:
self._ensure_correct_frame("entryIframe")
# 사진 탭 클릭
if not self._click_tab("사진"):
self.log_warning("사진 탭을 찾을 수 없습니다")
return photo_urls
time.sleep(3)
# 사진 URL 수집
photo_selectors = [
"img[class*='photo']",
"img[class*='thumb']",
"img[src*='phinf']",
"img[src*='blogfiles']"
]
all_images = []
for selector in photo_selectors:
images = self.driver.find_elements(By.CSS_SELECTOR, selector)
all_images.extend(images)
seen_urls = set()
for img in all_images[:20]:
try:
src = img.get_attribute("src") or img.get_attribute("data-src")
if src and src.startswith("http") and src not in seen_urls:
from naver_blog_crawler import clean_image_url
original_url = clean_image_url(src)
seen_urls.add(original_url)
photo_urls.append(original_url)
except:
continue
self.log_info(f"사진 탭에서 {len(photo_urls)}개 URL 수집")
except Exception as e:
self.log_error(f"사진 수집 중 오류: {e}")
return photo_urls
def collect_map_blog_reviews(self) -> List[Dict]:
"""
지도에서 블로그 리뷰 정보 수집
Returns:
블로그 리뷰 리스트
"""
reviews = []
try:
self._ensure_correct_frame("entryIframe")
# 블로그 탭 클릭
if not self._click_tab("블로그"):
self.log_warning("블로그 탭을 찾을 수 없습니다")
return reviews
time.sleep(3)
# 블로그 리뷰 요소 찾기
review_elems = self.driver.find_elements(By.CSS_SELECTOR, "li._2kAri")
for idx, elem in enumerate(review_elems[:10]):
try:
review_text = elem.text.strip()
lines = review_text.split('\n')
review_info = {
'title': lines[0] if lines else '',
'preview': ' '.join(lines[1:3]) if len(lines) > 1 else '',
'date': '',
'is_ad': False,
'ad_phrases': []
}
# 날짜 찾기
date_match = re.search(r'(\d{4}\.\d{1,2}\.\d{1,2})', review_text)
if date_match:
review_info['date'] = date_match.group(1)
# 광고성 문구 검색
ad_keywords = ['광고', '협찬', '제공받', '초대받', '스폰서', '파트너스', '수수료']
full_text = review_info['title'] + ' ' + review_info['preview']
for keyword in ad_keywords:
if keyword in full_text:
review_info['is_ad'] = True
idx = full_text.find(keyword)
start = max(0, idx - 20)
end = min(len(full_text), idx + 30)
review_info['ad_phrases'].append(full_text[start:end].strip())
reviews.append(review_info)
self.log_debug(f"리뷰 수집: {review_info['title'][:30]}...")
except Exception as e:
self.log_debug(f"리뷰 추출 중 오류: {e}")
continue
self.log_info(f"지도에서 블로그 리뷰 {len(reviews)}개 수집")
except Exception as e:
self.log_error(f"블로그 리뷰 수집 중 오류: {e}")
return reviews
def collect_blog_links(self, max_count: int = 5) -> List[tuple]:
"""
블로그 링크 수집
Args:
max_count: 최대 수집 개수
Returns:
(제목, URL) 튜플 리스트
"""
blog_links = []
self._ensure_correct_frame("entryIframe")
review_selectors = [
"li._2kAri",
"li[class*='blog']",
"ul[class*='list'] > li",
".place_section_content li"
]
review_elements = []
for selector in review_selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
if elements:
valid_elements = []
for elem in elements:
try:
text = elem.text.strip()
if text and len(text) > 20 and '\n' in text:
valid_elements.append(elem)
except:
continue
if valid_elements:
review_elements = valid_elements
self.log_info(f"유효한 리뷰 요소 {len(valid_elements)}개 발견")
break
except:
continue
# 블로그 링크 추출
for elem in review_elements[:max_count]:
try:
text = elem.text.strip()
lines = text.split('\n')
title = lines[0] if lines else "제목 없음"
# URL 추출
url = self._extract_url_from_element(elem)
if url:
blog_links.append((title, url))
self.log_debug(f"블로그 링크 수집: {title[:30]}...")
except Exception as e:
self.log_debug(f"링크 추출 중 오류: {e}")
continue
self.log_info(f"{len(blog_links)}개의 블로그 링크 수집")
return blog_links
def _find_search_box(self):
"""검색창 찾기"""
self.log_debug("검색창 찾기 시도 중...")
time.sleep(2)
selectors = [
"input.input_search",
"input[placeholder*='검색']",
"input[type='search']",
"#search-input",
"input[name='query']",
"//input[@placeholder]",
"//input[contains(@class, 'search')]"
]
for selector in selectors:
try:
if selector.startswith('//'):
elem = self.driver.find_element(By.XPATH, selector)
else:
elem = self.driver.find_element(By.CSS_SELECTOR, selector)
if elem.is_displayed() and elem.is_enabled():
self.log_debug(f"검색창 발견: {selector}")
return elem
except:
continue
return None
def _click_search_result(self) -> bool:
"""검색 결과 클릭"""
# 동적 대기로 변경
# searchIframe 전환 시도
try:
self.driver.switch_to.default_content()
self.wait.until(EC.presence_of_element_located((By.ID, "searchIframe")))
self.driver.switch_to.frame("searchIframe")
self.log_debug("searchIframe으로 전환 성공")
except:
self.log_debug("searchIframe 전환 실패")
selectors = [
"a.place_bluelink",
"span.place_bluelink",
"//span[contains(@class, 'place')]",
"//a[contains(@class, 'place')]",
"li[class*='item'] a",
"div[class*='item'] a"
]
for selector in selectors:
try:
if selector.startswith('//'):
elements = self.driver.find_elements(By.XPATH, selector)
else:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
if elements:
elem = elements[0]
self.driver.execute_script("arguments[0].scrollIntoView(true);", elem)
time.sleep(1)
try:
elem.click()
except:
self.driver.execute_script("arguments[0].click();", elem)
self.log_debug(f"검색 결과 클릭 성공: {selector}")
return True
except:
continue
self.driver.switch_to.default_content()
return False
def _click_tab(self, tab_name: str) -> bool:
"""탭 클릭"""
tab_selectors = [
f"//a[contains(text(), '{tab_name}')]",
f"//span[contains(text(), '{tab_name}')]",
f"a[href*='{tab_name.lower()}']"
]
for selector in tab_selectors:
try:
if selector.startswith('//'):
elem = self.driver.find_element(By.XPATH, selector)
else:
elem = self.driver.find_element(By.CSS_SELECTOR, selector)
if elem.is_displayed():
self.driver.execute_script("arguments[0].click();", elem)
self.log_debug(f"{tab_name} 탭 클릭 성공")
return True
except:
continue
return False
def _extract_phone_number(self) -> str:
"""전화번호 추출"""
tel_selectors = ["span.xlx7Q", "a[href^='tel:']", "._3ZA0S", ".phone"]
for selector in tel_selectors:
try:
elem = self.driver.find_element(By.CSS_SELECTOR, selector)
tel_text = elem.text.strip()
tel_match = re.search(r'(\d{2,4}[-.]?\d{3,4}[-.]?\d{4})', tel_text)
if tel_match:
return tel_match.group(1)
except:
continue
return ''
def _extract_url_from_element(self, elem) -> Optional[str]:
"""요소에서 URL 추출"""
# a 태그에서 href 찾기
try:
link_elements = elem.find_elements(By.TAG_NAME, "a")
for link_elem in link_elements:
href = link_elem.get_attribute("href")
if href and href.startswith("http"):
return href
except:
pass
# onclick 속성에서 URL 추출
try:
onclick = elem.get_attribute("onclick")
if onclick and "http" in onclick:
url_match = re.search(r'(https?://[^\s\'"]+)', onclick)
if url_match:
return url_match.group(1)
except:
pass
return None
def _ensure_correct_frame(self, target_frame: str = None):
"""올바른 프레임으로 전환"""
if target_frame != self.current_frame:
if target_frame is None:
self.driver.switch_to.default_content()
self.current_frame = None
self.log_debug("기본 프레임으로 전환")
else:
try:
self.driver.switch_to.default_content()
self.wait.until(EC.presence_of_element_located((By.ID, target_frame)))
self.driver.switch_to.frame(target_frame)
self.current_frame = target_frame
self.log_debug(f"{target_frame}으로 전환 성공")
except Exception as e:
self.log_debug(f"{target_frame} 전환 실패: {e}")
def _save_screenshot(self, name: str):
"""스크린샷 저장"""
try:
filename = f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
filepath = os.path.join(self.project_dir, 'debug', filename)
self.driver.save_screenshot(filepath)
self.log_debug(f"스크린샷 저장: {filename}")
except:
pass
def return_to_blog_list(self):
"""블로그 목록으로 돌아가기"""
try:
self._ensure_correct_frame(None)
self._ensure_correct_frame("entryIframe")
self.log_debug("블로그 목록으로 복귀")
except Exception as e:
self.log_error(f"블로그 목록 복귀 중 오류: {e}")