Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move InMemorySubscriber from L2 to L3 client module #56

Merged
28 changes: 19 additions & 9 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

== Overview

This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Python defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <<sdk-packages>> below. Each package contains a README.adoc file that describes the purpose of the package and how to use it.
This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Python defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <<sdk-packages>> below and organized by the layers of the protocol.

Each package contains a README.adoc file that describes the purpose of the package and how to use it.

The module contains the factory methods, serializers, and validators for all data types defined in the specifications, and any data models that either haven't or couldn't be defined in uprotocol-core-api yet (ex. UPayload) This library fits into the big picture of the uProtocol SDK as seen in the diagram below.

Expand Down Expand Up @@ -63,23 +65,31 @@ The Library is broken up into different packages that are described in <<sdk-pac


.SDK Packages
[#sdk-packages,width=100%,cols="20%,80%",options="header"]
[#sdk-packages,width=100%,cols="1,2,5",options="header"]
|===

| Package | Purpose
| Package | Protocol Layer | Purpose

| link:uprotocol/client/README.adoc[`*client*`]
| https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l3[Application Layer (uP-L3)]
| Top level client-facing interfaces to communication with USubscription, UDiscovery, and UTwin services.

| link:uprotocol/communication/README.adoc[`*communication*`]
| https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l2[communication layer (uP-L2)]
| Common implementation of communication messaging patterns (publisher, subscriber, Rpcclient, RpcServer, etc..) that is build on top of the L1 transport interface (see below)

| link:uprotocol/transport/README.adoc[`*transport*`]
| https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/up-l1/README.adoc[Transport Layer (uP-L1)]
| Interface and data model for how to send() and receive() messages in a common way across various transport technologies (ex. zenoh, mqtt, http, etc...). the interface is implemented by transports (ex. up-transport-zenoh-python), and the interface is then used to build the uProtocol layer 2 communication layer implementation.

| link:uprotocol/uri/README.adoc[`*uri*`]
|
| Uniform Resource Identifier (RFC3986), how uProtocol addresses things (devices, software, methods, topics, etc...) on the network.

| link:uprotocol/uuid/README.adoc[`*uuid*`]
|
| Identifier used to uniquely identify (and timestamp) messages that are sent.

| link:uprotocol/communication/README.adoc[`*communication*`]
| Interface to build entities that use UTransport APIs to communicate with other entities. This is described in further detail on the up-spec page about https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l2[L2 APIs].

| link:uprotocol/transport/README.adoc[`*transport*`]
| Interface and data model declaration used for bidirectional point-2-point communication between uEs. This interface is then implemented by client libraries (described https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l1/README.adoc[here]) for a given underlying transport (ex. Binder, MQTT, Zenoh, SOME/IP, DDS, HTTP, etc…​)

|===

NOTE: Please visit the READMEs in <<sdk-packages>> for examples of how to use the different data types and their factories, validators, and serializers.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "up-python"
version = "0.1.3-dev"
version = "0.2.0-dev"
description = "Language specific uProtocol library for building and using UUri, UUID, UAttributes, UTransport, and more."
authors = ["Neelam Kushwah <neelam.kushwah@gm.com>"]
license = "The Apache License, Version 2.0"
Expand Down
Empty file added tests/test_client/__init__.py
Empty file.
Empty file.
Empty file.

Large diffs are not rendered by default.

20 changes: 15 additions & 5 deletions tests/test_communication/mock_utransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ async def send(self, message: UMessage) -> UStatus:

if message.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST:
response = self.build_response(message)
self._notify_listeners(response)
await self._notify_listeners(response)

return UStatus(code=UCode.OK)

def _notify_listeners(self, response: UMessage):
async def _notify_listeners(self, response: UMessage):
for listener in self.listeners:
listener.on_receive(response)
await listener.on_receive(response)

async def register_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus:
self.listeners.append(listener)
Expand Down Expand Up @@ -130,12 +130,22 @@ def build_response(self, request):
)


class CommStatusUCodeOKTransport(MockUTransport):
def build_response(self, request):
status = UStatus(code=UCode.OK, message="No Communication Error")
return (
UMessageBuilder.response_for_request(request.attributes)
.with_commstatus(status.code)
.build_from_upayload(UPayload.pack(status))
)


class EchoUTransport(MockUTransport):
def build_response(self, request):
return request

async def send(self, message):
response = self.build_response(message)
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(self._notify_listeners, response)

await self._notify_listeners(response)
return UStatus(code=UCode.OK)
13 changes: 12 additions & 1 deletion tests/test_communication/test_inmemoryrpcclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

import unittest

from tests.test_communication.mock_utransport import CommStatusTransport, MockUTransport, TimeoutUTransport
from tests.test_communication.mock_utransport import (
CommStatusTransport,
CommStatusUCodeOKTransport,
MockUTransport,
TimeoutUTransport,
)
from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient
from uprotocol.communication.upayload import UPayload
Expand Down Expand Up @@ -101,6 +106,12 @@ async def test_invoke_method_with_comm_status_transport(self):
self.assertEqual(UCode.FAILED_PRECONDITION, context.exception.status.code)
self.assertEqual("Communication error [FAILED_PRECONDITION]", context.exception.status.message)

async def test_invoke_method_with_comm_status_transport(self):
rpc_client = InMemoryRpcClient(CommStatusUCodeOKTransport())
payload = UPayload.pack_to_any(UUri())
response = await rpc_client.invoke_method(self.create_method_uri(), payload, None)
self.assertEqual(UCode.OK, UPayload.unpack(response, UStatus).code)

async def test_invoke_method_with_error_transport(self):
class ErrorUTransport(MockUTransport):
async def send(self, message):
Expand Down
21 changes: 20 additions & 1 deletion tests/test_communication/test_inmemoryrpcserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from unittest.mock import AsyncMock, MagicMock

from tests.test_communication.mock_utransport import EchoUTransport
from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient
from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer
from uprotocol.communication.requesthandler import RequestHandler
from uprotocol.communication.upayload import UPayload
Expand All @@ -27,6 +29,7 @@
from uprotocol.transport.utransport import UTransport
from uprotocol.uri.serializer.uriserializer import UriSerializer
from uprotocol.v1.ucode_pb2 import UCode
from uprotocol.v1.umessage_pb2 import UMessage
from uprotocol.v1.uri_pb2 import UUri
from uprotocol.v1.ustatus_pb2 import UStatus

Expand Down Expand Up @@ -136,7 +139,7 @@ async def custom_register_listener_behavior(source: UUri, listener: UListener, s
async def custom_send_behavior(message):
serialized_uri = UriSerializer().serialize(message.attributes.sink)
if serialized_uri in listeners:
listeners[serialized_uri].on_receive(message)
await listeners[serialized_uri].on_receive(message)
return UStatus(code=UCode.OK)

self.mock_transport.send = AsyncMock(side_effect=custom_send_behavior)
Expand Down Expand Up @@ -196,6 +199,22 @@ async def test_handle_requests_exception(self):
status = await transport.send(request)
self.assertEqual(status.code, UCode.OK)

async def test_end_to_end_rpc_with_test_transport(self):
class MyRequestHandler(RequestHandler):
def handle_request(self, message: UMessage) -> UPayload:
return UPayload.pack(UUri())

handler = MyRequestHandler()
test_transport = EchoUTransport()
server = InMemoryRpcServer(test_transport)
method = self.create_method_uri()

self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK)
rpc_client = InMemoryRpcClient(test_transport)
response = await rpc_client.invoke_method(method, None, CallOptions.DEFAULT)
self.assertIsNotNone(response)
self.assertEqual(response, UPayload.pack(UUri()))


if __name__ == '__main__':
unittest.main()
6 changes: 3 additions & 3 deletions tests/test_communication/test_simplenotifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def test_register_listener(self):
self.transport.get_source.return_value = self.source

class TestListener(UListener):
def on_receive(self, message: UMessage):
async def on_receive(self, message: UMessage):
pass

listener = TestListener()
Expand All @@ -78,7 +78,7 @@ async def test_unregister_notification_listener(self):
self.transport.unregister_listener.return_value = UStatus(code=UCode.OK)

class TestListener(UListener):
def on_receive(self, message: UMessage):
async def on_receive(self, message: UMessage):
pass

listener = TestListener()
Expand All @@ -97,7 +97,7 @@ async def test_unregister_listener_not_registered(self):
self.transport.unregister_listener.return_value = UStatus(code=UCode.NOT_FOUND)

class TestListener(UListener):
def on_receive(self, message: UMessage):
async def on_receive(self, message: UMessage):
pass

listener = TestListener()
Expand Down
51 changes: 2 additions & 49 deletions tests/test_communication/test_uclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
from uprotocol.communication.uclient import UClient
from uprotocol.communication.upayload import UPayload
from uprotocol.communication.ustatuserror import UStatusError
from uprotocol.core.usubscription.v3.usubscription_pb2 import (
SubscriptionResponse,
SubscriptionStatus,
UnsubscribeResponse,
)
from uprotocol.transport.builder.umessagebuilder import UMessageBuilder
from uprotocol.transport.ulistener import UListener
from uprotocol.v1.ucode_pb2 import UCode
from uprotocol.v1.umessage_pb2 import UMessage
Expand All @@ -36,7 +30,7 @@


class MyListener(UListener):
def on_receive(self, umsg: UMessage) -> None:
async def on_receive(self, umsg: UMessage) -> None:
# Handle receiving subscriptions here
assert umsg is not None

Expand Down Expand Up @@ -159,24 +153,6 @@ async def test_invoke_method_with_multi_invoke_transport(self):
self.assertFalse(future_result1.exception())
self.assertFalse(future_result2.exception())

async def test_subscribe_happy_path(self):
topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000)
transport = HappySubscribeUTransport()
upclient = UClient(transport)
subscription_response = await upclient.subscribe(topic, self.listener, CallOptions(timeout=5000))
# check for successfully subscribed
self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED)

async def test_unregister_listener(self):
topic = create_topic()
my_listener = create_autospec(UListener, instance=True)

subscriber = UClient(HappySubscribeUTransport())
subscription_response = await subscriber.subscribe(topic, my_listener, CallOptions.DEFAULT)
self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED)
status = await subscriber.unregister_listener(topic, my_listener)
self.assertEqual(status.code, UCode.OK)

async def test_registering_request_listener(self):
handler = create_autospec(RequestHandler, instance=True)
server = UClient(MockUTransport())
Expand Down Expand Up @@ -209,7 +185,7 @@ async def test_happy_path_for_all_apis_async(self):
client = UClient(MockUTransport())

class MyUListener(UListener):
def on_receive(self, umsg: UMessage) -> None:
async def on_receive(self, umsg: UMessage) -> None:
pass

class MyRequestHandler(RequestHandler):
Expand All @@ -225,9 +201,6 @@ async def run_tests():
client.notify(create_topic(), create_destination_uri()),
client.publish(create_topic()),
client.invoke_method(create_method_uri(), UPayload.pack(None), CallOptions.DEFAULT),
client.subscribe(create_topic(), listener),
client.unsubscribe(create_topic(), listener),
unregister_listener_not_registered(client, listener),
client.register_notification_listener(create_topic(), listener),
client.unregister_notification_listener(create_topic(), listener),
register_and_unregister_request_handler(client, handler),
Expand All @@ -251,26 +224,6 @@ def create_method_uri():
return UUri(authority_name="neelam", ue_id=4, ue_version_major=1, resource_id=3)


class HappySubscribeUTransport(MockUTransport):
def build_response(self, request):
return UMessageBuilder.response_for_request(request.attributes).build_from_upayload(
UPayload.pack(
SubscriptionResponse(
status=SubscriptionStatus(
state=SubscriptionStatus.State.SUBSCRIBED, message="Successfully Subscribed"
)
)
)
)


class HappyUnSubscribeUTransport(MockUTransport):
def build_response(self, request):
return UMessageBuilder.response_for_request(request.attributes).build_from_upayload(
UPayload.pack(UnsubscribeResponse())
)


class CommStatusTransport(MockUTransport):
def build_response(self, request):
status = UStatus(UCode.FAILED_PRECONDITION, "CommStatus Error")
Expand Down
8 changes: 4 additions & 4 deletions tests/test_transport/test_utransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@


class MyListener(UListener):
def on_receive(self, message):
super().on_receive(message)
async def on_receive(self, message):
await super().on_receive(message)
pass


Expand All @@ -36,7 +36,7 @@ async def send(self, message):
return UStatus(code=UCode.INVALID_ARGUMENT if message is None else UCode.OK)

async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus:
listener.on_receive(UMessage())
await listener.on_receive(UMessage())
return UStatus(code=UCode.OK)

async def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None):
Expand All @@ -54,7 +54,7 @@ async def send(self, message):
return UStatus(code=UCode.INTERNAL)

async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus:
listener.on_receive(None)
await listener.on_receive(None)
return UStatus(code=UCode.INTERNAL)

async def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None):
Expand Down
50 changes: 50 additions & 0 deletions uprotocol/client/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Application Layer APIs (uP-L3 Interface)

The following module includes the client-facing https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l3[Application Layer (uP-L3)] interfaces to communication with USubscription, UDiscovery, and UTwin services.


## uP-L3 Interfaces

.Interfaces (uP-L3 Interface)
[cols="1,1,3",options="header"]
|===
| Interface | Implementation(s) | Description

| xref:/v3/usubscriptionclient.py[*USubscriptionClient*] | xref:/v3/InMemoryUSubscriptionClient.py[InMemoryUSubscriptionClient] | Subscription Management APIs to subscribe(), unsubscribe() and fetch information from the subscription database.
|===


The module includes the interface for the client-facing APIs as well as a simple in-memory implementation that is based on the uP-L2 in-memory implementations. the term in-memory is used to indicate that the data required by the code is cached inside of the object and not persisted to a given database backend, this design is useful for embedded applications (i.e. in the vehicle) however will not scale to the multi-tenanted cloud applications.

## Examples


=== Subscribe and Unsubscribe
[,python]
----
transport = # your UTransport instance

#subscription topic
topic : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000)

#Listener to process incoming events on the topic
class MySubscriptionListener(UListener):
def on_receive(self, umsg: UMessage) -> None:
# Handle receiving published message
pass

listener = MySubscriptionListener()

# Optional handler that is called whenever the SubscriptionState changes for the subscriber
class MySubscriptionChangeHandler(SubscriptionChangeHandler)
def handle_subscription_change(self, topic: UUri, status: SubscriptionStatus) -> None:
# Handle subscription status changes if you're interested like when
# the subscription state changes from SUBSCRIBE_PENDING to SUBSCRIBED
pass

subscriber : InMemoryUSubscriptionClient = InMemoryUSubscriptionClient(transport)
response : SubscriptionResponse = subscriber.subscribe(topic, listener, CallOptions.DEFAULT, MySubscriptionChangeHandler())

#UnSubscribe from the topic
status : UStatus = subscriber.unsubscribe(topic, listener)
----
Empty file added uprotocol/client/__init__.py
Empty file.
Empty file.
Empty file.
Loading
Loading