Skip to content

Commit

Permalink
Merge pull request #3232 from ingef/feature/combine-shared-dict-merging
Browse files Browse the repository at this point in the history
Combine shared dict merging
  • Loading branch information
awildturtok authored Jan 29, 2024
2 parents 5127b18 + df3b5d7 commit df0c2f8
Showing 1 changed file with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,33 +228,42 @@ private static Map<String, DictionaryMapping> importDictionaries(DistributedName
storeAndDistributeDictionary(namespace, dictionary);
});

// We group by sharedDictionary to avoid sending dictionaries multliple times
Arrays.stream(columns)
.parallel()
.filter(column -> column.getType() == MajorTypeId.STRING)
.filter(col -> col.getSharedDictionary() != null)
.filter(col -> dicts.containsKey(col.getName()))
.forEach(column -> {
final Dictionary importDictionary = dicts.get(column.getName());

final String sharedDictionaryName = column.getSharedDictionary();
log.debug("Column[{}.{}.{}] part of shared Dictionary[{}]", table.getId(), importName, column.getName(), sharedDictionaryName);

.collect(Collectors.groupingBy(Column::getSharedDictionary))
.values()
.forEach(allColumns -> {
final Column refColumn = allColumns.get(0);
final String sharedDictionaryName = refColumn.getSharedDictionary();
final DictionaryId dictionaryId = new DictionaryId(namespace.getDataset().getId(), sharedDictionaryName);

log.debug("Column[{}.{}.{}] part of shared Dictionary[{}]", table.getId(), importName, refColumn.getName(), sharedDictionaryName);

// We have to lock here, as sibling columns might both use the same shared-dictionary
try (IdMutex.Locked lock = sharedDictionaryLocks.acquire(dictionaryId)) {
final Dictionary sharedDictionary = namespace.getStorage().getDictionary(dictionaryId);

ResourceUtil.throwNotFoundIfNull(dictionaryId, sharedDictionary);
log.trace("Merging into shared Dictionary[{}]", sharedDictionary);

final DictionaryMapping mapping = DictionaryMapping.createAndImport(importDictionary, sharedDictionary);
int newIds = 0;

if (mapping.getNumberOfNewIds() != 0) {
storeAndDistributeDictionary(namespace, mapping.getTargetDictionary());
for (Column column : allColumns) {
final Dictionary importDictionary = dicts.get(column.getName());

final DictionaryMapping mapping = DictionaryMapping.createAndImport(importDictionary, sharedDictionary);

newIds += mapping.getNumberOfNewIds();
out.put(refColumn.getName(), mapping);
}

out.put(column.getName(), mapping);
if (newIds > 0) {
storeAndDistributeDictionary(namespace, sharedDictionary);
}
}
});
return out;
Expand Down

0 comments on commit df0c2f8

Please sign in to comment.