From df4501d3ba9dc0a833f35cf3e412964d8717f419 Mon Sep 17 00:00:00 2001 From: Michel Van den Bergh Date: Thu, 30 Jan 2025 07:50:52 +0000 Subject: [PATCH] Synchronize flush_all(). The former comment about deadlock seems outdated to me. Signal handlers run in the main thread and Fishtest executes no code in the main thread once initialization is finished. So signal handlers behave like asynchronous events in the same way as API calls do, and the latter currently appear to be deadlock-free. --- server/fishtest/__init__.py | 5 ++++- server/fishtest/run_cache.py | 11 +++++------ server/fishtest/rundb.py | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/server/fishtest/__init__.py b/server/fishtest/__init__.py index 3fcf99ec6..2f8514ba5 100644 --- a/server/fishtest/__init__.py +++ b/server/fishtest/__init__.py @@ -98,9 +98,12 @@ def init_rundb(event): # We do not want to do the following in the constructor of rundb since # it writes to the db and starts the flush timer. if rundb.is_primary_instance(): + rundb.update_aggregated_data() + # We install signal handlers when all cache sensitive code in the + # main thread is finished. In that way we can safely use + # locks in the signal handlers (which also run in the main thread).
signal.signal(signal.SIGINT, rundb.exit_run) signal.signal(signal.SIGTERM, rundb.exit_run) - rundb.update_aggregated_data() rundb.schedule_tasks() config.add_subscriber(add_rundb, NewRequest) diff --git a/server/fishtest/run_cache.py b/server/fishtest/run_cache.py index 24edfdc26..716ab6e75 100644 --- a/server/fishtest/run_cache.py +++ b/server/fishtest/run_cache.py @@ -156,12 +156,11 @@ def flush_buffers(self): self.runs.replace_one({"_id": oldest_run_id}, oldest_run) def flush_all(self): - # Note that we do not grab locks because this method is - # called from a signal handler and grabbing locks might deadlock - for run_id in list(self.run_cache): - entry = self.run_cache.get(run_id, None) - if entry is not None and entry["is_changed"]: - self.runs.replace_one({"_id": ObjectId(run_id)}, entry["run"]) + with self.run_cache_lock: + for run_id, entry in self.run_cache.items(): + if entry["is_changed"]: + with self.active_run_lock(run_id): + self.runs.replace_one({"_id": ObjectId(run_id)}, entry["run"]) def clean_cache(self): now = time.time() diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 93fe7fece..1b2caa958 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -681,7 +681,7 @@ def exit_run(self, signum, frame): if self.scheduler is not None: print("Stopping scheduler... ", flush=True) self.scheduler.stop() - if self.__is_primary_instance: + if self.is_primary_instance(): print("Flushing cache... ", flush=True) self.run_cache.flush_all() if self.port >= 0: