Skip to content

Commit

Permalink
Merge pull request #3318 from ingef/feature/fix-multiple-shared-dicts…
Browse files Browse the repository at this point in the history
…-in-one-table

fixes dropping sharedDictionaries when the same one is referenced mul…
  • Loading branch information
awildturtok authored Feb 29, 2024
2 parents 8ee24cc + d5e807e commit 2faca03
Showing 1 changed file with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,13 @@ private static Map<DictionaryId, Dictionary> createLocalIdReplacements(Map<Strin
* Create mappings for shared dictionaries dict.
* This is not synchronized because the methods is called within the job execution.
*/
private static Map<String, DictionaryMapping> importDictionaries(DistributedNamespace namespace, Map<String, Dictionary> dicts, Column[] columns, String importName, Table table, IdMutex<DictionaryId> sharedDictionaryLocks) {
private static Map<Column, DictionaryMapping> importDictionaries(DistributedNamespace namespace, Map<String, Dictionary> dicts, Column[] columns, String importName, Table table, IdMutex<DictionaryId> sharedDictionaryLocks) {

// Empty Maps are Coalesced to null by Jackson
if (dicts == null) {
return Collections.emptyMap();
}

final Map<String, DictionaryMapping> out = new ConcurrentHashMap<>();

log.debug("BEGIN importing {} Dictionaries", dicts.size());

Expand All @@ -228,6 +227,8 @@ private static Map<String, DictionaryMapping> importDictionaries(DistributedName
storeAndDistributeDictionary(namespace, dictionary);
});

final Map<Column, DictionaryMapping> out = new ConcurrentHashMap<>();

// We group by sharedDictionary to avoid sending dictionaries multliple times
Arrays.stream(columns)
.parallel()
Expand Down Expand Up @@ -258,7 +259,7 @@ private static Map<String, DictionaryMapping> importDictionaries(DistributedName
final DictionaryMapping mapping = DictionaryMapping.createAndImport(importDictionary, sharedDictionary);

newIds += mapping.getNumberOfNewIds();
out.put(refColumn.getName(), mapping);
out.put(column, mapping);
}

if (newIds > 0) {
Expand Down Expand Up @@ -296,7 +297,7 @@ public void execute() throws JSONException, InterruptedException, IOException {

log.info("Importing Dictionaries");

Map<String, DictionaryMapping> sharedDictionaryMappings =
Map<Column, DictionaryMapping> sharedDictionaryMappings =
importDictionaries(namespace, dictionaries.getDictionaries(), table.getColumns(), header.getName(), table, sharedDictionaryLocks);

log.info("Remapping Dictionaries {}", sharedDictionaryMappings.values());
Expand Down Expand Up @@ -485,7 +486,7 @@ private void distributeWorkerResponsibilities(DictionaryMapping primaryMapping)
/**
* Apply new positions into incoming shared dictionaries.
*/
private void remapToSharedDictionary(Map<String, DictionaryMapping> mappings, Map<String, ColumnStore> values) {
private void remapToSharedDictionary(Map<Column, DictionaryMapping> mappings, Map<String, ColumnStore> values) {

if (mappings.isEmpty()) {
log.trace("No columns with shared dictionary appear to be in the import.");
Expand All @@ -497,11 +498,11 @@ private void remapToSharedDictionary(Map<String, DictionaryMapping> mappings, Ma
// we need to find a new Type for the index-Column as it's going to be remapped and might change in size
mappings.entrySet().parallelStream()
.forEach(entry -> {
final String columnName = entry.getKey();
final Column column = entry.getKey();
final DictionaryMapping mapping = entry.getValue();

final StringStore stringStore = (StringStore) values.get(columnName);
log.debug("Remapping Column[{}] = {} with {}", columnName, stringStore, mapping);
final StringStore stringStore = (StringStore) values.get(column.getName());
log.debug("Remapping Column[{}] = {} with {}", column, stringStore, mapping);
final IntegerParser indexParser = new IntegerParser(config);
final IntSummaryStatistics statistics = mapping.target().intStream().summaryStatistics();

Expand Down

0 comments on commit 2faca03

Please sign in to comment.