Skip to content

Commit

Permalink
navigator_pico_kill_board: Add temporary adrian driver
Browse files Browse the repository at this point in the history
  • Loading branch information
cbrxyz committed Nov 7, 2024
1 parent 6f78b54 commit 2447bf9
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
cmake_minimum_required(VERSION 2.8.3)
project(navigator_pico_kill_board)
find_package(catkin REQUIRED COMPONENTS
mil_usb_to_can
)
add_rostest(test/simulated_board.test)
catkin_python_setup()
catkin_package()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .handle import ThrusterAndKillBoard
from .packets import KillRecievePacket, KillSetPacket, KillStatus
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#!/usr/bin/python3
from __future__ import annotations

import rospy
from mil_usb_to_can.sub9 import CANDeviceHandle, NackPacket
from ros_alarms import AlarmBroadcaster, AlarmListener
from ros_alarms_msgs.msg import Alarm
from std_srvs.srv import SetBool, SetBoolRequest, SetBoolResponse

from .packets import (
KillRecievePacket,
KillSetPacket,
KillStatus,
)
from .thruster import make_thruster_dictionary


class ThrusterAndKillBoard(CANDeviceHandle):
"""
Device handle for the thrust and kill board.
"""

ID_MAPPING = {
"FLH": 0,
"FRH": 1,
"FLV": 2,
"FRV": 3,
"BLH": 4,
"BRH": 5,
"BLV": 6,
"BRV": 7,
}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialize thruster mapping from params
self.thrusters = make_thruster_dictionary(
rospy.get_param("/thruster_layout/thrusters"),
)
# Tracks last hw-kill alarm update
self._last_hw_kill = None
# Used to raise/clear hw-kill when board updates
self._kill_broadcaster = AlarmBroadcaster("hw-kill")
# Listens to hw-kill updates to ensure another nodes doesn't manipulate it
self._hw_kill_listener = AlarmListener(
"hw-kill",
callback_funct=self.on_hw_kill,
)
# Provide service for alarm handler to set/clear the motherboard kill
self._unkill_service = rospy.Service("/set_mobo_kill", SetBool, self.set_kill)
self._last_heartbeat = rospy.Time.now()
self._last_packet = None
self._updated_kill = False

def set_kill(self, req: SetBoolRequest) -> SetBoolResponse:
"""
Called on service calls to ``/set_mobo_kill``, sending the appropriate
packet to the board to unassert or assert to motherboard-origin kill.
Args:
req (SetBoolRequest): The service request.
Returns:
SetBoolResponse: The service response.
"""
self.send_data(KillSetPacket(req.data, KillStatus.MOBO_REQUESTED))
start = rospy.Time.now()
while rospy.Time.now() - start < rospy.Duration(1):
if self._last_packet is not None:
break

if isinstance(self._last_packet, NackPacket):
raise RuntimeError("Request not acknowledged.")

start = rospy.Time.now()
while rospy.Time.now() - start < rospy.Duration(1):
if self._updated_kill:
break

if self._updated_kill:
self._updated_kill = False
return SetBoolResponse(success=True)
else:
return SetBoolResponse(
success=False,
message="No response from board after 1 second.",
)

def on_hw_kill(self, alarm: Alarm) -> None:
"""
Update the classes' hw-kill alarm to the latest update.
Args:
alarm (:class:`~ros_alarms_msgs.msg._Alarm.Alarm`): The alarm message to update with.
"""
self._last_hw_kill = alarm

def update_software_kill(self, raised: bool, message: str):
# If the current status differs from the alarm, update the alarm
severity = 2 if raised else 0
self._updated_kill = True
self._hw_kill_listener.wait_for_server()
if (
self._last_hw_kill is None
or self._last_hw_kill.raised != raised
or self._last_hw_kill.problem_description != message
or self._last_hw_kill.severity != severity
):
if raised:
self._kill_broadcaster.raise_alarm(
severity=severity,
problem_description=message,
)
else:
self._kill_broadcaster.clear_alarm(severity=severity)

def on_data(
self,
data: KillRecievePacket,
) -> None:
"""
Parse the two bytes and raise kills according to a set of specifications
listed below.
"""
print(data)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from dataclasses import dataclass
from enum import IntEnum

from electrical_protocol import Packet


class KillStatus(IntEnum):
MOBO_REQUESTED = 0
RF_KILL = 1
EMERGENCY_STOP = 2


@dataclass
class KillSetPacket(Packet, class_id=0x10, subclass_id=0x0, payload_format="?B"):
set: bool
status: KillStatus


@dataclass
class KillRecievePacket(Packet, class_id=0x10, subclass_id=0x1, payload_format="?B"):
set: bool
status: KillStatus
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/usr/bin/python3
from __future__ import annotations

import rospy
from mil_usb_to_can.sub9 import AckPacket, NackPacket, SimulatedCANDeviceHandle
from std_srvs.srv import SetBool, SetBoolRequest, SetBoolResponse

from .packets import (
HeartbeatReceivePacket,
HeartbeatSetPacket,
KillReceivePacket,
KillSetPacket,
KillStatus,
ThrustSetPacket,
)


class ThrusterAndKillBoardSimulation(SimulatedCANDeviceHandle):
"""
Serial simulator for the thruster and kill board,
providing services to simulate physical plug connections/disconnections.
Inherits from :class:`~mil_usb_to_can.SimulatedCANDevice`.
Attributes:
kill (bool): Whether the hard kill was set.
"""

HEARTBEAT_TIMEOUT_SECONDS = rospy.Duration(1.0)

def __init__(self, *args, **kwargs):
self.kill = False
self._last_heartbeat = None
super().__init__(*args, **kwargs)
self._update_timer = rospy.Timer(rospy.Duration(1), self._check_for_timeout)
self._kill_srv = rospy.Service("/simulate_kill", SetBool, self.set_kill)

def _check_for_timeout(self, _: rospy.timer.TimerEvent):
if self.heartbeat_timedout and not self.kill:
self.send_data(bytes(KillSetPacket(True, KillStatus.BOARD_HEARTBEAT_LOST)))
self.kill = True

def set_kill(self, req: SetBoolRequest) -> SetBoolResponse:
"""
Called by the `/simulate_kill` service to set the kill state of the
simluated device.
Args:
req (SetBoolRequest): The request to set the service with.
Returns:
SetBoolResponse: The response to the service that the operation was successful.
"""
self.kill = req.data
return SetBoolResponse(success=True)

@property
def heartbeat_timedout(self) -> bool:
"""
Whether the heartbeat timed out.
Returns:
bool: The status of the heartbeat timing out.
"""
return (
self._last_heartbeat is None
or (rospy.Time.now() - self._last_heartbeat)
> self.HEARTBEAT_TIMEOUT_SECONDS
)

def on_data(
self,
packet: HeartbeatSetPacket | ThrustSetPacket | KillSetPacket,
) -> None:
"""
Serves as the data handler for the device. Handles :class:`KillMessage`,
:class:`ThrustPacket`, and :class:`HeartbeatMessage` types.
"""
if isinstance(packet, HeartbeatSetPacket):
self._last_heartbeat = rospy.Time.now()
self.send_data(bytes(HeartbeatReceivePacket()))

elif isinstance(packet, ThrustSetPacket):
self.send_data(bytes(AckPacket()))

elif isinstance(packet, KillSetPacket):
self.kill = packet.set
self.send_data(bytes(AckPacket()))
self.send_data(bytes(KillReceivePacket(packet.set, packet.status)))

else:
self.send_data(bytes(NackPacket()))
13 changes: 13 additions & 0 deletions NaviGator/hardware_drivers/navigator_pico_kill_board/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0"?>
<package format="2">
<name>navigator_pico_kill_board</name>
<version>0.0.0</version>
<description>The navigator_pico_kill_board package</description>
<maintainer email="cameron@brown.email">Cameron Brown</maintainer>
<license>MIT</license>
<author>Cameron Brown</author>
<buildtool_depend>catkin</buildtool_depend>
<build_depend>mil_usb_to_can</build_depend>
<build_export_depend>mil_usb_to_can</build_export_depend>
<exec_depend>mil_usb_to_can</exec_depend>
</package>
11 changes: 11 additions & 0 deletions NaviGator/hardware_drivers/navigator_pico_kill_board/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD

from catkin_pkg.python_setup import generate_distutils_setup
from setuptools import setup

# Fetch values from package.xml
setup_args = generate_distutils_setup(
packages=["navigator_pico_kill_board"],
)

setup(**setup_args)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<launch>
<param name="/is_simulation" value="True" />

<node pkg="mil_usb_to_can" type="sub9_driver.py" name="usb_to_can_driver">
<rosparam>
device_handles:
sub9_thrust_and_kill_board.ThrusterAndKillBoard: [sub9_thrust_and_kill_board.KillReceivePacket]
simulated_devices:
sub9_thrust_and_kill_board.ThrusterAndKillBoardSimulation: [sub9_thrust_and_kill_board.HeartbeatSetPacket, sub9_thrust_and_kill_board.ThrustSetPacket, sub9_thrust_and_kill_board.KillSetPacket]
</rosparam>
</node>

<include file="$(find subjugator_launch)/launch/subsystems/thruster_mapper.launch" />
<include file="$(find subjugator_alarm)/launch/alarms.launch" />

<test test-name="simulated_board_test" pkg="sub9_thrust_and_kill_board" type="simulated_board_test.py" time-limit="5.0"/>
</launch>
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/usr/bin/env python3
import unittest

import rospy
from ros_alarms import AlarmListener
from std_srvs.srv import SetBool, SetBoolRequest
from sub9_thrust_and_kill_board.packets import (
HeartbeatReceivePacket,
HeartbeatSetPacket,
KillReceivePacket,
KillSetPacket,
KillStatus,
ThrusterId,
ThrustSetPacket,
)


class SimulatedBoardTest(unittest.TestCase):
"""
Integration test for CAN2USB board driver. Talks
to a simulated CAN device which should add two integers
"""

def __init__(self, *args):
self.kill_srv = rospy.ServiceProxy("/set_mobo_kill", SetBool)
self.hw_alarm_listener = AlarmListener("hw-kill")
super().__init__(*args)

def test_correct_response(self):
"""
Test we can get correct data through CAN bus at least ten times.
"""
self.kill_srv.wait_for_service(5)
self.hw_alarm_listener.wait_for_server()
self.assertTrue(self.kill_srv(SetBoolRequest(True)).success)
self.assertTrue(self.hw_alarm_listener.is_raised(True))
self.assertTrue(self.kill_srv(SetBoolRequest(False)).success)
self.assertTrue(self.hw_alarm_listener.is_raised(False))

def test_packet(self):
# ThrustSetPacket
thrust_set_packet = ThrustSetPacket(ThrusterId.FLH, 0.5)
self.assertEqual(thrust_set_packet.thruster_id, ThrusterId.FLH)
self.assertEqual(
bytes(thrust_set_packet),
b"7\x01\x02\x02\x05\x00\x00\x00\x00\x00?H\x84",
)
# HeartbeatSetPacket
heartbeat_set_packet = HeartbeatSetPacket()
self.assertEqual(bytes(heartbeat_set_packet), b"7\x01\x02\x00\x00\x00\x02\x08")
# HeartbeatReceivePacket
heartbeat_receive_packet = HeartbeatReceivePacket()
self.assertEqual(
bytes(heartbeat_receive_packet),
b"7\x01\x02\x01\x00\x00\x03\x0b",
)
# KillSetPacket
kill_set_packet = KillSetPacket(True, KillStatus.BATTERY_LOW)
self.assertEqual(bytes(kill_set_packet), b"7\x01\x02\x03\x02\x00\x01\x04\x0c)")
# KillReceivePacket
kill_receive_packet = KillReceivePacket(True, KillStatus.BATTERY_LOW)
self.assertEqual(
bytes(kill_receive_packet),
b"7\x01\x02\x04\x02\x00\x01\x04\r.",
)


if __name__ == "__main__":
rospy.init_node("simulated_board_test", anonymous=True)
import rostest

rostest.rosrun(
"sub9_thrust_and_kill_board",
"simulated_board_test",
SimulatedBoardTest,
)
unittest.main()

0 comments on commit 2447bf9

Please sign in to comment.