diff --git a/modules/telestion-application/src/main/java/org/telestion/application/RocketSound.java b/modules/telestion-application/src/main/java/org/telestion/application/RocketSound.java index 6da3fdd2..d439e803 100644 --- a/modules/telestion-application/src/main/java/org/telestion/application/RocketSound.java +++ b/modules/telestion-application/src/main/java/org/telestion/application/RocketSound.java @@ -4,9 +4,9 @@ //import de.jvpichowski.rocketsound.messages.base.GpsData; //import de.jvpichowski.rocketsound.messages.base.Position; import java.util.Collections; -import java.util.HashMap; import java.util.List; import org.telestion.core.connection.EventbusTcpBridge; +import org.telestion.core.database.DataListener; import org.telestion.core.database.DataService; import org.telestion.core.database.MongoDatabaseService; import org.telestion.core.message.Address; @@ -24,7 +24,7 @@ public static void main(String[] args) { /*Launcher.start( new MessageLogger(), - new MockRocketPublisher(Address.incoming(MongoDatabaseService.class, "save")), + new MockRocketPublisher(Address.outgoing(MockRocketPublisher.class, "pub")), new EventbusTcpBridge( "localhost", 9870, List.of( @@ -34,10 +34,18 @@ public static void main(String[] args) { Address.incoming(DataService.class, "find") ), List.of( - Address.outgoing(MockRocketPublisher.class, "pub") + Address.outgoing(MockRocketPublisher.class, "pub"), + Address.outgoing(MongoDatabaseService.class, "save") )), new MongoDatabaseService("raketenpraktikum", "raketenpraktikumPool"), new DataService(dataTypeMap, Collections.emptyMap()) + );*/ + /*new DataService(Collections.emptyMap()), + new DataListener( + List.of( + Address.outgoing(MockRocketPublisher.class, "pub") + ) + ) );*/ } } diff --git a/modules/telestion-core/src/main/java/org/telestion/core/connection/EventbusTcpBridge.java b/modules/telestion-core/src/main/java/org/telestion/core/connection/EventbusTcpBridge.java index ef3407b8..a2566d5c 100644 --- a/modules/telestion-core/src/main/java/org/telestion/core/connection/EventbusTcpBridge.java +++ b/modules/telestion-core/src/main/java/org/telestion/core/connection/EventbusTcpBridge.java @@ -48,6 +48,21 @@ * } * *

+ *

+ * To support that the one can send data type specific data on the outbound address vertx supports regex. + * You can add a regex to new PermittedOptions(). + *

+ *

+ * An example looks like this: + * + *

+ * {@code
+ * 	SockJSBridgeOptions sockJSBridgeOptions = new SockJSBridgeOptions()
+ * 	.addOutboundPermitted(new PermittedOptions().setAddressRegex("(
)(\/(\S+))?")); + * } + *
+ * Which results in permission granted to all messages to the given address optionally suffixed with e.g. "/className. + *

* * @see README.md for more information */ @@ -104,7 +119,9 @@ private Router bridgeHandler(List inboundPermitted, List outboun inboundPermitted .forEach(addr -> sockJsBridgeOptions.addInboundPermitted(new PermittedOptions().setAddress(addr))); outboundPermitted - .forEach(addr -> sockJsBridgeOptions.addOutboundPermitted(new PermittedOptions().setAddress(addr))); + .forEach(addr -> sockJsBridgeOptions.addOutboundPermitted(new PermittedOptions().setAddressRegex( + "(" + addr + ")(\\/(\\S+))?" + ))); SockJSHandler sockJsHandler = SockJSHandler.create(vertx); return sockJsHandler.bridge(sockJsBridgeOptions); diff --git a/modules/telestion-core/src/main/java/org/telestion/core/database/DataListener.java b/modules/telestion-core/src/main/java/org/telestion/core/database/DataListener.java new file mode 100644 index 00000000..6b67ed19 --- /dev/null +++ b/modules/telestion-core/src/main/java/org/telestion/core/database/DataListener.java @@ -0,0 +1,52 @@ +package org.telestion.core.database; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import java.util.Collections; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.telestion.api.message.JsonMessage; +import org.telestion.api.config.Config; +import org.telestion.core.message.Address; + +public final class DataListener extends AbstractVerticle { + private final Configuration forcedConfig; + private Configuration config; + + private final Logger logger = LoggerFactory.getLogger(DataListener.class); + + private final String save = Address.incoming(DataService.class, "save"); + + public DataListener() { + this.forcedConfig = null; + } + + public DataListener(List listeningAddresses) { + this.forcedConfig = new Configuration(listeningAddresses); + } + + @Override + public void start(Promise startPromise) throws Exception { + config = Config.get(forcedConfig, config(), Configuration.class); + this.registerConsumers(); + startPromise.complete(); + } + + private void registerConsumers() { + config.listeningAddresses().forEach(address -> { + vertx.eventBus().consumer(address, document -> { + JsonMessage.on(JsonMessage.class, document, msg -> { + vertx.eventBus().publish(save, msg.json()); + }); + }); + }); + } + + private static record Configuration(@JsonProperty List listeningAddresses) { + private Configuration() { + this(Collections.emptyList()); + } + } +} diff --git a/modules/telestion-core/src/main/java/org/telestion/core/database/DataOperation.java b/modules/telestion-core/src/main/java/org/telestion/core/database/DataOperation.java index f349c670..bf26ee55 100644 --- a/modules/telestion-core/src/main/java/org/telestion/core/database/DataOperation.java +++ b/modules/telestion-core/src/main/java/org/telestion/core/database/DataOperation.java @@ -2,14 +2,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.vertx.core.json.JsonObject; -import java.util.Optional; import org.telestion.api.message.JsonMessage; public record DataOperation( - @JsonProperty String operationAddress, @JsonProperty JsonObject data, - @JsonProperty Optional params) implements JsonMessage { + @JsonProperty JsonObject params) implements JsonMessage { private DataOperation() { - this("", new JsonObject(), Optional.of(new JsonObject())); + this(new JsonObject(), new JsonObject()); } } diff --git a/modules/telestion-core/src/main/java/org/telestion/core/database/DataRequest.java b/modules/telestion-core/src/main/java/org/telestion/core/database/DataRequest.java index d8845aea..68ecb256 100644 --- a/modules/telestion-core/src/main/java/org/telestion/core/database/DataRequest.java +++ b/modules/telestion-core/src/main/java/org/telestion/core/database/DataRequest.java @@ -6,10 +6,11 @@ import org.telestion.api.message.JsonMessage; public record DataRequest( - @JsonProperty List classNames, + @JsonProperty String className, + @JsonProperty JsonObject query, @JsonProperty String operation, - @JsonProperty JsonObject query) implements JsonMessage { + @JsonProperty JsonObject operationParams) implements JsonMessage { private DataRequest() { - this(List.of(""), "", new JsonObject()); + this("", new JsonObject(), "", new JsonObject()); } } diff --git a/modules/telestion-core/src/main/java/org/telestion/core/database/DataService.java b/modules/telestion-core/src/main/java/org/telestion/core/database/DataService.java index 39bc0a60..947ab2ba 100644 --- a/modules/telestion-core/src/main/java/org/telestion/core/database/DataService.java +++ b/modules/telestion-core/src/main/java/org/telestion/core/database/DataService.java @@ -2,7 +2,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.vertx.core.*; -import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import java.util.*; import org.slf4j.Logger; @@ -14,10 +13,7 @@ /** * DataService is a verticle which is the interface to a underlying database implementation. * All data requests should come to the DataService and will be parsed and executed. - * TODO: Save data with the DataService. Right now the save command is handled by the db directly. - * TODO: DataService listens for publishes of new data from UART / Mavlink / ... * TODO: DataOperations like Integrate, Differentiate, Offset, Sum, ... - * TODO: Change dataTypeMap to Class.forName(...) and get the class name string from frontend. * TODO: MongoDB Queries explanation and implementation in fetchLatestData. * */ @@ -44,11 +40,11 @@ public DataService() { /** * This constructor supplies default options. * - * @param dataTypeMap Map of String->Class for incoming dataRequests + * @param dataTypes List of all full class names of the data types * @param dataOperationMap Map of String->DataOperation for incoming dataRequests */ - public DataService(Map> dataTypeMap, Map dataOperationMap) { - this.forcedConfig = new Configuration(dataTypeMap, dataOperationMap); + public DataService(Map dataOperationMap) { + this.forcedConfig = new Configuration(dataOperationMap); } @Override @@ -70,20 +66,20 @@ private void registerConsumers() { request.fail(-1, res.cause().getMessage()); return; } - logger.info(res.result().toString()); request.reply(res.result()); }); }); }); vertx.eventBus().consumer(inSave, document -> { - vertx.eventBus().request(dbSave, document, res -> { - if (res.failed()) { - logger.error(res.cause().getMessage()); - document.fail(-1, res.cause().getMessage()); - return; - } - logger.info(res.result().toString()); - document.reply(res.result().body()); + JsonMessage.on(JsonMessage.class, document, doc -> { + vertx.eventBus().request(dbSave, doc.json(), res -> { + if (res.failed()) { + logger.error(res.cause().getMessage()); + document.fail(-1, res.cause().getMessage()); + return; + } + document.reply(res.result().body()); + }); }); }); } @@ -91,87 +87,48 @@ private void registerConsumers() { /** * Parse and dispatch incoming DataRequests. * - * @param request Determines which dataTypes should be retrieved and if an Operation should be executed. + * @param request Determines which dataType should be retrieved and if an Operation should be executed. * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. */ private void dataRequestDispatcher(DataRequest request, Handler> resultHandler) { - var dataTypes = new ArrayList>(); - request.classNames().forEach(clazz -> { - if (config.dataTypeMap.get(clazz) != null) { - dataTypes.add(config.dataTypeMap.get(clazz)); - } - }); - if (dataTypes.size() == 1) { - if (!request.operation().equals("")) { - logger.info("Operations are not yet supported!"); - /* TODO: - DataOperation dOp = new DataOperation(request.operation().get().operationAddress(), - new JsonObject(), request.operation().get().params()); - this.fetchLatestData(request.dataTypes().get(0), res -> { + // TODO: If className is empty, check if query exists and just pass the query to the DatabaseClient + try { + var dataType = Class.forName(request.className()); + if (request.operation().isEmpty()) { + this.fetchLatestData(dataType, request.query(), res -> { if (res.failed()) { resultHandler.handle(Future.failedFuture(res.cause().getMessage())); return; } - dOp.data().put("data", res.result()); - }).applyManipulation(dOp, resultHandler);*/ + resultHandler.handle(Future.succeededFuture(res.result())); + }); } else { - this.fetchLatestData(dataTypes.get(0), res -> { + var dataOperation = new DataOperation(new JsonObject(), request.operationParams()); + this.fetchLatestData(dataType, request.query(), res -> { if (res.failed()) { resultHandler.handle(Future.failedFuture(res.cause().getMessage())); return; } - resultHandler.handle(Future.succeededFuture(res.result())); + dataOperation.data().put("data", res.result()); }); + this.applyManipulation(request.operation(), dataOperation, resultHandler); } - } else { - this.fetchLatestData(dataTypes, res -> { - if (res.failed()) { - resultHandler.handle(Future.failedFuture(res.cause().getMessage())); - return; - } - JsonObject wrapped = new JsonObject().put("data", res.result()); - resultHandler.handle(Future.succeededFuture(wrapped)); - }); + } catch (ClassNotFoundException e) { + logger.error("ClassNotFoundException: {}", e.getMessage()); } } /** - * Method to fetch the latest data of a specified data types. + * Request data from another verticle and handle the result of the request. * - * @param dataTypes Determines which data types should be fetched. - * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. - * @return the data service to chain operations. + * @param address Address String of the desired verticle. + * @param message Object to send to the desired verticle. + * @param resultHandler Handles the result of the requested operation. */ - private DataService fetchLatestData(List> dataTypes, - Handler> resultHandler) { - JsonArray result = new JsonArray(); - dataTypes.forEach(dataType -> { - DbRequest dbRequest = new DbRequest(dataType, new JsonObject()); - vertx.eventBus().request(dbFind, dbRequest, reply -> { - if (reply.failed()) { - logger.error(reply.cause().getMessage()); - resultHandler.handle(Future.failedFuture(reply.cause().getMessage())); - return; - } - JsonObject dataRes = new JsonObject(); - result.add(dataRes.put("data", reply.result().body())); - }); - }); - resultHandler.handle(Future.succeededFuture(result)); - return this; - } - - /** - * Method to fetch the latest data of a specified data type. - * - * @param dataType Determines which data type should be fetched. - * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. - * @return the data service to chain operations. - */ - private DataService fetchLatestData(Class dataType, Handler> resultHandler) { + private void requestResultHandler( + String address, JsonMessage message, Handler> resultHandler) { JsonObject result = new JsonObject(); - DbRequest dbRequest = new DbRequest(dataType, new JsonObject()); - vertx.eventBus().request(dbFind, dbRequest, reply -> { + vertx.eventBus().request(address, message, reply -> { if (reply.failed()) { logger.error(reply.cause().getMessage()); resultHandler.handle(Future.failedFuture(reply.cause().getMessage())); @@ -180,34 +137,37 @@ private DataService fetchLatestData(Class dataType, Handler dataType, JsonObject query, + Handler> resultHandler) { + DbRequest dbRequest = new DbRequest(dataType, query); + this.requestResultHandler(dbFind, dbRequest, resultHandler); } /** * Apply data operation to fetched data. * - * @param dataOperation Determines which manipulation should be applied. - * @param resultHandler Handles the request to the data operation verticle. Can be failed or succeeded. + * @param dataOperation Determines which manipulation should be applied. + * @param resultHandler Handles the request to the data operation verticle. Can be failed or succeeded. */ - private void applyManipulation(DataOperation dataOperation, Handler> resultHandler) { - JsonObject result = new JsonObject(); - vertx.eventBus().request(dataOperation.operationAddress(), dataOperation, reply -> { - if (reply.failed()) { - logger.error(reply.cause().getMessage()); - resultHandler.handle(Future.failedFuture(reply.cause().getMessage())); - return; - } - result.put("data", reply.result().body()); - resultHandler.handle(Future.succeededFuture(result)); - }); + private void applyManipulation(String operationAddress, DataOperation dataOperation, + Handler> resultHandler) { + this.requestResultHandler(operationAddress, dataOperation, resultHandler); } private static record Configuration( - @JsonProperty Map> dataTypeMap, @JsonProperty Map dataOperationMap ) { private Configuration() { - this(Collections.emptyMap(), Collections.emptyMap()); + this(Collections.emptyMap()); } } } diff --git a/modules/telestion-core/src/main/java/org/telestion/core/database/MongoDatabaseService.java b/modules/telestion-core/src/main/java/org/telestion/core/database/MongoDatabaseService.java index 8ec5decd..3812643c 100644 --- a/modules/telestion-core/src/main/java/org/telestion/core/database/MongoDatabaseService.java +++ b/modules/telestion-core/src/main/java/org/telestion/core/database/MongoDatabaseService.java @@ -85,6 +85,9 @@ private void registerConsumers() { * Save the received document to the database. * If a MongoDB-ObjectId is specified data will be upserted, meaning if the id does not exist it will be inserted, * otherwise it will be updated. Else it will be inserted with a new id. + * If the save was successful the database looks for the newly saved document and publishes it to the database + * outgoing address concatenated with "/Class.name". With this behaviour clients (e.g. Frontend) can listen + * to the outgoing address of a specific data value and will always be provided with the most recent data. * * @param document a JsonMessage validated through the JsonMessage.on method */ @@ -102,7 +105,7 @@ private void save(JsonMessage document) { return; } DbResponse dbRes = new DbResponse(document.getClass(), rec.result()); - vertx.eventBus().publish(outSave, dbRes.json()); + vertx.eventBus().publish(outSave.concat("/").concat(document.className()), dbRes.json()); }); }); } diff --git a/modules/telestion-mavlink/mavlink b/modules/telestion-mavlink/mavlink index d6a1b6d5..73afa7c6 160000 --- a/modules/telestion-mavlink/mavlink +++ b/modules/telestion-mavlink/mavlink @@ -1 +1 @@ -Subproject commit d6a1b6d5a08514801086a42edfb49d14b0cff9c7 +Subproject commit 73afa7c60c3fa43d230b66a777eca2ec6b1fbcff diff --git a/plugins/telestion-space/telestion-plugin-daedalus2 b/plugins/telestion-space/telestion-plugin-daedalus2 index 3cdb514c..881280cd 160000 --- a/plugins/telestion-space/telestion-plugin-daedalus2 +++ b/plugins/telestion-space/telestion-plugin-daedalus2 @@ -1 +1 @@ -Subproject commit 3cdb514cbe9f2f038bf7f26219544344d82537cf +Subproject commit 881280cd9fda579560ce7cb7b1552ec0e1d45655