-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontroller.da
executable file
·343 lines (279 loc) · 11.8 KB
/
controller.da
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# Copyright (c) 2014-2016 Jon Brandvein, Bo Lin, Yanhong Annie Liu
# Copyright (c) 2014-2016 Stony Brook University
# Copyright (c) 2014-2016 The Research Foundation of SUNY
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
This is a small library for instrumenting DistAlgo programs for
benchmarking. It assumes that each process is created in main()
and lives for the duration of the benchmark.
A single controller process is created to coordinate when the
other processes ("controllees") begin and end, and to collect
their results. The controller is initialized with the number of
controllees (nprocs) to expect, and does the following steps:
1. Wait until we've received a ('CTL_Ready',) message from everyone.
2. Broadcast ('CTL_Start',) to everyone.
3. Wait for nprocs many ('CTL_Done', rudata, rugroup_id) messages.
4. Send ('CTL_Terminate',) to everyone.
The controllees will send a ready message as soon as they have started
up, and they will wait for a terminate message before destroying
themselves. This lets everyone measure the time between the beginning of
step (2) and the end of step (3), excluding the time needed by the system
for creating and destroying processes.
Depending on the platform, controllees will report their process time
and memory usage, while the controller will measure wall-clock time.
The controller will dump all statistics out in the form of a line
###OUTPUT: <data>
where <data> is a JSON-encoded object with the following form:
{'Wallclock_time': <time value>
'All': <metrics>
<group 1>: <metrics>
...
<group n>: <metrics>
}
Each <metrics> is a JSON object with time/memory usage info. Metrics
for all processes managed by the controller are aggregated under the
'All' key. In addition, if a process's class uses the rugroup decorator,
its results will be separately aggregated under a group key.
To adapt a DistAlgo program to use this, make the following changes.
At the top of the file, add the line
controller = import_da('controller')
For each process class,
- add controller.Controllee as a base class
- add a process field and setup() parameter "ctl"
- in the body of setup(), add the line "super().setup(ctl)"
- above the definition of run(), add the decorator "@controller.run"
- optionally, add the decorator @controller.rugroup(<id>) to the class,
to make this class's processes group their result metrics under
the key <id>
In main(), first create a Controller process using the code
ctl = new(controller.Controller, num= 1)
setup(ctl, (nprocs,))
start(ctl)
where nprocs is the number of controllee processes. Then in each
controllee's new() or setup() method, add ctl to the list of
passed-in arguments.
As an extension, we support having the controller listen for only
threshold many done messages; after the threshold is reached, the
controller will send a ('CTL_Stop',) message to all remaining
controllees, which will prompt them to stop what they're doing, report
their statistics in a done message, and wait for a terminate message
before exiting.
"""
import sys
import time
import json
import os
from itertools import chain
# Resource usage data tracking. Reports several measurements
# of time between start and end, and memory usage at end.
class WinResourceUsageData:
"""Tracks process time only."""
def start(self):
self.start_cputime = time.process_time()
def end(self):
self.end_cputime = time.process_time()
self.results = {
'Total_process_time': self.end_cputime - self.start_cputime,
}
@classmethod
def aggregate(cls, rudata_points):
return {
'Total_process_time': sum(p.results['Total_process_time']
for p in rudata_points),
'Total_processes': len(rudata_points),
}
class PosixResourceUsageData:
"""Tracks utime, stime, and maxrss."""
def start(self):
self.start_data = resource.getrusage(resource.RUSAGE_SELF)
def end(self):
self.end_data = resource.getrusage(resource.RUSAGE_SELF)
def diff(attr):
return (getattr(self.end_data, attr) -
getattr(self.start_data, attr))
self.results = {
'Total_user_time': diff('ru_utime'),
'Total_system_time': diff('ru_stime'),
'Total_process_time': diff('ru_utime') + diff('ru_stime'),
'Total_memory': self.end_data.ru_maxrss,
}
@classmethod
def aggregate(cls, rudata_points):
def sumof(attr):
return sum(p.results[attr] for p in rudata_points)
aggr_results = {k: sumof(k) for k in [
'Total_user_time', 'Total_system_time',
'Total_process_time', 'Total_memory']}
aggr_results['Total_processes'] = len(rudata_points)
return aggr_results
if sys.platform == "win32":
ResourceUsageData = WinResourceUsageData
else:
import resource
ResourceUsageData = PosixResourceUsageData
class Controller(process):
def setup(nprocs, threshold=None):
# Number of done messages we should receive before sending
# stop messages to everyone else.
if threshold is None:
threshold = nprocs
# Processes we are controlling (populated when we receive
# ready messages from them), and the subset of processes
# from which we have received done messages.
self.ps = set()
self.done_ps = set()
# Number of processes that have sent ready/done messages.
self.readys = 0
self.dones = 0
# Whether we have sent the stop messages out.
self.sent_stop = False
# Dictionary from resource usage group identifier
# to lists of rudata points for processes in that group.
self.rudata_points = {}
self.ctl_verbose = True
def verboutput(s):
if ctl_verbose:
output(s)
def receive(msg=('CTL_Ready',), from_=source):
# Count readys.
ps.add(source)
readys += 1
verboutput('Got Ready from {} ({}/{})'.format(source, readys, nprocs))
def receive(msg=('CTL_Done', rudata, rugroup_id), from_=source):
# Count dones, record metrics.
dones += 1
done_ps.add(source)
rudata_points.setdefault(rugroup_id, []).append(rudata)
if threshold == nprocs:
verboutput('Got Done from {} ({}/{})'.format(
source, dones, nprocs))
else:
verboutput('Got Done from {} ({}/{}, need {} to stop)'.format(
source, dones, nprocs, threshold))
# If the threshold is reached, send stop to everyone else.
if dones >= threshold and not self.sent_stop:
rest_ps = ps - done_ps
verboutput("Controller stopping everyone")
send(('CTL_Stop',), to=rest_ps)
self.sent_stop = True
def run():
# Start 'em up.
await(readys == nprocs)
verboutput("Controller starting everyone")
t1 = time.perf_counter()
send(('CTL_Start',), to=ps)
# Shut 'em down.
await(dones == nprocs)
t2 = time.perf_counter()
verboutput("Everyone done")
send(('CTL_Terminate',), to=ps)
jsondata = {}
for rugroup_id, points in rudata_points.items():
if rugroup_id is None:
continue
jsondata[rugroup_id] = ResourceUsageData.aggregate(points)
allpoints = list(chain(*rudata_points.values()))
jsondata['All'] = ResourceUsageData.aggregate(allpoints)
jsondata['Wallclock_time'] = t2 - t1
jsonoutput = json.dumps(jsondata['All'])
print("###OUTPUT: " + jsonoutput)
# with open('data.json', 'a') as outfile:
# json.dump(jsonoutput, outfile)
#outfile.write("\n")
#import csv
Parsed_data = json.loads(jsonoutput)
#Performance_Statistics = open('EmployData1.csv', 'a')
#csvwriter = csv.writer(Performance_Statistics)
#count = 0
#if count == 0:
# val1 = []
# val1.append('Total_user_time')
# val1.append('Total_system_time')
# val1.append('Total_process_time')
# val1.append('Total_memory')
# val1.append('Total_processes')
# csvwriter.writerow(val1)
# count = count + 1
#val = []
#val.append(Parsed_data['Total_user_time'])
#val.append(Parsed_data['Total_system_time'])
val1 = Parsed_data['Total_process_time']
#val.append(Parsed_data['Total_memory'])
#val.append(Parsed_data['Total_processes'])
val2 = t2 - t1
#for word in val:
#csvwriter.writerow(val)
mytuple = (val1, val2)
#Performance_Statistics.close()
if await(some(received(('moredata',i,)))):
send(('perfData',mytuple,i), to=nodeof(self))
await(received(('done',), from_=nodeof(self)))
time.sleep(1)
class Controllee(process):
def setup(ctl):
self.rudata = ResourceUsageData()
self.ctl_verbose = True
self.ctl_done = False
def verboutput(s):
if ctl_verbose:
output(s)
def ctl_begin():
# Tell controller we're initialized, then wait for the signal
# before starting. Start recording metrics.
send(('CTL_Ready',), to=ctl)
await(received(('CTL_Start',)))
self.rudata.start()
def ctl_end():
# Stop recording metrics. Tell controller we're done,
# then wait for the signal before terminating.
self.ctl_done = True
self.rudata.end()
rugroup_id = getattr(self, 'ctl_rugroup_id', None)
send(('CTL_Done', self.rudata, rugroup_id), to=ctl)
await(received(('CTL_Terminate',)))
verboutput("Terminating...")
def receive(msg=('CTL_Stop',)):
# If we already sent the done message, i.e., we were in the
# process of finishing when we were stopped, ignore.
self.verboutput("Received stop")
if self.ctl_done:
return
ctl_end()
exit()
def run():
# Stub to squelch warning.
pass
def run(func):
"""Decorator for Process.run() to call controllee hooks."""
def ctl_run(self):
self.ctl_begin()
func(self)
self.ctl_end()
return ctl_run
def rugroup(rugroup_id):
"""Decorator for annotating a process controllee subclass
with a resource usage group identifier. Results for processes
in the same group will be aggregated reported together.
"""
def f(proc):
proc.ctl_rugroup_id = rugroup_id
return proc
return f