Skip to content

Commit

Permalink
Merge branch 'dev' into yash/k8s-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
rasswanth-s authored Jan 30, 2024
2 parents e1c14e4 + 5130907 commit c35df6e
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions packages/syft/src/syft/service/queue/zmq_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,10 @@ def update_consumer_state_for_worker(
def worker_waiting(self, worker: Worker):
"""This worker is now waiting for work."""
# Queue to broker and service waiting lists
self.waiting.append(worker)
worker.service.waiting.append(worker)
if worker not in self.waiting:
self.waiting.append(worker)
if worker not in worker.service.waiting:
worker.service.waiting.append(worker)
worker.reset_expiry()
self.update_consumer_state_for_worker(worker.syft_worker_id, ConsumerState.IDLE)
self.dispatch(worker.service, None)
Expand Down Expand Up @@ -529,7 +531,10 @@ def process_worker(self, address: bytes, msg: List[bytes]):

elif QueueMsgProtocol.W_HEARTBEAT == command:
if worker_ready:
worker.reset_expiry()
# If worker is ready then reset expiry
# and add it to worker waiting list
# if not already present
self.worker_waiting(worker)
else:
# extract the syft worker id and worker pool name from the message
# Get the corresponding worker pool and worker
Expand Down Expand Up @@ -700,7 +705,7 @@ def _run(self):
finally:
self.clear_job()
elif command == QueueMsgProtocol.W_HEARTBEAT:
pass
self.set_producer_alive()
elif command == QueueMsgProtocol.W_DISCONNECT:
self.reconnect_to_producer()
else:
Expand Down

0 comments on commit c35df6e

Please sign in to comment.