diff --git a/ROAR/agent_module/aruco_following_agent.py b/ROAR/agent_module/aruco_following_agent.py index c7d0e43c..36aba728 100644 --- a/ROAR/agent_module/aruco_following_agent.py +++ b/ROAR/agent_module/aruco_following_agent.py @@ -2,66 +2,31 @@ from ROAR.utilities_module.data_structures_models import SensorsData from ROAR.utilities_module.vehicle_models import Vehicle, VehicleControl from ROAR.configurations.configuration import Configuration as AgentConfig +from ROAR.perception_module.aruco_detector import ArucoDetector +from ROAR.utilities_module.data_structures_models import Rotation, Location, Transform +from typing import Optional import cv2 import cv2.aruco as aruco - +import numpy as np +from collections import deque +from ROAR.control_module.aruco_pid_controller import SimplePIDController class ArucoFollowingAgent(Agent): def __init__(self, vehicle: Vehicle, agent_settings: AgentConfig, **kwargs): super().__init__(vehicle, agent_settings, **kwargs) - key = getattr(aruco, f'DICT_{5}X{5}_{250}') - self.arucoDict = aruco.Dictionary_get(key) - self.arucoParam = aruco.DetectorParameters_create() - self.tracking_id = 0 - self.marker_length = 0.1 # in meters + self.aruco_detector = ArucoDetector(aruco_id=2, agent=self) + self.controller = SimplePIDController(agent=self, distance_to_keep=0.5, center_x=-0.3) def run_step(self, sensors_data: SensorsData, vehicle: Vehicle) -> VehicleControl: super().run_step(sensors_data=sensors_data, vehicle=vehicle) - result: dict = self.findArucoMarkers() # {marker id -> bbox} - if result: - img = self.front_rgb_camera.data.copy() - if self.tracking_id in result: - bbox = result[self.tracking_id] - rvec, tvec, markerPoints = cv2.aruco.estimatePoseSingleMarkers(bbox, self.marker_length, - self.front_rgb_camera.intrinsics_matrix, - self.front_rgb_camera.distortion_coefficient) - x, y, z = tvec[0][0] - print(x,y,z) - # cv2.aruco.drawAxis(img, - # self.front_rgb_camera.intrinsics_matrix, - # self.front_rgb_camera.distortion_coefficient, - # rvec, - # tvec, - # self.marker_length) - - cv2.putText(img, f"{tvec}", - (10, 50), - cv2.FONT_HERSHEY_SIMPLEX, - 1, - (0, 0, 255), 2) - # cv2.putText(img, f"{rvec}", - # (10, 100), - # cv2.FONT_HERSHEY_SIMPLEX, - # 1, - # (0, 0, 255), 2) - - cv2.imshow("img", img) - cv2.waitKey(1) + # None if nothing is detected, else, transformation matrix P + result: Optional[np.ndarray] = self.aruco_detector.run_in_series() + if result is not None: + r: Rotation = self.aruco_detector.rotationMatrixToEulerAngles(result[0:3, 0:3]) + t: Location = Location(x=result[0][3], y=result[1][3], z=result[2][3]) + p: Transform = Transform(location=t, rotation=r) + control = self.controller.run_in_series(next_waypoint=p) + print(control) + return control return self.vehicle.control - - def findArucoMarkers(self): - if self.front_rgb_camera.data is not None: - img = self.front_rgb_camera.data.copy() - gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - bboxs, ids, rejected = aruco.detectMarkers(gray, self.arucoDict, parameters=self.arucoParam, - cameraMatrix=self.front_rgb_camera.intrinsics_matrix, - distCoeff=self.front_rgb_camera.distortion_coefficient) - log = dict() - if ids is None: - return log - else: - for i in ids: - log[i[0]] = bboxs - return log - return dict() diff --git a/ROAR/agent_module/forward_only_agent.py b/ROAR/agent_module/forward_only_agent.py index 4fbed620..c9e03bfd 100644 --- a/ROAR/agent_module/forward_only_agent.py +++ b/ROAR/agent_module/forward_only_agent.py @@ -10,10 +10,18 @@ class ForwardOnlyAgent(Agent): def __init__(self, vehicle: Vehicle, agent_settings: AgentConfig, **kwargs): super().__init__(vehicle, agent_settings, **kwargs) self.log = deque(maxlen=100) + self.should_brake = False def run_step(self, sensors_data: SensorsData, vehicle: Vehicle) -> VehicleControl: super().run_step(sensors_data=sensors_data, vehicle=vehicle) if self.front_depth_camera.data is not None: cv2.imshow("depth", self.front_depth_camera.data) cv2.waitKey(1) - return VehicleControl(throttle=0.4, steering=0) + + if self.should_brake: + return VehicleControl(throttle=-0.1, steering=0) + else: + if abs(self.vehicle.get_speed(self.vehicle)) > 8: + self.should_brake = True + return VehicleControl(throttle=-0.1, steering=0) + return VehicleControl(throttle=0.4, steering=0) diff --git a/ROAR/agent_module/special_agents/recording_agent.py b/ROAR/agent_module/special_agents/recording_agent.py index f170ffcd..3389ca37 100644 --- a/ROAR/agent_module/special_agents/recording_agent.py +++ b/ROAR/agent_module/special_agents/recording_agent.py @@ -58,11 +58,12 @@ def __init__(self, target_speed=20, **kwargs): def run_step(self, sensors_data: SensorsData, vehicle: Vehicle) -> VehicleControl: super(RecordingAgent, self).run_step(sensors_data=sensors_data, vehicle=vehicle) - self.transform_history.append(self.vehicle.transform) + # self.transform_history.append(self.vehicle.transform) - control = self.local_planner.run_in_series() + # control = self.local_planner.run_in_series() # if self.kwargs.get(self.option, None) is not None: # points = self.kwargs[self.option] # self.occupancy_map.update_async(points) # self.occupancy_map.visualize(transform=self.vehicle.transform, view_size=(200, 200)) - return control + # return VehicleControl(throttle=0.2, steering=0) + diff --git a/ROAR/control_module/aruco_pid_controller.py b/ROAR/control_module/aruco_pid_controller.py new file mode 100644 index 00000000..f91d09e5 --- /dev/null +++ b/ROAR/control_module/aruco_pid_controller.py @@ -0,0 +1,67 @@ +from ROAR.control_module.controller import Controller +from ROAR.utilities_module.data_structures_models import Transform +from ROAR.utilities_module.vehicle_models import Vehicle +from ROAR.utilities_module.vehicle_models import VehicleControl +from collections import deque +import numpy as np + + +class SimplePIDController(Controller): + def __init__(self, agent, distance_to_keep=0.5, center_x=-0.2, **kwargs): + super().__init__(agent, **kwargs) + + self.yaw_error_buffer = deque(maxlen=20) + + self.lat_error_queue = deque(maxlen=20) # this is how much error you want to accumulate + self.long_error_queue = deque(maxlen=20) # this is how much error you want to accumulate + self.center_x = center_x + self.lat_kp = 1 # this is how much you want to steer + self.lat_kd = 0 # this is how much you want to resist change + self.lat_ki = 0.005 # this is the correction on past error + self.x_error_weight = 1 + self.yaw_error_weight = 0.9 + + self.distance_to_keep = distance_to_keep + self.max_throttle = 0.2 + self.lon_kp = 0.16 # this is how much you want to steer + self.lon_kd = 0.1 # this is how much you want to resist change + self.lon_ki = 0.005 # this is the correction on past error + + def run_in_series(self, next_waypoint=None, **kwargs) -> VehicleControl: + control = VehicleControl() + self.lateral_pid_control(next_waypoint=next_waypoint, control=control) + self.long_pid_control(next_waypoint=next_waypoint, control=control) + return control + + def lateral_pid_control(self, next_waypoint: Transform, control: VehicleControl): + x_error = (next_waypoint.location.x - self.center_x) / next_waypoint.location.z + yaw_error = np.deg2rad(next_waypoint.rotation.yaw - 0) + self.yaw_error_buffer.append(yaw_error) + error = x_error * self.x_error_weight + np.average(self.yaw_error_buffer) * self.yaw_error_weight + print(round(x_error, 3), round(yaw_error, 3), round(error, 3)) + + error_dt = 0 if len(self.lat_error_queue) == 0 else error - self.lat_error_queue[-1] + self.lat_error_queue.append(error) + error_it = sum(self.lat_error_queue) + + e_p = self.lat_kp * error + e_d = self.lat_kd * error_dt + e_i = self.lat_ki * error_it + lat_control = np.clip((e_p + e_d + e_i), -1, 1) + control.steering = lat_control + + def long_pid_control(self, next_waypoint: Transform, control: VehicleControl): + dist_to_car = next_waypoint.location.z + if dist_to_car < self.distance_to_keep: + control.brake = True + control.throttle = 0 + else: + error = dist_to_car - self.distance_to_keep + error_dt = 0 if len(self.long_error_queue) == 0 else error - self.long_error_queue[-1] + self.long_error_queue.append(error) + error_it = sum(self.long_error_queue) + e_p = self.lon_kp * error + e_d = self.lon_kd * error_dt + e_i = self.lon_ki * error_it + long_control = np.clip(e_p + e_d + e_i, -1, self.max_throttle) + control.throttle = long_control diff --git a/ROAR/perception_module/aruco_detector.py b/ROAR/perception_module/aruco_detector.py new file mode 100644 index 00000000..efffb0a0 --- /dev/null +++ b/ROAR/perception_module/aruco_detector.py @@ -0,0 +1,100 @@ +from ROAR.perception_module.detector import Detector +from ROAR.agent_module.agent import Agent +import cv2 +import cv2.aruco as aruco +import numpy as np +import math +from ROAR.utilities_module.data_structures_models import Rotation + + +class ArucoDetector(Detector): + def __init__(self, aruco_id: int, agent: Agent, aruco_dict_key='DICT_5X5_250', marker_length=0.1, **kwargs): + super().__init__(agent, **kwargs) + self.aruco_id: int = aruco_id + self.aruco_dict = aruco.Dictionary_get(getattr(aruco, aruco_dict_key)) + self.aruco_param = aruco.DetectorParameters_create() + self.marker_length = marker_length # in meters + + def run_in_series(self, **kwargs): + if self.agent.front_rgb_camera.data is not None: + try: + img = self.agent.front_rgb_camera.data.copy() + result: dict = self.findArucoMarkers(img) # {marker id -> bbox} + if result and self.aruco_id in result: + bbox = result[self.aruco_id] + rvec, tvec, markerPoints = cv2.aruco.estimatePoseSingleMarkers(bbox, self.marker_length, + self.agent.front_rgb_camera.intrinsics_matrix, + self.agent.front_rgb_camera.distortion_coefficient) + cv2.aruco.drawAxis(img, + self.agent.front_rgb_camera.intrinsics_matrix, + self.agent.front_rgb_camera.distortion_coefficient, + rvec, + tvec, + self.marker_length) + R = np.array(cv2.Rodrigues(rvec)[0]) + T = tvec + P = self.constructTransformation(R, T) + return P + except Exception as e: + return None + return None + + def findArucoMarkers(self, img): + gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + bboxs, ids, rejected = aruco.detectMarkers(gray, self.aruco_dict, parameters=self.aruco_param, + cameraMatrix=self.agent.front_rgb_camera.intrinsics_matrix, + distCoeff=self.agent.front_rgb_camera.distortion_coefficient) + log = dict() + if ids is None: + return log + else: + for i in ids: + log[i[0]] = bboxs + return log + + def save(self, **kwargs): + pass + + # Checks if a matrix is a valid rotation matrix. + def isRotationMatrix(self, R): + Rt = np.transpose(R) + shouldBeIdentity = np.dot(Rt, R) + I = np.identity(3, dtype=R.dtype) + n = np.linalg.norm(I - shouldBeIdentity) + return n < 1e-6 + + # Calculates rotation matrix to euler angles + # The result is the same as MATLAB except the order + # of the euler angles ( x and z are swapped ). + def rotationMatrixToEulerAngles(self, R) -> Rotation: + assert (self.isRotationMatrix(R)) + + sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0]) + + singular = sy < 1e-6 + + if not singular: + x = math.atan2(R[2, 1], R[2, 2]) + y = math.atan2(-R[2, 0], sy) + z = math.atan2(R[1, 0], R[0, 0]) + else: + x = math.atan2(-R[1, 2], R[1, 1]) + y = math.atan2(-R[2, 0], sy) + z = 0 + + euler_angles_deg = np.rad2deg(np.array([x, y, z])) + + rotation = Rotation( + roll=euler_angles_deg[2], + pitch=euler_angles_deg[0], + yaw=euler_angles_deg[1], + ) + return rotation + + @staticmethod + def constructTransformation(R, T): + transformation_matrix = np.zeros([4, 4]) + transformation_matrix[0:3, 0:3] = R + transformation_matrix[0:3, 3] = T + transformation_matrix[3, 3] = 1 + return transformation_matrix diff --git a/misc/aruco_detection.py b/misc/aruco_detection.py index 801a8f00..944ebc25 100644 --- a/misc/aruco_detection.py +++ b/misc/aruco_detection.py @@ -3,6 +3,7 @@ import numpy as np import os from pathlib import Path +import math def findArucoMarkers(img, arucoDict, arucoParam, intrinsics, distortion): @@ -13,13 +14,14 @@ def findArucoMarkers(img, arucoDict, arucoParam, intrinsics, distortion): return bboxs, ids, rejected -def main(markerSize, totalMarkers, intrinsics: np.ndarray, distortion:np.ndarray, should_draw_axis=False): +def detectAruco(markerSize, totalMarkers, intrinsics: np.ndarray, distortion: np.ndarray, should_draw_axis=False): key = getattr(aruco, f'DICT_{markerSize}X{markerSize}_{totalMarkers}') arucoDict = aruco.Dictionary_get(key) arucoParam = aruco.DetectorParameters_create() cap = cv2.VideoCapture(0) while True: success, img = cap.read() + # img = cv2.flip(img, 1) if success: corners, ids, rejected = findArucoMarkers(img, arucoDict, arucoParam, intrinsics, distortion) if should_draw_axis and len(corners) > 0: @@ -34,7 +36,14 @@ def main(markerSize, totalMarkers, intrinsics: np.ndarray, distortion:np.ndarray # Draw Axis cv2.aruco.drawAxis(img, intrinsics, distortion, rvec, tvec, 0.01) - print(f"id = {ids[i]} --> tvec = {tvec}") + # print(f"id = {ids[i]} --> tvec = {tvec}, rvec = {rvec}") + R = np.array(cv2.Rodrigues(rvec)[0]) + P = constructTransformation(R, tvec) + euler_angles_rad = rotationMatrixToEulerAngles(R) + euler_angles_deg = np.rad2deg(euler_angles_rad) + print(f"roll: {euler_angles_deg[2]}, pitch: {euler_angles_deg[0]}, yaw: {euler_angles_deg[1]}") + + # print(matrix) cv2.imshow('img', img) k = cv2.waitKey(30) & 0xff if k == 27: @@ -43,14 +52,96 @@ def main(markerSize, totalMarkers, intrinsics: np.ndarray, distortion:np.ndarray cv2.destroyAllWindows() +# Checks if a matrix is a valid rotation matrix. +def isRotationMatrix(R): + Rt = np.transpose(R) + shouldBeIdentity = np.dot(Rt, R) + I = np.identity(3, dtype=R.dtype) + n = np.linalg.norm(I - shouldBeIdentity) + return n < 1e-6 + + +# Calculates rotation matrix to euler angles +# The result is the same as MATLAB except the order +# of the euler angles ( x and z are swapped ). +def rotationMatrixToEulerAngles(R): + assert (isRotationMatrix(R)) + + sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0]) + + singular = sy < 1e-6 + + if not singular: + x = math.atan2(R[2, 1], R[2, 2]) + y = math.atan2(-R[2, 0], sy) + z = math.atan2(R[1, 0], R[0, 0]) + else: + x = math.atan2(-R[1, 2], R[1, 1]) + y = math.atan2(-R[2, 0], sy) + z = 0 + + return np.array([x, y, z]) + + +def constructTransformation(R, T): + transformation_matrix = np.zeros([4, 4]) + transformation_matrix[0:3, 0:3] = R + transformation_matrix[0:3, 3] = T + transformation_matrix[3, 3] = 1 + return transformation_matrix + + def loadCalib(caliberation_file: Path): npzfile = np.load(caliberation_file.as_posix()) return npzfile['intrinsics'], npzfile['distortion'], \ npzfile['new_intrinsics'], npzfile['roi'] +def warpImageOnAruco(markerSize, totalMarkers, intrinsics, distortion, should_draw_axis): + key = getattr(aruco, f'DICT_{markerSize}X{markerSize}_{totalMarkers}') + arucoDict = aruco.Dictionary_get(key) + arucoParam = aruco.DetectorParameters_create() + cap = cv2.VideoCapture(0) + pts_dst = np.array([[921, 731], [113, 732], [1127, 909], [927, 905]]) # caliberate this on first scene + + while True: + success, img = cap.read() + # img = cv2.flip(img, 1) + if success: + corners, ids, rejected = findArucoMarkers(img, arucoDict, arucoParam, intrinsics, distortion) + if should_draw_axis and len(corners) > 0: + for i in range(0, len(ids)): + # Estimate pose of each marker and return the values rvec and tvec--- + # (different from those of camera coefficients) + rvec, tvec, markerPoints = cv2.aruco.estimatePoseSingleMarkers(corners[i], 0.02, + intrinsics, + distortion) + # Draw a square around the markers + cv2.aruco.drawDetectedMarkers(img, corners) + + # Draw Axis + cv2.aruco.drawAxis(img, intrinsics, distortion, rvec, tvec, 0.01) + + # print(corners[0][0]) + h, status = cv2.findHomography(np.array(corners[0][0]), pts_dst) + im_out = cv2.warpPerspective(img, h, (img.shape[0], img.shape[1])) + cv2.imshow("imout", im_out) + # print(matrix) + cv2.imshow('img', img) + k = cv2.waitKey(30) & 0xff + if k == 27: + break + cap.release() + cv2.destroyAllWindows() + + +def main(markerSize, totalMarkers, intrinsics: np.ndarray, distortion: np.ndarray, should_draw_axis=False): + detectAruco(markerSize, totalMarkers, intrinsics, distortion, should_draw_axis) + # warpImageOnAruco(markerSize, totalMarkers, intrinsics, distortion, should_draw_axis) + + if __name__ == '__main__': intrinsics, distortion, new_intrinsics, roi = loadCalib(Path("calib.npz")) - print(type(intrinsics), type(distortion)) - # main(markerSize=5, totalMarkers=250, should_draw_axis=True, - # intrinsics=intrinsics, distortion=distortion) + # print(type(intrinsics), type(distortion)) + main(markerSize=5, totalMarkers=250, should_draw_axis=True, + intrinsics=intrinsics, distortion=distortion) diff --git a/runner_ios.py b/runner_ios.py index f3836258..d88f76be 100644 --- a/runner_ios.py +++ b/runner_ios.py @@ -114,7 +114,7 @@ def is_glove_online(host, port): json.dump(ios_config.dict(), ios_config_file_path.open('w'), indent=4) if success or args.reconnect is False: - agent = ForwardOnlyAgent(vehicle=Vehicle(), agent_settings=agent_config, should_init_default_cam=True) + agent = ArucoFollowingAgent(vehicle=Vehicle(), agent_settings=agent_config, should_init_default_cam=True) if args.use_unity: runner = iOSUnityRunner(agent=agent, ios_config=ios_config) else: