diff --git a/ROAR/agent_module/cs249_agent.py b/ROAR/agent_module/cs249_agent.py index eae0a2f9..7e36f21e 100644 --- a/ROAR/agent_module/cs249_agent.py +++ b/ROAR/agent_module/cs249_agent.py @@ -1,5 +1,5 @@ from ROAR.agent_module.agent import Agent -from ROAR.utilities_module.data_structures_models import SensorsData, Transform +from ROAR.utilities_module.data_structures_models import SensorsData, Transform, Location from ROAR.utilities_module.vehicle_models import Vehicle, VehicleControl from ROAR.configurations.configuration import Configuration as AgentConfig import cv2 @@ -14,6 +14,19 @@ from ROAR.perception_module.depth_to_pointcloud_detector import DepthToPointCloudDetector import open3d as o3d import math +from enum import Enum + + +class CS249AgentModes(Enum): + NORMAL = 1 + OBSTACLE_AVOID = 2 + OBSTACLE_RECOVER = 3 + OBSTACLE_BYPASS = 4 + + +class ObstacleEnum(Enum): + LEFT = 1 + RIGHT = 2 class CS249Agent(Agent): @@ -37,11 +50,11 @@ def __init__(self, vehicle: Vehicle, agent_settings: AgentConfig, **kwargs): self.prev_steerings: deque = deque(maxlen=10) # point cloud visualization - self.vis = o3d.visualization.Visualizer() - self.vis.create_window(width=500, height=500) - self.pcd = o3d.geometry.PointCloud() - self.coordinate_frame = o3d.geometry.TriangleMesh.create_coordinate_frame() - self.points_added = False + # self.vis = o3d.visualization.Visualizer() + # self.vis.create_window(width=500, height=500) + # self.pcd = o3d.geometry.PointCloud() + # self.coordinate_frame = o3d.geometry.TriangleMesh.create_coordinate_frame() + # self.points_added = False # pointcloud and ground plane detection self.depth2pointcloud = DepthToPointCloudDetector(agent=self) @@ -59,25 +72,45 @@ def __init__(self, vehicle: Vehicle, agent_settings: AgentConfig, **kwargs): self.cx = len(self.occu_map) // 2 self.cz = 0 + # modes of the vehicle + self.mode = CS249AgentModes.NORMAL + self.obstacle_is_at = None + + # obstacle avoidance hyperparameters + self.avoid_throttle = 0.2 + self.avoid_left_steer = -1 + self.avoid_right_steer = 0.3 + + self.obstacle_avoid_starting_coord: Optional[Location] = None + self.avoid_max_dist = 0.25 + self.bypass_dist_dist = 0.8 + self.recover_max_dist = 0.4 + def run_step(self, sensors_data: SensorsData, vehicle: Vehicle) -> VehicleControl: super().run_step(sensors_data=sensors_data, vehicle=vehicle) if self.front_depth_camera.data is not None and self.front_rgb_camera.data is not None: - pcd: o3d.geometry.PointCloud = self.depth2pointcloud.run_in_series(self.front_depth_camera.data, - self.front_rgb_camera.data) - pcd = self.filter_ground(pcd=pcd, - max_dist=self.max_dist, - height_threshold=self.height_threshold, - ransac_dist_threshold=self.ransac_dist_threshold, - ransac_n=self.ransac_n, - ransac_itr=self.ransac_itr) - - self.non_blocking_pcd_visualization(pcd=pcd, - axis_size=1, - should_show_axis=True) - occu_map = self.occu_map_from_pcd(pcd=pcd, scaling_factor=self.scaling_factor, - cx=self.cx, cz=self.cz) - left, center, right = self.find_obstacle_l_c_r(occu_map=occu_map, debug=True) - + left, center, right = self.find_obstacles_via_depth_to_pcd() + if left[0] and center[0] and right[0]: + # self.logger.info("Can't find feasible path around obstacle") + return VehicleControl(brake=True) + + if (left[0] or center[0] or right[0]) and \ + (self.mode != CS249AgentModes.OBSTACLE_AVOID and + self.mode != CS249AgentModes.OBSTACLE_BYPASS and + self.mode != CS249AgentModes.OBSTACLE_RECOVER): + self.mode = CS249AgentModes.OBSTACLE_AVOID + self.obstacle_is_at = ObstacleEnum.LEFT if left[0] else ObstacleEnum.RIGHT + self.obstacle_avoid_starting_coord = self.vehicle.transform.location.copy() + elif self.mode == CS249AgentModes.OBSTACLE_AVOID or self.mode == CS249AgentModes.OBSTACLE_RECOVER \ + or self.mode == CS249AgentModes.OBSTACLE_BYPASS: + if self.mode == CS249AgentModes.OBSTACLE_AVOID: + return self.avoid_obstacle(left, center, right) + elif self.mode == CS249AgentModes.OBSTACLE_BYPASS: + return self.bypass_obstacle() + elif self.mode == CS249AgentModes.OBSTACLE_RECOVER: + return self.recover_obstacle() + else: + return VehicleControl(brake=True) return VehicleControl(brake=True) # if self.is_lead_car: @@ -85,11 +118,85 @@ def run_step(self, sensors_data: SensorsData, vehicle: Vehicle) -> VehicleContro # else: # return self.follower_step() + def avoid_obstacle(self, left, center, right): + if self.obstacle_is_at is not None: + curr_coord = self.vehicle.transform.location.copy() + if curr_coord.distance(self.obstacle_avoid_starting_coord) < self.avoid_max_dist: + if self.obstacle_is_at == ObstacleEnum.LEFT: + self.logger.info("Avoiding obstacle --> TURING RIGHT") + return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_right_steer, brake=False) + else: + self.logger.info("Avoiding obstacle --> TURING LEFT") + return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_left_steer, brake=False) + else: + self.logger.info("Avoiding obstacle --> DONE! shifting to obstacle bypass") + self.mode = CS249AgentModes.OBSTACLE_BYPASS + self.obstacle_avoid_starting_coord = self.vehicle.transform.location.copy() + else: + self.logger.error("No obstacle to avoid") + return VehicleControl(brake=True) + + def bypass_obstacle(self): + if self.obstacle_is_at is not None: + curr_coord = self.vehicle.transform.location.copy() + if curr_coord.distance(self.obstacle_avoid_starting_coord) < self.bypass_dist_dist: + if self.obstacle_is_at == ObstacleEnum.LEFT: + self.logger.info("BYPASSING --> TURING LEFT") + return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_left_steer, brake=False) + else: + self.logger.info("BYPASSING --> TURING RIGHT") + return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_right_steer, brake=False) + else: + self.logger.info("BYPASSING --> DONE! Shifting to recover mode") + self.mode = CS249AgentModes.OBSTACLE_RECOVER + self.obstacle_avoid_starting_coord = self.vehicle.transform.location.copy() + else: + self.logger.error("No obstacle to bypass") + return VehicleControl(brake=True) + + def recover_obstacle(self): + if self.obstacle_is_at is not None: + curr_coord = self.vehicle.transform.location.copy() + if curr_coord.distance(self.obstacle_avoid_starting_coord) < self.recover_max_dist: + if self.obstacle_is_at == ObstacleEnum.LEFT: + self.logger.info("Recovering --> TURING RIGHT") + return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_right_steer, brake=False) + else: + self.logger.info("Recovering --> TURING LEFT") + return VehicleControl(throttle=self.avoid_throttle, steering=self.avoid_left_steer, brake=False) + + else: + self.logger.info("Recovering --> DONE! Shifting to normal state") + # self.mode = CS249AgentModes.NORMAL + # self.obstacle_is_at = None + # self.obstacle_avoid_starting_coord = None + else: + self.logger.error("No obstacle to recover from") + return VehicleControl(brake=True) + + def find_obstacles_via_depth_to_pcd(self): + pcd: o3d.geometry.PointCloud = self.depth2pointcloud.run_in_series(self.front_depth_camera.data, + self.front_rgb_camera.data) + pcd = self.filter_ground(pcd=pcd, + max_dist=self.max_dist, + height_threshold=self.height_threshold, + ransac_dist_threshold=self.ransac_dist_threshold, + ransac_n=self.ransac_n, + ransac_itr=self.ransac_itr) + + # self.non_blocking_pcd_visualization(pcd=pcd, + # axis_size=1, + # should_show_axis=True) + occu_map = self.occu_map_from_pcd(pcd=pcd, scaling_factor=self.scaling_factor, + cx=self.cx, cz=self.cz) + left, center, right = self.find_obstacle_l_c_r(occu_map=occu_map, debug=True) + return left, center, right + def find_obstacle_l_c_r(self, occu_map, - left_occ_thresh=0.3, - center_occ_thresh=0.5, - right_occ_thresh=0.3, - debug=False) -> Tuple[Tuple[bool, float], Tuple[bool, float], Tuple[bool, float]]: + left_occ_thresh=0.5, + center_occ_thresh=0.5, + right_occ_thresh=0.5, + debug=False) -> Tuple[Tuple[bool, float], Tuple[bool, float], Tuple[bool, float]]: """ Given an occupancy map `occu_map`, find whether in `left`, `center`, `right` which is/are occupied and also its probability for occupied. @@ -106,14 +213,14 @@ def find_obstacle_l_c_r(self, occu_map, backtorgb = cv2.cvtColor(occu_map, cv2.COLOR_GRAY2RGB) height, width, channel = backtorgb.shape - left_rec_start = (25 * width // 100, 40 * height // 100) - left_rec_end = (80 * width // 100, 45 * height // 100) + left_rec_start = (40 * width // 100, 35 * height // 100) + left_rec_end = (80 * width // 100, 40 * height // 100) - mid_rec_start = (25 * width // 100, 47 * height // 100) + mid_rec_start = (40 * width // 100, 48 * height // 100) mid_rec_end = (80 * width // 100, 52 * height // 100) - right_rec_start = (25 * width // 100, 55 * height // 100) - right_rec_end = (80 * width // 100, 60 * height // 100) + right_rec_start = (40 * width // 100, 60 * height // 100) + right_rec_end = (80 * width // 100, 65 * height // 100) right = self.is_occupied(m=occu_map, start=right_rec_start, end=right_rec_end, threshold=left_occ_thresh) center = self.is_occupied(m=occu_map, start=mid_rec_start, end=mid_rec_end, threshold=center_occ_thresh) left = self.is_occupied(m=occu_map, start=left_rec_start, end=left_rec_end, threshold=right_occ_thresh) @@ -159,8 +266,8 @@ def is_occupied(m, start, end, threshold) -> Tuple[bool, float]: cropped = m[start[1]:end[1], start[0]:end[0]] area = (end[1] - start[1]) * (end[0] - start[0]) spots_free = (cropped > 0.8).sum() - ratio = spots_free / area - return ratio < threshold, ratio # if spots_free/area < threshold, then this place is occupied + ratio = round(spots_free / area, 3) + return ratio < threshold, ratio # if spots_free/area < threshold, then this place is occupied def occu_map_from_pcd(self, pcd: o3d.geometry.PointCloud, scaling_factor, cx, cz) -> np.ndarray: """ diff --git a/ROAR_Unity/unity_runner.py b/ROAR_Unity/unity_runner.py index 6180f553..c4479bd1 100644 --- a/ROAR_Unity/unity_runner.py +++ b/ROAR_Unity/unity_runner.py @@ -370,7 +370,8 @@ def display_system_status(self, frame: np.ndarray): color=(0, 255, 0), thickness=1, lineType=cv2.LINE_AA) frame = cv2.putText(img=frame, - text=f"Auto = {self.is_auto} | {self.control_streamer.control_tx}", + text=f"Auto = {self.is_auto} | {self.control_streamer.control_tx} | " + f"Steering Offset: {round(self.ios_config.steering_offset, 3)}", org=(20, frame.shape[0] - 20), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.6, color=(0, 255, 0), # BGR thickness=1, lineType=cv2.LINE_AA)