import shutil
import subprocess
from abc import ABC, abstractmethod
from pathlib import Path

import pandas as pd
class Runner(ABC):
    """
    Abstract base class for BEELINE GRN inference algorithm runners.

    Subclasses must implement run and parseOutput.
    Attributes set here reflect the fields accessed by runner implementations.
    """

    def __init__(self, root: Path, config: dict):
        """
        Parameters
        ----------
        root : Path
            Root path from which all subpaths are resolved.
        config : dict
            Merged configuration for a single dataset + algorithm run.
            Expected structure:
                input:
                    input_dir: <str> input directory (absolute, or relative to root)
                dataset:
                    dataset_id: <str> dataset group label (path segment under input_dir)
                    run_id: <str> run label (path segment under dataset_id)
                    exprData: <str> expression data filename
                    pseudoTimeData: <str> pseudotime data filename
                    groundTruthNetwork: <str> ground truth network filename
                output_settings:
                    output_dir: <str> output directory (absolute, or relative to root)
                algo_name: <str> name of the algorithm (appended to output_dir)
                image: <str> Docker image used to run this algorithm (mandatory)
                params: <dict> algorithm-specific parameters

        Raises
        ------
        ValueError
            If the 'image' field is missing or empty.
        TypeError
            If the 'image' field is present but not a str.
        """
        inp = config['input']
        ds = config['dataset']
        out = config['output_settings']

        input_dir_path = Path(inp['input_dir'])
        output_dir_path = Path(out['output_dir'])
        base_input = input_dir_path if input_dir_path.is_absolute() else root / input_dir_path
        base_output = output_dir_path if output_dir_path.is_absolute() else root / output_dir_path

        # experiment_id : str — optional; when set, an experiment_id segment is
        # inserted between output_dir and the dataset path so multiple experiment
        # runs can coexist under the same base output directory.
        experiment_id_prefix = out.get('experiment_id', '')
        if experiment_id_prefix:
            base_output = base_output / experiment_id_prefix
        base_output = base_output / ds['dataset_id'] / ds['run_id'] / config['algo_name']

        base_input = base_input.resolve()
        base_output = base_output.resolve()

        # input_dir: run-level input directory (expression data, pseudo-time).
        self.input_dir = base_input / ds['dataset_id'] / ds['run_id']
        self.output_dir = base_output
        self.working_dir = base_output / "working_dir"

        # Erase working directory so stale inputs from prior runs are not reused.
        # shutil.rmtree replaces the manual bottom-up unlink/rmdir walk.
        if self.working_dir.exists():
            shutil.rmtree(self.working_dir)

        # Pre-create output_dir and working_dir so docker cannot claim them as root.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.working_dir.mkdir(parents=True, exist_ok=True)

        # Precompute progress message for CLI output.
        self.running_message = (
            f"Running {config['algo_name']} | dataset: {ds['dataset_id']} | run: {ds['run_id']}"
        )

        # Dataset file names, falling back to the BEELINE defaults.
        self.exprData = ds.get('exprData', 'ExpressionData.csv')
        self.pseudoTimeData = ds.get('pseudoTimeData', 'PseudoTime.csv')
        self.groundTruthNetwork = ds.get('groundTruthNetwork', 'GroundTruthNetwork.csv')
        # ground_truth_file: full path to the dataset-level ground truth CSV
        # (lives beside the run directories, directly under dataset_id).
        self.ground_truth_file = base_input / ds['dataset_id'] / self.groundTruthNetwork

        # image: Docker image name used to run this algorithm
        # (e.g. "grnbeeline/genie3:base"). Mandatory — every algorithm entry
        # in the config must supply this field.
        if 'image' not in config or not config['image']:
            raise ValueError("Algorithm config must include a non-empty 'image' field.")
        if not isinstance(config['image'], str):
            raise TypeError(f"'image' must be a str, got {type(config['image'])}")
        self.image = config['image']

        # Unwrap single-element lists so runners receive scalar values.
        # YAML config files commonly wrap param values in brackets
        # (e.g. `pVal: [0.01]`), which YAML parses as a list.
        raw_params = config.get('params', {})
        self.params = {
            k: (v[0] if isinstance(v, list) and len(v) == 1 else v)
            for k, v in raw_params.items()
        }

    @abstractmethod
    def run(self):
        """Execute the inference algorithm."""

    @abstractmethod
    def parseOutput(self) -> None:
        """
        Parse raw algorithm output and write a ranked edge list to disk.

        Implementations should build a DataFrame with columns Gene1, Gene2,
        EdgeWeight and pass it to self._write_ranked_edges(). Returns early
        without writing if the expected output file is missing.
        """

    def _run_docker(self, cmd: str, append: bool = False) -> None:
        """
        Execute a shell command and write combined stdout/stderr to output.txt.

        Parameters
        ----------
        cmd : str
            Shell command to execute (passed to the shell verbatim).
            SECURITY NOTE: runs with shell=True; callers must not build cmd
            from untrusted input.
        append : bool
            If True, append to an existing output.txt. Use for runners that
            invoke docker in a loop so all container output is collected in
            one file. Defaults to False (overwrite).

        Raises
        ------
        TypeError
            If cmd is not a str or append is not a bool.
        RuntimeError
            If the command exits with a non-zero status.
        """
        if not isinstance(cmd, str):
            raise TypeError(f"cmd must be str, got {type(cmd)}")
        if not isinstance(append, bool):
            raise TypeError(f"append must be bool, got {type(append)}")
        mode = 'a' if append else 'w'
        with open(self.output_dir / 'output.txt', mode) as f:
            # Popen as a context manager so the stdout pipe is closed and the
            # process is waited on even if writing the log raises.
            with subprocess.Popen(
                cmd, shell=True,
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                text=True,
            ) as proc:
                # Stream output line-by-line; flush so progress is visible
                # while a long-running container is still executing.
                for line in proc.stdout:
                    f.write(line)
                    f.flush()
        if proc.returncode != 0:
            raise RuntimeError(
                f"Docker command failed (exit {proc.returncode}). "
                f"See {self.output_dir / 'output.txt'} for details."
            )

    def _write_ranked_edges(self, df: pd.DataFrame) -> None:
        """
        Write a ranked edge list to self.output_dir/rankedEdges.csv.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame with columns Gene1, Gene2, EdgeWeight.

        Returns
        -------
        None

        Raises
        ------
        FileNotFoundError
            If self.output_dir does not exist at the time of writing.
        TypeError
            If df is not a pd.DataFrame.
        """
        if not isinstance(df, pd.DataFrame):
            raise TypeError(f"df must be pd.DataFrame, got {type(df)}")
        if not self.output_dir.is_dir():
            raise FileNotFoundError(
                f"Output directory does not exist: {self.output_dir}")
        # Tab-separated despite the .csv extension (BEELINE convention).
        df.to_csv(self.output_dir / 'rankedEdges.csv', sep='\t', index=False)