# shodan_thread.py
import shodan
import sys
import configparser
import requests
import threading
import queue
# Load settings from conf.ini; every value below is read from its
# [shodan] section.
config = configparser.ConfigParser()
config.read('conf.ini')

# Test-mode switch. The test URL is only read (and only defined) when
# test mode is enabled.
test = config.getboolean('shodan', 'test')
if test:
    test_url = config.get('shodan', 'test_url')

# Shodan API client built from the configured key.
SHODAN_API_KEY = config.get('shodan', 'key')
api = shodan.Shodan(SHODAN_API_KEY)

# Number of consumer threads. Kept as the raw config string here; it is
# converted with int() at the point of use.
NUMT = config.get('shodan', 'NUMT')

# q holds candidate URLs produced by search(); san holds the URLs that
# sanitize() found to answer with 200 OK.
q = queue.Queue()
san = queue.Queue()
'''
Search for results using the Shodan API
'''
def search():
    """Query Shodan for Marathon hosts and enqueue one URL per usable match.

    Runs the ``X-Marathon-Leader`` search through the module-level ``api``
    client and puts a URL for each match on the module-level queue ``q``.
    The URL is always built from the match's IP address, port and HTTP
    location:

    * port 443  -> ``https://<ip><location>``
    * port 8443 -> ``https://<ip>:8443<location>``
    * otherwise -> ``http://<ip>:<port><location>``

    Matches lacking any required field are skipped. A ``shodan.APIError``
    is printed to stdout and ends the search without enqueuing anything.
    """
    try:
        results = api.search('X-Marathon-Leader')
    except shodan.APIError as e:
        print('Error: {}'.format(e))
        return

    for result in results['matches']:
        try:
            http = result['http']
            ip = result['ip_str']
            port = str(result['port'])
            loc = http['location']
        except KeyError:
            continue

        # Matches without an HTTP host entry have always been skipped; the
        # host value itself is not used because the URL is built from the IP.
        if 'host' not in http:
            continue

        # An IPv6 literal must be wrapped in brackets inside a URL,
        # otherwise its colons are indistinguishable from the port separator.
        if ':' in ip:
            ip = '[' + ip + ']'

        if port == '443':
            url = 'https://' + ip + loc
        elif port == '8443':
            url = 'https://' + ip + ':' + port + loc
        else:
            url = 'http://' + ip + ':' + port + loc
        q.put(url)
'''
Sanitize the list: drop URLs that do not respond with 200 OK
'''
def sanitize():
    """Check one queued URL and keep it if it answers with HTTP 200.

    Takes a single URL from the module-level queue ``q``, requests it with
    a 5 second timeout and certificate verification disabled, and puts the
    URL on ``san`` when the status code is 200. Returns ``None`` in every
    case; when ``q`` is empty the call is a no-op.
    """
    try:
        # Non-blocking on purpose: several consumer threads can pass the
        # q.empty() check for the same last item, and a blocking get() would
        # leave the losers waiting forever (and the joining thread with them).
        url = q.get_nowait()
    except queue.Empty:
        return

    try:
        status = requests.get(url, timeout=5, verify=False).status_code
    except requests.exceptions.RequestException:
        # Covers connect/read timeouts and connection errors as before, plus
        # the remaining request failures (e.g. invalid URL, too many
        # redirects) so that one bad URL cannot kill the worker thread.
        return

    if status == 200:
        san.put(url)
'''
Producer work
'''
def producer():
    """Thread target that fills the URL queue by running search().

    An OSError or KeyboardInterrupt raised by the search is turned into a
    SystemExit with status 0.
    """
    try:
        search()
    except (OSError, KeyboardInterrupt):
        raise SystemExit(0)
'''
Consumer work
'''
def consumer():
    """Thread target that calls sanitize() until the URL queue looks empty.

    NOTE: q.empty() is only a snapshot; another consumer may take the last
    URL between this check and the dequeue performed inside sanitize().
    """
    while True:
        if q.empty():
            break
        sanitize()
'''
Start threads function
'''
def startThreads():
    """Run the producer to completion, then run the consumer threads.

    The producer thread is started and joined first, so the URL queue is
    fully populated before any consumer starts. After that, int(NUMT)
    daemon consumer threads are started and joined.
    """
    feeder = threading.Thread(target=producer, daemon=True)
    worker_total = int(NUMT)
    workers = [
        threading.Thread(target=consumer, daemon=True)
        for _ in range(worker_total)
    ]

    # Phase 1: fill the queue.
    feeder.start()
    feeder.join()

    # Phase 2: drain it in parallel, then wait for every worker.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
def start():
    """Return the list of URLs that answered with 200 OK.

    In test mode the configured ``test_url`` is returned as a one-element
    list and no threads are started. Otherwise the producer and consumer
    threads are run via startThreads() and everything they left in ``san``
    is collected into a list.

    Returns:
        list of URL strings (possibly empty).
    """
    if test:
        return [test_url]

    startThreads()

    # startThreads() joins every thread before returning, so nothing else
    # is touching san while it is drained here.
    urls = []
    while not san.empty():
        urls.append(san.get())
    # The collected list used to be built and then dropped, so callers got
    # None outside test mode; return it to match the test-mode result type.
    return urls