diff --git a/src/retk/core/node/node.py b/src/retk/core/node/node.py
index 3bee96f..d5162e7 100644
--- a/src/retk/core/node/node.py
+++ b/src/retk/core/node/node.py
@@ -361,7 +361,7 @@ async def batch_delete(au: tps.AuthedUser, nids: List[str]) -> const.CodeEnum:
 
     backup.delete_node_md(uid=au.u.id, nids=nids)
 
-    code = await client.search.delete_batch(au=au, nids=nids)
+    code = await client.search.delete_batch(uid=au.u.id, nids=nids)
     if code != const.CodeEnum.OK:
         logger.error(f"delete search index failed, code: {code}")
diff --git a/src/retk/core/scheduler/tasks/auto_clean_trash.py b/src/retk/core/scheduler/tasks/auto_clean_trash.py
index cee798e..8383ce1 100644
--- a/src/retk/core/scheduler/tasks/auto_clean_trash.py
+++ b/src/retk/core/scheduler/tasks/auto_clean_trash.py
@@ -3,8 +3,10 @@
 
 from bson.tz_util import utc
 
-from retk import config
-from retk.models.client import init_mongo
+from retk import config, const
+from retk.core.node import backup
+from retk.logger import logger
+from retk.models.client import init_mongo, init_search
 from retk.models.coll import CollNameEnum
 
 
@@ -34,8 +36,62 @@ async def _auto_clean_trash(delta_days=30):
         "inTrashAt": {"$lt": datetime.now(tz=utc) - timedelta(days=delta_days)}
     }).to_list(None)
 
+    uid_nids = {}
+    for node in old_nodes:
+        uid = node["uid"]
+        if uid not in uid_nids:
+            uid_nids[uid] = []
+        uid_nids[uid].append(node["id"])
+
+    # Delete all old nodes in node_md
+    for uid, nids in uid_nids.items():
+        backup.delete_node_md(uid=uid, nids=nids)
+
+    used_space_delta = {}
+    # remove toNodes for linked nodes
+    for n in old_nodes:
+        if n["uid"] not in used_space_delta:
+            used_space_delta[n["uid"]] = 0
+        used_space_delta[n["uid"]] -= len(n["md"].encode("utf-8"))
+        for linked_nid in n["toNodeIds"]:
+            if config.is_local_db():
+                # no $pull support
+                to_n = await db[CollNameEnum.nodes.value].find_one({"id": linked_nid})
+                if to_n is None:
+                    return
+                try:
+                    to_n["fromNodeIds"].remove(n["id"])
+                    await db[CollNameEnum.nodes.value].update_one(
+                        {"id": linked_nid},
+                        {"$set": {"fromNodeIds": to_n["fromNodeIds"]}}
+                    )
+                except ValueError:
+                    pass
+            else:
+                await db[CollNameEnum.nodes.value].update_one(
+                    {"id": linked_nid},
+                    {"$pull": {"fromNodeIds": n["id"]}}
+                )
+
+    # Update space
+    for uid, delta in used_space_delta.items():
+        if delta == 0:
+            continue
+        await db[CollNameEnum.users.value].update_one(
+            {"id": uid},
+            {"$inc": {"usedSpace": delta}}
+        )
+
     # Delete all old nodes in trash
     result = await db[CollNameEnum.nodes.value].delete_many({
         "_id": {"$in": [node["_id"] for node in old_nodes]}
     })
+
+    # Delete all old nodes in elastic search
+    search = await init_search()
+    for uid, nids in uid_nids.items():
+        code = await search.delete_batch(uid=uid, nids=nids)
+        if code != const.CodeEnum.OK:
+            logger.error(f"delete search index failed, code: {code}")
+    logger.info(f"auto_clean_trash: {result.deleted_count} nodes deleted")
     return result.deleted_count
diff --git a/src/retk/models/client.py b/src/retk/models/client.py
index a3ecd69..a01a035 100644
--- a/src/retk/models/client.py
+++ b/src/retk/models/client.py
@@ -44,6 +44,19 @@ def init_mongo(connection_timeout: int) -> Union["AsyncIOMotorClient", MongitaCl
     return mongo, db
 
 
+async def init_search() -> BaseEngine:
+    conf = config.get_settings()
+    if config.is_local_db():
+        if not conf.RETHINK_LOCAL_STORAGE_PATH.exists():
+            raise FileNotFoundError(f"Path not exists: {conf.RETHINK_LOCAL_STORAGE_PATH}")
+        search = LocalSearcher()
+    else:
+        search = ESSearcher()
+
+    await search.init()
+    return search
+
+
 class Client:
     coll: Collections = Collections()
     mongo: Optional[Union["AsyncIOMotorClient", MongitaClientDisk]] = None
@@ -52,7 +65,7 @@ class Client:
 
     async def init(self):
         self.init_mongo()
-        await self.init_search()
+        self.search = await init_search()
 
         if config.is_local_db():
             await self.local_try_create_or_restore()
@@ -83,19 +96,6 @@ def init_mongo(self):
         self.coll.llm_extend_node_queue = db[CollNameEnum.llm_extend_node_queue.value]
         self.coll.llm_extended_node = db[CollNameEnum.llm_extended_node.value]
 
-    async def init_search(self):
-        conf = config.get_settings()
-        if config.is_local_db():
-            if not conf.RETHINK_LOCAL_STORAGE_PATH.exists():
-                raise FileNotFoundError(f"Path not exists: {conf.RETHINK_LOCAL_STORAGE_PATH}")
-            if not isinstance(self.search, LocalSearcher):
-                self.search = LocalSearcher()
-        else:
-            if not isinstance(self.search, ESSearcher):
-                self.search = ESSearcher()
-
-        await self.search.init()
-
     async def close(self):
         if self.search is not None:
             await self.search.close()
diff --git a/src/retk/models/search_engine/engine.py b/src/retk/models/search_engine/engine.py
index 445137a..b484111 100644
--- a/src/retk/models/search_engine/engine.py
+++ b/src/retk/models/search_engine/engine.py
@@ -90,7 +90,7 @@ async def enable(self, au: AuthedUser, nid: str) -> const.CodeEnum:
         ...
 
     @abstractmethod
-    async def delete(self, au: AuthedUser, nid: str) -> const.CodeEnum:
+    async def delete(self, uid: str, nid: str) -> const.CodeEnum:
         ...
 
     @abstractmethod
@@ -98,7 +98,7 @@ async def add_batch(self, au: AuthedUser, docs: List[SearchDoc]) -> const.CodeEn
         ...
 
     @abstractmethod
-    async def delete_batch(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
+    async def delete_batch(self, uid: str, nids: List[str]) -> const.CodeEnum:
         ...
 
     @abstractmethod
diff --git a/src/retk/models/search_engine/engine_es.py b/src/retk/models/search_engine/engine_es.py
index fdbfa9f..987c8e7 100644
--- a/src/retk/models/search_engine/engine_es.py
+++ b/src/retk/models/search_engine/engine_es.py
@@ -360,16 +360,16 @@ async def enable(self, au: AuthedUser, nid: str) -> const.CodeEnum:
             return const.CodeEnum.OPERATION_FAILED
         return const.CodeEnum.OK
 
-    async def delete(self, au: AuthedUser, nid: str) -> const.CodeEnum:
+    async def delete(self, uid: str, nid: str) -> const.CodeEnum:
         doc = await self.es.get(
             index=self.index,
             id=nid,
         )
-        if doc["_source"]["uid"] != au.u.id:
-            logger.error(f"node not belong to user {au.u.id=} {nid=}")
+        if doc["_source"]["uid"] != uid:
+            logger.error(f"node not belong to user {uid=} {nid=}")
             return const.CodeEnum.NODE_NOT_EXIST
         if not doc["_source"]["inTrash"]:
-            logger.error(f"doc not in trash, deletion failed {au.u.id=} {nid=}")
+            logger.error(f"doc not in trash, deletion failed {uid=} {nid=}")
             return const.CodeEnum.OPERATION_FAILED
 
         resp = await self.es.delete(
@@ -378,7 +378,7 @@ async def delete(self, au: AuthedUser, nid: str) -> const.CodeEnum:
             refresh=True,
         )
         if resp.meta.status != 201:
-            logger.error(f"delete failed {au.u.id=} {nid=}")
+            logger.error(f"delete failed {uid=} {nid=}")
             return const.CodeEnum.OPERATION_FAILED
         return const.CodeEnum.OK
 
@@ -403,7 +403,7 @@ async def add_batch(self, au: AuthedUser, docs: List[SearchDoc]) -> const.CodeEn
             now = now + datetime.timedelta(seconds=0.001)
         return await self._batch_ops(actions, op_type="add", refresh=False)
 
-    async def delete_batch(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
+    async def delete_batch(self, uid: str, nids: List[str]) -> const.CodeEnum:
         resp = await self.es.delete_by_query(
             index=self.index,
             body={
@@ -413,7 +413,7 @@ async def delete_batch(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
                         {"ids": {"values": nids}},
                         {
                             "term": {
-                                "uid": au.u.id
+                                "uid": uid
                             }
                         },
                         {
diff --git a/src/retk/models/search_engine/engine_local.py b/src/retk/models/search_engine/engine_local.py
index 3b295b7..bb06087 100644
--- a/src/retk/models/search_engine/engine_local.py
+++ b/src/retk/models/search_engine/engine_local.py
@@ -114,8 +114,8 @@ async def disable(self, au: AuthedUser, nid: str) -> const.CodeEnum:
     async def enable(self, au: AuthedUser, nid: str) -> const.CodeEnum:
         return await self._trash_disable_ops_batch(au=au, nids=[nid], disable=False)
 
-    async def delete(self, au: AuthedUser, nid: str) -> const.CodeEnum:
-        return await self.delete_batch(au=au, nids=[nid])
+    async def delete(self, uid: str, nid: str) -> const.CodeEnum:
+        return await self.delete_batch(uid=uid, nids=[nid])
 
     async def add_batch(self, au: AuthedUser, docs: List[SearchDoc]) -> const.CodeEnum:
         writer = self.ix.writer()
@@ -142,10 +142,10 @@ async def batch_to_trash(self, au: AuthedUser, nids: List[str]) -> const.CodeEnu
     async def restore_batch_from_trash(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
         return await self._trash_disable_ops_batch(au=au, nids=nids, in_trash=False)
 
-    async def delete_batch(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
+    async def delete_batch(self, uid: str, nids: List[str]) -> const.CodeEnum:
         writer = self.ix.writer()
         for nid in nids:
-            q = And([Term("uid", au.u.id), Term("nid", nid), Term("inTrash", True)])
+            q = And([Term("uid", uid), Term("nid", nid), Term("inTrash", True)])
             count = writer.delete_by_query(q=q)
             if count != 1:
                 logger.error(f"nid {nid} not found or more than one found")
diff --git a/tests/test_data_restore.py b/tests/test_data_restore.py
index 13e1252..1ca73d6 100644
--- a/tests/test_data_restore.py
+++ b/tests/test_data_restore.py
@@ -45,7 +45,7 @@ async def test_restore_search(self):
         self.assertEqual(20 + base_count, await client.search.count_all())
 
         code = await client.search.delete_batch(
-            au=self.au,
+            uid=self.au.u.id,
             nids=nids[:10],
         )
         self.assertEqual(const.CodeEnum.OK, code)
diff --git a/tests/test_search_es.py b/tests/test_search_es.py
index 90bd2c7..6202b96 100644
--- a/tests/test_search_es.py
+++ b/tests/test_search_es.py
@@ -227,7 +227,7 @@ async def test_batch_add_update_delete(self):
         self.assertEqual(const.CodeEnum.OK, code)
         self.assertEqual(20, await self.searcher.count_all())
 
-        code = await self.searcher.delete_batch(au=self.au, nids=[f"nid{i}" for i in range(10)])
+        code = await self.searcher.delete_batch(uid=self.au.u.id, nids=[f"nid{i}" for i in range(10)])
         self.assertEqual(const.CodeEnum.OK, code)
         self.assertEqual(10, await self.searcher.count_all())
diff --git a/tests/test_search_local.py b/tests/test_search_local.py
index 0b91950..78d2676 100644
--- a/tests/test_search_local.py
+++ b/tests/test_search_local.py
@@ -175,7 +175,7 @@ async def test_batch_add_update_delete(self):
         code = await self.searcher.batch_to_trash(au=self.au, nids=[f"nid{i}" for i in range(10)])
         self.assertEqual(const.CodeEnum.OK, code)
         self.assertEqual(20, await self.searcher.count_all())
-        code = await self.searcher.delete_batch(au=self.au, nids=[f"nid{i}" for i in range(10)])
+        code = await self.searcher.delete_batch(uid=self.au.u.id, nids=[f"nid{i}" for i in range(10)])
        self.assertEqual(const.CodeEnum.OK, code)
         self.assertEqual(10, await self.searcher.count_all())
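Usage note (not part of the patch): delete() and delete_batch() now take a bare uid instead of an AuthedUser, and the module-level init_search() helper builds the right engine outside Client, which is what lets the scheduled trash-cleanup job drop search documents without a request context. The sketch below mirrors the loop added in auto_clean_trash.py; the purge_search_index function name and its uid_nids argument are illustrative only, not names from the codebase.

from typing import Dict, List

from retk import const
from retk.logger import logger
from retk.models.client import init_search


async def purge_search_index(uid_nids: Dict[str, List[str]]) -> None:
    # init_search() returns a LocalSearcher or ESSearcher depending on config.is_local_db()
    search = await init_search()
    for uid, nids in uid_nids.items():
        # uid replaces the old au=AuthedUser argument of delete_batch()
        code = await search.delete_batch(uid=uid, nids=nids)
        if code != const.CodeEnum.OK:
            logger.error(f"delete search index failed, code: {code}")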