From a0e926aab4c8e15b2e7bd93a854a1e0641c8086c Mon Sep 17 00:00:00 2001 From: Nick Bobrowski <39348559+bonk1t@users.noreply.github.com> Date: Tue, 24 Dec 2024 13:14:20 +0000 Subject: [PATCH] Integrate AgentOps; Minor fixes in thread.py --- .gitignore | 2 +- agency_swarm/threads/thread.py | 133 ++++++++---------- agency_swarm/util/tracking/__init__.py | 21 ++- agency_swarm/util/tracking/langchain_types.py | 5 - .../util/tracking/local_callback_handler.py | 2 +- docs/advanced-usage/observability.md | 108 +++++++------- tests/demos/demo_observability.py | 11 +- 7 files changed, 133 insertions(+), 149 deletions(-) diff --git a/.gitignore b/.gitignore index 4581b917..de30eb09 100644 --- a/.gitignore +++ b/.gitignore @@ -121,7 +121,7 @@ celerybeat.pid # Environments .env -.venv +.venv/ env/ venv/ ENV/ diff --git a/agency_swarm/threads/thread.py b/agency_swarm/threads/thread.py index 5e4b3d5c..b1463fc0 100644 --- a/agency_swarm/threads/thread.py +++ b/agency_swarm/threads/thread.py @@ -5,7 +5,7 @@ import re import time from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import List, Type, Union +from typing import Any, Type, Union from uuid import UUID from openai import APIError, BadRequestError @@ -89,10 +89,10 @@ def init_thread(self): def get_completion_stream( self, - message: Union[str, List[dict], None], + message: Union[str, list[dict], None], event_handler: Type[AgencyEventHandler] | None = None, - message_files: List[str] = None, - attachments: List[Attachment] | None = None, + message_files: list[str] = None, + attachments: list[Attachment] | None = None, recipient_agent: Agent = None, additional_instructions: str = None, tool_choice: AssistantToolChoice = None, @@ -112,9 +112,9 @@ def get_completion_stream( def get_completion( self, - message: Union[str, List[dict], None], - message_files: List[str] = None, - attachments: List[dict] | None = None, + message: Union[str, list[dict], None], + message_files: list[str] = None, + attachments: 
list[dict] | None = None, recipient_agent: Union[Agent, None] = None, additional_instructions: str = None, event_handler: Type[AgencyEventHandler] | None = None, @@ -178,6 +178,7 @@ def get_completion( ) chain_run_id = self._run.id + final_output = None # Track the final output # Chain start if self.callback_handler: @@ -198,29 +199,24 @@ def get_completion( # chat model start callback if self.callback_handler: - chat_messages = [] if isinstance(message, str): - chat_messages = [[HumanMessage(content=message)]] - - kwargs = { - "invocation_params": { - "_type": "openai", - "model": self._run.model, - "temperature": self._run.temperature, - }, - "name": recipient_agent.name if recipient_agent else "Unknown", - } + agent_name = recipient_agent.name if recipient_agent else "Unknown" self.callback_handler.on_chat_model_start( - serialized={"name": kwargs["name"], "id": [self._run.id]}, - messages=chat_messages, + serialized={"name": agent_name, "id": [self._run.id]}, + messages=[[HumanMessage(content=message)]], run_id=self._run.id, parent_run_id=chain_run_id, metadata={ "agent_name": self.agent.name, "recipient_agent_name": recipient_agent.name, }, - **kwargs, + invocation_params={ + "_type": "openai", + "model": self._run.model, + "temperature": self._run.temperature, + }, + name=agent_name, ) try: @@ -230,13 +226,12 @@ def get_completion( while True: self._run_until_done() - # function execution if self._run.status == "requires_action": self._called_recepients = [] tool_calls = ( self._run.required_action.submit_tool_outputs.tool_calls ) - tool_outputs_and_names = [] # list of tuples (name, tool_output) + tool_outputs_and_names: list[tuple[str, Any]] = [] self._track_tool_calls(tool_calls, chain_run_id) @@ -309,23 +304,8 @@ def handle_output(tool_call, output): output = yield from handle_output(tool_call, output) if output_as_result: self._cancel_run() - # chain end - if self.callback_handler: - self.callback_handler.on_chain_end( - outputs={"response": output}, - 
run_id=chain_run_id, - parent_run_id=parent_run_id, - ) - finish = AgentFinish( - return_values={"response": output}, - log=output, - ) - self.callback_handler.on_agent_finish( - finish=finish, - run_id=self._run.id, - parent_run_id=chain_run_id, - ) - return output + final_output = output + break else: sync_tool_calls += async_tool_calls @@ -354,22 +334,11 @@ def handle_output(tool_call, output): output = yield from handle_output(tool_call, output) if output_as_result: self._cancel_run() - # chain end - if self.callback_handler: - self.callback_handler.on_chain_end( - outputs={"response": output}, - run_id=chain_run_id, - parent_run_id=parent_run_id, - ) - finish = AgentFinish( - return_values={"response": output}, log=output - ) - self.callback_handler.on_agent_finish( - finish=finish, - run_id=self._run.id, - parent_run_id=chain_run_id, - ) - return output + final_output = output + break + + if final_output is not None: + break tool_outputs = [t for _, t in tool_outputs_and_names] tool_names = [n for n, _ in tool_outputs_and_names] @@ -547,23 +516,35 @@ def handle_output(tool_call, output): ) continue - # chain end - if self.callback_handler: - self.callback_handler.on_chain_end( - outputs={"response": last_message}, - run_id=chain_run_id, - parent_run_id=parent_run_id, - ) - # agent finish - finish = AgentFinish( - return_values={"response": last_message}, log=last_message - ) - self.callback_handler.on_agent_finish( - finish=finish, - run_id=self._run.id, - parent_run_id=chain_run_id, - ) - return last_message + if final_output is None: + final_output = last_message + break + + # Ensure final_output is a string + if inspect.isgenerator(final_output): + final_output = "".join(list(final_output)) + elif not isinstance(final_output, str): + final_output = str(final_output) + + # Only fire callbacks once at the end + if self.callback_handler: + self.callback_handler.on_chain_end( + outputs={"response": final_output}, + run_id=chain_run_id, + 
parent_run_id=parent_run_id, + ) + finish = AgentFinish( + return_values={"response": final_output}, + log=final_output, + ) + self.callback_handler.on_agent_finish( + finish=finish, + run_id=self._run.id, + parent_run_id=chain_run_id, + ) + + return final_output + except Exception as e: # chain error if self.callback_handler: @@ -720,7 +701,7 @@ def _get_last_assistant_message(self): raise Exception("No assistant message found in the thread") def create_message( - self, message: str, role: str = "user", attachments: List[dict] = None + self, message: str, role: str = "user", attachments: list[dict] = None ): try: return self.client.beta.threads.messages.create( @@ -921,7 +902,7 @@ def get_messages(self, limit=None): return all_messages - def _track_tool_calls(self, tool_calls: List[ToolCall], chain_run_id: str) -> None: + def _track_tool_calls(self, tool_calls: list[ToolCall], parent_run_id: str) -> None: """Send agent_action before each tool call""" if not self.callback_handler: return @@ -936,5 +917,5 @@ def _track_tool_calls(self, tool_calls: List[ToolCall], chain_run_id: str) -> No self.callback_handler.on_agent_action( action=action, run_id=self._run.id, - parent_run_id=chain_run_id, + parent_run_id=parent_run_id, ) diff --git a/agency_swarm/util/tracking/__init__.py b/agency_swarm/util/tracking/__init__.py index f61f570d..c626d0d8 100644 --- a/agency_swarm/util/tracking/__init__.py +++ b/agency_swarm/util/tracking/__init__.py @@ -1,3 +1,4 @@ +import logging import threading from typing import Callable, Literal @@ -6,9 +7,11 @@ _callback_handler = None _lock = threading.Lock() +logger = logging.getLogger(__name__) -SUPPORTED_TRACKERS = ["langfuse", "local"] -SUPPORTED_TRACKERS_TYPE = Literal["langfuse", "local"] + +SUPPORTED_TRACKERS = ["agentops", "langfuse", "local"] +SUPPORTED_TRACKERS_TYPE = Literal["agentops", "langfuse", "local"] def get_callback_handler(): @@ -27,16 +30,24 @@ def init_tracking(tracker_name: SUPPORTED_TRACKERS_TYPE, **kwargs): if 
tracker_name not in SUPPORTED_TRACKERS: raise ValueError(f"Invalid tracker name: {tracker_name}") + logger.info(f"Initializing tracking with {tracker_name}...") + use_langchain_types() if tracker_name == "local": from .local_callback_handler import LocalCallbackHandler - set_callback_handler(lambda: LocalCallbackHandler(**kwargs)) + handler_class = LocalCallbackHandler + elif tracker_name == "agentops": + from agentops import LangchainCallbackHandler + + handler_class = LangchainCallbackHandler elif tracker_name == "langfuse": - from langfuse.callback import CallbackHandler as LangfuseCallbackHandler + from langfuse.callback import CallbackHandler + + handler_class = CallbackHandler - set_callback_handler(lambda: LangfuseCallbackHandler(**kwargs)) + set_callback_handler(lambda: handler_class(**kwargs)) __all__ = [ diff --git a/agency_swarm/util/tracking/langchain_types.py b/agency_swarm/util/tracking/langchain_types.py index 07aa2627..26b03f4d 100644 --- a/agency_swarm/util/tracking/langchain_types.py +++ b/agency_swarm/util/tracking/langchain_types.py @@ -51,11 +51,6 @@ def use_langchain_types() -> None: from langchain.schema import AgentFinish as LangchainAgentFinish from langchain.schema import HumanMessage as LangchainHumanMessage - # Call model_rebuild on these imported classes to resolve forward references - LangchainAgentAction.model_rebuild() - LangchainAgentFinish.model_rebuild() - LangchainHumanMessage.model_rebuild() - AgentAction.set_implementation(LangchainAgentAction) AgentFinish.set_implementation(LangchainAgentFinish) HumanMessage.set_implementation(LangchainHumanMessage) diff --git a/agency_swarm/util/tracking/local_callback_handler.py b/agency_swarm/util/tracking/local_callback_handler.py index 02172786..a1716fba 100644 --- a/agency_swarm/util/tracking/local_callback_handler.py +++ b/agency_swarm/util/tracking/local_callback_handler.py @@ -422,7 +422,7 @@ def on_retriever_end( parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: - 
docs_json = json.dumps([doc.dict() for doc in documents]) + docs_json = json.dumps([doc.model_dump() for doc in documents]) self._update_event( run_id, set_end_time=True, parent_run_id=parent_run_id, documents=docs_json ) diff --git a/docs/advanced-usage/observability.md b/docs/advanced-usage/observability.md index 122c51f6..2ca96493 100644 --- a/docs/advanced-usage/observability.md +++ b/docs/advanced-usage/observability.md @@ -1,79 +1,77 @@ # Observability -Agency Swarm supports tracking your agents using Langchain callbacks. This allows you to monitor and analyze the behavior and performance of your agents. - -To use tracking with Langchain callbacks, you need to install the langchain package: +Agency Swarm supports agent tracking through Langchain callbacks, helping you monitor and analyze agent behavior and performance. To get started, install the Langchain package: ```bash pip install langchain ``` +--- + ## Langfuse -Langfuse is an observability platform that allows you to track and analyze the execution of your agents in detail. It provides features like tracing, metrics, and debugging tools. +Langfuse is a platform for advanced observability, offering tracing, metrics, and debugging tools. To use Langfuse with Agency Swarm: -To use Langfuse with Agency Swarm, follow these steps: +1. Install the Langfuse package: + ```bash + pip install langfuse + ``` +2. Set environment variables for your secret and public keys (available on the Langfuse dashboard): + ```bash + export LANGFUSE_SECRET_KEY= + export LANGFUSE_PUBLIC_KEY= + ``` +3. Initialize tracking in your code: + ```python + from agency_swarm import init_tracking + init_tracking("langfuse") + ``` -1. 
Install the langfuse package: + You can also pass additional configuration options, for example: + ```python + init_tracking("langfuse", debug=True, host="custom-host", user_id="user-123") + ``` -```bash -pip install langfuse -``` +For more information, consult the Langfuse documentation at [Langfuse Documentation](https://langfuse.com/docs/integrations/langchain/tracing#add-langfuse-to-your-langchain-application). -2. Set the LANGFUSE_SECRET_KEY and LANGFUSE_PUBLIC_KEY environment variables: +--- -```bash -export LANGFUSE_SECRET_KEY= -export LANGFUSE_PUBLIC_KEY= -``` +## AgentOps [WIP] -You can get your keys from the [Langfuse dashboard](https://cloud.langfuse.com/). +AgentOps is another observability platform for managing and tracking your agents: -3. Initialize the tracking in your code: +1. Install the SDK and LangChain dependency: + ```bash + pip install agentops + pip install 'agentops[langchain]' + ``` +2. Set your API key in a `.env` file: + ```bash + AGENTOPS_API_KEY= + ``` +3. Initialize tracking in your code with `init_tracking("agentops")` and run your agent. Then visit [app.agentops.ai/drilldown](https://app.agentops.ai/drilldown) to observe your agent in action. After the run, AgentOps prints a clickable URL in the console that takes you directly to your session in the dashboard. -```python -from agency_swarm import init_tracking - -init_tracking("langfuse") -``` - -You can pass additional configuration options to the Langfuse callback handler: - -```python -init_tracking( - "langfuse", - debug=True, - host="custom-host", - user_id="user-123", -) -``` +Demo GIF: +[View Demo](https://github.com/AgentOps-AI/agentops/blob/main/docs/images/link-to-session.gif?raw=true) -For additional parameters and more information on the Langfuse callback handler, see the [Langfuse documentation](https://langfuse.com/docs/integrations/langchain/tracing#add-langfuse-to-your-langchain-application). +--- ## Local -The local tracker provides a lightweight solution for logging agent activities to a SQLite database.
+The local tracker logs agent activities to a lightweight SQLite database. To use it: -To use the local tracker, you'll need to install the tiktoken package: +1. Install the tiktoken package: + ```bash + pip install tiktoken + ``` +2. Initialize local tracking: + ```python + from agency_swarm import init_tracking + init_tracking("local") + ``` -```bash -pip install tiktoken -``` - -To use the local tracker, simply initialize it in your code: - -```python -from agency_swarm import init_tracking - -init_tracking("local") -``` - -This will create a SQLite database in the current working directory. - -For custom database location: +A SQLite database will be created in the current directory. To specify a custom path: -```python -from agency_swarm import init_tracking - -init_tracking("local", db_path="path/to/your/database.db") -``` + ```python + init_tracking("local", db_path="path/to/your/database.db") + ``` diff --git a/tests/demos/demo_observability.py b/tests/demos/demo_observability.py index d830b33c..ea5a621b 100644 --- a/tests/demos/demo_observability.py +++ b/tests/demos/demo_observability.py @@ -11,8 +11,9 @@ def main(): # Set the tracker type - TRACKER = "local" - # TRACKER = "langfuse" + # TRACKER = "local" + # TRACKER = "agentops" + TRACKER = "langfuse" # Initialize tracking based on the selected tracker init_tracking(TRACKER) @@ -21,19 +22,16 @@ def main(): ceo = Agent( name="CEO", description="Manages projects and coordinates between team members", - temperature=0.5, ) developer = Agent( name="Developer", description="Implements technical solutions and writes code", - temperature=0.3, ) analyst = Agent( name="Data Analyst", description="Analyzes data and provides insights", - temperature=0.4, ) # Define the communication flows within the agency @@ -43,7 +41,8 @@ def main(): [ceo, developer], # CEO can communicate with Developer [ceo, analyst], # CEO can communicate with Analyst [developer, analyst], # Developer can communicate with Analyst - ] + ], + 
temperature=0.01, ) # Run the demo with Gradio interface