From 8d106c5bf83a100a44b41095b0017108ecc67278 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 30 Apr 2024 10:58:03 +0200 Subject: [PATCH 1/5] remove unused config param --- .../conquery/mode/cluster/ClusterImportHandler.java | 2 -- .../com/bakdata/conquery/models/jobs/ImportJob.java | 10 ++++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java index 91f07d5d72..991d5efd64 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java @@ -35,7 +35,6 @@ public void updateImport(Namespace namespace, InputStream inputStream) { datasetRegistry.get(namespace.getDataset().getId()), inputStream, config.getCluster().getEntityBucketSize(), - config, true ); @@ -63,7 +62,6 @@ public void addImport(Namespace namespace, InputStream inputStream) { datasetRegistry.get(namespace.getDataset().getId()), inputStream, config.getCluster().getEntityBucketSize(), - config, false ); namespace.getJobManager().addSlowJob(job); diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java index f7fd2d29b7..e244af0f59 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java @@ -12,11 +12,6 @@ import java.util.Set; import java.util.stream.Collectors; -import jakarta.ws.rs.BadRequestException; -import jakarta.ws.rs.WebApplicationException; -import jakarta.ws.rs.core.Response; - -import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.Import; @@ -44,6 +39,9 @@ import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Response; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -63,7 +61,7 @@ public class ImportJob extends Job { private final PreprocessedHeader header; private final PreprocessedData container; - public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStream inputStream, int entityBucketSize, ConqueryConfig config, boolean update) + public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStream inputStream, int entityBucketSize, boolean update) throws IOException { try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) { From 8f57dbbe1759806bc2c4d87de9be3282cfc50d5f Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 30 Apr 2024 10:58:24 +0200 Subject: [PATCH 2/5] throw validation exception as WebException --- .../models/preproc/PreprocessedHeader.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java index 2bcb522c07..0f25265771 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java @@ -5,10 +5,10 @@ import java.util.StringJoiner; import java.util.stream.Collectors; -import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.events.MajorTypeId; import com.fasterxml.jackson.annotation.JsonCreator; +import jakarta.ws.rs.BadRequestException; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; @@ -54,34 +54,31 @@ public class PreprocessedHeader { /** - * Verify that the supplied table matches the preprocessed' data in shape. + * Verify that the supplied table matches the preprocessed data in shape. */ public void assertMatch(Table table) { - StringJoiner errors = new StringJoiner("\n"); + final StringJoiner errors = new StringJoiner("\n - "); if (table.getColumns().length != getColumns().length) { - errors.add(String.format("Import column count=`%d` does not match table column count=`%d`", getColumns().length, table.getColumns().length)); + errors.add(String.format("Import column count=%d does not match table column count=%d", getColumns().length, table.getColumns().length)); } - final Map typesByName = Arrays.stream(getColumns()) - .collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); + final Map typesByName = Arrays.stream(getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); - for (int i = 0; i < Math.min(table.getColumns().length, getColumns().length); i++) { - final Column column = table.getColumns()[i]; - - if(!typesByName.containsKey(column.getName())){ - errors.add(String.format("Column[%s] is missing.", column.getName())); + for (PPColumn column : getColumns()) { + if (!typesByName.containsKey(column.getName())) { + errors.add("Column[%s] is missing." + .formatted(column.getName())); } else if (!typesByName.get(column.getName()).equals(column.getType())) { - errors.add(String.format("Column[%s] Types do not match %s != %s" - , column.getName(), typesByName.get(column.getName()), column.getType()) - ); + errors.add("Column[%s] Types do not match %s != %s" + .formatted(column.getName(), typesByName.get(column.getName()), column.getType())); } } if (errors.length() != 0) { - log.error("Problems concerning Import `{}`:\n{}", name, errors); - throw new IllegalArgumentException(String.format("Headers[%s.%s] do not match Table[%s]. More info in logs.", getTable(), getName(), table.getId())); + log.error("Problems concerning Import `{}`:\n - {}", name, errors); + throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]: %s", getTable(), getName(), table.getId(), errors)); } } From 8914eceb366c33497f4efa1a74f7cff580c76bee Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 30 Apr 2024 10:59:37 +0200 Subject: [PATCH 3/5] cleanup mapping of exceptions --- .../admin/rest/AdminDatasetResource.java | 36 +++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java index 4a1de584ce..52883c45f3 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java @@ -2,6 +2,7 @@ import static com.bakdata.conquery.resources.ResourceConstants.*; +import java.io.BufferedInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -33,6 +34,7 @@ import jakarta.ws.rs.DELETE; import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; +import jakarta.ws.rs.InternalServerErrorException; import jakarta.ws.rs.POST; import jakarta.ws.rs.PUT; import jakarta.ws.rs.Path; @@ -141,6 +143,14 @@ public void addTable(Table table) { @Path("cqpp") public void updateCqppImport(@NotNull InputStream importStream) throws IOException { processor.updateImport(namespace, new GZIPInputStream(importStream)); + try { + } + catch (WebApplicationException wex) { + throw wex; + } + catch (Exception ex) { + throw new InternalServerErrorException(ex); + } } @PUT @@ -152,6 +162,12 @@ public void updateImport(@NotNull @QueryParam("file") File importFile) throws We catch (IOException err) { throw new WebApplicationException(String.format("Invalid file (`%s`) supplied.", importFile), err, Status.BAD_REQUEST); } + catch (WebApplicationException wex) { + throw wex; + } + catch (Exception ex) { + throw new InternalServerErrorException(ex); + } } @POST @@ -160,6 +176,16 @@ public void updateImport(@NotNull @QueryParam("file") File importFile) throws We public void uploadImport(@NotNull InputStream importStream) throws IOException { log.info("Importing from file upload"); processor.addImport(namespace, new GZIPInputStream(importStream)); + public void uploadImport(@NotNull InputStream importStream) { + log.debug("Importing from file upload"); + try { + } + catch (WebApplicationException wex) { + throw wex; + } + catch (Exception ex) { + throw new InternalServerErrorException(ex); + } } @POST @@ -172,14 +198,18 @@ public void addImport(@QueryParam("file") File importFile) throws WebApplication log.warn("Unable to process import", err); throw new WebApplicationException(String.format("Invalid file (`%s`) supplied.", importFile), err, Status.BAD_REQUEST); } + catch (WebApplicationException wex) { + throw wex; + } + catch (Exception ex) { + throw new InternalServerErrorException(ex); + } } @POST @Path("concepts") - public void addConcept( - @QueryParam("force") @DefaultValue("false") boolean force, - Concept concept) { + public void addConcept(@QueryParam("force") @DefaultValue("false") boolean force, Concept concept) { processor.addConcept(namespace.getDataset(), concept, force); } From 5e628941048c73316a1c879aecbacea13a1159d8 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 30 Apr 2024 11:00:01 +0200 Subject: [PATCH 4/5] use BufferedInputStreams for CQPP from HTTP --- .../conquery/resources/admin/rest/AdminDatasetResource.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java index 52883c45f3..fcec135897 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java @@ -142,8 +142,8 @@ public void addTable(Table table) { @Consumes(MediaType.APPLICATION_OCTET_STREAM) @Path("cqpp") public void updateCqppImport(@NotNull InputStream importStream) throws IOException { - processor.updateImport(namespace, new GZIPInputStream(importStream)); try { + processor.updateImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); } catch (WebApplicationException wex) { throw wex; @@ -173,12 +173,10 @@ public void updateImport(@NotNull @QueryParam("file") File importFile) throws We @POST @Consumes(MediaType.APPLICATION_OCTET_STREAM) @Path("cqpp") - public void uploadImport(@NotNull InputStream importStream) throws IOException { - log.info("Importing from file upload"); - processor.addImport(namespace, new GZIPInputStream(importStream)); public void uploadImport(@NotNull InputStream importStream) { log.debug("Importing from file upload"); try { + processor.addImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); } catch (WebApplicationException wex) { throw wex; From 17b44ed1ec8c366cf38e047f0cc3635bd4033f5c Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 30 Apr 2024 12:15:08 +0200 Subject: [PATCH 5/5] move error generation to ImportJob#createOrUpdate --- .../conquery/models/jobs/ImportJob.java | 40 ++++++++++++++++++- .../models/preproc/PreprocessedHeader.java | 39 ------------------ .../admin/rest/AdminDatasetResource.java | 35 ++-------------- 3 files changed, 43 insertions(+), 71 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java index e244af0f59..92fbd6ee47 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -18,6 +19,7 @@ import com.bakdata.conquery.models.datasets.ImportColumn; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.events.Bucket; +import com.bakdata.conquery.models.events.MajorTypeId; import com.bakdata.conquery.models.events.stores.root.ColumnStore; import com.bakdata.conquery.models.exceptions.JSONException; import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; @@ -27,6 +29,7 @@ import com.bakdata.conquery.models.messages.namespaces.specific.AddImport; import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; +import com.bakdata.conquery.models.preproc.PPColumn; import com.bakdata.conquery.models.preproc.PreprocessedData; import com.bakdata.conquery.models.preproc.PreprocessedHeader; import com.bakdata.conquery.models.preproc.PreprocessedReader; @@ -84,7 +87,14 @@ public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStre } // Ensure that Import and Table have the same schema - header.assertMatch(table); + final List validationErrors = ensureHeadersMatch(table, header); + + if(!validationErrors.isEmpty()){ + final String errorMessage = String.join("\n -", validationErrors); + + log.error("Problems concerning Import `{}`:{}", header.getName(), errorMessage); + throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]:%s", header.getTable(), header.getName(), table.getId(), errorMessage)); + } final ImportId importId = new ImportId(table.getId(), header.getName()); final Import processedImport = namespace.getStorage().getImport(importId); @@ -120,6 +130,34 @@ else if (processedImport != null) { } } + /** + * Verify that the supplied table matches the preprocessed data in shape. + */ + public static List ensureHeadersMatch(Table table, PreprocessedHeader importHeaders) { +// final StringJoiner errors = new StringJoiner("\n - ", "\n - ", ""); + + final List errors = new ArrayList<>(); + + if (table.getColumns().length != importHeaders.getColumns().length) { + errors.add(String.format("Import column count=%d does not match table column count=%d", importHeaders.getColumns().length, table.getColumns().length)); + } + + final Map typesByName = Arrays.stream(importHeaders.getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); + + for (PPColumn column : importHeaders.getColumns()) { + if (!typesByName.containsKey(column.getName())) { + errors.add("Column[%s] is missing." + .formatted(column.getName())); + } + else if (!typesByName.get(column.getName()).equals(column.getType())) { + errors.add("Column[%s] Types do not match %s != %s" + .formatted(column.getName(), typesByName.get(column.getName()), column.getType())); + } + } + + return errors; + } + @Override public void execute() throws JSONException, InterruptedException, IOException { diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java index 0f25265771..b032dbb8a8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java @@ -1,14 +1,6 @@ package com.bakdata.conquery.models.preproc; -import java.util.Arrays; -import java.util.Map; -import java.util.StringJoiner; -import java.util.stream.Collectors; - -import com.bakdata.conquery.models.datasets.Table; -import com.bakdata.conquery.models.events.MajorTypeId; import com.fasterxml.jackson.annotation.JsonCreator; -import jakarta.ws.rs.BadRequestException; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; @@ -52,35 +44,4 @@ public class PreprocessedHeader { */ private int validityHash; - - /** - * Verify that the supplied table matches the preprocessed data in shape. - */ - public void assertMatch(Table table) { - final StringJoiner errors = new StringJoiner("\n - "); - - if (table.getColumns().length != getColumns().length) { - errors.add(String.format("Import column count=%d does not match table column count=%d", getColumns().length, table.getColumns().length)); - } - - final Map typesByName = Arrays.stream(getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); - - for (PPColumn column : getColumns()) { - if (!typesByName.containsKey(column.getName())) { - errors.add("Column[%s] is missing." - .formatted(column.getName())); - } - else if (!typesByName.get(column.getName()).equals(column.getType())) { - errors.add("Column[%s] Types do not match %s != %s" - .formatted(column.getName(), typesByName.get(column.getName()), column.getType())); - } - } - - if (errors.length() != 0) { - log.error("Problems concerning Import `{}`:\n - {}", name, errors); - throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]: %s", getTable(), getName(), table.getId(), errors)); - } - } - - } diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java index fcec135897..7feeecc553 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java @@ -34,7 +34,6 @@ import jakarta.ws.rs.DELETE; import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; -import jakarta.ws.rs.InternalServerErrorException; import jakarta.ws.rs.POST; import jakarta.ws.rs.PUT; import jakarta.ws.rs.Path; @@ -47,6 +46,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -142,15 +142,7 @@ public void addTable(Table table) { @Consumes(MediaType.APPLICATION_OCTET_STREAM) @Path("cqpp") public void updateCqppImport(@NotNull InputStream importStream) throws IOException { - try { - processor.updateImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); - } - catch (WebApplicationException wex) { - throw wex; - } - catch (Exception ex) { - throw new InternalServerErrorException(ex); - } + processor.updateImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); } @PUT @@ -162,28 +154,15 @@ public void updateImport(@NotNull @QueryParam("file") File importFile) throws We catch (IOException err) { throw new WebApplicationException(String.format("Invalid file (`%s`) supplied.", importFile), err, Status.BAD_REQUEST); } - catch (WebApplicationException wex) { - throw wex; - } - catch (Exception ex) { - throw new InternalServerErrorException(ex); - } } @POST @Consumes(MediaType.APPLICATION_OCTET_STREAM) @Path("cqpp") + @SneakyThrows public void uploadImport(@NotNull InputStream importStream) { log.debug("Importing from file upload"); - try { - processor.addImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); - } - catch (WebApplicationException wex) { - throw wex; - } - catch (Exception ex) { - throw new InternalServerErrorException(ex); - } + processor.addImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream))); } @POST @@ -196,12 +175,6 @@ public void addImport(@QueryParam("file") File importFile) throws WebApplication log.warn("Unable to process import", err); throw new WebApplicationException(String.format("Invalid file (`%s`) supplied.", importFile), err, Status.BAD_REQUEST); } - catch (WebApplicationException wex) { - throw wex; - } - catch (Exception ex) { - throw new InternalServerErrorException(ex); - } }