"""삼중 유사도 알고리즘. 계획서 p.21 "이중 유사도 분석" + 전임자 지침 (lemma 교집합 구조 분석) 결합: ① text_sim : 표면 의미 유사도 (TF-IDF / 임베딩 코사인) ② lemma_sim : 형태소 기본형 교집합 비율 (복붙+말투변경 탐지) ③ element_sim: 인물/모티프 자카드 (구조 분석) 최종 score = w1·text + w2·lemma + w3·char + w4·motif """ from __future__ import annotations import logging import re from dataclasses import dataclass from typing import Protocol import numpy as np from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from app.api.schemas import EvidenceSpan, ExtractedElements from app.core.config import Settings, get_settings from app.engine.corpus import ReferenceDoc from app.engine.structural import extract_lemmas, lemma_overlap_ratio logger = logging.getLogger(__name__) @dataclass class SimilarityHit: doc_id: str title: str score: float # 결합 유사도 text_sim: float # ① 표면 lemma_sim: float # ② 구조 (lemma 교집합 비율) element_sim: dict[str, float] # ③ 인물/모티프/장르 evidence: list[EvidenceSpan] # ---------- ① 표면 유사도 백엔드 ---------- class TextSimilarityBackend(Protocol): def query(self, text: str) -> np.ndarray: ... 
# Maximal runs of Hangul syllables, ASCII letters or digits; anything else
# separates words.
_WORD_RE = re.compile(r"[가-힣A-Za-z0-9]+")


def _korean_tokenizer(text: str) -> list[str]:
    """Tokenize *text* into short whole words and character trigrams.

    Words are maximal runs of Hangul syllables, ASCII letters and digits.
    A word of at most three characters is kept whole; a longer word is
    replaced by all of its overlapping 3-character windows.
    """
    grams: list[str] = []
    for word in _WORD_RE.findall(text):
        if len(word) <= 3:
            grams.append(word)
        else:
            grams.extend(word[i : i + 3] for i in range(len(word) - 2))
    return grams


class TfidfBackend:
    """Surface similarity from TF-IDF vectors compared by cosine similarity."""

    def __init__(self, docs: list[ReferenceDoc]):
        """Fit a TF-IDF model on the ``text`` of every document in *docs*.

        With an empty corpus nothing is fitted and ``query`` returns an
        empty array.
        """
        self._docs = docs
        if not docs:
            self._vectorizer = None
            self._matrix = None
            return
        self._vectorizer = TfidfVectorizer(
            tokenizer=_korean_tokenizer,
            lowercase=False,
            # Unused once a custom tokenizer is supplied; set explicitly to None.
            token_pattern=None,
            min_df=1,
        )
        self._matrix = self._vectorizer.fit_transform([d.text for d in docs])

    def query(self, text: str) -> np.ndarray:
        """Return the cosine similarity of *text* to each fitted document.

        Returns an empty array when the backend was built without documents.
        """
        if not self._docs or self._vectorizer is None:
            return np.array([])
        vec = self._vectorizer.transform([text])
        return cosine_similarity(vec, self._matrix).ravel()


class EmbeddingBackend:
    """Surface similarity from OpenAI embeddings compared by cosine similarity."""

    # Character cap applied to every text before it is sent for embedding.
    _MAX_CHARS = 8000
    # Texts sent per embeddings request. Sending a whole corpus in a single
    # request can exceed the API's per-request input limits, so requests are
    # split. NOTE(review): 32 is a conservative choice, not an API constant.
    _BATCH_SIZE = 32

    def __init__(self, docs: list[ReferenceDoc], settings: Settings):
        """Create the OpenAI client and embed the (truncated) text of *docs*.

        With an empty corpus no embedding request is made and ``query``
        returns an empty array.
        """
        from openai import OpenAI

        self._docs = docs
        self._model = settings.openai_embedding_model
        self._client = OpenAI(api_key=settings.openai_api_key)
        if not docs:
            self._matrix = None
            return
        self._matrix = self._embed_batch([d.text[: self._MAX_CHARS] for d in docs])

    def _embed_batch(self, texts: list[str]) -> np.ndarray:
        """Embed *texts* and return one L2-normalized float32 row per text.

        The texts are sent in consecutive batches of ``_BATCH_SIZE`` and the
        results concatenated in request order, so row ``i`` belongs to
        ``texts[i]``.
        """
        vectors: list[list[float]] = []
        for start in range(0, len(texts), self._BATCH_SIZE):
            chunk = texts[start : start + self._BATCH_SIZE]
            resp = self._client.embeddings.create(model=self._model, input=chunk)
            vectors.extend(item.embedding for item in resp.data)
        arr = np.array(vectors, dtype=np.float32)
        norms = np.linalg.norm(arr, axis=1, keepdims=True)
        # Leave all-zero vectors untouched instead of dividing by zero.
        norms[norms == 0] = 1.0
        return arr / norms

    def query(self, text: str) -> np.ndarray:
        """Return the cosine similarity of *text* to each embedded document.

        Rows are unit-normalized, so the dot product is the cosine similarity.
        Returns an empty array when the backend was built without documents.
        """
        if not self._docs or self._matrix is None:
            return np.array([])
        q = self._embed_batch([text[: self._MAX_CHARS]])
        return (q @ self._matrix.T).ravel()


class KoSimCSEBackend:
    """Korean open-source embeddings (PDF VII-3 recommendation: KoSimCSE/KoSBERT).

    Uses a sentence-transformers compatible model such as
    BM-K/KoSimCSE-roberta-multitask. Runs locally without any OpenAI
    dependency, so no data leaves the machine and there is no per-call cost.
    The model is downloaded automatically on first use (~500MB).
    """

    # Loaded models keyed by model name, shared by every instance.
    _model_cache: dict[str, object] = {}

    def __init__(self, docs: list[ReferenceDoc], settings: Settings):
        """Load (or reuse) the configured model and encode the text of *docs*.

        With an empty corpus nothing is encoded and ``query`` returns an
        empty array.
        """
        from sentence_transformers import SentenceTransformer

        self._docs = docs
        self._model_name = settings.kosimcse_model
        self._max_length = settings.kosimcse_max_length

        # Reuse the model so a corpus rebuild does not reload it every time.
        if self._model_name not in KoSimCSEBackend._model_cache:
            logger.info("Loading SentenceTransformer model: %s", self._model_name)
            KoSimCSEBackend._model_cache[self._model_name] = SentenceTransformer(self._model_name)
        self._model = KoSimCSEBackend._model_cache[self._model_name]

        if not docs:
            self._matrix = None
            return
        # Character-level pre-cut; per the original note the text is truncated
        # again after tokenization (presumably by the model itself - confirm).
        texts = [d.text[: self._max_length * 4] for d in docs]
        self._matrix = self._encode(texts)

    def _encode(self, texts: list[str]) -> np.ndarray:
        """Encode *texts* into a float32 array of normalized embeddings."""
        emb = self._model.encode(
            texts,
            normalize_embeddings=True,
            show_progress_bar=False,
            batch_size=8,
        )
        return np.array(emb, dtype=np.float32)

    def query(self, text: str) -> np.ndarray:
        """Return the dot product of *text*'s embedding with each document's.

        Embeddings are requested normalized, so this is the cosine similarity.
        Returns an empty array when the backend was built without documents.
        """
        if not self._docs or self._matrix is None:
            return np.array([])
        q = self._encode([text[: self._max_length * 4]])
        return (q @ self._matrix.T).ravel()


# ---------- ③ element similarity ----------


def _jaccard(a: list[str], b: list[str]) -> float:
    """Return the case-insensitive Jaccard index of two string lists.

    Two empty inputs give 0.0 rather than an undefined 0/0.
    """
    sa = {s.lower() for s in a}
    sb = {s.lower() for s in b}
    union = sa | sb
    if not union:
        return 0.0
    return len(sa & sb) / len(union)


def _element_similarities(q: ExtractedElements, c: ExtractedElements) -> dict[str, float]:
    """Compare the extracted elements of a query (*q*) and a candidate (*c*).

    Characters, motifs and keywords are compared with ``_jaccard``; genre is
    1.0 only when the query genre is truthy and equal to the candidate genre.
    """
    return {
        "characters": _jaccard(q.characters, c.characters),
        "motifs": _jaccard(q.motifs, c.motifs),
        "keywords": _jaccard(q.keywords, c.keywords),
        "genre": 1.0 if q.genre and q.genre == c.genre else 0.0,
    }


# ---------- triple-similarity index ----------


class DualSimilarityIndex:
    """Weighted combination of text_sim, lemma_sim and element_sim."""

    def __init__(
        self,
        docs: list[ReferenceDoc],
        doc_elements: list[ExtractedElements],
        doc_lemmas: list[list[str]],
        settings: Settings,
        text_backend: TextSimilarityBackend,
    ):
        """Store the corpus and its per-document elements and lemmas.

        Raises:
            ValueError: if *docs*, *doc_elements* and *doc_lemmas* differ in
                length (they are indexed in parallel).
        """
        if not (len(docs) == len(doc_elements) == len(doc_lemmas)):
            raise ValueError("docs/elements/lemmas length mismatch")
        self._docs = docs
        self._doc_elements = doc_elements
        self._doc_lemmas = doc_lemmas
        self._settings = settings
        self._text_backend = text_backend

    def query(
        self,
        text: str,
        query_elements: ExtractedElements,
        top_k: int = 5,
    ) -> list[SimilarityHit]:
        """Return the best-scoring reference documents for *text*.

        The combined score of a document is the weighted sum (weights from
        settings) of the backend's text similarity, the lemma overlap ratio,
        and the character and motif similarities. Documents whose combined
        score is not positive are dropped. The result is sorted by score,
        highest first, and cut to ``top_k`` entries.

        Returns an empty list when the corpus is empty or the backend yields
        no scores.
        """
        if not self._docs:
            return []
        text_scores = self._text_backend.query(text)
        if text_scores.size == 0:
            return []

        query_lemmas = extract_lemmas(text)
        s = self._settings

        # (combined, doc, text_sim, lemma_sim, elem_sim) per surviving document.
        scored = []
        for idx, doc in enumerate(self._docs):
            text_sim = float(text_scores[idx])
            lemma_sim = lemma_overlap_ratio(query_lemmas, self._doc_lemmas[idx])
            elem_sim = _element_similarities(query_elements, self._doc_elements[idx])
            combined = (
                s.weight_text_sim * text_sim
                + s.weight_lemma_sim * lemma_sim
                + s.weight_char_sim * elem_sim["characters"]
                + s.weight_motif_sim * elem_sim["motifs"]
            )
            if combined <= 0:
                continue
            scored.append((combined, doc, text_sim, lemma_sim, elem_sim))

        # Stable sort: documents with equal scores keep their corpus order.
        scored.sort(key=lambda entry: entry[0], reverse=True)

        # Evidence extraction scans the whole reference text, so it is done
        # only for the documents that are actually returned.
        return [
            SimilarityHit(
                doc_id=doc.doc_id,
                title=doc.title,
                score=combined,
                text_sim=text_sim,
                lemma_sim=lemma_sim,
                element_sim=elem_sim,
                evidence=_find_evidence_spans(text, doc.text),
            )
            for combined, doc, text_sim, lemma_sim, elem_sim in scored[:top_k]
        ]


def _find_evidence_spans(query: str, reference: str, ngram: int = 6, max_spans: int = 5) -> list[EvidenceSpan]:
    """Find stretches of *query* whose character n-grams all occur in *reference*.

    Scans *query* left to right. When the ``ngram``-character window at the
    current position occurs in *reference*, the span is extended one
    character at a time for as long as the newest window also occurs there.
    ``start``/``end`` are character offsets into *query* (end exclusive) and
    ``matched`` is ``query[start:end]``. At most ``max_spans`` spans are
    returned.

    Returns an empty list when ``ngram`` is smaller than 1 or either text is
    shorter than ``ngram``.
    """
    if ngram < 1 or len(query) < ngram or len(reference) < ngram:
        return []
    ref_grams = {reference[i : i + ngram] for i in range(len(reference) - ngram + 1)}
    spans: list[EvidenceSpan] = []
    last_start = len(query) - ngram
    i = 0
    while i <= last_start and len(spans) < max_spans:
        if query[i : i + ngram] in ref_grams:
            start = i
            end = i + ngram
            # query[end - ngram + 1 : end + 1] is the window ending at the
            # next character; extend while that window is also known.
            while end < len(query) and query[end - ngram + 1 : end + 1] in ref_grams:
                end += 1
            spans.append(EvidenceSpan(start=start, end=end, matched=query[start:end]))
            i = end
        else:
            i += 1
    return spans


def build_text_backend(docs: list[ReferenceDoc], settings: Settings) -> TextSimilarityBackend:
    """Build the surface-similarity backend for *docs*.

    Priority: KoSimCSE (in-house) -> OpenAI embeddings -> TF-IDF (fallback).
    PDF VII-3 recommends KoSimCSE/KoSBERT; in-house / open-source models are
    preferred. A backend whose construction fails is logged as a warning and
    the next one is tried.
    """
    if settings.use_kosimcse:
        try:
            logger.info("Using KoSimCSEBackend (model=%s)", settings.kosimcse_model)
            return KoSimCSEBackend(docs, settings)
        except Exception as exc:
            logger.warning("KoSimCSE backend init failed, trying next: %s", exc)
    if settings.use_embedding_similarity and settings.has_openai:
        try:
            logger.info("Using EmbeddingBackend (model=%s)", settings.openai_embedding_model)
            return EmbeddingBackend(docs, settings)
        except Exception as exc:
            logger.warning("Embedding backend init failed, falling back to TF-IDF: %s", exc)
    logger.info("Using TfidfBackend")
    return TfidfBackend(docs)


SimilarityIndex = DualSimilarityIndex  # backward compatibility