Skip to content

Commit

Permalink
close client on failure to unregister metrics and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed Feb 24, 2025
1 parent 8e14cea commit 795e0d9
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.bakdata.conquery.mode.cluster;

import java.io.IOException;
import jakarta.validation.Validator;
import java.util.NoSuchElementException;

import com.bakdata.conquery.io.mina.MinaAttributes;
import com.bakdata.conquery.io.mina.NetworkSession;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.jobs.ReactingJob;
Expand Down Expand Up @@ -33,7 +34,6 @@ public class ClusterConnectionManager extends IoHandlerAdapter {

private final DatasetRegistry<DistributedNamespace> datasetRegistry;
private final JobManager jobManager;
private final Validator validator;
private final ConqueryConfig config;
private final InternalMapperFactory internalMapperFactory;
@Getter
Expand Down Expand Up @@ -86,7 +86,12 @@ public void messageReceived(IoSession session, Object message) {
);

if (toManagerNode instanceof ForwardToNamespace nsMesg) {
datasetRegistry.get(nsMesg.getDatasetId()).getJobManager().addSlowJob(job);
DatasetId datasetId = nsMesg.getDatasetId();
DistributedNamespace namespace = datasetRegistry.get(datasetId);
if (namespace == null) {
throw new NoSuchElementException("Unable to find namespace with id %s for message: %s".formatted(datasetId, message) );
}
namespace.getJobManager().addSlowJob(job);
}
else if (toManagerNode instanceof SlowMessage slowMessage) {
slowMessage.setProgressReporter(job.getProgressReporter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,27 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.Invocation;
import jakarta.ws.rs.core.GenericType;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import com.bakdata.conquery.apiv1.execution.ExecutionStatus;
import com.bakdata.conquery.apiv1.execution.FullExecutionStatus;
import com.bakdata.conquery.apiv1.execution.OverviewExecutionStatus;
import com.bakdata.conquery.apiv1.query.Query;
import com.bakdata.conquery.integration.json.ConqueryTestSpec;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.auth.entities.Role;
import com.bakdata.conquery.models.auth.entities.User;
import com.bakdata.conquery.models.exceptions.JSONException;
import com.bakdata.conquery.models.execution.ExecutionState;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.identifiable.ids.specific.RoleId;
import com.bakdata.conquery.resources.api.DatasetQueryResource;
import com.bakdata.conquery.resources.api.QueryResource;
import com.bakdata.conquery.resources.hierarchies.HierarchyHelper;
import com.bakdata.conquery.util.support.StandaloneSupport;
import com.fasterxml.jackson.databind.JsonNode;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.GenericType;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -36,75 +35,61 @@
public class IntegrationUtils {


/**
* Load the constellation of roles, users and permissions into the provided storage.
*/
public static void importPermissionConstellation(MetaStorage storage, Role[] roles, RequiredUser[] rUsers) {

for (Role role : roles) {
storage.addRole(role);
}

for (RequiredUser rUser : rUsers) {
final User user = rUser.getUser();
storage.addUser(user);

final RoleId[] rolesInjected = rUser.getRolesInjected();

for (RoleId mandatorId : rolesInjected) {
user.addRole(storage.getRole(mandatorId));
}
}
}


public static Query parseQuery(StandaloneSupport support, JsonNode rawQuery) throws JSONException, IOException {
return ConqueryTestSpec.parseSubTree(support, rawQuery, Query.class, true);
}

/**
* Send a query onto the conquery instance and assert the result's size.
*
* @return
*/
public static ManagedExecutionId assertQueryResult(StandaloneSupport conquery, Object query, long expectedSize, ExecutionState expectedState, User user, int expectedResponseCode) {
public static ManagedExecutionId assertQueryResult(StandaloneSupport conquery, Object query, long expectedSize, ExecutionState expectedState, @Nullable User user, int expectedResponseCode) {
final URI postQueryURI = getPostQueryURI(conquery);

// Submit Query
final Response response = conquery.getClient()
.target(postQueryURI)
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(query, MediaType.APPLICATION_JSON_TYPE));

// Submit Query
Invocation.Builder request = conquery.getClient()
.target(postQueryURI)
.request(MediaType.APPLICATION_JSON_TYPE);

assertThat(response.getStatusInfo().getStatusCode())
.as(() -> response.readEntity(String.class))
.isEqualTo(expectedResponseCode);
if (user != null) {
// Override authentication if user is provided
final String userToken = conquery.getAuthorizationController()
.getConqueryTokenRealm()
.createTokenForUser(user.getId());

if (expectedState == ExecutionState.FAILED && !response.getStatusInfo().getFamily().equals(Response.Status.Family.SUCCESSFUL)) {
return null;
request.header("Authorization", "Bearer " + userToken);
}

final JsonNode jsonNode = response.readEntity(JsonNode.class);
try(final Response response = request
.post(Entity.entity(query, MediaType.APPLICATION_JSON_TYPE))) {

final String id = jsonNode.get(ExecutionStatus.Fields.id).asText();
assertThat(response.getStatusInfo().getStatusCode())
.as(() -> response.readEntity(String.class))
.isEqualTo(expectedResponseCode);

// TODO implement this properly: ExecutionStatus status = response.readEntity(ExecutionStatus.Full.class);
if (expectedState == ExecutionState.FAILED && !response.getStatusInfo().getFamily().equals(Response.Status.Family.SUCCESSFUL)) {
return null;
}

final JsonNode execStatusRaw = getRawExecutionStatus(id, conquery, user);
// TODO implement this properly: ExecutionStatus status = response.readEntity(ExecutionStatus.Full.class);
final JsonNode jsonNode = response.readEntity(JsonNode.class);
final String id = jsonNode.get(ExecutionStatus.Fields.id).asText();

final String status = execStatusRaw.get(ExecutionStatus.Fields.status).asText();
final long numberOfResults = execStatusRaw.get(ExecutionStatus.Fields.numberOfResults).asLong(0);
final JsonNode execStatusRaw = getRawExecutionStatus(id, conquery, user);

assertThat(status).isEqualTo(expectedState.name());
final String status = execStatusRaw.get(ExecutionStatus.Fields.status).asText();
final long numberOfResults = execStatusRaw.get(ExecutionStatus.Fields.numberOfResults).asLong(0);

if (expectedState == ExecutionState.DONE && expectedSize != -1) {
assertThat(numberOfResults)
.describedAs("Query results")
.isEqualTo(expectedSize);
}
assertThat(status).isEqualTo(expectedState.name());

return ManagedExecutionId.Parser.INSTANCE.parse(id);
if (expectedState == ExecutionState.DONE && expectedSize != -1) {
assertThat(numberOfResults)
.describedAs("Query results")
.isEqualTo(expectedSize);
}

return ManagedExecutionId.Parser.INSTANCE.parse(id);
}
}

public static List<OverviewExecutionStatus> getAllQueries(StandaloneSupport conquery, int expectedResponseCode) {
Expand Down

0 comments on commit 795e0d9

Please sign in to comment.