from itertools import permutations
from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
from scipy.stats import spearmanr
from BLEval.evaluator import Evaluator
from BLEval.data import EvaluationData, RunResult
def _to_series(df: pd.DataFrame) -> pd.Series:
"""
Convert a ranked-edge DataFrame to a signed-weight Series indexed by (Gene1, Gene2).
Self-loops are removed. Duplicate (Gene1, Gene2) pairs are deduplicated by
keeping the row with the highest absolute EdgeWeight, preserving its sign.
Parameters
----------
df : pd.DataFrame
Ranked edges with columns Gene1, Gene2, EdgeWeight.
Returns
-------
pd.Series
Signed EdgeWeight values indexed by (Gene1, Gene2) MultiIndex.
"""
if not isinstance(df, pd.DataFrame):
raise TypeError(f"df must be DataFrame, got {type(df)}")
no_loops = df[df['Gene1'] != df['Gene2']].copy()
no_loops['_abs'] = no_loops['EdgeWeight'].abs()
no_loops = no_loops.sort_values('_abs', ascending=False).drop(columns=['_abs'])
no_loops = no_loops.drop_duplicates(subset=['Gene1', 'Gene2'], keep='first')
idx = pd.MultiIndex.from_arrays([no_loops['Gene1'], no_loops['Gene2']])
return pd.Series(no_loops['EdgeWeight'].values, index=idx)
def _compute_median_spearman(
runs: List[RunResult],
algo: str,
gene_set: set,
) -> Tuple[float, float]:
"""
Compute the median and mean absolute deviation of pairwise Spearman
correlations for one algorithm across runs.
Weight arrays for each run are precomputed once against the fixed edge
universe, then all pairwise correlations are obtained in a single
scipy.stats.spearmanr matrix call rather than one call per pair. Runs
whose weight vector is constant (all weights identical) are excluded before
the matrix call, since Spearman is undefined for constant inputs. Returns
(nan, nan) when fewer than two usable runs exist or all correlations are
undefined.
Parameters
----------
runs : list of RunResult
All runs within a DatasetGroup.
algo : str
Algorithm name to evaluate.
gene_set : set
Unique genes from the ground truth network. Determines the fixed edge
universe (all directed non-self-loop pairs) used for alignment.
Returns
-------
tuple of (float, float)
Median Spearman correlation and mean absolute deviation across all
pairwise run combinations, or (nan, nan) if the metric cannot be computed.
"""
if not isinstance(runs, list):
raise TypeError(f"runs must be list, got {type(runs)}")
if not isinstance(algo, str):
raise TypeError(f"algo must be str, got {type(algo)}")
if not isinstance(gene_set, set):
raise TypeError(f"gene_set must be set, got {type(gene_set)}")
algo_runs = [run for run in runs if algo in run.ranked_edges]
if len(algo_runs) < 2:
return float('nan'), float('nan')
# Build fixed edge universe once — all directed pairs among GT genes
possible_edges = list(permutations(sorted(gene_set), 2))
if not possible_edges:
return float('nan'), float('nan')
fixed_idx = pd.MultiIndex.from_tuples(possible_edges)
# Precompute weight array for each run once, skipping constant-weight runs
# (Spearman is undefined when one vector has zero variance).
weight_arrays: List[np.ndarray] = []
for run in algo_runs:
w = _to_series(run.ranked_edges[algo]).reindex(fixed_idx, fill_value=0.0).values
if not np.all(w == w[0]):
weight_arrays.append(w)
if len(weight_arrays) < 2:
return float('nan'), float('nan')
# Stack into (n_edges, n_runs) matrix and compute all pairwise correlations
# in one call; much faster than calling spearmanr once per pair.
matrix = np.column_stack(weight_arrays)
result = spearmanr(matrix)
# spearmanr returns a scalar correlation when there are exactly 2 columns,
# and a correlation matrix when there are more.
# Use .correlation (scipy < 1.7) which is available in all supported versions.
n = len(weight_arrays)
if n == 2:
corr_arr = np.array([float(result.correlation)])
else:
corr_matrix = np.asarray(result.correlation)
rows, cols = np.triu_indices(n, k=1)
corr_arr = corr_matrix[rows, cols]
corr_arr = corr_arr[~np.isnan(corr_arr)]
if corr_arr.size == 0:
return float('nan'), float('nan')
median = float(np.median(corr_arr))
mad = float(np.mean(np.abs(corr_arr - np.mean(corr_arr))))
return median, mad
[docs]class Spearman(Evaluator):
"""
Evaluator that measures how consistently each algorithm ranks edges across
runs within a DatasetGroup.
For each algorithm, all pairs of runs are compared by computing the Spearman
rank correlation of their signed EdgeWeight vectors over the fixed universe
of all directed gene pairs in the ground truth (missing edges receive weight
0, signed weights preserved). Weight arrays are precomputed once per run
and all pairwise correlations are obtained via a single matrix call to
scipy.stats.spearmanr. The median and mean absolute deviation of all
pairwise correlations are reported. For each DatasetGroup, writes
Spearman.csv to dataset_path. Rows are algorithms; columns are
MedianSpearman and MADSpearman.
"""
def __call__(self, evaluation_data: EvaluationData) -> None:
"""
Compute median pairwise Spearman correlation per algorithm and write
results to dataset_path/Spearman.csv.
Parameters
----------
evaluation_data : EvaluationData
Loaded predicted networks organised by dataset and run.
Returns
-------
None
"""
if not isinstance(evaluation_data, EvaluationData):
raise TypeError(
f"evaluation_data must be EvaluationData, got {type(evaluation_data)}"
)
for dataset_group in evaluation_data:
runs = dataset_group.runs
algos = sorted({algo for run in runs for algo in run.ranked_edges})
if not algos:
continue
# Load ground truth gene set from the first run with an existing file.
# All runs in a DatasetGroup share the same ground truth network.
gt_path = next(
(run.ground_truth_path for run in runs
if run.ground_truth_path.exists()),
None,
)
if gt_path is None:
print(
f"Warning: no ground truth found for dataset "
f"'{dataset_group.dataset_id}', skipping Spearman."
)
continue
gt_df = pd.read_csv(gt_path, header=0)
gene_set = set(gt_df['Gene1']).union(set(gt_df['Gene2']))
# results[algo] = (median, mad)
results: Dict[str, Tuple[float, float]] = {}
for algo in algos:
results[algo] = _compute_median_spearman(runs, algo, gene_set)
out_df = pd.DataFrame.from_dict(
results, orient='index', columns=['MedianSpearman', 'MADSpearman']
)
out_df.index.name = 'Algorithm'
dataset_group.dataset_path.mkdir(parents=True, exist_ok=True)
out_path = dataset_group.dataset_path / 'Spearman.csv'
out_df.to_csv(out_path)
print(f"Wrote Spearman results to {out_path}")