-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
526 lines (398 loc) · 14 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
#
# Test and benchmark the satisfiability checker with various formulas
#
# Uses systemd and Linux's cgroups v2 to limit time and memory usage for each
# test case, so it only works under Linux. systemd 255 or above is required
# to measure the peak memory usage. pystemd is used to connect with systemd.
#
import json
import math
import time
from multiprocessing import Process, Pipe, connection
import pystemd
# Some signatures we need are missing from Pystemd
from pystemd.systemd1.unit_signatures import KNOWN_UNIT_SIGNATURES
KNOWN_UNIT_SIGNATURES[b'AddRef'] = b'b'
KNOWN_UNIT_SIGNATURES[b'PIDs'] = b'au'
from ctxform.logics import PROBLEM_CLASS, PARSER_CLASS
from ctxform.ltl import LTLProblem
from ctxform.parser import Operator as Op
#
# Functions to generate some families of formulas
#
def change_head(formula: tuple):
"""Change the operator on the head of the formula"""
head, *args = formula
match head:
case Op.WUNTIL:
new_op = Op.UNTIL
case Op.UNTIL:
new_op = Op.WUNTIL
case _:
return formula
return new_op, *args
def nested_formula(n: int, weak: bool = False, m: int = 0):
"""Nested LTL formula extended from Esparza et al. JACM paper"""
if n == 0:
return Op.WUNTIL if weak else Op.UNTIL, (Op.VAR, f'p{m}'), (Op.VAR, f'p{m+1}')
else:
recursive = (Op.CTX, f'c{m}', nested_formula(n - 1, not weak, m + 1))
fixed = (Op.VAR, f'p{m}')
return (Op.WUNTIL, recursive, fixed) if weak else (Op.UNTIL, fixed, recursive)
def nested_equivalent(n: int, weak: bool = False, m: int = 0):
"""Reduced nested_formula"""
if n == 0:
return nested_formula(0, weak, m)
elif weak:
nested = nested_formula(n - 1, not weak, m + 1)
return (Op.DISJUNCTION,
(Op.CONJUNCTION,
(Op.ALWAYS, (Op.EVENTUALLY, nested[2])),
# The original one with W below
(Op.WUNTIL,
(Op.CTX, f'c{m}', change_head(nested)),
(Op.VAR, f'p{m}'),
)
),
# The original one with U above and something else to the right
(Op.UNTIL,
(Op.CTX, f'c{m}', nested_equivalent(n - 1, not weak, m + 1)),
(Op.DISJUNCTION,
(Op.VAR, f'p{m}'),
(Op.ALWAYS, (Op.CTX, f'c{m}', (Op.LIT, False))),
)
)
)
else:
nested = nested_formula(n - 1, not weak, m + 1)
return (Op.DISJUNCTION,
# The original one with U below
(Op.UNTIL,
(Op.VAR, f'p{m}'),
(Op.CTX, f'c{m}', change_head(nested)),
),
(Op.CONJUNCTION,
(Op.EVENTUALLY, (Op.ALWAYS, nested[1])),
# The original one with U above and something else to the right
(Op.WUNTIL,
(Op.CONJUNCTION,
(Op.VAR, f'p{m}'),
(Op.EVENTUALLY, (Op.CTX, f'c{m}', (Op.LIT, True))),
),
(Op.CTX, f'c{m}', nested_equivalent(n - 1, not weak, m + 1)),
),
)
)
def fgf_formula(n: int, fg: bool = True, m: int = 0, same: bool = False):
"""Formulas mixing FG and GF"""
ctx = 'c' if same else f'c{m}'
if n == 0:
return Op.CTX, ctx, (Op.VAR, 'p')
else:
first, second = Op.ALWAYS, Op.EVENTUALLY
if fg:
first, second = second, first
return Op.CTX, ctx, (first, (second, fgf_formula(n - 1, not fg, m + 1, same)))
def fgf_equivalent(n: int, fg: bool = True, m: int = 0, same: bool = False):
"""Equivalent to fgf_formula"""
ctx = 'c' if same else f'c{m}'
if n == 0:
return fgf_formula(n, fg, m, same)
else:
first, second = Op.ALWAYS, Op.EVENTUALLY
if fg:
first, second = second, first
return (Op.DISJUNCTION,
(Op.CONJUNCTION,
(first, (second, fgf_equivalent(n - 1, not fg, m + 1, same))),
(Op.CTX, ctx, (Op.LIT, True))
),
(Op.CTX, ctx, (Op.LIT, False))
)
#
# Functions to obtain information about formulas
#
def formula_size(form):
"""Size of the formula"""
head, *args = form
match head:
case Op.CTX | Op.LIT | Op.VAR:
return 1
case _:
return 1 + sum(formula_size(arg) for arg in args)
def formula_depth(form):
"""Size of the formula"""
head, *args = form
match head:
case Op.LIT | Op.VAR:
return 0
case Op.CTX:
return 1 + formula_depth(args[1])
case _:
return max(formula_depth(arg) for arg in args)
#
# Functions to control resource usage
#
class ControlledRunner:
"""Run functions in a controlled environment"""
def __init__(self, memory_limit=4000000000, time_limit=600, scope_name=b'ctxform-test.scope'):
# Memory limit in bytes
self.memory_limit = memory_limit
# Time limit in seconds
self.time_limit = time_limit
# Name of the systemd scope
self.scope_name = scope_name
# Connect to the user session bus
self.bus = pystemd.dbuslib.DBus(user_mode=True)
self.bus.open()
# Manager to connect with the user session systemd
self.manager = pystemd.systemd1.Manager(bus=self.bus)
self.manager.load()
# Check systemd version for compatibility
version = self.manager.Version.decode('utf-8')
self.recent_enough = int(version.split('.', maxsplit=1)[0]) >= 255
if not self.recent_enough:
print(f'Using systemd version {version} while memory peak measurement requires systemd 255 or above.')
def __del__(self):
self.bus.close()
def run(self, func, args=()):
"""Run a function in the controlled environment"""
# Start the process for the given function
parent_conn, child_conn = Pipe()
process = Process(target=self.target, args=(child_conn, func, args))
process.start()
properties = {
'Description': 'Test scope for ctxform',
'PIDs': [process.pid],
'MemoryMax': self.memory_limit,
'MemorySwapMax': 0,
# 'RuntimeMaxUSec': self.time_limit * 1000000, # done by wait
'AddRef': True,
}
# Start a scope where to confine the process and let it go on
self.manager.Manager.StartTransientUnit(self.scope_name, b'fail', properties, ())
# Wait until the unit is active to start the process
unit = pystemd.systemd1.Unit(self.scope_name, bus=self.bus)
unit.load()
unit.Unref() # removes the reference from AddRef
while unit.ActiveState != b'active':
time.sleep(0.25)
# Send a message to start the actual algorithm (otherwise the process
# will start unconfined and the measures will be inaccurate)
parent_conn.send('start')
# Wait until the process has finished, successfully or not
# (process.sentinel signals whether the process has finished,
# while parent_conn will wake up this when a message is available)
ready = connection.wait([process.sentinel, parent_conn], timeout=self.time_limit + 1)
# Process does not finish in time
if not ready:
memory_peak = unit.MemoryPeak if self.recent_enough else -1
cpu_usage = unit.CPUUsageNSec
# Kill the unit
unit.Kill(b'all', 9)
result = dict(ok=False, cpu_usage=cpu_usage, memory_peak=memory_peak, reason='timeout')
# Process has finished successfully
elif ready[0] != process.sentinel:
memory_peak = unit.MemoryPeak if self.recent_enough else -1
cpu_usage = unit.CPUUsageNSec
# Result contains the time measures by the child process
result = parent_conn.recv()
# Once the measures have been taken, we signal the process to finish
parent_conn.send('ok')
process.join()
result = dict(ok=True, cpu_usage=cpu_usage, memory_peak=memory_peak, result=result)
# Process has failed
else:
# Wait until systemd become aware of the failure
while unit.ActiveState == b'active':
time.sleep(0.25)
result = unit.Result
cpu_usage = unit.CPUUsageNSec
# Reset the failed unit to reuse it with the next test case
unit.ResetFailed()
# result would be oom-killer (out of memory)
result = dict(ok=False, cpu_usage=cpu_usage, reason=result.decode())
# Wait until the unit gets inactive
while unit.ActiveState != b'inactive':
time.sleep(0.25)
return result
@staticmethod
def target(conn, func, args):
"""What gets executed by the child process"""
# Wait until the scope is set
conn.recv()
try:
result = func(*args)
except Exception as e:
result = e
# Tell the caller that we are ready
conn.send(result)
# Wait until the metadata has been captured
conn.recv()
def benchmark_method1(problem_class, f1, f2):
"""Benchmark two formulas"""
problem = problem_class(f1, f2)
start = time.perf_counter_ns()
result = problem.solve_with_context(timeout=None)[1]
end = time.perf_counter_ns()
extra = {}
# For LTL, capture the number of automata states
if problem_class == LTLProblem:
extra['left-states'] = problem.aut_left.num_states()
extra['left-not-states'] = problem.aut_not_left.num_states()
extra['right-states'] = problem.aut_right.num_states()
extra['right-not-states'] = problem.aut_not_right.num_states()
return result, end - start, extra
def benchmark_method2(problem_class, f1, f2):
"""Benchmark two formulas"""
problem = problem_class(f1, f2)
start = time.perf_counter_ns()
result = problem.solve(timeout=None)[0]
end = time.perf_counter_ns()
# Obtain the canonical context
# canonical = problem.canonical_context(simplified=True)
# if isinstance(canonical, tuple):
# canonical = problem.canonical_context(simplified=False)
# Capture the number of contexts, new variables, and clauses
extra = {
'ctx-vars': len(problem.transformer.ctx_map),
'ctx-occur': len(problem.transformer.ctx_ap_map),
'cond-clauses': 2 * sum(math.comb(len(group), 2) for group in problem.transformer.ctx_map.values()),
# 'canonical': str(to_spot(canonical['f']).simplify()) if canonical is not None else '',
}
# For LTL, capture the number of automata states
if problem_class == LTLProblem:
extra['left-states'] = problem.aut_left.num_states()
extra['left-not-states'] = problem.aut_not_left.num_states()
extra['right-states'] = problem.aut_right.num_states()
extra['right-not-states'] = problem.aut_not_right.num_states()
extra['cond-states'] = problem.aut_cond.num_states()
return result, end - start, extra
def dump_result(out, logic: str, name: str, lhs, rhs, mth1, mth2, arg=None):
"""Dump the result to a ndJSON file"""
# Common information
summary = {
'logic': logic,
'id': name,
'arg': arg,
'lhs': {
'size': formula_size(lhs),
'depth': formula_depth(lhs),
},
'rhs': {
'size': formula_size(rhs),
'depth': formula_depth(rhs),
},
}
# Result of each method
for k, mth in enumerate((mth1, mth2), start=1):
if mth is not None:
if mth['ok']:
if isinstance(mth['result'], Exception):
summary[f'mth{k}'] = {
'ok': False,
'reason': 'exception',
'details': str(mth['result']),
}
else:
result, etime, extra = mth['result']
summary[f'mth{k}'] = {
'ok': True,
'result': result,
'time': etime,
'cpu': mth['cpu_usage'],
'memory': mth['memory_peak'],
} | extra
else:
summary[f'mth{k}'] = {
'ok': False,
'reason': mth['reason'],
}
json.dump(summary, out)
out.write('\n')
out.flush()
def load_static_formulas(filename='formulas.json'):
"""Load formulas from file"""
# Load formulas from file
if filename.endswith('.toml'):
import tomllib
with open(filename, 'rb') as tf:
static_forms = tomllib.load(tf)
else:
with open(filename, 'rb') as jf:
static_forms = json.load(jf)
# Import between logics
for logic, imports in static_forms.get('import', {}).items():
for other in imports:
static_forms[logic] |= static_forms[other]
static_forms.pop('import')
return static_forms
def show_info(rid, mth):
"""Show brief information about a run"""
msg = f'{mth["cpu_usage"] / 1e9:.2}' if mth['ok'] else mth['reason']
return f' ({rid}) {msg}'
def main():
"""Run all tests and benchmarks"""
import argparse
args_parser = argparse.ArgumentParser(description='Test runner')
args_parser.add_argument('--input', '-i', help='JSON/TOML file with formulas', default='test/formulas.json')
args_parser.add_argument('--output', '-o', help='Path to save the results (ndjson)', default='results.ndjson')
args_parser.add_argument('--timeout', help='Timeout for executions', type=int, default=900)
args_parser.add_argument('--memlimit', help='Memory limit (in Mb)', type=int, default=8000)
args_parser.add_argument('--skip-generated', help='Do not check generated formulas', action='store_true')
args = args_parser.parse_args()
# Load static formulas from file
static_forms = load_static_formulas(args.input)
# Test written before the case in the terminal
# pretext = '\x1b[1K\r'
pretext = '\n'
# Runner of test cases
runner = ControlledRunner(time_limit=args.timeout, memory_limit=args.memlimit * 1000000)
with open(args.output, 'w') as out:
# Fixed size cases
for logic, cases in static_forms.items():
Problem = PROBLEM_CLASS[logic]
Parser = PARSER_CLASS[logic]
parser = Parser()
for name, (lhs, rhs) in cases.items():
print(f'{pretext}{logic} {name}', end='', flush=True)
plhs = parser.parse(lhs)
prhs = parser.parse(rhs)
mth2 = runner.run(benchmark_method2, args=(Problem, plhs, prhs))
print(show_info(2, mth2), end='', flush=True)
mth1 = runner.run(benchmark_method1, args=(Problem, plhs, prhs))
print(show_info(1, mth1), end='', flush=True)
dump_result(out, logic, name, plhs, prhs, mth1, mth2)
# Skip checking generated formulas
if args.skip_generated:
print()
return
# The rest are LTL cases
Problem = PROBLEM_CLASS['ltl']
logic = 'ltl'
# Nested formula
for k in range(3):
for weak in (True, False):
print(f'{pretext}nested {k} {weak}', end='', flush=True)
f1 = nested_formula(k, weak)
f2 = nested_equivalent(k, weak)
mth1 = runner.run(benchmark_method1, args=(Problem, f1, f2))
print(show_info(1, mth1), end='', flush=True)
mth2 = runner.run(benchmark_method2, args=(Problem, f1, f2))
print(show_info(2, mth1), end='', flush=True)
dump_result(out, logic, 'nested', f1, f2, mth1, mth2, arg={'n': k, 'weak': weak})
# Nested FG and GF
for k in range(4):
for same in (True, False):
for fg in (True, False):
print(f'{pretext}fgf {k} {same} {fg}', end='', flush=True)
f1 = fgf_formula(k, fg=fg, same=same)
f2 = fgf_equivalent(k, fg=fg, same=same)
mth1 = runner.run(benchmark_method1, args=(Problem, f1, f2))
print(show_info(1, mth1), end='', flush=True)
mth2 = runner.run(benchmark_method2, args=(Problem, f1, f2))
print(show_info(2, mth2), end='', flush=True)
dump_result(out, logic, 'fgf', f1, f2, mth1, mth2, arg={'n': k, 'same': same, 'fg': fg})
print()
if __name__ == '__main__':
main()