import math
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Iterator, List, Tuple
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
def get_algo_ids(config: dict) -> List[str]:
    """
    Return the list of enabled algorithm IDs from the config.

    Parameters
    ----------
    config : dict
        Parsed YAML configuration.

    Returns
    -------
    list[str]
        Algorithm IDs whose should_run flag is truthy.

    Raises
    ------
    TypeError
        If config is not a dict.
    """
    if not isinstance(config, dict):
        raise TypeError(f"config must be dict, got {type(config)}")
    algos = []
    for a in config['input_settings'].get('algorithms', []):
        # should_run may be a bare bool or a single-element list (YAML quirk).
        # Fix: an empty list is treated as False instead of raising IndexError.
        flag = a.get('should_run', [False])
        if isinstance(flag, list):
            flag = flag[0] if flag else False
        if flag:
            algos.append(a['algorithm_id'])
    return algos
def iter_datasets_with_runs(
    config: dict,
    root: Path,
) -> Iterator[Tuple[str, str, Path, Path, list]]:
    """
    Yield (dataset_id, dataset_label, dataset_path, gt_path, runs) for each
    enabled dataset.

    Extends iter_datasets by also yielding the raw list of run dicts from the
    config, so callers can determine run count and resolve per-run paths via
    dataset_path / run['run_id'].

    Parameters
    ----------
    config : dict
        Parsed YAML configuration.
    root : Path
        Working directory from which config paths are resolved.

    Yields
    ------
    tuple of (str, str, Path, Path, list)
        dataset_id, dataset_label (nickname if set else dataset_id), output
        dataset directory, ground truth file path, list of run dicts from
        config (each has at least 'run_id').

    Raises
    ------
    TypeError
        If config is not a dict or root is not a Path.
    FileNotFoundError
        If scan_run_subdirectories is set but the input directory is missing.
    RuntimeError
        If scan_run_subdirectories is set but no subdirectories exist.
    """
    if not isinstance(config, dict):
        raise TypeError(f"config must be dict, got {type(config)}")
    if not isinstance(root, Path):
        raise TypeError(f"root must be Path, got {type(root)}")
    in_cfg = config['input_settings']
    out_cfg = config['output_settings']
    base_input = (root / in_cfg['input_dir']).resolve()
    base_output = (root / out_cfg['output_dir']).resolve()
    # Optional experiment_id inserts an extra path segment between
    # output_dir and the dataset directory.
    exp_id = out_cfg.get('experiment_id', '')
    if exp_id:
        base_output = base_output / exp_id
    for entry in in_cfg.get('datasets', []):
        enabled = entry.get('should_run', [True])
        if isinstance(enabled, list):
            enabled = enabled[0]
        if not enabled:
            continue
        ds_id = entry['dataset_id']
        # Nickname (when present) is the plot label; otherwise the raw id.
        label = entry.get('nickname', ds_id)
        gt_name = entry.get('groundTruthNetwork', 'GroundTruthNetwork.csv')
        ds_out = base_output / ds_id
        gt_file = base_input / ds_id / gt_name
        # Run list comes from either scanning input subdirectories or the
        # explicit 'runs' entry in the config.
        if entry.get('scan_run_subdirectories'):
            ds_in = base_input / ds_id
            if not ds_in.is_dir():
                raise FileNotFoundError(
                    f"scan_run_subdirectories is set for dataset '{ds_id}' "
                    f"but input directory '{ds_in}' does not exist."
                )
            runs = [{'run_id': child.name}
                    for child in sorted(ds_in.iterdir()) if child.is_dir()]
            if not runs:
                raise RuntimeError(
                    f"scan_run_subdirectories is set for dataset '{ds_id}' "
                    f"but no subdirectories were found in '{ds_in}'."
                )
        else:
            runs = entry.get('runs', [])
        yield ds_id, label, ds_out, gt_file, runs
def iter_datasets(
    config: dict,
    root: Path,
) -> Iterator[Tuple[str, str, Path, Path]]:
    """
    Yield (dataset_id, dataset_label, dataset_path, gt_path) for each enabled
    dataset in config.

    Parameters
    ----------
    config : dict
        Parsed YAML configuration.
    root : Path
        Working directory from which config paths are resolved.

    Yields
    ------
    tuple of (str, str, Path, Path)
        dataset_id, dataset_label (nickname if set else dataset_id), output
        dataset directory, ground truth file path.

    Raises
    ------
    TypeError
        If config is not a dict or root is not a Path.
    """
    if not isinstance(config, dict):
        raise TypeError(f"config must be dict, got {type(config)}")
    if not isinstance(root, Path):
        raise TypeError(f"root must be Path, got {type(root)}")
    in_cfg = config['input_settings']
    out_cfg = config['output_settings']
    base_input = (root / in_cfg['input_dir']).resolve()
    base_output = (root / out_cfg['output_dir']).resolve()
    # Optional experiment_id inserts an extra path segment between
    # output_dir and the dataset directory.
    exp_id = out_cfg.get('experiment_id', '')
    if exp_id:
        base_output = base_output / exp_id
    for entry in in_cfg.get('datasets', []):
        enabled = entry.get('should_run', [True])
        if isinstance(enabled, list):
            enabled = enabled[0]
        if not enabled:
            continue
        ds_id = entry['dataset_id']
        # Nickname (when present) is the plot label; otherwise the raw id.
        label = entry.get('nickname', ds_id)
        gt_name = entry.get('groundTruthNetwork', 'GroundTruthNetwork.csv')
        yield ds_id, label, base_output / ds_id, base_input / ds_id / gt_name
def random_classifier_baseline(gt_path: Path) -> float:
    """
    Compute the expected precision of a random predictor for a dataset.

    Equals k / (n*(n-1)) where k is the number of non-self-loop ground truth
    edges and n is the number of unique genes. This is the random baseline for
    both AUPRC and early precision (the PR curve of a random predictor is flat
    at height k / n_possible).

    Parameters
    ----------
    gt_path : Path
        Path to the ground truth edge list CSV (columns Gene1, Gene2).

    Returns
    -------
    float
        Random predictor baseline in (0, 1], or nan if undefined (empty
        network).

    Raises
    ------
    TypeError
        If gt_path is not a Path.
    """
    if not isinstance(gt_path, Path):
        raise TypeError(f"gt_path must be Path, got {type(gt_path)}")
    edges = pd.read_csv(gt_path, header=0)
    # Self-loops are excluded from both the edge count and the gene set
    # contribution of k; genes still come from the filtered table.
    edges = edges.loc[edges['Gene1'] != edges['Gene2']]
    n_edges = len(edges)
    genes = set(edges['Gene1']) | set(edges['Gene2'])
    n_pairs = len(genes) * (len(genes) - 1)
    # Baseline is undefined for an empty network.
    if n_pairs == 0 or n_edges == 0:
        return float('nan')
    return n_edges / n_pairs
def load_dataset_metric(
    dataset_path: Path,
    metric_csv: str,
) -> Dict[str, List[float]]:
    """
    Load per-algorithm metric values from one dataset's pre-computed CSV.

    The CSV is expected to have rows = algorithms (index column) and columns
    = run_ids. Returns an empty dict (with a warning) if the file is missing.

    Parameters
    ----------
    dataset_path : Path
        Output directory for the dataset (contains the metric CSV).
    metric_csv : str
        Filename of the metric CSV (e.g. 'AUPRC.csv').

    Returns
    -------
    dict[str, list[float]]
        Algorithm name -> list of non-NaN run values for this dataset.

    Raises
    ------
    TypeError
        If dataset_path is not a Path or metric_csv is not a str.
    """
    if not isinstance(dataset_path, Path):
        raise TypeError(f"dataset_path must be Path, got {type(dataset_path)}")
    if not isinstance(metric_csv, str):
        raise TypeError(f"metric_csv must be str, got {type(metric_csv)}")
    csv_path = dataset_path / metric_csv
    if not csv_path.exists():
        print(f"Warning: {csv_path} not found, skipping.")
        return {}
    table = pd.read_csv(csv_path, index_col=0)
    result: Dict[str, List[float]] = {}
    for algo in table.index:
        # NaN entries correspond to runs where the metric was not computed;
        # drop them so downstream stats see only real values.
        values = table.loc[algo].tolist()
        result[str(algo)] = [v for v in values if not math.isnan(v)]
    return result
class Plotter(ABC):
    """
    Abstract base class for BEELINE plot generators.

    Concrete subclasses implement __call__ to read pre-computed evaluation
    CSVs and emit one or more plot files into a caller-supplied directory.
    Shared loading/rendering helpers live at module level in this module:
    iter_datasets, iter_datasets_with_runs, load_dataset_metric,
    make_box_figure, random_classifier_baseline.
    """

    @abstractmethod
    def __call__(self, config: dict, output_dir: Path, root: Path) -> None:
        """
        Generate plots from pre-computed evaluation CSVs.

        Parameters
        ----------
        config : dict
            Parsed YAML configuration.
        output_dir : Path
            Directory where plot files are written.
        root : Path
            Working directory from which config paths are resolved.

        Returns
        -------
        None
        """
        ...