Skip to content

Commit

Permalink
Merge pull request #397 from ethpandaops/feat/cannon-block-compression
Browse files Browse the repository at this point in the history
Feat/cannon block compression
  • Loading branch information
Savid authored Oct 11, 2024
2 parents c5b5c73 + cd279a7 commit 2c1b572
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/ethpandaops/beacon v0.42.0
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756
github.com/ethpandaops/ethwallclock v0.3.0
github.com/ferranbt/fastssz v0.1.3
github.com/go-co-op/gocron v1.27.1
github.com/golang/protobuf v1.5.4
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb
Expand Down Expand Up @@ -101,7 +102,6 @@ require (
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/ferranbt/fastssz v0.1.3 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
Expand Down
33 changes: 26 additions & 7 deletions pkg/cannon/deriver/beacon/eth/v2/beacon_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v2

import (
"context"
"encoding/json"
"fmt"
"time"

Expand All @@ -14,8 +13,8 @@ import (
"github.com/ethpandaops/xatu/pkg/observability"
"github.com/ethpandaops/xatu/pkg/proto/eth"
xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1"
xatuethv2 "github.com/ethpandaops/xatu/pkg/proto/eth/v2"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
ssz "github.com/ferranbt/fastssz"
"github.com/golang/snappy"
"github.com/google/uuid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -280,7 +279,7 @@ func (b *BeaconBlockDeriver) createEventFromBlock(ctx context.Context, block *sp
},
}

additionalData, err := b.getAdditionalData(ctx, block, data)
additionalData, err := b.getAdditionalData(ctx, block)
if err != nil {
b.log.WithError(err).Error("Failed to get extra beacon block data")

Expand All @@ -294,7 +293,7 @@ func (b *BeaconBlockDeriver) createEventFromBlock(ctx context.Context, block *sp
return decoratedEvent, nil
}

func (b *BeaconBlockDeriver) getAdditionalData(_ context.Context, block *spec.VersionedSignedBeaconBlock, data *xatuethv2.EventBlockV2) (*xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data, error) {
func (b *BeaconBlockDeriver) getAdditionalData(_ context.Context, block *spec.VersionedSignedBeaconBlock) (*xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data, error) {
extra := &xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data{}

slotI, err := block.Slot()
Expand Down Expand Up @@ -332,13 +331,18 @@ func (b *BeaconBlockDeriver) getAdditionalData(_ context.Context, block *spec.Ve
}
}

dataAsJSON, err := json.Marshal(block)
blockMessage, err := getBlockMessage(block)
if err != nil {
return nil, err
}

dataSize := len(dataAsJSON)
compressedData := snappy.Encode(nil, dataAsJSON)
sszData, err := ssz.MarshalSSZ(blockMessage)
if err != nil {
return nil, err
}

dataSize := len(sszData)
compressedData := snappy.Encode(nil, sszData)
compressedDataSize := len(compressedData)

blockRoot, err := block.Root()
Expand Down Expand Up @@ -387,3 +391,18 @@ func (b *BeaconBlockDeriver) getAdditionalData(_ context.Context, block *spec.Ve

return extra, nil
}

func getBlockMessage(block *spec.VersionedSignedBeaconBlock) (ssz.Marshaler, error) {
switch block.Version {
case spec.DataVersionAltair:
return block.Altair.Message, nil
case spec.DataVersionBellatrix:
return block.Bellatrix.Message, nil
case spec.DataVersionCapella:
return block.Capella.Message, nil
case spec.DataVersionDeneb:
return block.Deneb.Message, nil
default:
return nil, fmt.Errorf("unsupported block version: %s", block.Version)
}
}
44 changes: 32 additions & 12 deletions pkg/sentry/event/beacon/eth/v2/beacon_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (

"github.com/attestantio/go-eth2-client/spec"
"github.com/ethpandaops/xatu/pkg/proto/eth"
xatuethv2 "github.com/ethpandaops/xatu/pkg/proto/eth/v2"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/ethpandaops/xatu/pkg/sentry/ethereum"
ssz "github.com/ferranbt/fastssz"
"github.com/golang/snappy"
"github.com/google/uuid"
ttlcache "github.com/jellydator/ttlcache/v3"
hashstructure "github.com/mitchellh/hashstructure/v2"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
Expand Down Expand Up @@ -66,7 +65,7 @@ func (e *BeaconBlock) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error
},
}

additionalData, err := e.getAdditionalData(ctx, data)
additionalData, err := e.getAdditionalData(ctx)
if err != nil {
e.log.WithError(err).Error("Failed to get extra beacon block data")
} else {
Expand Down Expand Up @@ -123,7 +122,7 @@ func (e *BeaconBlock) ShouldIgnore(ctx context.Context) (bool, error) {
return false, nil
}

func (e *BeaconBlock) getAdditionalData(_ context.Context, data *xatuethv2.EventBlockV2) (*xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data, error) {
func (e *BeaconBlock) getAdditionalData(_ context.Context) (*xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data, error) {
extra := &xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data{}

slotI, err := e.event.Slot()
Expand Down Expand Up @@ -164,15 +163,23 @@ func (e *BeaconBlock) getAdditionalData(_ context.Context, data *xatuethv2.Event
}
}

dataAsJSON, err := protojson.Marshal(data)
blockMessage, err := getBlockMessage(e.event)
if err != nil {
return nil, err
e.log.WithError(err).Warn("Failed to get block message to compute block size. Missing fork version?")
} else {
sszData, err := ssz.MarshalSSZ(blockMessage)
if err != nil {
e.log.WithError(err).Warn("Failed to marshal (SSZ) block message to compute block size")
} else {
dataSize := len(sszData)
compressedData := snappy.Encode(nil, sszData)
compressedDataSize := len(compressedData)

extra.TotalBytes = wrapperspb.UInt64(uint64(dataSize))
extra.TotalBytesCompressed = wrapperspb.UInt64(uint64(compressedDataSize))
}
}

dataSize := len(dataAsJSON)
compressedData := snappy.Encode(nil, dataAsJSON)
compressedDataSize := len(compressedData)

switch e.event.Version {
case spec.DataVersionBellatrix:
bellatrixTxs := make([][]byte, len(e.event.Bellatrix.Message.Body.ExecutionPayload.Transactions))
Expand Down Expand Up @@ -200,11 +207,24 @@ func (e *BeaconBlock) getAdditionalData(_ context.Context, data *xatuethv2.Event
compressedTransactions := snappy.Encode(nil, transactionsBytes)
compressedTxSize := len(compressedTransactions)

extra.TotalBytes = wrapperspb.UInt64(uint64(dataSize))
extra.TotalBytesCompressed = wrapperspb.UInt64(uint64(compressedDataSize))
extra.TransactionsCount = wrapperspb.UInt64(uint64(txCount))
extra.TransactionsTotalBytes = wrapperspb.UInt64(uint64(txSize))
extra.TransactionsTotalBytesCompressed = wrapperspb.UInt64(uint64(compressedTxSize))

return extra, nil
}

func getBlockMessage(block *spec.VersionedSignedBeaconBlock) (ssz.Marshaler, error) {
switch block.Version {
case spec.DataVersionAltair:
return block.Altair.Message, nil
case spec.DataVersionBellatrix:
return block.Bellatrix.Message, nil
case spec.DataVersionCapella:
return block.Capella.Message, nil
case spec.DataVersionDeneb:
return block.Deneb.Message, nil
default:
return nil, fmt.Errorf("unsupported block version: %s", block.Version)
}
}

0 comments on commit 2c1b572

Please sign in to comment.