Skip to content

Commit

Permalink
Migrate PageRank mutate to facade
Browse files Browse the repository at this point in the history
  • Loading branch information
vnickolov committed Nov 22, 2023
1 parent d3a1cca commit 895267e
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.neo4j.gds.algorithms.AlgorithmComputationResult;
import org.neo4j.gds.algorithms.NodePropertyMutateResult;
import org.neo4j.gds.algorithms.centrality.specificfields.DefaultCentralitySpecificFields;
import org.neo4j.gds.algorithms.centrality.specificfields.PageRankSpecificFields;
import org.neo4j.gds.algorithms.mutateservices.MutateNodePropertyService;
import org.neo4j.gds.algorithms.runner.AlgorithmRunner;
import org.neo4j.gds.betweenness.BetweennessCentralityMutateConfig;
Expand All @@ -30,6 +31,8 @@
import org.neo4j.gds.core.concurrency.DefaultPool;
import org.neo4j.gds.degree.DegreeCentralityMutateConfig;
import org.neo4j.gds.harmonic.HarmonicCentralityMutateConfig;
import org.neo4j.gds.pagerank.PageRankMutateConfig;
import org.neo4j.gds.pagerank.PageRankResult;
import org.neo4j.gds.result.CentralityStatistics;

import java.util.function.Supplier;
Expand Down Expand Up @@ -121,20 +124,52 @@ public NodePropertyMutateResult<DefaultCentralitySpecificFields> harmonicCentral
);
}

public NodePropertyMutateResult<PageRankSpecificFields> pageRank(
String graphName,
PageRankMutateConfig configuration,
boolean shouldComputeCentralityDistribution
) {
// 1. Run the algorithm and time the execution
var intermediateResult = runWithTiming(
() -> centralityAlgorithmsFacade.pageRank(graphName, configuration)
);

CentralityFunctionSupplier<PageRankResult> centralityFunctionSupplier = CentralityAlgorithmResult::centralityScoreProvider;
SpecificFieldsWithCentralityDistributionSupplier<PageRankResult, PageRankSpecificFields> specificFieldsSupplier = (r, c) -> new PageRankSpecificFields(
r.iterations(),
r.didConverge(),
c
);
Supplier<PageRankSpecificFields> emptyASFSupplier = () -> PageRankSpecificFields.EMPTY;

NodePropertyValuesMapper<PageRankResult> nodePropertyValuesMapper = PageRankResult::nodePropertyValues;

return mutateNodeProperty(
intermediateResult.algorithmResult,
configuration,
centralityFunctionSupplier,
nodePropertyValuesMapper,
specificFieldsSupplier,
shouldComputeCentralityDistribution,
intermediateResult.computeMilliseconds,
emptyASFSupplier
);
}


<RESULT extends CentralityAlgorithmResult, CONFIG extends MutateNodePropertyConfig> NodePropertyMutateResult<DefaultCentralitySpecificFields> mutateNodeProperty(
private <RESULT extends CentralityAlgorithmResult, CONFIG extends MutateNodePropertyConfig> NodePropertyMutateResult<DefaultCentralitySpecificFields> mutateNodeProperty(
AlgorithmComputationResult<RESULT> algorithmResult,
CONFIG configuration,
boolean shouldComputeCentralityDistribution,
long computeMilliseconds
) {

CentralityFunctionSupplier<RESULT> centralityFunctionSupplier = (r) -> r.centralityScoreProvider();
CentralityFunctionSupplier<RESULT> centralityFunctionSupplier = CentralityAlgorithmResult::centralityScoreProvider;
SpecificFieldsWithCentralityDistributionSupplier<RESULT, DefaultCentralitySpecificFields> specificFieldsSupplier = (r, c) -> new DefaultCentralitySpecificFields(
c);
Supplier<DefaultCentralitySpecificFields> emptyASFSupplier = () -> DefaultCentralitySpecificFields.EMPTY;

NodePropertyValuesMapper<RESULT> nodePropertyValuesMapper = (r) -> r.nodePropertyValues();
NodePropertyValuesMapper<RESULT> nodePropertyValuesMapper = CentralityAlgorithmResult::nodePropertyValues;

return mutateNodeProperty(
algorithmResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.neo4j.gds.BaseProc;
import org.neo4j.gds.executor.MemoryEstimationExecutor;
import org.neo4j.gds.executor.ProcedureExecutor;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankMutateResult;
import org.neo4j.gds.results.MemoryEstimateResult;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
Expand All @@ -37,7 +38,7 @@ public class ArticleRankMutateProc extends BaseProc {

@Procedure(value = "gds.articleRank.mutate", mode = READ)
@Description(ARTICLE_RANK_DESCRIPTION)
public Stream<MutateResult> mutate(
public Stream<PageRankMutateResult> mutate(
@Name(value = "graphName") String graphName,
@Name(value = "configuration", defaultValue = "{}") Map<String, Object> configuration
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.neo4j.gds.BaseProc;
import org.neo4j.gds.executor.MemoryEstimationExecutor;
import org.neo4j.gds.executor.ProcedureExecutor;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankMutateResult;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankProcCompanion;
import org.neo4j.gds.results.MemoryEstimateResult;
import org.neo4j.procedure.Description;
Expand All @@ -37,7 +38,7 @@ public class EigenvectorMutateProc extends BaseProc {

@Procedure(value = "gds.eigenvector.mutate", mode = READ)
@Description(PageRankProcCompanion.EIGENVECTOR_DESCRIPTION)
public Stream<MutateResult> mutate(
public Stream<PageRankMutateResult> mutate(
@Name(value = "graphName") String graphName,
@Name(value = "configuration", defaultValue = "{}") Map<String, Object> configuration
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import org.neo4j.gds.BaseProc;
import org.neo4j.gds.executor.MemoryEstimationExecutor;
import org.neo4j.gds.executor.ProcedureExecutor;
import org.neo4j.gds.procedures.GraphDataScience;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankMutateResult;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankProcCompanion;
import org.neo4j.gds.results.MemoryEstimateResult;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
Expand All @@ -35,16 +37,16 @@

public class PageRankMutateProc extends BaseProc {

@Context
public GraphDataScience facade;

@Procedure(value = "gds.pageRank.mutate", mode = READ)
@Description(PageRankProcCompanion.PAGE_RANK_DESCRIPTION)
public Stream<MutateResult> mutate(
public Stream<PageRankMutateResult> mutate(
@Name(value = "graphName") String graphName,
@Name(value = "configuration", defaultValue = "{}") Map<String, Object> configuration
) {
return new ProcedureExecutor<>(
new PageRankMutateSpec(),
executionContext()
).compute(graphName, configuration);
return facade.centrality().pageRankMutate(graphName, configuration);
}

@Procedure(value = "gds.pageRank.mutate.estimate", mode = READ)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.gds.executor.ExecutionContext;
import org.neo4j.gds.executor.GdsCallable;
import org.neo4j.gds.executor.NewConfigFunction;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankMutateResult;
import org.neo4j.gds.result.AbstractResultBuilder;

import java.util.List;
Expand All @@ -37,7 +38,7 @@
import static org.neo4j.gds.procedures.centrality.pagerank.PageRankProcCompanion.PAGE_RANK_DESCRIPTION;

@GdsCallable(name = "gds.pageRank.mutate", description = PAGE_RANK_DESCRIPTION, executionMode = MUTATE_NODE_PROPERTY)
public class PageRankMutateSpec implements AlgorithmSpec<PageRankAlgorithm, PageRankResult,PageRankMutateConfig,Stream<MutateResult>,PageRankAlgorithmFactory<PageRankMutateConfig>> {
public class PageRankMutateSpec implements AlgorithmSpec<PageRankAlgorithm, PageRankResult,PageRankMutateConfig,Stream<PageRankMutateResult>,PageRankAlgorithmFactory<PageRankMutateConfig>> {

@Override
public String name() {
Expand All @@ -55,7 +56,7 @@ public NewConfigFunction<PageRankMutateConfig> newConfigFunction() {
}

@Override
public ComputationResultConsumer<PageRankAlgorithm, PageRankResult, PageRankMutateConfig, Stream<MutateResult>> computationResultConsumer() {
public ComputationResultConsumer<PageRankAlgorithm, PageRankResult, PageRankMutateConfig, Stream<PageRankMutateResult>> computationResultConsumer() {
return new MutatePropertyComputationResultConsumer<>(
computationResult ->
List.of(
Expand All @@ -70,11 +71,11 @@ public ComputationResultConsumer<PageRankAlgorithm, PageRankResult, PageRankMuta
);
}

private AbstractResultBuilder<MutateResult> resultBuilder(
private AbstractResultBuilder<PageRankMutateResult> resultBuilder(
ComputationResult<PageRankAlgorithm, PageRankResult, PageRankMutateConfig> computationResult,
ExecutionContext executionContext
) {
var builder = new MutateResult.Builder(
var builder = new PageRankMutateResult.Builder(
executionContext.returnColumns(),
computationResult.config().concurrency()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@
import org.neo4j.gds.harmonic.HarmonicCentralityWriteConfig;
import org.neo4j.gds.procedures.centrality.alphaharmonic.AlphaHarmonicStreamResult;
import org.neo4j.gds.procedures.centrality.alphaharmonic.AlphaHarmonicWriteResult;
import org.neo4j.gds.pagerank.PageRankMutateConfig;
import org.neo4j.gds.pagerank.PageRankStatsConfig;
import org.neo4j.gds.pagerank.PageRankStreamConfig;
import org.neo4j.gds.procedures.centrality.betacloseness.BetaClosenessCentralityMutateResult;
import org.neo4j.gds.procedures.centrality.betacloseness.BetaClosenessCentralityWriteResult;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankComputationalResultTransformer;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankMutateResult;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankStatsResult;
import org.neo4j.gds.procedures.configparser.ConfigurationParser;
import org.neo4j.gds.results.MemoryEstimateResult;
Expand Down Expand Up @@ -519,6 +521,22 @@ public Stream<PageRankStatsResult> pageRankStats(
}


public Stream<PageRankMutateResult> pageRankMutate(
String graphName,
Map<String, Object> configuration
) {
var config = createConfig(configuration, PageRankMutateConfig::of);

var computationResult = mutateBusinessFacade.pageRank(
graphName,
config,
procedureReturnColumns.contains("centralityDistribution")
);

return Stream.of(PageRankComputationalResultTransformer.toMutateResult(computationResult, config));
}


// ################################################################################################################

// FIXME: the following two methods are duplicate, find a good place for them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
*/
package org.neo4j.gds.procedures.centrality.pagerank;

import org.neo4j.gds.algorithms.NodePropertyMutateResult;
import org.neo4j.gds.algorithms.StatsResult;
import org.neo4j.gds.algorithms.centrality.specificfields.PageRankSpecificFields;
import org.neo4j.gds.pagerank.PageRankMutateConfig;
import org.neo4j.gds.pagerank.PageRankStatsConfig;

public final class PageRankComputationalResultTransformer {
Expand All @@ -29,6 +31,7 @@ private PageRankComputationalResultTransformer() {}

public static PageRankStatsResult toStatsResult(
StatsResult<PageRankSpecificFields> computationResult,
@SuppressWarnings("TypeMayBeWeakened")
PageRankStatsConfig configuration
) {
return new PageRankStatsResult(
Expand All @@ -42,4 +45,22 @@ public static PageRankStatsResult toStatsResult(
);
}

public static PageRankMutateResult toMutateResult(
NodePropertyMutateResult<PageRankSpecificFields> computationResult,
@SuppressWarnings("TypeMayBeWeakened")
PageRankMutateConfig config
) {

return new PageRankMutateResult(
computationResult.algorithmSpecificFields().ranIterations(),
computationResult.algorithmSpecificFields().didConverge(),
computationResult.algorithmSpecificFields().centralityDistribution(),
computationResult.preProcessingMillis(),
computationResult.computeMillis(),
computationResult.postProcessingMillis(),
computationResult.mutateMillis(),
computationResult.nodePropertiesWritten(),
config.toMap()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.pagerank;
package org.neo4j.gds.procedures.centrality.pagerank;

import org.jetbrains.annotations.Nullable;
import org.neo4j.gds.api.ProcedureReturnColumns;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankProcCompanion;
import org.neo4j.gds.procedures.centrality.pagerank.PageRankStatsResult;

import java.util.Map;

@SuppressWarnings("unused")
public final class MutateResult extends PageRankStatsResult {
public final class PageRankMutateResult extends PageRankStatsResult {

public final long mutateMillis;
public final long nodePropertiesWritten;

MutateResult(
PageRankMutateResult(
long ranIterations,
boolean didConverge,
@Nullable Map<String, Object> centralityDistribution,
Expand All @@ -57,15 +55,15 @@ public final class MutateResult extends PageRankStatsResult {
}


static class Builder extends PageRankProcCompanion.PageRankResultBuilder<MutateResult> {
public static class Builder extends PageRankProcCompanion.PageRankResultBuilder<PageRankMutateResult> {

Builder(ProcedureReturnColumns returnColumns, int concurrency) {
public Builder(ProcedureReturnColumns returnColumns, int concurrency) {
super(returnColumns, concurrency);
}

@Override
public MutateResult buildResult() {
return new MutateResult(
public PageRankMutateResult buildResult() {
return new PageRankMutateResult(
ranIterations,
didConverge,
centralityHistogram,
Expand Down

0 comments on commit 895267e

Please sign in to comment.