Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor storage #381

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor stream handling to use threading instead of multiprocessing …
…for improved control and responsiveness
  • Loading branch information
timonmerk committed Nov 19, 2024
commit efe54ec63c09cdfaf2ccf4cbc4956bc3935b9bc6
5 changes: 1 addition & 4 deletions py_neuromodulation/gui/backend/app_backend.py
Original file line number Diff line number Diff line change
@@ -114,10 +114,7 @@ async def handle_stream_control(data: dict):
if action == "stop":
self.logger.info("Stopping stream")
self.pynm_state.stream_handling_queue.put("stop")

# will this be enough time for stopping?
self.pynm_state.stream_controller_process.terminate()
self.pynm_state.run_func_process.terminate()
self.pynm_state.stop_event_ws.set()

return {"message": f"Stream action '{action}' executed"}

38 changes: 23 additions & 15 deletions py_neuromodulation/gui/backend/app_pynm.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
import asyncio
import logging
import threading
import numpy as np
import multiprocess as mp

import multiprocessing as mp
from threading import Thread
from queue import Queue
from py_neuromodulation.stream import Stream, NMSettings
from py_neuromodulation.utils import set_channels
from py_neuromodulation.utils.io import read_mne_data
from py_neuromodulation import logger

async def run_stream_controller(feature_queue: asyncio.Queue, rawdata_queue: asyncio.Queue,
websocket_manager_features: "WebSocketManager"):
while True:
await asyncio.sleep(0.002)
logger.info("wait for feature queue")
websocket_manager_features: "WebSocketManager", stop_event: threading.Event):
while not stop_event.wait(0.002):
#await asyncio.sleep(0.002)
if not feature_queue.empty() and websocket_manager_features is not None:
feature_dict = feature_queue.get()
logger.info("Sending message to Websocket")
await websocket_manager_features.send_cbor(feature_dict)
# here the rawdata queue could also be used to send raw data, potentiall through different websocket?

def run_stream_controller_sync(feature_queue, rawdata_queue, websocket_manager_features):
asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager_features))
def run_stream_controller_sync(feature_queue, rawdata_queue, websocket_manager_features, stop_event: threading.Event):
asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager_features, stop_event))

class PyNMState:
def __init__(
@@ -48,22 +49,29 @@ def start_run_function(

self.stream.settings = self.settings

self.stream_handling_queue = mp.Queue()
self.feature_queue = mp.Queue()
self.rawdata_queue = mp.Queue()
self.stream_handling_queue = Queue()
self.feature_queue = Queue()
self.rawdata_queue = Queue()

self.logger.info("Starting run controller function")
self.stream_controller_process = mp.Process(
self.stop_event_ws = threading.Event()
self.stream_controller_process = Thread(
target=run_stream_controller_sync,
args=(self.feature_queue, self.rawdata_queue, websocket_manager_features),
daemon=True,
args=(self.feature_queue,
self.rawdata_queue,
websocket_manager_features,
self.stop_event_ws
),
)

# this function also has to be in a process
is_stream_lsl = self.lsl_stream_name is not None
stream_lsl_name = self.lsl_stream_name if self.lsl_stream_name is not None else ""

self.run_func_process = mp.Process(
self.run_func_process = Thread(
target=self.stream.run,
daemon=True,
kwargs={
"out_dir" : out_dir,
"experiment_name" : experiment_name,
@@ -75,7 +83,7 @@ def start_run_function(
},
)

#self.stream_controller_process.start()
self.stream_controller_process.start()
self.run_func_process.start()

def setup_lsl_stream(
Loading