diff --git a/dummy.go b/dummy.go index dc9fb85..8ae28cb 100644 --- a/dummy.go +++ b/dummy.go @@ -15,7 +15,7 @@ func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ WorkerStatus) {} func (DummyMetrics) QueuePut() {} func (DummyMetrics) QueuePull() {} func (DummyMetrics) QueueRetry() {} -func (DummyMetrics) QueueLeak(_ string) {} +func (DummyMetrics) QueueLeak(_ LeakDirection) {} func (DummyMetrics) QueueLost() {} // DummyDLQ is a stub DLQ implementation. It does nothing and need for queues with leak tolerance. diff --git a/leak.go b/leak.go index 7313b5f..74a3ff8 100644 --- a/leak.go +++ b/leak.go @@ -12,14 +12,3 @@ const ( defaultFrontLeakAttempts = 5 ) - -func (ld LeakDirection) String() string { - switch ld { - case LeakDirectionRear: - return "rear" - case LeakDirectionFront: - return "front" - default: - return "unknown" - } -} diff --git a/metrics.go b/metrics.go index 2371d72..fdb17f7 100644 --- a/metrics.go +++ b/metrics.go @@ -26,7 +26,7 @@ type MetricsWriter interface { QueueRetry() // QueueLeak registers item's leak from the full queue. // Param dir indicates leak direction and may be "rear" or "front". - QueueLeak(dir string) + QueueLeak(dir LeakDirection) // QueueLost registers lost items missed queue and DLQ. QueueLost() } diff --git a/queue.go b/queue.go index 9b2d5eb..6adba71 100644 --- a/queue.go +++ b/queue.go @@ -255,7 +255,7 @@ func (q *Queue) renqueue(itm *item) (err error) { if err = q.c().DLQ.Enqueue(itmf.payload); err != nil { return } - q.mw().QueueLeak(LeakDirectionFront.String()) + q.mw().QueueLeak(LeakDirectionFront) select { case q.stream <- *itm: return @@ -267,7 +267,7 @@ func (q *Queue) renqueue(itm *item) (err error) { } // Rear direction, just leak item. err = q.c().DLQ.Enqueue(itm.payload) - q.mw().QueueLeak(LeakDirectionRear.String()) + q.mw().QueueLeak(LeakDirectionRear) } } else { // Regular put (blocking mode). @@ -341,7 +341,7 @@ func (q *Queue) close(force bool) error { itm := <-q.stream if q.CheckBit(flagLeaky) { _ = q.c().DLQ.Enqueue(itm.payload) - q.mw().QueueLeak(LeakDirectionFront.String()) + q.mw().QueueLeak(LeakDirectionFront) } else { q.mw().QueueLost() } diff --git a/worker.go b/worker.go index 04e18f5..79a37d7 100644 --- a/worker.go +++ b/worker.go @@ -120,7 +120,7 @@ func (w *worker) await(queue *Queue) { _ = queue.renqueue(&itm) } else if queue.CheckBit(flagLeaky) && w.c().FailToDLQ { _ = w.c().DLQ.Enqueue(itm.payload) - w.mw().QueueLeak(w.c().LeakDirection.String()) + w.mw().QueueLeak(w.c().LeakDirection) } } case WorkerStatusIdle: