diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java index e244af0f59..92fbd6ee47 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -18,6 +19,7 @@ import com.bakdata.conquery.models.datasets.ImportColumn; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.events.Bucket; +import com.bakdata.conquery.models.events.MajorTypeId; import com.bakdata.conquery.models.events.stores.root.ColumnStore; import com.bakdata.conquery.models.exceptions.JSONException; import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; @@ -27,6 +29,7 @@ import com.bakdata.conquery.models.messages.namespaces.specific.AddImport; import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; +import com.bakdata.conquery.models.preproc.PPColumn; import com.bakdata.conquery.models.preproc.PreprocessedData; import com.bakdata.conquery.models.preproc.PreprocessedHeader; import com.bakdata.conquery.models.preproc.PreprocessedReader; @@ -84,7 +87,14 @@ public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStre } // Ensure that Import and Table have the same schema - header.assertMatch(table); + final List validationErrors = ensureHeadersMatch(table, header); + + if(!validationErrors.isEmpty()){ + final String errorMessage = String.join("\n -", validationErrors); + + log.error("Problems concerning Import `{}`:{}", header.getName(), errorMessage); + throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]:%s", header.getTable(), header.getName(), table.getId(), errorMessage)); + } final ImportId importId = new ImportId(table.getId(), header.getName()); final Import processedImport = namespace.getStorage().getImport(importId); @@ -120,6 +130,34 @@ else if (processedImport != null) { } } + /** + * Verify that the supplied table matches the preprocessed data in shape. + */ + public static List ensureHeadersMatch(Table table, PreprocessedHeader importHeaders) { +// final StringJoiner errors = new StringJoiner("\n - ", "\n - ", ""); + + final List errors = new ArrayList<>(); + + if (table.getColumns().length != importHeaders.getColumns().length) { + errors.add(String.format("Import column count=%d does not match table column count=%d", importHeaders.getColumns().length, table.getColumns().length)); + } + + final Map typesByName = Arrays.stream(importHeaders.getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); + + for (PPColumn column : importHeaders.getColumns()) { + if (!typesByName.containsKey(column.getName())) { + errors.add("Column[%s] is missing." + .formatted(column.getName())); + } + else if (!typesByName.get(column.getName()).equals(column.getType())) { + errors.add("Column[%s] Types do not match %s != %s" + .formatted(column.getName(), typesByName.get(column.getName()), column.getType())); + } + } + + return errors; + } + @Override public void execute() throws JSONException, InterruptedException, IOException { diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java index 0f25265771..b032dbb8a8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java @@ -1,14 +1,6 @@ package com.bakdata.conquery.models.preproc; -import java.util.Arrays; -import java.util.Map; -import java.util.StringJoiner; -import java.util.stream.Collectors; - -import com.bakdata.conquery.models.datasets.Table; -import com.bakdata.conquery.models.events.MajorTypeId; import com.fasterxml.jackson.annotation.JsonCreator; -import jakarta.ws.rs.BadRequestException; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; @@ -52,35 +44,4 @@ public class PreprocessedHeader { */ private int validityHash; - - /** - * Verify that the supplied table matches the preprocessed data in shape. - */ - public void assertMatch(Table table) { - final StringJoiner errors = new StringJoiner("\n - "); - - if (table.getColumns().length != getColumns().length) { - errors.add(String.format("Import column count=%d does not match table column count=%d", getColumns().length, table.getColumns().length)); - } - - final Map typesByName = Arrays.stream(getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); - - for (PPColumn column : getColumns()) { - if (!typesByName.containsKey(column.getName())) { - errors.add("Column[%s] is missing." - .formatted(column.getName())); - } - else if (!typesByName.get(column.getName()).equals(column.getType())) { - errors.add("Column[%s] Types do not match %s != %s" - .formatted(column.getName(), typesByName.get(column.getName()), column.getType())); - } - } - - if (errors.length() != 0) { - log.error("Problems concerning Import `{}`:\n - {}", name, errors); - throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]: %s", getTable(), getName(), table.getId(), errors)); - } - } - - } diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java index fcec135897..7feeecc553 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java @@ -34,7 +34,6 @@ import jakarta.ws.rs.DELETE; import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; -import jakarta.ws.rs.InternalServerErrorException; import jakarta.ws.rs.POST; import jakarta.ws.rs.PUT; import jakarta.ws.rs.Path; @@ -47,6 +46,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -142,15 +142,7 @@ public void addTable(Table table) { @Consumes(MediaType.APPLICATION_OCTET_STREAM) @Path("cqpp") public void updateCqppImport(@NotNull InputStream importStream) throws IOException { - try { - processor.updateImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); - } - catch (WebApplicationException wex) { - throw wex; - } - catch (Exception ex) { - throw new InternalServerErrorException(ex); - } + processor.updateImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); } @PUT @@ -162,28 +154,15 @@ public void updateImport(@NotNull @QueryParam("file") File importFile) throws We catch (IOException err) { throw new WebApplicationException(String.format("Invalid file (`%s`) supplied.", importFile), err, Status.BAD_REQUEST); } - catch (WebApplicationException wex) { - throw wex; - } - catch (Exception ex) { - throw new InternalServerErrorException(ex); - } } @POST @Consumes(MediaType.APPLICATION_OCTET_STREAM) @Path("cqpp") + @SneakyThrows public void uploadImport(@NotNull InputStream importStream) { log.debug("Importing from file upload"); - try { - processor.addImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); - } - catch (WebApplicationException wex) { - throw wex; - } - catch (Exception ex) { - throw new InternalServerErrorException(ex); - } + processor.addImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); } @POST @@ -196,12 +175,6 @@ public void addImport(@QueryParam("file") File importFile) throws WebApplication log.warn("Unable to process import", err); throw new WebApplicationException(String.format("Invalid file (`%s`) supplied.", importFile), err, Status.BAD_REQUEST); } - catch (WebApplicationException wex) { - throw wex; - } - catch (Exception ex) { - throw new InternalServerErrorException(ex); - } }