#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 네이버 블로그 크롤러 모듈 블로그 콘텐츠 수집 및 이미지 다운로드 기능 """ import os import time import re import json import requests import hashlib from datetime import datetime from typing import List, Dict, Optional, Tuple, Set from urllib.parse import urlparse, parse_qs, urlunparse, urlencode from dataclasses import dataclass from PIL import Image from io import BytesIO from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from .logger_utils import LoggerMixin, log_execution_time, log_exception @dataclass class ImageFilterConfig: """이미지 필터링 설정""" min_width: int = 400 min_height: int = 400 min_file_size_kb: int = 10 # 최소 파일 크기 (KB) max_file_size_mb: int = 10 # 최대 파일 크기 (MB) allowed_formats: Set[str] = None require_both_dimensions: bool = False # True면 width AND height 모두 조건 만족, False면 OR def __post_init__(self): if self.allowed_formats is None: self.allowed_formats = {'.jpg', '.jpeg', '.png', '.webp'} def clean_image_url(url: str) -> str: """ 이미지 URL 정리 (불필요한 파라미터 제거) Args: url: 원본 URL Returns: 정리된 URL """ # autoRotate=true&type=w800& 제거 url = url.replace("autoRotate=true&type=w800&", "") url = url.replace("autoRotate=true&", "") url = url.replace("type=w800&", "") # URL 파싱하여 파라미터 정리 try: parsed = urlparse(url) params = parse_qs(parsed.query) # 불필요한 파라미터 제거 params.pop('autoRotate', None) # type 파라미터 처리 if 'type' in params: # 작은 사이즈면 큰 사이즈로 변경 if params['type'][0] in ['f', 'f120', 'f240', 'w240', 'w500']: params['type'] = ['w800'] # 정리된 URL 재구성 new_query = urlencode(params, doseq=True) url = urlunparse(( parsed.scheme, parsed.netloc, parsed.path, parsed.params, new_query, parsed.fragment )) except: pass return url class NaverBlogCrawler(LoggerMixin): """네이버 블로그 크롤러""" def __init__(self, driver: webdriver.Chrome, wait: WebDriverWait, project_dir: str, image_filter: ImageFilterConfig = None): """ 초기화 Args: driver: 웹드라이버 wait: WebDriverWait 객체 project_dir: 프로젝트 디렉토리 image_filter: 이미지 필터 설정 """ self.driver = driver self.wait = wait self.project_dir = project_dir self.image_filter = image_filter or ImageFilterConfig() # 중복 이미지 방지를 위한 해시 추적 self.downloaded_hashes: Set[str] = set() self.image_metadata: List[Dict] = [] # 로거 설정 self.setup_logger(log_prefix='blog_crawler') @log_execution_time() def visit_and_extract_blog(self, url: str, title: str, index: int) -> Optional[Dict]: """ 블로그 방문 및 내용 추출 Args: url: 블로그 URL title: 블로그 제목 index: 인덱스 Returns: 블로그 정보 딕셔너리 """ blog_info = { 'index': index, 'title': title, 'blog_title': '', 'url': url, 'content': '', 'images': [], 'extracted_at': datetime.now().isoformat(), 'is_ad': False, 'ad_phrases': [] } try: # 새 탭 열기 original_window = self.driver.current_window_handle self.driver.execute_script("window.open('');") self.driver.switch_to.window(self.driver.window_handles[-1]) # 블로그 페이지 로드 self.driver.get(url) time.sleep(3) # 페이지 타이틀 추출 try: blog_info['blog_title'] = self.driver.title except: blog_info['blog_title'] = "타이틀 없음" # 네이버 블로그 iframe 처리 if "blog.naver.com" in url: try: self.driver.switch_to.frame("mainFrame") self.log_debug("네이버 블로그 mainFrame 전환 성공") except: pass # 본문 추출 content_text = self._extract_blog_content() if content_text: blog_info['content'] = content_text[:2000] # 광고성 문구 검색 ad_keywords = ['광고', '협찬', '제공받', '초대받', '스폰서', '파트너스', '수수료', '원고료'] for keyword in ad_keywords: if keyword in content_text: blog_info['is_ad'] = True idx = content_text.find(keyword) start = max(0, idx - 30) end = min(len(content_text), idx + 30) blog_info['ad_phrases'].append(content_text[start:end].strip()) # 이미지 URL 수집 blog_info['images'] = self._collect_blog_images() # 스크린샷 저장 self._save_screenshot(f"blog_{index}") except Exception as e: self.log_error(f"블로그 내용 추출 중 오류: {e}") finally: # 탭 닫고 원래 창으로 돌아가기 self.driver.close() self.driver.switch_to.window(original_window) return blog_info def _extract_blog_content(self) -> str: """블로그 본문 추출""" content_selectors = [ ".se-main-container", "div.se-component", "div[class*='postViewArea']", "#postViewArea", ".post-view", ".article-view", ".entry-content", "article", "main", "[role='article']", ".content", "#content" ] content_text = "" for selector in content_selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: text = elem.text.strip() if len(text) > 100: content_text += text + "\n\n" if len(content_text) > 2000: break except: continue return content_text def _collect_blog_images(self) -> List[str]: """블로그 내 이미지 URL 수집""" image_urls = [] try: img_selectors = [ "img[src*='blogfiles']", "img[src*='postfiles']", "img[src*='phinf']", ".se-image img", ".post-view img", "article img", ".content img" ] seen_urls = set() for selector in img_selectors: images = self.driver.find_elements(By.CSS_SELECTOR, selector) for img in images: try: src = img.get_attribute("src") if (src and src.startswith("http") and src not in seen_urls and not any(skip in src.lower() for skip in ['icon', 'emoji', 'button', 'logo'])): original_url = clean_image_url(src) seen_urls.add(original_url) image_urls.append(original_url) if len(image_urls) >= 10: break except: continue self.log_debug(f"블로그에서 {len(image_urls)}개 이미지 URL 수집") except Exception as e: self.log_error(f"이미지 수집 중 오류: {e}") return image_urls @log_execution_time() def download_all_images(self, results: Dict) -> Dict: """ 모든 이미지 다운로드 Args: results: 크롤링 결과 딕셔너리 Returns: 다운로드 정보 딕셔너리 """ self.log_info("\n=== 이미지 다운로드 시작 ===") # 타임스탬프로 폴더 생성 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") download_dir = os.path.join(self.project_dir, 'images', f"{results['query']}_{timestamp}") os.makedirs(download_dir, exist_ok=True) all_urls = [] # 사진 탭에서 수집한 URL photo_urls = [(url, f"photo_{i+1}") for i, url in enumerate(results.get('photo_urls', []))] all_urls.extend(photo_urls) # 블로그에서 수집한 URL for blog_idx, blog in enumerate(results.get('detailed_blogs', [])): for img_idx, url in enumerate(blog.get('images', [])): all_urls.append((url, f"blog{blog_idx+1}_img{img_idx+1}")) # 중복 제거 unique_urls = {} for url, prefix in all_urls: cleaned_url = clean_image_url(url) if cleaned_url not in unique_urls: unique_urls[cleaned_url] = prefix self.log_info(f"총 {len(unique_urls)}개의 고유 이미지 다운로드 예정") self.log_info(f"필터 조건: {self._get_filter_description()}") # 통계 추적 stats = { 'success': 0, 'duplicate': 0, 'size_filtered': 0, 'file_size_filtered': 0, 'format_filtered': 0, 'network_error': 0, 'other_error': 0, 'file_exists': 0 } # 다운로드 실행 for url, prefix in unique_urls.items(): success, reason = self._download_image(url, download_dir, prefix, stats['success'] + 1) # 통계 업데이트 if success: if reason == "파일존재": stats['file_exists'] += 1 else: stats['success'] += 1 else: if "중복" in reason: stats['duplicate'] += 1 elif "크기부족" in reason: stats['size_filtered'] += 1 elif "파일크기" in reason: stats['file_size_filtered'] += 1 elif "포맷" in reason: stats['format_filtered'] += 1 elif "네트워크" in reason: stats['network_error'] += 1 else: stats['other_error'] += 1 # 메타데이터 저장 self._save_image_metadata(download_dir) # 통계 로깅 total_saved = stats['success'] + stats['file_exists'] total_filtered = stats['duplicate'] + stats['size_filtered'] + stats['file_size_filtered'] + stats['format_filtered'] self.log_info(f"✓ 이미지 다운로드 완료:") self.log_info(f" 저장: {total_saved}개 (신규: {stats['success']}개, 기존: {stats['file_exists']}개)") self.log_info(f" 필터링: {total_filtered}개") self.log_info(f"저장 위치: {download_dir}") # 다운로드 정보 반환 return { 'download_dir': download_dir, 'total_images': len(unique_urls), 'downloaded': total_saved, 'statistics': stats, 'filter_config': { 'min_width': self.image_filter.min_width, 'min_height': self.image_filter.min_height, 'min_file_size_kb': self.image_filter.min_file_size_kb, 'max_file_size_mb': self.image_filter.max_file_size_mb, 'allowed_formats': list(self.image_filter.allowed_formats), 'require_both_dimensions': self.image_filter.require_both_dimensions }, 'timestamp': timestamp } def _download_image(self, url: str, save_dir: str, prefix: str, index: int) -> Tuple[bool, str]: """개별 이미지 다운로드""" try: # 파일 확장자 추출 parsed_url = urlparse(url) path = parsed_url.path ext = os.path.splitext(path)[1].lower() # 확장자가 없거나 이상한 경우 기본값 if ext not in ['.jpg', '.jpeg', '.png', '.gif', '.webp']: ext = '.jpg' # 파일명 생성 filename = f"{prefix}_{index:03d}{ext}" filepath = os.path.join(save_dir, filename) # 이미 존재하면 스킵 if os.path.exists(filepath): self.log_debug(f"이미 존재: {filename}") return True, "파일존재" # 다운로드 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Referer': 'https://map.naver.com/' } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() # 이미지 해시 계산 (중복 체크용) image_hash = self._calculate_image_hash(response.content) # 중복 이미지 체크 if self._is_duplicate_image(image_hash): self.log_debug(f"중복 이미지 스킵: {filename}") return False, "중복" # 고급 이미지 필터링 is_valid, width, height, reason = self._check_image_advanced(response.content) if not is_valid: self.log_debug(f"필터링 제외: {filename} - {reason}") return False, reason # 이미지 저장 with open(filepath, 'wb') as f: f.write(response.content) # 중복 방지를 위해 해시 추가 self.downloaded_hashes.add(image_hash) # 메타데이터 기록 self._add_image_metadata( filename=filename, url=url, width=width, height=height, file_size=len(response.content), image_hash=image_hash, download_time=datetime.now().isoformat() ) self.log_debug(f"다운로드 성공: {filename} ({width}x{height}, {len(response.content)/1024:.1f}KB)") return True, "성공" except requests.exceptions.RequestException as e: self.log_warning(f"다운로드 실패: {url[:50]}... - {str(e)}") return False, f"네트워크오류: {str(e)}" except Exception as e: self.log_error(f"이미지 저장 중 오류: {str(e)}") return False, f"저장오류: {str(e)}" def _check_image_advanced(self, image_data: bytes) -> Tuple[bool, int, int, str]: """고급 이미지 필터링 검사""" try: with Image.open(BytesIO(image_data)) as img: width, height = img.size format_ext = f".{img.format.lower()}" if img.format else ".unknown" # 파일 크기 체크 (KB) file_size_kb = len(image_data) / 1024 file_size_mb = file_size_kb / 1024 # 크기 조건 체크 if self.image_filter.require_both_dimensions: size_ok = width >= self.image_filter.min_width and height >= self.image_filter.min_height else: size_ok = width >= self.image_filter.min_width or height >= self.image_filter.min_height # 파일 크기 조건 체크 file_size_ok = (self.image_filter.min_file_size_kb <= file_size_kb <= self.image_filter.max_file_size_mb * 1024) # 포맷 조건 체크 format_ok = format_ext in self.image_filter.allowed_formats is_valid = size_ok and file_size_ok and format_ok # 실패 이유 반환 reasons = [] if not size_ok: reasons.append(f"크기부족: {width}x{height}") if not file_size_ok: reasons.append(f"파일크기: {file_size_kb:.1f}KB") if not format_ok: reasons.append(f"포맷: {format_ext}") failure_reason = ", ".join(reasons) if reasons else "통과" return is_valid, width, height, failure_reason except Exception as e: self.log_debug(f"이미지 분석 실패: {e}") return False, 0, 0, f"분석실패: {str(e)}" def _calculate_image_hash(self, image_data: bytes) -> str: """이미지 데이터의 MD5 해시값 계산""" return hashlib.md5(image_data).hexdigest() def _is_duplicate_image(self, image_hash: str) -> bool: """중복 이미지인지 확인""" return image_hash in self.downloaded_hashes def _add_image_metadata(self, filename: str, url: str, width: int, height: int, file_size: int, image_hash: str, download_time: str): """이미지 메타데이터 추가""" metadata = { 'filename': filename, 'original_url': url, 'width': width, 'height': height, 'file_size_bytes': file_size, 'file_size_kb': round(file_size / 1024, 2), 'hash': image_hash, 'download_time': download_time, 'format': os.path.splitext(filename)[1].lower() } self.image_metadata.append(metadata) def _save_image_metadata(self, download_dir: str): """이미지 메타데이터를 JSON 파일로 저장""" if not self.image_metadata: return metadata_file = os.path.join(download_dir, 'image_metadata.json') metadata_summary = { 'collection_info': { 'total_images': len(self.image_metadata), 'collection_time': datetime.now().isoformat(), 'filter_config': { 'min_width': self.image_filter.min_width, 'min_height': self.image_filter.min_height, 'min_file_size_kb': self.image_filter.min_file_size_kb, 'max_file_size_mb': self.image_filter.max_file_size_mb, 'allowed_formats': list(self.image_filter.allowed_formats), 'require_both_dimensions': self.image_filter.require_both_dimensions } }, 'statistics': { 'total_file_size_mb': round(sum(img['file_size_bytes'] for img in self.image_metadata) / 1024 / 1024, 2), 'average_width': round(sum(img['width'] for img in self.image_metadata) / len(self.image_metadata), 1), 'average_height': round(sum(img['height'] for img in self.image_metadata) / len(self.image_metadata), 1), 'format_distribution': self._get_format_distribution() }, 'images': self.image_metadata } with open(metadata_file, 'w', encoding='utf-8') as f: json.dump(metadata_summary, f, ensure_ascii=False, indent=2) self.log_info(f"메타데이터 저장: {metadata_file}") def _get_format_distribution(self) -> Dict[str, int]: """이미지 포맷 분포 계산""" formats = {} for img in self.image_metadata: fmt = img['format'] formats[fmt] = formats.get(fmt, 0) + 1 return formats def _get_filter_description(self) -> str: """필터 설정 설명 반환""" conditions = [] if self.image_filter.require_both_dimensions: conditions.append(f"크기 {self.image_filter.min_width}x{self.image_filter.min_height} 이상(AND)") else: conditions.append(f"가로 {self.image_filter.min_width}px 또는 세로 {self.image_filter.min_height}px 이상(OR)") conditions.append(f"파일크기 {self.image_filter.min_file_size_kb}KB~{self.image_filter.max_file_size_mb}MB") conditions.append(f"포맷 {', '.join(sorted(self.image_filter.allowed_formats))}") return " | ".join(conditions) def _save_screenshot(self, name: str): """스크린샷 저장""" try: filename = f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" filepath = os.path.join(self.project_dir, 'debug', filename) self.driver.save_screenshot(filepath) self.log_debug(f"스크린샷 저장: {filename}") except: pass