Source code for BLEval.Borda

from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

from BLEval.evaluator import Evaluator
from BLEval.data import EvaluationData


def _normalize(arr: np.ndarray) -> np.ndarray:
    """
    Min-max normalize an array to [0, 1].

    Parameters
    ----------
    arr : np.ndarray
        1-D array of floats.

    Returns
    -------
    np.ndarray
        Normalized array. Returns a zero array if max == min.
    """
    mn, mx = arr.min(), arr.max()
    if mx == mn:
        return np.zeros_like(arr, dtype=float)
    return 1.0 * (arr - mn) / (mx - mn)


def _compute_borda_aggregations(
    ranked_edges: Dict[str, pd.DataFrame],
) -> Dict[str, pd.DataFrame]:
    """
    Compute four Borda count aggregations over a set of algorithm predictions.

    For each algorithm, every predicted edge (excluding self-loops) is given a
    rank (1 = highest EdgeWeight). Edges absent from an algorithm's list are
    assigned a rank one beyond that algorithm's last predicted edge. The Borda
    score for an edge from algorithm i is:

        borda_i(edge) = N_total - rank_i(edge) + 1

    where N_total is the total number of unique non-self-loop edges across all
    algorithms. Scores are then aggregated with mean, median, min, and max
    across algorithms. An empty dict is returned when there are no predictions.

    Parameters
    ----------
    ranked_edges : dict[str, pd.DataFrame]
        Keyed by algorithm name. Each DataFrame has columns Gene1, Gene2,
        EdgeWeight. Higher EdgeWeight means greater confidence.

    Returns
    -------
    dict[str, pd.DataFrame]
        Keyed by aggregation method name: 'mean', 'median', 'min', 'max'.
        Each value is a DataFrame with columns Gene1, Gene2, Score, sorted by
        Score descending.
    """
    if not isinstance(ranked_edges, dict):
        raise TypeError(f"ranked_edges must be dict, got {type(ranked_edges)}")

    if not ranked_edges:
        return {}

    # Collect the union of all non-self-loop edges across all algorithms
    all_edges: set = set()
    for df in ranked_edges.values():
        if not isinstance(df, pd.DataFrame):
            raise TypeError(f"Each ranked_edges value must be DataFrame, got {type(df)}")
        no_loops = df[df['Gene1'] != df['Gene2']]
        all_edges.update(zip(no_loops['Gene1'], no_loops['Gene2']))

    if not all_edges:
        return {}

    # Deterministic ordering so the score matrix rows are stable
    edge_list: List[Tuple[str, str]] = sorted(all_edges)
    n_total = len(edge_list)
    edge_index = {edge: idx for idx, edge in enumerate(edge_list)}

    # scores_matrix[edge_idx, algo_idx] = Borda score for that edge from that algo
    n_algos = len(ranked_edges)
    scores_matrix = np.zeros((n_total, n_algos), dtype=float)

    for col_idx, (algo, df) in enumerate(ranked_edges.items()):
        no_loops = df[df['Gene1'] != df['Gene2']].copy()
        # Rank edges: sort descending by EdgeWeight, assign 1-based rank
        no_loops = no_loops.sort_values('EdgeWeight', ascending=False).reset_index(drop=True)
        n_algo = len(no_loops)

        # Borda score for predicted edges (rank = position index + 1)
        for rank_minus1, row in enumerate(no_loops.itertuples(index=False)):
            edge = (row.Gene1, row.Gene2)
            borda = n_total - (rank_minus1 + 1) + 1  # = n_total - rank_minus1
            scores_matrix[edge_index[edge], col_idx] = borda

        # Borda score for edges absent from this algorithm's list:
        # treat their rank as n_algo + 1 → score = n_total - n_algo
        absent_score = n_total - n_algo
        predicted_set = set(zip(no_loops['Gene1'], no_loops['Gene2']))
        for edge in edge_list:
            if edge not in predicted_set:
                scores_matrix[edge_index[edge], col_idx] = absent_score

    gene1 = [e[0] for e in edge_list]
    gene2 = [e[1] for e in edge_list]

    aggregations = {
        'mean':   np.mean(scores_matrix,   axis=1),
        'median': np.median(scores_matrix, axis=1),
        'min':    np.min(scores_matrix,    axis=1),
        'max':    np.max(scores_matrix,    axis=1),
    }

    results: Dict[str, pd.DataFrame] = {}
    for method, agg_scores in aggregations.items():
        out_df = pd.DataFrame({'Gene1': gene1, 'Gene2': gene2, 'Score': agg_scores})
        out_df = out_df.sort_values('Score', ascending=False).reset_index(drop=True)
        results[method] = out_df

    return results


[docs]class Borda(Evaluator): """ Evaluator that aggregates per-algorithm ranked edge lists into a single consensus ranking using Borda count. For each run, four aggregation methods are computed: mean, median, min, and max of per-algorithm Borda scores. Per-run scores for each method are then summarised across runs by taking the median. The final output is a single file written to dataset_path: dataset_path/Borda.csv Columns: Gene1, Gene2, BORDA (median of mean), mBORDA (median of median), sBORDA (median of min), smBORDA (median of max). Rows are sorted by BORDA descending. """ # Maps internal aggregation method names to output column names _METHOD_COLUMNS = { 'mean': 'BORDA', 'median': 'mBORDA', 'min': 'sBORDA', 'max': 'smBORDA', } def __call__(self, evaluation_data: EvaluationData) -> None: """ Compute Borda aggregations per run, take the median across runs for each method, and write a single Borda.csv to each DatasetGroup's dataset_path. Parameters ---------- evaluation_data : EvaluationData Loaded predicted networks organised by dataset and run. Returns ------- None """ if not isinstance(evaluation_data, EvaluationData): raise TypeError( f"evaluation_data must be EvaluationData, got {type(evaluation_data)}" ) for dataset_group in evaluation_data: # method → {run_id → Series(Score, index=(Gene1, Gene2))} per_method: Dict[str, Dict[str, pd.Series]] = {m: {} for m in self._METHOD_COLUMNS} for run in dataset_group: if not run.ranked_edges: print( f"Warning: no algorithm predictions for run '{run.run_id}' " f"in dataset '{dataset_group.dataset_id}', skipping." ) continue aggregations = _compute_borda_aggregations(run.ranked_edges) if not aggregations: print( f"Warning: Borda aggregation produced no output for run " f"'{run.run_id}' in dataset '{dataset_group.dataset_id}', skipping." ) continue for method, ranked_df in aggregations.items(): # Index the Score series by (Gene1, Gene2) tuples for alignment idx = pd.MultiIndex.from_arrays( [ranked_df['Gene1'], ranked_df['Gene2']], names=['Gene1', 'Gene2'] ) per_method[method][run.run_id] = pd.Series( ranked_df['Score'].values, index=idx, name=run.run_id ) # Skip dataset if no runs produced any output if not any(per_method[m] for m in self._METHOD_COLUMNS): continue dataset_group.dataset_path.mkdir(parents=True, exist_ok=True) # For each method, combine runs, take the median across runs, # then normalize to [0, 1] using min-max normalization. col_series = {} for method, col_name in self._METHOD_COLUMNS.items(): run_series = per_method[method] if not run_series: continue # rows = (Gene1, Gene2), columns = run_ids runs_df = pd.concat(run_series.values(), axis=1) median_scores = runs_df.median(axis=1) col_series[col_name] = pd.Series( _normalize(median_scores.values), index=median_scores.index, ) if not col_series: continue out_df = pd.DataFrame(col_series) out_df.index.names = ['Gene1', 'Gene2'] out_df = out_df.reset_index().sort_values('BORDA', ascending=False) out_path = dataset_group.dataset_path / 'Borda.csv' out_df.to_csv(out_path, index=False) print(f"Wrote Borda.csv to {out_path}")