Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement CMS commands #277

Merged
merged 9 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/about/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ description: Change log of all fakeredis releases
### 🚀 Features

- Implement all cuckoo filter commands #276
- Implement all Count-Min Sketch commands #277

### 🐛 Bug Fixes

Expand Down
17 changes: 8 additions & 9 deletions docs/redis-commands/RedisBloom.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,36 +96,35 @@ Returns information about a Cuckoo Filter



## `cms` commands (6/6 implemented)

### Unsupported cms commands
> To implement support for a command, see [here](../../guides/implement-command/)

#### [CMS.INITBYDIM](https://redis.io/commands/cms.initbydim/) <small>(not implemented)</small>
### [CMS.INITBYDIM](https://redis.io/commands/cms.initbydim/)

Initializes a Count-Min Sketch to dimensions specified by user

#### [CMS.INITBYPROB](https://redis.io/commands/cms.initbyprob/) <small>(not implemented)</small>
### [CMS.INITBYPROB](https://redis.io/commands/cms.initbyprob/)

Initializes a Count-Min Sketch to accommodate requested tolerances.

#### [CMS.INCRBY](https://redis.io/commands/cms.incrby/) <small>(not implemented)</small>
### [CMS.INCRBY](https://redis.io/commands/cms.incrby/)

Increases the count of one or more items by increment

#### [CMS.QUERY](https://redis.io/commands/cms.query/) <small>(not implemented)</small>
### [CMS.QUERY](https://redis.io/commands/cms.query/)

Returns the count for one or more items in a sketch

#### [CMS.MERGE](https://redis.io/commands/cms.merge/) <small>(not implemented)</small>
### [CMS.MERGE](https://redis.io/commands/cms.merge/)

Merges several sketches into one sketch

#### [CMS.INFO](https://redis.io/commands/cms.info/) <small>(not implemented)</small>
### [CMS.INFO](https://redis.io/commands/cms.info/)

Returns information about a sketch




### Unsupported topk commands
> To implement support for a command, see [here](../../guides/implement-command/)

Expand Down
4 changes: 2 additions & 2 deletions fakeredis/_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ def apply(
and type(item.value) is not type_.type_
):
raise SimpleError(msgs.WRONGTYPE_MSG)
if (
type_.type_ is not None
if (msgs.FLAG_NO_INITIATE not in self.flags
and type_.type_ is not None
and item is None
and type_.type_ is not bytes
):
Expand Down
3 changes: 2 additions & 1 deletion fakeredis/_fakesocket.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from fakeredis.stack import JSONCommandsMixin, BFCommandsMixin, CFCommandsMixin
from fakeredis.stack import JSONCommandsMixin, BFCommandsMixin, CFCommandsMixin, CMSCommandsMixin
from ._basefakesocket import BaseFakeSocket
from .commands_mixins.bitmap_mixin import BitmapCommandsMixin
from .commands_mixins.connection_mixin import ConnectionCommandsMixin
Expand Down Expand Up @@ -42,6 +42,7 @@ class FakeSocket(
GeoCommandsMixin,
BFCommandsMixin,
CFCommandsMixin,
CMSCommandsMixin,
):
def __init__(self, server, db):
super(FakeSocket, self).__init__(server, db)
1 change: 1 addition & 0 deletions fakeredis/_msgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
FLAG_NO_SCRIPT = "s" # Command not allowed in scripts
FLAG_LEAVE_EMPTY_VAL = "v"
FLAG_TRANSACTION = "t"
FLAG_NO_INITIATE = "i"
GEO_UNSUPPORTED_UNIT = "unsupported unit provided. please use M, KM, FT, MI"
LPOS_RANK_CAN_NOT_BE_ZERO = (
"RANK can't be zero: use 1 to start from the first match, 2 from the second ... "
Expand Down
2 changes: 1 addition & 1 deletion fakeredis/commands.json

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion fakeredis/stack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@ class JSONCommandsMixin: # type: ignore # noqa: E303

from ._bf_mixin import BFCommandsMixin # noqa: F401
from ._cf_mixin import CFCommandsMixin # noqa: F401
from ._cms_mixin import CMSCommandsMixin # noqa: F401
except ImportError as e:
if e.name == "fakeredis.stack._bf_mixin" or e.name == "fakeredis.stack._cf_mixin":
raise e


class BFCommandsMixin: # type: ignore # noqa: E303
class BFCommandsMixin: # noqa: E303
pass


class CFCommandsMixin: # noqa: E303
pass


class CMSCommandsMixin: # noqa: E303
pass
123 changes: 123 additions & 0 deletions fakeredis/stack/_cms_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Command mixin for emulating `redis-py`'s Count-min sketch functionality."""

import probables

from fakeredis import _msgs as msgs
from fakeredis._commands import command, CommandItem, Int, Key, Float
from fakeredis._helpers import OK, SimpleString, SimpleError, casematch


class CountMinSketch(probables.CountMinSketch):
def __init__(self, width: int = None, depth: int = None, probability: float = None, error_rate: float = None):
super().__init__(width=width, depth=depth, error_rate=error_rate, confidence=probability)


class CMSCommandsMixin:

@command(
name="CMS.INCRBY",
fixed=(Key(CountMinSketch), bytes, bytes),
repeat=(bytes, bytes,),
flags=msgs.FLAG_NO_INITIATE,
)
def cms_incrby(self, key: CommandItem, *args: bytes):
if key.value is None:
raise SimpleError("CMS: key does not exist")
pairs = []
for i in range(0, len(args), 2):
try:
pairs.append((args[i], int(args[i + 1])))
except ValueError:
raise SimpleError("CMS: Cannot parse number")
res = []
for pair in pairs:
res.append(key.value.add(pair[0], pair[1]))
key.updated()
return res

@command(
name="CMS.INFO",
fixed=(Key(CountMinSketch),),
repeat=(),
flags=msgs.FLAG_NO_INITIATE,
)
def cms_info(self, key: CommandItem):
if key.value is None:
raise SimpleError("CMS: key does not exist")
return [
b"width", key.value.width,
b"depth", key.value.depth,
b"count", key.value.elements_added,
]

@command(
name="CMS.INITBYDIM",
fixed=(Key(CountMinSketch), Int, Int),
repeat=(),
flags=msgs.FLAG_NO_INITIATE,
)
def cms_initbydim(self, key: CommandItem, width: int, depth: int) -> SimpleString:
if key.value is not None:
raise SimpleError("CMS key already set")
if width < 1:
raise SimpleError("CMS: invalid width")
if depth < 1:
raise SimpleError("CMS: invalid depth")
key.update(CountMinSketch(width=width, depth=depth))
return OK

@command(
name="CMS.INITBYPROB",
fixed=(Key(CountMinSketch), Float, Float),
repeat=(),
flags=msgs.FLAG_NO_INITIATE,
)
def cms_initby_prob(self, key: CommandItem, error_rate: float, probability: float) -> SimpleString:
if key.value is not None:
raise SimpleError("CMS key already set")
if error_rate <= 0 or error_rate >= 1:
raise SimpleError("CMS: invalid overestimation value")
if probability <= 0 or probability >= 1:
raise SimpleError("CMS: invalid prob value")
key.update(CountMinSketch(probability=probability, error_rate=error_rate))
return OK

@command(
name="CMS.MERGE",
fixed=(Key(CountMinSketch), Int, bytes),
repeat=(bytes,),
flags=msgs.FLAG_NO_INITIATE,
)
def cms_merge(self, dest_key: CommandItem, num_keys: int, *args: bytes) -> SimpleString:
if dest_key.value is None:
raise SimpleError("CMS: key does not exist")

if num_keys < 1:
raise SimpleError("CMS: wrong number of keys")
weights = [1, ]
for i, arg in enumerate(args):
if casematch(b"weights", arg):
weights = [int(i) for i in args[i + 1:]]
if len(weights) != num_keys:
raise SimpleError("CMS: wrong number of keys/weights")
args = args[:i]
break
dest_key.value.clear()
for i, arg in enumerate(args):
item = self._db.get(arg, None)
if item is None or not isinstance(item.value, CountMinSketch):
raise SimpleError("CMS: key does not exist")
for _ in range(weights[i % len(weights)]):
dest_key.value.join(item.value)
return OK

@command(
name="CMS.QUERY",
fixed=(Key(CountMinSketch), bytes),
repeat=(bytes,),
flags=msgs.FLAG_NO_INITIATE,
)
def cms_query(self, key: CommandItem, *items: bytes):
if key.value is None:
raise SimpleError("CMS: key does not exist")
return [key.value.check(item) for item in items]
39 changes: 4 additions & 35 deletions test/test_stack/test_bloom_redis_py.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,10 @@
json_tests = pytest.importorskip("probables")


def get_protocol_version(r):
if isinstance(r, redis.Redis) or isinstance(r, redis.asyncio.Redis):
return r.connection_pool.connection_kwargs.get("protocol")
elif isinstance(r, redis.cluster.AbstractRedisCluster):
return r.nodes_manager.connection_kwargs.get("protocol")


def assert_resp_response(r, response, resp2_expected, resp3_expected):
protocol = get_protocol_version(r)
if protocol in [2, "2", None]:
assert response == resp2_expected
else:
assert response == resp3_expected


def intlist(obj):
return [int(v) for v in obj]


@pytest.mark.xfail
def test_create(r: redis.Redis):
"""Test CREATE/RESERVE calls"""
assert r.bf().create("bloom", 0.01, 1000)
Expand All @@ -36,7 +20,7 @@ def test_create(r: redis.Redis):
assert r.cf().create("cuckoo_mi", 1000, max_iterations=10)
assert r.cms().initbydim("cmsDim", 100, 5)
assert r.cms().initbyprob("cmsProb", 0.01, 0.01)
assert r.topk().reserve("topk", 5, 100, 5, 0.9)
# assert r.topk().reserve("topk", 5, 100, 5, 0.9) TODO


def test_bf_reserve(r: redis.Redis):
Expand Down Expand Up @@ -68,24 +52,9 @@ def test_bf_insert(r: redis.Redis):
assert 0 == r.bf().exists("bloom", "noexist")
assert [1, 0] == intlist(r.bf().mexists("bloom", "foo", "noexist"))
info = r.bf().info("bloom")
assert_resp_response(
r,
2,
info.get("insertedNum"),
info.get("Number of items inserted"),
)
assert_resp_response(
r,
1000,
info.get("capacity"),
info.get("Capacity"),
)
assert_resp_response(
r,
1,
info.get("filterNum"),
info.get("Number of filters"),
)
assert 2 == info.get("insertedNum")
assert 1000 == info.get("capacity")
assert 1 == info.get("filterNum")


def test_bf_scandump_and_loadchunk(r: redis.Redis):
Expand Down
Loading