Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PeerDAS #322

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ transforms:
libp2p_trace_gossipsub_beacon_attestation: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION"
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
beacon_api_eth_v1_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
libp2p_trace_gossipsub_beacon_data_column_sidecar : .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_DATA_COLUMN_SIDECAR"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -872,4 +873,20 @@ sinks:
healthcheck:
enabled: true
encoding:
codec: json
codec: json
libp2p_trace_gossipsub_beacon_data_column_sidecar_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_data_column_sidecar
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-gossipsub-beacon-data-column-sidecar
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
105 changes: 105 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ transforms:
libp2p_trace_gossipsub_beacon_block: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
libp2p_trace_gossipsub_beacon_attestation: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION"
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
libp2p_trace_gossipsub_beacon_data_column_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_DATA_COLUMN_SIDECAR"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand All @@ -231,6 +232,7 @@ transforms:
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_block
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_attestation
- xatu_server_events_router.libp2p_trace_gossipsub_blob_sidecar
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_data_column_sidecar
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -761,6 +763,89 @@ transforms:
.topic_name = topicParts[3]
.topic_encoding = topicParts[4]

.message_size = .meta.client.additional_data.message_size
.message_id = .meta.client.additional_data.message_id

.updated_date_time = to_unix_timestamp(now())

del(.event)
del(.meta)
del(.data)
del(.path)
libp2p_trace_gossipsub_beacon_data_column_sidecar_formatted:
type: remap
inputs:
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_data_column_sidecar
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}

peer_id_key, err = .meta.client.additional_data.metadata.peer_id + .meta.ethereum.network.name
if err != null {
.error = err
.error_description = "failed to generate peer id unique key"
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.unique_key = seahash(.event.id)

.propagation_slot_start_diff = .meta.client.additional_data.propagation.slot_start_diff

.state_root = .data.signed_block_header.message.state_root
.column_index = .data.column_index
.kzg_commitments = .data.kzg_commitments
.kzg_proof = .data.kzg_proof
.kzg_commitments_inclusion_proof = .data.kzg_commitments_inclusion_proof

.slot = .data.data.slot
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

.wallclock_slot = .meta.client.additional_data.wallclock_slot.number
wallclock_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_slot.start_date_time, format: "%+");
if err == null {
.wallclock_slot_start_date_time = to_unix_timestamp(wallclock_slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.wallclock_epoch = .meta.client.additional_data.epoch.number
wallclock_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_epoch.start_date_time, format: "%+");
if err == null {
.wallclock_epoch_start_date_time = to_unix_timestamp(wallclock_epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

.topic_layer = topicParts[1]
.topic_fork_digest_value = topicParts[2]
.topic_name = topicParts[3]
.topic_encoding = topicParts[4]

.message_size = .meta.client.additional_data.message_size
.message_id = .meta.client.additional_data.message_id
.updated_date_time = to_unix_timestamp(now())
Expand Down Expand Up @@ -1826,3 +1911,23 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
libp2p_trace_gossipsub_beacon_data_column_sidecar_clickhouse:
type: clickhouse
inputs:
- libp2p_trace_gossipsub_beacon_data_column_sidecar_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: libp2p_gossipsub_beacon_data_column_sidecar
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS libp2p_gossipsub_beacon_data_column_sidecar ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS libp2p_gossipsub_beacon_data_column_sidecar_local ON CLUSTER '{cluster}';
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
CREATE TABLE libp2p_gossipsub_beacon_data_column_sidecar_local on cluster '{cluster}' (
unique_key Int64,
updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
event_date_time DateTime64(3) Codec(DoubleDelta, ZSTD(1)),
slot UInt32 Codec(DoubleDelta, ZSTD(1)),
slot_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
epoch UInt32 Codec(DoubleDelta, ZSTD(1)),
epoch_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
column_index UInt64 Codec(ZSTD(1)),
kzg_commitments Array(String) Codec(ZSTD(1)),
kzg_proof String Codec(ZSTD(1)),
kzg_commitments_inclusion_proof String Codec(ZSTD(1)),
state_root FixedString(66) Codec(ZSTD(1)),
wallclock_slot UInt32 Codec(DoubleDelta, ZSTD(1)),
wallclock_slot_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
wallclock_epoch UInt32 Codec(DoubleDelta, ZSTD(1)),
wallclock_epoch_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
propagation_slot_start_diff UInt32 Codec(ZSTD(1)),
peer_id_unique_key Int64,
message_id String CODEC(ZSTD(1)),
message_size UInt32 Codec(ZSTD(1)),
topic_layer LowCardinality(String),
topic_fork_digest_value LowCardinality(String),
topic_name LowCardinality(String),
topic_encoding LowCardinality(String),
meta_client_name LowCardinality(String),
meta_client_id String Codec(ZSTD(1)),
meta_client_version LowCardinality(String),
meta_client_implementation LowCardinality(String),
meta_client_os LowCardinality(String),
meta_client_ip Nullable(IPv6) Codec(ZSTD(1)),
meta_client_geo_city LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_country LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_country_code LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_continent_code LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_longitude Nullable(Float64) Codec(ZSTD(1)),
meta_client_geo_latitude Nullable(Float64) Codec(ZSTD(1)),
meta_client_geo_autonomous_system_number Nullable(UInt32) Codec(ZSTD(1)),
meta_client_geo_autonomous_system_organization Nullable(String) Codec(ZSTD(1)),
meta_network_id Int32 Codec(DoubleDelta, ZSTD(1)),
meta_network_name LowCardinality(String)
) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY (slot_start_date_time, unique_key, meta_network_name, meta_client_name);

ALTER TABLE libp2p_gossipsub_beacon_data_column_sidecar_local ON CLUSTER '{cluster}'
MODIFY COMMENT 'Table for libp2p gossipsub beacon data column sidecar data.',
COMMENT COLUMN unique_key 'Unique identifier for each record',
COMMENT COLUMN updated_date_time 'Timestamp when the record was last updated',
COMMENT COLUMN event_date_time 'Timestamp of the event with millisecond precision',
COMMENT COLUMN slot 'Slot number associated with the event',
COMMENT COLUMN slot_start_date_time 'Start date and time of the slot',
COMMENT COLUMN epoch 'The epoch number in the data column sidecar',
COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started',
COMMENT COLUMN column_index 'Index of the column in the data',
COMMENT COLUMN kzg_commitments 'Commitments for the column',
COMMENT COLUMN kzg_proof 'Proof for the column',
COMMENT COLUMN kzg_commitments_inclusion_proof 'Inclusion proof for the column',
COMMENT COLUMN state_root 'State root for signed block header',
COMMENT COLUMN wallclock_slot 'Slot number of the wall clock when the event was received',
COMMENT COLUMN wallclock_slot_start_date_time 'Start date and time of the wall clock slot when the event was received',
COMMENT COLUMN wallclock_epoch 'Epoch number of the wall clock when the event was received',
COMMENT COLUMN wallclock_epoch_start_date_time 'Start date and time of the wall clock epoch when the event was received',
COMMENT COLUMN propagation_slot_start_diff 'Difference in slot start time for propagation',
COMMENT COLUMN peer_id_unique_key 'Unique key associated with the identifier of the peer',
COMMENT COLUMN message_id 'Identifier of the message',
COMMENT COLUMN message_size 'Size of the message in bytes',
COMMENT COLUMN topic_layer 'Layer of the topic in the gossipsub protocol',
COMMENT COLUMN topic_fork_digest_value 'Fork digest value of the topic',
COMMENT COLUMN topic_name 'Name of the topic',
COMMENT COLUMN topic_encoding 'Encoding used for the topic',
COMMENT COLUMN meta_client_name 'Name of the client that generated the event',
COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.',
COMMENT COLUMN meta_client_version 'Version of the client that generated the event',
COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event',
COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event',
COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event',
COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event',
COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event',
COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event',
COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event',
COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event',
COMMENT COLUMN meta_network_id 'Network ID associated with the client',
COMMENT COLUMN meta_network_name 'Name of the network associated with the client';

CREATE TABLE libp2p_gossipsub_beacon_data_column_sidecar on cluster '{cluster}' AS libp2p_gossipsub_beacon_data_column_sidecar_local
ENGINE = Distributed('{cluster}', default, libp2p_gossipsub_beacon_data_column_sidecar_local, unique_key);
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/ethpandaops/xatu

go 1.22.0

replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240527060557-ae23464cdc41
replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240725023104-113a74f8b721

require (
github.com/IBM/sarama v1.43.0
Expand Down Expand Up @@ -33,7 +33,7 @@ require (
github.com/probe-lab/hermes v0.0.0-20240327153144-a2528356b4f7
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.0
github.com/prysmaticlabs/prysm/v5 v5.0.2
github.com/prysmaticlabs/prysm/v5 v5.0.4-0.20240510074809-a76de4f79a99
github.com/r3labs/sse/v2 v2.10.0
github.com/redis/go-redis/v9 v9.5.1
github.com/sirupsen/logrus v1.9.3
Expand All @@ -48,6 +48,7 @@ require (
google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.33.0
gopkg.in/cenkalti/backoff.v1 v1.1.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -95,7 +96,7 @@ require (
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/ethereum/c-kzg-4844 v1.0.2-0.20240507203752-26d3b4156f7a // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/ferranbt/fastssz v0.1.3 // indirect
github.com/flynn/noise v1.1.0 // indirect
Expand Down Expand Up @@ -255,7 +256,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/apimachinery v0.20.0 // indirect
k8s.io/client-go v0.20.0 // indirect
k8s.io/klog/v2 v2.80.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R3nlY=
github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/c-kzg-4844 v1.0.2-0.20240507203752-26d3b4156f7a h1:EV64oiDZGl97cptCieq1X7KrumSbP4MhmKg0/ll65wo=
github.com/ethereum/c-kzg-4844 v1.0.2-0.20240507203752-26d3b4156f7a/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/go-ethereum v1.13.15 h1:U7sSGYGo4SPjP6iNIifNoyIAiNjrmQkz6EwQG+/EZWo=
github.com/ethereum/go-ethereum v1.13.15/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/ethpandaops/beacon v0.38.0 h1:sMFlq49t/PIrp7DlSWgM+OgPAyblvMeV+jr2AOW6ls0=
Expand All @@ -266,8 +266,8 @@ github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
github.com/ethpandaops/ethwallclock v0.3.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24=
github.com/ethpandaops/hermes v0.0.0-20240527060557-ae23464cdc41 h1:KM07rlGX9EQm6ZawvL0BDMJ5Rf3brZIJKsmiFS92wI0=
github.com/ethpandaops/hermes v0.0.0-20240527060557-ae23464cdc41/go.mod h1:uMPOUopuxLk1Qktn66udTjRBLqJJ+CNfiCyxNQda4Ow=
github.com/ethpandaops/hermes v0.0.0-20240725023104-113a74f8b721 h1:8fwmolf051iMka7QkuiZ7a97wTwTEshNA0RoaLKOy7o=
github.com/ethpandaops/hermes v0.0.0-20240725023104-113a74f8b721/go.mod h1:IZgi+dl4W0aDbxy0IB/hTfJtiDDEe/AkkDzvt/rgvZ8=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
Expand Down Expand Up @@ -878,8 +878,8 @@ github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c h1:9PHRCuO
github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c/go.mod h1:ZRws458tYHS/Zs936OQ6oCrL+Ict5O4Xpwve1UQ6C9M=
github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294 h1:q9wE0ZZRdTUAAeyFP/w0SwBEnCqlVy2+on6X2/e+eAU=
github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294/go.mod h1:ZVEbRdnMkGhp/pu35zq4SXxtvUwWK0J1MATtekZpH2Y=
github.com/prysmaticlabs/prysm/v5 v5.0.2 h1:xcSUvrCVfOGslKYUb5Hpyz98N9I8fC2p7DMAZfiqEIA=
github.com/prysmaticlabs/prysm/v5 v5.0.2/go.mod h1:XG4nOU925zemOimoexcrFP4oA57f+RTQbp7V/TH9UOM=
github.com/prysmaticlabs/prysm/v5 v5.0.4-0.20240510074809-a76de4f79a99 h1:RGPhzM/qJR8QqXK8ep2qs3ZABM4tPl0bnGFhDADr+NM=
github.com/prysmaticlabs/prysm/v5 v5.0.4-0.20240510074809-a76de4f79a99/go.mod h1:I3fzhjqrdv/17V3ckLKHLIU2K00kuFzFLw3P24EuKlo=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM=
Expand Down
Loading
Loading