Skip to content

Commit

Permalink
Added tests for MQTT-Create and fine-grained authorisation
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed Jan 18, 2025
1 parent 840dccf commit 199d373
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fixed #2073: Projects plugin has no default value for isPublic and restricted columns.
* MariaDB: avoid use of distinctOn on MariaDb, as not supported.
* Fixed exception in Projects plugin when fetching Locations.
* Added tests for MQTT-Create and fine-grained authorisation.


## Release version 2.5.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ public PersistenceManager getPm() {
}

public PrincipalExtended getUserPrincipal(String clientId) {
return authProvider.getUserPrincipal(clientId);
final PrincipalExtended userPrincipal = authProvider.getUserPrincipal(clientId);
LOGGER.debug("User principal for {} is {}", clientId, userPrincipal);
return userPrincipal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ private void init(ServerVersion serverVersion, Map<String, String> properties) {
TestSuite suite = TestSuite.getInstance();
serverSettings = suite.getServerSettings(properties);
try {
service = new StaService(new URI(serverSettings.getServiceUrl(version)).toURL());
sSrvc = service.service;
sMdl = service.modelSensing;
mMdl = service.modelMultiDatastream;
tMdl = service.modelTasking;

sSrvc = createService();
sMdl = sSrvc.getModel(SensorThingsV11Sensing.class);
mMdl = sSrvc.getModel(SensorThingsV11MultiDatastream.class);
tMdl = sSrvc.getModel(SensorThingsV11Tasking.class);
service = new StaService(sSrvc, sMdl, tMdl, mMdl);
} catch (MalformedURLException ex) {
LOGGER.error("Failed to create URL", ex);
}
Expand All @@ -113,6 +114,12 @@ private void init(ServerVersion serverVersion, Map<String, String> properties) {
}
}

protected SensorThingsService createService() throws MalformedURLException, URISyntaxException {
return new SensorThingsService(new SensorThingsV11Sensing(), new SensorThingsV11MultiDatastream(), new SensorThingsV11Tasking())
.setBaseUrl(new URI(serverSettings.getServiceUrl(version)).toURL())
.init();
}

protected abstract void setUpVersion() throws ServiceFailureException, URISyntaxException;

protected void tearDownVersion() throws ServiceFailureException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static de.fraunhofer.iosb.ilt.frostserver.persistence.pgjooq.utils.ConnectionUtils.TAG_DB_PASSWRD;
import static de.fraunhofer.iosb.ilt.frostserver.persistence.pgjooq.utils.ConnectionUtils.TAG_DB_URL;
import static de.fraunhofer.iosb.ilt.frostserver.persistence.pgjooq.utils.ConnectionUtils.TAG_DB_USERNAME;
import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.PREFIX_AUTH;
import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.PREFIX_PERSISTENCE;
import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.PREFIX_PLUGINS;
import static de.fraunhofer.iosb.ilt.frostserver.settings.PersistenceSettings.TAG_AUTO_UPDATE_DATABASE;
Expand Down Expand Up @@ -454,14 +455,15 @@ private void startMqttServer(int key, Map<String, String> parameters) throws IOE

String dbDriver = parameters.getOrDefault(PREFIX_PERSISTENCE + TAG_DB_DRIVER, "org.postgresql.Driver");
properties.put(PREFIX_PERSISTENCE + PersistenceSettings.TAG_IMPLEMENTATION_CLASS, VAL_PERSISTENCE_MANAGER);
properties.put(PREFIX_PERSISTENCE + TAG_AUTO_UPDATE_DATABASE, "false");
properties.put(PREFIX_PERSISTENCE + TAG_DB_DRIVER, dbDriver);
properties.put(PREFIX_PERSISTENCE + TAG_DB_URL, createDbUrl(dbDriver, parameters.get(KEY_DB_NAME)));
properties.put(PREFIX_PERSISTENCE + TAG_DB_USERNAME, VAL_PG_USER);
properties.put(PREFIX_PERSISTENCE + TAG_DB_PASSWRD, VAL_PG_PASS);
properties.put("bus." + BusSettings.TAG_IMPLEMENTATION_CLASS, "de.fraunhofer.iosb.ilt.frostserver.messagebus.MqttMessageBus");
properties.put("bus." + MqttMessageBus.TAG_MQTT_BROKER, "tcp://" + mqttBus.getHost() + ":" + mqttBus.getFirstMappedPort());
properties.putAll(parameters);
properties.put(PREFIX_PERSISTENCE + TAG_AUTO_UPDATE_DATABASE, "false");
properties.put(PREFIX_AUTH + TAG_AUTO_UPDATE_DATABASE, "false");

CoreSettings coreSettings = new CoreSettings(properties);
FrostMqttServer server = new FrostMqttServer(coreSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ void test01BatchRequest() {
+ "\r\n"
+ "--batch_36522ad7-fc75-4b56-8c71-56071383e77b--";
String response = postBatch("batch_36522ad7-fc75-4b56-8c71-56071383e77b", batchContent);
String thingId = Utils.quoteForUrl(eh2.getEntity(sMdl.etThing, "$orderby=id%20desc").get("@iot.id"));
String thingId = Utils.quoteForUrl(eh2.getEntityJson(sMdl.etThing, "$orderby=id%20desc").get("@iot.id"));
String batchBoundary = response.split("\n", 2)[0];
int mixedBoundaryStart = response.indexOf("boundary=") + 9;
String mixedBoundary = response.substring(mixedBoundaryStart, mixedBoundaryStart + 40);
Expand Down Expand Up @@ -447,7 +447,7 @@ void test06JsonBatchRequest() throws ServiceFailureException {
request = StringUtils.replace(request, "$thing0", formatKeyValuesForUrl(THINGS.get(0)));
request = StringUtils.replace(request, "$thing1", formatKeyValuesForUrl(THINGS.get(1)));
String response = postBatch(null, request);
String thingId = Utils.quoteForUrl(eh2.getEntity(sMdl.etThing, "$orderby=id%20desc").get("@iot.id"));
String thingId = Utils.quoteForUrl(eh2.getEntityJson(sMdl.etThing, "$orderby=id%20desc").get("@iot.id"));

try {
String expResponse = """
Expand Down Expand Up @@ -518,8 +518,8 @@ void test07JsonBatchRequestWithChangeSetReferencingNewEntitiesInBody() {
request = StringUtils.replace(request, "$thing0", formatKeyValuesForUrl(THINGS.get(0)));
String response = postBatch(null, request);

String sensorId = Utils.quoteForUrl(eh2.getEntity(sMdl.etSensor, "$orderby=id%20desc").get("@iot.id"));
String datastreamId = Utils.quoteForUrl(eh2.getEntity(sMdl.etDatastream, "$orderby=id%20desc").get("@iot.id"));
String sensorId = Utils.quoteForUrl(eh2.getEntityJson(sMdl.etSensor, "$orderby=id%20desc").get("@iot.id"));
String datastreamId = Utils.quoteForUrl(eh2.getEntityJson(sMdl.etDatastream, "$orderby=id%20desc").get("@iot.id"));

try {
BatchResponseJson expected = mapper.readValue("{\"responses\":["
Expand Down Expand Up @@ -581,8 +581,8 @@ void test08JsonBatchRequestWithChangeSetReferencingNewEntitiesInUrl() {
request = StringUtils.replace(request, "$post2", post2);
String response = postBatch(null, request);

String sensorId = Utils.quoteForUrl(eh2.getEntity(sMdl.etSensor, "$orderby=id%20desc").get("@iot.id"));
String datastreamId = Utils.quoteForUrl(eh2.getEntity(sMdl.etDatastream, "$orderby=id%20desc").get("@iot.id"));
String sensorId = Utils.quoteForUrl(eh2.getEntityJson(sMdl.etSensor, "$orderby=id%20desc").get("@iot.id"));
String datastreamId = Utils.quoteForUrl(eh2.getEntityJson(sMdl.etDatastream, "$orderby=id%20desc").get("@iot.id"));

try {
BatchResponseJson expected = mapper.readValue("{\"responses\":["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void check01_CreateObservationDirect() throws ServiceFailureException {
JsonNode createdObservation = getObservation();
mqttHelper.publish(entityHelper.createUrl(sMdl.etObservation), createdObservation.toString());

JsonNode latestObservation = entityHelper.getEntityWithRetry(
JsonNode latestObservation = entityHelper.getEntityJsonWithRetry(
sMdl.etObservation,
"Datastream($select=id),FeatureOfInterest($select=id)&$select=result,phenomenonTime,validTime,parameters",
10);
Expand All @@ -106,7 +106,7 @@ void check02_CreateObservationViaDatastream() throws ServiceFailureException {
datastreamId = createdObservation.get("Datastream").get(ControlInformation.ID);
mqttHelper.publish(entityHelper.createUrl(sMdl.etDatastream, datastreamId, "/Observations"), createdObservation.toString());

JsonNode latestObservation = entityHelper.getEntityWithRetry(
JsonNode latestObservation = entityHelper.getEntityJsonWithRetry(
sMdl.etObservation,
"Datastream($select=id),FeatureOfInterest($select=id)&$select=result,phenomenonTime,validTime,parameters",
10);
Expand All @@ -122,7 +122,7 @@ void check03_CreateObservationViaFeatureOfInterest() throws ServiceFailureExcept
featureOfInterestId = createdObservation.get("FeatureOfInterest").get(ControlInformation.ID);
mqttHelper.publish(entityHelper.createUrl(sMdl.etFeatureOfInterest, featureOfInterestId, "/Observations"), createdObservation.toString());

JsonNode latestObservation = entityHelper.getEntityWithRetry(
JsonNode latestObservation = entityHelper.getEntityJsonWithRetry(
sMdl.etObservation,
"Datastream($select=id),FeatureOfInterest($select=id)&$select=result,phenomenonTime,validTime,parameters",
10);
Expand All @@ -136,7 +136,7 @@ void check04_CreateObservationWithDeepInsert() throws ServiceFailureException {
JsonNode createdObservation = getObservationWithDeepInsert();
mqttHelper.publish(entityHelper.createUrl(sMdl.etObservation), createdObservation.toString());

JsonNode latestObservation = entityHelper.getEntityWithRetry(
JsonNode latestObservation = entityHelper.getEntityJsonWithRetry(
sMdl.etObservation,
expandQueryFromJsonObject(createdObservation),
10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import de.fraunhofer.iosb.ilt.frostclient.SensorThingsService;
import de.fraunhofer.iosb.ilt.frostclient.dao.Dao;
import de.fraunhofer.iosb.ilt.frostclient.exception.ServiceFailureException;
import de.fraunhofer.iosb.ilt.frostclient.json.serialize.JsonWriter;
import de.fraunhofer.iosb.ilt.frostclient.model.Entity;
import de.fraunhofer.iosb.ilt.frostclient.model.EntityType;
import de.fraunhofer.iosb.ilt.frostclient.model.PkValue;
import de.fraunhofer.iosb.ilt.frostclient.model.property.EntityPropertyMain;
import de.fraunhofer.iosb.ilt.frostclient.model.property.NavigationPropertyEntity;
Expand All @@ -60,6 +62,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -149,8 +152,12 @@ private static String resourceUrl(String path, String name) {
protected static EntityHelper2 ehAdminProject2;

private static MqttHelper2 mqttHelperAdmin;
private static MqttHelper2 mqttHelperWrite;
private static MqttHelper2 mqttHelperRead;
private static MqttHelper2 mqttHelperAdminProject1;
private static MqttHelper2 mqttHelperAdminProject2;
private static MqttHelper2 mqttHelperObsCreaterProject1;
private static MqttHelper2 mqttHelperObsCreaterProject2;

private final boolean anonymousReadAllowed;
private final AuthTestHelper ath;
Expand Down Expand Up @@ -190,9 +197,13 @@ protected void setUpVersion() throws ServiceFailureException {
ehAdminProject1 = setCaches(new EntityHelper2(serviceAdminProject1));
ehAdminProject2 = setCaches(new EntityHelper2(serviceAdminProject2));

mqttHelperAdmin = new MqttHelper2(serviceAdmin, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs());
mqttHelperAdminProject1 = new MqttHelper2(serviceAdminProject1, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs());
mqttHelperAdminProject2 = new MqttHelper2(serviceAdminProject2, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs());
mqttHelperAdmin = new MqttHelper2(serviceAdmin, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs(), "TC-" + ADMIN);
mqttHelperWrite = new MqttHelper2(serviceWrite, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs(), "TC-" + WRITE);
mqttHelperRead = new MqttHelper2(serviceRead, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs(), "TC-" + READ);
mqttHelperAdminProject1 = new MqttHelper2(serviceAdminProject1, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs(), "TC-" + ADMIN_P1);
mqttHelperAdminProject2 = new MqttHelper2(serviceAdminProject2, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs(), "TC-" + ADMIN_P2);
mqttHelperObsCreaterProject1 = new MqttHelper2(serviceObsCreaterProject1, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs(), "TC-" + OBS_CREATE_P1);
mqttHelperObsCreaterProject2 = new MqttHelper2(serviceObsCreaterProject2, serverSettings.getMqttUrl(), serverSettings.getMqttTimeOutMs(), "TC-" + OBS_CREATE_P2);
createEntities();
}

Expand Down Expand Up @@ -474,6 +485,38 @@ void test_06c_ThingCreateForProject1WithDatastream() {
createForFail(OBS_CREATE_P2, serviceObsCreaterProject2, creator, serviceAdmin.dao(mdlSensing.etThing), THINGS, H403);
}

@Test
void test_06d_ThingCreateForProject1Mqtt() throws JsonProcessingException {
LOGGER.info(" test_06d_ThingCreateForProject1Mqtt");
EntityCreator creator = (user) -> mdlSensing.newThing(user + " MQTT-Thing", "A Thing made by " + user + " using MQTT")
.addNavigationEntity(mdlUsers.npThingProjects, PROJECTS.get(0).withOnlyPk());
StringCreator filterCreator = (user) -> "name eq " + StringHelper.quoteForUrl(user + " MQTT-Thing");
String topic = version.urlPart + '/' + sMdl.etThing.mainSet;

List<MqttCreateTester> testers = new ArrayList<>();
testers.add(new MqttCreateTester(mqttHelperRead, ehAdmin, READ, creator, filterCreator, topic, sMdl.etThing, false));
testers.add(new MqttCreateTester(mqttHelperWrite, ehAdmin, WRITE, creator, filterCreator, topic, sMdl.etThing, true));
testers.add(new MqttCreateTester(mqttHelperAdminProject1, ehAdmin, ADMIN_P1, creator, filterCreator, topic, sMdl.etThing, true));
testers.add(new MqttCreateTester(mqttHelperAdminProject2, ehAdmin, ADMIN_P2, creator, filterCreator, topic, sMdl.etThing, false));
testers.add(new MqttCreateTester(mqttHelperObsCreaterProject1, ehAdmin, OBS_CREATE_P1, creator, filterCreator, topic, sMdl.etThing, false));
testers.add(new MqttCreateTester(mqttHelperObsCreaterProject2, ehAdmin, OBS_CREATE_P2, creator, filterCreator, topic, sMdl.etThing, false));

for (var tester : testers) {
tester.start();
}
for (var tester : testers) {
tester.join();
if (tester.hasCreatedEntity()) {
LOGGER.info("Found Entity for {}: {}", tester.name, tester.getCreatedEntity());
THINGS.add(tester.getCreatedEntity());
}
}
for (var tester : testers) {
LOGGER.info(" User {}, {}, Message {}", tester.name, tester.isSuccess(), tester.getMessage());
assertTrue(tester.isSuccess(), tester.getMessage());
}
}

@Test
void test_07a_DatastreamRelinkToThing2() {
LOGGER.info(" test_07a_DatastreamRelinkToThing2");
Expand Down Expand Up @@ -744,4 +787,104 @@ public static interface EntityCreator {

public Entity create(String user);
}

public static interface StringCreator {

public String create(String user);
}

private static class MqttCreateTester {

private static final int JOIN_TIMEOUT = 1500;

private final MqttHelper2 mh;
private final EntityHelper2 eh;
private final String name;
private final EntityCreator entityCreator;
private final StringCreator filterCreator;
private final String topic;
private final EntityType et;
private final boolean expectSuccess;

private boolean done;
private boolean success;
private String message;
private Entity createdEntity;

public MqttCreateTester(MqttHelper2 mh, EntityHelper2 eh, String name, EntityCreator entityCreator, StringCreator filterCreator, String topic, EntityType et, boolean expectSuccess) {
this.mh = mh;
this.eh = eh;
this.name = name;
this.entityCreator = entityCreator;
this.filterCreator = filterCreator;
this.topic = topic;
this.et = et;
this.expectSuccess = expectSuccess;
this.success = false;
this.message = "Still running for " + name;
}

private Thread thread;

public void start() {
thread = new Thread(this::executeTest);
thread.start();
}

public void join() {
if (thread == null) {
return;
}
LOGGER.info("Joining {}", name);
try {
thread.join(JOIN_TIMEOUT);
} catch (InterruptedException ex) {
LOGGER.error("Interrupted", ex);
}
}

private void executeTest() {
try {
Entity entity = entityCreator.create(name);
String json = JsonWriter.writeEntity(entity);
mh.publish(topic, json);
createdEntity = eh.getEntityWithRetry(et, filterCreator.create(name), null, 10);
if (createdEntity == null && !expectSuccess) {
success = true;
message = "Success";
} else if (createdEntity != null && expectSuccess) {
success = true;
message = "Success";
} else {
success = false;
message = "Failed for " + name + ". Entity: " + Objects.toString(createdEntity) + " expectSuccess: " + expectSuccess;
}
} catch (JsonProcessingException | ServiceFailureException ex) {
LOGGER.error("Failed to create JSON or fetch entity");
}
done = true;
}

public boolean isDone() {
return done;
}

public boolean isSuccess() {
return success;
}

public String getMessage() {
return message;
}

public boolean hasCreatedEntity() {
return createdEntity != null;
}

public Entity getCreatedEntity() {
return createdEntity;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public void createServices() {
serviceObsCreaterProject2 = AuthTestHelper.setAuthBasic(createService(), "ObsCreaterProject2", "ObsCreaterProject2");
}

private SensorThingsService createService() {
@Override
protected SensorThingsService createService() {
if (!baseService.isBaseUrlSet()) {
try {
baseService.setBaseUrl(new URI(serverSettings.getServiceUrl(version)))
Expand Down
Loading

0 comments on commit 199d373

Please sign in to comment.