"""
BEELINE Run (:mod:`BLRun`) module contains the following main class:

- :class:`BLRun.BLRun`

and three additional classes used in the definition of the BLRun class:

- :class:`BLRun.ConfigParser`
- :class:`BLRun.InputSettings`
- :class:`BLRun.OutputSettings`
"""
import yaml
import argparse
import itertools
from collections import defaultdict
from pathlib import Path
import multiprocessing
from multiprocessing import Pool, cpu_count
import concurrent.futures
from typing import Dict, List
import yaml
import argparse
import itertools
from collections import defaultdict
from pathlib import Path
import multiprocessing
from multiprocessing import Pool, cpu_count
import concurrent.futures
from typing import Dict, List
from BLRun.runner import Runner
import os
import pandas as pd
class OutputSettings(object):
    '''
    Structure for storing the names of directories that output should
    be written to.

    :param base_dir: base directory that output is written under
    :param output_prefix: prefix stored alongside ``base_dir``; this class
        only stores it and does not use it
    '''

    def __init__(self, base_dir: Path, output_prefix: Path) -> None:
        # Both values are stored verbatim; no directory is created or
        # validated here.
        self.base_dir = base_dir
        self.output_prefix = output_prefix
class BLRun(object):
    '''
    The BLRun object is created by parsing a user-provided configuration
    file. Its methods provide for further processing its inputs into
    a series of jobs to be run, as well as running these jobs.

    :param input_settings: object exposing ``datasets``, ``algorithms`` and
        ``datadir`` attributes
    :param output_settings: object exposing a ``base_dir`` attribute
    '''

    def __init__(self,
                 input_settings: 'InputSettings',
                 output_settings: 'OutputSettings') -> None:
        self.input_settings = input_settings
        self.output_settings = output_settings
        # Maps a running index (0, 1, 2, ...) to one Runner each.
        self.runners: Dict[int, 'Runner'] = self.__create_runners()

    def __create_runners(self) -> 'Dict[int, Runner]':
        '''
        Instantiate one runner per (dataset, algorithm parameter set) pair
        taken from the input settings.

        Each runner receives a dict with the algorithm name and parameters,
        the dataset's input directory (resolved against the current working
        directory) and the dataset's ``exprData``, ``cellData`` and
        ``trueEdges`` entries. Algorithm entries whose parameters contain
        ``should_run`` set to ``False`` are skipped.

        :returns: dict mapping consecutive integers, in creation order, to
            the created runners
        '''
        runners: Dict[int, 'Runner'] = {}
        order = 0
        for dataset in self.input_settings.datasets:
            for runner in self.input_settings.algorithms:
                name, params = runner[0], runner[1]
                data = {
                    'name': name,
                    'params': params,
                    'inputDir': Path.cwd().joinpath(
                        self.input_settings.datadir.joinpath(dataset['name'])),
                    'exprData': dataset['exprData'],
                    'cellData': dataset['cellData'],
                    'trueEdges': dataset['trueEdges'],
                }

                # Only an explicit ``False`` disables an algorithm; a missing
                # key means "run".
                if params.get('should_run') is False:
                    print(f"Skipping {name}")
                    continue

                runners[order] = Runner(data)
                order += 1
        return runners

    def execute_runners(self, parallel=False, num_threads=1):
        '''
        Run each of the algorithms.

        Entries of ``self.runners`` are processed one after another in key
        iteration order. An entry may be a single runner or a list/tuple of
        runners; with ``parallel`` set, the runners of one entry are
        submitted to a thread pool together.

        :param parallel: run the runners of each entry in a thread pool
            instead of calling them directly
        :param num_threads: size of the thread pool used when ``parallel``
            is set (values below 1 are treated as 1)
        :raises Exception: whatever a runner's ``run`` raises is propagated
        '''
        base_output_dir = self.output_settings.base_dir
        max_workers = max(1, int(num_threads or 1))

        for batch_runners in self.runners.values():
            # __create_runners stores one Runner per key; wrap it so both
            # single runners and lists of runners can be iterated.
            if not isinstance(batch_runners, (list, tuple)):
                batch_runners = [batch_runners]

            if parallel:
                # The context manager shuts the pool down even when a
                # runner raises.
                with concurrent.futures.ThreadPoolExecutor(
                        max_workers=max_workers) as executor:
                    futures = [
                        executor.submit(runner.run,
                                        output_dir=base_output_dir)
                        for runner in batch_runners
                    ]
                    # https://stackoverflow.com/questions/35711160/detect-failed-tasks-in-concurrent-futures
                    # Re-raise exception if produced
                    for future in concurrent.futures.as_completed(futures):
                        future.result()
            else:
                for runner in batch_runners:
                    runner.run(output_dir=base_output_dir)
class ConfigParser(object):
    '''
    Define static methods for parsing a config file that sets a large number
    of parameters for the pipeline.

    The configuration is a YAML mapping with an ``input_settings`` section
    (keys ``input_dir``, ``dataset_dir``, ``datasets``, ``algorithms``) and
    an ``output_settings`` section (keys ``output_dir``, ``output_prefix``).
    '''

    @staticmethod
    def parse(config_file_handle) -> 'BLRun':
        '''
        Build a :class:`BLRun` object from a YAML configuration.

        :param config_file_handle: stream (or string) holding the YAML
            configuration
        :returns: BLRun built from the parsed input and output settings
        :raises KeyError: if a required section or key is missing
        '''
        # yaml.load() without an explicit Loader is rejected by PyYAML >= 6
        # and can construct arbitrary objects on older versions. safe_load
        # is used because the settings read here are plain mappings, lists
        # and scalars.
        config_map = yaml.safe_load(config_file_handle)
        return BLRun(
            ConfigParser.__parse_input_settings(
                config_map['input_settings']),
            ConfigParser.__parse_output_settings(
                config_map['output_settings']))

    @staticmethod
    def __parse_input_settings(input_settings_map) -> 'InputSettings':
        '''
        Convert the ``input_settings`` section into an InputSettings object.

        The dataset directory is ``input_dir`` joined with ``dataset_dir``.
        '''
        input_dir = input_settings_map['input_dir']
        dataset_dir = input_settings_map['dataset_dir']
        datasets = input_settings_map['datasets']

        return InputSettings(
            Path(input_dir, dataset_dir),
            datasets,
            ConfigParser.__parse_algorithms(
                input_settings_map['algorithms']))

    @staticmethod
    def __parse_algorithms(algorithms_list):
        '''
        Expand each algorithm entry into one entry per parameter combination.

        Every value under an algorithm's ``params`` is an iterable of
        candidate values; the Cartesian product of all of them is taken.

        :param algorithms_list: iterable of dicts with ``name`` and
            ``params`` keys
        :returns: list of ``[name, {param: value, ...}]`` pairs
        '''
        algorithms = []
        for algorithm in algorithms_list:
            params = algorithm['params']
            # Keys and values of one dict iterate in the same order, so
            # zipping the keys with each product tuple pairs them correctly.
            for values in itertools.product(*params.values()):
                algorithms.append([algorithm['name'],
                                   dict(zip(params, values))])
        return algorithms

    @staticmethod
    def __parse_output_settings(output_settings_map) -> 'OutputSettings':
        '''
        Convert the ``output_settings`` section into an OutputSettings
        object, wrapping both entries in :class:`pathlib.Path`.
        '''
        output_dir = Path(output_settings_map['output_dir'])
        output_prefix = Path(output_settings_map['output_prefix'])

        return OutputSettings(output_dir,
                              output_prefix)