# Source code for BLEval.data

from pathlib import Path
from typing import Dict, Iterator, List, Optional

import pandas as pd


class RunResult:
    """
    Predicted networks and ground truth reference for a single run.

    ranked_edges maps algorithm name to its ranked edge list DataFrame
    (columns: Gene1, Gene2, EdgeWeight). Algorithms whose rankedEdges.csv
    is missing are omitted with a warning rather than raising an error.
    """

    def __init__(
        self,
        run_id: str,
        ranked_edges: Dict[str, pd.DataFrame],
        ground_truth_path: Path,
        run_path: Path,
    ) -> None:
        """
        Parameters
        ----------
        run_id : str
            Matches run_id from the config 'runs' list.
        ranked_edges : dict[str, pd.DataFrame]
            Keyed by algorithm_id.
        ground_truth_path : Path
            Path to the ground truth edge list file.
        run_path : Path
            Output directory for this run (output_dir/dataset_id/run_id).

        Raises
        ------
        TypeError
            If any argument is not of the expected type.
        """
        # Table-driven type validation: fail fast with a clear message
        # when a caller mis-wires an argument.
        expected_types = (
            ("run_id", run_id, str, "str"),
            ("ranked_edges", ranked_edges, dict, "dict"),
            ("ground_truth_path", ground_truth_path, Path, "Path"),
            ("run_path", run_path, Path, "Path"),
        )
        for arg_name, value, expected, label in expected_types:
            if not isinstance(value, expected):
                raise TypeError(f"{arg_name} must be {label}, got {type(value)}")

        self.run_id: str = run_id
        self.ranked_edges: Dict[str, pd.DataFrame] = ranked_edges
        self.ground_truth_path: Path = ground_truth_path
        self.run_path: Path = run_path
[docs]class DatasetGroup: """ All runs belonging to one dataset entry in the config. Runs in a DatasetGroup share the same ground truth network and represent multiple perturbations or noise realisations of the same biological system. Evaluation metrics (e.g. Jaccard, Spearman) are aggregated across runs within a group. """ def __init__(self, dataset_id: str, runs: List[RunResult], dataset_path: Path) -> None: # dataset_id : str — matches dataset_id from the config 'datasets' list # runs : list[RunResult] — one entry per run in the config 'runs' list # dataset_path : Path — output directory for this dataset (output_dir/dataset_id) if not isinstance(dataset_id, str): raise TypeError(f"dataset_id must be str, got {type(dataset_id)}") if not isinstance(runs, list) or not all(isinstance(r, RunResult) for r in runs): raise TypeError("runs must be a list of RunResult objects") if not isinstance(dataset_path, Path): raise TypeError(f"dataset_path must be Path, got {type(dataset_path)}") self.dataset_id: str = dataset_id self.runs: List[RunResult] = runs self.dataset_path: Path = dataset_path def __iter__(self) -> Iterator[RunResult]: return iter(self.runs)
[docs]class EvaluationData: """ Loads and organises predicted networks from the output directory, mirroring the hierarchical structure of the 'datasets' section of config.yaml. Top level: datasets (DatasetGroup), each grouping multiple runs that share the same ground truth. Within each run, ranked edge lists are keyed by algorithm name. Algorithms with missing output files are skipped. """ def __init__(self, config: dict, root: Path = None) -> None: # config : dict — parsed YAML configuration # root : Path — working directory; defaults to cwd if None if not isinstance(config, dict): raise TypeError(f"config must be dict, got {type(config)}") if root is None: root = Path.cwd() if not isinstance(root, Path): raise TypeError(f"root must be Path, got {type(root)}") self.datasets: List[DatasetGroup] = self._load(config, root) def _load(self, config: dict, root: Path) -> List[DatasetGroup]: """ Parse config and load rankedEdges DataFrames from disk. Parameters ---------- config : dict Parsed YAML configuration dictionary. root : Path Root directory from which relative paths are resolved. Returns ------- list[DatasetGroup] One DatasetGroup per enabled dataset entry in the config. """ input_settings = config['input_settings'] output_settings = config['output_settings'] input_dir = (root / input_settings['input_dir']).resolve() output_dir = (root / output_settings['output_dir']).resolve() # experiment_id : str — optional; when set, an experiment_id segment is # inserted between output_dir and the dataset path. experiment_id = output_settings.get('experiment_id', '') if experiment_id: output_dir = output_dir / experiment_id # Collect algorithm IDs that are enabled in the config. # should_run values may be bare booleans or single-element lists. 
algos: List[str] = [ a['algorithm_id'] for a in input_settings.get('algorithms', []) if (lambda v: v[0] if isinstance(v, list) else v)( a.get('should_run', [False]) ) ] groups: List[DatasetGroup] = [] for ds in input_settings.get('datasets', []): should_run = ds.get('should_run', [True]) if not (should_run[0] if isinstance(should_run, list) else should_run): continue dataset_id: str = ds['dataset_id'] # outputs / dataset_id dataset_path: Path = output_dir / dataset_id # Resolve the run list: either scan input subdirectories or use the # explicit 'runs' list from the config. if ds.get('scan_run_subdirectories'): # ds_input_path : Path — input_dir/dataset_id/ ds_input_path: Path = input_dir / dataset_id if not ds_input_path.is_dir(): raise FileNotFoundError( f"scan_run_subdirectories is set for dataset '{dataset_id}' " f"but input directory '{ds_input_path}' does not exist." ) run_dicts = [{'run_id': d.name} for d in sorted(ds_input_path.iterdir()) if d.is_dir()] if not run_dicts: raise RuntimeError( f"scan_run_subdirectories is set for dataset '{dataset_id}' " f"but no subdirectories were found in '{ds_input_path}'." 
) else: run_dicts = ds.get('runs', []) runs: List[RunResult] = [] for run in run_dicts: run_id: str = run['run_id'] gt_filename: str = ds.get('groundTruthNetwork', 'GroundTruthNetwork.csv') # Ground truth: inputs / dataset_id / filename gt_path: Path = input_dir / dataset_id / gt_filename # outputs / dataset_id / run_id run_path: Path = output_dir / dataset_id / run_id # Ranked edges: run_path / algo / rankedEdges.csv ranked_edges: Dict[str, pd.DataFrame] = {} for algo in algos: edges_path = run_path / algo / 'rankedEdges.csv' if edges_path.exists(): ranked_edges[algo] = pd.read_csv(edges_path, sep='\t', header=0) else: print(f"Warning: {edges_path} not found, skipping.") runs.append(RunResult(run_id, ranked_edges, gt_path, run_path)) groups.append(DatasetGroup(dataset_id, runs, dataset_path)) return groups def __iter__(self) -> Iterator[DatasetGroup]: return iter(self.datasets)