Source code for BLEval.AUPRC

from itertools import permutations
from typing import Dict

import pandas as pd
from sklearn.metrics import auc, precision_recall_curve

from BLEval.evaluator import Evaluator
from BLEval.data import EvaluationData


def _compute_auprc(
    ranked_edges: pd.DataFrame,
    gt_df: pd.DataFrame,
) -> float:
    """
    Compute the area under the Precision-Recall curve for one algorithm on one run.

    Scores are computed over the fixed universe of all possible directed pairs
    among ground truth genes (all permutations of length 2, self-loops excluded).
    Predicted edges absent from the universe receive score 0. Edge weights are
    taken as absolute values so that both positive (activating) and negative
    (inhibitory) predictions are ranked by confidence magnitude. Duplicate
    (Gene1, Gene2) pairs in ranked_edges are deduplicated by keeping the row with
    the highest absolute EdgeWeight. Returns float('nan') if no true edges appear
    in the scored universe (PR curve is undefined).

    Parameters
    ----------
    ranked_edges : pd.DataFrame
        Predicted edge list with columns Gene1, Gene2, EdgeWeight.
        Higher absolute EdgeWeight indicates greater confidence.
    gt_df : pd.DataFrame
        Ground truth network with columns Gene1, Gene2. Used to derive both the
        set of true edges and the fixed scoring universe.

    Returns
    -------
    float
        AUPRC score in [0, 1], or nan if the metric cannot be computed.
    """
    if not isinstance(ranked_edges, pd.DataFrame):
        raise TypeError(f"ranked_edges must be DataFrame, got {type(ranked_edges)}")
    if not isinstance(gt_df, pd.DataFrame):
        raise TypeError(f"gt_df must be DataFrame, got {type(gt_df)}")

    # Fixed universe: all directed pairs among GT genes, self-loops excluded
    gt_genes = sorted(set(gt_df['Gene1']).union(set(gt_df['Gene2'])))
    possible_edges = list(permutations(gt_genes, 2))

    true_edge_set = set(zip(gt_df['Gene1'], gt_df['Gene2']))

    # Deduplicate predicted edges: per (Gene1, Gene2), keep highest abs weight.
    # Self-loops removed before deduplication as they are never valid predictions.
    pred = ranked_edges[ranked_edges['Gene1'] != ranked_edges['Gene2']].copy()
    pred['_abs'] = pred['EdgeWeight'].abs()
    pred = pred.sort_values('_abs', ascending=False).drop_duplicates(
        subset=['Gene1', 'Gene2']
    )
    pred_lookup = dict(zip(zip(pred['Gene1'], pred['Gene2']), pred['_abs'].astype(float)))

    true_labels = [1 if e in true_edge_set else 0 for e in possible_edges]
    pred_scores = [pred_lookup.get(e, 0.0) for e in possible_edges]

    # PR curve is undefined when no positive examples exist in the universe
    if sum(true_labels) == 0:
        return float('nan')

    precision, recall, _ = precision_recall_curve(true_labels, pred_scores)
    return float(auc(recall, precision))


[docs]class AUPRC(Evaluator): """ Evaluator that computes the area under the Precision-Recall curve (AUPRC) for each algorithm against the ground truth network. For each DatasetGroup, writes AUPRC.csv to dataset_path. Rows are algorithms and columns are run_ids. Runs whose ground truth file is missing are skipped with a warning. """ def __call__(self, evaluation_data: EvaluationData) -> None: """ Compute AUPRC per algorithm per run and write results to dataset_path/AUPRC.csv. Parameters ---------- evaluation_data : EvaluationData Loaded predicted networks organised by dataset and run. Returns ------- None """ if not isinstance(evaluation_data, EvaluationData): raise TypeError(f"evaluation_data must be EvaluationData, got {type(evaluation_data)}") for dataset_group in evaluation_data: # results[algo][run_id] = auprc score results: Dict[str, Dict[str, float]] = {} for run in dataset_group: if not run.ground_truth_path.exists(): print( f"Warning: ground truth not found at {run.ground_truth_path}, " f"skipping run '{run.run_id}'." ) continue gt_df = self._load_ground_truth(run.ground_truth_path) for algo, ranked_edges_df in run.ranked_edges.items(): score = _compute_auprc(ranked_edges_df, gt_df) results.setdefault(algo, {})[run.run_id] = score if not results: continue # Build output DataFrame: rows = algorithms, columns = run_ids out_df = pd.DataFrame(results).T out_df.index.name = 'Algorithm' dataset_group.dataset_path.mkdir(parents=True, exist_ok=True) out_path = dataset_group.dataset_path / 'AUPRC.csv' out_df.to_csv(out_path) print(f"Wrote AUPRC results to {out_path}")