Skip to content

Commit

Permalink
Merge branch 'master' into faster-index-building-1-of-N
Browse files Browse the repository at this point in the history
  • Loading branch information
joka921 authored Nov 9, 2023
2 parents 541fac0 + efea020 commit 766e4bb
Show file tree
Hide file tree
Showing 46 changed files with 806 additions and 408 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ jobs:
run: |
brew install llvm@16
brew install conan@2
echo 'export PATH="/usr/local/opt/llvm/bin:$PATH"' >> ~/.bash_profile
echo PATH="/usr/local/opt/llvm/bin:$PATH" >> $GITHUB_ENV
echo 'export PATH="/usr/local/opt/llvm@16/bin:$PATH"' >> ~/.bash_profile
echo PATH="/usr/local/opt/llvm@16/bin:$PATH" >> $GITHUB_ENV
echo 'export LDFLAGS="-L/usr/local/opt/llvm@16/lib -L/usr/local/opt/llvm@16/lib/c++ -Wl,-rpath,/usr/local/opt/llvm@16/lib/c++"' >> ~/.bash_profile
echo LDFLAGS="-L/usr/local/opt/llvm@16/lib -L/usr/local/opt/llvm@16/lib/c++ -Wl,-rpath,/usr/local/opt/llvm@16/lib/c++" >> $GITHUB_ENV
echo 'export CPPFLAGS="-I/usr/local/opt/llvm@16/include"' >> ~/.bash_profile
echo CPPFLAGS="/usr/local/opt/llvm@16/include" >> $GITHUB_ENV
source ~/.bash_profile
- name: Pring clang version
- name: Print clang version
run: clang++ --version

- name: Cache for conan
Expand Down
21 changes: 7 additions & 14 deletions src/engine/CartesianProductJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,11 @@ bool CartesianProductJoin::knownEmptyResult() {
return std::ranges::any_of(childView(), &Operation::knownEmptyResult);
}

// Copy each element from the `inputColumn` `groupSize` times to the
// `targetColumn`. Repeat until the `targetColumn` is copletely filled. Skip the
// first `offset` write operations to the `targetColumn`. Call `checkForTimeout`
// after each write. If `StaticGroupSize != 0`, then the group size is known at
// compile time which allows for more efficient loop processing for very small
// group sizes.
template <size_t StaticGroupSize = 0>
static void writeResultColumn(std::span<Id> targetColumn,
std::span<const Id> inputColumn, size_t groupSize,
size_t offset, auto& checkForTimeout) {
// ____________________________________________________________________________
template <size_t StaticGroupSize>
void CartesianProductJoin::writeResultColumn(std::span<Id> targetColumn,
std::span<const Id> inputColumn,
size_t groupSize, size_t offset) {
if (StaticGroupSize != 0) {
AD_CORRECTNESS_CHECK(StaticGroupSize == groupSize);
}
Expand All @@ -117,7 +112,7 @@ static void writeResultColumn(std::span<Id> targetColumn,
}
targetColumn[numRowsWritten] = inputColumn[i];
++numRowsWritten;
checkForTimeout();
checkCancellation();
}
};
if constexpr (StaticGroupSize == 0) {
Expand Down Expand Up @@ -183,7 +178,6 @@ ResultTable CartesianProductJoin::computeResult() {

result.resize(totalSizeIncludingLimit);

auto checkForTimeout = checkTimeoutAfterNCallsFactory(1'000'000);
if (totalSizeIncludingLimit != 0) {
// A `groupSize` of N means that each row of the current result is copied N
// times adjacent to each other.
Expand All @@ -196,8 +190,7 @@ ResultTable CartesianProductJoin::computeResult() {
for (const auto& inputCol : input.getColumns()) {
decltype(auto) resultCol = result.getColumn(resultColIdx);
ad_utility::callFixedSize(groupSize, [&]<size_t I>() {
writeResultColumn<I>(resultCol, inputCol, groupSize, offset,
checkForTimeout);
writeResultColumn<I>(resultCol, inputCol, groupSize, offset);
});
++resultColIdx;
}
Expand Down
11 changes: 11 additions & 0 deletions src/engine/CartesianProductJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ class CartesianProductJoin : public Operation {
private:
//! Compute the result of the query-subtree rooted at this element..
ResultTable computeResult() override;

// Copy each element from the `inputColumn` `groupSize` times to the
// `targetColumn`. Repeat until the `targetColumn` is copletely filled. Skip
// the first `offset` write operations to the `targetColumn`. Call
// `checkCancellation` after each write. If `StaticGroupSize != 0`, then the
// group size is known at compile time which allows for more efficient loop
// processing for very small group sizes.
template <size_t StaticGroupSize = 0>
void writeResultColumn(std::span<Id> targetColumn,
std::span<const Id> inputColumn, size_t groupSize,
size_t offset);
};

#endif // QLEVER_CARTESIANPRODUCTJOIN_H
3 changes: 1 addition & 2 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,9 @@ void GroupBy::doGroupBy(const IdTable& dynInput,
currentGroupBlock.push_back(std::pair<size_t, Id>(col, input(0, col)));
}
size_t blockStart = 0;
auto checkTimeoutAfterNCalls = checkTimeoutAfterNCallsFactory(32000);

for (size_t pos = 1; pos < input.size(); pos++) {
checkTimeoutAfterNCalls(currentGroupBlock.size());
checkCancellation();
bool rowMatchesCurrentBlock =
std::all_of(currentGroupBlock.begin(), currentGroupBlock.end(),
[&](const auto& columns) {
Expand Down
10 changes: 5 additions & 5 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ ResultTable IndexScan::computeResult() {
const auto permutedTriple = getPermutedTriple();
if (numVariables_ == 2) {
idTable = index.scan(*permutedTriple[0], std::nullopt, permutation_,
_timeoutTimer);
cancellationHandle_);
} else if (numVariables_ == 1) {
idTable = index.scan(*permutedTriple[0], *permutedTriple[1], permutation_,
_timeoutTimer);
cancellationHandle_);
} else {
AD_CORRECTNESS_CHECK(numVariables_ == 3);
computeFullScan(&idTable, permutation_);
Expand Down Expand Up @@ -259,8 +259,8 @@ void IndexScan::computeFullScan(IdTable* result,
size_t i = 0;
const auto& permutationImpl =
getExecutionContext()->getIndex().getImpl().getPermutation(permutation);
auto triplesView = TriplesView(permutationImpl, ignoredRanges,
isTripleIgnored, _timeoutTimer);
auto triplesView = TriplesView(permutationImpl, cancellationHandle_,
ignoredRanges, isTripleIgnored);
for (const auto& triple : triplesView) {
if (i >= resultSize) {
break;
Expand Down Expand Up @@ -290,7 +290,7 @@ Permutation::IdTableGenerator IndexScan::getLazyScan(
col1Id = s.getPermutedTriple()[1]->toValueId(index.getVocab()).value();
}
return index.getPermutation(s.permutation())
.lazyScan(col0Id, col1Id, std::move(blocks), s._timeoutTimer);
.lazyScan(col0Id, col1Id, std::move(blocks), s.cancellationHandle_);
};

// ________________________________________________________________
Expand Down
19 changes: 12 additions & 7 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,17 @@ Join::ScanMethodType Join::getScanMethod(
// this works because the join operations execution Context never changes
// during its lifetime
const auto& idx = _executionContext->getIndex();
const auto scanLambda = [&idx](const Permutation::Enum perm) {
return [&idx, perm](Id id) { return idx.scan(id, std::nullopt, perm); };
};
const auto scanLambda =
[&idx](
const Permutation::Enum perm,
std::shared_ptr<ad_utility::CancellationHandle> cancellationHandle) {
return [&idx, perm,
cancellationHandle = std::move(cancellationHandle)](Id id) {
return idx.scan(id, std::nullopt, perm, cancellationHandle);
};
};
AD_CORRECTNESS_CHECK(scan.getResultWidth() == 3);
return scanLambda(scan.permutation());
return scanLambda(scan.permutation(), cancellationHandle_);
}

// _____________________________________________________________________________
Expand All @@ -320,7 +326,7 @@ void Join::doComputeJoinWithFullScanDummyRight(const IdTable& ndr,
LOG(TRACE) << "Inner scan with ID: " << currentJoinId << endl;
// The scan is a relatively expensive disk operation, so we can afford to
// check for timeouts before each call.
checkTimeout();
checkCancellation();
IdTable jr = scan(currentJoinId);
LOG(TRACE) << "Got #items: " << jr.size() << endl;
// Build the cross product.
Expand Down Expand Up @@ -451,8 +457,7 @@ void Join::join(const IdTable& a, ColumnIndex jc1, const IdTable& b,
if (a.empty() || b.empty()) {
return;
}
[[maybe_unused]] auto checkTimeoutAfterNCalls =
checkTimeoutAfterNCallsFactory();
checkCancellation();
ad_utility::JoinColumnMapping joinColumnData{
{{jc1, jc2}}, a.numColumns(), b.numColumns()};
auto joinColumnL = a.getColumn(jc1);
Expand Down
8 changes: 3 additions & 5 deletions src/engine/Minus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,21 @@ void Minus::computeMinus(
*/
auto writeResult = [&result, &a](size_t ia) { result.push_back(a[ia]); };

auto checkTimeout = checkTimeoutAfterNCallsFactory();

size_t ia = 0, ib = 0;
while (ia < a.size() && ib < b.size()) {
// Join columns 0 are the primary sort columns
while (a(ia, joinColumns[0][0]) < b(ib, joinColumns[0][1])) {
// Write a result
writeResult(ia);
ia++;
checkTimeout();
checkCancellation();
if (ia >= a.size()) {
goto finish;
}
}
while (b(ib, joinColumns[0][1]) < a(ia, joinColumns[0][0])) {
ib++;
checkTimeout();
checkCancellation();
if (ib >= b.size()) {
goto finish;
}
Expand Down Expand Up @@ -189,7 +187,7 @@ void Minus::computeMinus(
default:
AD_FAIL();
}
checkTimeout();
checkCancellation();
}
}
finish:
Expand Down
49 changes: 25 additions & 24 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,24 @@ vector<string> Operation::collectWarnings() const {
}

// ________________________________________________________________________
void Operation::recursivelySetTimeoutTimer(
const ad_utility::SharedConcurrentTimeoutTimer& timer) {
_timeoutTimer = timer;
for (auto child : getChildren()) {
if (child) {
child->recursivelySetTimeoutTimer(timer);
}
}
void Operation::recursivelySetCancellationHandle(
SharedCancellationHandle cancellationHandle) {
AD_CORRECTNESS_CHECK(cancellationHandle);
forAllDescendants([&cancellationHandle](auto child) {
child->getRootOperation()->recursivelySetCancellationHandle(
cancellationHandle);
});
cancellationHandle_ = std::move(cancellationHandle);
}

// ________________________________________________________________________

void Operation::recursivelySetTimeConstraint(
std::chrono::steady_clock::time_point deadline) {
deadline_ = deadline;
forAllDescendants([deadline](auto child) {
child->getRootOperation()->recursivelySetTimeConstraint(deadline);
});
}

// ________________________________________________________________________
Expand Down Expand Up @@ -117,12 +127,7 @@ shared_ptr<const ResultTable> Operation::getResult(bool isRoot,
}
});
auto computeLambda = [this, &timer] {
if (_timeoutTimer->wlock()->hasTimedOut()) {
throw ad_utility::TimeoutException(
"Timeout in operation with no or insufficient timeout "
"functionality, before " +
getDescriptor());
}
checkCancellation([this]() { return "Before " + getDescriptor(); });
runtimeInfo().status_ = RuntimeInformation::Status::inProgress;
signalQueryUpdate();
ResultTable result = computeResult();
Expand All @@ -136,12 +141,7 @@ shared_ptr<const ResultTable> Operation::getResult(bool isRoot,
// change in the DEBUG builds.
AD_EXPENSIVE_CHECK(
result.checkDefinedness(getExternallyVisibleVariableColumns()));
if (_timeoutTimer->wlock()->hasTimedOut()) {
throw ad_utility::TimeoutException(
"Timeout in " + getDescriptor() +
". This timeout was not caught inside the actual computation, "
"which indicates insufficient timeout functionality.");
}
checkCancellation([this]() { return "In " + getDescriptor(); });
// Make sure that the results that are written to the cache have the
// correct runtimeInfo. The children of the runtime info are already set
// correctly because the result was computed, so we can pass `nullopt` as
Expand Down Expand Up @@ -213,10 +213,11 @@ shared_ptr<const ResultTable> Operation::getResult(bool isRoot,
}

// ______________________________________________________________________
void Operation::checkTimeout() const {
if (_timeoutTimer->wlock()->hasTimedOut()) {
throw ad_utility::TimeoutException("Timeout in " + getDescriptor());
}

std::chrono::milliseconds Operation::remainingTime() const {
auto interval = deadline_ - std::chrono::steady_clock::now();
return std::max(
0ms, std::chrono::duration_cast<std::chrono::milliseconds>(interval));
}

// _______________________________________________________________________
Expand Down
76 changes: 43 additions & 33 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
#include "engine/VariableToColumnMap.h"
#include "parser/data/LimitOffsetClause.h"
#include "parser/data/Variable.h"
#include "util/CancellationHandle.h"
#include "util/CompilerExtensions.h"
#include "util/Exception.h"
#include "util/Log.h"
#include "util/Timer.h"
#include "util/TypeTraits.h"

// forward declaration needed to break dependencies
class QueryExecutionTree;

class Operation {
using SharedCancellationHandle =
std::shared_ptr<ad_utility::CancellationHandle>;
using Milliseconds = std::chrono::milliseconds;

public:
Expand Down Expand Up @@ -144,11 +148,20 @@ class Operation {
shared_ptr<const ResultTable> getResult(bool isRoot = false,
bool onlyReadFromCache = false);

// Use the same timeout timer for all children of an operation (= query plan
// rooted at that operation). As soon as one child times out, the whole
// operation times out.
void recursivelySetTimeoutTimer(
const ad_utility::SharedConcurrentTimeoutTimer& timer);
// Use the same cancellation handle for all children of an operation (= query
// plan rooted at that operation). As soon as one child is aborted, the whole
// operation is aborted out.
void recursivelySetCancellationHandle(
SharedCancellationHandle cancellationHandle);

template <typename Rep, typename Period>
void recursivelySetTimeConstraint(
std::chrono::duration<Rep, Period> duration) {
recursivelySetTimeConstraint(std::chrono::steady_clock::now() + duration);
}

void recursivelySetTimeConstraint(
std::chrono::steady_clock::time_point deadline);

// True iff this operation directly implement a `LIMIT` clause on its result.
[[nodiscard]] virtual bool supportsLimit() const { return false; }
Expand Down Expand Up @@ -200,35 +213,32 @@ class Operation {
return _warnings;
}

// Check if there is still time left and throw a TimeoutException otherwise.
// This will be called at strategic places on code that potentially can take a
// (too) long time.
void checkTimeout() const;

// Handles the timeout of this operation.
ad_utility::SharedConcurrentTimeoutTimer _timeoutTimer =
std::make_shared<ad_utility::ConcurrentTimeoutTimer>(
ad_utility::TimeoutTimer::unlimited());

// Returns a lambda with the following behavior: For every call, increase the
// internal counter i by countIncrease. If the counter exceeds countMax, check
// for timeout and reset the counter to zero. That way, the expensive timeout
// check is called only rarely. Note that we sometimes need to "simulate"
// several operations at a time, hence the countIncrease.
auto checkTimeoutAfterNCallsFactory(
size_t numOperationsBetweenTimeoutChecks =
NUM_OPERATIONS_BETWEEN_TIMEOUT_CHECKS) const {
return [numOperationsBetweenTimeoutChecks, i = 0ull,
this](size_t countIncrease = 1) mutable {
i += countIncrease;
if (i >= numOperationsBetweenTimeoutChecks) {
_timeoutTimer->wlock()->checkTimeoutAndThrow(
[this]() { return "Timeout in " + getDescriptor(); });
i = 0;
}
};
// Check if the cancellation flag has been set and throw an exception if
// that's the case. This will be called at strategic places on code that
// potentially can take a (too) long time. This function is designed to be
// as lightweight as possible because of that. The `detailSupplier` allows to
// pass a message to add to any potential exception that might be thrown.
AD_ALWAYS_INLINE void checkCancellation(
const ad_utility::InvocableWithReturnType<std::string_view> auto&
detailSupplier) const {
cancellationHandle_->throwIfCancelled(detailSupplier);
}

// Same as checkCancellation, but with the descriptor of this operation
// as string.
AD_ALWAYS_INLINE void checkCancellation() const {
cancellationHandle_->throwIfCancelled(&Operation::getDescriptor, this);
}

std::chrono::milliseconds remainingTime() const;

/// Pointer to the cancellation handle of this operation.
SharedCancellationHandle cancellationHandle_ =
std::make_shared<SharedCancellationHandle::element_type>();

std::chrono::steady_clock::time_point deadline_ =
std::chrono::steady_clock::time_point::max();

// Get the mapping from variables to column indices. This mapping may only be
// used internally, because the actually visible variables might be different
// in case of a subquery.
Expand Down
Loading

0 comments on commit 766e4bb

Please sign in to comment.