Skip to content

Commit

Permalink
Changed logic for checking responses in thread_async.py
Browse files Browse the repository at this point in the history
  • Loading branch information
VRSEN committed Jan 25, 2024
1 parent 71272df commit b48c052
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 23 deletions.
6 changes: 3 additions & 3 deletions agency_swarm/agency/agency.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ class ThreadsCallbacks(TypedDict):

class Agency:
ThreadType = Thread
send_message_tool_description = """Use this tool to facilitate direct, synchronous communication between specialized agents within your agency. When you send a message using this tool, you receive a response exclusively from the designated recipient agent. To continue the dialogue, invoke this tool again with the desired recipient and your follow-up message. Remember, communication here is synchronous; the recipient agent won't perform any tasks post-response. You are responsible for relaying the recipient agent's responses back to the user, as they do not have direct access to these replies. Keep engaging with the tool for continuous interaction until the task is fully resolved."""
send_message_tool_description_async = """Use this tool to facilitate direct, asynchronous communication between specialized agents within your agency. When you send a message using this tool, you initiate the task with the recepient agent. To check the status of the task and recieve a response, please invoke the 'GetResponse' tool with the same recipient agent. You are responsible for relaying the recipient agent's responses back to the user, as they do not have direct access to these replies. Remember that you can't check the status yourself later, user needs to tell you when to do so. Keep engaging with this tool until the task is fully resolved."""
send_message_tool_description = """Use this tool for synchronous communication with agents within your agency. Send messages and receive direct responses from designated agents. For ongoing dialogue, resend messages to specific agents. Communication is synchronous, without post-response tasks. Relay agent responses to the user, who lacks direct access. Continue using the tool for continuous interaction until task completion."""
send_message_tool_description_async = """Use this tool for asynchronous communication with other agents within your agency. Initiate tasks by messaging, and check status and responses with 'GetResponse' tool. Relay responses to the user, who instructs on status checks. Continue until task completion."""

def __init__(self, agency_chart: List, shared_instructions: str = "", shared_files: List = None,
async_mode: Literal['threading'] = None,
Expand Down Expand Up @@ -171,7 +171,7 @@ def run_demo(self):
except StopIteration as e:
pass

def get_openapi_schema(self, url: str):
def get_customgpt_schema(self, url: str):
"""Returns the OpenAPI schema for the agency from the CEO agent, that you can use to integrate with custom gpts.
Parameters:
Expand Down
17 changes: 9 additions & 8 deletions agency_swarm/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,17 +352,15 @@ def get_openapi_schema(self, url):
defs = openai_schema['parameters']['$defs']
del openai_schema['parameters']['$defs']

schema['paths']["/" + tool.__name__] = {
schema['paths']["/" + openai_schema['name']] = {
"post": {
"description": openai_schema['description'],
"operationId": tool.__name__,
"operationId": openai_schema['name'],
"parameters": [],
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/" + tool.__name__
}
"schema": openai_schema['parameters']
}
},
"required": True,
Expand All @@ -377,10 +375,13 @@ def get_openapi_schema(self, url):
}

if defs:
schema['components']['schemas'][tool.__name__] = openai_schema['parameters'],
schema['components']['schemas'].update(defs)
schema['components']['schemas'].update(**defs)

return json.dumps(schema, indent=2).replace("#/$defs/", "#/components/schemas/")
print(openai_schema)

schema = json.dumps(schema, indent=2).replace("#/$defs/", "#/components/schemas/")

return schema

# --- Settings Methods ---

Expand Down
54 changes: 42 additions & 12 deletions agency_swarm/threads/thread_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ def worker(self, message: str, message_files=None):

def get_completion_async(self, message: str, message_files=None):
if self.pythread and self.pythread.is_alive():
return "System Notification: 'Agent is busy, so your message was not recived. Please always use 'GetResponse' tool to check for status first, before using 'SendMessage' tool again for the same agent.'"
return "System Notification: 'Agent is busy, so your message was not received. Please always use 'GetResponse' tool to check for status first, before using 'SendMessage' tool again for the same agent.'"
elif self.pythread and not self.pythread.is_alive():
self.pythread.join()
self.pythread = None
return self.response
self.response = None

self.response = None
run = self.get_last_run()

if run and run.status in ['queued', 'in_progress', 'requires_action']:
return "System Notification: 'Agent is busy, so your message was not received. Please always use 'GetResponse' tool to check for status first, before using 'SendMessage' tool again for the same agent.'"

self.pythread = threading.Thread(target=self.worker,
args=(message, message_files))
Expand All @@ -41,12 +44,39 @@ def get_completion_async(self, message: str, message_files=None):

return "System Notification: 'Task has started. Please notify the user that they can tell you to check the status later. You can do this with the 'GetResponse' tool, but don't mention this tool to the user. "

def check_status(self):
if self.pythread and self.pythread.is_alive():
return "System Notification: 'Agent is busy. Please tell the user that they need to wait and ask you to check for status again later.'"
elif self.pythread and not self.pythread.is_alive():
self.pythread.join()
self.pythread = None
return self.response
else:
return "System Notification: 'Agent is available. Please use 'SendMessage' tool to send a message.'"
def check_status(self, run=None):
if not run:
run = self.get_last_run()

if not run:
return "System Notification: 'Agent is ready to receive a message. Please send a message with the 'SendMessage' tool.'"

# check run status
if run.status in ['queued', 'in_progress', 'requires_action']:
return "System Notification: 'Task is not completed yet. Please tell the user to wait and try again later.'"

if run.status == "failed":
return f"System Notification: 'Agent run failed with error: {run.last_error.message}. You may send another message with the 'SendMessage' tool.'"

messages = self.client.beta.threads.messages.list(
thread_id=self.id,
order="desc",
)

return f"""{self.recipient_agent.name} Response: '{messages.data[0].content[0].text.value}'"""

def get_last_run(self):
if not self.thread:
self.init_thread()

runs = self.client.beta.threads.runs.list(
thread_id=self.thread.id,
order="desc",
)

if len(runs.data) == 0:
return None

run = runs.data[0]

return run

0 comments on commit b48c052

Please sign in to comment.