Skip to content

Commit

Permalink
Merge pull request #21 from koykov/metrics-1
Browse files Browse the repository at this point in the history
metrics: remove dependency of queue repo avoiding using internal types
  • Loading branch information
koykov authored Jun 28, 2024
2 parents 37ba537 + 8d4768e commit 8484e6b
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 49 deletions.
30 changes: 15 additions & 15 deletions dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ import "time"
// Need just to reduce checks in code.
type DummyMetrics struct{}

func (DummyMetrics) WorkerSetup(_, _, _ uint) {}
func (DummyMetrics) WorkerInit(_ uint32) {}
func (DummyMetrics) WorkerSleep(_ uint32) {}
func (DummyMetrics) WorkerWakeup(_ uint32) {}
func (DummyMetrics) WorkerWait(_ uint32, _ time.Duration) {}
func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ WorkerStatus) {}
func (DummyMetrics) QueuePut() {}
func (DummyMetrics) QueuePull() {}
func (DummyMetrics) QueueRetry() {}
func (DummyMetrics) QueueLeak(_ LeakDirection) {}
func (DummyMetrics) QueueDeadline() {}
func (DummyMetrics) QueueLost() {}
func (DummyMetrics) SubqPut(_ string) {}
func (DummyMetrics) SubqPull(_ string) {}
func (DummyMetrics) SubqLeak(_ string) {}
func (DummyMetrics) WorkerSetup(_, _, _ uint) {}
func (DummyMetrics) WorkerInit(_ uint32) {}
func (DummyMetrics) WorkerSleep(_ uint32) {}
func (DummyMetrics) WorkerWakeup(_ uint32) {}
func (DummyMetrics) WorkerWait(_ uint32, _ time.Duration) {}
func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ string) {}
func (DummyMetrics) QueuePut() {}
func (DummyMetrics) QueuePull() {}
func (DummyMetrics) QueueRetry() {}
func (DummyMetrics) QueueLeak(_ string) {}
func (DummyMetrics) QueueDeadline() {}
func (DummyMetrics) QueueLost() {}
func (DummyMetrics) SubqPut(_ string) {}
func (DummyMetrics) SubqPull(_ string) {}
func (DummyMetrics) SubqLeak(_ string) {}

// DummyDLQ is a stub DLQ implementation. It does nothing and need for queues with leak tolerance.
// It just leaks data to the trash.
Expand Down
14 changes: 12 additions & 2 deletions leak.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ const (
// LeakDirectionFront takes old item from queue front and redirects it to DLQ. Thus releases space for the new
// incoming item in the queue.
LeakDirectionFront

defaultFrontLeakAttempts = 5
)

const defaultFrontLeakAttempts = 5

func (ld LeakDirection) String() string {
switch ld {
case LeakDirectionRear:
return "rear"
case LeakDirectionFront:
return "front"
}
return "unknown"
}
4 changes: 2 additions & 2 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type MetricsWriter interface {
// WorkerWait registers how many worker waits due to delayed execution.
WorkerWait(idx uint32, dur time.Duration)
// WorkerStop registers when sleeping worker stops.
WorkerStop(idx uint32, force bool, status WorkerStatus)
WorkerStop(idx uint32, force bool, status string)
// QueuePut registers income of new item to the queue.
QueuePut()
// QueuePull registers outgoing of item from the queue.
Expand All @@ -26,7 +26,7 @@ type MetricsWriter interface {
QueueRetry()
// QueueLeak registers item's leak from the full queue.
// Param dir indicates leak direction and may be "rear" or "front".
QueueLeak(dir LeakDirection)
QueueLeak(direction string)
// QueueDeadline registers amount of skipped processing of items due to deadline.
QueueDeadline()
// QueueLost registers lost items missed queue and DLQ.
Expand Down
3 changes: 3 additions & 0 deletions metrics/log/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/koykov/queue/metrics/log

go 1.18
22 changes: 8 additions & 14 deletions metrics/log/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package log
import (
"log"
"time"

q "github.com/koykov/queue"
)

// MetricsWriter is Log implementation of queue.MetricsWriter.
Expand Down Expand Up @@ -41,9 +39,9 @@ func (w MetricsWriter) WorkerWait(idx uint32, delay time.Duration) {
log.Printf("queue %s: worker %d waits %s\n", w.name, idx, delay)
}

func (w MetricsWriter) WorkerStop(idx uint32, force bool, status q.WorkerStatus) {
func (w MetricsWriter) WorkerStop(idx uint32, force bool, status string) {
if force {
log.Printf("queue %s: worker %d caught force stop signal (current status %d)\n", w.name, idx, status)
log.Printf("queue %s: worker %d caught force stop signal (current status %s)\n", w.name, idx, status)
} else {
log.Printf("queue %s: worker %d caught stop signal\n", w.name, idx)
}
Expand All @@ -61,12 +59,8 @@ func (w MetricsWriter) QueueRetry() {
log.Printf("queue %s: retry item processing due to fail\n", w.name)
}

func (w MetricsWriter) QueueLeak(dir q.LeakDirection) {
dirs := "rear"
if dir == q.LeakDirectionFront {
dirs = "front"
}
log.Printf("queue %s: queue leak from %s\n", w.name, dirs)
func (w MetricsWriter) QueueLeak(direction string) {
log.Printf("queue %s: queue leak from %s\n", w.name, direction)
}

func (w MetricsWriter) QueueDeadline() {
Expand All @@ -77,14 +71,14 @@ func (w MetricsWriter) QueueLost() {
log.Printf("queue %s: queue lost\n", w.name)
}

func (w MetricsWriter) SubQueuePut(subq string) {
func (w MetricsWriter) SubqPut(subq string) {
log.Printf("queue %s/%s: new item come to the queue\n", w.name, subq)
}

func (w MetricsWriter) SubQueuePull(subq string) {
func (w MetricsWriter) SubqPull(subq string) {
log.Printf("queue %s/%s: item leave the queue\n", w.name, subq)
}

func (w MetricsWriter) SubQueueDrop(subq string) {
log.Printf("queue %s/%s: queue drop item\n", w.name, subq)
func (w MetricsWriter) SubqLeak(subq string) {
log.Printf("queue %s/%s: queue leak item\n", w.name, subq)
}
15 changes: 15 additions & 0 deletions metrics/prometheus/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/koykov/queue/metrics/prometheus

go 1.18

require github.com/prometheus/client_golang v1.19.1

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/sys v0.17.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
17 changes: 17 additions & 0 deletions metrics/prometheus/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
15 changes: 5 additions & 10 deletions metrics/prometheus/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package prometheus
import (
"time"

q "github.com/koykov/queue"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -141,13 +140,13 @@ func (w MetricsWriter) WorkerWait(_ uint32, delay time.Duration) {
promWorkerWait.WithLabelValues(w.name).Observe(float64(delay.Nanoseconds() / int64(w.prec)))
}

func (w MetricsWriter) WorkerStop(_ uint32, force bool, status q.WorkerStatus) {
func (w MetricsWriter) WorkerStop(_ uint32, force bool, status string) {
promWorkerIdle.WithLabelValues(w.name).Inc()
if force {
switch status {
case q.WorkerStatusActive:
case "active":
promWorkerActive.WithLabelValues(w.name).Add(-1)
case q.WorkerStatusSleep:
case "sleep":
promWorkerSleep.WithLabelValues(w.name).Add(-1)
}
} else {
Expand All @@ -169,12 +168,8 @@ func (w MetricsWriter) QueueRetry() {
promQueueRetry.WithLabelValues(w.name).Inc()
}

func (w MetricsWriter) QueueLeak(dir q.LeakDirection) {
dirs := "rear"
if dir == q.LeakDirectionFront {
dirs = "front"
}
promQueueLeak.WithLabelValues(w.name, dirs).Inc()
func (w MetricsWriter) QueueLeak(direction string) {
promQueueLeak.WithLabelValues(w.name, direction).Inc()
promQueueSize.WithLabelValues(w.name).Dec()
}

Expand Down
6 changes: 3 additions & 3 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (q *Queue) renqueue(itm *item) (err error) {
q.mw().QueueLost()
return
}
q.mw().QueueLeak(LeakDirectionFront)
q.mw().QueueLeak(LeakDirectionFront.String())
if q.engine.enqueue(itm, false) {
return
} else {
Expand All @@ -309,7 +309,7 @@ func (q *Queue) renqueue(itm *item) (err error) {
}
// Rear direction, just leak item.
err = q.c().DLQ.Enqueue(itm.payload)
q.mw().QueueLeak(LeakDirectionRear)
q.mw().QueueLeak(LeakDirectionRear.String())
}
} else {
// Regular put (blocking mode).
Expand Down Expand Up @@ -383,7 +383,7 @@ func (q *Queue) close(force bool) error {
itm, _ := q.engine.dequeue()
if q.CheckBit(flagLeaky) {
_ = q.c().DLQ.Enqueue(itm.payload)
q.mw().QueueLeak(LeakDirectionFront)
q.mw().QueueLeak(LeakDirectionFront.String())
} else {
q.mw().QueueLost()
}
Expand Down
21 changes: 18 additions & 3 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,28 @@ import (
)

type WorkerStatus uint32
type signal uint32

const (
WorkerStatusIdle WorkerStatus = iota
WorkerStatusActive
WorkerStatusSleep
)

func (s WorkerStatus) String() string {
switch s {
case WorkerStatusIdle:
return "idle"
case WorkerStatusActive:
return "active"
case WorkerStatusSleep:
return "sleep"
}
return "unknown"
}

type signal uint32

const (
sigInit signal = iota
sigSleep
sigWakeup
Expand Down Expand Up @@ -133,7 +148,7 @@ func (w *worker) await(queue *Queue) {
_ = queue.renqueue(&itm)
} else if queue.CheckBit(flagLeaky) && w.c().FailToDLQ {
_ = w.c().DLQ.Enqueue(itm.payload)
w.mw().QueueLeak(LeakDirectionFront)
w.mw().QueueLeak(LeakDirectionFront.String())
}
}
case WorkerStatusIdle:
Expand Down Expand Up @@ -180,7 +195,7 @@ func (w *worker) stop(force bool) {
}
w.l().Printf(msg, w.idx)
}
w.mw().WorkerStop(w.idx, force, w.getStatus())
w.mw().WorkerStop(w.idx, force, w.getStatus().String())
w.setStatus(WorkerStatusIdle)
w.notifyCtl()
}
Expand Down

0 comments on commit 8484e6b

Please sign in to comment.