From 465bd93502bce4ac3b2c4d1eda01952568e717de Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sat, 23 Nov 2024 17:20:51 +0000 Subject: [PATCH] :sparkles: decode: add `raise_on_limit_exceeded` option (#11) --- src/qs_codec/decode.py | 54 +++++++++++++++++++--- src/qs_codec/models/decode_options.py | 7 ++- src/qs_codec/utils/utils.py | 15 ++---- tests/unit/decode_test.py | 66 ++++++++++++++++++++++++++- tests/unit/example_test.py | 2 +- 5 files changed, 122 insertions(+), 22 deletions(-) diff --git a/src/qs_codec/decode.py b/src/qs_codec/decode.py index 09a2579..c79f043 100644 --- a/src/qs_codec/decode.py +++ b/src/qs_codec/decode.py @@ -48,9 +48,19 @@ def _interpret_numeric_entities(value: str) -> str: return re.sub(r"&#(\d+);", lambda match: chr(int(match.group(1))), value) -def _parse_array_value(value: t.Any, options: DecodeOptions) -> t.Any: +def _parse_array_value(value: t.Any, options: DecodeOptions, current_list_length: int) -> t.Any: if isinstance(value, str) and value and options.comma and "," in value: - return value.split(",") + split_val: t.List[str] = value.split(",") + if options.raise_on_limit_exceeded and len(split_val) > options.list_limit: + raise ValueError( + f"List limit exceeded: Only {options.list_limit} element{'' if options.list_limit == 1 else 's'} allowed in a list." + ) + return split_val + + if options.raise_on_limit_exceeded and current_list_length >= options.list_limit: + raise ValueError( + f"List limit exceeded: Only {options.list_limit} element{'' if options.list_limit == 1 else 's'} allowed in a list." 
+ ) return value @@ -61,11 +71,26 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str clean_str: str = value.replace("?", "", 1) if options.ignore_query_prefix else value clean_str = clean_str.replace("%5B", "[").replace("%5b", "[").replace("%5D", "]").replace("%5d", "]") limit: t.Optional[int] = None if isinf(options.parameter_limit) else options.parameter_limit # type: ignore [assignment] + + if limit is not None and limit <= 0: + raise ValueError("Parameter limit must be a positive integer.") + parts: t.List[str] if isinstance(options.delimiter, re.Pattern): - parts = re.split(options.delimiter, clean_str) if not limit else re.split(options.delimiter, clean_str)[:limit] + parts = ( + re.split(options.delimiter, clean_str) + if (limit is None) or not limit + else re.split(options.delimiter, clean_str)[: (limit + 1 if options.raise_on_limit_exceeded else limit)] + ) else: - parts = clean_str.split(options.delimiter) if not limit else clean_str.split(options.delimiter)[:limit] + parts = ( + clean_str.split(options.delimiter) + if (limit is None) or not limit + else clean_str.split(options.delimiter)[: (limit + 1 if options.raise_on_limit_exceeded else limit)] + ) + + if options.raise_on_limit_exceeded and (limit is not None) and len(parts) > limit: + raise ValueError(f"Parameter limit exceeded: Only {limit} parameter{'' if limit == 1 else 's'} allowed.") skip_index: int = -1 # Keep track of where the utf8 sentinel was found i: int @@ -98,7 +123,11 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str else: key = options.decoder(part[:pos], charset) val = Utils.apply( - _parse_array_value(part[pos + 1 :], options), + _parse_array_value( + part[pos + 1 :], + options, + len(obj[key]) if key in obj and isinstance(obj[key], (list, tuple)) else 0, + ), lambda v: options.decoder(v, charset), ) @@ -123,7 +152,20 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str def 
_parse_object( chain: t.Union[t.List[str], t.Tuple[str, ...]], val: t.Any, options: DecodeOptions, values_parsed: bool ) -> t.Any: - leaf: t.Any = val if values_parsed else _parse_array_value(val, options) + current_list_length: int = 0 + + if bool(chain) and chain[-1] == "[]": + parent_key: t.Optional[int] + + try: + parent_key = int("".join(chain[0:-1])) + except ValueError: + parent_key = None + + if parent_key is not None and isinstance(val, (list, tuple)) and parent_key in dict(enumerate(val)): + current_list_length = len(val[parent_key]) + + leaf: t.Any = val if values_parsed else _parse_array_value(val, options, current_list_length) i: int for i in reversed(range(len(chain))): diff --git a/src/qs_codec/models/decode_options.py b/src/qs_codec/models/decode_options.py index 533b017..ea3f94e 100644 --- a/src/qs_codec/models/decode_options.py +++ b/src/qs_codec/models/decode_options.py @@ -78,10 +78,13 @@ class DecodeOptions: """To disable ``list`` parsing entirely, set ``parse_lists`` to ``False``.""" strict_depth: bool = False - """Set to ``True`` to throw an error when the input exceeds the ``depth`` limit.""" + """Set to ``True`` to raise an error when the input exceeds the ``depth`` limit.""" strict_null_handling: bool = False - """Set to true to decode values without ``=`` to ``None``.""" + """Set to ``True`` to decode values without ``=`` to ``None``.""" + + raise_on_limit_exceeded: bool = False + """Set to ``True`` to raise a ``ValueError`` when the input contains more parameters than ``parameter_limit`` or a list holds more elements than ``list_limit``.""" decoder: t.Callable[[t.Optional[str], t.Optional[Charset]], t.Any] = DecodeUtils.decode """Set a ``Callable`` to affect the decoding of the input.""" diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index eef7983..348be2f 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -35,7 +35,10 @@ def merge( else: target_[len(target_)] = source - target = list(filter(lambda el: not isinstance(el, Undefined),
target_.values())) + if any(isinstance(value, Undefined) for value in target_.values()): + target = {str(i): target_[i] for i in target_ if not isinstance(target_[i], Undefined)} + else: + target = list(filter(lambda el: not isinstance(el, Undefined), target_.values())) else: if isinstance(source, (list, tuple)): if all((isinstance(el, t.Mapping) or isinstance(el, Undefined)) for el in target) and all( @@ -123,20 +126,10 @@ def compact(value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: queue.append({"obj": obj, "prop": key}) refs.append(val) - Utils._compact_queue(queue) Utils._remove_undefined_from_map(value) return value - @staticmethod - def _compact_queue(queue: t.List[t.Dict]) -> None: - while len(queue) > 1: - item = queue.pop() - obj = item["obj"][item["prop"]] - - if isinstance(obj, (list, tuple)): - item["obj"][item["prop"]] = list(filter(lambda el: not isinstance(el, Undefined), obj)) - @staticmethod def _remove_undefined_from_list(value: t.List) -> None: i: int = len(value) - 1 diff --git a/tests/unit/decode_test.py b/tests/unit/decode_test.py index e395fd7..88e6307 100644 --- a/tests/unit/decode_test.py +++ b/tests/unit/decode_test.py @@ -178,7 +178,7 @@ def test_parses_a_mix_of_simple_and_explicit_lists(self) -> None: assert decode("a[0]=b&a=c") == {"a": ["b", "c"]} assert decode("a=b&a[0]=c") == {"a": ["b", "c"]} - assert decode("a[1]=b&a=c", DecodeOptions(list_limit=20)) == {"a": ["b", "c"]} + assert decode("a[1]=b&a=c", DecodeOptions(list_limit=20)) == {"a": {"1": "b", "2": "c"}} assert decode("a[]=b&a=c", DecodeOptions(list_limit=0)) == {"a": ["b", "c"]} assert decode("a[]=b&a=c") == {"a": ["b", "c"]} @@ -292,7 +292,9 @@ def test_allows_for_empty_strings_in_lists(self) -> None: assert decode("a[]=&a[]=b&a[]=c") == {"a": ["", "b", "c"]} def test_compacts_sparse_lists(self) -> None: - assert decode("a[10]=1&a[2]=2", DecodeOptions(list_limit=20)) == {"a": ["2", "1"]} + decoded = decode("a[10]=1&a[2]=2", DecodeOptions(list_limit=20)) + assert decoded == 
{"a": {"10": "1", "2": "2"}} + assert decoded != {"a": ["2", "1"]} assert decode("a[1][b][2][c]=1", DecodeOptions(list_limit=20)) == {"a": [{"b": [{"c": "1"}]}]} assert decode("a[1][2][3][c]=1", DecodeOptions(list_limit=20)) == {"a": [[[{"c": "1"}]]]} assert decode("a[1][2][3][c][1]=1", DecodeOptions(list_limit=20)) == {"a": [[[{"c": ["1"]}]]]} @@ -684,3 +686,63 @@ def test_decodes_successfully_when_depth_is_within_the_limit_with_strict_depth_f def test_does_not_throw_when_depth_is_exactly_at_the_limit_with_strict_depth_true(self) -> None: assert decode("a[b][c]=d", DecodeOptions(depth=2, strict_depth=True)) == {"a": {"b": {"c": "d"}}} + + +class TestParameterList: + def test_does_not_raise_error_when_within_parameter_limit(self) -> None: + assert decode("a=1&b=2&c=3", DecodeOptions(parameter_limit=5, raise_on_limit_exceeded=True)) == { + "a": "1", + "b": "2", + "c": "3", + } + + def test_raises_error_when_parameter_limit_exceeded(self) -> None: + with pytest.raises(ValueError): + decode("a=1&b=2&c=3&d=4&e=5&f=6", DecodeOptions(parameter_limit=3, raise_on_limit_exceeded=True)) + + def test_silently_truncates_when_throw_on_limit_exceeded_is_not_given(self) -> None: + assert decode("a=1&b=2&c=3&d=4&e=5", DecodeOptions(parameter_limit=3)) == {"a": "1", "b": "2", "c": "3"} + + def test_silently_truncates_when_parameter_limit_exceeded_without_error(self) -> None: + assert decode("a=1&b=2&c=3&d=4&e=5", DecodeOptions(parameter_limit=3, raise_on_limit_exceeded=False)) == { + "a": "1", + "b": "2", + "c": "3", + } + + def test_allows_unlimited_parameters_when_parameter_limit_set_to_infinity(self) -> None: + assert decode("a=1&b=2&c=3&d=4&e=5&f=6", DecodeOptions(parameter_limit=float("inf"))) == { + "a": "1", + "b": "2", + "c": "3", + "d": "4", + "e": "5", + "f": "6", + } + + +class TestListLimit: + def test_does_not_raise_error_when_within_list_limit(self) -> None: + assert decode("a[]=1&a[]=2&a[]=3", DecodeOptions(list_limit=5, raise_on_limit_exceeded=True)) == { + "a": 
["1", "2", "3"], + } + + def test_raises_error_when_list_limit_exceeded(self) -> None: + with pytest.raises(ValueError): + decode("a[]=1&a[]=2&a[]=3&a[]=4", DecodeOptions(list_limit=3, raise_on_limit_exceeded=True)) + + def test_converts_list_to_map_if_length_is_greater_than_limit(self) -> None: + assert decode("a[1]=1&a[2]=2&a[3]=3&a[4]=4&a[5]=5&a[6]=6", DecodeOptions(list_limit=5)) == { + "a": {"1": "1", "2": "2", "3": "3", "4": "4", "5": "5", "6": "6"} + } + + def test_handles_list_limit_of_zero_correctly(self) -> None: + assert decode("a[]=1&a[]=2", DecodeOptions(list_limit=0)) == {"a": ["1", "2"]} + + def test_handles_negative_list_limit_correctly(self) -> None: + with pytest.raises(ValueError): + decode("a[]=1&a[]=2", DecodeOptions(list_limit=-1, raise_on_limit_exceeded=True)) + + def test_applies_list_limit_to_nested_lists(self) -> None: + with pytest.raises(ValueError): + decode("a[0][]=1&a[0][]=2&a[0][]=3&a[0][]=4", DecodeOptions(list_limit=3, raise_on_limit_exceeded=True)) diff --git a/tests/unit/example_test.py b/tests/unit/example_test.py index 38ab51c..6de9475 100644 --- a/tests/unit/example_test.py +++ b/tests/unit/example_test.py @@ -120,7 +120,7 @@ def test_lists(self): # Note that the only difference between an index in a `list` and a key in a `dict` is that the value between the # brackets must be a number to create a `list`. When creating `list`s with specific indices, **qs_codec** will compact # a sparse `list` to only the existing values preserving their order: - assert qs_codec.decode("a[1]=b&a[15]=c") == {"a": ["b", "c"]} + assert qs_codec.decode("a[1]=b&a[15]=c") == {"a": {"1": "b", "15": "c"}} # Note that an empty string is also a value, and will be preserved: assert qs_codec.decode("a[]=&a[]=b") == {"a": ["", "b"]}