|
|
""" |
|
|
Single-dataset statistical comparison: control vs competitors via |
|
|
Nadeau–Bengio corrected t-tests on outer folds (k-fold CV), with Holm FWER control. |
|
|
No Wilcoxon. No Friedman. Test metrics are shown for context only. |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import re |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Optional, Tuple |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from math import sqrt |
|
|
|
|
|
from statsmodels.stats.multitest import multipletests |
|
|
from scipy.stats import t as tdist |
|
|
|
|
|
np.random.seed(42) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_list_in_brackets(inner: str) -> Optional[List[float]]: |
|
|
parts = [p.strip() for p in inner.split(",") if p.strip()] |
|
|
out: List[float] = [] |
|
|
for p in parts: |
|
|
try: |
|
|
out.append(float(p)) |
|
|
except Exception: |
|
|
return None |
|
|
return out |
|
|
|
|
|
|
|
|
def parse_log_file( |
|
|
log_path: Path, expected_k: int = 5 |
|
|
) -> Tuple[Optional[List[float]], Optional[List[float]]]: |
|
|
""" |
|
|
Extract K outer-fold validation RMSE and MAE arrays. |
|
|
Looks for: |
|
|
VAL FOLD RMSEs: [v1, ..., vK] |
|
|
VAL FOLD MAEs: [v1, ..., vK] |
|
|
""" |
|
|
try: |
|
|
text = log_path.read_text() |
|
|
except Exception: |
|
|
return None, None |
|
|
|
|
|
rmse_match = re.search( |
|
|
r"VAL\s+FOLD\s+RMSEs:\s*\[(.*?)\]", text, flags=re.IGNORECASE |
|
|
) |
|
|
mae_match = re.search(r"VAL\s+FOLD\s+MAEs:\s*\[(.*?)\]", text, flags=re.IGNORECASE) |
|
|
if not (rmse_match and mae_match): |
|
|
return None, None |
|
|
|
|
|
rmse_scores = _parse_list_in_brackets(rmse_match.group(1)) |
|
|
mae_scores = _parse_list_in_brackets(mae_match.group(1)) |
|
|
if rmse_scores is None or mae_scores is None: |
|
|
return None, None |
|
|
if len(rmse_scores) != expected_k or len(mae_scores) != expected_k: |
|
|
return None, None |
|
|
return rmse_scores, mae_scores |
|
|
|
|
|
|
|
|
def parse_csv_file(csv_path: Path) -> Optional[Dict[str, float]]: |
|
|
""" |
|
|
Parse *_final_results.csv with expected columns: |
|
|
val_rmse_mean, val_rmse_std, val_mae_mean, val_mae_std, |
|
|
test_rmse_mean, test_rmse_ci_low, test_rmse_ci_high, |
|
|
test_mae_mean, test_mae_ci_low, test_mae_ci_high |
|
|
""" |
|
|
try: |
|
|
df = pd.read_csv(csv_path) |
|
|
except Exception: |
|
|
return None |
|
|
cols = [ |
|
|
"val_rmse_mean", |
|
|
"val_rmse_std", |
|
|
"val_mae_mean", |
|
|
"val_mae_std", |
|
|
"test_rmse_mean", |
|
|
"test_rmse_ci_low", |
|
|
"test_rmse_ci_high", |
|
|
"test_mae_mean", |
|
|
"test_mae_ci_low", |
|
|
"test_mae_ci_high", |
|
|
] |
|
|
if not all(c in df.columns for c in cols): |
|
|
return None |
|
|
row = df.iloc[0] |
|
|
return {k: float(row[k]) for k in cols} |
|
|
|
|
|
|
|
|
def clean_base_and_exp_id(base_name: str) -> Tuple[str, str]: |
|
|
""" |
|
|
Strip trailing _YYYYMMDD_HHMMSS. exp_id := first two tokens (e.g., 'gat_ecfp'), |
|
|
or 'polyatomic_polyatomic'. |
|
|
""" |
|
|
clean = re.sub(r"_\d{8}_\d{6}$", "", base_name) |
|
|
toks = clean.split("_") |
|
|
exp_id = "_".join(toks[:2]) if len(toks) >= 2 else clean |
|
|
return clean, exp_id |
|
|
|
|
|
|
|
|
def discover_dataset(results_dir: Path, expected_k: int = 5) -> Dict[str, Dict]: |
|
|
""" |
|
|
Scan one dataset dir. Keep first valid (log,csv) per exp_id. |
|
|
Returns: |
|
|
data[exp_id] = { |
|
|
'rmse_folds': [K] or None, |
|
|
'mae_folds' : [K] or None, |
|
|
'summary' : {...}, |
|
|
'log_path' : str, |
|
|
'csv_path' : str |
|
|
} |
|
|
""" |
|
|
data: Dict[str, Dict] = {} |
|
|
for model_dir in results_dir.iterdir(): |
|
|
if not model_dir.is_dir(): |
|
|
continue |
|
|
for log_file in sorted(model_dir.glob("*.txt")): |
|
|
base = log_file.stem |
|
|
clean, exp_id = clean_base_and_exp_id(base) |
|
|
csv_file = log_file.with_name(f"{clean}_final_results.csv") |
|
|
if not csv_file.exists(): |
|
|
continue |
|
|
rmse_scores, mae_scores = parse_log_file(log_file, expected_k=expected_k) |
|
|
summary_stats = parse_csv_file(csv_file) |
|
|
if summary_stats is None: |
|
|
continue |
|
|
if exp_id not in data: |
|
|
data[exp_id] = { |
|
|
"rmse_folds": rmse_scores, |
|
|
"mae_folds": mae_scores, |
|
|
"summary": summary_stats, |
|
|
"log_path": str(log_file), |
|
|
"csv_path": str(csv_file), |
|
|
} |
|
|
return data |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def nb_corrected_t(diffs: np.ndarray, k: int) -> Tuple[float, float, float]: |
|
|
""" |
|
|
Nadeau–Bengio corrected resampled t for k-fold CV differences. |
|
|
diffs: array length k with (competitor - control) per fold (loss metric). |
|
|
Negative mean favors control. Returns (t_stat, SE_NB, mean_diff). |
|
|
Correction uses rho0 = 1/(k-1) → SE_NB = sqrt((1/k + 1/(k-1)) * s^2), |
|
|
where s^2 is unbiased sample variance across folds. |
|
|
""" |
|
|
diffs = np.asarray(diffs, dtype=float) |
|
|
if diffs.size != k: |
|
|
raise ValueError("diffs length must equal k") |
|
|
mean_d = float(diffs.mean()) |
|
|
if k <= 1: |
|
|
return np.nan, np.nan, mean_d |
|
|
s2 = float(np.var(diffs, ddof=1)) |
|
|
rho0 = 1.0 / (k - 1.0) |
|
|
se_nb = sqrt((1.0 / k + rho0) * s2) |
|
|
t_stat = ( |
|
|
mean_d / se_nb |
|
|
if se_nb > 0 |
|
|
else (-np.inf if mean_d < 0 else np.inf if mean_d > 0 else 0.0) |
|
|
) |
|
|
return t_stat, se_nb, mean_d |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(): |
|
|
ap = argparse.ArgumentParser( |
|
|
description="Single-dataset comparison: NB-corrected t on outer folds (control vs all) with Holm; prints Test metrics for context." |
|
|
) |
|
|
ap.add_argument( |
|
|
"--results_dir", |
|
|
type=str, |
|
|
required=True, |
|
|
help="Dataset directory (e.g., ./logs_hyperparameter/qm9) with model-family subfolders.", |
|
|
) |
|
|
ap.add_argument( |
|
|
"--output_dir", type=str, required=True, help="Directory to save the report." |
|
|
) |
|
|
ap.add_argument( |
|
|
"--exp_name", |
|
|
type=str, |
|
|
required=True, |
|
|
help="Base name for the output report file.", |
|
|
) |
|
|
ap.add_argument( |
|
|
"--control_model", |
|
|
type=str, |
|
|
required=True, |
|
|
help="Control exp_id (e.g., 'polyatomic_polyatomic'). Unique substring allowed.", |
|
|
) |
|
|
ap.add_argument( |
|
|
"--k", type=int, default=5, help="Expected number of outer folds (default: 5)." |
|
|
) |
|
|
ap.add_argument( |
|
|
"--alpha", |
|
|
type=float, |
|
|
default=0.05, |
|
|
help="FWER level for Holm; also used for NB CIs if --print_ci.", |
|
|
) |
|
|
ap.add_argument( |
|
|
"--print_ci", |
|
|
action="store_true", |
|
|
help="Also print NB-style CIs for the mean fold difference.", |
|
|
) |
|
|
args = ap.parse_args() |
|
|
|
|
|
results_dir = Path(args.results_dir) |
|
|
out_dir = Path(args.output_dir) |
|
|
out_dir.mkdir(parents=True, exist_ok=True) |
|
|
report_path = out_dir / f"{args.exp_name}_NB_single_dataset.txt" |
|
|
|
|
|
data = discover_dataset(results_dir, expected_k=args.k) |
|
|
if not data: |
|
|
raise SystemExit( |
|
|
f"No models found under {results_dir} with valid logs + *_final_results.csv" |
|
|
) |
|
|
|
|
|
exp_ids = sorted(data.keys()) |
|
|
|
|
|
|
|
|
ctrl = args.control_model |
|
|
if ctrl not in data: |
|
|
matches = [e for e in exp_ids if ctrl.lower() in e.lower()] |
|
|
if len(matches) == 1: |
|
|
ctrl = matches[0] |
|
|
else: |
|
|
raise SystemExit( |
|
|
f"Control '{args.control_model}' not found. Available exp_ids: {exp_ids}" |
|
|
) |
|
|
|
|
|
with open(report_path, "w") as f: |
|
|
f.write("=" * 110 + "\n") |
|
|
f.write( |
|
|
f"Dataset: {results_dir.name} — Control vs competitors (NB-corrected t on outer folds; Holm across competitors)\n" |
|
|
) |
|
|
f.write("=" * 110 + "\n\n") |
|
|
f.write(f"Control exp_id: {ctrl}\n") |
|
|
f.write(f"k folds: {args.k}, alpha: {args.alpha}\n\n") |
|
|
|
|
|
|
|
|
header = f"{'Model (exp_id)':<26} | {'Test RMSE (95% CI)':<30} | {'Test MAE (95% CI)':<30} | {'Val RMSE mean±sd':<22} | {'Val MAE mean±sd':<22}\n" |
|
|
f.write(header) |
|
|
f.write("-" * len(header) + "\n") |
|
|
for exp_id in exp_ids: |
|
|
s = data[exp_id]["summary"] |
|
|
line = ( |
|
|
f"{exp_id:<26} | " |
|
|
f"{s['test_rmse_mean']:.6f} [{s['test_rmse_ci_low']:.6f}, {s['test_rmse_ci_high']:.6f}] | " |
|
|
f"{s['test_mae_mean']:.6f} [{s['test_mae_ci_low']:.6f}, {s['test_mae_ci_high']:.6f}] | " |
|
|
f"{s['val_rmse_mean']:.6f} ± {s['val_rmse_std']:.6f} | " |
|
|
f"{s['val_mae_mean']:.6f} ± {s['val_mae_std']:.6f}\n" |
|
|
) |
|
|
f.write(line) |
|
|
f.write("\n") |
|
|
|
|
|
|
|
|
comps = [e for e in exp_ids if e != ctrl] |
|
|
rows = [] |
|
|
pvals_rmse, labels_rmse = [], [] |
|
|
pvals_mae, labels_mae = [], [] |
|
|
|
|
|
ctrl_rmse = np.array( |
|
|
data[ctrl]["rmse_folds"] if data[ctrl]["rmse_folds"] is not None else [], |
|
|
dtype=float, |
|
|
) |
|
|
ctrl_mae = np.array( |
|
|
data[ctrl]["mae_folds"] if data[ctrl]["mae_folds"] is not None else [], |
|
|
dtype=float, |
|
|
) |
|
|
|
|
|
if ctrl_rmse.size != args.k or ctrl_mae.size != args.k: |
|
|
f.write( |
|
|
"WARNING: Control model missing complete fold arrays; NB testing cannot proceed.\n" |
|
|
) |
|
|
else: |
|
|
for comp in comps: |
|
|
comp_rmse = data[comp]["rmse_folds"] |
|
|
comp_mae = data[comp]["mae_folds"] |
|
|
if comp_rmse is None or comp_mae is None: |
|
|
continue |
|
|
comp_rmse = np.array(comp_rmse, dtype=float) |
|
|
comp_mae = np.array(comp_mae, dtype=float) |
|
|
if comp_rmse.size != args.k or comp_mae.size != args.k: |
|
|
continue |
|
|
|
|
|
|
|
|
d_rmse = comp_rmse - ctrl_rmse |
|
|
d_mae = comp_mae - ctrl_mae |
|
|
|
|
|
t_rmse, se_rmse, mean_rmse = nb_corrected_t(d_rmse, k=args.k) |
|
|
t_mae, se_mae, mean_mae = nb_corrected_t(d_mae, k=args.k) |
|
|
|
|
|
df = args.k - 1 |
|
|
|
|
|
p_rmse = float(tdist.sf(t_rmse, df=df)) |
|
|
p_mae = float(tdist.sf(t_mae, df=df)) |
|
|
|
|
|
pvals_rmse.append(p_rmse) |
|
|
labels_rmse.append(f"{ctrl} vs {comp}") |
|
|
pvals_mae.append(p_mae) |
|
|
labels_mae.append(f"{ctrl} vs {comp}") |
|
|
|
|
|
row = { |
|
|
"comparison": f"{ctrl} vs {comp}", |
|
|
"mean_diff_RMSE(comp-ctrl)": mean_rmse, |
|
|
"t_NB_RMSE": t_rmse, |
|
|
"p_one_sided_RMSE": p_rmse, |
|
|
"mean_diff_MAE(comp-ctrl)": mean_mae, |
|
|
"t_NB_MAE": t_mae, |
|
|
"p_one_sided_MAE": p_mae, |
|
|
} |
|
|
|
|
|
if args.print_ci: |
|
|
tcrit = float(tdist.ppf(1 - args.alpha / 2, df=df)) |
|
|
row["NB_CI_RMSE_low"] = mean_rmse - tcrit * se_rmse |
|
|
row["NB_CI_RMSE_high"] = mean_rmse + tcrit * se_rmse |
|
|
row["NB_CI_MAE_low"] = mean_mae - tcrit * se_mae |
|
|
row["NB_CI_MAE_high"] = mean_mae + tcrit * se_mae |
|
|
|
|
|
rows.append(row) |
|
|
|
|
|
df_rows = pd.DataFrame(rows) |
|
|
if not df_rows.empty: |
|
|
f.write("--- NB-corrected t (outer folds) per competitor ---\n") |
|
|
f.write( |
|
|
df_rows.to_string(index=False, float_format=lambda x: f"{x:.6f}") |
|
|
+ "\n\n" |
|
|
) |
|
|
else: |
|
|
f.write( |
|
|
"No comparable competitors with complete fold arrays. Nothing to test.\n\n" |
|
|
) |
|
|
|
|
|
|
|
|
def holm_table(pvals: List[float], labels: List[str], title: str): |
|
|
if not pvals: |
|
|
f.write(f"{title}\n(no p-values)\n\n") |
|
|
return |
|
|
reject, p_adj, _, _ = multipletests( |
|
|
pvals=pvals, alpha=args.alpha, method="holm" |
|
|
) |
|
|
df = pd.DataFrame( |
|
|
{ |
|
|
"comparison": labels, |
|
|
"p_raw": pvals, |
|
|
"p_holm": p_adj, |
|
|
"Significant": reject.astype(bool), |
|
|
} |
|
|
) |
|
|
f.write(title + "\n") |
|
|
f.write( |
|
|
df.sort_values("p_raw").to_string( |
|
|
index=False, float_format=lambda x: f"{x:.6f}" |
|
|
) |
|
|
+ "\n\n" |
|
|
) |
|
|
|
|
|
holm_table( |
|
|
pvals_rmse, labels_rmse, "--- Holm-adjusted p-values (RMSE family) ---" |
|
|
) |
|
|
holm_table( |
|
|
pvals_mae, labels_mae, "--- Holm-adjusted p-values (MAE family) ---" |
|
|
) |
|
|
|
|
|
f.write("=" * 110 + "\nNotes:\n") |
|
|
f.write( |
|
|
"• Tests are within-dataset, one-sided for control superiority, on outer-fold differences with Nadeau–Bengio SE correction (df = k-1).\n" |
|
|
) |
|
|
f.write( |
|
|
"• Holm controls family-wise error across competitors per metric family.\n" |
|
|
) |
|
|
f.write( |
|
|
"• Held-out Test metrics above are for context only; no fold-based omnibus tests are used.\n" |
|
|
) |
|
|
|
|
|
print(f"Report successfully written to: {report_path}") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|