From 297a2f9ada0b599b1d93ff05bd80bd7cf8a6e503 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 26 May 2024 11:57:08 -0700 Subject: [PATCH] add message bytes to metrics middleware --- pkg/middleware/metrics.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/pkg/middleware/metrics.go b/pkg/middleware/metrics.go index 4495dc5..2da4ca8 100644 --- a/pkg/middleware/metrics.go +++ b/pkg/middleware/metrics.go @@ -43,10 +43,10 @@ func (r MetricRole) String() string { } type MetricsObserver interface { - OnUnaryRequest(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error) - OnMultiRequest(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, responseCount int, errorCount int) - OnStreamSend(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error) - OnStreamRecv(role MetricRole, rpcInfo psrpc.RPCInfo, err error) + OnUnaryRequest(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error, rxBytes, txBytes int) + OnMultiRequest(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, responseCount, errorCount, rxBytes, txBytes int) + OnStreamSend(role MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error, bytes int) + OnStreamRecv(role MetricRole, rpcInfo psrpc.RPCInfo, err error, bytes int) OnStreamOpen(role MetricRole, rpcInfo psrpc.RPCInfo) OnStreamClose(role MetricRole, rpcInfo psrpc.RPCInfo) } @@ -70,7 +70,9 @@ func newClientRPCMetricsInterceptor(observer MetricsObserver) psrpc.ClientRPCInt return func(rpcInfo psrpc.RPCInfo, next psrpc.ClientRPCHandler) psrpc.ClientRPCHandler { return func(ctx context.Context, req proto.Message, opts ...psrpc.RequestOption) (res proto.Message, err error) { start := time.Now() - defer func() { observer.OnUnaryRequest(ClientRole, rpcInfo, time.Since(start), err) }() + defer func() { + observer.OnUnaryRequest(ClientRole, rpcInfo, time.Since(start), err, proto.Size(req), proto.Size(res)) + }() return next(ctx, req, opts...) } } @@ -87,9 +89,9 @@ func newServerRPCMetricsInterceptor(observer MetricsObserver) psrpc.ServerRPCInt } else { errorCount++ } - observer.OnMultiRequest(ServerRole, rpcInfo, time.Since(start), responseCount, errorCount) + observer.OnMultiRequest(ServerRole, rpcInfo, time.Since(start), responseCount, errorCount, proto.Size(req), proto.Size(res)) } else { - observer.OnUnaryRequest(ServerRole, rpcInfo, time.Since(start), err) + observer.OnUnaryRequest(ServerRole, rpcInfo, time.Since(start), err, proto.Size(req), proto.Size(res)) } }() return handler(ctx, req) @@ -116,13 +118,13 @@ type streamMetricsInterceptor struct { } func (s *streamMetricsInterceptor) Recv(msg proto.Message) (err error) { - s.observer.OnStreamRecv(s.role, s.info, err) + s.observer.OnStreamRecv(s.role, s.info, err, proto.Size(msg)) return s.StreamHandler.Recv(msg) } func (s *streamMetricsInterceptor) Send(msg proto.Message, opts ...psrpc.StreamOption) (err error) { start := time.Now() - defer func() { s.observer.OnStreamSend(s.role, s.info, time.Since(start), err) }() + defer func() { s.observer.OnStreamSend(s.role, s.info, time.Since(start), err, proto.Size(msg)) }() return s.StreamHandler.Send(msg, opts...) } @@ -149,10 +151,13 @@ type multiRPCMetricsInterceptor struct { info psrpc.RPCInfo responseCount int errorCount int + rxBytes int + txBytes int } func (r *multiRPCMetricsInterceptor) Send(ctx context.Context, req proto.Message, opts ...psrpc.RequestOption) error { r.start = time.Now() + r.txBytes += proto.Size(req) return r.ClientMultiRPCHandler.Send(ctx, req, opts...) } @@ -162,10 +167,11 @@ func (r *multiRPCMetricsInterceptor) Recv(msg proto.Message, err error) { } else { r.errorCount++ } + r.rxBytes += proto.Size(msg) r.ClientMultiRPCHandler.Recv(msg, err) } func (r *multiRPCMetricsInterceptor) Close() { - r.observer.OnMultiRequest(ClientRole, r.info, time.Since(r.start), r.responseCount, r.errorCount) + r.observer.OnMultiRequest(ClientRole, r.info, time.Since(r.start), r.responseCount, r.errorCount, r.rxBytes, r.txBytes) r.ClientMultiRPCHandler.Close() }