Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Features safe time #156

Draft
wants to merge 11 commits into
base: develop
Choose a base branch
from
2 changes: 1 addition & 1 deletion chain/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies {
implementation project(':db:core')
implementation project(':pow:core')
api project(':ssz')
implementation project(':util')
implementation project(':util:core')

implementation 'com.google.guava:guava'
implementation 'io.projectreactor:reactor-core'
Expand Down
2 changes: 1 addition & 1 deletion consensus/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ dependencies {
implementation project(':core')
implementation project(':crypto')
implementation project(':ssz')
implementation project(':util')
implementation project(':util:core')

implementation 'com.google.guava:guava'

Expand Down
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dependencies {
implementation project(':types')
api project(':types')
implementation project(':crypto')
implementation project(':ssz')

Expand Down
2 changes: 1 addition & 1 deletion pow/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ dependencies {
implementation project(':core')
implementation project(':consensus')
implementation project(':ssz')
implementation project(':util')
implementation project(':util:core')
implementation project(':crypto')

implementation 'io.projectreactor:reactor-core'
Expand Down
2 changes: 1 addition & 1 deletion pow/ethereumj/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies {
implementation project(':types')
implementation project(':core')
implementation project(':consensus')
implementation project(':util')
implementation project(':util:core')
implementation ("org.ethereum:ethereumj-core") {
changing = true

Expand Down
2 changes: 1 addition & 1 deletion pow/validator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dependencies {
implementation project(':chain')
implementation project(':consensus')
implementation project(':validator')
implementation project(':util')
implementation project(':util:core')
implementation project(':db:core')

implementation 'io.projectreactor:reactor-core'
Expand Down
4 changes: 3 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ include 'test'
// Strict types definition
include 'types'
// Standalone utils without any relation to Ethereum 2.0
include 'util'
include 'util:core'
// Eth2.0 time utilities
include 'util:time'
// Validator services
include 'validator'
// Wire API mock
Expand Down
2 changes: 1 addition & 1 deletion ssz/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apply plugin: 'java-library'

dependencies {
implementation project(':types')
implementation project(':util')
implementation project(':util:core')
implementation 'net.consensys.cava:cava-ssz'
implementation 'net.consensys.cava:cava-units'

Expand Down
2 changes: 1 addition & 1 deletion start/benchmaker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ application {
dependencies {
implementation project(':types')
implementation project(':wire')
implementation project(':util')
implementation project(':util:core')
implementation project(':start:common')
implementation project(':start:config')
implementation project(':crypto')
Expand Down
2 changes: 1 addition & 1 deletion start/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies {
implementation project(':ssz')
implementation project(':validator')
implementation project(':wire')
implementation project(':util')
implementation project(':util:core')

implementation 'com.google.guava:guava'
implementation 'io.projectreactor:reactor-core'
Expand Down
2 changes: 1 addition & 1 deletion start/config/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ dependencies {
implementation project(':consensus')
implementation project(':crypto')
implementation project(':types')
implementation project(':util')
implementation project(':util:core')

implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'com.fasterxml.jackson.core:jackson-databind'
Expand Down
2 changes: 1 addition & 1 deletion start/node/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ application {
dependencies {
implementation project(':types')
implementation project(':wire')
implementation project(':util')
implementation project(':util:core')
implementation project(':start:common')
implementation project(':start:config')
implementation project(':crypto')
Expand Down
2 changes: 1 addition & 1 deletion start/simulator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ createScript(project, 'org.ethereum.beacon.simulator.Simulator', 'simulator')
dependencies {
implementation project(':types')
implementation project(':wire')
implementation project(':util')
implementation project(':util:core')
implementation project(':start:common')
implementation project(':start:config')
implementation project(':crypto')
Expand Down
2 changes: 1 addition & 1 deletion test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies {
testImplementation project(':db:core')
testImplementation project(':chain')
testImplementation project(':start:simulator')
testImplementation project(':util')
testImplementation project(':util:core')
testImplementation project(':pow:core')
}

Expand Down
5 changes: 0 additions & 5 deletions util/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@
dependencies {
implementation 'io.projectreactor:reactor-core'
implementation 'com.google.guava:guava'
implementation 'commons-beanutils:commons-beanutils'
}
5 changes: 5 additions & 0 deletions util/core/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
dependencies {
implementation 'io.projectreactor:reactor-core'
implementation 'com.google.guava:guava'
implementation 'commons-beanutils:commons-beanutils'
}
12 changes: 12 additions & 0 deletions util/time/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
dependencies {
implementation project(':util:core')
implementation project(':core')

implementation 'io.projectreactor:reactor-core'
implementation 'commons-net:commons-net'
implementation 'org.apache.logging.log4j:log4j-core'

testImplementation project(':consensus')
testImplementation project(':ssz')
testImplementation 'junit:junit'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.ethereum.beacon.time;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.core.types.Time;
import org.ethereum.beacon.schedulers.Scheduler;
import org.ethereum.beacon.stream.SimpleProcessor;
import org.ethereum.beacon.time.provider.NetworkTime;
import org.ethereum.beacon.time.provider.StatisticsTime;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicLong;

/**
* Strategy which prioritize {@link NetworkTime} but if the delta between {@link NetworkTime} and
* {@link StatisticsTime} goes above allowedDelta, it uses {@link StatisticsTime} as time provider.
*/
public class NetworkFirstStrategy implements TimeStrategy {
private final SimpleProcessor<Time> timeProcessor;
private final AtomicLong latestNetwork = new AtomicLong(-1);
private final AtomicLong latestStatistics = new AtomicLong(-1);
private static final Logger logger = LogManager.getLogger("time");

public NetworkFirstStrategy(
Scheduler scheduler,
NetworkTime networkTime,
StatisticsTime statisticsTime,
int allowedDelta) {
this.timeProcessor = new SimpleProcessor<Time>(scheduler, "NetworkFirstStrategy");
Flux.from(networkTime.getTimeStream())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we have a component with two inbound streams and a single processor that handles them both?

.subscribe(
t -> {
this.latestNetwork.set(t.getValue());
if (latestStatistics.get() == -1) {
return;
}
if (Math.abs(latestNetwork.get() - latestStatistics.get()) <= allowedDelta) {
timeProcessor.onNext(t);
}
});
Flux.from(statisticsTime.getTimeStream())
.subscribe(
t -> {
this.latestStatistics.set(t.getValue());
if (latestNetwork.get() == -1) {
return;
}
if (Math.abs(latestNetwork.get() - latestStatistics.get()) > allowedDelta) {
logger.trace(() -> String.format("Using time from statistics time provider: %s", t.getValue()));
timeProcessor.onNext(t);
}
});
}

@Override
public Publisher<Time> getTimeStream() {
return timeProcessor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.ethereum.beacon.time;

import org.ethereum.beacon.core.types.Time;
import org.ethereum.beacon.time.provider.StatisticsTime;
import org.reactivestreams.Publisher;

public class StatisticsStrategy implements TimeStrategy {
private final StatisticsTime statisticsTime;

public StatisticsStrategy(StatisticsTime statisticsTime) {
this.statisticsTime = statisticsTime;
}

@Override
public Publisher<Time> getTimeStream() {
return statisticsTime.getTimeStream();
}
}
12 changes: 12 additions & 0 deletions util/time/src/main/java/org/ethereum/beacon/time/TimeStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.ethereum.beacon.time;

import org.ethereum.beacon.core.types.Time;
import org.reactivestreams.Publisher;

/**
* High level time supplier, uses several {@link org.ethereum.beacon.time.provider.TimeProvider} to
* produce result time
*/
public interface TimeStrategy {
Publisher<Time> getTimeStream();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.ethereum.beacon.time.mapper;

import org.ethereum.beacon.core.types.Time;
import org.ethereum.beacon.schedulers.Scheduler;
import org.ethereum.beacon.stream.SimpleProcessor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.function.Function;

public class ObjectTimeMapper<T> implements TimeMapper {
private final Function<T, Time> timeFunc;
private final SimpleProcessor<Time> timeProcessor;

public ObjectTimeMapper(
Scheduler scheduler, Publisher<T> objectStream, Function<T, Time> timeFunc) {
this.timeProcessor = new SimpleProcessor<>(scheduler, "TimeMapper");
this.timeFunc = timeFunc;
Flux.from(objectStream).map(this::mapObjectFunc).subscribe(timeProcessor::onNext);
}

Time mapObjectFunc(T obj) {
return timeFunc.apply(obj);
}

@Override
public Publisher<Time> getTimeStream() {
return timeProcessor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.ethereum.beacon.time.mapper;

import org.ethereum.beacon.core.types.Time;
import org.reactivestreams.Publisher;

/** Maps stream of objects with some kind of time info to time stream */
public interface TimeMapper {
Publisher<Time> getTimeStream();
}
Loading