Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rosserial server] implement rosservice server #615

Open
wants to merge 16 commits into
base: noetic-devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 91 additions & 49 deletions rosserial_server/include/rosserial_server/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/version.hpp>

#include <ros/callback_queue.h>
#include <ros/ros.h>
Expand All @@ -65,29 +66,19 @@ class Session : boost::noncopyable
socket_(io_service),
sync_timer_(io_service),
require_check_timer_(io_service),
ros_spin_timer_(io_service),
sub_spinner_(1), srv_spinner_(1, &ros_srv_callback_queue_),
async_read_buffer_(socket_, buffer_max,
boost::bind(&Session::read_failed, this,
boost::asio::placeholders::error))
{
active_ = false;

timeout_interval_ = boost::posix_time::milliseconds(5000);
timeout_interval_ = boost::posix_time::milliseconds(15000); // rosserial_python has 5s timeout, but sync timeout check multiplies it by three
attempt_interval_ = boost::posix_time::milliseconds(1000);
require_check_interval_ = boost::posix_time::milliseconds(1000);
ros_spin_interval_ = boost::posix_time::milliseconds(10);
require_param_name_ = "~require";

unrecognised_topic_retry_threshold_ = ros::param::param("~unrecognised_topic_retry_threshold", 0);

nh_.setCallbackQueue(&ros_callback_queue_);

// Intermittent callback to service ROS callbacks. To avoid polling like this,
// CallbackQueue could in the future be extended with a scheme to monitor for
// callbacks on another thread, and then queue them up to be executed on this one.
ros_spin_timer_.expires_from_now(ros_spin_interval_);
ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
boost::asio::placeholders::error));
}

Socket& socket()
Expand All @@ -103,6 +94,10 @@ class Session : boost::noncopyable
= boost::bind(&Session::setup_publisher, this, _1);
callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
= boost::bind(&Session::setup_subscriber, this, _1);
callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_SERVER+rosserial_msgs::TopicInfo::ID_PUBLISHER]
= boost::bind(&Session::setup_service_server_publisher, this, _1);
callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_SERVER+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
= boost::bind(&Session::setup_service_server_subscriber, this, _1);
callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER]
= boost::bind(&Session::setup_service_client_publisher, this, _1);
callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER]
Expand All @@ -115,12 +110,16 @@ class Session : boost::noncopyable
active_ = true;
attempt_sync();
read_sync_header();

sub_spinner_.start();
srv_spinner_.start();
}

void stop()
{
// Abort any pending ROS callbacks.
ros_callback_queue_.clear();
// Stop the ros::AsyncSpinner
sub_spinner_.stop();
srv_spinner_.stop();

// Abort active session timer callbacks, if present.
sync_timer_.cancel();
Expand All @@ -131,7 +130,12 @@ class Session : boost::noncopyable
callbacks_.clear();
subscribers_.clear();
publishers_.clear();
services_.clear();
service_clients_.clear();
service_servers_.clear();

// Send disconnection message
std::vector<uint8_t> message(1);
write_message(message, rosserial_msgs::TopicInfo::ID_TX_STOP);

// Close the socket.
socket_.close();
Expand Down Expand Up @@ -164,26 +168,6 @@ class Session : boost::noncopyable
}

private:
/**
* Periodic function which handles calling ROS callbacks, executed on the same
* io_service thread to avoid a concurrency nightmare.
*/
void ros_spin_timeout(const boost::system::error_code& error) {
ros_callback_queue_.callAvailable();

if (ros::ok())
{
// Call again next interval.
ros_spin_timer_.expires_from_now(ros_spin_interval_);
ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this,
boost::asio::placeholders::error));
}
else
{
shutdown();
}
}

//// RECEIVING MESSAGES ////
// TODO: Total message timeout, implement primarily in ReadBuffer.

Expand Down Expand Up @@ -304,6 +288,17 @@ class Session : boost::noncopyable
stream << msg_checksum;

ROS_DEBUG_NAMED("async_write", "Sending buffer of %d bytes to client.", length);

// Will call immediately if we are already on the io_service thread. Otherwise,
// the request is queued up and executed on that thread.
#if BOOST_VERSION >= 107000
boost::asio::dispatch(socket_.get_executor(), boost::bind(&Session::write_buffer, this, buffer_ptr));
#else
socket_.get_io_service().dispatch(boost::bind(&Session::write_buffer, this, buffer_ptr));
#endif
}

void write_buffer(BufferPtr buffer_ptr) {
boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr),
boost::bind(&Session::write_completion_cb, this, boost::asio::placeholders::error, buffer_ptr));
}
Expand Down Expand Up @@ -345,7 +340,7 @@ class Session : boost::noncopyable

void sync_timeout(const boost::system::error_code& error) {
if (error != boost::asio::error::operation_aborted) {
ROS_DEBUG("Sync with device lost.");
ROS_WARN("Sync with device lost.");
stop();
}
}
Expand Down Expand Up @@ -424,6 +419,8 @@ class Session : boost::noncopyable
publishers_[topic_info.topic_id] = pub;

set_sync_timeout(timeout_interval_);

ROS_INFO("publisher name: %s, type: %s, id: %d", topic_info.topic_name.c_str(), topic_info.message_type.c_str(), topic_info.topic_id);
}

void setup_subscriber(ros::serialization::IStream& stream) {
Expand All @@ -435,6 +432,8 @@ class Session : boost::noncopyable
subscribers_[topic_info.topic_id] = sub;

set_sync_timeout(timeout_interval_);

ROS_INFO("subscriber name: %s, type: %s, id: %d", topic_info.topic_name.c_str(), topic_info.message_type.c_str(), topic_info.topic_id);
}

// When the rosserial client creates a ServiceClient object (and/or when it registers that object with the NodeHandle)
Expand All @@ -447,14 +446,14 @@ class Session : boost::noncopyable
rosserial_msgs::TopicInfo topic_info;
ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);

if (!services_.count(topic_info.topic_name)) {
ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
if (!service_clients_.count(topic_info.topic_name)) {
ROS_INFO("Creating service client for topic %s",topic_info.topic_name.c_str());
ServiceClientPtr srv(new ServiceClient(
nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
services_[topic_info.topic_name] = srv;
service_clients_[topic_info.topic_name] = srv;
callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
}
if (services_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
if (service_clients_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS");
} else {
ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s",
Expand All @@ -467,16 +466,16 @@ class Session : boost::noncopyable
rosserial_msgs::TopicInfo topic_info;
ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);

if (!services_.count(topic_info.topic_name)) {
ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str());
if (!service_clients_.count(topic_info.topic_name)) {
ROS_INFO("Creating service client for topic %s",topic_info.topic_name.c_str());
ServiceClientPtr srv(new ServiceClient(
nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2)));
services_[topic_info.topic_name] = srv;
service_clients_[topic_info.topic_name] = srv;
callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1);
}
// see above comment regarding the service client callback for why we set topic_id here
services_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
if (services_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
service_clients_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
if (service_clients_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS");
} else {
ROS_DEBUG("Service client %s: response message MD5 successfully validated as %s",
Expand All @@ -485,6 +484,48 @@ class Session : boost::noncopyable
set_sync_timeout(timeout_interval_);
}

// When the rosserial client creates a ServiceServer object (and/or when it registers that object with the NodeHandle)
// it creates a "proxy" service server
void setup_service_server_publisher(ros::serialization::IStream& stream) {
rosserial_msgs::TopicInfo topic_info;
ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);

if (!service_servers_.count(topic_info.topic_name)) {
ROS_INFO("Creating service server for topic %s",topic_info.topic_name.c_str());
ServiceServerPtr srv(new ServiceServer(nh_, ros_srv_callback_queue_, topic_info, buffer_max, boost::bind(&Session::write_message, this, _1, _2)));
service_servers_[topic_info.topic_name] = srv;
callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::response_handle, srv, _1);
}
if (service_servers_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) {
ROS_WARN("Service server setup: Request message MD5 mismatch between rosserial server and ROS");
} else {
ROS_DEBUG("Service server %s: request message MD5 successfully validated as %s",
topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
}
set_sync_timeout(timeout_interval_);
}

void setup_service_server_subscriber(ros::serialization::IStream& stream) {
rosserial_msgs::TopicInfo topic_info;
ros::serialization::Serializer<rosserial_msgs::TopicInfo>::read(stream, topic_info);

if (!service_servers_.count(topic_info.topic_name)) {
ROS_INFO("Creating service server for topic %s",topic_info.topic_name.c_str());
ServiceServerPtr srv(new ServiceServer(nh_, ros_srv_callback_queue_, topic_info, buffer_max, boost::bind(&Session::write_message, this, _1, _2)));
service_servers_[topic_info.topic_name] = srv;
callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::response_handle, srv, _1);
}
// see above comment regarding the service server callback for why we set topic_id here
service_servers_[topic_info.topic_name]->setTopicId(topic_info.topic_id);
if (service_servers_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) {
ROS_WARN("Service server setup: Response message MD5 mismatch between rosserial server and ROS");
} else {
ROS_DEBUG("Service server %s: response message MD5 successfully validated as %s",
topic_info.topic_name.c_str(),topic_info.md5sum.c_str());
}
set_sync_timeout(timeout_interval_);
}

void handle_log(ros::serialization::IStream& stream) {
rosserial_msgs::Log l;
ros::serialization::Serializer<rosserial_msgs::Log>::read(stream, l);
Expand Down Expand Up @@ -519,23 +560,24 @@ class Session : boost::noncopyable
bool active_;

ros::NodeHandle nh_;
ros::CallbackQueue ros_callback_queue_;
ros::AsyncSpinner sub_spinner_; // For sub topics
ros::AsyncSpinner srv_spinner_; // For service server
ros::CallbackQueue ros_srv_callback_queue_;

boost::posix_time::time_duration timeout_interval_;
boost::posix_time::time_duration attempt_interval_;
boost::posix_time::time_duration require_check_interval_;
boost::posix_time::time_duration ros_spin_interval_;
boost::asio::deadline_timer sync_timer_;
boost::asio::deadline_timer require_check_timer_;
boost::asio::deadline_timer ros_spin_timer_;
std::string require_param_name_;
int unrecognised_topic_retry_threshold_{ 0 };
int unrecognised_topics_{ 0 };

std::map<uint16_t, boost::function<void(ros::serialization::IStream&)> > callbacks_;
std::map<uint16_t, PublisherPtr> publishers_;
std::map<uint16_t, SubscriberPtr> subscribers_;
std::map<std::string, ServiceClientPtr> services_;
std::map<std::string, ServiceClientPtr> service_clients_;
std::map<std::string, ServiceServerPtr> service_servers_;
};

} // namespace
Expand Down
110 changes: 110 additions & 0 deletions rosserial_server/include/rosserial_server/topic_handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
#include <topic_tools/shape_shifter.h>
#include <rosserial_server/msg_lookup.h>

#include <chrono>
#include <thread>
#include <condition_variable>

namespace rosserial_server
{

Expand Down Expand Up @@ -190,6 +194,112 @@ class ServiceClient {

typedef boost::shared_ptr<ServiceClient> ServiceClientPtr;


class ServiceServer {
public:
ServiceServer(ros::NodeHandle& nh, ros::CallbackQueue& service_queue, rosserial_msgs::TopicInfo& topic_info, size_t capacity,
boost::function<void(std::vector<uint8_t>& buffer, const uint16_t topic_id)> write_fn, double timeout = 10)
: write_fn_(write_fn), timeout_(timeout) {
response_buffer_.resize(capacity);
topic_id_ = -1;
get_response_ = false;

rosserial_server::MsgInfo srvinfo;
rosserial_server::MsgInfo reqinfo;
rosserial_server::MsgInfo respinfo;
try
{
srvinfo = lookupMessage(topic_info.message_type, "srv");
reqinfo = lookupMessage(topic_info.message_type + "Request", "srv");
respinfo = lookupMessage(topic_info.message_type + "Response", "srv");
}
catch (const std::exception& e)
{
ROS_WARN_STREAM("Unable to look up service definition: " << e.what());
}

service_md5_ = srvinfo.md5sum;
request_message_md5_ = reqinfo.md5sum;
response_message_md5_ = respinfo.md5sum;

ros::AdvertiseServiceOptions opts;
opts.callback_queue = &service_queue;
opts.service = topic_info.topic_name;
opts.md5sum = service_md5_;
opts.datatype = topic_info.message_type;
opts.req_datatype = topic_info.message_type + "Request";
opts.res_datatype = topic_info.message_type + "Response";
opts.helper = boost::make_shared<ros::ServiceCallbackHelperT<ros::ServiceSpec<topic_tools::ShapeShifter,topic_tools::ShapeShifter> > >(boost::bind(&ServiceServer::request_handle, this, _1, _2));

service_server_ = nh.advertiseService(opts);
}
void setTopicId(uint16_t topic_id) {
topic_id_ = topic_id;
}
std::string getServiceMD5() {
return service_md5_;
}
std::string getRequestMessageMD5() {
return request_message_md5_;
}
std::string getResponseMessageMD5() {
return response_message_md5_;
}

bool request_handle(topic_tools::ShapeShifter& request_message, topic_tools::ShapeShifter& response_message) {

get_response_ = false;
ROS_DEBUG_STREAM("service receive " << service_server_.getService() << " request");

size_t length = ros::serialization::serializationLength(request_message);
std::vector<uint8_t> buffer(length);

ros::serialization::OStream ostream(&buffer[0], length);
ros::serialization::Serializer<topic_tools::ShapeShifter>::write(ostream, request_message);
write_fn_(buffer,topic_id_);

//wait for the response
std::unique_lock<std::mutex> lk(service_mutex_);
if(cv_.wait_for(lk, std::chrono::milliseconds((int)(timeout_*1000)), [this]{ return get_response_; }))
{
ros::serialization::IStream response_stream(&response_buffer_[0], buffer_len_);
ros::serialization::Serializer<topic_tools::ShapeShifter>::read(response_stream, response_message);
service_mutex_.unlock();
return true;
}
else
{
ROS_WARN_STREAM(service_server_.getService() << ": no response for " << timeout_ << ", give up.");
service_mutex_.unlock();
return false;
}
}

void response_handle(ros::serialization::IStream stream) {
buffer_len_ = stream.getLength();
std::memcpy(&response_buffer_[0], stream.getData(),stream.getLength());
ROS_DEBUG_STREAM("service receive " << service_server_.getService() << " response");
get_response_ = true;
cv_.notify_one();
}

private:
std::vector<uint8_t> response_buffer_;
size_t buffer_len_;
ros::ServiceServer service_server_;
boost::function<void(std::vector<uint8_t>& buffer, const uint16_t topic_id)> write_fn_;
std::string service_md5_;
std::string request_message_md5_;
std::string response_message_md5_;
uint16_t topic_id_;
double timeout_;
std::condition_variable cv_;
std::mutex service_mutex_;
bool get_response_;
};

typedef boost::shared_ptr<ServiceServer> ServiceServerPtr;

} // namespace

#endif // ROSSERIAL_SERVER_TOPIC_HANDLERS_H
Loading