Skip to content

Commit

Permalink
feat(relay-monitor): Validator Registrations (#428)
Browse files Browse the repository at this point in the history
* feat(relay-monitor): Validator Registrations

* feat: Add MEV relay validator registration handling

* style: Remove unnecessary comment and empty line

* chore(golangci.yml): add gocritic, gosec, and wsl linters to exclude rules for test files to enhance code quality checks

* refactor(validator_set_test.go): rename min and max variables to mi and ma for improved readability and consistency in naming conventions

* style(validator_set_test.go): rename variables for minimum and maximum values to improve readability and consistency in the test code
  • Loading branch information
samcm authored Dec 12, 2024
1 parent 638ba2c commit 5f6adb7
Show file tree
Hide file tree
Showing 23 changed files with 3,477 additions and 1,971 deletions.
9 changes: 8 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ linters-settings:
nolintlint:
require-explanation: true
require-specific: true

issues:
exclude-rules:
# Exclude some linters from running on tests files.
- path: _test\.go
linters:
- gocritic
- gosec
- wsl
linters:
disable-all: true
enable:
Expand Down
19 changes: 19 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ transforms:
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
beacon_api_eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK"
mev_relay_validator_registration: .event.name == "MEV_RELAY_VALIDATOR_REGISTRATION"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -926,3 +927,21 @@ sinks:
codec: json
librdkafka_options:
message.max.bytes: "10485760" # 10MB
mev_relay_validator_registration_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.mev_relay_validator_registration
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: mev-relay-validator-registration
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
librdkafka_options:
message.max.bytes: "10485760" # 10MB
102 changes: 102 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ transforms:
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK"
mev_relay_validator_registration: .event.name == "MEV_RELAY_VALIDATOR_REGISTRATION"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand Down Expand Up @@ -411,6 +412,7 @@ transforms:
- xatu_server_events_router.mev_relay_bid_trace_builder_block_submission
- xatu_server_events_router.mev_relay_proposer_payload_delivered
- xatu_server_events_router.eth_v3_validator_block
- xatu_server_events_router.mev_relay_validator_registration
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -2009,7 +2011,86 @@ transforms:
del(.event)
del(.meta)
del(.data)
mev_relay_validator_registration_formatted:
type: remap
inputs:
- xatu_server_events_router.mev_relay_validator_registration
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
# DATA
.timestamp = .data.message.timestamp
.fee_recipient = .data.message.fee_recipient
.gas_limit = .data.message.gas_limit
# ADDITIONAL DATA
.relay_name = .meta.client.additional_data.relay.name
.validator_index = .meta.client.additional_data.validator_index
.slot = .meta.client.additional_data.slot.number
.epoch = .meta.client.additional_data.epoch.number
.wallclock_slot = .meta.client.additional_data.wallclock_slot.number
.wallclock_epoch = .meta.client.additional_data.wallclock_epoch.number
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.wallclock_slot = .meta.client.additional_data.wallclock_slot.number
wallclock_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_slot.start_date_time, format: "%+");
if err == null {
.wallclock_slot_start_date_time = to_unix_timestamp(wallclock_slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.wallclock_epoch = .meta.client.additional_data.wallclock_epoch.number
wallclock_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_epoch.start_date_time, format: "%+");
if err == null {
.wallclock_epoch_start_date_time = to_unix_timestamp(wallclock_epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.relay_name = .meta.client.additional_data.relay.name

.updated_date_time = to_unix_timestamp(now())


del(.meta_consensus_implementation)
del(.meta_network_id)


del(.event)
del(.meta)
del(.data)
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -2658,4 +2739,25 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
mev_relay_validator_registration_clickhouse:
type: clickhouse
inputs:
- mev_relay_validator_registration_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: mev_relay_validator_registration
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false


Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS mev_relay_validator_registration ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS mev_relay_validator_registration_local ON CLUSTER '{cluster}' SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
CREATE TABLE default.mev_relay_validator_registration_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'When the bid was fetched' CODEC(DoubleDelta, ZSTD(1)),
`timestamp` Int64 COMMENT 'The timestamp of the bid' CODEC(DoubleDelta, ZSTD(1)),
`relay_name` String COMMENT 'The relay that the bid was fetched from' CODEC(ZSTD(1)),
`validator_index` UInt32 COMMENT 'The validator index of the validator registration' CODEC(ZSTD(1)),
`gas_limit` UInt64 COMMENT 'The gas limit of the validator registration' CODEC(DoubleDelta, ZSTD(1)),
`fee_recipient` String COMMENT 'The fee recipient of the validator registration' CODEC(ZSTD(1)),
`slot` UInt32 COMMENT 'Slot number derived from the validator registration `timestamp` field' CODEC(DoubleDelta, ZSTD(1)),
`slot_start_date_time` DateTime COMMENT 'The slot start time derived from the validator registration `timestamp` field' CODEC(DoubleDelta, ZSTD(1)),
`epoch` UInt32 COMMENT 'Epoch number derived from the validator registration `timestamp` field' CODEC(DoubleDelta, ZSTD(1)),
`epoch_start_date_time` DateTime COMMENT 'The epoch start time derived from the validator registration `timestamp` field' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_slot` UInt32 COMMENT 'The wallclock slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_slot_start_date_time` DateTime COMMENT 'The start time for the slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_epoch` UInt32 COMMENT 'The wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_epoch_start_date_time` DateTime COMMENT 'The start time for the wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_id` String COMMENT 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.' CODEC(ZSTD(1)),
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name',
`meta_labels` Map(String, String) COMMENT 'Labels associated with the event' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/{database}/tables/{table}/{shard}',
'{replica}',
updated_date_time
) PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY
(
slot_start_date_time,
meta_network_name,
meta_client_name,
relay_name,
validator_index,
timestamp
) COMMENT 'Contains MEV relay validator registrations data.';

CREATE TABLE default.mev_relay_validator_registration ON CLUSTER '{cluster}' AS default.mev_relay_validator_registration_local ENGINE = Distributed(
'{cluster}',
default,
mev_relay_validator_registration_local,
cityHash64(
slot,
meta_network_name
)
);
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ services:
"mev-relay-bid-trace-builder-block-submission"
"mev-relay-proposer-payload-delivered"
"beacon-api-eth-v3-validator-block"
"mev-relay-validator-registration"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down
Loading

0 comments on commit 5f6adb7

Please sign in to comment.