Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype using new LensKit pipeline abstraction #81

Merged
merged 13 commits into from
Aug 9, 2024
55 changes: 33 additions & 22 deletions src/poprox_recommender/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from poprox_recommender.components.filters import TopicFilter
from poprox_recommender.components.joiners import Fill
from poprox_recommender.components.rankers.topk import TopkRanker
from poprox_recommender.components.samplers import UniformSampler
from poprox_recommender.components.samplers.uniform import UniformSampler
from poprox_recommender.components.scorers import ArticleScorer
from poprox_recommender.lkpipeline import Pipeline, PipelineState
from poprox_recommender.model import get_model
from poprox_recommender.pipeline import PipelineState, RecommendationPipeline

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand All @@ -25,22 +25,25 @@ def select_articles(
algo_params: dict[str, Any] | None = None,
) -> PipelineState:
"""
Select articles with default recommender configuration.
Select articles with default recommender configuration. It returns a
pipeline state whose ``default`` is the final list of recommendations.
"""
pipeline = None

pipeline = personalized_pipeline(num_slots, algo_params)
assert pipeline is not None

inputs = {
"candidate": candidate_articles,
"clicked": clicked_articles,
"profile": interest_profile,
}
recs = pipeline.node("recommender")
topk = pipeline.node("ranker", missing="none")
if topk is None:
wanted = (recs,)
else:
wanted = (topk, recs)

return pipeline(inputs)
return pipeline.run_all(*wanted, candidate=candidate_articles, clicked=clicked_articles, profile=interest_profile)


def personalized_pipeline(num_slots: int, algo_params: dict[str, Any] | None = None) -> RecommendationPipeline | None:
def personalized_pipeline(num_slots: int, algo_params: dict[str, Any] | None = None) -> Pipeline | None:
"""
Create the default personalized recommendation pipeline.

Expand Down Expand Up @@ -82,23 +85,31 @@ def personalized_pipeline(num_slots: int, algo_params: dict[str, Any] | None = N
logger.info("Recommendations will be ranked with plain top-k.")
ranker = topk_ranker

pipeline = RecommendationPipeline(name=diversify)
# TODO put pipeline name back in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth resolving now, or would you rather wait for a follow-up PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#85 adds it :)

pipeline = Pipeline()

# Define pipeline inputs
candidates = pipeline.create_input("candidate", ArticleSet)
clicked = pipeline.create_input("clicked", ArticleSet)
profile = pipeline.create_input("profile", InterestProfile)

# Compute embeddings
pipeline.add(article_embedder, inputs=["candidate"], output="candidate")
pipeline.add(article_embedder, inputs=["clicked"], output="clicked")
pipeline.add(user_embedder, inputs=["clicked", "profile"], output="profile")
e_cand = pipeline.add_component("candidate-embedder", article_embedder, article_set=candidates)
e_click = pipeline.add_component("history-emberdder", article_embedder, article_set=clicked)
e_user = pipeline.add_component("user-embedder", user_embedder, clicked_articles=e_click, interest_profile=profile)

# Score and rank articles with diversification/calibration reranking
pipeline.add(article_scorer, inputs=["candidate", "profile"], output="candidate")
pipeline.add(ranker, inputs=["candidate", "profile"], output="reranked")

# Output the plain descending-by-score ranking for comparison
pipeline.add(topk_ranker, inputs=["candidate", "profile"], output="ranked")
o_scored = pipeline.add_component("scorer", article_scorer, candidate_articles=e_cand, interest_profile=e_user)
o_topk = pipeline.add_component("ranker", topk_ranker, candidate_articles=o_scored, interest_profile=e_user)
if ranker is topk_ranker:
o_rank = o_topk
else:
o_rank = pipeline.add_component("reranker", ranker, candidate_articles=o_scored, interest_profile=e_user)

# Fallback in case not enough articles came from the ranker
pipeline.add(topic_filter, inputs=["candidate", "profile"], output="topical")
pipeline.add(sampler, inputs=["topical", "candidate"], output="sampled")
pipeline.add(fill, inputs=["reranked", "sampled"], output="recs")
# TODO: make this lazy so the sampler only runs if the reranker isn't enough
o_filtered = pipeline.add_component("topic-filter", topic_filter, candidate=candidates, interest_profile=profile)
o_sampled = pipeline.add_component("sampler", sampler, candidate=o_filtered, backup=candidates)
pipeline.add_component("recommender", fill, candidates1=o_rank, candidates2=o_sampled)

return pipeline
24 changes: 10 additions & 14 deletions src/poprox_recommender/evaluation/offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from tqdm import tqdm

from poprox_recommender.data.mind import TEST_REC_COUNT, MindData
from poprox_recommender.lkpipeline import PipelineState
from poprox_recommender.logging_config import setup_logging
from poprox_recommender.pipeline import RecommendationPipeline

Expand All @@ -25,7 +26,6 @@
from poprox_recommender.default import personalized_pipeline
from poprox_recommender.evaluation.metrics import rank_biased_overlap
from poprox_recommender.paths import model_file_path, project_root
from poprox_recommender.pipeline import PipelineState

logger = logging.getLogger("poprox_recommender.test_offline")

Expand All @@ -45,27 +45,23 @@ def custom_encoder(obj):
return str(obj)


def recsys_metric(
mind_data: MindData,
request: RecommendationRequest,
pipeline_state: PipelineState,
):
def recsys_metric(mind_data: MindData, request: RecommendationRequest, state: PipelineState):
# recommendations {account id (uuid): LIST[Article]}
# use the url of Article
final = state["recommender"]

recs = pd.DataFrame({"item": [a.article_id for a in pipeline_state.recs]})
recs = pd.DataFrame({"item": [a.article_id for a in final.articles]})
truth = mind_data.user_truth(request.interest_profile.profile_id)

# RR should look for *clicked* articles, not just all impression articles
single_rr = topn.recip_rank(recs, truth[truth["rating"] > 0])
single_ndcg5 = topn.ndcg(recs, truth, k=5)
single_ndcg10 = topn.ndcg(recs, truth, k=10)

ranked = pipeline_state.elements.get("ranked", None)
reranked = pipeline_state.elements.get("reranked", None)
if ranked and reranked:
single_rbo5 = rank_biased_overlap(ranked, reranked, k=5)
single_rbo10 = rank_biased_overlap(ranked, reranked, k=10)
if "ranker" in state:
topk = state["ranker"]
single_rbo5 = rank_biased_overlap(topk, final, k=5)
single_rbo10 = rank_biased_overlap(topk, final, k=10)
else:
single_rbo5 = None
single_rbo10 = None
Expand Down Expand Up @@ -115,7 +111,7 @@ def recsys_metric(
"clicked": ArticleSet(articles=request.past_articles),
"profile": request.interest_profile,
}
outputs = pipeline(inputs)
state = pipeline.run_all(**inputs)
if request.interest_profile.click_history.article_ids:
personalized = 1
else:
Expand All @@ -125,7 +121,7 @@ def recsys_metric(
raise e

logger.debug("measuring for user %s", request.interest_profile.profile_id)
single_ndcg5, single_ndcg10, single_rr, single_rbo5, single_rbo10 = recsys_metric(mind_data, request, outputs)
single_ndcg5, single_ndcg10, single_rr, single_rbo5, single_rbo10 = recsys_metric(mind_data, request, state)
user_csv.writerow(
[
request.interest_profile.profile_id,
Expand Down
7 changes: 2 additions & 5 deletions src/poprox_recommender/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
import logging

from poprox_concepts import ArticleSet
from poprox_concepts.api.recommendations import (
RecommendationRequest,
RecommendationResponse,
)
from poprox_concepts.api.recommendations import RecommendationRequest, RecommendationResponse
from poprox_recommender.default import select_articles
from poprox_recommender.topics import user_topic_preference

Expand Down Expand Up @@ -62,7 +59,7 @@ def generate_recs(event, context):
logger.info("Constructing response...")
resp_body = RecommendationResponse.model_validate(
{
"recommendations": {profile.profile_id: outputs.recs},
"recommendations": {profile.profile_id: outputs.default.articles},
}
)

Expand Down
Loading
Loading