diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ee2b71c9..6f806e9d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -43,7 +43,7 @@ jobs: name: > py:${{ matrix.python-version }},${{ matrix.redis-image }}, redis-py:${{ matrix.redis-py }},cov:${{ matrix.coverage }}, - lupa:${{ matrix.lupa }}, json:${{matrix.extra}} + extra:${{matrix.extra}} needs: - "lint" runs-on: ubuntu-latest @@ -58,14 +58,12 @@ jobs: - python-version: "3.11" redis-image: "redis/redis-stack:6.2.6-v10" redis-py: "5.0.1" - lupa: true - extra: true + extra: true # json, bf, lupa, cf hypothesis: true - python-version: "3.11" redis-image: "redis/redis-stack-server:7.2.0-v0" redis-py: "5.0.1" - lupa: true - extra: true # json, bf + extra: true # json, bf, lupa, cf coverage: true hypothesis: true @@ -97,14 +95,10 @@ jobs: echo "$HOME/.poetry/bin" >> $GITHUB_PATH poetry install poetry run pip install redis==${{ matrix.redis-py }} - - name: Install lupa - if: ${{ matrix.lupa }} - run: | - poetry run pip install "fakeredis[lua]" - name: Install json if: ${{ matrix.extra }} run: | - poetry run pip install "fakeredis[json,bf]" + poetry run pip install "fakeredis[json,bf,cf,lua]" - name: Get version id: getVersion shell: bash diff --git a/docs/about/changelog.md b/docs/about/changelog.md index 5a845bfb..5a22a9ed 100644 --- a/docs/about/changelog.md +++ b/docs/about/changelog.md @@ -5,7 +5,11 @@ description: Change log of all fakeredis releases ## Next release -## v2.20.2 +## v2.21.0 + +### 🚀 Features + +- Implement all cuckoo filter commands #276 ### 🐛 Bug Fixes @@ -13,8 +17,10 @@ description: Change log of all fakeredis releases ### 🧰 Maintenance -- Support for redis-py 5.1.0b1 +- Support for redis-py 5.1.0b3 - Improve `@testtools.run_test_if_redispy_ver` +- Refactor bloom filter commands implementation to use [pyprobables](https://github.com/barrust/pyprobables) instead of + pybloom_live ## v2.20.1 diff --git a/docs/redis-commands/Redis.md b/docs/redis-commands/Redis.md index c6aae43f..e40c7dcf 100644 --- a/docs/redis-commands/Redis.md +++ b/docs/redis-commands/Redis.md @@ -619,7 +619,7 @@ Closes the connection. Resets the connection. -## `bitmap` commands (6/7 implemented) +## `bitmap` commands (6/6 implemented) ### [BITCOUNT](https://redis.io/commands/bitcount/) @@ -646,13 +646,6 @@ Returns a bit value by offset. Sets or clears the bit at offset of the string value. Creates the key if it doesn't exist. -### Unsupported bitmap commands -> To implement support for a command, see [here](../../guides/implement-command/) - -#### [BITFIELD_RO](https://redis.io/commands/bitfield_ro/) (not implemented) - -Performs arbitrary read-only bitfield integer operations on strings. - ## `list` commands (22/22 implemented) diff --git a/docs/redis-commands/RedisBloom.md b/docs/redis-commands/RedisBloom.md index 88451a02..81d20f60 100644 --- a/docs/redis-commands/RedisBloom.md +++ b/docs/redis-commands/RedisBloom.md @@ -44,60 +44,59 @@ Returns the cardinality of a Bloom filter +## `cf` commands (12/12 implemented) -### Unsupported cf commands -> To implement support for a command, see [here](../../guides/implement-command/) - -#### [CF.RESERVE](https://redis.io/commands/cf.reserve/) (not implemented) +### [CF.RESERVE](https://redis.io/commands/cf.reserve/) Creates a new Cuckoo Filter -#### [CF.ADD](https://redis.io/commands/cf.add/) (not implemented) +### [CF.ADD](https://redis.io/commands/cf.add/) Adds an item to a Cuckoo Filter -#### [CF.ADDNX](https://redis.io/commands/cf.addnx/) (not implemented) +### [CF.ADDNX](https://redis.io/commands/cf.addnx/) Adds an item to a Cuckoo Filter if the item did not exist previously. -#### [CF.INSERT](https://redis.io/commands/cf.insert/) (not implemented) +### [CF.INSERT](https://redis.io/commands/cf.insert/) Adds one or more items to a Cuckoo Filter. A filter will be created if it does not exist -#### [CF.INSERTNX](https://redis.io/commands/cf.insertnx/) (not implemented) +### [CF.INSERTNX](https://redis.io/commands/cf.insertnx/) Adds one or more items to a Cuckoo Filter if the items did not exist previously. A filter will be created if it does not exist -#### [CF.EXISTS](https://redis.io/commands/cf.exists/) (not implemented) +### [CF.EXISTS](https://redis.io/commands/cf.exists/) Checks whether one or more items exist in a Cuckoo Filter -#### [CF.MEXISTS](https://redis.io/commands/cf.mexists/) (not implemented) +### [CF.MEXISTS](https://redis.io/commands/cf.mexists/) Checks whether one or more items exist in a Cuckoo Filter -#### [CF.DEL](https://redis.io/commands/cf.del/) (not implemented) +### [CF.DEL](https://redis.io/commands/cf.del/) Deletes an item from a Cuckoo Filter -#### [CF.COUNT](https://redis.io/commands/cf.count/) (not implemented) +### [CF.COUNT](https://redis.io/commands/cf.count/) Return the number of times an item might be in a Cuckoo Filter -#### [CF.SCANDUMP](https://redis.io/commands/cf.scandump/) (not implemented) +### [CF.SCANDUMP](https://redis.io/commands/cf.scandump/) Begins an incremental save of the bloom filter -#### [CF.LOADCHUNK](https://redis.io/commands/cf.loadchunk/) (not implemented) +### [CF.LOADCHUNK](https://redis.io/commands/cf.loadchunk/) Restores a filter previously saved using SCANDUMP -#### [CF.INFO](https://redis.io/commands/cf.info/) (not implemented) +### [CF.INFO](https://redis.io/commands/cf.info/) Returns information about a Cuckoo Filter + ### Unsupported cms commands > To implement support for a command, see [here](../../guides/implement-command/) diff --git a/fakeredis/_fakesocket.py b/fakeredis/_fakesocket.py index 4937be5e..0d9e703b 100644 --- a/fakeredis/_fakesocket.py +++ b/fakeredis/_fakesocket.py @@ -1,4 +1,4 @@ -from fakeredis.stack import JSONCommandsMixin, BFCommandsMixin +from fakeredis.stack import JSONCommandsMixin, BFCommandsMixin, CFCommandsMixin from ._basefakesocket import BaseFakeSocket from .commands_mixins.bitmap_mixin import BitmapCommandsMixin from .commands_mixins.connection_mixin import ConnectionCommandsMixin @@ -41,6 +41,7 @@ class FakeSocket( JSONCommandsMixin, GeoCommandsMixin, BFCommandsMixin, + CFCommandsMixin, ): def __init__(self, server, db): super(FakeSocket, self).__init__(server, db) diff --git a/fakeredis/stack/__init__.py b/fakeredis/stack/__init__.py index e053193c..47d3a933 100644 --- a/fakeredis/stack/__init__.py +++ b/fakeredis/stack/__init__.py @@ -11,13 +11,18 @@ class JSONCommandsMixin: # type: ignore # noqa: E303 pass try: - import pybloom_live # noqa: F401 + import probables # noqa: F401 from ._bf_mixin import BFCommandsMixin # noqa: F401 + from ._cf_mixin import CFCommandsMixin # noqa: F401 except ImportError as e: - if e.name == "fakeredis.stack._bf_mixin": + if e.name == "fakeredis.stack._bf_mixin" or e.name == "fakeredis.stack._cf_mixin": raise e class BFCommandsMixin: # type: ignore # noqa: E303 pass + + + class CFCommandsMixin: # noqa: E303 + pass diff --git a/fakeredis/stack/_bf_mixin.py b/fakeredis/stack/_bf_mixin.py index 429e783e..3f126e51 100644 --- a/fakeredis/stack/_bf_mixin.py +++ b/fakeredis/stack/_bf_mixin.py @@ -1,7 +1,7 @@ """Command mixin for emulating `redis-py`'s BF functionality.""" import io -import pybloom_live +from probables import ExpandingBloomFilter from fakeredis import _msgs as msgs from fakeredis._command_args_parsing import extract_args @@ -9,22 +9,28 @@ from fakeredis._helpers import SimpleError, OK, casematch -class ScalableBloomFilter(pybloom_live.ScalableBloomFilter): +class ScalableBloomFilter(ExpandingBloomFilter): NO_GROWTH = 0 - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.filters.append( - pybloom_live.BloomFilter( - capacity=self.initial_capacity, - error_rate=self.error_rate * self.ratio)) + def __init__(self, capacity: int = 100, error_rate: float = 0.001, scale: int = 2): + super().__init__(capacity, error_rate) + self.scale: int = scale - def add(self, key): + def add(self, key: bytes, force: bool = False) -> bool: if key in self: return True - if self.scale == self.NO_GROWTH and self.filters and self.filters[-1].count >= self.filters[-1].capacity: + if self.scale == self.NO_GROWTH and self.elements_added >= self.estimated_elements: raise SimpleError(msgs.FILTER_FULL_MSG) - return super(ScalableBloomFilter, self).add(key) + super(ScalableBloomFilter, self).add(key) + return False + + @classmethod + def frombytes(cls, b: bytes, **kwargs) -> "ScalableBloomFilter": + size, est_els, added_els, fpr = cls._parse_footer(b) + blm = ScalableBloomFilter(capacity=est_els, error_rate=fpr) + blm._parse_blooms(b, size) + blm._added_elements = added_els + return blm class BFCommandsMixin: @@ -64,7 +70,7 @@ def bf_madd(self, key, *values): repeat=(), ) def bf_card(self, key): - return len(key.value) + return key.value.elements_added @command( name="BF.EXISTS", @@ -147,10 +153,10 @@ def bf_info(self, key: CommandItem, *args: bytes): raise SimpleError(msgs.SYNTAX_ERROR_MSG) if len(args) == 0: return [ - b'Capacity', key.value.capacity, - b'Size', key.value.capacity, - b'Number of filters', len(key.value.filters), - b'Number of items inserted', key.value.count, + b'Capacity', key.value.estimated_elements, + b'Size', key.value.elements_added, + b'Number of filters', key.value.expansions + 1, + b'Number of items inserted', key.value.elements_added, b'Expansion rate', key.value.scale if key.value.scale > 0 else None, ] if casematch(args[0], b'CAPACITY'): @@ -178,10 +184,7 @@ def bf_scandump(self, key: CommandItem, iterator: int): f = io.BytesIO() if iterator == 0: - key.value.tofile(f) - f.seek(0) - s = f.read() - f.close() + s = bytes(key.value) return [1, s] else: return [0, None] @@ -195,8 +198,5 @@ def bf_scandump(self, key: CommandItem, iterator: int): def bf_loadchunk(self, key: CommandItem, iterator: int, data: bytes): if key.value is not None and type(key.value) is not ScalableBloomFilter: raise SimpleError(msgs.NOT_FOUND_MSG) - f = io.BytesIO(data) - key.value = ScalableBloomFilter.fromfile(f) - f.close() - key.updated() + key.update(ScalableBloomFilter.frombytes(data)) return OK diff --git a/fakeredis/stack/_cf_mixin.py b/fakeredis/stack/_cf_mixin.py new file mode 100644 index 00000000..73e92c51 --- /dev/null +++ b/fakeredis/stack/_cf_mixin.py @@ -0,0 +1,231 @@ +"""Command mixin for emulating `redis-py`'s cuckoo filter functionality.""" +import io + +from probables import CountingCuckooFilter, CuckooFilterFullError + +from fakeredis import _msgs as msgs +from fakeredis._command_args_parsing import extract_args +from fakeredis._commands import command, CommandItem, Int, Key +from fakeredis._helpers import SimpleError, OK, casematch + + +class ScalableCuckooFilter(CountingCuckooFilter): + + def __init__(self, capacity: int, bucket_size: int = 2, max_iterations: int = 20, expansion: int = 1): + super().__init__(capacity, bucket_size, max_iterations, expansion) + self.initial_capacity: int = capacity + self.inserted: int = 0 + self.deleted: int = 0 + + def insert(self, item: bytes) -> bool: + try: + super().add(item) + except CuckooFilterFullError: + return False + self.inserted += 1 + return True + + def count(self, item: bytes) -> int: + return super().check(item) + + def delete(self, item: bytes) -> bool: + if super().remove(item): + self.deleted += 1 + return True + return False + + +class CFCommandsMixin: + + @staticmethod + def _cf_add(key: CommandItem, item: bytes) -> int: + if key.value is None: + key.update(ScalableCuckooFilter(1024)) + res = key.value.insert(item) + key.updated() + return 1 if res else 0 + + @staticmethod + def _cf_exist(key: CommandItem, item: bytes) -> int: + return 1 if (item in key.value) else 0 + + @command( + name="CF.ADD", + fixed=(Key(ScalableCuckooFilter), bytes), + repeat=(), + ) + def cf_add(self, key: CommandItem, value: bytes): + return CFCommandsMixin._cf_add(key, value) + + @command( + name="CF.ADDNX", + fixed=(Key(ScalableCuckooFilter), bytes), + repeat=(), + ) + def cf_addnx(self, key: CommandItem, value: bytes): + if value in key.value: + return 0 + return CFCommandsMixin._cf_add(key, value) + + @command( + name="CF.COUNT", + fixed=(Key(ScalableCuckooFilter), bytes), + repeat=(), + ) + def cf_count(self, key: CommandItem, item: bytes): + return 1 if self._cf_exist(key, item) else 0 # todo + + @command( + name="CF.DEL", + fixed=(Key(ScalableCuckooFilter), bytes), + repeat=(), + ) + def cf_del(self, key: CommandItem, value: bytes): + if key.value is None: + raise SimpleError(msgs.NOT_FOUND_MSG) + res = key.value.delete(value) + return 1 if res else 0 + + @command( + name="CF.EXISTS", + fixed=(Key(ScalableCuckooFilter), bytes), + repeat=(), + ) + def cf_exist(self, key: CommandItem, value: bytes): + return CFCommandsMixin._cf_exist(key, value) + + @command( + name="CF.INFO", + fixed=(Key(),), + repeat=(), + ) + def cf_info(self, key: CommandItem): + if key.value is None or type(key.value) is not ScalableCuckooFilter: + raise SimpleError('...') + return [ + b'Size', key.value.capacity, + b'Number of buckets', len(key.value.buckets), + b'Number of filters', (key.value.capacity / key.value.initial_capacity) / key.value.expansion_rate, + b'Number of items inserted', key.value.inserted, + b'Number of items deleted', key.value.deleted, + b'Bucket size', key.value.bucket_size, + b'Max iterations', key.value.max_swaps, + b'Expansion rate', key.value.expansion_rate, + ] + + @command( + name="CF.INSERT", + fixed=(Key(),), + repeat=(bytes,), + ) + def cf_insert(self, key: CommandItem, *args: bytes): + (capacity, no_create), left_args = extract_args( + args, ("+capacity", "nocreate"), + error_on_unexpected=False, left_from_first_unexpected=True) + # if no_create and (capacity is not None or error_rate is not None): + # raise SimpleError("...") + if len(left_args) < 2 or not casematch(left_args[0], b'items'): + raise SimpleError("...") + items = left_args[1:] + capacity = capacity or 1024 + + if key.value is None and no_create: + raise SimpleError(msgs.NOT_FOUND_MSG) + if key.value is None: + key.value = ScalableCuckooFilter(capacity) + res = list() + for item in items: + res.append(self._cf_add(key, item)) + key.updated() + return res + + @command( + name="CF.INSERTNX", + fixed=(Key(),), + repeat=(bytes,), + ) + def cf_insertnx(self, key: CommandItem, *args: bytes): + (capacity, no_create), left_args = extract_args( + args, ("+capacity", "nocreate"), + error_on_unexpected=False, left_from_first_unexpected=True) + # if no_create and (capacity is not None or error_rate is not None): + # raise SimpleError("...") + if len(left_args) < 2 or not casematch(left_args[0], b'items'): + raise SimpleError("...") + items = left_args[1:] + capacity = capacity or 1024 + if key.value is None and no_create: + raise SimpleError(msgs.NOT_FOUND_MSG) + if key.value is None: + key.value = ScalableCuckooFilter(capacity) + res = list() + for item in items: + if item in key.value: + res.append(0) + else: + res.append(self._cf_add(key, item)) + key.updated() + return res + + @command( + name="CF.MEXISTS", + fixed=(Key(ScalableCuckooFilter), bytes), + repeat=(bytes,), + ) + def cf_mexists(self, key: CommandItem, *values: bytes): + res = list() + for value in values: + res.append(CFCommandsMixin._cf_exist(key, value)) + return res + + @command( + name="CF.RESERVE", + fixed=(Key(), Int,), + repeat=(bytes,), + flags=msgs.FLAG_LEAVE_EMPTY_VAL, + ) + def cf_reserve(self, key: CommandItem, capacity: int, *args: bytes): + if key.value is not None: + raise SimpleError(msgs.ITEM_EXISTS_MSG) + (bucketsize, maxiterations, expansion), _ = extract_args(args, ("+bucketsize", "+maxiterations", "+expansion")) + + maxiterations = maxiterations or 20 + bucketsize = bucketsize or 2 + value = ScalableCuckooFilter(capacity, bucket_size=bucketsize, max_iterations=maxiterations) + key.update(value) + return OK + + @command( + name="CF.SCANDUMP", + fixed=(Key(), Int,), + repeat=(), + flags=msgs.FLAG_LEAVE_EMPTY_VAL, + ) + def cf_scandump(self, key: CommandItem, iterator: int): + if key.value is None: + raise SimpleError(msgs.NOT_FOUND_MSG) + f = io.BytesIO() + + if iterator == 0: + key.value.tofile(f) + f.seek(0) + s = f.read() + f.close() + return [1, s] + else: + return [0, None] + + @command( + name="CF.LOADCHUNK", + fixed=(Key(), Int, bytes), + repeat=(), + flags=msgs.FLAG_LEAVE_EMPTY_VAL, + ) + def cf_loadchunk(self, key: CommandItem, iterator: int, data: bytes): + if key.value is not None and type(key.value) is not ScalableCuckooFilter: + raise SimpleError(msgs.NOT_FOUND_MSG) + f = io.BytesIO(data) + key.value = ScalableCuckooFilter.fromfile(f) + f.close() + key.updated() + return OK diff --git a/pyproject.toml b/pyproject.toml index 2613ab9d..a358225a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ name = "fakeredis" packages = [ { include = "fakeredis" }, ] -version = "2.20.2" +version = "2.21.0" description = "Python implementation of redis API, can be used for testing purposes." readme = "README.md" keywords = ["redis", "RedisJson", "tests", "redis-stack"] @@ -47,12 +47,14 @@ redis = ">=4" sortedcontainers = "^2" lupa = { version = ">=1.14,<3.0", optional = true } jsonpath-ng = { version = "^1.6", optional = true } -pybloom-live = { version = "^4.0", optional = true } +pyprobables = { version = "^0.6", optional = true } [tool.poetry.extras] lua = ["lupa"] json = ["jsonpath-ng"] -bf = ["pybloom-live"] +bf = ["pyprobables"] +cf = ["pyprobables"] +probabilistic = ["pyprobables"] [tool.poetry.dev-dependencies] coverage = "^7" diff --git a/test/test_stack/test_bloom_redis_py.py b/test/test_stack/test_bloom_redis_py.py index af6cfd3a..bb2a4b9f 100644 --- a/test/test_stack/test_bloom_redis_py.py +++ b/test/test_stack/test_bloom_redis_py.py @@ -2,7 +2,7 @@ import redis.commands.bf from redis.commands.bf import BFInfo -json_tests = pytest.importorskip("pybloom_live") +json_tests = pytest.importorskip("probables") def get_protocol_version(r): diff --git a/test/test_stack/test_bloomfilter.py b/test/test_stack/test_bloomfilter.py index 268ab3dc..5d9dad65 100644 --- a/test/test_stack/test_bloomfilter.py +++ b/test/test_stack/test_bloomfilter.py @@ -3,7 +3,7 @@ from fakeredis import _msgs as msgs -json_tests = pytest.importorskip("pybloom_live") +bloom_tests = pytest.importorskip("probables") def test_bf_add(r: redis.Redis): diff --git a/test/test_stack/test_cuckoofilter.py b/test/test_stack/test_cuckoofilter.py new file mode 100644 index 00000000..ba4142f8 --- /dev/null +++ b/test/test_stack/test_cuckoofilter.py @@ -0,0 +1,35 @@ +import pytest +import redis + +cuckoofilters_tests = pytest.importorskip("probables") + + +def test_cf_add_and_insert(r: redis.Redis): + assert r.cf().create("cuckoo", 1000) + assert r.cf().add("cuckoo", "filter") + assert not r.cf().addnx("cuckoo", "filter") + assert 1 == r.cf().addnx("cuckoo", "newItem") + assert [1] == r.cf().insert("captest", ["foo"]) + assert [1] == r.cf().insert("captest", ["foo"], capacity=1000) + assert [1] == r.cf().insertnx("captest", ["bar"]) + assert [1] == r.cf().insertnx("captest", ["food"], nocreate="1") + assert [0, 0, 1] == r.cf().insertnx("captest", ["foo", "bar", "baz"]) + assert [0] == r.cf().insertnx("captest", ["bar"], capacity=1000) + assert [1] == r.cf().insert("empty1", ["foo"], capacity=1000) + assert [1] == r.cf().insertnx("empty2", ["bar"], capacity=1000) + info = r.cf().info("captest") + assert info.get("insertedNum") == 5 + assert info.get("deletedNum") == 0 + assert info.get("filterNum") == 1 + + +def test_cf_exists_and_del(r: redis.Redis): + assert r.cf().create("cuckoo", 1000) + assert r.cf().add("cuckoo", "filter") + assert r.cf().exists("cuckoo", "filter") + assert not r.cf().exists("cuckoo", "notexist") + assert [1, 0] == r.cf().mexists("cuckoo", "filter", "notexist") + assert 1 == r.cf().count("cuckoo", "filter") + assert 0 == r.cf().count("cuckoo", "notexist") + assert r.cf().delete("cuckoo", "filter") + assert 0 == r.cf().count("cuckoo", "filter")