-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdefense.py
77 lines (62 loc) · 2.43 KB
/
defense.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import sys
import signal
import os
import multiprocessing
from time import sleep
from multiprocessing import Process
from defense.detection.mlids.classifier_impl import MessageClassifier
import response
import click
import ruleids
@click.command()
@click.option('-t', '--time', type=float,
help='Sets length of defense (default=unbounded)')
@click.option('--quiet-ruleids', is_flag=True,
help='Suppress ruleids output')
@click.option('--quiet-ml', is_flag=True,
help='Suppress machine learning output')
@click.option('--quiet-response', is_flag=True,
help='Suppress response output')
@click.option('-q', '--quiet', is_flag=True,
help='Suppress all output')
@click.option('-c', '--channel', type=str,
help='Channel on which to listen for payloads (e.g. vcan0, can0)'
' (default=vcan0)')
def main(time, channel, quiet, quiet_ruleids, quiet_ml, quiet_response):
payloads = multiprocessing.Queue()
if channel is None:
channel = 'vcan0'
if quiet:
quiet_ruleids = True
quiet_response = True
quiet_ml = True
ruleids_defense = Process(target=ruleids.detect_attacks,
args=(payloads, quiet_ruleids, channel,))
ml_ids = MessageClassifier(os.getcwd() + '/ml_ids_model')
ml_defense = Process(target=ml_ids.detect_attacks,
args=(payloads, quiet_ml, channel))
response_scheme = Process(target=response.response,
args=(payloads, quiet_response, channel,))
ruleids_defense.start()
ml_defense.start()
response_scheme.start()
if time is not None:
sleep(time)
# generate keyboard interrupts to quit out of all processes
os.kill(ruleids_defense.pid, signal.SIGINT)
os.kill(ml_defense.pid, signal.SIGINT)
os.kill(response_scheme.pid, signal.SIGINT)
os.kill(os.getpid(), signal.SIGINT)
def dummy_response_scheme(payloads, quiet):
while True:
# Block until we get a can msg the defense found as malicious
payload = payloads.get()
if not quiet:
print("Response scheme received payload: ")
print("\t{}".format(payload))
print("\tmessage num: {}, caught by: {}".format(payload.index, payload.defense))
print("")
def dummy_machine_learning_defense(payloads, quiet, channel):
pass # do nothing
if __name__ == '__main__':
main()