Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(relay-monitor): Validator Registrations #428

Merged
merged 6 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ linters-settings:
nolintlint:
require-explanation: true
require-specific: true

issues:
exclude-rules:
# Exclude some linters from running on tests files.
- path: _test\.go
linters:
- gocritic
- gosec
- wsl
linters:
disable-all: true
enable:
Expand Down
19 changes: 19 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ transforms:
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
beacon_api_eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK"
mev_relay_validator_registration: .event.name == "MEV_RELAY_VALIDATOR_REGISTRATION"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -926,3 +927,21 @@ sinks:
codec: json
librdkafka_options:
message.max.bytes: "10485760" # 10MB
mev_relay_validator_registration_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.mev_relay_validator_registration
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: mev-relay-validator-registration
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
librdkafka_options:
message.max.bytes: "10485760" # 10MB
102 changes: 102 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ transforms:
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK"
mev_relay_validator_registration: .event.name == "MEV_RELAY_VALIDATOR_REGISTRATION"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand Down Expand Up @@ -411,6 +412,7 @@ transforms:
- xatu_server_events_router.mev_relay_bid_trace_builder_block_submission
- xatu_server_events_router.mev_relay_proposer_payload_delivered
- xatu_server_events_router.eth_v3_validator_block
- xatu_server_events_router.mev_relay_validator_registration
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -2009,7 +2011,86 @@ transforms:
del(.event)
del(.meta)
del(.data)
mev_relay_validator_registration_formatted:
type: remap
inputs:
- xatu_server_events_router.mev_relay_validator_registration
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}

# DATA
.timestamp = .data.message.timestamp
.fee_recipient = .data.message.fee_recipient
.gas_limit = .data.message.gas_limit

# ADDITIONAL DATA
.relay_name = .meta.client.additional_data.relay.name
.validator_index = .meta.client.additional_data.validator_index
.slot = .meta.client.additional_data.slot.number
.epoch = .meta.client.additional_data.epoch.number
.wallclock_slot = .meta.client.additional_data.wallclock_slot.number
.wallclock_epoch = .meta.client.additional_data.wallclock_epoch.number


slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number

epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

.wallclock_slot = .meta.client.additional_data.wallclock_slot.number
wallclock_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_slot.start_date_time, format: "%+");
if err == null {
.wallclock_slot_start_date_time = to_unix_timestamp(wallclock_slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock slot start date time"
log(., level: "error", rate_limit_secs: 60)
}

.wallclock_epoch = .meta.client.additional_data.wallclock_epoch.number
wallclock_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_epoch.start_date_time, format: "%+");
if err == null {
.wallclock_epoch_start_date_time = to_unix_timestamp(wallclock_epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

.relay_name = .meta.client.additional_data.relay.name


.updated_date_time = to_unix_timestamp(now())


del(.meta_consensus_implementation)
del(.meta_network_id)


del(.event)
del(.meta)
del(.data)
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -2658,4 +2739,25 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
mev_relay_validator_registration_clickhouse:
type: clickhouse
inputs:
- mev_relay_validator_registration_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: mev_relay_validator_registration
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false


Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS mev_relay_validator_registration ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS mev_relay_validator_registration_local ON CLUSTER '{cluster}' SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
CREATE TABLE default.mev_relay_validator_registration_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'When the bid was fetched' CODEC(DoubleDelta, ZSTD(1)),
`timestamp` Int64 COMMENT 'The timestamp of the bid' CODEC(DoubleDelta, ZSTD(1)),
`relay_name` String COMMENT 'The relay that the bid was fetched from' CODEC(ZSTD(1)),
`validator_index` UInt32 COMMENT 'The validator index of the validator registration' CODEC(ZSTD(1)),
`gas_limit` UInt64 COMMENT 'The gas limit of the validator registration' CODEC(DoubleDelta, ZSTD(1)),
`fee_recipient` String COMMENT 'The fee recipient of the validator registration' CODEC(ZSTD(1)),
`slot` UInt32 COMMENT 'Slot number derived from the validator registration `timestamp` field' CODEC(DoubleDelta, ZSTD(1)),
`slot_start_date_time` DateTime COMMENT 'The slot start time derived from the validator registration `timestamp` field' CODEC(DoubleDelta, ZSTD(1)),
`epoch` UInt32 COMMENT 'Epoch number derived from the validator registration `timestamp` field' CODEC(DoubleDelta, ZSTD(1)),
`epoch_start_date_time` DateTime COMMENT 'The epoch start time derived from the validator registration `timestamp` field' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_slot` UInt32 COMMENT 'The wallclock slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_slot_start_date_time` DateTime COMMENT 'The start time for the slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_epoch` UInt32 COMMENT 'The wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_epoch_start_date_time` DateTime COMMENT 'The start time for the wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_id` String COMMENT 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.' CODEC(ZSTD(1)),
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name',
`meta_labels` Map(String, String) COMMENT 'Labels associated with the event' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/{database}/tables/{table}/{shard}',
'{replica}',
updated_date_time
) PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY
(
slot_start_date_time,
meta_network_name,
meta_client_name,
relay_name,
validator_index,
timestamp
) COMMENT 'Contains MEV relay validator registrations data.';

CREATE TABLE default.mev_relay_validator_registration ON CLUSTER '{cluster}' AS default.mev_relay_validator_registration_local ENGINE = Distributed(
'{cluster}',
default,
mev_relay_validator_registration_local,
cityHash64(
slot,
meta_network_name
)
);
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ services:
"mev-relay-bid-trace-builder-block-submission"
"mev-relay-proposer-payload-delivered"
"beacon-api-eth-v3-validator-block"
"mev-relay-validator-registration"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down
Loading
Loading