diff --git a/SubjuGator/command/subjugator_alarm/CMakeLists.txt b/SubjuGator/command/subjugator_alarm/CMakeLists.txt
index 9a814c5b0..1866138ee 100644
--- a/SubjuGator/command/subjugator_alarm/CMakeLists.txt
+++ b/SubjuGator/command/subjugator_alarm/CMakeLists.txt
@@ -20,3 +20,8 @@ catkin_package(
# INCLUDE_DIRS include
# LIBRARIES ${PROJECT_NAME}
)
+
+if(CATKIN_ENABLE_TESTING)
+ find_package(rostest REQUIRED)
+ add_rostest(test/temp.test)
+endif()
diff --git a/SubjuGator/command/subjugator_alarm/alarm_handlers/__init__.py b/SubjuGator/command/subjugator_alarm/alarm_handlers/__init__.py
index f99903c3a..fbff4b10b 100644
--- a/SubjuGator/command/subjugator_alarm/alarm_handlers/__init__.py
+++ b/SubjuGator/command/subjugator_alarm/alarm_handlers/__init__.py
@@ -4,4 +4,5 @@
from .kill import Kill
from .network_loss import NetworkLoss
from .odom_kill import OdomKill
+from .temp import Temperature
from .thruster_out import ThrusterOut
diff --git a/SubjuGator/command/subjugator_alarm/alarm_handlers/temp.py b/SubjuGator/command/subjugator_alarm/alarm_handlers/temp.py
new file mode 100644
index 000000000..e7cdf65c8
--- /dev/null
+++ b/SubjuGator/command/subjugator_alarm/alarm_handlers/temp.py
@@ -0,0 +1,103 @@
+import json
+from contextlib import nullcontext
+from unittest import mock
+
+import rospy
+from ros_alarms import Alarm, AlarmBroadcaster, HandlerBase
+from std_msgs.msg import String
+
+
+class Temperature(HandlerBase):
+ """
+ Alarm (inheriting from :class:`ros_alarms.HandlerBase`) which monitors the temperature
+ of a system.
+
+ Attributes:
+ alarm_name (str): The name of the alarm. Set to ``temp``.
+ """
+
+ alarm_name = "temp"
+
+ def __init__(self):
+ self._killed = False
+ self.ab = AlarmBroadcaster(self.alarm_name, node_name="temp_kill")
+ self.in_test = rospy.get_param("/subjugator_alarm_temp_test_flag", False)
+ self.limit_amounts = {}
+ if self.in_test:
+
+ def _set_test_values(call: String):
+ self.limit_amounts = json.loads(call.data)
+
+ self.test_values_sub = rospy.Subscriber(
+ "/subjugator_alarm_temp_test_values",
+ String,
+ _set_test_values,
+ )
+ rospy.Timer(rospy.Duration(0.5), self._temp_check)
+
+ def _temp_check(self, *args):
+ # Only run if the user has psutil installed
+ try:
+ import psutil
+ except ModuleNotFoundError:
+ return
+ tv = self.limit_amounts.copy()
+ with (
+ mock.patch(
+ "psutil._psplatform.sensors_temperatures",
+ return_value=tv,
+ )
+ if self.in_test
+ else nullcontext()
+ ):
+ temps = psutil.sensors_temperatures()
+ any_high_or_crit = False
+ for name, entries in temps.items():
+ for entry in entries:
+ high = (
+ entry.current > (entry.high) if entry.high is not None else None
+ )
+ crit = entry.critical and entry.current > (0.9 * entry.critical)
+ if not self._killed and high:
+ rospy.logwarn(
+ f"Temperature of a sensor ('{name}') is quite high...",
+ )
+ self.ab.raise_alarm(
+ problem_description=f"Temperature of '{name}' too high: {entry.current}*C, high: {entry.critical}*C",
+ parameters={"Temperature": entry.current},
+ severity=5,
+ )
+ elif not self._killed and crit:
+ rospy.logwarn(
+ f"Temperature of a sensor ('{name}') is critical...",
+ )
+ self.ab.raise_alarm(
+ problem_description=f"Temperature of '{name}' too high: {entry.current}*C, critical: {entry.critical}*C",
+ parameters={"Temperature": entry.current},
+ severity=5,
+ )
+ # was not previously killed
+ if crit or high:
+ any_high_or_crit = True
+ if self._killed and not any_high_or_crit:
+ rospy.logwarn("No temperatures are high enough, unkilling...")
+ self.ab.clear_alarm()
+
+ def raised(self, alarm: Alarm) -> None:
+ """
+ Triggers when the alarm is raised. Sets the state of the alarm monitor to
+ represent that the alarm was killed.
+
+ Parameters:
+ alarm (ros_alarms.Alarm): The alarm which was raised.
+ """
+ self._killed = True
+
+ def cleared(self, alarm: Alarm) -> None:
+ """
+ Triggers when the alarm is cleared. Sets the state of the alarm monitor to
+
+ Parameters:
+ alarm (ros_alarms.Alarm): The alarm which was cleared.
+ """
+ self._killed = False
diff --git a/SubjuGator/command/subjugator_alarm/launch/alarms.launch b/SubjuGator/command/subjugator_alarm/launch/alarms.launch
index 37a20680e..1b9ad2b20 100644
--- a/SubjuGator/command/subjugator_alarm/launch/alarms.launch
+++ b/SubjuGator/command/subjugator_alarm/launch/alarms.launch
@@ -6,13 +6,13 @@
- kill: ["bus-voltage", "height-over-bottom", "thruster-out", "network-loss", "mission-kill", "odom-kill", "hw-kill"]
+ kill: ["bus-voltage", "height-over-bottom", "thruster-out", "temp", "network-loss", "mission-kill", "odom-kill", "hw-kill"]
kill, thruster-out, bus-voltage, height-over-bottom, network-loss, mission-kill, odom-kill,
- actuator-board-disconnect, actuator-board-packet-error, hw-kill, go
+ actuator-board-disconnect, actuator-board-packet-error, hw-kill, go, temp
diff --git a/SubjuGator/command/subjugator_alarm/test/temp.test b/SubjuGator/command/subjugator_alarm/test/temp.test
new file mode 100644
index 000000000..2b32d6145
--- /dev/null
+++ b/SubjuGator/command/subjugator_alarm/test/temp.test
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
diff --git a/SubjuGator/command/subjugator_alarm/test/temp_test.py b/SubjuGator/command/subjugator_alarm/test/temp_test.py
new file mode 100755
index 000000000..cb4bfc0ca
--- /dev/null
+++ b/SubjuGator/command/subjugator_alarm/test/temp_test.py
@@ -0,0 +1,70 @@
+#! /usr/bin/env python3
+from __future__ import annotations
+
+import json
+import unittest
+
+import rospy
+import rostest
+from ros_alarms import AlarmListener
+from ros_alarms_msgs.msg import Alarm
+from std_msgs.msg import String
+
+
+class TemperatureAlarmTest(unittest.TestCase):
+ def setUp(self):
+ self.listener = AlarmListener("temp")
+ self.alive = False
+ self.set_values_pub = rospy.Publisher(
+ "/subjugator_alarm_temp_test_values",
+ String,
+ queue_size=5,
+ )
+ self.alarm_updates_sub = rospy.Subscriber(
+ "/alarm/updates",
+ Alarm,
+ self._updates_cb,
+ )
+
+ def _make_data(
+ self,
+ current: float,
+ high: float,
+ critical: float,
+ ) -> dict[str, list[tuple[str, float, float, float]]]:
+ # label: current, high, critical
+ return {"test_temp_device": [("label", current, high, critical)]}
+
+ def _updates_cb(self, alarm: Alarm):
+ if alarm.alarm_name == "temp":
+ self.alive = True
+
+ def _send_and_test(self, current: float, high: float, critical: float):
+ d = self._make_data(current, high, critical)
+ for _ in range(15):
+ self.set_values_pub.publish(String(json.dumps(d)))
+ rospy.sleep(0.1)
+
+ def test_sequence(self):
+ self.listener.wait_for_server()
+ # current > critical
+ self._send_and_test(2, 1, 5)
+ self.assertTrue(self.listener.is_raised())
+ # normal
+ self._send_and_test(2, 500, 500)
+ self.assertTrue(self.listener.is_cleared())
+ # current > high
+ self._send_and_test(2, 5, 1)
+ self.assertTrue(self.listener.is_raised())
+ # normal
+ self._send_and_test(0, 0, 0)
+ self.assertTrue(self.listener.is_cleared())
+ # current within 90% of high
+ self._send_and_test(91, 200, 100)
+ self.assertTrue(self.listener.is_raised())
+
+
+if __name__ == "__main__":
+ rospy.init_node("temp_test", anonymous=True)
+ rostest.rosrun("subjugator_alarm", "temp_test", TemperatureAlarmTest)
+ unittest.main()
diff --git a/requirements.txt b/requirements.txt
index bcecb0c69..138fc9f29 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -39,6 +39,9 @@ pygments==2.17.2
# External Devices
pyserial==3.5
+# Drivers
+psutil==5.5.1
+
# Linting
black==24.3.0
modernize==0.8.0 # This can be removed after Noetic Migration is complete.