Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Attestation pool take 1: interop #176

Draft
wants to merge 64 commits into
base: interop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
69eef6f
Initial commit of the attestation pool feature
mkalinin Aug 22, 2019
d09e87e
Update signature verification in the pool
mkalinin Aug 23, 2019
47023da
Rework reactor logic in attestation pool
mkalinin Aug 27, 2019
771fe79
Instantiate attestation pool
mkalinin Aug 27, 2019
4a039bb
Check isInitialized in attestation processors if needed
mkalinin Aug 28, 2019
da9fb0e
Add javadocs and polish attestation pool impl
mkalinin Aug 28, 2019
548625c
Continue to polish and document attestation pool
mkalinin Aug 28, 2019
19b7403
Get rid of redundant Input class
mkalinin Aug 28, 2019
4494576
Simplify name of processors in attestaiton pool
mkalinin Aug 28, 2019
817abdd
Add a javadoc to InMemoryAttestationPool class
mkalinin Aug 28, 2019
faf766b
Add attestation pool diagram
mkalinin Aug 28, 2019
e9182d0
Update pool diagram
mkalinin Aug 28, 2019
cb4130e
Rework attestation pool processors
mkalinin Aug 30, 2019
df528e5
JUNIT5: add. Add test draft
eustimenko Sep 2, 2019
a1219ba
Create attestation-pool at test
eustimenko Sep 2, 2019
d394a3b
add reactor tests
eustimenko Sep 2, 2019
23f74e2
pool-test: add verify blockchain creation
eustimenko Sep 2, 2019
5a47137
pool-test: add verify ReceivedAttestation creation
eustimenko Sep 2, 2019
1ea3c89
pool-test: verify SlotsNumber creation
eustimenko Sep 2, 2019
037ba11
pool-test: a little refactroing
eustimenko Sep 2, 2019
0b1e0f1
pool-test: fix parent block chain
eustimenko Sep 2, 2019
0aff0ce
pool-test: add empty checkpoint publisher
eustimenko Sep 2, 2019
7138a49
test: fix NPE to create UnknownAttestationPool
eustimenko Sep 3, 2019
5208599
test: correct verification
eustimenko Sep 4, 2019
b65b8cf
Switch to rocksDB JNI 6.2.2 since it includes compression codecs on W…
ericsson49 Aug 29, 2019
5339f80
Configuration of eth1_block_hash value for EmulatorContract added.
ericsson49 Aug 29, 2019
8bbaacd
Mocked start key pairs generation implemented.
ericsson49 Aug 29, 2019
5020f61
Withdrawal credentials for mocked start generation implemented.
ericsson49 Aug 29, 2019
11b7521
Added a command line option to specify yaml spec constants file (simp…
ericsson49 Aug 29, 2019
bfabbea
Modified default config to match interop settings.
ericsson49 Aug 29, 2019
6c2e407
For interop, MAX_EFFECTIVE_BALANCE should be used as a balance value.
ericsson49 Aug 29, 2019
9a87620
Use BCParameters.ORDER to initialize SimulationKeyPairGenerator#CURVE…
mkalinin Aug 30, 2019
b84de20
test: draft commit
eustimenko Sep 5, 2019
e1f2b1e
test: fix check TimeFrameFilter
eustimenko Sep 5, 2019
ba01970
Implement simple suboptimal attestation churn
mkalinin Sep 5, 2019
2db4a97
Verify attestations against state before aggregation
mkalinin Sep 5, 2019
f9edbd2
test: change out to SimpleProcessor
eustimenko Sep 5, 2019
e669476
Update attestation pool with a bunch of fixes
mkalinin Sep 7, 2019
87676b1
Merge branch 'feature/attestation-pool' of github.com:harmony-dev/bea…
mkalinin Sep 7, 2019
26da60d
Use SimpleProcessor all over attestation pool
mkalinin Sep 7, 2019
c358638
Merge branch 'develop' into feature/attestation-pool
mkalinin Sep 7, 2019
49e95b4
Merge followups
mkalinin Sep 7, 2019
86813ff
Integrate attestation pool into simulator
mkalinin Sep 8, 2019
c00918e
pool: test valid TimeProcessor
eustimenko Sep 10, 2019
11bb800
pool: test valid SanityProcessor
eustimenko Sep 10, 2019
5e0d7bd
pool: correct testing valid/invalid SanityProcessor
eustimenko Sep 10, 2019
6f46160
pool: test DoubleWorkProcessor
eustimenko Sep 10, 2019
5ab558c
pool: test SignatureEncodingChecker
eustimenko Sep 10, 2019
441dc74
Merge branch 'feature/attestation-pool' of github.com:harmony-dev/bea…
mkalinin Sep 11, 2019
f4ba71f
pool: test processors by one test
eustimenko Sep 11, 2019
5ecceca
pool: SignatureEncodingProcessorTest
eustimenko Sep 11, 2019
eda3012
test: correct test attestation to be valid
eustimenko Sep 12, 2019
85b5f68
test: IdentificationProcessor
eustimenko Sep 12, 2019
a1c2751
test: initialize tuple storage
eustimenko Sep 17, 2019
01c0c7c
test: remove inmemory attestation pool test
eustimenko Sep 18, 2019
33f0629
Modify update finality to comply with the fork choice spec.
ericsson49 Sep 18, 2019
94cd7be
Modify get_head to comply with the fork_choice spec: add tie break.
ericsson49 Sep 18, 2019
07d2003
attestation-pool: update eth2.0-spec-tests
eustimenko Sep 19, 2019
4c6541d
Modify update finality to comply with the fork choice spec.
ericsson49 Sep 18, 2019
67541c0
Modify get_head to comply with the fork_choice spec: add tie break.
ericsson49 Sep 18, 2019
122e9af
Merge remote-tracking branch 'origin/fix/fork-choice-issues' into fix…
ericsson49 Sep 19, 2019
1bc6ce9
Check that BytesValue are compared the same way in Java and in python3
ericsson49 Sep 20, 2019
cf24e15
Replace string comparison with bytes' in get_winning_crosslink_and_at…
mkalinin Sep 23, 2019
8d653af
Merge pull request #199 from harmony-dev/fix/fork-choice-issues
mkalinin Sep 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies {

implementation 'com.google.guava:guava'
implementation 'io.projectreactor:reactor-core'
api "org.apache.commons:commons-collections4"

testImplementation 'org.mockito:mockito-core'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package org.ethereum.beacon.chain.pool;

import java.time.Duration;
import org.ethereum.beacon.chain.BeaconChain;
import org.ethereum.beacon.chain.pool.checker.SanityChecker;
import org.ethereum.beacon.chain.pool.checker.SignatureEncodingChecker;
import org.ethereum.beacon.chain.pool.checker.TimeFrameFilter;
import org.ethereum.beacon.chain.pool.churn.OffChainAggregates;
import org.ethereum.beacon.chain.pool.registry.ProcessedAttestations;
import org.ethereum.beacon.chain.pool.registry.UnknownAttestationPool;
import org.ethereum.beacon.chain.pool.verifier.AttestationVerifier;
import org.ethereum.beacon.chain.pool.verifier.BatchVerifier;
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.transition.EmptySlotTransition;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.state.Checkpoint;
import org.ethereum.beacon.core.types.EpochNumber;
import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.schedulers.Schedulers;
import org.reactivestreams.Publisher;

/**
* Attestation pool API.
*
* <p>Along with {@link BeaconChain} attestation pool is one of the central components of the Beacon
* chain client. Its main responsibilities are to verify attestation coming from the wire and
* accumulate those of them which are not yet included on chain.
*
* <p>A list of attestation pool clients:
*
* <ul>
* <li>Wire - valid attestations should be further propagated; peers sent attestations that are
* eventually invalid must be dropped.
* <li>Fork choice - LMD GHOST is driven by attestations received from the wire even if they are
* not yet on chain.
* <li>Validator - attestations not yet included on chain should be included into a new block
* produced by proposer.
* </ul>
*/
public interface AttestationPool {

/** A number of threads in the executor processing attestation pool. */
int MAX_THREADS = 32;

/** Discard attestations with target epoch greater than current epoch plus this number. */
EpochNumber MAX_ATTESTATION_LOOKAHEAD = EpochNumber.of(1);

/**
* Max number of attestations kept by processed attestations registry. An entry of this registry
* should be represented by a hash of registered attestation.
*/
int MAX_PROCESSED_ATTESTATIONS = 1_000_000;

/** Max number of attestations made to not yet known block that could be kept in memory. */
int MAX_UNKNOWN_ATTESTATIONS = 100_000;

/** Max size of a buffer that collects attestations before passing them on the main verifier. */
int VERIFIER_BUFFER_SIZE = 10_000;

/** A throttling interval for verifier buffer. */
Duration VERIFIER_INTERVAL = Duration.ofMillis(50);

/**
* Valid attestations publisher.
*
* @return a publisher.
*/
Publisher<ReceivedAttestation> getValid();

/**
* Invalid attestations publisher.
*
* @return a publisher.
*/
Publisher<ReceivedAttestation> getInvalid();

/**
* Publishes attestations which block is not yet a known block.
*
* <p>These attestations should be passed to a wire module in order to request a block.
*
* @return a publisher.
*/
Publisher<ReceivedAttestation> getUnknownAttestations();

/**
* Publishes aggregated attestations that are not yet included on chain.
*
* <p>It should be a source of attestations for block proposer.
*
* @return a publisher.
*/
Publisher<OffChainAggregates> getAggregates();

/** Launches the pool. */
void start();

static AttestationPool create(
Publisher<ReceivedAttestation> source,
Publisher<SlotNumber> newSlots,
Publisher<Checkpoint> finalizedCheckpoints,
Publisher<BeaconBlock> importedBlocks,
Publisher<BeaconBlock> chainHeads,
Schedulers schedulers,
BeaconChainSpec spec,
BeaconChainStorage storage,
EmptySlotTransition emptySlotTransition) {

TimeFrameFilter timeFrameFilter = new TimeFrameFilter(spec, MAX_ATTESTATION_LOOKAHEAD);
SanityChecker sanityChecker = new SanityChecker(spec);
SignatureEncodingChecker encodingChecker = new SignatureEncodingChecker();
ProcessedAttestations processedFilter =
new ProcessedAttestations(spec::hash_tree_root, MAX_PROCESSED_ATTESTATIONS);
UnknownAttestationPool unknownAttestationPool =
new UnknownAttestationPool(
storage.getBlockStorage(), spec, MAX_ATTESTATION_LOOKAHEAD, MAX_UNKNOWN_ATTESTATIONS);
BatchVerifier batchVerifier =
new AttestationVerifier(storage.getTupleStorage(), spec, emptySlotTransition);

return new InMemoryAttestationPool(
source,
newSlots,
finalizedCheckpoints,
importedBlocks,
chainHeads,
schedulers,
timeFrameFilter,
sanityChecker,
encodingChecker,
processedFilter,
unknownAttestationPool,
batchVerifier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.ethereum.beacon.chain.pool;

/** A simple DTO that carries an attestation and a result of some check run against it. */
public class CheckedAttestation {
private final boolean passed;
private final ReceivedAttestation attestation;

public CheckedAttestation(boolean passed, ReceivedAttestation attestation) {
this.passed = passed;
this.attestation = attestation;
}

public boolean isPassed() {
return passed;
}

public ReceivedAttestation getAttestation() {
return attestation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package org.ethereum.beacon.chain.pool;

import java.util.List;
import org.ethereum.beacon.chain.pool.checker.SanityChecker;
import org.ethereum.beacon.chain.pool.checker.SignatureEncodingChecker;
import org.ethereum.beacon.chain.pool.checker.TimeFrameFilter;
import org.ethereum.beacon.chain.pool.churn.OffChainAggregates;
import org.ethereum.beacon.chain.pool.reactor.ChurnProcessor;
import org.ethereum.beacon.chain.pool.reactor.DoubleWorkProcessor;
import org.ethereum.beacon.chain.pool.reactor.IdentificationProcessor;
import org.ethereum.beacon.chain.pool.reactor.SanityProcessor;
import org.ethereum.beacon.chain.pool.reactor.SignatureEncodingProcessor;
import org.ethereum.beacon.chain.pool.reactor.TimeProcessor;
import org.ethereum.beacon.chain.pool.reactor.VerificationProcessor;
import org.ethereum.beacon.chain.pool.registry.ProcessedAttestations;
import org.ethereum.beacon.chain.pool.registry.UnknownAttestationPool;
import org.ethereum.beacon.chain.pool.verifier.BatchVerifier;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.state.Checkpoint;
import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.schedulers.Schedulers;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;

/**
* An implementation of attestation pool based on <a href="https://projectreactor.io/">Reactor</a>
* library, one of the implementation of reactive streams.
*/
public class InMemoryAttestationPool implements AttestationPool {

private final Publisher<ReceivedAttestation> source;
private final Publisher<SlotNumber> newSlots;
private final Publisher<Checkpoint> finalizedCheckpoints;
private final Publisher<BeaconBlock> importedBlocks;
private final Publisher<BeaconBlock> chainHeads;
private final Schedulers schedulers;

private final TimeFrameFilter timeFrameFilter;
private final SanityChecker sanityChecker;
private final SignatureEncodingChecker encodingChecker;
private final ProcessedAttestations processedFilter;
private final UnknownAttestationPool unknownPool;
private final BatchVerifier verifier;

private final DirectProcessor<ReceivedAttestation> invalidAttestations = DirectProcessor.create();
private final DirectProcessor<ReceivedAttestation> validAttestations = DirectProcessor.create();
private final DirectProcessor<ReceivedAttestation> unknownAttestations = DirectProcessor.create();
private final DirectProcessor<OffChainAggregates> offChainAggregates = DirectProcessor.create();

public InMemoryAttestationPool(
Publisher<ReceivedAttestation> source,
Publisher<SlotNumber> newSlots,
Publisher<Checkpoint> finalizedCheckpoints,
Publisher<BeaconBlock> importedBlocks,
Publisher<BeaconBlock> chainHeads,
Schedulers schedulers,
TimeFrameFilter timeFrameFilter,
SanityChecker sanityChecker,
SignatureEncodingChecker encodingChecker,
ProcessedAttestations processedFilter,
UnknownAttestationPool unknownPool,
BatchVerifier batchVerifier) {
this.source = source;
this.newSlots = newSlots;
this.finalizedCheckpoints = finalizedCheckpoints;
this.importedBlocks = importedBlocks;
this.chainHeads = chainHeads;
this.schedulers = schedulers;
this.timeFrameFilter = timeFrameFilter;
this.sanityChecker = sanityChecker;
this.encodingChecker = encodingChecker;
this.processedFilter = processedFilter;
this.unknownPool = unknownPool;
this.verifier = batchVerifier;
}

@Override
public void start() {
Scheduler parallelExecutor =
schedulers
.newParallelDaemon("attestation-pool-%d", AttestationPool.MAX_THREADS)
.toReactor();

// create sources
Flux<ReceivedAttestation> sourceFx = Flux.from(source);
Flux<SlotNumber> newSlotsFx = Flux.from(newSlots);
Flux<Checkpoint> finalizedCheckpointsFx = Flux.from(finalizedCheckpoints);
Flux<BeaconBlock> importedBlocksFx = Flux.from(importedBlocks);
Flux<BeaconBlock> chainHeadsFx = Flux.from(chainHeads);

// check time frames
TimeProcessor timeProcessor =
new TimeProcessor(
timeFrameFilter, schedulers, sourceFx, finalizedCheckpointsFx, newSlotsFx);

// run sanity check
SanityProcessor sanityProcessor =
new SanityProcessor(sanityChecker, schedulers, timeProcessor, finalizedCheckpointsFx);

// discard already processed attestations
DoubleWorkProcessor doubleWorkProcessor =
new DoubleWorkProcessor(processedFilter, schedulers, sanityProcessor.getValid());

// check signature encoding
SignatureEncodingProcessor encodingProcessor =
new SignatureEncodingProcessor(
encodingChecker, doubleWorkProcessor.publishOn(parallelExecutor));

// identify attestation target
IdentificationProcessor identificationProcessor =
new IdentificationProcessor(
unknownPool, schedulers, encodingProcessor.getValid(), newSlotsFx, importedBlocksFx);

// verify attestations
Flux<List<ReceivedAttestation>> verificationThrottle =
identificationProcessor
.getIdentified()
.publishOn(parallelExecutor)
.bufferTimeout(VERIFIER_BUFFER_SIZE, VERIFIER_INTERVAL);
VerificationProcessor verificationProcessor =
new VerificationProcessor(verifier, verificationThrottle);

// feed churn
ChurnProcessor churnProcessor = new ChurnProcessor(schedulers, chainHeadsFx, newSlotsFx);

Scheduler outScheduler = schedulers.events().toReactor();
// expose valid attestations
verificationProcessor.getValid().publishOn(outScheduler).subscribe(validAttestations);
// expose not yet identified
identificationProcessor.getUnknown().publishOn(outScheduler).subscribe(unknownAttestations);
// expose aggregates
churnProcessor.publishOn(outScheduler).subscribe(offChainAggregates);
// expose invalid attestations
Flux.merge(
sanityProcessor.getInvalid(),
encodingProcessor.getInvalid(),
verificationProcessor.getInvalid())
.publishOn(outScheduler)
.subscribe(invalidAttestations);
}

@Override
public Publisher<ReceivedAttestation> getValid() {
return validAttestations;
}

@Override
public Publisher<ReceivedAttestation> getInvalid() {
return invalidAttestations;
}

@Override
public Publisher<ReceivedAttestation> getUnknownAttestations() {
return unknownAttestations;
}

@Override
public Publisher<OffChainAggregates> getAggregates() {
return offChainAggregates;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.ethereum.beacon.chain.pool;

import org.ethereum.beacon.core.operations.Attestation;
import org.ethereum.beacon.types.p2p.NodeId;

/** An attestation received from the wire. */
public class ReceivedAttestation {

/** An id of a node sent this attestation. */
private final NodeId sender;
/** An attestation message itself. */
private final Attestation message;

public ReceivedAttestation(NodeId sender, Attestation message) {
this.sender = sender;
this.message = message;
}

public NodeId getSender() {
return sender;
}

public Attestation getMessage() {
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.ethereum.beacon.chain.pool;

/**
* Stateful processor.
*
* <p>A processor that requires particular inner state to be initialized before it could be safely
* called by its clients.
*
* <p>{@link #isInitialized()} method indicates whether processor has already been initialized or
* not. It's a client responsibility to check {@link #isInitialized()} result before calling to the
* instance of this processor.
*
* <p>Implementor MAY throw an {@link AssertionError} if it's been called before inner state has
* been initialised.
*/
public interface StatefulProcessor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand the purpose of this interface.
You either assert that a component is inited or just skip an entry if not inited. Couldn't this be the internal logic of this component?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For instance, we have checkers which returns value of boolean type saying whether check passed or not. These checkers may have several inputs and state initialized from either one or all of those inputs. Since, check MUST returns true/false there is no room for merely discarding the value fed to a checker. We might use Optional<Boolean> for that but I am not sure that it would be a better choice.


/**
* Checks whether processor state is initialized or not.
*
* @return {@code true} if processor is ready to work, {@link false}, otherwise.
*/
boolean isInitialized();
}
Loading