Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
cbrxyz committed Sep 30, 2024
1 parent 7501cac commit db1ff9a
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
cmake_minimum_required(VERSION 3.0.2)
project(navigator_light_kill_board)

find_package(catkin REQUIRED COMPONENTS
rospy
)

catkin_python_setup()

catkin_package(
)
if(CATKIN_ENABLE_TESTING)
find_package(rostest REQUIRED)
add_rostest(test/simulated.test)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""
The :mod:`navigator_ball_launcher` module is used to control the ball launcher
on NaviGator. The module implements the electrical protocol to communicate with
a board that controls the flywheel and servo to drop the balls.
"""

from .driver import NaviGatorLightKillDevice
from .packets import KillReceivePacket, KillSetPacket, KillStatus, SetMovementModePacket
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from __future__ import annotations

from typing import Union

from electrical_protocol import AckPacket, NackPacket, ROSSerialDevice
from ros_alarms import AlarmListener

from .packets import KillReceivePacket, KillSetPacket, SetMovementModePacket


class NaviGatorLightKillDevice(
ROSSerialDevice[
Union[KillSetPacket, SetMovementModePacket],
Union[KillReceivePacket, AckPacket, NackPacket],
],
):

def __init__(self, port: str):
super().__init__(port, 115200)
self._hw_kill_listener = AlarmListener("hw-kill", self._hw_kill_alarm_cb)
self._kill_listener = AlarmListener("kill", self.kill_alarm_cb)

def _hw_kill_listener(self, alarm):
pass

def on_packet_received(self, packet):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from dataclasses import dataclass
from enum import Enum

from electrical_protocol import Packet


class KillStatus(Enum):
MOBO_REQUESTED = 0
RF_KILL = 1
EMERGENCY_STOP = 2


@dataclass
class KillSetPacket(Packet, class_id=0x10, subclass_id=0x00, payload_format="<?B"):
set: bool
status: KillStatus


@dataclass
class KillReceivePacket(Packet, class_id=0x10, subclass_id=0x01, payload_format="<?B"):
set: bool
status: KillStatus


@dataclass
class SetMovementModePacket(
Packet,
class_id=0x10,
subclass_id=0x02,
payload_format="<?",
):
autonomous: bool
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#! /usr/bin/env python3
import rospy
from navigator_ball_launcher.driver import BallLauncherDevice

if __name__ == "__main__":
rospy.init_node("ball_launcher")
device = BallLauncherDevice(str(rospy.get_param("~port")))
rospy.spin()
12 changes: 12 additions & 0 deletions NaviGator/hardware_drivers/navigator_light_kill_board/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0"?>
<package>
<name>navigator_light_kill_board</name>
<version>1.0.0</version>
<description>The navigator_light_kill_board package</description>
<maintainer email="cameron.brown@todo.todo">Cameron Brown</maintainer>
<license>MIT</license>
<buildtool_depend>catkin</buildtool_depend>
<build_depend>rospy</build_depend>
<run_depend>rospy</run_depend>
<export/>
</package>
11 changes: 11 additions & 0 deletions NaviGator/hardware_drivers/navigator_light_kill_board/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD

from catkin_pkg.python_setup import generate_distutils_setup
from setuptools import setup

# Fetch values from package.xml
setup_args = generate_distutils_setup(
packages=["navigator_light_kill_board"],
)

setup(**setup_args)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<launch>
<param name="/is_simulation" value="True" />
<test pkg="navigator_ball_launcher" test-name="test_simulated_shooter" type="test_simulated_shooter.py" />
</launch>
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python3

import os
import pty
import unittest

import rospy
import rostest
from electrical_protocol import AckPacket
from navigator_ball_launcher.driver import BallLauncherDevice
from navigator_ball_launcher.packets import ReleaseBallPacket, SetSpinPacket
from navigator_msgs.srv import (
BallLauncherDrops,
BallLauncherDropsRequest,
)
from std_srvs.srv import Empty, EmptyRequest, SetBool, SetBoolRequest


class SimulatedBasicTest(unittest.TestCase):
def __init__(self, *args):
super().__init__(*args)

@classmethod
def setUpClass(cls):
cls.master, cls.slave = pty.openpty()
serial_name = os.ttyname(cls.slave)
cls.device = BallLauncherDevice(serial_name)
cls.drop_ball_proxy = rospy.ServiceProxy(
"/test_simulated_shooter/drop_ball",
Empty,
)
cls.spin_service_proxy = rospy.ServiceProxy(
"/test_simulated_shooter/spin",
SetBool,
)
cls.drops_service_proxy = rospy.ServiceProxy(
"/test_simulated_shooter/number_of_drops",
BallLauncherDrops,
)
cls.drop_ball_proxy.wait_for_service()
cls.spin_service_proxy.wait_for_service()
cls.drops_service_proxy.wait_for_service()

def test_drop(self):
before_drops_resp = self.drops_service_proxy(BallLauncherDropsRequest())
self.drop_ball_proxy(EmptyRequest())
# Ensure that we hear the packet
ReleaseBallPacket.from_bytes(os.read(self.master, 100))
# and then send back the simulated ACK
import time

print(time.time(), "sending")
os.write(self.master, bytes(AckPacket()))
print(time.time(), "sent!")
after_drops_resp = self.drops_service_proxy(BallLauncherDropsRequest())
self.assertEqual(before_drops_resp.count + 1, after_drops_resp.count)

def test_spin(self):
before_drops_resp = self.drops_service_proxy(BallLauncherDropsRequest())
self.spin_service_proxy(SetBoolRequest(True))
# Ensure that we hear the packet
packet = SetSpinPacket.from_bytes(os.read(self.master, 100))
self.assertEqual(packet.spin_up, True)
# and then send back the simulated ACK
os.write(self.master, bytes(AckPacket()))
self.spin_service_proxy(SetBoolRequest(False))
# Ensure that we hear the packet
packet = SetSpinPacket.from_bytes(os.read(self.master, 100))
self.assertEqual(packet.spin_up, False)
# and then send back the simulated ACK
os.write(self.master, bytes(AckPacket()))
after_drops_resp = self.drops_service_proxy(BallLauncherDropsRequest())
self.assertEqual(before_drops_resp.count, after_drops_resp.count)

@classmethod
def tearDownClass(cls):
os.close(cls.master)
os.close(cls.slave)


if __name__ == "__main__":
rospy.init_node("test_simulated_shooter")
rostest.rosrun(
"navigator_ball_launcher",
"test_simulated_shooter",
SimulatedBasicTest,
)
unittest.main()

0 comments on commit db1ff9a

Please sign in to comment.