diff --git a/src/poprox_recommender/default.py b/src/poprox_recommender/default.py index b7438184..b26b0e36 100644 --- a/src/poprox_recommender/default.py +++ b/src/poprox_recommender/default.py @@ -8,10 +8,10 @@ from poprox_recommender.components.filters import TopicFilter from poprox_recommender.components.joiners import Fill from poprox_recommender.components.rankers.topk import TopkRanker -from poprox_recommender.components.samplers import UniformSampler +from poprox_recommender.components.samplers.uniform import UniformSampler from poprox_recommender.components.scorers import ArticleScorer +from poprox_recommender.lkpipeline import Pipeline, PipelineState from poprox_recommender.model import get_model -from poprox_recommender.pipeline import PipelineState, RecommendationPipeline logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -25,22 +25,25 @@ def select_articles( algo_params: dict[str, Any] | None = None, ) -> PipelineState: """ - Select articles with default recommender configuration. + Select articles with default recommender configuration. It returns a + pipeline state whose ``default`` is the final list of recommendations. """ pipeline = None pipeline = personalized_pipeline(num_slots, algo_params) + assert pipeline is not None - inputs = { - "candidate": candidate_articles, - "clicked": clicked_articles, - "profile": interest_profile, - } + recs = pipeline.node("recommender") + topk = pipeline.node("ranker", missing="none") + if topk is None: + wanted = (recs,) + else: + wanted = (topk, recs) - return pipeline(inputs) + return pipeline.run_all(*wanted, candidate=candidate_articles, clicked=clicked_articles, profile=interest_profile) -def personalized_pipeline(num_slots: int, algo_params: dict[str, Any] | None = None) -> RecommendationPipeline | None: +def personalized_pipeline(num_slots: int, algo_params: dict[str, Any] | None = None) -> Pipeline | None: """ Create the default personalized recommendation pipeline. 
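Reviewer note: for readers new to the vendored pipeline API used above, here is a minimal sketch of the new wiring style with toy `double`/`add` components (illustrative only, not part of this PR). It mirrors the unit tests added at the end of this diff and shows what `select_articles` now gets back from `run_all`:

    from poprox_recommender.lkpipeline import Pipeline

    def double(x: int) -> int:
        return x * 2

    def add(x: int, y: int) -> int:
        return x + y

    pipe = Pipeline()
    a = pipe.create_input("a", int)                 # inputs are declared up front
    b = pipe.create_input("b", int)
    nd = pipe.add_component("double", double, x=a)  # wiring is by keyword argument
    na = pipe.add_component("add", add, x=nd, y=b)

    # run_all returns a PipelineState mapping node names to results;
    # select_articles() uses the same call with its real nodes.
    state = pipe.run_all(na, a=3, b=4)
    assert state["double"] == 6     # intermediate results stay inspectable
    assert state["add"] == 10
    assert state.default == 10      # the last requested node is the default
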
@@ -82,23 +85,31 @@ def personalized_pipeline(num_slots: int, algo_params: dict[str, Any] | None = N
         logger.info("Recommendations will be ranked with plain top-k.")
         ranker = topk_ranker

-    pipeline = RecommendationPipeline(name=diversify)
+    # TODO put pipeline name back in
+    pipeline = Pipeline()
+
+    # Define pipeline inputs
+    candidates = pipeline.create_input("candidate", ArticleSet)
+    clicked = pipeline.create_input("clicked", ArticleSet)
+    profile = pipeline.create_input("profile", InterestProfile)

     # Compute embeddings
-    pipeline.add(article_embedder, inputs=["candidate"], output="candidate")
-    pipeline.add(article_embedder, inputs=["clicked"], output="clicked")
-    pipeline.add(user_embedder, inputs=["clicked", "profile"], output="profile")
+    e_cand = pipeline.add_component("candidate-embedder", article_embedder, article_set=candidates)
+    e_click = pipeline.add_component("history-embedder", article_embedder, article_set=clicked)
+    e_user = pipeline.add_component("user-embedder", user_embedder, clicked_articles=e_click, interest_profile=profile)

     # Score and rank articles with diversification/calibration reranking
-    pipeline.add(article_scorer, inputs=["candidate", "profile"], output="candidate")
-    pipeline.add(ranker, inputs=["candidate", "profile"], output="reranked")
-
-    # Output the plain descending-by-score ranking for comparison
-    pipeline.add(topk_ranker, inputs=["candidate", "profile"], output="ranked")
+    o_scored = pipeline.add_component("scorer", article_scorer, candidate_articles=e_cand, interest_profile=e_user)
+    o_topk = pipeline.add_component("ranker", topk_ranker, candidate_articles=o_scored, interest_profile=e_user)
+    if ranker is topk_ranker:
+        o_rank = o_topk
+    else:
+        o_rank = pipeline.add_component("reranker", ranker, candidate_articles=o_scored, interest_profile=e_user)

     # Fallback in case not enough articles came from the ranker
-    pipeline.add(topic_filter, inputs=["candidate", "profile"], output="topical")
-    pipeline.add(sampler, inputs=["topical", "candidate"], output="sampled")
-    pipeline.add(fill, inputs=["reranked", "sampled"], output="recs")
+    # TODO: make this lazy so the sampler only runs if the reranker isn't enough
+    o_filtered = pipeline.add_component("topic-filter", topic_filter, candidate=candidates, interest_profile=profile)
+    o_sampled = pipeline.add_component("sampler", sampler, candidate=o_filtered, backup=candidates)
+    pipeline.add_component("recommender", fill, candidates1=o_rank, candidates2=o_sampled)

     return pipeline
diff --git a/src/poprox_recommender/evaluation/offline.py b/src/poprox_recommender/evaluation/offline.py
index ad37ff5d..03527869 100644
--- a/src/poprox_recommender/evaluation/offline.py
+++ b/src/poprox_recommender/evaluation/offline.py
@@ -9,6 +9,7 @@ from tqdm import tqdm

 from poprox_recommender.data.mind import TEST_REC_COUNT, MindData
+from poprox_recommender.lkpipeline import PipelineState
 from poprox_recommender.logging_config import setup_logging
 from poprox_recommender.pipeline import RecommendationPipeline
@@ -25,7 +26,6 @@
 from poprox_recommender.default import personalized_pipeline
 from poprox_recommender.evaluation.metrics import rank_biased_overlap
 from poprox_recommender.paths import model_file_path, project_root
-from poprox_recommender.pipeline import PipelineState

 logger = logging.getLogger("poprox_recommender.test_offline")
@@ -45,15 +45,12 @@ def custom_encoder(obj):
     return str(obj)


-def recsys_metric(
-    mind_data: MindData,
-    request: RecommendationRequest,
-    pipeline_state: PipelineState,
-):
+def 
recsys_metric(mind_data: MindData, request: RecommendationRequest, state: PipelineState): # recommendations {account id (uuid): LIST[Article]} # use the url of Article + final = state["recommender"] - recs = pd.DataFrame({"item": [a.article_id for a in pipeline_state.recs]}) + recs = pd.DataFrame({"item": [a.article_id for a in final.articles]}) truth = mind_data.user_truth(request.interest_profile.profile_id) # RR should look for *clicked* articles, not just all impression articles @@ -61,11 +58,10 @@ def recsys_metric( single_ndcg5 = topn.ndcg(recs, truth, k=5) single_ndcg10 = topn.ndcg(recs, truth, k=10) - ranked = pipeline_state.elements.get("ranked", None) - reranked = pipeline_state.elements.get("reranked", None) - if ranked and reranked: - single_rbo5 = rank_biased_overlap(ranked, reranked, k=5) - single_rbo10 = rank_biased_overlap(ranked, reranked, k=10) + if "ranker" in state: + topk = state["ranker"] + single_rbo5 = rank_biased_overlap(topk, final, k=5) + single_rbo10 = rank_biased_overlap(topk, final, k=10) else: single_rbo5 = None single_rbo10 = None @@ -115,7 +111,7 @@ def recsys_metric( "clicked": ArticleSet(articles=request.past_articles), "profile": request.interest_profile, } - outputs = pipeline(inputs) + state = pipeline.run_all(**inputs) if request.interest_profile.click_history.article_ids: personalized = 1 else: @@ -125,7 +121,7 @@ def recsys_metric( raise e logger.debug("measuring for user %s", request.interest_profile.profile_id) - single_ndcg5, single_ndcg10, single_rr, single_rbo5, single_rbo10 = recsys_metric(mind_data, request, outputs) + single_ndcg5, single_ndcg10, single_rr, single_rbo5, single_rbo10 = recsys_metric(mind_data, request, state) user_csv.writerow( [ request.interest_profile.profile_id, diff --git a/src/poprox_recommender/handler.py b/src/poprox_recommender/handler.py index 72a1c380..b80a7a8f 100644 --- a/src/poprox_recommender/handler.py +++ b/src/poprox_recommender/handler.py @@ -2,10 +2,7 @@ import logging from poprox_concepts import ArticleSet -from poprox_concepts.api.recommendations import ( - RecommendationRequest, - RecommendationResponse, -) +from poprox_concepts.api.recommendations import RecommendationRequest, RecommendationResponse from poprox_recommender.default import select_articles from poprox_recommender.topics import user_topic_preference @@ -62,7 +59,7 @@ def generate_recs(event, context): logger.info("Constructing response...") resp_body = RecommendationResponse.model_validate( { - "recommendations": {profile.profile_id: outputs.recs}, + "recommendations": {profile.profile_id: outputs.default.articles}, } ) diff --git a/src/poprox_recommender/lkpipeline/__init__.py b/src/poprox_recommender/lkpipeline/__init__.py new file mode 100644 index 00000000..9405d0c2 --- /dev/null +++ b/src/poprox_recommender/lkpipeline/__init__.py @@ -0,0 +1,504 @@ +# This file is part of LensKit. +# Copyright (C) 2018-2023 Boise State University +# Copyright (C) 2023-2024 Drexel University +# Licensed under the MIT license, see LICENSE.md for details. +# SPDX-License-Identifier: MIT + +""" +A vendored copy of LensKit's pipeline abstraction, without trainability support. 
+""" + +# pyright: strict +from __future__ import annotations + +import logging +from types import FunctionType +from typing import Literal, cast +from uuid import uuid4 + +from typing_extensions import Any, LiteralString, TypeVar, overload + +from .components import Component, ConfigurableComponent +from .nodes import ND, ComponentNode, FallbackNode, InputNode, LiteralNode, Node +from .state import PipelineState + +__all__ = [ + "Pipeline", + "Node", + "Component", + "ConfigurableComponent", +] + +_log = logging.getLogger(__name__) + +# common type var for quick use +T = TypeVar("T") +T1 = TypeVar("T1") +T2 = TypeVar("T2") +T3 = TypeVar("T3") +T4 = TypeVar("T4") +T5 = TypeVar("T5") + + +class Pipeline: + """ + LensKit recommendation pipeline. This is the core abstraction for using + LensKit models and other components to produce recommendations in a useful + way. It allows you to wire together components in (mostly) abitrary graphs, + train them on data, and serialize pipelines to disk for use elsewhere. + + If you have a scoring model and just want to generate recommenations with a + default setup and minimal configuration, see :func:`topn_pipeline`. + """ + + _nodes: dict[str, Node[Any]] + _aliases: dict[str, Node[Any]] + _defaults: dict[str, Node[Any] | Any] + _components: dict[str, Component[Any]] + + def __init__(self): + self._nodes = {} + self._aliases = {} + self._defaults = {} + self._components = {} + self._clear_caches() + + @property + def nodes(self) -> list[Node[object]]: + """ + Get the nodes in the pipeline graph. + """ + return list(self._nodes.values()) + + @overload + def node(self, node: str, *, missing: Literal["error"] = "error") -> Node[object]: ... + @overload + def node(self, node: str, *, missing: Literal["none"] | None) -> Node[object] | None: ... + @overload + def node(self, node: Node[T]) -> Node[T]: ... + def node(self, node: str | Node[Any], *, missing: Literal["error", "none"] | None = "error") -> Node[object] | None: + """ + Get the pipeline node with the specified name. If passed a node, it + returns the node or fails if the node is not a member of the pipeline. + + Args: + node: + The name of the pipeline node to look up, or a node to check for + membership. + + Returns: + The pipeline node, if it exists. + + Raises: + KeyError: + The specified node does not exist. + """ + if isinstance(node, Node): + self._check_member_node(node) + return node + elif node in self._aliases: + return self._aliases[node] + elif node in self._nodes: + return self._nodes[node] + elif missing == "none" or missing is None: + return None + else: + raise KeyError(f"node {node}") + + def create_input(self, name: str, *types: type[T] | None) -> Node[T]: + """ + Create an input node for the pipeline. Pipelines expect their inputs to + be provided when they are run. + + Args: + name: + The name of the input. The name must be unique in the pipeline + (among both components and inputs). + types: + The allowable types of the input; input data can be of any + specified type. If ``None`` is among the allowed types, the + input can be omitted. + + Returns: + A pipeline node representing this input. + + Raises: + ValueError: + a node with the specified ``name`` already exists. 
+ """ + self._check_available_name(name) + + node = InputNode[Any](name, types=set((t if t is not None else type[None]) for t in types)) + self._nodes[name] = node + self._clear_caches() + return node + + def literal(self, value: T) -> LiteralNode[T]: + name = str(uuid4()) + node = LiteralNode(name, value, types=set([type(value)])) + self._nodes[name] = node + return node + + def set_default(self, name: LiteralString, node: Node[Any] | object) -> None: + """ + Set the default wiring for a component input. Components that declare + an input parameter with the specified ``name`` but no configured input + will be wired to this node. + + This is intended to be used for things like wiring up `user` parameters + to semi-automatically receive the target user's identity and history. + + Args: + name: + The name of the parameter to set a default for. + node: + The node or literal value to wire to this parameter. + """ + if not isinstance(node, Node): + node = self.literal(node) + self._defaults[name] = node + self._clear_caches() + + def get_default(self, name: str) -> Node[Any] | None: + """ + Get the default wiring for an input name. + """ + return self._defaults.get(name, None) + + def alias(self, alias: str, node: Node[Any] | str) -> None: + """ + Create an alias for a node. After aliasing, the node can be retrieved + from :meth:`node` using either its original name or its alias. + + Args: + alias: + The alias to add to the node. + node: + The node (or node name) to alias. + + Raises: + ValueError: + if the alias is already used as an alias or node name. + """ + node = self.node(node) + self._check_available_name(alias) + self._aliases[alias] = node + self._clear_caches() + + def add_component(self, name: str, obj: Component[ND], **inputs: Node[Any] | object) -> Node[ND]: + """ + Add a component and connect it into the graph. + + Args: + name: + The name of the component in the pipeline. The name must be + unique in the pipeline (among both components and inputs). + obj: + The component itself. + inputs: + The component's input wiring. See :ref:`pipeline-connections` + for details. + + Returns: + The node representing this component in the pipeline. + """ + self._check_available_name(name) + + node = ComponentNode(name, obj) + self._nodes[name] = node + self._components[name] = obj + + self.connect(node, **inputs) + + self._clear_caches() + return node + + def replace_component( + self, + name: str | Node[ND], + obj: Component[ND], + **inputs: Node[Any] | object, + ) -> Node[ND]: + """ + Replace a component in the graph. The new component must have a type + that is compatible with the old component. The old component's input + connections will be replaced (as the new component may have different + inputs), but any connections that use the old component to supply an + input will use the new component instead. + """ + if isinstance(name, Node): + name = name.name + + node = ComponentNode(name, obj) + self._nodes[name] = node + self._components[name] = obj + + self.connect(node, **inputs) + + self._clear_caches() + return node + + def use_first_of(self, name: str, *nodes: Node[T | None]) -> Node[T]: + """ + Create a new node whose value is the first defined (not ``None``) value + of the specified nodes. If a node is an input node and its value is not + supplied, it is treated as ``None`` in this case instead of failing the + run. This method is used for things like filling in optional pipeline + inputs. 
For example, if you want the pipeline to take candidate items + through an ``items`` input, but look them up from the user's history and + the training data if ``items`` is not supplied, you would do: + + .. code:: python + + pipe = Pipeline() + # allow candidate items to be optionally specified + items = pipe.create_input('items', list[EntityId], None) + # find candidates from the training data (optional) + lookup_candidates = pipe.add_component( + 'select-candidates', + UnratedTrainingItemsCandidateSelector(), + user=history, + ) + # if the client provided items as a pipeline input, use those; otherwise + # use the candidate selector we just configured. + candidates = pipe.use_first_of('candidates', items, lookup_candidates) + + .. note:: + + This method does not distinguish between an input being unspecified and + explicitly specified as ``None``. + + .. note:: + + This method does *not* implement item-level fallbacks, only + fallbacks at the level of entire results. That is, you can use it + to use component A as a fallback for B if B returns ``None``, but it + will not use B to fill in missing scores for individual items that A + did not score. A specific itemwise fallback component is needed for + such an operation. + + .. note:: + If one of the fallback elements is a component ``A`` that depends on + another component or input ``B``, and ``B`` is missing or returns + ``None`` such that ``A`` would usually fail, then ``A`` will be + skipped and the fallback will move on to the next node. This works + with arbitrarily-deep transitive chains. + + Args: + name: + The name of the node. + nodes: + The nodes to try, in order, to satisfy this node. + """ + node = FallbackNode(name, list(nodes)) + self._nodes[name] = node + self._clear_caches() + return node + + def connect(self, obj: str | Node[Any], **inputs: Node[Any] | str | object): + """ + Provide additional input connections for a component that has already + been added. See :ref:`pipeline-connections` for details. + + Args: + obj: + The name or node of the component to wire. + inputs: + The component's input wiring. For each keyword argument in the + component's function signature, that argument can be provided + here with an input that the pipeline will provide to that + argument of the component when the pipeline is run. + """ + if isinstance(obj, Node): + node = obj + else: + node = self.node(obj) + if not isinstance(node, ComponentNode): + raise TypeError(f"only component nodes can be wired, not {node}") + + for k, n in inputs.items(): + if isinstance(n, Node): + n = cast(Node[Any], n) + self._check_member_node(n) + node.connections[k] = n.name + else: + lit = self.literal(n) + node.connections[k] = lit.name + + self._clear_caches() + + def component_configs(self) -> dict[str, dict[str, Any]]: + """ + Get the configurations for the components. This is the configurations + only, it does not include pipeline inputs or wiring. + """ + return { + name: comp.get_config() + for (name, comp) in self._components.items() + if isinstance(comp, ConfigurableComponent) + } + + def clone(self, *, params: bool = False) -> Pipeline: + """ + Clone the pipeline, optionally including trained parameters. + + Args: + params: + Pass ``True`` to clone parameters as well as the configuration + and wiring. + + Returns: + A new pipeline with the same components and wiring, but fresh + instances created by round-tripping the configuration. 
+ """ + if params: # pragma: nocover + raise NotImplementedError() + + clone = Pipeline() + for node in self.nodes: + match node: + case InputNode(name, types=types): + if types is None: + types = set[type]() + clone.create_input(name, *types) + case LiteralNode(name, value): + clone._nodes[name] = LiteralNode(name, value) + case FallbackNode(name, alts): + clone.use_first_of(name, *alts) + case ComponentNode(name, comp, _inputs, wiring): + if isinstance(comp, FunctionType): + comp = comp + elif isinstance(comp, ConfigurableComponent): + comp = comp.__class__.from_config(comp.get_config()) + else: + comp = comp.__class__() + cn = clone.add_component(node.name, comp) # type: ignore + for wn, wt in wiring.items(): + clone.connect(cn, **{wn: clone.node(wt)}) + case _: # pragma: nocover + raise RuntimeError(f"invalid node {node}") + + return clone + + @overload + def run(self, /, **kwargs: object) -> object: ... + @overload + def run(self, node: str, /, **kwargs: object) -> object: ... + @overload + def run(self, n1: str, n2: str, /, *nrest: str, **kwargs: object) -> tuple[object]: ... + @overload + def run(self, node: Node[T], /, **kwargs: object) -> T: ... + @overload + def run(self, n1: Node[T1], n2: Node[T2], /, **kwargs: object) -> tuple[T1, T2]: ... + @overload + def run(self, n1: Node[T1], n2: Node[T2], n3: Node[T3], /, **kwargs: object) -> tuple[T1, T2, T3]: ... + @overload + def run( + self, n1: Node[T1], n2: Node[T2], n3: Node[T3], n4: Node[T4], /, **kwargs: object + ) -> tuple[T1, T2, T3, T4]: ... + @overload + def run( + self, + n1: Node[T1], + n2: Node[T2], + n3: Node[T3], + n4: Node[T4], + n5: Node[T5], + /, + **kwargs: object, + ) -> tuple[T1, T2, T3, T4, T5]: ... + def run(self, *nodes: str | Node[Any], **kwargs: object) -> object: + """ + Run the pipeline and obtain the return value(s) of one or more of its + components. See :ref:`pipeline-execution` for details of the pipeline + execution model. + + .. todo:: + Add cycle detection. + + Args: + nodes: + The component(s) to run. + kwargs: + The pipeline's inputs, as defined with :meth:`create_input`. + + Returns: + The pipeline result. If zero or one nodes are specified, the result + is returned as-is. If multiple nodes are specified, their results + are returned in a tuple. + + Raises: + ValueError: + when one or more required inputs are missing. + TypeError: + when one or more required inputs has an incompatible type. + other: + exceptions thrown by components are passed through. + """ + if not nodes: + nodes = (self._last_node(),) + state = self.run_all(*nodes, **kwargs) + results = [state[self.node(n).name] for n in nodes] + + if len(results) > 1: + return tuple(results) + else: + return results[0] + + def run_all(self, *nodes: str | Node[Any], **kwargs: object) -> PipelineState: + """ + Run all nodes in the pipeline, or all nodes required to fulfill the + requested node, and return a mapping with the full pipeline state (the + data attached to each node). This is useful in cases where client code + needs to be able to inspect the data at arbitrary steps of the pipeline. + It differs from :meth:`run` in two ways: + + 1. It returns the data from all nodes as a mapping (dictionary-like + object), not just the specified nodes as a tuple. + 2. If no nodes are specified, it runs *all* nodes instead of only the + last node. This has the consequence of running nodes that are not + required to fulfill the last node (such scenarios typically result + from using :meth:`use_first_of`). 
+ + Args: + nodes: + The nodes to run, as positional arguments (if no nodes are + specified, this method runs all nodes). + kwargs: + The inputs. + + Returns: + The full pipeline state, with :attr:`~PipelineState.default` set to + the last node specified (either the last node in `nodes`, or the + last node added to the pipeline). + """ + from .runner import PipelineRunner + + runner = PipelineRunner(self, kwargs) + node_list = [self.node(n) for n in nodes] + if not node_list: + node_list = self.nodes + + last = None + for node in node_list: + runner.run(node) + last = node.name + + return PipelineState(runner.state, {a: t.name for (a, t) in self._aliases.items()}, last) + + def _last_node(self) -> Node[object]: + if not self._nodes: + raise RuntimeError("pipeline is empty") + return list(self._nodes.values())[-1] + + def _check_available_name(self, name: str) -> None: + if name in self._nodes or name in self._aliases: + raise ValueError(f"pipeline already has node {name}") + + def _check_member_node(self, node: Node[Any]) -> None: + nw = self._nodes.get(node.name) + if nw is not node: + raise RuntimeError(f"node {node} not in pipeline") + + def _clear_caches(self): + pass diff --git a/src/poprox_recommender/lkpipeline/components.py b/src/poprox_recommender/lkpipeline/components.py new file mode 100644 index 00000000..e525640f --- /dev/null +++ b/src/poprox_recommender/lkpipeline/components.py @@ -0,0 +1,131 @@ +# This file is part of LensKit. +# Copyright (C) 2018-2023 Boise State University +# Copyright (C) 2023-2024 Drexel University +# Licensed under the MIT license, see LICENSE.md for details. +# SPDX-License-Identifier: MIT + +"Definition of the component interfaces." + +# pyright: strict +from __future__ import annotations + +import inspect +from typing import Callable, ClassVar, TypeAlias + +from typing_extensions import Any, Protocol, Self, TypeVar, override, runtime_checkable + +# COut is only return, so Component[U] can be assigned to Component[T] if U ≼ T. +COut = TypeVar("COut", covariant=True) +Component: TypeAlias = Callable[..., COut] + + +@runtime_checkable +class ConfigurableComponent(Protocol): # pragma: nocover + """ + Interface for configurable pipeline components (those that have + hyperparameters). A configurable component supports two additional + operations: + + * saving its configuration with :meth:`get_config`. + * creating a new instance from a saved configuration with the class method + :meth:`from_config`. + + A component must implement both of these methods to be considered + configurable. For most common cases, extending the :class:`AutoConfig` + class is sufficient to provide working implementations of these methods. + + If a component is *not* configurable, then it should either be a function or + a class that can be constructed with no arguments. + + .. note:: + + Configuration data should be JSON-compatible (strings, numbers, etc.). + + .. note:: + + Implementations must also implement ``__call__``. + """ + + @classmethod + def from_config(cls, cfg: dict[str, Any]) -> Self: + """ + Reinstantiate this component from configuration values. + """ + raise NotImplementedError() + + def get_config(self) -> dict[str, object]: + """ + Get this component's configured hyperparameters. + """ + raise NotImplementedError() + + +class AutoConfig(ConfigurableComponent): + """ + Mixin class providing automatic configuration support based on constructor + arguments. 
+
+    This class provides implementations of :meth:`get_config` and
+    :meth:`from_config` that inspect the constructor arguments and instance
+    variables to automatically provide configuration support. By default, all
+    constructor parameters will be considered configuration parameters, and
+    their values will be read from instance variables of the same name.
+    Subclasses can also define :data:`EXTRA_CONFIG_FIELDS` and
+    :data:`IGNORED_CONFIG_FIELDS` class variables to modify this behavior.
+    Missing attributes are silently ignored.
+
+    In the simple case, you can write a class like this and get config for free:
+
+    .. code:: python
+
+        class MyComponent(AutoConfig):
+            some_param: int
+
+            def __init__(self, some_param: int = 20):
+                self.some_param = some_param
+
+    For compatibility with pipeline serialization, all configuration data should
+    be JSON-compatible.
+    """
+
+    EXTRA_CONFIG_FIELDS: ClassVar[list[str]] = []
+    """
+    Names of instance variables that should be included in the configuration
+    dictionary even though they do not correspond to named constructor
+    arguments.
+
+    .. note::
+
+        This is rarely needed, and usually needs to be coupled with ``**kwargs``
+        in the constructor to make the resulting objects constructible.
+    """
+
+    IGNORED_CONFIG_FIELDS: ClassVar[list[str]] = []
+    """
+    Names of constructor parameters that should be excluded from the
+    configuration dictionary.
+    """
+
+    @override
+    def get_config(self) -> dict[str, object]:
+        """
+        Get the configuration by inspecting the constructor and instance
+        variables.
+        """
+        sig = inspect.signature(self.__class__)
+        names = list(sig.parameters.keys()) + self.EXTRA_CONFIG_FIELDS
+        params: dict[str, Any] = {}
+        for name in names:
+            if name not in self.IGNORED_CONFIG_FIELDS and hasattr(self, name):
+                params[name] = getattr(self, name)
+
+        return params
+
+    @override
+    @classmethod
+    def from_config(cls, cfg: dict[str, Any]) -> Self:
+        """
+        Create a class instance from the specified configuration. Configuration
+        elements are passed to the constructor as keyword arguments.
+        """
+        return cls(**cfg)
diff --git a/src/poprox_recommender/lkpipeline/nodes.py b/src/poprox_recommender/lkpipeline/nodes.py
new file mode 100644
index 00000000..55f92b15
--- /dev/null
+++ b/src/poprox_recommender/lkpipeline/nodes.py
@@ -0,0 +1,99 @@
+# This file is part of LensKit.
+# Copyright (C) 2018-2023 Boise State University
+# Copyright (C) 2023-2024 Drexel University
+# Licensed under the MIT license, see LICENSE.md for details.
+# SPDX-License-Identifier: MIT
+
+# pyright: strict
+
+import warnings
+from inspect import Signature, signature
+
+from typing_extensions import Generic, TypeVar
+
+from .components import Component
+from .types import TypecheckWarning
+
+# Nodes are (conceptually) immutable data containers, so Node[U] can be assigned
+# to Node[T] if U ≼ T.
+ND = TypeVar("ND", covariant=True)
+
+
+class Node(Generic[ND]):
+    """
+    Representation of a single node in a :class:`Pipeline`.
+    """
+
+    __match_args__ = ("name",)
+
+    name: str
+    "The name of this node."
+    types: set[type] | None
+    "The set of valid data types of this node, or None for no typechecking."
+
+    def __init__(self, name: str, *, types: set[type] | None = None):
+        self.name = name
+        self.types = types
+
+    def __str__(self) -> str:
+        return f"<{self.__class__.__name__} {self.name}>"
+
+
+class InputNode(Node[ND], Generic[ND]):
+    """
+    An input node.
+    """
+
+
+class FallbackNode(Node[ND], Generic[ND]):
+    """
+    Node for trying several nodes in turn.
+ """ + + __match_args__ = ("name", "alternatives") + + alternatives: list[Node[ND | None]] + "The nodes that can possibly fulfil this node." + + def __init__(self, name: str, alternatives: list[Node[ND | None]]): + super().__init__(name) + self.alternatives = alternatives + + +class LiteralNode(Node[ND], Generic[ND]): + __match_args__ = ("name", "value") + value: ND + "The value associated with this node" + + def __init__(self, name: str, value: ND, *, types: set[type] | None = None): + super().__init__(name, types=types) + self.value = value + + +class ComponentNode(Node[ND], Generic[ND]): + __match_args__ = ("name", "component", "inputs", "connections") + + component: Component[ND] + "The component associated with this node" + + inputs: dict[str, type | None] + "The component's inputs." + + connections: dict[str, str] + "The component's input connections." + + def __init__(self, name: str, component: Component[ND]): + super().__init__(name) + self.component = component + self.connections = {} + + sig = signature(component) + if sig.return_annotation == Signature.empty: + warnings.warn(f"component {component} has no return type annotation", TypecheckWarning, 2) + else: + self.types = set([sig.return_annotation]) + + self.inputs = { + param.name: None if param.annotation == Signature.empty else param.annotation + for param in sig.parameters.values() + } diff --git a/src/poprox_recommender/lkpipeline/runner.py b/src/poprox_recommender/lkpipeline/runner.py new file mode 100644 index 00000000..f838c458 --- /dev/null +++ b/src/poprox_recommender/lkpipeline/runner.py @@ -0,0 +1,144 @@ +# This file is part of LensKit. +# Copyright (C) 2018-2023 Boise State University +# Copyright (C) 2023-2024 Drexel University +# Licensed under the MIT license, see LICENSE.md for details. +# SPDX-License-Identifier: MIT + +""" +Pipeline runner logic. +""" + +# pyright: strict +import logging +from typing import Any, Literal, TypeAlias + +from . import Pipeline +from .components import Component +from .nodes import ComponentNode, FallbackNode, InputNode, LiteralNode, Node +from .types import is_compatible_data + +_log = logging.getLogger(__name__) +State: TypeAlias = Literal["pending", "in-progress", "finished", "failed"] + + +class PipelineRunner: + """ + Node status and results for a single pipeline run. + + This class operates recursively; pipelines should never be so deep that + recursion fails. + """ + + pipe: Pipeline + inputs: dict[str, Any] + status: dict[str, State] + state: dict[str, Any] + + def __init__(self, pipe: Pipeline, inputs: dict[str, Any]): + self.pipe = pipe + self.inputs = inputs + self.status = {n.name: "pending" for n in pipe.nodes} + self.state = {} + + def run(self, node: Node[Any], *, required: bool = True) -> Any: + """ + Run the pipleline to obtain the results of a node. 
+ """ + status = self.status[node.name] + if status == "finished": + return self.state[node.name] + elif status == "in-progress": + raise RuntimeError(f"pipeline cycle encountered at {node}") + elif status == "failed": # pragma: nocover + raise RuntimeError(f"{node} previously failed") + + _log.debug("processing node %s", node) + self.status[node.name] = "in-progress" + try: + self._run_node(node, required) + self.status[node.name] = "finished" + except Exception as e: + _log.error("node %s failed with error %s", node, e) + self.status[node.name] = "failed" + raise e + + try: + return self.state[node.name] + except KeyError as e: + if required: + raise e + else: + return None + + def _run_node(self, node: Node[Any], required: bool) -> None: + match node: + case LiteralNode(name, value): + self.state[name] = value + case InputNode(name, types=types): + self._inject_input(name, types, required) + case ComponentNode(name, comp, inputs, wiring): + self._run_component(name, comp, inputs, wiring, required) + case FallbackNode(name, alts): + self._run_fallback(name, alts) + case _: # pragma: nocover + raise RuntimeError(f"invalid node {node}") + + def _inject_input(self, name: str, types: set[type] | None, required: bool) -> None: + val = self.inputs.get(name, None) + if val is None and required and types and not is_compatible_data(None, *types): + raise RuntimeError(f"input {name} not specified") + + if val is not None and types and not is_compatible_data(val, *types): + raise TypeError(f"invalid data for input {name} (expected {types}, got {type(val)})") + + self.state[name] = val + + def _run_component( + self, + name: str, + comp: Component[Any], + inputs: dict[str, type | None], + wiring: dict[str, str], + required: bool, + ) -> None: + in_data = {} + _log.debug("processing inputs for component %s", name) + for iname, itype in inputs.items(): + src = wiring.get(iname, None) + if src is not None: + snode = self.pipe.node(src) + else: + snode = self.pipe.get_default(iname) + + if snode is None: + ival = None + else: + if required and itype: + ireq = not is_compatible_data(None, itype) + else: + ireq = False + ival = self.run(snode, required=ireq) + + # bail out if we're trying to satisfy a non-required dependency + if ival is None and itype and not is_compatible_data(None, itype) and not required: + return None + + if itype and not is_compatible_data(ival, itype): + raise TypeError( + f"input {iname} for component {name}" f" has invalid type {type(ival)} (expected {itype})" + ) + + in_data[iname] = ival + + _log.debug("running component %s", name) + self.state[name] = comp(**in_data) + + def _run_fallback(self, name: str, alternatives: list[Node[Any]]) -> None: + for alt in alternatives: + val = self.run(alt, required=False) + if val is not None: + self.state[name] = val + return + + # got this far, no alternatives + raise RuntimeError(f"no alternative for {name} returned data") diff --git a/src/poprox_recommender/lkpipeline/state.py b/src/poprox_recommender/lkpipeline/state.py new file mode 100644 index 00000000..0cf383fe --- /dev/null +++ b/src/poprox_recommender/lkpipeline/state.py @@ -0,0 +1,86 @@ +# pyright: strict +from collections.abc import Mapping +from typing import Any, Iterator + + +class PipelineState(Mapping[str, Any]): + """ + Full results of running a pipeline. 
A pipeline state is a dictionary
+    mapping node names to their results; it is implemented as a separate class
+    instead of directly using a dictionary to allow data to be looked up by node
+    aliases in addition to original node names (and to be read-only).
+
+    Client code will generally not construct this class directly.
+
+    Args:
+        state:
+            The pipeline state to wrap. The state object stores a reference to
+            this dictionary.
+        aliases:
+            Dictionary of node aliases.
+        default:
+            The name of the default node (whose data should be returned by
+            :attr:`default`).
+    """
+
+    _state: dict[str, Any]
+    _aliases: dict[str, str]
+    _default: str | None = None
+
+    def __init__(
+        self, state: dict[str, Any] | None = None, aliases: dict[str, str] | None = None, default: str | None = None
+    ) -> None:
+        self._state = state if state is not None else {}
+        self._aliases = aliases if aliases is not None else {}
+        self._default = default
+        if default is not None and default not in self:
+            raise ValueError("default node is not in state or aliases")
+
+    @property
+    def default(self) -> Any:
+        """
+        Return the data from the default node (typically the last node run).
+
+        Returns:
+            The data associated with the default node.
+
+        Raises:
+            ValueError: if there is no specified default node.
+        """
+        if self._default is not None:
+            return self[self._default]
+        else:
+            raise ValueError("pipeline state has no default value")
+
+    @property
+    def default_node(self) -> str | None:
+        "Return the name of the default node (typically the last node run)."
+        return self._default
+
+    def __len__(self):
+        return len(self._state)
+
+    def __contains__(self, key: object) -> bool:
+        if key in self._state:
+            return True
+        if key in self._aliases:
+            return self._aliases[key] in self
+        else:
+            return False
+
+    def __getitem__(self, key: str) -> Any:
+        if key in self._state:
+            return self._state[key]
+        elif key in self._aliases:
+            return self[self._aliases[key]]
+        else:
+            raise KeyError(f"pipeline node <{key}>")
+
+    def __iter__(self) -> Iterator[str]:
+        return iter(self._state)
+
+    def __str__(self) -> str:
+        return f"<PipelineState with {len(self)} nodes>"
+
+    def __repr__(self) -> str:
+        return f"<PipelineState with {len(self)} nodes>"
diff --git a/src/poprox_recommender/lkpipeline/types.py b/src/poprox_recommender/lkpipeline/types.py
new file mode 100644
index 00000000..00c91c4b
--- /dev/null
+++ b/src/poprox_recommender/lkpipeline/types.py
@@ -0,0 +1,119 @@
+# This file is part of LensKit.
+# Copyright (C) 2018-2023 Boise State University
+# Copyright (C) 2023-2024 Drexel University
+# Licensed under the MIT license, see LICENSE.md for details.
+# SPDX-License-Identifier: MIT
+
+# pyright: basic
+from __future__ import annotations
+
+import warnings
+from types import GenericAlias
+from typing import Union, _GenericAlias, get_args, get_origin  # type: ignore
+
+import numpy as np
+
+
+class TypecheckWarning(UserWarning):
+    "Warnings about type-checking logic."
+
+    pass
+
+
+def is_compatible_type(typ: type, *targets: type) -> bool:
+    """
+    Make a best-effort check whether a type is compatible with at least one
+    target type. This function is limited by limitations of the Python type
+    system and the effort required to (re-)write a full type checker. It is
+    written to be over-accepting instead of over-restrictive, so it can be used
+    to reject clearly incompatible types without rejecting combinations it
+    cannot properly check.
+
+    Args:
+        typ:
+            The type to check.
+        targets:
+            One or more target types to check against.
+
+    Returns:
+        ``False`` if it is clear that the specified type is incompatible with
+        all of the targets, and ``True`` otherwise.
+    """
+    for target in targets:
+        # try a straight subclass check first, but gracefully handle incompatible types
+        try:
+            if issubclass(typ, target):
+                return True
+        except TypeError:
+            pass
+
+        if isinstance(target, (GenericAlias, _GenericAlias)):
+            tcls = get_origin(target)
+            # if we're matching a raw type against a generic, just check the origin
+            if isinstance(typ, GenericAlias):
+                warnings.warn(f"cannot type-check generic type {typ}", TypecheckWarning)
+                cls = get_origin(typ)
+                if issubclass(cls, tcls):  # type: ignore
+                    return True
+            elif isinstance(typ, type):
+                if issubclass(typ, tcls):  # type: ignore
+                    return True
+        elif typ == int and issubclass(target, (float, complex)):  # noqa: E721
+            return True
+        elif typ == float and issubclass(target, complex):  # noqa: E721
+            return True
+
+    return False
+
+
+def is_compatible_data(obj: object, *targets: type) -> bool:
+    """
+    Make a best-effort check whether a data object is compatible with at least
+    one target type. This function is limited by limitations of the Python type
+    system and the effort required to (re-)write a full type checker. It is
+    written to be over-accepting instead of over-restrictive, so it can be used
+    to reject clearly incompatible types without rejecting combinations it
+    cannot properly check.
+
+    Args:
+        obj:
+            The object to check.
+        targets:
+            One or more target types to check against.
+
+    Returns:
+        ``False`` if it is clear that the specified object is incompatible with
+        all of the targets, and ``True`` otherwise.
+    """
+    for target in targets:
+        # try a straight instance check first, but gracefully handle incompatible types
+        try:
+            if isinstance(obj, target):
+                return True
+        except TypeError:
+            pass
+
+        if get_origin(target) == Union:
+            types = get_args(target)
+            if is_compatible_data(obj, *types):
+                return True
+        elif isinstance(target, (GenericAlias, _GenericAlias)):
+            tcls = get_origin(target)
+            if isinstance(obj, np.ndarray) and tcls == np.ndarray:
+                # check for type compatibility
+                _sz, dtw = get_args(target)
+                (dt,) = get_args(dtw)
+                if issubclass(obj.dtype.type, dt):
+                    return True
+            elif isinstance(tcls, type) and isinstance(obj, tcls):
+                warnings.warn(
+                    f"cannot type-check object of type {type(obj)} against generic",
+                    TypecheckWarning,
+                )
+                return True
+        elif isinstance(obj, int) and issubclass(target, (float, complex)):  # noqa: E721
+            return True
+        elif isinstance(obj, float) and issubclass(target, complex):  # noqa: E721
+            return True
+
+    return False
diff --git a/tests/components/test_topic_calibration.py b/tests/components/test_topic_calibration.py
index 550a1d65..716789ff 100644
--- a/tests/components/test_topic_calibration.py
+++ b/tests/components/test_topic_calibration.py
@@ -34,11 +34,13 @@ def test_request_with_topic_calibrator():
     )

     # do we get recommendations?
-    assert len(topic_calibrated_outputs.recs) > 0
-    assert len(base_outputs.recs) == len(topic_calibrated_outputs.recs)
+    tco_recs = topic_calibrated_outputs.default.articles
+    bo_recs = base_outputs.default.articles
+    assert len(tco_recs) > 0
+    assert len(bo_recs) == len(tco_recs)

-    base_article_ids = [article.article_id for article in base_outputs.recs]
-    calibrated_article_ids = [article.article_id for article in topic_calibrated_outputs.recs]
+    base_article_ids = [article.article_id for article in bo_recs]
+    calibrated_article_ids = [article.article_id for article in tco_recs]

     # are the recommendation lists different?
     assert base_article_ids != calibrated_article_ids
diff --git a/tests/integration/test_smoke.py b/tests/integration/test_smoke.py
index dd437906..394a9204 100644
--- a/tests/integration/test_smoke.py
+++ b/tests/integration/test_smoke.py
@@ -25,7 +25,7 @@ def test_direct_basic_request():
         req.num_recs,
     )
     # do we get recommendations?
-    assert len(outputs.recs) > 0
+    assert len(outputs.default.articles) > 0


 def test_direct_basic_request_without_clicks():
@@ -44,4 +44,4 @@
         req.num_recs,
     )
     # do we get recommendations?
-    assert len(outputs.recs) > 0
+    assert len(outputs.default.articles) > 0
diff --git a/tests/lkpipeline/test_pipeline.py b/tests/lkpipeline/test_pipeline.py
new file mode 100644
index 00000000..b782cb55
--- /dev/null
+++ b/tests/lkpipeline/test_pipeline.py
@@ -0,0 +1,573 @@
+# This file is part of LensKit.
+# Copyright (C) 2018-2023 Boise State University
+# Copyright (C) 2023-2024 Drexel University
+# Licensed under the MIT license, see LICENSE.md for details.
+# SPDX-License-Identifier: MIT
+
+# pyright: strict
+from uuid import UUID
+
+from pytest import fail, raises
+from typing_extensions import assert_type
+
+from poprox_recommender.lkpipeline import InputNode, Node, Pipeline
+
+
+def test_init_empty():
+    pipe = Pipeline()
+    assert len(pipe.nodes) == 0
+
+
+def test_create_input():
+    "create an input node"
+    pipe = Pipeline()
+    src = pipe.create_input("user", int, str)
+    assert_type(src, Node[int | str])
+    assert isinstance(src, InputNode)
+    assert src.name == "user"
+    assert src.types == set([int, str])
+
+    assert len(pipe.nodes) == 1
+    assert pipe.node("user") is src
+
+
+def test_lookup_optional():
+    "look up a node without failing"
+    pipe = Pipeline()
+    pipe.create_input("user", int, str)
+
+    assert pipe.node("item", missing="none") is None
+
+
+def test_lookup_missing():
+    "looking up a missing node fails by default"
+    pipe = Pipeline()
+    pipe.create_input("user", int, str)
+
+    with raises(KeyError):
+        pipe.node("item")
+
+
+def test_dup_input_fails():
+    "duplicate input names are rejected"
+    pipe = Pipeline()
+    pipe.create_input("user", int, str)
+
+    with raises(ValueError, match="has node"):
+        pipe.create_input("user", UUID)
+
+
+def test_dup_component_fails():
+    "component names must not collide with input names"
+    pipe = Pipeline()
+    pipe.create_input("user", int, str)
+
+    with raises(ValueError, match="has node"):
+        pipe.add_component("user", lambda x: x)  # type: ignore
+
+
+def test_dup_alias_fails():
+    "aliases must not collide with existing node names"
+    pipe = Pipeline()
+    n = pipe.create_input("user", int, str)
+
+    with raises(ValueError, match="has node"):
+        pipe.alias("user", n)  # type: ignore
+
+
+def test_alias():
+    "alias a node"
+    pipe = Pipeline()
+    user = pipe.create_input("user", int, str)
+
+    pipe.alias("person", user)
+
+    assert pipe.node("person") is user
+
+    # aliases conflict as well
+    with raises(ValueError):
+        pipe.create_input("person", bytes)
+
+
+def 
test_component_type(): + pipe = Pipeline() + msg = pipe.create_input("msg", str) + + def incr(msg: str) -> str: + return msg + + node = pipe.add_component("return", incr, msg=msg) + assert node.name == "return" + assert node.types == set([str]) + + +def test_single_input(): + pipe = Pipeline() + msg = pipe.create_input("msg", str) + + def incr(msg: str) -> str: + return msg + + node = pipe.add_component("return", incr, msg=msg) + + ret = pipe.run(node, msg="hello") + assert ret == "hello" + + ret = pipe.run(node, msg="world") + assert ret == "world" + + +def test_single_input_required(): + pipe = Pipeline() + msg = pipe.create_input("msg", str) + + def incr(msg: str) -> str: + return msg + + node = pipe.add_component("return", incr, msg=msg) + + with raises(RuntimeError, match="not specified"): + pipe.run(node) + + +def test_single_optional_input(): + pipe = Pipeline() + msg = pipe.create_input("msg", str, None) + + def fill(msg: str | None) -> str: + return msg if msg is not None else "undefined" + + node = pipe.add_component("return", fill, msg=msg) + + assert pipe.run(node) == "undefined" + assert pipe.run(node, msg="hello") == "hello" + + +def test_single_input_typecheck(): + pipe = Pipeline() + msg = pipe.create_input("msg", str) + + def incr(msg: str) -> str: + return msg + + node = pipe.add_component("return", incr, msg=msg) + + with raises(TypeError): + pipe.run(node, msg=47) + + +def test_component_type_mismatch(): + pipe = Pipeline() + + def incr(msg: str) -> str: + return msg + + node = pipe.add_component("return", incr, msg=47) + with raises(TypeError): + pipe.run(node) + + +def test_component_unwired_input(): + pipe = Pipeline() + msg = pipe.create_input("msg", str) + + def ident(msg: str, m2: str | None) -> str: + if m2: + return msg + m2 + else: + return msg + + node = pipe.add_component("return", ident, msg=msg) + assert pipe.run(node, msg="hello") == "hello" + + +def test_chain(): + pipe = Pipeline() + x = pipe.create_input("x", int) + + def incr(x: int) -> int: + return x + 1 + + def triple(x: int) -> int: + return x * 3 + + ni = pipe.add_component("incr", incr, x=x) + nt = pipe.add_component("triple", triple, x=ni) + + # run default pipe + ret = pipe.run(x=1) + assert ret == 6 + + # run explicitly + assert pipe.run(nt, x=2) == 9 + + # run only first node + assert pipe.run(ni, x=10) == 11 + + +def test_simple_graph(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + na = pipe.add_component("add", add, x=nd, y=b) + + assert pipe.run(a=1, b=7) == 9 + assert pipe.run(na, a=3, b=7) == 13 + assert pipe.run(nd, a=3, b=7) == 6 + + +def test_cycle(): + pipe = Pipeline() + b = pipe.create_input("b", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double) + na = pipe.add_component("add", add, x=nd, y=b) + pipe.connect(nd, x=na) + + with raises(RuntimeError, match="cycle"): + pipe.run(a=1, b=7) + + +def test_replace_component(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def double(x: int) -> int: + return x * 2 + + def triple(x: int) -> int: + return x * 3 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + na = pipe.add_component("add", add, x=nd, y=b) + + nt = pipe.replace_component("double", triple, x=a) 
+ + # run through the end + assert pipe.run(a=1, b=7) == 10 + assert pipe.run(na, a=3, b=7) == 16 + # run only the first component + assert pipe.run(nt, a=3, b=7) == 9 + + # old node should be missing! + with raises(RuntimeError, match="not in pipeline"): + pipe.run(nd, a=3, b=7) + + +def test_default_wiring(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + pipe.set_default("y", b) + + nd = pipe.add_component("double", double, x=a) + na = pipe.add_component("add", add, x=nd) + + assert pipe.run(a=1, b=7) == 9 + assert pipe.run(na, a=3, b=7) == 13 + + +def test_run_by_name(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + pipe.add_component("add", add, x=nd, y=b) + + assert pipe.run("double", a=1, b=7) == 2 + + +def test_invalid_type(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + pipe.add_component("add", add, x=nd, y=b) + + with raises(TypeError): + pipe.run(a=1, b="seven") + + +def test_run_by_alias(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + na = pipe.add_component("add", add, x=nd, y=b) + + pipe.alias("result", na) + + assert pipe.run("result", a=1, b=7) == 9 + + +def test_run_all(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + na = pipe.add_component("add", add, x=nd, y=b) + + pipe.alias("result", na) + + state = pipe.run_all(a=1, b=7) + assert state["double"] == 2 + assert state["add"] == 9 + assert state["result"] == 9 + + +def test_run_all_limit(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + na = pipe.add_component("add", add, x=nd, y=b) + + pipe.alias("result", na) + + state = pipe.run_all("double", a=1, b=7) + assert state["double"] == 2 + assert "add" not in state + assert "result" not in state + + +def test_connect_literal(): + pipe = Pipeline() + a = pipe.create_input("a", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + na = pipe.add_component("add", add, x=nd, y=2) + + assert pipe.run(na, a=3) == 8 + + +def test_connect_literal_explicit(): + pipe = Pipeline() + a = pipe.create_input("a", int) + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + na = pipe.add_component("add", add, x=nd, y=pipe.literal(2)) + + assert pipe.run(na, a=3) == 8 + + +def test_fail_missing_input(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def double(x: int) -> 
int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + na = pipe.add_component("add", add, x=nd, y=b) + + with raises(RuntimeError, match=r"input.*not specified"): + pipe.run(na, a=3) + + # missing inputs only matter if they are required + assert pipe.run(nd, a=3) == 6 + + +def test_fallback_input(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def negative(x: int) -> int: + return -x + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + nn = pipe.add_component("negate", negative, x=a) + fb = pipe.use_first_of("fill-operand", b, nn) + na = pipe.add_component("add", add, x=nd, y=fb) + + # 3 * 2 + -3 = 3 + assert pipe.run(na, a=3) == 3 + + +def test_fallback_only_run_if_needed(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def negative(x: int) -> int: + fail("fallback component run when not needed") + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + nn = pipe.add_component("negate", negative, x=a) + fb = pipe.use_first_of("fill-operand", b, nn) + na = pipe.add_component("add", add, x=nd, y=fb) + + assert pipe.run(na, a=3, b=8) == 14 + + +def test_fallback_fail_with_missing_options(): + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def negative(x: int) -> int | None: + return None + + def double(x: int) -> int: + return x * 2 + + def add(x: int, y: int) -> int: + return x + y + + nd = pipe.add_component("double", double, x=a) + nn = pipe.add_component("negate", negative, x=a) + fb = pipe.use_first_of("fill-operand", b, nn) + na = pipe.add_component("add", add, x=nd, y=fb) + + with raises(RuntimeError, match="no alternative"): + pipe.run(na, a=3) + + +def test_fallback_transitive(): + "test that a fallback works if a dependency's dependency fails" + pipe = Pipeline() + ia = pipe.create_input("a", int) + ib = pipe.create_input("b", int) + + def double(x: int) -> int: + return 2 * x + + # two components, each with a different input + c1 = pipe.add_component("double-a", double, x=ia) + c2 = pipe.add_component("double-b", double, x=ib) + # use the first that succeeds + c = pipe.use_first_of("result", c1, c2) + + # omitting the first input should result in the second component + assert pipe.run(c, b=17) == 34 + + +def test_fallback_transitive_deeper(): + "deeper transitive fallback test" + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def negative(x: int) -> int: + return -x + + def double(x: int) -> int: + return x * 2 + + nd = pipe.add_component("double", double, x=a) + nn = pipe.add_component("negate", negative, x=nd) + nr = pipe.use_first_of("fill-operand", nn, b) + + assert pipe.run(nr, b=8) == 8 + + +def test_fallback_transitive_nodefail(): + "deeper transitive fallback test" + pipe = Pipeline() + a = pipe.create_input("a", int) + b = pipe.create_input("b", int) + + def negative(x: int) -> int | None: + # make this return None in some cases to trigger failure + if x >= 0: + return -x + else: + return None + + def double(x: int) -> int: + return x * 2 + + nd = pipe.add_component("double", double, x=a) + nn = pipe.add_component("negate", negative, x=nd) + nr = pipe.use_first_of("fill-operand", nn, b) + + assert pipe.run(nr, a=2, b=8) == -4 + assert 
pipe.run(nr, a=-7, b=8) == 8 diff --git a/tests/lkpipeline/test_pipeline_config.py b/tests/lkpipeline/test_pipeline_config.py new file mode 100644 index 00000000..7f4f73ae --- /dev/null +++ b/tests/lkpipeline/test_pipeline_config.py @@ -0,0 +1,108 @@ +# This file is part of LensKit. +# Copyright (C) 2018-2023 Boise State University +# Copyright (C) 2023-2024 Drexel University +# Licensed under the MIT license, see LICENSE.md for details. +# SPDX-License-Identifier: MIT + +import json + +from poprox_recommender.lkpipeline import Pipeline +from poprox_recommender.lkpipeline.components import AutoConfig +from poprox_recommender.lkpipeline.nodes import ComponentNode + + +class Prefixer(AutoConfig): + prefix: str + + def __init__(self, prefix: str = "hello"): + self.prefix = prefix + + def __call__(self, msg: str) -> str: + return self.prefix + msg + + +class Question: + "test component that is not configurable but is a class" + + def __call__(self, msg: str) -> str: + return msg + "?" + + +def exclaim(msg: str) -> str: + return msg + "!" + + +def test_auto_config_roundtrip(): + comp = Prefixer("FOOBIE BLETCH") + + cfg = comp.get_config() + assert "prefix" in cfg + + c2 = Prefixer.from_config(cfg) + assert c2 is not comp + assert c2.prefix == comp.prefix + + +def test_pipeline_config(): + comp = Prefixer("scroll named ") + + pipe = Pipeline() + msg = pipe.create_input("msg", str) + pipe.add_component("prefix", comp, msg=msg) + + assert pipe.run(msg="FOOBIE BLETCH") == "scroll named FOOBIE BLETCH" + + config = pipe.component_configs() + print(json.dumps(config, indent=2)) + + assert "prefix" in config + assert config["prefix"]["prefix"] == "scroll named " + + +def test_pipeline_clone(): + comp = Prefixer("scroll named ") + + pipe = Pipeline() + msg = pipe.create_input("msg", str) + pipe.add_component("prefix", comp, msg=msg) + + assert pipe.run(msg="FOOBIE BLETCH") == "scroll named FOOBIE BLETCH" + + p2 = pipe.clone() + n2 = p2.node("prefix") + assert isinstance(n2, ComponentNode) + assert isinstance(n2.component, Prefixer) + assert n2.component is not comp + assert n2.component.prefix == comp.prefix + + assert p2.run(msg="HACKEM MUCHE") == "scroll named HACKEM MUCHE" + + +def test_pipeline_clone_with_function(): + comp = Prefixer("scroll named ") + + pipe = Pipeline() + msg = pipe.create_input("msg", str) + pfx = pipe.add_component("prefix", comp, msg=msg) + pipe.add_component("exclaim", exclaim, msg=pfx) + + assert pipe.run(msg="FOOBIE BLETCH") == "scroll named FOOBIE BLETCH!" + + p2 = pipe.clone() + + assert p2.run(msg="HACKEM MUCHE") == "scroll named HACKEM MUCHE!" + + +def test_pipeline_clone_with_nonconfig_class(): + comp = Prefixer("scroll named ") + + pipe = Pipeline() + msg = pipe.create_input("msg", str) + pfx = pipe.add_component("prefix", comp, msg=msg) + pipe.add_component("question", Question(), msg=pfx) + + assert pipe.run(msg="FOOBIE BLETCH") == "scroll named FOOBIE BLETCH?" + + p2 = pipe.clone() + + assert p2.run(msg="HACKEM MUCHE") == "scroll named HACKEM MUCHE?" 
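Reviewer note: the config tests above exercise the AutoConfig round-trip through `component_configs`, `get_config`, and `from_config`. The standalone sketch below condenses that behavior; `Suffixer` is an illustrative class, not part of this PR:

    import json

    from poprox_recommender.lkpipeline.components import AutoConfig

    class Suffixer(AutoConfig):
        suffix: str

        def __init__(self, suffix: str = "!"):
            self.suffix = suffix

        def __call__(self, msg: str) -> str:
            return msg + self.suffix

    comp = Suffixer("?!")
    cfg = json.loads(json.dumps(comp.get_config()))  # config is JSON-compatible
    clone = Suffixer.from_config(cfg)                # rebuild from the saved config
    assert clone.suffix == comp.suffix
    assert clone("hello") == "hello?!"
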
diff --git a/tests/lkpipeline/test_pipeline_state.py b/tests/lkpipeline/test_pipeline_state.py new file mode 100644 index 00000000..074aef67 --- /dev/null +++ b/tests/lkpipeline/test_pipeline_state.py @@ -0,0 +1,38 @@ +from pytest import raises + +from poprox_recommender.lkpipeline import PipelineState + + +def test_empty(): + state = PipelineState() + assert len(state) == 0 + assert not state + assert "scroll" not in state + + with raises(KeyError): + state["scroll"] + + +def test_single_value(): + state = PipelineState({"scroll": "HACKEM MUCHE"}) + assert len(state) == 1 + assert state + assert "scroll" in state + assert state["scroll"] == "HACKEM MUCHE" + + +def test_alias(): + state = PipelineState({"scroll": "HACKEM MUCHE"}, {"book": "scroll"}) + assert len(state) == 1 + assert state + assert "scroll" in state + assert "book" in state + assert state["book"] == "HACKEM MUCHE" + + +def test_alias_missing(): + state = PipelineState({"scroll": "HACKEM MUCHE"}, {"book": "manuscript"}) + assert len(state) == 1 + assert state + assert "scroll" in state + assert "book" not in state diff --git a/tests/lkpipeline/test_pipeline_types.py b/tests/lkpipeline/test_pipeline_types.py new file mode 100644 index 00000000..70b09e0f --- /dev/null +++ b/tests/lkpipeline/test_pipeline_types.py @@ -0,0 +1,87 @@ +# This file is part of LensKit. +# Copyright (C) 2018-2023 Boise State University +# Copyright (C) 2023-2024 Drexel University +# Licensed under the MIT license, see LICENSE.md for details. +# SPDX-License-Identifier: MIT + +""" +Tests for the pipeline type-checking functions. +""" + +import typing +from collections.abc import Iterable, Sequence + +import numpy as np +import pandas as pd +from numpy.typing import ArrayLike, NDArray +from pytest import warns + +from poprox_recommender.lkpipeline.types import TypecheckWarning, is_compatible_data, is_compatible_type + + +def test_type_compat_identical(): + assert is_compatible_type(int, int) + assert is_compatible_type(str, str) + + +def test_type_compat_assignable(): + assert is_compatible_type(int, float) + + +def test_type_raw_compat_with_generic(): + assert is_compatible_type(list, list[int]) + assert not is_compatible_type(set, list[int]) + + +def test_type_compat_protocol(): + assert is_compatible_type(list, Sequence) + assert is_compatible_type(list, typing.Sequence) + assert not is_compatible_type(set, Sequence) + assert not is_compatible_type(set, typing.Sequence) + assert is_compatible_type(set, Iterable) + + +def test_type_compat_protocol_generic(): + assert is_compatible_type(list, Sequence[int]) + assert is_compatible_type(list, typing.Sequence[int]) + + +def test_type_compat_generics_with_protocol(): + assert is_compatible_type(list[int], Sequence[int]) + + +def test_type_incompat_generics(): + with warns(TypecheckWarning): + assert is_compatible_type(list[int], list[str]) + with warns(TypecheckWarning): + assert is_compatible_type(list[int], Sequence[str]) + + +def test_data_compat_basic(): + assert is_compatible_data(72, int) + assert is_compatible_data("hello", str) + assert not is_compatible_data(72, str) + + +def test_data_compat_float_assignabile(): + assert is_compatible_data(72, float) + + +def test_data_compat_generic(): + assert is_compatible_data(["foo"], list[str]) + # this is compatible because we can't check generics + with warns(TypecheckWarning): + assert is_compatible_data([72], list[str]) + + +def test_numpy_typecheck(): + assert is_compatible_data(np.arange(10, dtype="i8"), NDArray[np.int64]) + assert 
is_compatible_data(np.arange(10, dtype="i4"), NDArray[np.int32]) + assert is_compatible_data(np.arange(10), ArrayLike) + assert is_compatible_data(np.arange(10), NDArray[np.integer]) + # numpy types can be checked + assert not is_compatible_data(np.arange(10), NDArray[np.float64]) + + +def test_pandas_typecheck(): + assert is_compatible_data(pd.Series(["a", "b"]), ArrayLike)
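
Reviewer note: to close the loop on where these type-compatibility helpers matter at runtime, the runner calls `is_compatible_data` on every wired input before invoking a component, so a mistyped pipeline input fails fast with a `TypeError` instead of deep inside component code. A small sketch (toy `shout` component, not part of this PR):

    from poprox_recommender.lkpipeline import Pipeline

    def shout(msg: str) -> str:
        return msg.upper()

    pipe = Pipeline()
    msg = pipe.create_input("msg", str)
    node = pipe.add_component("shout", shout, msg=msg)

    assert pipe.run(node, msg="quiet") == "QUIET"
    try:
        pipe.run(node, msg=42)  # int is not compatible with the declared str input
    except TypeError as e:
        print("rejected:", e)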