From b5a9626df4a5531c5fac778f3f4cfcc937e1a8de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=B8=85=27=CF=89=27=E0=B8=85?= Date: Mon, 23 Dec 2024 11:34:41 +0800 Subject: [PATCH] fix: RuntimeWarning: coroutine 'Channel.close' was never awaited when closing async client (#2497) Signed-off-by: Ruichen Bao --- pymilvus/client/async_grpc_handler.py | 4 ++-- pymilvus/milvus_client/async_milvus_client.py | 4 ++-- pymilvus/orm/connections.py | 7 +++++++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pymilvus/client/async_grpc_handler.py b/pymilvus/client/async_grpc_handler.py index c9fe1320e..ca847f8b4 100644 --- a/pymilvus/client/async_grpc_handler.py +++ b/pymilvus/client/async_grpc_handler.py @@ -109,9 +109,9 @@ def __enter__(self): def __exit__(self: object, exc_type: object, exc_val: object, exc_tb: object): pass - def close(self): + async def close(self): self.deregister_state_change_callbacks() - self._async_channel.close() + await self._async_channel.close() def _setup_authorization_interceptor(self, user: str, password: str, token: str): keys = [] diff --git a/pymilvus/milvus_client/async_milvus_client.py b/pymilvus/milvus_client/async_milvus_client.py index 61d5f71db..efb507158 100644 --- a/pymilvus/milvus_client/async_milvus_client.py +++ b/pymilvus/milvus_client/async_milvus_client.py @@ -516,8 +516,8 @@ def create_schema(cls, **kwargs): kwargs["check_fields"] = False # do not check fields for now return CollectionSchema([], **kwargs) - def close(self): - connections.disconnect(self._using) + async def close(self): + await connections.async_disconnect(self._using) def _get_connection(self): return connections._fetch_handler(self._using) diff --git a/pymilvus/orm/connections.py b/pymilvus/orm/connections.py index 151b31e7d..5add5cb76 100644 --- a/pymilvus/orm/connections.py +++ b/pymilvus/orm/connections.py @@ -285,6 +285,13 @@ def disconnect(self, alias: str): if alias in self._connected_alias: self._connected_alias.pop(alias).close() + async def async_disconnect(self, alias: str): + if not isinstance(alias, str): + raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias)) + + if alias in self._connected_alias: + await self._connected_alias.pop(alias).close() + def remove_connection(self, alias: str): """Removes connection from the registry.