-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Attestation pool take 1: interop #176
Draft
mkalinin
wants to merge
64
commits into
interop
Choose a base branch
from
feature/attestation-pool
base: interop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 1 commit
Commits
Show all changes
64 commits
Select commit
Hold shift + click to select a range
69eef6f
Initial commit of the attestation pool feature
mkalinin d09e87e
Update signature verification in the pool
mkalinin 47023da
Rework reactor logic in attestation pool
mkalinin 771fe79
Instantiate attestation pool
mkalinin 4a039bb
Check isInitialized in attestation processors if needed
mkalinin da9fb0e
Add javadocs and polish attestation pool impl
mkalinin 548625c
Continue to polish and document attestation pool
mkalinin 19b7403
Get rid of redundant Input class
mkalinin 4494576
Simplify name of processors in attestaiton pool
mkalinin 817abdd
Add a javadoc to InMemoryAttestationPool class
mkalinin faf766b
Add attestation pool diagram
mkalinin e9182d0
Update pool diagram
mkalinin cb4130e
Rework attestation pool processors
mkalinin df528e5
JUNIT5: add. Add test draft
eustimenko a1219ba
Create attestation-pool at test
eustimenko d394a3b
add reactor tests
eustimenko 23f74e2
pool-test: add verify blockchain creation
eustimenko 5a47137
pool-test: add verify ReceivedAttestation creation
eustimenko 1ea3c89
pool-test: verify SlotsNumber creation
eustimenko 037ba11
pool-test: a little refactroing
eustimenko 0b1e0f1
pool-test: fix parent block chain
eustimenko 0aff0ce
pool-test: add empty checkpoint publisher
eustimenko 7138a49
test: fix NPE to create UnknownAttestationPool
eustimenko 5208599
test: correct verification
eustimenko b65b8cf
Switch to rocksDB JNI 6.2.2 since it includes compression codecs on W…
ericsson49 5339f80
Configuration of eth1_block_hash value for EmulatorContract added.
ericsson49 8bbaacd
Mocked start key pairs generation implemented.
ericsson49 5020f61
Withdrawal credentials for mocked start generation implemented.
ericsson49 11b7521
Added a command line option to specify yaml spec constants file (simp…
ericsson49 bfabbea
Modified default config to match interop settings.
ericsson49 6c2e407
For interop, MAX_EFFECTIVE_BALANCE should be used as a balance value.
ericsson49 9a87620
Use BCParameters.ORDER to initialize SimulationKeyPairGenerator#CURVE…
mkalinin b84de20
test: draft commit
eustimenko e1f2b1e
test: fix check TimeFrameFilter
eustimenko ba01970
Implement simple suboptimal attestation churn
mkalinin 2db4a97
Verify attestations against state before aggregation
mkalinin f9edbd2
test: change out to SimpleProcessor
eustimenko e669476
Update attestation pool with a bunch of fixes
mkalinin 87676b1
Merge branch 'feature/attestation-pool' of github.com:harmony-dev/bea…
mkalinin 26da60d
Use SimpleProcessor all over attestation pool
mkalinin c358638
Merge branch 'develop' into feature/attestation-pool
mkalinin 49e95b4
Merge followups
mkalinin 86813ff
Integrate attestation pool into simulator
mkalinin c00918e
pool: test valid TimeProcessor
eustimenko 11bb800
pool: test valid SanityProcessor
eustimenko 5e0d7bd
pool: correct testing valid/invalid SanityProcessor
eustimenko 6f46160
pool: test DoubleWorkProcessor
eustimenko 5ab558c
pool: test SignatureEncodingChecker
eustimenko 441dc74
Merge branch 'feature/attestation-pool' of github.com:harmony-dev/bea…
mkalinin f4ba71f
pool: test processors by one test
eustimenko 5ecceca
pool: SignatureEncodingProcessorTest
eustimenko eda3012
test: correct test attestation to be valid
eustimenko 85b5f68
test: IdentificationProcessor
eustimenko a1c2751
test: initialize tuple storage
eustimenko 01c0c7c
test: remove inmemory attestation pool test
eustimenko 33f0629
Modify update finality to comply with the fork choice spec.
ericsson49 94cd7be
Modify get_head to comply with the fork_choice spec: add tie break.
ericsson49 07d2003
attestation-pool: update eth2.0-spec-tests
eustimenko 4c6541d
Modify update finality to comply with the fork choice spec.
ericsson49 67541c0
Modify get_head to comply with the fork_choice spec: add tie break.
ericsson49 122e9af
Merge remote-tracking branch 'origin/fix/fork-choice-issues' into fix…
ericsson49 1bc6ce9
Check that BytesValue are compared the same way in Java and in python3
ericsson49 cf24e15
Replace string comparison with bytes' in get_winning_crosslink_and_at…
mkalinin 8d653af
Merge pull request #199 from harmony-dev/fix/fork-choice-issues
mkalinin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
24 changes: 19 additions & 5 deletions
24
chain/src/main/java/org/ethereum/beacon/chain/pool/reactor/ChurnProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,27 @@ | ||
package org.ethereum.beacon.chain.pool.reactor; | ||
|
||
import org.ethereum.beacon.chain.pool.ReceivedAttestation; | ||
import org.ethereum.beacon.chain.pool.churn.OffChainAggregates; | ||
import org.ethereum.beacon.stream.AbstractDelegateProcessor; | ||
import org.ethereum.beacon.core.BeaconBlock; | ||
import org.ethereum.beacon.core.types.SlotNumber; | ||
import org.ethereum.beacon.schedulers.Schedulers; | ||
import org.ethereum.beacon.stream.OutsourcePublisher; | ||
import reactor.core.CoreSubscriber; | ||
import reactor.core.publisher.Flux; | ||
|
||
public class ChurnProcessor | ||
extends AbstractDelegateProcessor<Object, OffChainAggregates> { | ||
public class ChurnProcessor extends Flux<OffChainAggregates> { | ||
|
||
@Override | ||
protected void hookOnNext(Object value) { | ||
private final OutsourcePublisher<OffChainAggregates> out = new OutsourcePublisher<>(); | ||
|
||
public ChurnProcessor( | ||
Schedulers schedulers, Flux<BeaconBlock> chainHeads, Flux<SlotNumber> newSlots) {} | ||
|
||
protected void hookOnNext(ReceivedAttestation value) { | ||
// TODO implement | ||
} | ||
|
||
@Override | ||
public void subscribe(CoreSubscriber<? super OffChainAggregates> actual) { | ||
out.subscribe(actual); | ||
} | ||
} |
51 changes: 51 additions & 0 deletions
51
chain/src/main/java/org/ethereum/beacon/chain/pool/reactor/DoubleWorkProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package org.ethereum.beacon.chain.pool.reactor; | ||
|
||
import org.ethereum.beacon.chain.pool.ReceivedAttestation; | ||
import org.ethereum.beacon.chain.pool.registry.ProcessedAttestations; | ||
import org.ethereum.beacon.schedulers.Schedulers; | ||
import org.ethereum.beacon.stream.OutsourcePublisher; | ||
import reactor.core.CoreSubscriber; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.scheduler.Scheduler; | ||
|
||
/** | ||
* Passes attestations through {@link ProcessedAttestations} filter. | ||
* | ||
* <p>Input: | ||
* | ||
* <ul> | ||
* <li>attestations | ||
* </ul> | ||
* | ||
* <p>Output: | ||
* | ||
* <ul> | ||
* <li>Not yet processed attestations | ||
* </ul> | ||
*/ | ||
public class DoubleWorkProcessor extends Flux<ReceivedAttestation> { | ||
|
||
private final ProcessedAttestations processedAttestations; | ||
private final OutsourcePublisher<ReceivedAttestation> out = new OutsourcePublisher<>(); | ||
|
||
public DoubleWorkProcessor( | ||
ProcessedAttestations processedAttestations, | ||
Schedulers schedulers, | ||
Flux<ReceivedAttestation> source) { | ||
this.processedAttestations = processedAttestations; | ||
|
||
Scheduler scheduler = schedulers.newSingleThreadDaemon("pool-double-work").toReactor(); | ||
source.publishOn(scheduler).subscribe(this::hookOnNext); | ||
} | ||
|
||
private void hookOnNext(ReceivedAttestation attestation) { | ||
if (!processedAttestations.add(attestation)) { | ||
out.publishOut(attestation); | ||
} | ||
} | ||
|
||
@Override | ||
public void subscribe(CoreSubscriber<? super ReceivedAttestation> actual) { | ||
out.subscribe(actual); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,12 +4,13 @@ | |
import org.ethereum.beacon.chain.pool.registry.UnknownAttestationPool; | ||
import org.ethereum.beacon.core.BeaconBlock; | ||
import org.ethereum.beacon.core.types.SlotNumber; | ||
import org.ethereum.beacon.stream.AbstractDelegateProcessor; | ||
import reactor.core.publisher.DirectProcessor; | ||
import reactor.core.publisher.FluxSink; | ||
import org.ethereum.beacon.schedulers.Schedulers; | ||
import org.ethereum.beacon.stream.OutsourcePublisher; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.scheduler.Scheduler; | ||
|
||
/** | ||
* Processor throttling attestations through {@link UnknownAttestationPool}. | ||
* Passes attestations through {@link UnknownAttestationPool}. | ||
* | ||
* <p>Input: | ||
* | ||
|
@@ -24,46 +25,49 @@ | |
* <ul> | ||
* <li>instantly identified attestations | ||
* <li>attestations identified upon a new block come | ||
* <li>attestations with not yet imported block | ||
* </ul> | ||
*/ | ||
public class IdentificationProcessor extends AbstractDelegateProcessor<Object, ReceivedAttestation> { | ||
public class IdentificationProcessor { | ||
|
||
private final UnknownAttestationPool pool; | ||
private final DirectProcessor<ReceivedAttestation> unknownAttestations = DirectProcessor.create(); | ||
private final FluxSink<ReceivedAttestation> unknownOut = unknownAttestations.sink(); | ||
private final OutsourcePublisher<ReceivedAttestation> identified = new OutsourcePublisher<>(); | ||
private final OutsourcePublisher<ReceivedAttestation> unknown = new OutsourcePublisher<>(); | ||
|
||
public IdentificationProcessor(UnknownAttestationPool pool) { | ||
public IdentificationProcessor( | ||
UnknownAttestationPool pool, | ||
Schedulers schedulers, | ||
Flux<ReceivedAttestation> source, | ||
Flux<SlotNumber> newSlots, | ||
Flux<BeaconBlock> newImportedBlocks) { | ||
this.pool = pool; | ||
} | ||
|
||
@Override | ||
protected void hookOnNext(Object value) { | ||
if (value.getClass().equals(BeaconBlock.class)) { | ||
// forward attestations identified with a new block | ||
pool.feedNewImportedBlock((BeaconBlock) value).forEach(this::publishOut); | ||
} else if (value.getClass().equals(SlotNumber.class)) { | ||
pool.feedNewSlot((SlotNumber) value); | ||
} else if (value.getClass().equals(ReceivedAttestation.class)) { | ||
if (pool.isInitialized()) { | ||
return; | ||
} | ||
Scheduler scheduler = | ||
schedulers.newSingleThreadDaemon("pool-attestation-identifier").toReactor(); | ||
newSlots.publishOn(scheduler).subscribe(this.pool::feedNewSlot); | ||
newImportedBlocks.publishOn(scheduler).subscribe(this::hookOnNext); | ||
source.publishOn(scheduler).subscribe(this::hookOnNext); | ||
} | ||
|
||
ReceivedAttestation attestation = (ReceivedAttestation) value; | ||
private void hookOnNext(BeaconBlock block) { | ||
pool.feedNewImportedBlock(block).forEach(identified::publishOut); | ||
} | ||
|
||
if (!pool.add(attestation)) { | ||
// forward attestations not added to the pool | ||
publishOut(attestation); | ||
private void hookOnNext(ReceivedAttestation attestation) { | ||
if (pool.isInitialized()) { | ||
if (pool.add(attestation)) { | ||
unknown.publishOut(attestation); | ||
} else { | ||
// expose not yet identified attestations | ||
unknownOut.next(attestation); | ||
identified.publishOut(attestation); | ||
} | ||
} else { | ||
throw new IllegalArgumentException( | ||
"Unsupported input type: " + value.getClass().getSimpleName()); | ||
} | ||
} | ||
|
||
public DirectProcessor<ReceivedAttestation> getUnknownAttestations() { | ||
return unknownAttestations; | ||
public OutsourcePublisher<ReceivedAttestation> getIdentified() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn't expose |
||
return identified; | ||
} | ||
|
||
public OutsourcePublisher<ReceivedAttestation> getUnknown() { | ||
return unknown; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would consider using more general
Publisher
as an input. I believeFlux.from(Publisher)
shortcuts the call when the parameter isFlux
so this should be for free