Skip to content

Commit

Permalink
move error generation to ImportJob#createOrUpdate
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok committed Apr 30, 2024
1 parent 5e62894 commit 17b44ed
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -18,6 +19,7 @@
import com.bakdata.conquery.models.datasets.ImportColumn;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.MajorTypeId;
import com.bakdata.conquery.models.events.stores.root.ColumnStore;
import com.bakdata.conquery.models.exceptions.JSONException;
import com.bakdata.conquery.models.identifiable.ids.specific.BucketId;
Expand All @@ -27,6 +29,7 @@
import com.bakdata.conquery.models.messages.namespaces.specific.AddImport;
import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket;
import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob;
import com.bakdata.conquery.models.preproc.PPColumn;
import com.bakdata.conquery.models.preproc.PreprocessedData;
import com.bakdata.conquery.models.preproc.PreprocessedHeader;
import com.bakdata.conquery.models.preproc.PreprocessedReader;
Expand Down Expand Up @@ -84,7 +87,14 @@ public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStre
}

// Ensure that Import and Table have the same schema
header.assertMatch(table);
final List<String> validationErrors = ensureHeadersMatch(table, header);

if(!validationErrors.isEmpty()){
final String errorMessage = String.join("\n -", validationErrors);

log.error("Problems concerning Import `{}`:{}", header.getName(), errorMessage);
throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]:%s", header.getTable(), header.getName(), table.getId(), errorMessage));
}

final ImportId importId = new ImportId(table.getId(), header.getName());
final Import processedImport = namespace.getStorage().getImport(importId);
Expand Down Expand Up @@ -120,6 +130,34 @@ else if (processedImport != null) {
}
}

/**
* Verify that the supplied table matches the preprocessed data in shape.
*/
public static List<String> ensureHeadersMatch(Table table, PreprocessedHeader importHeaders) {
// final StringJoiner errors = new StringJoiner("\n - ", "\n - ", "");

final List<String> errors = new ArrayList<>();

if (table.getColumns().length != importHeaders.getColumns().length) {
errors.add(String.format("Import column count=%d does not match table column count=%d", importHeaders.getColumns().length, table.getColumns().length));
}

final Map<String, MajorTypeId> typesByName = Arrays.stream(importHeaders.getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType));

for (PPColumn column : importHeaders.getColumns()) {
if (!typesByName.containsKey(column.getName())) {
errors.add("Column[%s] is missing."
.formatted(column.getName()));
}
else if (!typesByName.get(column.getName()).equals(column.getType())) {
errors.add("Column[%s] Types do not match %s != %s"
.formatted(column.getName(), typesByName.get(column.getName()), column.getType()));
}
}

return errors;
}


@Override
public void execute() throws JSONException, InterruptedException, IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
package com.bakdata.conquery.models.preproc;

import java.util.Arrays;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;

import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.events.MajorTypeId;
import com.fasterxml.jackson.annotation.JsonCreator;
import jakarta.ws.rs.BadRequestException;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
Expand Down Expand Up @@ -52,35 +44,4 @@ public class PreprocessedHeader {
*/
private int validityHash;


/**
* Verify that the supplied table matches the preprocessed data in shape.
*/
public void assertMatch(Table table) {
final StringJoiner errors = new StringJoiner("\n - ");

if (table.getColumns().length != getColumns().length) {
errors.add(String.format("Import column count=%d does not match table column count=%d", getColumns().length, table.getColumns().length));
}

final Map<String, MajorTypeId> typesByName = Arrays.stream(getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType));

for (PPColumn column : getColumns()) {
if (!typesByName.containsKey(column.getName())) {
errors.add("Column[%s] is missing."
.formatted(column.getName()));
}
else if (!typesByName.get(column.getName()).equals(column.getType())) {
errors.add("Column[%s] Types do not match %s != %s"
.formatted(column.getName(), typesByName.get(column.getName()), column.getType()));
}
}

if (errors.length() != 0) {
log.error("Problems concerning Import `{}`:\n - {}", name, errors);
throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]: %s", getTable(), getName(), table.getId(), errors));
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.InternalServerErrorException;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
Expand All @@ -47,6 +46,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand Down Expand Up @@ -142,15 +142,7 @@ public void addTable(Table table) {
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@Path("cqpp")
public void updateCqppImport(@NotNull InputStream importStream) throws IOException {
try {
processor.updateImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream)));
}
catch (WebApplicationException wex) {
throw wex;
}
catch (Exception ex) {
throw new InternalServerErrorException(ex);
}
processor.updateImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream)));
}

@PUT
Expand All @@ -162,28 +154,15 @@ public void updateImport(@NotNull @QueryParam("file") File importFile) throws We
catch (IOException err) {
throw new WebApplicationException(String.format("Invalid file (`%s`) supplied.", importFile), err, Status.BAD_REQUEST);
}
catch (WebApplicationException wex) {
throw wex;
}
catch (Exception ex) {
throw new InternalServerErrorException(ex);
}
}

@POST
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@Path("cqpp")
@SneakyThrows
public void uploadImport(@NotNull InputStream importStream) {
log.debug("Importing from file upload");
try {
processor.addImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream)));
}
catch (WebApplicationException wex) {
throw wex;
}
catch (Exception ex) {
throw new InternalServerErrorException(ex);
}
processor.addImport(namespace, new GZIPInputStream(new BufferedInputStream(importStream)));
}

@POST
Expand All @@ -196,12 +175,6 @@ public void addImport(@QueryParam("file") File importFile) throws WebApplication
log.warn("Unable to process import", err);
throw new WebApplicationException(String.format("Invalid file (`%s`) supplied.", importFile), err, Status.BAD_REQUEST);
}
catch (WebApplicationException wex) {
throw wex;
}
catch (Exception ex) {
throw new InternalServerErrorException(ex);
}
}


Expand Down

0 comments on commit 17b44ed

Please sign in to comment.