Skip to content

Commit

Permalink
Merge pull request #271 from oasisprotocol/mitjat/accumulator-disburs…
Browse files Browse the repository at this point in the history
…ements-last

bugfix: Process fee accumulator disbursements last
  • Loading branch information
mitjat authored Jan 5, 2023
2 parents 8dbb874 + 58baeaa commit 47cbf25
Showing 1 changed file with 50 additions and 10 deletions.
60 changes: 50 additions & 10 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func (m *Main) Start() {
backoff.Success()
height++
}
m.logger.Info(
fmt.Sprintf("finished processing all blocks in the configured range [%d, %d]",
m.cfg.Range.From, m.cfg.Range.To))
}

// Name returns the name of the Main.
Expand Down Expand Up @@ -285,19 +288,20 @@ func (m *Main) processBlock(ctx context.Context, height int64) error {
m.queueRuntimeRegistrations,
m.queueEntityEvents,
m.queueNodeEvents,
m.queueRegistryEvents,
m.queueRegistryEventInserts,
} {
if err := f(batch, data.RegistryData); err != nil {
return err
}
}

for _, f := range []func(*storage.QueryBatch, *storage.StakingData) error{
m.queueTransfers,
m.queueRegularTransfers,
m.queueBurns,
m.queueEscrows,
m.queueAllowanceChanges,
m.queueStakingEvents,
m.queueStakingEventInserts,
m.queueDisbursementTransfers,
} {
if err := f(batch, data.StakingData); err != nil {
return err
Expand All @@ -318,14 +322,14 @@ func (m *Main) processBlock(ctx context.Context, height int64) error {
m.queueExecutions,
m.queueFinalizations,
m.queueVotes,
m.queueGovernanceEvents,
m.queueGovernanceEventInserts,
} {
if err := f(batch, data.GovernanceData); err != nil {
return err
}
}

if err := m.queueRootHashEvents(batch, data.RootHashData); err != nil {
if err := m.queueRootHashEventInserts(batch, data.RootHashData); err != nil {
return err
}

Expand Down Expand Up @@ -566,7 +570,7 @@ func (m *Main) queueNodeEvents(batch *storage.QueryBatch, data *storage.Registry
return nil
}

func (m *Main) queueRegistryEvents(batch *storage.QueryBatch, data *storage.RegistryData) error {
func (m *Main) queueRegistryEventInserts(batch *storage.QueryBatch, data *storage.RegistryData) error {
eventInsertQuery := m.qf.ConsensusEventInsertQuery()

for _, event := range data.Events {
Expand All @@ -592,7 +596,7 @@ func (m *Main) queueRegistryEvents(batch *storage.QueryBatch, data *storage.Regi
return nil
}

func (m *Main) queueRootHashEvents(batch *storage.QueryBatch, data *storage.RootHashData) error {
func (m *Main) queueRootHashEventInserts(batch *storage.QueryBatch, data *storage.RootHashData) error {
eventInsertQuery := m.qf.ConsensusEventInsertQuery()

for _, event := range data.Events {
Expand All @@ -618,11 +622,47 @@ func (m *Main) queueRootHashEvents(batch *storage.QueryBatch, data *storage.Root
return nil
}

func (m *Main) queueTransfers(batch *storage.QueryBatch, data *storage.StakingData) error {
// Enum of transfer types. We single out transfers that deduct from the special
// "fee accumulator" account. These deductions/disbursements happen at the end
// of each block. However, oasis-core returns each block's events in the
// following order: BeginBlockEvents, EndBlockEvents (which include
// disbursements), TxEvents (which fill the fee accumulator). Thus, processing
// the events in order results in a temporary negative balance for the fee
// accumulator, which violates our DB checks. We therefore artificially split
// transfer events into two: accumulator disbursements, and all others. We
// process the former at the very end.
// We might be able to remove this once https://github.com/oasisprotocol/oasis-core/pull/5117
// is deployed, making oasis-core send the "correct" event order on its own. But
// Cobalt (pre-Damask network) will never be fixed.
type TransferType string

const (
TransferTypeAccumulatorDisbursement TransferType = "AccumulatorDisbursement"
TransferTypeOther TransferType = "Other"
)

func (m *Main) queueRegularTransfers(batch *storage.QueryBatch, data *storage.StakingData) error {
return m.queueTransfers(batch, data, TransferTypeOther)
}

func (m *Main) queueDisbursementTransfers(batch *storage.QueryBatch, data *storage.StakingData) error {
return m.queueTransfers(batch, data, TransferTypeAccumulatorDisbursement)
}

func (m *Main) queueTransfers(batch *storage.QueryBatch, data *storage.StakingData, targetType TransferType) error {
senderUpdateQuery := m.qf.ConsensusSenderUpdateQuery()
receiverUpsertQuery := m.qf.ConsensusReceiverUpdateQuery()

for _, transfer := range data.Transfers {
// Filter out transfers that are not of the target type.
typ := TransferTypeOther // type of the current transfer
if transfer.From == staking.FeeAccumulatorAddress {
typ = TransferTypeAccumulatorDisbursement
}
if typ != targetType {
continue
}

batch.Queue(senderUpdateQuery,
transfer.From.String(),
transfer.Amount.String(),
Expand Down Expand Up @@ -753,7 +793,7 @@ func (m *Main) queueAllowanceChanges(batch *storage.QueryBatch, data *storage.St
return nil
}

func (m *Main) queueStakingEvents(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *Main) queueStakingEventInserts(batch *storage.QueryBatch, data *storage.StakingData) error {
eventInsertQuery := m.qf.ConsensusEventInsertQuery()

for _, event := range data.Events {
Expand Down Expand Up @@ -894,7 +934,7 @@ func (m *Main) queueVotes(batch *storage.QueryBatch, data *storage.GovernanceDat
return nil
}

func (m *Main) queueGovernanceEvents(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *Main) queueGovernanceEventInserts(batch *storage.QueryBatch, data *storage.GovernanceData) error {
eventInsertQuery := m.qf.ConsensusEventInsertQuery()

for _, event := range data.Events {
Expand Down

0 comments on commit 47cbf25

Please sign in to comment.