"""Copyright-infringement detection pipeline (recommended architecture, PDF ch. VII).

Cascading stages:
  1) MinHash + LSH pre-filter - quickly pull N candidates out of a large corpus
  2) Autobiography-mode preprocessing (optional) - common-phrase removal + NER masking
  3) Triple-similarity precise comparison - text (embedding) + lemma (morpheme)
     + element (Jaccard)
  4) Classification - 10 statutory meta tags (primary/secondary) + case mapping

Backward compatibility:
  - infringement_type (5-value enum): kept for the existing UI / integration code
  - tags + case_id: new fields from the PDF taxonomy
"""
from __future__ import annotations

import logging
from datetime import datetime, timezone

from app.api.schemas import (
    TAG_LABEL_KO,
    DetectOptions,
    DetectRequest,
    DetectResponse,
    DocumentMetadata,
    InfringementTag,
    InfringementType,
    MatchResult,
    ScoreBreakdown,
)
from app.core.config import Settings, get_settings
from app.engine.autobiography_filter import preprocess_for_autobiography
from app.engine.corpus import ReferenceDoc, load_corpus
from app.engine.extractor import Extractor, get_extractor
from app.engine.lsh_filter import LshIndex
from app.engine.similarity import (
    DualSimilarityIndex,
    SimilarityHit,
    build_text_backend,
)
from app.engine.structural import extract_lemmas
from app.engine.taxonomy import Taxonomy, load_taxonomy

logger = logging.getLogger(__name__)


class PlagiarismDetector:
    """Compare a submitted text against the reference corpus and classify matches.

    All corpus-side indexes (precise similarity index and, optionally, the LSH
    pre-filter) are built once in ``__init__``; ``detect`` only processes the
    query text.
    """

    def __init__(self, settings: Settings | None = None, extractor: Extractor | None = None):
        """Load the taxonomy and corpus, then build the corpus indexes.

        Args:
            settings: Engine settings; ``get_settings()`` is used when omitted.
            extractor: Element extractor; ``get_extractor(settings)`` is used
                when omitted.
        """
        self.settings = settings or get_settings()
        self._extractor: Extractor = extractor or get_extractor(self.settings)
        self.taxonomy: Taxonomy | None = load_taxonomy(self.settings.taxonomy_path)
        self._corpus = load_corpus(self.settings.corpus_path)

        # In autobiography mode the corpus gets the same preprocessing as the
        # query before it is indexed.
        logger.info("Building corpus indexes (autobiography_mode=%s)", self.settings.autobiography_mode)
        if self.settings.autobiography_mode:
            self._corpus_preprocessed_texts = [
                preprocess_for_autobiography(
                    d.text,
                    self.settings.autobiography_patterns_path,
                    self.settings.enable_entity_masking,
                )
                for d in self._corpus
            ]
        else:
            self._corpus_preprocessed_texts = [d.text for d in self._corpus]

        # Index for the precise comparison (built from the preprocessed texts).
        preprocessed_docs = [
            ReferenceDoc(doc_id=d.doc_id, title=d.title, text=pt)
            for d, pt in zip(self._corpus, self._corpus_preprocessed_texts)
        ]
        self._corpus_elements = [self._extractor.extract(t) for t in self._corpus_preprocessed_texts]
        self._corpus_lemmas = [extract_lemmas(t) for t in self._corpus_preprocessed_texts]
        text_backend = build_text_backend(preprocessed_docs, self.settings)
        self._index = DualSimilarityIndex(
            docs=preprocessed_docs,
            doc_elements=self._corpus_elements,
            doc_lemmas=self._corpus_lemmas,
            settings=self.settings,
            text_backend=text_backend,
        )

        # Stage-1 LSH filter (PDF VII-3); stays None when disabled in settings.
        self._lsh: LshIndex | None = None
        if self.settings.use_lsh_filter:
            self._lsh = LshIndex(preprocessed_docs, threshold=self.settings.lsh_threshold)

        # source_doc id -> original (un-preprocessed) reference document.
        self._docs_by_id = {d.doc_id: d for d in self._corpus}

    @property
    def corpus_size(self) -> int:
        """Number of reference documents loaded from the corpus."""
        return len(self._corpus)

    def detect(
        self,
        doc_id: str,
        text: str,
        metadata: DocumentMetadata | None = None,
        options: DetectOptions | None = None,
    ) -> DetectResponse:
        """Run the detection cascade for one document.

        Args:
            doc_id: Identifier echoed back in the response.
            text: Raw text of the document under review.
            metadata: Accepted for interface compatibility; not read here.
            options: Per-request overrides (threshold, top_k, evidence,
                autobiography mode). Defaults to ``DetectOptions()``.

        Returns:
            A ``DetectResponse`` whose ``matches`` are the hits scoring at or
            above the threshold. ``confidence`` is the best match's similarity,
            or the best sub-threshold hit's score, or 0.0 when there are no hits.
        """
        opts = options or DetectOptions()
        threshold = opts.threshold if opts.threshold is not None else self.settings.similarity_threshold

        # Per-request autobiography-mode override.
        # NOTE(review): the corpus indexes were preprocessed according to
        # settings.autobiography_mode at construction time; an override that
        # differs from it compares a query and a corpus that were preprocessed
        # differently - confirm this is intended.
        autobio_mode = (
            self.settings.autobiography_mode
            if opts.autobiography_mode is None
            else opts.autobiography_mode
        )

        # Autobiography-mode preprocessing of the query.
        query_text = (
            preprocess_for_autobiography(
                text,
                self.settings.autobiography_patterns_path,
                self.settings.enable_entity_masking,
            )
            if autobio_mode
            else text
        )

        # Element extraction uses the original text (shown to the reviewer).
        elements = self._extractor.extract(text)

        # Stage-1 LSH filter (optional).
        candidate_ids: set[str] | None = None
        candidates_count: int | None = None
        lsh_jaccards: dict[str, float] = {}
        if self._lsh:
            cands = self._lsh.query(query_text, top_k=self.settings.lsh_top_k)
            candidate_ids = {c.doc_id for c in cands}
            candidates_count = len(cands)
            lsh_jaccards = {c.doc_id: c.jaccard for c in cands}

        # Precise comparison: restricted to the LSH candidates when the filter
        # is active, otherwise a full scan.
        hits: list[SimilarityHit]
        if candidate_ids is None:
            hits = self._index.query(query_text, elements, top_k=opts.top_k)
        elif not candidate_ids:
            # The filter returned nothing, so no hit could survive it.
            hits = []
        else:
            # Rank the whole corpus first and truncate only AFTER filtering.
            # Truncating to top_k before the candidate filter would silently
            # drop candidates that rank below top_k among all documents.
            ranked = self._index.query(query_text, elements, top_k=self.corpus_size)
            hits = [h for h in ranked if h.doc_id in candidate_ids][: opts.top_k]

        matches = [
            self._to_match(h, opts.return_evidence, lsh_jaccards.get(h.doc_id))
            for h in hits
            if h.score >= threshold
        ]

        confidence = matches[0].similarity if matches else (hits[0].score if hits else 0.0)
        is_infringement = bool(matches)
        ccl_basis = self._build_ccl_basis(matches) if is_infringement else None

        return DetectResponse(
            doc_id=doc_id,
            is_infringement=is_infringement,
            confidence=round(confidence, 4),
            extracted_elements=elements,
            matches=matches,
            ccl_basis=ccl_basis,
            autobiography_mode=autobio_mode,
            candidates_before_filter=candidates_count,
            engine_version=self.settings.engine_version,
            analyzed_at=datetime.now(timezone.utc),
        )

    def detect_request(self, req: DetectRequest) -> DetectResponse:
        """Unpack a ``DetectRequest`` and delegate to ``detect``."""
        return self.detect(req.doc_id, req.text, req.metadata, req.options)

    def _to_match(self, hit: SimilarityHit, return_evidence: bool, lsh_j: float | None) -> MatchResult:
        """Convert a similarity hit into a ``MatchResult``.

        Args:
            hit: Hit produced by the precise similarity index.
            return_evidence: When false, ``evidence_spans`` is left empty.
            lsh_j: LSH Jaccard value for this document, or None when the LSH
                stage did not provide one.
        """
        legacy_type = _classify_legacy(hit)
        tags = self._assign_tags(hit, legacy_type)
        # Case lookup uses only the primary tags and needs a loaded taxonomy.
        case = (
            self.taxonomy.find_case([t.tag for t in tags if t.role == "primary"])
            if self.taxonomy
            else None
        )
        return MatchResult(
            source_doc=hit.doc_id,
            source_title=hit.title,
            similarity=round(hit.score, 4),
            tags=tags,
            case_id=case.case_id if case else None,
            case_title=case.title if case else None,
            infringement_type=legacy_type,
            evidence_spans=hit.evidence if return_evidence else [],
            score_breakdown=ScoreBreakdown(
                text_sim=round(hit.text_sim, 4),
                lemma_sim=round(hit.lemma_sim, 4),
                character_sim=round(hit.element_sim.get("characters", 0.0), 4),
                motif_sim=round(hit.element_sim.get("motifs", 0.0), 4),
                lsh_jaccard=round(lsh_j, 4) if lsh_j is not None else None,
            ),
        )

    def _assign_tags(self, hit: SimilarityHit, legacy: InfringementType) -> list[InfringementTag]:
        """Map the triple-similarity profile to statutory tags (primary/secondary).

        Rules, evaluated in order (based on the PDF ch. IX mapping table):
          - lemma >= 0.70 or text >= 0.70 (copy / paraphrase level):
            primary ``reproduction`` + ``citation_missing``,
            secondary ``public_transmission``.
          - (characters >= 0.40 or motifs >= 0.50) and text < 0.40 (borrowed
            structure / narrative): primary ``derivative_work``, secondary
            ``attribution`` + ``citation_missing``; additionally secondary
            ``substandard_derivative`` when text < 0.30 and lemma < 0.50.
          - text >= 0.40 and characters >= 0.30 (partial transformation):
            primary ``reproduction``, secondary ``derivative_work`` +
            ``citation_missing``.
          - otherwise: secondary ``reproduction`` only.

        Args:
            hit: Hit whose similarity components drive the mapping.
            legacy: Legacy enum classification; currently not consulted, kept
                for interface compatibility.

        Returns:
            Primary tags first, then secondary tags, each de-duplicated; a tag
            already listed as primary is not repeated as secondary.
        """
        text_sim = hit.text_sim
        lemma_sim = hit.lemma_sim
        char_sim = hit.element_sim.get("characters", 0.0)
        motif_sim = hit.element_sim.get("motifs", 0.0)

        primary: list[str] = []
        secondary: list[str] = []

        # Reproduction right: surface text or lemmas match strongly.
        if lemma_sim >= 0.70 or text_sim >= 0.70:
            primary.append("reproduction")
            secondary.append("public_transmission")  # assumes e-book publication
            # Plagiarism practice: missing citation.
            primary.append("citation_missing")
        # Derivative-work right: structure/narrative borrowed (characters or
        # motifs match while the surface text stays low).
        elif (char_sim >= 0.40 or motif_sim >= 0.50) and text_sim < 0.40:
            primary.append("derivative_work")
            secondary.append("attribution")
            secondary.append("citation_missing")
            # Possibly a reworking that falls short of a derivative work.
            if text_sim < 0.30 and lemma_sim < 0.50:
                secondary.append("substandard_derivative")
        # Partial transformation (mid-level text + matching characters).
        elif text_sim >= 0.40 and char_sim >= 0.30:
            primary.append("reproduction")
            secondary.append("derivative_work")
            secondary.append("citation_missing")
        # Weak match that nevertheless passed the threshold.
        else:
            secondary.append("reproduction")

        # De-duplicate (order-preserving) and turn the names into tag objects.
        primary = list(dict.fromkeys(primary))
        secondary = [s for s in dict.fromkeys(secondary) if s not in primary]

        out: list[InfringementTag] = [
            InfringementTag(tag=t, role="primary", label_ko=TAG_LABEL_KO[t]) for t in primary
        ]
        out.extend(
            InfringementTag(tag=t, role="secondary", label_ko=TAG_LABEL_KO[t]) for t in secondary
        )
        return out

    def _build_ccl_basis(self, matches: list[MatchResult]) -> str:
        """Build the human-readable rationale string from the top match.

        Args:
            matches: Non-empty list of matches; only ``matches[0]`` is used.
        """
        top = matches[0]
        sb = top.score_breakdown
        breakdown = ""
        if sb:
            breakdown = (
                f" [text={sb.text_sim:.2f} / lemma={sb.lemma_sim:.2f} "
                f"/ char={sb.character_sim:.2f} / motif={sb.motif_sim:.2f}]"
            )
        primary_labels = [t.label_ko for t in top.tags if t.role == "primary"]
        tag_summary = ", ".join(primary_labels) if primary_labels else "확인 필요"
        case_part = f" 추정 케이스 {top.case_id} ({top.case_title})." if top.case_id else ""
        return (
            f"'{top.source_title}'와 결합 유사도 {top.similarity:.2%}로 매칭. "
            f"주 침해 태그: {tag_summary}.{case_part}{breakdown}"
        )


def _classify_legacy(hit: SimilarityHit) -> InfringementType:
    """Backward-compatible single-enum classification (UI / existing integrations).

    Checks run in order and the first that applies wins:
    copy -> transform -> plot -> character -> unknown.
    """
    elem = hit.element_sim
    char_sim = elem.get("characters", 0.0)
    motif_sim = elem.get("motifs", 0.0)

    if hit.lemma_sim >= 0.70 or hit.text_sim >= 0.70:
        return "copy"
    if hit.text_sim >= 0.40 and char_sim >= 0.30:
        return "transform"
    if char_sim < 0.20 and (hit.lemma_sim >= 0.40 or motif_sim >= 0.50):
        return "plot"
    if char_sim >= 0.40:
        return "character"
    return "unknown"


# Backward-compatible alias (tests import _classify).
_classify = _classify_legacy