From b48c052a8e021b3f3bf0ce85f5a346cf4ba5c384 Mon Sep 17 00:00:00 2001 From: Arsenii Shatokhin Date: Thu, 25 Jan 2024 08:04:40 +0400 Subject: [PATCH] Changed logic for checking responses in thread_async.py --- agency_swarm/agency/agency.py | 6 ++-- agency_swarm/agents/agent.py | 17 ++++----- agency_swarm/threads/thread_async.py | 54 +++++++++++++++++++++------- 3 files changed, 54 insertions(+), 23 deletions(-) diff --git a/agency_swarm/agency/agency.py b/agency_swarm/agency/agency.py index b5ea5a58..6f00971b 100644 --- a/agency_swarm/agency/agency.py +++ b/agency_swarm/agency/agency.py @@ -28,8 +28,8 @@ class ThreadsCallbacks(TypedDict): class Agency: ThreadType = Thread - send_message_tool_description = """Use this tool to facilitate direct, synchronous communication between specialized agents within your agency. When you send a message using this tool, you receive a response exclusively from the designated recipient agent. To continue the dialogue, invoke this tool again with the desired recipient and your follow-up message. Remember, communication here is synchronous; the recipient agent won't perform any tasks post-response. You are responsible for relaying the recipient agent's responses back to the user, as they do not have direct access to these replies. Keep engaging with the tool for continuous interaction until the task is fully resolved.""" - send_message_tool_description_async = """Use this tool to facilitate direct, asynchronous communication between specialized agents within your agency. When you send a message using this tool, you initiate the task with the recepient agent. To check the status of the task and recieve a response, please invoke the 'GetResponse' tool with the same recipient agent. You are responsible for relaying the recipient agent's responses back to the user, as they do not have direct access to these replies. Remember that you can't check the status yourself later, user needs to tell you when to do so. Keep engaging with this tool until the task is fully resolved.""" + send_message_tool_description = """Use this tool for synchronous communication with agents within your agency. Send messages and receive direct responses from designated agents. For ongoing dialogue, resend messages to specific agents. Communication is synchronous, without post-response tasks. Relay agent responses to the user, who lacks direct access. Continue using the tool for continuous interaction until task completion.""" + send_message_tool_description_async = """Use this tool for asynchronous communication with other agents within your agency. Initiate tasks by messaging, and check status and responses with 'GetResponse' tool. Relay responses to the user, who instructs on status checks. Continue until task completion.""" def __init__(self, agency_chart: List, shared_instructions: str = "", shared_files: List = None, async_mode: Literal['threading'] = None, @@ -171,7 +171,7 @@ def run_demo(self): except StopIteration as e: pass - def get_openapi_schema(self, url: str): + def get_customgpt_schema(self, url: str): """Returns the OpenAPI schema for the agency from the CEO agent, that you can use to integrate with custom gpts. Parameters: diff --git a/agency_swarm/agents/agent.py b/agency_swarm/agents/agent.py index 1f6d39f5..86d43d49 100644 --- a/agency_swarm/agents/agent.py +++ b/agency_swarm/agents/agent.py @@ -352,17 +352,15 @@ def get_openapi_schema(self, url): defs = openai_schema['parameters']['$defs'] del openai_schema['parameters']['$defs'] - schema['paths']["/" + tool.__name__] = { + schema['paths']["/" + openai_schema['name']] = { "post": { "description": openai_schema['description'], - "operationId": tool.__name__, + "operationId": openai_schema['name'], "parameters": [], "requestBody": { "content": { "application/json": { - "schema": { - "$ref": "#/components/schemas/" + tool.__name__ - } + "schema": openai_schema['parameters'] } }, "required": True, @@ -377,10 +375,13 @@ def get_openapi_schema(self, url): } if defs: - schema['components']['schemas'][tool.__name__] = openai_schema['parameters'], - schema['components']['schemas'].update(defs) + schema['components']['schemas'].update(**defs) - return json.dumps(schema, indent=2).replace("#/$defs/", "#/components/schemas/") + print(openai_schema) + + schema = json.dumps(schema, indent=2).replace("#/$defs/", "#/components/schemas/") + + return schema # --- Settings Methods --- diff --git a/agency_swarm/threads/thread_async.py b/agency_swarm/threads/thread_async.py index e9a95e47..ad9aab13 100644 --- a/agency_swarm/threads/thread_async.py +++ b/agency_swarm/threads/thread_async.py @@ -26,13 +26,16 @@ def worker(self, message: str, message_files=None): def get_completion_async(self, message: str, message_files=None): if self.pythread and self.pythread.is_alive(): - return "System Notification: 'Agent is busy, so your message was not recived. Please always use 'GetResponse' tool to check for status first, before using 'SendMessage' tool again for the same agent.'" + return "System Notification: 'Agent is busy, so your message was not received. Please always use 'GetResponse' tool to check for status first, before using 'SendMessage' tool again for the same agent.'" elif self.pythread and not self.pythread.is_alive(): self.pythread.join() self.pythread = None - return self.response + self.response = None - self.response = None + run = self.get_last_run() + + if run and run.status in ['queued', 'in_progress', 'requires_action']: + return "System Notification: 'Agent is busy, so your message was not received. Please always use 'GetResponse' tool to check for status first, before using 'SendMessage' tool again for the same agent.'" self.pythread = threading.Thread(target=self.worker, args=(message, message_files)) @@ -41,12 +44,39 @@ def get_completion_async(self, message: str, message_files=None): return "System Notification: 'Task has started. Please notify the user that they can tell you to check the status later. You can do this with the 'GetResponse' tool, but don't mention this tool to the user. " - def check_status(self): - if self.pythread and self.pythread.is_alive(): - return "System Notification: 'Agent is busy. Please tell the user that they need to wait and ask you to check for status again later.'" - elif self.pythread and not self.pythread.is_alive(): - self.pythread.join() - self.pythread = None - return self.response - else: - return "System Notification: 'Agent is available. Please use 'SendMessage' tool to send a message.'" \ No newline at end of file + def check_status(self, run=None): + if not run: + run = self.get_last_run() + + if not run: + return "System Notification: 'Agent is ready to receive a message. Please send a message with the 'SendMessage' tool.'" + + # check run status + if run.status in ['queued', 'in_progress', 'requires_action']: + return "System Notification: 'Task is not completed yet. Please tell the user to wait and try again later.'" + + if run.status == "failed": + return f"System Notification: 'Agent run failed with error: {run.last_error.message}. You may send another message with the 'SendMessage' tool.'" + + messages = self.client.beta.threads.messages.list( + thread_id=self.id, + order="desc", + ) + + return f"""{self.recipient_agent.name} Response: '{messages.data[0].content[0].text.value}'""" + + def get_last_run(self): + if not self.thread: + self.init_thread() + + runs = self.client.beta.threads.runs.list( + thread_id=self.thread.id, + order="desc", + ) + + if len(runs.data) == 0: + return None + + run = runs.data[0] + + return run \ No newline at end of file