Skip to content

Commit

Permalink
Merge branch 'magic-service-query-text-search' of github.com:Flixtastic/qlever into magic-service-query-text-search
Browse files Browse the repository at this point in the history
  • Loading branch information
Flixtastic committed Jan 28, 2025
2 parents 50c846f + cee48c6 commit 2c65d22
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 112 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/docker-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ jobs:
submodules: 'recursive'
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
with:
# As of Jan 28, 2025, the default value here (`binfmt:latest`)
# downloads a QEMU version that leads to segfaults in the compiler.
# We therefore pin a known-working version.
# TODO<joka921> GitHub Actions now has ARM runners; use them to
# avoid cross-compilation completely.
image : 'tonistiigi/binfmt:desktop-v8.1.5-44'
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to Docker Hub
Expand Down Expand Up @@ -56,7 +63,7 @@ jobs:
tags: adfreiburg/qlever:test

- name: E2E in Docker
run: |
run: |
sudo mkdir ${{github.workspace}}/e2e_data
sudo chmod a+rwx ${{github.workspace}}/e2e_data
sudo docker run -i --rm -v "${{github.workspace}}/e2e_data:/app/e2e_data/" --entrypoint e2e/e2e.sh adfreiburg/qlever:test
Expand Down
141 changes: 81 additions & 60 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
#include "util/MemorySize/MemorySize.h"
#include "util/OnDestructionDontThrowDuringStackUnwinding.h"
#include "util/ParseableDuration.h"
#include "util/TypeIdentity.h"
#include "util/TypeTraits.h"
#include "util/http/HttpUtils.h"
#include "util/http/websocket/MessageSender.h"

using namespace std::string_literals;
using namespace ad_utility::url_parser::sparqlOperation;

// Shorthand alias: lets the code below write `Awaitable<T>` instead of
// `Server::Awaitable<T>`.
template <typename T>
using Awaitable = Server::Awaitable<T>;
Expand Down Expand Up @@ -169,28 +172,47 @@ void Server::run(const string& indexBaseName, bool useText, bool usePatterns,
// _____________________________________________________________________________
ad_utility::url_parser::ParsedRequest Server::parseHttpRequest(
const ad_utility::httpUtils::HttpRequest auto& request) {
using namespace ad_utility::use_type_identity;
// For an HTTP request, `request.target()` yields the HTTP Request-URI.
// This is a concatenation of the URL path and the query strings.
using namespace ad_utility::url_parser::sparqlOperation;
auto parsedUrl = ad_utility::url_parser::parseRequestTarget(request.target());
ad_utility::url_parser::ParsedRequest parsedRequest{
std::move(parsedUrl.path_), std::move(parsedUrl.parameters_), None{}};

// Some valid requests (e.g. QLever's custom commands like retrieving index
// statistics) don't have a query. So an empty operation is not necessarily an
// error.
auto setOperationIfSpecifiedInParams =
[&parsedRequest]<typename Operation>(string_view paramName) {
auto operation = ad_utility::url_parser::getParameterCheckAtMostOnce(
parsedRequest.parameters_, paramName);
if (operation.has_value()) {
parsedRequest.operation_ = Operation{operation.value()};
parsedRequest.parameters_.erase(paramName);
}
};
auto setOperationIfSpecifiedInParams = [&parsedRequest]<typename Operation>(
TI<Operation>,
string_view paramName) {
auto operation = ad_utility::url_parser::getParameterCheckAtMostOnce(
parsedRequest.parameters_, paramName);
if (operation.has_value()) {
parsedRequest.operation_ = Operation{operation.value(), {}};
parsedRequest.parameters_.erase(paramName);
}
};
auto addToDatasetClausesIfOperationIs = [&parsedRequest]<typename Operation>(
TI<Operation>,
const std::string& key,
bool isNamed) {
if (Operation* op = std::get_if<Operation>(&parsedRequest.operation_)) {
ad_utility::appendVector(op->datasetClauses_,
ad_utility::url_parser::parseDatasetClausesFrom(
parsedRequest.parameters_, key, isNamed));
}
};
auto addDatasetClauses = [&addToDatasetClausesIfOperationIs] {
addToDatasetClausesIfOperationIs(ti<Query>, "default-graph-uri", false);
addToDatasetClausesIfOperationIs(ti<Query>, "named-graph-uri", true);
addToDatasetClausesIfOperationIs(ti<Update>, "using-graph-uri", false);
addToDatasetClausesIfOperationIs(ti<Update>, "using-named-graph-uri", true);
};

if (request.method() == http::verb::get) {
setOperationIfSpecifiedInParams.template operator()<Query>("query");
setOperationIfSpecifiedInParams(ti<Query>, "query");
addDatasetClauses();

if (parsedRequest.parameters_.contains("update")) {
throw std::runtime_error("SPARQL Update is not allowed as GET request.");
}
Expand Down Expand Up @@ -258,17 +280,19 @@ ad_utility::url_parser::ParsedRequest Server::parseHttpRequest(
throw std::runtime_error(
R"(Request must only contain one of "query" and "update".)");
}
setOperationIfSpecifiedInParams.template operator()<Query>("query");
setOperationIfSpecifiedInParams.template operator()<Update>("update");

setOperationIfSpecifiedInParams(ti<Query>, "query");
setOperationIfSpecifiedInParams(ti<Update>, "update");
addDatasetClauses();
return parsedRequest;
}
if (contentType.starts_with(contentTypeSparqlQuery)) {
parsedRequest.operation_ = Query{request.body()};
parsedRequest.operation_ = Query{request.body(), {}};
addDatasetClauses();
return parsedRequest;
}
if (contentType.starts_with(contentTypeSparqlUpdate)) {
parsedRequest.operation_ = Update{request.body()};
parsedRequest.operation_ = Update{request.body(), {}};
addDatasetClauses();
return parsedRequest;
}
throw std::runtime_error(absl::StrCat(
Expand Down Expand Up @@ -337,16 +361,6 @@ Awaitable<void> Server::process(
const auto parsedHttpRequest = parseHttpRequest(request);
const auto& parameters = parsedHttpRequest.parameters_;

auto checkParameterNotPresent = [&parameters](
const std::string& parameterName) {
if (parameters.contains(parameterName)) {
throw NotSupportedException(absl::StrCat(
parameterName, " parameter is currently not supported by QLever."));
}
};
checkParameterNotPresent("default-graph-uri");
checkParameterNotPresent("named-graph-uri");

// We always want to call `Server::checkParameter` with the same first
// parameter.
auto checkParameter = std::bind_front(&ad_utility::url_parser::checkParameter,
Expand Down Expand Up @@ -476,14 +490,13 @@ Awaitable<void> Server::process(

auto visitQuery = [&checkParameter, &accessTokenOk, &request, &send,
&parameters, &requestTimer,
this](ad_utility::url_parser::sparqlOperation::Query query)
-> Awaitable<void> {
this](Query query) -> Awaitable<void> {
if (auto timeLimit = co_await verifyUserSubmittedQueryTimeout(
checkParameter("timeout", std::nullopt), accessTokenOk, request,
send)) {
co_return co_await processQueryOrUpdate<OperationType::Query>(
parameters, query.query_, requestTimer, std::move(request), send,
timeLimit.value());
co_return co_await processQueryOrUpdate(parameters, query, requestTimer,
std::move(request), send,
timeLimit.value());
} else {
// If the optional is empty, this indicates an error response has been
// sent to the client already. We can stop here.
Expand All @@ -492,25 +505,22 @@ Awaitable<void> Server::process(
};
auto visitUpdate =
[&checkParameter, &accessTokenOk, &request, &send, &parameters,
&requestTimer, this, &requireValidAccessToken](
const ad_utility::url_parser::sparqlOperation::Update& update)
-> Awaitable<void> {
&requestTimer, this,
&requireValidAccessToken](const Update& update) -> Awaitable<void> {
requireValidAccessToken("SPARQL Update");
if (auto timeLimit = co_await verifyUserSubmittedQueryTimeout(
checkParameter("timeout", std::nullopt), accessTokenOk, request,
send)) {
co_return co_await processQueryOrUpdate<OperationType::Update>(
parameters, update.update_, requestTimer, std::move(request), send,
timeLimit.value());
co_return co_await processQueryOrUpdate(parameters, update, requestTimer,
std::move(request), send,
timeLimit.value());
} else {
// If the optional is empty, this indicates an error response has been
// sent to the client already. We can stop here.
co_return;
}
};
auto visitNone =
[&response, &send, &request](
ad_utility::url_parser::sparqlOperation::None) -> Awaitable<void> {
auto visitNone = [&response, &send, &request](None) -> Awaitable<void> {
// If there was no "query", but any of the URL parameters processed before
// produced a `response`, send that now. Note that if multiple URL
// parameters were processed, only the `response` from the last one is sent.
Expand Down Expand Up @@ -549,11 +559,10 @@ std::pair<bool, bool> Server::determineResultPinning(

// ____________________________________________________________________________
Server::PlannedQuery Server::setupPlannedQuery(
const ad_utility::url_parser::ParamValueMap& params,
const std::vector<DatasetClause>& queryDatasets,
const std::string& operation, QueryExecutionContext& qec,
SharedCancellationHandle handle, TimeLimit timeLimit,
const ad_utility::Timer& requestTimer) const {
auto queryDatasets = ad_utility::url_parser::parseDatasetClauses(params);
PlannedQuery plannedQuery =
parseAndPlan(operation, queryDatasets, qec, handle, timeLimit);
auto& qet = plannedQuery.queryExecutionTree_;
Expand Down Expand Up @@ -784,7 +793,7 @@ ad_utility::websocket::MessageSender Server::createMessageSender(

// ____________________________________________________________________________
Awaitable<void> Server::processQuery(
const ad_utility::url_parser::ParamValueMap& params, const string& query,
const ad_utility::url_parser::ParamValueMap& params, const Query& query,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
Expand All @@ -793,7 +802,7 @@ Awaitable<void> Server::processQuery(
<< ad_utility::toString(mediaType) << "\"" << std::endl;

ad_utility::websocket::MessageSender messageSender =
createMessageSender(queryHub_, request, query);
createMessageSender(queryHub_, request, query.query_);
auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

Expand All @@ -803,7 +812,7 @@ Awaitable<void> Server::processQuery(
LOG(INFO) << "Processing the following SPARQL query:"
<< (pinResult ? " [pin result]" : "")
<< (pinSubtrees ? " [pin subresults]" : "") << "\n"
<< query << std::endl;
<< query.query_ << std::endl;
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, std::ref(messageSender),
pinSubtrees, pinResult);
Expand All @@ -817,10 +826,10 @@ Awaitable<void> Server::processQuery(
// an explicit variable instead of directly `co_await`-ing it.
auto coroutine = computeInNewThread(
queryThreadPool_,
[this, &params, &query, &qec, cancellationHandle, &timeLimit,
[this, &query, &qec, cancellationHandle, &timeLimit,
&requestTimer]() -> std::optional<PlannedQuery> {
return setupPlannedQuery(params, query, qec, cancellationHandle,
timeLimit, requestTimer);
return setupPlannedQuery(query.datasetClauses_, query.query_, qec,
cancellationHandle, timeLimit, requestTimer);
},
cancellationHandle);
auto plannedQueryOpt = co_await std::move(coroutine);
Expand Down Expand Up @@ -938,20 +947,21 @@ json Server::createResponseMetadataForUpdate(
}
// ____________________________________________________________________________
json Server::processUpdateImpl(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
const ad_utility::url_parser::ParamValueMap& params, const Update& update,
ad_utility::Timer& requestTimer, TimeLimit timeLimit, auto& messageSender,
ad_utility::SharedCancellationHandle cancellationHandle,
DeltaTriples& deltaTriples) {
auto [pinSubtrees, pinResult] = determineResultPinning(params);
LOG(INFO) << "Processing the following SPARQL update:"
<< (pinResult ? " [pin result]" : "")
<< (pinSubtrees ? " [pin subresults]" : "") << "\n"
<< update << std::endl;
<< update.update_ << std::endl;
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, std::ref(messageSender),
pinSubtrees, pinResult);
auto plannedQuery = setupPlannedQuery(params, update, qec, cancellationHandle,
timeLimit, requestTimer);
auto plannedQuery =
setupPlannedQuery(update.datasetClauses_, update.update_, qec,
cancellationHandle, timeLimit, requestTimer);
auto qet = plannedQuery.queryExecutionTree_;

if (!plannedQuery.parsedQuery_.hasUpdateClause()) {
Expand Down Expand Up @@ -984,11 +994,11 @@ json Server::processUpdateImpl(

// ____________________________________________________________________________
Awaitable<void> Server::processUpdate(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
const ad_utility::url_parser::ParamValueMap& params, const Update& update,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
auto messageSender = createMessageSender(queryHub_, request, update);
auto messageSender = createMessageSender(queryHub_, request, update.update_);

auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);
Expand Down Expand Up @@ -1021,14 +1031,16 @@ Awaitable<void> Server::processUpdate(
}

// ____________________________________________________________________________
template <Server::OperationType type>
template <typename Operation>
Awaitable<void> Server::processQueryOrUpdate(
const ad_utility::url_parser::ParamValueMap& params,
const string& queryOrUpdate, ad_utility::Timer& requestTimer,
const Operation& operation, ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
using namespace ad_utility::httpUtils;

static_assert(ad_utility::SameAsAny<Operation, Query, Update>);

http::status responseStatus = http::status::ok;

// Put the whole query processing in a try-catch block. If any exception
Expand All @@ -1041,11 +1053,12 @@ Awaitable<void> Server::processQueryOrUpdate(
// access to the runtimeInformation in the case of an error.
std::optional<PlannedQuery> plannedQuery;
try {
if constexpr (type == OperationType::Query) {
co_await processQuery(params, queryOrUpdate, requestTimer, request, send,
if constexpr (std::is_same_v<Operation, Query>) {
co_await processQuery(params, operation, requestTimer, request, send,
timeLimit);
} else {
co_await processUpdate(params, queryOrUpdate, requestTimer, request, send,
static_assert(std::is_same_v<Operation, Update>);
co_await processUpdate(params, operation, requestTimer, request, send,
timeLimit);
}
} catch (const ParseException& e) {
Expand Down Expand Up @@ -1086,8 +1099,16 @@ Awaitable<void> Server::processQueryOrUpdate(
LOG(ERROR) << metadata.value().query_ << std::endl;
}
}
const std::string& operationStr = [&operation]() -> const std::string& {
if constexpr (std::is_same_v<Operation, Query>) {
return operation.query_;
} else {
static_assert(std::is_same_v<Operation, Update>);
return operation.update_;
}
}();
auto errorResponseJson = composeErrorResponseJson(
queryOrUpdate, exceptionErrorMsg.value(), requestTimer, metadata);
operationStr, exceptionErrorMsg.value(), requestTimer, metadata);
if (plannedQuery.has_value()) {
errorResponseJson["runtimeInformation"] =
nlohmann::ordered_json(plannedQuery.value()
Expand Down
Loading

0 comments on commit 2c65d22

Please sign in to comment.