o2o-plagiarism-ai/app/engine/similarity.py

273 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""삼중 유사도 알고리즘.
계획서 p.21 "이중 유사도 분석" + 전임자 지침 (lemma 교집합 구조 분석) 결합:
① text_sim : 표면 의미 유사도 (TF-IDF / 임베딩 코사인)
② lemma_sim : 형태소 기본형 교집합 비율 (복붙+말투변경 탐지)
③ element_sim: 인물/모티프 자카드 (구조 분석)
최종 score = w1·text + w2·lemma + w3·char + w4·motif
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from typing import Protocol
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from app.api.schemas import EvidenceSpan, ExtractedElements
from app.core.config import Settings, get_settings
from app.engine.corpus import ReferenceDoc
from app.engine.structural import extract_lemmas, lemma_overlap_ratio
logger = logging.getLogger(__name__)
@dataclass
class SimilarityHit:
    """One reference document scored against a query, with its score breakdown."""

    doc_id: str
    title: str
    score: float  # combined (weighted) similarity
    text_sim: float  # (1) surface similarity
    lemma_sim: float  # (2) structural similarity (lemma intersection ratio)
    element_sim: dict[str, float]  # (3) characters / motifs / genre
    evidence: list[EvidenceSpan]
# ---------- ① Surface-similarity backends ----------
class TextSimilarityBackend(Protocol):
    """Structural interface shared by the surface-similarity backends in this module."""

    def query(self, text: str) -> np.ndarray:
        """Score ``text`` against the backend's reference documents.

        The backends in this module return one score per reference document,
        in the order the documents were supplied, and an empty array when
        there are no reference documents.
        """
        ...
def _korean_tokenizer(text: str) -> list[str]:
    """Split ``text`` into character trigram tokens.

    Runs of Hangul syllables, ASCII letters and digits are treated as words;
    everything else acts as a separator. A word of three characters or fewer
    is emitted unchanged, a longer word is replaced by all of its overlapping
    three-character windows, in order.
    """
    tokens: list[str] = []
    for match in re.finditer(r"[가-힣A-Za-z0-9]+", text):
        word = match.group()
        size = len(word)
        if size > 3:
            # Sliding window of width 3 over the word.
            tokens += [word[pos : pos + 3] for pos in range(size - 2)]
        else:
            tokens.append(word)
    return tokens
class TfidfBackend:
    """Surface-similarity backend based on TF-IDF vectors and cosine similarity.

    The vectorizer is fitted on the reference documents at construction time
    using ``_korean_tokenizer``; with an empty corpus nothing is fitted and
    every query yields an empty array.
    """

    def __init__(self, docs: list[ReferenceDoc]):
        """Fit the TF-IDF model on the text of ``docs`` (skipped when empty)."""
        self._docs = docs
        self._vectorizer = None
        self._matrix = None
        if docs:
            vectorizer = TfidfVectorizer(
                tokenizer=_korean_tokenizer,
                lowercase=False,
                token_pattern=None,
                min_df=1,
            )
            corpus_texts = [doc.text for doc in docs]
            self._matrix = vectorizer.fit_transform(corpus_texts)
            self._vectorizer = vectorizer

    def query(self, text: str) -> np.ndarray:
        """Return the cosine similarity of ``text`` to each reference document."""
        if self._vectorizer is None or not self._docs:
            return np.array([])
        query_vector = self._vectorizer.transform([text])
        similarities = cosine_similarity(query_vector, self._matrix)
        return similarities.ravel()
class EmbeddingBackend:
    """Surface-similarity backend that embeds texts with the OpenAI embeddings API.

    Reference documents are embedded once at construction time and stored as
    L2-normalised rows, so a query costs one embedding call plus a matrix
    product (cosine similarity).
    """

    # Number of inputs sent per embeddings request. The endpoint limits both
    # the number of inputs and the total tokens of a single request, so the
    # corpus is embedded in small chunks rather than in one call.
    _BATCH_SIZE = 32

    def __init__(self, docs: list[ReferenceDoc], settings: Settings):
        """Create the API client and embed the reference corpus.

        Args:
            docs: Reference documents; only the first 8000 characters of each
                document's text are embedded.
            settings: Supplies ``openai_embedding_model`` and ``openai_api_key``.
        """
        # Imported here so this module can be loaded without the openai package.
        from openai import OpenAI

        self._docs = docs
        self._model = settings.openai_embedding_model
        self._client = OpenAI(api_key=settings.openai_api_key)
        if not docs:
            self._matrix = None
            return
        self._matrix = self._embed_batch([d.text[:8000] for d in docs])

    def _embed_batch(self, texts: list[str]) -> np.ndarray:
        """Embed ``texts`` and return their L2-normalised vectors.

        The texts are sent in chunks of ``_BATCH_SIZE`` so that a large corpus
        does not have to fit into a single request.

        Returns:
            A float32 array with one row per input text, in input order. Rows
            whose norm is zero are left as zero vectors. An empty ``texts``
            yields an empty ``(0, 0)`` array without calling the API.
        """
        vectors: list[list[float]] = []
        for start in range(0, len(texts), self._BATCH_SIZE):
            chunk = texts[start : start + self._BATCH_SIZE]
            resp = self._client.embeddings.create(model=self._model, input=chunk)
            vectors.extend(item.embedding for item in resp.data)
        if not vectors:
            return np.empty((0, 0), dtype=np.float32)
        arr = np.array(vectors, dtype=np.float32)
        norms = np.linalg.norm(arr, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # avoid division by zero for all-zero vectors
        return arr / norms

    def query(self, text: str) -> np.ndarray:
        """Return the cosine similarity of ``text`` to each reference document.

        Only the first 8000 characters of ``text`` are embedded. Returns an
        empty array when there are no reference documents.
        """
        if not self._docs or self._matrix is None:
            return np.array([])
        q = self._embed_batch([text[:8000]])
        # Rows are unit-length, so the dot product is the cosine similarity.
        return (q @ self._matrix.T).ravel()
class KoSimCSEBackend:
    """Open-source Korean embedding backend (KoSimCSE/KoSBERT, recommended in PDF VII-3).

    Uses a sentence-transformers compatible model such as
    BM-K/KoSimCSE-roberta-multitask. It runs locally without any OpenAI
    dependency, so no data is exposed externally and there is no per-call
    cost. The model is downloaded automatically on first use (~500MB).
    """

    _model_cache: dict[str, object] = {}

    def __init__(self, docs: list[ReferenceDoc], settings: Settings):
        """Load (or reuse) the model and embed the reference corpus.

        Args:
            docs: Reference documents to embed; may be empty.
            settings: Supplies ``kosimcse_model`` and ``kosimcse_max_length``.
        """
        from sentence_transformers import SentenceTransformer

        self._docs = docs
        self._model_name = settings.kosimcse_model
        self._max_length = settings.kosimcse_max_length

        # Share loaded models across instances so rebuilding the corpus does
        # not reload the model every time.
        cache = KoSimCSEBackend._model_cache
        name = self._model_name
        if name not in cache:
            logger.info("Loading SentenceTransformer model: %s", name)
            cache[name] = SentenceTransformer(name)
        self._model = cache[name]

        self._matrix = self._encode([self._clip(doc.text) for doc in docs]) if docs else None

    def _clip(self, text: str) -> str:
        """Cut ``text`` to ``4 * max_length`` characters.

        This is a character-based cut; per the original author's note the text
        is truncated again after tokenization.
        """
        return text[: self._max_length * 4]

    def _encode(self, texts: list[str]) -> np.ndarray:
        """Encode ``texts`` into normalised float32 embeddings, one row per text."""
        vectors = self._model.encode(
            texts,
            normalize_embeddings=True,
            show_progress_bar=False,
            batch_size=8,
        )
        return np.array(vectors, dtype=np.float32)

    def query(self, text: str) -> np.ndarray:
        """Return the dot product of the query embedding with each document embedding.

        Returns an empty array when there are no reference documents.
        """
        if self._matrix is None or not self._docs:
            return np.array([])
        query_vec = self._encode([self._clip(text)])
        scores = query_vec @ self._matrix.T
        return scores.ravel()
# ---------- ③ Element similarity ----------
def _jaccard(a: list[str], b: list[str]) -> float:
    """Return the case-insensitive Jaccard index of two string lists.

    Both lists are lower-cased and de-duplicated first. Two empty inputs
    score 0.0 rather than being treated as identical.
    """
    left = {item.lower() for item in a}
    right = {item.lower() for item in b}
    union = left | right
    if not union:
        return 0.0
    shared = left & right
    return len(shared) / max(1, len(union))
def _element_similarities(q: ExtractedElements, c: ExtractedElements) -> dict[str, float]:
    """Compare the extracted elements of a query (``q``) and a candidate (``c``).

    Returns a dict with the Jaccard overlap of ``characters``, ``motifs`` and
    ``keywords``, plus ``genre`` which is 1.0 only when the query has a
    non-empty genre equal to the candidate's genre.
    """
    character_overlap = _jaccard(q.characters, c.characters)
    motif_overlap = _jaccard(q.motifs, c.motifs)
    keyword_overlap = _jaccard(q.keywords, c.keywords)
    if q.genre and q.genre == c.genre:
        genre_match = 1.0
    else:
        genre_match = 0.0
    return dict(
        characters=character_overlap,
        motifs=motif_overlap,
        keywords=keyword_overlap,
        genre=genre_match,
    )
# ---------- Triple-similarity index ----------
class DualSimilarityIndex:
    """Weighted combination of text_sim, lemma_sim and element_sim over a reference corpus."""

    def __init__(
        self,
        docs: list[ReferenceDoc],
        doc_elements: list[ExtractedElements],
        doc_lemmas: list[list[str]],
        settings: Settings,
        text_backend: TextSimilarityBackend,
    ):
        """Store the per-document inputs, which are aligned by position.

        Args:
            docs: Reference documents.
            doc_elements: Extracted elements of each reference document.
            doc_lemmas: Lemma list of each reference document.
            settings: Supplies the ``weight_*`` coefficients used by ``query``.
            text_backend: Backend whose ``query`` result is indexed by the
                position of the document in ``docs``.

        Raises:
            ValueError: If the three lists do not have the same length.
        """
        if not (len(docs) == len(doc_elements) == len(doc_lemmas)):
            raise ValueError("docs/elements/lemmas length mismatch")
        self._docs = docs
        self._doc_elements = doc_elements
        self._doc_lemmas = doc_lemmas
        self._settings = settings
        self._text_backend = text_backend

    def query(
        self,
        text: str,
        query_elements: ExtractedElements,
        top_k: int = 5,
    ) -> list[SimilarityHit]:
        """Return the ``top_k`` reference documents most similar to ``text``.

        The combined score of a document is::

            weight_text_sim * text_sim + weight_lemma_sim * lemma_sim
            + weight_char_sim * characters + weight_motif_sim * motifs

        The ``keywords`` and ``genre`` element similarities are reported on
        each hit but do not contribute to the combined score. Documents whose
        combined score is not positive are dropped.

        Args:
            text: Query text.
            query_elements: Elements extracted from the query text.
            top_k: Maximum number of hits to return; a non-positive value
                yields an empty list.

        Returns:
            Hits sorted by descending combined score; ties keep corpus order.
        """
        if not self._docs or top_k <= 0:
            return []
        text_scores = self._text_backend.query(text)
        if text_scores.size == 0:
            return []
        query_lemmas = extract_lemmas(text)
        s = self._settings
        ranked: list[tuple[SimilarityHit, ReferenceDoc]] = []
        for idx, doc in enumerate(self._docs):
            text_sim = float(text_scores[idx])
            lemma_sim = lemma_overlap_ratio(query_lemmas, self._doc_lemmas[idx])
            elem_sim = _element_similarities(query_elements, self._doc_elements[idx])
            combined = (
                s.weight_text_sim * text_sim
                + s.weight_lemma_sim * lemma_sim
                + s.weight_char_sim * elem_sim["characters"]
                + s.weight_motif_sim * elem_sim["motifs"]
            )
            if combined <= 0:
                continue
            hit = SimilarityHit(
                doc_id=doc.doc_id,
                title=doc.title,
                score=combined,
                text_sim=text_sim,
                lemma_sim=lemma_sim,
                element_sim=elem_sim,
                evidence=[],  # filled in below, only for the hits that are returned
            )
            ranked.append((hit, doc))
        ranked.sort(key=lambda pair: pair[0].score, reverse=True)
        top = ranked[:top_k]
        # Evidence extraction scans the whole reference text, so it is done
        # only for the documents that survive the top-k cut.
        for hit, doc in top:
            hit.evidence = _find_evidence_spans(text, doc.text)
        return [hit for hit, _ in top]
def _find_evidence_spans(query: str, reference: str, ngram: int = 6, max_spans: int = 5) -> list[EvidenceSpan]:
    """Locate stretches of ``query`` whose character n-grams all occur in ``reference``.

    The query is scanned left to right. When the ``ngram``-character window at
    the current position appears anywhere in ``reference``, the span is grown
    one character at a time for as long as the window ending at the new
    character also appears in ``reference``. Scanning resumes after the span.

    Args:
        query: Text to search in; span offsets refer to this string.
        reference: Text whose n-grams are looked up.
        ngram: Window width in characters.
        max_spans: Stop after collecting this many spans.

    Returns:
        Spans in order of appearance; empty when either text is shorter than
        ``ngram``.
    """
    query_len = len(query)
    if query_len < ngram or len(reference) < ngram:
        return []

    reference_windows = {
        reference[pos : pos + ngram] for pos in range(len(reference) - ngram + 1)
    }
    last_start = query_len - ngram
    found: list[EvidenceSpan] = []
    cursor = 0
    while cursor <= last_start and len(found) < max_spans:
        if query[cursor : cursor + ngram] not in reference_windows:
            cursor += 1
            continue
        stop = cursor + ngram
        # Extend while the window that ends at index ``stop`` is also known.
        while stop < query_len and query[stop - ngram + 1 : stop + 1] in reference_windows:
            stop += 1
        found.append(EvidenceSpan(start=cursor, end=stop, matched=query[cursor:stop]))
        cursor = stop
    return found
def build_text_backend(docs: list[ReferenceDoc], settings: Settings) -> TextSimilarityBackend:
    """Pick the surface-similarity backend for ``docs``.

    Priority: KoSimCSE (self-hosted) -> OpenAI embeddings -> TF-IDF (fallback).
    PDF VII-3 recommends KoSimCSE/KoSBERT, so self-hosted / open-source models
    are tried first. A backend whose construction raises is logged as a
    warning and the next candidate is tried.
    """
    if settings.use_kosimcse:
        try:
            backend = KoSimCSEBackend(docs, settings)
        except Exception as exc:
            logger.warning("KoSimCSE backend init failed, trying next: %s", exc)
        else:
            return backend

    wants_openai = settings.use_embedding_similarity and settings.has_openai
    if wants_openai:
        try:
            logger.info("Using EmbeddingBackend (model=%s)", settings.openai_embedding_model)
            backend = EmbeddingBackend(docs, settings)
        except Exception as exc:
            logger.warning("Embedding backend init failed, falling back to TF-IDF: %s", exc)
        else:
            return backend

    logger.info("Using TfidfBackend")
    return TfidfBackend(docs)
SimilarityIndex = DualSimilarityIndex  # backward-compatible alias