Skip to content

Commit

Permalink
Apply Scan Hints to SessionOptions (#2580)
Browse files Browse the repository at this point in the history
* Initial commit for scan hint overrides for shard index methods

* Update mocks for copy constructor test

* SessionOptions now has methods to apply scan hints and scan consistency options; fixed several instances where new session options were created without the scan hints being applied

* Refactor ScannerFactory to support setting execution hints by table name or functional name

* clean up unused variable, import

* add reserved 'expansion' hint key, refactor ScannerFactory.applyConfigs to support falling back to secondary hint key
  • Loading branch information
apmoriarty authored Oct 11, 2024
1 parent 279c8a0 commit ead56e3
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public class GenericQueryConfiguration implements Serializable {

// either IMMEDIATE or EVENTUAL
private Map<String,ScannerBase.ConsistencyLevel> tableConsistencyLevels = new HashMap<>();
// provides default scan hints
// NOTE: accumulo reserves the execution hint name 'meta'
// NOTE: datawave reserves the execution hint name 'expansion' for index expansion
private Map<String,Map<String,String>> tableHints = new HashMap<>();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
public static final String QUERY_LOGIC_NAME_SOURCE = "queryLogic";

@SuppressWarnings("unused")
private static final long serialVersionUID = -4354990715046146110L;
private static final long serialVersionUID = 1071528787909021061L;
private static final Logger log = Logger.getLogger(ShardQueryConfiguration.class);

// is this a tld query, explicitly default to false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@ public ScannerStream visit(ASTEQNode node, Object data) {
String queryString = fieldName + "=='" + literal + "'";
options.addScanIterator(QueryScannerHelper.getQueryInfoIterator(config.getQuery(), false, queryString));

// easier to apply hints to the new options than to deal with copying existing hints between options objects
options.applyExecutionHints(config.getIndexTableName(), config.getTableHints());
options.applyConsistencyLevel(config.getIndexTableName(), config.getTableConsistencyLevels());

scannerSession.setOptions(options);
scannerSession.setMaxResults(config.getMaxIndexBatchSize());
scannerSession.setExecutor(streamExecutor);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package datawave.query.jexl.lookups;

import static datawave.query.jexl.lookups.ShardIndexQueryTableStaticMethods.EXPANSION_HINT_KEY;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -126,7 +129,10 @@ public synchronized void submit() {
log.debug("Range: " + range);
bs = null;
try {
bs = scannerFactory.newScanner(config.getIndexTableName(), config.getAuthorizations(), config.getNumQueryThreads(), config.getQuery());
// the 'newScanner' method in the ScannerFactory has no knowledge of the 'expansion' hint, so determine the hint key here
String hintKey = config.getTableHints().containsKey(EXPANSION_HINT_KEY) ? EXPANSION_HINT_KEY : config.getIndexTableName();

bs = scannerFactory.newScanner(config.getIndexTableName(), config.getAuthorizations(), config.getNumQueryThreads(), config.getQuery(), hintKey);

bs.setRanges(Collections.singleton(range));
bs.fetchColumnFamily(new Text(literalRange.getFieldName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public class ShardIndexQueryTableStaticMethods {

private static FastDateFormat formatter = FastDateFormat.getInstance("yyyyMMdd");

// hint key reserved for index expansion executor pools; when present in the table hints it is used instead of the table name
public static final String EXPANSION_HINT_KEY = "expansion";

/**
* Create an IndexLookup task to find field names given a JexlNode and a set of Types for that node
*
Expand Down Expand Up @@ -440,9 +443,13 @@ public static Range getLiteralRange(String fieldName, String normalizedQueryTerm
* check for limiting unique terms
* @return the scanner session
* @throws InvocationTargetException
* if no target exists
* @throws NoSuchMethodException
* if no method exists
* @throws InstantiationException
* if there is a problem initializing
* @throws IllegalAccessException
* if there is an illegal access
* @throws IOException
* dates can't be formatted
*/
Expand All @@ -455,7 +462,9 @@ public static ScannerSession configureTermMatchOnly(ShardQueryConfiguration conf
return null;
}

ScannerSession bs = scannerFactory.newLimitedScanner(AnyFieldScanner.class, tableName, config.getAuthorizations(), config.getQuery());
String hintKey = config.getTableHints().containsKey(EXPANSION_HINT_KEY) ? EXPANSION_HINT_KEY : config.getIndexTableName();

ScannerSession bs = scannerFactory.newLimitedScanner(AnyFieldScanner.class, tableName, config.getAuthorizations(), config.getQuery(), hintKey);

bs.setRanges(ranges);

Expand Down Expand Up @@ -483,7 +492,9 @@ public static ScannerSession configureLimitedDiscovery(ShardQueryConfiguration c
return null;
}

ScannerSession bs = scannerFactory.newLimitedScanner(AnyFieldScanner.class, tableName, config.getAuthorizations(), config.getQuery());
String hintKey = config.getTableHints().containsKey(EXPANSION_HINT_KEY) ? EXPANSION_HINT_KEY : tableName;

ScannerSession bs = scannerFactory.newLimitedScanner(AnyFieldScanner.class, tableName, config.getAuthorizations(), config.getQuery(), hintKey);

bs.setRanges(ranges);

Expand Down Expand Up @@ -511,6 +522,13 @@ public static final void configureGlobalIndexDateRangeFilter(ShardQueryConfigura
}
IteratorSetting cfg = configureGlobalIndexDateRangeFilter(config, dateRange);
bs.addScanIterator(cfg);

// unused method, but we'll still configure execution hints if possible
String executionHintKey = config.getTableHints().containsKey(EXPANSION_HINT_KEY) ? EXPANSION_HINT_KEY : config.getIndexTableName();

if (config.getTableHints().containsKey(executionHintKey)) {
bs.setExecutionHints(config.getTableHints().get(executionHintKey));
}
}

public static final IteratorSetting configureGlobalIndexDateRangeFilter(ShardQueryConfiguration config, LongRange dateRange) {
Expand Down Expand Up @@ -580,6 +598,16 @@ public static final void configureGlobalIndexTermMatchingIterator(ShardQueryConf

bs.addScanIterator(cfg);

// unused method, but we'll still configure execution hints if possible
if (!reverseIndex) {
// only apply hints to the global index
String hintKey = config.getTableHints().containsKey(EXPANSION_HINT_KEY) ? EXPANSION_HINT_KEY : config.getIndexTableName();

if (config.getTableHints().containsKey(hintKey)) {
bs.setExecutionHints(config.getTableHints().get(hintKey));
}
}

setExpansionFields(config, bs, reverseIndex, expansionFields);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ public List<ScannerChunk> apply(QueryData qd) {

options.setQueryConfig(this.config);

String tableName = tableId.canonical();
options.applyExecutionHints(tableName, config.getTableHints());
options.applyConsistencyLevel(tableName, config.getTableConsistencyLevels());

chunks.add(new ScannerChunk(options, plan.getRanges(), qd, server));
} catch (Exception e) {
log.error(e);
Expand Down
Loading

0 comments on commit ead56e3

Please sign in to comment.