import argparse
import pathlib
from textwrap import dedent
import time
from typing import List
from types import ModuleType, SimpleNamespace
import numpy as np
The provided Python script is a command-line utility for a data-processing pipeline. Here is a breakdown of its functionality and structure:

Imports and Definitions: The script starts by importing the necessary modules and defining some functions. This includes modules for argument parsing (argparse), file-path handling (pathlib), logging, parallel processing, and specific functions from other modules within the same package (prefixed with an underscore).

Argument Parsing: The script uses the argparse module to define command-line arguments that control the behavior of the pipeline. These arguments include: config (path to a configuration file specifying pipeline settings); create-config (creates a template configuration file); steps (defines specific processing steps or groups of steps to run); root-dir, deriv_root, subject, session, task, and run (paths and identifiers for the data to process); n_jobs (number of parallel processes to execute); and interactive, debug, and no-cache (flags for interactive mode, debugging, and disabling caching).

Argument Validation: The script checks whether the provided arguments are valid, particularly focusing on the configuration file.

Processing Steps Identification: Based on the steps argument, the script identifies which processing steps or stages are to be executed. It does this by parsing the steps argument and mapping the entries to corresponding modules.

Configuration Loading and Overrides: The script loads the pipeline configuration from the specified file and applies any overrides given via the command-line arguments.

Pipeline Execution:
- The script iterates over the identified processing steps.
- For each step, it logs the start, executes the main function of the corresponding module (which is where the actual processing happens), and then logs the time taken for the step.
- This execution can be parallelized based on the n_jobs argument.

Logging and Error Handling: Throughout its execution, the script logs various messages, including errors and the execution time of each step. The --debug option enables additional debugging information on error.

Pipeline Configuration and Modular Design: The script is designed to be modular, with each step of the pipeline encapsulated in a separate module. The pipeline's behavior is controlled by a configuration file, allowing for flexible and customizable data processing.

Interactive and Cache Options: The script supports an interactive mode and can disable caching of intermediate results, providing flexibility for different use cases.

Overall, the script is a command-line interface for a data-processing pipeline, where the specific processing steps are modularized and controlled via a configuration file. The use of command-line arguments allows for flexible execution of different parts of the pipeline.
from ._config_utils import _get_step_modules
from ._config_import import _import_config
from ._config_template import create_template_config
from ._logging import logger, gen_log_kwargs
from ._parallel import get_parallel_backend
from ._run import _short_step_path
def main():
from . import __version__
= argparse.ArgumentParser()
parser
parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}"
)"config", nargs="?", default=None)
parser.add_argument(
parser.add_argument("--config",
="config_switch",
dest=None,
default="FILE",
metavarhelp="The path of the pipeline configuration file to use.",
)
parser.add_argument("--create-config",
="create_config",
dest=None,
default="FILE",
metavarhelp="Create a template configuration file with the specified name. "
"If specified, all other parameters will be ignored.",
),
parser.add_argument("--steps",
="steps",
dest="all",
defaulthelp=dedent(
"""\
The processing steps to run.
Can either be one of the processing groups 'preprocessing', sensor',
'source', 'report', or 'all', or the name of a processing group plus
the desired step sans the step number and
filename extension, separated by a '/'. For example, to run ICA, you
would pass 'sensor/run_ica`. If unspecified, will run all processing
steps. Can also be a tuple of steps."""
),
)
parser.add_argument("--root-dir",
="root_dir",
dest=None,
defaulthelp="BIDS root directory of the data to process.",
)
parser.add_argument("--deriv_root",
="deriv_root",
dest=None,
defaulthelp=dedent(
"""\
The root of the derivatives directory
in which the pipeline will store the processing results.
If unspecified, this will be derivatives/mne-bids-pipeline
inside the BIDS root."""
),
),
parser.add_argument("--subject", dest="subject", default=None, help="The subject to process."
)
parser.add_argument("--session", dest="session", default=None, help="The session to process."
)
parser.add_argument("--task", dest="task", default=None, help="The task to process."
)"--run", dest="run", default=None, help="The run to process.")
parser.add_argument(
parser.add_argument("--n_jobs",
="n_jobs",
desttype=int,
=None,
defaulthelp="The number of parallel processes to execute.",
)
parser.add_argument("--interactive",
="interactive",
dest="store_true",
actionhelp="Enable interactive mode.",
)
parser.add_argument("--debug", dest="debug", action="store_true", help="Enable debugging on error."
)
parser.add_argument("--no-cache",
="no_cache",
dest="store_true",
actionhelp="Disable caching of intermediate results.",
)= parser.parse_args()
options
if options.create_config is not None:
= pathlib.Path(options.create_config)
target_path =target_path, overwrite=False)
create_template_config(target_pathreturn
= options.config
config = options.config_switch
config_switch = False
bad if config is None:
if config_switch is None:
= "neither was provided"
bad else:
= config_switch
config elif config_switch is not None:
= "both were provided"
bad if bad:
parser.error("❌ You must specify a configuration file either as a single "
f"argument or with --config, but {bad}."
)= options.steps
steps = options.root_dir
root_dir = options.deriv_root
deriv_root = options.subject, options.session
subject, session = options.task, options.run
task, run = options.n_jobs
n_jobs = options.interactive, options.debug
interactive, debug = not options.no_cache
cache
if isinstance(steps, str) and "," in steps:
# Work around limitation in Fire: --steps=foo,bar/baz won't produce a
# tuple ('foo', 'bar/baz'), but a string 'foo,bar/baz'.
= tuple(steps.split(","))
steps elif isinstance(steps, str):
= (steps,)
steps
= "debug" if debug else None
on_error = "1" if cache else "0"
cache
= []
processing_stages = []
processing_steps for steps_ in steps:
if "/" in steps_:
= steps_.split("/")
stage, step
processing_stages.append(stage)
processing_steps.append(step)else:
# User specified "sensor", "preprocessing" or similar, but without
# any further grouping.
processing_stages.append(steps_)None)
processing_steps.append(
= pathlib.Path(config).expanduser().resolve(strict=True)
config_path = SimpleNamespace()
overrides if root_dir:
= pathlib.Path(root_dir).expanduser().resolve(strict=True)
overrides.bids_root if deriv_root:
= (
overrides.deriv_root =False)
pathlib.Path(deriv_root).expanduser().resolve(strict
)if subject:
= [subject]
overrides.subjects if session:
= [session]
overrides.sessions if task:
= task
overrides.task if run:
= run
overrides.runs if interactive:
= interactive
overrides.interactive if n_jobs:
= int(n_jobs)
overrides.n_jobs if on_error:
= on_error
overrides.on_error if not cache:
= False
overrides.memory_location
= []
step_modules: List[ModuleType] = _get_step_modules()
STEP_MODULES for stage, step in zip(processing_stages, processing_steps):
if stage not in STEP_MODULES.keys():
raise ValueError(
f"Invalid step requested: '{stage}'. "
f"It should be one of {list(STEP_MODULES.keys())}."
)
if step is None:
# User specified `sensors`, `source`, or similar
step_modules.extend(STEP_MODULES[stage])else:
# User specified 'stage/step'
for step_module in STEP_MODULES[stage]:
= pathlib.Path(step_module.__file__).name
step_name if step in step_name:
step_modules.append(step_module)break
else:
# We've iterated over all steps, but none matched!
raise ValueError(f"Invalid steps requested: {stage}/{step}")
if processing_stages[0] != "all":
# Always run the directory initialization steps, but skip for 'all',
# because it already includes them – and we want to avoid running
# them twice.
= [*STEP_MODULES["init"], *step_modules]
step_modules
"Welcome aboard MNE-BIDS-Pipeline! 👋")
logger.title(= f"Using configuration: {config}"
msg = pathlib.Path(__file__) # used for logging
__mne_bids_pipeline_step__ **gen_log_kwargs(message=msg, emoji="📝"))
logger.info(= _import_config(
config_imported =config_path,
config_path=overrides,
overrides
)# Initialize dask now
with get_parallel_backend(config_imported.exec_params):
pass
del __mne_bids_pipeline_step__
logger.end()
for step_module in step_modules:
= time.time()
start = _short_step_path(pathlib.Path(step_module.__file__))
step =f"{step}")
logger.title(title=config_imported)
step_module.main(config= time.time() - start
elapsed = divmod(elapsed, 3600)
hours, remainder = int(hours)
hours = divmod(remainder, 60)
minutes, seconds = int(minutes)
minutes = int(np.ceil(seconds)) # always take full seconds
seconds = f"{seconds}s"
elapsed if minutes:
= f"{minutes}m {elapsed}"
elapsed if hours:
= f"{hours}h {elapsed}"
elapsed f"done ({elapsed})") logger.end(