o2o-plagiarism-ai/scripts/evaluate_o2o_dataset.py

238 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""사내 plagia_result 데이터셋(pos 500 + neg 500)으로 하이브리드 성능 평가.
전임자 가이드 검증:
- 의미기반 스코어: 기존 1단계 모델의 메타데이터 임베딩 코사인 (이미 계산되어 있음)
- 구조기반 스코어: 우리 엔진의 lemma 교집합 비율 (본문 텍스트 기반)
- 조합: hybrid = α * meta_sim + (1-α) * lemma_sim
→ α와 threshold 그리드 서치로 최적 F1 도출
사용:
python scripts/evaluate_o2o_dataset.py \
--data-dir /Users/marineyang/Desktop/work/code/AI_publish_3rdtest/25/plagia_result
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
import numpy as np
# Directory two levels above this file (the parent of the directory that
# contains this script).
ROOT = Path(__file__).resolve().parent.parent
# Put ROOT first on sys.path so the `app` package import below can be resolved
# when this file is executed directly as a script.
sys.path.insert(0, str(ROOT))
from app.engine.structural import extract_lemmas, lemma_overlap_ratio # noqa: E402
# Root logging configuration: INFO level, "<time> <level>: <message>" lines.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
# Logger used by every function in this script.
logger = logging.getLogger("eval-o2o")
@dataclass
class Sample:
    """One labelled (original, augmented) text pair from the evaluation set.

    ``meta_sim`` carries the score produced by the existing model, or ``None``
    when no score is available for the sample. ``lemma_sim`` starts out as
    ``None`` and is filled in while the evaluation runs.
    """

    sample_id: str
    is_plagiarism: bool
    original_text: str
    augmented_text: str
    meta_sim: float | None  # score from the existing model
    lemma_sim: float | None = None  # filled in at evaluation time
def _load_meta_sims(data_dir: Path) -> dict[str, float]:
    """Read pre-computed cosine similarities, keyed by row id.

    Searches *data_dir* for files matching ``all_similarities_*.json`` and
    reads the one whose path sorts last. Rows listed under ``pos_results`` and
    then ``neg_results`` are scanned; a row whose ``cosine_similarity`` is
    absent or ``None`` is skipped.

    Returns:
        Mapping of row ``id`` to its similarity as a ``float``; an empty dict
        when no matching file exists.
    """
    matches = sorted(data_dir.glob("all_similarities_*.json"))
    if len(matches) == 0:
        return {}

    chosen = matches[-1]
    with chosen.open("r", encoding="utf-8") as fh:
        payload = json.load(fh)

    sims: dict[str, float] = {}
    # Positive rows first, then negative rows; a later row with the same id
    # overwrites an earlier one.
    for section in ("pos_results", "neg_results"):
        for entry in payload.get(section, []):
            value = entry.get("cosine_similarity")
            if value is None:
                continue
            sims[entry["id"]] = float(value)
    return sims
def _load_samples(data_dir: Path) -> list[Sample]:
    """Load the positive and negative pairs in *data_dir* as ``Sample`` objects.

    Reads ``plagiarism_pos_metadata.json`` and ``plagiarism_neg_metadata.json``
    and emits all positive samples first, then all negative ones. Sample ids
    are synthesised from each item's 1-based position (``POS001``, ``NEG001``,
    ...) and used to look up ``meta_sim`` in the mapping returned by
    ``_load_meta_sims``; samples without an entry get ``meta_sim=None``.
    NOTE(review): this assumes the similarity file uses the same positional
    ids -- confirm against the tool that produced it.

    Raises:
        OSError: if either metadata file cannot be opened.
        KeyError: if an item lacks ``original_text`` or ``augmented_text``.
    """

    def _read_json(name: str):
        # Context manager so the file handle is closed as soon as parsing is
        # done, instead of being left to the garbage collector.
        with (data_dir / name).open("r", encoding="utf-8") as f:
            return json.load(f)

    pos_items = _read_json("plagiarism_pos_metadata.json")
    neg_items = _read_json("plagiarism_neg_metadata.json")
    meta_sims = _load_meta_sims(data_dir)

    samples: list[Sample] = []
    groups = (("POS", True, pos_items), ("NEG", False, neg_items))
    for prefix, is_plagiarism, items in groups:
        for i, item in enumerate(items, start=1):
            sid = f"{prefix}{i:03d}"
            samples.append(Sample(
                sample_id=sid,
                is_plagiarism=is_plagiarism,
                original_text=item["original_text"],
                augmented_text=item["augmented_text"],
                meta_sim=meta_sims.get(sid),
            ))
    return samples
def _compute_lemma_sims(samples: list[Sample]) -> None:
    """Fill ``lemma_sim`` on every sample in place.

    The lemmas of the augmented (suspected plagiarised) text are passed as the
    first argument to ``lemma_overlap_ratio`` and those of the original text
    as the second. A progress line is logged after every 200th sample.
    """
    total = len(samples)
    for position, sample in enumerate(samples, start=1):
        suspect_lemmas = extract_lemmas(sample.augmented_text)
        source_lemmas = extract_lemmas(sample.original_text)
        sample.lemma_sim = lemma_overlap_ratio(suspect_lemmas, source_lemmas)
        if position % 200 != 0:
            continue
        logger.info("Lemma extraction %d/%d", position, total)
def _metrics(scores: np.ndarray, labels: np.ndarray, threshold: float) -> dict[str, float]:
    """Compute confusion-matrix metrics for the rule ``scores >= threshold``.

    Labels equal to 1 count as positives and labels equal to 0 as negatives.
    Precision, recall and F1 fall back to 0.0 when their denominator is zero;
    accuracy divides by at least 1 so an empty input yields 0.0.

    Returns:
        Dict with ``threshold``, ``precision``, ``recall``, ``f1``,
        ``accuracy`` and the integer counts ``tp``, ``fp``, ``tn``, ``fn``.
    """
    flagged = scores >= threshold
    cleared = np.logical_not(flagged)
    is_pos = labels == 1
    is_neg = labels == 0

    tp = int((flagged & is_pos).sum())
    fp = int((flagged & is_neg).sum())
    tn = int((cleared & is_neg).sum())
    fn = int((cleared & is_pos).sum())

    predicted_pos = tp + fp
    actual_pos = tp + fn
    precision = tp / predicted_pos if predicted_pos else 0.0
    recall = tp / actual_pos if actual_pos else 0.0

    pr_sum = precision + recall
    f1 = 2 * precision * recall / pr_sum if pr_sum else 0.0

    total = tp + fp + tn + fn
    accuracy = (tp + tn) / max(1, total)

    return {
        "threshold": threshold,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "accuracy": accuracy,
        "tp": tp,
        "fp": fp,
        "tn": tn,
        "fn": fn,
    }
def _best_threshold(scores: np.ndarray, labels: np.ndarray,
                    grid: Iterable[float] | None = None) -> dict[str, float]:
    """Return the metrics of the threshold in *grid* with the highest F1.

    Args:
        scores: Scores to threshold.
        labels: Ground-truth labels aligned with *scores*.
        grid: Candidate thresholds, tried in order. Defaults to
            0.05, 0.06, ..., 0.98.

    Returns:
        The dict produced by ``_metrics`` for the winning threshold. On ties
        the earliest candidate in *grid* wins.

    Raises:
        ValueError: if *grid* yields no candidates.
    """
    if grid is None:
        # Build the grid from integers. A float-step arange accumulates
        # rounding error, so a candidate can sit a hair above the two-decimal
        # value that gets reported, and a score lying exactly on that value
        # would then be classified differently under ">=".
        grid = np.arange(5, 99) / 100.0

    best: dict[str, float] | None = None
    for t in grid:
        candidate = _metrics(scores, labels, float(t))
        # Strict ">" keeps the first candidate when F1 values tie.
        if best is None or candidate["f1"] > best["f1"]:
            best = candidate

    if best is None:
        raise ValueError("threshold grid is empty")
    return best
def _distribution_summary(scores: np.ndarray, labels: np.ndarray) -> str:
    """Describe the score distribution of each class as a three-line string.

    The first line covers scores whose label is 1 (POS), the second those
    whose label is 0 (NEG); each reports count, mean, standard deviation,
    minimum and maximum. The third line is the signed gap between the POS
    mean and the NEG mean.
    """

    def _describe(tag: str, values: np.ndarray) -> str:
        # One newline-terminated statistics line for a single class.
        return (
            f"{tag} n={len(values)} avg={values.mean():.4f} std={values.std():.4f} "
            f"min={values.min():.4f} max={values.max():.4f}\n"
        )

    positives = scores[labels == 1]
    negatives = scores[labels == 0]

    pos_line = _describe("POS", positives)
    neg_line = _describe("NEG", negatives)
    gap = positives.mean() - negatives.mean()
    return pos_line + neg_line + f"분리도(POS평균 - NEG평균) = {gap:+.4f}"
def _print_section(title: str) -> None:
    """Print a blank line, then *title* framed by two 72-character rules."""
    rule = "=" * 72
    print()
    print(rule)
    print(title)
    print(rule)


def _print_best(best: dict[str, float]) -> None:
    """Print the best-F1 operating point followed by its confusion counts."""
    print(f"\n최적 F1: threshold={best['threshold']:.2f} "
          f"P={best['precision']:.4f} R={best['recall']:.4f} "
          f"F1={best['f1']:.4f} Acc={best['accuracy']:.4f}")
    print(f" TP={best['tp']} FP={best['fp']} "
          f"TN={best['tn']} FN={best['fn']}")


def _summary_row(name: str, precision: float, recall: float,
                 f1: float, threshold: float) -> str:
    """Format one row of the final comparison table."""
    return f"{name:25s} {precision:>10.4f} {recall:>10.4f} {f1:>8.4f} {threshold:>10.2f}"


def main() -> int:
    """Evaluate meta-only, lemma-only and hybrid scoring and save the results.

    Steps: load the samples from ``--data-dir``, compute the lemma overlap for
    every pair, keep the samples that have both scores, report the best-F1
    threshold for each single score, grid-search the mixing weight alpha of
    ``alpha * meta + (1 - alpha) * lemma``, print a comparison table and write
    everything to ``--out-json`` (resolved against ``ROOT``).

    Returns:
        0 on success; 1 when there is not at least one valid positive and one
        valid negative sample to evaluate.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-dir", required=True, help="plagia_result 디렉토리")
    parser.add_argument("--out-json", default="data/training/o2o_eval_result.json")
    args = parser.parse_args()

    data_dir = Path(args.data_dir).expanduser().resolve()
    samples = _load_samples(data_dir)
    logger.info("Loaded %d samples (%d POS, %d NEG)",
                len(samples), sum(s.is_plagiarism for s in samples),
                sum(not s.is_plagiarism for s in samples))
    logger.info("Computing lemma overlap for all pairs...")
    _compute_lemma_sims(samples)

    # Drop samples that have no meta score (missing from the existing results).
    valid = [s for s in samples if s.meta_sim is not None and s.lemma_sim is not None]
    logger.info("Valid samples for evaluation: %d", len(valid))

    # The per-class statistics below take min/max of each class, which fails
    # on an empty class; stop early with an explicit message instead.
    n_pos = sum(s.is_plagiarism for s in valid)
    n_neg = len(valid) - n_pos
    if n_pos == 0 or n_neg == 0:
        logger.error(
            "Cannot evaluate: need at least one valid POS and one valid NEG "
            "sample (got %d POS, %d NEG)", n_pos, n_neg)
        return 1

    labels = np.array([1 if s.is_plagiarism else 0 for s in valid])
    meta_scores = np.array([s.meta_sim for s in valid])
    lemma_scores = np.array([s.lemma_sim for s in valid])

    _print_section("[1] 메타 임베딩 점수 단독 (기존 1단계 모델 재현)")
    print(_distribution_summary(meta_scores, labels))
    best_meta = _best_threshold(meta_scores, labels)
    _print_best(best_meta)

    _print_section("[2] Lemma 교집합 점수 단독 (우리가 추가한 구조 분석)")
    print(_distribution_summary(lemma_scores, labels))
    best_lemma = _best_threshold(lemma_scores, labels)
    _print_best(best_lemma)

    _print_section("[3] 하이브리드 = α·meta + (1-α)·lemma ── α 그리드 서치")
    print(f"{'α(meta)':>9} {'threshold':>10} {'precision':>10} {'recall':>10} {'F1':>8} {'acc':>8}")
    best_hybrid = None
    best_alpha = None
    rows = []
    # 0.00, 0.05, ..., 1.00 built from integers so the alpha values stored in
    # the JSON are not polluted by accumulated float error.
    for alpha in np.arange(0, 21) / 20.0:
        alpha = float(alpha)
        combined = alpha * meta_scores + (1 - alpha) * lemma_scores
        m = _best_threshold(combined, labels)
        rows.append((alpha, m))
        print(f"{alpha:>9.2f} {m['threshold']:>10.2f} {m['precision']:>10.4f} "
              f"{m['recall']:>10.4f} {m['f1']:>8.4f} {m['accuracy']:>8.4f}")
        # Strict ">" keeps the smallest alpha when F1 values tie.
        if best_hybrid is None or m["f1"] > best_hybrid["f1"]:
            best_hybrid = m
            best_alpha = alpha

    _print_section("[4] 요약 비교")
    print(f"{'모델':25s} {'precision':>10} {'recall':>10} {'F1':>8} {'threshold':>10}")
    # Reference figures for the existing model are hard-coded here, not
    # recomputed by this script.
    print(_summary_row("기존 모델 (result.json)", 0.9520, 0.9560, 0.9540, 0.78))
    print(_summary_row("메타 단독 (재현)", best_meta["precision"], best_meta["recall"],
                       best_meta["f1"], best_meta["threshold"]))
    print(_summary_row("Lemma 단독", best_lemma["precision"], best_lemma["recall"],
                       best_lemma["f1"], best_lemma["threshold"]))
    print(_summary_row(f"하이브리드 (α={best_alpha:.2f})", best_hybrid["precision"],
                       best_hybrid["recall"], best_hybrid["f1"], best_hybrid["threshold"]))

    out_path = ROOT / args.out_json
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as f:
        json.dump({
            "n_valid": len(valid),
            "meta_only": best_meta,
            "lemma_only": best_lemma,
            "hybrid_best": {"alpha": best_alpha, **best_hybrid},
            "hybrid_grid": [{"alpha": a, **m} for a, m in rows],
            "distributions": {
                "meta_pos_avg": float(meta_scores[labels == 1].mean()),
                "meta_neg_avg": float(meta_scores[labels == 0].mean()),
                "lemma_pos_avg": float(lemma_scores[labels == 1].mean()),
                "lemma_neg_avg": float(lemma_scores[labels == 0].mean()),
            },
        }, f, ensure_ascii=False, indent=2)
    print(f"\n결과 저장: {out_path}")
    return 0
# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())