-
Notifications
You must be signed in to change notification settings - Fork 29
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ros_alarms: Add high CPU temperature alarm (closes #1207)
- Loading branch information
Showing
7 changed files
with
191 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
103 changes: 103 additions & 0 deletions
103
SubjuGator/command/subjugator_alarm/alarm_handlers/temp.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import json | ||
from contextlib import nullcontext | ||
from unittest import mock | ||
|
||
import rospy | ||
from ros_alarms import Alarm, AlarmBroadcaster, HandlerBase | ||
from std_msgs.msg import String | ||
|
||
|
||
class Temperature(HandlerBase): | ||
""" | ||
Alarm (inheriting from :class:`ros_alarms.HandlerBase`) which monitors the temperature | ||
of a system. | ||
Attributes: | ||
alarm_name (str): The name of the alarm. Set to ``temp``. | ||
""" | ||
|
||
alarm_name = "temp" | ||
|
||
def __init__(self): | ||
self._killed = False | ||
self.ab = AlarmBroadcaster(self.alarm_name, node_name="temp_kill") | ||
self.in_test = rospy.get_param("/subjugator_alarm_temp_test_flag", False) | ||
self.limit_amounts = {} | ||
if self.in_test: | ||
|
||
def _set_test_values(call: String): | ||
self.limit_amounts = json.loads(call.data) | ||
|
||
self.test_values_sub = rospy.Subscriber( | ||
"/subjugator_alarm_temp_test_values", | ||
String, | ||
_set_test_values, | ||
) | ||
rospy.Timer(rospy.Duration(0.5), self._temp_check) | ||
|
||
def _temp_check(self, *args): | ||
# Only run if the user has psutil installed | ||
try: | ||
import psutil | ||
except ModuleNotFoundError: | ||
return | ||
tv = self.limit_amounts.copy() | ||
with ( | ||
mock.patch( | ||
"psutil._psplatform.sensors_temperatures", | ||
return_value=tv, | ||
) | ||
if self.in_test | ||
else nullcontext() | ||
): | ||
temps = psutil.sensors_temperatures() | ||
any_high_or_crit = False | ||
for name, entries in temps.items(): | ||
for entry in entries: | ||
high = ( | ||
entry.current > (entry.high) if entry.high is not None else None | ||
) | ||
crit = entry.critical and entry.current > (0.9 * entry.critical) | ||
if not self._killed and high: | ||
rospy.logwarn( | ||
f"Temperature of a sensor ('{name}') is quite high...", | ||
) | ||
self.ab.raise_alarm( | ||
problem_description=f"Temperature of '{name}' too high: {entry.current}*C, high: {entry.critical}*C", | ||
parameters={"Temperature": entry.current}, | ||
severity=5, | ||
) | ||
elif not self._killed and crit: | ||
rospy.logwarn( | ||
f"Temperature of a sensor ('{name}') is critical...", | ||
) | ||
self.ab.raise_alarm( | ||
problem_description=f"Temperature of '{name}' too high: {entry.current}*C, critical: {entry.critical}*C", | ||
parameters={"Temperature": entry.current}, | ||
severity=5, | ||
) | ||
# was not previously killed | ||
if crit or high: | ||
any_high_or_crit = True | ||
if self._killed and not any_high_or_crit: | ||
rospy.logwarn("No temperatures are high enough, unkilling...") | ||
self.ab.clear_alarm() | ||
|
||
def raised(self, alarm: Alarm) -> None: | ||
""" | ||
Triggers when the alarm is raised. Sets the state of the alarm monitor to | ||
represent that the alarm was killed. | ||
Parameters: | ||
alarm (ros_alarms.Alarm): The alarm which was raised. | ||
""" | ||
self._killed = True | ||
|
||
def cleared(self, alarm: Alarm) -> None: | ||
""" | ||
Triggers when the alarm is cleared. Sets the state of the alarm monitor to | ||
Parameters: | ||
alarm (ros_alarms.Alarm): The alarm which was cleared. | ||
""" | ||
self._killed = False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<launch> | ||
<param name="/is_simulation" value="True" /> | ||
<param name="/subjugator_alarm_temp_test_flag" value="True" /> | ||
<include file="$(find subjugator_alarm)/launch/alarms.launch" /> | ||
<test test-name="temp_alarm_test" pkg="subjugator_alarm" type="temp_test.py" /> | ||
</launch> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
#! /usr/bin/env python3 | ||
from __future__ import annotations | ||
|
||
import json | ||
import unittest | ||
|
||
import rospy | ||
import rostest | ||
from ros_alarms import AlarmListener | ||
from ros_alarms_msgs.msg import Alarm | ||
from std_msgs.msg import String | ||
|
||
|
||
class TemperatureAlarmTest(unittest.TestCase): | ||
def setUp(self): | ||
self.listener = AlarmListener("temp") | ||
self.alive = False | ||
self.set_values_pub = rospy.Publisher( | ||
"/subjugator_alarm_temp_test_values", | ||
String, | ||
queue_size=5, | ||
) | ||
self.alarm_updates_sub = rospy.Subscriber( | ||
"/alarm/updates", | ||
Alarm, | ||
self._updates_cb, | ||
) | ||
|
||
def _make_data( | ||
self, | ||
current: float, | ||
high: float, | ||
critical: float, | ||
) -> dict[str, list[tuple[str, float, float, float]]]: | ||
# label: current, high, critical | ||
return {"test_temp_device": [("label", current, high, critical)]} | ||
|
||
def _updates_cb(self, alarm: Alarm): | ||
if alarm.alarm_name == "temp": | ||
self.alive = True | ||
|
||
def _send_and_test(self, current: float, high: float, critical: float): | ||
d = self._make_data(current, high, critical) | ||
for _ in range(15): | ||
self.set_values_pub.publish(String(json.dumps(d))) | ||
rospy.sleep(0.1) | ||
|
||
def test_sequence(self): | ||
self.listener.wait_for_server() | ||
# current > critical | ||
self._send_and_test(2, 1, 5) | ||
self.assertTrue(self.listener.is_raised()) | ||
# normal | ||
self._send_and_test(2, 500, 500) | ||
self.assertTrue(self.listener.is_cleared()) | ||
# current > high | ||
self._send_and_test(2, 5, 1) | ||
self.assertTrue(self.listener.is_raised()) | ||
# normal | ||
self._send_and_test(0, 0, 0) | ||
self.assertTrue(self.listener.is_cleared()) | ||
# current within 90% of high | ||
self._send_and_test(91, 200, 100) | ||
self.assertTrue(self.listener.is_raised()) | ||
|
||
|
||
if __name__ == "__main__": | ||
rospy.init_node("temp_test", anonymous=True) | ||
rostest.rosrun("subjugator_alarm", "temp_test", TemperatureAlarmTest) | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters