diff --git a/analyzer/consensus/consensus.go b/analyzer/consensus/consensus.go index 2d3e1e28e..58d267efb 100644 --- a/analyzer/consensus/consensus.go +++ b/analyzer/consensus/consensus.go @@ -163,6 +163,9 @@ func (m *Main) Start() { backoff.Success() height++ } + m.logger.Info( + fmt.Sprintf("finished processing all blocks in the configured range [%d, %d]", + m.cfg.Range.From, m.cfg.Range.To)) } // Name returns the name of the Main. @@ -285,7 +288,7 @@ func (m *Main) processBlock(ctx context.Context, height int64) error { m.queueRuntimeRegistrations, m.queueEntityEvents, m.queueNodeEvents, - m.queueRegistryEvents, + m.queueRegistryEventInserts, } { if err := f(batch, data.RegistryData); err != nil { return err @@ -293,11 +296,12 @@ func (m *Main) processBlock(ctx context.Context, height int64) error { } for _, f := range []func(*storage.QueryBatch, *storage.StakingData) error{ - m.queueTransfers, + m.queueRegularTransfers, m.queueBurns, m.queueEscrows, m.queueAllowanceChanges, - m.queueStakingEvents, + m.queueStakingEventInserts, + m.queueDisbursementTransfers, } { if err := f(batch, data.StakingData); err != nil { return err @@ -318,14 +322,14 @@ func (m *Main) processBlock(ctx context.Context, height int64) error { m.queueExecutions, m.queueFinalizations, m.queueVotes, - m.queueGovernanceEvents, + m.queueGovernanceEventInserts, } { if err := f(batch, data.GovernanceData); err != nil { return err } } - if err := m.queueRootHashEvents(batch, data.RootHashData); err != nil { + if err := m.queueRootHashEventInserts(batch, data.RootHashData); err != nil { return err } @@ -566,7 +570,7 @@ func (m *Main) queueNodeEvents(batch *storage.QueryBatch, data *storage.Registry return nil } -func (m *Main) queueRegistryEvents(batch *storage.QueryBatch, data *storage.RegistryData) error { +func (m *Main) queueRegistryEventInserts(batch *storage.QueryBatch, data *storage.RegistryData) error { eventInsertQuery := m.qf.ConsensusEventInsertQuery() for _, event := range data.Events { @@ -592,7 +596,7 @@ func (m *Main) queueRegistryEvents(batch *storage.QueryBatch, data *storage.Regi return nil } -func (m *Main) queueRootHashEvents(batch *storage.QueryBatch, data *storage.RootHashData) error { +func (m *Main) queueRootHashEventInserts(batch *storage.QueryBatch, data *storage.RootHashData) error { eventInsertQuery := m.qf.ConsensusEventInsertQuery() for _, event := range data.Events { @@ -618,11 +622,47 @@ func (m *Main) queueRootHashEvents(batch *storage.QueryBatch, data *storage.Root return nil } -func (m *Main) queueTransfers(batch *storage.QueryBatch, data *storage.StakingData) error { +// Enum of transfer types. We single out transfers that deduct from the special +// "fee accumulator" account. These deductions/disbursements happen at the end +// of each block. However, oasis-core returns each block's events in the +// following order: BeginBlockEvents, EndBlockEvents (which include +// disbursements), TxEvents (which fill the fee accumulator). Thus, processing +// the events in order results in a temporary negative balance for the fee +// accumulator, which violates our DB checks. We therefore artificially split +// transfer events into two: accumulator disbursements, and all others. We +// process the former at the very end. +// We might be able to remove this once https://github.com/oasisprotocol/oasis-core/pull/5117 +// is deployed, making oasis-core send the "correct" event order on its own. But +// Cobalt (pre-Damask network) will never be fixed. +type TransferType string + +const ( + TransferTypeAccumulatorDisbursement TransferType = "AccumulatorDisbursement" + TransferTypeOther TransferType = "Other" +) + +func (m *Main) queueRegularTransfers(batch *storage.QueryBatch, data *storage.StakingData) error { + return m.queueTransfers(batch, data, TransferTypeOther) +} + +func (m *Main) queueDisbursementTransfers(batch *storage.QueryBatch, data *storage.StakingData) error { + return m.queueTransfers(batch, data, TransferTypeAccumulatorDisbursement) +} + +func (m *Main) queueTransfers(batch *storage.QueryBatch, data *storage.StakingData, targetType TransferType) error { senderUpdateQuery := m.qf.ConsensusSenderUpdateQuery() receiverUpsertQuery := m.qf.ConsensusReceiverUpdateQuery() for _, transfer := range data.Transfers { + // Filter out transfers that are not of the target type. + typ := TransferTypeOther // type of the current transfer + if transfer.From == staking.FeeAccumulatorAddress { + typ = TransferTypeAccumulatorDisbursement + } + if typ != targetType { + continue + } + batch.Queue(senderUpdateQuery, transfer.From.String(), transfer.Amount.String(), @@ -753,7 +793,7 @@ func (m *Main) queueAllowanceChanges(batch *storage.QueryBatch, data *storage.St return nil } -func (m *Main) queueStakingEvents(batch *storage.QueryBatch, data *storage.StakingData) error { +func (m *Main) queueStakingEventInserts(batch *storage.QueryBatch, data *storage.StakingData) error { eventInsertQuery := m.qf.ConsensusEventInsertQuery() for _, event := range data.Events { @@ -894,7 +934,7 @@ func (m *Main) queueVotes(batch *storage.QueryBatch, data *storage.GovernanceDat return nil } -func (m *Main) queueGovernanceEvents(batch *storage.QueryBatch, data *storage.GovernanceData) error { +func (m *Main) queueGovernanceEventInserts(batch *storage.QueryBatch, data *storage.GovernanceData) error { eventInsertQuery := m.qf.ConsensusEventInsertQuery() for _, event := range data.Events {