-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
241 lines (202 loc) · 6.33 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package main
import (
"fmt"
"sync"
"time"
"github.com/jucardi/go-terminal-colors"
)
const (
INTERNAL = iota
SEND
RECV
)
var gColors = []fmtc.Color{ fmtc.LightRed, fmtc.LightGreen, fmtc.LightCyan }
// now is a wrapper around time.Now that prints only the time and no date
func now() string {
return time.Now().Format("15:05:16.000000000")
}
// Message contains a vector timestamp that is send to other processes.
// Supposedly, there should also be a data payload within the message,
// but since this code is used to simulate vector clock algorithm, it is omited.
type Message struct {
t []int
}
// ProcessEvent is predefined action that a process should perform. These are
// generated by the pool and executed by the processes in sequential fashion.
// ch is nil only for events that do not send anything.
type ProcessEvent struct {
op int
ch chan Message
}
// Process is an abstraction of a process, running on another host. It maintains
// a vector clock that is used to provide a partial order for the events in a
// distributed system. Ch contains a separate channel for each process inside the
// simulated network. A channel for a process with a certain ID can be accessed via
// Ch[ID].
type Process struct {
Id int
Clock []int
Ch []chan Message
scenario []ProcessEvent
}
func NewProcess(procId, procCount int) (p *Process) {
p = &Process{
Id : procId,
Clock : make([]int, procCount),
Ch : make([]chan Message, procCount),
scenario : make([]ProcessEvent, 0),
}
for i := range p.Ch {
if i == procId {
continue
}
p.Ch[i] = make(chan Message)
}
return
}
func (p *Process) PrintMessage(localId int, msg string) {
color := fmtc.Gray
if p.Id < len(gColors) {
color = gColors[p.Id]
}
procChar := byte('a' + p.Id)
fmtc.WithColors(color).
Printf("Process %c %v : %c%v, ts=%v, real=%v\n",
procChar, msg, procChar, localId, p.Clock, now())
}
// Increment increments the component in the clock that represents
// the process itself.
func (p *Process) Increment() {
p.Clock[p.Id]++
}
// Sync synchronizes internal vector clock with a provided one. That is,
// each respective component of the two vectors are taken and the maximum one
// is chosen to be the new component of the internal clock.
func (p *Process) Sync(t []int) {
if len(p.Clock) != len(t) {
panic(fmt.Errorf("process %d was requested to sync with a vector of different dimension: %v vs. %v, respectively",
p.Id, p.Clock, t))
}
for i := range t {
if p.Clock[i] < t[i] {
p.Clock[i] = t[i]
}
}
}
// Send will encapsulate its internal vector clock into a Message and send over
// to the provided channel.
func (p *Process) Send(dest chan Message) {
t := make([]int, len(p.Clock))
copy(t, p.Clock)
dest<-Message{ t }
}
// Append provides builder pattern for the process' scenario by appending the event
// to its end.
func (p *Process) Append(event ProcessEvent) {
p.scenario = append(p.scenario, event)
}
// ProcessPool is a helper class for building the simulation scenario and executing it.
type ProcessPool struct {
pool []*Process
}
func NewProcessPool(procCount int) (pp *ProcessPool) {
pp = &ProcessPool{
pool : make([]*Process, procCount),
}
for i := range pp.pool {
pp.pool[i] = NewProcess(i, procCount)
}
return
}
// Internal event in a process is an event that does not affect
// other processes in the distributed system.
func (pp *ProcessPool) Internal(procId int) {
ev := ProcessEvent{ INTERNAL, nil }
pp.pool[procId].Append(ev)
}
// Transfer event corresponds to message transfer between two processes.
// Note, that Transfer will create two events in the two respective processes:
// one for sending and other for receiving. That means that simulated transfer
// will happend instanly. Usually, events can be rearranged, so that Receive(b)
// instantly follows Send(a, b). In that case, these two lines can be replaced
// by a single Transfer(a, b).
func (pp *ProcessPool) Transfer(senderId, receiverId int) {
pp.Send(senderId, receiverId)
pp.Receive(receiverId, senderId)
}
// Send will create a send event only for the sender process. If receiver needs
// to receive the message sent by this event, then Receive needs to be called
// for the recepient process also.
func (pp *ProcessPool) Send(senderId, receiverId int) {
ev := ProcessEvent{ SEND, pp.pool[receiverId].Ch[senderId] }
pp.pool[senderId].Append(ev)
}
// Receive will make the recipient process to expect a message from the sender.
// This event will block the recipient.
func (pp *ProcessPool) Receive(receiverId, senderId int) {
ev := ProcessEvent{ RECV, pp.pool[receiverId].Ch[senderId] }
pp.pool[receiverId].Append(ev)
}
func (pp *ProcessPool) Run() {
var wg sync.WaitGroup
for _, p := range pp.pool {
wg.Add(1)
go procHandler(p, &wg)
}
wg.Wait()
}
func procHandler(proc *Process, wg *sync.WaitGroup) {
defer wg.Done()
for i, ev := range proc.scenario {
switch ev.op {
case INTERNAL:
proc.Increment()
proc.PrintMessage(i, "MY ")
case SEND:
proc.Increment()
proc.PrintMessage(i, "SEND")
proc.Send(ev.ch)
case RECV:
msg := <-ev.ch
proc.Sync(msg.t)
proc.Increment()
proc.PrintMessage(i, "RECV")
}
}
}
// An example from the article
// https://towardsdatascience.com/understanding-lamport-timestamps-with-pythons-multiprocessing-library-12a6427881c6
func example1() {
pool := NewProcessPool(3)
pool.Internal(0)
pool.Transfer(0, 1)
pool.Internal(0)
pool.Transfer(1, 0)
pool.Transfer(1, 2)
pool.Transfer(2, 1)
pool.Internal(0)
pool.Run()
fmt.Println("")
}
// An example given in the homework description
func example2() {
pool := NewProcessPool(3)
pool.Transfer(0, 1)
pool.Transfer(0, 1)
pool.Internal(0)
pool.Transfer(1, 0)
pool.Transfer(2, 1)
pool.Internal(0)
pool.Internal(1)
pool.Internal(0)
pool.Transfer(1, 0)
pool.Transfer(1, 2)
pool.Internal(2)
pool.Transfer(1, 2)
pool.Run()
fmt.Println("")
}
func main() {
example1()
example2()
}