Skip to content

Commit

Permalink
Merge pull request #162 from Keyn34/main
Browse files Browse the repository at this point in the history
  • Loading branch information
LalithShiyam authored Dec 9, 2024
2 parents 6ef1ee4 + dfaf3df commit a18fd9d
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 71 deletions.
7 changes: 4 additions & 3 deletions moosez/benchmarking/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import os
import numpy
import threading
from typing import Union, Tuple, List, Dict


class PerformanceObserver:
def __init__(self, image: str | None = None, model: str | None = None, polling_rate: float = 0.1):
def __init__(self, image: Union[str, None] = None, model: Union[str, None] = None, polling_rate: float = 0.1):
self.monitoring = False
self.polling_rate = polling_rate
self.monitoring_thread = None
Expand Down Expand Up @@ -48,7 +49,7 @@ def __get_memory_usage_of_process_tree(self):
continue
return memory_usage / (1024 * 1024) # Convert to MB

def __monitor_memory_usage(self, interval):
def __monitor_memory_usage(self, interval: float):
while self.monitoring:
current_time = time.time() - self.monitoring_start_time
current_memory_MB = self.__get_memory_usage_of_process_tree()
Expand Down Expand Up @@ -114,7 +115,7 @@ def plot_performance(self, path: str):
plt.savefig(os.path.join(path, f'performance_plot.png'))
plt.close()

def get_peak_resources(self) -> list:
def get_peak_resources(self) -> List:
image_name = os.path.basename(self.metadata_image)
model_name = self.metadata_model
if self.metadata_image_size is not None and isinstance(self.metadata_image_size, (list, tuple)):
Expand Down
3 changes: 2 additions & 1 deletion moosez/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import os
from pathlib import Path
import requests
from typing import Union
from moosez import constants
from moosez import system


def download_enhance_data(download_directory: str | None, output_manager: system.OutputManager):
def download_enhance_data(download_directory: Union[str, None], output_manager: system.OutputManager):

output_manager.log_update(f" - Downloading ENHANCE 1.6k data")
if not download_directory:
Expand Down
11 changes: 6 additions & 5 deletions moosez/file_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import shutil
from datetime import datetime
from multiprocessing import Pool
from typing import Union, Tuple, List
from moosez import constants


Expand All @@ -35,7 +36,7 @@ def create_directory(directory_path: str) -> None:
os.makedirs(directory_path)


def get_files(directory: str, prefix: str | tuple[str, ...], suffix: str | tuple[str, ...]) -> list[str]:
def get_files(directory: str, prefix: Union[str, Tuple[str, ...]], suffix: Union[str, Tuple[str, ...]]) -> List[str]:
"""
Returns the list of files in the directory with the specified wildcard.
Expand Down Expand Up @@ -65,7 +66,7 @@ def get_files(directory: str, prefix: str | tuple[str, ...], suffix: str | tuple
return files


def moose_folder_structure(parent_directory: str) -> tuple[str, str, str]:
def moose_folder_structure(parent_directory: str) -> Tuple[str, str, str]:
"""
Creates the moose folder structure.
Expand Down Expand Up @@ -98,7 +99,7 @@ def copy_file(file: str, destination: str) -> None:
shutil.copy(file, destination)


def copy_files_to_destination(files: list[str], destination: str) -> None:
def copy_files_to_destination(files: List[str], destination: str) -> None:
"""
Copies the files inside the list to the destination directory in a parallel fashion.
Expand All @@ -112,7 +113,7 @@ def copy_files_to_destination(files: list[str], destination: str) -> None:
pool.starmap(copy_file, [(file, destination) for file in files])


def select_files_by_modality(moose_compliant_subjects: list[str], modality_tag: str) -> list:
def select_files_by_modality(moose_compliant_subjects: List[str], modality_tag: str) -> List:
"""
Selects the files with the selected modality tag from the moose-compliant folders.
Expand All @@ -134,7 +135,7 @@ def select_files_by_modality(moose_compliant_subjects: list[str], modality_tag:
return selected_files


def find_pet_file(folder: str) -> str | None:
def find_pet_file(folder: str) -> Union[str, None]:
"""
Finds the PET file in the specified folder.
Expand Down
8 changes: 4 additions & 4 deletions moosez/image_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
import re
import shutil
import unicodedata

import SimpleITK
import dicom2nifti
import pydicom
from typing import Union, Dict
from moosez import system


def non_nifti_to_nifti(input_path: str, output_manager: system.OutputManager, output_directory: str = None) -> None:
def non_nifti_to_nifti(input_path: str, output_manager: system.OutputManager, output_directory: Union[str, None] = None) -> None:
"""
Converts any image format known to ITK to NIFTI
Expand Down Expand Up @@ -170,7 +170,7 @@ def is_dicom_file(filename: str) -> bool:
return False


def create_dicom_lookup(dicom_dir: str) -> dict:
def create_dicom_lookup(dicom_dir: str) -> Dict:
"""
Create a lookup dictionary from DICOM files.
Expand Down Expand Up @@ -211,7 +211,7 @@ def create_dicom_lookup(dicom_dir: str) -> dict:
return dicom_info


def rename_nifti_files(nifti_dir: str, dicom_info: dict) -> None:
def rename_nifti_files(nifti_dir: str, dicom_info: Dict) -> None:
"""
Rename NIfTI files based on a lookup dictionary.
Expand Down
49 changes: 25 additions & 24 deletions moosez/image_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import scipy.ndimage as ndimage
import nibabel
import os
from typing import Union, Tuple, List, Dict
from moosez.constants import CHUNK_THRESHOLD_RESAMPLING, CHUNK_THRESHOLD_INFERRING
from moosez import models
from moosez import system
Expand Down Expand Up @@ -100,12 +101,12 @@ def get_shape_statistics(mask_image: SimpleITK.Image, model: models.Model, out_c
stats_df.to_csv(out_csv)


def limit_fov(image_array: np.array, segmentation_array: np.array, fov_label: list[int] | int, largest_component_only: bool = False):
def limit_fov(image_array: np.array, segmentation_array: np.array, fov_label: Union[List[int], int], largest_component_only: bool = False):

if largest_component_only:
segmentation_array = largest_connected_component(segmentation_array, fov_label)

if type(fov_label) is list:
if isinstance(fov_label, list):
z_indices = np.where((segmentation_array >= fov_label[0]) & (segmentation_array <= fov_label[1]))[0]
else:
z_indices = np.where(segmentation_array == fov_label)[0]
Expand All @@ -117,7 +118,7 @@ def limit_fov(image_array: np.array, segmentation_array: np.array, fov_label: li
return limited_fov_array, {"z_min": z_min, "z_max": z_max, "original_shape": image_array.shape}


def expand_segmentation_fov(limited_fov_segmentation_array: np.ndarray, original_fov_info: dict) -> np.ndarray:
def expand_segmentation_fov(limited_fov_segmentation_array: np.ndarray, original_fov_info: Dict) -> np.ndarray:
z_min = original_fov_info["z_min"]
z_max = original_fov_info["z_max"]
original_shape = original_fov_info["original_shape"]
Expand Down Expand Up @@ -179,13 +180,13 @@ def largest_connected_component(segmentation_array, intensities):

class ImageChunker:
@staticmethod
def __compute_interior_indices(axis_length: int, number_of_chunks: int) -> (list[int], list[int]):
def __compute_interior_indices(axis_length: int, number_of_chunks: int) -> Tuple[List[int], List[int]]:
start = [int(round(k * axis_length / number_of_chunks)) for k in range(number_of_chunks)]
end = [int(round((k + 1) * axis_length / number_of_chunks)) for k in range(number_of_chunks)]
return start, end

@staticmethod
def __chunk_array_with_overlap(array_shape: list[int] | tuple[int, ...], splits_per_dimension: list[int] | tuple[int, ...], overlap_per_dimension: list[int] | tuple[int, ...]) -> list[dict]:
def __chunk_array_with_overlap(array_shape: Union[List[int], Tuple[int, ...]], splits_per_dimension: Union[List[int], Tuple[int, ...]], overlap_per_dimension: Union[List[int], Tuple[int, ...]]) -> List[Dict]:
dims = array_shape
num_dims = len(array_shape)
starts_list = []
Expand Down Expand Up @@ -232,7 +233,7 @@ def __chunk_array_with_overlap(array_shape: list[int] | tuple[int, ...], splits_
return chunk_info

@staticmethod
def array_to_chunks(image_array: np.ndarray, splits_per_dimension: list[int] | tuple[int, ...], overlap_per_dimension: list[int] | tuple[int, ...]) -> (list[np.ndarray], list[dict]):
def array_to_chunks(image_array: np.ndarray, splits_per_dimension: Union[List[int], Tuple[int, ...]], overlap_per_dimension: Union[List[int], Tuple[int, ...]]) -> Tuple[List[np.ndarray], List[Dict]]:
chunk_info = ImageChunker.__chunk_array_with_overlap(image_array.shape, splits_per_dimension, overlap_per_dimension)
image_chunks = []
positions = []
Expand All @@ -248,7 +249,7 @@ def array_to_chunks(image_array: np.ndarray, splits_per_dimension: list[int] | t
return image_chunks, positions

@staticmethod
def chunks_to_array(image_chunks: list[np.ndarray], image_chunk_positions: dict, final_shape: list[int] | tuple[int, ...]) -> np.ndarray:
def chunks_to_array(image_chunks: List[np.ndarray], image_chunk_positions: List[Dict], final_shape: Union[List[int], Tuple[int, ...]]) -> np.ndarray:
final_arr = np.empty(final_shape, dtype=image_chunks[0].dtype)
for image_chunk, image_chunk_position in zip(image_chunks, image_chunk_positions):
interior_region = image_chunk[image_chunk_position['interior_slice']]
Expand All @@ -257,7 +258,7 @@ def chunks_to_array(image_chunks: list[np.ndarray], image_chunk_positions: dict,
return final_arr

@staticmethod
def determine_splits(image_array: np.ndarray) -> tuple:
def determine_splits(image_array: np.ndarray) -> Tuple:
image_shape = image_array.shape
splits = []
for axis in image_shape:
Expand Down Expand Up @@ -305,8 +306,8 @@ def chunk_along_axis(axis: int) -> int:
return split

@staticmethod
def resample_chunk_SimpleITK(image_chunk: da.array, input_spacing: tuple, interpolation_method: int,
output_spacing: tuple, output_size: tuple) -> da.array:
def resample_chunk_SimpleITK(image_chunk: da.array, input_spacing: Tuple, interpolation_method: int,
output_spacing: Tuple, output_size: Tuple) -> da.array:
"""
Resamples a dask array chunk.
Expand Down Expand Up @@ -336,8 +337,8 @@ def resample_chunk_SimpleITK(image_chunk: da.array, input_spacing: tuple, interp

@staticmethod
def resample_image_SimpleITK_DASK(sitk_image: SimpleITK.Image, interpolation: str,
output_spacing: tuple = (1.5, 1.5, 1.5),
output_size: tuple = None) -> SimpleITK.Image:
output_spacing: Tuple[float, float, float] = (1.5, 1.5, 1.5),
output_size: Union[Tuple, None] = None) -> SimpleITK.Image:
"""
Resamples a sitk_image using Dask and SimpleITK.
Expand Down Expand Up @@ -365,7 +366,7 @@ def resample_image_SimpleITK_DASK(sitk_image: SimpleITK.Image, interpolation: st

@staticmethod
def reslice_identity(reference_image: SimpleITK.Image, moving_image: SimpleITK.Image,
output_image_path: str = None, is_label_image: bool = False) -> SimpleITK.Image:
output_image_path: Union[str, None] = None, is_label_image: bool = False) -> SimpleITK.Image:
"""
Reslices an image to the same space as another image.
Expand Down Expand Up @@ -396,8 +397,8 @@ def reslice_identity(reference_image: SimpleITK.Image, moving_image: SimpleITK.I

@staticmethod
def resample_image_SimpleITK_DASK_array(sitk_image: SimpleITK.Image, interpolation: str,
output_spacing: tuple = (1.5, 1.5, 1.5),
output_size: tuple = None) -> np.array:
output_spacing: Tuple[float, float, float] = (1.5, 1.5, 1.5),
output_size: Union[Tuple[float, float, float], None] = None) -> np.array:
if interpolation == 'nearest':
interpolation_method = SimpleITK.sitkNearestNeighbor
elif interpolation == 'linear':
Expand Down Expand Up @@ -436,13 +437,13 @@ def resample_segmentation(reference_image: SimpleITK.Image, segmentation_image:
return resampled_sitk_image


def determine_orientation_code(image: nibabel.Nifti1Image) -> [tuple | list, str]:
def determine_orientation_code(image: nibabel.Nifti1Image) -> Tuple[Union[Tuple, List], str]:
affine = image.affine
orthonormal_orientation = nibabel.orientations.aff2axcodes(affine)
return orthonormal_orientation, ''.join(orthonormal_orientation)


def confirm_orthonormality(image: nibabel.Nifti1Image) -> tuple[nibabel.Nifti1Image, bool]:
def confirm_orthonormality(image: nibabel.Nifti1Image) -> Tuple[nibabel.Nifti1Image, bool]:
data = image.get_fdata()
affine = image.affine
header = image.header
Expand Down Expand Up @@ -473,7 +474,7 @@ def confirm_orthonormality(image: nibabel.Nifti1Image) -> tuple[nibabel.Nifti1Im
return image, orthonormalized


def confirm_orientation(image: nibabel.Nifti1Image) -> tuple[nibabel.Nifti1Image, bool]:
def confirm_orientation(image: nibabel.Nifti1Image) -> Tuple[nibabel.Nifti1Image, bool]:
data = image.get_fdata()
affine = image.affine
header = image.header
Expand Down Expand Up @@ -520,27 +521,27 @@ def convert_to_sitk(image: nibabel.Nifti1Image) -> SimpleITK.Image:
return sitk_image


def standardize_image(image_path: str, output_manager: system.OutputManager, standardization_output_path: str | None) -> SimpleITK.Image:
def standardize_image(image_path: str, output_manager: system.OutputManager, standardization_output_path: Union[str, None]) -> SimpleITK.Image:
image = nibabel.load(image_path)
_, original_orientation = determine_orientation_code(image)
output_manager.log_update(f" Image loaded. Orientation: {original_orientation}")
output_manager.log_update(f" - Image loaded. Orientation: {original_orientation}")

image, orthonormalized = confirm_orthonormality(image)
if orthonormalized:
_, orthonormal_orientation = determine_orientation_code(image)
output_manager.log_update(f" Image orthonormalized. Orientation: {orthonormal_orientation}")
output_manager.log_update(f" - Image orthonormalized. Orientation: {orthonormal_orientation}")
image, reoriented = confirm_orientation(image)
if reoriented:
_, reoriented_orientation = determine_orientation_code(image)
output_manager.log_update(f" Image reoriented. Orientation: {reoriented_orientation}")
output_manager.log_update(f" - Image reoriented. Orientation: {reoriented_orientation}")
sitk_image = convert_to_sitk(image)
output_manager.log_update(f" Image converted to SimpleITK.")
output_manager.log_update(f" - Image converted to SimpleITK.")

processing_steps = [orthonormalized, reoriented]
prefixes = ["orthonormal", "reoriented"]

if standardization_output_path is not None and any(processing_steps):
output_manager.log_update(f" Writing standardized image.")
output_manager.log_update(f" - Writing standardized image.")
prefix = "_".join([prefix for processing_step, prefix in zip(processing_steps, prefixes) if processing_step])
output_path = os.path.join(standardization_output_path, f"{prefix}_{os.path.basename(image_path)}")
SimpleITK.WriteImage(sitk_image, output_path)
Expand Down
7 changes: 4 additions & 3 deletions moosez/input_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@
# ----------------------------------------------------------------------------------------------------------------------

import os
from typing import Tuple, List, Dict
from moosez import constants
from moosez import models
from moosez import system


def determine_model_expectations(model_routine: dict[tuple, list[models.ModelWorkflow]], output_manager: system.OutputManager) -> list:
def determine_model_expectations(model_routine: Dict[Tuple, List[models.ModelWorkflow]], output_manager: system.OutputManager) -> List:
"""
Display expected modality for the model.
This function displays the expected modality for the given model name. It also checks for a special case where
'FDG-PET-CT' should be split into 'FDG-PET' and 'CT'.
:param model_routine: The model routine
:type model_routine: dict[tuple, list[models.ModelWorkflow]]
:type model_routine: Dict[Tuple, List[models.ModelWorkflow]]
:param output_manager: The output manager
:type output_manager: system.OutputManager
:return: A list of modalities.
Expand Down Expand Up @@ -75,7 +76,7 @@ def determine_model_expectations(model_routine: dict[tuple, list[models.ModelWor
return required_modalities


def select_moose_compliant_subjects(subject_paths: list[str], modality_tags: list[str], output_manager: system.OutputManager) -> list[str]:
def select_moose_compliant_subjects(subject_paths: List[str], modality_tags: List[str], output_manager: system.OutputManager) -> List[str]:
"""
Selects the subjects that have the files that have names that are compliant with the moosez.
Expand Down
Loading

0 comments on commit a18fd9d

Please sign in to comment.