From 977190dfeb17824c1910dc4216f558edd651dcc2 Mon Sep 17 00:00:00 2001 From: Dmytro Horkhover Date: Tue, 13 Feb 2024 14:28:57 +0200 Subject: [PATCH 1/2] fix: update latency and count metrics --- internal/metric/metric.go | 80 ++++++++++++++++++--------------------- server.go | 4 +- state.go | 34 ++++++++--------- 3 files changed, 55 insertions(+), 63 deletions(-) diff --git a/internal/metric/metric.go b/internal/metric/metric.go index 496f15d5..fc0d2553 100644 --- a/internal/metric/metric.go +++ b/internal/metric/metric.go @@ -11,7 +11,9 @@ import ( "strconv" "time" + "github.com/minio/kes/internal/api" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/expfmt" ) @@ -20,33 +22,36 @@ import ( func New() *Metrics { requestStatusLabels := []string{"code"} + registry := prometheus.NewRegistry() + promFactory := promauto.With(registry) + metrics := &Metrics{ - registry: prometheus.NewRegistry(), - requestSucceeded: prometheus.NewCounterVec(prometheus.CounterOpts{ + gatherer: registry, + requestSucceeded: promFactory.NewCounterVec(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "http", Name: "request_success", Help: "Number of requests that have been served successfully.", }, requestStatusLabels), - requestErrored: prometheus.NewCounterVec(prometheus.CounterOpts{ + requestErrored: promFactory.NewCounterVec(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "http", Name: "request_error", Help: "Number of request that failed due to some error. (HTTP 4xx status code)", }, requestStatusLabels), - requestFailed: prometheus.NewCounterVec(prometheus.CounterOpts{ + requestFailed: promFactory.NewCounterVec(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "http", Name: "request_failure", Help: "Number of request that failed due to some internal failure. (HTTP 5xx status code)", }, requestStatusLabels), - requestActive: prometheus.NewGauge(prometheus.GaugeOpts{ + requestActive: promFactory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "http", Name: "request_active", Help: "Number of active requests that are not finished, yet.", }), - requestLatency: prometheus.NewHistogram(prometheus.HistogramOpts{ + requestLatency: promFactory.NewHistogram(prometheus.HistogramOpts{ Namespace: "kes", Subsystem: "http", Name: "response_time", @@ -54,13 +59,13 @@ func New() *Metrics { Help: "Histogram of request response times spawning from 10ms to 10s.", }), - errorLogEvents: prometheus.NewCounter(prometheus.CounterOpts{ + errorLogEvents: promFactory.NewCounter(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "log", Name: "error_events", Help: "Number of error log events written to the error log targets.", }), - auditLogEvents: prometheus.NewCounter(prometheus.CounterOpts{ + auditLogEvents: promFactory.NewCounter(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "log", Name: "audit_events", @@ -68,45 +73,45 @@ func New() *Metrics { }), startTime: time.Now(), - upTimeInSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ + upTimeInSeconds: promFactory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "up_time", Help: "The time the server has been up and running in seconds.", }), - numCPUs: prometheus.NewGauge(prometheus.GaugeOpts{ + numCPUs: promFactory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "num_cpu", Help: "The number of logical CPUs available on the system. It may be larger than the number of usable CPUs.", }), - numUsableCPUs: prometheus.NewGauge(prometheus.GaugeOpts{ + numUsableCPUs: promFactory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "num_cpu_used", Help: "The number of logical CPUs usable by the server. It may be smaller than the number of available CPUs.", }), - numThreads: prometheus.NewGauge(prometheus.GaugeOpts{ + numThreads: promFactory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "num_threads", Help: "The number of concurrent co-routines/threads that currently exists.", }), - memHeapUsed: prometheus.NewGauge(prometheus.GaugeOpts{ + memHeapUsed: promFactory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "mem_heap_used", Help: "The number of bytes that are currently allocated on the heap memory.", }), - memHeapObjects: prometheus.NewGauge(prometheus.GaugeOpts{ + memHeapObjects: promFactory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "mem_heap_objects", Help: "The number of objects that are currently allocated on the heap memory.", }), - memStackUsed: prometheus.NewGauge(prometheus.GaugeOpts{ + memStackUsed: promFactory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "mem_stack_used", @@ -114,28 +119,13 @@ func New() *Metrics { }), } - metrics.registry.MustRegister(metrics.requestSucceeded) - metrics.registry.MustRegister(metrics.requestErrored) - metrics.registry.MustRegister(metrics.requestFailed) - metrics.registry.MustRegister(metrics.requestActive) - metrics.registry.MustRegister(metrics.requestLatency) - metrics.registry.MustRegister(metrics.errorLogEvents) - metrics.registry.MustRegister(metrics.auditLogEvents) - metrics.registry.MustRegister(metrics.upTimeInSeconds) - metrics.registry.MustRegister(metrics.numCPUs) - metrics.registry.MustRegister(metrics.numUsableCPUs) - metrics.registry.MustRegister(metrics.numThreads) - metrics.registry.MustRegister(metrics.memHeapUsed) - metrics.registry.MustRegister(metrics.memHeapObjects) - metrics.registry.MustRegister(metrics.memStackUsed) - return metrics } // Metrics is a type that gathers various metrics and information // about an application. type Metrics struct { - registry *prometheus.Registry + gatherer prometheus.Gatherer requestSucceeded *prometheus.CounterVec requestFailed *prometheus.CounterVec @@ -171,7 +161,7 @@ func (m *Metrics) EncodeTo(encoder expfmt.Encoder) error { m.memHeapObjects.Set(float64(memStats.HeapObjects)) m.memStackUsed.Set(float64(memStats.StackSys)) - metrics, err := m.registry.Gather() + metrics, err := m.gatherer.Gather() if err != nil { return err } @@ -190,21 +180,22 @@ func (m *Metrics) EncodeTo(encoder expfmt.Encoder) error { // Count distingushes requests that fail with some sort of // well-defined error (HTTP 4xx) and requests that fail due // to some internal error (HTTP 5xx). -func (m *Metrics) Count(h http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +func (m *Metrics) Count(h api.Handler) api.Handler { + return api.HandlerFunc(func(resp *api.Response, req *api.Request) { m.requestActive.Inc() defer m.requestActive.Dec() - rw := countResponseWriter{ - ResponseWriter: w, + rw := &countResponseWriter{ + ResponseWriter: resp.ResponseWriter, succeeded: m.requestSucceeded, errored: m.requestErrored, failed: m.requestFailed, } - if flusher, ok := w.(http.Flusher); ok { + if flusher, ok := resp.ResponseWriter.(http.Flusher); ok { rw.flusher = flusher } - h.ServeHTTP(&rw, r) + resp.ResponseWriter = rw + h.ServeAPI(resp, req) }) } @@ -215,17 +206,18 @@ func (m *Metrics) Count(h http.Handler) http.Handler { // application takes to generate and send a response after // receiving a request. It basically shows how many request // the application can handle. -func (m *Metrics) Latency(h http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - rw := latencyResponseWriter{ - ResponseWriter: w, +func (m *Metrics) Latency(h api.Handler) api.Handler { + return api.HandlerFunc(func(resp *api.Response, req *api.Request) { + rw := &latencyResponseWriter{ + ResponseWriter: resp.ResponseWriter, start: time.Now(), histogram: m.requestLatency, } - if flusher, ok := w.(http.Flusher); ok { + if flusher, ok := rw.ResponseWriter.(http.Flusher); ok { rw.flusher = flusher } - h.ServeHTTP(&rw, r) + resp.ResponseWriter = rw + h.ServeAPI(resp, req) }) } diff --git a/server.go b/server.go index 88f2f652..fcd28490 100644 --- a/server.go +++ b/server.go @@ -258,7 +258,7 @@ func (s *Server) Update(conf *Config) (io.Closer, error) { state.Audit.h = conf.AuditLog } - mux, routes := initRoutes(s, conf.Routes) + mux, routes := initRoutes(s, conf.Routes, state.Metrics) state.Routes = routes s.tls.Store(conf.TLS.Clone()) @@ -427,7 +427,7 @@ func (s *Server) listen(ctx context.Context, ln net.Listener, conf *Config) (net state.Audit = newAuditLogger(conf.AuditLog, &s.AuditLevel) } - mux, routes := initRoutes(s, conf.Routes) + mux, routes := initRoutes(s, conf.Routes, state.Metrics) state.Routes = routes s.tls.Store(conf.TLS.Clone()) diff --git a/state.go b/state.go index a13d1663..9f6a6a81 100644 --- a/state.go +++ b/state.go @@ -40,7 +40,7 @@ type identityEntry struct { *kes.Policy } -func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, map[string]api.Route) { +func initRoutes(s *Server, routeConfig map[string]RouteConfig, metrics *metric.Metrics) (*http.ServeMux, map[string]api.Route) { routes := map[string]api.Route{ api.PathVersion: { Method: http.MethodGet, @@ -89,7 +89,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.createKey), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.createKey))), }, api.PathKeyImport: { Method: http.MethodPut, @@ -97,7 +97,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 1 * mem.MB, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.importKey), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.importKey))), }, api.PathKeyDescribe: { Method: http.MethodGet, @@ -105,7 +105,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.describeKey), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.describeKey))), }, api.PathKeyList: { Method: http.MethodGet, @@ -113,7 +113,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.listKeys), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.listKeys))), }, api.PathKeyDelete: { Method: http.MethodDelete, @@ -121,7 +121,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.deleteKey), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.deleteKey))), }, api.PathKeyEncrypt: { Method: http.MethodPut, @@ -129,7 +129,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 1 * mem.MB, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.encryptKey), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.encryptKey))), }, api.PathKeyGenerate: { Method: http.MethodPut, @@ -137,7 +137,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 1 * mem.MB, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.generateKey), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.generateKey))), }, api.PathKeyDecrypt: { Method: http.MethodPut, @@ -145,7 +145,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 1 * mem.MB, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.decryptKey), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.decryptKey))), }, api.PathKeyHMAC: { Method: http.MethodPut, @@ -153,7 +153,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 1 * mem.MB, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.hmacKey), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.hmacKey))), }, api.PathPolicyDescribe: { @@ -170,7 +170,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.readPolicy), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.readPolicy))), }, api.PathPolicyList: { Method: http.MethodGet, @@ -178,7 +178,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.listPolicies), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.listPolicies))), }, api.PathIdentityDescribe: { @@ -187,7 +187,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.describeIdentity), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.describeIdentity))), }, api.PathIdentityList: { Method: http.MethodGet, @@ -195,7 +195,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.listIdentities), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.listIdentities))), }, api.PathIdentitySelfDescribe: { Method: http.MethodGet, @@ -203,7 +203,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 15 * time.Second, Auth: insecureIdentifyOnly{}, // Anyone can use the self-describe API as long as a client cert is provided - Handler: api.HandlerFunc(s.selfDescribeIdentity), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.selfDescribeIdentity))), }, api.PathLogError: { @@ -212,7 +212,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 0, // No timeout Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.logError), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.logError))), }, api.PathLogAudit: { Method: http.MethodGet, @@ -220,7 +220,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig) (*http.ServeMux, MaxBody: 0, Timeout: 0, // No timeout Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.logAudit), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.logAudit))), }, } From db1421b8a14d01a4aea2fb284020cdc202415cb2 Mon Sep 17 00:00:00 2001 From: Dmytro Horkhover Date: Wed, 14 Feb 2024 11:47:04 +0200 Subject: [PATCH 2/2] fix: error and audit event metrics --- internal/api/api.go | 14 ++++ internal/metric/metric.go | 162 +++++++++++++++++++++----------------- state.go | 6 +- 3 files changed, 105 insertions(+), 77 deletions(-) diff --git a/internal/api/api.go b/internal/api/api.go index b3fcc559..2112ca02 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -211,6 +211,11 @@ type Response struct { http.ResponseWriter } +var ( + _ http.ResponseWriter = (*Response)(nil) + _ http.Flusher = (*Response)(nil) +) + // Reply is a shorthand for api.Reply. It sends just an HTTP // status code to the client. The response body is empty. func (r *Response) Reply(code int) { Reply(r, code) } @@ -245,6 +250,15 @@ func ReplyWith(r *Response, code int, data any) error { return json.NewEncoder(r).Encode(data) } +// Flush sends any buffered data to the client. +// +// This method will be called by http.ResponseController. +func (r *Response) Flush() { + if flusher, ok := r.ResponseWriter.(http.Flusher); ok { + flusher.Flush() + } +} + // ReadBody reads the request body into v using the // request content encoding. // diff --git a/internal/metric/metric.go b/internal/metric/metric.go index fc0d2553..9d56b380 100644 --- a/internal/metric/metric.go +++ b/internal/metric/metric.go @@ -5,7 +5,6 @@ package metric import ( - "io" "net/http" "runtime" "strconv" @@ -23,35 +22,35 @@ func New() *Metrics { requestStatusLabels := []string{"code"} registry := prometheus.NewRegistry() - promFactory := promauto.With(registry) + factory := promauto.With(registry) metrics := &Metrics{ gatherer: registry, - requestSucceeded: promFactory.NewCounterVec(prometheus.CounterOpts{ + requestSucceeded: factory.NewCounterVec(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "http", Name: "request_success", Help: "Number of requests that have been served successfully.", }, requestStatusLabels), - requestErrored: promFactory.NewCounterVec(prometheus.CounterOpts{ + requestErrored: factory.NewCounterVec(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "http", Name: "request_error", Help: "Number of request that failed due to some error. (HTTP 4xx status code)", }, requestStatusLabels), - requestFailed: promFactory.NewCounterVec(prometheus.CounterOpts{ + requestFailed: factory.NewCounterVec(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "http", Name: "request_failure", Help: "Number of request that failed due to some internal failure. (HTTP 5xx status code)", }, requestStatusLabels), - requestActive: promFactory.NewGauge(prometheus.GaugeOpts{ + requestActive: factory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "http", Name: "request_active", Help: "Number of active requests that are not finished, yet.", }), - requestLatency: promFactory.NewHistogram(prometheus.HistogramOpts{ + requestLatency: factory.NewHistogram(prometheus.HistogramOpts{ Namespace: "kes", Subsystem: "http", Name: "response_time", @@ -59,13 +58,13 @@ func New() *Metrics { Help: "Histogram of request response times spawning from 10ms to 10s.", }), - errorLogEvents: promFactory.NewCounter(prometheus.CounterOpts{ + errorLogEvents: factory.NewCounter(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "log", Name: "error_events", Help: "Number of error log events written to the error log targets.", }), - auditLogEvents: promFactory.NewCounter(prometheus.CounterOpts{ + auditLogEvents: factory.NewCounter(prometheus.CounterOpts{ Namespace: "kes", Subsystem: "log", Name: "audit_events", @@ -73,45 +72,45 @@ func New() *Metrics { }), startTime: time.Now(), - upTimeInSeconds: promFactory.NewGauge(prometheus.GaugeOpts{ + upTimeInSeconds: factory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "up_time", Help: "The time the server has been up and running in seconds.", }), - numCPUs: promFactory.NewGauge(prometheus.GaugeOpts{ + numCPUs: factory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "num_cpu", Help: "The number of logical CPUs available on the system. It may be larger than the number of usable CPUs.", }), - numUsableCPUs: promFactory.NewGauge(prometheus.GaugeOpts{ + numUsableCPUs: factory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "num_cpu_used", Help: "The number of logical CPUs usable by the server. It may be smaller than the number of available CPUs.", }), - numThreads: promFactory.NewGauge(prometheus.GaugeOpts{ + numThreads: factory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "num_threads", Help: "The number of concurrent co-routines/threads that currently exists.", }), - memHeapUsed: promFactory.NewGauge(prometheus.GaugeOpts{ + memHeapUsed: factory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "mem_heap_used", Help: "The number of bytes that are currently allocated on the heap memory.", }), - memHeapObjects: promFactory.NewGauge(prometheus.GaugeOpts{ + memHeapObjects: factory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "mem_heap_objects", Help: "The number of objects that are currently allocated on the heap memory.", }), - memStackUsed: promFactory.NewGauge(prometheus.GaugeOpts{ + memStackUsed: factory.NewGauge(prometheus.GaugeOpts{ Namespace: "kes", Subsystem: "system", Name: "mem_stack_used", @@ -191,9 +190,7 @@ func (m *Metrics) Count(h api.Handler) api.Handler { errored: m.requestErrored, failed: m.requestFailed, } - if flusher, ok := resp.ResponseWriter.(http.Flusher); ok { - rw.flusher = flusher - } + defer rw.updateMetrics() resp.ResponseWriter = rw h.ServeAPI(resp, req) }) @@ -213,9 +210,7 @@ func (m *Metrics) Latency(h api.Handler) api.Handler { start: time.Now(), histogram: m.requestLatency, } - if flusher, ok := rw.ResponseWriter.(http.Flusher); ok { - rw.flusher = flusher - } + defer rw.updateMetrics() resp.ResponseWriter = rw h.ServeAPI(resp, req) }) @@ -225,36 +220,61 @@ func (m *Metrics) Latency(h api.Handler) api.Handler { // the error event log counter on each write call. // // The returned io.Writer never returns an error on writes. -func (m *Metrics) ErrorEventCounter() io.Writer { - return eventCounter{metric: m.errorLogEvents} +func (m *Metrics) ErrorEventCounter(h api.Handler) api.Handler { + return api.HandlerFunc(func(resp *api.Response, req *api.Request) { + resp.ResponseWriter = &eventCounterWriter{ + ResponseWriter: resp.ResponseWriter, + metric: m.errorLogEvents, + } + h.ServeAPI(resp, req) + }) } // AuditEventCounter returns an io.Writer that increments // the audit event log counter on each write call. // -// The returned io.Writer never returns an error on writes. -func (m *Metrics) AuditEventCounter() io.Writer { - return eventCounter{metric: m.auditLogEvents} +// The returned http.ResponseWriter never returns an error on writes. +func (m *Metrics) AuditEventCounter(h api.Handler) api.Handler { + return api.HandlerFunc(func(resp *api.Response, req *api.Request) { + resp.ResponseWriter = &eventCounterWriter{ + ResponseWriter: resp.ResponseWriter, + metric: m.auditLogEvents, + } + h.ServeAPI(resp, req) + }) } -type eventCounter struct { +type eventCounterWriter struct { + http.ResponseWriter metric prometheus.Counter } -func (w eventCounter) Write(p []byte) (int, error) { +var ( + _ http.ResponseWriter = (*eventCounterWriter)(nil) + _ http.Flusher = (*eventCounterWriter)(nil) +) + +func (w *eventCounterWriter) Write(p []byte) (int, error) { w.metric.Inc() return len(p), nil } +// Flush sends any buffered data to the client. +// +// This method will be called by http.ResponseController. +func (w *eventCounterWriter) Flush() { + if flusher, ok := w.ResponseWriter.(http.Flusher); ok { + flusher.Flush() + } +} + // latencyResponseWriter is an http.ResponseWriter that // measures the internal request-response latency. type latencyResponseWriter struct { http.ResponseWriter - flusher http.Flusher start time.Time // The point in time when the request was received histogram prometheus.Histogram // The latency histogram - written bool // Inidicates whether the HTTP headers have been written } var ( @@ -262,25 +282,20 @@ var ( _ http.Flusher = (*latencyResponseWriter)(nil) ) -func (w *latencyResponseWriter) WriteHeader(status int) { - w.ResponseWriter.WriteHeader(status) - if !w.written { - w.histogram.Observe(time.Since(w.start).Seconds()) - w.written = true - } +// Updates metric request-response latency. +func (w *latencyResponseWriter) updateMetrics() { + w.histogram.Observe(time.Since(w.start).Seconds()) } +// Flush sends any buffered data to the client. +// +// This method will be called by http.ResponseController. func (w *latencyResponseWriter) Flush() { - if w.flusher != nil { - w.flusher.Flush() + if flusher, ok := w.ResponseWriter.(http.Flusher); ok { + flusher.Flush() } } -// Unwrap returns the underlying http.ResponseWriter. -// -// This method is implemented for http.ResponseController. -func (w *latencyResponseWriter) Unwrap() http.ResponseWriter { return w.ResponseWriter } - // countResponseWriter is an http.ResponseWriter that // counts the number of requests partition by requests // that: @@ -289,13 +304,13 @@ func (w *latencyResponseWriter) Unwrap() http.ResponseWriter { return w.Response // - Failed (HTTP 5xx) type countResponseWriter struct { http.ResponseWriter - flusher http.Flusher succeeded *prometheus.CounterVec errored *prometheus.CounterVec failed *prometheus.CounterVec - prometheus.Metric - written bool // Inidicates whether the HTTP headers have been written + + // HTTP status code set by WriteHeader + status int } var ( @@ -305,36 +320,35 @@ var ( func (w *countResponseWriter) WriteHeader(status int) { w.ResponseWriter.WriteHeader(status) - if !w.written { - switch { - case status >= 200 && status < 300: - w.succeeded.WithLabelValues(strconv.Itoa(status)).Inc() - case status >= 400 && status < 500: - w.errored.WithLabelValues(strconv.Itoa(status)).Inc() - case status >= 500 && status < 600: - w.failed.WithLabelValues(strconv.Itoa(status)).Inc() - default: - // We panic to signal that the server returned a status code - // that is not tracked. If, in the future, the application - // returns a new (kind of) status code it should be collected - // as well. - // Otherwise, we would silently ignore new status codes and the - // metrics would be incomplete. - panic("metrics: unexpected response status code " + strconv.Itoa(status)) - } - w.written = true - } + w.status = status } -func (w *countResponseWriter) Write(b []byte) (int, error) { return w.ResponseWriter.Write(b) } - -func (w *countResponseWriter) Flush() { - if w.flusher != nil { - w.flusher.Flush() +// Updates metrics count partitioned by response status code. +// Must be called when response is done. +func (w *countResponseWriter) updateMetrics() { + switch { + case w.status >= 200 && w.status < 300: + w.succeeded.WithLabelValues(strconv.Itoa(w.status)).Inc() + case w.status >= 400 && w.status < 500: + w.errored.WithLabelValues(strconv.Itoa(w.status)).Inc() + case w.status >= 500 && w.status < 600: + w.failed.WithLabelValues(strconv.Itoa(w.status)).Inc() + default: + // We panic to signal that the server returned a status code + // that is not tracked. If, in the future, the application + // returns a new (kind of) status code it should be collected + // as well. + // Otherwise, we would silently ignore new status codes and the + // metrics would be incomplete. + panic("metrics: unexpected response status code " + strconv.Itoa(w.status)) } } -// Unwrap returns the underlying http.ResponseWriter. +// Flush sends any buffered data to the client. // -// This method is implemented for http.ResponseController. -func (w *countResponseWriter) Unwrap() http.ResponseWriter { return w.ResponseWriter } +// This method will be called by http.ResponseController. +func (w *countResponseWriter) Flush() { + if flusher, ok := w.ResponseWriter.(http.Flusher); ok { + flusher.Flush() + } +} diff --git a/state.go b/state.go index 9f6a6a81..eae8c359 100644 --- a/state.go +++ b/state.go @@ -162,7 +162,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig, metrics *metric.M MaxBody: 0, Timeout: 15 * time.Second, Auth: (*verifyIdentity)(&s.state), - Handler: api.HandlerFunc(s.describePolicy), + Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.describePolicy))), }, api.PathPolicyRead: { Method: http.MethodGet, @@ -212,7 +212,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig, metrics *metric.M MaxBody: 0, Timeout: 0, // No timeout Auth: (*verifyIdentity)(&s.state), - Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.logError))), + Handler: metrics.ErrorEventCounter(api.HandlerFunc(s.logError)), }, api.PathLogAudit: { Method: http.MethodGet, @@ -220,7 +220,7 @@ func initRoutes(s *Server, routeConfig map[string]RouteConfig, metrics *metric.M MaxBody: 0, Timeout: 0, // No timeout Auth: (*verifyIdentity)(&s.state), - Handler: metrics.Latency(metrics.Count(api.HandlerFunc(s.logAudit))), + Handler: metrics.AuditEventCounter(api.HandlerFunc(s.logAudit)), }, }