238 lines
9.5 KiB
Python
238 lines
9.5 KiB
Python
"""사내 plagia_result 데이터셋(pos 500 + neg 500)으로 하이브리드 성능 평가.
|
||
|
||
전임자 가이드 검증:
|
||
- 의미기반 스코어: 기존 1단계 모델의 메타데이터 임베딩 코사인 (이미 계산되어 있음)
|
||
- 구조기반 스코어: 우리 엔진의 lemma 교집합 비율 (본문 텍스트 기반)
|
||
- 조합: hybrid = α * meta_sim + (1-α) * lemma_sim
|
||
→ α와 threshold 그리드 서치로 최적 F1 도출
|
||
|
||
사용:
|
||
python scripts/evaluate_o2o_dataset.py \
|
||
--data-dir /Users/marineyang/Desktop/work/code/AI_publish_3rdtest/25/plagia_result
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
import sys
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Iterable
|
||
|
||
import numpy as np
|
||
|
||
# Directory one level above the folder that contains this script
# (presumably the project root -- the usage example runs it as scripts/<name>.py).
ROOT = Path(__file__).resolve().parent.parent
# Put ROOT first on sys.path so the `app` package import below resolves when
# this file is executed directly as a script.
sys.path.insert(0, str(ROOT))

# Must come after the sys.path change above, hence the E402 suppression.
from app.engine.structural import extract_lemmas, lemma_overlap_ratio # noqa: E402

# Configure root logging once for the script run; all messages go through `logger`.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.getLogger("eval-o2o")
|
||
|
||
|
||
@dataclass
class Sample:
    """One original/augmented text pair from the evaluation set, with its scores."""

    sample_id: str
    is_plagiarism: bool
    original_text: str
    augmented_text: str
    meta_sim: float | None  # score from the existing model; None when no score was found
    lemma_sim: float | None = None  # filled in during evaluation
|
||
|
||
|
||
def _load_meta_sims(data_dir: Path) -> dict[str, float]:
    """Load precomputed cosine similarities, keyed by sample id.

    Looks in ``data_dir`` for files matching ``all_similarities_*.json`` and
    reads the one whose name sorts last. Every row under ``pos_results`` and
    ``neg_results`` contributes ``row["id"] -> float(row["cosine_similarity"])``;
    rows whose ``cosine_similarity`` is missing or ``None`` are skipped.

    Args:
        data_dir: Directory that holds the similarity dump(s).

    Returns:
        Mapping from sample id to cosine similarity. Empty when no matching
        file exists (a warning is logged in that case, because callers will
        then find no meta score for any sample).

    Raises:
        KeyError: If a row with a similarity value has no ``"id"`` key.
    """
    candidates = sorted(data_dir.glob("all_similarities_*.json"))
    if not candidates:
        # Do not fail here, but make the reason for missing scores visible.
        logging.getLogger("eval-o2o").warning(
            "No all_similarities_*.json found in %s; no meta scores loaded", data_dir
        )
        return {}

    with candidates[-1].open("r", encoding="utf-8") as f:
        data = json.load(f)

    out: dict[str, float] = {}
    # Positive rows first, then negative rows, so a duplicated id resolves
    # to the negative row's value.
    for section in ("pos_results", "neg_results"):
        for row in data.get(section, []):
            if row.get("cosine_similarity") is not None:
                out[row["id"]] = float(row["cosine_similarity"])
    return out
|
||
|
||
|
||
def _load_samples(data_dir: Path) -> list[Sample]:
    """Load the positive and negative text pairs and attach their meta scores.

    Reads ``plagiarism_pos_metadata.json`` and ``plagiarism_neg_metadata.json``
    from ``data_dir``. Sample ids are synthesised from list position as
    ``POS001``, ``POS002``, ... and ``NEG001``, ... and used to look up the
    precomputed similarity returned by ``_load_meta_sims``; a sample with no
    entry gets ``meta_sim=None``.

    Args:
        data_dir: Directory that holds the two metadata files.

    Returns:
        All positive samples followed by all negative samples.

    Raises:
        FileNotFoundError: If either metadata file is missing.
        KeyError: If an item lacks ``original_text`` or ``augmented_text``.
    """

    def read_json(filename: str):
        # Context manager so the handle is closed even if parsing fails.
        with (data_dir / filename).open("r", encoding="utf-8") as f:
            return json.load(f)

    pos = read_json("plagiarism_pos_metadata.json")
    neg = read_json("plagiarism_neg_metadata.json")
    meta_sims = _load_meta_sims(data_dir)

    samples: list[Sample] = []
    for prefix, is_plagiarism, items in (("POS", True, pos), ("NEG", False, neg)):
        for i, item in enumerate(items, start=1):
            sid = f"{prefix}{i:03d}"
            samples.append(Sample(
                sample_id=sid,
                is_plagiarism=is_plagiarism,
                original_text=item["original_text"],
                augmented_text=item["augmented_text"],
                meta_sim=meta_sims.get(sid),
            ))
    return samples
|
||
|
||
|
||
def _compute_lemma_sims(samples: list[Sample]) -> None:
    """Fill in ``lemma_sim`` on every sample, in place.

    The ratio is taken with the augmented (suspected plagiarised) text as the
    first argument and the original text as the second. Progress is logged
    after every 200th sample.
    """
    total = len(samples)
    for done, sample in enumerate(samples, start=1):
        suspect_lemmas = extract_lemmas(sample.augmented_text)
        source_lemmas = extract_lemmas(sample.original_text)
        sample.lemma_sim = lemma_overlap_ratio(suspect_lemmas, source_lemmas)
        if done % 200:
            continue
        logger.info("Lemma extraction %d/%d", done, total)
|
||
|
||
|
||
def _metrics(scores: np.ndarray, labels: np.ndarray, threshold: float) -> dict[str, float]:
    """Compute binary-classification metrics at a single decision threshold.

    A sample is predicted positive when its score is ``>= threshold``.
    Labels equal to 1 count as positive and labels equal to 0 as negative.

    Returns:
        Dict with ``threshold``, ``precision``, ``recall``, ``f1``,
        ``accuracy`` and the confusion counts ``tp``, ``fp``, ``tn``, ``fn``.
        Precision, recall and F1 are 0.0 whenever their denominator is zero.
    """
    flagged = scores >= threshold
    cleared = ~flagged
    is_pos = labels == 1
    is_neg = labels == 0

    tp = int((flagged & is_pos).sum())
    fp = int((flagged & is_neg).sum())
    tn = int((cleared & is_neg).sum())
    fn = int((cleared & is_pos).sum())

    predicted_pos = tp + fp
    actual_pos = tp + fn
    precision = tp / predicted_pos if predicted_pos else 0.0
    recall = tp / actual_pos if actual_pos else 0.0

    pr_sum = precision + recall
    f1 = 2 * precision * recall / pr_sum if pr_sum else 0.0

    # max(1, ...) keeps the division defined for empty input.
    accuracy = (tp + tn) / max(1, tp + fp + tn + fn)

    return dict(
        threshold=threshold,
        precision=precision,
        recall=recall,
        f1=f1,
        accuracy=accuracy,
        tp=tp,
        fp=fp,
        tn=tn,
        fn=fn,
    )
|
||
|
||
|
||
def _best_threshold(scores: np.ndarray, labels: np.ndarray,
                    grid: Iterable[float] | None = None) -> dict[str, float]:
    """Return the metrics of the threshold in ``grid`` that maximises F1.

    Args:
        scores: Per-sample scores.
        labels: Per-sample 0/1 labels, aligned with ``scores``.
        grid: Candidate thresholds. Defaults to ``np.arange(0.05, 0.99, 0.01)``.

    Returns:
        The ``_metrics`` dict of the winning threshold. On an F1 tie the
        threshold that appears first in ``grid`` wins.

    Raises:
        ValueError: If ``grid`` yields no thresholds.
    """
    if grid is None:
        grid = np.arange(0.05, 0.99, 0.01)

    candidates = [_metrics(scores, labels, float(t)) for t in grid]
    if not candidates:
        # Fail here rather than hand None to callers that index the result.
        raise ValueError("threshold grid is empty")

    # max() returns the first maximal element, so ties go to the earliest threshold.
    return max(candidates, key=lambda m: m["f1"])
|
||
|
||
|
||
def _score_stats(values: np.ndarray) -> tuple:
    """Return ``(count, mean, std, min, max)``; the statistics are NaN for an empty array."""
    if values.size == 0:
        # numpy raises on min()/max() of a zero-size array and warns on mean()/std().
        nan = float("nan")
        return 0, nan, nan, nan, nan
    return (
        len(values),
        float(values.mean()),
        float(values.std()),
        float(values.min()),
        float(values.max()),
    )


def _distribution_summary(scores: np.ndarray, labels: np.ndarray) -> str:
    """Describe the score distribution of each class as a three-line string.

    Line 1 covers samples with label 1 (POS), line 2 samples with label 0
    (NEG); each reports count, mean, standard deviation, min and max. Line 3
    reports the difference between the POS mean and the NEG mean.

    A class with no samples is reported as ``n=0`` with ``nan`` statistics
    instead of raising.
    """
    pos_n, pos_avg, pos_std, pos_min, pos_max = _score_stats(scores[labels == 1])
    neg_n, neg_avg, neg_std, neg_min, neg_max = _score_stats(scores[labels == 0])
    return (
        f"POS n={pos_n} avg={pos_avg:.4f} std={pos_std:.4f} "
        f"min={pos_min:.4f} max={pos_max:.4f}\n"
        f"NEG n={neg_n} avg={neg_avg:.4f} std={neg_std:.4f} "
        f"min={neg_min:.4f} max={neg_max:.4f}\n"
        f"분리도(POS평균 - NEG평균) = {pos_avg - neg_avg:+.4f}"
    )
|
||
|
||
|
||
def _print_section(title: str) -> None:
    """Print a blank line followed by ``title`` framed by two 72-character rules."""
    print()
    print("=" * 72)
    print(title)
    print("=" * 72)


def _print_best(best: dict) -> None:
    """Print the best-F1 operating point and its confusion-matrix counts."""
    print(f"\n최적 F1: threshold={best['threshold']:.2f} "
          f"P={best['precision']:.4f} R={best['recall']:.4f} "
          f"F1={best['f1']:.4f} Acc={best['accuracy']:.4f}")
    print(f" TP={best['tp']} FP={best['fp']} "
          f"TN={best['tn']} FN={best['fn']}")


def main() -> int:
    """Evaluate meta-only, lemma-only and hybrid scoring, then save the results.

    Steps:
      1. Load samples from ``--data-dir`` and compute the lemma overlap score.
      2. Keep only samples that have both a meta score and a lemma score.
      3. Report the best-F1 threshold for the meta score alone and for the
         lemma score alone.
      4. Grid-search alpha over 0.00..1.00 in steps of 0.05 for
         ``alpha * meta + (1 - alpha) * lemma`` and report each alpha's best
         threshold.
      5. Print a comparison table and write everything to ``--out-json``
         (resolved relative to ``ROOT``).

    Returns:
        0 on success. 1 when the usable samples do not include at least one
        positive and one negative, since no metric can be computed then.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-dir", required=True, help="plagia_result 디렉토리")
    parser.add_argument("--out-json", default="data/training/o2o_eval_result.json")
    args = parser.parse_args()

    data_dir = Path(args.data_dir).expanduser().resolve()
    samples = _load_samples(data_dir)
    logger.info("Loaded %d samples (%d POS, %d NEG)",
                len(samples), sum(s.is_plagiarism for s in samples),
                sum(not s.is_plagiarism for s in samples))

    logger.info("Computing lemma overlap for all pairs...")
    _compute_lemma_sims(samples)

    # Drop samples with no meta score (missing from the existing results).
    valid = [s for s in samples if s.meta_sim is not None and s.lemma_sim is not None]
    logger.info("Valid samples for evaluation: %d", len(valid))

    # Without both classes the per-class statistics below hit numpy
    # zero-size reductions; stop with a clear message instead.
    n_pos = sum(1 for s in valid if s.is_plagiarism)
    n_neg = len(valid) - n_pos
    if n_pos == 0 or n_neg == 0:
        logger.error(
            "Cannot evaluate: need at least one POS and one NEG sample with a meta "
            "score (got %d POS, %d NEG). Check that all_similarities_*.json exists in %s",
            n_pos, n_neg, data_dir,
        )
        return 1

    labels = np.array([1 if s.is_plagiarism else 0 for s in valid])
    meta_scores = np.array([s.meta_sim for s in valid])
    lemma_scores = np.array([s.lemma_sim for s in valid])

    _print_section("[1] 메타 임베딩 점수 단독 (기존 1단계 모델 재현)")
    print(_distribution_summary(meta_scores, labels))
    best_meta = _best_threshold(meta_scores, labels)
    _print_best(best_meta)

    _print_section("[2] Lemma 교집합 점수 단독 (우리가 추가한 구조 분석)")
    print(_distribution_summary(lemma_scores, labels))
    best_lemma = _best_threshold(lemma_scores, labels)
    _print_best(best_lemma)

    _print_section("[3] 하이브리드 = α·meta + (1-α)·lemma ── α 그리드 서치")
    print(f"{'α(meta)':>9} {'threshold':>10} {'precision':>10} {'recall':>10} {'F1':>8} {'acc':>8}")
    best_hybrid = None
    best_alpha = None
    rows = []
    for alpha in np.arange(0.0, 1.01, 0.05):
        combined = alpha * meta_scores + (1 - alpha) * lemma_scores
        m = _best_threshold(combined, labels)
        rows.append((float(alpha), m))
        print(f"{alpha:>9.2f} {m['threshold']:>10.2f} {m['precision']:>10.4f} "
              f"{m['recall']:>10.4f} {m['f1']:>8.4f} {m['accuracy']:>8.4f}")
        # Strict ">" keeps the smallest alpha when several alphas tie on F1.
        if best_hybrid is None or m["f1"] > best_hybrid["f1"]:
            best_hybrid = m
            best_alpha = float(alpha)

    _print_section("[4] 요약 비교")
    print(f"{'모델':25s} {'precision':>10} {'recall':>10} {'F1':>8} {'threshold':>10}")
    # Hard-coded reference figures, not computed in this script
    # (presumably copied from the existing model's result.json -- verify).
    print(f"{'기존 모델 (result.json)':25s} {0.9520:>10.4f} {0.9560:>10.4f} {0.9540:>8.4f} {0.78:>10.2f}")
    print(f"{'메타 단독 (재현)':25s} {best_meta['precision']:>10.4f} "
          f"{best_meta['recall']:>10.4f} {best_meta['f1']:>8.4f} {best_meta['threshold']:>10.2f}")
    print(f"{'Lemma 단독':25s} {best_lemma['precision']:>10.4f} "
          f"{best_lemma['recall']:>10.4f} {best_lemma['f1']:>8.4f} {best_lemma['threshold']:>10.2f}")
    print(f"{f'하이브리드 (α={best_alpha:.2f})':25s} {best_hybrid['precision']:>10.4f} "
          f"{best_hybrid['recall']:>10.4f} {best_hybrid['f1']:>8.4f} {best_hybrid['threshold']:>10.2f}")

    out_path = ROOT / args.out_json
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as f:
        json.dump({
            "n_valid": len(valid),
            "meta_only": best_meta,
            "lemma_only": best_lemma,
            "hybrid_best": {"alpha": best_alpha, **best_hybrid},
            "hybrid_grid": [{"alpha": a, **m} for a, m in rows],
            "distributions": {
                "meta_pos_avg": float(meta_scores[labels == 1].mean()),
                "meta_neg_avg": float(meta_scores[labels == 0].mean()),
                "lemma_pos_avg": float(lemma_scores[labels == 1].mean()),
                "lemma_neg_avg": float(lemma_scores[labels == 0].mean()),
            },
        }, f, ensure_ascii=False, indent=2)
    print(f"\n결과 저장: {out_path}")
    return 0
|
||
|
||
|
||
# Run the evaluation only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|