"""KoSimCSE + Lemma + 자카드 통합 평가 시각화 (v2.1.0-kosimcse 기준). 기존 visualize_eval.py 가 OpenAI 메타 임베딩만 측정했던 것을 보강: - KoSimCSE 본문 임베딩 코사인 (자체 산출물) - Lemma 교집합 비율 (형태소 구조) - 인물/모티프 자카드 (요소) - 4-way 결합 (text 0.30 + lemma 0.45 + char 0.15 + motif 0.10) - 기존 OpenAI 메타 임베딩(이미 계산된 값) 도 baseline 으로 비교 사용: python scripts/visualize_eval_v2.py \ --data-dir /Users/marineyang/Desktop/work/code/AI_publish_3rdtest/25/plagia_result """ from __future__ import annotations import argparse import json import logging import sys from pathlib import Path import matplotlib matplotlib.use("Agg") import matplotlib.font_manager as fm import matplotlib.pyplot as plt import numpy as np ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) from app.engine.extractor import RuleExtractor # noqa: E402 from app.engine.similarity import _element_similarities # noqa: E402 from app.engine.structural import extract_lemmas, lemma_overlap_ratio # noqa: E402 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") logger = logging.getLogger("visualize-v2") # 운영 가중치 (.env 기본값) W_TEXT = 0.30 W_LEMMA = 0.45 W_CHAR = 0.15 W_MOTIF = 0.10 # KoSimCSE 모델 KOSIMCSE_MODEL = "BM-K/KoSimCSE-roberta-multitask" KOSIMCSE_MAX_CHARS = 2048 def _setup_korean_font() -> None: for name in ["AppleSDGothicNeo-Regular", "Apple SD Gothic Neo", "NanumGothic", "Nanum Gothic", "Malgun Gothic"]: if name in {f.name for f in fm.fontManager.ttflist}: plt.rcParams["font.family"] = name break plt.rcParams["axes.unicode_minus"] = False def _load_data(data_dir: Path): pos = json.load((data_dir / "plagiarism_pos_metadata.json").open(encoding="utf-8")) neg = json.load((data_dir / "plagiarism_neg_metadata.json").open(encoding="utf-8")) sims_path = sorted(data_dir.glob("all_similarities_*.json"))[-1] sims = json.load(sims_path.open(encoding="utf-8")) meta_map: dict[str, float] = {} for r in sims.get("pos_results", []) + sims.get("neg_results", []): if r.get("cosine_similarity") is not None: meta_map[r["id"]] = float(r["cosine_similarity"]) rows = [] for i, item in enumerate(pos, 1): sid = f"POS{i:03d}" if sid not in meta_map: continue rows.append((sid, True, item["original_text"], item["augmented_text"], meta_map[sid])) for i, item in enumerate(neg, 1): sid = f"NEG{i:03d}" if sid not in meta_map: continue rows.append((sid, False, item["original_text"], item["augmented_text"], meta_map[sid])) return rows def _compute_all_scores(rows): """모든 점수 컴포넌트 계산: KoSimCSE / Lemma / Char / Motif / 기존 메타.""" from sentence_transformers import SentenceTransformer logger.info("Loading KoSimCSE: %s", KOSIMCSE_MODEL) model = SentenceTransformer(KOSIMCSE_MODEL) extractor = RuleExtractor() labels = np.array([1 if r[1] else 0 for r in rows]) meta_emb = np.array([r[4] for r in rows], dtype=np.float32) # KoSimCSE: original + augmented 모두 임베딩 후 페어별 cosine originals = [r[2][:KOSIMCSE_MAX_CHARS] for r in rows] augments = [r[3][:KOSIMCSE_MAX_CHARS] for r in rows] logger.info("Encoding %d original texts with KoSimCSE...", len(originals)) orig_emb = model.encode(originals, normalize_embeddings=True, show_progress_bar=False, batch_size=16) logger.info("Encoding %d augmented texts...", len(augments)) aug_emb = model.encode(augments, normalize_embeddings=True, show_progress_bar=False, batch_size=16) kosimcse_sim = (orig_emb * aug_emb).sum(axis=1) # Lemma / Char / Motif lemma_scores, char_scores, motif_scores = [], [], [] for i, (sid, _, orig, aug, _) in enumerate(rows, 1): q_l = extract_lemmas(aug) r_l = extract_lemmas(orig) lemma_scores.append(lemma_overlap_ratio(q_l, r_l)) q_e = extractor.extract(aug) r_e = extractor.extract(orig) es = _element_similarities(q_e, r_e) char_scores.append(es["characters"]) motif_scores.append(es["motifs"]) if i % 200 == 0: logger.info("Lemma/element %d/%d", i, len(rows)) return { "labels": labels, "meta_emb": meta_emb, "kosimcse": np.array(kosimcse_sim), "lemma": np.array(lemma_scores), "char": np.array(char_scores), "motif": np.array(motif_scores), } def _metrics(scores: np.ndarray, labels: np.ndarray, t: float): pred = scores >= t tp = int(((pred == 1) & (labels == 1)).sum()) fp = int(((pred == 1) & (labels == 0)).sum()) tn = int(((pred == 0) & (labels == 0)).sum()) fn = int(((pred == 0) & (labels == 1)).sum()) p = tp / (tp + fp) if (tp + fp) else 0.0 r = tp / (tp + fn) if (tp + fn) else 0.0 f1 = 2 * p * r / (p + r) if (p + r) else 0.0 acc = (tp + tn) / max(1, tp + fp + tn + fn) return {"threshold": t, "precision": p, "recall": r, "f1": f1, "accuracy": acc, "tp": tp, "fp": fp, "tn": tn, "fn": fn} def _best(scores, labels, grid=None): if grid is None: grid = np.arange(0.05, 0.99, 0.01) rows = [_metrics(scores, labels, float(t)) for t in grid] return max(rows, key=lambda m: m["f1"]), rows def _dist_summary(scores, labels): p = scores[labels == 1]; n = scores[labels == 0] return { "pos_avg": float(p.mean()), "pos_std": float(p.std()), "neg_avg": float(n.mean()), "neg_std": float(n.std()), "separation": float(p.mean() - n.mean()), } def plot_distributions(scores, out_path: Path): labels = scores["labels"] fig, axes = plt.subplots(1, 2, figsize=(13, 5)) bins = np.linspace(0, 1, 41) for ax, key, title, default_t in [ (axes[0], "kosimcse", "KoSimCSE 코사인 (본문 의미)", 0.50), (axes[1], "lemma", "Lemma 교집합 비율 (형태소 구조)", 0.59), ]: ax.hist(scores[key][labels == 1], bins=bins, alpha=0.6, label="POS (표절)", color="#d62728") ax.hist(scores[key][labels == 0], bins=bins, alpha=0.6, label="NEG (비표절)", color="#1f77b4") ax.set_title(title); ax.set_xlabel("score"); ax.set_ylabel("count") ax.axvline(default_t, color="black", linestyle="--", alpha=0.5) ax.legend(); ax.grid(alpha=0.3) fig.suptitle("점수 컴포넌트 분포 — POS vs NEG", fontsize=14) fig.tight_layout() fig.savefig(out_path, dpi=120, bbox_inches="tight") plt.close(fig) def plot_threshold_curves(scores, out_path: Path): labels = scores["labels"] grid = np.arange(0.05, 0.99, 0.01) series = { "메타 임베딩 (OpenAI, 이전 baseline)": scores["meta_emb"], "KoSimCSE 단독 (자체 산출물)": scores["kosimcse"], "Lemma 단독": scores["lemma"], "하이브리드 (운영 가중치)": ( W_TEXT * scores["kosimcse"] + W_LEMMA * scores["lemma"] + W_CHAR * scores["char"] + W_MOTIF * scores["motif"] ), } colors = ["#1f77b4", "#ff7f0e", "#9467bd", "#2ca02c"] fig, axes = plt.subplots(1, 2, figsize=(14, 5)) for (name, s), c in zip(series.items(), colors): curve = [_metrics(s, labels, float(t)) for t in grid] axes[0].plot(grid, [m["f1"] for m in curve], label=name, color=c, linewidth=2.5 if "하이브리드" in name else 1.8) axes[1].plot([m["recall"] for m in curve], [m["precision"] for m in curve], label=name, color=c, linewidth=2.5 if "하이브리드" in name else 1.8) axes[0].set_title("Threshold별 F1"); axes[0].set_xlabel("threshold"); axes[0].set_ylabel("F1") axes[0].set_ylim(0.0, 1.0); axes[0].grid(alpha=0.3); axes[0].legend(fontsize=9) axes[1].set_title("Precision-Recall"); axes[1].set_xlabel("recall"); axes[1].set_ylabel("precision") axes[1].set_xlim(0.5, 1.0); axes[1].set_ylim(0.5, 1.0); axes[1].grid(alpha=0.3); axes[1].legend(fontsize=9) fig.suptitle("모델 성능 곡선 — 운영 가중치(text 0.30 / lemma 0.45 / char 0.15 / motif 0.10)", fontsize=13) fig.tight_layout() fig.savefig(out_path, dpi=120, bbox_inches="tight") plt.close(fig) def plot_comparison(scores, out_path: Path): labels = scores["labels"] hybrid = ( W_TEXT * scores["kosimcse"] + W_LEMMA * scores["lemma"] + W_CHAR * scores["char"] + W_MOTIF * scores["motif"] ) best_meta, _ = _best(scores["meta_emb"], labels) best_kos, _ = _best(scores["kosimcse"], labels) best_lemma, _ = _best(scores["lemma"], labels) best_hybrid, _ = _best(hybrid, labels) result_json = {"precision": 0.952, "recall": 0.956, "f1": 0.954, "threshold": 0.78} models = ["기존 result.json", "메타 임베딩", "KoSimCSE", "Lemma", "하이브리드"] metrics_data = [result_json, best_meta, best_kos, best_lemma, best_hybrid] precisions = [m["precision"] for m in metrics_data] recalls = [m["recall"] for m in metrics_data] f1s = [m["f1"] for m in metrics_data] x = np.arange(len(models)); w = 0.27 fig, ax = plt.subplots(figsize=(12, 5.5)) ax.bar(x - w, precisions, w, label="Precision", color="#1f77b4") ax.bar(x, recalls, w, label="Recall", color="#ff7f0e") ax.bar(x + w, f1s, w, label="F1", color="#2ca02c") for i, (p, r, f1) in enumerate(zip(precisions, recalls, f1s)): ax.text(i - w, p + 0.005, f"{p:.3f}", ha="center", fontsize=8) ax.text(i, r + 0.005, f"{r:.3f}", ha="center", fontsize=8) ax.text(i + w, f1 + 0.005, f"{f1:.3f}", ha="center", fontsize=8) ax.set_xticks(x); ax.set_xticklabels(models, fontsize=10) ax.set_ylim(0.7, 1.0); ax.set_ylabel("점수"); ax.grid(alpha=0.3, axis="y"); ax.legend() ax.set_title("모델 성능 비교 (사내 1000쌍, F1 최적 threshold)", fontsize=13) fig.tight_layout() fig.savefig(out_path, dpi=120, bbox_inches="tight") plt.close(fig) return best_meta, best_kos, best_lemma, best_hybrid, result_json def write_report(out_path: Path, scores, best_meta, best_kos, best_lemma, best_hybrid, result_json): labels = scores["labels"] n = int(len(labels)); n_pos = int(labels.sum()); n_neg = n - n_pos s = {k: _dist_summary(scores[k], labels) for k in ["meta_emb", "kosimcse", "lemma"]} md = f"""# 사내 plagia_result 데이터셋 평가 리포트 (v2.1.0-kosimcse) - **데이터셋**: 표절 페어 {n_pos}건 + 비표절 페어 {n_neg}건 (총 {n}쌍) - **엔진 버전**: o2o-plagiarism-2.1.0-kosimcse - **운영 가중치**: text(KoSimCSE) {W_TEXT} / lemma {W_LEMMA} / char {W_CHAR} / motif {W_MOTIF} ## 1. 점수 컴포넌트 분포 | 점수 | POS 평균 | NEG 평균 | 분리도 | std(POS/NEG) | |---|---|---|---|---| | 메타 임베딩 (OpenAI, baseline) | {s['meta_emb']['pos_avg']:.4f} | {s['meta_emb']['neg_avg']:.4f} | **+{s['meta_emb']['separation']:.4f}** | {s['meta_emb']['pos_std']:.3f} / {s['meta_emb']['neg_std']:.3f} | | **KoSimCSE 본문 (자체)** | **{s['kosimcse']['pos_avg']:.4f}** | **{s['kosimcse']['neg_avg']:.4f}** | **+{s['kosimcse']['separation']:.4f}** | {s['kosimcse']['pos_std']:.3f} / {s['kosimcse']['neg_std']:.3f} | | **Lemma 교집합** | **{s['lemma']['pos_avg']:.4f}** | **{s['lemma']['neg_avg']:.4f}** | **+{s['lemma']['separation']:.4f}** | {s['lemma']['pos_std']:.3f} / {s['lemma']['neg_std']:.3f} | → 그래프: `reports/01_score_distributions.png` ## 2. 모델별 최적 성능 (F1 최대화 threshold) | 모델 | Precision | Recall | **F1** | Threshold | |---|---|---|---|---| | 기존 result.json (전임자 1단계) | {result_json['precision']:.4f} | {result_json['recall']:.4f} | **{result_json['f1']:.4f}** | {result_json['threshold']:.2f} | | 메타 임베딩 단독 (OpenAI) | {best_meta['precision']:.4f} | {best_meta['recall']:.4f} | {best_meta['f1']:.4f} | {best_meta['threshold']:.2f} | | **KoSimCSE 단독 (자체)** | **{best_kos['precision']:.4f}** | **{best_kos['recall']:.4f}** | **{best_kos['f1']:.4f}** | {best_kos['threshold']:.2f} | | **Lemma 단독** | **{best_lemma['precision']:.4f}** | **{best_lemma['recall']:.4f}** | **{best_lemma['f1']:.4f}** | {best_lemma['threshold']:.2f} | | **하이브리드 (운영 가중치)** ⭐ | **{best_hybrid['precision']:.4f}** | **{best_hybrid['recall']:.4f}** | **{best_hybrid['f1']:.4f}** | {best_hybrid['threshold']:.2f} | → 그래프: `reports/02_threshold_curves.png`, `reports/03_model_comparison.png` ## 3. Confusion Matrix (하이브리드, threshold={best_hybrid['threshold']:.2f}) | | 예측: 표절 | 예측: 비표절 | |---|---|---| | **실제: 표절** | TP = {best_hybrid['tp']} | FN = {best_hybrid['fn']} | | **실제: 비표절** | FP = {best_hybrid['fp']} | TN = {best_hybrid['tn']} | ## 4. 결론 1. **KoSimCSE 도입으로 자체 산출물 정합성 확보** — OpenAI 의존 0, 호출 비용 0, 데이터 외부 노출 0 2. **Lemma 컴포넌트가 단독으로도 강력** — F1 {best_lemma['f1']:.4f} (자서전 도메인의 어미 변경 표절을 결정적으로 잡음) 3. **하이브리드가 가장 안정** — recall {best_hybrid['recall']:.4f} (실제 표절을 거의 다 잡음) 4. **PDF v1.2 권장 임계값 0.85 와의 관계** — 본 평가는 plagia_result 데이터 (출판 콘텐츠) 기준 F1 최적치이며, 실제 자서전 도메인에서는 PDF 권장 0.85 적용을 우선 (정밀도 우선, 재현율 일부 손실 감수) """ out_path.write_text(md, encoding="utf-8") def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--data-dir", required=True) parser.add_argument("--out-dir", default=str(ROOT / "reports")) args = parser.parse_args() _setup_korean_font() data_dir = Path(args.data_dir).expanduser().resolve() out_dir = Path(args.out_dir).resolve() out_dir.mkdir(parents=True, exist_ok=True) rows = _load_data(data_dir) logger.info("Loaded %d valid samples", len(rows)) scores = _compute_all_scores(rows) logger.info("Plotting distributions..."); plot_distributions(scores, out_dir / "01_score_distributions.png") logger.info("Plotting threshold curves..."); plot_threshold_curves(scores, out_dir / "02_threshold_curves.png") logger.info("Plotting comparison...") best_meta, best_kos, best_lemma, best_hybrid, result_json = plot_comparison(scores, out_dir / "03_model_comparison.png") write_report(out_dir / "REPORT.md", scores, best_meta, best_kos, best_lemma, best_hybrid, result_json) print() print("=" * 60) print("v2 평가 리포트 생성 완료") print("=" * 60) print(f" 📊 {out_dir / '01_score_distributions.png'}") print(f" 📊 {out_dir / '02_threshold_curves.png'}") print(f" 📊 {out_dir / '03_model_comparison.png'}") print(f" 📄 {out_dir / 'REPORT.md'}") return 0 if __name__ == "__main__": raise SystemExit(main())