Skip to content

Commit

Permalink
cs249 obs avoidance init implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
wuxiaohua1011 committed Dec 7, 2021
1 parent 5ec3c16 commit 1529975
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 34 deletions.
173 changes: 140 additions & 33 deletions ROAR/agent_module/cs249_agent.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ROAR.agent_module.agent import Agent
from ROAR.utilities_module.data_structures_models import SensorsData, Transform
from ROAR.utilities_module.data_structures_models import SensorsData, Transform, Location
from ROAR.utilities_module.vehicle_models import Vehicle, VehicleControl
from ROAR.configurations.configuration import Configuration as AgentConfig
import cv2
Expand All @@ -14,6 +14,19 @@
from ROAR.perception_module.depth_to_pointcloud_detector import DepthToPointCloudDetector
import open3d as o3d
import math
from enum import Enum


class CS249AgentModes(Enum):
NORMAL = 1
OBSTACLE_AVOID = 2
OBSTACLE_RECOVER = 3
OBSTACLE_BYPASS = 4


class ObstacleEnum(Enum):
LEFT = 1
RIGHT = 2


class CS249Agent(Agent):
Expand All @@ -37,11 +50,11 @@ def __init__(self, vehicle: Vehicle, agent_settings: AgentConfig, **kwargs):
self.prev_steerings: deque = deque(maxlen=10)

# point cloud visualization
self.vis = o3d.visualization.Visualizer()
self.vis.create_window(width=500, height=500)
self.pcd = o3d.geometry.PointCloud()
self.coordinate_frame = o3d.geometry.TriangleMesh.create_coordinate_frame()
self.points_added = False
# self.vis = o3d.visualization.Visualizer()
# self.vis.create_window(width=500, height=500)
# self.pcd = o3d.geometry.PointCloud()
# self.coordinate_frame = o3d.geometry.TriangleMesh.create_coordinate_frame()
# self.points_added = False

# pointcloud and ground plane detection
self.depth2pointcloud = DepthToPointCloudDetector(agent=self)
Expand All @@ -59,37 +72,131 @@ def __init__(self, vehicle: Vehicle, agent_settings: AgentConfig, **kwargs):
self.cx = len(self.occu_map) // 2
self.cz = 0

# modes of the vehicle
self.mode = CS249AgentModes.NORMAL
self.obstacle_is_at = None

# obstacle avoidance hyperparameters
self.avoid_throttle = 0.2
self.avoid_left_steer = -1
self.avoid_right_steer = 0.3

self.obstacle_avoid_starting_coord: Optional[Location] = None
self.avoid_max_dist = 0.25
self.bypass_dist_dist = 0.8
self.recover_max_dist = 0.4

def run_step(self, sensors_data: SensorsData, vehicle: Vehicle) -> VehicleControl:
super().run_step(sensors_data=sensors_data, vehicle=vehicle)
if self.front_depth_camera.data is not None and self.front_rgb_camera.data is not None:
pcd: o3d.geometry.PointCloud = self.depth2pointcloud.run_in_series(self.front_depth_camera.data,
self.front_rgb_camera.data)
pcd = self.filter_ground(pcd=pcd,
max_dist=self.max_dist,
height_threshold=self.height_threshold,
ransac_dist_threshold=self.ransac_dist_threshold,
ransac_n=self.ransac_n,
ransac_itr=self.ransac_itr)

self.non_blocking_pcd_visualization(pcd=pcd,
axis_size=1,
should_show_axis=True)
occu_map = self.occu_map_from_pcd(pcd=pcd, scaling_factor=self.scaling_factor,
cx=self.cx, cz=self.cz)
left, center, right = self.find_obstacle_l_c_r(occu_map=occu_map, debug=True)

left, center, right = self.find_obstacles_via_depth_to_pcd()
if left[0] and center[0] and right[0]:
# self.logger.info("Can't find feasible path around obstacle")
return VehicleControl(brake=True)

if (left[0] or center[0] or right[0]) and \
(self.mode != CS249AgentModes.OBSTACLE_AVOID and
self.mode != CS249AgentModes.OBSTACLE_BYPASS and
self.mode != CS249AgentModes.OBSTACLE_RECOVER):
self.mode = CS249AgentModes.OBSTACLE_AVOID
self.obstacle_is_at = ObstacleEnum.LEFT if left[0] else ObstacleEnum.RIGHT
self.obstacle_avoid_starting_coord = self.vehicle.transform.location.copy()
elif self.mode == CS249AgentModes.OBSTACLE_AVOID or self.mode == CS249AgentModes.OBSTACLE_RECOVER \
or self.mode == CS249AgentModes.OBSTACLE_BYPASS:
if self.mode == CS249AgentModes.OBSTACLE_AVOID:
return self.avoid_obstacle(left, center, right)
elif self.mode == CS249AgentModes.OBSTACLE_BYPASS:
return self.bypass_obstacle()
elif self.mode == CS249AgentModes.OBSTACLE_RECOVER:
return self.recover_obstacle()
else:
return VehicleControl(brake=True)
return VehicleControl(brake=True)

# if self.is_lead_car:
# return self.lead_car_step()
# else:
# return self.follower_step()

def avoid_obstacle(self, left, center, right):
if self.obstacle_is_at is not None:
curr_coord = self.vehicle.transform.location.copy()
if curr_coord.distance(self.obstacle_avoid_starting_coord) < self.avoid_max_dist:
if self.obstacle_is_at == ObstacleEnum.LEFT:
self.logger.info("Avoiding obstacle --> TURING RIGHT")
return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_right_steer, brake=False)
else:
self.logger.info("Avoiding obstacle --> TURING LEFT")
return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_left_steer, brake=False)
else:
self.logger.info("Avoiding obstacle --> DONE! shifting to obstacle bypass")
self.mode = CS249AgentModes.OBSTACLE_BYPASS
self.obstacle_avoid_starting_coord = self.vehicle.transform.location.copy()
else:
self.logger.error("No obstacle to avoid")
return VehicleControl(brake=True)

def bypass_obstacle(self):
if self.obstacle_is_at is not None:
curr_coord = self.vehicle.transform.location.copy()
if curr_coord.distance(self.obstacle_avoid_starting_coord) < self.bypass_dist_dist:
if self.obstacle_is_at == ObstacleEnum.LEFT:
self.logger.info("BYPASSING --> TURING LEFT")
return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_left_steer, brake=False)
else:
self.logger.info("BYPASSING --> TURING RIGHT")
return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_right_steer, brake=False)
else:
self.logger.info("BYPASSING --> DONE! Shifting to recover mode")
self.mode = CS249AgentModes.OBSTACLE_RECOVER
self.obstacle_avoid_starting_coord = self.vehicle.transform.location.copy()
else:
self.logger.error("No obstacle to bypass")
return VehicleControl(brake=True)

def recover_obstacle(self):
if self.obstacle_is_at is not None:
curr_coord = self.vehicle.transform.location.copy()
if curr_coord.distance(self.obstacle_avoid_starting_coord) < self.recover_max_dist:
if self.obstacle_is_at == ObstacleEnum.LEFT:
self.logger.info("Recovering --> TURING RIGHT")
return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_right_steer, brake=False)
else:
self.logger.info("Recovering --> TURING LEFT")
return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_left_steer, brake=False)

else:
self.logger.info("Recovering --> DONE! Shifting to normal state")
# self.mode = CS249AgentModes.NORMAL
# self.obstacle_is_at = None
# self.obstacle_avoid_starting_coord = None
else:
self.logger.error("No obstacle to recover from")
return VehicleControl(brake=True)

def find_obstacles_via_depth_to_pcd(self):
pcd: o3d.geometry.PointCloud = self.depth2pointcloud.run_in_series(self.front_depth_camera.data,
self.front_rgb_camera.data)
pcd = self.filter_ground(pcd=pcd,
max_dist=self.max_dist,
height_threshold=self.height_threshold,
ransac_dist_threshold=self.ransac_dist_threshold,
ransac_n=self.ransac_n,
ransac_itr=self.ransac_itr)

# self.non_blocking_pcd_visualization(pcd=pcd,
# axis_size=1,
# should_show_axis=True)
occu_map = self.occu_map_from_pcd(pcd=pcd, scaling_factor=self.scaling_factor,
cx=self.cx, cz=self.cz)
left, center, right = self.find_obstacle_l_c_r(occu_map=occu_map, debug=True)
return left, center, right

def find_obstacle_l_c_r(self, occu_map,
left_occ_thresh=0.3,
center_occ_thresh=0.5,
right_occ_thresh=0.3,
debug=False) -> Tuple[Tuple[bool, float], Tuple[bool, float], Tuple[bool, float]]:
left_occ_thresh=0.5,
center_occ_thresh=0.5,
right_occ_thresh=0.5,
debug=False) -> Tuple[Tuple[bool, float], Tuple[bool, float], Tuple[bool, float]]:
"""
Given an occupancy map `occu_map`, find whether in `left`, `center`, `right` which is/are occupied and
also its probability for occupied.
Expand All @@ -106,14 +213,14 @@ def find_obstacle_l_c_r(self, occu_map,
backtorgb = cv2.cvtColor(occu_map, cv2.COLOR_GRAY2RGB)

height, width, channel = backtorgb.shape
left_rec_start = (25 * width // 100, 40 * height // 100)
left_rec_end = (80 * width // 100, 45 * height // 100)
left_rec_start = (40 * width // 100, 35 * height // 100)
left_rec_end = (80 * width // 100, 40 * height // 100)

mid_rec_start = (25 * width // 100, 47 * height // 100)
mid_rec_start = (40 * width // 100, 48 * height // 100)
mid_rec_end = (80 * width // 100, 52 * height // 100)

right_rec_start = (25 * width // 100, 55 * height // 100)
right_rec_end = (80 * width // 100, 60 * height // 100)
right_rec_start = (40 * width // 100, 60 * height // 100)
right_rec_end = (80 * width // 100, 65 * height // 100)
right = self.is_occupied(m=occu_map, start=right_rec_start, end=right_rec_end, threshold=left_occ_thresh)
center = self.is_occupied(m=occu_map, start=mid_rec_start, end=mid_rec_end, threshold=center_occ_thresh)
left = self.is_occupied(m=occu_map, start=left_rec_start, end=left_rec_end, threshold=right_occ_thresh)
Expand Down Expand Up @@ -159,8 +266,8 @@ def is_occupied(m, start, end, threshold) -> Tuple[bool, float]:
cropped = m[start[1]:end[1], start[0]:end[0]]
area = (end[1] - start[1]) * (end[0] - start[0])
spots_free = (cropped > 0.8).sum()
ratio = spots_free / area
return ratio < threshold, ratio # if spots_free/area < threshold, then this place is occupied
ratio = round(spots_free / area, 3)
return ratio < threshold, ratio # if spots_free/area < threshold, then this place is occupied

def occu_map_from_pcd(self, pcd: o3d.geometry.PointCloud, scaling_factor, cx, cz) -> np.ndarray:
"""
Expand Down
3 changes: 2 additions & 1 deletion ROAR_Unity/unity_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ def display_system_status(self, frame: np.ndarray):
color=(0, 255, 0), thickness=1, lineType=cv2.LINE_AA)

frame = cv2.putText(img=frame,
text=f"Auto = {self.is_auto} | {self.control_streamer.control_tx}",
text=f"Auto = {self.is_auto} | {self.control_streamer.control_tx} | "
f"Steering Offset: {round(self.ios_config.steering_offset, 3)}",
org=(20, frame.shape[0] - 20), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.6,
color=(0, 255, 0), # BGR
thickness=1, lineType=cv2.LINE_AA)
Expand Down

0 comments on commit 1529975

Please sign in to comment.