From 67641328a8dc99febbf1c0819eca6a93c3c38541 Mon Sep 17 00:00:00 2001 From: Lasse Westh-Nielsen Date: Tue, 13 Aug 2024 10:14:34 +0200 Subject: [PATCH] migrate list progress --- .../java/org/neo4j/gds/ListProgressProc.java | 171 +----------------- .../operations/DefaultResultRenderer.java | 2 +- 2 files changed, 10 insertions(+), 163 deletions(-) diff --git a/proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java b/proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java index 7505742a44..f24d95fae9 100644 --- a/proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java +++ b/proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java @@ -19,189 +19,36 @@ */ package org.neo4j.gds; -import org.apache.commons.lang3.time.DurationFormatUtils; -import org.neo4j.gds.core.utils.ClockService; -import org.neo4j.gds.core.utils.progress.JobId; -import org.neo4j.gds.core.utils.progress.TaskStore; -import org.neo4j.gds.core.utils.progress.tasks.DepthAwareTaskVisitor; -import org.neo4j.gds.core.utils.progress.tasks.Task; -import org.neo4j.gds.core.utils.progress.tasks.TaskTraversal; -import org.neo4j.gds.procedures.operations.StructuredOutputHelper; +import org.neo4j.gds.procedures.GraphDataScienceProcedures; +import org.neo4j.gds.procedures.operations.ProgressResult; import org.neo4j.procedure.Context; import org.neo4j.procedure.Description; import org.neo4j.procedure.Internal; import org.neo4j.procedure.Name; import org.neo4j.procedure.Procedure; -import org.neo4j.values.storable.LocalTimeValue; -import java.time.Instant; -import java.time.LocalTime; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.neo4j.gds.utils.StringFormatting.formatWithLocale; - -public class ListProgressProc extends BaseProc { - static final int PROGRESS_BAR_LENGTH = 10; +public class ListProgressProc { private static final String DESCRIPTION = "List progress events for currently running tasks."; @Context - public TaskStore taskStore; + public GraphDataScienceProcedures facade; @Internal @Deprecated(forRemoval = true) @Procedure(value = "gds.beta.listProgress", deprecatedBy = "gds.listProgress") @Description(DESCRIPTION) - public Stream betaListProgress( - @Name(value = "jobId", defaultValue = "") String jobId - ) { - executionContext() - .metricsFacade() - .deprecatedProcedures().called("gds.beta.listProgress"); - - executionContext() - .log() - .warn("Procedure `gds.beta.listProgress` has been deprecated, please use `gds.listProgress`."); + public Stream betaListProgress(@Name(value = "jobId", defaultValue = "") String jobId) { + facade.deprecatedProcedures().called("gds.beta.listProgress"); + facade.log().warn("Procedure `gds.beta.listProgress` has been deprecated, please use `gds.listProgress`."); return listProgress(jobId); } @Procedure("gds.listProgress") @Description(DESCRIPTION) - public Stream listProgress( - @Name(value = "jobId", defaultValue = "") String jobId - ) { - return jobId.isBlank() - ? jobsSummaryView() - : jobDetailView(jobId); - } - - private Stream jobsSummaryView() { - if (isGdsAdmin()) { - return taskStore.query().map(ProgressResult::fromTaskStoreEntry); - } else { - return taskStore.query(username()).map(ProgressResult::fromTaskStoreEntry); - } - } - - private Stream jobDetailView(String jobIdAsString) { - var jobId = new JobId(jobIdAsString); - - if (isGdsAdmin()) { - var progressResults = taskStore - .query(jobId) - .flatMap(ListProgressProc::jobProgress) - .collect(Collectors.toList()); - - if (progressResults.isEmpty()) { - throw new IllegalArgumentException(formatWithLocale( - "No task with job id `%s` was found.", - jobIdAsString - )); - } - - return progressResults.stream(); - } else { - return taskStore.query(username(), jobId).map(ListProgressProc::jobProgress).orElseThrow( - () -> new IllegalArgumentException(formatWithLocale( - "No task with job id `%s` was found.", - jobIdAsString - )) - ); - } - } - - private static Stream jobProgress(TaskStore.UserTask userTask) { - var jobProgressVisitor = new JobProgressVisitor(userTask.jobId(), userTask.username()); - TaskTraversal.visitPreOrderWithDepth(userTask.task(), jobProgressVisitor); - return jobProgressVisitor.progressRowsStream(); - } - - @SuppressWarnings("unused") - public static class ProgressResult { - public String username; - public String jobId; - public String taskName; - public String progress; - public String progressBar; - public String status; - public LocalTimeValue timeStarted; - public String elapsedTime; - - static ProgressResult fromTaskStoreEntry(String username, Map.Entry taskStoreEntry) { - var jobId = taskStoreEntry.getKey(); - var task = taskStoreEntry.getValue(); - return new ProgressResult(username, task, jobId, task.description()); - } - - static ProgressResult fromTaskStoreEntry(TaskStore.UserTask userTask) { - return new ProgressResult(userTask.username(), userTask.task(), userTask.jobId(), userTask.task().description()); - } - - static ProgressResult fromTaskWithDepth(String username, Task task, JobId jobId, int depth) { - var treeViewTaskName = StructuredOutputHelper.treeViewDescription(task.description(), depth); - return new ProgressResult(username, task, jobId, treeViewTaskName); - } - - public ProgressResult(String username, Task task, JobId jobId, String taskName) { - var progressContainer = task.getProgress(); - - this.jobId = jobId.asString(); - this.taskName = taskName; - this.username = username; - this.progress = StructuredOutputHelper.computeProgress(progressContainer); - this.progressBar = StructuredOutputHelper.progressBar(progressContainer, PROGRESS_BAR_LENGTH); - this.status = task.status().name(); - this.timeStarted = localTimeValue(task); - this.elapsedTime = prettyElapsedTime(task); - } - - private LocalTimeValue localTimeValue(Task task) { - if (task.hasNotStarted()) { - return null; - } - return LocalTimeValue.localTime(LocalTime.ofInstant( - Instant.ofEpochMilli(task.startTime()), - ZoneId.systemDefault() - )); - } - - private String prettyElapsedTime(Task task) { - if (task.hasNotStarted()) { - return "Not yet started"; - } - var finishTime = task.finishTime(); - var finishTimeOrNow = finishTime != -1 - ? finishTime - : ClockService.clock().millis(); - var elapsedTime = finishTimeOrNow - task.startTime(); - return DurationFormatUtils.formatDurationWords(elapsedTime, true, true); - } - } - - public static class JobProgressVisitor extends DepthAwareTaskVisitor { - - private final JobId jobId; - private final String username; - private final List progressRows; - - JobProgressVisitor(JobId jobId, String username) { - this.jobId = jobId; - this.username = username; - this.progressRows = new ArrayList<>(); - } - - Stream progressRowsStream() { - return this.progressRows.stream(); - } - - @Override - public void visit(Task task) { - progressRows.add(ProgressResult.fromTaskWithDepth(username, task, jobId, depth())); - } + public Stream listProgress(@Name(value = "jobId", defaultValue = "") String jobId) { + return facade.operations().listProgress(jobId); } } diff --git a/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/DefaultResultRenderer.java b/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/DefaultResultRenderer.java index 6d1b4629be..d8210abbe1 100644 --- a/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/DefaultResultRenderer.java +++ b/procedures/operations-facade/src/main/java/org/neo4j/gds/procedures/operations/DefaultResultRenderer.java @@ -59,7 +59,7 @@ private IllegalArgumentException createException() { return new IllegalArgumentException( formatWithLocale( "No task with job id `%s` was found.", - jobId + jobId.asString() ) ); }