Skip to content

Commit

Permalink
Fix Cypher projections under parallel runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
knutwalker committed Nov 20, 2023
1 parent 86e118a commit f90ee2f
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,11 @@ public long transactionId(KernelTransactionHandle kernelTransactionHandle) {
return kernelTransactionHandle.lastTransactionTimestampWhenStarted();
}

@Override
public long transactionId(KernelTransaction kernelTransaction) {
return kernelTransaction.lastTransactionTimestampWhenStarted();
}

@Override
public void reserveNeo4jIds(IdGeneratorFactory generatorFactory, int size, CursorContext cursorContext) {
IdGenerator idGenerator = generatorFactory.get(RecordIdType.NODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,11 @@ public long transactionId(KernelTransactionHandle kernelTransactionHandle) {
return kernelTransactionHandle.getTransactionSequenceNumber();
}

@Override
public long transactionId(KernelTransaction kernelTransaction) {
return kernelTransaction.getTransactionSequenceNumber();
}

@Override
public void reserveNeo4jIds(IdGeneratorFactory generatorFactory, int size, CursorContext cursorContext) {
IdGenerator idGenerator = generatorFactory.get(RecordIdType.NODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ UserFunctionSignature userFunctionSignature(

long transactionId(KernelTransactionHandle kernelTransactionHandle);

long transactionId(KernelTransaction kernelTransaction);

void reserveNeo4jIds(IdGeneratorFactory generatorFactory, int size, CursorContext cursorContext);

TransactionalContext newQueryContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ public static long transactionId(KernelTransactionHandle kernelTransactionHandle
return IMPL.transactionId(kernelTransactionHandle);
}

public static long transactionId(KernelTransaction kernelTransaction) {
return IMPL.transactionId(kernelTransaction);
}

public static void reserveNeo4jIds(IdGeneratorFactory generatorFactory, int size, CursorContext cursorContext) {
IMPL.reserveNeo4jIds(generatorFactory, size, cursorContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.gds.api.DatabaseId;
import org.neo4j.gds.compat.CompatUserAggregationFunction;
import org.neo4j.gds.compat.CompatUserAggregator;
import org.neo4j.gds.compat.GraphDatabaseApiProxy;
import org.neo4j.gds.compat.Neo4jProxy;
import org.neo4j.gds.core.Username;
import org.neo4j.gds.core.loading.Capabilities.WriteMode;
Expand All @@ -35,6 +36,7 @@
import org.neo4j.internal.kernel.api.procs.QualifiedName;
import org.neo4j.internal.kernel.api.procs.UserFunctionSignature;
import org.neo4j.kernel.api.procedure.Context;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.procedure.Name;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.TextValue;
Expand Down Expand Up @@ -108,7 +110,8 @@ public CompatUserAggregator create(Context ctx) throws ProcedureException {
var metricsFacade = Neo4jProxy.lookupComponentProvider(ctx, MetricsFacade.class, true);
var username = Neo4jProxy.lookupComponentProvider(ctx, Username.class, true);
var transaction = Neo4jProxy.lookupComponentProvider(ctx, Transaction.class, true);
var queryProvider = ExecutingQueryProvider.fromTransaction(transaction);
var ktxs = GraphDatabaseApiProxy.resolveDependency(databaseService, KernelTransactions.class);
var queryProvider = ExecutingQueryProvider.fromTransaction(ktxs, transaction);

var runsOnCompositeDatabase = Neo4jProxy.isCompositeDatabase(databaseService);
var writeMode = runsOnCompositeDatabase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.gds.api.DatabaseId;
import org.neo4j.gds.compat.CompatUserAggregationFunction;
import org.neo4j.gds.compat.CompatUserAggregator;
import org.neo4j.gds.compat.GraphDatabaseApiProxy;
import org.neo4j.gds.compat.Neo4jProxy;
import org.neo4j.gds.core.Username;
import org.neo4j.gds.core.loading.Capabilities.WriteMode;
Expand All @@ -35,6 +36,7 @@
import org.neo4j.internal.kernel.api.procs.QualifiedName;
import org.neo4j.internal.kernel.api.procs.UserFunctionSignature;
import org.neo4j.kernel.api.procedure.Context;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.procedure.Name;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.TextValue;
Expand Down Expand Up @@ -107,7 +109,8 @@ public CompatUserAggregator create(Context ctx) throws ProcedureException {
var metricsFacade = Neo4jProxy.lookupComponentProvider(ctx, MetricsFacade.class, true);
var username = Neo4jProxy.lookupComponentProvider(ctx, Username.class, true);
var transaction = Neo4jProxy.lookupComponentProvider(ctx, Transaction.class, true);
var queryProvider = ExecutingQueryProvider.fromTransaction(transaction);
var ktxs = GraphDatabaseApiProxy.resolveDependency(databaseService, KernelTransactions.class);
var queryProvider = ExecutingQueryProvider.fromTransaction(ktxs, transaction);

var runsOnCompositeDatabase = Neo4jProxy.isCompositeDatabase(databaseService);
var writeMode = runsOnCompositeDatabase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
*/
package org.neo4j.gds.projection;

import org.neo4j.gds.compat.Neo4jProxy;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;

import java.util.Optional;

interface ExecutingQueryProvider {
Optional<String> executingQuery();

static ExecutingQueryProvider fromTransaction(Transaction transaction) {
return new TxQuery(transaction);
static ExecutingQueryProvider fromTransaction(KernelTransactions ktxs, Transaction transaction) {
return new TxQuery(ktxs, transaction);
}

static ExecutingQueryProvider empty() {
Expand All @@ -39,9 +40,11 @@ static ExecutingQueryProvider empty() {


final class TxQuery implements ExecutingQueryProvider {
private final KernelTransactions ktxs;
private final Transaction transaction;

TxQuery(Transaction transaction) {
TxQuery(KernelTransactions ktxs, Transaction transaction) {
this.ktxs = ktxs;
this.transaction = transaction;
}

Expand All @@ -51,15 +54,13 @@ public Optional<String> executingQuery() {
return Optional.empty();
}

try (var statement = ((InternalTransaction) this.transaction).kernelTransaction().acquireStatement()) {
if (!(statement instanceof KernelStatement)) {
return Optional.empty();
}

return ((KernelStatement) statement).queryRegistry().executingQuery().flatMap(eq ->
eq.snapshot()
.obfuscatedQueryText()
.or(() -> Optional.ofNullable(eq.rawQueryText())));
}
var txId = Neo4jProxy.transactionId(((InternalTransaction) this.transaction).kernelTransaction());
return this.ktxs.activeTransactions().stream()
.filter(handle -> Neo4jProxy.transactionId(handle) == txId)
.flatMap(handle -> handle.executingQuery().stream())
.flatMap(eq -> eq.snapshot()
.obfuscatedQueryText()
.or(() -> Optional.ofNullable(eq.rawQueryText())).stream())
.findAny();
}
}

0 comments on commit f90ee2f

Please sign in to comment.