diff --git a/NaviGator/hardware_drivers/navigator_light_kill_board/CMakeLists.txt b/NaviGator/hardware_drivers/navigator_light_kill_board/CMakeLists.txt new file mode 100644 index 000000000..5e958fa9c --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_light_kill_board/CMakeLists.txt @@ -0,0 +1,15 @@ +cmake_minimum_required(VERSION 3.0.2) +project(navigator_light_kill_board) + +find_package(catkin REQUIRED COMPONENTS + rospy +) + +catkin_python_setup() + +catkin_package( +) +if(CATKIN_ENABLE_TESTING) + find_package(rostest REQUIRED) + add_rostest(test/simulated.test) +endif() diff --git a/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/__init__.py b/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/__init__.py new file mode 100644 index 000000000..02c5d16ea --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/__init__.py @@ -0,0 +1,8 @@ +""" +The :mod:`navigator_ball_launcher` module is used to control the ball launcher +on NaviGator. The module implements the electrical protocol to communicate with +a board that controls the flywheel and servo to drop the balls. +""" + +from .driver import NaviGatorLightKillDevice +from .packets import KillReceivePacket, KillSetPacket, KillStatus, SetMovementModePacket diff --git a/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/driver.py b/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/driver.py new file mode 100644 index 000000000..f1dfcf2ba --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/driver.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from typing import Union + +from electrical_protocol import AckPacket, NackPacket, ROSSerialDevice +from ros_alarms import AlarmListener + +from .packets import KillReceivePacket, KillSetPacket, SetMovementModePacket + + +class NaviGatorLightKillDevice( + ROSSerialDevice[ + Union[KillSetPacket, SetMovementModePacket], + Union[KillReceivePacket, AckPacket, NackPacket], + ], +): + + def __init__(self, port: str): + super().__init__(port, 115200) + self._hw_kill_listener = AlarmListener("hw-kill", self._hw_kill_alarm_cb) + self._kill_listener = AlarmListener("kill", self.kill_alarm_cb) + + def _hw_kill_listener(self, alarm): + pass + + def on_packet_received(self, packet): + pass diff --git a/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/packets.py b/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/packets.py new file mode 100644 index 000000000..c7868cc25 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_light_kill_board/navigator_light_kill_board/packets.py @@ -0,0 +1,32 @@ +from dataclasses import dataclass +from enum import Enum + +from electrical_protocol import Packet + + +class KillStatus(Enum): + MOBO_REQUESTED = 0 + RF_KILL = 1 + EMERGENCY_STOP = 2 + + +@dataclass +class KillSetPacket(Packet, class_id=0x10, subclass_id=0x00, payload_format=" + + navigator_light_kill_board + 1.0.0 + The navigator_light_kill_board package + Cameron Brown + MIT + catkin + rospy + rospy + + diff --git a/NaviGator/hardware_drivers/navigator_light_kill_board/setup.py b/NaviGator/hardware_drivers/navigator_light_kill_board/setup.py new file mode 100644 index 000000000..d558ff487 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_light_kill_board/setup.py @@ -0,0 +1,11 @@ +# ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD + +from catkin_pkg.python_setup import generate_distutils_setup +from setuptools import setup + +# Fetch values from package.xml +setup_args = generate_distutils_setup( + packages=["navigator_light_kill_board"], +) + +setup(**setup_args) diff --git a/NaviGator/hardware_drivers/navigator_light_kill_board/test/simulated.test b/NaviGator/hardware_drivers/navigator_light_kill_board/test/simulated.test new file mode 100644 index 000000000..3b8255b44 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_light_kill_board/test/simulated.test @@ -0,0 +1,5 @@ + + + + + diff --git a/NaviGator/hardware_drivers/navigator_light_kill_board/test/test_simulated_shooter.py b/NaviGator/hardware_drivers/navigator_light_kill_board/test/test_simulated_shooter.py new file mode 100755 index 000000000..a2396fd63 --- /dev/null +++ b/NaviGator/hardware_drivers/navigator_light_kill_board/test/test_simulated_shooter.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 + +import os +import pty +import unittest + +import rospy +import rostest +from electrical_protocol import AckPacket +from navigator_ball_launcher.driver import BallLauncherDevice +from navigator_ball_launcher.packets import ReleaseBallPacket, SetSpinPacket +from navigator_msgs.srv import ( + BallLauncherDrops, + BallLauncherDropsRequest, +) +from std_srvs.srv import Empty, EmptyRequest, SetBool, SetBoolRequest + + +class SimulatedBasicTest(unittest.TestCase): + def __init__(self, *args): + super().__init__(*args) + + @classmethod + def setUpClass(cls): + cls.master, cls.slave = pty.openpty() + serial_name = os.ttyname(cls.slave) + cls.device = BallLauncherDevice(serial_name) + cls.drop_ball_proxy = rospy.ServiceProxy( + "/test_simulated_shooter/drop_ball", + Empty, + ) + cls.spin_service_proxy = rospy.ServiceProxy( + "/test_simulated_shooter/spin", + SetBool, + ) + cls.drops_service_proxy = rospy.ServiceProxy( + "/test_simulated_shooter/number_of_drops", + BallLauncherDrops, + ) + cls.drop_ball_proxy.wait_for_service() + cls.spin_service_proxy.wait_for_service() + cls.drops_service_proxy.wait_for_service() + + def test_drop(self): + before_drops_resp = self.drops_service_proxy(BallLauncherDropsRequest()) + self.drop_ball_proxy(EmptyRequest()) + # Ensure that we hear the packet + ReleaseBallPacket.from_bytes(os.read(self.master, 100)) + # and then send back the simulated ACK + import time + + print(time.time(), "sending") + os.write(self.master, bytes(AckPacket())) + print(time.time(), "sent!") + after_drops_resp = self.drops_service_proxy(BallLauncherDropsRequest()) + self.assertEqual(before_drops_resp.count + 1, after_drops_resp.count) + + def test_spin(self): + before_drops_resp = self.drops_service_proxy(BallLauncherDropsRequest()) + self.spin_service_proxy(SetBoolRequest(True)) + # Ensure that we hear the packet + packet = SetSpinPacket.from_bytes(os.read(self.master, 100)) + self.assertEqual(packet.spin_up, True) + # and then send back the simulated ACK + os.write(self.master, bytes(AckPacket())) + self.spin_service_proxy(SetBoolRequest(False)) + # Ensure that we hear the packet + packet = SetSpinPacket.from_bytes(os.read(self.master, 100)) + self.assertEqual(packet.spin_up, False) + # and then send back the simulated ACK + os.write(self.master, bytes(AckPacket())) + after_drops_resp = self.drops_service_proxy(BallLauncherDropsRequest()) + self.assertEqual(before_drops_resp.count, after_drops_resp.count) + + @classmethod + def tearDownClass(cls): + os.close(cls.master) + os.close(cls.slave) + + +if __name__ == "__main__": + rospy.init_node("test_simulated_shooter") + rostest.rosrun( + "navigator_ball_launcher", + "test_simulated_shooter", + SimulatedBasicTest, + ) + unittest.main()