-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsync.go
150 lines (125 loc) · 4.33 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package fun
import (
"context"
"sync"
"github.com/tychoish/fun/ft"
)
// lock acquires mtx and hands the same locker back to the caller, so
// that acquisition and deferred release can be written on one line:
//
//	defer with(lock(mtx))
func lock(mtx sync.Locker) sync.Locker {
	mtx.Lock()
	return mtx
}
// with releases mtx. It is the counterpart of lock and is meant to be
// deferred around it: the argument is evaluated (and the lock taken)
// when the defer statement runs, and the unlock happens when the
// enclosing function returns.
func with(mtx sync.Locker) {
	mtx.Unlock()
}
// WaitGroup works like sync.WaitGroup, except that the Wait method
// takes a context (and can be passed as a fun.Operation). The
// implementation is exceptionally simple. The only constraint, like
// sync.WaitGroup, is that you can never modify the value of the
// internal counter such that it is negative, even transiently. The
// implementation does not require background resources aside from
// Wait, which creates a single goroutine that lives for the entire
// time that Wait is running, but no other background resources are
// created. Multiple copies of Wait can be safely called at once, and
// the WaitGroup is reusable more than once.
//
// This implementation is about 50% slower than sync.WaitGroup after
// informal testing. It provides a little extra flexibility and
// introspection, with similar semantics, that may be worth the
// additional performance hit.
//
// The zero value is ready to use: the condition variable is created
// lazily by the methods that need it. Because the struct embeds a
// mutex by value, a WaitGroup must not be copied after first use.
type WaitGroup struct {
	// mu guards counter and the lazy initialization of cond.
	mu sync.Mutex
	// cond is bound to mu and is broadcast when the counter reaches
	// zero or when a waiter's context is canceled. It is nil until
	// init runs.
	cond *sync.Cond
	// counter is the number of outstanding units of work.
	counter int
}
// init lazily creates the condition variable bound to wg.mu, which is
// what lets the zero value of WaitGroup be used directly. It is a
// no-op once the condition variable exists. The callers in this file
// invoke it while holding wg.mu.
func (wg *WaitGroup) init() {
	if wg.cond != nil {
		return
	}
	wg.cond = sync.NewCond(&wg.mu)
}
// Add adjusts the internal counter by num, which may be negative.
//
// The resulting counter must never be less than zero: that condition
// is asserted through Invariant.IsTrue before the counter is changed
// (per the package's documentation this surfaces an
// ErrInvariantViolation; the exact mechanism lives in Invariant and
// is not visible here).
//
// When the counter lands on exactly zero every goroutine blocked in
// Wait is woken up.
func (wg *WaitGroup) Add(num int) {
	wg.mu.Lock()
	defer wg.mu.Unlock()

	wg.init()

	next := wg.counter + num
	Invariant.IsTrue(next >= 0, "cannot decrement waitgroup to less than 0: ", wg.counter, " + ", num)

	wg.counter = next
	if next != 0 {
		return
	}

	// nothing is outstanding any more: release all waiters.
	wg.cond.Broadcast()
}
// Done records the completion of one unit of work by lowering the
// counter by one. It is shorthand for Add(-1).
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
// Inc registers one additional unit of work by raising the counter by
// one. It is shorthand for Add(1).
func (wg *WaitGroup) Inc() {
	wg.Add(1)
}
// Num reports the current value of the counter, i.e. how many units
// of work have been added and not yet marked done. The value is read
// under the lock but may be stale by the time the caller inspects it.
func (wg *WaitGroup) Num() int {
	wg.mu.Lock()
	pending := wg.counter
	wg.mu.Unlock()

	return pending
}
// IsDone returns true when there is no pending work (the counter is
// zero), and false while at least one unit of work is outstanding.
// The value is read under the lock but may be stale by the time the
// caller inspects it.
func (wg *WaitGroup) IsDone() bool {
	wg.mu.Lock()
	idle := wg.counter == 0
	wg.mu.Unlock()

	return idle
}
// Operation returns the WaitGroup's Wait method as an Operation, so
// that waiting can be composed with the other Operation helpers.
func (wg *WaitGroup) Operation() Operation {
	return Operation(wg.Wait)
}
// DoTimes uses the WaitGroup to start op through Launch, passing a
// closure that performs the launch to ft.DoTimes together with n
// (presumably ft.DoTimes invokes the closure n times, giving a worker
// pool of that size -- confirm against the ft package).
//
// DoTimes itself does not wait for the launched operations; call Wait
// to block until they have finished.
func (wg *WaitGroup) DoTimes(ctx context.Context, n int, op Operation) {
	launch := func() {
		wg.Launch(ctx, op)
	}

	ft.DoTimes(n, launch)
}
// Launch increments the WaitGroup and then starts op via its
// Background method, with wg.Done attached as a post-hook so the
// counter is decremented again once the operation has run. (Per the
// package's documentation Background runs the operation in its own
// goroutine; PostHook and Background are defined on Operation and are
// not visible here.)
func (wg *WaitGroup) Launch(ctx context.Context, op Operation) {
	// account for the worker before it starts so that a concurrent
	// Wait cannot observe a zero counter in between.
	wg.Add(1)

	tracked := op.PostHook(wg.Done)
	tracked.Background(ctx)
}
// Worker returns a Worker that blocks in Wait until the wait group
// completes or the context is canceled, and then returns the
// context's error if one exists (nil otherwise).
func (wg *WaitGroup) Worker() Worker {
	waitThenReport := func(ctx context.Context) error {
		wg.Wait(ctx)
		return ctx.Err()
	}

	return waitThenReport
}
// Wait blocks until either the context is canceled or all items have
// completed (the counter reaches zero). It returns immediately when
// the counter is already zero or the context is already canceled.
//
// Wait is passable or usable as a fun.Operation.
//
// In many cases, callers should not rely on the Wait operation
// returning after the context expires: if Done() calls are made by
// workers that themselves respect context cancellation, then aborting
// the Wait on cancellation -- particularly when Wait gets a context
// with the same lifecycle as the operations it is waiting on -- means
// that worker goroutines may still be running (and effectively leak)
// after Wait has returned. Nevertheless, in some situations, when
// workers may take a long time to respond to a context cancellation,
// being able to set a second deadline on waiting may be useful.
//
// Consider using `fun.Operation(wg.Wait).Block()` if you want blocking
// semantics with the other features of this WaitGroup implementation.
func (wg *WaitGroup) Wait(ctx context.Context) {
	wg.mu.Lock()
	defer wg.mu.Unlock()

	if wg.counter == 0 || ctx.Err() != nil {
		return
	}

	wg.init()

	// derive a context that is canceled when Wait returns, so the
	// helper goroutine below never outlives this call.
	var cancel context.CancelFunc
	ctx, cancel = context.WithCancel(ctx)
	defer cancel()

	// A condition variable cannot be selected on, so a helper
	// goroutine turns context cancellation into a broadcast that
	// wakes up the waiters.
	//
	// The broadcast is issued while holding wg.mu: the waiter below
	// holds the lock from its ctx check until cond.Wait has
	// registered it, so the broadcast can never slip into that
	// window and be lost (which would leave Wait blocked after the
	// context was canceled).
	go func() {
		<-ctx.Done()
		wg.mu.Lock()
		defer wg.mu.Unlock()
		wg.cond.Broadcast()
	}()

	// standard condition-variable loop: re-check the predicate after
	// every wake-up, since a broadcast may be meant for another
	// waiter or the counter may have been raised again.
	for wg.counter != 0 && ctx.Err() == nil {
		wg.cond.Wait()
	}
}