Skip to content

Commit

Permalink
Be more careful when calling method in other thread
Browse files Browse the repository at this point in the history
  • Loading branch information
medihack committed Apr 7, 2024
1 parent ad210f2 commit 94fb190
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions adit/selective_transfer/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def connect(self):
await self.accept()

async def disconnect(self, code: int) -> None:
self._abort_operators()
await self._abort_operators()
self.pool.shutdown(wait=False, cancel_futures=True)
logger.debug("Disconnected from WebSocket client with code: %s", code)

Expand All @@ -80,7 +80,7 @@ async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
# First we abort all operators as we received a new command what to do
self.current_message_id += 1
message_id = self.current_message_id
self._abort_operators()
await self._abort_operators()

if action == "cancel" or action == "reset":
# The connectors are already aborted, so we can just update the UI
Expand Down Expand Up @@ -126,11 +126,12 @@ async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
else:
raise ValueError(f"Invalid action to process: {action}")

def _abort_operators(self) -> None:
async def _abort_operators(self) -> None:
loop = asyncio.get_event_loop()
while self.query_operators:
for operator in self.query_operators[:]:
self.query_operators.remove(operator)
operator.abort()
loop.call_soon_threadsafe(operator.abort)

@database_sync_to_async
def check_permission(self) -> str | None:
Expand Down

0 comments on commit 94fb190

Please sign in to comment.