Skip to content

Commit

Permalink
Merge pull request #3 from koykov/force-close-lock
Browse files Browse the repository at this point in the history
Balancer lock on DE queue.
  • Loading branch information
koykov authored Nov 10, 2022
2 parents 36462a3 + 75d7aa6 commit cd81965
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/koykov/queue

go 1.16

require github.com/koykov/bitset v0.0.0-20220630192021-a713ee0d389d
require github.com/koykov/bitset v1.0.0
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
github.com/koykov/bitset v0.0.0-20220630192021-a713ee0d389d h1:b82iiOxU5g1mcgKfmJPg47WGLumM91t9wPNWN1a3DbM=
github.com/koykov/bitset v0.0.0-20220630192021-a713ee0d389d/go.mod h1:Mg0YlEO6vG0Vd2N5qZhyH1ss2ZuuMZR8iYNKshQcm6I=
github.com/koykov/bitset v1.0.0 h1:2mEbAhKelhpdWqnpa+mR3HRhdMsto5od7ACOi6MIAmk=
github.com/koykov/bitset v1.0.0/go.mod h1:DVR3bH49c1oOcNtD38h+aQq7lp1ZY91cXmjOldlTk8A=
7 changes: 4 additions & 3 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,6 @@ func (q *Queue) close(force bool) error {
// Wait till all enqueue operations will finish.
for atomic.LoadInt64(&q.enqlock) > 0 {
}
// Close the stream.
// Please note, this is not the end for regular close case. Workers continue works while queue has items.
close(q.stream)

if force {
// Immediately stop all active/sleeping workers.
Expand All @@ -329,6 +326,10 @@ func (q *Queue) close(force bool) error {
}
}
}
// Close the stream.
// Please note, this is not the end for regular close case. Workers continue works while queue has items.
close(q.stream)

return nil
}

Expand Down
47 changes: 38 additions & 9 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ type worker struct {
idx uint32
// Status of the worker.
status WorkerStatus
// Pause channel between put to sleep and stop.
pause chan struct{}
// Channel to control sleep and stop states.
// This channel delivers two signals:
// * wakeup for slept workers
// * force close for active workers
ctl chan struct{}
// Last signal timestamp.
lastTS int64
// Worker instance.
Expand All @@ -42,7 +45,7 @@ func makeWorker(idx uint32, config *Config) *worker {
w := &worker{
idx: idx,
status: WorkerStatusIdle,
pause: make(chan struct{}, 1),
ctl: make(chan struct{}, 1),
proc: config.Worker,
config: config,
}
Expand Down Expand Up @@ -70,7 +73,7 @@ func (w *worker) await(queue *Queue) {
switch w.getStatus() {
case WorkerStatusSleep:
// Wait config.SleepInterval.
<-w.pause
<-w.ctl
case WorkerStatusActive:
// Read itm from the stream.
itm, ok := <-queue.stream
Expand All @@ -81,15 +84,30 @@ func (w *worker) await(queue *Queue) {
}
w.mw().QueuePull()

var intr bool
// Check delayed execution.
if itm.dexpire > 0 {
now := queue.clk().Now().UnixNano()
if delta := time.Duration(itm.dexpire - now); delta > 0 {
// Processing time has not yet arrived. So wait till delay ends.
time.Sleep(delta)
select {
case <-time.After(delta):
break
case <-w.ctl:
// Waiting interrupted due to force close signal.
intr = true
// Calculate real wait time.
delta = time.Duration(queue.clk().Now().UnixNano() - now)
break
}
w.mw().WorkerWait(w.idx, delta)
}
}
if intr {
// Return item back to the queue due to interrupt signal.
_ = queue.renqueue(&itm)
return
}

// Forward itm to dequeuer.
if err := w.proc.Do(itm.payload); err != nil {
Expand Down Expand Up @@ -137,7 +155,7 @@ func (w *worker) wakeup() {
}
w.setStatus(WorkerStatusActive)
w.mw().WorkerWakeup(w.idx)
w.pause <- struct{}{}
w.notifyCtl()
}

// Stop (or force stop) worker.
Expand All @@ -151,8 +169,19 @@ func (w *worker) stop(force bool) {
}
w.mw().WorkerStop(w.idx, force, w.getStatus())
w.setStatus(WorkerStatusIdle)
// Notify pause channel about stop.
w.pause <- struct{}{}
w.notifyCtl()
}

// Check if ctl channel is empty and send signal (wakeup or force close).
func (w *worker) notifyCtl() {
// Check ctl channel for previously undelivered signal.
if len(w.ctl) > 0 {
// Clear ctl channel to prevent locking.
_, _ = <-w.ctl
}

// Send stop signal to ctl channel.
w.ctl <- struct{}{}
}

// Set worker status.
Expand All @@ -167,7 +196,7 @@ func (w *worker) getStatus() WorkerStatus {

// Check if worker slept enough time.
func (w *worker) sleptEnough() bool {
dur := time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&w.lastTS))
dur := time.Duration(w.c().Clock.Now().UnixNano() - atomic.LoadInt64(&w.lastTS))
return dur >= w.c().SleepInterval
}

Expand Down

0 comments on commit cd81965

Please sign in to comment.