Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test ci #19

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a133bd2
Remove launch.json
efunneko Apr 27, 2024
d9b3d36
Merge remote-tracking branch 'upstream/main'
efunneko Apr 27, 2024
c9f4091
Merge remote-tracking branch 'upstream/main'
efunneko Apr 27, 2024
dd861b0
Merge remote-tracking branch 'upstream/main'
efunneko Apr 29, 2024
7785992
Merge remote-tracking branch 'upstream/main'
efunneko May 5, 2024
0ae3420
Merge remote-tracking branch 'upstream/main'
efunneko May 9, 2024
2f7ec70
chore: Refactor SlackReceiver to handle channel events and join new c…
efunneko May 15, 2024
d852259
* Add ability for a component to send a message directly to a named flow
efunneko May 23, 2024
dd5b7f9
feat: Update default stream_batch_size to 15 in LangChainChatModelWit…
efunneko May 27, 2024
79b1021
Merge remote-tracking branch 'upstream/main'
efunneko May 27, 2024
8b01c1c
Merge remote-tracking branch 'upstream/main'
efunneko May 30, 2024
261d095
Update import statement in main.py
efunneko Jun 5, 2024
002e9b9
Another major reorganization of directory structure to make it more s…
efunneko Jun 6, 2024
fea2759
Fixed some documentation generation after package reorganization
efunneko Jun 6, 2024
3b421b8
Merge branch 'main' into main
efunneko Jun 6, 2024
5b21a74
Merge remote-tracking branch 'upstream/main'
efunneko Jun 6, 2024
bcac8ea
chore: Remove unused slack.yaml configuration file
efunneko Jun 6, 2024
5e4d350
Made some changes in utils.py for dynamic loading. We will no longer …
efunneko Jun 8, 2024
8c809e8
Moved slack components into their own plugin: solace-ai-connector-slack.
efunneko Jun 9, 2024
9cbba1d
chore: Update component_base.py to include flow_lock_manager and flow…
efunneko Jun 18, 2024
fc21829
chore: Update trust_store_path for Solace API
efunneko Jun 18, 2024
85bf89a
Bump up to latest Solace API and small fix in a debug log
efunneko Jul 8, 2024
f140ca6
DATAGO-79372: Add Publish workflow (#3)
AmanRiat1 Jul 8, 2024
df0fd3d
DATAGO-78654 : Add CI (#4)
artyom-morozov Jul 8, 2024
1c15df5
Merge remote-tracking branch 'upstream/main'
efunneko Jul 11, 2024
714e239
chore: Fix a bug in the user_processor component to properly return s…
efunneko Jul 11, 2024
1873ae4
Test clean up prints (#7)
artyom-morozov Jul 11, 2024
c1a5ba8
Add coverage CI workflow
artyom-morozov Jul 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ jobs:
shell: bash

- name: SonarQube Scan
if: always()
if: always() && github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
uses: sonarsource/sonarqube-scan-action@v2.2.0
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
Expand All @@ -118,6 +118,7 @@ jobs:

- name: SonarQube Quality Gate check
id: sonarqube-quality-gate-check
if: always() && github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
uses: sonarsource/sonarqube-quality-gate-action@master
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def from_properties(self, broker_properties: dict):

def build(self):
if self.broker_properties["broker_type"] == "solace":
print("Building Solace Messaging Service", self.broker_properties)
return SolaceMessaging(self.broker_properties)

raise ValueError(
Expand Down
32 changes: 14 additions & 18 deletions src/solace_ai_connector/common/messaging/solace_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,10 @@ def on_message(self, message: InboundMessage):
else message.get_payload_as_bytes()
)
if isinstance(payload, bytearray):
print(f"Received a message of type: {type(payload)}. Decoding to string")
payload = payload.decode()

topic = message.get_destination_name()
print("\n" + f"Received message on: {topic}")
print("\n" + f"Message payload: {payload} \n")
# topic = message.get_destination_name()
self.receiver.ack(message)
# print("\n" + f"Message dump: {message} \n")


class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
Expand All @@ -72,19 +68,16 @@ class ServiceEventHandler(
ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener
):
def on_reconnected(self, service_event: ServiceEvent):
print("\non_reconnected")
print(f"Error cause: {service_event.get_cause()}")
print(f"Message: {service_event.get_message()}")
log.debug("Reconnected to broker: %s", service_event.get_cause())
log.debug("Message: %s", service_event.get_message())

def on_reconnecting(self, event: "ServiceEvent"):
print("\non_reconnecting")
print(f"Error cause: {event.get_cause()}")
print(f"Message: {event.get_message()}")
log.debug("Reconnecting - Error cause: %s", event.get_cause())
log.debug("Message: %s", event.get_message())

def on_service_interrupted(self, event: "ServiceEvent"):
print("\non_service_interrupted")
print(f"Error cause: {event.get_cause()}")
print(f"Message: {event.get_message()}")
log.debug("Service interrupted - Error cause: %s", event.get_cause())
log.debug("Message: %s", event.get_message())


def set_python_solace_log_level(level: str):
Expand Down Expand Up @@ -112,7 +105,6 @@ def __init__(self, broker_properties: dict):
# set_python_solace_log_level("DEBUG")

def __del__(self):
print("DESTRUCTOR: SolaceMessaging")
self.disconnect()

def connect(self):
Expand Down Expand Up @@ -196,7 +188,11 @@ def bind_to_queue(self, queue_name: str, subscriptions: list = None):

# Handle API exception
except PubSubPlusClientError as exception:
print(f"\nMake sure queue {queue_name} exists on broker!", exception)
log.warning(
"Error creating persistent receiver for queue [%s], %s",
queue_name,
exception,
)

# Add to list of receivers
self.persistent_receivers.append(self.persistent_receiver)
Expand All @@ -206,15 +202,15 @@ def bind_to_queue(self, queue_name: str, subscriptions: list = None):
for subscription in subscriptions:
sub = TopicSubscription.of(subscription.get("topic"))
self.persistent_receiver.add_subscription(sub)
print(f"Subscribed to topic: {subscription}")
log.debug("Subscribed to topic: %s", subscription)

return self.persistent_receiver

def disconnect(self):
try:
self.messaging_service.disconnect()
except Exception as exception: # pylint: disable=broad-except
print(f"Error disconnecting: {exception}")
log.debug("Error disconnecting: %s", exception)

def is_connected(self):
return self.messaging_service.is_connected()
Expand Down
1 change: 0 additions & 1 deletion src/solace_ai_connector/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ def import_from_directories(module_name, base_path=None):
if "." in module_name:
module_file = module_name.replace(".", os.sep)
module_path = os.path.join(directory, module_file + ".py")
# print(f"module_path: {module_path}")
if os.path.exists(module_path):
try:
# if module_path.startswith("src/solace_ai_connector"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ def invoke_model(
"configurable": {"session_id": session_id},
},
):
# print(f"Streaming chunk: {chunk.content}")
aggregate_result += chunk.content
current_batch += chunk.content
if len(current_batch.split()) >= self.stream_batch_size:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ def invoke(self, message, data):
component_processing = self.get_config("component_processing")
if component_processing and callable(component_processing):
return component_processing(message)
return data
return component_processing
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ def decode_payload(self, payload):
return payload

def acknowledge_message(self, broker_message):
# print("Acknowledging message")
self.messaging_service.ack_message(broker_message)

def get_acknowledgement_callback(self):
Expand Down
Empty file added test
Empty file.
121 changes: 121 additions & 0 deletions tests/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,124 @@ def test_missing_item_filter():

# Clean up
dispose_connector(connector)


def test_filter_with_multi_stage_data():
"""Test the filter component with a previous stage passing on data and the filter
input_transforms copying that data into a user_data area.

Flow under test: user_processor -> message_filter -> pass_through.
user_processor is configured to invoke add(5, 6), so the previous stage's
result is 11. The filter's input_transforms copy that result (source_expression:
previous) into user_data.output, and the filter expression not_equal(1, 2) is
truthy, so the message is expected to pass through the filter unchanged.
"""
# NOTE(review): the indentation inside this YAML literal appears flattened in
# this view of the file — verify against the original source before editing.
config_yaml = """
log:
log_file_level: DEBUG
log_file: solace_ai_connector.log
flows:
- name: test_flow
components:
- component_name: user_processor
component_module: user_processor
component_config:
component_processing:
invoke:
module: invoke_functions
function: add
params:
positional:
- 5
- 6
- component_name: message_filter
component_module: message_filter
component_config:
filter_expression:
invoke:
module: invoke_functions
function: not_equal
params:
positional:
- 1
- 2
input_transforms:
- type: copy
source_expression: previous
dest_expression: user_data.output
- component_name: pass_through
component_module: pass_through


"""
connector, flows = create_test_flows(config_yaml, queue_timeout=1)
flow = flows[0]

# Send 1 message
message = Message(payload={"my_list": [1, 2, 3], "my_obj": {"a": 1, "b": 2}})
send_message_to_flow(flow, message)

# Expect a message
try:
output_message = get_message_from_flow(flow)
# The original payload must survive the flow untouched.
assert output_message.get_data("input.payload:my_list") == [1, 2, 3]
# 11 == add(5, 6) produced by the user_processor stage, copied into
# user_data.output by the filter's input_transforms.
assert output_message.get_data("user_data.output") == 11
finally:
# Clean up
dispose_connector(connector)


def test_filter_with_multi_stage_data_with_timer_input():
"""Test the filter component with a previous stage passing on data and the filter
input_transforms copying that data into a user_data area - this time with a timer causing the message to be sent.

Flow under test: timer_input -> user_processor -> message_filter -> pass_through.
The timer fires every 500 ms (interval_ms: 500) and does not skip messages if
behind; user_processor invokes add(5, 6), so every message that passes the
filter should carry 11 in user_data.output.
"""
# NOTE(review): the indentation inside this YAML literal appears flattened in
# this view of the file — verify against the original source before editing.
config_yaml = """
log:
log_file_level: DEBUG
log_file: solace_ai_connector.log
trace:
trace_file: solace_ai_connector.trace
flows:
- name: test_flow
components:
- component_name: timer_input
component_module: timer_input
component_config:
interval_ms: 500
skip_messages_if_behind: false
- component_name: user_processor
component_module: user_processor
component_config:
component_processing:
invoke:
module: invoke_functions
function: add
params:
positional:
- 5
- 6
- component_name: message_filter
component_module: message_filter
component_config:
filter_expression:
invoke:
module: invoke_functions
function: not_equal
params:
positional:
- 1
- 2
input_transforms:
- type: copy
source_expression: previous
dest_expression: user_data.output
- component_name: pass_through
component_module: pass_through


"""
connector, flows = create_test_flows(config_yaml, queue_timeout=3)
flow = flows[0]

try:
# Get the output messages (should be at least 3 seconds worth).
# With a 500 ms timer interval and queue_timeout=3, three messages are
# expected to arrive comfortably within the timeout — TODO confirm timing
# assumptions against the timer_input component.
for _ in range(3):
msg = get_message_from_flow(flow)
# 11 == add(5, 6) from the user_processor stage, copied into
# user_data.output by the filter's input_transforms.
assert msg.get_data("user_data.output") == 11
finally:
# Clean up
dispose_connector(connector)

Loading