-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathpipeline.go
89 lines (77 loc) · 2.05 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package warppipe
import (
"context"
)
// stageFn is the channel-based form of a pipeline stage. It takes a context,
// a receive-only channel of incoming changesets and a channel for reporting
// errors, and returns the channel on which the stage emits its results.
// makeStageFunc builds values of this type from a StageFunc, and
// Pipeline.Start calls them to wire stages together.
type stageFn func(context.Context, <-chan *Changeset, chan error) chan *Changeset
// makeStageFunc wraps a StageFunc and returns a stageFn.
//
// The returned stageFn creates an unbuffered output channel and starts one
// goroutine that reads changesets from inCh and applies sFun to each of them:
//   - a non-nil error is sent to errCh;
//   - a non-nil resulting changeset is forwarded on the output channel;
//   - a nil resulting changeset is dropped.
//
// The goroutine stops, closing the output channel, when ctx is cancelled or
// when inCh is closed. Sends to errCh and to the output channel are abandoned
// on cancellation as well, so the stage does not stay blocked once its
// consumers have gone away.
func makeStageFunc(sFun StageFunc) stageFn {
	return func(ctx context.Context, inCh <-chan *Changeset, errCh chan error) chan *Changeset {
		outCh := make(chan *Changeset)
		go func() {
			defer close(outCh)
			for {
				select {
				case <-ctx.Done():
					return
				case change, ok := <-inCh:
					if !ok {
						// A closed channel yields nil forever; without this
						// check the loop would spin calling sFun(nil). Closing
						// outCh (deferred) propagates completion downstream.
						return
					}
					c, err := sFun(change)
					if err != nil {
						// Do not block on the error send after cancellation.
						select {
						case errCh <- err:
						case <-ctx.Done():
							return
						}
					}
					if c == nil {
						continue
					}
					// Do not block on the output send after cancellation.
					select {
					case outCh <- c:
					case <-ctx.Done():
						return
					}
				}
			}
		}()
		return outCh
	}
}
// StageFunc is a function for processing changesets in a pipeline Stage.
// It accepts a single argument, a Changeset, and returns one of:
//
//	(Changeset, nil): If the stage was successful
//	(nil, nil): If the changeset should be dropped (useful for filtering)
//	(nil, error): If there was an error during the stage
type StageFunc func(*Changeset) (*Changeset, error)
// Stage is a pipeline stage.
type Stage struct {
	// Name labels the stage; AddStage sets it from its name argument.
	Name string

	// Fn is the channel-based form of the stage that Pipeline.Start invokes;
	// AddStage builds it with makeStageFunc.
	Fn stageFn
}
// Pipeline represents a sequence of stages for processing Changesets.
type Pipeline struct {
	// stages lists the stages in the order AddStage appended them.
	stages []*Stage

	// outCh is the pipeline's output channel. Start overwrites it with the
	// last stage's output, or with the source channel when there are no stages.
	outCh <-chan *Changeset

	// errCh is the error channel that Start hands to every stage and returns
	// to the caller.
	errCh chan error
}
// NewPipeline builds an empty Pipeline: no stages yet, plus freshly made
// unbuffered output and error channels.
func NewPipeline() *Pipeline {
	p := new(Pipeline)
	p.stages = make([]*Stage, 0)
	p.outCh = make(chan *Changeset)
	p.errCh = make(chan error)
	return p
}
// AddStage appends a stage called name to the end of the pipeline. The given
// StageFunc is converted with makeStageFunc into the channel-based form that
// Start uses.
func (p *Pipeline) AddStage(name string, fn StageFunc) {
	stage := &Stage{Name: name, Fn: makeStageFunc(fn)}
	p.stages = append(p.stages, stage)
}
// Start wires the registered stages together in the order they were added and
// begins consuming *Changeset values from sourceCh: the first stage reads
// sourceCh and every later stage reads the previous stage's output. All stages
// share the pipeline's error channel.
//
// It returns the output channel of the last stage together with the error
// channel. When no stages have been added, sourceCh itself is returned as the
// output. The output channel is also recorded on the pipeline.
func (p *Pipeline) Start(ctx context.Context, sourceCh <-chan *Changeset) (<-chan *Changeset, <-chan error) {
	current := sourceCh
	for _, s := range p.stages {
		current = s.Fn(ctx, current, p.errCh)
	}
	p.outCh = current
	return p.outCh, p.errCh
}