From 5e95d771e139f0546619fa287f6dc7d5e6dacf23 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Tue, 30 Jan 2024 13:07:54 +0530 Subject: [PATCH] fix consumer not added to waiting list during heartbeat message in producer - self producer alive if heartbeat message received from producer in consumer --- packages/syft/src/syft/service/queue/zmq_queue.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/service/queue/zmq_queue.py b/packages/syft/src/syft/service/queue/zmq_queue.py index 42289d3dbae..dd8b2740b6d 100644 --- a/packages/syft/src/syft/service/queue/zmq_queue.py +++ b/packages/syft/src/syft/service/queue/zmq_queue.py @@ -406,8 +406,10 @@ def update_consumer_state_for_worker( def worker_waiting(self, worker: Worker): """This worker is now waiting for work.""" # Queue to broker and service waiting lists - self.waiting.append(worker) - worker.service.waiting.append(worker) + if worker not in self.waiting: + self.waiting.append(worker) + if worker not in worker.service.waiting: + worker.service.waiting.append(worker) worker.reset_expiry() self.update_consumer_state_for_worker(worker.syft_worker_id, ConsumerState.IDLE) self.dispatch(worker.service, None) @@ -527,7 +529,10 @@ def process_worker(self, address: bytes, msg: List[bytes]): elif QueueMsgProtocol.W_HEARTBEAT == command: if worker_ready: - worker.reset_expiry() + # If worker is ready then reset expiry + # and add it to worker waiting list + # if not already present + self.worker_waiting(worker) else: # extract the syft worker id and worker pool name from the message # Get the corresponding worker pool and worker @@ -698,7 +703,7 @@ def _run(self): finally: self.clear_job() elif command == QueueMsgProtocol.W_HEARTBEAT: - pass + self.set_producer_alive() elif command == QueueMsgProtocol.W_DISCONNECT: self.reconnect_to_producer() else: