Skip to content

Commit

Permalink
Use the new LensKit pipeline abstraction (#81)
Browse files Browse the repository at this point in the history
This updates the POPROX pipeline to use a vendored copy of the LensKit pipeline abstraction.

The LensKit pipeline is heavily inspired by the POPROX one. My design
process was basically “how do I take the core idea from the POPROX
pipeline — which I really like — and add the capabilities I need for
LensKit + build it on a DAG instead of linear and iterative state
updates?”.

The pipeline docs are rendered here:
https://lkpy.lenskit.org/en/latest/pipeline.html
  • Loading branch information
mdekstrand authored and sophiasun0515 committed Aug 12, 2024
1 parent fa17091 commit 04f3f85
Show file tree
Hide file tree
Showing 15 changed files with 1,943 additions and 47 deletions.
55 changes: 33 additions & 22 deletions src/poprox_recommender/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from poprox_recommender.components.filters import TopicFilter
from poprox_recommender.components.joiners import Fill
from poprox_recommender.components.rankers.topk import TopkRanker
from poprox_recommender.components.samplers import UniformSampler
from poprox_recommender.components.samplers.uniform import UniformSampler
from poprox_recommender.components.scorers import ArticleScorer
from poprox_recommender.lkpipeline import Pipeline, PipelineState
from poprox_recommender.model import get_model
from poprox_recommender.pipeline import PipelineState, RecommendationPipeline

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand All @@ -25,22 +25,25 @@ def select_articles(
algo_params: dict[str, Any] | None = None,
) -> PipelineState:
"""
Select articles with default recommender configuration.
Select articles with default recommender configuration. It returns a
pipeline state whose ``default`` is the final list of recommendations.
"""
pipeline = None

pipeline = personalized_pipeline(num_slots, algo_params)
assert pipeline is not None

inputs = {
"candidate": candidate_articles,
"clicked": clicked_articles,
"profile": interest_profile,
}
recs = pipeline.node("recommender")
topk = pipeline.node("ranker", missing="none")
if topk is None:
wanted = (recs,)
else:
wanted = (topk, recs)

return pipeline(inputs)
return pipeline.run_all(*wanted, candidate=candidate_articles, clicked=clicked_articles, profile=interest_profile)


def personalized_pipeline(num_slots: int, algo_params: dict[str, Any] | None = None) -> RecommendationPipeline | None:
def personalized_pipeline(num_slots: int, algo_params: dict[str, Any] | None = None) -> Pipeline | None:
"""
Create the default personalized recommendation pipeline.
Expand Down Expand Up @@ -82,23 +85,31 @@ def personalized_pipeline(num_slots: int, algo_params: dict[str, Any] | None = N
logger.info("Recommendations will be ranked with plain top-k.")
ranker = topk_ranker

pipeline = RecommendationPipeline(name=diversify)
# TODO put pipeline name back in
pipeline = Pipeline()

# Define pipeline inputs
candidates = pipeline.create_input("candidate", ArticleSet)
clicked = pipeline.create_input("clicked", ArticleSet)
profile = pipeline.create_input("profile", InterestProfile)

# Compute embeddings
pipeline.add(article_embedder, inputs=["candidate"], output="candidate")
pipeline.add(article_embedder, inputs=["clicked"], output="clicked")
pipeline.add(user_embedder, inputs=["clicked", "profile"], output="profile")
e_cand = pipeline.add_component("candidate-embedder", article_embedder, article_set=candidates)
e_click = pipeline.add_component("history-emberdder", article_embedder, article_set=clicked)
e_user = pipeline.add_component("user-embedder", user_embedder, clicked_articles=e_click, interest_profile=profile)

# Score and rank articles with diversification/calibration reranking
pipeline.add(article_scorer, inputs=["candidate", "profile"], output="candidate")
pipeline.add(ranker, inputs=["candidate", "profile"], output="reranked")

# Output the plain descending-by-score ranking for comparison
pipeline.add(topk_ranker, inputs=["candidate", "profile"], output="ranked")
o_scored = pipeline.add_component("scorer", article_scorer, candidate_articles=e_cand, interest_profile=e_user)
o_topk = pipeline.add_component("ranker", topk_ranker, candidate_articles=o_scored, interest_profile=e_user)
if ranker is topk_ranker:
o_rank = o_topk
else:
o_rank = pipeline.add_component("reranker", ranker, candidate_articles=o_scored, interest_profile=e_user)

# Fallback in case not enough articles came from the ranker
pipeline.add(topic_filter, inputs=["candidate", "profile"], output="topical")
pipeline.add(sampler, inputs=["topical", "candidate"], output="sampled")
pipeline.add(fill, inputs=["reranked", "sampled"], output="recs")
# TODO: make this lazy so the sampler only runs if the reranker isn't enough
o_filtered = pipeline.add_component("topic-filter", topic_filter, candidate=candidates, interest_profile=profile)
o_sampled = pipeline.add_component("sampler", sampler, candidate=o_filtered, backup=candidates)
pipeline.add_component("recommender", fill, candidates1=o_rank, candidates2=o_sampled)

return pipeline
24 changes: 10 additions & 14 deletions src/poprox_recommender/evaluation/offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from tqdm import tqdm

from poprox_recommender.data.mind import TEST_REC_COUNT, MindData
from poprox_recommender.lkpipeline import PipelineState
from poprox_recommender.logging_config import setup_logging
from poprox_recommender.pipeline import RecommendationPipeline

Expand All @@ -25,7 +26,6 @@
from poprox_recommender.default import personalized_pipeline
from poprox_recommender.evaluation.metrics import rank_biased_overlap
from poprox_recommender.paths import model_file_path, project_root
from poprox_recommender.pipeline import PipelineState

logger = logging.getLogger("poprox_recommender.test_offline")

Expand All @@ -45,27 +45,23 @@ def custom_encoder(obj):
return str(obj)


def recsys_metric(
mind_data: MindData,
request: RecommendationRequest,
pipeline_state: PipelineState,
):
def recsys_metric(mind_data: MindData, request: RecommendationRequest, state: PipelineState):
# recommendations {account id (uuid): LIST[Article]}
# use the url of Article
final = state["recommender"]

recs = pd.DataFrame({"item": [a.article_id for a in pipeline_state.recs]})
recs = pd.DataFrame({"item": [a.article_id for a in final.articles]})
truth = mind_data.user_truth(request.interest_profile.profile_id)

# RR should look for *clicked* articles, not just all impression articles
single_rr = topn.recip_rank(recs, truth[truth["rating"] > 0])
single_ndcg5 = topn.ndcg(recs, truth, k=5)
single_ndcg10 = topn.ndcg(recs, truth, k=10)

ranked = pipeline_state.elements.get("ranked", None)
reranked = pipeline_state.elements.get("reranked", None)
if ranked and reranked:
single_rbo5 = rank_biased_overlap(ranked, reranked, k=5)
single_rbo10 = rank_biased_overlap(ranked, reranked, k=10)
if "ranker" in state:
topk = state["ranker"]
single_rbo5 = rank_biased_overlap(topk, final, k=5)
single_rbo10 = rank_biased_overlap(topk, final, k=10)
else:
single_rbo5 = None
single_rbo10 = None
Expand Down Expand Up @@ -115,7 +111,7 @@ def recsys_metric(
"clicked": ArticleSet(articles=request.past_articles),
"profile": request.interest_profile,
}
outputs = pipeline(inputs)
state = pipeline.run_all(**inputs)
if request.interest_profile.click_history.article_ids:
personalized = 1
else:
Expand All @@ -125,7 +121,7 @@ def recsys_metric(
raise e

logger.debug("measuring for user %s", request.interest_profile.profile_id)
single_ndcg5, single_ndcg10, single_rr, single_rbo5, single_rbo10 = recsys_metric(mind_data, request, outputs)
single_ndcg5, single_ndcg10, single_rr, single_rbo5, single_rbo10 = recsys_metric(mind_data, request, state)
user_csv.writerow(
[
request.interest_profile.profile_id,
Expand Down
7 changes: 2 additions & 5 deletions src/poprox_recommender/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
import logging

from poprox_concepts import ArticleSet
from poprox_concepts.api.recommendations import (
RecommendationRequest,
RecommendationResponse,
)
from poprox_concepts.api.recommendations import RecommendationRequest, RecommendationResponse
from poprox_recommender.default import select_articles
from poprox_recommender.topics import user_topic_preference

Expand Down Expand Up @@ -62,7 +59,7 @@ def generate_recs(event, context):
logger.info("Constructing response...")
resp_body = RecommendationResponse.model_validate(
{
"recommendations": {profile.profile_id: outputs.recs},
"recommendations": {profile.profile_id: outputs.default.articles},
}
)

Expand Down
Loading

0 comments on commit 04f3f85

Please sign in to comment.