o2o-plagiarism-ai/app/engine/detector.py

282 lines
11 KiB
Python

"""저작권 침해 탐지 파이프라인 (PDF VII장 권장 아키텍처).
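Three-stage cascade followed by classification (four steps in total):
1) MinHash + LSH pre-filter: quickly pull N candidates out of a large corpus.
2) Autobiography-mode preprocessing (optional): strip common expressions and mask named entities (NER).
3) Precise triple-similarity comparison: text (embedding) + lemma (morpheme) + element (Jaccard).
4) Classification: 10 statutory meta tags (primary/secondary) plus case mapping.
Backward compatibility:
- infringement_type (5-value enum): kept for the existing UI / integration code.
- tags + case_id: new fields from the PDF classification scheme.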
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from app.api.schemas import (
TAG_LABEL_KO,
DetectOptions,
DetectRequest,
DetectResponse,
DocumentMetadata,
InfringementTag,
InfringementType,
MatchResult,
ScoreBreakdown,
)
from app.core.config import Settings, get_settings
from app.engine.autobiography_filter import preprocess_for_autobiography
from app.engine.corpus import load_corpus
from app.engine.extractor import Extractor, get_extractor
from app.engine.lsh_filter import LshIndex
from app.engine.similarity import (
DualSimilarityIndex,
SimilarityHit,
build_text_backend,
)
from app.engine.structural import extract_lemmas
from app.engine.taxonomy import Taxonomy, load_taxonomy
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PlagiarismDetector:
    """Score a document against a reference corpus and classify any matches.

    Construction loads the taxonomy and the corpus, optionally applies the
    autobiography preprocessing to the corpus texts, and builds the similarity
    index plus (when enabled in settings) the LSH pre-filter. ``detect`` then
    runs one query against those indexes and wraps the result in a
    ``DetectResponse``.
    """

    def __init__(self, settings: Settings | None = None, extractor: Extractor | None = None):
        """Load the corpus and build every index used by ``detect``.

        Args:
            settings: Engine configuration; ``get_settings()`` is used when omitted.
            extractor: Element extractor; ``get_extractor(settings)`` is used
                when omitted.
        """
        self.settings = settings or get_settings()
        self._extractor: Extractor = extractor or get_extractor(self.settings)
        self.taxonomy: Taxonomy | None = load_taxonomy(self.settings.taxonomy_path)
        self._corpus = load_corpus(self.settings.corpus_path)

        # In autobiography mode the corpus texts get the same preprocessing as
        # the query text before they are indexed.
        logger.info("Building corpus indexes (autobiography_mode=%s)", self.settings.autobiography_mode)
        if self.settings.autobiography_mode:
            self._corpus_preprocessed_texts = [
                preprocess_for_autobiography(
                    d.text,
                    self.settings.autobiography_patterns_path,
                    self.settings.enable_entity_masking,
                )
                for d in self._corpus
            ]
        else:
            self._corpus_preprocessed_texts = [d.text for d in self._corpus]

        # Index for the precise comparison (built from the preprocessed texts).
        from app.engine.corpus import ReferenceDoc

        preprocessed_docs = [
            ReferenceDoc(doc_id=d.doc_id, title=d.title, text=pt)
            for d, pt in zip(self._corpus, self._corpus_preprocessed_texts)
        ]
        self._corpus_elements = [self._extractor.extract(t) for t in self._corpus_preprocessed_texts]
        self._corpus_lemmas = [extract_lemmas(t) for t in self._corpus_preprocessed_texts]
        text_backend = build_text_backend(preprocessed_docs, self.settings)
        self._index = DualSimilarityIndex(
            docs=preprocessed_docs,
            doc_elements=self._corpus_elements,
            doc_lemmas=self._corpus_lemmas,
            settings=self.settings,
            text_backend=text_backend,
        )

        # First-stage LSH filter (PDF VII-3); stays None when disabled in settings.
        self._lsh: LshIndex | None = None
        if self.settings.use_lsh_filter:
            self._lsh = LshIndex(preprocessed_docs, threshold=self.settings.lsh_threshold)

        # doc_id -> original (un-preprocessed) ReferenceDoc lookup.
        self._docs_by_id = {d.doc_id: d for d in self._corpus}

    @property
    def corpus_size(self) -> int:
        """Number of reference documents loaded into the corpus."""
        return len(self._corpus)

    def detect(
        self,
        doc_id: str,
        text: str,
        metadata: DocumentMetadata | None = None,
        options: DetectOptions | None = None,
    ) -> DetectResponse:
        """Run the detection pipeline for one document.

        Args:
            doc_id: Identifier echoed back in the response.
            text: Raw document text to analyse.
            metadata: Accepted for interface compatibility; not read by this method.
            options: Per-request overrides (threshold, autobiography mode,
                top_k, evidence); a default ``DetectOptions()`` is used when omitted.

        Returns:
            A ``DetectResponse`` whose ``matches`` are the hits scoring at or
            above the threshold, and whose ``is_infringement`` is true when
            at least one such match exists.
        """
        opts = options or DetectOptions()
        threshold = opts.threshold if opts.threshold is not None else self.settings.similarity_threshold

        # Per-request autobiography-mode override.
        # NOTE(review): the corpus was preprocessed according to
        # settings.autobiography_mode at construction time, so an override that
        # differs compares a preprocessed query with an un-preprocessed corpus
        # (or vice versa) - confirm this is intended.
        autobio_mode = (
            self.settings.autobiography_mode if opts.autobiography_mode is None
            else opts.autobiography_mode
        )

        # Autobiography-mode preprocessing of the query text.
        query_text = (
            preprocess_for_autobiography(
                text, self.settings.autobiography_patterns_path,
                self.settings.enable_entity_masking,
            )
            if autobio_mode else text
        )

        # Elements are extracted from the ORIGINAL text (kept for user review),
        # whereas the corpus elements were extracted from the preprocessed texts.
        elements = self._extractor.extract(text)

        # First-stage LSH filter (optional).
        candidate_ids, candidates_count, lsh_jaccards = self._lsh_candidates(query_text)

        # Precise comparison; when LSH candidates exist only those hits are kept.
        # NOTE(review): the candidate filter runs after the index has applied
        # top_k, so fewer than top_k hits can survive - presumably acceptable,
        # verify against DualSimilarityIndex.query.
        hits = self._index.query(query_text, elements, top_k=opts.top_k)
        if candidate_ids is not None:
            hits = [h for h in hits if h.doc_id in candidate_ids]

        matches = [
            self._to_match(h, opts.return_evidence, lsh_jaccards.get(h.doc_id))
            for h in hits if h.score >= threshold
        ]

        # Best match if any cleared the threshold, else the first remaining hit,
        # else 0.0 (assumes hits arrive best-first - TODO confirm).
        confidence = matches[0].similarity if matches else (hits[0].score if hits else 0.0)
        is_infringement = bool(matches)
        ccl_basis = self._build_ccl_basis(matches) if is_infringement else None

        return DetectResponse(
            doc_id=doc_id,
            is_infringement=is_infringement,
            confidence=round(confidence, 4),
            extracted_elements=elements,
            matches=matches,
            ccl_basis=ccl_basis,
            autobiography_mode=autobio_mode,
            candidates_before_filter=candidates_count,
            engine_version=self.settings.engine_version,
            analyzed_at=datetime.now(timezone.utc),
        )

    def _lsh_candidates(self, query_text: str) -> tuple[set[str] | None, int | None, dict[str, float]]:
        """Query the optional LSH pre-filter.

        Returns:
            ``(candidate_ids, candidate_count, jaccard_by_doc_id)``. The first
            two are ``None`` (and the mapping empty) when no LSH index was built.
        """
        # Explicit None check: the filter is "disabled" only when no index was
        # built. A truthiness test would also skip an index object that happens
        # to evaluate as false (e.g. one defining __len__ over an empty corpus).
        if self._lsh is None:
            return None, None, {}
        cands = self._lsh.query(query_text, top_k=self.settings.lsh_top_k)
        return (
            {c.doc_id for c in cands},
            len(cands),
            {c.doc_id: c.jaccard for c in cands},
        )

    def detect_request(self, req: DetectRequest) -> DetectResponse:
        """Unpack a ``DetectRequest`` and delegate to ``detect``."""
        return self.detect(req.doc_id, req.text, req.metadata, req.options)

    def _to_match(self, hit: SimilarityHit, return_evidence: bool, lsh_j: float | None) -> MatchResult:
        """Convert one similarity hit into a ``MatchResult``.

        Args:
            hit: Hit returned by the similarity index.
            return_evidence: When false, ``evidence_spans`` is returned empty.
            lsh_j: LSH Jaccard value for this document, or ``None`` when the
                LSH filter produced no value for it.
        """
        legacy_type = _classify_legacy(hit)
        tags = self._assign_tags(hit, legacy_type)
        # The case lookup uses primary tags only and is skipped without a taxonomy.
        case = (
            self.taxonomy.find_case([t.tag for t in tags if t.role == "primary"])
            if self.taxonomy is not None else None
        )
        return MatchResult(
            source_doc=hit.doc_id,
            source_title=hit.title,
            similarity=round(hit.score, 4),
            tags=tags,
            case_id=case.case_id if case else None,
            case_title=case.title if case else None,
            infringement_type=legacy_type,
            evidence_spans=hit.evidence if return_evidence else [],
            score_breakdown=ScoreBreakdown(
                text_sim=round(hit.text_sim, 4),
                lemma_sim=round(hit.lemma_sim, 4),
                character_sim=round(hit.element_sim.get("characters", 0.0), 4),
                motif_sim=round(hit.element_sim.get("motifs", 0.0), 4),
                lsh_jaccard=round(lsh_j, 4) if lsh_j is not None else None,
            ),
        )

    def _assign_tags(self, hit: SimilarityHit, legacy: InfringementType) -> list[InfringementTag]:
        """Map the similarity breakdown to statutory tags (primary/secondary).

        Rules, evaluated in order (based on the PDF chapter IX mapping table):
        - lemma >= 0.70 or text >= 0.70: ``reproduction`` and
          ``citation_missing`` primary, ``public_transmission`` secondary.
        - (characters >= 0.40 or motifs >= 0.50) and text < 0.40:
          ``derivative_work`` primary; ``attribution`` and ``citation_missing``
          secondary; plus ``substandard_derivative`` secondary when
          text < 0.30 and lemma < 0.50.
        - text >= 0.40 and characters >= 0.30: ``reproduction`` primary;
          ``derivative_work`` and ``citation_missing`` secondary.
        - otherwise: ``reproduction`` secondary only.

        ``legacy`` is accepted for interface compatibility and is not read.
        Primary tags are emitted first, then secondary tags; duplicates are
        dropped and a tag already primary is never repeated as secondary.
        """
        text_sim = hit.text_sim
        lemma_sim = hit.lemma_sim
        char_sim = hit.element_sim.get("characters", 0.0)
        motif_sim = hit.element_sim.get("motifs", 0.0)
        primary: list[str] = []
        secondary: list[str] = []

        if lemma_sim >= 0.70 or text_sim >= 0.70:
            # Reproduction right: strong surface or lemma agreement.
            primary.append("reproduction")
            secondary.append("public_transmission")  # assumes e-book publication
            # Plagiarism practice: missing citation.
            primary.append("citation_missing")
        elif (char_sim >= 0.40 or motif_sim >= 0.50) and text_sim < 0.40:
            # Derivative-work right: structure/narrative borrowed while the
            # surface text stays dissimilar.
            primary.append("derivative_work")
            secondary.append("attribution")
            secondary.append("citation_missing")
            # Possibly a reworking that falls short of a derivative work.
            if text_sim < 0.30 and lemma_sim < 0.50:
                secondary.append("substandard_derivative")
        elif text_sim >= 0.40 and char_sim >= 0.30:
            # Partial transformation: mid-range text similarity plus matching characters.
            primary.append("reproduction")
            secondary.append("derivative_work")
            secondary.append("citation_missing")
        else:
            # Weak match that still cleared the threshold.
            secondary.append("reproduction")

        # De-duplicate while preserving order, then build the tag objects.
        primary = list(dict.fromkeys(primary))
        secondary = [s for s in dict.fromkeys(secondary) if s not in primary]
        return [
            InfringementTag(tag=t, role="primary", label_ko=TAG_LABEL_KO[t]) for t in primary
        ] + [
            InfringementTag(tag=t, role="secondary", label_ko=TAG_LABEL_KO[t]) for t in secondary
        ]

    def _build_ccl_basis(self, matches: list[MatchResult]) -> str:
        """Build the one-line rationale string from the first match.

        ``matches`` must be non-empty; ``detect`` only calls this when at least
        one match exists.
        """
        top = matches[0]
        sb = top.score_breakdown
        breakdown = ""
        if sb is not None:
            breakdown = (
                f" [text={sb.text_sim:.2f} / lemma={sb.lemma_sim:.2f} "
                f"/ char={sb.character_sim:.2f} / motif={sb.motif_sim:.2f}]"
            )
        primary_labels = [t.label_ko for t in top.tags if t.role == "primary"]
        tag_summary = ", ".join(primary_labels) if primary_labels else "확인 필요"
        case_part = f" 추정 케이스 {top.case_id} ({top.case_title})." if top.case_id else ""
        return (
            f"'{top.source_title}'와 결합 유사도 {top.similarity:.2%}로 매칭. "
            f"주 침해 태그: {tag_summary}.{case_part}{breakdown}"
        )
def _classify_legacy(hit: SimilarityHit) -> InfringementType:
    """Backward-compatible single-label classification (for the UI / existing integrations).

    Checks run in priority order and the first that applies wins:
    "copy", "transform", "plot", "character", and finally "unknown".
    """
    characters = hit.element_sim.get("characters", 0.0)
    motifs = hit.element_sim.get("motifs", 0.0)

    # Strong lemma or surface-text agreement.
    if hit.lemma_sim >= 0.70 or hit.text_sim >= 0.70:
        return "copy"

    # Mid-range text similarity together with matching characters.
    if hit.text_sim >= 0.40 and characters >= 0.30:
        return "transform"

    # Lemma or motif agreement while the characters barely overlap.
    if characters < 0.20 and (hit.lemma_sim >= 0.40 or motifs >= 0.50):
        return "plot"

    return "character" if characters >= 0.40 else "unknown"
# Backward-compatibility alias (tests import `_classify`).
_classify = _classify_legacy