-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBIRRTX.py
234 lines (206 loc) · 7.39 KB
/
BIRRTX.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import argparse
import importlib
import logging
import sys
import math
import copy
import random
import time
import numpy as np
from collections import namedtuple
import robot
import grid
import graph
import obstacles
import printing
import constants
# Matthew Westbrook
# Batch Informed Rapidly Exploring Randomizing Tree (BIRRT^x)
# -Navigate obstacles with real time planner and
# time varying obstacles. Graph expands from random sampling
# (batch informed or normal RRT random) and improves path
# when possible to within epsilon consistency. Current path to
# goal from each node makes a tree as in RRT. Planning is done
# from goal to start and re-planning is done as neccessary.
# Time to reach goal is affected by speed of connecting robot
# to root of tree, amount that plan is able to be improved during
# traversal, and robot speed.
def get_batch(R, C, blocked, bound, p1, p2):
QV = []
for i in range(0,constants.m):
[sx, sy] = [random.uniform(0, C-.1), random.uniform(0, R-.1)]
if grid.coord2grid(C, sx, sy) not in blocked:
d = grid.ed(p1[0],p1[1], sx, sy) + grid.ed(sx, sy, p2[0],p2[1])
if d < bound:
QV.append([d, [sx, sy]])
QV.sort(key=lambda x: x[0])
return QV
def search(alg, R, C, blocked, unknown, unknown_loc, x0, y0, gx, gy):
# Tracks what edges and vertices are in each grid cell
v_cell = [[] for i in range(0, R*C)]
e_cell = [[] for i in range(0, R*C)]
# Tracks vertices that could be connected without obstacles in the way
e_blocked = [[] for i in range(0, R*C)]
# Initialize: Create start conditions for planning
# Start position
x = x0
y = y0
v_bot = -1
# Start time
alg_start = time.time()
# Is bot connected to tree
v_connect = False
# No movement until initial plan found
moving = False
# Put goal location in graph
V = 1
# -Vertex locations as numpy array
v_loc = np.array([[gx, gy]])
# -Vertex additional info structure is gval, lmcval, parent vertex, children, validity (incase later blocked)
v_info = [[0.0, 0.0, -1, {}, True]]
# -Outgoing edge in structure is list of dictionaries for each vertex
# where key is other vertex and value is edge length
eoi = [{}]
# -Outgoing edge out
eoo = [{}]
# -Running edge in
eri = [{}]
# -Running edge out
ero = [{}]
v_cell[grid.coord2grid(R, x, y)].append(0)
# Queue of vertices to rewire
Q = []
# List of disconnected vertices
v_orphan = []
# Batch of samples if using batch informed
QV = []
cost = float("inf")
# Save first map after start connected
first_saved = False
block_remove = []
block_add = []
loops = 0
t_print = time.time()
# Loop until robot close to goal
while (grid.ed(x, y, gx, gy) > constants.GOAL_RADIUS):
# Neighborhood radius
r = grid.shrinking_ball(V)
# Update robot and obstacle if it has started moving
if moving:
cost = v_info[v_bot][0]
v_bot_old = v_bot
[v_bot, p, x, y, th, movement_start, movement_time] = robot.update_bot(movement_start, movement_time, v_bot, eri, p, v_loc, v_info, x, y, th)
if p == -1:
x = gx
y = gy
break
[block_remove, block_add, unknown_loc, moving] = obstacles.check_obstacles(C, blocked, unknown_loc, unknown, Q, v_bot, v_cell, e_cell,\
[x, y], e_blocked, v_loc, v_info, v_orphan, r, eoi, eoo, eri, ero)
if len(block_remove) > 0 or len(block_add) > 0:
printing.print_update(alg_start, block_remove, block_add)
if not moving:
cost = float("inf")
else:
if not v_bot == v_bot_old:
printing.print_robot_path(v_bot, v_loc, v_info)
t_print = time.time()
# If not moving, see if robot is ready to move
else:
[success, v_loc] = robot.try_to_start(v_loc, x, y, C, blocked, e_blocked, v_info, r, V, eoi, eoo, eri, ero, v_cell, e_cell, Q, v_orphan)
if success:
if not first_saved:
first_saved = True
started_moving = time.time()
print("STARTED MOVING IN: " + str(started_moving-alg_start))
printing.print_map(v_loc, v_info, "first_path.csv")
v_bot = V
V += 1
v_connect = True
moving = True
movement_start = time.time()
p = v_info[v_bot][2]
movement_time = grid.ed(v_loc[v_bot,0],v_loc[v_bot,1],v_loc[p,0],v_loc[p,1])/constants.s
th = grid.angle(v_loc[v_bot,0], v_loc[v_bot,1], v_loc[p,0], v_loc[p,1])
# Batch informed sampling or random
if alg == 1:
# New sample batch if empty or all remaining are useless
if len(QV) == 0:
QV = get_batch(R, C, blocked, cost, [x, y], [gx, gy])
elif QV[0][0] > cost:
QV = get_batch(R, C, blocked, cost, [x, y], [gx, gy])
if len(QV) > 0:
[d, [sx, sy]] = QV.pop(0)
else:
continue
else:
[sx, sy] = [random.uniform(0, C-.1), random.uniform(0, R-.1)]
# Nearest vertex
v_near = grid.nearest([sx, sy], v_loc)
if grid.ed(v_loc[v_near,0], v_loc[v_near,1], sx, sy) > constants.delta:
[sx, sy] = grid.saturate([sx, sy], v_loc[v_near])
# Extend graph if possible
if grid.coord2grid(C, sx, sy) not in blocked:
[success, v_loc] = graph.extend(C, blocked, e_blocked, [sx, sy], v_loc, v_info, r, V, eoi, eoo, eri, ero, v_cell, e_cell)
if success:
graph.rewire_neighbors(Q, V, v_info, r, eoi[V], eri[V], ero[V])
graph.reduce_inconsistency(Q, v_bot, v_info, eoi, eri, eoo, ero, r, v_orphan)
V += 1
block_remove = []
block_add = []
loops += 1
print(str(V) + " NODES EXPANDED")
print("REACHED GOAL IN: " + str(time.time()-alg_start))
printing.print_map(v_loc, v_info, "final_path.csv")
def main():
# Determine rrtx or birrtx
alg = 0
for line in sys.argv:
if line == "-BIRRTX":
alg = 1
# Read in map
# -first two lines are rows and columns
# -third line is robot start location
# -forth line is goal location
# -rest of lines are initial guess of map and true map
# Map size
COLUMNS = int(sys.stdin.readline())
ROWS = int(sys.stdin.readline())
# Initial location
arg = sys.stdin.readline().rstrip().split()
[x0, y0] = [float(arg[0]), float(arg[1])]
# Goal location
arg = sys.stdin.readline().rstrip().split()
[gx, gy] = [float(arg[0]), float(arg[1])]
# Blocked cells list (current knowledge of map)
blocked = {}
blocked_coord = []
for i in range(0,ROWS):
cells = sys.stdin.readline().rstrip()
for j in range(0, COLUMNS):
if (cells[j] == "#"):
# space is blocked
blocked[(ROWS-i-1)*COLUMNS+j] = 1
blocked_coord.append([j, (ROWS-i-1)])
# do nothing if space is empty
printing.print_grid(blocked_coord, "first_grid.csv")
# Cells that are different from initial knowledge
sys.stdin.readline()
unknown = {}
unknown_loc = np.array([[2*ROWS, 2*COLUMNS]])
blocked_coord = []
for i in range(0,ROWS):
cells = sys.stdin.readline().rstrip()
for j in range(0, COLUMNS):
if (cells[j] == "#"):
blocked_coord.append([j, (ROWS-i-1)])
# Check if different from initial knowledge
if (cells[j] == "#") and (((ROWS-i-1)*COLUMNS+j) not in blocked):
unknown[(ROWS-i-1)*COLUMNS+j] = 1
unknown_loc = np.append(unknown_loc, [[j, ROWS-i-1]], axis = 0)
elif (cells[j] == "_") and (((ROWS-i-1)*COLUMNS+j) in blocked):
unknown[(ROWS-i-1)*COLUMNS+j] = 1
unknown_loc = np.append(unknown_loc, [[j, ROWS-i-1]], axis = 0)
search(alg, ROWS, COLUMNS, blocked, unknown, unknown_loc, x0, y0, gx, gy)
printing.print_grid(blocked_coord, "final_grid.csv")
if __name__ == '__main__':
main()