# zebra.py
import requests
import concurrent.futures
import re
class Zebra:
    """Fetch Zebra MotionWorks position data from The Blue Alliance API v3.

    Every request is sent with the ``X-TBA-Auth-Key`` header built from the
    key given to the constructor.
    """

    BASE_URL = 'https://www.thebluealliance.com/api/v3/'
    # Seconds to wait on each HTTP request; without a timeout a stalled
    # connection would block a worker thread indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, key):
        """Store the request headers carrying the API key *key*."""
        self.headers = {'X-TBA-Auth-Key': key}

    def get_link(self, match_key):
        """Return the zebra_motionworks endpoint URL for *match_key*."""
        return self.BASE_URL + "match/" + match_key + "/zebra_motionworks"

    def get_info(self, event, match_key):
        """Split *match_key* into ``(competition_level, match_number)``.

        The part of *match_key* after ``event`` plus one separator character
        is inspected; its leading non-digit prefix is the competition level
        (e.g. ``"qm"`` for ``"<event>_qm12"``).

        The number is returned as a string:
        * for level ``"sf"`` it is the first run of digits after the level
          (``"3"`` for ``"sf3m1"``);
        * for every other level it is the text after the last ``"m"`` in
          *match_key* (``"12"`` for ``"qm12"``, ``"2"`` for ``"f1m2"``).

        NOTE(review): for ``"sf"`` this is the set number, not the match
        number within the set -- presumably intended for brackets with one
        match per semifinal set; confirm against callers.

        Raises:
            ValueError: a non-``"sf"`` key contains no ``"m"``.
            IndexError: an ``"sf"`` key contains no digits.
        """
        suffix = match_key[len(event) + 1:]
        level = re.sub(r'\d', ' ', suffix).split(" ")[0]
        if level != "sf":
            return (level, match_key[match_key.rindex("m") + 1:])
        return (level, re.findall(r'\d+', suffix)[0])

    def get_tba_data(self, match_key):
        """Download the tracking payload for one match.

        Returns:
            ``(success, match_data)``:
            * ``(True, data)`` when the response is a JSON object without an
              ``"Error"`` key;
            * ``(True, [])`` when the response body is JSON ``null``;
            * ``(False, [])`` when the request fails, the body is not valid
              JSON, the body is not a JSON object, or it has an ``"Error"``
              key.
        """
        try:
            response = requests.get(
                url=self.get_link(match_key),
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            data = response.json()
        except (requests.RequestException, ValueError):
            # Transport problems and undecodable bodies are reported through
            # the success flag so one bad match does not abort a whole event.
            return False, []
        if data is None:
            return True, []
        if not isinstance(data, dict) or 'Error' in data:
            return False, []
        return True, data

    @staticmethod
    def _extract_tracks(match_data, max_data_loss):
        """Build the per-team track dict from one decoded match payload.

        *match_data* must provide ``"times"`` and
        ``"alliances" -> "red"/"blue" -> [team entries]``, each entry having
        ``"team_key"``, ``"xs"`` and ``"ys"``.

        A team is skipped when more than *max_data_loss* of its first
        ``max_data_loss + 1`` x samples (or y samples) are ``None``, or when
        it has no non-``None`` x sample at all.

        Returns:
            ``{team_key: {"alliance", "starting_time", "x", "y"}}`` where
            ``starting_time`` is the ``times`` entry at the index of the first
            non-``None`` x sample, and ``x`` / ``y`` are the samples with
            ``None`` entries removed (each list filtered independently).
        """
        tracks = {}
        times = match_data["times"]
        window = max_data_loss + 1
        for color in ('red', 'blue'):
            for team_data in match_data['alliances'][color]:
                xs = team_data.get("xs") or []
                ys = team_data.get("ys") or []
                if (xs[:window].count(None) > max_data_loss
                        or ys[:window].count(None) > max_data_loss):
                    continue
                first_seen = next(
                    (i for i, x in enumerate(xs) if x is not None), None)
                if first_seen is None:
                    # Short or empty track with no usable sample: there is no
                    # starting time to report.
                    continue
                # Use explicit ``is not None`` tests: filtering through
                # ``None.__ne__`` depends on truth-testing NotImplemented,
                # which is deprecated.
                tracks[team_data["team_key"]] = {
                    "alliance": color,
                    "starting_time": times[first_seen],
                    "x": [x for x in xs if x is not None],
                    "y": [y for y in ys if y is not None],
                }
        return tracks

    def get_raw_data(self, match_key, max_data_loss):
        """Fetch one match and return ``(match_key, tracks)``.

        ``tracks`` is the dict described in ``_extract_tracks``; it is empty
        when the download failed or the match has no tracking payload.
        """
        success, match_data = self.get_tba_data(match_key)
        if not success or not match_data:
            return match_key, {}
        return match_key, self._extract_tracks(match_data, max_data_loss)

    def get_event_zebra(self, event, max_data_loss=10):
        """Collect tracking data for every match of *event*.

        Match keys are requested from the ``event/<event>/matches/keys``
        endpoint and each match is downloaded on a thread pool.

        Returns:
            ``{team_key: {competition_level: {match_number: track}}}`` with
            level and number as produced by ``get_info``.

        Raises:
            ValueError: the match-key response is not a JSON list.
        """
        matches = requests.get(
            url=self.BASE_URL + 'event/' + event + '/matches/keys',
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT,
        ).json()
        if not isinstance(matches, list):
            # An error object would otherwise be iterated as if its dict keys
            # were match keys and fail later with an unrelated message.
            raise ValueError(
                "Unexpected match key response for event %r: %r"
                % (event, matches))

        data = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.get_raw_data, match_key, max_data_loss)
                for match_key in matches
            ]
            # Consume in submission order so that, on a key collision, the
            # later match in the list wins deterministically.
            for future in futures:
                match_key, new_data = future.result()
                if not new_data:
                    continue
                competition_level, match_number = self.get_info(
                    event, match_key)
                for team_number, track in new_data.items():
                    levels = data.setdefault(team_number, {})
                    levels.setdefault(competition_level, {})[match_number] = track
        return data

    def get_match_zebra(self, match_key, max_data_loss=10):
        """Return only the track dict of ``get_raw_data`` for *match_key*."""
        _, match_data = self.get_raw_data(match_key, max_data_loss)
        return match_data