-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathtcpdumpscan.py
84 lines (73 loc) · 3.08 KB
/
tcpdumpscan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
import argparse
from datetime import datetime
import json
import os.path
import urllib.request
import time
import socket
import sys
import re
import ssl
import subprocess
def main():
    """Parse the command line and run the passive scanner.

    Options:
        -u  find3 REST endpoint URL (optional, defaults to the public cloud host).
        -f  find3 family identifier (required).
        -i  monitor mode interface to capture on (required).

    Builds a SignalParser from the parsed options and calls its read() method.
    """
    arg_parser = argparse.ArgumentParser(description="Submit find3 passive scan results")
    arg_parser.add_argument(
        "-u",
        dest="url",
        required=False,
        help="The find3 REST endpoint url",
        default="https://cloud.internalpositioning.com",
    )
    arg_parser.add_argument(
        "-f",
        dest="family",
        required=True,
        help="The find3 family identifier",
    )
    arg_parser.add_argument(
        "-i",
        dest="iface",
        required=True,
        help="The monitor mode interface",
    )
    options = arg_parser.parse_args()

    # NOTE(review): this swaps the module-wide default HTTPS context factory
    # for the unverified one, i.e. certificate checks are turned off for the
    # HTTPS requests made afterwards. Presumably done to tolerate self-signed
    # servers -- confirm this is still wanted.
    ssl._create_default_https_context = ssl._create_unverified_context

    scanner = SignalParser(options.family, options.url, options.iface)
    scanner.read()
def is_valid_file(parser, arg):
    """Return *arg* when it names an existing path, else report an error.

    Args:
        parser: object exposing an ``error(message)`` method (an
            argparse.ArgumentParser in normal use); called when the path
            does not exist.
        arg: filesystem path to check.

    Returns:
        *arg* unchanged if the path exists; otherwise whatever falls out
        after ``parser.error`` has been called (None if it returns at all).
    """
    if os.path.exists(arg):
        return arg
    parser.error("The file %s does not exist!" % arg)
class SignalParser():
    """Turn tcpdump probe-request output into find3 passive-scan submissions.

    Attributes (all set from the constructor arguments):
        family: find3 family identifier, sent as 'f' in every payload.
        url: base URL of the find3 server; '/passive' is appended on submit.
        iface: interface name handed to ``tcpdump -i``.
    """

    # First "<n>dBm" reading on the line. The pattern is searched from the
    # left on purpose: a greedy ".*" prefix makes the regex engine try the
    # trailing "0dBm" of e.g. "-70dBm" first, turning every reading that ends
    # in zero into 0. The look-behind keeps the "0" alternative from matching
    # the tail of a longer number.
    # NOTE(review): assumes the first dBm figure tcpdump prints is the signal
    # level (a noise figure, if any, follows it) -- confirm for your driver.
    _POWER_RE = re.compile(r"(?<!\d)(-\d+|0)dBm")
    # Source address: the token that follows the first stand-alone "SA:" tag.
    _STATION_RE = re.compile(r"(?<!\S)SA:(\S+)")

    def __init__(self, family, url, iface):
        """Store the family identifier, server URL and capture interface."""
        self.family = family
        self.url = url
        self.iface = iface

    def send_payload(self, stations, last_seen):
        """POST the collected readings to ``<url>/passive`` and print the reply.

        Args:
            stations: mapping of station address -> signal power (int).
            last_seen: datetime of the most recent reading.

        Raises:
            Whatever urllib raises on connection or HTTP errors.
        """
        payload = self.build_payload(stations, last_seen)
        headers = {
            'Content-Type': 'application/json',
            'Content-Length': str(len(payload)),
        }
        req = urllib.request.Request(self.url + '/passive', payload, headers, method='POST')
        # The context manager closes the connection even if read() fails.
        with urllib.request.urlopen(req) as f:
            response = f.read()
        print("response: %s" % response)

    def build_payload(self, stations, last_seen):
        """Return the UTF-8 encoded JSON request body.

        The body carries the family ('f'), the reading time as whole epoch
        seconds ('t'), this machine's host name ('d') and the readings under
        's' -> 'wifi'.

        Args:
            stations: mapping of station address -> signal power (int).
            last_seen: datetime of the most recent reading.

        Returns:
            bytes: the serialized JSON document.
        """
        body = {
            'f': self.family,
            't': int(last_seen.timestamp()),
            'd': socket.gethostname(),
            's': {'wifi': stations},
        }
        return json.dumps(body).encode('utf8')

    @classmethod
    def _parse_probe(cls, row):
        """Extract ``(station, power, last_seen)`` from one tcpdump text line.

        Args:
            row: a decoded output line whose first space-separated field is a
                ``HH:MM:SS.ffffff`` time of day.

        Returns:
            tuple: station address (str), signal power (int), and a datetime
            made of today's date plus the time of day printed on the line.

        Raises:
            ValueError: if the time stamp, the signal strength or the source
                address cannot be found on the line.
        """
        # tcpdump prints only the time of day here; pair it with today's date.
        clock = datetime.strptime(row.split(' ')[0].strip(), "%H:%M:%S.%f")
        last_seen = datetime.combine(datetime.today(), clock.time())

        power_match = cls._POWER_RE.search(row)
        if power_match is None:
            raise ValueError("no signal strength in line: %r" % row)
        station_match = cls._STATION_RE.search(row)
        if station_match is None:
            raise ValueError("no source address in line: %r" % row)
        return station_match.group(1), int(power_match.group(1)), last_seen

    def read(self):
        """Run tcpdump on the interface and submit every probe request seen.

        Blocks until tcpdump closes its output. Each line mentioning "Probe"
        is parsed; a reading with non-zero power is printed and submitted
        right away together with any readings still pending, after which the
        pending set is cleared. Errors while handling a single line are
        reported on stdout and do not stop the loop.
        """
        command = [
            'tcpdump', '-l', '-i', self.iface, '-e', '-s', str(256),
            'type', 'mgt', 'subtype', 'probe-req',
        ]
        proc = subprocess.Popen(command, stdout=subprocess.PIPE)
        stations = {}
        try:
            # The pipe yields bytes, so end-of-file is b'' (not ''); with the
            # wrong sentinel the loop would spin forever once tcpdump exits.
            for raw in iter(proc.stdout.readline, b''):
                try:
                    row = str(raw, 'utf8')
                    if "Probe" not in row:
                        continue
                    station, power, last_seen = self._parse_probe(row)
                    stations[station] = power
                    # A power of 0 is recorded but not submitted on its own.
                    # NOTE(review): presumably 0 means "no usable reading".
                    if power != 0:
                        print("Found %s with signal power of %d dBm at %s" % (station, power, last_seen))
                        self.send_payload(stations, last_seen)
                        stations = {}
                except Exception as e:
                    # Best effort: report the problem and keep capturing. The
                    # short pause avoids flooding the output on repeated errors.
                    time.sleep(0.02)
                    sys.stdout.flush()
                    print("Failed to read from tcpdump: %s" % (str(e)))
        finally:
            # Do not leave the pipe open or the child process running.
            proc.stdout.close()
            if proc.poll() is None:
                proc.terminate()
            proc.wait()
# Start the scanner only when this file is executed as a script; importing
# it as a module must not kick off a capture.
if __name__ == "__main__":
    main()