Skip to content

Commit

Permalink
fixes dropping sharedDictionaries when the same one is referenced multiple times in one table
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok committed Feb 28, 2024
1 parent 70dc817 commit d5e807e
Showing 1 changed file with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,13 @@ private static Map<DictionaryId, Dictionary> createLocalIdReplacements(Map<Strin
* Create mappings for shared dictionaries dict.
* This is not synchronized because the method is called within the job execution.
*/
private static Map<String, DictionaryMapping> importDictionaries(DistributedNamespace namespace, Map<String, Dictionary> dicts, Column[] columns, String importName, Table table, IdMutex<DictionaryId> sharedDictionaryLocks) {
private static Map<Column, DictionaryMapping> importDictionaries(DistributedNamespace namespace, Map<String, Dictionary> dicts, Column[] columns, String importName, Table table, IdMutex<DictionaryId> sharedDictionaryLocks) {

// Empty Maps are Coalesced to null by Jackson
if (dicts == null) {
return Collections.emptyMap();
}

final Map<String, DictionaryMapping> out = new ConcurrentHashMap<>();

log.debug("BEGIN importing {} Dictionaries", dicts.size());

Expand All @@ -228,6 +227,8 @@ private static Map<String, DictionaryMapping> importDictionaries(DistributedName
storeAndDistributeDictionary(namespace, dictionary);
});

final Map<Column, DictionaryMapping> out = new ConcurrentHashMap<>();

// We group by sharedDictionary to avoid sending dictionaries multiple times
Arrays.stream(columns)
.parallel()
Expand Down Expand Up @@ -258,7 +259,7 @@ private static Map<String, DictionaryMapping> importDictionaries(DistributedName
final DictionaryMapping mapping = DictionaryMapping.createAndImport(importDictionary, sharedDictionary);

newIds += mapping.getNumberOfNewIds();
out.put(refColumn.getName(), mapping);
out.put(column, mapping);
}

if (newIds > 0) {
Expand Down Expand Up @@ -296,7 +297,7 @@ public void execute() throws JSONException, InterruptedException, IOException {

log.info("Importing Dictionaries");

Map<String, DictionaryMapping> sharedDictionaryMappings =
Map<Column, DictionaryMapping> sharedDictionaryMappings =
importDictionaries(namespace, dictionaries.getDictionaries(), table.getColumns(), header.getName(), table, sharedDictionaryLocks);

log.info("Remapping Dictionaries {}", sharedDictionaryMappings.values());
Expand Down Expand Up @@ -485,7 +486,7 @@ private void distributeWorkerResponsibilities(DictionaryMapping primaryMapping)
/**
* Apply new positions into incoming shared dictionaries.
*/
private void remapToSharedDictionary(Map<String, DictionaryMapping> mappings, Map<String, ColumnStore> values) {
private void remapToSharedDictionary(Map<Column, DictionaryMapping> mappings, Map<String, ColumnStore> values) {

if (mappings.isEmpty()) {
log.trace("No columns with shared dictionary appear to be in the import.");
Expand All @@ -497,11 +498,11 @@ private void remapToSharedDictionary(Map<String, DictionaryMapping> mappings, Ma
// we need to find a new Type for the index-Column as it's going to be remapped and might change in size
mappings.entrySet().parallelStream()
.forEach(entry -> {
final String columnName = entry.getKey();
final Column column = entry.getKey();
final DictionaryMapping mapping = entry.getValue();

final StringStore stringStore = (StringStore) values.get(columnName);
log.debug("Remapping Column[{}] = {} with {}", columnName, stringStore, mapping);
final StringStore stringStore = (StringStore) values.get(column.getName());
log.debug("Remapping Column[{}] = {} with {}", column, stringStore, mapping);
final IntegerParser indexParser = new IntegerParser(config);
final IntSummaryStatistics statistics = mapping.target().intStream().summaryStatistics();

Expand Down

0 comments on commit d5e807e

Please sign in to comment.