Skip to content

Commit

Permalink
Merge pull request #220 from TelestionTeam/feat-database-improvements
Browse files Browse the repository at this point in the history
Feat database improvements
  • Loading branch information
jvpichowski authored Mar 2, 2021
2 parents 6df7715 + a67b0ab commit 23a73c3
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
//import de.jvpichowski.rocketsound.messages.base.GpsData;
//import de.jvpichowski.rocketsound.messages.base.Position;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.telestion.core.connection.EventbusTcpBridge;
import org.telestion.core.database.DataListener;
import org.telestion.core.database.DataService;
import org.telestion.core.database.MongoDatabaseService;
import org.telestion.core.message.Address;
Expand All @@ -24,7 +24,7 @@ public static void main(String[] args) {

/*Launcher.start(
new MessageLogger(),
new MockRocketPublisher(Address.incoming(MongoDatabaseService.class, "save")),
new MockRocketPublisher(Address.outgoing(MockRocketPublisher.class, "pub")),
new EventbusTcpBridge(
"localhost", 9870,
List.of(
Expand All @@ -34,10 +34,18 @@ public static void main(String[] args) {
Address.incoming(DataService.class, "find")
),
List.of(
Address.outgoing(MockRocketPublisher.class, "pub")
Address.outgoing(MockRocketPublisher.class, "pub"),
Address.outgoing(MongoDatabaseService.class, "save")
)),
new MongoDatabaseService("raketenpraktikum", "raketenpraktikumPool"),
new DataService(dataTypeMap, Collections.emptyMap())
);*/
/*new DataService(Collections.emptyMap()),
new DataListener(
List.of(
Address.outgoing(MockRocketPublisher.class, "pub")
)
)
);*/
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@
* }
* </pre>
* </p>
* <p>
* To support that the one can send data type specific data on the outbound address vertx supports regex.
* You can add a regex to <code>new PermittedOptions()</code>.
* </p>
* <p>
* An example looks like this:
*
* <pre>
* {@code
* SockJSBridgeOptions sockJSBridgeOptions = new SockJSBridgeOptions()
* .addOutboundPermitted(new PermittedOptions().setAddressRegex("(<Address>)(\/(\S+))?"));
* }
* </pre>
* Which results in permission granted to all messages to the given address optionally suffixed with e.g. "/className.
* </p>
*
* @see <a href="../../../../../../../README.md">README.md</a> for more information
*/
Expand Down Expand Up @@ -104,7 +119,9 @@ private Router bridgeHandler(List<String> inboundPermitted, List<String> outboun
inboundPermitted
.forEach(addr -> sockJsBridgeOptions.addInboundPermitted(new PermittedOptions().setAddress(addr)));
outboundPermitted
.forEach(addr -> sockJsBridgeOptions.addOutboundPermitted(new PermittedOptions().setAddress(addr)));
.forEach(addr -> sockJsBridgeOptions.addOutboundPermitted(new PermittedOptions().setAddressRegex(
"(" + addr + ")(\\/(\\S+))?"
)));

SockJSHandler sockJsHandler = SockJSHandler.create(vertx);
return sockJsHandler.bridge(sockJsBridgeOptions);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.telestion.core.database;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.telestion.api.message.JsonMessage;
import org.telestion.api.config.Config;
import org.telestion.core.message.Address;

public final class DataListener extends AbstractVerticle {
private final Configuration forcedConfig;
private Configuration config;

private final Logger logger = LoggerFactory.getLogger(DataListener.class);

private final String save = Address.incoming(DataService.class, "save");

public DataListener() {
this.forcedConfig = null;
}

public DataListener(List<String> listeningAddresses) {
this.forcedConfig = new Configuration(listeningAddresses);
}

@Override
public void start(Promise<Void> startPromise) throws Exception {
config = Config.get(forcedConfig, config(), Configuration.class);
this.registerConsumers();
startPromise.complete();
}

private void registerConsumers() {
config.listeningAddresses().forEach(address -> {
vertx.eventBus().consumer(address, document -> {
JsonMessage.on(JsonMessage.class, document, msg -> {
vertx.eventBus().publish(save, msg.json());
});
});
});
}

private static record Configuration(@JsonProperty List<String> listeningAddresses) {
private Configuration() {
this(Collections.emptyList());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import io.vertx.core.json.JsonObject;
import java.util.Optional;
import org.telestion.api.message.JsonMessage;

public record DataOperation(
@JsonProperty String operationAddress,
@JsonProperty JsonObject data,
@JsonProperty Optional<JsonObject> params) implements JsonMessage {
@JsonProperty JsonObject params) implements JsonMessage {
private DataOperation() {
this("", new JsonObject(), Optional.of(new JsonObject()));
this(new JsonObject(), new JsonObject());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import org.telestion.api.message.JsonMessage;

public record DataRequest(
@JsonProperty List<String> classNames,
@JsonProperty String className,
@JsonProperty JsonObject query,
@JsonProperty String operation,
@JsonProperty JsonObject query) implements JsonMessage {
@JsonProperty JsonObject operationParams) implements JsonMessage {
private DataRequest() {
this(List.of(""), "", new JsonObject());
this("", new JsonObject(), "", new JsonObject());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import io.vertx.core.*;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.util.*;
import org.slf4j.Logger;
Expand All @@ -14,10 +13,7 @@
/**
* DataService is a verticle which is the interface to a underlying database implementation.
* All data requests should come to the DataService and will be parsed and executed.
* TODO: Save data with the DataService. Right now the save command is handled by the db directly.
* TODO: DataService listens for publishes of new data from UART / Mavlink / ...
* TODO: DataOperations like Integrate, Differentiate, Offset, Sum, ...
* TODO: Change dataTypeMap to Class.forName(...) and get the class name string from frontend.
* TODO: MongoDB Queries explanation and implementation in fetchLatestData.
*
*/
Expand All @@ -44,11 +40,11 @@ public DataService() {
/**
* This constructor supplies default options.
*
* @param dataTypeMap Map of String->Class<?> for incoming dataRequests
* @param dataTypes List of all full class names of the data types
* @param dataOperationMap Map of String->DataOperation for incoming dataRequests
*/
public DataService(Map<String, Class<?>> dataTypeMap, Map<String, DataOperation> dataOperationMap) {
this.forcedConfig = new Configuration(dataTypeMap, dataOperationMap);
public DataService(Map<String, DataOperation> dataOperationMap) {
this.forcedConfig = new Configuration(dataOperationMap);
}

@Override
Expand All @@ -70,108 +66,69 @@ private void registerConsumers() {
request.fail(-1, res.cause().getMessage());
return;
}
logger.info(res.result().toString());
request.reply(res.result());
});
});
});
vertx.eventBus().consumer(inSave, document -> {
vertx.eventBus().request(dbSave, document, res -> {
if (res.failed()) {
logger.error(res.cause().getMessage());
document.fail(-1, res.cause().getMessage());
return;
}
logger.info(res.result().toString());
document.reply(res.result().body());
JsonMessage.on(JsonMessage.class, document, doc -> {
vertx.eventBus().request(dbSave, doc.json(), res -> {
if (res.failed()) {
logger.error(res.cause().getMessage());
document.fail(-1, res.cause().getMessage());
return;
}
document.reply(res.result().body());
});
});
});
}

/**
* Parse and dispatch incoming DataRequests.
*
* @param request Determines which dataTypes should be retrieved and if an Operation should be executed.
* @param request Determines which dataType should be retrieved and if an Operation should be executed.
* @param resultHandler Handles the request to the underlying database. Can be failed or succeeded.
*/
private void dataRequestDispatcher(DataRequest request, Handler<AsyncResult<JsonObject>> resultHandler) {
var dataTypes = new ArrayList<Class<?>>();
request.classNames().forEach(clazz -> {
if (config.dataTypeMap.get(clazz) != null) {
dataTypes.add(config.dataTypeMap.get(clazz));
}
});
if (dataTypes.size() == 1) {
if (!request.operation().equals("")) {
logger.info("Operations are not yet supported!");
/* TODO:
DataOperation dOp = new DataOperation(request.operation().get().operationAddress(),
new JsonObject(), request.operation().get().params());
this.fetchLatestData(request.dataTypes().get(0), res -> {
// TODO: If className is empty, check if query exists and just pass the query to the DatabaseClient
try {
var dataType = Class.forName(request.className());
if (request.operation().isEmpty()) {
this.fetchLatestData(dataType, request.query(), res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause().getMessage()));
return;
}
dOp.data().put("data", res.result());
}).applyManipulation(dOp, resultHandler);*/
resultHandler.handle(Future.succeededFuture(res.result()));
});
} else {
this.fetchLatestData(dataTypes.get(0), res -> {
var dataOperation = new DataOperation(new JsonObject(), request.operationParams());
this.fetchLatestData(dataType, request.query(), res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause().getMessage()));
return;
}
resultHandler.handle(Future.succeededFuture(res.result()));
dataOperation.data().put("data", res.result());
});
this.applyManipulation(request.operation(), dataOperation, resultHandler);
}
} else {
this.fetchLatestData(dataTypes, res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause().getMessage()));
return;
}
JsonObject wrapped = new JsonObject().put("data", res.result());
resultHandler.handle(Future.succeededFuture(wrapped));
});
} catch (ClassNotFoundException e) {
logger.error("ClassNotFoundException: {}", e.getMessage());
}
}

/**
* Method to fetch the latest data of a specified data types.
* Request data from another verticle and handle the result of the request.
*
* @param dataTypes Determines which data types should be fetched.
* @param resultHandler Handles the request to the underlying database. Can be failed or succeeded.
* @return the data service to chain operations.
* @param address Address String of the desired verticle.
* @param message Object to send to the desired verticle.
* @param resultHandler Handles the result of the requested operation.
*/
private DataService fetchLatestData(List<Class<?>> dataTypes,
Handler<AsyncResult<JsonArray>> resultHandler) {
JsonArray result = new JsonArray();
dataTypes.forEach(dataType -> {
DbRequest dbRequest = new DbRequest(dataType, new JsonObject());
vertx.eventBus().request(dbFind, dbRequest, reply -> {
if (reply.failed()) {
logger.error(reply.cause().getMessage());
resultHandler.handle(Future.failedFuture(reply.cause().getMessage()));
return;
}
JsonObject dataRes = new JsonObject();
result.add(dataRes.put("data", reply.result().body()));
});
});
resultHandler.handle(Future.succeededFuture(result));
return this;
}

/**
* Method to fetch the latest data of a specified data type.
*
* @param dataType Determines which data type should be fetched.
* @param resultHandler Handles the request to the underlying database. Can be failed or succeeded.
* @return the data service to chain operations.
*/
private DataService fetchLatestData(Class<?> dataType, Handler<AsyncResult<JsonObject>> resultHandler) {
private void requestResultHandler(
String address, JsonMessage message, Handler<AsyncResult<JsonObject>> resultHandler) {
JsonObject result = new JsonObject();
DbRequest dbRequest = new DbRequest(dataType, new JsonObject());
vertx.eventBus().request(dbFind, dbRequest, reply -> {
vertx.eventBus().request(address, message, reply -> {
if (reply.failed()) {
logger.error(reply.cause().getMessage());
resultHandler.handle(Future.failedFuture(reply.cause().getMessage()));
Expand All @@ -180,34 +137,37 @@ private DataService fetchLatestData(Class<?> dataType, Handler<AsyncResult<JsonO
result.put("data", reply.result().body());
resultHandler.handle(Future.succeededFuture(result));
});
return this;
}

/**
* Method to fetch the latest data of a specified data type.
*
* @param dataType Determines which data type should be fetched.
* @param query MongoDB query, can be empty JsonObject if no specific query is needed.
* @param resultHandler Handles the request to the underlying database. Can be failed or succeeded.
*/
private void fetchLatestData(Class<?> dataType, JsonObject query,
Handler<AsyncResult<JsonObject>> resultHandler) {
DbRequest dbRequest = new DbRequest(dataType, query);
this.requestResultHandler(dbFind, dbRequest, resultHandler);
}

/**
* Apply data operation to fetched data.
*
* @param dataOperation Determines which manipulation should be applied.
* @param resultHandler Handles the request to the data operation verticle. Can be failed or succeeded.
* @param dataOperation Determines which manipulation should be applied.
* @param resultHandler Handles the request to the data operation verticle. Can be failed or succeeded.
*/
private void applyManipulation(DataOperation dataOperation, Handler<AsyncResult<JsonObject>> resultHandler) {
JsonObject result = new JsonObject();
vertx.eventBus().request(dataOperation.operationAddress(), dataOperation, reply -> {
if (reply.failed()) {
logger.error(reply.cause().getMessage());
resultHandler.handle(Future.failedFuture(reply.cause().getMessage()));
return;
}
result.put("data", reply.result().body());
resultHandler.handle(Future.succeededFuture(result));
});
private void applyManipulation(String operationAddress, DataOperation dataOperation,
Handler<AsyncResult<JsonObject>> resultHandler) {
this.requestResultHandler(operationAddress, dataOperation, resultHandler);
}

private static record Configuration(
@JsonProperty Map<String, Class<?>> dataTypeMap,
@JsonProperty Map<String, DataOperation> dataOperationMap
) {
private Configuration() {
this(Collections.emptyMap(), Collections.emptyMap());
this(Collections.emptyMap());
}
}
}
Loading

0 comments on commit 23a73c3

Please sign in to comment.