from logging import basicConfig, getLogger, INFO
from configparser import ConfigParser
from libs.osintchck import OSINTBlock
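def update_block_list(
    path_to_block_list, csi_ip_list
):
    """Update the block-list file with entries from an OSINTBlock instance.

    The file is read in full, the span of previously auto-generated
    entries (lines carrying the ``#ABL`` tag) is removed, the new entries
    are appended, and the whole list is written back.

    Keyword arguments:
    path_to_block_list - A string that is the location of a file that
    contains a list of specifically formatted IP addresses.
    csi_ip_list - A list of entries (one string per output line) returned
    by generate_block_list.

    Outputs:
    Nothing is returned; the file at path_to_block_list is rewritten with
    the new entries.

    Raises:
    SystemExit - Raised with status 1, after a log entry, when the file
    cannot be opened for reading or writing (OSError)."""
    log = getLogger('auto_ip_block')
    # Opening block list file. If we can't due to an OSError, quit
    # and make a log entry.
    try:
        block_file = open(path_to_block_list, 'r', encoding='ascii')
    except OSError:
        log.exception('Unable to open block file. This needs to be fixed.')
        raise SystemExit(1)
    # Reading current block list from file; the context manager releases
    # the handle even if reading fails part-way through.
    with block_file:
        temp_block_list = [line.rstrip('\n') for line in block_file]
    # Removing old ABL entries.  Positions come from enumerate(): the
    # previous list.index(entry) lookup returned the FIRST equal line, so a
    # duplicated tagged line could leave stale entries behind.
    abl_positions = [
        idx for idx, entry in enumerate(temp_block_list) if '#ABL' in entry
    ]
    if abl_positions:
        # NOTE: this keeps the original semantics of dropping everything
        # between the first and last tagged line, including any untagged
        # line in between - the tagged entries are assumed to be contiguous.
        del temp_block_list[abl_positions[0]:abl_positions[-1] + 1]
    # Writing new ABL entries to a list.  Each entry becomes exactly one
    # line of the file (which is later consumed as a block list), so an
    # entry that is not a str, is not ASCII (the file is written with
    # encoding='ascii' and would otherwise be left truncated mid-write) or
    # carries a line break is logged and skipped rather than written.
    for entry in csi_ip_list:
        try:
            entry.encode('ascii')
        except (AttributeError, UnicodeEncodeError):
            log.error('Skipping malformed block list entry: %r', entry)
            continue
        if '\n' in entry or '\r' in entry:
            log.error('Skipping malformed block list entry: %r', entry)
            continue
        temp_block_list.append(entry)
    # Opening block list file. If we can't due to an OSError, quit
    # and make a log entry.
    try:
        block_file = open(path_to_block_list, 'w', encoding='ascii')
    except OSError:
        log.exception('Unable to open block file. This needs to be fixed.')
        raise SystemExit(1)
    # Updating IP block file.  NOTE: the file is rewritten in place, not via
    # a temporary file plus rename, so a crash during the write can leave a
    # truncated list behind.
    with block_file:
        block_file.writelines(entry + '\n' for entry in temp_block_list)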
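def remove_csi_ips(path_to_block_list):
    """Edit the block list to remove the IPs gathered from CSI sources.

    This function is designed for "break glass in case of emergency"
    situations where the auto-generated entries (lines tagged ``#ABL``)
    must be dropped from the block list while everything else is kept.

    Keyword arguments:
    path_to_block_list - str(), the path to the block list location.

    Outputs:
    Nothing is returned by this function.  The file is rewritten without
    the tagged span; if no tagged entry exists the file is left untouched.

    Raises:
    SystemExit - Raised with status 1, after a log entry, when the block
    list does not exist or cannot be opened due to permissions issues
    (OSError)."""
    log = getLogger('auto_ip_block')
    # Opening block list file. If we can't due to an OSError, quit
    # and make a log entry.
    try:
        block_list = open(path_to_block_list, 'r', encoding='ascii')
    except OSError:
        log.exception('Unable to open block file. This needs to be fixed.')
        raise SystemExit(1)
    # Getting current block list; the context manager releases the handle
    # even if reading fails part-way through.
    with block_list:
        blocked_ips = [line.rstrip('\n') for line in block_list]
    # Removing ABL entries.  Positions come from enumerate(): the previous
    # list.index(entry) lookup returned the FIRST equal line, so a
    # duplicated tagged line could leave stale entries behind.  The old
    # code also called min() on an empty list and relied on the resulting
    # ValueError; an explicit check replaces that.
    abl_positions = [
        idx for idx, entry in enumerate(blocked_ips) if '#ABL' in entry
    ]
    if not abl_positions:
        log.warning('No ABL entries in block list to remove.')
        return
    # NOTE: keeps the original semantics of dropping everything between the
    # first and last tagged line, including any untagged line in between.
    del blocked_ips[abl_positions[0]:abl_positions[-1] + 1]
    # Opening block list file. If we can't due to an OSError, quit
    # and make a log entry.
    try:
        block_list = open(path_to_block_list, 'w', encoding='ascii')
    except OSError:
        log.exception('Unable to open block file. This needs to be fixed.')
        raise SystemExit(1)
    # Writing blocked ips (without the ABL entries) back to the block
    # list file.
    with block_list:
        block_list.writelines(entry + '\n' for entry in blocked_ips)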
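def main():
    """Fetch the OSINT feeds and refresh the auto-generated block list.

    Reads config.cnf for the block-list path and the AbuseIPDB API key,
    pulls each feed through an OSINTBlock instance (logging any non-200
    response) and hands the consolidated list to update_block_list.

    Raises:
    SystemExit - Raised with status 1, after a log entry, when config.cnf
    cannot be read or lacks a required setting."""
    # Setting logging.
    log = getLogger('auto_ip_block')
    basicConfig(
        format='%(asctime)s %(name)s %(levelname)s: %(message)s',
        datefmt='%m/%d/%Y %H:%M:%S',
        level=INFO,
        filename='csi_auto_block.log'
    )
    # Getting the block file's path and API key from a config.
    # ConfigParser.read() silently ignores a missing file, which previously
    # surfaced only as a bare KeyError further down; check it explicitly.
    config = ConfigParser()
    if not config.read('config.cnf'):
        log.error('config.cnf could not be read; nothing to do.')
        raise SystemExit(1)
    try:
        block_path = config['block']['path']
        aipdb_key = config['api']['aipdb']
    except KeyError as err:
        log.error('config.cnf is missing a required setting: %s', err)
        raise SystemExit(1)
    ip_block = OSINTBlock()
    # Retrieving the block lists.  Each fetcher is expected to return an
    # HTTP-style status code; anything other than 200 is logged and the run
    # continues, as before.  NOTE(review): a feed that fails still lets
    # generate_block_list run below, so a transient outage may shrink the
    # ABL span that replaces the previous one - confirm that is acceptable.
    feeds = [
        # Emerging Threats' known compromised host list.
        ('ET', ip_block.get_et_ch),
        # abuse.ch list (the original comment called it the URLhaus botnet
        # C2 list; the method name suggests SSLBL - confirm).
        ('abuse.ch', ip_block.get_ssl_bl),
        # Cisco Talos IP block list.
        ('Cisco Talos', ip_block.get_talos_list),
        # Blocklist.de ban list.
        ('blocklist.de', ip_block.get_blde_list),
        # Nothink.org's list of servers that brute force SSH servers.
        ('Nothink.org', ip_block.get_nt_ssh_bl),
        # AbuseIPDB's list of IPs with an abuse score of 100 (default).
        ('Abuse IP DB', lambda: ip_block.get_adb_bl(aipdb_key)),
    ]
    for label, fetch in feeds:
        response = fetch()
        if response != 200:
            log.error('%d response code from %s', response, label)
    # Consolidating the list and writing it to a file.
    auto_block_list = ip_block.generate_block_list()
    update_block_list(block_path, auto_block_list)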
# Script entry point: only run when executed directly, not on import.
if __name__ == "__main__":
    main()