diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java index 723e4f68f0..ee3923983a 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java @@ -228,19 +228,21 @@ private static Map importDictionaries(DistributedName storeAndDistributeDictionary(namespace, dictionary); }); + // We group by sharedDictionary to avoid sending dictionaries multliple times Arrays.stream(columns) .parallel() .filter(column -> column.getType() == MajorTypeId.STRING) .filter(col -> col.getSharedDictionary() != null) .filter(col -> dicts.containsKey(col.getName())) - .forEach(column -> { - final Dictionary importDictionary = dicts.get(column.getName()); - - final String sharedDictionaryName = column.getSharedDictionary(); - log.debug("Column[{}.{}.{}] part of shared Dictionary[{}]", table.getId(), importName, column.getName(), sharedDictionaryName); - + .collect(Collectors.groupingBy(Column::getSharedDictionary)) + .values() + .forEach(allColumns -> { + final Column refColumn = allColumns.get(0); + final String sharedDictionaryName = refColumn.getSharedDictionary(); final DictionaryId dictionaryId = new DictionaryId(namespace.getDataset().getId(), sharedDictionaryName); + log.debug("Column[{}.{}.{}] part of shared Dictionary[{}]", table.getId(), importName, refColumn.getName(), sharedDictionaryName); + // We have to lock here, as sibling columns might both use the same shared-dictionary try (IdMutex.Locked lock = sharedDictionaryLocks.acquire(dictionaryId)) { final Dictionary sharedDictionary = namespace.getStorage().getDictionary(dictionaryId); @@ -248,13 +250,20 @@ private static Map importDictionaries(DistributedName ResourceUtil.throwNotFoundIfNull(dictionaryId, sharedDictionary); log.trace("Merging into shared Dictionary[{}]", sharedDictionary); - final DictionaryMapping mapping = DictionaryMapping.createAndImport(importDictionary, sharedDictionary); + int newIds = 0; - if (mapping.getNumberOfNewIds() != 0) { - storeAndDistributeDictionary(namespace, mapping.getTargetDictionary()); + for (Column column : allColumns) { + final Dictionary importDictionary = dicts.get(column.getName()); + + final DictionaryMapping mapping = DictionaryMapping.createAndImport(importDictionary, sharedDictionary); + + newIds += mapping.getNumberOfNewIds(); + out.put(refColumn.getName(), mapping); } - out.put(column.getName(), mapping); + if (newIds > 0) { + storeAndDistributeDictionary(namespace, sharedDictionary); + } } }); return out;