add metrics
cortze committed Jan 31, 2025
1 parent 52c1831 commit 99dca22
Showing 4 changed files with 119 additions and 5 deletions.
1 change: 1 addition & 0 deletions cmd/hermes/cmd.go
@@ -350,6 +350,7 @@ func rootBefore(c *cli.Context) error {
}
if dataStreamType == host.DataStreamTypeS3 {
s3conf := &host.S3DSConfig{
Meter: tele.NoopMeterProvider().Meter("hermes_s3"),
Flushers: rootConfig.S3Flushers,
FlushInterval: rootConfig.S3FlushInterval,
ByteLimit: int64(rootConfig.S3ByteLimit),
2 changes: 2 additions & 0 deletions eth/node.go
@@ -120,6 +120,8 @@ func NewNode(cfg *NodeConfig) (*Node, error) {
ds = host.NewCallbackDataStream()

case host.DataStreamTypeS3:
// pass the meter from the node config down to the S3 data-stream config
cfg.S3Config.Meter = cfg.Meter
var err error
ds, err = host.NewS3DataStream(*cfg.S3Config)
if err != nil {
120 changes: 115 additions & 5 deletions host/s3.go
@@ -15,6 +15,8 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/probe-lab/hermes/tele"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

parquet "github.com/parquet-go/parquet-go"
)
@@ -23,6 +25,9 @@ var (
S3ConnectionTimeout = 5 * time.Second
S3OpTimeout = 10 * time.Second
DefaultByteLimit = int64(10 * 1024 * 1024) // 10MB

// metrics
S3MeterName = "github.com/probe-lab/hermes/s3"
)

type S3DataStream struct {
@@ -41,7 +46,8 @@ type S3DataStream struct {

// counter to identify event-files on S3 - unique per peer
// should be reset on restart with the peer_id
fileCnt atomic.Int64
s3Telemetry *s3Telemetry
}

var _ DataStream = (*S3DataStream)(nil)
@@ -68,6 +74,10 @@ func NewS3DataStream(baseCfg S3DSConfig) (*S3DataStream, error) {
for _, evType := range allEventTypes {
eventStore[evType] = newEventBatcher(baseCfg.ByteLimit)
}
s3Telemetry, err := newS3Telemetry(baseCfg.Meter)
if err != nil {
return nil, err
}
return &S3DataStream{
config: baseCfg,
client: s3client,
@@ -76,6 +86,7 @@ func NewS3DataStream(baseCfg S3DSConfig) (*S3DataStream, error) {
flushersDone: make(chan struct{}),
pFlusherDone: make(chan struct{}),
restartPflusherC: make(chan struct{}),
s3Telemetry: s3Telemetry,
}, nil
}

@@ -170,11 +181,17 @@ func (s3ds *S3DataStream) PutRecord(ctx context.Context, event *TraceEvent) erro
batcher.Lock()
batcher.addNewEvents(events)
if batcher.isFull() {
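// measure how long this batch took to fill before it is reset and flushed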
tSinceLastReset := time.Since(batcher.lastResetT)
submissionT := &EventSubmissionTask{
EventType: t,
Events: batcher.reset(),
}
batcher.Unlock()
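// record the fill interval once the batcher lock has been released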
s3ds.s3Telemetry.batcherFlushIntervalHist.Record(
ctx,
int64(tSinceLastReset.Seconds()),
metric.WithAttributes(attribute.String("event_type", t.String())),
)
return s3ds.submitRecords(ctx, submissionT)
}
batcher.Unlock()
@@ -314,7 +331,8 @@ func (s3ds *S3DataStream) startPeriodicFlusher(ctx context.Context, interval tim
return
case <-flushCheckTicker.C:
for evType, batcher := range s3ds.eventStore {
tSinceLastReset := time.Since(batcher.GetLastResetTime())
if tSinceLastReset < interval {
slog.Debug("not in time to flush", "event-type", evType, "waiting to pflush", interval-time.Since(batcher.lastResetT))
continue
}
@@ -327,6 +345,11 @@ func (s3ds *S3DataStream) startS3Flusher(
if err := s3ds.submitRecords(opCtx, submissionT); err != nil {
slog.Error("submitting last records to s3", tele.LogAttrError(err), "event-type", evType)
}
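// record the interval since this batcher was last reset (periodic-flush path)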
s3ds.s3Telemetry.batcherFlushIntervalHist.Record(
ctx,
int64(tSinceLastReset.Seconds()),
metric.WithAttributes(attribute.String("event_type", evType.String())),
)
cancel()
}
flushCheckTicker.Reset(1 * time.Second)
@@ -455,12 +478,35 @@ func (s3ds *S3DataStream) startS3Flusher(
slog.Error("uploading file to s3", tele.LogAttrError(err))
continue
}
uploadT := time.Since(uploadStartT)
totalT := time.Since(formatStartT)
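// uploadT covers the S3 upload alone; totalT covers parquet formatting plus upload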
// record the submission metrics
s3ds.s3Telemetry.bulkTraceHist.Record(
ctx,
int64(len(eventT.Events)),
metric.WithAttributes(attribute.String("event_type", eventT.EventType.String())),
)
s3ds.s3Telemetry.parquetFormatingLatencyHist.Record(
ctx,
formatT.Milliseconds(),
metric.WithAttributes(attribute.String("event_type", eventT.EventType.String())),
)
s3ds.s3Telemetry.s3SubmissionLantencyHist.Record(
ctx,
uploadT.Milliseconds(),
metric.WithAttributes(attribute.String("event_type", eventT.EventType.String())),
)
s3ds.s3Telemetry.parquetFileSizeHist.Record(
ctx,
int64(totBytes/(1024*1024)),
metric.WithAttributes(attribute.String("event_type", eventT.EventType.String())),
)
slog.Info("submitted file to s3",
"MB", float32(totBytes)/(1024.0*1024.0),
"s3key", eventT.S3Key,
"formating-time", formatT,
"upload-time", time.Since(uploadStartT),
"total-time", time.Since(formatStartT),
"upload-time", uploadT,
"total-time", totalT,
)

case <-ctx.Done():
@@ -499,15 +545,16 @@ func EventsToBytes[T any](

// S3DSConfig holds the configuration needed to establish a connection with an S3 instance
type S3DSConfig struct {
Meter metric.Meter
Flushers int
ByteLimit int64
FlushInterval time.Duration
AccessKeyID string
SecretKey string
Region string
Endpoint string
Bucket string
Tag string
}

// IsValid checks whether the current configuration is valid or not
@@ -644,3 +691,66 @@ func (b *eventBatcher) GetLastResetTime() time.Time {
defer b.RUnlock()
return b.lastResetT
}

type s3Telemetry struct {
bulkTraceHist metric.Int64Histogram
parquetFormatingLatencyHist metric.Int64Histogram
s3SubmissionLantencyHist metric.Int64Histogram
parquetFileSizeHist metric.Int64Histogram
batcherFlushIntervalHist metric.Int64Histogram
}

func newS3Telemetry(meter metric.Meter) (*s3Telemetry, error) {
bulkTraceHist, err := meter.Int64Histogram(
"s3_bulk_traced_events",
metric.WithDescription("Number of bulk events added on each s3 submission per event category"),
metric.WithExplicitBucketBoundaries(
0, 100, 200, 500, 1000, 1500, 4000, 8000, 10000,
),
)
if err != nil {
return nil, err
}

parquetFormatingLatencyHist, err := meter.Int64Histogram(
"s3_parquet_formating_latency_ms",
metric.WithDescription("Milliseconds that it took to format the given traces into a parquet file"),
)
if err != nil {
return nil, err
}

s3SubmissionLatencyHist, err := meter.Int64Histogram(
"s3_submission_latency_ms",
metric.WithDescription("Milliseconds that it took to submite parquet files to s3"),
)
if err != nil {
return nil, err
}

parquetFileSizeHist, err := meter.Int64Histogram(
"s3_parquet_file_size_mb",
metric.WithDescription("Size (in MB) of the submited parquet files to s3"),
metric.WithExplicitBucketBoundaries(0, 2, 5, 10, 12, 15, 20, 25, 30),
)
if err != nil {
return nil, err
}

batcherFlushIntervalHist, err := meter.Int64Histogram(
"s3_batcher_flush_interval_s",
metric.WithDescription("Interval between batcher flush times (in seconds)"),
metric.WithExplicitBucketBoundaries(0, 1, 2, 3, 4, 5, 10, 12, 15, 20, 30),
)
if err != nil {
return nil, err
}

return &s3Telemetry{
bulkTraceHist: bulkTraceHist,
parquetFormatingLatencyHist: parquetFormatingLatencyHist,
s3SubmissionLantencyHist: s3SubmissionLatencyHist,
parquetFileSizeHist: parquetFileSizeHist,
batcherFlushIntervalHist: batcherFlushIntervalHist,
}, nil
}
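
For context, and not part of this commit: a minimal sketch of how a non-noop meter could be handed to S3DSConfig, assuming the standard OpenTelemetry Go SDK and its Prometheus exporter are available. The exporter choice and the helper name newS3ConfigWithMetrics are illustrative, not hermes APIs, and the fragment assumes it lives in a package that can import host.

import (
	"time"

	"github.com/probe-lab/hermes/host"
	"go.opentelemetry.io/otel/exporters/prometheus"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

// newS3ConfigWithMetrics builds an S3DSConfig whose histograms are exported
// through a Prometheus reader instead of the noop meter used in cmd/hermes.
func newS3ConfigWithMetrics() (*host.S3DSConfig, error) {
	exporter, err := prometheus.New()
	if err != nil {
		return nil, err
	}
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))

	return &host.S3DSConfig{
		Meter:         provider.Meter("hermes_s3"),
		Flushers:      2,
		FlushInterval: 5 * time.Second,
		ByteLimit:     host.DefaultByteLimit,
		// AccessKeyID, SecretKey, Region, Endpoint, Bucket and Tag set as required
	}, nil
}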
1 change: 1 addition & 0 deletions host/s3_test.go
@@ -160,6 +160,7 @@ func Test_S3_Periodic_Flusher(t *testing.T) {
func getTestS3client(ctx context.Context, t *testing.T) *S3DataStream {
// basic configuration
cfg := S3DSConfig{
Meter: tele.NoopMeterProvider().Meter("testing_hermes"),
Flushers: 1,
FlushInterval: 1 * time.Second,
Endpoint: "http://localhost:4566",
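
As a side note, also not part of this commit: if a test wanted to inspect the recorded histograms rather than discard them through the noop meter, the SDK's manual reader could back the meter instead, roughly as sketched below (helper name and wiring are illustrative, assuming the go.opentelemetry.io/otel/sdk/metric packages).

import (
	"context"
	"testing"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// collectS3Metrics drains everything recorded so far through the manual reader.
func collectS3Metrics(t *testing.T, ctx context.Context, reader *sdkmetric.ManualReader) metricdata.ResourceMetrics {
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		t.Fatal(err)
	}
	return rm
}

// usage inside a test:
//   reader := sdkmetric.NewManualReader()
//   provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
//   cfg.Meter = provider.Meter("testing_hermes")
//   ... exercise PutRecord / the flushers, then:
//   rm := collectS3Metrics(t, ctx, reader)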
