diff --git a/src/schematic/event_buffer.py b/src/schematic/event_buffer.py index f10cedd..9e62c37 100644 --- a/src/schematic/event_buffer.py +++ b/src/schematic/event_buffer.py @@ -30,6 +30,7 @@ def __init__( # Start periodic flushing thread self.flush_thread = threading.Thread(target=self._periodic_flush) + self.flush_thread.daemon = True self.flush_thread.start() def _flush(self): @@ -47,17 +48,9 @@ def _flush(self): self.current_size = 0 def _periodic_flush(self): - while True: - if self.shutdown.wait(timeout=self.interval): - # Stop accepting new events - self.stopped = True - - # Flush any remaining events - self._flush() - - break - else: - self._flush() + while not self.shutdown.is_set(): + self._flush() + self.shutdown.wait(timeout=self.interval) def push(self, event: CreateEventRequestBody): if self.stopped: @@ -74,8 +67,9 @@ def push(self, event: CreateEventRequestBody): def stop(self): try: + self.stopped = True self.shutdown.set() - self.flush_thread.join() + self.flush_thread.join(timeout=5) except Exception as e: self.logger.error(f"Panic occurred while closing client: {e}") @@ -116,18 +110,14 @@ async def _flush(self): self.current_size = 0 async def _periodic_flush(self): - while True: + while not self.shutdown_event.is_set(): + await self._flush() try: - await asyncio.wait_for(self.shutdown_event.wait(), timeout=self.interval) - # Stop accepting new events - self.stopped = True - - # Flush any remaining events - await self._flush() - - break + await asyncio.wait_for( + self.shutdown_event.wait(), timeout=self.interval + ) except asyncio.TimeoutError: - await self._flush() + pass async def push(self, event: CreateEventRequestBody): if self.stopped: @@ -144,6 +134,7 @@ async def push(self, event: CreateEventRequestBody): async def stop(self): try: + self.stopped = True self.shutdown_event.set() await self.flush_task except Exception as e: