Skip to content

Commit

Permalink
add message bytes to metrics middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
paulwe committed May 26, 2024
1 parent 8ba067a commit 297a2f9
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions pkg/middleware/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func (r MetricRole) String() string {
}

type MetricsObserver interface {
OnUnaryRequest(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error)
OnMultiRequest(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, responseCount int, errorCount int)
OnStreamSend(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error)
OnStreamRecv(role MetricRole, rpcInfo psrpc.RPCInfo, err error)
OnUnaryRequest(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error, rxBytes, txBytes int)
OnMultiRequest(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, responseCount, errorCount, rxBytes, txBytes int)
OnStreamSend(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error, bytes int)
OnStreamRecv(role MetricRole, rpcInfo psrpc.RPCInfo, err error, bytes int)
OnStreamOpen(role MetricRole, rpcInfo psrpc.RPCInfo)
OnStreamClose(role MetricRole, rpcInfo psrpc.RPCInfo)
}
Expand All @@ -70,7 +70,9 @@ func newClientRPCMetricsInterceptor(observer MetricsObserver) psrpc.ClientRPCInt
return func(rpcInfo psrpc.RPCInfo, next psrpc.ClientRPCHandler) psrpc.ClientRPCHandler {
return func(ctx context.Context, req proto.Message, opts ...psrpc.RequestOption) (res proto.Message, err error) {
start := time.Now()
defer func() { observer.OnUnaryRequest(ClientRole, rpcInfo, time.Since(start), err) }()
defer func() {
observer.OnUnaryRequest(ClientRole, rpcInfo, time.Since(start), err, proto.Size(req), proto.Size(res))
}()
return next(ctx, req, opts...)
}
}
Expand All @@ -87,9 +89,9 @@ func newServerRPCMetricsInterceptor(observer MetricsObserver) psrpc.ServerRPCInt
} else {
errorCount++
}
observer.OnMultiRequest(ServerRole, rpcInfo, time.Since(start), responseCount, errorCount)
observer.OnMultiRequest(ServerRole, rpcInfo, time.Since(start), responseCount, errorCount, proto.Size(req), proto.Size(res))
} else {
observer.OnUnaryRequest(ServerRole, rpcInfo, time.Since(start), err)
observer.OnUnaryRequest(ServerRole, rpcInfo, time.Since(start), err, proto.Size(req), proto.Size(res))
}
}()
return handler(ctx, req)
Expand All @@ -116,13 +118,13 @@ type streamMetricsInterceptor struct {
}

func (s *streamMetricsInterceptor) Recv(msg proto.Message) (err error) {
s.observer.OnStreamRecv(s.role, s.info, err)
s.observer.OnStreamRecv(s.role, s.info, err, proto.Size(msg))
return s.StreamHandler.Recv(msg)
}

func (s *streamMetricsInterceptor) Send(msg proto.Message, opts ...psrpc.StreamOption) (err error) {
start := time.Now()
defer func() { s.observer.OnStreamSend(s.role, s.info, time.Since(start), err) }()
defer func() { s.observer.OnStreamSend(s.role, s.info, time.Since(start), err, proto.Size(msg)) }()
return s.StreamHandler.Send(msg, opts...)
}

Expand All @@ -149,10 +151,13 @@ type multiRPCMetricsInterceptor struct {
info psrpc.RPCInfo
responseCount int
errorCount int
rxBytes int
txBytes int
}

func (r *multiRPCMetricsInterceptor) Send(ctx context.Context, req proto.Message, opts ...psrpc.RequestOption) error {
r.start = time.Now()
r.txBytes += proto.Size(req)
return r.ClientMultiRPCHandler.Send(ctx, req, opts...)
}

Expand All @@ -162,10 +167,11 @@ func (r *multiRPCMetricsInterceptor) Recv(msg proto.Message, err error) {
} else {
r.errorCount++
}
r.rxBytes += proto.Size(msg)
r.ClientMultiRPCHandler.Recv(msg, err)
}

func (r *multiRPCMetricsInterceptor) Close() {
r.observer.OnMultiRequest(ClientRole, r.info, time.Since(r.start), r.responseCount, r.errorCount)
r.observer.OnMultiRequest(ClientRole, r.info, time.Since(r.start), r.responseCount, r.errorCount, r.rxBytes, r.txBytes)
r.ClientMultiRPCHandler.Close()
}

0 comments on commit 297a2f9

Please sign in to comment.