Some maintenance work tidying the code (#18)
Cleanup some debug info
efunneko authored Jul 12, 2024
1 parent 6ed035b commit 2920bcf
Showing 9 changed files with 139 additions and 25 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
@@ -77,7 +77,7 @@ jobs:
- name: Run Lint
continue-on-error: true
run: |
hatch run +py=312 lint:ruff check -o lint.json --output-format json ./src ./tests
hatch run lint:ruff check -o lint.json --output-format json
shell: bash

- name: Run Structured Tests
@@ -102,7 +102,7 @@ jobs:
shell: bash

- name: SonarQube Scan
if: always()
if: always() && github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
uses: sonarsource/sonarqube-scan-action@v2.2.0
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
@@ -118,6 +118,7 @@ jobs:
- name: SonarQube Quality Gate check
id: sonarqube-quality-gate-check
if: always() && github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
uses: sonarsource/sonarqube-quality-gate-action@master
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
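
A note on the SonarQube conditions above: in GitHub Actions expressions, && binds more tightly than ||, so the new condition should evaluate with the grouping shown below (equivalent form added here for clarity, not part of the commit). In effect, the steps run for all non-pull-request events, and for pull requests only when the head repository is this repository rather than a fork.

    if: (always() && github.event_name != 'pull_request') || github.event.pull_request.head.repo.full_name == github.repository
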
@@ -14,7 +14,6 @@ def from_properties(self, broker_properties: dict):

def build(self):
if self.broker_properties["broker_type"] == "solace":
print("Building Solace Messaging Service", self.broker_properties)
return SolaceMessaging(self.broker_properties)

raise ValueError(
32 changes: 14 additions & 18 deletions src/solace_ai_connector/common/messaging/solace_messaging.py
@@ -47,14 +47,10 @@ def on_message(self, message: InboundMessage):
else message.get_payload_as_bytes()
)
if isinstance(payload, bytearray):
print(f"Received a message of type: {type(payload)}. Decoding to string")
payload = payload.decode()

topic = message.get_destination_name()
print("\n" + f"Received message on: {topic}")
print("\n" + f"Message payload: {payload} \n")
# topic = message.get_destination_name()
self.receiver.ack(message)
# print("\n" + f"Message dump: {message} \n")


class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
@@ -72,19 +68,16 @@ class ServiceEventHandler(
ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener
):
def on_reconnected(self, service_event: ServiceEvent):
print("\non_reconnected")
print(f"Error cause: {service_event.get_cause()}")
print(f"Message: {service_event.get_message()}")
log.debug("Reconnected to broker: %s", service_event.get_cause())
log.debug("Message: %s", service_event.get_message())

def on_reconnecting(self, event: "ServiceEvent"):
print("\non_reconnecting")
print(f"Error cause: {event.get_cause()}")
print(f"Message: {event.get_message()}")
log.debug("Reconnecting - Error cause: %s", event.get_cause())
log.debug("Message: %s", event.get_message())

def on_service_interrupted(self, event: "ServiceEvent"):
print("\non_service_interrupted")
print(f"Error cause: {event.get_cause()}")
print(f"Message: {event.get_message()}")
log.debug("Service interrupted - Error cause: %s", event.get_cause())
log.debug("Message: %s", event.get_message())


def set_python_solace_log_level(level: str):
@@ -112,7 +105,6 @@ def __init__(self, broker_properties: dict):
# set_python_solace_log_level("DEBUG")

def __del__(self):
print("DESTRUCTOR: SolaceMessaging")
self.disconnect()

def connect(self):
@@ -196,7 +188,11 @@ def bind_to_queue(self, queue_name: str, subscriptions: list = None):

# Handle API exception
except PubSubPlusClientError as exception:
print(f"\nMake sure queue {queue_name} exists on broker!", exception)
log.warning(
"Error creating persistent receiver for queue [%s], %s",
queue_name,
exception,
)

# Add to list of receivers
self.persistent_receivers.append(self.persistent_receiver)
@@ -206,15 +202,15 @@ def bind_to_queue(self, queue_name: str, subscriptions: list = None):
for subscription in subscriptions:
sub = TopicSubscription.of(subscription.get("topic"))
self.persistent_receiver.add_subscription(sub)
print(f"Subscribed to topic: {subscription}")
log.debug("Subscribed to topic: %s", subscription)

return self.persistent_receiver

def disconnect(self):
try:
self.messaging_service.disconnect()
except Exception as exception: # pylint: disable=broad-except
print(f"Error disconnecting: {exception}")
log.debug("Error disconnecting: %s", exception)

def is_connected(self):
return self.messaging_service.is_connected()
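
For orientation, a minimal usage sketch of the messaging wrapper touched above, based only on the class and method names visible in this diff; the contents of broker_properties are assumed and not shown here.

    from solace_ai_connector.common.messaging.solace_messaging import SolaceMessaging

    broker_properties: dict = {}  # broker connection settings expected by SolaceMessaging (keys not shown in this diff)
    messaging = SolaceMessaging(broker_properties)
    messaging.connect()

    # bind_to_queue takes a queue name and an optional list of {"topic": ...} subscriptions
    receiver = messaging.bind_to_queue("example-queue", subscriptions=[{"topic": "example/topic/>"}])

    if messaging.is_connected():
        messaging.disconnect()
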
1 change: 0 additions & 1 deletion src/solace_ai_connector/common/utils.py
@@ -25,7 +25,6 @@ def import_from_directories(module_name, base_path=None):
if "." in module_name:
module_file = module_name.replace(".", os.sep)
module_path = os.path.join(directory, module_file + ".py")
# print(f"module_path: {module_path}")
if os.path.exists(module_path):
try:
# if module_path.startswith("src/solace_ai_connector"):
@@ -167,7 +167,6 @@ def invoke_model(
"configurable": {"session_id": session_id},
},
):
# print(f"Streaming chunk: {chunk.content}")
aggregate_result += chunk.content
current_batch += chunk.content
if len(current_batch.split()) >= self.stream_batch_size:
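
The streaming loop above appends each chunk.content to a running batch and, once the batch reaches stream_batch_size whitespace-separated words, presumably emits it (the body of that branch is collapsed in this view). A standalone sketch of that batching rule, with illustrative names and a made-up default batch size:

    def batch_words(chunks, stream_batch_size=15):
        # Accumulate streamed text and yield a batch once it holds at least stream_batch_size words.
        current_batch = ""
        for chunk in chunks:
            current_batch += chunk
            if len(current_batch.split()) >= stream_batch_size:
                yield current_batch
                current_batch = ""
        if current_batch:
            yield current_batch  # flush whatever remains at the end of the stream
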
@@ -39,4 +39,4 @@ def invoke(self, message, data):
component_processing = self.get_config("component_processing")
if component_processing and callable(component_processing):
return component_processing(message)
return data
return component_processing
@@ -137,7 +137,6 @@ def decode_payload(self, payload):
return payload

def acknowledge_message(self, broker_message):
# print("Acknowledging message")
self.messaging_service.ack_message(broker_message)

def get_acknowledgement_callback(self):
Empty file added: test
121 changes: 121 additions & 0 deletions tests/test_filter.py
@@ -99,3 +99,124 @@ def test_missing_item_filter():

# Clean up
dispose_connector(connector)


def test_filter_with_multi_stage_data():
"""Test the filter component with a previous stage passing on data and the filter
input_transforms copying that data into a user_data area"""
config_yaml = """
log:
log_file_level: DEBUG
log_file: solace_ai_connector.log
flows:
- name: test_flow
components:
- component_name: user_processor
component_module: user_processor
component_config:
component_processing:
invoke:
module: invoke_functions
function: add
params:
positional:
- 5
- 6
- component_name: message_filter
component_module: message_filter
component_config:
filter_expression:
invoke:
module: invoke_functions
function: not_equal
params:
positional:
- 1
- 2
input_transforms:
- type: copy
source_expression: previous
dest_expression: user_data.output
- component_name: pass_through
component_module: pass_through
"""
connector, flows = create_test_flows(config_yaml, queue_timeout=1)
flow = flows[0]

# Send 1 message
message = Message(payload={"my_list": [1, 2, 3], "my_obj": {"a": 1, "b": 2}})
send_message_to_flow(flow, message)

# Expect a message
try:
output_message = get_message_from_flow(flow)
assert output_message.get_data("input.payload:my_list") == [1, 2, 3]
assert output_message.get_data("user_data.output") == 11
finally:
# Clean up
dispose_connector(connector)


def test_filter_with_multi_stage_data_with_timer_input():
"""Test the filter component with a previous stage passing on data and the filter
input_transforms copying that data into a user_data area - this time with a timer causing the message to be sent
"""
config_yaml = """
log:
log_file_level: DEBUG
log_file: solace_ai_connector.log
trace:
trace_file: solace_ai_connector.trace
flows:
- name: test_flow
components:
- component_name: timer_input
component_module: timer_input
component_config:
interval_ms: 500
skip_messages_if_behind: false
- component_name: user_processor
component_module: user_processor
component_config:
component_processing:
invoke:
module: invoke_functions
function: add
params:
positional:
- 5
- 6
- component_name: message_filter
component_module: message_filter
component_config:
filter_expression:
invoke:
module: invoke_functions
function: not_equal
params:
positional:
- 1
- 2
input_transforms:
- type: copy
source_expression: previous
dest_expression: user_data.output
- component_name: pass_through
component_module: pass_through
"""
connector, flows = create_test_flows(config_yaml, queue_timeout=3)
flow = flows[0]

try:
# Get the output messages (should be at least 3 seconds worth)
for _ in range(3):
msg = get_message_from_flow(flow)
assert msg.get_data("user_data.output") == 11
finally:
# Clean up
dispose_connector(connector)
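
For context on the fixed values asserted in both new tests: the user_processor stage invokes add with positional arguments 5 and 6, the result is copied into user_data.output by the input_transforms, and the filter expression not_equal(1, 2) is always true, so every message passes the filter. Assuming invoke_functions.add and not_equal behave like the standard Python operators, the expected values reduce to:

    # Hypothetical equivalents of the configured invoke_functions calls
    from operator import add, ne as not_equal

    assert add(5, 6) == 11          # value copied into user_data.output
    assert not_equal(1, 2) is True  # filter_expression passes every message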
