diff --git a/.gitignore b/.gitignore index 5d381cc..d33bc9a 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,4 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +data/ \ No newline at end of file diff --git a/main.py b/main.py index 8a2d531..9798ab8 100644 --- a/main.py +++ b/main.py @@ -1,22 +1,105 @@ import os, json import asyncio import playwright +import platform from playwright.async_api import async_playwright +from selenium_crawler.logger_utils import setup_logger, config +from selenium_crawler.naver_integrated_crawler import NaverIntegratedCrawler +from selenium_crawler.naver_blog_crawler import ImageFilterConfig + import asyncio -HOME_PATH = "/home/azureuser" -async def playwright_test(): - async with async_playwright() as p: - browser = await p.chromium.launch() - page = await browser.new_page() - await page.goto("https://playwright.dev") - print(await page.title()) - await browser.close() - pass + +async def main(): + """메인 함수""" + print("=== 네이버 지도 + 블로그 통합 크롤러 (리팩토링 버전) ===\n") + + # 설정 파일에서 값 읽기 + print("설정 파일에서 크롤러 설정을 로드합니다...") + + # 이미지 필터 설정 + image_filter_config = config.get('image_filter', {}) + image_filter = ImageFilterConfig( + min_width=image_filter_config.get('min_width', 400), + min_height=image_filter_config.get('min_height', 400), + min_file_size_kb=image_filter_config.get('min_file_size_kb', 10), + max_file_size_mb=image_filter_config.get('max_file_size_mb', 10), + require_both_dimensions=image_filter_config.get('require_both_dimensions', False), + allowed_formats=set(image_filter_config.get('allowed_formats', ['.jpg', '.jpeg', '.png', '.webp'])) + ) + + print(f"\n이미지 필터 설정:") + print(f" - 최소 크기: {image_filter.min_width}x{image_filter.min_height} px") + print(f" - 파일 크기: {image_filter.min_file_size_kb}KB ~ {image_filter.max_file_size_mb}MB") + print(f" - 허용 포맷: {', '.join(sorted(image_filter.allowed_formats))}") + print(f" - 크기 조건: {'AND' if image_filter.require_both_dimensions else 'OR'}") + + # 프로젝트 디렉토리 설정 + if platform.system() == 'Windows': + project_dir = config.get('paths.project_dir_windows', r'C:\CrawlingData') + else: + project_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') + + # 크롤러 초기화 + crawler = NaverIntegratedCrawler(project_dir=project_dir, image_filter=image_filter) + print(f"\n프로젝트 디렉토리: {project_dir}") + + try: + crawler.setup_driver() + + # 검색어 입력 (설정 파일의 기본값 사용) + default_query = config.get('crawler.default_search_query', '선돌막국수') + search_query = input(f"\n검색할 장소를 입력하세요 (기본값: {default_query}): ").strip() + if not search_query: + search_query = default_query + + # 블로그 수 입력 (설정 파일의 기본값 사용) + default_max_blogs = config.get('crawler.max_blogs', 1) + max_blogs_str = input(f"수집할 블로그 수를 입력하세요 (기본값: {default_max_blogs}): ").strip() + max_blogs = int(max_blogs_str) if max_blogs_str.isdigit() else default_max_blogs + + # 설정 확인 + print(f"\n=== 크롤링 설정 확인 ===") + print(f"검색어: {search_query}") + print(f"수집할 블로그 수: {max_blogs}") + + proceed = input("\n위 설정으로 진행하시겠습니까? (Y/n): ").strip().lower() + if proceed == 'n': + print("크롤링을 취소합니다.") + return + + # 통합 수집 실행 + max_blogs =1 + results = crawler.collect_all_info(search_query, max_blogs) + + print(f"\n=== 수집 완료 ===") + print(f"장소 정보: {results['place_info'].get('name', 'N/A')}") + print(f"수집한 사진: {len(results['photo_urls'])}개") + print(f"지도 블로그 리뷰: {len(results['map_blog_reviews'])}개") + print(f"상세 블로그 정보: {results['total_blogs_visited']}개") + + # 이미지 다운로드 통계 출력 + if 'image_download_info' in results: + dl_info = results['image_download_info'] + print(f"\n=== 이미지 다운로드 통계 ===") + print(f"총 처리: {dl_info['total_images']}개") + print(f"다운로드 성공: {dl_info['downloaded']}개") + if 'statistics' in dl_info: + stats = dl_info['statistics'] + print(f" - 신규 저장: {stats['success']}개") + print(f" - 기존 파일: {stats['file_exists']}개") + print(f" - 중복 제거: {stats['duplicate']}개") + print(f" - 크기 필터링: {stats['size_filtered']}개") + + except Exception as e: + print(f"오류 발생: {e}") + import traceback + traceback.print_exc() + + finally: + input("\n엔터를 누르면 브라우저를 종료합니다...") + crawler.close() -asyncio.run(playwright_test()) - -if __name__=="__main__": - with open(os.path.join(HOME_PATH, "success.txt"), "w") as fp: - json.dump("", fp) +if __name__ == "__main__": + asyncio.run(main()) diff --git a/requirements.txt b/requirements.txt index 4f3e4ad..56a4fe5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,8 @@ playwright dotenv +selenium +requests +pillow azure-identity azure-mgmt-resource azure-mgmt-compute \ No newline at end of file diff --git a/selenium_crawler/config.json b/selenium_crawler/config.json new file mode 100644 index 0000000..b98616b --- /dev/null +++ b/selenium_crawler/config.json @@ -0,0 +1,51 @@ +{ + "crawler": { + "max_blogs": 2, + "default_search_query": "선돌막국수" + }, + "image_filter": { + "min_width": 400, + "min_height": 400, + "min_file_size_kb": 10, + "max_file_size_mb": 10, + "require_both_dimensions": false, + "allowed_formats": [ + ".jpg", + ".jpeg", + ".png", + ".webp" + ] + }, + "paths": { + "project_dir_linux": "/data/crawler" + }, + "webdriver": { + "headless": false, + "window_size": "1920,1080", + "page_load_timeout": 20, + "implicit_wait": 5, + "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + }, + "loading_delays": { + "initial_page_load": 3, + "after_search": 5, + "iframe_wait": 2, + "search_result_wait": 3, + "place_detail_wait": 3, + "dynamic_wait": { + "enabled": true, + "check_interval": 0.5, + "max_wait": 15, + "elements_to_check": [ + ".GHAhO", + ".place_name", + "span.LDgIH" + ] + } + }, + "logging": { + "console_level": "INFO", + "file_level": "DEBUG", + "log_format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + } +} \ No newline at end of file diff --git a/selenium_crawler/config_manager.py b/selenium_crawler/config_manager.py new file mode 100644 index 0000000..f3234e1 --- /dev/null +++ b/selenium_crawler/config_manager.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +설정 관리 도구 +config.json 파일을 쉽게 편집할 수 있는 도구 +""" + +import json +from pathlib import Path +from logger_utils import config + + +def display_config(): + """현재 설정 표시""" + print("\n=== 현재 설정 ===") + current_config = config.get_all() + print(json.dumps(current_config, indent=2, ensure_ascii=False)) + + +def edit_crawler_settings(): + """크롤러 설정 편집""" + print("\n=== 크롤러 설정 편집 ===") + + # 현재 값 표시 + max_blogs = config.get('crawler.max_blogs', 5) + default_query = config.get('crawler.default_search_query', '선돌막국수') + + print(f"현재 설정:") + print(f" - 최대 블로그 수: {max_blogs}") + print(f" - 기본 검색어: {default_query}") + + # 새 값 입력 + new_max_blogs = input(f"\n새로운 최대 블로그 수 (현재: {max_blogs}, Enter로 유지): ").strip() + if new_max_blogs.isdigit(): + config.update('crawler.max_blogs', int(new_max_blogs)) + print(f"✓ 최대 블로그 수를 {new_max_blogs}로 변경했습니다.") + + new_default_query = input(f"새로운 기본 검색어 (현재: {default_query}, Enter로 유지): ").strip() + if new_default_query: + config.update('crawler.default_search_query', new_default_query) + print(f"✓ 기본 검색어를 '{new_default_query}'로 변경했습니다.") + + +def edit_image_filter_settings(): + """이미지 필터 설정 편집""" + print("\n=== 이미지 필터 설정 편집 ===") + + # 현재 값 표시 + image_filter = config.get('image_filter', {}) + + print(f"현재 설정:") + print(f" - 최소 가로: {image_filter.get('min_width', 400)}px") + print(f" - 최소 세로: {image_filter.get('min_height', 400)}px") + print(f" - 최소 파일 크기: {image_filter.get('min_file_size_kb', 10)}KB") + print(f" - 최대 파일 크기: {image_filter.get('max_file_size_mb', 10)}MB") + print(f" - 크기 조건: {'AND' if image_filter.get('require_both_dimensions', False) else 'OR'}") + print(f" - 허용 포맷: {', '.join(image_filter.get('allowed_formats', []))}") + + # 새 값 입력 + new_min_width = input(f"\n새로운 최소 가로 (현재: {image_filter.get('min_width', 400)}, Enter로 유지): ").strip() + if new_min_width.isdigit(): + config.update('image_filter.min_width', int(new_min_width)) + print(f"✓ 최소 가로를 {new_min_width}px로 변경했습니다.") + + new_min_height = input(f"새로운 최소 세로 (현재: {image_filter.get('min_height', 400)}, Enter로 유지): ").strip() + if new_min_height.isdigit(): + config.update('image_filter.min_height', int(new_min_height)) + print(f"✓ 최소 세로를 {new_min_height}px로 변경했습니다.") + + new_min_size = input(f"새로운 최소 파일 크기 KB (현재: {image_filter.get('min_file_size_kb', 10)}, Enter로 유지): ").strip() + if new_min_size.isdigit(): + config.update('image_filter.min_file_size_kb', int(new_min_size)) + print(f"✓ 최소 파일 크기를 {new_min_size}KB로 변경했습니다.") + + new_max_size = input(f"새로운 최대 파일 크기 MB (현재: {image_filter.get('max_file_size_mb', 10)}, Enter로 유지): ").strip() + if new_max_size.isdigit(): + config.update('image_filter.max_file_size_mb', int(new_max_size)) + print(f"✓ 최대 파일 크기를 {new_max_size}MB로 변경했습니다.") + + both_dimensions = input("가로와 세로 모두 조건 만족 필요? (y/N): ").strip().lower() + if both_dimensions in ['y', 'n']: + config.update('image_filter.require_both_dimensions', both_dimensions == 'y') + print(f"✓ 크기 조건을 {'AND' if both_dimensions == 'y' else 'OR'}로 변경했습니다.") + + +def edit_webdriver_settings(): + """웹드라이버 설정 편집""" + print("\n=== 웹드라이버 설정 편집 ===") + + # 현재 값 표시 + webdriver = config.get('webdriver', {}) + + print(f"현재 설정:") + print(f" - Headless 모드: {webdriver.get('headless', False)}") + print(f" - 창 크기: {webdriver.get('window_size', '1920,1080')}") + print(f" - 페이지 로드 타임아웃: {webdriver.get('page_load_timeout', 30)}초") + print(f" - Implicit Wait: {webdriver.get('implicit_wait', 10)}초") + + # Headless 모드 + headless = input("\nHeadless 모드 사용? (y/N): ").strip().lower() + if headless in ['y', 'n']: + config.update('webdriver.headless', headless == 'y') + print(f"✓ Headless 모드를 {'활성화' if headless == 'y' else '비활성화'}했습니다.") + + # 타임아웃 설정 + new_timeout = input(f"새로운 페이지 로드 타임아웃 초 (현재: {webdriver.get('page_load_timeout', 30)}, Enter로 유지): ").strip() + if new_timeout.isdigit(): + config.update('webdriver.page_load_timeout', int(new_timeout)) + print(f"✓ 페이지 로드 타임아웃을 {new_timeout}초로 변경했습니다.") + + +def main(): + """메인 함수""" + print("=== 네이버 크롤러 설정 관리 도구 ===") + + while True: + print("\n메뉴:") + print("1. 현재 설정 보기") + print("2. 크롤러 설정 편집") + print("3. 이미지 필터 설정 편집") + print("4. 웹드라이버 설정 편집") + print("5. 설정 저장 및 종료") + print("0. 저장하지 않고 종료") + + choice = input("\n선택: ").strip() + + if choice == '1': + display_config() + elif choice == '2': + edit_crawler_settings() + elif choice == '3': + edit_image_filter_settings() + elif choice == '4': + edit_webdriver_settings() + elif choice == '5': + config.save_config() + print("\n설정이 저장되었습니다.") + break + elif choice == '0': + print("\n설정을 저장하지 않고 종료합니다.") + break + else: + print("\n잘못된 선택입니다.") + + +if __name__ == "__main__": + main() diff --git a/selenium_crawler/logger_utils.py b/selenium_crawler/logger_utils.py new file mode 100644 index 0000000..0969649 --- /dev/null +++ b/selenium_crawler/logger_utils.py @@ -0,0 +1,315 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +로깅 유틸리티 모듈 +네이버 크롤러에서 사용하는 공통 로깅 기능 +""" + +import os +import sys +import json +import logging +from datetime import datetime +from typing import Optional, Dict, Any +from pathlib import Path + + +class Config: + """설정 파일 관리 클래스""" + + _instance = None + _config = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(Config, cls).__new__(cls) + return cls._instance + + def __init__(self): + if self._config is None: + self.load_config() + + def load_config(self, config_path: str = None): + """설정 파일 로드""" + if config_path is None: + # 현재 스크립트 위치에서 config.json 찾기 + current_dir = Path(__file__).parent + config_path = current_dir / 'config.json' + + try: + with open(config_path, 'r', encoding='utf-8') as f: + self._config = json.load(f) + print(f"설정 파일 로드 완료: {config_path}") + except FileNotFoundError: + print(f"설정 파일을 찾을 수 없습니다: {config_path}") + print("기본 설정을 사용합니다.") + self._config = self._get_default_config() + except json.JSONDecodeError as e: + print(f"설정 파일 파싱 오류: {e}") + print("기본 설정을 사용합니다.") + self._config = self._get_default_config() + + def _get_default_config(self) -> Dict[str, Any]: + """기본 설정 반환""" + return { + "crawler": { + "max_blogs": 5, + "default_search_query": "선돌막국수" + }, + "image_filter": { + "min_width": 400, + "min_height": 400, + "min_file_size_kb": 10, + "max_file_size_mb": 10, + "require_both_dimensions": False, + "allowed_formats": [".jpg", ".jpeg", ".png", ".webp"] + }, + "paths": { + "project_dir_linux": "/data/crawling" + }, + "webdriver": { + "headless": False, + "window_size": "1920,1080", + "page_load_timeout": 30, + "implicit_wait": 10 + }, + "logging": { + "console_level": "INFO", + "file_level": "DEBUG", + "log_format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + } + } + + def get(self, key_path: str, default=None): + """ + 점 표기법으로 중첩된 설정값 가져오기 + 예: config.get('crawler.max_blogs') + """ + keys = key_path.split('.') + value = self._config + + for key in keys: + if isinstance(value, dict) and key in value: + value = value[key] + else: + return default + + return value + + def get_all(self) -> Dict[str, Any]: + """전체 설정 반환""" + return self._config.copy() + + def save_config(self, config_path: str = None): + """설정 파일 저장""" + if config_path is None: + current_dir = Path(__file__).parent + config_path = current_dir / 'config.json' + + try: + with open(config_path, 'w', encoding='utf-8') as f: + json.dump(self._config, f, ensure_ascii=False, indent=2) + print(f"설정 파일 저장 완료: {config_path}") + except Exception as e: + print(f"설정 파일 저장 실패: {e}") + + def update(self, key_path: str, value: Any): + """설정값 업데이트""" + keys = key_path.split('.') + config = self._config + + for key in keys[:-1]: + if key not in config: + config[key] = {} + config = config[key] + + config[keys[-1]] = value + + +# 전역 설정 인스턴스 +config = Config() + + +def setup_logger( + name: str, + log_dir: str, + log_prefix: str = "log", + console_level: int = None, + file_level: int = None +) -> logging.Logger: + """ + 로거 설정 함수 + Args: + name: 로거 이름 + log_dir: 로그 파일 저장 디렉토리 + log_prefix: 로그 파일명 접두사 + console_level: 콘솔 출력 로그 레벨 (None이면 설정에서 읽음) + file_level: 파일 출력 로그 레벨 (None이면 설정에서 읽음) + + Returns: + 설정된 로거 객체 + """ + # 설정에서 로그 레벨 가져오기 + if console_level is None: + console_level_str = config.get('logging.console_level', 'INFO') + console_level = getattr(logging, console_level_str) + + if file_level is None: + file_level_str = config.get('logging.file_level', 'DEBUG') + file_level = getattr(logging, file_level_str) + + # 로그 디렉토리 생성 + os.makedirs(log_dir, exist_ok=True) + + # 로그 포맷 설정 + log_format = config.get('logging.log_format', + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + # 로그 파일 경로 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file = os.path.join(log_dir, f'{log_prefix}_{timestamp}.log') + + # 로거 생성 + logger = logging.getLogger(name) + logger.setLevel(logging.DEBUG) + + # 기존 핸들러 제거 (중복 방지) + logger.handlers.clear() + + # 파일 핸들러 + file_handler = logging.FileHandler(log_file, encoding='utf-8') + file_handler.setLevel(file_level) + file_handler.setFormatter(logging.Formatter(log_format)) + + # 콘솔 핸들러 + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(console_level) + console_handler.setFormatter(logging.Formatter(log_format)) + + # 핸들러 추가 + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + logger.info(f"로거 '{name}' 초기화 완료") + logger.info(f"로그 파일: {log_file}") + + return logger + + +def get_logger(name: str) -> logging.Logger: + return logging.getLogger(name) + + +class LoggerMixin: + def setup_logger( + self, + logger_name: Optional[str] = None, + log_dir: Optional[str] = None, + log_prefix: Optional[str] = None + ): + """ + 클래스용 로거 설정 + Args: + logger_name: 로거 이름 (기본값: 클래스명) + log_dir: 로그 디렉토리 (기본값: self.project_dir/logs) + log_prefix: 로그 파일 접두사 (기본값: 클래스명) + """ + if logger_name is None: + logger_name = self.__class__.__name__ + + if log_dir is None: + log_dir = os.path.join(getattr(self, 'project_dir', '.'), 'logs') + + if log_prefix is None: + log_prefix = self.__class__.__name__.lower() + + self.logger = setup_logger( + name=logger_name, + log_dir=log_dir, + log_prefix=log_prefix + ) + + def log_debug(self, message: str): + """디버그 로그""" + if hasattr(self, 'logger'): + self.logger.debug(message) + + def log_info(self, message: str): + """정보 로그""" + if hasattr(self, 'logger'): + self.logger.info(message) + + def log_warning(self, message: str): + """경고 로그""" + if hasattr(self, 'logger'): + self.logger.warning(message) + + def log_error(self, message: str, exc_info: bool = False): + """에러 로그""" + if hasattr(self, 'logger'): + self.logger.error(message, exc_info=exc_info) + + def log_critical(self, message: str): + """치명적 에러 로그""" + if hasattr(self, 'logger'): + self.logger.critical(message) + + +def log_execution_time(logger: logging.Logger = None): + """ + 함수 실행 시간을 로깅하는 데코레이터 + + Args: + logger: 사용할 로거 (없으면 함수명으로 새로 생성) + """ + import functools + import time + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + nonlocal logger + if logger is None: + logger = logging.getLogger(func.__name__) + + start_time = time.time() + logger.debug(f"{func.__name__} 시작") + + try: + result = func(*args, **kwargs) + elapsed = time.time() - start_time + logger.info(f"{func.__name__} 완료 (실행시간: {elapsed:.2f}초)") + return result + except Exception as e: + elapsed = time.time() - start_time + logger.error(f"{func.__name__} 실패 (실행시간: {elapsed:.2f}초): {str(e)}") + raise + + return wrapper + return decorator + + +def log_exception(logger: logging.Logger = None): + """ + 예외를 로깅하는 데코레이터 + + Args: + logger: 사용할 로거 + """ + import functools + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + nonlocal logger + if logger is None: + logger = logging.getLogger(func.__name__) + + try: + return func(*args, **kwargs) + except Exception as e: + logger.error(f"{func.__name__}에서 예외 발생: {str(e)}", exc_info=True) + raise + + return wrapper + return decorator diff --git a/selenium_crawler/naver_blog_crawler.py b/selenium_crawler/naver_blog_crawler.py new file mode 100644 index 0000000..05ffb34 --- /dev/null +++ b/selenium_crawler/naver_blog_crawler.py @@ -0,0 +1,576 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +네이버 블로그 크롤러 모듈 +블로그 콘텐츠 수집 및 이미지 다운로드 기능 +""" + +import os +import time +import re +import json +import requests +import hashlib +from datetime import datetime +from typing import List, Dict, Optional, Tuple, Set +from urllib.parse import urlparse, parse_qs, urlunparse, urlencode +from dataclasses import dataclass +from PIL import Image +from io import BytesIO + +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait + +from .logger_utils import LoggerMixin, log_execution_time, log_exception + + +@dataclass +class ImageFilterConfig: + """이미지 필터링 설정""" + min_width: int = 400 + min_height: int = 400 + min_file_size_kb: int = 10 # 최소 파일 크기 (KB) + max_file_size_mb: int = 10 # 최대 파일 크기 (MB) + allowed_formats: Set[str] = None + require_both_dimensions: bool = False # True면 width AND height 모두 조건 만족, False면 OR + + def __post_init__(self): + if self.allowed_formats is None: + self.allowed_formats = {'.jpg', '.jpeg', '.png', '.webp'} + + +def clean_image_url(url: str) -> str: + """ + 이미지 URL 정리 (불필요한 파라미터 제거) + + Args: + url: 원본 URL + + Returns: + 정리된 URL + """ + # autoRotate=true&type=w800& 제거 + url = url.replace("autoRotate=true&type=w800&", "") + url = url.replace("autoRotate=true&", "") + url = url.replace("type=w800&", "") + + # URL 파싱하여 파라미터 정리 + try: + parsed = urlparse(url) + params = parse_qs(parsed.query) + + # 불필요한 파라미터 제거 + params.pop('autoRotate', None) + + # type 파라미터 처리 + if 'type' in params: + # 작은 사이즈면 큰 사이즈로 변경 + if params['type'][0] in ['f', 'f120', 'f240', 'w240', 'w500']: + params['type'] = ['w800'] + + # 정리된 URL 재구성 + new_query = urlencode(params, doseq=True) + url = urlunparse(( + parsed.scheme, + parsed.netloc, + parsed.path, + parsed.params, + new_query, + parsed.fragment + )) + except: + pass + + return url + + +class NaverBlogCrawler(LoggerMixin): + """네이버 블로그 크롤러""" + + def __init__(self, driver: webdriver.Chrome, wait: WebDriverWait, project_dir: str, + image_filter: ImageFilterConfig = None): + """ + 초기화 + + Args: + driver: 웹드라이버 + wait: WebDriverWait 객체 + project_dir: 프로젝트 디렉토리 + image_filter: 이미지 필터 설정 + """ + self.driver = driver + self.wait = wait + self.project_dir = project_dir + self.image_filter = image_filter or ImageFilterConfig() + + # 중복 이미지 방지를 위한 해시 추적 + self.downloaded_hashes: Set[str] = set() + self.image_metadata: List[Dict] = [] + + # 로거 설정 + self.setup_logger(log_prefix='blog_crawler') + + @log_execution_time() + def visit_and_extract_blog(self, url: str, title: str, index: int) -> Optional[Dict]: + """ + 블로그 방문 및 내용 추출 + + Args: + url: 블로그 URL + title: 블로그 제목 + index: 인덱스 + + Returns: + 블로그 정보 딕셔너리 + """ + blog_info = { + 'index': index, + 'title': title, + 'blog_title': '', + 'url': url, + 'content': '', + 'images': [], + 'extracted_at': datetime.now().isoformat(), + 'is_ad': False, + 'ad_phrases': [] + } + + try: + # 새 탭 열기 + original_window = self.driver.current_window_handle + self.driver.execute_script("window.open('');") + self.driver.switch_to.window(self.driver.window_handles[-1]) + + # 블로그 페이지 로드 + self.driver.get(url) + time.sleep(3) + + # 페이지 타이틀 추출 + try: + blog_info['blog_title'] = self.driver.title + except: + blog_info['blog_title'] = "타이틀 없음" + + # 네이버 블로그 iframe 처리 + if "blog.naver.com" in url: + try: + self.driver.switch_to.frame("mainFrame") + self.log_debug("네이버 블로그 mainFrame 전환 성공") + except: + pass + + # 본문 추출 + content_text = self._extract_blog_content() + if content_text: + blog_info['content'] = content_text[:2000] + + # 광고성 문구 검색 + ad_keywords = ['광고', '협찬', '제공받', '초대받', '스폰서', '파트너스', '수수료', '원고료'] + for keyword in ad_keywords: + if keyword in content_text: + blog_info['is_ad'] = True + idx = content_text.find(keyword) + start = max(0, idx - 30) + end = min(len(content_text), idx + 30) + blog_info['ad_phrases'].append(content_text[start:end].strip()) + + # 이미지 URL 수집 + blog_info['images'] = self._collect_blog_images() + + # 스크린샷 저장 + self._save_screenshot(f"blog_{index}") + + except Exception as e: + self.log_error(f"블로그 내용 추출 중 오류: {e}") + + finally: + # 탭 닫고 원래 창으로 돌아가기 + self.driver.close() + self.driver.switch_to.window(original_window) + + return blog_info + + def _extract_blog_content(self) -> str: + """블로그 본문 추출""" + content_selectors = [ + ".se-main-container", + "div.se-component", + "div[class*='postViewArea']", + "#postViewArea", + ".post-view", + ".article-view", + ".entry-content", + "article", + "main", + "[role='article']", + ".content", + "#content" + ] + + content_text = "" + for selector in content_selectors: + try: + elements = self.driver.find_elements(By.CSS_SELECTOR, selector) + for elem in elements: + text = elem.text.strip() + if len(text) > 100: + content_text += text + "\n\n" + if len(content_text) > 2000: + break + except: + continue + + return content_text + + def _collect_blog_images(self) -> List[str]: + """블로그 내 이미지 URL 수집""" + image_urls = [] + + try: + img_selectors = [ + "img[src*='blogfiles']", + "img[src*='postfiles']", + "img[src*='phinf']", + ".se-image img", + ".post-view img", + "article img", + ".content img" + ] + + seen_urls = set() + + for selector in img_selectors: + images = self.driver.find_elements(By.CSS_SELECTOR, selector) + for img in images: + try: + src = img.get_attribute("src") + if (src and src.startswith("http") and + src not in seen_urls and + not any(skip in src.lower() for skip in ['icon', 'emoji', 'button', 'logo'])): + + original_url = clean_image_url(src) + seen_urls.add(original_url) + image_urls.append(original_url) + + if len(image_urls) >= 10: + break + except: + continue + + self.log_debug(f"블로그에서 {len(image_urls)}개 이미지 URL 수집") + + except Exception as e: + self.log_error(f"이미지 수집 중 오류: {e}") + + return image_urls + + @log_execution_time() + def download_all_images(self, results: Dict) -> Dict: + """ + 모든 이미지 다운로드 + + Args: + results: 크롤링 결과 딕셔너리 + + Returns: + 다운로드 정보 딕셔너리 + """ + self.log_info("\n=== 이미지 다운로드 시작 ===") + + # 타임스탬프로 폴더 생성 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + download_dir = os.path.join(self.project_dir, 'images', f"{results['query']}_{timestamp}") + os.makedirs(download_dir, exist_ok=True) + + all_urls = [] + + # 사진 탭에서 수집한 URL + photo_urls = [(url, f"photo_{i+1}") for i, url in enumerate(results.get('photo_urls', []))] + all_urls.extend(photo_urls) + + # 블로그에서 수집한 URL + for blog_idx, blog in enumerate(results.get('detailed_blogs', [])): + for img_idx, url in enumerate(blog.get('images', [])): + all_urls.append((url, f"blog{blog_idx+1}_img{img_idx+1}")) + + # 중복 제거 + unique_urls = {} + for url, prefix in all_urls: + cleaned_url = clean_image_url(url) + if cleaned_url not in unique_urls: + unique_urls[cleaned_url] = prefix + + self.log_info(f"총 {len(unique_urls)}개의 고유 이미지 다운로드 예정") + self.log_info(f"필터 조건: {self._get_filter_description()}") + + # 통계 추적 + stats = { + 'success': 0, + 'duplicate': 0, + 'size_filtered': 0, + 'file_size_filtered': 0, + 'format_filtered': 0, + 'network_error': 0, + 'other_error': 0, + 'file_exists': 0 + } + + # 다운로드 실행 + for url, prefix in unique_urls.items(): + success, reason = self._download_image(url, download_dir, prefix, stats['success'] + 1) + + # 통계 업데이트 + if success: + if reason == "파일존재": + stats['file_exists'] += 1 + else: + stats['success'] += 1 + else: + if "중복" in reason: + stats['duplicate'] += 1 + elif "크기부족" in reason: + stats['size_filtered'] += 1 + elif "파일크기" in reason: + stats['file_size_filtered'] += 1 + elif "포맷" in reason: + stats['format_filtered'] += 1 + elif "네트워크" in reason: + stats['network_error'] += 1 + else: + stats['other_error'] += 1 + + # 메타데이터 저장 + self._save_image_metadata(download_dir) + + # 통계 로깅 + total_saved = stats['success'] + stats['file_exists'] + total_filtered = stats['duplicate'] + stats['size_filtered'] + stats['file_size_filtered'] + stats['format_filtered'] + + self.log_info(f"✓ 이미지 다운로드 완료:") + self.log_info(f" 저장: {total_saved}개 (신규: {stats['success']}개, 기존: {stats['file_exists']}개)") + self.log_info(f" 필터링: {total_filtered}개") + self.log_info(f"저장 위치: {download_dir}") + + # 다운로드 정보 반환 + return { + 'download_dir': download_dir, + 'total_images': len(unique_urls), + 'downloaded': total_saved, + 'statistics': stats, + 'filter_config': { + 'min_width': self.image_filter.min_width, + 'min_height': self.image_filter.min_height, + 'min_file_size_kb': self.image_filter.min_file_size_kb, + 'max_file_size_mb': self.image_filter.max_file_size_mb, + 'allowed_formats': list(self.image_filter.allowed_formats), + 'require_both_dimensions': self.image_filter.require_both_dimensions + }, + 'timestamp': timestamp + } + + def _download_image(self, url: str, save_dir: str, prefix: str, index: int) -> Tuple[bool, str]: + """개별 이미지 다운로드""" + try: + # 파일 확장자 추출 + parsed_url = urlparse(url) + path = parsed_url.path + ext = os.path.splitext(path)[1].lower() + + # 확장자가 없거나 이상한 경우 기본값 + if ext not in ['.jpg', '.jpeg', '.png', '.gif', '.webp']: + ext = '.jpg' + + # 파일명 생성 + filename = f"{prefix}_{index:03d}{ext}" + filepath = os.path.join(save_dir, filename) + + # 이미 존재하면 스킵 + if os.path.exists(filepath): + self.log_debug(f"이미 존재: {filename}") + return True, "파일존재" + + # 다운로드 + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'Referer': 'https://map.naver.com/' + } + + response = requests.get(url, headers=headers, timeout=10) + response.raise_for_status() + + # 이미지 해시 계산 (중복 체크용) + image_hash = self._calculate_image_hash(response.content) + + # 중복 이미지 체크 + if self._is_duplicate_image(image_hash): + self.log_debug(f"중복 이미지 스킵: {filename}") + return False, "중복" + + # 고급 이미지 필터링 + is_valid, width, height, reason = self._check_image_advanced(response.content) + + if not is_valid: + self.log_debug(f"필터링 제외: {filename} - {reason}") + return False, reason + + # 이미지 저장 + with open(filepath, 'wb') as f: + f.write(response.content) + + # 중복 방지를 위해 해시 추가 + self.downloaded_hashes.add(image_hash) + + # 메타데이터 기록 + self._add_image_metadata( + filename=filename, + url=url, + width=width, + height=height, + file_size=len(response.content), + image_hash=image_hash, + download_time=datetime.now().isoformat() + ) + + self.log_debug(f"다운로드 성공: {filename} ({width}x{height}, {len(response.content)/1024:.1f}KB)") + return True, "성공" + + except requests.exceptions.RequestException as e: + self.log_warning(f"다운로드 실패: {url[:50]}... - {str(e)}") + return False, f"네트워크오류: {str(e)}" + except Exception as e: + self.log_error(f"이미지 저장 중 오류: {str(e)}") + return False, f"저장오류: {str(e)}" + + def _check_image_advanced(self, image_data: bytes) -> Tuple[bool, int, int, str]: + """고급 이미지 필터링 검사""" + try: + with Image.open(BytesIO(image_data)) as img: + width, height = img.size + format_ext = f".{img.format.lower()}" if img.format else ".unknown" + + # 파일 크기 체크 (KB) + file_size_kb = len(image_data) / 1024 + file_size_mb = file_size_kb / 1024 + + # 크기 조건 체크 + if self.image_filter.require_both_dimensions: + size_ok = width >= self.image_filter.min_width and height >= self.image_filter.min_height + else: + size_ok = width >= self.image_filter.min_width or height >= self.image_filter.min_height + + # 파일 크기 조건 체크 + file_size_ok = (self.image_filter.min_file_size_kb <= file_size_kb <= + self.image_filter.max_file_size_mb * 1024) + + # 포맷 조건 체크 + format_ok = format_ext in self.image_filter.allowed_formats + + is_valid = size_ok and file_size_ok and format_ok + + # 실패 이유 반환 + reasons = [] + if not size_ok: + reasons.append(f"크기부족: {width}x{height}") + if not file_size_ok: + reasons.append(f"파일크기: {file_size_kb:.1f}KB") + if not format_ok: + reasons.append(f"포맷: {format_ext}") + + failure_reason = ", ".join(reasons) if reasons else "통과" + + return is_valid, width, height, failure_reason + + except Exception as e: + self.log_debug(f"이미지 분석 실패: {e}") + return False, 0, 0, f"분석실패: {str(e)}" + + def _calculate_image_hash(self, image_data: bytes) -> str: + """이미지 데이터의 MD5 해시값 계산""" + return hashlib.md5(image_data).hexdigest() + + def _is_duplicate_image(self, image_hash: str) -> bool: + """중복 이미지인지 확인""" + return image_hash in self.downloaded_hashes + + def _add_image_metadata(self, filename: str, url: str, width: int, height: int, + file_size: int, image_hash: str, download_time: str): + """이미지 메타데이터 추가""" + metadata = { + 'filename': filename, + 'original_url': url, + 'width': width, + 'height': height, + 'file_size_bytes': file_size, + 'file_size_kb': round(file_size / 1024, 2), + 'hash': image_hash, + 'download_time': download_time, + 'format': os.path.splitext(filename)[1].lower() + } + self.image_metadata.append(metadata) + + def _save_image_metadata(self, download_dir: str): + """이미지 메타데이터를 JSON 파일로 저장""" + if not self.image_metadata: + return + + metadata_file = os.path.join(download_dir, 'image_metadata.json') + + metadata_summary = { + 'collection_info': { + 'total_images': len(self.image_metadata), + 'collection_time': datetime.now().isoformat(), + 'filter_config': { + 'min_width': self.image_filter.min_width, + 'min_height': self.image_filter.min_height, + 'min_file_size_kb': self.image_filter.min_file_size_kb, + 'max_file_size_mb': self.image_filter.max_file_size_mb, + 'allowed_formats': list(self.image_filter.allowed_formats), + 'require_both_dimensions': self.image_filter.require_both_dimensions + } + }, + 'statistics': { + 'total_file_size_mb': round(sum(img['file_size_bytes'] for img in self.image_metadata) / 1024 / 1024, 2), + 'average_width': round(sum(img['width'] for img in self.image_metadata) / len(self.image_metadata), 1), + 'average_height': round(sum(img['height'] for img in self.image_metadata) / len(self.image_metadata), 1), + 'format_distribution': self._get_format_distribution() + }, + 'images': self.image_metadata + } + + with open(metadata_file, 'w', encoding='utf-8') as f: + json.dump(metadata_summary, f, ensure_ascii=False, indent=2) + + self.log_info(f"메타데이터 저장: {metadata_file}") + + def _get_format_distribution(self) -> Dict[str, int]: + """이미지 포맷 분포 계산""" + formats = {} + for img in self.image_metadata: + fmt = img['format'] + formats[fmt] = formats.get(fmt, 0) + 1 + return formats + + def _get_filter_description(self) -> str: + """필터 설정 설명 반환""" + conditions = [] + + if self.image_filter.require_both_dimensions: + conditions.append(f"크기 {self.image_filter.min_width}x{self.image_filter.min_height} 이상(AND)") + else: + conditions.append(f"가로 {self.image_filter.min_width}px 또는 세로 {self.image_filter.min_height}px 이상(OR)") + + conditions.append(f"파일크기 {self.image_filter.min_file_size_kb}KB~{self.image_filter.max_file_size_mb}MB") + conditions.append(f"포맷 {', '.join(sorted(self.image_filter.allowed_formats))}") + + return " | ".join(conditions) + + def _save_screenshot(self, name: str): + """스크린샷 저장""" + try: + filename = f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" + filepath = os.path.join(self.project_dir, 'debug', filename) + self.driver.save_screenshot(filepath) + self.log_debug(f"스크린샷 저장: {filename}") + except: + pass diff --git a/selenium_crawler/naver_integrated_crawler.py b/selenium_crawler/naver_integrated_crawler.py new file mode 100644 index 0000000..2622479 --- /dev/null +++ b/selenium_crawler/naver_integrated_crawler.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +네이버 지도 + 블로그 통합 크롤러 (리팩토링 버전) +메인 실행 파일 +""" + +import os +import sys +import platform +import json +import time +from datetime import datetime +from typing import Dict + +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.support.ui import WebDriverWait + +from .logger_utils import setup_logger, config +from .naver_map_crawler import NaverMapCrawler +from .naver_blog_crawler import NaverBlogCrawler, ImageFilterConfig + + +class NaverIntegratedCrawler: + def __init__(self, project_dir: str = r'/home/jhyeu/workspace/data', + image_filter: ImageFilterConfig = None): + """ + 초기화 + Args: + project_dir: 프로젝트 디렉토리 + image_filter: 이미지 필터 설정 + """ + self.project_dir = project_dir + self.image_filter = image_filter or ImageFilterConfig() + + # 디렉토리 생성 + os.makedirs(os.path.join(self.project_dir, 'debug'), exist_ok=True) + os.makedirs(os.path.join(self.project_dir, 'data'), exist_ok=True) + os.makedirs(os.path.join(self.project_dir, 'logs'), exist_ok=True) + os.makedirs(os.path.join(self.project_dir, 'images'), exist_ok=True) + + # 로거 설정 + self.logger = setup_logger( + 'NaverIntegratedCrawler', + os.path.join(self.project_dir, 'logs'), + 'integrated_crawler' + ) + + self.driver = None + self.wait = None + self.map_crawler = None + self.blog_crawler = None + + def setup_driver(self): + """웹드라이버 설정""" + self.logger.info("웹드라이버 설정 시작...") + + # 설정 파일에서 웹드라이버 설정 읽기 + webdriver_config = config.get('webdriver', {}) + + chrome_options = Options() + chrome_options.add_argument('--no-sandbox') + chrome_options.add_argument('--disable-dev-shm-usage') + chrome_options.add_argument('--disable-blink-features=AutomationControlled') + chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) + chrome_options.add_experimental_option('useAutomationExtension', False) + + # 추가 안정성 옵션들 + chrome_options.add_argument('--disable-gpu') + chrome_options.add_argument(f'--window-size={webdriver_config.get("window_size", "1920,1080")}') + chrome_options.add_argument('--disable-web-security') + chrome_options.add_argument('--allow-running-insecure-content') + + # User-Agent 설정 + user_agent = webdriver_config.get('user_agent', + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36') + chrome_options.add_argument(f'--user-agent={user_agent}') + + # 디버깅을 위해 headless 모드 설정 + headless = webdriver_config.get('headless', False) + if headless or (platform.system() == 'Linux' and not os.environ.get('DISPLAY')): + chrome_options.add_argument('--headless') + self.logger.info(" - Headless 모드로 실행") + else: + self.logger.info(" - GUI 모드로 실행") + + self.driver = webdriver.Chrome(options=chrome_options) + + # 페이지 로드 타임아웃 설정 + page_load_timeout = webdriver_config.get('page_load_timeout', 30) + implicit_wait = webdriver_config.get('implicit_wait', 10) + + self.driver.set_page_load_timeout(page_load_timeout) + self.driver.implicitly_wait(implicit_wait) + + self.wait = WebDriverWait(self.driver, 30) # 20 -> 30초로 증가 + #self.driver.maximize_window() + + # 자동화 감지 방지 + self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + + # 크롤러 인스턴스 생성 + self.map_crawler = NaverMapCrawler(self.driver, self.wait, self.project_dir) + self.blog_crawler = NaverBlogCrawler(self.driver, self.wait, self.project_dir, self.image_filter) + + self.logger.info(" ✓ 웹드라이버 초기화 완료\n") + + def collect_all_info(self, query: str = "선돌막국수", max_blogs: int = 5) -> Dict: + """ + 지도 정보와 블로그 정보 통합 수집 + + Args: + query: 검색어 + max_blogs: 최대 블로그 수집 개수 + + Returns: + 수집 결과 딕셔너리 + """ + self.logger.info(f"=== {query} 통합 정보 수집 시작 ===\n") + + results = { + 'query': query, + 'collection_time': datetime.now().isoformat(), + 'place_info': {}, + 'map_blog_reviews': [], + 'detailed_blogs': [], + 'photo_urls': [], + 'total_blogs_visited': 0 + } + + try: + # 1. 네이버 지도 검색 + if not self.map_crawler.search_place(query): + self.logger.error("장소 검색 실패") + return results + + # 2. 지도에서 기본 정보 수집 + place_info = self.map_crawler.collect_place_info() + if place_info: + results['place_info'] = place_info + self.logger.info(f"✓ 장소 기본 정보 수집 완료: {place_info.get('name', 'N/A')}") + + # 3. 사진 수집 + self.logger.info("\n=== 사진 수집 시작 ===") + photo_urls = self.map_crawler.collect_photo_urls() + results['photo_urls'] = photo_urls + self.logger.info(f"✓ 사진 {len(photo_urls)}개 수집 완료\n") + + # 4. 지도의 블로그 리뷰 수집 + map_reviews = self.map_crawler.collect_map_blog_reviews() + results['map_blog_reviews'] = map_reviews + self.logger.info(f"✓ 지도에서 블로그 리뷰 {len(map_reviews)}개 수집\n") + + # 5. 블로그 상세 정보 수집 + blog_links = self.map_crawler.collect_blog_links(max_blogs) + + if blog_links: + self.logger.info(f"✓ {len(blog_links)}개의 블로그 링크 수집 완료\n") + + # 각 블로그 방문 + for idx, (title, url) in enumerate(blog_links): + self.logger.info(f"\n[{idx+1}/{len(blog_links)}] 블로그 방문 중...") + self.logger.info(f"제목: {title}") + self.logger.info(f"URL: {url[:80]}...") + + blog_content = self.blog_crawler.visit_and_extract_blog(url, title, idx+1) + if blog_content: + results['detailed_blogs'].append(blog_content) + results['total_blogs_visited'] += 1 + self.logger.info(f"✓ 내용 수집 완료 (사진 {len(blog_content['images'])}개 포함)") + + # 다음 블로그를 위해 목록으로 돌아가기 + if idx < len(blog_links) - 1: + self.map_crawler.return_to_blog_list() + time.sleep(1) + + except Exception as e: + self.logger.error(f"\n오류 발생: {e}", exc_info=True) + + # 결과 저장 + self._save_integrated_results(results) + + return results + + def _save_integrated_results(self, results: Dict): + """통합 결과 저장""" + # JSON 저장 + json_file = os.path.join(self.project_dir, 'data', + f'integrated_result_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json') + + with open(json_file, 'w', encoding='utf-8') as f: + json.dump(results, f, ensure_ascii=False, indent=2) + + # 이미지 다운로드 + download_info = self.blog_crawler.download_all_images(results) + results['image_download_info'] = download_info + + # 텍스트 보고서 작성 + self._generate_report(results) + + self.logger.info(f"\n=== 수집 완료 ===") + self.logger.info(f"JSON 파일: {json_file}") + + def _generate_report(self, results: Dict): + """텍스트 보고서 생성""" + report_file = os.path.join(self.project_dir, 'data', + f'integrated_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt') + + with open(report_file, 'w', encoding='utf-8') as f: + f.write(f"=== {results['query']} 통합 수집 결과 ===\n\n") + f.write(f"수집 일시: {results['collection_time']}\n\n") + + # 장소 정보 + f.write("=== 장소 기본 정보 ===\n") + place = results['place_info'] + if place: + f.write(f"상호명: {place.get('name', 'N/A')}\n") + f.write(f"주소: {place.get('address', 'N/A')}\n") + f.write(f"전화번호: {place.get('tel', 'N/A')}\n") + f.write(f"카테고리: {place.get('category', 'N/A')}\n") + f.write(f"영업시간: {place.get('business_hours', 'N/A')}\n") + f.write(f"홈페이지: {place.get('homepage', 'N/A')}\n") + f.write(f"설명: {place.get('description', 'N/A')}\n") + + # 사진 정보 + f.write(f"\n=== 사진 정보 ===\n") + f.write(f"수집한 사진 URL 수: {len(results['photo_urls'])}개\n") + for i, url in enumerate(results['photo_urls'][:5], 1): + f.write(f"{i}. {url}\n") + if len(results['photo_urls']) > 5: + f.write(f"... 외 {len(results['photo_urls'])-5}개\n") + + # 지도 블로그 리뷰 + f.write(f"\n=== 지도 블로그 리뷰 ===\n") + f.write(f"리뷰 수: {len(results['map_blog_reviews'])}개\n") + + ad_count = sum(1 for r in results['map_blog_reviews'] if r['is_ad']) + f.write(f"광고성 리뷰: {ad_count}개\n\n") + + for i, review in enumerate(results['map_blog_reviews'][:5], 1): + f.write(f"{i}. {review['title']}\n") + if review['is_ad']: + f.write(f" [광고] {', '.join(review['ad_phrases'])}\n") + f.write(f" 날짜: {review.get('date', 'N/A')}\n") + + # 상세 블로그 정보 + f.write(f"\n=== 상세 블로그 정보 ===\n") + f.write(f"방문한 블로그 수: {results['total_blogs_visited']}개\n\n") + + total_blog_images = 0 + for blog in results['detailed_blogs']: + f.write(f"\n[{blog['index']}번째 블로그]\n") + f.write(f"제목: {blog['title']}\n") + f.write(f"블로그 타이틀: {blog['blog_title']}\n") + f.write(f"URL: {blog['url']}\n") + if blog['is_ad']: + f.write(f"광고성 포스트: YES\n") + f.write(f"광고 문구: {', '.join(blog['ad_phrases'][:2])}\n") + f.write(f"블로그 내 이미지: {len(blog['images'])}개\n") + + if blog['images']: + f.write("이미지 URL:\n") + for i, img_url in enumerate(blog['images'][:3], 1): + f.write(f" {i}. {img_url}\n") + if len(blog['images']) > 3: + f.write(f" ... 외 {len(blog['images'])-3}개\n") + + f.write(f"내용 (일부):\n{blog['content'][:300]}...\n") + f.write("-" * 80 + "\n") + + total_blog_images += len(blog['images']) + + # 통계 + f.write(f"\n=== 수집 통계 ===\n") + f.write(f"총 수집된 이미지 URL: {len(results['photo_urls']) + total_blog_images}개\n") + f.write(f" - 사진탭: {len(results['photo_urls'])}개\n") + f.write(f" - 블로그: {total_blog_images}개\n") + + # 이미지 다운로드 정보 + if 'image_download_info' in results: + dl_info = results['image_download_info'] + f.write(f"\n=== 이미지 다운로드 ===\n") + f.write(f"다운로드 디렉토리: {dl_info.get('download_dir', 'N/A')}\n") + f.write(f"다운로드 성공: {dl_info.get('downloaded', 0)}개\n") + f.write(f"전체 처리: {dl_info.get('total_images', 0)}개\n") + + if 'statistics' in dl_info: + stats = dl_info['statistics'] + f.write(f"\n상세 통계:\n") + f.write(f" - 신규 저장: {stats.get('success', 0)}개\n") + f.write(f" - 기존 파일: {stats.get('file_exists', 0)}개\n") + f.write(f" - 중복 제거: {stats.get('duplicate', 0)}개\n") + f.write(f" - 크기 필터링: {stats.get('size_filtered', 0)}개\n") + f.write(f" - 파일크기 필터링: {stats.get('file_size_filtered', 0)}개\n") + f.write(f" - 포맷 필터링: {stats.get('format_filtered', 0)}개\n") + + # 광고성 포스트 통계 + ad_blogs = sum(1 for b in results['detailed_blogs'] if b['is_ad']) + f.write(f"\n광고성 포스트: {ad_blogs}/{len(results['detailed_blogs'])}개 ") + if results['detailed_blogs']: + f.write(f"({ad_blogs/len(results['detailed_blogs'])*100:.1f}%)\n") + + self.logger.info(f"보고서 저장: {report_file}") + + def close(self): + """리소스 정리""" + if self.driver: + self.driver.quit() + self.logger.info("웹드라이버 종료") diff --git a/selenium_crawler/naver_map_crawler.py b/selenium_crawler/naver_map_crawler.py new file mode 100644 index 0000000..8b715ce --- /dev/null +++ b/selenium_crawler/naver_map_crawler.py @@ -0,0 +1,573 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +네이버 지도 크롤러 모듈 +지도 검색 및 장소 정보 수집 기능 +""" + +import os +import time +import re +from datetime import datetime +from typing import List, Dict, Optional + +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.common.keys import Keys +from selenium.common.exceptions import TimeoutException, NoSuchElementException + +from .logger_utils import LoggerMixin, log_execution_time, log_exception, config + + +class NaverMapCrawler(LoggerMixin): + """네이버 지도 크롤러""" + + def __init__(self, driver: webdriver.Chrome, wait: WebDriverWait, project_dir: str): + """ + 초기화 + + Args: + driver: 웹드라이버 + wait: WebDriverWait 객체 + project_dir: 프로젝트 디렉토리 + """ + self.driver = driver + self.wait = wait + self.project_dir = project_dir + self.current_frame = None + + # 로거 설정 + self.setup_logger(log_prefix='map_crawler') + + @log_execution_time() + def search_place(self, query: str) -> bool: + """ + 장소 검색 + + Args: + query: 검색어 + + Returns: + 성공 여부 + """ + try: + # 로딩 지연 시간 설정 읽기 + loading_delays = config.get('loading_delays', {}) + + self.log_info(f"네이버 지도 접속 및 '{query}' 검색 시작...") + self.driver.get("https://map.naver.com/") + + # 초기 페이지 로드 대기 시간 증가 + initial_wait = loading_delays.get('initial_page_load', 10) + self.log_info(f"페이지 로드 대기: {initial_wait}초") + time.sleep(initial_wait) + self._save_screenshot("01_main_page") + + # 검색창 찾기 (재시도 로직 추가) + search_box = None + for attempt in range(3): + search_box = self._find_search_box() + if search_box: + break + self.log_warning(f"검색창 찾기 시도 {attempt + 1}/3 실패, 재시도...") + time.sleep(3) + + if not search_box: + self.log_error("검색창을 찾을 수 없습니다") + return False + + # 검색 수행 + self.log_info(f"검색어 입력: {query}") + search_box.clear() + time.sleep(1) + search_box.send_keys(query) + time.sleep(1) + search_box.send_keys(Keys.RETURN) + + # 검색 후 대기 시간 증가 + after_search_wait = loading_delays.get('after_search', 15) + self.log_info(f"검색 결과 로드 대기: {after_search_wait}초") + time.sleep(after_search_wait) + self._save_screenshot("02_search_result") + + # searchIframe 로드 대기 (시간 증가) + try: + iframe_wait = loading_delays.get('iframe_wait', 5) + self.wait.until(EC.presence_of_element_located((By.ID, "searchIframe"))) + self.log_info("searchIframe 로드 확인") + time.sleep(iframe_wait) + except: + self.log_warning("searchIframe을 찾을 수 없음") + + # 검색 결과 클릭 + self.log_info("검색 결과 클릭 시도...") + result_wait = loading_delays.get('search_result_wait', 10) + time.sleep(result_wait) + + if self._click_search_result(): + # entryIframe이 나타날 때까지 대기 (최대 10초) + try: + self.driver.switch_to.default_content() + WebDriverWait(self.driver, 10).until( + EC.presence_of_element_located((By.ID, "entryIframe")) + ) + self.log_info("entryIframe 로드 확인") + + # iframe으로 전환하고 장소명 요소 대기 + self.driver.switch_to.frame("entryIframe") + WebDriverWait(self.driver, 5).until( + EC.presence_of_element_located((By.CSS_SELECTOR, ".GHAhO, .place_name, h2.place_name")) + ) + self.driver.switch_to.default_content() + self.log_info("장소 상세 페이지 로드 완료") + + except TimeoutException: + self.log_warning("장소 상세 페이지 로드 타임아웃") + + self._save_screenshot("03_place_detail") + return True + else: + self.log_error("검색 결과 클릭 실패") + return False + + except Exception as e: + self.log_error(f"검색 중 오류: {e}", exc_info=True) + return False + + @log_exception() + def collect_place_info(self) -> Dict: + """ + 장소 기본 정보 수집 + + Returns: + 장소 정보 딕셔너리 + """ + place_info = { + 'name': '', + 'address': '', + 'tel': '', + 'category': '', + 'business_hours': '', + 'homepage': '', + 'description': '', + 'collected_at': datetime.now().isoformat() + } + + try: + # entryIframe으로 전환 + self._ensure_correct_frame("entryIframe") + # 동적 대기로 변경됨 + + # 정보 수집 매핑 + info_mappings = { + 'name': [".GHAhO", ".place_name", "h2.place_name", "._3XamX"], + 'address': ["span.LDgIH", ".address", "._2yqUQ", "span[class*='addr']"], + 'category': [".DJJvD", ".category", "._3ocDE"] + } + + # 각 정보 수집 + for field, selectors in info_mappings.items(): + for selector in selectors: + try: + elem = self.driver.find_element(By.CSS_SELECTOR, selector) + place_info[field] = elem.text.strip() + if place_info[field]: + break + except: + continue + + # 전화번호 수집 (특별 처리) + place_info['tel'] = self._extract_phone_number() + + # 영업시간 + try: + hours_elem = self.driver.find_element(By.CSS_SELECTOR, "._1APRQ") + place_info['business_hours'] = hours_elem.text.strip() + except: + pass + + # 홈페이지 + try: + homepage_elem = self.driver.find_element(By.CSS_SELECTOR, "a[class*='homepage']") + place_info['homepage'] = homepage_elem.get_attribute('href') + except: + pass + + # 설명 + try: + desc_elem = self.driver.find_element(By.CSS_SELECTOR, ".T8RMe") + place_info['description'] = desc_elem.text.strip() + except: + pass + + self.log_info(f"장소 정보 수집 완료: {place_info['name']} - {place_info['address']}") + + except Exception as e: + self.log_error(f"장소 정보 수집 중 오류: {e}") + + return place_info + + def collect_photo_urls(self) -> List[str]: + """ + 사진 탭에서 사진 URL 수집 + + Returns: + 사진 URL 리스트 + """ + photo_urls = [] + + try: + self._ensure_correct_frame("entryIframe") + + # 사진 탭 클릭 + if not self._click_tab("사진"): + self.log_warning("사진 탭을 찾을 수 없습니다") + return photo_urls + + time.sleep(3) + + # 사진 URL 수집 + photo_selectors = [ + "img[class*='photo']", + "img[class*='thumb']", + "img[src*='phinf']", + "img[src*='blogfiles']" + ] + + all_images = [] + for selector in photo_selectors: + images = self.driver.find_elements(By.CSS_SELECTOR, selector) + all_images.extend(images) + + seen_urls = set() + for img in all_images[:20]: + try: + src = img.get_attribute("src") or img.get_attribute("data-src") + if src and src.startswith("http") and src not in seen_urls: + from naver_blog_crawler import clean_image_url + original_url = clean_image_url(src) + seen_urls.add(original_url) + photo_urls.append(original_url) + except: + continue + + self.log_info(f"사진 탭에서 {len(photo_urls)}개 URL 수집") + + except Exception as e: + self.log_error(f"사진 수집 중 오류: {e}") + + return photo_urls + + def collect_map_blog_reviews(self) -> List[Dict]: + """ + 지도에서 블로그 리뷰 정보 수집 + + Returns: + 블로그 리뷰 리스트 + """ + reviews = [] + + try: + self._ensure_correct_frame("entryIframe") + + # 블로그 탭 클릭 + if not self._click_tab("블로그"): + self.log_warning("블로그 탭을 찾을 수 없습니다") + return reviews + + time.sleep(3) + + # 블로그 리뷰 요소 찾기 + review_elems = self.driver.find_elements(By.CSS_SELECTOR, "li._2kAri") + + for idx, elem in enumerate(review_elems[:10]): + try: + review_text = elem.text.strip() + lines = review_text.split('\n') + + review_info = { + 'title': lines[0] if lines else '', + 'preview': ' '.join(lines[1:3]) if len(lines) > 1 else '', + 'date': '', + 'is_ad': False, + 'ad_phrases': [] + } + + # 날짜 찾기 + date_match = re.search(r'(\d{4}\.\d{1,2}\.\d{1,2})', review_text) + if date_match: + review_info['date'] = date_match.group(1) + + # 광고성 문구 검색 + ad_keywords = ['광고', '협찬', '제공받', '초대받', '스폰서', '파트너스', '수수료'] + full_text = review_info['title'] + ' ' + review_info['preview'] + + for keyword in ad_keywords: + if keyword in full_text: + review_info['is_ad'] = True + idx = full_text.find(keyword) + start = max(0, idx - 20) + end = min(len(full_text), idx + 30) + review_info['ad_phrases'].append(full_text[start:end].strip()) + + reviews.append(review_info) + self.log_debug(f"리뷰 수집: {review_info['title'][:30]}...") + + except Exception as e: + self.log_debug(f"리뷰 추출 중 오류: {e}") + continue + + self.log_info(f"지도에서 블로그 리뷰 {len(reviews)}개 수집") + + except Exception as e: + self.log_error(f"블로그 리뷰 수집 중 오류: {e}") + + return reviews + + def collect_blog_links(self, max_count: int = 5) -> List[tuple]: + """ + 블로그 링크 수집 + + Args: + max_count: 최대 수집 개수 + + Returns: + (제목, URL) 튜플 리스트 + """ + blog_links = [] + + self._ensure_correct_frame("entryIframe") + + review_selectors = [ + "li._2kAri", + "li[class*='blog']", + "ul[class*='list'] > li", + ".place_section_content li" + ] + + review_elements = [] + + for selector in review_selectors: + try: + elements = self.driver.find_elements(By.CSS_SELECTOR, selector) + if elements: + valid_elements = [] + for elem in elements: + try: + text = elem.text.strip() + if text and len(text) > 20 and '\n' in text: + valid_elements.append(elem) + except: + continue + + if valid_elements: + review_elements = valid_elements + self.log_info(f"유효한 리뷰 요소 {len(valid_elements)}개 발견") + break + + except: + continue + + # 블로그 링크 추출 + for elem in review_elements[:max_count]: + try: + text = elem.text.strip() + lines = text.split('\n') + title = lines[0] if lines else "제목 없음" + + # URL 추출 + url = self._extract_url_from_element(elem) + + if url: + blog_links.append((title, url)) + self.log_debug(f"블로그 링크 수집: {title[:30]}...") + + except Exception as e: + self.log_debug(f"링크 추출 중 오류: {e}") + continue + + self.log_info(f"총 {len(blog_links)}개의 블로그 링크 수집") + + return blog_links + + def _find_search_box(self): + """검색창 찾기""" + self.log_debug("검색창 찾기 시도 중...") + + time.sleep(2) + + selectors = [ + "input.input_search", + "input[placeholder*='검색']", + "input[type='search']", + "#search-input", + "input[name='query']", + "//input[@placeholder]", + "//input[contains(@class, 'search')]" + ] + + for selector in selectors: + try: + if selector.startswith('//'): + elem = self.driver.find_element(By.XPATH, selector) + else: + elem = self.driver.find_element(By.CSS_SELECTOR, selector) + + if elem.is_displayed() and elem.is_enabled(): + self.log_debug(f"검색창 발견: {selector}") + return elem + + except: + continue + + return None + + def _click_search_result(self) -> bool: + """검색 결과 클릭""" + # 동적 대기로 변경 + + # searchIframe 전환 시도 + try: + self.driver.switch_to.default_content() + self.wait.until(EC.presence_of_element_located((By.ID, "searchIframe"))) + self.driver.switch_to.frame("searchIframe") + self.log_debug("searchIframe으로 전환 성공") + except: + self.log_debug("searchIframe 전환 실패") + + selectors = [ + "a.place_bluelink", + "span.place_bluelink", + "//span[contains(@class, 'place')]", + "//a[contains(@class, 'place')]", + "li[class*='item'] a", + "div[class*='item'] a" + ] + + for selector in selectors: + try: + if selector.startswith('//'): + elements = self.driver.find_elements(By.XPATH, selector) + else: + elements = self.driver.find_elements(By.CSS_SELECTOR, selector) + + if elements: + elem = elements[0] + self.driver.execute_script("arguments[0].scrollIntoView(true);", elem) + time.sleep(1) + + try: + elem.click() + except: + self.driver.execute_script("arguments[0].click();", elem) + + self.log_debug(f"검색 결과 클릭 성공: {selector}") + return True + + except: + continue + + self.driver.switch_to.default_content() + return False + + def _click_tab(self, tab_name: str) -> bool: + """탭 클릭""" + tab_selectors = [ + f"//a[contains(text(), '{tab_name}')]", + f"//span[contains(text(), '{tab_name}')]", + f"a[href*='{tab_name.lower()}']" + ] + + for selector in tab_selectors: + try: + if selector.startswith('//'): + elem = self.driver.find_element(By.XPATH, selector) + else: + elem = self.driver.find_element(By.CSS_SELECTOR, selector) + + if elem.is_displayed(): + self.driver.execute_script("arguments[0].click();", elem) + self.log_debug(f"{tab_name} 탭 클릭 성공") + return True + except: + continue + + return False + + def _extract_phone_number(self) -> str: + """전화번호 추출""" + tel_selectors = ["span.xlx7Q", "a[href^='tel:']", "._3ZA0S", ".phone"] + + for selector in tel_selectors: + try: + elem = self.driver.find_element(By.CSS_SELECTOR, selector) + tel_text = elem.text.strip() + tel_match = re.search(r'(\d{2,4}[-.]?\d{3,4}[-.]?\d{4})', tel_text) + if tel_match: + return tel_match.group(1) + except: + continue + + return '' + + def _extract_url_from_element(self, elem) -> Optional[str]: + """요소에서 URL 추출""" + # a 태그에서 href 찾기 + try: + link_elements = elem.find_elements(By.TAG_NAME, "a") + for link_elem in link_elements: + href = link_elem.get_attribute("href") + if href and href.startswith("http"): + return href + except: + pass + + # onclick 속성에서 URL 추출 + try: + onclick = elem.get_attribute("onclick") + if onclick and "http" in onclick: + url_match = re.search(r'(https?://[^\s\'"]+)', onclick) + if url_match: + return url_match.group(1) + except: + pass + + return None + + def _ensure_correct_frame(self, target_frame: str = None): + """올바른 프레임으로 전환""" + if target_frame != self.current_frame: + if target_frame is None: + self.driver.switch_to.default_content() + self.current_frame = None + self.log_debug("기본 프레임으로 전환") + else: + try: + self.driver.switch_to.default_content() + self.wait.until(EC.presence_of_element_located((By.ID, target_frame))) + self.driver.switch_to.frame(target_frame) + self.current_frame = target_frame + self.log_debug(f"{target_frame}으로 전환 성공") + except Exception as e: + self.log_debug(f"{target_frame} 전환 실패: {e}") + + def _save_screenshot(self, name: str): + """스크린샷 저장""" + try: + filename = f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" + filepath = os.path.join(self.project_dir, 'debug', filename) + self.driver.save_screenshot(filepath) + self.log_debug(f"스크린샷 저장: {filename}") + except: + pass + + def return_to_blog_list(self): + """블로그 목록으로 돌아가기""" + try: + self._ensure_correct_frame(None) + self._ensure_correct_frame("entryIframe") + self.log_debug("블로그 목록으로 복귀") + except Exception as e: + self.log_error(f"블로그 목록 복귀 중 오류: {e}")