Skip to content

Commit

Permalink
Merge pull request #153 from sunava/move_base
Browse files Browse the repository at this point in the history
[move_base] added move_base interface to external interface
  • Loading branch information
Tigul authored Apr 24, 2024
2 parents 93bb231 + 21f4ea0 commit 408986d
Showing 1 changed file with 81 additions and 0 deletions.
81 changes: 81 additions & 0 deletions src/pycram/external_interfaces/move_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import sys

import rospy
import actionlib
import rosnode
from geometry_msgs.msg import PoseStamped
from typing import Callable

try:
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
except ModuleNotFoundError as e:
rospy.logwarn(f"Could not import MoveBase messages, Navigation interface could not be initialized")


# Global variables for shared resources
nav_action_client = None
is_init = False


def create_nav_action_client() -> actionlib.SimpleActionClient:
"""Creates a new action client for the move_base interface."""
client = actionlib.SimpleActionClient("move_base", MoveBaseAction)
rospy.loginfo("Waiting for move_base action server")
client.wait_for_server()
return client


def init_nav_interface(func: Callable) -> Callable:
"""Ensures initialization of the navigation interface before function execution."""

def wrapper(*args, **kwargs):
global is_init
global nav_action_client

if is_init:
return func(*args, **kwargs)

if "move_base_msgs" not in sys.modules:
rospy.logwarn("Could not initialize the navigation interface: move_base_msgs not imported")
return

if "/move_base" in rosnode.get_node_names():
nav_action_client = create_nav_action_client()
rospy.loginfo("Successfully initialized navigation interface")
is_init = True
else:
rospy.logwarn("Move_base is not running, could not initialize navigation interface")
return

return func(*args, **kwargs)

return wrapper


@init_nav_interface
def query_pose_nav(navpose: PoseStamped):
"""Sends a goal to the move_base service, initiating robot navigation to a given pose."""
global nav_action_client
global query_result

def active_callback():
rospy.loginfo("Sent query to move_base")

def done_callback(state, result):
rospy.loginfo("Finished moving")
global query_result
query_result = result

goal_msg = MoveBaseGoal()
goal_msg.target_pose = navpose
nav_action_client.send_goal(goal_msg, active_cb=active_callback, done_cb=done_callback)

nav_action_client.wait_for_result()

return query_result


def cancel_nav():
"""Cancels the current navigation goal."""
global nav_action_client
nav_action_client.cancel_all_goals()

0 comments on commit 408986d

Please sign in to comment.