From 3faf56e93b8333a67d6515da4858c7dba9a2c7fc Mon Sep 17 00:00:00 2001 From: Konstantin Voykov Date: Sat, 29 Jun 2024 01:01:26 +0300 Subject: [PATCH 1/2] metrics: remove dependency of queue repo avoiding using internal types --- dummy.go | 30 +++++++++++++++--------------- leak.go | 14 ++++++++++++-- metrics.go | 4 ++-- metrics/log/writer.go | 22 ++++++++-------------- metrics/prometheus/writer.go | 15 +++++---------- queue.go | 6 +++--- worker.go | 21 ++++++++++++++++++--- 7 files changed, 63 insertions(+), 49 deletions(-) diff --git a/dummy.go b/dummy.go index 700b466..7c0ac2d 100644 --- a/dummy.go +++ b/dummy.go @@ -6,21 +6,21 @@ import "time" // Need just to reduce checks in code. type DummyMetrics struct{} -func (DummyMetrics) WorkerSetup(_, _, _ uint) {} -func (DummyMetrics) WorkerInit(_ uint32) {} -func (DummyMetrics) WorkerSleep(_ uint32) {} -func (DummyMetrics) WorkerWakeup(_ uint32) {} -func (DummyMetrics) WorkerWait(_ uint32, _ time.Duration) {} -func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ WorkerStatus) {} -func (DummyMetrics) QueuePut() {} -func (DummyMetrics) QueuePull() {} -func (DummyMetrics) QueueRetry() {} -func (DummyMetrics) QueueLeak(_ LeakDirection) {} -func (DummyMetrics) QueueDeadline() {} -func (DummyMetrics) QueueLost() {} -func (DummyMetrics) SubqPut(_ string) {} -func (DummyMetrics) SubqPull(_ string) {} -func (DummyMetrics) SubqLeak(_ string) {} +func (DummyMetrics) WorkerSetup(_, _, _ uint) {} +func (DummyMetrics) WorkerInit(_ uint32) {} +func (DummyMetrics) WorkerSleep(_ uint32) {} +func (DummyMetrics) WorkerWakeup(_ uint32) {} +func (DummyMetrics) WorkerWait(_ uint32, _ time.Duration) {} +func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ string) {} +func (DummyMetrics) QueuePut() {} +func (DummyMetrics) QueuePull() {} +func (DummyMetrics) QueueRetry() {} +func (DummyMetrics) QueueLeak(_ string) {} +func (DummyMetrics) QueueDeadline() {} +func (DummyMetrics) QueueLost() {} +func (DummyMetrics) SubqPut(_ string) {} +func (DummyMetrics) SubqPull(_ string) {} +func (DummyMetrics) SubqLeak(_ string) {} // DummyDLQ is a stub DLQ implementation. It does nothing and need for queues with leak tolerance. // It just leaks data to the trash. diff --git a/leak.go b/leak.go index 74a3ff8..9b66987 100644 --- a/leak.go +++ b/leak.go @@ -9,6 +9,16 @@ const ( // LeakDirectionFront takes old item from queue front and redirects it to DLQ. Thus releases space for the new // incoming item in the queue. LeakDirectionFront - - defaultFrontLeakAttempts = 5 ) + +const defaultFrontLeakAttempts = 5 + +func (ld LeakDirection) String() string { + switch ld { + case LeakDirectionRear: + return "rear" + case LeakDirectionFront: + return "front" + } + return "unknown" +} diff --git a/metrics.go b/metrics.go index c3f9da8..5f53b72 100644 --- a/metrics.go +++ b/metrics.go @@ -17,7 +17,7 @@ type MetricsWriter interface { // WorkerWait registers how many worker waits due to delayed execution. WorkerWait(idx uint32, dur time.Duration) // WorkerStop registers when sleeping worker stops. - WorkerStop(idx uint32, force bool, status WorkerStatus) + WorkerStop(idx uint32, force bool, status string) // QueuePut registers income of new item to the queue. QueuePut() // QueuePull registers outgoing of item from the queue. @@ -26,7 +26,7 @@ type MetricsWriter interface { QueueRetry() // QueueLeak registers item's leak from the full queue. // Param dir indicates leak direction and may be "rear" or "front". - QueueLeak(dir LeakDirection) + QueueLeak(direction string) // QueueDeadline registers amount of skipped processing of items due to deadline. QueueDeadline() // QueueLost registers lost items missed queue and DLQ. diff --git a/metrics/log/writer.go b/metrics/log/writer.go index fc74fad..c8e27a7 100644 --- a/metrics/log/writer.go +++ b/metrics/log/writer.go @@ -3,8 +3,6 @@ package log import ( "log" "time" - - q "github.com/koykov/queue" ) // MetricsWriter is Log implementation of queue.MetricsWriter. @@ -41,9 +39,9 @@ func (w MetricsWriter) WorkerWait(idx uint32, delay time.Duration) { log.Printf("queue %s: worker %d waits %s\n", w.name, idx, delay) } -func (w MetricsWriter) WorkerStop(idx uint32, force bool, status q.WorkerStatus) { +func (w MetricsWriter) WorkerStop(idx uint32, force bool, status string) { if force { - log.Printf("queue %s: worker %d caught force stop signal (current status %d)\n", w.name, idx, status) + log.Printf("queue %s: worker %d caught force stop signal (current status %s)\n", w.name, idx, status) } else { log.Printf("queue %s: worker %d caught stop signal\n", w.name, idx) } @@ -61,12 +59,8 @@ func (w MetricsWriter) QueueRetry() { log.Printf("queue %s: retry item processing due to fail\n", w.name) } -func (w MetricsWriter) QueueLeak(dir q.LeakDirection) { - dirs := "rear" - if dir == q.LeakDirectionFront { - dirs = "front" - } - log.Printf("queue %s: queue leak from %s\n", w.name, dirs) +func (w MetricsWriter) QueueLeak(direction string) { + log.Printf("queue %s: queue leak from %s\n", w.name, direction) } func (w MetricsWriter) QueueDeadline() { @@ -77,14 +71,14 @@ func (w MetricsWriter) QueueLost() { log.Printf("queue %s: queue lost\n", w.name) } -func (w MetricsWriter) SubQueuePut(subq string) { +func (w MetricsWriter) SubqPut(subq string) { log.Printf("queue %s/%s: new item come to the queue\n", w.name, subq) } -func (w MetricsWriter) SubQueuePull(subq string) { +func (w MetricsWriter) SubqPull(subq string) { log.Printf("queue %s/%s: item leave the queue\n", w.name, subq) } -func (w MetricsWriter) SubQueueDrop(subq string) { - log.Printf("queue %s/%s: queue drop item\n", w.name, subq) +func (w MetricsWriter) SubqLeak(subq string) { + log.Printf("queue %s/%s: queue leak item\n", w.name, subq) } diff --git a/metrics/prometheus/writer.go b/metrics/prometheus/writer.go index 1c64196..f0f757e 100644 --- a/metrics/prometheus/writer.go +++ b/metrics/prometheus/writer.go @@ -3,7 +3,6 @@ package prometheus import ( "time" - q "github.com/koykov/queue" "github.com/prometheus/client_golang/prometheus" ) @@ -141,13 +140,13 @@ func (w MetricsWriter) WorkerWait(_ uint32, delay time.Duration) { promWorkerWait.WithLabelValues(w.name).Observe(float64(delay.Nanoseconds() / int64(w.prec))) } -func (w MetricsWriter) WorkerStop(_ uint32, force bool, status q.WorkerStatus) { +func (w MetricsWriter) WorkerStop(_ uint32, force bool, status string) { promWorkerIdle.WithLabelValues(w.name).Inc() if force { switch status { - case q.WorkerStatusActive: + case "active": promWorkerActive.WithLabelValues(w.name).Add(-1) - case q.WorkerStatusSleep: + case "sleep": promWorkerSleep.WithLabelValues(w.name).Add(-1) } } else { @@ -169,12 +168,8 @@ func (w MetricsWriter) QueueRetry() { promQueueRetry.WithLabelValues(w.name).Inc() } -func (w MetricsWriter) QueueLeak(dir q.LeakDirection) { - dirs := "rear" - if dir == q.LeakDirectionFront { - dirs = "front" - } - promQueueLeak.WithLabelValues(w.name, dirs).Inc() +func (w MetricsWriter) QueueLeak(direction string) { + promQueueLeak.WithLabelValues(w.name, direction).Inc() promQueueSize.WithLabelValues(w.name).Dec() } diff --git a/queue.go b/queue.go index d4cd80a..dcb0bb2 100644 --- a/queue.go +++ b/queue.go @@ -298,7 +298,7 @@ func (q *Queue) renqueue(itm *item) (err error) { q.mw().QueueLost() return } - q.mw().QueueLeak(LeakDirectionFront) + q.mw().QueueLeak(LeakDirectionFront.String()) if q.engine.enqueue(itm, false) { return } else { @@ -309,7 +309,7 @@ func (q *Queue) renqueue(itm *item) (err error) { } // Rear direction, just leak item. err = q.c().DLQ.Enqueue(itm.payload) - q.mw().QueueLeak(LeakDirectionRear) + q.mw().QueueLeak(LeakDirectionRear.String()) } } else { // Regular put (blocking mode). @@ -383,7 +383,7 @@ func (q *Queue) close(force bool) error { itm, _ := q.engine.dequeue() if q.CheckBit(flagLeaky) { _ = q.c().DLQ.Enqueue(itm.payload) - q.mw().QueueLeak(LeakDirectionFront) + q.mw().QueueLeak(LeakDirectionFront.String()) } else { q.mw().QueueLost() } diff --git a/worker.go b/worker.go index adbb90b..501b093 100644 --- a/worker.go +++ b/worker.go @@ -6,13 +6,28 @@ import ( ) type WorkerStatus uint32 -type signal uint32 const ( WorkerStatusIdle WorkerStatus = iota WorkerStatusActive WorkerStatusSleep +) + +func (s WorkerStatus) String() string { + switch s { + case WorkerStatusIdle: + return "idle" + case WorkerStatusActive: + return "active" + case WorkerStatusSleep: + return "sleep" + } + return "unknown" +} + +type signal uint32 +const ( sigInit signal = iota sigSleep sigWakeup @@ -133,7 +148,7 @@ func (w *worker) await(queue *Queue) { _ = queue.renqueue(&itm) } else if queue.CheckBit(flagLeaky) && w.c().FailToDLQ { _ = w.c().DLQ.Enqueue(itm.payload) - w.mw().QueueLeak(LeakDirectionFront) + w.mw().QueueLeak(LeakDirectionFront.String()) } } case WorkerStatusIdle: @@ -180,7 +195,7 @@ func (w *worker) stop(force bool) { } w.l().Printf(msg, w.idx) } - w.mw().WorkerStop(w.idx, force, w.getStatus()) + w.mw().WorkerStop(w.idx, force, w.getStatus().String()) w.setStatus(WorkerStatusIdle) w.notifyCtl() } From 8d4768e4915d1939100bf95202158a3119960f2d Mon Sep 17 00:00:00 2001 From: Konstantin Voykov Date: Sat, 29 Jun 2024 01:01:40 +0300 Subject: [PATCH 2/2] metrics: go.mod issues --- metrics/log/go.mod | 3 +++ metrics/prometheus/go.mod | 15 +++++++++++++++ metrics/prometheus/go.sum | 17 +++++++++++++++++ 3 files changed, 35 insertions(+) create mode 100644 metrics/log/go.mod create mode 100644 metrics/prometheus/go.mod create mode 100644 metrics/prometheus/go.sum diff --git a/metrics/log/go.mod b/metrics/log/go.mod new file mode 100644 index 0000000..dce7629 --- /dev/null +++ b/metrics/log/go.mod @@ -0,0 +1,3 @@ +module github.com/koykov/queue/metrics/log + +go 1.18 diff --git a/metrics/prometheus/go.mod b/metrics/prometheus/go.mod new file mode 100644 index 0000000..35dde86 --- /dev/null +++ b/metrics/prometheus/go.mod @@ -0,0 +1,15 @@ +module github.com/koykov/queue/metrics/prometheus + +go 1.18 + +require github.com/prometheus/client_golang v1.19.1 + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.48.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect + golang.org/x/sys v0.17.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect +) diff --git a/metrics/prometheus/go.sum b/metrics/prometheus/go.sum new file mode 100644 index 0000000..843bcae --- /dev/null +++ b/metrics/prometheus/go.sum @@ -0,0 +1,17 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=