-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
227 lines (199 loc) · 9.51 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import os
import json
import base64
import asyncio
import websockets
from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.websockets import WebSocketDisconnect
from twilio.twiml.voice_response import VoiceResponse, Connect, Say, Stream
from dotenv import load_dotenv
load_dotenv()
# Configuration
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
PORT = int(os.getenv('PORT', 5050))
SYSTEM_MESSAGE = (
"You are a helpful and bubbly AI assistant who loves to chat about "
"anything the user is interested in and is prepared to offer them facts. "
"You have a penchant for dad jokes, owl jokes, and rickrolling – subtly. "
"Always stay positive, but work in a joke when appropriate."
)
VOICE = 'alloy'
LOG_EVENT_TYPES = [
'error', 'response.content.done', 'rate_limits.updated',
'response.done', 'input_audio_buffer.committed',
'input_audio_buffer.speech_stopped', 'input_audio_buffer.speech_started',
'session.created'
]
SHOW_TIMING_MATH = False
app = FastAPI()
if not OPENAI_API_KEY:
raise ValueError('Missing the OpenAI API key. Please set it in the .env file.')
@app.get("/", response_class=JSONResponse)
async def index_page():
return {"message": "Twilio Media Stream Server is running!"}
@app.api_route("/incoming-call", methods=["GET", "POST"])
async def handle_incoming_call(request: Request):
"""Handle incoming call and return TwiML response to connect to Media Stream."""
response = VoiceResponse()
# <Say> punctuation to improve text-to-speech flow
response.say("Please wait while we connect your call to the A. I. voice assistant, powered by Twilio and the Open-A.I. Realtime API")
response.pause(length=1)
response.say("O.K. you can start talking!")
host = request.url.hostname
connect = Connect()
connect.stream(url=f'wss://{host}/media-stream')
response.append(connect)
return HTMLResponse(content=str(response), media_type="application/xml")
@app.websocket("/media-stream")
async def handle_media_stream(websocket: WebSocket):
"""Handle WebSocket connections between Twilio and OpenAI."""
print("Client connected")
await websocket.accept()
async with websockets.connect(
'wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01',
extra_headers={
"Authorization": f"Bearer {OPENAI_API_KEY}",
"OpenAI-Beta": "realtime=v1"
}
) as openai_ws:
await initialize_session(openai_ws)
# Connection specific state
stream_sid = None
latest_media_timestamp = 0
last_assistant_item = None
mark_queue = []
response_start_timestamp_twilio = None
async def receive_from_twilio():
"""Receive audio data from Twilio and send it to the OpenAI Realtime API."""
nonlocal stream_sid, latest_media_timestamp
try:
async for message in websocket.iter_text():
data = json.loads(message)
if data['event'] == 'media' and openai_ws.open:
latest_media_timestamp = int(data['media']['timestamp'])
audio_append = {
"type": "input_audio_buffer.append",
"audio": data['media']['payload']
}
await openai_ws.send(json.dumps(audio_append))
elif data['event'] == 'start':
stream_sid = data['start']['streamSid']
print(f"Incoming stream has started {stream_sid}")
response_start_timestamp_twilio = None
latest_media_timestamp = 0
last_assistant_item = None
elif data['event'] == 'mark':
if mark_queue:
mark_queue.pop(0)
except WebSocketDisconnect:
print("Client disconnected.")
if openai_ws.open:
await openai_ws.close()
async def send_to_twilio():
"""Receive events from the OpenAI Realtime API, send audio back to Twilio."""
nonlocal stream_sid, last_assistant_item, response_start_timestamp_twilio
try:
async for openai_message in openai_ws:
response = json.loads(openai_message)
if response['type'] in LOG_EVENT_TYPES:
print(f"Received event: {response['type']}", response)
if response.get('type') == 'response.audio.delta' and 'delta' in response:
audio_payload = base64.b64encode(base64.b64decode(response['delta'])).decode('utf-8')
audio_delta = {
"event": "media",
"streamSid": stream_sid,
"media": {
"payload": audio_payload
}
}
await websocket.send_json(audio_delta)
if response_start_timestamp_twilio is None:
response_start_timestamp_twilio = latest_media_timestamp
if SHOW_TIMING_MATH:
print(f"Setting start timestamp for new response: {response_start_timestamp_twilio}ms")
# Update last_assistant_item safely
if response.get('item_id'):
last_assistant_item = response['item_id']
await send_mark(websocket, stream_sid)
# Trigger an interruption. Your use case might work better using `input_audio_buffer.speech_stopped`, or combining the two.
if response.get('type') == 'input_audio_buffer.speech_started':
print("Speech started detected.")
if last_assistant_item:
print(f"Interrupting response with id: {last_assistant_item}")
await handle_speech_started_event()
except Exception as e:
print(f"Error in send_to_twilio: {e}")
async def handle_speech_started_event():
"""Handle interruption when the caller's speech starts."""
nonlocal response_start_timestamp_twilio, last_assistant_item
print("Handling speech started event.")
if mark_queue and response_start_timestamp_twilio is not None:
elapsed_time = latest_media_timestamp - response_start_timestamp_twilio
if SHOW_TIMING_MATH:
print(f"Calculating elapsed time for truncation: {latest_media_timestamp} - {response_start_timestamp_twilio} = {elapsed_time}ms")
if last_assistant_item:
if SHOW_TIMING_MATH:
print(f"Truncating item with ID: {last_assistant_item}, Truncated at: {elapsed_time}ms")
truncate_event = {
"type": "conversation.item.truncate",
"item_id": last_assistant_item,
"content_index": 0,
"audio_end_ms": elapsed_time
}
await openai_ws.send(json.dumps(truncate_event))
await websocket.send_json({
"event": "clear",
"streamSid": stream_sid
})
mark_queue.clear()
last_assistant_item = None
response_start_timestamp_twilio = None
async def send_mark(connection, stream_sid):
if stream_sid:
mark_event = {
"event": "mark",
"streamSid": stream_sid,
"mark": {"name": "responsePart"}
}
await connection.send_json(mark_event)
mark_queue.append('responsePart')
await asyncio.gather(receive_from_twilio(), send_to_twilio())
async def send_initial_conversation_item(openai_ws):
"""Send initial conversation item if AI talks first."""
initial_conversation_item = {
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [
{
"type": "input_text",
"text": "Greet the user with 'Hello there! I am an AI voice assistant powered by Twilio and the OpenAI Realtime API. You can ask me for facts, jokes, or anything you can imagine. How can I help you?'"
}
]
}
}
await openai_ws.send(json.dumps(initial_conversation_item))
await openai_ws.send(json.dumps({"type": "response.create"}))
async def initialize_session(openai_ws):
"""Control initial session with OpenAI."""
session_update = {
"type": "session.update",
"session": {
"turn_detection": {"type": "server_vad"},
"input_audio_format": "g711_ulaw",
"output_audio_format": "g711_ulaw",
"voice": VOICE,
"instructions": SYSTEM_MESSAGE,
"modalities": ["text", "audio"],
"temperature": 0.8,
}
}
print('Sending session update:', json.dumps(session_update))
await openai_ws.send(json.dumps(session_update))
# Uncomment the next line to have the AI speak first
# await send_initial_conversation_item(openai_ws)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=PORT)