Skip to content

Commit

Permalink
set busy timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Praneeth Bedapudi <praneethbedapudi@gmail.com>
  • Loading branch information
bedapudi6788 committed Dec 12, 2023
1 parent 2ec7dc4 commit 0e472f5
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 328 deletions.
5 changes: 2 additions & 3 deletions liteindex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from .defined_index import DefinedIndex
from .function_cache import function_cache
from .kv_index import KVIndex
from .common_utils import EvictAny, EvictLRU, EvictLFU
# from .function_cache import function_cache
# from .kv_index import KVIndex
6 changes: 0 additions & 6 deletions liteindex/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,3 @@ def set_ulimit():
break
except:
limit = limit // 2


# Eviction policy names
EvictAny = "any"
EvictLRU = "lru"
EvictLFU = "lfu"
4 changes: 4 additions & 0 deletions liteindex/defined_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def __connection(self):
f"PRAGMA cache_size=-{self.ram_cache_mb * 1024}"
)

self.__local_storage.db_conn.execute(
f"PRAGMA BUSY_TIMEOUT=60000"
)

return self.__local_storage.db_conn

@property
Expand Down
223 changes: 73 additions & 150 deletions liteindex/function_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,159 +4,82 @@
import pickle
import hashlib
import tempfile
from .common_utils import set_ulimit, EvictAny, EvictLRU, EvictLFU
from .common_utils import set_ulimit

set_ulimit()




def function_cache(
dir=tempfile.mkdtemp(),
max_size_mb=1000,
eviction_policy=EvictAny,
invalidate_older_than_seconds=None,
):
env = lmdb.open(
path=dir,
subdir=True,
map_size=max_size_mb * 1024**2,
metasync=False,
sync=False,
create=True,
writemap=False,
max_readers=2048,
meminit=False,
max_dbs=3,
)

cache_db = env.open_db(b"cache", create=True)
if eviction_policy == EvictLRU:
last_accessed_time_to_key_hash_db = env.open_db(
b"last_accessed_time_to_key_hash",
class Cache:
def __init__(self, dir=tempfile.mkdtemp(), max_size_mb=2048, eviction_policy=None):
self.dir = dir
self.max_size_mb = max_size_mb
self.eviction_policy = eviction_policy if max_size_mb is not None else EvictNone

self.__env = lmdb.open(
path=dir,
subdir=True,
map_size=max_size_mb * 1024**2 if eviction_policy else 512 ** 5,
metasync=True,
sync=True,
create=True,
dupsort=True,
integerdup=True,
integerkey=True,
dupfixed=True,
)
key_hash_to_last_accessed_time_db = env.open_db(
b"key_hash_to_last_accessed_time", create=True
writemap=False,
max_readers=2048,
meminit=False,
max_dbs=3,
)
elif eviction_policy == EvictLFU:
access_count_to_key_hash_db = env.open_db(
b"access_count_to_key_hash",
create=True,
dupsort=True,
integerdup=True,
integerkey=True,
dupfixed=True,
)
key_hash_to_access_count_db = env.open_db(
b"key_hash_to_access_count", create=True
)

def decorator(func):
def wrapper(*args, **kwargs):
inputs_hash = hashlib.sha256(
pickle.dumps((args, kwargs), protocol=pickle.HIGHEST_PROTOCOL)
).digest()

with env.begin(
write=eviction_policy in {EvictLRU, EvictLFU}, buffers=True
) as txn:
result = txn.get(inputs_hash, db=cache_db)
if result is not None:
if eviction_policy == EvictLRU:
_time_in_bytes = int(time.time() * 1e6).to_bytes(
8, byteorder="big"
)
txn.put(
_time_in_bytes,
inputs_hash,
db=last_accessed_time_to_key_hash_db,
dupdata=True,
)
txn.put(
inputs_hash,
_time_in_bytes,
db=key_hash_to_last_accessed_time_db,
)
elif eviction_policy == EvictLFU:
_access_count_in_bytes = txn.get(
inputs_hash, db=key_hash_to_access_count_db
)
_access_count = int.from_bytes(
_access_count_in_bytes, byteorder="big"
)
txn.put(
inputs_hash,
(_access_count + 1).to_bytes(8, byteorder="big"),
db=key_hash_to_access_count_db,
)
txn.delete(
_access_count_in_bytes,
inputs_hash,
db=access_count_to_key_hash_db,
)

return pickle.loads(result)

result = func(*args, **kwargs)

with env.begin(write=True) as txn:
stat = txn.stat(db=cache_db)
current_size_mb = stat["psize"] * stat["leaf_pages"] / 1024**2
current_count = stat["entries"]
if current_size_mb > max_size_mb * 0.8:
if eviction_policy == EvictAny:
cursor = txn.cursor(db=cache_db)
while True:
for _ in range(current_count // 3):
if cursor.first() is None:
break
cursor.delete()

stat = txn.stat(db=cache_db)
current_size_mb = (
stat["psize"] * stat["leaf_pages"] / 1024**2
)
if current_size_mb < max_size_mb * 0.8:
break

txn.put(
inputs_hash,
pickle.dumps(result, protocol=pickle.HIGHEST_PROTOCOL),
db=cache_db,
)

if eviction_policy == EvictLRU:
_time_in_bytes = int(time.time()).to_bytes(8, byteorder="big")
txn.put(
_time_in_bytes,
inputs_hash,
db=last_accessed_time_to_key_hash_db,
dupdata=True,
)
txn.put(
inputs_hash,
_time_in_bytes,
db=key_hash_to_last_accessed_time_db,
)
elif eviction_policy == EvictLFU:
txn.put(
inputs_hash,
b"\x00\x00\x00\x00\x00\x00\x00\x01",
db=key_hash_to_access_count_db,
)
txn.put(
b"\x00\x00\x00\x00\x00\x00\x00\x01",
inputs_hash,
db=access_count_to_key_hash_db,
)

return result

return wrapper

return decorator
self.__cache_db = self.__env.open_db(b"cache", create=True)
if eviction_policy == EvictLRU:
self.__last_accessed_time_to_key_hash_db = self.__env.open_db(
b"last_accessed_time_to_key_hash",
create=True,
dupsort=True,
integerdup=True,
integerkey=True,
dupfixed=True,
)
self.__key_hash_to_last_accessed_time_db = self.__env.open_db(
b"key_hash_to_last_accessed_time", create=True
)
elif eviction_policy == EvictLFU:
self.__access_count_to_key_hash_db = self.__env.open_db(
b"access_count_to_key_hash",
create=True,
dupsort=True,
integerdup=True,
integerkey=True,
dupfixed=True,
)
self.__key_hash_to_access_count_db = self.__env.open_db(
b"key_hash_to_access_count", create=True
)


def __setitem__(self, key, value):
with self.__env.begin(write=True) as txn:
txn.put(
hashlib.sha256(pickle.dumps(key, protocol=pickle.HIGHEST_PROTOCOL)).digest(),
pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL),
db=self.__cache_db,
)

def __getitem__(self, key):
with self.__env.begin(write=False) as txn:
result = txn.get(hashlib.sha256(pickle.dumps(key, protocol=pickle.HIGHEST_PROTOCOL)).digest(), db=self.__cache_db)
return pickle.loads(result) if result is not None else None



if __name__ == "__main__":
    # Ad-hoc micro-benchmark: time 100k integer __setitem__ calls on the
    # lmdb-backed Cache versus diskcache.Index (third-party, imported lazily
    # below so the module itself does not depend on it).
    # NOTE(review): EvictNone is not defined in any import visible in this
    # file's diff (common_utils now only exports set_ulimit) — confirm where
    # EvictNone comes from, otherwise this line raises NameError.
    test_cache = Cache(max_size_mb=1000, eviction_policy=EvictNone)
    s = time.time()
    for i in range(100000):
        test_cache[i] = i
    # Elapsed seconds for the Cache writes.
    print(time.time() - s)

    import diskcache
    # Fresh temporary directory so diskcache starts from an empty index.
    dc_cache = diskcache.Index(tempfile.mkdtemp())
    s = time.time()
    for i in range(100000):
        dc_cache[i] = i
    # Elapsed seconds for the diskcache writes, for side-by-side comparison.
    print(time.time() - s)

Loading

0 comments on commit 0e472f5

Please sign in to comment.