Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Attestation pool take 1: interop #176

Draft
wants to merge 64 commits into
base: interop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
69eef6f
Initial commit of the attestation pool feature
mkalinin Aug 22, 2019
d09e87e
Update signature verification in the pool
mkalinin Aug 23, 2019
47023da
Rework reactor logic in attestation pool
mkalinin Aug 27, 2019
771fe79
Instantiate attestation pool
mkalinin Aug 27, 2019
4a039bb
Check isInitialized in attestation processors if needed
mkalinin Aug 28, 2019
da9fb0e
Add javadocs and polish attestation pool impl
mkalinin Aug 28, 2019
548625c
Continue to polish and document attestation pool
mkalinin Aug 28, 2019
19b7403
Get rid of redundant Input class
mkalinin Aug 28, 2019
4494576
Simplify name of processors in attestaiton pool
mkalinin Aug 28, 2019
817abdd
Add a javadoc to InMemoryAttestationPool class
mkalinin Aug 28, 2019
faf766b
Add attestation pool diagram
mkalinin Aug 28, 2019
e9182d0
Update pool diagram
mkalinin Aug 28, 2019
cb4130e
Rework attestation pool processors
mkalinin Aug 30, 2019
df528e5
JUNIT5: add. Add test draft
eustimenko Sep 2, 2019
a1219ba
Create attestation-pool at test
eustimenko Sep 2, 2019
d394a3b
add reactor tests
eustimenko Sep 2, 2019
23f74e2
pool-test: add verify blockchain creation
eustimenko Sep 2, 2019
5a47137
pool-test: add verify ReceivedAttestation creation
eustimenko Sep 2, 2019
1ea3c89
pool-test: verify SlotsNumber creation
eustimenko Sep 2, 2019
037ba11
pool-test: a little refactroing
eustimenko Sep 2, 2019
0b1e0f1
pool-test: fix parent block chain
eustimenko Sep 2, 2019
0aff0ce
pool-test: add empty checkpoint publisher
eustimenko Sep 2, 2019
7138a49
test: fix NPE to create UnknownAttestationPool
eustimenko Sep 3, 2019
5208599
test: correct verification
eustimenko Sep 4, 2019
b65b8cf
Switch to rocksDB JNI 6.2.2 since it includes compression codecs on W…
ericsson49 Aug 29, 2019
5339f80
Configuration of eth1_block_hash value for EmulatorContract added.
ericsson49 Aug 29, 2019
8bbaacd
Mocked start key pairs generation implemented.
ericsson49 Aug 29, 2019
5020f61
Withdrawal credentials for mocked start generation implemented.
ericsson49 Aug 29, 2019
11b7521
Added a command line option to specify yaml spec constants file (simp…
ericsson49 Aug 29, 2019
bfabbea
Modified default config to match interop settings.
ericsson49 Aug 29, 2019
6c2e407
For interop, MAX_EFFECTIVE_BALANCE should be used as a balance value.
ericsson49 Aug 29, 2019
9a87620
Use BCParameters.ORDER to initialize SimulationKeyPairGenerator#CURVE…
mkalinin Aug 30, 2019
b84de20
test: draft commit
eustimenko Sep 5, 2019
e1f2b1e
test: fix check TimeFrameFilter
eustimenko Sep 5, 2019
ba01970
Implement simple suboptimal attestation churn
mkalinin Sep 5, 2019
2db4a97
Verify attestations against state before aggregation
mkalinin Sep 5, 2019
f9edbd2
test: change out to SimpleProcessor
eustimenko Sep 5, 2019
e669476
Update attestation pool with a bunch of fixes
mkalinin Sep 7, 2019
87676b1
Merge branch 'feature/attestation-pool' of github.com:harmony-dev/bea…
mkalinin Sep 7, 2019
26da60d
Use SimpleProcessor all over attestation pool
mkalinin Sep 7, 2019
c358638
Merge branch 'develop' into feature/attestation-pool
mkalinin Sep 7, 2019
49e95b4
Merge followups
mkalinin Sep 7, 2019
86813ff
Integrate attestation pool into simulator
mkalinin Sep 8, 2019
c00918e
pool: test valid TimeProcessor
eustimenko Sep 10, 2019
11bb800
pool: test valid SanityProcessor
eustimenko Sep 10, 2019
5e0d7bd
pool: correct testing valid/invalid SanityProcessor
eustimenko Sep 10, 2019
6f46160
pool: test DoubleWorkProcessor
eustimenko Sep 10, 2019
5ab558c
pool: test SignatureEncodingChecker
eustimenko Sep 10, 2019
441dc74
Merge branch 'feature/attestation-pool' of github.com:harmony-dev/bea…
mkalinin Sep 11, 2019
f4ba71f
pool: test processors by one test
eustimenko Sep 11, 2019
5ecceca
pool: SignatureEncodingProcessorTest
eustimenko Sep 11, 2019
eda3012
test: correct test attestation to be valid
eustimenko Sep 12, 2019
85b5f68
test: IdentificationProcessor
eustimenko Sep 12, 2019
a1c2751
test: initialize tuple storage
eustimenko Sep 17, 2019
01c0c7c
test: remove inmemory attestation pool test
eustimenko Sep 18, 2019
33f0629
Modify update finality to comply with the fork choice spec.
ericsson49 Sep 18, 2019
94cd7be
Modify get_head to comply with the fork_choice spec: add tie break.
ericsson49 Sep 18, 2019
07d2003
attestation-pool: update eth2.0-spec-tests
eustimenko Sep 19, 2019
4c6541d
Modify update finality to comply with the fork choice spec.
ericsson49 Sep 18, 2019
67541c0
Modify get_head to comply with the fork_choice spec: add tie break.
ericsson49 Sep 18, 2019
122e9af
Merge remote-tracking branch 'origin/fix/fork-choice-issues' into fix…
ericsson49 Sep 19, 2019
1bc6ce9
Check that BytesValue are compared the same way in Java and in python3
ericsson49 Sep 20, 2019
cf24e15
Replace string comparison with bytes' in get_winning_crosslink_and_at…
mkalinin Sep 23, 2019
8d653af
Merge pull request #199 from harmony-dev/fix/fork-choice-issues
mkalinin Sep 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.ethereum.beacon.chain.pool;

import java.util.List;
import org.ethereum.beacon.chain.pool.checker.SanityChecker;
import org.ethereum.beacon.chain.pool.checker.SignatureEncodingChecker;
import org.ethereum.beacon.chain.pool.checker.TimeFrameFilter;
import org.ethereum.beacon.chain.pool.churn.OffChainAggregates;
import org.ethereum.beacon.chain.pool.reactor.ChurnProcessor;
import org.ethereum.beacon.chain.pool.reactor.DoubleWorkProcessor;
import org.ethereum.beacon.chain.pool.reactor.IdentificationProcessor;
import org.ethereum.beacon.chain.pool.reactor.SanityProcessor;
import org.ethereum.beacon.chain.pool.reactor.SignatureEncodingProcessor;
import org.ethereum.beacon.chain.pool.reactor.TimeProcessor;
import org.ethereum.beacon.chain.pool.reactor.VerificationProcessor;
import org.ethereum.beacon.chain.pool.registry.ProcessedAttestations;
Expand All @@ -15,13 +18,11 @@
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.state.Checkpoint;
import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.schedulers.Scheduler;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.stream.Fluxes;
import org.ethereum.beacon.stream.Fluxes.FluxSplit;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;

/**
* An implementation of attestation pool based on <a href="https://projectreactor.io/">Reactor</a>
Expand All @@ -36,16 +37,17 @@ public class InMemoryAttestationPool implements AttestationPool {
private final Publisher<BeaconBlock> chainHeads;
private final Schedulers schedulers;

private final TimeProcessor timeProcessor;
private final SanityProcessor sanityChecker;
private final TimeFrameFilter timeFrameFilter;
private final SanityChecker sanityChecker;
private final SignatureEncodingChecker encodingChecker;
private final ProcessedAttestations processedFilter;
private final IdentificationProcessor identifier;
private final VerificationProcessor verifier;
private final ChurnProcessor churn;
private final UnknownAttestationPool unknownPool;
private final BatchVerifier verifier;

private final DirectProcessor<ReceivedAttestation> invalidAttestations = DirectProcessor.create();
private final DirectProcessor<ReceivedAttestation> validAttestations = DirectProcessor.create();
private final DirectProcessor<ReceivedAttestation> unknownAttestations = DirectProcessor.create();
private final DirectProcessor<OffChainAggregates> offChainAggregates = DirectProcessor.create();

public InMemoryAttestationPool(
Publisher<ReceivedAttestation> source,
Expand All @@ -58,82 +60,85 @@ public InMemoryAttestationPool(
SanityChecker sanityChecker,
SignatureEncodingChecker encodingChecker,
ProcessedAttestations processedFilter,
UnknownAttestationPool unknownAttestationPool,
UnknownAttestationPool unknownPool,
BatchVerifier batchVerifier) {
this.source = source;
this.newSlots = newSlots;
this.finalizedCheckpoints = finalizedCheckpoints;
this.importedBlocks = importedBlocks;
this.chainHeads = chainHeads;
this.schedulers = schedulers;
this.timeProcessor = new TimeProcessor(timeFrameFilter);
this.sanityChecker = new SanityProcessor(sanityChecker);
this.timeFrameFilter = timeFrameFilter;
this.sanityChecker = sanityChecker;
this.encodingChecker = encodingChecker;
this.processedFilter = processedFilter;
this.identifier = new IdentificationProcessor(unknownAttestationPool);
this.verifier = new VerificationProcessor(batchVerifier);
this.churn = new ChurnProcessor();
this.unknownPool = unknownPool;
this.verifier = batchVerifier;
}

@Override
public void start() {
Scheduler executor =
schedulers.newParallelDaemon("attestation-pool-%d", AttestationPool.MAX_THREADS);

Flux<?> sourceFx = Flux.from(source).publishOn(executor.toReactor());
Flux<?> newSlotsFx = Flux.from(newSlots).publishOn(executor.toReactor());
Flux<?> importedBlocksFx = Flux.from(importedBlocks).publishOn(executor.toReactor());
Flux<?> finalizedCheckpointsFx =
Flux.from(finalizedCheckpoints).publishOn(executor.toReactor());
Flux<?> chainHeadsFx = Flux.from(chainHeads).publishOn(executor.toReactor());

// start from time frame processor
Flux.merge(sourceFx, newSlotsFx, finalizedCheckpointsFx).subscribe(timeProcessor);
FluxSplit<CheckedAttestation> timeFrameOut =
Fluxes.split(timeProcessor, CheckedAttestation::isPassed);

// subscribe sanity checker
Flux.merge(
timeFrameOut.getSatisfied().map(CheckedAttestation::getAttestation),
finalizedCheckpointsFx)
.subscribe(sanityChecker);
FluxSplit<CheckedAttestation> sanityOut =
Fluxes.split(sanityChecker, CheckedAttestation::isPassed);

// filter already processed attestations
Flux<ReceivedAttestation> newAttestations =
sanityOut
.getSatisfied()
.map(CheckedAttestation::getAttestation)
.filter(processedFilter::add);
Scheduler parallelExecutor =
schedulers
.newParallelDaemon("attestation-pool-%d", AttestationPool.MAX_THREADS)
.toReactor();

// create sources
Flux<ReceivedAttestation> sourceFx = Flux.from(source);
Flux<SlotNumber> newSlotsFx = Flux.from(newSlots);
Flux<Checkpoint> finalizedCheckpointsFx = Flux.from(finalizedCheckpoints);
Flux<BeaconBlock> importedBlocksFx = Flux.from(importedBlocks);
Flux<BeaconBlock> chainHeadsFx = Flux.from(chainHeads);

// check time frames
TimeProcessor timeProcessor =
new TimeProcessor(
timeFrameFilter, schedulers, sourceFx, finalizedCheckpointsFx, newSlotsFx);

// run sanity check
SanityProcessor sanityProcessor =
new SanityProcessor(sanityChecker, schedulers, timeProcessor, finalizedCheckpointsFx);

// discard already processed attestations
DoubleWorkProcessor doubleWorkProcessor =
new DoubleWorkProcessor(processedFilter, schedulers, sanityProcessor.getValid());

// check signature encoding
FluxSplit<ReceivedAttestation> encodingCheckOut =
Fluxes.split(newAttestations, encodingChecker::check);
SignatureEncodingProcessor encodingProcessor =
new SignatureEncodingProcessor(
encodingChecker, doubleWorkProcessor.publishOn(parallelExecutor));

// identify attestation target
Flux.merge(encodingCheckOut.getSatisfied(), newSlotsFx, importedBlocksFx).subscribe(identifier);
IdentificationProcessor identificationProcessor =
new IdentificationProcessor(
unknownPool, schedulers, encodingProcessor.getValid(), newSlotsFx, importedBlocksFx);

// verify attestations
identifier.bufferTimeout(VERIFIER_BUFFER_SIZE, VERIFIER_INTERVAL).subscribe(verifier);
FluxSplit<CheckedAttestation> verifierOut =
Fluxes.split(verifier, CheckedAttestation::isPassed);
Flux<List<ReceivedAttestation>> verificationThrottle =
identificationProcessor
.getIdentified()
.publishOn(parallelExecutor)
.bufferTimeout(VERIFIER_BUFFER_SIZE, VERIFIER_INTERVAL);
VerificationProcessor verificationProcessor =
new VerificationProcessor(verifier, verificationThrottle);

// feed churn
Flux.merge(
verifierOut.getSatisfied().map(CheckedAttestation::getAttestation),
newSlotsFx,
chainHeadsFx);
ChurnProcessor churnProcessor = new ChurnProcessor(schedulers, chainHeadsFx, newSlotsFx);

Scheduler outScheduler = schedulers.events().toReactor();
// expose valid attestations
verificationProcessor.getValid().publishOn(outScheduler).subscribe(validAttestations);
// expose not yet identified
identificationProcessor.getUnknown().publishOn(outScheduler).subscribe(unknownAttestations);
// expose aggregates
churnProcessor.publishOn(outScheduler).subscribe(offChainAggregates);
// expose invalid attestations
Flux.merge(
sanityOut.getUnsatisfied().map(CheckedAttestation::getAttestation),
encodingCheckOut.getUnsatisfied(),
verifierOut.getUnsatisfied().map(CheckedAttestation::getAttestation))
sanityProcessor.getInvalid(),
encodingProcessor.getInvalid(),
verificationProcessor.getInvalid())
.publishOn(outScheduler)
.subscribe(invalidAttestations);

// expose valid attestations
verifierOut.getSatisfied().map(CheckedAttestation::getAttestation).subscribe(validAttestations);
}

@Override
Expand All @@ -148,11 +153,11 @@ public Publisher<ReceivedAttestation> getInvalid() {

@Override
public Publisher<ReceivedAttestation> getUnknownAttestations() {
return identifier.getUnknownAttestations();
return unknownAttestations;
}

@Override
public Publisher<OffChainAggregates> getAggregates() {
return churn;
return offChainAggregates;
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
package org.ethereum.beacon.chain.pool.reactor;

import org.ethereum.beacon.chain.pool.ReceivedAttestation;
import org.ethereum.beacon.chain.pool.churn.OffChainAggregates;
import org.ethereum.beacon.stream.AbstractDelegateProcessor;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.stream.OutsourcePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;

public class ChurnProcessor
extends AbstractDelegateProcessor<Object, OffChainAggregates> {
public class ChurnProcessor extends Flux<OffChainAggregates> {

@Override
protected void hookOnNext(Object value) {
private final OutsourcePublisher<OffChainAggregates> out = new OutsourcePublisher<>();

public ChurnProcessor(
Schedulers schedulers, Flux<BeaconBlock> chainHeads, Flux<SlotNumber> newSlots) {}

protected void hookOnNext(ReceivedAttestation value) {
// TODO implement
}

@Override
public void subscribe(CoreSubscriber<? super OffChainAggregates> actual) {
out.subscribe(actual);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.ethereum.beacon.chain.pool.reactor;

import org.ethereum.beacon.chain.pool.ReceivedAttestation;
import org.ethereum.beacon.chain.pool.registry.ProcessedAttestations;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.stream.OutsourcePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;

/**
* Passes attestations through {@link ProcessedAttestations} filter.
*
* <p>Input:
*
* <ul>
* <li>attestations
* </ul>
*
* <p>Output:
*
* <ul>
* <li>Not yet processed attestations
* </ul>
*/
public class DoubleWorkProcessor extends Flux<ReceivedAttestation> {

private final ProcessedAttestations processedAttestations;
private final OutsourcePublisher<ReceivedAttestation> out = new OutsourcePublisher<>();

public DoubleWorkProcessor(
ProcessedAttestations processedAttestations,
Schedulers schedulers,
Flux<ReceivedAttestation> source) {
this.processedAttestations = processedAttestations;

Scheduler scheduler = schedulers.newSingleThreadDaemon("pool-double-work").toReactor();
source.publishOn(scheduler).subscribe(this::hookOnNext);
}

private void hookOnNext(ReceivedAttestation attestation) {
if (!processedAttestations.add(attestation)) {
out.publishOut(attestation);
}
}

@Override
public void subscribe(CoreSubscriber<? super ReceivedAttestation> actual) {
out.subscribe(actual);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import org.ethereum.beacon.chain.pool.registry.UnknownAttestationPool;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.stream.AbstractDelegateProcessor;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.FluxSink;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.stream.OutsourcePublisher;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;

/**
* Processor throttling attestations through {@link UnknownAttestationPool}.
* Passes attestations through {@link UnknownAttestationPool}.
*
* <p>Input:
*
Expand All @@ -24,46 +25,49 @@
* <ul>
* <li>instantly identified attestations
* <li>attestations identified upon a new block come
* <li>attestations with not yet imported block
* </ul>
*/
public class IdentificationProcessor extends AbstractDelegateProcessor<Object, ReceivedAttestation> {
public class IdentificationProcessor {

private final UnknownAttestationPool pool;
private final DirectProcessor<ReceivedAttestation> unknownAttestations = DirectProcessor.create();
private final FluxSink<ReceivedAttestation> unknownOut = unknownAttestations.sink();
private final OutsourcePublisher<ReceivedAttestation> identified = new OutsourcePublisher<>();
private final OutsourcePublisher<ReceivedAttestation> unknown = new OutsourcePublisher<>();

public IdentificationProcessor(UnknownAttestationPool pool) {
public IdentificationProcessor(
UnknownAttestationPool pool,
Schedulers schedulers,
Flux<ReceivedAttestation> source,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would consider using more general Publisher as an input. I believe Flux.from(Publisher) shortcuts the call when the parameter is Flux so this should be for free

Flux<SlotNumber> newSlots,
Flux<BeaconBlock> newImportedBlocks) {
this.pool = pool;
}

@Override
protected void hookOnNext(Object value) {
if (value.getClass().equals(BeaconBlock.class)) {
// forward attestations identified with a new block
pool.feedNewImportedBlock((BeaconBlock) value).forEach(this::publishOut);
} else if (value.getClass().equals(SlotNumber.class)) {
pool.feedNewSlot((SlotNumber) value);
} else if (value.getClass().equals(ReceivedAttestation.class)) {
if (pool.isInitialized()) {
return;
}
Scheduler scheduler =
schedulers.newSingleThreadDaemon("pool-attestation-identifier").toReactor();
newSlots.publishOn(scheduler).subscribe(this.pool::feedNewSlot);
newImportedBlocks.publishOn(scheduler).subscribe(this::hookOnNext);
source.publishOn(scheduler).subscribe(this::hookOnNext);
}

ReceivedAttestation attestation = (ReceivedAttestation) value;
private void hookOnNext(BeaconBlock block) {
pool.feedNewImportedBlock(block).forEach(identified::publishOut);
}

if (!pool.add(attestation)) {
// forward attestations not added to the pool
publishOut(attestation);
private void hookOnNext(ReceivedAttestation attestation) {
if (pool.isInitialized()) {
if (pool.add(attestation)) {
unknown.publishOut(attestation);
} else {
// expose not yet identified attestations
unknownOut.next(attestation);
identified.publishOut(attestation);
}
} else {
throw new IllegalArgumentException(
"Unsupported input type: " + value.getClass().getSimpleName());
}
}

public DirectProcessor<ReceivedAttestation> getUnknownAttestations() {
return unknownAttestations;
public OutsourcePublisher<ReceivedAttestation> getIdentified() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't expose OutsourcePublisher implementation until the caller is supposed to write items to this stream. Wouldn't returning Publisher suits?

return identified;
}

public OutsourcePublisher<ReceivedAttestation> getUnknown() {
return unknown;
}
}
Loading