Skip to content

Commit

Permalink
Merge pull request #5 from koykov/leak-direction-1
Browse files Browse the repository at this point in the history
Constants in leak metrics.
  • Loading branch information
koykov authored Nov 12, 2022
2 parents 62ee180 + ef53e65 commit 6fc3dfa
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 17 deletions.
2 changes: 1 addition & 1 deletion dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ WorkerStatus) {}
func (DummyMetrics) QueuePut() {}
func (DummyMetrics) QueuePull() {}
func (DummyMetrics) QueueRetry() {}
func (DummyMetrics) QueueLeak(_ string) {}
func (DummyMetrics) QueueLeak(_ LeakDirection) {}
func (DummyMetrics) QueueLost() {}

// DummyDLQ is a stub DLQ implementation. It does nothing and need for queues with leak tolerance.
Expand Down
11 changes: 0 additions & 11 deletions leak.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,3 @@ const (

defaultFrontLeakAttempts = 5
)

func (ld LeakDirection) String() string {
switch ld {
case LeakDirectionRear:
return "rear"
case LeakDirectionFront:
return "front"
default:
return "unknown"
}
}
2 changes: 1 addition & 1 deletion metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type MetricsWriter interface {
QueueRetry()
// QueueLeak registers item's leak from the full queue.
// Param dir indicates leak direction and may be "rear" or "front".
QueueLeak(dir string)
QueueLeak(dir LeakDirection)
// QueueLost registers lost items missed queue and DLQ.
QueueLost()
}
6 changes: 3 additions & 3 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (q *Queue) renqueue(itm *item) (err error) {
if err = q.c().DLQ.Enqueue(itmf.payload); err != nil {
return
}
q.mw().QueueLeak(LeakDirectionFront.String())
q.mw().QueueLeak(LeakDirectionFront)
select {
case q.stream <- *itm:
return
Expand All @@ -267,7 +267,7 @@ func (q *Queue) renqueue(itm *item) (err error) {
}
// Rear direction, just leak item.
err = q.c().DLQ.Enqueue(itm.payload)
q.mw().QueueLeak(LeakDirectionRear.String())
q.mw().QueueLeak(LeakDirectionRear)
}
} else {
// Regular put (blocking mode).
Expand Down Expand Up @@ -341,7 +341,7 @@ func (q *Queue) close(force bool) error {
itm := <-q.stream
if q.CheckBit(flagLeaky) {
_ = q.c().DLQ.Enqueue(itm.payload)
q.mw().QueueLeak(LeakDirectionFront.String())
q.mw().QueueLeak(LeakDirectionFront)
} else {
q.mw().QueueLost()
}
Expand Down
2 changes: 1 addition & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (w *worker) await(queue *Queue) {
_ = queue.renqueue(&itm)
} else if queue.CheckBit(flagLeaky) && w.c().FailToDLQ {
_ = w.c().DLQ.Enqueue(itm.payload)
w.mw().QueueLeak(w.c().LeakDirection.String())
w.mw().QueueLeak(w.c().LeakDirection)
}
}
case WorkerStatusIdle:
Expand Down

0 comments on commit 6fc3dfa

Please sign in to comment.