Skip to content

Commit

Permalink
Use Streams for value auto-completion
Browse files Browse the repository at this point in the history
  • Loading branch information
jnsrnhld committed Jan 24, 2024
1 parent dd5d148 commit 5114ac9
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.bakdata.conquery.mode;

import java.util.List;
import java.util.stream.Stream;

import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.models.datasets.Column;

public interface StorageHandler {

List<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column);
Stream<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column);

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.bakdata.conquery.mode.cluster;

import java.util.List;
import java.util.stream.Stream;

import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.StorageHandler;
Expand All @@ -11,13 +11,12 @@
public class ClusterStorageHandler implements StorageHandler {

@Override
public List<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column) {
public Stream<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column) {
return namespaceStorage.getAllImports().stream()
.filter(imp -> imp.getTable().equals(column.getTable()))
.flatMap(imp -> {
final ImportColumn importColumn = imp.getColumns()[column.getPosition()];
return ((StringStore) importColumn.getTypeDescription()).iterateValues();
})
.toList();
});
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.bakdata.conquery.mode.local;

import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.StorageHandler;
Expand All @@ -10,7 +9,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.Record1;
import org.jooq.Result;
import org.jooq.Select;
import org.jooq.impl.DSL;

Expand All @@ -21,24 +19,23 @@ public class SqlStorageHandler implements StorageHandler {
private final SqlExecutionService sqlExecutionService;

@Override
public List<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column) {
public Stream<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column) {
Select<Record1<Object>> columValuesQuery = DSL.selectDistinct(DSL.field(DSL.name(column.getName())))
.from(DSL.table(DSL.name(column.getTable().getName())));
return queryForDistinctValues(columValuesQuery);
}

private List<String> queryForDistinctValues(Select<Record1<Object>> columValuesQuery) {
Result<?> result = sqlExecutionService.execute(columValuesQuery);
private Stream<String> queryForDistinctValues(Select<Record1<Object>> columValuesQuery) {
try {
return result.getValues(0, String.class).stream()
// the database might return null or an empty string as a distinct value
.filter(value -> value != null && !value.isEmpty())
.toList();
return sqlExecutionService.fetchStream(columValuesQuery)
.map(record -> record.get(0, String.class))
// the database might return null or an empty string as a distinct value
.filter(value -> value != null && !value.isBlank());
}
catch (Exception e) {
log.error("Expecting exactly 1 column in Result when querying for distinct values of a column. Query: {}. Error: {}", columValuesQuery, e);
log.error("Expecting exactly 1 column in Result when querying for distinct values of a column. Query: {}.", columValuesQuery, e);
}
return Collections.emptyList();
return Stream.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,9 @@ public TrieSearch<FrontendValue> createTrieSearch(IndexConfig config, NamespaceS
final TrieSearch<FrontendValue> search = new TrieSearch<>(suffixLength, config.getSearchSplitChars());

storage.getStorageHandler()
.lookupColumnValues(storage, this).stream()
.lookupColumnValues(storage, this)
.map(value -> new FrontendValue(value, value))
.onClose(() -> log.debug("DONE processing values for {}", getId()))

.forEach(feValue -> search.addItem(feValue, FilterSearch.extractKeywords(feValue)));


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import com.bakdata.conquery.models.error.ConqueryError;
import com.bakdata.conquery.models.query.resultinfo.ResultInfo;
Expand All @@ -18,6 +19,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.Select;
import org.jooq.exception.DataAccessException;
Expand All @@ -40,7 +42,7 @@ public SqlExecutionResult execute(SqlManagedQuery sqlQuery) {
return result;
}

public Result<?> execute(Select<?> query) {
public Result<?> fetch(Select<?> query) {
log.debug("Executing query: \n{}", query);
try {
return dslContext.fetch(query);
Expand All @@ -50,6 +52,24 @@ public Result<?> execute(Select<?> query) {
}
}

/**
* Executes the query and returns the results as a Stream.
* <p>
* Note: The returned Stream is resourceful. It must be closed by the caller, because it contains a reference to an open ResultSet (and PreparedStatement).
*
* @param query The query to be executed.
* @return A Stream of query results.
*/
public <R extends Record> Stream<R> fetchStream(Select<R> query) {
log.debug("Executing query: \n{}", query);
try {
return dslContext.fetchStream(query);
}
catch (DataAccessException exception) {
throw new ConqueryError.SqlError(exception);
}
}

private SqlExecutionResult createStatementAndExecute(SqlManagedQuery sqlQuery, Connection connection) {

String sqlString = sqlQuery.getSqlQuery().getSql();
Expand Down

0 comments on commit 5114ac9

Please sign in to comment.