Skip to content

Commit

Permalink
Synchronize flush_all().
Browse files Browse the repository at this point in the history
The former comment about deadlock seems outdated to me. Signal
handlers run in the main thread and Fishtest executes no code
in the main thread once initialization is finished. So
signal handlers behave like asynchronous events in the same way
as api calls do, and the latter appear currently to be deadlock free.
  • Loading branch information
vdbergh committed Jan 30, 2025
1 parent af6b29a commit df4501d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
5 changes: 4 additions & 1 deletion server/fishtest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ def init_rundb(event):
# We do not want to do the following in the constructor of rundb since
# it writes to the db and starts the flush timer.
if rundb.is_primary_instance():
rundb.update_aggregated_data()
# We install signal handlers when all cache sensitive code in the
# main thread is finished. In that way we can safely use
# locks in the signal handlers (which also run in the main thread).
signal.signal(signal.SIGINT, rundb.exit_run)
signal.signal(signal.SIGTERM, rundb.exit_run)
rundb.update_aggregated_data()
rundb.schedule_tasks()

config.add_subscriber(add_rundb, NewRequest)
Expand Down
11 changes: 5 additions & 6 deletions server/fishtest/run_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,11 @@ def flush_buffers(self):
self.runs.replace_one({"_id": oldest_run_id}, oldest_run)

def flush_all(self):
# Note that we do not grab locks because this method is
# called from a signal handler and grabbing locks might deadlock
for run_id in list(self.run_cache):
entry = self.run_cache.get(run_id, None)
if entry is not None and entry["is_changed"]:
self.runs.replace_one({"_id": ObjectId(run_id)}, entry["run"])
with self.run_cache_lock:
for run_id, entry in self.run_cache.items():
if entry["is_changed"]:
with self.active_run_lock(run_id):
self.runs.replace_one({"_id": ObjectId(run_id)}, entry["run"])

def clean_cache(self):
now = time.time()
Expand Down
2 changes: 1 addition & 1 deletion server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ def exit_run(self, signum, frame):
if self.scheduler is not None:
print("Stopping scheduler... ", flush=True)
self.scheduler.stop()
if self.__is_primary_instance:
if self.is_primary_instance():
print("Flushing cache... ", flush=True)
self.run_cache.flush_all()
if self.port >= 0:
Expand Down

0 comments on commit df4501d

Please sign in to comment.