-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathattempt.go
76 lines (70 loc) · 2.5 KB
/
attempt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/*
Copyright 2021 Joseph Cumines
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigbuff
import (
"context"
"errors"
"time"
)
// LinearAttempt produces a receive-only channel, with a buffer of one, that yields up to count timestamps spaced
// no more tightly than one per rate, and is closed once count values have been delivered or ctx is done,
// whichever happens first.
//
// The first value is time.Now() and is sent synchronously, before LinearAttempt returns. If ctx already reports
// an error at that point the channel is returned closed and empty instead. When count is 1 the channel is closed
// immediately after that first value. Otherwise a goroutine driven by a time.Ticker offers each tick time to the
// channel without blocking: a tick that arrives while the buffered value is still unread is discarded and does
// not count towards the total, so a slow consumer still receives count values, just later.
//
// It panics with an error if ctx is nil, or if rate or count is <= 0.
//
// The intended usage is a range loop over the returned channel, e.g. for linear retry logic. Cancel ctx to
// release the goroutine and its ticker when abandoning the loop early.
func LinearAttempt(ctx context.Context, rate time.Duration, count int) <-chan time.Time {
	valid := ctx != nil && rate > 0 && count > 0
	if !valid {
		panic(errors.New(`bigbuff.LinearAttempt invalid input`))
	}

	out := make(chan time.Time, 1)

	// a context that is already done yields a closed channel with nothing in it
	if err := ctx.Err(); err != nil {
		close(out)
		return out
	}

	// initial attempt, published inline - the single buffer slot means this cannot block
	out <- time.Now()

	remaining := count - 1
	if remaining <= 0 {
		close(out)
		return out
	}

	go func() {
		defer close(out)

		ticker := time.NewTicker(rate)
		defer ticker.Stop()

		for remaining > 0 {
			select {
			case <-ctx.Done():
				return

			case tick := <-ticker.C:
				// both cases may have been ready, so re-check to avoid publishing after cancel
				if ctx.Err() != nil {
					return
				}

				select {
				case out <- tick:
					remaining--
				default:
					// previous value not yet consumed, drop this tick and try again on the next
				}
			}
		}
	}()

	return out
}