"""사내 plagia_result 데이터셋(pos 500 + neg 500)으로 하이브리드 성능 평가. 전임자 가이드 검증: - 의미기반 스코어: 기존 1단계 모델의 메타데이터 임베딩 코사인 (이미 계산되어 있음) - 구조기반 스코어: 우리 엔진의 lemma 교집합 비율 (본문 텍스트 기반) - 조합: hybrid = α * meta_sim + (1-α) * lemma_sim → α와 threshold 그리드 서치로 최적 F1 도출 사용: python scripts/evaluate_o2o_dataset.py \ --data-dir /Users/marineyang/Desktop/work/code/AI_publish_3rdtest/25/plagia_result """ from __future__ import annotations import argparse import json import logging import sys from dataclasses import dataclass from pathlib import Path from typing import Iterable import numpy as np ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) from app.engine.structural import extract_lemmas, lemma_overlap_ratio # noqa: E402 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") logger = logging.getLogger("eval-o2o") @dataclass class Sample: sample_id: str is_plagiarism: bool original_text: str augmented_text: str meta_sim: float | None # 기존 모델 점수 lemma_sim: float | None = None # 평가 시 채워짐 def _load_meta_sims(data_dir: Path) -> dict[str, float]: candidates = sorted(data_dir.glob("all_similarities_*.json")) if not candidates: return {} with candidates[-1].open("r", encoding="utf-8") as f: data = json.load(f) out: dict[str, float] = {} for row in data.get("pos_results", []): if row.get("cosine_similarity") is not None: out[row["id"]] = float(row["cosine_similarity"]) for row in data.get("neg_results", []): if row.get("cosine_similarity") is not None: out[row["id"]] = float(row["cosine_similarity"]) return out def _load_samples(data_dir: Path) -> list[Sample]: pos = json.load((data_dir / "plagiarism_pos_metadata.json").open("r", encoding="utf-8")) neg = json.load((data_dir / "plagiarism_neg_metadata.json").open("r", encoding="utf-8")) meta_sims = _load_meta_sims(data_dir) samples: list[Sample] = [] for i, item in enumerate(pos, start=1): sid = f"POS{i:03d}" samples.append(Sample( sample_id=sid, is_plagiarism=True, original_text=item["original_text"], augmented_text=item["augmented_text"], meta_sim=meta_sims.get(sid), )) for i, item in enumerate(neg, start=1): sid = f"NEG{i:03d}" samples.append(Sample( sample_id=sid, is_plagiarism=False, original_text=item["original_text"], augmented_text=item["augmented_text"], meta_sim=meta_sims.get(sid), )) return samples def _compute_lemma_sims(samples: list[Sample]) -> None: """augmented (의심 표절본) 기준 lemma 교집합 비율.""" for i, s in enumerate(samples, 1): q = extract_lemmas(s.augmented_text) r = extract_lemmas(s.original_text) s.lemma_sim = lemma_overlap_ratio(q, r) if i % 200 == 0: logger.info("Lemma extraction %d/%d", i, len(samples)) def _metrics(scores: np.ndarray, labels: np.ndarray, threshold: float) -> dict[str, float]: pred = scores >= threshold tp = int(((pred == 1) & (labels == 1)).sum()) fp = int(((pred == 1) & (labels == 0)).sum()) tn = int(((pred == 0) & (labels == 0)).sum()) fn = int(((pred == 0) & (labels == 1)).sum()) precision = tp / (tp + fp) if (tp + fp) else 0.0 recall = tp / (tp + fn) if (tp + fn) else 0.0 f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0 acc = (tp + tn) / max(1, tp + fp + tn + fn) return { "threshold": threshold, "precision": precision, "recall": recall, "f1": f1, "accuracy": acc, "tp": tp, "fp": fp, "tn": tn, "fn": fn, } def _best_threshold(scores: np.ndarray, labels: np.ndarray, grid: Iterable[float] = None) -> dict[str, float]: if grid is None: grid = np.arange(0.05, 0.99, 0.01) best = None for t in grid: m = _metrics(scores, labels, float(t)) if best is None or m["f1"] > best["f1"]: best = m return best def _distribution_summary(scores: np.ndarray, labels: np.ndarray) -> str: pos_scores = scores[labels == 1] neg_scores = scores[labels == 0] return ( f"POS n={len(pos_scores)} avg={pos_scores.mean():.4f} std={pos_scores.std():.4f} " f"min={pos_scores.min():.4f} max={pos_scores.max():.4f}\n" f"NEG n={len(neg_scores)} avg={neg_scores.mean():.4f} std={neg_scores.std():.4f} " f"min={neg_scores.min():.4f} max={neg_scores.max():.4f}\n" f"분리도(POS평균 - NEG평균) = {pos_scores.mean() - neg_scores.mean():+.4f}" ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--data-dir", required=True, help="plagia_result 디렉토리") parser.add_argument("--out-json", default="data/training/o2o_eval_result.json") args = parser.parse_args() data_dir = Path(args.data_dir).expanduser().resolve() samples = _load_samples(data_dir) logger.info("Loaded %d samples (%d POS, %d NEG)", len(samples), sum(s.is_plagiarism for s in samples), sum(not s.is_plagiarism for s in samples)) logger.info("Computing lemma overlap for all pairs...") _compute_lemma_sims(samples) # 메타 점수가 누락된 샘플 제외 (기존 결과 누락분) valid = [s for s in samples if s.meta_sim is not None and s.lemma_sim is not None] logger.info("Valid samples for evaluation: %d", len(valid)) labels = np.array([1 if s.is_plagiarism else 0 for s in valid]) meta_scores = np.array([s.meta_sim for s in valid]) lemma_scores = np.array([s.lemma_sim for s in valid]) print() print("=" * 72) print("[1] 메타 임베딩 점수 단독 (기존 1단계 모델 재현)") print("=" * 72) print(_distribution_summary(meta_scores, labels)) best_meta = _best_threshold(meta_scores, labels) print(f"\n최적 F1: threshold={best_meta['threshold']:.2f} " f"P={best_meta['precision']:.4f} R={best_meta['recall']:.4f} " f"F1={best_meta['f1']:.4f} Acc={best_meta['accuracy']:.4f}") print(f" TP={best_meta['tp']} FP={best_meta['fp']} " f"TN={best_meta['tn']} FN={best_meta['fn']}") print() print("=" * 72) print("[2] Lemma 교집합 점수 단독 (우리가 추가한 구조 분석)") print("=" * 72) print(_distribution_summary(lemma_scores, labels)) best_lemma = _best_threshold(lemma_scores, labels) print(f"\n최적 F1: threshold={best_lemma['threshold']:.2f} " f"P={best_lemma['precision']:.4f} R={best_lemma['recall']:.4f} " f"F1={best_lemma['f1']:.4f} Acc={best_lemma['accuracy']:.4f}") print(f" TP={best_lemma['tp']} FP={best_lemma['fp']} " f"TN={best_lemma['tn']} FN={best_lemma['fn']}") print() print("=" * 72) print("[3] 하이브리드 = α·meta + (1-α)·lemma ── α 그리드 서치") print("=" * 72) print(f"{'α(meta)':>9} {'threshold':>10} {'precision':>10} {'recall':>10} {'F1':>8} {'acc':>8}") best_hybrid = None best_alpha = None rows = [] for alpha in np.arange(0.0, 1.01, 0.05): combined = alpha * meta_scores + (1 - alpha) * lemma_scores m = _best_threshold(combined, labels) rows.append((float(alpha), m)) print(f"{alpha:>9.2f} {m['threshold']:>10.2f} {m['precision']:>10.4f} " f"{m['recall']:>10.4f} {m['f1']:>8.4f} {m['accuracy']:>8.4f}") if best_hybrid is None or m["f1"] > best_hybrid["f1"]: best_hybrid = m best_alpha = float(alpha) print() print("=" * 72) print("[4] 요약 비교") print("=" * 72) print(f"{'모델':25s} {'precision':>10} {'recall':>10} {'F1':>8} {'threshold':>10}") print(f"{'기존 모델 (result.json)':25s} {0.9520:>10.4f} {0.9560:>10.4f} {0.9540:>8.4f} {0.78:>10.2f}") print(f"{'메타 단독 (재현)':25s} {best_meta['precision']:>10.4f} " f"{best_meta['recall']:>10.4f} {best_meta['f1']:>8.4f} {best_meta['threshold']:>10.2f}") print(f"{'Lemma 단독':25s} {best_lemma['precision']:>10.4f} " f"{best_lemma['recall']:>10.4f} {best_lemma['f1']:>8.4f} {best_lemma['threshold']:>10.2f}") print(f"{f'하이브리드 (α={best_alpha:.2f})':25s} {best_hybrid['precision']:>10.4f} " f"{best_hybrid['recall']:>10.4f} {best_hybrid['f1']:>8.4f} {best_hybrid['threshold']:>10.2f}") out_path = ROOT / args.out_json out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", encoding="utf-8") as f: json.dump({ "n_valid": len(valid), "meta_only": best_meta, "lemma_only": best_lemma, "hybrid_best": {"alpha": best_alpha, **best_hybrid}, "hybrid_grid": [{"alpha": a, **m} for a, m in rows], "distributions": { "meta_pos_avg": float(meta_scores[labels == 1].mean()), "meta_neg_avg": float(meta_scores[labels == 0].mean()), "lemma_pos_avg": float(lemma_scores[labels == 1].mean()), "lemma_neg_avg": float(lemma_scores[labels == 0].mean()), }, }, f, ensure_ascii=False, indent=2) print(f"\n결과 저장: {out_path}") return 0 if __name__ == "__main__": raise SystemExit(main())