273 lines
9.4 KiB
Python
273 lines
9.4 KiB
Python
"""삼중 유사도 알고리즘.
|
||
|
||
계획서 p.21 "이중 유사도 분석" + 전임자 지침 (lemma 교집합 구조 분석) 결합:
  ① text_sim   : 표면 의미 유사도 (TF-IDF / 임베딩 코사인)
  ② lemma_sim  : 형태소 기본형 교집합 비율 (복붙+말투변경 탐지)
  ③ element_sim: 인물/모티프 자카드 (구조 분석)

최종 score = w1·text + w2·lemma + w3·char + w4·motif
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import re
|
||
from dataclasses import dataclass
|
||
from typing import Protocol
|
||
|
||
import numpy as np
|
||
from sklearn.feature_extraction.text import TfidfVectorizer
|
||
from sklearn.metrics.pairwise import cosine_similarity
|
||
|
||
from app.api.schemas import EvidenceSpan, ExtractedElements
|
||
from app.core.config import Settings, get_settings
|
||
from app.engine.corpus import ReferenceDoc
|
||
from app.engine.structural import extract_lemmas, lemma_overlap_ratio
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
class SimilarityHit:
    """One reference document matched against a query, with its per-signal scores."""

    # Identifier of the matched reference document.
    doc_id: str
    # Title of the matched reference document.
    title: str
    # Combined (weighted) similarity score.
    score: float
    # (1) Surface text similarity.
    text_sim: float
    # (2) Structural similarity: lemma-intersection ratio.
    lemma_sim: float
    # (3) Per-element similarity scores (characters / motifs / genre ...), keyed by element name.
    element_sim: dict[str, float]
    # Evidence spans attached to this match.
    evidence: list[EvidenceSpan]
|
||
|
||
|
||
# ---------- ① 표면 유사도 백엔드 ----------
|
||
|
||
class TextSimilarityBackend(Protocol):
    """Structural interface shared by the surface (text) similarity backends."""

    def query(self, text: str) -> np.ndarray:
        """Return an array of similarity scores for *text*."""
        ...
|
||
|
||
|
||
def _korean_tokenizer(text: str) -> list[str]:
|
||
words = re.findall(r"[가-힣A-Za-z0-9]+", text)
|
||
grams: list[str] = []
|
||
for w in words:
|
||
if len(w) <= 3:
|
||
grams.append(w)
|
||
else:
|
||
grams.extend(w[i : i + 3] for i in range(len(w) - 2))
|
||
return grams
|
||
|
||
|
||
class TfidfBackend:
    """Surface-similarity backend built on TF-IDF vectors and cosine similarity."""

    def __init__(self, docs: list[ReferenceDoc]):
        """Fit a TF-IDF model on the text of the reference documents.

        When ``docs`` is empty nothing is fitted, and ``query`` returns an
        empty array.
        """
        self._docs = docs
        self._vectorizer = None
        self._matrix = None
        if docs:
            # Custom tokenizer is used, so the default token pattern and
            # lowercasing are switched off.
            self._vectorizer = TfidfVectorizer(
                tokenizer=_korean_tokenizer,
                lowercase=False,
                token_pattern=None,
                min_df=1,
            )
            corpus_texts = [doc.text for doc in docs]
            self._matrix = self._vectorizer.fit_transform(corpus_texts)

    def query(self, text: str) -> np.ndarray:
        """Return the cosine similarity of *text* to each reference document.

        Returns an empty array when the backend holds no documents.
        """
        if not self._docs or self._vectorizer is None:
            return np.array([])
        query_vec = self._vectorizer.transform([text])
        scores = cosine_similarity(query_vec, self._matrix)
        return scores.ravel()
|
||
|
||
|
||
class EmbeddingBackend:
    """Surface-similarity backend using OpenAI embeddings.

    Reference texts are embedded once at construction, L2-normalised and kept
    as a matrix. A query is embedded the same way and scored against every
    reference row by dot product (cosine similarity for unit vectors).
    """

    # Characters kept per text before embedding (a character cut, not a token cut).
    _MAX_CHARS = 8000
    # Texts sent per embeddings request.
    # NOTE(review): chosen conservatively so one request stays under the API's
    # per-request input-count / total-token caps — confirm against current limits.
    _BATCH_SIZE = 32

    def __init__(self, docs: list[ReferenceDoc], settings: Settings):
        """Create the OpenAI client and embed all reference documents.

        With an empty ``docs`` no request is made and ``query`` returns an
        empty array.
        """
        # Imported lazily so the module can be loaded without the openai package.
        from openai import OpenAI

        self._docs = docs
        self._model = settings.openai_embedding_model
        self._client = OpenAI(api_key=settings.openai_api_key)
        if not docs:
            self._matrix = None
            return
        self._matrix = self._embed_batch([d.text[: self._MAX_CHARS] for d in docs])

    def _embed_batch(self, texts: list[str]) -> np.ndarray:
        """Embed *texts* and return a float32 matrix of unit-length rows.

        The texts are sent in chunks of ``_BATCH_SIZE`` so that a large corpus
        does not go out as one oversized request. An empty *texts* returns an
        empty matrix without calling the API.
        """
        chunks: list[np.ndarray] = []
        for start in range(0, len(texts), self._BATCH_SIZE):
            resp = self._client.embeddings.create(
                model=self._model,
                input=texts[start : start + self._BATCH_SIZE],
            )
            chunks.append(
                np.array([item.embedding for item in resp.data], dtype=np.float32)
            )
        if not chunks:
            return np.empty((0, 0), dtype=np.float32)
        arr = chunks[0] if len(chunks) == 1 else np.vstack(chunks)
        norms = np.linalg.norm(arr, axis=1, keepdims=True)
        # Avoid division by zero for all-zero vectors; they stay all-zero.
        norms[norms == 0] = 1.0
        return arr / norms

    def query(self, text: str) -> np.ndarray:
        """Return the similarity of *text* to each reference document.

        Returns an empty array when the backend holds no documents.
        """
        if not self._docs or self._matrix is None:
            return np.array([])
        q = self._embed_batch([text[: self._MAX_CHARS]])
        return (q @ self._matrix.T).ravel()
|
||
|
||
|
||
class KoSimCSEBackend:
    """Korean open-source embedding backend (KoSimCSE/KoSBERT, recommended in PDF VII-3).

    Uses a sentence-transformers compatible model such as
    BM-K/KoSimCSE-roberta-multitask. Runs locally without any OpenAI
    dependency, so no data leaves the machine and there is no per-call cost.
    The model is downloaded automatically on first use (~500MB).
    """

    # Loaded models keyed by model name, shared by all instances.
    _model_cache: dict[str, object] = {}

    def __init__(self, docs: list[ReferenceDoc], settings: Settings):
        """Load (or reuse) the model and encode all reference documents.

        With an empty ``docs`` nothing is encoded and ``query`` returns an
        empty array.
        """
        from sentence_transformers import SentenceTransformer

        self._docs = docs
        self._model_name = settings.kosimcse_model
        # NOTE(review): this value is only used for the character cut below; it
        # is not passed to the model in this class — confirm that is intended.
        self._max_length = settings.kosimcse_max_length

        # Reuse an already loaded model so a corpus rebuild does not reload it.
        cache = KoSimCSEBackend._model_cache
        if self._model_name not in cache:
            logger.info("Loading SentenceTransformer model: %s", self._model_name)
            cache[self._model_name] = SentenceTransformer(self._model_name)
        self._model = cache[self._model_name]

        if docs:
            # Character-based cut; presumably truncated again by the model's
            # tokenizer — TODO confirm.
            char_limit = self._max_length * 4
            self._matrix = self._encode([doc.text[:char_limit] for doc in docs])
        else:
            self._matrix = None

    def _encode(self, texts: list[str]) -> np.ndarray:
        """Encode *texts* with normalised embeddings and return a float32 array."""
        vectors = self._model.encode(
            texts,
            normalize_embeddings=True,
            show_progress_bar=False,
            batch_size=8,
        )
        return np.array(vectors, dtype=np.float32)

    def query(self, text: str) -> np.ndarray:
        """Return the similarity of *text* to each reference document.

        Returns an empty array when the backend holds no documents.
        """
        if not self._docs or self._matrix is None:
            return np.array([])
        char_limit = self._max_length * 4
        query_vec = self._encode([text[:char_limit]])
        return (query_vec @ self._matrix.T).ravel()
|
||
|
||
|
||
# ---------- ③ 요소 유사도 ----------
|
||
|
||
def _jaccard(a: list[str], b: list[str]) -> float:
|
||
sa, sb = set(s.lower() for s in a), set(s.lower() for s in b)
|
||
if not sa and not sb:
|
||
return 0.0
|
||
return len(sa & sb) / max(1, len(sa | sb))
|
||
|
||
|
||
def _element_similarities(q: ExtractedElements, c: ExtractedElements) -> dict[str, float]:
    """Compare the extracted elements of a query (*q*) and a candidate (*c*).

    Returns a dict with the Jaccard similarity of ``characters``, ``motifs``
    and ``keywords``, plus ``genre`` which is 1.0 only when the query has a
    genre and it equals the candidate's genre.
    """
    scores = {
        field: _jaccard(getattr(q, field), getattr(c, field))
        for field in ("characters", "motifs", "keywords")
    }
    scores["genre"] = 1.0 if q.genre and q.genre == c.genre else 0.0
    return scores
|
||
|
||
|
||
# ---------- 삼중 유사도 인덱스 ----------
|
||
|
||
class DualSimilarityIndex:
    """Weighted combination of text, lemma and element similarity over a corpus.

    The combined score of a document is the weighted sum of its text
    similarity, lemma-overlap ratio, character Jaccard and motif Jaccard, with
    the weights read from ``settings``. Keyword and genre similarity are
    reported on each hit but do not contribute to the combined score.
    """

    def __init__(
        self,
        docs: list[ReferenceDoc],
        doc_elements: list[ExtractedElements],
        doc_lemmas: list[list[str]],
        settings: Settings,
        text_backend: TextSimilarityBackend,
    ):
        """Store the corpus and its pre-computed per-document features.

        Raises:
            ValueError: if ``docs``, ``doc_elements`` and ``doc_lemmas`` do not
                all have the same length.
        """
        if not (len(docs) == len(doc_elements) == len(doc_lemmas)):
            raise ValueError("docs/elements/lemmas length mismatch")
        self._docs = docs
        self._doc_elements = doc_elements
        self._doc_lemmas = doc_lemmas
        self._settings = settings
        self._text_backend = text_backend

    def query(
        self,
        text: str,
        query_elements: ExtractedElements,
        top_k: int = 5,
    ) -> list[SimilarityHit]:
        """Return up to ``top_k`` best-matching documents, highest score first.

        Documents whose combined score is not positive are dropped. An empty
        list is returned when the corpus is empty, when the text backend
        yields no scores, or when ``top_k`` is not positive.
        """
        # A negative top_k would otherwise slice from the end of the ranking.
        if top_k <= 0 or not self._docs:
            return []

        text_scores = self._text_backend.query(text)
        if text_scores.size == 0:
            return []

        query_lemmas = extract_lemmas(text)
        s = self._settings

        # Pass 1: score every document; only cheap per-document work here.
        scored: list[tuple[float, int, float, float, dict[str, float]]] = []
        for idx, (elements, lemmas) in enumerate(
            zip(self._doc_elements, self._doc_lemmas)
        ):
            text_sim = float(text_scores[idx])
            lemma_sim = lemma_overlap_ratio(query_lemmas, lemmas)
            elem_sim = _element_similarities(query_elements, elements)
            combined = (
                s.weight_text_sim * text_sim
                + s.weight_lemma_sim * lemma_sim
                + s.weight_char_sim * elem_sim["characters"]
                + s.weight_motif_sim * elem_sim["motifs"]
            )
            if combined <= 0:
                continue
            scored.append((combined, idx, text_sim, lemma_sim, elem_sim))

        # Stable sort on the score only, so ties keep corpus order.
        scored.sort(key=lambda row: row[0], reverse=True)

        # Pass 2: evidence spans are extracted only for the hits actually returned.
        hits: list[SimilarityHit] = []
        for combined, idx, text_sim, lemma_sim, elem_sim in scored[:top_k]:
            doc = self._docs[idx]
            hits.append(
                SimilarityHit(
                    doc_id=doc.doc_id,
                    title=doc.title,
                    score=combined,
                    text_sim=text_sim,
                    lemma_sim=lemma_sim,
                    element_sim=elem_sim,
                    evidence=_find_evidence_spans(text, doc.text),
                )
            )
        return hits
|
||
|
||
|
||
def _find_evidence_spans(query: str, reference: str, ngram: int = 6, max_spans: int = 5) -> list[EvidenceSpan]:
|
||
if len(query) < ngram or len(reference) < ngram:
|
||
return []
|
||
ref_grams = {reference[i : i + ngram] for i in range(len(reference) - ngram + 1)}
|
||
spans: list[EvidenceSpan] = []
|
||
i = 0
|
||
while i <= len(query) - ngram and len(spans) < max_spans:
|
||
if query[i : i + ngram] in ref_grams:
|
||
start = i
|
||
end = i + ngram
|
||
while end < len(query) and query[end - ngram + 1 : end + 1] in ref_grams:
|
||
end += 1
|
||
spans.append(EvidenceSpan(start=start, end=end, matched=query[start:end]))
|
||
i = end
|
||
else:
|
||
i += 1
|
||
return spans
|
||
|
||
|
||
def build_text_backend(docs: list[ReferenceDoc], settings: Settings) -> TextSimilarityBackend:
    """Build the text-similarity backend for *docs*.

    Priority: KoSimCSE (local) -> OpenAI embeddings -> TF-IDF (fallback).
    PDF VII-3 recommends KoSimCSE/KoSBERT, so the local open-source model is
    tried first. A backend that fails to initialise is logged and skipped.
    """
    if settings.use_kosimcse:
        try:
            backend = KoSimCSEBackend(docs, settings)
        except Exception as exc:
            # Best-effort: any init failure falls through to the next backend.
            logger.warning("KoSimCSE backend init failed, trying next: %s", exc)
        else:
            return backend

    if settings.use_embedding_similarity and settings.has_openai:
        try:
            logger.info("Using EmbeddingBackend (model=%s)", settings.openai_embedding_model)
            backend = EmbeddingBackend(docs, settings)
        except Exception as exc:
            logger.warning("Embedding backend init failed, falling back to TF-IDF: %s", exc)
        else:
            return backend

    logger.info("Using TfidfBackend")
    return TfidfBackend(docs)
|
||
|
||
|
||
SimilarityIndex = DualSimilarityIndex  # backward-compatibility alias
|