pipeline
pacsanini.pipeline.orchestra
#
The orchestra pipeline provides access to an all-inclusive method
for finding, moving, and parsing DICOM resources.
run_pacsanini_pipeline(p_config, nb_threads=1, init_db=False)
#
Run a DICOM collection and structuring flow. Overall, this will
take care of: 1. Loading the flow's configuration. 2. (optional) If the results destination is a database and needs to be created, creating the database and its tables. 3. Finding all DICOM resources to move. 4. Moving all found DICOM resources. 5. (optional) If the results destination is not a database, parsing DICOM file metadata into CSV format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| p_config | Union[pacsanini.config.PacsaniniConfig, str] | The configuration file to use for the pipeline's execution. | required |
| nb_threads | int | The number of threads to use for parsing DICOM resources. This is only used if the results backend is not a database. The default is 1. | 1 |
| init_db | bool | If the results backend is a database and init_db is True, the database and its tables will be created. The default is False. | False |
Returns:
| Type | Description |
|---|---|
| State | The flow's end state. |
Source code in pacsanini/pipeline/orchestra.py
def run_pacsanini_pipeline(
    p_config: Union[PacsaniniConfig, str], nb_threads: int = 1, init_db: bool = False
) -> State:
    """Execute the complete find/move/parse DICOM flow.

    The flow performs the following steps, in order:

    1. Load the flow's configuration.
    2. (optional) Create the database and its tables when the results
       destination is a database and ``init_db`` is set.
    3. Find all DICOM resources to move.
    4. Move all found DICOM resources.
    5. (optional) Parse DICOM file metadata to CSV format when the
       results destination is not a database.

    Parameters
    ----------
    p_config : Union[PacsaniniConfig, str]
        An already loaded configuration, or the path of the configuration
        file to use. A path whose extension is "json" (case-insensitive)
        is read with ``PacsaniniConfig.from_json``; any other path is
        read with ``PacsaniniConfig.from_yaml``.
    nb_threads : int
        The number of threads to use for parsing DICOM resources. This
        is only used if the results backend is not a database. The default
        is 1.
    init_db : bool
        If the results backend is a database and init_db is True, the
        database and its tables will be created. The default is False.

    Returns
    -------
    State
        The flow's end state.

    Raises
    ------
    InvalidConfigError
        If the find or the move configuration is missing, or if the
        parse configuration is missing while the results destination
        is not a database.
    """
    # Resolve the configuration up front so that an incomplete one is
    # rejected before the flow is built.
    if isinstance(p_config, str):
        extension = p_config.rsplit(".", 1)[-1].lower()
        if extension == "json":
            loader = PacsaniniConfig.from_json
        else:
            loader = PacsaniniConfig.from_yaml
        loaded_config = loader(p_config)
    else:
        loaded_config = p_config

    if not loaded_config.can_find():
        raise InvalidConfigError("Missing find configuration.")
    if not loaded_config.can_move():
        raise InvalidConfigError("Missing move configuration.")
    # A parse configuration is only mandatory for non-database destinations.
    if not (loaded_config.can_parse() or is_db_uri(loaded_config.storage.resources)):
        raise InvalidConfigError("Missing parse configuration.")

    context["pacsanini_config"] = loaded_config

    with Flow("My First Flow") as flow:
        path_param = Parameter("config_path_param")
        threads_param = Parameter("nb_threads_param", default=1)
        init_param = Parameter("init_db_param", default=False)

        flow_config = load_configuration(path_param)

        db_needed = check_if_database_creation_needed(flow_config, init_param)
        with case(db_needed, True):  # type: ignore
            db_task = create_database_and_tables(flow_config)
            flow.add_task(db_task)

        # NOTE(review): presumably lets the find step run even when the
        # database creation branch was skipped -- confirm against the
        # flow library's skip semantics.
        find_dicom_resources.skip_on_upstream_skip = False
        find_task = find_dicom_resources(flow_config, upstream_tasks=[db_task])
        move_task = move_dicom_resources(flow_config, upstream_tasks=[find_task])

        parse_needed = check_if_parsing_needed(flow_config, upstream_tasks=[move_task])
        with case(parse_needed, True):  # type: ignore
            parse_task = parse_dicom_resources(flow_config, threads_param)
            flow.add_task(parse_task)

    # The original p_config (path or instance) is handed to the flow, whose
    # load_configuration task resolves it again.
    return flow.run(
        config_path_param=p_config, nb_threads_param=nb_threads, init_db_param=init_db
    )
pacsanini.pipeline.tasks
#
The tasks module defines individual tasks that can be reused in different
pipelines/flows.
check_if_database_creation_needed(config, create_db)
#
Check if a database needs to be created.
Source code in pacsanini/pipeline/tasks.py
@task
def check_if_database_creation_needed(config: PacsaniniConfig, create_db: bool):
    """Tell whether the database has to be created.

    The configured storage resources are only inspected when
    ``create_db`` is truthy; a falsy ``create_db`` is returned as is.
    Otherwise the result of ``is_db_uri`` on the storage resources
    is returned.
    """
    if not create_db:
        return create_db
    return is_db_uri(config.storage.resources)
check_if_parsing_needed(config)
#
Check whether the results backend is a database or not.
Returns True if the backend is not a database.
Source code in pacsanini/pipeline/tasks.py
@task
def check_if_parsing_needed(config: PacsaniniConfig):
    """Tell whether DICOM metadata still has to be parsed.

    Parsing is needed exactly when the configured storage resources
    are not a database URI, in which case True is returned.
    """
    backend_is_database = is_db_uri(config.storage.resources)
    return not backend_is_database
create_database_and_tables(config)
#
Create the pacsanini database before anything is done.
Source code in pacsanini/pipeline/tasks.py
@task
def create_database_and_tables(config: PacsaniniConfig):
    """Create the pacsanini database before anything is done.

    Delegates to ``initialize_database`` with the given configuration
    and returns nothing.
    """
    initialize_database(config)
find_dicom_resources(config)
#
Find DICOM resources and store results in the specified
destination.
Source code in pacsanini/pipeline/tasks.py
@task(**NETWORK_TASK_PARAMS, state_handlers=[find_email_notifier])
def find_dicom_resources(config: PacsaniniConfig):
    """Run the configured DICOM find and write its results to the
    configured storage resources.

    The find function is chosen from two settings: whether the
    destination is a database URI (SQL variant) or not (CSV variant),
    and whether the find query level is PATIENT (patient variant)
    or not (study variant).
    """
    destination = config.storage.resources
    level = config.find.query_level
    at_patient_level = level == QueryLevel.PATIENT

    positional = (config.net.local_node, config.net.called_node, destination)
    find_options = {
        "dicom_fields": config.find.search_fields,
        "start_date": config.find.start_date,
        "end_date": config.find.end_date,
        "modality": config.find.modality,
    }

    if is_db_uri(destination):
        # The SQL variants are called without the dicom_fields option.
        find_options.pop("dicom_fields")
        if at_patient_level:
            finder = net.patient_find2sql  # type: ignore
        else:
            finder = net.study_find2sql  # type: ignore
    elif at_patient_level:
        finder = net.patient_find2csv  # type: ignore
    else:
        finder = net.study_find2csv  # type: ignore

    finder(*positional, **find_options)  # type: ignore
load_configuration(config_path)
#
Load the pipeline/flow configuration.
Source code in pacsanini/pipeline/tasks.py
@task
def load_configuration(config_path: str) -> PacsaniniConfig:
    """Return the configuration used by the pipeline/flow.

    An argument that already is a ``PacsaniniConfig`` is returned
    untouched. Otherwise it is treated as a file path: an extension of
    "json" (case-insensitive) selects the JSON loader and anything
    else the YAML loader.
    """
    if isinstance(config_path, PacsaniniConfig):
        return config_path

    extension = config_path.lower().rsplit(".", 1)[-1]
    loader = (
        PacsaniniConfig.from_json if extension == "json" else PacsaniniConfig.from_yaml
    )
    return loader(config_path)
move_dicom_resources(config)
#
Move DICOM resources that have been previously
retrieved, possibly by the find_dicom_resources task.
Source code in pacsanini/pipeline/tasks.py
@task(**NETWORK_TASK_PARAMS, state_handlers=[move_email_notifier])
def move_dicom_resources(config: PacsaniniConfig):
    """Move the DICOM resources listed in the configured storage
    resources, possibly produced by the find_dicom_resources task.

    When the storage resources are a database URI, the study UIDs to
    move are read through a database session and moved with
    ``net.move_studies``. Otherwise the identifiers are read with
    ``read_resources`` and moved per patient or per study depending
    on the move query level.
    """
    source = config.storage.resources
    level = config.move.query_level
    at_patient_level = level == QueryLevel.PATIENT

    nodes = (config.net.local_node, config.net.called_node)
    move_options = {
        "dest_node": config.net.dest_node,
        "directory": config.storage.directory,
        "sort_by": config.storage.sort_by,
        "start_time": config.move.start_time,
        "end_time": config.move.end_time,
    }

    if is_db_uri(source):
        # The move runs inside the session context because the session
        # itself is handed to the move function.
        with db.get_db_session(source) as session:
            move_options["db_session"] = session
            move_options["study_uids"] = db.get_study_uids_to_move(session)
            # list() consumes the returned iterable; the items are discarded.
            list(net.move_studies(*nodes, **move_options))  # type: ignore
        return

    identifiers = read_resources(source, level)
    if at_patient_level:
        id_key, mover = "patient_ids", net.move_patients
    else:
        id_key, mover = "study_uids", net.move_studies  # type: ignore
    move_options[id_key] = identifiers
    # list() consumes the returned iterable; the items are discarded.
    list(mover(*nodes, **move_options))  # type: ignore
parse_dicom_resources(config, nb_threads)
#
Parse DICOM resources to CSV format.
Source code in pacsanini/pipeline/tasks.py
@task
def parse_dicom_resources(config: PacsaniniConfig, nb_threads: int):
    """Parse the DICOM files of the storage directory to CSV format.

    The storage directory, the configured tags and the
    ``resources_meta`` destination are passed to ``io.parse_dir2csv``
    together with the requested number of threads.
    """
    dicom_directory = config.storage.directory
    tags = config.get_tags()
    meta_destination = config.storage.resources_meta
    io.parse_dir2csv(dicom_directory, tags, meta_destination, nb_threads=nb_threads)