-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.go
278 lines (229 loc) · 6.82 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
package abd
import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/vitalyisaev2/abd/broadcast"
"github.com/vitalyisaev2/abd/node"
"github.com/vitalyisaev2/abd/utils"
)
// Register provides public interface for a register: the read/write API that
// external clients use. Both operations take a context and can fail.
type Register interface {
	// Write stores val in the register and reports whether the operation
	// succeeded.
	Write(ctx context.Context, val utils.Value) error
	// Read returns the value currently held by the register, or an error if
	// the value could not be obtained.
	Read(ctx context.Context) (utils.Value, error)
}
// Process - the ABD algorithm participant. It combines the client-facing
// Register API with the cluster-facing node.Node API and adds identity and
// lifecycle management.
type Process interface {
	Register  // every process must serve as a register for the external clients
	node.Node // every process must serve as a cluster member
	// ID - returns process' unique id
	ID() utils.ProcessID
	// Quit terminates algorithm: the process stops serving requests.
	Quit()
}
// Compile-time assertion that *processImpl implements Process.
var _ Process = (*processImpl)(nil)

// processImpl is the Process implementation defined in this file. The public
// Write/Read/Store/Load methods do not touch the state directly: they send a
// request message over requestChan and wait for the answer, while a single
// loop goroutine takes the messages one at a time and runs the matching
// handle* method, which reads and updates the fields below.
// NOTE(review): handleStore/handleLoad are also reachable directly through
// clientLocalNonblocking, bypassing requestChan - confirm how the broadcast
// implementation sequences those calls with the loop goroutine.
//
// nolint: govet // FIXME: dig into alignments
type processImpl struct {
	broadcast      broadcast.Broadcast  // abstracts from the way of communication with other processes
	localValue     utils.Value          // locally stored copy TODO: change to template when Go implements generics
	localTimestamp utils.SequenceNumber // largest known timestamp; localValue is only replaced by a value with a larger one
	t              utils.SequenceNumber // sequence number of the writer, incremented before every broadcast Store
	r              utils.SequenceNumber // sequence number of the reader, incremented before every broadcast Load
	requestChan    chan interface{}     // request channel, also used for synchronization (carries *writeRequest, *readRequest, *receiveStoreRequest, *receiveLoadRequest)
	exitChan       chan struct{}        // termination channel, closed by Quit
	wg             sync.WaitGroup       // used for graceful stop: Quit waits on it for the loop goroutine
	id             utils.ProcessID      // globally unique ID
}
// writeRequest is the message Write sends to the loop goroutine to ask for a
// client write.
type writeRequest struct {
	ctx          context.Context // caller's context, forwarded to handleWrite
	responseChan chan error      // receives the result of handleWrite; created with capacity 1 by Write
	val          utils.Value     // value to be written
}
// readRequest is the message Read sends to the loop goroutine to ask for a
// client read.
type readRequest struct {
	ctx          context.Context    // caller's context, forwarded to handleRead
	responseChan chan *readResponse // receives the outcome of handleRead; created with capacity 1 by Read
}
// readResponse carries the outcome of handleRead back to Read.
type readResponse struct {
	err error       // error returned by handleRead, nil on success
	val utils.Value // value returned by handleRead
}
// receiveStoreRequest is the message Store sends to the loop goroutine when
// this process is asked to store a timestamped value.
type receiveStoreRequest struct {
	responseChan chan error           // receives the result of handleStore; created with capacity 1 by Store
	ctx          context.Context      // caller's context; carried in the message but not passed to handleStore
	timestamp    utils.SequenceNumber // timestamp attached to val
	val          utils.Value          // value to store if timestamp is newer than the local one
}
// receiveLoadRequest is the message Load sends to the loop goroutine when this
// process is asked for its local copy of the register.
type receiveLoadRequest struct {
	ctx          context.Context        // caller's context; carried in the message but not passed to handleLoad
	responseChan chan *utils.ReadResult // receives the result of handleLoad; created with capacity 1 by Load
}
// Write stores val in the register on behalf of an external client.
//
// The call is turned into a writeRequest, handed to the loop goroutine over
// requestChan and answered on the request's private response channel.
//
// It returns:
//   - ctx.Err() if ctx is done before the loop goroutine accepts the request
//     (nothing has been written in that case);
//   - utils.ErrProcessIsTerminating if the process is shut down before the
//     request is accepted or answered;
//   - otherwise the result of handleWrite.
//
// ctx must be non-nil.
func (p *processImpl) Write(ctx context.Context, val utils.Value) error {
	request := &writeRequest{
		ctx: ctx,
		val: val,
		// Capacity 1 lets the loop goroutine deliver the answer without
		// blocking even if this caller has already returned.
		responseChan: make(chan error, 1),
	}

	// Hand-over. The loop goroutine serves one request at a time, so this send
	// may wait for as long as the request in front of it takes; honour the
	// caller's cancellation/deadline instead of blocking past it.
	select {
	case p.requestChan <- request:
	case <-ctx.Done():
		return ctx.Err()
	case <-p.exitChan:
		return utils.ErrProcessIsTerminating
	}

	// The request is now owned by the loop goroutine, which passes ctx on to
	// handleWrite; wait for its verdict rather than racing it against ctx.
	select {
	case err := <-request.responseChan:
		return err
	case <-p.exitChan:
		return utils.ErrProcessIsTerminating
	}
}
// Read returns the register's value on behalf of an external client.
//
// The call is turned into a readRequest, handed to the loop goroutine over
// requestChan and answered on the request's private response channel.
//
// It returns a zero value together with:
//   - ctx.Err() if ctx is done before the loop goroutine accepts the request;
//   - utils.ErrProcessIsTerminating if the process is shut down before the
//     request is accepted or answered.
//
// Otherwise it returns whatever handleRead produced. ctx must be non-nil.
func (p *processImpl) Read(ctx context.Context) (utils.Value, error) {
	request := &readRequest{
		ctx: ctx,
		// Capacity 1 lets the loop goroutine deliver the answer without
		// blocking even if this caller has already returned.
		responseChan: make(chan *readResponse, 1),
	}

	// Hand-over. The loop goroutine serves one request at a time, so this send
	// may wait for as long as the request in front of it takes; honour the
	// caller's cancellation/deadline instead of blocking past it.
	select {
	case p.requestChan <- request:
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-p.exitChan:
		return 0, utils.ErrProcessIsTerminating
	}

	// The request is now owned by the loop goroutine, which passes ctx on to
	// handleRead; wait for its verdict rather than racing it against ctx.
	select {
	case resp := <-request.responseChan:
		return resp.val, resp.err
	case <-p.exitChan:
		return 0, utils.ErrProcessIsTerminating
	}
}
// Store asks this process to adopt val with the given timestamp. It is the
// cluster-facing counterpart of Write (presumably the node.Node entry point
// used by other processes - confirm against package node).
//
// The call is turned into a receiveStoreRequest, handed to the loop goroutine
// over requestChan and answered on the request's private response channel.
//
// It returns:
//   - ctx.Err() if ctx is done before the loop goroutine accepts the request
//     (the local state has not been touched in that case);
//   - utils.ErrProcessIsTerminating if the process is shut down before the
//     request is accepted or answered;
//   - otherwise the result of handleStore.
//
// ctx must be non-nil.
func (p *processImpl) Store(ctx context.Context, val utils.Value, timestamp utils.SequenceNumber) error {
	request := &receiveStoreRequest{
		ctx:       ctx,
		val:       val,
		timestamp: timestamp,
		// Capacity 1 lets the loop goroutine deliver the answer without
		// blocking even if this caller has already returned.
		responseChan: make(chan error, 1),
	}

	// Hand-over. The loop goroutine serves one request at a time, so this send
	// may wait for as long as the request in front of it takes. The handler
	// never looks at ctx, so this is the only place where the caller's
	// cancellation/deadline can take effect.
	select {
	case p.requestChan <- request:
	case <-ctx.Done():
		return ctx.Err()
	case <-p.exitChan:
		return utils.ErrProcessIsTerminating
	}

	// Once accepted, the request is answered by a purely local update
	// (handleStore), so only shutdown can get in the way of the reply.
	select {
	case err := <-request.responseChan:
		return err
	case <-p.exitChan:
		return utils.ErrProcessIsTerminating
	}
}
// Load returns this process' local copy of the register together with its
// timestamp. It is the cluster-facing counterpart of Read (presumably the
// node.Node entry point used by other processes - confirm against package
// node).
//
// The call is turned into a receiveLoadRequest, handed to the loop goroutine
// over requestChan and answered on the request's private response channel.
//
// Failures are reported through the Err field of the returned result:
//   - ctx.Err() if ctx is done before the loop goroutine accepts the request;
//   - utils.ErrProcessIsTerminating if the process is shut down before the
//     request is accepted or answered.
//
// The returned pointer is never nil. ctx must be non-nil.
func (p *processImpl) Load(ctx context.Context) *utils.ReadResult {
	request := &receiveLoadRequest{
		ctx: ctx,
		// Capacity 1 lets the loop goroutine deliver the answer without
		// blocking even if this caller has already returned.
		responseChan: make(chan *utils.ReadResult, 1),
	}

	// Hand-over. The loop goroutine serves one request at a time, so this send
	// may wait for as long as the request in front of it takes. The handler
	// never looks at ctx, so this is the only place where the caller's
	// cancellation/deadline can take effect.
	select {
	case p.requestChan <- request:
	case <-ctx.Done():
		return &utils.ReadResult{Err: ctx.Err()}
	case <-p.exitChan:
		return &utils.ReadResult{Err: utils.ErrProcessIsTerminating}
	}

	// Once accepted, the request is answered from local state (handleLoad),
	// so only shutdown can get in the way of the reply.
	select {
	case resp := <-request.responseChan:
		return resp
	case <-p.exitChan:
		return &utils.ReadResult{Err: utils.ErrProcessIsTerminating}
	}
}
// loop is the body of the process' worker goroutine. It serves the messages
// arriving on requestChan one after another until exitChan is closed, and
// marks the WaitGroup as done on the way out so that Quit can return.
func (p *processImpl) loop() {
	defer p.wg.Done()

	for {
		select {
		case <-p.exitChan:
			// Shutdown requested: stop serving.
			return
		case msg := <-p.requestChan:
			p.handleRequest(msg)
		}
	}
}
// handleRequest dispatches a single message taken from requestChan to the
// handler matching its concrete type and sends the handler's result back on
// the message's response channel. Any other message type is a programming
// error and causes a panic.
func (p *processImpl) handleRequest(request interface{}) {
	switch msg := request.(type) {
	case *writeRequest:
		err := p.handleWrite(msg.ctx, msg.val)
		msg.responseChan <- err
	case *readRequest:
		val, err := p.handleRead(msg.ctx)
		reply := &readResponse{err: err, val: val}
		msg.responseChan <- reply
	case *receiveStoreRequest:
		// The request's context is not needed: the store is a local update.
		err := p.handleStore(msg.val, msg.timestamp)
		msg.responseChan <- err
	case *receiveLoadRequest:
		// The request's context is not needed: the load reads local state.
		result := p.handleLoad()
		msg.responseChan <- result
	default:
		panic("unexpected message type")
	}
}
// handleWrite performs a client write: it advances the writer's sequence
// number and broadcasts val stamped with that number. A broadcast failure is
// returned wrapped with the value and sequence number involved; the sequence
// number stays advanced in that case.
func (p *processImpl) handleWrite(ctx context.Context, val utils.Value) error {
	p.t++

	err := p.broadcast.Store(ctx, val, p.t)
	if err == nil {
		return nil
	}

	return errors.Wrapf(err, "Broadcast Store value=%v term=%v", val, p.t)
}
// handleRead performs a client read: it advances the reader's sequence number,
// broadcasts a Load and picks the reply with the latest timestamp. If that
// timestamp is newer than the local one, the local copy is replaced first; the
// local value is then returned. When no latest reply can be determined, the
// error is returned wrapped with the reader's sequence number and the local
// copy is left untouched.
//
// NOTE(review): the freshest pair is adopted locally but not written back to
// the other processes before returning - confirm that skipping the write-back
// phase is intentional for the consistency level this register targets.
func (p *processImpl) handleRead(ctx context.Context) (utils.Value, error) {
	p.r++

	replies := p.broadcast.Load(ctx, p.r)

	freshVal, freshTS, err := replies.LatestTimestamp()
	if err != nil {
		return 0, errors.Wrapf(err, "Broadcast Load term=%v", p.r)
	}

	if freshTS > p.localTimestamp {
		p.localValue, p.localTimestamp = freshVal, freshTS
	}

	return p.localValue, nil
}
// handleStore adopts val as the local copy when timestamp is strictly newer
// than the largest timestamp seen so far; older or equal timestamps are
// ignored. It always returns nil.
//
//nolint: unparam // now the errors are not possible, but things may change in future
func (p *processImpl) handleStore(val utils.Value, timestamp utils.SequenceNumber) error {
	isNewer := timestamp > p.localTimestamp
	if isNewer {
		p.localValue, p.localTimestamp = val, timestamp
	}

	return nil
}
// handleLoad reports the local copy of the register together with its
// timestamp. The Err field of the result is left unset.
func (p *processImpl) handleLoad() *utils.ReadResult {
	snapshot := &utils.ReadResult{
		Value:     p.localValue,
		Timestamp: p.localTimestamp,
	}

	return snapshot
}
// ID returns the identifier the process was created with.
func (p *processImpl) ID() utils.ProcessID {
	return p.id
}
// Quit stops the process: it closes exitChan, which makes the loop goroutine
// return and unblocks every caller waiting in Write/Read/Store/Load, and then
// waits for the loop goroutine to finish.
//
// Calling Quit again after it has returned is a no-op. Calls are not
// synchronized with each other, so Quit must not be invoked from several
// goroutines at the same time.
func (p *processImpl) Quit() {
	select {
	case <-p.exitChan:
		// Already closed by an earlier Quit; closing a closed channel panics.
	default:
		close(p.exitChan)
	}

	p.wg.Wait()
}
// Compile-time assertion that *clientLocalNonblocking implements node.Client.
var _ node.Client = (*clientLocalNonblocking)(nil)

// clientLocalNonblocking makes it possible to perform private calls to the process
// (important for the broadcast implementation). Its methods invoke the
// process' handlers directly instead of going through requestChan, so they do
// not wait for the loop goroutine.
type clientLocalNonblocking struct {
	p *processImpl // the process whose handlers are invoked
}
// Store applies val and timestamp to the wrapped process by calling its
// handleStore directly; the context argument is ignored. An error from the
// handler is returned wrapped.
func (c clientLocalNonblocking) Store(_ context.Context, val utils.Value, timestamp utils.SequenceNumber) error {
	err := c.p.handleStore(val, timestamp)
	if err == nil {
		return nil
	}

	return errors.Wrap(err, "local handle store")
}
// Load returns the wrapped process' local value and timestamp by calling its
// handleLoad directly; the context argument is ignored.
func (c clientLocalNonblocking) Load(_ context.Context) *utils.ReadResult {
	local := c.p.handleLoad()

	return local
}
// ID returns the identifier of the wrapped process.
func (c clientLocalNonblocking) ID() utils.ProcessID {
	owner := c.p

	return owner.ID()
}
// NewProcess is a constructor of the process of an ABD algorithm.
//
// It builds a process with the given id on top of bc, registers a local
// non-blocking client for the process in bc and, if that succeeds, starts the
// goroutine serving the process' requests. If the registration fails, the
// error is returned wrapped and no goroutine is started.
func NewProcess(id utils.ProcessID, bc broadcast.Broadcast) (Process, error) {
	// The stored value, the timestamp, both sequence numbers and the WaitGroup
	// start at their zero values.
	p := &processImpl{
		id:          id,
		broadcast:   bc,
		requestChan: make(chan interface{}), // no capacity: used for synchronization
		exitChan:    make(chan struct{}),
	}

	// register itself in the broadcast
	self := &clientLocalNonblocking{p: p}
	if err := bc.RegisterClient(self); err != nil {
		return nil, errors.Wrap(err, "register client")
	}

	// Account for the worker before starting it so that Quit can wait for it.
	p.wg.Add(1)

	go p.loop()

	return p, nil
}