Skip to content

Commit

Permalink
storage: split runtime events related accounts into a separate table
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Nov 30, 2024
1 parent 7170c28 commit 4c81c25
Show file tree
Hide file tree
Showing 16 changed files with 256 additions and 195 deletions.
6 changes: 3 additions & 3 deletions analyzer/aggregate_stats/aggregate_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,14 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) {
switch {
case err == nil:
// Continues below.
case errors.Is(pgx.ErrNoRows, err):
case errors.Is(err, pgx.ErrNoRows):
// No stats yet. Start at the earliest indexed block.
var earliestBlockTs *time.Time
earliestBlockTs, err = a.earliestBlockTs(statCtx, statsComputation.layer)
switch {
case err == nil:
latestComputed = floorWindow(earliestBlockTs)
case errors.Is(pgx.ErrNoRows, err):
case errors.Is(err, pgx.ErrNoRows):
// No data log a debug only log.
logger.Debug("no stats available yet, skipping iteration")
cancel()
Expand All @@ -230,7 +230,7 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) {
switch {
case err == nil:
// Continues below.
case errors.Is(pgx.ErrNoRows, err):
case errors.Is(err, pgx.ErrNoRows):
logger.Debug("no stats available yet, skipping iteration")
cancel()
continue
Expand Down
5 changes: 4 additions & 1 deletion analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,12 +797,15 @@ func (m *processor) queueRootHashMessageUpserts(batch *storage.QueryBatch, data
// The runtime has its own staking account, which is what
// performs these actions, e.g. when sending or receiving the
// consensus token. Register that as related to the message.
if _, err := addresses.RegisterRelatedRuntimeAddress(messageData.addressPreimages, messageData.relatedAddresses, event.RoothashExecutorCommitted.RuntimeID); err != nil {
if runtimeAddr, err := addresses.RegisterRuntimeAddress(messageData.addressPreimages, event.RoothashExecutorCommitted.RuntimeID); err != nil {
logger.Info("register runtime address failed",
"runtime_id", event.RoothashExecutorCommitted.RuntimeID,
"err", err,
)
} else {
messageData.relatedAddresses[runtimeAddr] = struct{}{}
}

for addr, preimageData := range messageData.addressPreimages {
batch.Queue(queries.AddressPreimageInsert,
addr,
Expand Down
12 changes: 8 additions & 4 deletions analyzer/consensus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData {
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Transfer.To)
to, err := addresses.FromOCSAddress(m.Staking.Transfer.To)
if err != nil {
logger.Info("register related address 'to' failed",
"message_type", messageData.messageType,
"err", err,
)
}
messageData.relatedAddresses[to] = struct{}{}
case m.Staking.Withdraw != nil:
messageData.messageType = apiTypes.RoothashMessageTypeStakingWithdraw
body, err := json.Marshal(m.Staking.Withdraw)
Expand All @@ -53,13 +54,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData {
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Withdraw.From)
from, err := addresses.FromOCSAddress(m.Staking.Withdraw.From)
if err != nil {
logger.Info("register related address 'from' failed",
"message_type", messageData.messageType,
"err", err,
)
}
messageData.relatedAddresses[from] = struct{}{}
case m.Staking.AddEscrow != nil:
messageData.messageType = apiTypes.RoothashMessageTypeStakingAddEscrow
body, err := json.Marshal(m.Staking.AddEscrow)
Expand All @@ -71,13 +73,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData {
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.AddEscrow.Account)
account, err := addresses.FromOCSAddress(m.Staking.AddEscrow.Account)
if err != nil {
logger.Info("register related address 'account' failed",
"message_type", messageData.messageType,
"err", err,
)
}
messageData.relatedAddresses[account] = struct{}{}
case m.Staking.ReclaimEscrow != nil:
messageData.messageType = apiTypes.RoothashMessageTypeStakingReclaimEscrow
body, err := json.Marshal(m.Staking.ReclaimEscrow)
Expand All @@ -89,13 +92,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData {
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.ReclaimEscrow.Account)
account, err := addresses.FromOCSAddress(m.Staking.ReclaimEscrow.Account)
if err != nil {
logger.Info("register related address 'account' failed",
"message_type", messageData.messageType,
"err", err,
)
}
messageData.relatedAddresses[account] = struct{}{}
default:
logger.Info("unhandled staking message",
"staking_message", m.Staking,
Expand Down
14 changes: 9 additions & 5 deletions analyzer/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,10 @@ var (
SELECT epochs.id, epochs.start_height, prev_epoch.validators
FROM chain.epochs as epochs
LEFT JOIN history.validators as history
ON epochs.id = history.epoch
ON epochs.id = history.epoch
LEFT JOIN chain.epochs as prev_epoch
ON epochs.id = prev_epoch.id + 1
WHERE
WHERE
history.epoch IS NULL AND
epochs.id >= $1
ORDER BY epochs.id
Expand All @@ -531,7 +531,7 @@ var (
ValidatorStakingRewardUpdate = `
UPDATE history.validators
SET staking_rewards = $3
WHERE
WHERE
id = $1 AND
epoch = $2`

Expand All @@ -545,7 +545,7 @@ var (
FROM chain.epochs AS epochs
LEFT JOIN history.validators AS history
ON epochs.id = history.epoch
WHERE
WHERE
history.epoch IS NULL AND
epochs.id >= $1`

Expand Down Expand Up @@ -636,9 +636,13 @@ var (
tx_hash = $2`

RuntimeEventInsert = `
INSERT INTO chain.runtime_events (runtime, round, tx_index, tx_hash, tx_eth_hash, timestamp, type, body, related_accounts, evm_log_name, evm_log_params, evm_log_signature)
INSERT INTO chain.runtime_events (runtime, round, event_index, tx_index, tx_hash, tx_eth_hash, timestamp, type, body, evm_log_name, evm_log_params, evm_log_signature)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`

RuntimeEventRelatedAccountsInsert = `
INSERT INTO chain.runtime_events_related_accounts (runtime, round, event_index, related_account)
SELECT $1, $2, $3, unnest($4::text[])`

// We use COALESCE here to avoid overwriting existing data with null values.
RuntimeEventEvmParsedFieldsUpdate = `
UPDATE chain.runtime_events
Expand Down
Loading

0 comments on commit 4c81c25

Please sign in to comment.