Skip to content

pipeline

pacsanini.pipeline.orchestra #

The orchestra pipeline provides access to an all-inclusive method

for finding, moving, and parsing DICOM resources.

run_pacsanini_pipeline(p_config, nb_threads=1, init_db=False) #

Run a DICOM collection and structuring flow. Overall, this will

take care of: 1. Loading the flow's configuration 2. (optional) If the results destination is a database and needs to be created, creating the database and its tables. 3. Finding all DICOM resources to move 4. Moving all found DICOM resources 5. (optional) If the results destination is not a database, parsing DICOM file metadata into CSV format.

Parameters:

Name Type Description Default
p_config Union[pacsanini.config.PacsaniniConfig, str]

The configuration file to use for the pipeline's execution.

required
nb_threads int

The number of threads to use for parsing DICOM resources. This is only used if the results backend is not a database. The default is 1.

1
init_db bool

If the results backend is a database and init_db is True, the database and its tables will be created. The default is False.

False

Returns:

Type Description
State

The flow's end state.

Source code in pacsanini/pipeline/orchestra.py
def run_pacsanini_pipeline(
    p_config: Union[PacsaniniConfig, str], nb_threads: int = 1, init_db: bool = False
) -> State:
    """Collect and structure DICOM resources in a single flow run.

    The configuration is resolved and validated up front. The flow that
    is then built and run chains the following steps:
        1. Loading the flow's configuration
        2. (optional) If the results destination is a database and
           init_db is set, creating the database and its tables.
        3. Finding all DICOM resources to move
        4. Moving all found DICOM resources
        5. (optional) If the results destination is not a database,
           parsing DICOM file metadata in CSV format.

    Parameters
    ----------
    p_config : Union[PacsaniniConfig, str]
        The configuration to use for the pipeline's execution. A string
        is treated as a file path: a "json" extension is loaded as JSON,
        anything else as YAML.
    nb_threads : int
        The number of threads to use for parsing DICOM resources. This
        is only used if the results backend is not a database. The default
        is 1.
    init_db : bool
        If the results backend is a database and init_db is True, the
        database and its tables will be created. The default is False.

    Returns
    -------
    State
        The flow's end state.

    Raises
    ------
    InvalidConfigError
        If the find or move configuration is missing, or if the parse
        configuration is missing while the results destination is not
        a database.
    """
    # Resolve the configuration eagerly so it can be validated before
    # the flow is built.
    if not isinstance(p_config, str):
        pipeline_config = p_config
    else:
        extension = p_config.rsplit(".", 1)[-1].lower()
        if extension == "json":
            pipeline_config = PacsaniniConfig.from_json(p_config)
        else:
            pipeline_config = PacsaniniConfig.from_yaml(p_config)

    if not pipeline_config.can_find():
        raise InvalidConfigError("Missing find configuration.")
    if not pipeline_config.can_move():
        raise InvalidConfigError("Missing move configuration.")
    # A parse configuration is only mandatory when results are not
    # stored in a database.
    if not (
        pipeline_config.can_parse() or is_db_uri(pipeline_config.storage.resources)
    ):
        raise InvalidConfigError("Missing parse configuration.")

    context["pacsanini_config"] = pipeline_config

    with Flow("My First Flow") as pipeline_flow:
        config_parameter = Parameter("config_path_param")
        threads_parameter = Parameter("nb_threads_param", default=1)
        init_db_parameter = Parameter("init_db_param", default=False)

        flow_config = load_configuration(config_parameter)

        db_creation_needed = check_if_database_creation_needed(
            flow_config, init_db_parameter
        )
        with case(db_creation_needed, True):  # type: ignore
            db_creation = create_database_and_tables(flow_config)
            pipeline_flow.add_task(db_creation)

        # The find step is declared downstream of the (conditional)
        # database creation; it is told not to skip when that upstream
        # task is skipped.
        find_dicom_resources.skip_on_upstream_skip = False
        find_step = find_dicom_resources(flow_config, upstream_tasks=[db_creation])
        move_step = move_dicom_resources(flow_config, upstream_tasks=[find_step])

        parsing_needed = check_if_parsing_needed(
            flow_config, upstream_tasks=[move_step]
        )
        with case(parsing_needed, True):  # type: ignore
            parse_step = parse_dicom_resources(flow_config, threads_parameter)
            pipeline_flow.add_task(parse_step)

    run_parameters = {
        "config_path_param": p_config,
        "nb_threads_param": nb_threads,
        "init_db_param": init_db,
    }
    return pipeline_flow.run(**run_parameters)

pacsanini.pipeline.tasks #

The tasks module defines individual tasks that can be reused in different

pipelines/flows.

check_if_database_creation_needed(config, create_db) #

Check if a database needs to be created.

Source code in pacsanini/pipeline/tasks.py
@task
def check_if_database_creation_needed(config: PacsaniniConfig, create_db: bool):
    """Check if a database needs to be created.

    Parameters
    ----------
    config : PacsaniniConfig
        The configuration whose storage.resources value designates the
        results destination.
    create_db : bool
        Whether the creation of the database was requested.

    Returns
    -------
    bool
        True only if creation was requested and the results destination
        is a database URI. False otherwise.
    """
    # Always hand back a real bool: a caller that branches on this result
    # with an equality check against True would otherwise treat a truthy
    # non-bool value as "not needed".
    if not create_db:
        return False
    return bool(is_db_uri(config.storage.resources))

check_if_parsing_needed(config) #

Check whether the results backend is a database or not.

Returns True if the backend is not a database.

Source code in pacsanini/pipeline/tasks.py
@task
def check_if_parsing_needed(config: PacsaniniConfig):
    """Tell whether DICOM metadata still needs to be parsed.

    Parsing is needed when the results backend (config.storage.resources)
    is not a database URI, in which case True is returned.
    """
    results_backend = config.storage.resources
    backend_is_database = is_db_uri(results_backend)
    return not backend_is_database

create_database_and_tables(config) #

Create the pacsanini database before anything is done.

Source code in pacsanini/pipeline/tasks.py
@task
def create_database_and_tables(config: PacsaniniConfig):
    """Create the pacsanini database before anything is done.

    The work is delegated to initialize_database, which receives the
    provided configuration unchanged. Nothing is returned.
    """
    initialize_database(config)

find_dicom_resources(config) #

Find DICOM resources and store results in the specified

destination.

Source code in pacsanini/pipeline/tasks.py
@task(**NETWORK_TASK_PARAMS, state_handlers=[find_email_notifier])
def find_dicom_resources(config: PacsaniniConfig):
    """Find DICOM resources and store the results in the configured
    destination (config.storage.resources).

    The query is made at the patient level when config.find.query_level
    is QueryLevel.PATIENT and at the study level otherwise. Results are
    written through the SQL finders when the destination is a database
    URI, and through the CSV finders otherwise.
    """
    destination = config.storage.resources
    at_patient_level = config.find.query_level == QueryLevel.PATIENT

    node_args = (config.net.local_node, config.net.called_node, destination)
    search_fields = config.find.search_fields
    filters = {
        "start_date": config.find.start_date,
        "end_date": config.find.end_date,
        "modality": config.find.modality,
    }

    if is_db_uri(destination):
        # dicom_fields is not passed on to the SQL finders.
        query_options = filters
        if at_patient_level:
            finder = net.patient_find2sql  # type: ignore
        else:
            finder = net.study_find2sql  # type: ignore
    else:
        query_options = {"dicom_fields": search_fields, **filters}
        if at_patient_level:
            finder = net.patient_find2csv  # type: ignore
        else:
            finder = net.study_find2csv  # type: ignore
    finder(*node_args, **query_options)  # type: ignore

load_configuration(config_path) #

Load the pipeline/flow configuration.

Source code in pacsanini/pipeline/tasks.py
@task
def load_configuration(config_path: str) -> PacsaniniConfig:
    """Load the pipeline/flow configuration.

    An already loaded PacsaniniConfig instance is returned as is.
    Otherwise, config_path is treated as a file path: a "json"
    extension (case-insensitive) is loaded as JSON and anything
    else is loaded as YAML.
    """
    if isinstance(config_path, PacsaniniConfig):
        return config_path

    extension = config_path.lower().rsplit(".", 1)[-1]
    loader = (
        PacsaniniConfig.from_json if extension == "json" else PacsaniniConfig.from_yaml
    )
    return loader(config_path)

move_dicom_resources(config) #

Move DICOM resources that have been previously

retrieved, possibly by the find_dicom_resources task.

Source code in pacsanini/pipeline/tasks.py
@task(**NETWORK_TASK_PARAMS, state_handlers=[move_email_notifier])
def move_dicom_resources(config: PacsaniniConfig):
    """Move DICOM resources that were previously retrieved, possibly
    by the find_dicom_resources task.

    When config.storage.resources is a database URI, the study UIDs to
    move are read from the database and moved as studies. Otherwise,
    the resources are read from that source and moved as patients or
    as studies depending on config.move.query_level.
    """
    source = config.storage.resources
    level = config.move.query_level
    move_patients = level == QueryLevel.PATIENT

    nodes = (config.net.local_node, config.net.called_node)
    move_options = {
        "dest_node": config.net.dest_node,
        "directory": config.storage.directory,
        "sort_by": config.storage.sort_by,
        "start_time": config.move.start_time,
        "end_time": config.move.end_time,
    }

    if not is_db_uri(source):
        identifiers = read_resources(source, level)
        if move_patients:
            move_options["patient_ids"] = identifiers
            mover = net.move_patients
        else:
            move_options["study_uids"] = identifiers
            mover = net.move_studies  # type: ignore
        # Wrapping the call in list() consumes everything it yields.
        list(mover(*nodes, **move_options))  # type: ignore
        return

    with db.get_db_session(source) as session:
        move_options["db_session"] = session
        move_options["study_uids"] = db.get_study_uids_to_move(session)
        # The move must be consumed while the session is still open.
        list(net.move_studies(*nodes, **move_options))  # type: ignore

parse_dicom_resources(config, nb_threads) #

Parse DICOM resources to CSV format.

Source code in pacsanini/pipeline/tasks.py
@task
def parse_dicom_resources(config: PacsaniniConfig, nb_threads: int):
    """Parse DICOM resources to CSV format.

    The files under config.storage.directory are parsed using the tags
    returned by config.get_tags(), with config.storage.resources_meta
    passed as the output argument and nb_threads forwarded as is.
    """
    dicom_directory = config.storage.directory
    tags = config.get_tags()
    metadata_destination = config.storage.resources_meta
    io.parse_dir2csv(
        dicom_directory, tags, metadata_destination, nb_threads=nb_threads
    )