577 lines
22 KiB
Python
577 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
네이버 블로그 크롤러 모듈
|
|
블로그 콘텐츠 수집 및 이미지 다운로드 기능
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import re
|
|
import json
|
|
import requests
|
|
import hashlib
|
|
from datetime import datetime
|
|
from typing import List, Dict, Optional, Tuple, Set
|
|
from urllib.parse import urlparse, parse_qs, urlunparse, urlencode
|
|
from dataclasses import dataclass
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
|
|
from .logger_utils import LoggerMixin, log_execution_time, log_exception
|
|
|
|
|
|
@dataclass
|
|
class ImageFilterConfig:
|
|
"""이미지 필터링 설정"""
|
|
min_width: int = 400
|
|
min_height: int = 400
|
|
min_file_size_kb: int = 10 # 최소 파일 크기 (KB)
|
|
max_file_size_mb: int = 10 # 최대 파일 크기 (MB)
|
|
allowed_formats: Set[str] = None
|
|
require_both_dimensions: bool = False # True면 width AND height 모두 조건 만족, False면 OR
|
|
|
|
def __post_init__(self):
|
|
if self.allowed_formats is None:
|
|
self.allowed_formats = {'.jpg', '.jpeg', '.png', '.webp'}
|
|
|
|
|
|
def clean_image_url(url: str) -> str:
|
|
"""
|
|
이미지 URL 정리 (불필요한 파라미터 제거)
|
|
|
|
Args:
|
|
url: 원본 URL
|
|
|
|
Returns:
|
|
정리된 URL
|
|
"""
|
|
# autoRotate=true&type=w800& 제거
|
|
url = url.replace("autoRotate=true&type=w800&", "")
|
|
url = url.replace("autoRotate=true&", "")
|
|
url = url.replace("type=w800&", "")
|
|
|
|
# URL 파싱하여 파라미터 정리
|
|
try:
|
|
parsed = urlparse(url)
|
|
params = parse_qs(parsed.query)
|
|
|
|
# 불필요한 파라미터 제거
|
|
params.pop('autoRotate', None)
|
|
|
|
# type 파라미터 처리
|
|
if 'type' in params:
|
|
# 작은 사이즈면 큰 사이즈로 변경
|
|
if params['type'][0] in ['f', 'f120', 'f240', 'w240', 'w500']:
|
|
params['type'] = ['w800']
|
|
|
|
# 정리된 URL 재구성
|
|
new_query = urlencode(params, doseq=True)
|
|
url = urlunparse((
|
|
parsed.scheme,
|
|
parsed.netloc,
|
|
parsed.path,
|
|
parsed.params,
|
|
new_query,
|
|
parsed.fragment
|
|
))
|
|
except:
|
|
pass
|
|
|
|
return url
|
|
|
|
|
|
class NaverBlogCrawler(LoggerMixin):
|
|
"""네이버 블로그 크롤러"""
|
|
|
|
def __init__(self, driver: webdriver.Chrome, wait: WebDriverWait, project_dir: str,
|
|
image_filter: ImageFilterConfig = None):
|
|
"""
|
|
초기화
|
|
|
|
Args:
|
|
driver: 웹드라이버
|
|
wait: WebDriverWait 객체
|
|
project_dir: 프로젝트 디렉토리
|
|
image_filter: 이미지 필터 설정
|
|
"""
|
|
self.driver = driver
|
|
self.wait = wait
|
|
self.project_dir = project_dir
|
|
self.image_filter = image_filter or ImageFilterConfig()
|
|
|
|
# 중복 이미지 방지를 위한 해시 추적
|
|
self.downloaded_hashes: Set[str] = set()
|
|
self.image_metadata: List[Dict] = []
|
|
|
|
# 로거 설정
|
|
self.setup_logger(log_prefix='blog_crawler')
|
|
|
|
@log_execution_time()
|
|
def visit_and_extract_blog(self, url: str, title: str, index: int) -> Optional[Dict]:
|
|
"""
|
|
블로그 방문 및 내용 추출
|
|
|
|
Args:
|
|
url: 블로그 URL
|
|
title: 블로그 제목
|
|
index: 인덱스
|
|
|
|
Returns:
|
|
블로그 정보 딕셔너리
|
|
"""
|
|
blog_info = {
|
|
'index': index,
|
|
'title': title,
|
|
'blog_title': '',
|
|
'url': url,
|
|
'content': '',
|
|
'images': [],
|
|
'extracted_at': datetime.now().isoformat(),
|
|
'is_ad': False,
|
|
'ad_phrases': []
|
|
}
|
|
|
|
try:
|
|
# 새 탭 열기
|
|
original_window = self.driver.current_window_handle
|
|
self.driver.execute_script("window.open('');")
|
|
self.driver.switch_to.window(self.driver.window_handles[-1])
|
|
|
|
# 블로그 페이지 로드
|
|
self.driver.get(url)
|
|
time.sleep(3)
|
|
|
|
# 페이지 타이틀 추출
|
|
try:
|
|
blog_info['blog_title'] = self.driver.title
|
|
except:
|
|
blog_info['blog_title'] = "타이틀 없음"
|
|
|
|
# 네이버 블로그 iframe 처리
|
|
if "blog.naver.com" in url:
|
|
try:
|
|
self.driver.switch_to.frame("mainFrame")
|
|
self.log_debug("네이버 블로그 mainFrame 전환 성공")
|
|
except:
|
|
pass
|
|
|
|
# 본문 추출
|
|
content_text = self._extract_blog_content()
|
|
if content_text:
|
|
blog_info['content'] = content_text[:2000]
|
|
|
|
# 광고성 문구 검색
|
|
ad_keywords = ['광고', '협찬', '제공받', '초대받', '스폰서', '파트너스', '수수료', '원고료']
|
|
for keyword in ad_keywords:
|
|
if keyword in content_text:
|
|
blog_info['is_ad'] = True
|
|
idx = content_text.find(keyword)
|
|
start = max(0, idx - 30)
|
|
end = min(len(content_text), idx + 30)
|
|
blog_info['ad_phrases'].append(content_text[start:end].strip())
|
|
|
|
# 이미지 URL 수집
|
|
blog_info['images'] = self._collect_blog_images()
|
|
|
|
# 스크린샷 저장
|
|
self._save_screenshot(f"blog_{index}")
|
|
|
|
except Exception as e:
|
|
self.log_error(f"블로그 내용 추출 중 오류: {e}")
|
|
|
|
finally:
|
|
# 탭 닫고 원래 창으로 돌아가기
|
|
self.driver.close()
|
|
self.driver.switch_to.window(original_window)
|
|
|
|
return blog_info
|
|
|
|
def _extract_blog_content(self) -> str:
|
|
"""블로그 본문 추출"""
|
|
content_selectors = [
|
|
".se-main-container",
|
|
"div.se-component",
|
|
"div[class*='postViewArea']",
|
|
"#postViewArea",
|
|
".post-view",
|
|
".article-view",
|
|
".entry-content",
|
|
"article",
|
|
"main",
|
|
"[role='article']",
|
|
".content",
|
|
"#content"
|
|
]
|
|
|
|
content_text = ""
|
|
for selector in content_selectors:
|
|
try:
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for elem in elements:
|
|
text = elem.text.strip()
|
|
if len(text) > 100:
|
|
content_text += text + "\n\n"
|
|
if len(content_text) > 2000:
|
|
break
|
|
except:
|
|
continue
|
|
|
|
return content_text
|
|
|
|
def _collect_blog_images(self) -> List[str]:
|
|
"""블로그 내 이미지 URL 수집"""
|
|
image_urls = []
|
|
|
|
try:
|
|
img_selectors = [
|
|
"img[src*='blogfiles']",
|
|
"img[src*='postfiles']",
|
|
"img[src*='phinf']",
|
|
".se-image img",
|
|
".post-view img",
|
|
"article img",
|
|
".content img"
|
|
]
|
|
|
|
seen_urls = set()
|
|
|
|
for selector in img_selectors:
|
|
images = self.driver.find_elements(By.CSS_SELECTOR, selector)
|
|
for img in images:
|
|
try:
|
|
src = img.get_attribute("src")
|
|
if (src and src.startswith("http") and
|
|
src not in seen_urls and
|
|
not any(skip in src.lower() for skip in ['icon', 'emoji', 'button', 'logo'])):
|
|
|
|
original_url = clean_image_url(src)
|
|
seen_urls.add(original_url)
|
|
image_urls.append(original_url)
|
|
|
|
if len(image_urls) >= 10:
|
|
break
|
|
except:
|
|
continue
|
|
|
|
self.log_debug(f"블로그에서 {len(image_urls)}개 이미지 URL 수집")
|
|
|
|
except Exception as e:
|
|
self.log_error(f"이미지 수집 중 오류: {e}")
|
|
|
|
return image_urls
|
|
|
|
@log_execution_time()
|
|
def download_all_images(self, results: Dict) -> Dict:
|
|
"""
|
|
모든 이미지 다운로드
|
|
|
|
Args:
|
|
results: 크롤링 결과 딕셔너리
|
|
|
|
Returns:
|
|
다운로드 정보 딕셔너리
|
|
"""
|
|
self.log_info("\n=== 이미지 다운로드 시작 ===")
|
|
|
|
# 타임스탬프로 폴더 생성
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
download_dir = os.path.join(self.project_dir, 'images', f"{results['query']}_{timestamp}")
|
|
os.makedirs(download_dir, exist_ok=True)
|
|
|
|
all_urls = []
|
|
|
|
# 사진 탭에서 수집한 URL
|
|
photo_urls = [(url, f"photo_{i+1}") for i, url in enumerate(results.get('photo_urls', []))]
|
|
all_urls.extend(photo_urls)
|
|
|
|
# 블로그에서 수집한 URL
|
|
for blog_idx, blog in enumerate(results.get('detailed_blogs', [])):
|
|
for img_idx, url in enumerate(blog.get('images', [])):
|
|
all_urls.append((url, f"blog{blog_idx+1}_img{img_idx+1}"))
|
|
|
|
# 중복 제거
|
|
unique_urls = {}
|
|
for url, prefix in all_urls:
|
|
cleaned_url = clean_image_url(url)
|
|
if cleaned_url not in unique_urls:
|
|
unique_urls[cleaned_url] = prefix
|
|
|
|
self.log_info(f"총 {len(unique_urls)}개의 고유 이미지 다운로드 예정")
|
|
self.log_info(f"필터 조건: {self._get_filter_description()}")
|
|
|
|
# 통계 추적
|
|
stats = {
|
|
'success': 0,
|
|
'duplicate': 0,
|
|
'size_filtered': 0,
|
|
'file_size_filtered': 0,
|
|
'format_filtered': 0,
|
|
'network_error': 0,
|
|
'other_error': 0,
|
|
'file_exists': 0
|
|
}
|
|
|
|
# 다운로드 실행
|
|
for url, prefix in unique_urls.items():
|
|
success, reason = self._download_image(url, download_dir, prefix, stats['success'] + 1)
|
|
|
|
# 통계 업데이트
|
|
if success:
|
|
if reason == "파일존재":
|
|
stats['file_exists'] += 1
|
|
else:
|
|
stats['success'] += 1
|
|
else:
|
|
if "중복" in reason:
|
|
stats['duplicate'] += 1
|
|
elif "크기부족" in reason:
|
|
stats['size_filtered'] += 1
|
|
elif "파일크기" in reason:
|
|
stats['file_size_filtered'] += 1
|
|
elif "포맷" in reason:
|
|
stats['format_filtered'] += 1
|
|
elif "네트워크" in reason:
|
|
stats['network_error'] += 1
|
|
else:
|
|
stats['other_error'] += 1
|
|
|
|
# 메타데이터 저장
|
|
self._save_image_metadata(download_dir)
|
|
|
|
# 통계 로깅
|
|
total_saved = stats['success'] + stats['file_exists']
|
|
total_filtered = stats['duplicate'] + stats['size_filtered'] + stats['file_size_filtered'] + stats['format_filtered']
|
|
|
|
self.log_info(f"✓ 이미지 다운로드 완료:")
|
|
self.log_info(f" 저장: {total_saved}개 (신규: {stats['success']}개, 기존: {stats['file_exists']}개)")
|
|
self.log_info(f" 필터링: {total_filtered}개")
|
|
self.log_info(f"저장 위치: {download_dir}")
|
|
|
|
# 다운로드 정보 반환
|
|
return {
|
|
'download_dir': download_dir,
|
|
'total_images': len(unique_urls),
|
|
'downloaded': total_saved,
|
|
'statistics': stats,
|
|
'filter_config': {
|
|
'min_width': self.image_filter.min_width,
|
|
'min_height': self.image_filter.min_height,
|
|
'min_file_size_kb': self.image_filter.min_file_size_kb,
|
|
'max_file_size_mb': self.image_filter.max_file_size_mb,
|
|
'allowed_formats': list(self.image_filter.allowed_formats),
|
|
'require_both_dimensions': self.image_filter.require_both_dimensions
|
|
},
|
|
'timestamp': timestamp
|
|
}
|
|
|
|
def _download_image(self, url: str, save_dir: str, prefix: str, index: int) -> Tuple[bool, str]:
|
|
"""개별 이미지 다운로드"""
|
|
try:
|
|
# 파일 확장자 추출
|
|
parsed_url = urlparse(url)
|
|
path = parsed_url.path
|
|
ext = os.path.splitext(path)[1].lower()
|
|
|
|
# 확장자가 없거나 이상한 경우 기본값
|
|
if ext not in ['.jpg', '.jpeg', '.png', '.gif', '.webp']:
|
|
ext = '.jpg'
|
|
|
|
# 파일명 생성
|
|
filename = f"{prefix}_{index:03d}{ext}"
|
|
filepath = os.path.join(save_dir, filename)
|
|
|
|
# 이미 존재하면 스킵
|
|
if os.path.exists(filepath):
|
|
self.log_debug(f"이미 존재: {filename}")
|
|
return True, "파일존재"
|
|
|
|
# 다운로드
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
|
'Referer': 'https://map.naver.com/'
|
|
}
|
|
|
|
response = requests.get(url, headers=headers, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
# 이미지 해시 계산 (중복 체크용)
|
|
image_hash = self._calculate_image_hash(response.content)
|
|
|
|
# 중복 이미지 체크
|
|
if self._is_duplicate_image(image_hash):
|
|
self.log_debug(f"중복 이미지 스킵: {filename}")
|
|
return False, "중복"
|
|
|
|
# 고급 이미지 필터링
|
|
is_valid, width, height, reason = self._check_image_advanced(response.content)
|
|
|
|
if not is_valid:
|
|
self.log_debug(f"필터링 제외: {filename} - {reason}")
|
|
return False, reason
|
|
|
|
# 이미지 저장
|
|
with open(filepath, 'wb') as f:
|
|
f.write(response.content)
|
|
|
|
# 중복 방지를 위해 해시 추가
|
|
self.downloaded_hashes.add(image_hash)
|
|
|
|
# 메타데이터 기록
|
|
self._add_image_metadata(
|
|
filename=filename,
|
|
url=url,
|
|
width=width,
|
|
height=height,
|
|
file_size=len(response.content),
|
|
image_hash=image_hash,
|
|
download_time=datetime.now().isoformat()
|
|
)
|
|
|
|
self.log_debug(f"다운로드 성공: {filename} ({width}x{height}, {len(response.content)/1024:.1f}KB)")
|
|
return True, "성공"
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.log_warning(f"다운로드 실패: {url[:50]}... - {str(e)}")
|
|
return False, f"네트워크오류: {str(e)}"
|
|
except Exception as e:
|
|
self.log_error(f"이미지 저장 중 오류: {str(e)}")
|
|
return False, f"저장오류: {str(e)}"
|
|
|
|
def _check_image_advanced(self, image_data: bytes) -> Tuple[bool, int, int, str]:
|
|
"""고급 이미지 필터링 검사"""
|
|
try:
|
|
with Image.open(BytesIO(image_data)) as img:
|
|
width, height = img.size
|
|
format_ext = f".{img.format.lower()}" if img.format else ".unknown"
|
|
|
|
# 파일 크기 체크 (KB)
|
|
file_size_kb = len(image_data) / 1024
|
|
file_size_mb = file_size_kb / 1024
|
|
|
|
# 크기 조건 체크
|
|
if self.image_filter.require_both_dimensions:
|
|
size_ok = width >= self.image_filter.min_width and height >= self.image_filter.min_height
|
|
else:
|
|
size_ok = width >= self.image_filter.min_width or height >= self.image_filter.min_height
|
|
|
|
# 파일 크기 조건 체크
|
|
file_size_ok = (self.image_filter.min_file_size_kb <= file_size_kb <=
|
|
self.image_filter.max_file_size_mb * 1024)
|
|
|
|
# 포맷 조건 체크
|
|
format_ok = format_ext in self.image_filter.allowed_formats
|
|
|
|
is_valid = size_ok and file_size_ok and format_ok
|
|
|
|
# 실패 이유 반환
|
|
reasons = []
|
|
if not size_ok:
|
|
reasons.append(f"크기부족: {width}x{height}")
|
|
if not file_size_ok:
|
|
reasons.append(f"파일크기: {file_size_kb:.1f}KB")
|
|
if not format_ok:
|
|
reasons.append(f"포맷: {format_ext}")
|
|
|
|
failure_reason = ", ".join(reasons) if reasons else "통과"
|
|
|
|
return is_valid, width, height, failure_reason
|
|
|
|
except Exception as e:
|
|
self.log_debug(f"이미지 분석 실패: {e}")
|
|
return False, 0, 0, f"분석실패: {str(e)}"
|
|
|
|
def _calculate_image_hash(self, image_data: bytes) -> str:
|
|
"""이미지 데이터의 MD5 해시값 계산"""
|
|
return hashlib.md5(image_data).hexdigest()
|
|
|
|
def _is_duplicate_image(self, image_hash: str) -> bool:
|
|
"""중복 이미지인지 확인"""
|
|
return image_hash in self.downloaded_hashes
|
|
|
|
def _add_image_metadata(self, filename: str, url: str, width: int, height: int,
|
|
file_size: int, image_hash: str, download_time: str):
|
|
"""이미지 메타데이터 추가"""
|
|
metadata = {
|
|
'filename': filename,
|
|
'original_url': url,
|
|
'width': width,
|
|
'height': height,
|
|
'file_size_bytes': file_size,
|
|
'file_size_kb': round(file_size / 1024, 2),
|
|
'hash': image_hash,
|
|
'download_time': download_time,
|
|
'format': os.path.splitext(filename)[1].lower()
|
|
}
|
|
self.image_metadata.append(metadata)
|
|
|
|
def _save_image_metadata(self, download_dir: str):
|
|
"""이미지 메타데이터를 JSON 파일로 저장"""
|
|
if not self.image_metadata:
|
|
return
|
|
|
|
metadata_file = os.path.join(download_dir, 'image_metadata.json')
|
|
|
|
metadata_summary = {
|
|
'collection_info': {
|
|
'total_images': len(self.image_metadata),
|
|
'collection_time': datetime.now().isoformat(),
|
|
'filter_config': {
|
|
'min_width': self.image_filter.min_width,
|
|
'min_height': self.image_filter.min_height,
|
|
'min_file_size_kb': self.image_filter.min_file_size_kb,
|
|
'max_file_size_mb': self.image_filter.max_file_size_mb,
|
|
'allowed_formats': list(self.image_filter.allowed_formats),
|
|
'require_both_dimensions': self.image_filter.require_both_dimensions
|
|
}
|
|
},
|
|
'statistics': {
|
|
'total_file_size_mb': round(sum(img['file_size_bytes'] for img in self.image_metadata) / 1024 / 1024, 2),
|
|
'average_width': round(sum(img['width'] for img in self.image_metadata) / len(self.image_metadata), 1),
|
|
'average_height': round(sum(img['height'] for img in self.image_metadata) / len(self.image_metadata), 1),
|
|
'format_distribution': self._get_format_distribution()
|
|
},
|
|
'images': self.image_metadata
|
|
}
|
|
|
|
with open(metadata_file, 'w', encoding='utf-8') as f:
|
|
json.dump(metadata_summary, f, ensure_ascii=False, indent=2)
|
|
|
|
self.log_info(f"메타데이터 저장: {metadata_file}")
|
|
|
|
def _get_format_distribution(self) -> Dict[str, int]:
|
|
"""이미지 포맷 분포 계산"""
|
|
formats = {}
|
|
for img in self.image_metadata:
|
|
fmt = img['format']
|
|
formats[fmt] = formats.get(fmt, 0) + 1
|
|
return formats
|
|
|
|
def _get_filter_description(self) -> str:
|
|
"""필터 설정 설명 반환"""
|
|
conditions = []
|
|
|
|
if self.image_filter.require_both_dimensions:
|
|
conditions.append(f"크기 {self.image_filter.min_width}x{self.image_filter.min_height} 이상(AND)")
|
|
else:
|
|
conditions.append(f"가로 {self.image_filter.min_width}px 또는 세로 {self.image_filter.min_height}px 이상(OR)")
|
|
|
|
conditions.append(f"파일크기 {self.image_filter.min_file_size_kb}KB~{self.image_filter.max_file_size_mb}MB")
|
|
conditions.append(f"포맷 {', '.join(sorted(self.image_filter.allowed_formats))}")
|
|
|
|
return " | ".join(conditions)
|
|
|
|
def _save_screenshot(self, name: str):
|
|
"""스크린샷 저장"""
|
|
try:
|
|
filename = f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
|
|
filepath = os.path.join(self.project_dir, 'debug', filename)
|
|
self.driver.save_screenshot(filepath)
|
|
self.log_debug(f"스크린샷 저장: {filename}")
|
|
except:
|
|
pass
|