forked from TheAdityaKedia/Flocking-Simulator-Python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflocking.py
305 lines (248 loc) · 10 KB
/
flocking.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import numpy as np
from numpy.linalg import norm
from parso import parse
from scipy.spatial.distance import pdist, squareform
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
import random
import argparse
matplotlib.rcParams["animation.writer"] = "avconv"
plt.rcParams["animation.html"] = "html5"
"""robot specific simulation parameters"""
num_robots = 50 # number of robots in the simulation
bot_size = 0.25 # size of each robot
sense_range = 50 # range of robots sensor
avoidence_range = 5 # distance at which other robots will be avoided
max_vel = 100 # robots will move at 'max_vel' m/s at their best
close_enough = 5 # acceptable range for arriving at target
dt = 0.01 # simulation timestep set to 10ms
"""map specific simulation parameters"""
line1 = np.array([[0, 200], [200, 200]]) # co-ordinate of the first line in the arena
line2 = np.array([[200, 200], [200, 0]]) # co-ordinates of the second line in the arena
target1 = line1[1] # first target that the robots must rach
target2 = line2[1] # second target taht the robots must reach
lines = np.array([line1, line2])
targets = np.array([target1, target2])
"""generating robots, velocities and distance matrix"""
x_pos = random.sample(range(-50, 51), num_robots)
y_pos = random.sample(range(150, 251), num_robots)
robots_pos = np.array(
[[i, j] for i, j in zip(x_pos, y_pos)]
) # 'robots_pos' contains the locations of all robots
robots_vel = np.zeros(
(num_robots, 2)
) # 'robots_vel' contains the velocity of each robots
fig = plt.figure(figsize=(6, 6))
ax = fig.add_subplot(111, aspect="equal", xlim=(-70, 270), ylim=(-70, 270))
(line1_plot,) = ax.plot(
[line1[0, 0], line1[1, 0]], [line1[0, 1], line1[1, 1]], color="black"
)
(line2_plot,) = ax.plot(
[line2[0, 0], line2[1, 0]], [line2[0, 1], line2[1, 1]], color="black"
)
(robots_plot,) = ax.plot(robots_pos[:, 0], robots_pos[:, 1], "ro", ms=2)
# intitialize the array of robot velocities to random numbers between [0, 1)
# scale them by the maximum allowed velocity
robots_vel = np.random.rand(num_robots, 2) * max_vel
def init():
line1_plot.set_data([line1[0, 0], line1[1, 0]], [line1[0, 1], line1[1, 1]])
line2_plot.set_data([line2[0, 0], line2[1, 0]], [line2[0, 1], line2[1, 1]])
return (line1_plot, line2_plot)
def get_centroid():
"""Returns centroid of the entire system of robots."""
centroid = (
np.sum(robots_pos, axis=0) / num_robots
) # The centroid is simply the average of the x and y coordinates of all points
return centroid
def get_average_distance_from_centroid():
"""Returns the average distance of all robots from the centroid."""
centroid = get_centroid()
distance = np.mean([norm(centroid - robots_pos[i]) for i in range(num_robots)])
return distance
def get_distance_matrix():
"""Returns a squared distance matrix of all robots in the system"""
distances = pdist(robots_pos)
distance_matrix = squareform(distances)
return distance_matrix
def get_unit_vector(vector):
"""Given a vector as an np array, this function returns the unit vector in that direction"""
return vector / norm(vector)
def get_direction(point1, point2):
"""Given 2 points as np arrays, this function returns the unit vector of the distnce from point 1 to point 2"""
return get_unit_vector(point2 - point1)
def reached_target(target, centroid):
"""Given a taget and the centroid of the system of robots, this function returns true if centroid is close enught to the target"""
if target[1] == target1[1]:
if (target[0] - centroid[0]) < close_enough:
return True
else:
if (target[1] - centroid[1]) < close_enough:
return True
return False
def homing_towards_line(i, line):
"""Function to make a robot home towards a line
Takes as argument the index of the robot to move and the line towards which it should move
Returns the unit velocity vector to be emparted to the robot in order to home towards the line"""
bot_position = robots_pos[i]
velocity = np.zeros((2))
if line is line1:
if bot_position[1] > line[0, 1]:
velocity[1] = -1
else:
velocity = 1
if line is line2:
if bot_position[0] > line[0, 0]:
velocity[0] = -1
else:
velocity = 1
return velocity
def homing_towards_point(i, point):
"""Function to make a robot home towards a point
Takes as argument the index of the robot to move and the point towards which it should move
Returns the velocity to be imparted to the robot in order to home towards the point"""
bot_position = robots_pos[i]
unit_vector = get_direction(bot_position, point)
velocity = unit_vector
return velocity
def aggregate(i):
"""This function takes as argument the index of a robot
It checks the robot's sensor readings to see other robots in range
It then instructs the robot to move towards other robots
Returns the velocity to be imparted to the robot to for aggregation"""
bot_position = robots_pos[i] # retireve position of current robot
velocity = np.zeros((2)) # This is the vlocity that will be returned
bot_distances = np.array(
distance_matrix[i]
) # retrieve distances of all robots to this robot
(close_robots,) = np.where(
bot_distances < sense_range
) # get list of indicies for the robots that are in range
(index,) = np.where(
close_robots == i
) # find the index of this robot in the above list
close_robots = np.delete(close_robots, index) # delete that entry from the list
# close_robots now contains the indices of all the robots in range of robot i
# for every robot in sensor range calculate the unit vector to that robot
# add it to this robot's velocity
# then find the unit vector of that velocity
# finally multiply that unit vector for the velocity with the max veocity
if close_robots.size != 0:
for bot in close_robots:
bot_position_2 = robots_pos[bot]
velocity = velocity + get_direction(bot_position, bot_position_2)
velocity = get_unit_vector(velocity)
return velocity
def avoid_peers(i): # Same as Dispersion
"""This funtion takes as input the index of a robot
It checks the robot's sensor readings to see other robots that are too close for comfort
It then instructs the robot to move away from those robots that are within avoidence range
Returns the velocity to be imparted to the robot to for avoidence"""
bot_position = robots_pos[i] # retireve position of current robot
velocity = np.zeros((2)) # This is the vlocity that will be returned
bot_distances = distance_matrix[i] # retrieve distances of all robots to this robot
(close_robots,) = np.where(
bot_distances < avoidence_range
) # get list of indicies for the robots that are in range
(index,) = np.where(
close_robots == i
) # find the index of this robot in the above list
close_robots = np.delete(close_robots, index) # delete that entry from the list
# for every robot in avoidence range calculate the unit vector to that robot
# add it to this robot's velocity (don't worry, we wil negate the whole thing at the end)
# then find the unit vector of tat velocity vector
# finally multiply that unit vector of the velocity with negateive one
if close_robots.size != 0:
for bot in close_robots:
bot_position_2 = robots_pos[bot]
velocity = velocity + get_direction(bot_position, bot_position_2)
velocity = -1 * get_unit_vector(velocity)
return velocity
"""weight for motion primitives"""
w_home_point = 1
w_home_line = 0.25
w_aggregate = 2
w_avoid = 4
"""initial path"""
line = line1
point = target1
"""Region Simulation function"""
# This function gets called every step of the animation
def simulate(t):
global distance_matrix
global robots_vel
global robots_pos
global centroid
global avg_dist
global timestep
global centroid_line_distance
global line
global point
# calculate distance matrix
distance_matrix = get_distance_matrix()
# calculate centroid
centroid = get_centroid()
# switch tagets when the first target has basically been reached
if reached_target(point, centroid):
line = line2
point = target2
robots_vel = np.array(
[
max_vel
* get_unit_vector(
w_home_line * homing_towards_line(i, line)
+ w_home_point * homing_towards_point(i, point)
+ w_aggregate * aggregate(i)
+ w_avoid * avoid_peers(i)
)
for i in range(num_robots)
]
)
robots_pos = robots_pos + dt * robots_vel
return robots_pos
def update(t):
positions = simulate(t)
robots_plot.set_data(positions[:, 0], positions[:, 1])
return (robots_plot,)
if __name__ == "__main__":
"""Run simulation depending on parsed arguments."""
parser = argparse.ArgumentParser()
parser.add_argument(
"--num-robots",
type=int,
nargs=1,
help="Number of robots used in flocking simulation.",
)
parser.add_argument(
"--num-adversarial",
type=int,
nargs=1,
help="Number of adversarial robots used in flocking simulation.",
)
parser.add_argument(
"--iters", type=int, nargs=1, help="Number of iterations of simulation."
)
parser.add_argument(
"--noise",
type=bool,
nargs=1,
help="Amount of Gaussian noise involved in communications between robots.",
)
parser.add_argument(
"--max-velocity", type=int, nargs=1, help="Max velocity of robots."
)
parser.add_argument(
"--sensor-range", type=int, nargs=1, help="Sensor range of each robot."
)
args = parser.parse_args()
print(f"iters: {args.iters}")
animate = animation.FuncAnimation(
fig,
update,
# init_func=init,
frames=5000,
interval=4,
# blit=True
)
plt.show()