From 5b71c7a8e45195272ff12793a6570397a4b80dab Mon Sep 17 00:00:00 2001 From: Khoroshevskyi Date: Fri, 20 Dec 2024 16:22:15 -0500 Subject: [PATCH] improved reindexing batching --- bbconf/modules/bedfiles.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/bbconf/modules/bedfiles.py b/bbconf/modules/bedfiles.py index 047d1ec..20f7bb6 100644 --- a/bbconf/modules/bedfiles.py +++ b/bbconf/modules/bedfiles.py @@ -1258,7 +1258,7 @@ def _sql_search_count(self, query: str) -> int: count = session.execute(statement).one() return count[0] - def reindex_qdrant(self, batch: int = 1000) -> None: + def reindex_qdrant(self, batch: int = 100) -> None: """ Re-upload all files to quadrant. !Warning: only hg38 genome can be added to qdrant! @@ -1271,7 +1271,9 @@ def reindex_qdrant(self, batch: int = 1000) -> None: """ bb_client = BBClient() - annotation_result = self.get_ids_list(limit=100000, genome=QDRANT_GENOME) + annotation_result = self.get_ids_list( + limit=100000, genome=QDRANT_GENOME, offset=0 + ) if not annotation_result.results: _LOGGER.error("No bed files found.") @@ -1280,6 +1282,7 @@ def reindex_qdrant(self, batch: int = 1000) -> None: with tqdm(total=len(results), position=0, leave=True) as pbar: points_list = [] + processed_number = 0 for record in results: try: bed_region_set_obj = GRegionSet(bb_client.seek(record.id)) @@ -1298,18 +1301,26 @@ def reindex_qdrant(self, batch: int = 1000) -> None: ), ) ) + processed_number += 1 + if processed_number % batch == 0: + pbar.set_description(f"Uploading points to qdrant using batch...") + operation_info = self._config.qdrant_engine.qd_client.upsert( + collection_name=self._config.config.qdrant.file_collection, + points=points_list, + ) + pbar.write("Uploaded batch to qdrant.") + points_list = [] + assert operation_info.status == "completed" + pbar.write(f"File: {record.id} successfully indexed.") pbar.update(1) _LOGGER.info(f"Uploading points to qdrant using batches...") - for i in range(0, len(points_list), batch): - operation_info = self._config.qdrant_engine.qd_client.upsert( - collection_name=self._config.config.qdrant.file_collection, - points=points_list[i : i + batch], - ) - - assert operation_info.status == "completed" - + operation_info = self._config.qdrant_engine.qd_client.upsert( + collection_name=self._config.config.qdrant.file_collection, + points=points_list, + ) + assert operation_info.status == "completed" return None def delete_qdrant_point(self, identifier: str) -> None: