-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscheduler.go
82 lines (66 loc) · 1.51 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package main
import (
"database/sql"
)
// Scheduler assigns queries to worker threads
// based on hostnames in the queries. Potential
// for more balanced scheduling if hostname affinity
// is not a required characteristic.
type Scheduler struct {
pool Pool
done chan *Worker
i int
hostmap map[string]int
}
// Pool is a collection of worker threads
type Pool []*Worker
// NewScheduler returns a scheduler by creating a pool of threads
func NewScheduler(workerCount int, db *sql.DB) *Scheduler {
done := make(chan *Worker, workerCount)
b := &Scheduler{make(Pool, 0, workerCount), done, 0, make(map[string]int)}
for i := 0; i < workerCount; i++ {
w := &Worker{ID: i, requests: make(chan Request, 1)}
go w.work(b.done, db)
b.pool = append(b.pool, w)
}
return b
}
func (b *Scheduler) schedule(work chan Request, done chan struct{}) {
for {
select {
case req := <-work:
b.dispatch(req)
case _ = <-b.done:
case <-done:
b.print()
return
}
}
}
func (b *Scheduler) print() {
sum := 0
// fmt.Printf("\nWork Schedule:\n")
for _, w := range b.pool {
// fmt.Printf("%d ", w.processed)
sum += w.processed
}
_ = float64(sum) / float64(len(b.pool))
// fmt.Printf("queries/worker %.2f\n", avg)
}
func (b *Scheduler) dispatch(req Request) {
var w *Worker
host := req.query.Hostname
val, ok := b.hostmap[host]
if ok == true {
w = &*b.pool[val]
} else {
b.hostmap[host] = b.i
w = &*b.pool[b.i]
b.i++
}
w.requests <- req
w.processed++
if b.i >= len(b.pool) {
b.i = 0
}
}