From 82b20c19e8f28dd38d964e8bfce4575812ce2ef6 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sat, 3 Aug 2024 10:28:46 -0400 Subject: [PATCH 1/4] add test for transitive pipeline fallbacks --- lenskit/tests/test_pipeline.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lenskit/tests/test_pipeline.py b/lenskit/tests/test_pipeline.py index 29373f038..af3a1ff0d 100644 --- a/lenskit/tests/test_pipeline.py +++ b/lenskit/tests/test_pipeline.py @@ -472,6 +472,25 @@ def add(x: int, y: int) -> int: pipe.run(na, a=3) +def test_fallback_transitive(): + "test that a fallback works if a dependency's dependency fails" + pipe = Pipeline() + ia = pipe.create_input("a", int) + ib = pipe.create_input("b", int) + + def double(x: int) -> int: + return 2 * x + + # two components, each with a different input + c1 = pipe.add_component("double-a", double, x=ia) + c2 = pipe.add_component("double-b", double, x=ib) + # use the first that succeeds + c = pipe.use_first_of("result", c1, c2) + + # omitting the first input should result in the second component + assert pipe.run(c, b=17) == 34 + + def test_train(ml_ds: Dataset): pipe = Pipeline() item = pipe.create_input("item", int) From 1335f5526d3a30fc2d532e91d00a6f8650f806c8 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sat, 3 Aug 2024 10:32:06 -0400 Subject: [PATCH 2/4] add depth & docs for transitivity --- lenskit/lenskit/pipeline/__init__.py | 32 +++++++++++++++++----------- lenskit/tests/test_pipeline.py | 19 +++++++++++++++++ 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/lenskit/lenskit/pipeline/__init__.py b/lenskit/lenskit/pipeline/__init__.py index b676eab69..871977ccf 100644 --- a/lenskit/lenskit/pipeline/__init__.py +++ b/lenskit/lenskit/pipeline/__init__.py @@ -257,23 +257,21 @@ def use_first_of(self, name: str, *nodes: Node[T | None]) -> Node[T]: .. code:: python - pipe = Pipeline() - # allow candidate items to be optionally specified - items = pipe.create_input('items', list[EntityId], None) - # find candidates from the training data (optional) - lookup_candidates = pipe.add_component( - 'select-candidates', - UnratedTrainingItemsCandidateSelector(), + pipe = Pipeline() # allow candidate items to be optionally specified + items = pipe.create_input('items', list[EntityId], None) # find + candidates from the training data (optional) lookup_candidates = + pipe.add_component( + 'select-candidates', UnratedTrainingItemsCandidateSelector(), user=history, - ) - # if the client provided items as a pipeline input, use those; otherwise - # use the candidate selector we just configured. - candidates = pipe.use_first_of('candidates', items, lookup_candidates) + ) # if the client provided items as a pipeline input, use those; + otherwise # use the candidate selector we just configured. + candidates = pipe.use_first_of('candidates', items, + lookup_candidates) .. note:: - This method does not distinguish between an input being unspecified and - explicitly specified as ``None``. + This method does not distinguish between an input being unspecified + and explicitly specified as ``None``. .. note:: @@ -284,6 +282,14 @@ def use_first_of(self, name: str, *nodes: Node[T | None]) -> Node[T]: did not score. A specific itemwise fallback component is needed for such an operation. + .. note:: + + If one of the fallback elements is a component ``A`` that depends on + another component or input ``B``, and ``B`` is missing or returns + ``None`` such that ``A`` would usually fail, then ``A`` will be + skipped and the fallback will move on to the next node. This works + with arbitrarily-deep transitive chains. + Args: name: The name of the node. diff --git a/lenskit/tests/test_pipeline.py b/lenskit/tests/test_pipeline.py index af3a1ff0d..cb30dc08c 100644 --- a/lenskit/tests/test_pipeline.py +++ b/lenskit/tests/test_pipeline.py @@ -491,6 +491,25 @@ def double(x: int) -> int: assert pipe.run(c, b=17) == 34 +def test_fallback_transitive_deeper(): + "deeper transitive fallback test" + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def negative(x: int) -> int: + return -x + + def double(x: int) -> int: + return x * 2 + + nd = pipe.add_component("double", double, x=a) + nn = pipe.add_component("negate", negative, x=nd) + nr = pipe.use_first_of("fill-operand", nn, b) + + assert pipe.run(nr, b=8) == 8 + + def test_train(ml_ds: Dataset): pipe = Pipeline() item = pipe.create_input("item", int) From 60e783a0781777329acdc62a8d5aafbed716b2e2 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sat, 3 Aug 2024 10:36:28 -0400 Subject: [PATCH 3/4] make fallback chains work transitively --- lenskit/lenskit/pipeline/runner.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/lenskit/lenskit/pipeline/runner.py b/lenskit/lenskit/pipeline/runner.py index d15084c8b..d4d601693 100644 --- a/lenskit/lenskit/pipeline/runner.py +++ b/lenskit/lenskit/pipeline/runner.py @@ -62,7 +62,13 @@ def run(self, node: Node[Any], *, required: bool = True) -> Any: self.status[node.name] = "failed" raise e - return self.state[node.name] + try: + return self.state[node.name] + except KeyError as e: + if required: + raise e + else: + return None def _run_node(self, node: Node[Any], required: bool) -> None: match node: @@ -71,7 +77,7 @@ def _run_node(self, node: Node[Any], required: bool) -> None: case InputNode(name, types=types): self._inject_input(name, types, required) case ComponentNode(name, comp, inputs, wiring): - self._run_component(name, comp, inputs, wiring) + self._run_component(name, comp, inputs, wiring, required) case FallbackNode(name, alts): self._run_fallback(name, alts) case _: # pragma: nocover @@ -93,6 +99,7 @@ def _run_component( comp: Component[Any], inputs: dict[str, type | None], wiring: dict[str, str], + required: bool, ) -> None: in_data = {} _log.debug("processing inputs for component %s", name) @@ -106,11 +113,15 @@ def _run_component( if snode is None: ival = None else: - if itype: - required = not is_compatible_data(None, itype) + if required and itype: + ireq = not is_compatible_data(None, itype) else: - required = False - ival = self.run(snode, required=required) + ireq = False + ival = self.run(snode, required=ireq) + + # bail out if we're trying to satisfy a non-required dependency + if ival is None and itype and not is_compatible_data(None, itype) and not required: + return None if itype and not is_compatible_data(ival, itype): raise TypeError( From de9be525a033e6a30eae0f01c6bb1198380aefa9 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sat, 3 Aug 2024 10:37:58 -0400 Subject: [PATCH 4/4] add test for component-based failure --- lenskit/tests/test_pipeline.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/lenskit/tests/test_pipeline.py b/lenskit/tests/test_pipeline.py index cb30dc08c..8954faa04 100644 --- a/lenskit/tests/test_pipeline.py +++ b/lenskit/tests/test_pipeline.py @@ -510,6 +510,30 @@ def double(x: int) -> int: assert pipe.run(nr, b=8) == 8 +def test_fallback_transitive_nodefail(): + "deeper transitive fallback test" + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def negative(x: int) -> int | None: + # make this return None in some cases to trigger failure + if x >= 0: + return -x + else: + return None + + def double(x: int) -> int: + return x * 2 + + nd = pipe.add_component("double", double, x=a) + nn = pipe.add_component("negate", negative, x=nd) + nr = pipe.use_first_of("fill-operand", nn, b) + + assert pipe.run(nr, a=2, b=8) == -4 + assert pipe.run(nr, a=-7, b=8) == 8 + + def test_train(ml_ds: Dataset): pipe = Pipeline() item = pipe.create_input("item", int)