Skip to content

Commit

Permalink
Add open() API to UTransport (#153)
Browse files Browse the repository at this point in the history
Some transports require initialization that should not be done inside of the constructor so we add the open() API that can be implemented to do the async initialization of the underlining transport.

#146
  • Loading branch information
Steven Hartley authored Jul 15, 2024
1 parent 1e96f9a commit b71c1ab
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 14 deletions.
40 changes: 26 additions & 14 deletions src/main/java/org/eclipse/uprotocol/transport/UTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,19 @@
package org.eclipse.uprotocol.transport;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;

import org.eclipse.uprotocol.v1.UCode;
import org.eclipse.uprotocol.v1.UMessage;
import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.v1.UUri;

/**
* UTransport is the uP-L1 interface that provides a common API for uE
* developers to send and receive messages.
* UTransport implementations contain the details for connecting to the
* underlying transport technology and
* sending UMessage using the configured technology. For more information please
* refer to
* UTransport is the uP-L1 interface that provides a common API for uE developers to send and receive messages.
* UTransport implementations contain the details for connecting to the underlying transport technology and
* sending UMessage using the configured technology. For more information please refer to
* https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l1/README.adoc.
*/

public interface UTransport {

/**
Expand All @@ -45,6 +42,7 @@ public interface UTransport {
*/
CompletionStage<UStatus> send(UMessage message);


/**
* Register {@code UListener} for {@code UUri} source filters to be called when
* a message is received.
Expand All @@ -62,6 +60,7 @@ default CompletionStage<UStatus> registerListener(UUri sourceFilter, UListener l
return registerListener(sourceFilter, null, listener);
}


/**
* Register {@code UListener} for {@code UUri} source and sink filters to be
* called when a message is received.
Expand All @@ -79,10 +78,10 @@ default CompletionStage<UStatus> registerListener(UUri sourceFilter, UListener l
*/
CompletionStage<UStatus> registerListener(UUri sourceFilter, UUri sinkFilter, UListener listener);


/**
* Unregister {@code UListener} for {@code UUri} source filters. Messages
* arriving on this topic will
* no longer be processed by this listener.
* arriving on this topic will no longer be processed by this listener.
*
* @param sourceFilter The UAttributes::source address pattern that the message
* to receive needs to match.
Expand All @@ -97,10 +96,10 @@ default CompletionStage<UStatus> unregisterListener(UUri sourceFilter, UListener
return unregisterListener(sourceFilter, null, listener);
}


/**
* Unregister {@code UListener} for {@code UUri} source and sink filters.
* Messages arriving on this topic will
* no longer be processed by this listener.
* Messages arriving on this topic will no longer be processed by this listener.
*
* @param sourceFilter The UAttributes::source address pattern that the message
* to receive needs to match.
Expand All @@ -115,18 +114,31 @@ default CompletionStage<UStatus> unregisterListener(UUri sourceFilter, UListener
*/
CompletionStage<UStatus> unregisterListener(UUri sourceFilter, UUri sinkFilter, UListener listener);


/**
* Return the source address for the uE (authority, entity, and resource
* information)
* Return the source address of the uE.
* The Source address is passed to the constructor of a given transport
*
* @return UUri containing the source address
*/
UUri getSource();


/**
* Open the connection to the transport that will trigger any registered listeners
* to be registered.
*
* @return Returns {@link UStatus} with {@link UCode.OK} if the connection is
* opened correctly, otherwise it returns with the appropriate failure.
*/
default CompletionStage<UStatus> open() {
return CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build());
}


/**
* Close the connection to the transport that will trigger any registered listeners
* to be unregistered.
*/
void close();
default void close() { }
}
37 changes: 37 additions & 0 deletions src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -84,6 +85,42 @@ public void test_unhappy_register_unlistener() {
assertEquals(result.toCompletableFuture().join().getCode(), UCode.INTERNAL);
}

@Test
@DisplayName("Test happy path calling open() API")
public void test_happy_open() {
UTransport transport = new HappyUTransport();
assertEquals(transport.open().toCompletableFuture().join().getCode(), UCode.OK);
}

@Test
@DisplayName("Test default oepn() and close() APIs")
public void test_default_open_close() {
UTransport transport = new UTransport() {
@Override
public CompletionStage<UStatus> send(UMessage message) {
return CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build());
}

@Override
public CompletionStage<UStatus> registerListener(UUri source, UUri sink, UListener listener) {
return CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build());
}

@Override
public CompletionStage<UStatus> unregisterListener(UUri source, UUri sink, UListener listener) {
return CompletableFuture.completedFuture(UStatus.newBuilder().setCode(UCode.OK).build());
}

@Override
public UUri getSource() {
return UUri.getDefaultInstance();
}
};

assertDoesNotThrow(() -> transport.close());
}


class MyListener implements UListener {
@Override
public void onReceive(UMessage message) {}
Expand Down

0 comments on commit b71c1ab

Please sign in to comment.