diff --git a/examples/ccai-agentassist-five9-grpc/.env b/examples/ccai-agentassist-five9-grpc/.env new file mode 100644 index 0000000000..7e6a892ae1 --- /dev/null +++ b/examples/ccai-agentassist-five9-grpc/.env @@ -0,0 +1,7 @@ +SERVER_ADDRESS=0.0.0.0 +PORT=8080 +PROJECT_ID= +CONVERSATION_PROFILE_ID= +CHUNK_SIZE=1024 +RESTART_TIMEOUT=160 +MAX_LOOKBACK=3 \ No newline at end of file diff --git a/examples/ccai-agentassist-five9-grpc/README.md b/examples/ccai-agentassist-five9-grpc/README.md new file mode 100644 index 0000000000..d3f6496d4d --- /dev/null +++ b/examples/ccai-agentassist-five9-grpc/README.md @@ -0,0 +1,229 @@ +Copyright 2024 Google. This software is provided as-is, without warranty or +representation for any use or purpose. Your use of it is subject to your +agreement with Google. + +# Five9 Voicestream Integration with Agent Assist + +This is a PoC to integrate Five9 Voicestream with Agent Assist. + +## Project Structure + +``` +. +├── assets +│ └── FAQ.csv +├── client +│ ├── audio
│ │ ├── END_USER.wav +│ │ └── HUMAN_AGENT.wav +│ └── client_voicestream.py +├── .env +├── proto +│ ├── voice_pb2_grpc.py +│ ├── voice_pb2.py +│ └── voice.proto +├── README.md +├── requirements.txt +└── server + ├── server.py + ├── services + │ └── get_suggestions.py + └── utils + ├── conversation_management.py + └── participant_management.py +``` + +## Components +- Agent Assist +- Five9 with VoiceStream + +## Setup Instructions + +### GCP Project Setup + +#### Creating a Project in the Google Cloud Platform Console + +If you haven't already created a project, create one now. Projects enable you to +manage all Google Cloud Platform resources for your app, including deployment, +access control, billing, and services. + +1. Open the [Cloud Platform Console][cloud-console]. +1. In the drop-down menu at the top, select **Create a project**. +1. Give your project a name. +1. Make a note of the project ID, which might be different from the project + name. 
The project ID is used in commands and in configurations. + +[cloud-console]: https://console.cloud.google.com/ + +#### Enabling billing for your project. + +If you haven't already enabled billing for your project, [enable +billing][enable-billing] now. Enabling billing is required to use Cloud +Bigtable and to create VM instances. + +[enable-billing]: https://console.cloud.google.com/project/_/settings + +#### Install the Google Cloud SDK. + +If you haven't already installed the Google Cloud SDK, [install the Google +Cloud SDK][cloud-sdk] now. The SDK contains tools and libraries that enable you +to create and manage resources on Google Cloud Platform. + +[cloud-sdk]: https://cloud.google.com/sdk/ + +#### Setting Google Application Default Credentials + +Set your [Google Application Default +Credentials][application-default-credentials] by [initializing the Google Cloud +SDK][cloud-sdk-init] with the command: + +``` + gcloud init +``` + +Generate a credentials file by running the +[application-default login](https://cloud.google.com/sdk/gcloud/reference/auth/application-default/login) +command: + +``` + gcloud auth application-default login +``` + +[cloud-sdk-init]: https://cloud.google.com/sdk/docs/initializing + +[application-default-credentials]: https://developers.google.com/identity/protocols/application-default-credentials + +#### Create a Knowledge Base + +Agent Assist follows a conversation between a human agent and an end-user and provides the human agent with relevant document suggestions. These suggestions are based on knowledge bases, namely, collections of documents that you upload to Agent Assist. These documents are called knowledge documents and can be either articles (for use with Article Suggestion) or FAQ documents (for use with FAQ Assist). + +In this specific implementation, a CSV sheet with FAQ will be used as knowledge document. 
+> [FAQ CSV file](./assets/FAQ.csv) +> [Create a Knowledge Base](https://cloud.google.com/agent-assist/docs/knowledge-base) + +#### Create a Conversation Profile + +A conversation profile configures a set of parameters that control the suggestions made to an agent. + +> [Create/Edit an Agent Assist Conversation Profile](https://cloud.google.com/agent-assist/docs/conversation-profile#create_and_edit_a_conversation_profile) + +While creating the conversation profile, check the FAQs box. In the "Knowledge bases" input box, select the recently created Knowledge Base. The other values in the section should be set as default. + +Once the conversation profile is created, you can find the CONVERSATION_PROFILE_ID (Integration ID) in the following ways: + +> Open [Agent Assist](https://agentassist.cloud.google.com/), then Conversation +> Profiles on the left bottom + +### Usage Pre-requisites + +- FAQs Suggestions should be enabled in the Agent Assist Conversation Profile +- Agent Assist will only give you suggestions to conversations with Human Agents. It will not + give suggestions if the conversation is being guided by virtual agents. + + +### Local Development Set Up + +This application is designed to run on port 8080. Upon launch, the +application will initialize and bind to port 8080, making it accessible for +incoming connections. This can be changed in the .env file. + +#### Protocol Buffer Compiler: + +This implementation leverages Protocol Buffer compilers for service definitions and data serialization. In this case, protoc is used to compile Five9's protofile. + + ``` +NOTE: The compilation of the Five9's Voicestream protofile was already made, therefore this step can be skipped. But if an update of the protofile is needed, please follow these steps to properly output the required python files. 
+ ``` + +> [Protocol Buffer Compiler Installation](https://grpc.io/docs/protoc-installation/) +> [Five9's Voicestream protofile](./proto/voice.proto) + +To compile the protofile: +> Open a terminal window +> Go to the root where your proto folder is +> Run the following command: + ``` + python3 -m grpc_tools.protoc -I proto --python_out=proto --grpc_python_out=proto proto/voice.proto + ``` +> Two python files will be generated inside the proto folder. + > [voice_pb2_grpc.py](./proto/voice_pb2_grpc.py) + > [voice_pb2.py](./proto/voice_pb2.py) + + +#### Set of variables: + +The following variables need to be set up in the .env file inside the root folder + +``` +SERVER_ADDRESS : + Target server address + +PORT : + Connection Port + +PROJECT_ID : + GCP Project ID where the Agent Assist Conversation Profile is deployed. + +CONVERSATION_PROFILE_ID : + Agent Assist Conversation Profile ID + +CHUNK_SIZE : + Number of bytes of audio to be sent each time + +RESTART_TIMEOUT : + Timeout of one stream + +MAX_LOOKBACK : + Lookback for unprocessed audio data + +``` + +### Steps to follow + +## Start gRPC Server + +Start the gRPC Server controller. This will start a server on port 8080, where the voicestream client will send the data. + +> [Server Controller](./server/server.py) + +Inside the server folder, run the following command: + +``` +python server.py +``` + +## Start gRPC Client + +According to Five9's Self Service Developer Guide: + +``` +VoiceStream does not support multi-channel streaming. VoiceStream +transmits each distinct audio stream over a separate gRPC session: one +for audio from the agent, and one for audio to the agent. +``` + +In order to simulate this behaviour using our local environment, the same script should be run simultaneously. 
One that sends the customer audio (END_USER) and one that sends the agent audio (HUMAN_AGENT) + +> [Five9 Voicestream Client](./client/client_voicestream.py) + +Inside the client folder, run the following command to send the human agent audio: + +``` +python client_voicestream.py --role=HUMAN_AGENT --call_id= + +``` +In another terminal, run the following command to send the customer audio: +``` +python client_voicestream.py --role=END_USER --call_id= + +``` + +In order for both streams to be associated to the same conversation it is fundamental to specify a destination CONVERSATION_ID. For this to happen, the CALL_ID specified in the initial configuration sent by Five9 will be passed to the Agent Assist as the internal CONVERSATION_ID. In this implementation, we are manually defining this CALL_ID for testing purposes. + + +# References +1.[Agent Assist Documentation](https://cloud.google.com/agent-assist/docs) +2.[Dialogflow](https://cloud.google.com/dialogflow/docs) +3.[Five9 VoiceStream](https://www.five9.com/news/news-releases/five9-announces-five9-voicestream) +4,[Five9 VoiceStream Release Notes](https://releasenotes.five9.com/space/RNA/23143057870/VoiceStream) + diff --git a/examples/ccai-agentassist-five9-grpc/assets/FAQ.csv b/examples/ccai-agentassist-five9-grpc/assets/FAQ.csv new file mode 100644 index 0000000000..fdb4ac831c --- /dev/null +++ b/examples/ccai-agentassist-five9-grpc/assets/FAQ.csv @@ -0,0 +1,2 @@ +Where is my package? It hasn't arrived yet,I can help you track it down. Please provide the tracking number. You can also track your package directly on our website at www.everestexpeditions.com/tracking using your tracking number. +Do you know the specific time my package will arrive?,Unfortunately we do not have access to individual driver information. However most deliveries arrive before 6 PM. 
\ No newline at end of file diff --git a/examples/ccai-agentassist-five9-grpc/client/audio/END_USER.wav b/examples/ccai-agentassist-five9-grpc/client/audio/END_USER.wav new file mode 100644 index 0000000000..0f49c83e14 Binary files /dev/null and b/examples/ccai-agentassist-five9-grpc/client/audio/END_USER.wav differ diff --git a/examples/ccai-agentassist-five9-grpc/client/audio/HUMAN_AGENT.wav b/examples/ccai-agentassist-five9-grpc/client/audio/HUMAN_AGENT.wav new file mode 100644 index 0000000000..f5fae7bc39 Binary files /dev/null and b/examples/ccai-agentassist-five9-grpc/client/audio/HUMAN_AGENT.wav differ diff --git a/examples/ccai-agentassist-five9-grpc/client/client_voicestream.py b/examples/ccai-agentassist-five9-grpc/client/client_voicestream.py new file mode 100644 index 0000000000..3ca5ae6344 --- /dev/null +++ b/examples/ccai-agentassist-five9-grpc/client/client_voicestream.py @@ -0,0 +1,107 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import sys +import grpc +import logging +import wave +import argparse +import os +from dotenv import load_dotenv + +sys.path.append("../proto") +from voice_pb2 import VoiceConfig, StreamingConfig, StreamingVoiceRequest #pylint: disable=wrong-import-position +from voice_pb2_grpc import VoiceStub #pylint: disable=wrong-import-position + +load_dotenv('../.env') +server_address = os.getenv("SERVER_ADDRESS", "0.0.0.0") +server_port = os.getenv("PORT", "8080") +size = int(os.getenv("CHUNK_SIZE", 1024)) + +def generate_chunks(config, audio, chunk_size): + """Send initial audio configuration and audio content in chunks""" + # Send initial configuration + yield StreamingVoiceRequest(streaming_config=config) + + # Send audio content + while chunk := audio.readframes(chunk_size): # NOQA + yield StreamingVoiceRequest(audio_content=chunk) + +def run(role, call_id): + """Send requests to the server""" + + audio_path = f"audio/{role}.wav" + + #Map call_leg + #0 Agent - 1 Customer. Ignoring supervisor + call_leg = 0 if role=="HUMAN_AGENT" else 1 + print(call_leg) + + # Get Audio config + wf = wave.open(audio_path, "rb") + sample_rate = wf.getframerate() #Number of frames per second + chunk_size = size #Number of frames to be sent in each request + + # Voice Config + voice_config = VoiceConfig(encoding=1, #1 for LINEAR16 + sample_rate_hertz=sample_rate + ) + + config = StreamingConfig(voice_config=voice_config, + vcc_call_id=call_id, + domain_id="customer_domain", + campaign_id="campaing_associated", + agent_id="agent_identifier", + call_leg=call_leg, + trust_token="trust_token_123", + subscription_id="sub_id_123", + skill_id="skill_identifier" + ) + + + with grpc.insecure_channel(f"{server_address}:{server_port}") as channel: + stub = VoiceStub(channel) + responses = stub.StreamingVoice(generate_chunks(config=config, + audio=wf, + chunk_size=chunk_size)) + + # If the response has a status code + # and it is different from SRV_REQ_START_STREAMING (1001), then stop + for 
response in responses: + print(f"Client received: {response}") + + #Verify if the initial handshake was successful + if response.status.code and response.status.code != 1001: + break + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + + #Parse arguments + parser = argparse.ArgumentParser() + parser.add_argument("--role", + type=str, + default=None, + help="Please specify the role as HUMAN_AGENT or END_USER") + parser.add_argument("--call_id", + type=str, + default=None, + help="Please specify the CallId") + + args = parser.parse_args() + + run(role=args.role, call_id=args.call_id) diff --git a/examples/ccai-agentassist-five9-grpc/proto/voice.proto b/examples/ccai-agentassist-five9-grpc/proto/voice.proto new file mode 100644 index 0000000000..e49df368f5 --- /dev/null +++ b/examples/ccai-agentassist-five9-grpc/proto/voice.proto @@ -0,0 +1,160 @@ +syntax = "proto3"; + +package five9.voicestream; + +import "google/protobuf/timestamp.proto"; + +option java_package = "com.five9.voicestream.grpc"; + + +// Service to stream live voice of a participant in a call. +service Voice { + // Method for bidirectional streaming of messages: + // send audio of the user's speech and receive feedback in response. + rpc StreamingVoice(stream StreamingVoiceRequest) returns (stream StreamingVoiceResponse) {} +} + + +// The message sent by the client for the 'StreamingVoice' method. +// Multiple 'StreamingVoiceRequest' messages are sent repeatedly as defined below. +// The first message must be 'streaming_config' containing control data specific to the call being streamed. +// The subsequent messages must be 'audio_content' with audio payload. +// After sending the 'streaming_config' message, the client must wait for a response from the server +// with status code SRV_START_STREAMING before sending audio payloads. +// Optionally status messages 'streaming_status' can be sent any time to provide +// additional information e.g events, notifications about the call. 
+message StreamingVoiceRequest { + // The streaming request, which is either a streaming config, audio content or client status. + oneof streaming_request { + // Provides information about the call and the participant to the receiver. + // The first 'StreamingVoiceRequest' message must contain a 'StreamingConfig' message. + StreamingConfig streaming_config = 1; + + // As the user speaks the speech is sent in chunks of audio data + // sequentially in a stream of 'StreamingVoiceRequest' messages. + // The audio bytes must have been encoded as specified in 'StreamingConfig'. + bytes audio_content = 2; + + // Provides additional information related to the call or stream e.g. events + // like CALL_ENDED, HOLD, RESUME etc. Errors or statistics etc. + StreamingStatus streaming_status = 3; + } + // The time this message was created. + // This must be set for messages with audio data. Optional for other type of messages. + google.protobuf.Timestamp send_time = 4; +} + +// Provides information to the receiver that specifies how to process the request. +message StreamingConfig { + VoiceConfig voice_config = 1; + + // CallID to identify a call within a domain in Five9. + string vcc_call_id = 2; + + string domain_id = 3; + + string campaign_id = 4; + + string agent_id = 5; + + enum CallLeg { + AGENT = 0; + CUSTOMER = 1; + SUPERVISOR = 2; + } + + // Identifies the role of the participant + CallLeg call_leg = 6; + + string trust_token = 7; + + // same call can be streamed for multiple subscribers/filters + string subscription_id = 8; + + // Skill Id associated with this audio stream. + string skill_id = 9; + +} + +// Provides information about audio data in the request. +message VoiceConfig { + // Supported audio encoding types. + + // *********************************************** + // N.B. Currently VoiceStream supports only LINEAR16 at 8000 Hz + // Other encodings and sample rates may be supported in the future, but are + // currently ignored. 
+ // *********************************************** + + enum AudioEncoding { + // Not specified. + ENCODING_UNSPECIFIED = 0; + + // Uncompressed 16-bit signed little-endian samples (Linear PCM). + LINEAR16 = 1; + + // 8-bit samples that compand 14-bit audio samples using G.711 PCMU/mu-law. + MULAW = 2; + + // Adaptive Multi-Rate Narrowband codec. `sample_rate_hertz` must be 8000. + AMR = 3; + + // Adaptive Multi-Rate Wideband codec. `sample_rate_hertz` must be 16000. + AMR_WB = 4; + } + + // Encoding of data sent in 'StreamingVoiceRequest' messages with audio. + AudioEncoding encoding = 1; + + // Sampling rate in Hertz of the audio data sent in 'StreamingVoiceRequest' messages. + // Currently only 8000 is supported by VoiceStream. + int32 sample_rate_hertz = 2; +} + +// The status message which can be used by either client or server +// in the Request or Response message respectively. +message StreamingStatus { + + enum StatusCode { + SUCCESS = 0; + // Client status codes + CLT_CALL_ENDED = 1; // Call ended. Close the gRPC channel. + CLT_CALL_HOLD = 2; + CLT_CALL_RESUME = 3; + CLT_DISCONNECT = 4; // Client closing gRPC channel. + CLT_ERROR_NO_RESOURCE = 100; + CLT_ERROR_TIMEOUT = 101; + CLT_ERROR_GENERIC = 102; + + // Server status codes + SRV_REQ_START_STREAMING = 1001; // Start sending audio + SRV_REQ_PAUSE_STREAMING = 1002; // Stop sending audio + SRV_REQ_DISCONNECT = 1011; // Close the existing gRPC channel. + SRV_REQ_RECONNECT = 1012; // Close the existing channel and then start all over again on a new channel. + SRV_ERROR_NO_RESOURCE = 1100; + SRV_ERROR_TIMEOUT = 1101; + SRV_ERROR_GENERIC = 1102; + } + + // The status code, which is either a client side code or server side code. + StatusCode code = 1; + + // A description of the status + string message = 2; +} + +// The message sent by the server for the 'StreamingVoice' method. +// The server may send status or provide feedback about the call using this message. 
+// The first message must be with status code SRV_START_STREAMING, and it must be +// sent after receiving configuration 'streaming_config' in request message from the client. +message StreamingVoiceResponse { + // Provides notifications e.g. events, errors etc about the stream to the client. + StreamingStatus status = 1; + + // Optional: Provides additional data e.g. feedback about the stream to the client. + StreamingFeedback feedback = 2; +} + +message StreamingFeedback { + VoiceConfig voice_config = 1; +} diff --git a/examples/ccai-agentassist-five9-grpc/proto/voice_pb2.py b/examples/ccai-agentassist-five9-grpc/proto/voice_pb2.py new file mode 100644 index 0000000000..e727eb1296 --- /dev/null +++ b/examples/ccai-agentassist-five9-grpc/proto/voice_pb2.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: voice.proto +# Protobuf Python Version: 5.26.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 # noqa: F401 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bvoice.proto\x12\x11\x66ive9.voicestream\x1a\x1fgoogle/protobuf/timestamp.proto\"\xf4\x01\n\x15StreamingVoiceRequest\x12>\n\x10streaming_config\x18\x01 \x01(\x0b\x32\".five9.voicestream.StreamingConfigH\x00\x12\x17\n\raudio_content\x18\x02 \x01(\x0cH\x00\x12>\n\x10streaming_status\x18\x03 \x01(\x0b\x32\".five9.voicestream.StreamingStatusH\x00\x12-\n\tsend_time\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.TimestampB\x13\n\x11streaming_request\"\xc8\x02\n\x0fStreamingConfig\x12\x34\n\x0cvoice_config\x18\x01 
\x01(\x0b\x32\x1e.five9.voicestream.VoiceConfig\x12\x13\n\x0bvcc_call_id\x18\x02 \x01(\t\x12\x11\n\tdomain_id\x18\x03 \x01(\t\x12\x13\n\x0b\x63\x61mpaign_id\x18\x04 \x01(\t\x12\x10\n\x08\x61gent_id\x18\x05 \x01(\t\x12<\n\x08\x63\x61ll_leg\x18\x06 \x01(\x0e\x32*.five9.voicestream.StreamingConfig.CallLeg\x12\x13\n\x0btrust_token\x18\x07 \x01(\t\x12\x17\n\x0fsubscription_id\x18\x08 \x01(\t\x12\x10\n\x08skill_id\x18\t \x01(\t\"2\n\x07\x43\x61llLeg\x12\t\n\x05\x41GENT\x10\x00\x12\x0c\n\x08\x43USTOMER\x10\x01\x12\x0e\n\nSUPERVISOR\x10\x02\"\xc1\x01\n\x0bVoiceConfig\x12>\n\x08\x65ncoding\x18\x01 \x01(\x0e\x32,.five9.voicestream.VoiceConfig.AudioEncoding\x12\x19\n\x11sample_rate_hertz\x18\x02 \x01(\x05\"W\n\rAudioEncoding\x12\x18\n\x14\x45NCODING_UNSPECIFIED\x10\x00\x12\x0c\n\x08LINEAR16\x10\x01\x12\t\n\x05MULAW\x10\x02\x12\x07\n\x03\x41MR\x10\x03\x12\n\n\x06\x41MR_WB\x10\x04\"\xcd\x03\n\x0fStreamingStatus\x12;\n\x04\x63ode\x18\x01 \x01(\x0e\x32-.five9.voicestream.StreamingStatus.StatusCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"\xeb\x02\n\nStatusCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x12\n\x0e\x43LT_CALL_ENDED\x10\x01\x12\x11\n\rCLT_CALL_HOLD\x10\x02\x12\x13\n\x0f\x43LT_CALL_RESUME\x10\x03\x12\x12\n\x0e\x43LT_DISCONNECT\x10\x04\x12\x19\n\x15\x43LT_ERROR_NO_RESOURCE\x10\x64\x12\x15\n\x11\x43LT_ERROR_TIMEOUT\x10\x65\x12\x15\n\x11\x43LT_ERROR_GENERIC\x10\x66\x12\x1c\n\x17SRV_REQ_START_STREAMING\x10\xe9\x07\x12\x1c\n\x17SRV_REQ_PAUSE_STREAMING\x10\xea\x07\x12\x17\n\x12SRV_REQ_DISCONNECT\x10\xf3\x07\x12\x16\n\x11SRV_REQ_RECONNECT\x10\xf4\x07\x12\x1a\n\x15SRV_ERROR_NO_RESOURCE\x10\xcc\x08\x12\x16\n\x11SRV_ERROR_TIMEOUT\x10\xcd\x08\x12\x16\n\x11SRV_ERROR_GENERIC\x10\xce\x08\"\x84\x01\n\x16StreamingVoiceResponse\x12\x32\n\x06status\x18\x01 \x01(\x0b\x32\".five9.voicestream.StreamingStatus\x12\x36\n\x08\x66\x65\x65\x64\x62\x61\x63k\x18\x02 \x01(\x0b\x32$.five9.voicestream.StreamingFeedback\"I\n\x11StreamingFeedback\x12\x34\n\x0cvoice_config\x18\x01 
\x01(\x0b\x32\x1e.five9.voicestream.VoiceConfig2t\n\x05Voice\x12k\n\x0eStreamingVoice\x12(.five9.voicestream.StreamingVoiceRequest\x1a).five9.voicestream.StreamingVoiceResponse\"\x00(\x01\x30\x01\x42\x1c\n\x1a\x63om.five9.voicestream.grpcb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'voice_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'\n\032com.five9.voicestream.grpc' + _globals['_STREAMINGVOICEREQUEST']._serialized_start=68 + _globals['_STREAMINGVOICEREQUEST']._serialized_end=312 + _globals['_STREAMINGCONFIG']._serialized_start=315 + _globals['_STREAMINGCONFIG']._serialized_end=643 + _globals['_STREAMINGCONFIG_CALLLEG']._serialized_start=593 + _globals['_STREAMINGCONFIG_CALLLEG']._serialized_end=643 + _globals['_VOICECONFIG']._serialized_start=646 + _globals['_VOICECONFIG']._serialized_end=839 + _globals['_VOICECONFIG_AUDIOENCODING']._serialized_start=752 + _globals['_VOICECONFIG_AUDIOENCODING']._serialized_end=839 + _globals['_STREAMINGSTATUS']._serialized_start=842 + _globals['_STREAMINGSTATUS']._serialized_end=1303 + _globals['_STREAMINGSTATUS_STATUSCODE']._serialized_start=940 + _globals['_STREAMINGSTATUS_STATUSCODE']._serialized_end=1303 + _globals['_STREAMINGVOICERESPONSE']._serialized_start=1306 + _globals['_STREAMINGVOICERESPONSE']._serialized_end=1438 + _globals['_STREAMINGFEEDBACK']._serialized_start=1440 + _globals['_STREAMINGFEEDBACK']._serialized_end=1513 + _globals['_VOICE']._serialized_start=1515 + _globals['_VOICE']._serialized_end=1631 +# @@protoc_insertion_point(module_scope) diff --git a/examples/ccai-agentassist-five9-grpc/proto/voice_pb2_grpc.py b/examples/ccai-agentassist-five9-grpc/proto/voice_pb2_grpc.py new file mode 100644 index 0000000000..ff697d7792 --- /dev/null +++ 
b/examples/ccai-agentassist-five9-grpc/proto/voice_pb2_grpc.py @@ -0,0 +1,107 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +import voice_pb2 as voice__pb2 + +GRPC_GENERATED_VERSION = '1.64.1' +GRPC_VERSION = grpc.__version__ +EXPECTED_ERROR_RELEASE = '1.65.0' +SCHEDULED_RELEASE_DATE = 'June 25, 2024' +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + warnings.warn( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in voice_pb2_grpc.py depends on' # noqa: F541 + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + + f' This warning will become an error in {EXPECTED_ERROR_RELEASE},' + + f' scheduled for release on {SCHEDULED_RELEASE_DATE}.', + RuntimeWarning + ) + + +class VoiceStub(object): + """Service to stream live voice of a participant in a call. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.StreamingVoice = channel.stream_stream( + '/five9.voicestream.Voice/StreamingVoice', + request_serializer=voice__pb2.StreamingVoiceRequest.SerializeToString, + response_deserializer=voice__pb2.StreamingVoiceResponse.FromString, + _registered_method=True) + + +class VoiceServicer(object): + """Service to stream live voice of a participant in a call. + """ + + def StreamingVoice(self, request_iterator, context): + """Method for bidirectional streaming of messages: + send audio of the user's speech and receive feedback in response. 
+ """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_VoiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'StreamingVoice': grpc.stream_stream_rpc_method_handler( + servicer.StreamingVoice, + request_deserializer=voice__pb2.StreamingVoiceRequest.FromString, + response_serializer=voice__pb2.StreamingVoiceResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'five9.voicestream.Voice', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('five9.voicestream.Voice', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class Voice(object): + """Service to stream live voice of a participant in a call. + """ + + @staticmethod + def StreamingVoice(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream( + request_iterator, + target, + '/five9.voicestream.Voice/StreamingVoice', + voice__pb2.StreamingVoiceRequest.SerializeToString, + voice__pb2.StreamingVoiceResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/examples/ccai-agentassist-five9-grpc/requirements.txt b/examples/ccai-agentassist-five9-grpc/requirements.txt new file mode 100644 index 0000000000..e50c5e8ab7 --- /dev/null +++ b/examples/ccai-agentassist-five9-grpc/requirements.txt @@ -0,0 +1,31 @@ +astroid==3.2.3 +cachetools==5.3.3 +certifi==2024.7.4 +cffi==1.16.0 +charset-normalizer==3.3.2 +dill==0.3.8 +google-api-core==2.19.1 +google-auth==2.32.0 +google-cloud-dialogflow==2.30.1 +googleapis-common-protos==1.63.2 +grpcio==1.65.0 +grpcio-status==1.65.0 
+grpcio-tools==1.64.1 +idna==3.7 +isort==5.13.2 +mccabe==0.7.0 +platformdirs==4.2.2 +proto-plus==1.24.0 +protobuf==5.27.1 +pyasn1==0.6.0 +pyasn1_modules==0.4.0 +PyAudio==0.2.14 +pycparser==2.22 +pylint==3.2.5 +python-dotenv==1.0.1 +requests==2.32.3 +rsa==4.9 +six==1.16.0 +sounddevice==0.4.7 +tomlkit==0.13.0 +urllib3==2.2.2 diff --git a/examples/ccai-agentassist-five9-grpc/server/server.py b/examples/ccai-agentassist-five9-grpc/server/server.py new file mode 100644 index 0000000000..94331e43da --- /dev/null +++ b/examples/ccai-agentassist-five9-grpc/server/server.py @@ -0,0 +1,45 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import grpc +from concurrent import futures +import logging +import sys +import os +from dotenv import load_dotenv +import sounddevice # noqa: F401 + +sys.path.append("..") +sys.path.append("../proto") +from services.get_suggestions import VoiceServicer #pylint: disable=wrong-import-position +from voice_pb2_grpc import add_VoiceServicer_to_server #pylint: disable=wrong-import-position + +load_dotenv('../.env') +server_port = os.getenv("PORT", "8080") + +def _serve(port:str): + """Start gRPC server""" + server = grpc.server(futures.ThreadPoolExecutor(max_workers=5)) + add_VoiceServicer_to_server(VoiceServicer(), server) + print("Server Started!") + bind_address = f"[::]:{port}" + server.add_insecure_port(bind_address) + server.start() + logging.info("Listening on %s.", bind_address) + server.wait_for_termination() + +if __name__ == "__main__": + _serve(server_port) diff --git a/examples/ccai-agentassist-five9-grpc/server/services/get_suggestions.py b/examples/ccai-agentassist-five9-grpc/server/services/get_suggestions.py new file mode 100644 index 0000000000..0663e80580 --- /dev/null +++ b/examples/ccai-agentassist-five9-grpc/server/services/get_suggestions.py @@ -0,0 +1,289 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import sys +from voice_pb2 import StreamingStatus, StreamingVoiceResponse +from voice_pb2_grpc import VoiceServicer +import utils.conversation_management as conversation_management +import utils.participant_management as participant_management +import logging +from six.moves import queue +import pyaudio +from google.api_core.exceptions import DeadlineExceeded +import os +from dotenv import load_dotenv + +load_dotenv('../.env') + +# Get Agent Assist Config +project_id = os.getenv("PROJECT_ID") +conversation_profile_id = os.getenv("CONVERSATION_PROFILE_ID") +chunk_size = int(os.getenv("CHUNK_SIZE")) +restart_timeout = int(os.getenv("RESTART_TIMEOUT")) # seconds +max_lookback = int(os.getenv("MAX_LOOKBACK")) # seconds + +YELLOW = "\033[0;33m" + +class CallStream: + """Opens a recording stream as a generator yielding the audio chunks.""" + + def __init__(self, rate, size, input_generator): + self._rate = rate + self._chunk_size = size + self.input_generator = input_generator + self._num_channels = 1 + self._buff = queue.Queue() + self.is_final = False + self.closed = True + self.terminate = True + # Count the number of times the stream analyze content restarts. + self.restart_counter = 0 + self.last_start_time = 0 + # Time end of the last is_final in millisec since last_start_time. + self.is_final_offset = 0 + # Save the audio chunks generated from the start of the audio stream for + # replay after restart. 
        self.audio_input_chunks = []
        self.new_stream = True
        # Local playback stream: audio pulled from the gRPC iterator is both
        # buffered for the API and played through the default output device.
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            channels=self._num_channels,
            rate=self._rate,
            output=True,
            stream_callback=self._callback,
            frames_per_buffer=self._chunk_size
        )

    def __enter__(self):
        # Mark the stream live; generator() loops while not closed.
        self.closed = False
        self.terminate = False
        return self

    def __exit__(self, type, value, traceback): #pylint: disable=redefined-builtin
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        self.terminate = True #Added
        # Signal the generator to terminate so that the client's
        # streaming_recognize method will not block the process termination.
        self._buff.put(None)
        self._audio_interface.terminate()

    def _callback(self, in_data, frame_count, time_info, status): #pylint: disable=unused-argument
        # If len(data) is less than requested frame_count, PyAudio automatically
        # assumes the stream is finished, and the stream stops.
        # NOTE(review): next() on an exhausted gRPC iterator raises
        # StopIteration here — confirm PyAudio tolerates that in a callback.
        data = next(self.input_generator)

        if data.audio_content:
            self._buff.put(data.audio_content)

        return (data.audio_content, pyaudio.paContinue)

    def generator(self):
        """Stream Audio from Call to API and to local buffer"""
        try:
            # Handle restart.
            # print("restart generator")
            # Flip the bit of is_final so it can continue stream.
            self.is_final = False
            # Milliseconds of audio already analyzed in previous stream rounds.
            total_processed_time = self.last_start_time + self.is_final_offset
            # Convert ms -> bytes: rate (samples/s) * 16 bits / 8 bits-per-byte.
            processed_bytes_length = (
                int(total_processed_time * self._rate * 16 / 8) / 1000
            )
            self.last_start_time = total_processed_time
            # Send out bytes stored in self.audio_input_chunks that is after the
            # processed_bytes_length.
            if processed_bytes_length != 0:
                audio_bytes = b"".join(self.audio_input_chunks)
                # Lookback for unprocessed audio data.
+ need_to_process_length = min( + int(len(audio_bytes) - processed_bytes_length), + int(max_lookback * self._rate * 16 / 8), + ) + # Note that you need to explicitly use `int` type for substring. + need_to_process_bytes = audio_bytes[(-1) * need_to_process_length :] + yield need_to_process_bytes + + while not self.closed and not self.is_final: + data = [] + # Use a blocking get() to ensure there's at least one chunk of + # data, and stop iteration if the chunk is None, indicating the + # end of the audio stream. + try: + chunk = self._buff.get(timeout=3) + except: # pylint: disable=bare-except + print("[Closing Stream] Conversation has ended.") + self.closed = True + self.terminate = True + break + + data.append(chunk) + + # Now try to the rest of chunks if there are any left in the _buff. + while True: + try: + chunk = self._buff.get(block=False) + if chunk is None: + return + data.append(chunk) + + except queue.Empty: + break + + self.audio_input_chunks.extend(data) + + if data: + yield b"".join(data) + + finally: + # print("Stop generator") + pass + +class VoiceServicer(VoiceServicer): #pylint: disable=function-redefined + """ + This class creates a conversation, sends audio chunks for + transcription and get live recommendations from Agent Assist + """ + def StreamingVoice(self, request_iterator, context): #pylint: disable=unused-argument + + # Initial handshake + logging.info("Reading config!") + try: + streaming_config = next(request_iterator).streaming_config + + # Be aware that if these field are not in the config object, + # it will fail silently, therefore an additional check might be necessary + conversation_id = streaming_config.vcc_call_id + sample_rate = streaming_config.voice_config.sample_rate_hertz + + if streaming_config.call_leg == 1: + participant_role = "END_USER" + else: + participant_role = "HUMAN_AGENT" + + #Send status to the client. 
+ streaming_status = StreamingStatus(code=1001, + message="Start sending audio!") + logging.info("Responding to the client: %s", streaming_status.message) + + yield StreamingVoiceResponse(status=streaming_status) + + except: # pylint: disable=bare-except + #Send status to the client. + streaming_status = StreamingStatus(code=1102, + message="Config error!") + logging.info("Responding to the client: %s", streaming_status.message) # pylint: disable=bare-except + + #Send status to the client. + yield StreamingVoiceResponse(status=streaming_status) + + return #Interrupt the rest of the code + + # Try to create a conversation if it does not exist + + logging.info("""Creating Conversation with conversation_id: %s""", + conversation_id) + + try: + # Create conversation. + conversation_management.create_conversation( + project_id=project_id, + conversation_profile_id=conversation_profile_id, + conversation_id=conversation_id + ) + except: # pylint: disable=bare-except + #Conversation already exists + logging.info("""conversation_id: %s already exist. Connecting.""", + conversation_id) + + # Get conversation + conversation_management.get_conversation( + project_id=project_id, conversation_id=conversation_id + ) + + # Create participant. 
        logging.info("Creating Participant with ROLE: %s", participant_role)
        participant = participant_management.create_participant(
            project_id=project_id,
            conversation_id=conversation_id,
            role=participant_role
        )
        # The participant id is the last path segment of the resource name.
        participant_id = participant.name.split("participants/")[1].rstrip()
        logging.info("""Created participant with ROLE: %s
                     and PARTICIPANT_ID: %s
                     """,
                     participant_role,
                     participant_id)

        # Open stream
        mic_manager = CallStream(sample_rate, chunk_size, request_iterator)
        sys.stdout.write(YELLOW)
        sys.stdout.write("End (ms) Transcript Results/Status\n")
        sys.stdout.write("====================================================\n")

        with mic_manager as stream:
            # Outer loop: one iteration per analyze-content round; each round
            # is bounded by restart_timeout so long calls restart the stream.
            while not stream.closed:
                stream.terminate = False
                while not stream.terminate:
                    try:
                        # print(f"New Streaming Analyze Request: {stream.restart_counter}")
                        stream.restart_counter += 1
                        # Send request to streaming and get response.
                        responses = participant_management.analyze_content_audio_stream(
                            conversation_id=conversation_id,
                            participant_id=participant_id,
                            sample_rate_herz=sample_rate,
                            stream=stream,
                            timeout=restart_timeout,
                            language_code="en-US",
                            single_utterance=False,
                        )

                        # Now, print the final transcription responses to user.
                        for response in responses:
                            # print(response)
                            if response.human_agent_suggestion_results:
                                # print(response.human_agent_suggestion_results)
                                # Surface only the top-ranked FAQ answer.
                                for sugg in response.human_agent_suggestion_results:
                                    if sugg.suggest_faq_answers_response.faq_answers:
                                        faq_answer = sugg.suggest_faq_answers_response.faq_answers[0]
                                        print("=========== Suggestion ============")
                                        print("Question:", faq_answer.question)
                                        print("Suggested Answer:", faq_answer.answer)
                                        print("Source Document:", faq_answer.source)

                            if response.recognition_result.is_final:
                                # offset return from recognition_result is relative
                                # to the beginning of audio stream.
                                offset = response.recognition_result.speech_end_offset
                                # NOTE(review): assumes speech_end_offset is a
                                # datetime.timedelta (proto-plus Duration) with
                                # .seconds/.microseconds — confirm, not .nanos.
                                stream.is_final_offset = int(
                                    offset.seconds * 1000 + offset.microseconds / 1000
                                )
                                # Half-close the stream with gRPC
                                # (in Python just stop yielding requests)
                                stream.is_final = True
                                # print('terminated', stream.terminate)

                    except DeadlineExceeded:
                        # restart_timeout elapsed: loop around and open a new
                        # streaming analyze request.
                        print("Deadline Exceeded, restarting.")

                if stream.terminate:
                    conversation_management.complete_conversation(
                        project_id=project_id, conversation_id=conversation_id
                    )
                    break
\ No newline at end of file
diff --git a/examples/ccai-agentassist-five9-grpc/server/utils/conversation_management.py b/examples/ccai-agentassist-five9-grpc/server/utils/conversation_management.py
new file mode 100644
index 0000000000..5d57b9bd43
--- /dev/null
+++ b/examples/ccai-agentassist-five9-grpc/server/utils/conversation_management.py
@@ -0,0 +1,96 @@
#!/usr/bin/env python

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dialogflow API Python sample showing how to manage Conversations.
"""

from google.cloud import dialogflow_v2beta1 as dialogflow


# [START dialogflow_create_conversation]
def create_conversation(project_id, conversation_profile_id, conversation_id):
    """Creates a conversation with given values

    Args:
        project_id: The GCP project linked with the conversation.
        conversation_profile_id: The conversation profile id used to create
            conversation.
        conversation_id: Id to assign to the new conversation."""

    client = dialogflow.ConversationsClient()
    conversation_profile_client = dialogflow.ConversationProfilesClient()
    project_path = client.common_project_path(project_id)
    conversation_profile_path = conversation_profile_client.conversation_profile_path(
        project_id,
        conversation_profile_id
    )

    conversation = {"conversation_profile": conversation_profile_path}
    request = dialogflow.CreateConversationRequest(
        parent=project_path,
        conversation=conversation,
        conversation_id=conversation_id
    )

    response = client.create_conversation(
        request=request
    )

    print("Life Cycle State:", response.lifecycle_state)
    print("Conversation Profile Name:", response.conversation_profile)
    print("Name:", response.name)
    return response

# [END dialogflow_create_conversation]

# [START dialogflow_get_conversation]
def get_conversation(project_id, conversation_id):
    """Gets a specific conversation profile.

    Args:
        project_id: The GCP project linked with the conversation.
        conversation_id: Id of the conversation."""

    client = dialogflow.ConversationsClient()
    conversation_path = client.conversation_path(project_id, conversation_id)

    response = client.get_conversation(name=conversation_path)

    print("Life Cycle State:", response.lifecycle_state)
    print("Conversation Profile Name:", response.conversation_profile)
    print("Name:", response.name)
    return response

# [END dialogflow_get_conversation]

# [START dialogflow_complete_conversation]
def complete_conversation(project_id, conversation_id):
    """
    Completes the specified conversation. Finished conversations
    are purged from the database after 30 days.

    Args:
        project_id: The GCP project linked with the conversation.
        conversation_id: Id of the conversation."""

    client = dialogflow.ConversationsClient()
    conversation_path = client.conversation_path(project_id, conversation_id)
    conversation = client.complete_conversation(name=conversation_path)
    print("Completed Conversation.")
    print("Life Cycle State:", conversation.lifecycle_state)
    print("Conversation Profile Name:", conversation.conversation_profile)
    print("Name:", conversation.name)
    return conversation

# [END dialogflow_complete_conversation]
diff --git a/examples/ccai-agentassist-five9-grpc/server/utils/participant_management.py b/examples/ccai-agentassist-five9-grpc/server/utils/participant_management.py
new file mode 100644
index 0000000000..35c67cfb66
--- /dev/null
+++ b/examples/ccai-agentassist-five9-grpc/server/utils/participant_management.py
@@ -0,0 +1,105 @@
#!/usr/bin/env python

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dialogflow API Python sample showing how to manage Participants.
"""

from google.cloud import dialogflow_v2beta1 as dialogflow
import google.auth

# Roles accepted by the Dialogflow Participants API.
ROLES = ["HUMAN_AGENT", "AUTOMATED_AGENT", "END_USER"]

# [START dialogflow_create_participant]
def create_participant(project_id: str, conversation_id: str, role: str):
    """Creates a participant in a given conversation.

    Args:
        project_id: The GCP project linked with the conversation profile.
        conversation_id: Id of the conversation.
+ participant: participant to be created.""" + + client = dialogflow.ParticipantsClient() + conversation_path = dialogflow.ConversationsClient.conversation_path( + project_id, conversation_id + ) + + if role in ROLES: + response = client.create_participant( + parent=conversation_path, participant={"role": role}, timeout=600 + ) + print("Participant Created.") + print("Role:", response.role) + print("Name:", response.name) + + return response + +# [END dialogflow_create_participant] + +# [START dialogflow_analyze_content_audio_stream] +def analyze_content_audio_stream( + conversation_id: str, + participant_id: str, + sample_rate_herz: int, + stream, + timeout: int, + language_code: str, + single_utterance=False, +): + """Stream audio streams to Dialogflow and receive transcripts and + suggestions. + + Args: + conversation_id: Id of the conversation. + participant_id: Id of the participant. + sample_rate_herz: herz rate of the sample. + stream: the stream to process. It should have generator() method to + yield input_audio. + timeout: the timeout of one stream. + language_code: the language code of the audio. Example: en-US + single_utterance: whether to use single_utterance. 
    """
    # Resolve ADC credentials; the project id comes from the environment,
    # not from the caller.
    credentials, project_id = google.auth.default()
    client = dialogflow.ParticipantsClient(credentials=credentials)

    participant_name = client.participant_path(
        project_id, conversation_id, participant_id
    )

    audio_config = dialogflow.types.audio_config.InputAudioConfig(
        audio_encoding=dialogflow.types.audio_config.AudioEncoding.AUDIO_ENCODING_LINEAR_16,
        sample_rate_hertz=sample_rate_herz,
        language_code=language_code,
        single_utterance=single_utterance
    )

    def gen_requests(participant_name, audio_config, stream):
        """Generates requests for streaming."""
        # print('Generating req')
        audio_generator = stream.generator()
        # First request carries only the config; subsequent ones carry audio.
        yield dialogflow.types.participant.StreamingAnalyzeContentRequest(
            participant=participant_name, audio_config=audio_config,
        )

        for content in audio_generator:
            yield dialogflow.types.participant.StreamingAnalyzeContentRequest(
                input_audio=content,
            )

    return client.streaming_analyze_content(
        gen_requests(participant_name, audio_config, stream), timeout=timeout
    )

# [END dialogflow_analyze_content_audio_stream]
\ No newline at end of file
diff --git a/helpers/exclusion_list.txt b/helpers/exclusion_list.txt
index f213d0bae5..ff870b5f2e 100644
--- a/helpers/exclusion_list.txt
+++ b/helpers/exclusion_list.txt
@@ -64,3 +64,4 @@
 ./examples/redis-cluster-gke
 ./examples/spinnaker
 ./examples/tensorflow-profiling-examples
+./examples/ccai-agentassist-five9-grpc