From 75feba8c5695806dd99731f1019af16d83bfe0ec Mon Sep 17 00:00:00 2001
From: toni-neurosc <10654467+toni-neurosc@users.noreply.github.com>
Date: Fri, 12 Jan 2024 00:30:24 +0100
Subject: [PATCH 1/6] Add parallel batch processing

---
 py_neuromodulation/nm_stream_offline.py | 51 ++++++++++++++++---------
 1 file changed, 32 insertions(+), 19 deletions(-)

diff --git a/py_neuromodulation/nm_stream_offline.py b/py_neuromodulation/nm_stream_offline.py
index fc98a1a0..40251ede 100644
--- a/py_neuromodulation/nm_stream_offline.py
+++ b/py_neuromodulation/nm_stream_offline.py
@@ -2,6 +2,9 @@
 import math
 import os
 
+import multiprocessing as mp
+from itertools import count
+
 import numpy as np
 import pandas as pd
 
@@ -83,42 +86,50 @@ def _handle_data(self, data: np.ndarray | pd.DataFrame) -> np.ndarray:
         )
         return data.to_numpy()
 
+    def _process_batch(self, data_batch, cnt_samples):
+        feature_series = self.run_analysis.process(
+            data_batch.astype(np.float64)
+        )
+        feature_series = self._add_timestamp(feature_series, cnt_samples)
+        return feature_series
+
     def _run_offline(
         self,
         data: np.ndarray,
         out_path_root: _PathLike | None = None,
         folder_name: str = "sub",
+        parallel: bool = True,
+        num_threads = None
     ) -> pd.DataFrame:
         generator = nm_generator.raw_data_generator(
             data=data,
             settings=self.settings,
             sfreq=self.sfreq,
         )
-        features = []
+
         sample_add = self.sfreq / self.run_analysis.sfreq_features
 
         offset_time = self.settings["segment_length_features_ms"]
         # offset_start = np.ceil(offset_time / 1000 * self.sfreq).astype(int)
         offset_start = offset_time / 1000 * self.sfreq
 
-        cnt_samples = offset_start
-
-        while True:
-            data_batch = next(generator, None)
-            if data_batch is None:
-                break
-            feature_series = self.run_analysis.process(
-                data_batch.astype(np.float64)
-            )
-            feature_series = self._add_timestamp(feature_series, cnt_samples)
-            features.append(feature_series)
-
-            if self.model is not None:
-                prediction = self.model.predict(feature_series)
-
-            cnt_samples += sample_add
+        if parallel:
+            try: mp.set_start_method('fork')  # Set process start method. 'spawn' and 'forkserver' do not work
+            except RuntimeError: pass  # mp.set_start_method() will crash the program if called more than once
+            pool = mp.Pool(processes=num_threads)  # Create sub-process pool. Faster than concurrent.futures.ProcessPoolExecutor()
+            # Assign tasks to sub-processes; starmap works like map but unpacks each zipped tuple into 2+ arguments
+            feature_df = pd.DataFrame(pool.starmap(self._process_batch, zip(generator, count(offset_start, sample_add))))
+            # Prevent memory leaks by releasing process pool resources
+            pool.close()
+            pool.join()
+        else:
+            # If no parallelization is required, it is faster not to use a process pool at all
+            feature_df = pd.DataFrame(map(self._process_batch, generator, count(offset_start, sample_add)))
+
+        # I don't know what this does :(
+        # if self.model is not None:
+        #     prediction = self.model.predict(features[-1])
 
-        feature_df = pd.DataFrame(features)
         feature_df = self._add_labels(features=feature_df, data=data)
 
         self.save_after_stream(out_path_root, folder_name, feature_df)
@@ -253,6 +264,8 @@ def run(
         data: np.ndarray | pd.DataFrame = None,
         out_path_root: _PathLike | None = None,
         folder_name: str = "sub",
+        parallel: bool = True,
+        num_threads = None
     ) -> pd.DataFrame:
         """Call run function for offline stream.
@@ -281,4 +294,4 @@ def run(
         elif self.data is None and data is None:
             raise ValueError("No data passed to run function.")
 
-        return self._run_offline(data, out_path_root, folder_name)
+        return self._run_offline(data, out_path_root, folder_name, parallel=parallel, num_threads=num_threads)

From 460b9867390a86f99d0f683e5d2b5a419e6e8c88 Mon Sep 17 00:00:00 2001
From: timonmerk
Date: Thu, 18 Jan 2024 11:46:31 +0100
Subject: [PATCH 2/6] Add joblib parallel feature computation

---
 py_neuromodulation/nm_stream_offline.py | 58 +++++++++++++++----------
 1 file changed, 36 insertions(+), 22 deletions(-)

diff --git a/py_neuromodulation/nm_stream_offline.py b/py_neuromodulation/nm_stream_offline.py
index 40251ede..5c582e08 100644
--- a/py_neuromodulation/nm_stream_offline.py
+++ b/py_neuromodulation/nm_stream_offline.py
@@ -1,8 +1,7 @@
 """Module for offline data streams."""
-import math
 import os
 
-import multiprocessing as mp
+from joblib import Parallel, delayed
 from itertools import count
 
 import numpy as np
 import pandas as pd
@@ -92,14 +91,14 @@ def _process_batch(self, data_batch, cnt_samples):
         )
         feature_series = self._add_timestamp(feature_series, cnt_samples)
         return feature_series
-    
+
     def _run_offline(
         self,
         data: np.ndarray,
         out_path_root: _PathLike | None = None,
         folder_name: str = "sub",
-        parallel: bool = True,
-        num_threads = None
+        parallel: bool = False,
+        n_jobs: int = -2,
     ) -> pd.DataFrame:
         generator = nm_generator.raw_data_generator(
             data=data,
@@ -114,21 +113,30 @@ def _run_offline(
         # offset_start = np.ceil(offset_time / 1000 * self.sfreq).astype(int)
         offset_start = offset_time / 1000 * self.sfreq
 
         if parallel:
-            try: mp.set_start_method('fork')  # Set process start method. 'spawn' and 'forkserver' do not work
-            except RuntimeError: pass  # mp.set_start_method() will crash the program if called more than once
-            pool = mp.Pool(processes=num_threads)  # Create sub-process pool. Faster than concurrent.futures.ProcessPoolExecutor()
-            # Assign tasks to sub-processes; starmap works like map but unpacks each zipped tuple into 2+ arguments
-            feature_df = pd.DataFrame(pool.starmap(self._process_batch, zip(generator, count(offset_start, sample_add))))
-            # Prevent memory leaks by releasing process pool resources
-            pool.close()
-            pool.join()
+            l_features = Parallel(n_jobs=n_jobs, verbose=10)(
+                delayed(self._process_batch)(data_batch, cnt_samples)
+                for data_batch, cnt_samples in zip(
+                    generator, count(offset_start, sample_add)
+                )
+            )
 
         else:
-            # If no parallelization is required, it is faster not to use a process pool at all
-            feature_df = pd.DataFrame(map(self._process_batch, generator, count(offset_start, sample_add)))
-
-        # I don't know what this does :(
-        # if self.model is not None:
-        #     prediction = self.model.predict(features[-1])
+            l_features = []
+            cnt_samples = offset_start
+            while True:
+                data_batch = next(generator, None)
+                if data_batch is None:
+                    break
+                feature_series = self.run_analysis.process(
+                    data_batch.astype(np.float64)
+                )
+                feature_series = self._add_timestamp(
+                    feature_series, cnt_samples
+                )
+                l_features.append(feature_series)
+
+                cnt_samples += sample_add
+
+        feature_df = pd.DataFrame(l_features)
 
         feature_df = self._add_labels(features=feature_df, data=data)
 
         self.save_after_stream(out_path_root, folder_name, feature_df)
@@ -264,8 +272,8 @@ def run(
         data: np.ndarray | pd.DataFrame = None,
         out_path_root: _PathLike | None = None,
         folder_name: str = "sub",
-        parallel: bool = True,
-        num_threads = None
+        parallel: bool = False,
+        n_jobs: int = -2,
     ) -> pd.DataFrame:
         """Call run function for offline stream.
@@ -294,4 +302,10 @@ def run(
         elif self.data is None and data is None:
             raise ValueError("No data passed to run function.")
 
-        return self._run_offline(data, out_path_root, folder_name, parallel=parallel, num_threads=num_threads)
+        return self._run_offline(
+            data,
+            out_path_root,
+            folder_name,
+            parallel=parallel,
+            n_jobs=n_jobs,
+        )

From bbe3cd24d2f6d9fe9744e70289b9c2546118114b Mon Sep 17 00:00:00 2001
From: toni-neurosc <10654467+toni-neurosc@users.noreply.github.com>
Date: Sat, 20 Jan 2024 14:52:04 +0100
Subject: [PATCH 3/6] Add Joblib as multiprocessing backend for Windows/macOS

---
 py_neuromodulation/nm_stream_offline.py | 31 +++++++++++++++----------
 pyproject.toml                          |  3 ++-
 2 files changed, 21 insertions(+), 13 deletions(-)

diff --git a/py_neuromodulation/nm_stream_offline.py b/py_neuromodulation/nm_stream_offline.py
index 40251ede..20bb9c4e 100644
--- a/py_neuromodulation/nm_stream_offline.py
+++ b/py_neuromodulation/nm_stream_offline.py
@@ -1,9 +1,11 @@
 """Module for offline data streams."""
 import math
 import os
+from platform import system as get_os_name
 
 import multiprocessing as mp
 from itertools import count
+from joblib import Parallel, delayed
 
 import numpy as np
 import pandas as pd
@@ -113,18 +115,23 @@ def _run_offline(
         # offset_start = np.ceil(offset_time / 1000 * self.sfreq).astype(int)
         offset_start = offset_time / 1000 * self.sfreq
 
-        if parallel:
-            try: mp.set_start_method('fork')  # Set process start method. 'spawn' and 'forkserver' do not work
-            except RuntimeError: pass  # mp.set_start_method() will crash the program if called more than once
-            pool = mp.Pool(processes=num_threads)  # Create sub-process pool. Faster than concurrent.futures.ProcessPoolExecutor()
-            # Assign tasks to sub-processes; starmap works like map but unpacks each zipped tuple into 2+ arguments
-            feature_df = pd.DataFrame(pool.starmap(self._process_batch, zip(generator, count(offset_start, sample_add))))
-            # Prevent memory leaks by releasing process pool resources
-            pool.close()
-            pool.join()
-        else:
-            # If no parallelization is required, it is faster not to use a process pool at all
-            feature_df = pd.DataFrame(map(self._process_batch, generator, count(offset_start, sample_add)))
+        match parallel:
+            case True:
+                match get_os_name():
+                    case 'Linux':  # Use standard multiprocessing module
+                        try: mp.set_start_method('fork')  # 'spawn' and 'forkserver' do not work
+                        except RuntimeError: pass  # mp.set_start_method() will crash the program if called more than once
+                        pool = mp.Pool(processes=num_threads)  # faster than concurrent.futures.ProcessPoolExecutor()
+                        feature_df = pd.DataFrame(pool.starmap(self._process_batch, zip(generator, count(offset_start, sample_add))))
+                        pool.close()
+                        pool.join()
+                    case 'Windows' | 'Darwin':  # Use Joblib
+                        if num_threads is None: num_threads = -1  # use all cores
+                        feature_df = pd.DataFrame(Parallel(n_jobs=num_threads, prefer='processes')(
+                            delayed(self._process_batch)(batch, n) for batch, n in zip(generator, count(offset_start, sample_add))))
+            case False:
+                # If no parallelization is required, it is faster not to use a process pool at all
+                feature_df = pd.DataFrame(map(self._process_batch, generator, count(offset_start, sample_add)))
 
         # I don't know what this does :(
         # if self.model is not None:
diff --git a/pyproject.toml b/pyproject.toml
index 132537a9..e31ac7e2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -50,7 +50,8 @@ dependencies = [
     "seaborn >= 0.11",
     "notebook",
     "ipython",
-    "pybispectra>=1.1.0"
+    "pybispectra>=1.1.0",
+    "joblib>=1.3.2"
 ]
 
 [project.optional-dependencies]
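Seen outside the diffs, the pattern patches 1 and 3 build on is compact: pair the raw-data generator with itertools.count so every task carries its own sample counter, then fan the (batch, counter) pairs out to worker processes with pool.starmap. The following minimal sketch illustrates the multiprocessing variant under toy assumptions; process_batch, batches, and the constants are illustrative stand-ins, not py_neuromodulation API.

import multiprocessing as mp
from itertools import count
from platform import system

import numpy as np


def process_batch(data_batch, cnt_samples):
    # Stand-in for Stream._process_batch: one feature row per raw-data batch.
    return {"time": cnt_samples, "mean": float(data_batch.mean())}


if __name__ == "__main__":
    batches = [np.random.randn(4, 1000) for _ in range(20)]  # toy raw-data batches
    offset_start, sample_add = 1000.0, 100.0  # first-segment offset, samples per batch

    if system() == "Linux":
        try:
            mp.set_start_method("fork")  # cheap on Linux; Windows/macOS default to 'spawn'
        except RuntimeError:
            pass  # the start method can only be set once per interpreter

    with mp.Pool(processes=None) as pool:  # processes=None uses all cores
        # starmap unpacks each (batch, counter) tuple into process_batch's arguments
        features = pool.starmap(
            process_batch, zip(batches, count(offset_start, sample_add))
        )

Because each task carries its counter with it, timestamps no longer depend on a sequentially updated cnt_samples variable, which is exactly what makes the per-batch loop parallelizable.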
From ad5b82593160a01805ce50786ca30bcc19ecfeb2 Mon Sep 17 00:00:00 2001
From: timonmerk
Date: Sat, 27 Jan 2024 15:46:01 +0100
Subject: [PATCH 4/6] Change coherence_objects from class to instance
 attribute, otherwise multiprocessing will fail due to pickling

---
 py_neuromodulation/nm_coherence.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/py_neuromodulation/nm_coherence.py b/py_neuromodulation/nm_coherence.py
index 7b9ecb73..b713de33 100644
--- a/py_neuromodulation/nm_coherence.py
+++ b/py_neuromodulation/nm_coherence.py
@@ -111,7 +111,7 @@ def get_coh(self, features_compute, x, y):
 
 
 class NM_Coherence(nm_features_abc.Feature):
-    coherence_objects: Iterable[CoherenceObject] = []
+
     def __init__(
         self, settings: dict, ch_names: Iterable[str], sfreq: float
@@ -119,6 +119,7 @@ def __init__(
         self.s = settings
         self.sfreq = sfreq
         self.ch_names = ch_names
+        self.coherence_objects: Iterable[CoherenceObject] = []
 
         for idx_coh in range(len(self.s["coherence"]["channels"])):
             fband_names = self.s["coherence"]["frequency_bands"]

From 6b3aa6376ef6be10253125741f35bee9db12b300 Mon Sep 17 00:00:00 2001
From: timonmerk
Date: Sat, 27 Jan 2024 15:50:09 +0100
Subject: [PATCH 5/6] Adapt exception message to new settings keyword

---
 py_neuromodulation/nm_mne_connectivity.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/py_neuromodulation/nm_mne_connectivity.py b/py_neuromodulation/nm_mne_connectivity.py
index b31d4625..a13e4fdc 100644
--- a/py_neuromodulation/nm_mne_connectivity.py
+++ b/py_neuromodulation/nm_mne_connectivity.py
@@ -59,7 +59,7 @@ def get_epoched_data(
     if epochs.events.shape[0] < 2:
         raise Exception(
             f"A minimum of 2 epochs is required for mne_connectivity,"
-            f" got only {epochs.events.shape[0]}. Increase settings['segment_length']"
+            f" got only {epochs.events.shape[0]}. Increase settings['segment_length_features_ms']"
         )
     return epochs

From 0bd383590c7bb1c5ad4c5fbbf0f31887bbf8c818 Mon Sep 17 00:00:00 2001
From: timonmerk
Date: Sat, 27 Jan 2024 15:59:41 +0100
Subject: [PATCH 6/6] Add joblib parallelization

---
 py_neuromodulation/nm_stream_offline.py | 91 ++++++++++++++++---------
 1 file changed, 57 insertions(+), 34 deletions(-)

diff --git a/py_neuromodulation/nm_stream_offline.py b/py_neuromodulation/nm_stream_offline.py
index 0cdc5002..b93368c2 100644
--- a/py_neuromodulation/nm_stream_offline.py
+++ b/py_neuromodulation/nm_stream_offline.py
@@ -1,15 +1,11 @@
 """Module for offline data streams."""
-import math
 import os
-from platform import system as get_os_name
-
-import multiprocessing as mp
-from itertools import count
 
 from joblib import Parallel, delayed
-
 import numpy as np
 import pandas as pd
+from itertools import count
+
 import mne
 
 from py_neuromodulation import (
@@ -72,7 +68,6 @@ def _add_timestamp(
         Due to normalization run_analysis needs to keep track of the counted
         samples. These are accessed here for time conversion.
         """
-        timestamp = cnt_samples * 1000 / self.sfreq
         feature_series["time"] = cnt_samples * 1000 / self.sfreq
 
         if self.verbose:
@@ -106,6 +101,27 @@ def _handle_data(self, data: np.ndarray | pd.DataFrame) -> np.ndarray:
                 f"Data columns: {names_data}, nm_channels.name: {names_data}."
             )
         return data.to_numpy()
+
+    def _check_settings_for_parallel(self):
+        """Check specified settings and raise error if parallel processing is not possible.
+
+        Raises:
+            ValueError: if the given settings are incompatible with parallel processing
+        """
+
+        if "raw_normalization" in self.settings["preprocessing"]:
+            raise ValueError(
+                "Parallel processing is not possible with raw_normalization."
+            )
+        if self.settings["postprocessing"]["feature_normalization"] is True:
+            raise ValueError(
+                "Parallel processing is not possible with feature normalization."
+            )
+        if self.settings["features"]["bursts"] is True:
+            raise ValueError(
+                "Parallel processing is not possible with burst estimation."
+            )
 
     def _process_batch(self, data_batch, cnt_samples):
         feature_series = self.run_analysis.process(
@@ -119,8 +135,8 @@ def _run_offline(
         data: np.ndarray,
         out_path_root: _PathLike | None = None,
         folder_name: str = "sub",
-        parallel: bool = True,
-        num_threads = None
+        parallel: bool = False,
+        n_jobs: int = -2,
     ) -> pd.DataFrame:
         generator = nm_generator.raw_data_generator(
             data=data,
@@ -134,29 +150,33 @@ def _run_offline(
         # offset_start = np.ceil(offset_time / 1000 * self.sfreq).astype(int)
         offset_start = offset_time / 1000 * self.sfreq
 
-        match parallel:
-            case True:
-                match get_os_name():
-                    case 'Linux':  # Use standard multiprocessing module
-                        try: mp.set_start_method('fork')  # 'spawn' and 'forkserver' do not work
-                        except RuntimeError: pass  # mp.set_start_method() will crash the program if called more than once
-                        pool = mp.Pool(processes=num_threads)  # faster than concurrent.futures.ProcessPoolExecutor()
-                        feature_df = pd.DataFrame(pool.starmap(self._process_batch, zip(generator, count(offset_start, sample_add))))
-                        pool.close()
-                        pool.join()
-                    case 'Windows' | 'Darwin':  # Use Joblib
-                        if num_threads is None: num_threads = -1  # use all cores
-                        feature_df = pd.DataFrame(Parallel(n_jobs=num_threads, prefer='processes')(
-                            delayed(self._process_batch)(batch, n) for batch, n in zip(generator, count(offset_start, sample_add))))
-            case False:
-                # If no parallelization is required, it is faster not to use a process pool at all
-                feature_df = pd.DataFrame(map(self._process_batch, generator, count(offset_start, sample_add)))
-
-        # I don't know what this does :(
-        # if self.model is not None:
-        #     prediction = self.model.predict(features[-1])
+        if parallel:
+            l_features = Parallel(n_jobs=n_jobs, verbose=10)(
+                delayed(self._process_batch)(data_batch, cnt_samples)
+                for data_batch, cnt_samples in zip(
+                    generator, count(offset_start, sample_add)
+                )
+            )
 
+        else:
+            l_features = []
+            cnt_samples = offset_start
+            while True:
+                data_batch = next(generator, None)
+                if data_batch is None:
+                    break
+                feature_series = self.run_analysis.process(
+                    data_batch.astype(np.float64)
+                )
+                feature_series = self._add_timestamp(
+                    feature_series, cnt_samples
+                )
+                l_features.append(feature_series)
+
+                cnt_samples += sample_add
+
+        feature_df = pd.DataFrame(l_features)
 
-        feature_df = self._add_labels(features=feature_df, data=data)
+        feature_df = self._add_target(feature_series=feature_df, data=data)
 
         self.save_after_stream(out_path_root, folder_name, feature_df)
 
@@ -292,8 +312,8 @@ def run(
         data: np.ndarray | pd.DataFrame = None,
         out_path_root: _PathLike | None = None,
         folder_name: str = "sub",
-        parallel: bool = True,
-        num_threads = None
+        parallel: bool = False,
+        n_jobs: int = -2
     ) -> pd.DataFrame:
         """Call run function for offline stream.
@@ -321,5 +341,8 @@ def run(
             data = self._handle_data(self.data)
         elif self.data is None and data is None:
             raise ValueError("No data passed to run function.")
+
+        if parallel is True:
+            self._check_settings_for_parallel()
 
-        return self._run_offline(data, out_path_root, folder_name, parallel=parallel, num_threads=num_threads)
+        return self._run_offline(data, out_path_root, folder_name, parallel=parallel, n_jobs=n_jobs)
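The series converges on the joblib variant from patch 2, which needs no per-OS start-method handling. A standalone sketch of the final pattern follows, under the same caveat as before: everything except joblib, numpy, pandas, and itertools is an illustrative stand-in, not the library's API.

from itertools import count

import numpy as np
import pandas as pd
from joblib import Parallel, delayed


def process_batch(data_batch, cnt_samples):
    # Stand-in for Stream._process_batch: one feature row per raw-data batch.
    return {"time": cnt_samples, "mean": float(data_batch.mean())}


batches = (np.random.randn(4, 1000) for _ in range(20))  # toy generator of raw batches
offset_start, sample_add = 1000.0, 100.0

# n_jobs=-2 (the new default) means "all cores but one"; joblib chooses its own
# process backend, so no fork/spawn bookkeeping is required.
features = Parallel(n_jobs=-2)(
    delayed(process_batch)(batch, cnt)
    for batch, cnt in zip(batches, count(offset_start, sample_add))
)
feature_df = pd.DataFrame(features)

This also suggests why the new _check_settings_for_parallel guard rejects raw/feature normalization and burst estimation: those features presumably keep running state across batches, which independent worker processes cannot share.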