From d5e807ee465d3100e67f138c145442e744062cc0 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Wed, 28 Feb 2024 16:02:46 +0100 Subject: [PATCH] fixes dropping sharedDictionaries when the same one is referenced multiple times in one table --- .../bakdata/conquery/models/jobs/ImportJob.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java index ee3923983a..b788b6dec7 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java @@ -201,14 +201,13 @@ private static Map createLocalIdReplacements(Map importDictionaries(DistributedNamespace namespace, Map dicts, Column[] columns, String importName, Table table, IdMutex sharedDictionaryLocks) { + private static Map importDictionaries(DistributedNamespace namespace, Map dicts, Column[] columns, String importName, Table table, IdMutex sharedDictionaryLocks) { // Empty Maps are Coalesced to null by Jackson if (dicts == null) { return Collections.emptyMap(); } - final Map out = new ConcurrentHashMap<>(); log.debug("BEGIN importing {} Dictionaries", dicts.size()); @@ -228,6 +227,8 @@ private static Map importDictionaries(DistributedName storeAndDistributeDictionary(namespace, dictionary); }); + final Map out = new ConcurrentHashMap<>(); + // We group by sharedDictionary to avoid sending dictionaries multliple times Arrays.stream(columns) .parallel() @@ -258,7 +259,7 @@ private static Map importDictionaries(DistributedName final DictionaryMapping mapping = DictionaryMapping.createAndImport(importDictionary, sharedDictionary); newIds += mapping.getNumberOfNewIds(); - out.put(refColumn.getName(), mapping); + out.put(column, mapping); } if (newIds > 0) { @@ -296,7 +297,7 @@ public void execute() throws JSONException, InterruptedException, IOException { log.info("Importing Dictionaries"); - Map sharedDictionaryMappings = + Map sharedDictionaryMappings = importDictionaries(namespace, dictionaries.getDictionaries(), table.getColumns(), header.getName(), table, sharedDictionaryLocks); log.info("Remapping Dictionaries {}", sharedDictionaryMappings.values()); @@ -485,7 +486,7 @@ private void distributeWorkerResponsibilities(DictionaryMapping primaryMapping) /** * Apply new positions into incoming shared dictionaries. */ - private void remapToSharedDictionary(Map mappings, Map values) { + private void remapToSharedDictionary(Map mappings, Map values) { if (mappings.isEmpty()) { log.trace("No columns with shared dictionary appear to be in the import."); @@ -497,11 +498,11 @@ private void remapToSharedDictionary(Map mappings, Ma // we need to find a new Type for the index-Column as it's going to be remapped and might change in size mappings.entrySet().parallelStream() .forEach(entry -> { - final String columnName = entry.getKey(); + final Column column = entry.getKey(); final DictionaryMapping mapping = entry.getValue(); - final StringStore stringStore = (StringStore) values.get(columnName); - log.debug("Remapping Column[{}] = {} with {}", columnName, stringStore, mapping); + final StringStore stringStore = (StringStore) values.get(column.getName()); + log.debug("Remapping Column[{}] = {} with {}", column, stringStore, mapping); final IntegerParser indexParser = new IntegerParser(config); final IntSummaryStatistics statistics = mapping.target().intStream().summaryStatistics();