Source code for BLEval.Motifs

from typing import Dict, Set, Tuple

import networkx as nx
import pandas as pd

from BLEval.evaluator import Evaluator
from BLEval.data import EvaluationData, DatasetGroup


def _build_digraph(edges: Set[Tuple[str, str]]) -> nx.DiGraph:
    """
    Build a directed graph from a set of edges, excluding self-loops.

    Parameters
    ----------
    edges : set of (str, str)
        Directed edges as (Gene1, Gene2) tuples.

    Returns
    -------
    nx.DiGraph
    """
    if not isinstance(edges, set):
        raise TypeError(f"edges must be set, got {type(edges)}")

    G = nx.DiGraph()
    G.add_edges_from((a, b) for a, b in edges if a != b)
    return G


def _count_mi(G: nx.DiGraph) -> int:
    """
    Count two-node mutual interactions (MI) in a directed graph.

    A mutual interaction exists when both A→B and B→A are present.
    Each pair {A, B} is counted once.

    Parameters
    ----------
    G : nx.DiGraph

    Returns
    -------
    int
        Number of mutual interaction pairs.
    """
    return sum(1 for a, b in G.edges() if G.has_edge(b, a)) // 2


def _count_fbl(G: nx.DiGraph) -> int:
    """
    Count three-node feedback loops (FBL) in a directed graph.

    A feedback loop is a directed cycle of exactly three distinct nodes:
    A→B→C→A. Each cycle is counted once regardless of its starting node.

    Parameters
    ----------
    G : nx.DiGraph

    Returns
    -------
    int
        Number of three-node feedback loops.
    """
    return sum(1 for cycle in nx.simple_cycles(G) if len(cycle) == 3)


def _count_ffl(G: nx.DiGraph) -> int:
    """
    Count three-node feedforward loops (FFL) in a directed graph.

    A feedforward loop consists of three distinct nodes A, B, C where
    A→B, A→C, and B→C all exist. Node A is the source that regulates
    both B (directly and indirectly via B→C) and C directly.

    Parameters
    ----------
    G : nx.DiGraph

    Returns
    -------
    int
        Number of three-node feedforward loops.
    """
    count = 0
    for a in G.nodes():
        out_a = set(G.successors(a))
        for b in out_a:
            # C must be a successor of B, a successor of A, and distinct from A and B
            for c in G.successors(b):
                if c in out_a and c != a and c != b:
                    count += 1
    return count


def _motif_counts(edges: Set[Tuple[str, str]]) -> Tuple[int, int, int]:
    """
    Compute FBL, FFL, and MI motif counts for a set of directed edges.

    Parameters
    ----------
    edges : set of (str, str)
        Directed edges as (Gene1, Gene2) tuples.

    Returns
    -------
    tuple of (int, int, int)
        Counts of (FBL, FFL, MI) motifs.
    """
    G = _build_digraph(edges)
    return _count_fbl(G), _count_ffl(G), _count_mi(G)


def _safe_ratio(numerator: int, denominator: int) -> float:
    """
    Compute numerator / denominator, returning nan when denominator is zero.

    Parameters
    ----------
    numerator : int
    denominator : int

    Returns
    -------
    float
    """
    if denominator == 0:
        return float('nan')
    return numerator / denominator


[docs]class Motifs(Evaluator): """ Evaluator that computes ratios of three-node feedback loop (FBL), three-node feedforward loop (FFL), and two-node mutual interaction (MI) motif counts between the predicted top-k network and the reference network. For each algorithm and run, the top-k predicted edges (k = number of ground truth edges, excluding self-loops) are used to build a directed graph. Motif counts in that graph are divided by the corresponding counts in the ground truth network to produce dimensionless ratios. For each DatasetGroup, writes three CSV files to dataset_path: - motifs_FBL.csv — three-node feedback loop ratios - motifs_FFL.csv — three-node feedforward loop ratios - motifs_MI.csv — two-node mutual interaction ratios Each CSV has algorithms as rows and run_ids as columns. """ def __call__(self, evaluation_data: EvaluationData) -> None: """ Compute motif ratios per algorithm per run and write results to dataset_path/motifs_{FBL,FFL,MI}.csv. Parameters ---------- evaluation_data : EvaluationData Loaded predicted networks organised by dataset and run. Returns ------- None """ if not isinstance(evaluation_data, EvaluationData): raise TypeError( f"evaluation_data must be EvaluationData, got {type(evaluation_data)}" ) for dataset_group in evaluation_data: # Load the reference network from the first run with an accessible # ground truth. All runs in a DatasetGroup share the same ground truth. ref_edges: Set[Tuple[str, str]] = None for run in dataset_group: if run.ground_truth_path.exists(): gt_df = self._load_ground_truth(run.ground_truth_path) ref_edges = set(zip(gt_df['Gene1'], gt_df['Gene2'])) break if ref_edges is None: print( f"Warning: no ground truth found for dataset " f"'{dataset_group.dataset_id}', skipping motif computation." ) continue # k = number of ground truth edges excluding self-loops k = sum(1 for g1, g2 in ref_edges if g1 != g2) if k == 0: print( f"Warning: ground truth for dataset '{dataset_group.dataset_id}' " "has no non-self-loop edges, skipping motif computation." ) continue # Motif counts in the reference network ref_fbl, ref_ffl, ref_mi = _motif_counts(ref_edges) # results_X[algo][run_id] = ratio for motif X results_fbl: Dict[str, Dict[str, float]] = {} results_ffl: Dict[str, Dict[str, float]] = {} results_mi: Dict[str, Dict[str, float]] = {} for run in dataset_group: for algo, ranked_edges_df in run.ranked_edges.items(): # Select top-k predicted edges, excluding self-loops no_self = ranked_edges_df[ ranked_edges_df['Gene1'] != ranked_edges_df['Gene2'] ] top_k_df = no_self.iloc[no_self['EdgeWeight'].abs().argsort()[::-1]].head(k) top_k_edges = set(zip(top_k_df['Gene1'], top_k_df['Gene2'])) pred_fbl, pred_ffl, pred_mi = _motif_counts(top_k_edges) results_fbl.setdefault(algo, {})[run.run_id] = _safe_ratio(pred_fbl, ref_fbl) results_ffl.setdefault(algo, {})[run.run_id] = _safe_ratio(pred_ffl, ref_ffl) results_mi.setdefault(algo, {})[run.run_id] = _safe_ratio(pred_mi, ref_mi) if not results_fbl: continue dataset_group.dataset_path.mkdir(parents=True, exist_ok=True) for label, results in [('FBL', results_fbl), ('FFL', results_ffl), ('MI', results_mi)]: out_df = pd.DataFrame(results).T out_df.index.name = 'Algorithm' out_path = dataset_group.dataset_path / f'motifs_{label}.csv' out_df.to_csv(out_path) print(f"Wrote motif ({label}) results to {out_path}")