From 418572bfaf8f4220afb3528b103100d2d114629b Mon Sep 17 00:00:00 2001 From: "Timothy J. Baek" Date: Sun, 2 Jun 2024 12:02:59 -0700 Subject: [PATCH] enh: langfuse --- examples/filters/langfuse_filter_pipeline.py | 41 ++++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index eb8e7514..f88cad3d 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -2,7 +2,7 @@ title: Langfuse Filter Pipeline author: open-webui date: 2024-05-30 -version: 1.0 +version: 1.1 license: MIT description: A filter pipeline that uses Langfuse. requirements: langfuse @@ -12,13 +12,12 @@ from schemas import OpenAIChatMessage import os - +from utils.pipelines.main import get_last_user_message, get_last_assistant_message from pydantic import BaseModel from langfuse import Langfuse class Pipeline: - class Valves(BaseModel): # List target pipeline ids (models) that this filter will be connected to. # If you want to connect this filter to all pipelines, you can set pipelines to ["*"] @@ -58,6 +57,7 @@ def __init__(self): ) self.langfuse = None + self.chat_generations = {} pass async def on_startup(self): @@ -98,21 +98,38 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: session_id=body["chat_id"], ) + generation = trace.generation( + name=body["chat_id"], + model=body["model"], + input=body["messages"], + metadata={"interface": "open-webui"}, + ) + + self.chat_generations[body["chat_id"]] = generation print(trace.get_trace_url()) return body async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: print(f"outlet:{__name__}") - - trace = self.langfuse.trace( - name=f"filter:{__name__}", - input=body, - user_id=user["id"], - metadata={"name": user["name"]}, - session_id=body["chat_id"], + if body["chat_id"] not in self.chat_generations: + return body + + generation = self.chat_generations[body["chat_id"]] + + user_message = get_last_user_message(body["messages"]) + generated_message = get_last_assistant_message(body["messages"]) + + # Update usage cost based on the length of the input and output messages + # Below does not reflect the actual cost of the API + # You can adjust the cost based on your requirements + generation.end( + output=generated_message, + usage={ + "totalCost": (len(user_message) + len(generated_message)) / 1000, + "unit": "CHARACTERS", + }, + metadata={"interface": "open-webui"}, ) - print(trace.get_trace_url()) - return body