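def same_communication(p1, p2):
    """Return True if *p2* belongs to the same flow as *p1*.

    Two packets are in the same flow when the (src_ip, src_port, dest_ip,
    dest_port) four-tuple of *p2* equals that of *p1* in either direction.
    The previous test only checked that each address and port of *p2* was
    "somewhere in" p1's pair, so it also accepted cross-pairings (for example
    p1.src_ip together with p1.dest_port, or both addresses equal to
    p1.src_ip) and could merge unrelated flows between the same two hosts.

    Args:
        p1: packet exposing ``src_ip``, ``dest_ip``, ``src_port``, ``dest_port``.
        p2: packet exposing the same attributes.

    Returns:
        True for the same flow in either direction, False otherwise.
    """
    forward = (p1.src_ip, p1.src_port, p1.dest_ip, p1.dest_port)
    # Reply direction: endpoints swapped as a pair, never individually.
    backward = (p1.dest_ip, p1.dest_port, p1.src_ip, p1.src_port)
    candidate = (p2.src_ip, p2.src_port, p2.dest_ip, p2.dest_port)
    return candidate in (forward, backward)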
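def same_icmp_communication(p1, p2):
    """Return True if *p2* is part of the same ICMP exchange as *p1*.

    Both packets must carry the same ``icmp_seq_n`` and travel between the
    same two hosts, in either direction.  The address check is done on the
    (src_ip, dest_ip) pair rather than on individual membership in p1's
    addresses, so a packet whose source and destination are both equal to
    one of p1's hosts is no longer accepted.

    Args:
        p1: packet exposing ``src_ip``, ``dest_ip`` and ``icmp_seq_n``.
        p2: packet exposing the same attributes.

    Returns:
        True when *p2* matches *p1* by sequence number and host pair.
    """
    if p2.icmp_seq_n != p1.icmp_seq_n:
        return False
    request_dir = (p1.src_ip, p1.dest_ip)
    reply_dir = (p1.dest_ip, p1.src_ip)
    return (p2.src_ip, p2.dest_ip) in (request_dir, reply_dir)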
def get_flag_byte_from_packet(p):
    """Return characters 94-95 of ``p.packet`` decoded as a UTF-8 string.

    The result is fed to ``contains_flag``, which parses it as a hexadecimal
    byte, so ``p.packet`` is presumably a hex-encoded dump of the frame;
    character offset 94 then corresponds to byte 47 = 14 (Ethernet II) +
    20 (IPv4 header without options) + 13, the TCP flags byte.
    NOTE(review): the fixed offset assumes no VLAN tag and no IP options --
    confirm against the capture format.  A frame shorter than 96 characters
    yields an empty string, which ``contains_flag`` cannot parse.
    """
    flag_chars = p.packet[94:96]
    return flag_chars.decode('utf-8')
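def contains_flag(all_flags, requested_flags):
    """Return True if *all_flags* has at least one of *requested_flags* set.

    Args:
        all_flags: the TCP flags byte as a hexadecimal string (e.g. ``'12'``
            for SYN+ACK), as produced by ``get_flag_byte_from_packet``.
        requested_flags: iterable of lower-case flag names.  Recognised
            names are the eight bits of the TCP flags byte: ``'fin'``,
            ``'syn'``, ``'rst'``, ``'psh'``, ``'ack'``, ``'urg'``, ``'ece'``,
            ``'cwr'``.  Unknown names are ignored, as before; the names
            beyond syn/rst/fin are new.

    Returns:
        True when any requested flag bit is set; False otherwise, including
        when no recognised flag was requested (then *all_flags* is not
        parsed at all).

    Raises:
        ValueError: if a recognised flag is requested and *all_flags* is not
            a valid hexadecimal string (e.g. the empty slice taken from a
            truncated frame).
    """
    flag_bits = {
        'fin': 1 << 0,
        'syn': 1 << 1,
        'rst': 1 << 2,
        'psh': 1 << 3,
        'ack': 1 << 4,
        'urg': 1 << 5,
        'ece': 1 << 6,
        'cwr': 1 << 7,
    }
    # Fold the request into a single mask so the hex string is parsed once
    # instead of once per requested flag.
    mask = 0
    for name in requested_flags:
        mask |= flag_bits.get(name, 0)
    if mask == 0:
        return False
    return (int(all_flags, 16) & mask) != 0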
def is_complete(communication):
    """Return True if *communication* looks like a fully terminated flow.

    A communication counts as complete when it holds at least three packets,
    its first packet carries SYN, and either its second-to-last packet carries
    FIN or its last packet carries RST.  Flag bytes are read with
    ``get_flag_byte_from_packet`` and tested with ``contains_flag``.

    Args:
        communication: list of packets in capture order.

    Returns:
        True for a complete communication, False otherwise.
    """
    if len(communication) < 3:
        return False
    # Read all three flag bytes up front, then decide.
    first, penultimate, last = (
        get_flag_byte_from_packet(communication[k]) for k in (0, -2, -1)
    )
    opened_with_syn = contains_flag(first, ['syn'])
    return opened_with_syn and (
        contains_flag(penultimate, ['fin']) or contains_flag(last, ['rst'])
    )
def find_comms(packets):
    """Group *packets* into communications and return the first of each kind.

    The first remaining packet becomes the head of a communication and
    collects, in capture order, every later packet for which
    ``same_communication(head, p)`` holds; the group is then classified with
    ``is_complete``.  *packets* is consumed and left empty as a side effect.

    Args:
        packets: list of packet objects accepted by ``same_communication``
            and ``is_complete``.

    Returns:
        A pair ``(complete, incomplete)``: the first complete and the first
        incomplete communication found (each a list of packets), or ``None``
        for a kind that never occurred.
    """
    complete = []
    incomplete = []
    while packets:
        head = packets.pop(0)
        group = [head]
        leftover = []
        for candidate in packets:
            if same_communication(head, candidate):
                group.append(candidate)
            else:
                leftover.append(candidate)
        # Assign in place so the caller's list is emptied exactly as before.
        packets[:] = leftover
        if is_complete(group):
            complete.append(group)
        else:
            incomplete.append(group)
    first_complete = complete[0] if complete else None
    first_incomplete = incomplete[0] if incomplete else None
    return first_complete, first_incomplete
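def find_tftp_comms(tftp_datagrams, p_val_by_name):
    """Group TFTP datagrams into transfers, one list per transfer.

    Each transfer starts with the first remaining datagram (the request sent
    to the well-known TFTP port) and collects, in capture order, every later
    datagram exchanged between the same two hosts that uses the requester's
    port as source or destination.  In TFTP the server answers from a freshly
    chosen port, so the requester's port is the stable identifier of the
    transfer.  *tftp_datagrams* is consumed and left empty as a side effect.

    Args:
        tftp_datagrams: list of datagram objects exposing ``src_ip``,
            ``dest_ip``, ``src_port`` and ``dest_port``.
        p_val_by_name: mapping of protocol names to port numbers; only the
            ``'TFTP'`` entry is used.

    Returns:
        A list of transfers, each a list of datagrams in capture order.
    """
    tftp_port = p_val_by_name['TFTP']
    communications = []
    while tftp_datagrams:
        head = tftp_datagrams.pop(0)
        # Matching state is derived from the head datagram only and rebuilt
        # for every transfer.  Previously the host list was shared across the
        # whole call and grew on every comparison, so hosts of an earlier
        # transfer kept matching datagrams of later, unrelated transfers, and
        # a stale port was reused when the head had the TFTP port on both ends.
        hosts = {head.src_ip, head.dest_ip}
        if head.src_port != tftp_port:
            client_port = head.src_port
        elif head.dest_port != tftp_port:
            client_port = head.dest_port
        else:
            # Both ends on the TFTP port: there is no requester port to
            # follow, so the head datagram forms a transfer on its own.
            client_port = None
        communication = [head]
        leftover = []
        for dgram in tftp_datagrams:
            same_hosts = dgram.src_ip in hosts and dgram.dest_ip in hosts
            uses_client_port = (
                client_port is not None
                and client_port in (dgram.src_port, dgram.dest_port)
            )
            if same_hosts and uses_client_port:
                communication.append(dgram)
            else:
                leftover.append(dgram)
        # Assign in place so the caller's list is emptied as before.
        tftp_datagrams[:] = leftover
        communications.append(communication)
    return communications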
def find_icmp_comms(packets, p_name_by_val, p_val_by_name):
    """Group ICMP *packets* into communications, one list per exchange.

    The first remaining packet becomes the head of a communication and
    collects, in capture order, every later packet for which
    ``same_icmp_communication(head, p)`` holds.  *packets* is consumed and
    left empty as a side effect.  ``p_name_by_val`` and ``p_val_by_name``
    are accepted for signature compatibility but are not used here.

    Args:
        packets: list of packet objects accepted by ``same_icmp_communication``.
        p_name_by_val: unused.
        p_val_by_name: unused.

    Returns:
        A list of communications, each a list of packets in capture order.
    """
    communications = []
    while packets:
        head = packets.pop(0)
        group = [head]
        leftover = []
        for candidate in packets:
            if same_icmp_communication(head, candidate):
                group.append(candidate)
            else:
                leftover.append(candidate)
        # Assign in place so the caller's list is emptied exactly as before.
        packets[:] = leftover
        communications.append(group)
    return communications