From 1120a9efcd527b71477bd528d55fef50b7cc33c8 Mon Sep 17 00:00:00 2001 From: Lasse Westh-Nielsen Date: Wed, 31 Jul 2024 16:52:32 +0200 Subject: [PATCH] create an operations procedure facade module and migrate listProgress in there --- .../machinery/RequestScopedDependencies.java | 25 ++- .../src/main/java/org/neo4j/gds/BaseProc.java | 17 -- proc/community/build.gradle | 1 + proc/machine-learning/build.gradle | 1 + proc/misc/build.gradle | 1 + .../java/org/neo4j/gds/ListProgressProc.java | 171 +----------------- .../java/org/neo4j/gds/ProcedureRunner.java | 5 +- procedures/facade/build.gradle | 4 +- .../GraphDataScienceProcedures.java | 11 ++ .../GraphDataScienceProceduresBuilder.java | 8 + .../GraphDataScienceExtensionBuilder.java | 1 + .../integration/GraphDataScienceProvider.java | 6 + .../GraphDataScienceProviderFactory.java | 3 + procedures/operations-facade/README.md | 5 + procedures/operations-facade/build.gradle | 26 +++ .../operations/JobProgressVisitor.java | 49 +++++ .../operations/OperationsProcedureFacade.java | 91 ++++++++++ .../procedures/operations/ProgressResult.java | 91 ++++++++++ .../operations}/StructuredOutputHelper.java | 6 +- .../StructuredOutputHelperTest.java | 3 +- settings.gradle | 3 + 21 files changed, 332 insertions(+), 196 deletions(-) create mode 100644 procedures/operations-facade/README.md create mode 100644 procedures/operations-facade/build.gradle create mode 100644 procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/JobProgressVisitor.java create mode 100644 procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/OperationsProcedureFacade.java create mode 100644 procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/ProgressResult.java rename {proc/misc/src/main/java/org/neo4j/gds => procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations}/StructuredOutputHelper.java (92%) rename {proc/misc/src/test/java/org/neo4j/gds => procedures/operations-facade/src/test/java/org/neo4j/gds/procedures/operations}/StructuredOutputHelperTest.java (98%) diff --git a/applications/algorithms/machinery/src/main/java/org/neo4j/gds/applications/algorithms/machinery/RequestScopedDependencies.java b/applications/algorithms/machinery/src/main/java/org/neo4j/gds/applications/algorithms/machinery/RequestScopedDependencies.java index 49728881610..253f0ff2742 100644 --- a/applications/algorithms/machinery/src/main/java/org/neo4j/gds/applications/algorithms/machinery/RequestScopedDependencies.java +++ b/applications/algorithms/machinery/src/main/java/org/neo4j/gds/applications/algorithms/machinery/RequestScopedDependencies.java @@ -23,6 +23,7 @@ import org.neo4j.gds.api.GraphLoaderContext; import org.neo4j.gds.api.User; import org.neo4j.gds.core.utils.progress.TaskRegistryFactory; +import org.neo4j.gds.core.utils.progress.TaskStore; import org.neo4j.gds.core.utils.warnings.UserLogRegistryFactory; import org.neo4j.gds.core.utils.warnings.UserLogStore; import org.neo4j.gds.termination.TerminationFlag; @@ -35,6 +36,7 @@ public final class RequestScopedDependencies { private final DatabaseId databaseId; private final GraphLoaderContext graphLoaderContext; private final TaskRegistryFactory taskRegistryFactory; + private final TaskStore taskStore; private final TerminationFlag terminationFlag; private final User user; private final UserLogRegistryFactory userLogRegistryFactory; @@ -49,6 +51,7 @@ private RequestScopedDependencies( DatabaseId databaseId, GraphLoaderContext graphLoaderContext, TaskRegistryFactory taskRegistryFactory, + TaskStore taskStore, TerminationFlag terminationFlag, User user, UserLogRegistryFactory userLogRegistryFactory, @@ -57,13 +60,15 @@ private RequestScopedDependencies( this.databaseId = databaseId; this.graphLoaderContext = graphLoaderContext; this.taskRegistryFactory = taskRegistryFactory; + this.taskStore = taskStore; this.terminationFlag = terminationFlag; this.user = user; this.userLogRegistryFactory = userLogRegistryFactory; this.userLogStore = userLogStore; } - public static RequestScopedDependenciesBuilder builder(){ - return new RequestScopedDependenciesBuilder(); + + public static RequestScopedDependenciesBuilder builder() { + return new RequestScopedDependenciesBuilder(); } public DatabaseId getDatabaseId() { @@ -79,6 +84,10 @@ public TaskRegistryFactory getTaskRegistryFactory() { return taskRegistryFactory; } + public TaskStore getTaskStore() { + return taskStore; + } + public TerminationFlag getTerminationFlag() { return terminationFlag; } @@ -96,7 +105,6 @@ public UserLogStore getUserLogStore() { } - /** * A handy builder where you can include as many or as few components as you are interested in. * We deliberately do not have defaults, @@ -107,6 +115,7 @@ public static class RequestScopedDependenciesBuilder { private GraphLoaderContext graphLoaderContext; private TerminationFlag terminationFlag; private TaskRegistryFactory taskRegistryFactory; + private TaskStore taskStore; private User user; private UserLogRegistryFactory userLogRegistryFactory; private UserLogStore userLogStore; @@ -121,13 +130,16 @@ public RequestScopedDependenciesBuilder with(GraphLoaderContext graphLoaderConte return this; } - - public RequestScopedDependenciesBuilder with(TaskRegistryFactory taskRegistryFactory) { this.taskRegistryFactory = taskRegistryFactory; return this; } + public RequestScopedDependenciesBuilder with(TaskStore taskStore) { + this.taskStore = taskStore; + return this; + } + public RequestScopedDependenciesBuilder with(TerminationFlag terminationFlag) { this.terminationFlag = terminationFlag; return this; @@ -148,13 +160,12 @@ public RequestScopedDependenciesBuilder with(UserLogStore userLogStore) { return this; } - - public RequestScopedDependencies build() { return new RequestScopedDependencies( databaseId, graphLoaderContext, taskRegistryFactory, + taskStore, terminationFlag, user, userLogRegistryFactory, diff --git a/proc/common/src/main/java/org/neo4j/gds/BaseProc.java b/proc/common/src/main/java/org/neo4j/gds/BaseProc.java index bd87626b6ab..f5f2545b60c 100644 --- a/proc/common/src/main/java/org/neo4j/gds/BaseProc.java +++ b/proc/common/src/main/java/org/neo4j/gds/BaseProc.java @@ -25,7 +25,6 @@ import org.neo4j.gds.config.BaseConfig; import org.neo4j.gds.core.CypherMapAccess; import org.neo4j.gds.core.Username; -import org.neo4j.gds.core.loading.GraphStoreCatalog; import org.neo4j.gds.core.loading.GraphStoreCatalogEntry; import org.neo4j.gds.core.utils.progress.TaskRegistryFactory; import org.neo4j.gds.core.utils.warnings.UserLogRegistryFactory; @@ -51,8 +50,6 @@ import java.util.function.Supplier; -import static org.neo4j.gds.utils.StringFormatting.formatWithLocale; - public abstract class BaseProc { public static final String ESTIMATE_DESCRIPTION = "Returns an estimation of the memory consumption for that procedure."; @@ -125,20 +122,6 @@ protected final void validateConfig(CypherMapAccess cypherConfig, BaseConfig con cypherConfig.requireOnlyKeysFrom(config.configKeys()); } - protected final void validateGraphNameAndEnsureItDoesNotExist(String username, String graphName) { - validateGraphName(graphName); - if (GraphStoreCatalog.exists(username, databaseId(), graphName)) { - throw new IllegalArgumentException(formatWithLocale( - "A graph with name '%s' already exists.", - graphName - )); - } - } - - private void validateGraphName(String graphName) { - CypherMapAccess.failOnBlank("graphName", graphName); - } - public ExecutionContext executionContext() { return databaseService == null ? ExecutionContext.EMPTY diff --git a/proc/community/build.gradle b/proc/community/build.gradle index a0c81a66d62..a299bc180c3 100644 --- a/proc/community/build.gradle +++ b/proc/community/build.gradle @@ -31,6 +31,7 @@ dependencies { implementation project(':neo4j-api') implementation project(':node-embedding-algorithms') implementation project(':opengds-procedure-facade') + implementation project(':operations-procedure-facade') implementation project(':progress-tracking') implementation project(':string-formatting') implementation project(':termination') diff --git a/proc/machine-learning/build.gradle b/proc/machine-learning/build.gradle index c0bbcfb9136..b17aa155ca1 100644 --- a/proc/machine-learning/build.gradle +++ b/proc/machine-learning/build.gradle @@ -45,6 +45,7 @@ dependencies { implementation project(':neo4j-api') implementation project(':node-embedding-algorithms') implementation project(':opengds-procedure-facade') + implementation project(':operations-procedure-facade') implementation project(':pipeline') implementation project(':proc-common') implementation project(':progress-tracking') diff --git a/proc/misc/build.gradle b/proc/misc/build.gradle index 23cff7f8a65..ebcbd50410f 100644 --- a/proc/misc/build.gradle +++ b/proc/misc/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation project(':logging') implementation project(':memory-usage') implementation project(':opengds-procedure-facade') + implementation project(':operations-procedure-facade') implementation project(':proc-common') implementation project(':proc-embeddings') implementation project(':progress-tracking') diff --git a/proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java b/proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java index 59804a7a9b8..f24d95fae91 100644 --- a/proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java +++ b/proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java @@ -19,189 +19,36 @@ */ package org.neo4j.gds; -import org.apache.commons.lang3.time.DurationFormatUtils; -import org.neo4j.gds.core.utils.ClockService; -import org.neo4j.gds.core.utils.progress.JobId; -import org.neo4j.gds.core.utils.progress.TaskStore; -import org.neo4j.gds.core.utils.progress.tasks.DepthAwareTaskVisitor; -import org.neo4j.gds.core.utils.progress.tasks.Task; -import org.neo4j.gds.core.utils.progress.tasks.TaskTraversal; +import org.neo4j.gds.procedures.GraphDataScienceProcedures; +import org.neo4j.gds.procedures.operations.ProgressResult; import org.neo4j.procedure.Context; import org.neo4j.procedure.Description; import org.neo4j.procedure.Internal; import org.neo4j.procedure.Name; import org.neo4j.procedure.Procedure; -import org.neo4j.values.storable.LocalTimeValue; -import java.time.Instant; -import java.time.LocalTime; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.neo4j.gds.utils.StringFormatting.formatWithLocale; - -public class ListProgressProc extends BaseProc { - - static final int PROGRESS_BAR_LENGTH = 10; +public class ListProgressProc { private static final String DESCRIPTION = "List progress events for currently running tasks."; @Context - public TaskStore taskStore; + public GraphDataScienceProcedures facade; @Internal @Deprecated(forRemoval = true) @Procedure(value = "gds.beta.listProgress", deprecatedBy = "gds.listProgress") @Description(DESCRIPTION) - public Stream betaListProgress( - @Name(value = "jobId", defaultValue = "") String jobId - ) { - executionContext() - .metricsFacade() - .deprecatedProcedures().called("gds.beta.listProgress"); - - executionContext() - .log() - .warn("Procedure `gds.beta.listProgress` has been deprecated, please use `gds.listProgress`."); + public Stream betaListProgress(@Name(value = "jobId", defaultValue = "") String jobId) { + facade.deprecatedProcedures().called("gds.beta.listProgress"); + facade.log().warn("Procedure `gds.beta.listProgress` has been deprecated, please use `gds.listProgress`."); return listProgress(jobId); } @Procedure("gds.listProgress") @Description(DESCRIPTION) - public Stream listProgress( - @Name(value = "jobId", defaultValue = "") String jobId - ) { - return jobId.isBlank() - ? jobsSummaryView() - : jobDetailView(jobId); - } - - private Stream jobsSummaryView() { - if (isGdsAdmin()) { - return taskStore.query().map(ProgressResult::fromTaskStoreEntry); - } else { - return taskStore.query(username()).map(ProgressResult::fromTaskStoreEntry); - } - } - - private Stream jobDetailView(String jobIdAsString) { - var jobId = new JobId(jobIdAsString); - - if (isGdsAdmin()) { - var progressResults = taskStore - .query(jobId) - .flatMap(ListProgressProc::jobProgress) - .collect(Collectors.toList()); - - if (progressResults.isEmpty()) { - throw new IllegalArgumentException(formatWithLocale( - "No task with job id `%s` was found.", - jobIdAsString - )); - } - - return progressResults.stream(); - } else { - return taskStore.query(username(), jobId).map(ListProgressProc::jobProgress).orElseThrow( - () -> new IllegalArgumentException(formatWithLocale( - "No task with job id `%s` was found.", - jobIdAsString - )) - ); - } - } - - private static Stream jobProgress(TaskStore.UserTask userTask) { - var jobProgressVisitor = new JobProgressVisitor(userTask.jobId(), userTask.username()); - TaskTraversal.visitPreOrderWithDepth(userTask.task(), jobProgressVisitor); - return jobProgressVisitor.progressRowsStream(); - } - - @SuppressWarnings("unused") - public static class ProgressResult { - public String username; - public String jobId; - public String taskName; - public String progress; - public String progressBar; - public String status; - public LocalTimeValue timeStarted; - public String elapsedTime; - - static ProgressResult fromTaskStoreEntry(String username, Map.Entry taskStoreEntry) { - var jobId = taskStoreEntry.getKey(); - var task = taskStoreEntry.getValue(); - return new ProgressResult(username, task, jobId, task.description()); - } - - static ProgressResult fromTaskStoreEntry(TaskStore.UserTask userTask) { - return new ProgressResult(userTask.username(), userTask.task(), userTask.jobId(), userTask.task().description()); - } - - static ProgressResult fromTaskWithDepth(String username, Task task, JobId jobId, int depth) { - var treeViewTaskName = StructuredOutputHelper.treeViewDescription(task.description(), depth); - return new ProgressResult(username, task, jobId, treeViewTaskName); - } - - public ProgressResult(String username, Task task, JobId jobId, String taskName) { - var progressContainer = task.getProgress(); - - this.jobId = jobId.asString(); - this.taskName = taskName; - this.username = username; - this.progress = StructuredOutputHelper.computeProgress(progressContainer); - this.progressBar = StructuredOutputHelper.progressBar(progressContainer, PROGRESS_BAR_LENGTH); - this.status = task.status().name(); - this.timeStarted = localTimeValue(task); - this.elapsedTime = prettyElapsedTime(task); - } - - private LocalTimeValue localTimeValue(Task task) { - if (task.hasNotStarted()) { - return null; - } - return LocalTimeValue.localTime(LocalTime.ofInstant( - Instant.ofEpochMilli(task.startTime()), - ZoneId.systemDefault() - )); - } - - private String prettyElapsedTime(Task task) { - if (task.hasNotStarted()) { - return "Not yet started"; - } - var finishTime = task.finishTime(); - var finishTimeOrNow = finishTime != -1 - ? finishTime - : ClockService.clock().millis(); - var elapsedTime = finishTimeOrNow - task.startTime(); - return DurationFormatUtils.formatDurationWords(elapsedTime, true, true); - } - } - - public static class JobProgressVisitor extends DepthAwareTaskVisitor { - - private final JobId jobId; - private final String username; - private final List progressRows; - - JobProgressVisitor(JobId jobId, String username) { - this.jobId = jobId; - this.username = username; - this.progressRows = new ArrayList<>(); - } - - Stream progressRowsStream() { - return this.progressRows.stream(); - } - - @Override - public void visit(Task task) { - progressRows.add(ProgressResult.fromTaskWithDepth(username, task, jobId, depth())); - } + public Stream listProgress(@Name(value = "jobId", defaultValue = "") String jobId) { + return facade.operations().listProgress(jobId); } } diff --git a/proc/test/src/main/java/org/neo4j/gds/ProcedureRunner.java b/proc/test/src/main/java/org/neo4j/gds/ProcedureRunner.java index ab4dd8f22a4..0734c3dab5f 100644 --- a/proc/test/src/main/java/org/neo4j/gds/ProcedureRunner.java +++ b/proc/test/src/main/java/org/neo4j/gds/ProcedureRunner.java @@ -31,6 +31,7 @@ import org.neo4j.gds.core.Username; import org.neo4j.gds.core.loading.GraphStoreCatalogService; import org.neo4j.gds.core.model.OpenModelCatalog; +import org.neo4j.gds.core.utils.progress.EmptyTaskStore; import org.neo4j.gds.core.utils.progress.TaskRegistryFactory; import org.neo4j.gds.core.utils.warnings.EmptyUserLogRegistryFactory; import org.neo4j.gds.core.utils.warnings.EmptyUserLogStore; @@ -152,13 +153,13 @@ private static GraphDataScienceProcedures createGraphDataScienceProcedures( ) { var gdsLog = new LogAdapter(log); - var procedureContext = WriteContext.builder() - .build(); + var procedureContext = WriteContext.builder().build(); var requestScopedDependencies = RequestScopedDependencies.builder() .with(new DatabaseIdAccessor().getDatabaseId(graphDatabaseService)) .with(GraphLoaderContext.NULL_CONTEXT) .with(taskRegistryFactory) + .with(EmptyTaskStore.INSTANCE) .with(new User(username.username(), false)) .with(EmptyUserLogRegistryFactory.INSTANCE) .with(EmptyUserLogStore.INSTANCE) diff --git a/procedures/facade/build.gradle b/procedures/facade/build.gradle index 9bfd924adba..369ed085a1c 100644 --- a/procedures/facade/build.gradle +++ b/procedures/facade/build.gradle @@ -5,9 +5,6 @@ description = 'Neo4j Graph Data Science :: OpenGDS Procedure Facade' group = 'org.neo4j.gds' dependencies { - - - // the necessary Neo4j things neodeps().each { compileOnly(group: 'org.neo4j', name: it, version: ver.'neo4j') { transitive = false @@ -45,6 +42,7 @@ dependencies { implementation project(':native-projection') implementation project(':neo4j-api') implementation project(':node-embedding-algorithms') + implementation project(':operations-procedure-facade') implementation project(':path-finding-algorithms') implementation project(':pipeline') implementation project(':progress-tracking') diff --git a/procedures/facade/src/main/java/org/neo4j/gds/procedures/GraphDataScienceProcedures.java b/procedures/facade/src/main/java/org/neo4j/gds/procedures/GraphDataScienceProcedures.java index 78089bd8ac0..70255a901e8 100644 --- a/procedures/facade/src/main/java/org/neo4j/gds/procedures/GraphDataScienceProcedures.java +++ b/procedures/facade/src/main/java/org/neo4j/gds/procedures/GraphDataScienceProcedures.java @@ -40,6 +40,7 @@ import org.neo4j.gds.procedures.algorithms.configuration.ConfigurationCreator; import org.neo4j.gds.procedures.algorithms.configuration.ConfigurationParser; import org.neo4j.gds.procedures.catalog.CatalogProcedureFacade; +import org.neo4j.gds.procedures.operations.OperationsProcedureFacade; import org.neo4j.gds.procedures.pipelines.PipelinesProcedureFacade; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Transaction; @@ -53,6 +54,7 @@ public class GraphDataScienceProcedures { private final AlgorithmsProcedureFacade algorithmsProcedureFacade; private final CatalogProcedureFacade catalogProcedureFacade; + private final OperationsProcedureFacade operationsProcedureFacade; private final PipelinesProcedureFacade pipelinesProcedureFacade; private final DeprecatedProceduresMetricService deprecatedProceduresMetricService; @@ -64,12 +66,14 @@ public class GraphDataScienceProcedures { Log log, AlgorithmsProcedureFacade algorithmsProcedureFacade, CatalogProcedureFacade catalogProcedureFacade, + OperationsProcedureFacade operationsProcedureFacade, PipelinesProcedureFacade pipelinesProcedureFacade, DeprecatedProceduresMetricService deprecatedProceduresMetricService ) { this.log = log; this.algorithmsProcedureFacade = algorithmsProcedureFacade; this.catalogProcedureFacade = catalogProcedureFacade; + this.operationsProcedureFacade = operationsProcedureFacade; this.pipelinesProcedureFacade = pipelinesProcedureFacade; this.deprecatedProceduresMetricService = deprecatedProceduresMetricService; } @@ -141,6 +145,8 @@ public static GraphDataScienceProcedures create( var pathFindingProcedureFacade = algorithmProcedureFacadeBuilder.createPathFindingProcedureFacade(); var similarityProcedureFacade = algorithmProcedureFacadeBuilder.createSimilarityProcedureFacade(); + var operationsProcedureFacade = new OperationsProcedureFacade(requestScopedDependencies); + var pipelinesProcedureFacade = new PipelinesProcedureFacade(requestScopedDependencies.getUser()); return new GraphDataScienceProceduresBuilder(log) @@ -149,6 +155,7 @@ public static GraphDataScienceProcedures create( .with(communityProcedureFacade) .with(miscellaneousProcedureFacade) .with(nodeEmbeddingsProcedureFacade) + .with(operationsProcedureFacade) .with(pathFindingProcedureFacade) .with(pipelinesProcedureFacade) .with(similarityProcedureFacade) @@ -168,6 +175,10 @@ public CatalogProcedureFacade catalog() { return catalogProcedureFacade; } + public OperationsProcedureFacade operations() { + return operationsProcedureFacade; + } + public PipelinesProcedureFacade pipelines() { return pipelinesProcedureFacade; } diff --git a/procedures/facade/src/main/java/org/neo4j/gds/procedures/GraphDataScienceProceduresBuilder.java b/procedures/facade/src/main/java/org/neo4j/gds/procedures/GraphDataScienceProceduresBuilder.java index 08c425537e2..8464a1f85b2 100644 --- a/procedures/facade/src/main/java/org/neo4j/gds/procedures/GraphDataScienceProceduresBuilder.java +++ b/procedures/facade/src/main/java/org/neo4j/gds/procedures/GraphDataScienceProceduresBuilder.java @@ -29,6 +29,7 @@ import org.neo4j.gds.procedures.algorithms.pathfinding.PathFindingProcedureFacade; import org.neo4j.gds.procedures.algorithms.similarity.SimilarityProcedureFacade; import org.neo4j.gds.procedures.catalog.CatalogProcedureFacade; +import org.neo4j.gds.procedures.operations.OperationsProcedureFacade; import org.neo4j.gds.procedures.pipelines.PipelinesProcedureFacade; /** @@ -44,6 +45,7 @@ public class GraphDataScienceProceduresBuilder { private CommunityProcedureFacade communityProcedureFacade; private MiscellaneousProcedureFacade miscellaneousProcedureFacade; private NodeEmbeddingsProcedureFacade nodeEmbeddingsProcedureFacade; + private OperationsProcedureFacade operationsProcedureFacade; private PathFindingProcedureFacade pathFindingProcedureFacade; private PipelinesProcedureFacade pipelinesProcedureFacade; private SimilarityProcedureFacade similarityProcedureFacade; @@ -78,6 +80,11 @@ public GraphDataScienceProceduresBuilder with(NodeEmbeddingsProcedureFacade node return this; } + public GraphDataScienceProceduresBuilder with(OperationsProcedureFacade operationsProcedureFacade) { + this.operationsProcedureFacade = operationsProcedureFacade; + return this; + } + public GraphDataScienceProceduresBuilder with(PathFindingProcedureFacade pathFindingProcedureFacade) { this.pathFindingProcedureFacade = pathFindingProcedureFacade; return this; @@ -112,6 +119,7 @@ public GraphDataScienceProcedures build() { log, algorithmsProcedureFacade, catalogProcedureFacade, + operationsProcedureFacade, pipelinesProcedureFacade, deprecatedProceduresMetricService ); diff --git a/procedures/integration/src/main/java/org/neo4j/gds/procedures/integration/GraphDataScienceExtensionBuilder.java b/procedures/integration/src/main/java/org/neo4j/gds/procedures/integration/GraphDataScienceExtensionBuilder.java index c388b799dfc..06aa51122ee 100644 --- a/procedures/integration/src/main/java/org/neo4j/gds/procedures/integration/GraphDataScienceExtensionBuilder.java +++ b/procedures/integration/src/main/java/org/neo4j/gds/procedures/integration/GraphDataScienceExtensionBuilder.java @@ -197,6 +197,7 @@ public Lifecycle build() { private void registerGraphDataScienceComponent() { var graphDataScienceProvider = graphDataScienceProviderFactory.createGraphDataScienceProvider( taskRegistryFactoryService, + taskStoreService, useMaxMemoryEstimation, userLogServices ); diff --git a/procedures/integration/src/main/java/org/neo4j/gds/procedures/integration/GraphDataScienceProvider.java b/procedures/integration/src/main/java/org/neo4j/gds/procedures/integration/GraphDataScienceProvider.java index 8d34e4c22bd..50ca7e7621d 100644 --- a/procedures/integration/src/main/java/org/neo4j/gds/procedures/integration/GraphDataScienceProvider.java +++ b/procedures/integration/src/main/java/org/neo4j/gds/procedures/integration/GraphDataScienceProvider.java @@ -31,6 +31,7 @@ import org.neo4j.gds.configuration.LimitsConfiguration; import org.neo4j.gds.core.loading.GraphStoreCatalogService; import org.neo4j.gds.core.model.ModelCatalog; +import org.neo4j.gds.core.utils.progress.TaskStoreService; import org.neo4j.gds.core.write.ExporterContext; import org.neo4j.gds.logging.Log; import org.neo4j.gds.metrics.algorithms.AlgorithmMetricsService; @@ -80,6 +81,7 @@ public class GraphDataScienceProvider implements ThrowingFunction. + */ +package org.neo4j.gds.procedures.operations; + +import org.neo4j.gds.core.utils.progress.JobId; +import org.neo4j.gds.core.utils.progress.tasks.DepthAwareTaskVisitor; +import org.neo4j.gds.core.utils.progress.tasks.Task; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +public class JobProgressVisitor extends DepthAwareTaskVisitor { + private final JobId jobId; + private final String username; + private final List progressRows; + + JobProgressVisitor(JobId jobId, String username) { + this.jobId = jobId; + this.username = username; + this.progressRows = new ArrayList<>(); + } + + Stream progressRowsStream() { + return progressRows.stream(); + } + + @Override + public void visit(Task task) { + progressRows.add(ProgressResult.fromTaskWithDepth(username, task, jobId, depth())); + } +} diff --git a/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/OperationsProcedureFacade.java b/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/OperationsProcedureFacade.java new file mode 100644 index 00000000000..f528734369e --- /dev/null +++ b/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/OperationsProcedureFacade.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.gds.procedures.operations; + +import org.neo4j.gds.applications.algorithms.machinery.RequestScopedDependencies; +import org.neo4j.gds.core.utils.progress.JobId; +import org.neo4j.gds.core.utils.progress.TaskStore; +import org.neo4j.gds.core.utils.progress.tasks.TaskTraversal; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.neo4j.gds.utils.StringFormatting.formatWithLocale; + +public class OperationsProcedureFacade { + private final RequestScopedDependencies requestScopedDependencies; + + public OperationsProcedureFacade(RequestScopedDependencies requestScopedDependencies) { + this.requestScopedDependencies = requestScopedDependencies; + } + + public Stream listProgress(String jobId) { + return jobId.isBlank() + ? jobsSummaryView() + : jobDetailView(jobId); + } + + private Stream jobsSummaryView() { + var taskStore = requestScopedDependencies.getTaskStore(); + var user = requestScopedDependencies.getUser(); + + if (user.isAdmin()) { + return taskStore.query().map(ProgressResult::fromTaskStoreEntry); + } else { + return taskStore.query(user.getUsername()).map(ProgressResult::fromTaskStoreEntry); + } + } + + private Stream jobDetailView(String jobIdAsString) { + var jobId = new JobId(jobIdAsString); + + var taskStore = requestScopedDependencies.getTaskStore(); + var user = requestScopedDependencies.getUser(); + + if (user.isAdmin()) { + var progressResults = taskStore + .query(jobId) + .flatMap(this::jobProgress) + .collect(Collectors.toList()); + + if (progressResults.isEmpty()) { + throw new IllegalArgumentException(formatWithLocale( + "No task with job id `%s` was found.", + jobIdAsString + )); + } + + return progressResults.stream(); + } else { + return taskStore.query(user.getUsername(), jobId).map(this::jobProgress).orElseThrow( + () -> new IllegalArgumentException(formatWithLocale( + "No task with job id `%s` was found.", + jobIdAsString + )) + ); + } + } + + private Stream jobProgress(TaskStore.UserTask userTask) { + var jobProgressVisitor = new JobProgressVisitor(userTask.jobId(), userTask.username()); + TaskTraversal.visitPreOrderWithDepth(userTask.task(), jobProgressVisitor); + return jobProgressVisitor.progressRowsStream(); + } +} diff --git a/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/ProgressResult.java b/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/ProgressResult.java new file mode 100644 index 00000000000..d7e36a32443 --- /dev/null +++ b/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/ProgressResult.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.gds.procedures.operations; + +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.neo4j.gds.core.utils.ClockService; +import org.neo4j.gds.core.utils.progress.JobId; +import org.neo4j.gds.core.utils.progress.TaskStore; +import org.neo4j.gds.core.utils.progress.tasks.Task; +import org.neo4j.values.storable.LocalTimeValue; + +import java.time.Instant; +import java.time.LocalTime; +import java.time.ZoneId; + +public class ProgressResult { + public String username; + public String jobId; + public String taskName; + public String progress; + public String progressBar; + public String status; + public LocalTimeValue timeStarted; + public String elapsedTime; + + static ProgressResult fromTaskStoreEntry(TaskStore.UserTask userTask) { + return new ProgressResult( + userTask.username(), + userTask.task(), + userTask.jobId(), + userTask.task().description() + ); + } + + static ProgressResult fromTaskWithDepth(String username, Task task, JobId jobId, int depth) { + var treeViewTaskName = StructuredOutputHelper.treeViewDescription(task.description(), depth); + return new ProgressResult(username, task, jobId, treeViewTaskName); + } + + public ProgressResult(String username, Task task, JobId jobId, String taskName) { + var progressContainer = task.getProgress(); + + this.jobId = jobId.asString(); + this.taskName = taskName; + this.username = username; + this.progress = StructuredOutputHelper.computeProgress(progressContainer); + this.progressBar = StructuredOutputHelper.progressBar(progressContainer, 10); + this.status = task.status().name(); + this.timeStarted = localTimeValue(task); + this.elapsedTime = prettyElapsedTime(task); + } + + private LocalTimeValue localTimeValue(Task task) { + if (task.hasNotStarted()) { + return null; + } + return LocalTimeValue.localTime(LocalTime.ofInstant( + Instant.ofEpochMilli(task.startTime()), + ZoneId.systemDefault() + )); + } + + private String prettyElapsedTime(Task task) { + if (task.hasNotStarted()) { + return "Not yet started"; + } + var finishTime = task.finishTime(); + var finishTimeOrNow = finishTime != -1 + ? finishTime + : ClockService.clock().millis(); + var elapsedTime = finishTimeOrNow - task.startTime(); + return DurationFormatUtils.formatDurationWords(elapsedTime, true, true); + } +} diff --git a/proc/misc/src/main/java/org/neo4j/gds/StructuredOutputHelper.java b/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/StructuredOutputHelper.java similarity index 92% rename from proc/misc/src/main/java/org/neo4j/gds/StructuredOutputHelper.java rename to procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/StructuredOutputHelper.java index 9a9cfbe9ca8..349874f3e2e 100644 --- a/proc/misc/src/main/java/org/neo4j/gds/StructuredOutputHelper.java +++ b/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/StructuredOutputHelper.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -package org.neo4j.gds; +package org.neo4j.gds.procedures.operations; import org.neo4j.gds.core.utils.progress.tasks.Progress; import org.neo4j.gds.core.utils.progress.tasks.Task; @@ -40,7 +40,7 @@ private StructuredOutputHelper() {} /** * Produce a progress bar string in the format of: [######~~~~] */ - static String progressBar(Progress progress, int progressBarLength) { + public static String progressBar(Progress progress, int progressBarLength) { if (progress.volume() == Task.UNKNOWN_VOLUME) { return formatWithLocale("[~~~~%s~~~]", UNKNOWN); } @@ -66,7 +66,7 @@ public static String computeProgress(Progress progress) { return decimalFormat.format(progressPercentage); } - static String treeViewDescription(String description, int depth) { + public static String treeViewDescription(String description, int depth) { return TASK_LEVEL_INDENTATION.repeat(depth) + TASK_BRANCH_TOKEN + description; diff --git a/proc/misc/src/test/java/org/neo4j/gds/StructuredOutputHelperTest.java b/procedures/operations-facade/src/test/java/org/neo4j/gds/procedures/operations/StructuredOutputHelperTest.java similarity index 98% rename from proc/misc/src/test/java/org/neo4j/gds/StructuredOutputHelperTest.java rename to procedures/operations-facade/src/test/java/org/neo4j/gds/procedures/operations/StructuredOutputHelperTest.java index 53012083d26..50017b74b75 100644 --- a/proc/misc/src/test/java/org/neo4j/gds/StructuredOutputHelperTest.java +++ b/procedures/operations-facade/src/test/java/org/neo4j/gds/procedures/operations/StructuredOutputHelperTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -package org.neo4j.gds; +package org.neo4j.gds.procedures.operations; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -27,7 +27,6 @@ import static org.assertj.core.api.Assertions.assertThat; class StructuredOutputHelperTest { - @ParameterizedTest @CsvSource(value = { "4, 10, 10, [####~~~~~~]", diff --git a/settings.gradle b/settings.gradle index fc7a2f38e39..687836255d9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -266,6 +266,9 @@ project(':opengds-procedure-facade').projectDir = file('procedures/facade') include('algorithms-procedure-facade') project(':algorithms-procedure-facade').projectDir = file('procedures/algorithms-facade') +include('operations-procedure-facade') +project(':operations-procedure-facade').projectDir = file('procedures/operations-facade') + include('progress-tracking') project(':progress-tracking').projectDir = file('progress-tracking')