Understanding How MNE-BIDS-Pipeline Builds Its Pipeline

Author

bot

Published

December 1, 2023

The provided Python script is the command-line entry point for MNE-BIDS-Pipeline, a pipeline for processing BIDS-formatted MEG/EEG data. Here’s a breakdown of its functionality and structure:

  1. Imports and Definitions: The script starts by importing the modules it needs and then defines a single main() function. The imports cover argument parsing (argparse), file path handling (pathlib), timing, logging, parallel processing, and several private helpers from other modules within the same package (prefixed with _).

  2. Argument Parsing: The script uses the argparse module to define command-line arguments that control the behavior of the pipeline. These arguments include:

    • config / --config: Path to the configuration file specifying pipeline settings, accepted either as a positional argument or via the --config switch.
    • --create-config: Write a template configuration file to the given path; all other options are then ignored.
    • --steps: The processing steps or groups of steps to run.
    • --root-dir, --deriv_root, --subject, --session, --task, --run: Paths and identifiers selecting the data to process.
    • --n_jobs: Number of parallel processes to use.
    • --interactive, --debug, --no-cache: Flags for interactive mode, debugging on error, and disabling caching.
  3. Argument Validation: The script checks that the arguments are consistent; in particular, exactly one configuration file must be supplied, either as the positional argument or via --config, and the parser errors out if none or both are given.

  4. Processing Steps Identification: Based on the --steps argument, the script determines which processing stages and steps to execute. It splits the argument on commas, separates each entry into a stage and an optional step, and maps them to the corresponding step modules (a small, self-contained sketch of this parsing appears right after this list).

  5. Configuration Loading and Overrides: The script loads the pipeline configuration from the specified file and applies any overrides supplied on the command line, so explicitly passed options take precedence over values in the file (a sketch of this mechanism follows the summary below).

  6. Pipeline Execution:

    • The script iterates over the identified processing steps.
    • For each step, it logs the start, executes the main function of the corresponding module (which is where the actual processing happens), and then logs the time taken for the step.
    • The steps themselves run sequentially; the work inside each step can be parallelized through the parallel backend, controlled by the --n_jobs argument.
  7. Logging and Error Handling: Throughout its execution, the script logs progress messages and the time each step took. The --debug option sets the pipeline's on_error behavior to 'debug', so a failing step triggers additional debugging instead of simply aborting.

  8. Pipeline Configuration and Modular Design: The script is designed to be modular: each step in the pipeline is encapsulated in its own module that exposes a main(config) entry point. The pipeline's behavior is controlled by a configuration file, allowing for flexible and customizable data processing (a minimal sketch of such a step module appears at the end of this post).

  9. Interactive and Cache Options: The script supports an interactive mode (--interactive) and can disable caching of intermediate results (--no-cache), providing flexibility for different use cases.
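
To make step 4 concrete, here is a minimal, self-contained sketch of the kind of parsing described above: a comma-separated --steps value is split into (stage, step) pairs and matched against a registry of step modules. The STEP_MODULES dictionary and the module names below are stand-ins invented for illustration; in the actual script the registry comes from _get_step_modules() and contains module objects rather than strings.

# Hypothetical stand-in for _get_step_modules(): maps a processing group to
# the ordered names of its step modules.
STEP_MODULES = {
    "init": ["_01_init_derivatives_dir"],
    "preprocessing": ["_01_data_quality", "_06a_run_ica"],
    "sensor": ["_01_make_evoked", "_99_group_average"],
}


def resolve_steps(steps: str) -> list:
    """Expand a --steps value such as 'preprocessing,sensor/make_evoked'."""
    selected = []
    for item in steps.split(","):
        # 'stage/step' selects a single step; a bare stage selects the whole group.
        stage, _, step = item.partition("/")
        if stage not in STEP_MODULES:
            raise ValueError(f"Invalid step requested: '{stage}'")
        if not step:
            selected.extend(STEP_MODULES[stage])
            continue
        matches = [name for name in STEP_MODULES[stage] if step in name]
        if not matches:
            raise ValueError(f"Invalid steps requested: {stage}/{step}")
        selected.append(matches[0])
    return selected


print(resolve_steps("preprocessing,sensor/make_evoked"))
# -> ['_01_data_quality', '_06a_run_ica', '_01_make_evoked']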

Overall, the script is a command-line interface for a data processing pipeline, where the specific processing steps are modularized and controlled via a configuration file. The use of command-line arguments allows for flexible execution of different parts of the pipeline.
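
To make step 5 concrete as well, here is a small sketch of the override mechanism: only options the user actually supplied are collected into a SimpleNamespace, and only those attributes replace values loaded from the configuration file. The apply_overrides() helper and the example config object are hypothetical stand-ins; in the script itself this merging happens inside _import_config().

from types import SimpleNamespace

# Values as they might come back from argparse; None/False means "not given".
options = SimpleNamespace(subject="01", session=None, n_jobs=4, no_cache=False)

# Only options that were actually set become overrides.
overrides = SimpleNamespace()
if options.subject:
    overrides.subjects = [options.subject]  # a single subject becomes a list
if options.session:
    overrides.sessions = [options.session]
if options.n_jobs:
    overrides.n_jobs = int(options.n_jobs)
if options.no_cache:
    overrides.memory_location = False  # disable caching of intermediate results


def apply_overrides(config, overrides):
    """Copy every attribute set on the overrides namespace onto the config."""
    for name, value in vars(overrides).items():
        setattr(config, name, value)


# Hypothetical configuration object as it might look after loading the file.
config = SimpleNamespace(subjects="all", sessions="all", n_jobs=1)
apply_overrides(config, overrides)
print(config.subjects, config.sessions, config.n_jobs)  # ['01'] all 4

A typical invocation would then look like mne_bids_pipeline --config=config.py --steps=preprocessing --subject=01 --n_jobs=4 (assuming the package exposes a console entry point named mne_bids_pipeline). The complete script is reproduced below.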

import argparse
import pathlib
from textwrap import dedent
import time
from typing import List
from types import ModuleType, SimpleNamespace

import numpy as np

from ._config_utils import _get_step_modules
from ._config_import import _import_config
from ._config_template import create_template_config
from ._logging import logger, gen_log_kwargs
from ._parallel import get_parallel_backend
from ._run import _short_step_path


def main():
    """Parse command-line arguments and run the requested pipeline steps."""
    from . import __version__

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--version", action="version", version=f"%(prog)s {__version__}"
    )
    parser.add_argument("config", nargs="?", default=None)
    parser.add_argument(
        "--config",
        dest="config_switch",
        default=None,
        metavar="FILE",
        help="The path of the pipeline configuration file to use.",
    )
    parser.add_argument(
        "--create-config",
        dest="create_config",
        default=None,
        metavar="FILE",
        help="Create a template configuration file with the specified name. "
        "If specified, all other parameters will be ignored.",
    )
    parser.add_argument(
        "--steps",
        dest="steps",
        default="all",
        help=dedent(
            """\
        The processing steps to run.
        Can either be one of the processing groups 'preprocessing', 'sensor',
        'source', 'report', or 'all', or the name of a processing group plus
        the desired step sans the step number and
        filename extension, separated by a '/'. For example, to run ICA, you
        would pass 'sensor/run_ica'. If unspecified, will run all processing
        steps. Can also be a comma-separated list of steps."""
        ),
    )
    parser.add_argument(
        "--root-dir",
        dest="root_dir",
        default=None,
        help="BIDS root directory of the data to process.",
    )
    parser.add_argument(
        "--deriv_root",
        dest="deriv_root",
        default=None,
        help=dedent(
            """\
        The root of the derivatives directory
        in which the pipeline will store the processing results.
        If unspecified, this will be derivatives/mne-bids-pipeline
        inside the BIDS root."""
        ),
    )
    parser.add_argument(
        "--subject", dest="subject", default=None, help="The subject to process."
    )
    parser.add_argument(
        "--session", dest="session", default=None, help="The session to process."
    )
    parser.add_argument(
        "--task", dest="task", default=None, help="The task to process."
    )
    parser.add_argument("--run", dest="run", default=None, help="The run to process.")
    parser.add_argument(
        "--n_jobs",
        dest="n_jobs",
        type=int,
        default=None,
        help="The number of parallel processes to execute.",
    )
    parser.add_argument(
        "--interactive",
        dest="interactive",
        action="store_true",
        help="Enable interactive mode.",
    )
    parser.add_argument(
        "--debug", dest="debug", action="store_true", help="Enable debugging on error."
    )
    parser.add_argument(
        "--no-cache",
        dest="no_cache",
        action="store_true",
        help="Disable caching of intermediate results.",
    )
    options = parser.parse_args()

    if options.create_config is not None:
        target_path = pathlib.Path(options.create_config)
        create_template_config(target_path=target_path, overwrite=False)
        return

    config = options.config
    config_switch = options.config_switch
    bad = False
    if config is None:
        if config_switch is None:
            bad = "neither was provided"
        else:
            config = config_switch
    elif config_switch is not None:
        bad = "both were provided"
    if bad:
        parser.error(
            "❌ You must specify a configuration file either as a single "
            f"argument or with --config, but {bad}."
        )
    steps = options.steps
    root_dir = options.root_dir
    deriv_root = options.deriv_root
    subject, session = options.subject, options.session
    task, run = options.task, options.run
    n_jobs = options.n_jobs
    interactive, debug = options.interactive, options.debug
    cache = not options.no_cache

    if isinstance(steps, str) and "," in steps:
        # A value such as --steps=foo,bar/baz arrives from argparse as the
        # single string 'foo,bar/baz'; split it into the tuple ('foo', 'bar/baz').
        steps = tuple(steps.split(","))
    elif isinstance(steps, str):
        steps = (steps,)

    on_error = "debug" if debug else None

    processing_stages = []
    processing_steps = []
    for steps_ in steps:
        if "/" in steps_:
            stage, step = steps_.split("/")
            processing_stages.append(stage)
            processing_steps.append(step)
        else:
            # User specified "sensor", "preprocessing" or similar, but without
            # any further grouping.
            processing_stages.append(steps_)
            processing_steps.append(None)

    config_path = pathlib.Path(config).expanduser().resolve(strict=True)
    overrides = SimpleNamespace()
    if root_dir:
        overrides.bids_root = pathlib.Path(root_dir).expanduser().resolve(strict=True)
    if deriv_root:
        overrides.deriv_root = (
            pathlib.Path(deriv_root).expanduser().resolve(strict=False)
        )
    if subject:
        overrides.subjects = [subject]
    if session:
        overrides.sessions = [session]
    if task:
        overrides.task = task
    if run:
        overrides.runs = run
    if interactive:
        overrides.interactive = interactive
    if n_jobs:
        overrides.n_jobs = int(n_jobs)
    if on_error:
        overrides.on_error = on_error
    if not cache:
        overrides.memory_location = False

    step_modules: List[ModuleType] = []
    STEP_MODULES = _get_step_modules()
    for stage, step in zip(processing_stages, processing_steps):
        if stage not in STEP_MODULES:
            raise ValueError(
                f"Invalid step requested: '{stage}'. "
                f"It should be one of {list(STEP_MODULES.keys())}."
            )

        if step is None:
            # User specified `sensor`, `source`, or similar
            step_modules.extend(STEP_MODULES[stage])
        else:
            # User specified 'stage/step'
            for step_module in STEP_MODULES[stage]:
                step_name = pathlib.Path(step_module.__file__).name
                if step in step_name:
                    step_modules.append(step_module)
                    break
            else:
                # We've iterated over all steps, but none matched!
                raise ValueError(f"Invalid steps requested: {stage}/{step}")

    if processing_stages[0] != "all":
        # Always run the directory initialization steps, but skip for 'all',
        # because it already includes them – and we want to avoid running
        # them twice.
        step_modules = [*STEP_MODULES["init"], *step_modules]

    logger.title("Welcome aboard MNE-BIDS-Pipeline! 👋")
    msg = f"Using configuration: {config}"
    __mne_bids_pipeline_step__ = pathlib.Path(__file__)  # used for logging
    logger.info(**gen_log_kwargs(message=msg, emoji="📝"))
    config_imported = _import_config(
        config_path=config_path,
        overrides=overrides,
    )
    # Initialize dask now
    with get_parallel_backend(config_imported.exec_params):
        pass
    del __mne_bids_pipeline_step__
    logger.end()

    for step_module in step_modules:
        start = time.time()
        step = _short_step_path(pathlib.Path(step_module.__file__))
        logger.title(title=f"{step}")
        step_module.main(config=config_imported)
        elapsed = time.time() - start
        hours, remainder = divmod(elapsed, 3600)
        hours = int(hours)
        minutes, seconds = divmod(remainder, 60)
        minutes = int(minutes)
        seconds = int(np.ceil(seconds))  # always take full seconds
        elapsed = f"{seconds}s"
        if minutes:
            elapsed = f"{minutes}m {elapsed}"
        if hours:
            elapsed = f"{hours}h {elapsed}"
        logger.end(f"done ({elapsed})")
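
One detail worth spelling out from step 8: the execution loop above only assumes that each step module exposes a main() function accepting the imported configuration. A hypothetical, minimal step module (its file name and contents are invented for illustration, not taken from the real package) could therefore look like this:

# _99_example_step.py -- hypothetical minimal step module. Real steps follow
# the same contract: expose main(config) and do the work for that stage.
def main(config) -> None:
    """Run this step for every subject selected in the configuration."""
    for subject in config.subjects:
        # Real steps read and write BIDS derivatives here; this sketch only
        # reports what it would process.
        print(f"Processing sub-{subject} (task={config.task})")

Because every step honors this contract, the driver script stays generic: it only needs the ordered list of step modules and the shared configuration object.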