Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/sql/feature/absolute-form-conver…
Browse files Browse the repository at this point in the history
…sion' into sql/feature/absolute-form-conversion
  • Loading branch information
jnsrnhld committed Apr 18, 2024
2 parents 5f3dffb + 7dcf72a commit 27e6d88
Show file tree
Hide file tree
Showing 24 changed files with 380 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.bakdata.conquery.apiv1.frontend.FrontendValue;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.models.config.IndexConfig;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.datasets.concepts.Searchable;
Expand Down Expand Up @@ -40,7 +39,7 @@
@ToString
@Slf4j
@CPSType(id = "CSV_TEMPLATE", base = SearchIndex.class)
public class FilterTemplate extends IdentifiableImpl<SearchIndexId> implements Searchable<SearchIndexId>, SearchIndex {
public class FilterTemplate extends IdentifiableImpl<SearchIndexId> implements Searchable, SearchIndex {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -90,7 +89,7 @@ public boolean isSearchDisabled() {
return false;
}

public TrieSearch<FrontendValue> createTrieSearch(IndexConfig config, NamespaceStorage storage) {
public TrieSearch<FrontendValue> createTrieSearch(IndexConfig config) {

final URI resolvedURI = FileUtil.getResolvedUri(config.getBaseUrl(), getFilePath());
log.trace("Resolved filter template reference url for search '{}': {}", this.getId(), resolvedURI);
Expand Down
71 changes: 71 additions & 0 deletions backend/src/main/java/com/bakdata/conquery/apiv1/LabelMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.bakdata.conquery.apiv1;

import java.util.List;
import java.util.stream.Collectors;

import com.bakdata.conquery.apiv1.frontend.FrontendValue;
import com.bakdata.conquery.models.config.IndexConfig;
import com.bakdata.conquery.models.datasets.concepts.Searchable;
import com.bakdata.conquery.models.identifiable.ids.specific.FilterId;
import com.bakdata.conquery.models.query.FilterSearch;
import com.bakdata.conquery.util.search.TrieSearch;
import com.google.common.collect.BiMap;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;

@Getter
@RequiredArgsConstructor
@Slf4j
@EqualsAndHashCode
public class LabelMap implements Searchable {

private final FilterId id;
@Delegate
private final BiMap<String, String> delegate;
private final int minSuffixLength;
private final boolean generateSearchSuffixes;

@Override
public TrieSearch<FrontendValue> createTrieSearch(IndexConfig config) {

final TrieSearch<FrontendValue> search = config.createTrieSearch(true);

final List<FrontendValue> collected = delegate.entrySet().stream()
.map(entry -> new FrontendValue(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());

if (log.isTraceEnabled()) {
log.trace("Labels for {}: `{}`", getId(), collected.stream().map(FrontendValue::toString).collect(Collectors.toList()));
}

StopWatch timer = StopWatch.createStarted();
log.trace("START-SELECT ADDING_ITEMS for {}", getId());

collected.forEach(feValue -> search.addItem(feValue, FilterSearch.extractKeywords(feValue)));

log.trace("DONE-SELECT ADDING_ITEMS for {} in {}", getId(), timer);

timer.reset();
log.trace("START-SELECT SHRINKING for {}", getId());

search.shrinkToFit();

log.trace("DONE-SELECT SHRINKING for {} in {}", getId(), timer);

return search;
}

@Override
public boolean isGenerateSuffixes() {
return generateSearchSuffixes;
}

@Override
public boolean isSearchDisabled() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,15 +554,14 @@ public Stream<Map<String, String>> resolveEntities(Subject subject, List<FilterV
.filter(Predicate.not(Map::isEmpty));
}

public ResultStatistics getResultStatistics(ManagedQuery managedQuery) {
final Query query = managedQuery.getQuery();
final List<ResultInfo> resultInfos = query.getResultInfos();
public ResultStatistics getResultStatistics(SingleTableResult managedQuery) {
final List<ResultInfo> resultInfos = managedQuery.getResultInfos();

final Optional<ResultInfo>
dateInfo =
query.getResultInfos().stream().filter(info -> info.getSemantics().contains(new SemanticType.EventDateT())).findFirst();
resultInfos.stream().filter(info -> info.getSemantics().contains(new SemanticType.EventDateT())).findFirst();

final int dateIndex = dateInfo.map(resultInfos::indexOf).orElse(0 /*Discarded if dateInfo is not present*/);
final Optional<Integer> dateIndex = dateInfo.map(resultInfos::indexOf);

final Locale locale = I18n.LOCALE.get();
final NumberFormat decimalFormat = NumberFormat.getNumberInstance(locale);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,29 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.jetbrains.annotations.NotNull;

@RequiredArgsConstructor
@Slf4j
public class NetworkSession implements MessageSender<NetworkMessage<?>> {
public static final int MAX_MESSAGE_LENGTH = 30;
public static final int MAX_QUEUE_LENGTH = 20;
@Getter
private final IoSession session;
private final LinkedBlockingQueue<NetworkMessage<?>> queuedMessages = new LinkedBlockingQueue<>(20);
private final LinkedBlockingQueue<NetworkMessage<?>> queuedMessages = new LinkedBlockingQueue<>(MAX_QUEUE_LENGTH);

@Override
public WriteFuture send(final NetworkMessage<?> message) {
try {
while (!queuedMessages.offer(message, 2, TimeUnit.MINUTES)) {
log.debug(
"Waiting for full writing queue for {}\n\tcurrently filled by: {}",
message,
new ArrayList<>(queuedMessages)
.stream()
.map(Objects::toString)
.collect(Collectors.joining("\n\t\t"))
log.debug("Waiting for full writing queue for {} currently filled by:\n\t- {}",
message,
log.isTraceEnabled()
? new ArrayList<>(queuedMessages).stream()
.map(Objects::toString)
.map(NetworkSession::shorten)
.collect(Collectors.joining("\n\t\t- "))
: "%s messages".formatted(queuedMessages.size())
);
}
}
Expand All @@ -45,6 +49,16 @@ public WriteFuture send(final NetworkMessage<?> message) {
return future;
}

@NotNull
private static String shorten(String desc) {
if (desc.length() <= MAX_MESSAGE_LENGTH) {
return desc;
}

return desc.substring(0, MAX_MESSAGE_LENGTH) + "…";

}

@Override
public void trySend(final NetworkMessage<?> message) {
if (isConnected()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
import java.util.List;
import java.util.Set;

import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validator;

import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.models.config.StoreFactory;
import com.bakdata.conquery.models.datasets.Column;
Expand All @@ -26,6 +23,8 @@
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validator;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.ToString;
Expand Down Expand Up @@ -101,14 +100,16 @@ private void decorateSecondaryIdDescriptionStore(IdentifiableStore<SecondaryIdDe

private void decorateTableStore(IdentifiableStore<Table> store) {
store.onAdd(table -> {
for (Column c : table.getColumns()) {
getCentralRegistry().register(c);
}
}).onRemove(table -> {
for (Column c : table.getColumns()) {
getCentralRegistry().remove(c);
}
});
for (Column column : table.getColumns()) {
column.init();
getCentralRegistry().register(column);
}
})
.onRemove(table -> {
for (Column c : table.getColumns()) {
getCentralRegistry().remove(c);
}
});
}

private void decorateConceptStore(IdentifiableStore<Concept<?>> store) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final C

JobManager jobManager = new JobManager(storage.getDataset().getName(), config.isFailOnError());

FilterSearch filterSearch = new FilterSearch(storage, jobManager, config.getCsv(), config.getIndex());
FilterSearch filterSearch = new FilterSearch(config.getIndex());
return new NamespaceSetupData(injectables, indexService, communicationMapper, preprocessMapper, jobManager, filterSearch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import com.bakdata.conquery.apiv1.frontend.FrontendValue;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.models.config.IndexConfig;
import com.bakdata.conquery.models.datasets.concepts.Searchable;
import com.bakdata.conquery.models.events.MajorTypeId;
Expand All @@ -26,7 +25,7 @@
@Setter
@NoArgsConstructor
@Slf4j
public class Column extends Labeled<ColumnId> implements NamespacedIdentifiable<ColumnId>, Searchable<ColumnId> {
public class Column extends Labeled<ColumnId> implements NamespacedIdentifiable<ColumnId>, Searchable {

public static final int UNKNOWN_POSITION = -1;

Expand All @@ -45,8 +44,8 @@ public class Column extends Labeled<ColumnId> implements NamespacedIdentifiable<
private boolean searchDisabled = false;

@JsonIgnore
@Getter(lazy = true)
private final int position = ArrayUtils.indexOf(getTable().getColumns(), this);
private int position = -1;

/**
* if this is set this column counts as the secondary id of the given name for this
* table
Expand Down Expand Up @@ -75,8 +74,11 @@ public Dataset getDataset() {
* We create only an empty search here, because the content is provided through {@link com.bakdata.conquery.models.messages.namespaces.specific.RegisterColumnValues} and filled by the caller.
*/
@Override
public TrieSearch<FrontendValue> createTrieSearch(IndexConfig config, NamespaceStorage storage) {

public TrieSearch<FrontendValue> createTrieSearch(IndexConfig config) {
return config.createTrieSearch(isGenerateSuffixes());
}

public void init() {
position = ArrayUtils.indexOf(getTable().getColumns(), this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@
import java.util.List;

import com.bakdata.conquery.apiv1.frontend.FrontendValue;
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.models.config.IndexConfig;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.identifiable.Identifiable;
import com.bakdata.conquery.models.identifiable.ids.Id;
import com.bakdata.conquery.models.query.FilterSearch;
import com.bakdata.conquery.util.search.TrieSearch;
import com.fasterxml.jackson.annotation.JsonIgnore;
Expand All @@ -18,25 +14,12 @@
* <p>
* Searchable classes describe how a search should be constructed, and provide the values with getSearchValues.
*/
public interface Searchable<ID extends Id<? extends Identifiable<? extends ID>>> extends Identifiable<ID> {

public Dataset getDataset();
public interface Searchable {

/**
* All available {@link FrontendValue}s for searching in a {@link TrieSearch}.
*/
TrieSearch<FrontendValue> createTrieSearch(IndexConfig config, NamespaceStorage storage);

/**
* The actual Searchables to use, if there is potential for deduplication/pooling.
*
* @implSpec The order of objects returned is used to also sort search results from different sources.
*/
@JsonIgnore
default List<Searchable<?>> getSearchReferences() {
//Hopefully the only candidate will be Column
return List.of(this);
}
TrieSearch<FrontendValue> createTrieSearch(IndexConfig config);

/**
* Parameter used in the construction of {@link com.bakdata.conquery.util.search.TrieSearch}, defining the shortest suffix to create.
Expand Down
Loading

0 comments on commit 27e6d88

Please sign in to comment.