-
Notifications
You must be signed in to change notification settings - Fork 0
/
dynamic_scheduler.go
80 lines (63 loc) · 1.74 KB
/
dynamic_scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package gowq
import (
"context"
"fmt"
"time"
)
// Start runs the job scheduler, it blocks unless a new job is available.
func (w *workQueue[Result]) Start(ctx context.Context) {
w.shutdownChan = make(chan bool)
w.dynamicJobQueueLock.Lock()
w.dynamicJobQueue = make(chan Job[Result])
w.dynamicJobQueueLock.Unlock()
workersQueue := make(chan bool, w.nWorkers)
for {
w.shutdownRequiredLock.Lock()
shutdownRequired := w.shutdownRequired
w.shutdownRequiredLock.Unlock()
if shutdownRequired && len(w.dynamicJobQueue) == 0 {
break
}
var job Job[Result]
select {
case job = <-w.dynamicJobQueue:
default:
time.Sleep(1 * time.Millisecond)
continue
}
workersQueue <- true
go func(ctx context.Context) {
result, err := job(ctx)
if err != nil {
w.appendError(fmt.Errorf("%w: %s", ErrJobFailed, err.Error()))
} else {
w.appendResult(result)
}
<-workersQueue
}(ctx)
}
close(w.dynamicJobQueue)
w.shutdownChan <- true
}
// Schedule sends a new job to the scheduler.
func (w *workQueue[Result]) Schedule(job Job[Result]) {
w.ensureQueueStarted("Schedule")
w.dynamicJobQueue <- job
}
// Shutdown signals the job scheduler that it should terminate execution and
// then waits until the scheduler actually ends.
// Note that the scheduler will run until the job queue is not empty.
func (w *workQueue[Result]) Shutdown() bool {
w.ensureQueueStarted("Shutdown")
w.shutdownRequiredLock.Lock()
w.shutdownRequired = true
w.shutdownRequiredLock.Unlock()
return <-w.shutdownChan
}
func (w *workQueue[Result]) ensureQueueStarted(method string) {
w.dynamicJobQueueLock.Lock()
defer w.dynamicJobQueueLock.Unlock()
if w.dynamicJobQueue == nil {
panic(fmt.Errorf("%s failed: %w", method, ErrQueueNotStarted))
}
}