Skip to content

Commit

Permalink
feat(app): fix auto delete in trash but not deleted in es
Browse files Browse the repository at this point in the history
  • Loading branch information
MorvanZhou committed Nov 11, 2024
1 parent abe640d commit 6acff98
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/retk/core/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ async def batch_delete(au: tps.AuthedUser, nids: List[str]) -> const.CodeEnum:

backup.delete_node_md(uid=au.u.id, nids=nids)

code = await client.search.delete_batch(au=au, nids=nids)
code = await client.search.delete_batch(uid=au.u.id, nids=nids)
if code != const.CodeEnum.OK:
logger.error(f"delete search index failed, code: {code}")

Expand Down
60 changes: 58 additions & 2 deletions src/retk/core/scheduler/tasks/auto_clean_trash.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

from bson.tz_util import utc

from retk import config
from retk.models.client import init_mongo
from retk import config, const
from retk.core.node import backup
from retk.logger import logger
from retk.models.client import init_mongo, init_search
from retk.models.coll import CollNameEnum


Expand Down Expand Up @@ -34,8 +36,62 @@ async def _auto_clean_trash(delta_days=30):
"inTrashAt": {"$lt": datetime.now(tz=utc) - timedelta(days=delta_days)}
}).to_list(None)

uid_nids = {}
for node in old_nodes:
uid = node["uid"]
if uid not in uid_nids:
uid_nids[uid] = []
uid_nids[uid].append(node["id"])

# Delete all old nodes in node_md
for uid, nids in uid_nids.items():
backup.delete_node_md(uid=uid, nids=nids)

used_space_delta = {}
# remove toNodes for linked nodes
for n in old_nodes:
if n["uid"] not in used_space_delta:
used_space_delta[n["uid"]] = 0
used_space_delta[n["uid"]] -= len(n["md"].encode("utf-8"))
for linked_nid in n["toNodeIds"]:
if config.is_local_db():
# no $pull support
to_n = await db[CollNameEnum.nodes.value].find_one({"id": linked_nid})
if to_n is None:
return
try:
to_n["fromNodeIds"].remove(n["id"])
await db[CollNameEnum.nodes.value].update_one(
{"id": linked_nid},
{"$set": {"fromNodeIds": to_n["fromNodeIds"]}}
)
except ValueError:
pass
else:
await db[CollNameEnum.nodes.value].update_one(
{"id": linked_nid},
{"$pull": {"fromNodeIds": n["id"]}}
)

# Update space
for uid, delta in used_space_delta.items():
if delta == 0:
continue
await db[CollNameEnum.users.value].update_one(
{"id": uid},
{"$inc": {"usedSpace": delta}}
)

# Delete all old nodes in trash
result = await db[CollNameEnum.nodes.value].delete_many({
"_id": {"$in": [node["_id"] for node in old_nodes]}
})

# Delete all old nodes in elastic search
search = await init_search()
for uid, nids in uid_nids.items():
code = await search.delete_batch(uid=uid, nids=nids)
if code != const.CodeEnum.OK:
logger.error(f"delete search index failed, code: {code}")
logger.info(f"auto_clean_trash: {result.deleted_count} nodes deleted")
return result.deleted_count
28 changes: 14 additions & 14 deletions src/retk/models/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ def init_mongo(connection_timeout: int) -> Union["AsyncIOMotorClient", MongitaCl
return mongo, db


async def init_search() -> BaseEngine:
    """Create and initialise the search engine for the current storage mode.

    A ``LocalSearcher`` is built when ``config.is_local_db()`` is true,
    otherwise an ``ESSearcher``. The chosen engine's ``init()`` coroutine is
    awaited before the engine is handed back.

    Returns:
        The freshly constructed, initialised search engine.

    Raises:
        FileNotFoundError: In local mode, when the configured
            ``RETHINK_LOCAL_STORAGE_PATH`` does not exist.
    """
    settings = config.get_settings()

    if not config.is_local_db():
        engine = ESSearcher()
    else:
        # The local searcher needs the storage directory to be present.
        if not settings.RETHINK_LOCAL_STORAGE_PATH.exists():
            raise FileNotFoundError(f"Path not exists: {settings.RETHINK_LOCAL_STORAGE_PATH}")
        engine = LocalSearcher()

    await engine.init()
    return engine


class Client:
coll: Collections = Collections()
mongo: Optional[Union["AsyncIOMotorClient", MongitaClientDisk]] = None
Expand All @@ -52,7 +65,7 @@ class Client:

async def init(self):
self.init_mongo()
await self.init_search()
self.search = await init_search()

if config.is_local_db():
await self.local_try_create_or_restore()
Expand Down Expand Up @@ -83,19 +96,6 @@ def init_mongo(self):
self.coll.llm_extend_node_queue = db[CollNameEnum.llm_extend_node_queue.value]
self.coll.llm_extended_node = db[CollNameEnum.llm_extended_node.value]

async def init_search(self):
conf = config.get_settings()
if config.is_local_db():
if not conf.RETHINK_LOCAL_STORAGE_PATH.exists():
raise FileNotFoundError(f"Path not exists: {conf.RETHINK_LOCAL_STORAGE_PATH}")
if not isinstance(self.search, LocalSearcher):
self.search = LocalSearcher()
else:
if not isinstance(self.search, ESSearcher):
self.search = ESSearcher()

await self.search.init()

async def close(self):
if self.search is not None:
await self.search.close()
Expand Down
4 changes: 2 additions & 2 deletions src/retk/models/search_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ async def enable(self, au: AuthedUser, nid: str) -> const.CodeEnum:
...

@abstractmethod
async def delete(self, au: AuthedUser, nid: str) -> const.CodeEnum:
async def delete(self, uid: str, nid: str) -> const.CodeEnum:
...

@abstractmethod
async def add_batch(self, au: AuthedUser, docs: List[SearchDoc]) -> const.CodeEnum:
...

@abstractmethod
async def delete_batch(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
async def delete_batch(self, uid: str, nids: List[str]) -> const.CodeEnum:
...

@abstractmethod
Expand Down
14 changes: 7 additions & 7 deletions src/retk/models/search_engine/engine_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,16 +360,16 @@ async def enable(self, au: AuthedUser, nid: str) -> const.CodeEnum:
return const.CodeEnum.OPERATION_FAILED
return const.CodeEnum.OK

async def delete(self, au: AuthedUser, nid: str) -> const.CodeEnum:
async def delete(self, uid: str, nid: str) -> const.CodeEnum:
doc = await self.es.get(
index=self.index,
id=nid,
)
if doc["_source"]["uid"] != au.u.id:
logger.error(f"node not belong to user {au.u.id=} {nid=}")
if doc["_source"]["uid"] != uid:
logger.error(f"node not belong to user {uid=} {nid=}")
return const.CodeEnum.NODE_NOT_EXIST
if not doc["_source"]["inTrash"]:
logger.error(f"doc not in trash, deletion failed {au.u.id=} {nid=}")
logger.error(f"doc not in trash, deletion failed {uid=} {nid=}")
return const.CodeEnum.OPERATION_FAILED

resp = await self.es.delete(
Expand All @@ -378,7 +378,7 @@ async def delete(self, au: AuthedUser, nid: str) -> const.CodeEnum:
refresh=True,
)
if resp.meta.status != 201:
logger.error(f"delete failed {au.u.id=} {nid=}")
logger.error(f"delete failed {uid=} {nid=}")
return const.CodeEnum.OPERATION_FAILED
return const.CodeEnum.OK

Expand All @@ -403,7 +403,7 @@ async def add_batch(self, au: AuthedUser, docs: List[SearchDoc]) -> const.CodeEn
now = now + datetime.timedelta(seconds=0.001)
return await self._batch_ops(actions, op_type="add", refresh=False)

async def delete_batch(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
async def delete_batch(self, uid: str, nids: List[str]) -> const.CodeEnum:
resp = await self.es.delete_by_query(
index=self.index,
body={
Expand All @@ -413,7 +413,7 @@ async def delete_batch(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
{"ids": {"values": nids}},
{
"term": {
"uid": au.u.id
"uid": uid
}
},
{
Expand Down
8 changes: 4 additions & 4 deletions src/retk/models/search_engine/engine_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ async def disable(self, au: AuthedUser, nid: str) -> const.CodeEnum:
async def enable(self, au: AuthedUser, nid: str) -> const.CodeEnum:
return await self._trash_disable_ops_batch(au=au, nids=[nid], disable=False)

async def delete(self, au: AuthedUser, nid: str) -> const.CodeEnum:
return await self.delete_batch(au=au, nids=[nid])
async def delete(self, uid: str, nid: str) -> const.CodeEnum:
return await self.delete_batch(uid=uid, nids=[nid])

async def add_batch(self, au: AuthedUser, docs: List[SearchDoc]) -> const.CodeEnum:
writer = self.ix.writer()
Expand All @@ -142,10 +142,10 @@ async def batch_to_trash(self, au: AuthedUser, nids: List[str]) -> const.CodeEnu
async def restore_batch_from_trash(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
return await self._trash_disable_ops_batch(au=au, nids=nids, in_trash=False)

async def delete_batch(self, au: AuthedUser, nids: List[str]) -> const.CodeEnum:
async def delete_batch(self, uid: str, nids: List[str]) -> const.CodeEnum:
writer = self.ix.writer()
for nid in nids:
q = And([Term("uid", au.u.id), Term("nid", nid), Term("inTrash", True)])
q = And([Term("uid", uid), Term("nid", nid), Term("inTrash", True)])
count = writer.delete_by_query(q=q)
if count != 1:
logger.error(f"nid {nid} not found or more than one found")
Expand Down
2 changes: 1 addition & 1 deletion tests/test_data_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def test_restore_search(self):
self.assertEqual(20 + base_count, await client.search.count_all())

code = await client.search.delete_batch(
au=self.au,
uid=self.au.u.id,
nids=nids[:10],
)
self.assertEqual(const.CodeEnum.OK, code)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_search_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def test_batch_add_update_delete(self):
self.assertEqual(const.CodeEnum.OK, code)
self.assertEqual(20, await self.searcher.count_all())

code = await self.searcher.delete_batch(au=self.au, nids=[f"nid{i}" for i in range(10)])
code = await self.searcher.delete_batch(uid=self.au.u.id, nids=[f"nid{i}" for i in range(10)])
self.assertEqual(const.CodeEnum.OK, code)
self.assertEqual(10, await self.searcher.count_all())

Expand Down
2 changes: 1 addition & 1 deletion tests/test_search_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async def test_batch_add_update_delete(self):
code = await self.searcher.batch_to_trash(au=self.au, nids=[f"nid{i}" for i in range(10)])
self.assertEqual(const.CodeEnum.OK, code)
self.assertEqual(20, await self.searcher.count_all())
code = await self.searcher.delete_batch(au=self.au, nids=[f"nid{i}" for i in range(10)])
code = await self.searcher.delete_batch(uid=self.au.u.id, nids=[f"nid{i}" for i in range(10)])
self.assertEqual(const.CodeEnum.OK, code)
self.assertEqual(10, await self.searcher.count_all())

Expand Down

0 comments on commit 6acff98

Please sign in to comment.