-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdump_db_json.py
executable file
·145 lines (108 loc) · 4.37 KB
/
dump_db_json.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/python3
# Dumps the database into JSON. The output file contains multiple
# reasonably sized JSON objects, separated by newlines.
import psycopg2 as pg
from config import DB_NAME
import json
import zlib
import sys
def bytes_to_json(obj):
"Encode bytes as a repr(bytes) and set __method__ to 'eval'."
if isinstance(obj, bytes):
return {'__method__': 'eval', '__value__': repr(obj)}
raise TypeError(repr(obj) + ' is not JSON serializable')
def dump_case_id_sha1(db, fp):
'Dump the cases table as JSON.'
with db.cursor() as c:
c.execute('SELECT id, sha1 FROM cases ORDER BY id')
results = c.fetchall()
results = [{'id': x[0], 'sha1': x[1]} for x in results]
json.dump({'cases': results}, fp)
print(file=fp)
def dump_case_contents(db, fp):
'''Dump case contents as a newline-separated series of JSON
objects.'''
with db.cursor() as c:
c.execute('SELECT id, z_contents FROM case_view ORDER BY id')
for id_, z_contents in c:
res = {'id': id_, 'contents': zlib.decompress(z_contents)}
json.dump({'case': res}, fp, default=bytes_to_json)
print(file=fp)
def dump_test_runs(db, fp):
'Dump test runs as JSON.'
with db.cursor() as c:
c.execute('SELECT id, start_time, end_time, clang_version, ' +
' llvm_version FROM test_runs ORDER BY id')
res = c.fetchall()
res = [{'id': x[0], 'start_time': x[1], 'end_time': x[2],
'clang_version': x[3], 'llvm_version': x[4]}
for x in res]
json.dump({'test_runs': res}, fp)
print(file=fp)
def dump_reduced_cases(db, fp):
'''Dump reduced case metadata as a newline-separated series of JSON
objects.'''
with db.cursor() as c:
c.execute('SELECT id, original, clang_version, llvm_version, ' +
' result FROM reduced_cases ORDER BY id')
for id_, original, clang_version, llvm_version, result in c:
res = {'id': id_, 'original': original,
'clang_version': clang_version,
'llvm_version': llvm_version, 'result': result}
json.dump({'reduced_case': res}, fp)
print(file=fp)
def dump_reduced_contents(db, fp):
'''Dump reduced case contents as a newline-separated series of JSON
objects.'''
with db.cursor() as c:
c.execute('SELECT reduced_id, contents FROM reduced_contents ' +
' ORDER BY reduced_id')
for id_, contents in c:
res = {'id': id_, 'contents': bytes(contents)}
json.dump({'reduced_case': res}, fp, default=bytes_to_json)
print(file=fp)
def dump_outputs(db, fp):
'''Dump clang outputs from failed cases as a newline-separated series
of JSON objects.'''
with db.cursor() as c:
c.execute('SELECT case_id, output FROM outputs ' +
' ORDER BY case_id')
for id_, output in c:
res = {'id': id_, 'output': zlib.decompress(output)}
json.dump({'output': res}, fp, default=bytes_to_json)
print(file=fp)
def dump_result_strings(db, fp):
'Dump result strings as JSON.'
with db.cursor() as c:
c.execute('SELECT id, str FROM result_strings ORDER BY id')
results = c.fetchall()
results = [{'id': x[0], 'str': x[1]} for x in results]
json.dump({'result_strings': results}, fp)
print(file=fp)
def dump_results(db, fp):
'Dump results as a newline-separated series of JSON objects.'
with db.cursor() as c:
c.execute('SELECT id, case_id, test_run, result ' +
' FROM results ORDER BY id')
for id_, case_id, test_run, result in c:
res = {'id': id_, 'case_id': case_id, 'test_run': test_run,
'result': result}
json.dump({'result': res}, fp)
print(file=fp)
def dump_all(db, fp):
'''Dump the entire database as a newline-separated series of JSON
objects.'''
dump_case_id_sha1(db, sys.stdout)
dump_case_contents(db, sys.stdout)
dump_test_runs(db, sys.stdout)
dump_reduced_cases(db, sys.stdout)
dump_reduced_contents(db, sys.stdout)
dump_outputs(db, sys.stdout)
dump_result_strings(db, sys.stdout)
dump_results(db, sys.stdout)
def main():
db = pg.connect(database=DB_NAME)
with db:
dump_all(db, sys.stdout)
if __name__ == '__main__':
main()