Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Attestation pool take 1: interop #176

Draft
wants to merge 64 commits into
base: interop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
69eef6f
Initial commit of the attestation pool feature
mkalinin Aug 22, 2019
d09e87e
Update signature verification in the pool
mkalinin Aug 23, 2019
47023da
Rework reactor logic in attestation pool
mkalinin Aug 27, 2019
771fe79
Instantiate attestation pool
mkalinin Aug 27, 2019
4a039bb
Check isInitialized in attestation processors if needed
mkalinin Aug 28, 2019
da9fb0e
Add javadocs and polish attestation pool impl
mkalinin Aug 28, 2019
548625c
Continue to polish and document attestation pool
mkalinin Aug 28, 2019
19b7403
Get rid of redundant Input class
mkalinin Aug 28, 2019
4494576
Simplify name of processors in attestaiton pool
mkalinin Aug 28, 2019
817abdd
Add a javadoc to InMemoryAttestationPool class
mkalinin Aug 28, 2019
faf766b
Add attestation pool diagram
mkalinin Aug 28, 2019
e9182d0
Update pool diagram
mkalinin Aug 28, 2019
cb4130e
Rework attestation pool processors
mkalinin Aug 30, 2019
df528e5
JUNIT5: add. Add test draft
eustimenko Sep 2, 2019
a1219ba
Create attestation-pool at test
eustimenko Sep 2, 2019
d394a3b
add reactor tests
eustimenko Sep 2, 2019
23f74e2
pool-test: add verify blockchain creation
eustimenko Sep 2, 2019
5a47137
pool-test: add verify ReceivedAttestation creation
eustimenko Sep 2, 2019
1ea3c89
pool-test: verify SlotsNumber creation
eustimenko Sep 2, 2019
037ba11
pool-test: a little refactroing
eustimenko Sep 2, 2019
0b1e0f1
pool-test: fix parent block chain
eustimenko Sep 2, 2019
0aff0ce
pool-test: add empty checkpoint publisher
eustimenko Sep 2, 2019
7138a49
test: fix NPE to create UnknownAttestationPool
eustimenko Sep 3, 2019
5208599
test: correct verification
eustimenko Sep 4, 2019
b65b8cf
Switch to rocksDB JNI 6.2.2 since it includes compression codecs on W…
ericsson49 Aug 29, 2019
5339f80
Configuration of eth1_block_hash value for EmulatorContract added.
ericsson49 Aug 29, 2019
8bbaacd
Mocked start key pairs generation implemented.
ericsson49 Aug 29, 2019
5020f61
Withdrawal credentials for mocked start generation implemented.
ericsson49 Aug 29, 2019
11b7521
Added a command line option to specify yaml spec constants file (simp…
ericsson49 Aug 29, 2019
bfabbea
Modified default config to match interop settings.
ericsson49 Aug 29, 2019
6c2e407
For interop, MAX_EFFECTIVE_BALANCE should be used as a balance value.
ericsson49 Aug 29, 2019
9a87620
Use BCParameters.ORDER to initialize SimulationKeyPairGenerator#CURVE…
mkalinin Aug 30, 2019
b84de20
test: draft commit
eustimenko Sep 5, 2019
e1f2b1e
test: fix check TimeFrameFilter
eustimenko Sep 5, 2019
ba01970
Implement simple suboptimal attestation churn
mkalinin Sep 5, 2019
2db4a97
Verify attestations against state before aggregation
mkalinin Sep 5, 2019
f9edbd2
test: change out to SimpleProcessor
eustimenko Sep 5, 2019
e669476
Update attestation pool with a bunch of fixes
mkalinin Sep 7, 2019
87676b1
Merge branch 'feature/attestation-pool' of github.com:harmony-dev/bea…
mkalinin Sep 7, 2019
26da60d
Use SimpleProcessor all over attestation pool
mkalinin Sep 7, 2019
c358638
Merge branch 'develop' into feature/attestation-pool
mkalinin Sep 7, 2019
49e95b4
Merge followups
mkalinin Sep 7, 2019
86813ff
Integrate attestation pool into simulator
mkalinin Sep 8, 2019
c00918e
pool: test valid TimeProcessor
eustimenko Sep 10, 2019
11bb800
pool: test valid SanityProcessor
eustimenko Sep 10, 2019
5e0d7bd
pool: correct testing valid/invalid SanityProcessor
eustimenko Sep 10, 2019
6f46160
pool: test DoubleWorkProcessor
eustimenko Sep 10, 2019
5ab558c
pool: test SignatureEncodingChecker
eustimenko Sep 10, 2019
441dc74
Merge branch 'feature/attestation-pool' of github.com:harmony-dev/bea…
mkalinin Sep 11, 2019
f4ba71f
pool: test processors by one test
eustimenko Sep 11, 2019
5ecceca
pool: SignatureEncodingProcessorTest
eustimenko Sep 11, 2019
eda3012
test: correct test attestation to be valid
eustimenko Sep 12, 2019
85b5f68
test: IdentificationProcessor
eustimenko Sep 12, 2019
a1c2751
test: initialize tuple storage
eustimenko Sep 17, 2019
01c0c7c
test: remove inmemory attestation pool test
eustimenko Sep 18, 2019
33f0629
Modify update finality to comply with the fork choice spec.
ericsson49 Sep 18, 2019
94cd7be
Modify get_head to comply with the fork_choice spec: add tie break.
ericsson49 Sep 18, 2019
07d2003
attestation-pool: update eth2.0-spec-tests
eustimenko Sep 19, 2019
4c6541d
Modify update finality to comply with the fork choice spec.
ericsson49 Sep 18, 2019
67541c0
Modify get_head to comply with the fork_choice spec: add tie break.
ericsson49 Sep 18, 2019
122e9af
Merge remote-tracking branch 'origin/fix/fork-choice-issues' into fix…
ericsson49 Sep 19, 2019
1bc6ce9
Check that BytesValue are compared the same way in Java and in python3
ericsson49 Sep 20, 2019
cf24e15
Replace string comparison with bytes' in get_winning_crosslink_and_at…
mkalinin Sep 23, 2019
8d653af
Merge pull request #199 from harmony-dev/fix/fork-choice-issues
mkalinin Sep 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions chain/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,20 @@ dependencies {

implementation 'com.google.guava:guava'
implementation 'io.projectreactor:reactor-core'
api "org.apache.commons:commons-collections4"

testImplementation 'org.mockito:mockito-core'
testCompile 'io.projectreactor:reactor-test'

// Gradle does not import test sources alongside with main sources
// use a workaround until better solution will be found
testImplementation project(':consensus').sourceSets.test.output

testImplementation 'org.junit.jupiter:junit-jupiter:5.5.1'

test {
useJUnitPlatform {
excludeTags 'FIX'
}
}
}
18 changes: 18 additions & 0 deletions chain/src/main/java/org/ethereum/beacon/chain/BeaconChain.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.ethereum.beacon.chain;

import org.ethereum.beacon.core.state.Checkpoint;
import org.reactivestreams.Publisher;

public interface BeaconChain {
Expand All @@ -15,5 +16,22 @@ public interface BeaconChain {
*/
BeaconTuple getRecentlyProcessed();

/**
* Returns the most recent justified checkpoint.
*
* @return a checkpoint.
*/
Publisher<Checkpoint> getJustifiedCheckpoints();

/**
* Returns the most recent finalized checkpoint.
*
* <p><b>Note:</b> finalized checkpoints are published by {@link #getJustifiedCheckpoints()}
* either.
*
* @return a checkpoint.
*/
Publisher<Checkpoint> getFinalizedCheckpoints();

void init();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.chain.storage.BeaconTupleStorage;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.BeaconStateEx;
import org.ethereum.beacon.consensus.BlockTransition;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.transition.EmptySlotTransition;
import org.ethereum.beacon.consensus.verifier.BeaconBlockVerifier;
import org.ethereum.beacon.consensus.verifier.BeaconStateVerifier;
import org.ethereum.beacon.consensus.verifier.VerificationResult;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.BeaconState;
import org.ethereum.beacon.core.state.Checkpoint;
import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.stream.SimpleProcessor;
Expand All @@ -36,6 +37,8 @@ public class DefaultBeaconChain implements MutableBeaconChain {
private final BeaconTupleStorage tupleStorage;

private final SimpleProcessor<BeaconTupleDetails> blockStream;
private final SimpleProcessor<Checkpoint> justifiedCheckpointStream;
private final SimpleProcessor<Checkpoint> finalizedCheckpointStream;
private final Schedulers schedulers;

private BeaconTuple recentlyProcessed;
Expand All @@ -58,6 +61,10 @@ public DefaultBeaconChain(
this.schedulers = schedulers;

blockStream = new SimpleProcessor<>(schedulers.events(), "DefaultBeaconChain.block");
justifiedCheckpointStream =
new SimpleProcessor<>(schedulers.events(), "DefaultBeaconChain.justifiedCheckpoint");
finalizedCheckpointStream =
new SimpleProcessor<>(schedulers.events(), "DefaultBeaconChain.finalizedCheckpoint");
}

@Override
Expand All @@ -66,6 +73,8 @@ public void init() {
throw new IllegalStateException("Couldn't start from empty storage");
}
this.recentlyProcessed = fetchRecentTuple();
justifiedCheckpointStream.onNext(fetchJustifiedCheckpoint());
finalizedCheckpointStream.onNext(fetchFinalizedCheckpoint());
blockStream.onNext(new BeaconTupleDetails(recentlyProcessed));
}

Expand Down Expand Up @@ -117,7 +126,7 @@ public synchronized ImportResult insert(BeaconBlock block) {

BeaconTuple newTuple = BeaconTuple.of(block, postBlockState);
tupleStorage.put(newTuple);
updateFinality(parentState, postBlockState);
updateFinality(postBlockState);

chainStorage.commit();

Expand All @@ -144,15 +153,49 @@ public BeaconTuple getRecentlyProcessed() {
return recentlyProcessed;
}

private void updateFinality(BeaconState previous, BeaconState current) {
if (!previous.getFinalizedCheckpoint().equals(current.getFinalizedCheckpoint())) {
private void updateFinality(BeaconState current) {
boolean finalizedStorageUpdated = false;
boolean justifiedStorageUpdated = false;
if (current
.getFinalizedCheckpoint()
.getEpoch()
.greater(fetchFinalizedCheckpoint().getEpoch())) {
chainStorage.getFinalizedStorage().set(current.getFinalizedCheckpoint());
finalizedStorageUpdated = true;
}
if (!previous.getCurrentJustifiedCheckpoint().equals(current.getCurrentJustifiedCheckpoint())) {
if (current
.getCurrentJustifiedCheckpoint()
.getEpoch()
.greater(fetchJustifiedCheckpoint().getEpoch())) {
chainStorage.getJustifiedStorage().set(current.getCurrentJustifiedCheckpoint());
justifiedStorageUpdated = true;
}
// publish updates after both storages have been updated
// the order can be important if a finalizedCheckpointStream subscriber will look
// into justified storage
// in general, it may be important to publish after commit has succeeded
if (finalizedStorageUpdated) {
finalizedCheckpointStream.onNext(current.getFinalizedCheckpoint());
}
if (justifiedStorageUpdated) {
justifiedCheckpointStream.onNext(current.getCurrentJustifiedCheckpoint());
}
}

private Checkpoint fetchJustifiedCheckpoint() {
return chainStorage
.getJustifiedStorage()
.get()
.orElseThrow(() -> new RuntimeException("Justified checkpoint not found"));
}

private Checkpoint fetchFinalizedCheckpoint() {
return chainStorage
.getFinalizedStorage()
.get()
.orElseThrow(() -> new RuntimeException("Finalized checkpoint not found"));
}

private BeaconStateEx pullParentState(BeaconBlock block) {
Optional<BeaconTuple> parent = tupleStorage.get(block.getParentRoot());
checkArgument(parent.isPresent(), "No parent for block %s", block);
Expand Down Expand Up @@ -187,4 +230,14 @@ private boolean rejectedByTime(BeaconBlock block) {
public Publisher<BeaconTupleDetails> getBlockStatesStream() {
return blockStream;
}

@Override
public Publisher<Checkpoint> getJustifiedCheckpoints() {
return justifiedCheckpointStream;
}

@Override
public Publisher<Checkpoint> getFinalizedCheckpoints() {
return finalizedCheckpointStream;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.ethereum.beacon.chain;

import org.reactivestreams.Publisher;

public interface ForkChoiceProcessor {
Publisher<BeaconChainHead> getChainHeads();
}
183 changes: 183 additions & 0 deletions chain/src/main/java/org/ethereum/beacon/chain/LMDGhostProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package org.ethereum.beacon.chain;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.spec.ForkChoice.LatestMessage;
import org.ethereum.beacon.consensus.spec.ForkChoice.Store;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.BeaconState;
import org.ethereum.beacon.core.operations.Attestation;
import org.ethereum.beacon.core.operations.attestation.AttestationData;
import org.ethereum.beacon.core.operations.slashing.IndexedAttestation;
import org.ethereum.beacon.core.state.Checkpoint;
import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.core.types.ValidatorIndex;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.stream.SimpleProcessor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import tech.pegasys.artemis.ethereum.core.Hash32;

public class LMDGhostProcessor implements ForkChoiceProcessor {

private final int SEARCH_LIMIT = Integer.MAX_VALUE;

private final BeaconChainSpec spec;
private final BeaconChainStorage storage;
private final SimpleProcessor<BeaconChainHead> chainHeadStream;

private final Map<ValidatorIndex, LatestMessage> latestMessageStorage = new HashMap<>();
private Checkpoint justifiedCheckpoint = Checkpoint.EMPTY;
private Hash32 currentHeadRoot = Hash32.ZERO;

public LMDGhostProcessor(
BeaconChainSpec spec,
BeaconChainStorage storage,
Schedulers schedulers,
Publisher<Checkpoint> justifiedCheckpoints,
Publisher<IndexedAttestation> wireAttestations,
Publisher<? extends BeaconTuple> importedBlocks) {
this.spec = spec;
this.storage = storage;

Scheduler scheduler = schedulers.newSingleThreadDaemon("lmd-ghost-processor").toReactor();
this.chainHeadStream = new SimpleProcessor<>(scheduler, "LMDGhostProcessor.chainHeadStream");

Flux.from(justifiedCheckpoints).publishOn(scheduler).subscribe(this::onNewJustifiedCheckpoint);
Flux.from(wireAttestations).publishOn(scheduler).subscribe(this::onNewAttestation);
Flux.from(importedBlocks).publishOn(scheduler).subscribe(this::onNewImportedBlock);
}

private void onNewImportedBlock(BeaconTuple tuple) {
if (!isJustifiedAncestor(tuple.getBlock())) {
return;
}

for (Attestation attestation : tuple.getBlock().getBody().getAttestations()) {
List<ValidatorIndex> indices =
spec.get_attesting_indices(
tuple.getState(), attestation.getData(), attestation.getAggregationBits());
processAttestation(indices, attestation.getData());
}

updateHead();
}

private boolean isJustifiedAncestor(BeaconBlock block) {
// genesis shortcut
if (justifiedCheckpoint.equals(Checkpoint.EMPTY) && block.getSlot().equals(SlotNumber.ZERO)) {
return true;
}

BeaconBlock ancestor = block;
while (spec.compute_epoch_of_slot(ancestor.getSlot())
.greaterEqual(justifiedCheckpoint.getEpoch())) {
Optional<BeaconBlock> parent = storage.getBlockStorage().get(ancestor.getParentRoot());
if (!parent.isPresent()) {
return false;
}
if (parent.get().getParentRoot().equals(justifiedCheckpoint.getRoot())) {
return true;
}
ancestor = parent.get();
}

return false;
}

private void onNewAttestation(IndexedAttestation attestation) {
List<ValidatorIndex> indices = new ArrayList<>(attestation.getCustodyBit0Indices().listCopy());
indices.addAll(attestation.getCustodyBit1Indices().listCopy());
processAttestation(indices, attestation.getData());
updateHead();
}

private void processAttestation(List<ValidatorIndex> indices, AttestationData data) {
LatestMessage message =
new LatestMessage(data.getTarget().getEpoch(), data.getBeaconBlockRoot());
indices.forEach(
index -> {
latestMessageStorage.merge(
index,
message,
(oldMessage, newMessage) -> {
if (newMessage.getEpoch().greater(oldMessage.getEpoch())) {
return newMessage;
} else {
return oldMessage;
}
});
});
}

private void updateHead() {
Hash32 newHeadRoot = getHeadRoot();
if (!newHeadRoot.equals(currentHeadRoot)) {
BeaconTuple tuple = storage.getTupleStorage().get(newHeadRoot).get();
currentHeadRoot = newHeadRoot;
chainHeadStream.onNext(new BeaconChainHead(tuple));
}
}

private Hash32 getHeadRoot() {
return spec.get_head(
new Store() {

@Override
public Checkpoint getJustifiedCheckpoint() {
return storage.getJustifiedStorage().get().get();
}

@Override
public Checkpoint getFinalizedCheckpoint() {
return storage.getFinalizedStorage().get().get();
}

@Override
public Optional<BeaconBlock> getBlock(Hash32 root) {
return storage.getBlockStorage().get(root);
}

@Override
public Optional<BeaconState> getState(Hash32 root) {
return storage.getStateStorage().get(root);
}

@Override
public Optional<LatestMessage> getLatestMessage(ValidatorIndex index) {
return Optional.ofNullable(latestMessageStorage.get(index));
}

@Override
public List<Hash32> getChildren(Hash32 root) {
return storage.getBlockStorage().getChildren(root, SEARCH_LIMIT).stream()
.map(spec::signing_root)
.collect(Collectors.toList());
}
});
}

private void onNewJustifiedCheckpoint(Checkpoint checkpoint) {
if (checkpoint.getEpoch().greater(justifiedCheckpoint.getEpoch())) {
justifiedCheckpoint = checkpoint;
resetLatestMessages();
updateHead();
}
}

private void resetLatestMessages() {
latestMessageStorage.clear();
}

@Override
public Publisher<BeaconChainHead> getChainHeads() {
return chainHeadStream;
}
}
Loading