diff --git a/config.yaml b/config.yaml index fcb46677..360e99da 100644 --- a/config.yaml +++ b/config.yaml @@ -9,7 +9,6 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} diff --git a/examples/ack_test.yaml b/examples/ack_test.yaml index 08314aa9..41fb2eb5 100644 --- a/examples/ack_test.yaml +++ b/examples/ack_test.yaml @@ -15,7 +15,6 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} diff --git a/examples/anthropic_bedrock.yaml b/examples/anthropic_bedrock.yaml index 7c35bb60..03a0c6c9 100644 --- a/examples/anthropic_bedrock.yaml +++ b/examples/anthropic_bedrock.yaml @@ -3,7 +3,10 @@ # sends a message to an Anthropic Bedrock model, and # sends the response back to the Solace broker # It will ask the model to write a dry joke about the input -# message. It takes the entire payload of the input message +# message. +# Send a message to the Solace broker topics `my/topic1` or `my/topic2` +# with a plain text payload. The model will respond with a dry joke to the +# same topic prefixed with `response/`. (e.g. `response/my/topic1`) # # Dependencies: # pip install langchain_aws langchain_community @@ -28,12 +31,12 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} broker_password: ${SOLACE_BROKER_PASSWORD} broker_vpn: ${SOLACE_BROKER_VPN} + payload_encoding: utf-8 # List of flows flows: @@ -51,7 +54,6 @@ flows: qos: 1 - topic: my/topic2 qos: 1 - payload_encoding: utf-8 payload_format: text - component_name: llm @@ -81,13 +83,7 @@ flows: - component_name: solace_sw_broker component_module: broker_output component_config: - broker_connection_share: ${SOLACE_BROKER_URL} - broker_type: solace - broker_url: ${SOLACE_BROKER_URL} - broker_username: ${SOLACE_BROKER_USERNAME} - broker_password: ${SOLACE_BROKER_PASSWORD} - broker_vpn: ${SOLACE_BROKER_VPN} - payload_encoding: utf-8 + <<: *broker_connection payload_format: text input_transforms: - type: copy diff --git a/examples/error_handler.yaml b/examples/error_handler.yaml index a8c700e6..f2277949 100644 --- a/examples/error_handler.yaml +++ b/examples/error_handler.yaml @@ -25,7 +25,6 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} diff --git a/examples/request_reply.yaml b/examples/request_reply.yaml index 3cdae477..69e5834e 100644 --- a/examples/request_reply.yaml +++ b/examples/request_reply.yaml @@ -16,7 +16,6 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} diff --git a/src/solace_ai_connector/common/utils.py b/src/solace_ai_connector/common/utils.py index 003e2ff7..4996c3b1 100644 --- a/src/solace_ai_connector/common/utils.py +++ b/src/solace_ai_connector/common/utils.py @@ -6,6 +6,7 @@ import re import builtins import subprocess +import types from .log import log @@ -94,8 +95,11 @@ def resolve_config_values(config, allow_source_expression=False): return config -def import_module(name, base_path=None, component_package=None): - """Import a module by name""" +def import_module(module, base_path=None, component_package=None): + """Import a module by name or return the module object if it's already imported""" + + if isinstance(module, types.ModuleType): + return module if component_package: install_package(component_package) @@ -104,14 +108,13 @@ def import_module(name, base_path=None, component_package=None): if base_path not in sys.path: sys.path.append(base_path) try: - module = importlib.import_module(name) - return module + return importlib.import_module(module) except ModuleNotFoundError as exc: # If the module does not have a path associated with it, try # importing it from the known prefixes - annoying that this # is necessary. It seems you can't dynamically import a module # that is listed in an __init__.py file :( - if "." not in name: + if "." not in module: for prefix_prefix in ["solace_ai_connector", "."]: for prefix in [ ".components", @@ -123,22 +126,21 @@ def import_module(name, base_path=None, component_package=None): ".transforms", ".common", ]: - full_name = f"{prefix_prefix}{prefix}.{name}" + full_name = f"{prefix_prefix}{prefix}.{module}" try: if full_name.startswith("."): - module = importlib.import_module( + return importlib.import_module( full_name, package=__package__ ) else: - module = importlib.import_module(full_name) - return module + return importlib.import_module(full_name) except ModuleNotFoundError: pass except Exception as e: raise ImportError( f"Module load error for {full_name}: {e}" ) from e - raise ModuleNotFoundError(f"Module '{name}' not found") from exc + raise ModuleNotFoundError(f"Module '{module}' not found") from exc def invoke_config(config, allow_source_expression=False): diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index bd4c52c4..e20ecae9 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -10,6 +10,7 @@ from ..common.message import Message from ..common.trace_message import TraceMessage from ..common.event import Event, EventType +from ..flow.request_response_flow_controller import RequestResponseFlowController DEFAULT_QUEUE_TIMEOUT_MS = 200 DEFAULT_QUEUE_MAX_DEPTH = 5 @@ -38,6 +39,8 @@ def __init__(self, module_info, **kwargs): resolve_config_values(self.component_config) + self.request_response_flow_controllers = {} + self.next_component = None self.thread = None self.queue_timeout_ms = DEFAULT_QUEUE_TIMEOUT_MS @@ -58,6 +61,10 @@ def create_thread_and_run(self): return self.thread def run(self): + # Init the request response controllers here so that we know + # the connector is fully initialized and all flows are created + self.initialize_request_response_flow_controllers() + while not self.stop_signal.is_set(): event = None try: @@ -214,7 +221,11 @@ def get_config(self, key=None, default=None): val = self.component_config.get(key, None) if val is None: val = self.config.get(key, default) - if callable(val): + if callable(val) and key not in [ + "invoke_handler", + "get_next_event_handler", + "send_message_handler", + ]: if self.current_message is None: raise ValueError( f"Component {self.log_identifier} is trying to use an `invoke` config " @@ -365,3 +376,35 @@ def cleanup(self): self.input_queue.get_nowait() except queue.Empty: break + + def initialize_request_response_flow_controllers(self): + request_response_flow_controllers_config = self.config.get( + "request_response_flow_controllers", [] + ) + if request_response_flow_controllers_config: + for rrfc_config in request_response_flow_controllers_config: + name = rrfc_config.get("name") + if not name: + raise ValueError( + f"Request Response Flow Controller in component {self.name} does not have a name" + ) + + rrfc = RequestResponseFlowController( + config=rrfc_config, connector=self.connector + ) + + if not rrfc: + raise ValueError( + f"Request Response Flow Controller failed to initialize in component {self.name}" + ) + + self.request_response_flow_controllers[name] = rrfc + + def get_request_response_flow_controller(self, name): + return self.request_response_flow_controllers.get(name) + + def send_request_response_flow_message(self, rrfc_name, message, data): + rrfc = self.get_request_response_flow_controller(rrfc_name) + if rrfc: + return rrfc.send_message(message, data) + return None diff --git a/src/solace_ai_connector/components/general/delay.py b/src/solace_ai_connector/components/general/delay.py index d4a05d03..8d8aaf02 100644 --- a/src/solace_ai_connector/components/general/delay.py +++ b/src/solace_ai_connector/components/general/delay.py @@ -38,5 +38,6 @@ def __init__(self, **kwargs): super().__init__(info, **kwargs) def invoke(self, message, data): - sleep(self.get_config("delay")) + delay = self.get_config("delay") + sleep(delay) return deepcopy(data) diff --git a/src/solace_ai_connector/components/general/for_testing/handler_callback.py b/src/solace_ai_connector/components/general/for_testing/handler_callback.py new file mode 100644 index 00000000..12d0ea72 --- /dev/null +++ b/src/solace_ai_connector/components/general/for_testing/handler_callback.py @@ -0,0 +1,67 @@ +"""This test component allows a tester to configure callback handlers for + get_next_event, send_message and invoke methods""" + +from ...component_base import ComponentBase + + +info = { + "class_name": "HandlerCallback", + "description": ( + "This test component allows a tester to configure callback handlers for " + "get_next_event, send_message and invoke methods" + ), + "config_parameters": [ + { + "name": "get_next_event_handler", + "required": False, + "description": "The callback handler for the get_next_event method", + "type": "function", + }, + { + "name": "send_message_handler", + "required": False, + "description": "The callback handler for the send_message method", + "type": "function", + }, + { + "name": "invoke_handler", + "required": False, + "description": "The callback handler for the invoke method", + "type": "function", + }, + ], + "input_schema": { + "type": "object", + "properties": {}, + }, + "output_schema": { + "type": "object", + "properties": {}, + }, +} + + +class HandlerCallback(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.get_next_event_handler = self.get_config("get_next_event_handler") + self.send_message_handler = self.get_config("send_message_handler") + self.invoke_handler = self.get_config("invoke_handler") + + def get_next_event(self): + if self.get_next_event_handler: + return self.get_next_event_handler(self) + else: + return super().get_next_event() + + def send_message(self, message): + if self.send_message_handler: + return self.send_message_handler(self, message) + else: + return super().send_message(message) + + def invoke(self, message, data): + if self.invoke_handler: + return self.invoke_handler(self, message, data) + else: + return super().invoke(message, data) diff --git a/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py b/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py index 5fde36fa..ba7fe646 100644 --- a/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py +++ b/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py @@ -46,6 +46,11 @@ def __init__(self, **kwargs): def invoke(self, message, data): session_id = data.get("session_id") clear_history_but_keep_depth = data.get("clear_history_but_keep_depth") + try: + if clear_history_but_keep_depth is not None: + clear_history_but_keep_depth = max(0, int(clear_history_but_keep_depth)) + except (TypeError, ValueError): + clear_history_but_keep_depth = 0 messages = data.get("messages", []) with self.get_lock(self.history_key): diff --git a/src/solace_ai_connector/flow/flow.py b/src/solace_ai_connector/flow/flow.py index 9adfb27e..ea5091c0 100644 --- a/src/solace_ai_connector/flow/flow.py +++ b/src/solace_ai_connector/flow/flow.py @@ -66,6 +66,9 @@ def __init__( self.cache_service = connector.cache_service if connector else None self.create_components() + def get_input_queue(self): + return self.flow_input_queue + def create_components(self): # Loop through the components and create them for index, component in enumerate(self.flow_config.get("components", [])): @@ -77,23 +80,21 @@ def create_components(self): for component in component_group: component.set_next_component(self.component_groups[index + 1][0]) + self.flow_input_queue = self.component_groups[0][0].get_input_queue() + + def run(self): # Now one more time to create threads and run them - for index, component_group in enumerate(self.component_groups): + for _index, component_group in enumerate(self.component_groups): for component in component_group: thread = component.create_thread_and_run() self.threads.append(thread) - self.flow_input_queue = self.component_groups[0][0].get_input_queue() - def create_component_group(self, component, index): component_module = component.get("component_module", "") base_path = component.get("component_base_path", None) component_package = component.get("component_package", None) num_instances = component.get("num_instances", 1) - # component_config = component.get("component_config", {}) - # component_name = component.get("component_name", "") - # imported_module = import_from_directories(component_module) imported_module = import_module(component_module, base_path, component_package) try: @@ -136,6 +137,12 @@ def create_component_group(self, component, index): def get_flow_input_queue(self): return self.flow_input_queue + # This will set the next component in all the components in the + # last component group + def set_next_component(self, component): + for comp in self.component_groups[-1]: + comp.set_next_component(component) + def wait_for_threads(self): for thread in self.threads: thread.join() diff --git a/src/solace_ai_connector/flow/request_response_flow_controller.py b/src/solace_ai_connector/flow/request_response_flow_controller.py new file mode 100644 index 00000000..373f2ec7 --- /dev/null +++ b/src/solace_ai_connector/flow/request_response_flow_controller.py @@ -0,0 +1,130 @@ +""" +This file will handle sending a message to a named flow and then +receiving the output message from that flow. It will also support the result +message being a streamed message that comes in multiple parts. + +Each component can optionally create multiple of these using the configuration: + +```yaml +- name: example_flow + components: + - component_name: example_component + component_module: custom_component + request_response_flow_controllers: + - name: example_controller + flow_name: llm_flow + streaming: true + streaming_last_message_expression: input.payload:streaming.last_message + timeout_ms: 300000 +``` + +""" + +import queue +import time +from typing import Dict, Any + +from ..common.message import Message +from ..common.event import Event, EventType + + +# This is a very basic component which will be stitched onto the final component in the flow +class RequestResponseControllerOuputComponent: + def __init__(self, controller): + self.controller = controller + + def enqueue(self, event): + self.controller.enqueue_response(event) + + +# This is the main class that will be used to send messages to a flow and receive the response +class RequestResponseFlowController: + def __init__(self, config: Dict[str, Any], connector): + self.config = config + self.connector = connector + self.flow_name = config.get("flow_name") + self.streaming = config.get("streaming", False) + self.streaming_last_message_expression = config.get( + "streaming_last_message_expression" + ) + self.timeout_s = config.get("timeout_ms", 30000) / 1000 + self.input_queue = None + self.response_queue = None + self.enqueue_time = None + self.request_outstanding = False + + flow = connector.get_flow(self.flow_name) + + if not flow: + raise ValueError(f"Flow {self.flow_name} not found") + + self.setup_queues(flow) + + def setup_queues(self, flow): + # Input queue to send the message to the flow + self.input_queue = flow.get_input_queue() + + # Response queue to receive the response from the flow + self.response_queue = queue.Queue() + rrcComponent = RequestResponseControllerOuputComponent(self) + flow.set_next_component(rrcComponent) + + def send_message(self, message: Message, data: Any): + # Make a new message, but copy the data from the original message + payload = message.get_payload() + topic = message.get_topic() + user_properties = message.get_user_properties() + new_message = Message( + payload=payload, topic=topic, user_properties=user_properties + ) + new_message.set_previous(data) + + if not self.input_queue: + raise ValueError(f"Input queue for flow {self.flow_name} not found") + + event = Event(EventType.MESSAGE, new_message) + self.enqueue_time = time.time() + self.request_outstanding = True + self.input_queue.put(event) + return self.response_iterator + + def response_iterator(self): + while True: + now = time.time() + elapsed_time = now - self.enqueue_time + remaining_timeout = self.timeout_s - elapsed_time + if self.streaming: + # If we are in streaming mode, we will return individual messages + # until we receive the last message. Use the expression to determine + # if this is the last message + while True: + try: + event = self.response_queue.get(timeout=remaining_timeout) + if event.event_type == EventType.MESSAGE: + message = event.data + yield message, message.get_previous() + if self.streaming_last_message_expression: + last_message = message.get_data( + self.streaming_last_message_expression + ) + if last_message: + return + except queue.Empty: + if (time.time() - self.enqueue_time) > self.timeout_s: + raise TimeoutError("Timeout waiting for response") + + else: + # If we are not in streaming mode, we will return a single message + # and then stop the iterator + try: + event = self.response_queue.get(timeout=remaining_timeout) + if event.event_type == EventType.MESSAGE: + message = event.data + yield message, message.get_previous() + return + except queue.Empty: + if (time.time() - self.enqueue_time) > self.timeout_s: + raise TimeoutError("Timeout waiting for response") + + def enqueue_response(self, event): + self.response_queue.put(event) diff --git a/src/solace_ai_connector/solace_ai_connector.py b/src/solace_ai_connector/solace_ai_connector.py index f41f50f4..7c4a6881 100644 --- a/src/solace_ai_connector/solace_ai_connector.py +++ b/src/solace_ai_connector/solace_ai_connector.py @@ -62,6 +62,8 @@ def create_flows(self): flow_input_queue = flow_instance.get_flow_input_queue() self.flow_input_queues[flow.get("name")] = flow_input_queue self.flows.append(flow_instance) + for flow in self.flows: + flow.run() def create_flow(self, flow: dict, index: int, flow_instance_index: int): """Create a single flow""" @@ -202,6 +204,13 @@ def get_flows(self): """Return the flows""" return self.flows + def get_flow(self, flow_name): + """Return a specific flow by name""" + for flow in self.flows: + if flow.name == flow_name: + return flow + return None + def setup_cache_service(self): """Setup the cache service""" cache_config = self.config.get("cache", {}) diff --git a/tests/utils_for_test_files.py b/src/solace_ai_connector/test_utils/utils_for_test_files.py similarity index 62% rename from tests/utils_for_test_files.py rename to src/solace_ai_connector/test_utils/utils_for_test_files.py index 2bb9dfc9..15a64927 100644 --- a/tests/utils_for_test_files.py +++ b/src/solace_ai_connector/test_utils/utils_for_test_files.py @@ -1,8 +1,6 @@ -"""Collection of functions to be used in test files""" - +import os import queue import sys -import os import yaml sys.path.insert(0, os.path.abspath("src")) @@ -10,6 +8,7 @@ from solace_ai_connector.solace_ai_connector import SolaceAiConnector from solace_ai_connector.common.log import log from solace_ai_connector.common.event import Event, EventType +from solace_ai_connector.common.message import Message # from solace_ai_connector.common.message import Message @@ -61,12 +60,16 @@ def enqueue(self, message): self.next_component_queue.put(event) -def create_connector(config_yaml, event_handlers=None, error_queue=None): - """Create a connector from a config""" +def create_connector(config_or_yaml, event_handlers=None, error_queue=None): + """Create a connector from a config that can be an object or a yaml string""" + + config = config_or_yaml + if isinstance(config_or_yaml, str): + config = yaml.safe_load(config_or_yaml) # Create the connector connector = SolaceAiConnector( - yaml.safe_load(config_yaml), + config, event_handlers=event_handlers, error_queue=error_queue, ) @@ -76,15 +79,85 @@ def create_connector(config_yaml, event_handlers=None, error_queue=None): return connector -def create_test_flows(config_yaml, queue_timeout=None, error_queue=None, queue_size=0): +def run_component_test( + module_or_name, + validation_func, + component_config=None, + input_data=None, + input_messages=None, + input_selection=None, + input_transforms=None, +): + if not input_data and not input_messages: + raise ValueError("Either input_data or input_messages must be provided") + + if input_data and input_messages: + raise ValueError("Only one of input_data or input_messages can be provided") + + if input_data and not isinstance(input_data, list): + input_data = [input_data] + + if input_messages and not isinstance(input_messages, list): + input_messages = [input_messages] + + if not input_messages: + input_messages = [] + + if input_selection: + if isinstance(input_selection, str): + input_selection = {"source_expression": input_selection} + + connector = None + try: + connector, flows = create_test_flows( + { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "test_component", + "component_module": module_or_name, + "component_config": component_config or {}, + "input_selection": input_selection, + "input_transforms": input_transforms, + } + ], + } + ] + } + ) + + if input_data: + for data in input_data: + message = Message(payload=data) + message.set_previous(data) + input_messages.append(message) + + # Send each message through, one at a time + for message in input_messages: + send_message_to_flow(flows[0], message) + output_message = get_message_from_flow(flows[0]) + validation_func(output_message.get_previous(), output_message, message) + + finally: + if connector: + dispose_connector(connector) + + +def create_test_flows( + config_or_yaml, queue_timeout=None, error_queue=None, queue_size=0 +): # Create the connector - connector = create_connector(config_yaml, error_queue=error_queue) + connector = create_connector(config_or_yaml, error_queue=error_queue) flows = connector.get_flows() # For each of the flows, add the input and output components flow_info = [] for flow in flows: + if flow.flow_config.get("test_ignore", False): + continue input_component = TestInputComponent( flow.component_groups[0][0].get_input_queue() ) diff --git a/tests/test_acks.py b/tests/test_acks.py index c067fb56..bf0b1eae 100644 --- a/tests/test_acks.py +++ b/tests/test_acks.py @@ -1,11 +1,11 @@ """This file tests acks in a flow""" import sys -import queue sys.path.append("src") +import queue -from utils_for_test_files import ( # pylint: disable=wrong-import-position +from solace_ai_connector.test_utils.utils_for_test_files import ( # pylint: disable=wrong-import-position # create_connector, # create_and_run_component, dispose_connector, diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index a288410e..8826f6ab 100644 --- a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -1,8 +1,11 @@ """Some tests to verify the aggregate component works as expected""" +import sys + +sys.path.append("src") import time -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, dispose_connector, send_message_to_flow, diff --git a/tests/test_config_file.py b/tests/test_config_file.py index 5bd34f73..593bc0e9 100644 --- a/tests/test_config_file.py +++ b/tests/test_config_file.py @@ -1,19 +1,26 @@ """Test various things related to the configuration file""" import sys -import yaml import pytest +import yaml sys.path.append("src") -from utils_for_test_files import ( # pylint: disable=wrong-import-position +from solace_ai_connector.test_utils.utils_for_test_files import ( # pylint: disable=wrong-import-position create_connector, + create_test_flows, + dispose_connector, + send_message_to_flow, + get_message_from_flow, ) from solace_ai_connector.solace_ai_connector import ( # pylint: disable=wrong-import-position SolaceAiConnector, ) +from solace_ai_connector.common.message import Message +import solace_ai_connector.components.general.pass_through + # from solace_ai_connector.common.log import log @@ -143,6 +150,46 @@ def test_no_component_module(): assert str(e) == "component_module not provided in flow 0, component 0" +def test_static_import_and_object_config(): + """Test that we can statically import a module and pass an object for the config""" + + config = { + "log": {"log_file_level": "DEBUG", "log_file": "solace_ai_connector.log"}, + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "delay1", + "component_module": solace_ai_connector.components.general.pass_through, + "component_config": {"delay": 0.1}, + "input_selection": {"source_expression": "input.payload"}, + } + ], + } + ], + } + connector = None + try: + connector, flows = create_test_flows(config) + + # Test pushing a simple message through the delay component + message = Message(payload={"text": "Hello, World!"}) + send_message_to_flow(flows[0], message) + + # Get the output message + output_message = get_message_from_flow(flows[0]) + + # Check that the output is correct + assert output_message.get_data("previous") == {"text": "Hello, World!"} + + except Exception as e: + pytest.fail(f"Test failed with exception: {e}") + finally: + if "connector" in locals(): + dispose_connector(connector) + + def test_bad_module(): """Test that the program exits if the component module is not found""" try: diff --git a/tests/test_error_flows.py b/tests/test_error_flows.py index b8ff4d72..8e7edfe6 100644 --- a/tests/test_error_flows.py +++ b/tests/test_error_flows.py @@ -2,11 +2,11 @@ import sys -# import queue - sys.path.append("src") -from utils_for_test_files import ( # pylint: disable=wrong-import-position +# import queue + +from solace_ai_connector.test_utils.utils_for_test_files import ( # pylint: disable=wrong-import-position create_test_flows, # create_and_run_component, dispose_connector, diff --git a/tests/test_filter.py b/tests/test_filter.py index b43b72f8..478cc94d 100644 --- a/tests/test_filter.py +++ b/tests/test_filter.py @@ -1,8 +1,12 @@ """Some tests to verify the filter component works as expected""" +import sys + +sys.path.append("src") + # import pytest -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, # create_connector, dispose_connector, diff --git a/tests/test_flows.py b/tests/test_flows.py index 6196ae1e..4687fda9 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -1,9 +1,12 @@ """This test file tests all things to do with the flows and the components that make up the flows""" +import sys + +sys.path.append("src") import pytest import time -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, create_connector, dispose_connector, diff --git a/tests/test_invoke.py b/tests/test_invoke.py index 58d0b771..fa0de0f3 100644 --- a/tests/test_invoke.py +++ b/tests/test_invoke.py @@ -5,13 +5,14 @@ sys.path.append("src") -from utils_for_test_files import ( # pylint: disable=wrong-import-position + +from solace_ai_connector.test_utils.utils_for_test_files import ( create_and_run_component, ) -from solace_ai_connector.common.utils import ( # pylint: disable=wrong-import-position +from solace_ai_connector.common.utils import ( resolve_config_values, ) -from solace_ai_connector.common.message import ( # pylint: disable=wrong-import-position +from solace_ai_connector.common.message import ( Message, ) @@ -1083,16 +1084,13 @@ def test_invoke_with_uuid_generator(): response = resolve_config_values( { "a": { - "invoke": { - "module": "invoke_functions", - "function": "uuid" - }, + "invoke": {"module": "invoke_functions", "function": "uuid"}, }, } - ) - + ) + # Check if the output is of type string assert type(response["a"]) == str # Check if the output is a valid UUID - assert len(response["a"]) == 36 \ No newline at end of file + assert len(response["a"]) == 36 diff --git a/tests/test_iterate.py b/tests/test_iterate.py index a33baccc..cffb7630 100644 --- a/tests/test_iterate.py +++ b/tests/test_iterate.py @@ -1,8 +1,12 @@ """Some tests to verify the iterate component works as expected""" +import sys + +sys.path.append("src") + # import pytest -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, # create_connector, dispose_connector, diff --git a/tests/test_message_get_set_data.py b/tests/test_message_get_set_data.py index f2258d1f..33622eda 100644 --- a/tests/test_message_get_set_data.py +++ b/tests/test_message_get_set_data.py @@ -1,11 +1,11 @@ """This test fixture will test the get_data and set_data methods of the Message class""" +import sys +sys.path.append("src") import json import base64 -import sys import pytest -sys.path.append("src") from solace_ai_connector.common.message import Message # Create a few different messages to test with diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py new file mode 100644 index 00000000..5719badd --- /dev/null +++ b/tests/test_request_response_controller.py @@ -0,0 +1,227 @@ +import sys + +sys.path.append("src") + +from solace_ai_connector.test_utils.utils_for_test_files import ( + create_test_flows, + dispose_connector, + send_message_to_flow, + get_message_from_flow, +) +from solace_ai_connector.common.message import Message + + +def test_request_response_flow_controller_basic(): + """Test basic functionality of the RequestResponseFlowController""" + + def test_invoke_handler(component, message, _data): + # Call the request_response_flow + data_iter = component.send_request_response_flow_message( + "test_controller", message, {"test": "data"} + ) + + # Just a single message with no streaming + for message, _data in data_iter(): + assert message.get_data("previous") == {"test": "data"} + assert message.get_data("input.payload") == {"text": "Hello, World!"} + + return "done" + + config = { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "requester", + "component_module": "handler_callback", + "component_config": { + "invoke_handler": test_invoke_handler, + }, + "request_response_flow_controllers": [ + { + "name": "test_controller", + "flow_name": "request_response_flow", + "timeout_ms": 500000, + } + ], + } + ], + }, + { + "name": "request_response_flow", + "test_ignore": True, + "components": [ + {"component_name": "responder", "component_module": "pass_through"} + ], + }, + ] + } + connector, flows = create_test_flows(config) + + test_flow = flows[0] + + try: + + # Send a message to the input flow + send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"})) + + # Get the output message + output_message = get_message_from_flow(test_flow) + + assert output_message.get_data("previous") == "done" + + finally: + dispose_connector(connector) + + +# Test simple streaming request response +# Use the iterate component to break a single message into multiple messages +def test_request_response_flow_controller_streaming(): + """Test streaming functionality of the RequestResponseFlowController""" + + def test_invoke_handler(component, message, data): + # Call the request_response_flow + data_iter = component.send_request_response_flow_message( + "test_controller", + message, + [ + {"test": "data1", "streaming": {"last_message": False}}, + {"test": "data2", "streaming": {"last_message": False}}, + {"test": "data3", "streaming": {"last_message": True}}, + ], + ) + + # Expecting 3 messages + results = [] + for message, data in data_iter(): + results.append(data.get("test")) + + assert results == ["data1", "data2", "data3"] + return "done" + + config = { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "requester", + "component_module": "handler_callback", + "component_config": { + "invoke_handler": test_invoke_handler, + }, + "request_response_flow_controllers": [ + { + "name": "test_controller", + "flow_name": "request_response_flow", + "streaming": True, + "streaming_last_message_expression": "previous:streaming.last_message", + "timeout_ms": 500000, + } + ], + } + ], + }, + { + "name": "request_response_flow", + "test_ignore": True, + "components": [ + {"component_name": "responder", "component_module": "iterate"} + ], + }, + ] + } + connector, flows = create_test_flows(config) + + test_flow = flows[0] + + try: + + # Send a message to the input flow + send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"})) + + # Get the output message + output_message = get_message_from_flow(test_flow) + + assert output_message.get_data("previous") == "done" + + except Exception as e: + print(e) + assert False + + finally: + dispose_connector(connector) + + +# Test the timeout functionality +def test_request_response_flow_controller_timeout(): + """Test timeout functionality of the RequestResponseFlowController""" + + def test_invoke_handler(component, message, data): + # Call the request_response_flow + data_iter = component.send_request_response_flow_message( + "test_controller", message, {"test": "data"} + ) + + # This will timeout + try: + for message, data in data_iter(): + assert message.get_data("previous") == {"test": "data"} + assert message.get_data("input.payload") == {"text": "Hello, World!"} + except TimeoutError: + return "timeout" + return "done" + + config = { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "requester", + "component_module": "handler_callback", + "component_config": { + "invoke_handler": test_invoke_handler, + }, + "request_response_flow_controllers": [ + { + "name": "test_controller", + "flow_name": "request_response_flow", + "timeout_ms": 1000, + } + ], + } + ], + }, + { + "name": "request_response_flow", + "test_ignore": True, + "components": [ + { + "component_name": "responder", + "component_module": "delay", + "component_config": { + "delay": 5, + }, + } + ], + }, + ] + } + connector, flows = create_test_flows(config) + + test_flow = flows[0] + + try: + + # Send a message to the input flow + send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"})) + + # Get the output message + output_message = get_message_from_flow(test_flow) + + assert output_message.get_data("previous") == "timeout" + + finally: + dispose_connector(connector) diff --git a/tests/test_timer_input.py b/tests/test_timer_input.py index 343a7d87..b8897e2f 100644 --- a/tests/test_timer_input.py +++ b/tests/test_timer_input.py @@ -1,9 +1,12 @@ """Test the timer input component""" +import sys + +sys.path.append("src") import time import pytest -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, create_connector, dispose_connector, diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 9b001505..2efe69cc 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -4,14 +4,16 @@ sys.path.append("src") -from utils_for_test_files import ( # pylint: disable=wrong-import-position +from solace_ai_connector.test_utils.utils_for_test_files import ( # pylint: disable=wrong-import-position create_connector, create_and_run_component, + run_component_test, # dispose_connector, ) from solace_ai_connector.common.message import ( # pylint: disable=wrong-import-position Message, ) +import solace_ai_connector.components.general.pass_through def test_basic_copy_transform(): @@ -44,6 +46,64 @@ def test_basic_copy_transform(): assert output_message.get_data("previous") == "Hello, World!" +def test_transform_with_run_component_test(): + """This test is actually testing the test infrastructure method: run_component_test""" + + def validation_func(output_data, output_message, _input_message): + assert output_data == "Hello, World!" + assert output_message.get_data("user_data.temp") == { + "payload": {"text": "Hello, World!", "greeting": "Static Greeting!"} + } + + run_component_test( + "pass_through", + validation_func, + input_data={"text": "Hello, World!"}, + input_transforms=[ + { + "type": "copy", + "source_expression": "input.payload", + "dest_expression": "user_data.temp:payload", + }, + { + "type": "copy", + "source_value": "Static Greeting!", + "dest_expression": "user_data.temp:payload.greeting", + }, + ], + input_selection={"source_expression": "user_data.temp:payload.text"}, + ) + + +def test_transform_with_run_component_test_with_static_import(): + """This test is actually testing the test infrastructure method: run_component_test""" + + def validation_func(output_data, output_message, _input_message): + assert output_data == "Hello, World!" + assert output_message.get_data("user_data.temp") == { + "payload": {"text": "Hello, World!", "greeting": "Static Greeting!"} + } + + run_component_test( + solace_ai_connector.components.general.pass_through, + validation_func, + input_data={"text": "Hello, World!"}, + input_transforms=[ + { + "type": "copy", + "source_expression": "input.payload", + "dest_expression": "user_data.temp:payload", + }, + { + "type": "copy", + "source_value": "Static Greeting!", + "dest_expression": "user_data.temp:payload.greeting", + }, + ], + input_selection={"source_expression": "user_data.temp:payload.text"}, + ) + + def test_basic_map_transform(): """Test the basic map transform""" # Create a simple configuration