-
Notifications
You must be signed in to change notification settings - Fork 0
/
theta_star.py
144 lines (113 loc) · 4.83 KB
/
theta_star.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import heapq
import math
from bresenham import make_line
from config import MAP_RESOLUTION, CRUISE_SPEED
from cost_functions import total_marginal_costs, marginal_vehicle_costs, fixed_vehicle_costs
class Node:
    """A grid cell visited by the search.

    Attributes:
        x, y: cell coordinates; these alone define equality and hashing.
        g: two-component accumulated cost tuple, initialised to
            ``(inf, inf)`` until the search assigns a real value.
        f: priority used to order nodes in the heap (``inf`` until set).
        parent: predecessor node on the current best path, or ``None``.
    """

    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.g = (float('inf'), float('inf'))
        self.f = float('inf')
        self.parent = None

    def __repr__(self):
        return "Node(x={!r}, y={!r}, g={!r}, f={!r})".format(
            self.x, self.y, self.g, self.f
        )

    def __lt__(self, other):
        # heapq only needs "<"; ordering is by priority, not by position.
        if not isinstance(other, Node):
            return NotImplemented
        return self.f < other.f

    def __eq__(self, other):
        # Returning NotImplemented (instead of touching other.x) lets Python
        # fall back to its default comparison, so `node == None` or
        # `node in [some_tuple]` evaluates to False rather than raising.
        if not isinstance(other, Node):
            return NotImplemented
        return self.x == other.x and self.y == other.y

    def __hash__(self):
        # Must agree with __eq__: two nodes on the same cell hash alike.
        return hash((self.x, self.y))
def distance(node1, node2):
    """Return the Euclidean distance between two nodes, in grid cells.

    Both arguments only need ``x`` and ``y`` attributes.
    """
    delta_x, delta_y = node1.x - node2.x, node1.y - node2.y
    squared_length = delta_x ** 2 + delta_y ** 2
    return math.sqrt(squared_length)
def reconstruct_path(current, start, grid):
    """Walk parent links from ``current`` back to ``start`` and summarise the route.

    Args:
        current: node the walk begins at (the reached goal).
        start: node the walk stops at; compared by identity, and not
            included in the returned path.
        grid: 2-D structure indexed as ``grid[y][x]``; its values are summed
            into the printed risk figure.

    Returns:
        A 5-tuple ``(path, cost_real, cost_img, length, duration)`` where
        ``path`` is a list of ``(x, y)`` tuples ordered from the node after
        ``start`` to ``current``, ``cost_real`` is ``fixed_vehicle_costs()``
        plus ``current.g[0]``, ``cost_img`` is ``current.g[1]``, ``length``
        is the summed segment distance times ``MAP_RESOLUTION`` and
        ``duration`` is ``length / CRUISE_SPEED``.

    Side effects:
        Prints the accumulated risk to stdout.
    """
    total_real = fixed_vehicle_costs() + current.g[0]
    total_img = current.g[1]

    waypoints = []
    accumulated_risk = 0
    grid_length = 0
    node = current
    while node is not start:
        previous = node.parent
        segment = distance(node, previous)
        if segment < 1.42:
            # Adjacent cell (straight or diagonal step): count only this cell.
            accumulated_risk += grid[node.y][node.x]
        else:
            # Longer any-angle segment: count every cell make_line reports.
            for cell in make_line(node.x, node.y, previous.x, previous.y):
                accumulated_risk += grid[cell[1]][cell[0]]
        grid_length += segment
        waypoints.append((node.x, node.y))
        node = previous

    print("Risk: {}".format(accumulated_risk))

    waypoints.reverse()
    scaled_length = grid_length * MAP_RESOLUTION
    return waypoints, total_real, total_img, scaled_length, scaled_length / CRUISE_SPEED
def heuristic(node1, node2):
    """Estimate the cost of travelling from ``node1`` to ``node2``.

    The straight-line grid distance is scaled by ``MAP_RESOLUTION`` and
    divided by ``CRUISE_SPEED``; that value is passed to
    ``marginal_vehicle_costs`` and one tenth of the result is returned.
    """
    straight_line = distance(node1, node2)
    travel_time = (straight_line * MAP_RESOLUTION) / CRUISE_SPEED
    estimated_cost = marginal_vehicle_costs(travel_time)
    # An unscaled alternative would be the raw grid distance:
    #     distance(node1, node2)
    return estimated_cost / 10
def _calc_cost(child: Node, best: Node, grid):
    """Choose the cheaper predecessor for ``child``: ``best`` or ``best.parent``.

    Both candidate routes are priced with ``total_marginal_costs`` and added
    to the candidate predecessor's accumulated two-component cost ``g``.

    Returns:
        ``(real, img, predecessor)`` for the route whose ``real + img`` is
        smaller. On a tie the route through ``best.parent`` is returned.
    """
    step_from_best = total_marginal_costs(grid, best, child)
    grandparent = best.parent
    step_from_grandparent = total_marginal_costs(grid, grandparent, child)

    via_best_real = best.g[0] + step_from_best[0]
    via_best_img = best.g[1] + step_from_best[1]
    via_grand_real = grandparent.g[0] + step_from_grandparent[0]
    via_grand_img = grandparent.g[1] + step_from_grandparent[1]

    via_best_total = via_best_real + via_best_img
    via_grand_total = via_grand_real + via_grand_img

    # Skipping `best` (any-angle shortcut) wins whenever it is not worse.
    if via_grand_total <= via_best_total:
        return via_grand_real, via_grand_img, grandparent
    return via_best_real, via_best_img, best
def run_theta_star(grid, start, goal):
    """Search ``grid`` for a path from ``start`` to ``goal``.

    Args:
        grid: 2-D structure with ``len(grid)`` rows and ``len(grid[0])``
            columns; ``x`` indexes columns and ``y`` indexes rows.
        start: ``(x, y)`` of the start cell.
        goal: ``(x, y)`` of the goal cell.

    Returns:
        Whatever ``reconstruct_path`` returns for the reached goal node, or
        ``None`` when the grid is empty or the open set is exhausted without
        reaching the goal.

    Each expanded cell offers its eight in-bounds neighbors; for every
    neighbor ``_calc_cost`` decides whether the expanded cell or that cell's
    own parent is the cheaper predecessor.
    """
    rows = len(grid)
    if rows == 0:
        # Nothing to search; report "no path" instead of failing on grid[0].
        return None
    cols = len(grid[0])

    start_node = Node(start[0], start[1])
    goal_node = Node(goal[0], goal[1])
    start_node.g = (0, 0)
    start_node.f = 0
    # The start is its own parent so _calc_cost can always read best.parent.
    start_node.parent = start_node

    open_heap = [start_node]
    closed_set = set()
    # Cheapest combined cost queued so far per cell. This replaces a linear
    # `in open_heap` scan and lets a cheaper rediscovery of an already queued
    # cell be pushed instead of being silently dropped.
    best_known = {(start_node.x, start_node.y): 0}

    while open_heap:
        current = heapq.heappop(open_heap)
        if current in closed_set:
            # Stale entry: this cell was already expanded via a cheaper push.
            continue
        if current == goal_node:
            return reconstruct_path(current, start_node, grid)
        closed_set.add(current)

        for dx in (-1, 0, 1):
            for dy in (-1, 0, 1):
                if dx == 0 and dy == 0:
                    continue
                x, y = current.x + dx, current.y + dy
                if not (0 <= x < cols and 0 <= y < rows):
                    continue
                neighbor = Node(x, y)
                if neighbor in closed_set:
                    continue

                # _calc_cost already takes the minimum of "via current" and
                # "via current.parent", so no further relaxation against the
                # direct connection is needed here.
                cost_real, cost_img, parent = _calc_cost(neighbor, current, grid)
                combined = cost_real + cost_img

                known = best_known.get((x, y))
                if known is not None and combined >= known:
                    # An equal or cheaper route to this cell is already queued.
                    continue
                best_known[(x, y)] = combined

                neighbor.g = (cost_real, cost_img)
                neighbor.parent = parent
                neighbor.f = combined + heuristic(neighbor, goal_node)
                heapq.heappush(open_heap, neighbor)

    return None  # No path found