Skip to content

Commit

Permalink
Address some missing fixes for s3 traces (probe-lab#44)
Browse files Browse the repository at this point in the history
* remname events

* fix: lower column naming + explicit declaration of list[string]
  • Loading branch information
cortze authored Feb 5, 2025
1 parent 42282b5 commit 7898128
Showing 1 changed file with 41 additions and 41 deletions.
82 changes: 41 additions & 41 deletions host/parquet_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (e EventType) String() string {
case EventTypeGenericEvent:
return "generic"
case EventTypeAddRemovePeer:
return "add_remove_peer"
return "add_remove"
case EventTypeGraftPrune:
return "graft_prune"
case EventTypeControlRPC:
Expand All @@ -58,7 +58,7 @@ func (e EventType) String() string {
case EventTypeMsgArrivals:
return "msg_arrival"
case EventTypeJoinLeaveTopic:
return "join_topic"
return "join_leave"
case EventTypeConnectDisconnectPeer:
return "connect_disconnect"
default:
Expand Down Expand Up @@ -152,9 +152,9 @@ var _ LocalyProducedEvent = (*BaseEvent)(nil)

// For analysis purposes, we need to pair the events one with eachother
type BaseEvent struct {
Timestamp int64
Type string
ProducerID string
Timestamp int64 `parquet:"timestamp"`
Type string `parquet:"type"`
ProducerID string `parquet:"producer_id"`
}

func (b *BaseEvent) GetProducerID() string {
Expand All @@ -163,8 +163,8 @@ func (b *BaseEvent) GetProducerID() string {

type GossipAddRemovePeerEvent struct {
BaseEvent
SubType string
RemotePeerID string
SubType string `parquet:"sub_type"`
RemotePeerID string `parquet:"remote_peer_id"`
// Protocol string // removing it for now, to keep a constant format across Add and Remove
}

Expand All @@ -188,9 +188,9 @@ func addRemovePeerFromEvent(subType EventSubType, rawEvent *TraceEvent) (map[Eve

type GossipGraftPruneEvent struct {
BaseEvent
SubType string
RemotePeerID string
Topic string
SubType string `parquet:"sub_type"`
RemotePeerID string `parquet:"remote_peer_id"`
Topic string `parquet:"topic"`
}

func graftPruneFromEvent(subType EventSubType, rawEvent *TraceEvent) (map[EventType][]any, error) {
Expand All @@ -217,35 +217,35 @@ func graftPruneFromEvent(subType EventSubType, rawEvent *TraceEvent) (map[EventT
// tracks the direction and the number of message_ids per control
type SendRecvRPCEvent struct {
BaseRPCEvent
Ihaves int32
Iwants int32
Idontwants int32
Ihaves int32 `parquet:"ihaves"`
Iwants int32 `parquet:"iwants"`
Idontwants int32 `parquet:"idontwants"`
}

type BaseRPCEvent struct {
BaseEvent
IsOg bool // since we will divide original IHAVES into different rows off keep track of OG events for Control msg ids
Direction string
RemotePeerID string
IsOg bool `parquet:"is_og"` // since we will divide original IHAVES into different rows off keep track of OG events for Control msg ids
Direction string `parquet:"direction"`
RemotePeerID string `parquet:"remote_peer_id"`
}

type GossipIhaveEvent struct {
BaseRPCEvent
Topic string
MsgIDs []string
Msgs int
Topic string `parquet:"topic"`
MsgIDs []string `parquet:"msg_ids,list"`
Msgs int `parquet:"msgs"`
}

type GossipIwantEvent struct {
BaseRPCEvent
MsgIDs []string
Msgs int
MsgIDs []string `parquet:"msg_ids,list"`
Msgs int `parquet:"msgs"`
}

type GossipIdontwantEvent struct {
BaseRPCEvent
MsgIDs []string
Msgs int
MsgIDs []string `parquet:"msg_ids,list"`
Msgs int `parquet:"msgs"`
}

type RPCdirection int8
Expand Down Expand Up @@ -411,13 +411,13 @@ func sendRecvDropRPCFromEvent(rpcDirection RPCdirection, rawEvent *TraceEvent) (

type GossipMsgArrivalEvent struct {
BaseEvent
SubType string
RemotePeerID string
Topic string
MsgID string
Local bool
MsgSize int64
SeqNo string
SubType string `parquet:"sub_type"`
RemotePeerID string `parquet:"remote_peer_id"`
Topic string `parquet:"topic"`
MsgID string `parquet:"msg_id"`
Local bool `parquet:"local"`
MsgSize int64 `parquet:"msg_size"`
SeqNo string `parquet:"seq_no"`
}

func msgArrivalFromEvent(subType EventSubType, rawEvent *TraceEvent) (map[EventType][]any, error) {
Expand Down Expand Up @@ -454,8 +454,8 @@ func msgArrivalFromEvent(subType EventSubType, rawEvent *TraceEvent) (map[EventT

type GossipJoinLeaveTopicEvent struct {
BaseEvent
SubType string
Topic string
SubType string `parquet:"sub_type"`
Topic string `parquet:"topic"`
}

func joinLeaveTopicFromEvent(subType EventSubType, rawEvent *TraceEvent) (map[EventType][]any, error) {
Expand All @@ -478,13 +478,13 @@ func joinLeaveTopicFromEvent(subType EventSubType, rawEvent *TraceEvent) (map[Ev

type Libp2pConnectDisconnectEvent struct {
BaseEvent
SubType string
RemotePeerID string
RemotePeerMaddrs string
AgentVersion string
Direction string
Opened int64
Limited bool
SubType string `parquet:"sub_type"`
RemotePeerID string `parquet:"remote_peer_id"`
RemotePeerMaddrs string `parquet:"remote_peer_maddrs"`
AgentVersion string `parquet:"agent_version"`
Direction string `parquet:"direction"`
Opened int64 `parquet:"opened"`
Limited bool `parquet:"limited"`
}

func connectDisconnectFromEvent(subType EventSubType, rawEvent *TraceEvent) (map[EventType][]any, error) {
Expand Down Expand Up @@ -582,8 +582,8 @@ func RenderEvent(rawEvent *TraceEvent) (map[EventType][]any, error) {
// if we don't have a generic parquet format for a trace, use the generic one
type GenericParquetEvent struct {
BaseEvent
Topic string
Payload string
Topic string `parquet:"topic"`
Payload string `parquet:"payload"`
}

func GenericTraceFromEvent(t *TraceEvent) *GenericParquetEvent {
Expand Down

0 comments on commit 7898128

Please sign in to comment.