Skip to content

Commit

Permalink
migrated navigator_emergency_control, navigator_joystick_control, and…
Browse files Browse the repository at this point in the history
… navigator_keyboard_control to ros2
  • Loading branch information
Nihar3430 committed Apr 28, 2024
1 parent f6786ea commit 8734935
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/usr/bin/env python3
import rospy
import sys

import rclpy
from rclpy.duration import Duration
from remote_control_lib import RemoteControl
from sensor_msgs.msg import Joy

Expand Down Expand Up @@ -32,11 +35,11 @@ class Joystick:
"""

def __init__(self):
self.force_scale = rospy.get_param("/joystick_wrench/force_scale", 600)
self.torque_scale = rospy.get_param("/joystick_wrench/torque_scale", 500)
self.force_scale = self.declare_parameter("/joystick_wrench/force_scale", 600)
self.torque_scale = self.declare_parameter("/joystick_wrench/torque_scale", 500)

self.remote = RemoteControl("emergency", "/wrench/emergency")
rospy.Subscriber("joy_emergency", Joy, self.joy_recieved)
self.create_subscription(Joy, "joy_emergency", self.joy_recieved)

self.active = False
self.reset()
Expand Down Expand Up @@ -80,15 +83,16 @@ def check_for_timeout(self, joy: Joy):
# No change in state
# The controller times out after 15 minutes
if (
rospy.Time.now() - self.last_joy.header.stamp > rospy.Duration(15 * 60)
self.get_clock().now() - self.last_joy.header.stamp
> Duration(seconds=(15 * 60))
and self.active
):
rospy.logwarn("Controller Timed out. Hold start to resume.")
self.get_logger().warn("Controller Timed out. Hold start to resume.")
self.reset()

else:
joy.header.stamp = (
rospy.Time.now()
self.get_clock().now()
) # In the sim, stamps weren't working right
self.last_joy = joy

Expand All @@ -102,7 +106,7 @@ def joy_recieved(self, joy: Joy) -> None:
Args:
joy (Joy): The Joy message.
"""
self.last_time = rospy.Time.now()
self.last_time = self.get_clock().now()
self.check_for_timeout(joy)

# Assigns readable names to the buttons that are used
Expand All @@ -116,14 +120,14 @@ def joy_recieved(self, joy: Joy) -> None:
thruster_deploy = bool(joy.buttons[5])

if go_inactive and not self.last_go_inactive:
rospy.loginfo("Go inactive pressed. Going inactive")
self.get_logger().info("Go inactive pressed. Going inactive")
self.reset()
return

# Reset controller state if only start is pressed down about 1 seconds
self.start_count += start
if self.start_count > 5:
rospy.loginfo("Resetting controller state")
self.get_logger().info("Resetting controller state")
self.reset()
self.active = True

Expand Down Expand Up @@ -170,19 +174,22 @@ def joy_recieved(self, joy: Joy) -> None:
rotation = joy.axes[3] * self.torque_scale
self.remote.publish_wrench(x, y, rotation, joy.header.stamp)

def die_check(self, _: rospy.timer.TimerEvent) -> None:
def die_check(self, _: rclpy.timer.TimerEvent) -> None:
"""
Publishes zeros after 2 seconds of no update in case node dies.
"""
# No new instructions after 2 seconds
if self.active and rospy.Time.now() - self.last_time > rospy.Duration(2):
if self.active and self.get_clock().now() - self.last_time > Duration(
seconds=2,
):
# Zero the wrench, reset
self.reset()


if __name__ == "__main__":
rospy.init_node("emergency")
rclpy.init(args=sys.argv)
node = rclpy.create_node("emergency")

emergency = Joystick()
rospy.Timer(rospy.Duration(1), emergency.die_check, oneshot=False)
rospy.spin()
node.create_timer(Duration(seconds=1.0), emergency.die_check)
rclpy.spin(node)
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/usr/bin/env python3
import rospy
import sys

import rclpy
from rclpy.duration import Duration
from remote_control_lib import RemoteControl
from sensor_msgs.msg import Joy

Expand All @@ -11,12 +14,12 @@

class Joystick:
def __init__(self):
self.force_scale = rospy.get_param("~force_scale", 600)
self.torque_scale = rospy.get_param("~torque_scale", 500)
self.force_scale = self.declare_parameter("~force_scale", 600)
self.torque_scale = self.declare_parameter("~torque_scale", 500)

self.remote = RemoteControl("joystick", "/wrench/rc")
self.reset()
rospy.Subscriber("joy", Joy, self.joy_recieved)
self.create_subscription(Joy, "joy", self.joy_recieved)

def reset(self):
"""
Expand Down Expand Up @@ -49,15 +52,16 @@ def check_for_timeout(self, joy):
if (
joy.axes == self.last_joy.axes
and joy.buttons == self.last_joy.buttons
and rospy.Time.now() - self.last_joy.header.stamp > rospy.Duration(15 * 60)
and self.get_clock().now() - self.last_joy.header.stamp
> Duration(seconds=(15 * 60))
and self.active
):
rospy.logwarn("Controller Timed out. Hold start to resume.")
self.get_logger().warn("Controller Timed out. Hold start to resume.")
self.reset()

else:
joy.header.stamp = (
rospy.Time.now()
self.get_clock().now()
) # In the sim, stamps weren't working right
self.last_joy = joy

Expand All @@ -78,14 +82,14 @@ def joy_recieved(self, joy):
thruster_deploy = bool(joy.buttons[5]) # RB

if back and not self.last_back:
rospy.loginfo("Back pressed. Going inactive")
self.get_logger().info("Back pressed. Going inactive")
self.reset()
return

# Reset controller state if only start is pressed down about 1 second
self.start_count += start
if self.start_count > 5:
rospy.loginfo("Resetting controller state")
self.get_logger().info("Resetting controller state")
self.reset()
self.active = True

Expand Down Expand Up @@ -143,11 +147,13 @@ def joy_recieved(self, joy):
x = joy.axes[1] * self.force_scale
y = joy.axes[0] * self.force_scale
rotation = joy.axes[3] * self.torque_scale
self.remote.publish_wrench(x, y, rotation, rospy.Time.now())
self.remote.publish_wrench(x, y, rotation, self.get_clock().now())


if __name__ == "__main__":
rospy.init_node("joystick")
rclpy.init(args=sys.argv)
node = rclpy.create_node("joystick")

joystick = Joystick()
rospy.spin()

rclpy.spin(node)
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@
calls. Curses is used to display a basic UI in the terminal that gives the user
useful feedback and captures key presses to be sent to the server.
"""

import contextlib
import curses
import sys
from typing import Optional

import rospy
import rclpy
from navigator_msgs.srv import KeyboardControl
from rclpy.duration import Duration

__author__ = "Anthony Olive"


rospy.init_node("keyboard_client", anonymous=True)
rclpy.init(args=sys.argv)
node = rclpy.create_node("keyboard_client", anonymous=True)


class KeyboardClient:
Expand All @@ -31,7 +33,7 @@ def __init__(self, stdscr):
self.uuid = ""
self.is_locked = False

self.keyboard_server = rospy.ServiceProxy("/keyboard_control", KeyboardControl)
self.keyboard_server = self.create_client(KeyboardControl, "/keyboard_control")

self.help_menu = [
"Lock: L ",
Expand Down Expand Up @@ -64,19 +66,19 @@ def read_key(self) -> Optional[int]:
new_keycode = self.screen.getch()

# This eliminates building a buffer of keys that takes forever to process
while (new_keycode != -1) and (not rospy.is_shutdown()):
while (new_keycode != -1) and (not self.is_shutdown()):
keycode = new_keycode
new_keycode = self.screen.getch()

# The 'q' key can be used to quit the program
if keycode == ord("q"):
rospy.signal_shutdown("The user has closed the keyboard client")
rclpy.shutdown("The user has closed the keyboard client")
elif keycode == ord("H"):
raise NotImplementedError("Kevin, you just threw away your shot!")

return keycode if keycode != -1 else None

def send_key(self, _: rospy.timer.TimerEvent) -> None:
def send_key(self, _: rclpy.timer.TimerEvent) -> None:
"""
Sends the key to the keyboard server and stores the returned locked
status and generated UUID (if one was received).
Expand Down Expand Up @@ -138,12 +140,16 @@ def clear(self) -> None:


def main(stdscr):
rospy.wait_for_service("/keyboard_control")
while not rclpy.wait_for_service(node, "/keyboard_control", timeout_sec=1.0):
node.get_logger().info("service not available, waiting again...")
tele = KeyboardClient(stdscr)
rospy.Timer(rospy.Duration(0.05), tele.send_key, oneshot=False)
rospy.spin()
node.create_timer(Duration(seconds=0.05), tele.send_key)
rclpy.spin(node)


if __name__ == "__main__":
with contextlib.suppress(rospy.ROSInterruptException):
# with contextlib.suppress(rospy.ROSInterruptException):
# curses.wrapper(main)
with contextlib.suppress(rclpy.ROSInterruptException):
curses.wrapper(main)
# not sure about this change, could not find any migration solution
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,27 @@


import curses
import sys
import uuid

import rospy
import rclpy
from navigator_msgs.srv import KeyboardControl, KeyboardControlRequest
from remote_control_lib import RemoteControl

__author__ = "Anthony Olive"


rospy.init_node("keyboard_server")
rclpy.init(args=sys.argv)
node = rclpy.create_node("keyboard_server")


class KeyboardServer:
def __init__(self):
self.force_scale = rospy.get_param("/joystick_wrench/force_scale", 600)
self.torque_scale = rospy.get_param("/joystick_wrench/torque_scale", 500)
self.force_scale = self.declare_parameter("/joystick_wrench/force_scale", 600)
self.torque_scale = self.declare_parameter("/joystick_wrench/torque_scale", 500)

self.remote = RemoteControl("keyboard", "/wrench/keyboard")
rospy.Service("/keyboard_control", KeyboardControl, self.key_recieved)
self.create_service(KeyboardControl, "/keyboard_control", self.key_recieved)

# Initialize this to a random UUID so that a client without a UUID cannot authenticate
self.locked_uuid = uuid.uuid4().hex
Expand Down Expand Up @@ -115,4 +117,4 @@ def execute_key(self, key: int):

if __name__ == "__main__":
keyboard = KeyboardServer()
rospy.spin()
rclpy.spin(node)
Loading

0 comments on commit 8734935

Please sign in to comment.