# Source code for BLEval.EarlyPrecision

from itertools import permutations, product
from typing import Dict, Set, Tuple

import pandas as pd

from BLEval.evaluator import Evaluator
from BLEval.data import EvaluationData


def _build_edge_universe(
    gt_df: pd.DataFrame,
    tf_edges: bool,
) -> Tuple[Set[Tuple[str, str]], Set[Tuple[str, str]]]:
    """
    Build the candidate edge universe and the set of true edges from a GT DataFrame.

    When tf_edges is False the universe is all directed non-self-loop pairs
    among GT genes. When tf_edges is True the universe is restricted to edges
    whose source gene appears as Gene1 in the ground truth (i.e. TF→gene edges),
    which is appropriate for experimental scRNA-seq data where regulators are
    known. True edges are the intersection of GT edges with the universe so
    that the random-baseline computation is consistent with the candidate pool.

    Parameters
    ----------
    gt_df : pd.DataFrame
        Ground truth network with at least columns Gene1, Gene2. Self-loops
        and duplicate rows are removed internally.
    tf_edges : bool
        When True, restrict the universe to TF→gene edges (no self-loops).
        When False, use all directed non-self-loop gene pairs.

    Returns
    -------
    possible_edges : set of (str, str)
        Complete candidate edge universe (no self-loops).
    true_edges : set of (str, str)
        Ground truth edges within the candidate universe (no self-loops).
    """
    if not isinstance(gt_df, pd.DataFrame):
        raise TypeError(f"gt_df must be DataFrame, got {type(gt_df)}")
    if not isinstance(tf_edges, bool):
        raise TypeError(f"tf_edges must be bool, got {type(tf_edges)}")

    gt_no_self = gt_df[gt_df['Gene1'] != gt_df['Gene2']].drop_duplicates()
    unique_nodes = sorted(set(gt_df['Gene1']).union(set(gt_df['Gene2'])))

    if tf_edges:
        tf_genes = set(gt_no_self['Gene1'])
        all_genes = set(unique_nodes)
        # All TF→gene edges: source is a known TF, target is any gene, no self-loops
        possible_edges = {(tf, g) for tf in tf_genes for g in all_genes if tf != g}
    else:
        possible_edges = set(permutations(unique_nodes, 2))

    all_gt_edges = set(zip(gt_no_self['Gene1'], gt_no_self['Gene2']))
    # Intersect with universe so true_edges is consistent with possible_edges
    true_edges = all_gt_edges & possible_edges

    return possible_edges, true_edges


def _compute_early_precision(
    ranked_edges: pd.DataFrame,
    true_edges: Set[Tuple[str, str]],
    possible_edges: Set[Tuple[str, str]],
    tf_edges: bool,
) -> float:
    """
    Compute the Early Precision Ratio (EPR) for one algorithm on one run.

    EPR = early_precision / random_precision, where early precision is the
    fraction of true positives among the selected top-k predictions and
    random_precision = |true_edges| / |possible_edges|. k is the number of
    true edges (the size of the ground truth network). A tie-aware cutoff
    (bestVal = max(nonZeroMin, edgeWeightTopk)) expands the selected set to
    include all tied predictions at the boundary, avoiding the large block of
    zero-weight edges when the k-th prediction has weight 0. The denominator
    is the actual number of selected predictions (not fixed k), consistent
    with the tie-expansion. Returns 0 when no predictions remain after
    filtering, and nan when the universe or ground truth is empty.

    Parameters
    ----------
    ranked_edges : pd.DataFrame
        Predicted edge list with columns Gene1, Gene2, EdgeWeight.
        Higher absolute EdgeWeight indicates greater confidence.
    true_edges : set of (str, str)
        Ground truth directed edges within the candidate universe.
    possible_edges : set of (str, str)
        Complete candidate edge universe; used to compute the random baseline
        and, when tf_edges is True, to filter predictions.
    tf_edges : bool
        When True, predictions are pre-filtered to the candidate universe
        before ranking. When False, all non-self-loop predictions are ranked.

    Returns
    -------
    float
        EPR value (>1 = better than random, =1 = at random, <1 = below random),
        0 if no predictions remain after filtering, or nan if the metric
        cannot be computed.
    """
    if not isinstance(ranked_edges, pd.DataFrame):
        raise TypeError(f"ranked_edges must be DataFrame, got {type(ranked_edges)}")
    if not isinstance(true_edges, set):
        raise TypeError(f"true_edges must be set, got {type(true_edges)}")
    if not isinstance(possible_edges, set):
        raise TypeError(f"possible_edges must be set, got {type(possible_edges)}")
    if not isinstance(tf_edges, bool):
        raise TypeError(f"tf_edges must be bool, got {type(tf_edges)}")

    num_edges = len(true_edges)
    if num_edges == 0 or len(possible_edges) == 0:
        return float('nan')

    random_eprc = num_edges / len(possible_edges)

    # Remove self-loops; deduplicate by (Gene1, Gene2) keeping highest abs weight
    predicted = ranked_edges[ranked_edges['Gene1'] != ranked_edges['Gene2']].copy()
    predicted['_abs'] = predicted['EdgeWeight'].abs()
    predicted = (
        predicted
        .sort_values('_abs', ascending=False)
        .drop_duplicates(subset=['Gene1', 'Gene2'])
        .reset_index(drop=True)
    )

    if tf_edges:
        # Restrict predictions to the TF→gene candidate universe
        keep = [(r.Gene1, r.Gene2) in possible_edges for r in predicted.itertuples()]
        predicted = predicted[keep].reset_index(drop=True)

    if predicted.empty:
        return 0.0

    # Tie-aware cutoff at rank k, capped at the number of available predictions
    maxk = min(len(predicted), num_edges)
    edge_weight_topk = float(predicted.iloc[maxk - 1]['_abs'])

    nonzero = predicted.loc[predicted['_abs'] > 0, '_abs']
    non_zero_min = float(nonzero.min()) if not nonzero.empty else 0.0

    # Expand to include all tied predictions at the boundary; avoids pulling in
    # the large block of zero-weight edges when edgeWeightTopk is 0
    best_val = max(non_zero_min, edge_weight_topk)
    selected = predicted[predicted['_abs'] >= best_val]

    if selected.empty:
        return 0.0

    selected_edges = set(zip(selected['Gene1'], selected['Gene2']))
    intersection = selected_edges & true_edges

    # Denominator is the actual selected count, not fixed k, to account for ties
    eprec = len(intersection) / len(selected_edges)
    return eprec / random_eprc


class EarlyPrecision(Evaluator):
    """
    Evaluator that computes the Early Precision Ratio (EPR) for each
    algorithm against the ground truth network.

    EPR equals early precision (fraction of true positives in top-k
    predictions) divided by the expected precision of a random predictor over
    the candidate edge universe. k equals the number of ground truth edges
    excluding self-loops. EPR > 1 indicates better-than-random performance;
    EPR = 1 is the random baseline; EPR < 1 is below random.

    When tf_edges is True, the candidate universe and predictions are
    restricted to TF→gene edges (appropriate for experimental scRNA-seq
    data). When False, all directed non-self-loop gene pairs form the
    universe (appropriate for synthetic/simulated data).

    For each DatasetGroup, writes EarlyPrecision.csv to dataset_path.
    Rows are algorithms and columns are run_ids.
    """

    def __init__(self, tf_edges: bool = False) -> None:
        """
        Parameters
        ----------
        tf_edges : bool, optional
            When True, restrict the candidate universe and predictions to
            TF→gene edges. Default is False (use all directed non-self-loop
            gene pairs), appropriate for synthetic data.
        """
        if not isinstance(tf_edges, bool):
            raise TypeError(f"tf_edges must be bool, got {type(tf_edges)}")
        self.tf_edges = tf_edges

    def __call__(self, evaluation_data: EvaluationData) -> None:
        """
        Compute EPR per algorithm per run and write results to
        dataset_path/EarlyPrecision.csv.

        Parameters
        ----------
        evaluation_data : EvaluationData
            Loaded predicted networks organised by dataset and run.

        Returns
        -------
        None
        """
        if not isinstance(evaluation_data, EvaluationData):
            raise TypeError(
                f"evaluation_data must be EvaluationData, got {type(evaluation_data)}"
            )

        for dataset_group in evaluation_data:
            # Load the ground truth once per group; all runs share the same GT.
            gt_path = next(
                (run.ground_truth_path for run in dataset_group
                 if run.ground_truth_path.exists()),
                None,
            )
            if gt_path is None:
                print(
                    f"Warning: no ground truth found for dataset "
                    f"'{dataset_group.dataset_id}', skipping EarlyPrecision."
                )
                continue

            # _load_ground_truth is provided by the Evaluator base class
            # (not defined in this module) — confirm its contract there.
            gt_df = self._load_ground_truth(gt_path)
            possible_edges, true_edges = _build_edge_universe(gt_df, self.tf_edges)

            # results[algo][run_id] = EPR score
            results: Dict[str, Dict[str, float]] = {}
            for run in dataset_group:
                for algo, ranked_edges_df in run.ranked_edges.items():
                    score = _compute_early_precision(
                        ranked_edges_df, true_edges, possible_edges, self.tf_edges
                    )
                    results.setdefault(algo, {})[run.run_id] = score

            if not results:
                continue

            # Build output DataFrame: rows = algorithms, columns = run_ids
            out_df = pd.DataFrame(results).T
            out_df.index.name = 'Algorithm'

            dataset_group.dataset_path.mkdir(parents=True, exist_ok=True)
            out_path = dataset_group.dataset_path / 'EarlyPrecision.csv'
            out_df.to_csv(out_path)
            print(f"Wrote EarlyPrecision results to {out_path}")