Merge pull request #3415 from ingef/feature/import-better-rejection-message

Import better rejection message
awildturtok authored Apr 30, 2024
2 parents 515ff09 + 17b44ed commit 0e37263
Showing 4 changed files with 51 additions and 58 deletions.
@@ -35,7 +35,6 @@ public void updateImport(Namespace namespace, InputStream inputStream) {
datasetRegistry.get(namespace.getDataset().getId()),
inputStream,
config.getCluster().getEntityBucketSize(),
config,
true
);

@@ -63,7 +62,6 @@ public void addImport(Namespace namespace, InputStream inputStream) {
datasetRegistry.get(namespace.getDataset().getId()),
inputStream,
config.getCluster().getEntityBucketSize(),
config,
false
);
namespace.getJobManager().addSlowJob(job);
ImportJob.java
@@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -12,17 +13,13 @@
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response;

import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.datasets.Import;
import com.bakdata.conquery.models.datasets.ImportColumn;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.MajorTypeId;
import com.bakdata.conquery.models.events.stores.root.ColumnStore;
import com.bakdata.conquery.models.exceptions.JSONException;
import com.bakdata.conquery.models.identifiable.ids.specific.BucketId;
@@ -32,6 +29,7 @@
import com.bakdata.conquery.models.messages.namespaces.specific.AddImport;
import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket;
import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob;
import com.bakdata.conquery.models.preproc.PPColumn;
import com.bakdata.conquery.models.preproc.PreprocessedData;
import com.bakdata.conquery.models.preproc.PreprocessedHeader;
import com.bakdata.conquery.models.preproc.PreprocessedReader;
@@ -44,6 +42,9 @@
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -63,7 +64,7 @@ public class ImportJob extends Job {
private final PreprocessedHeader header;
private final PreprocessedData container;

public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStream inputStream, int entityBucketSize, ConqueryConfig config, boolean update)
public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStream inputStream, int entityBucketSize, boolean update)
throws IOException {

try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) {
@@ -86,7 +87,14 @@ public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStre
}

// Ensure that Import and Table have the same schema
header.assertMatch(table);
final List<String> validationErrors = ensureHeadersMatch(table, header);

if (!validationErrors.isEmpty()) {
final String errorMessage = "\n - " + String.join("\n - ", validationErrors);

log.error("Problems concerning Import `{}`:{}", header.getName(), errorMessage);
throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]:%s", header.getTable(), header.getName(), table.getId(), errorMessage));
}

final ImportId importId = new ImportId(table.getId(), header.getName());
final Import processedImport = namespace.getStorage().getImport(importId);
@@ -122,6 +130,34 @@ else if (processedImport != null) {
}
}

/**
* Verify that the supplied table matches the preprocessed data in shape.
* Returns a list of human-readable problems; empty if the schemas match.
*/
public static List<String> ensureHeadersMatch(Table table, PreprocessedHeader importHeaders) {

final List<String> errors = new ArrayList<>();

if (table.getColumns().length != importHeaders.getColumns().length) {
errors.add(String.format("Import column count=%d does not match table column count=%d", importHeaders.getColumns().length, table.getColumns().length));
}

final Map<String, MajorTypeId> typesByName = Arrays.stream(importHeaders.getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType));

// Walk the table's columns and look up each one in the import by name.
for (Column column : table.getColumns()) {
if (!typesByName.containsKey(column.getName())) {
errors.add("Column[%s] is missing."
.formatted(column.getName()));
}
else if (!typesByName.get(column.getName()).equals(column.getType())) {
errors.add("Column[%s] Types do not match %s != %s"
.formatted(column.getName(), typesByName.get(column.getName()), column.getType()));
}
}

return errors;
}


@Override
public void execute() throws JSONException, InterruptedException, IOException {
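For orientation, here is a minimal, self-contained sketch of the validation strategy the new ensureHeadersMatch uses: key the import's column types by name, walk the table's columns, and collect every problem instead of failing on the first. TableColumn and ImportColumn are hypothetical stand-ins for Conquery's Column and PPColumn, and the plain type strings stand in for MajorTypeId:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SchemaMatchSketch {
	// Hypothetical stand-ins for Conquery's Column and PPColumn.
	record TableColumn(String name, String type) {}
	record ImportColumn(String name, String type) {}

	static List<String> ensureHeadersMatch(List<TableColumn> table, List<ImportColumn> imported) {
		final List<String> errors = new ArrayList<>();

		if (table.size() != imported.size()) {
			errors.add("Import column count=%d does not match table column count=%d".formatted(imported.size(), table.size()));
		}

		// Key the import's types by column name, then walk the table's columns.
		final Map<String, String> typesByName = imported.stream().collect(Collectors.toMap(ImportColumn::name, ImportColumn::type));

		for (TableColumn column : table) {
			if (!typesByName.containsKey(column.name())) {
				errors.add("Column[%s] is missing.".formatted(column.name()));
			}
			else if (!typesByName.get(column.name()).equals(column.type())) {
				errors.add("Column[%s] Types do not match %s != %s".formatted(column.name(), typesByName.get(column.name()), column.type()));
			}
		}
		return errors;
	}

	public static void main(String[] args) {
		ensureHeadersMatch(
				List.of(new TableColumn("id", "STRING"), new TableColumn("date", "DATE_RANGE")),
				List.of(new ImportColumn("id", "STRING"), new ImportColumn("date", "DATE"))
		).forEach(System.out::println); // -> Column[date] Types do not match DATE != DATE_RANGE
	}
}

Comparing by name rather than by position means a reordered CQPP file still validates (only the count must agree), while a renamed or retyped column is reported precisely.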
PreprocessedHeader.java
@@ -1,13 +1,5 @@
package com.bakdata.conquery.models.preproc;

import java.util.Arrays;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;

import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.events.MajorTypeId;
import com.fasterxml.jackson.annotation.JsonCreator;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -52,38 +44,4 @@ public class PreprocessedHeader {
*/
private int validityHash;


/**
* Verify that the supplied table matches the preprocessed' data in shape.
*/
public void assertMatch(Table table) {
StringJoiner errors = new StringJoiner("\n");

if (table.getColumns().length != getColumns().length) {
errors.add(String.format("Import column count=`%d` does not match table column count=`%d`", getColumns().length, table.getColumns().length));
}

final Map<String, MajorTypeId> typesByName = Arrays.stream(getColumns())
.collect(Collectors.toMap(PPColumn::getName, PPColumn::getType));

for (int i = 0; i < Math.min(table.getColumns().length, getColumns().length); i++) {
final Column column = table.getColumns()[i];

if(!typesByName.containsKey(column.getName())){
errors.add(String.format("Column[%s] is missing.", column.getName()));
}
else if (!typesByName.get(column.getName()).equals(column.getType())) {
errors.add(String.format("Column[%s] Types do not match %s != %s"
, column.getName(), typesByName.get(column.getName()), column.getType())
);
}
}

if (errors.length() != 0) {
log.error("Problems concerning Import `{}`:\n{}", name, errors);
throw new IllegalArgumentException(String.format("Headers[%s.%s] do not match Table[%s]. More info in logs.", getTable(), getName(), table.getId()));
}
}


}
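The deletion above also changes the failure mode: assertMatch threw IllegalArgumentException, which surfaces to the uploader as an opaque 500 pointing at the server logs, whereas the new path in ImportJob throws BadRequestException, so the same mismatch comes back as a 400 that names the offending columns. A small sketch of that distinction — rejectIfMismatched is a hypothetical helper mirroring the new flow, and whether the message text reaches the response body depends on the registered exception mappers:

import java.util.List;

import jakarta.ws.rs.BadRequestException;

public class RejectionSketch {
	// Hypothetical helper mirroring the new flow in ImportJob.createOrUpdate.
	static void rejectIfMismatched(List<String> validationErrors) {
		if (validationErrors.isEmpty()) {
			return;
		}
		// Before this PR: IllegalArgumentException -> HTTP 500, detail only in the server log.
		// After: BadRequestException -> HTTP 400, attributable to the uploaded file.
		throw new BadRequestException("Import does not match table:\n - " + String.join("\n - ", validationErrors));
	}

	public static void main(String[] args) {
		try {
			rejectIfMismatched(List.of("Column[date] Types do not match DATE != DATE_RANGE"));
		}
		catch (BadRequestException e) {
			System.out.println(e.getResponse().getStatus() + ": " + e.getMessage());
		}
	}
}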
@@ -2,6 +2,7 @@

import static com.bakdata.conquery.resources.ResourceConstants.*;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -45,6 +46,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@@ -140,7 +142,7 @@ public void addTable(Table table) {
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@Path("cqpp")
public void updateCqppImport(@NotNull InputStream importStream) throws IOException {
processor.updateImport(namespace, new GZIPInputStream(importStream));
processor.updateImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream)));
}

@PUT
@@ -157,9 +159,10 @@ public void updateImport(@NotNull @QueryParam("file") File importFile) throws We
@POST
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@Path("cqpp")
public void uploadImport(@NotNull InputStream importStream) throws IOException {
log.info("Importing from file upload");
processor.addImport(namespace, new GZIPInputStream(importStream));
@SneakyThrows
public void uploadImport(@NotNull InputStream importStream) {
log.debug("Importing from file upload");
processor.addImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream)));
}

@POST
@@ -177,9 +180,7 @@ public void addImport(@QueryParam("file") File importFile) throws WebApplication

@POST
@Path("concepts")
public void addConcept(
@QueryParam("force") @DefaultValue("false") boolean force,
Concept concept) {
public void addConcept(@QueryParam("force") @DefaultValue("false") boolean force, Concept concept) {
processor.addConcept(namespace.getDataset(), concept, force);
}

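A smaller change in this last file: both cqpp upload endpoints now wrap the raw request stream in a BufferedInputStream before inflating it with GZIPInputStream, presumably so the inflater's small reads (512-byte default buffer) are served from memory instead of hitting the underlying request stream each time. A runnable sketch of the pattern, using an in-memory payload as a stand-in for an uploaded .cqpp file:

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipBufferingSketch {
	public static void main(String[] args) throws IOException {
		// Compress a sample payload in memory (stands in for an uploaded .cqpp file).
		final ByteArrayOutputStream compressed = new ByteArrayOutputStream();
		try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
			gzip.write("preprocessed import data".getBytes(StandardCharsets.UTF_8));
		}

		// The pattern from the diff: buffer the raw stream before inflating, so
		// GZIPInputStream's frequent small reads are answered from the buffer
		// rather than by the underlying (possibly network-backed) stream.
		try (InputStream in = new GZIPInputStream(new BufferedInputStream(new ByteArrayInputStream(compressed.toByteArray())))) {
			System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
		}
	}
}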
