From d636dbbe05043e28e4b0bf682620355e1c6c8c9a Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Thu, 21 Nov 2019 08:59:13 +0900 Subject: [PATCH 01/15] Add ServiceServer in rosserial server using topic_tools::ShapeShifter. --- .../include/rosserial_server/session.h | 68 ++++++++++-- .../include/rosserial_server/topic_handlers.h | 102 ++++++++++++++++++ 2 files changed, 161 insertions(+), 9 deletions(-) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index 36925a8fb..3255e5392 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -103,6 +103,10 @@ class Session : boost::noncopyable = boost::bind(&Session::setup_publisher, this, _1); callbacks_[rosserial_msgs::TopicInfo::ID_SUBSCRIBER] = boost::bind(&Session::setup_subscriber, this, _1); + callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_SERVER+rosserial_msgs::TopicInfo::ID_PUBLISHER] + = boost::bind(&Session::setup_service_server_publisher, this, _1); + callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_SERVER+rosserial_msgs::TopicInfo::ID_SUBSCRIBER] + = boost::bind(&Session::setup_service_server_subscriber, this, _1); callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_PUBLISHER] = boost::bind(&Session::setup_service_client_publisher, this, _1); callbacks_[rosserial_msgs::TopicInfo::ID_SERVICE_CLIENT+rosserial_msgs::TopicInfo::ID_SUBSCRIBER] @@ -131,7 +135,8 @@ class Session : boost::noncopyable callbacks_.clear(); subscribers_.clear(); publishers_.clear(); - services_.clear(); + service_clients_.clear(); + service_servers_.clear(); // Close the socket. socket_.close(); @@ -447,14 +452,14 @@ class Session : boost::noncopyable rosserial_msgs::TopicInfo topic_info; ros::serialization::Serializer::read(stream, topic_info); - if (!services_.count(topic_info.topic_name)) { + if (!service_clients_.count(topic_info.topic_name)) { ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str()); ServiceClientPtr srv(new ServiceClient( nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); - services_[topic_info.topic_name] = srv; + service_clients_[topic_info.topic_name] = srv; callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1); } - if (services_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) { + if (service_clients_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) { ROS_WARN("Service client setup: Request message MD5 mismatch between rosserial client and ROS"); } else { ROS_DEBUG("Service client %s: request message MD5 successfully validated as %s", @@ -467,16 +472,16 @@ class Session : boost::noncopyable rosserial_msgs::TopicInfo topic_info; ros::serialization::Serializer::read(stream, topic_info); - if (!services_.count(topic_info.topic_name)) { + if (!service_clients_.count(topic_info.topic_name)) { ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str()); ServiceClientPtr srv(new ServiceClient( nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); - services_[topic_info.topic_name] = srv; + service_clients_[topic_info.topic_name] = srv; callbacks_[topic_info.topic_id] = boost::bind(&ServiceClient::handle, srv, _1); } // see above comment regarding the service client callback for why we set topic_id here - services_[topic_info.topic_name]->setTopicId(topic_info.topic_id); - if (services_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) { + service_clients_[topic_info.topic_name]->setTopicId(topic_info.topic_id); + if (service_clients_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) { ROS_WARN("Service client setup: Response message MD5 mismatch between rosserial client and ROS"); } else { ROS_DEBUG("Service client %s: response message MD5 successfully validated as %s", @@ -485,6 +490,50 @@ class Session : boost::noncopyable set_sync_timeout(timeout_interval_); } + // When the rosserial client creates a ServiceServer object (and/or when it registers that object with the NodeHandle) + // it creates a "proxy" service server + void setup_service_server_publisher(ros::serialization::IStream& stream) { + rosserial_msgs::TopicInfo topic_info; + ros::serialization::Serializer::read(stream, topic_info); + + if (!service_servers_.count(topic_info.topic_name)) { + ROS_DEBUG("Creating service server for topic %s",topic_info.topic_name.c_str()); + ServiceServerPtr srv(new ServiceServer( + nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); + service_servers_[topic_info.topic_name] = srv; + callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::response_handle, srv, _1); + } + if (service_servers_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) { + ROS_WARN("Service server setup: Request message MD5 mismatch between rosserial server and ROS"); + } else { + ROS_DEBUG("Service server %s: request message MD5 successfully validated as %s", + topic_info.topic_name.c_str(),topic_info.md5sum.c_str()); + } + set_sync_timeout(timeout_interval_); + } + + void setup_service_server_subscriber(ros::serialization::IStream& stream) { + rosserial_msgs::TopicInfo topic_info; + ros::serialization::Serializer::read(stream, topic_info); + + if (!service_servers_.count(topic_info.topic_name)) { + ROS_DEBUG("Creating service server for topic %s",topic_info.topic_name.c_str()); + ServiceServerPtr srv(new ServiceServer( + nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); + service_servers_[topic_info.topic_name] = srv; + callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::response_handle, srv, _1); + } + // see above comment regarding the service server callback for why we set topic_id here + service_servers_[topic_info.topic_name]->setTopicId(topic_info.topic_id); + if (service_servers_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) { + ROS_WARN("Service server setup: Response message MD5 mismatch between rosserial server and ROS"); + } else { + ROS_DEBUG("Service server %s: response message MD5 successfully validated as %s", + topic_info.topic_name.c_str(),topic_info.md5sum.c_str()); + } + set_sync_timeout(timeout_interval_); + } + void handle_log(ros::serialization::IStream& stream) { rosserial_msgs::Log l; ros::serialization::Serializer::read(stream, l); @@ -535,7 +584,8 @@ class Session : boost::noncopyable std::map > callbacks_; std::map publishers_; std::map subscribers_; - std::map services_; + std::map service_clients_; + std::map service_servers_; }; } // namespace diff --git a/rosserial_server/include/rosserial_server/topic_handlers.h b/rosserial_server/include/rosserial_server/topic_handlers.h index 737adb5b4..480af6ea7 100644 --- a/rosserial_server/include/rosserial_server/topic_handlers.h +++ b/rosserial_server/include/rosserial_server/topic_handlers.h @@ -190,6 +190,108 @@ class ServiceClient { typedef boost::shared_ptr ServiceClientPtr; + +class ServiceServer { +public: + ServiceServer(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info, + boost::function& buffer, const uint16_t topic_id)> write_fn) + : write_fn_(write_fn) { + topic_id_ = -1; + get_response_ = false; + + rosserial_server::MsgInfo srvinfo; + rosserial_server::MsgInfo reqinfo; + rosserial_server::MsgInfo respinfo; + try + { + srvinfo = lookupMessage(topic_info.message_type, "srv"); + reqinfo = lookupMessage(topic_info.message_type + "Request", "srv"); + respinfo = lookupMessage(topic_info.message_type + "Response", "srv"); + } + catch (const std::exception& e) + { + ROS_WARN_STREAM("Unable to look up service definition: " << e.what()); + } + + service_md5_ = srvinfo.md5sum; + request_message_md5_ = reqinfo.md5sum; + response_message_md5_ = respinfo.md5sum; + + ros::AdvertiseServiceOptions opts; + opts.service = topic_info.topic_name; + opts.md5sum = service_md5_; + opts.datatype = topic_info.message_type; + opts.req_datatype = topic_info.message_type + "Request"; + opts.res_datatype = topic_info.message_type + "Response"; + opts.helper = boost::make_shared > >(boost::bind(&ServiceServer::request_handle, this, _1, _2)); + + service_server_ = nh.advertiseService(opts); + } + void setTopicId(uint16_t topic_id) { + topic_id_ = topic_id; + } + std::string getServiceMD5() { + return service_md5_; + } + std::string getRequestMessageMD5() { + return request_message_md5_; + } + std::string getResponseMessageMD5() { + return response_message_md5_; + } + + bool request_handle(topic_tools::ShapeShifter& request_message, topic_tools::ShapeShifter& response_message) { + + get_response_ = false; + ROS_DEBUG_STREAM("service receive " << service_server_.getService() << " request"); + + size_t length = ros::serialization::serializationLength(request_message); + std::vector buffer(length); + + ros::serialization::OStream ostream(&buffer[0], length); + ros::serialization::Serializer::write(ostream, request_message); + write_fn_(buffer,topic_id_); + + //wait for the response + int cnt = 0; + while(!get_response_) + { + ros::Duration(0.01).sleep(); + + if (cnt++ > 500) // hard-coding + { + ROS_WARN_STREAM(service_server_.getService() << ": no response!!"); + return false; + } + } + + response_message = response_message_; + + ROS_ERROR_STREAM("OK!!"); + return true; + } + + void response_handle(ros::serialization::IStream stream) { + ros::serialization::Serializer::read(stream, response_message_); + ROS_DEBUG_STREAM("service receive " << service_server_.getService() << " response"); + get_response_ = true; + } + +private: + //topic_tools::ShapeShifter request_message_; + topic_tools::ShapeShifter response_message_; + ros::ServiceServer service_server_; + boost::function& buffer, const uint16_t topic_id)> write_fn_; + std::string service_md5_; + std::string request_message_md5_; + std::string response_message_md5_; + uint16_t topic_id_; + + bool get_response_; +}; + +typedef boost::shared_ptr ServiceServerPtr; + } // namespace #endif // ROSSERIAL_SERVER_TOPIC_HANDLERS_H From 3e544cb2b04a31095df4309b04c3ec6157d4c95a Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Thu, 21 Nov 2019 09:27:46 +0900 Subject: [PATCH 02/15] User asyncspinner instead of ros timer --- .../include/rosserial_server/session.h | 61 +++++++------------ 1 file changed, 21 insertions(+), 40 deletions(-) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index 3255e5392..cf2bfba61 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -65,7 +65,7 @@ class Session : boost::noncopyable socket_(io_service), sync_timer_(io_service), require_check_timer_(io_service), - ros_spin_timer_(io_service), + spinner_(2), async_read_buffer_(socket_, buffer_max, boost::bind(&Session::read_failed, this, boost::asio::placeholders::error)) @@ -75,19 +75,9 @@ class Session : boost::noncopyable timeout_interval_ = boost::posix_time::milliseconds(5000); attempt_interval_ = boost::posix_time::milliseconds(1000); require_check_interval_ = boost::posix_time::milliseconds(1000); - ros_spin_interval_ = boost::posix_time::milliseconds(10); require_param_name_ = "~require"; unrecognised_topic_retry_threshold_ = ros::param::param("~unrecognised_topic_retry_threshold", 0); - - nh_.setCallbackQueue(&ros_callback_queue_); - - // Intermittent callback to service ROS callbacks. To avoid polling like this, - // CallbackQueue could in the future be extended with a scheme to monitor for - // callbacks on another thread, and then queue them up to be executed on this one. - ros_spin_timer_.expires_from_now(ros_spin_interval_); - ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this, - boost::asio::placeholders::error)); } Socket& socket() @@ -119,12 +109,14 @@ class Session : boost::noncopyable active_ = true; attempt_sync(); read_sync_header(); + + spinner_.start(); } void stop() { - // Abort any pending ROS callbacks. - ros_callback_queue_.clear(); + // Stop the ros::AsyncSpinner + spinner_.stop(); // Abort active session timer callbacks, if present. sync_timer_.cancel(); @@ -169,26 +161,6 @@ class Session : boost::noncopyable } private: - /** - * Periodic function which handles calling ROS callbacks, executed on the same - * io_service thread to avoid a concurrency nightmare. - */ - void ros_spin_timeout(const boost::system::error_code& error) { - ros_callback_queue_.callAvailable(); - - if (ros::ok()) - { - // Call again next interval. - ros_spin_timer_.expires_from_now(ros_spin_interval_); - ros_spin_timer_.async_wait(boost::bind(&Session::ros_spin_timeout, this, - boost::asio::placeholders::error)); - } - else - { - shutdown(); - } - } - //// RECEIVING MESSAGES //// // TODO: Total message timeout, implement primarily in ReadBuffer. @@ -309,6 +281,13 @@ class Session : boost::noncopyable stream << msg_checksum; ROS_DEBUG_NAMED("async_write", "Sending buffer of %d bytes to client.", length); + + // Will call immediately if we are already on the io_service thread. Otherwise, + // the request is queued up and executed on that thread. + socket_.get_io_service().dispatch(boost::bind(&Session::write_buffer, this, buffer_ptr)); + } + + void write_buffer(BufferPtr buffer_ptr) { boost::asio::async_write(socket_, boost::asio::buffer(*buffer_ptr), boost::bind(&Session::write_completion_cb, this, boost::asio::placeholders::error, buffer_ptr)); } @@ -429,6 +408,8 @@ class Session : boost::noncopyable publishers_[topic_info.topic_id] = pub; set_sync_timeout(timeout_interval_); + + ROS_INFO("publisher name: %s, type: %s, id: %d", topic_info.topic_name.c_str(), topic_info.message_type.c_str(), topic_info.topic_id); } void setup_subscriber(ros::serialization::IStream& stream) { @@ -440,6 +421,8 @@ class Session : boost::noncopyable subscribers_[topic_info.topic_id] = sub; set_sync_timeout(timeout_interval_); + + ROS_INFO("subscirber name: %s, type: %s, id: %d", topic_info.topic_name.c_str(), topic_info.message_type.c_str(), topic_info.topic_id); } // When the rosserial client creates a ServiceClient object (and/or when it registers that object with the NodeHandle) @@ -453,7 +436,7 @@ class Session : boost::noncopyable ros::serialization::Serializer::read(stream, topic_info); if (!service_clients_.count(topic_info.topic_name)) { - ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str()); + ROS_INFO("Creating service client for topic %s",topic_info.topic_name.c_str()); ServiceClientPtr srv(new ServiceClient( nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); service_clients_[topic_info.topic_name] = srv; @@ -473,7 +456,7 @@ class Session : boost::noncopyable ros::serialization::Serializer::read(stream, topic_info); if (!service_clients_.count(topic_info.topic_name)) { - ROS_DEBUG("Creating service client for topic %s",topic_info.topic_name.c_str()); + ROS_INFO("Creating service client for topic %s",topic_info.topic_name.c_str()); ServiceClientPtr srv(new ServiceClient( nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); service_clients_[topic_info.topic_name] = srv; @@ -497,7 +480,7 @@ class Session : boost::noncopyable ros::serialization::Serializer::read(stream, topic_info); if (!service_servers_.count(topic_info.topic_name)) { - ROS_DEBUG("Creating service server for topic %s",topic_info.topic_name.c_str()); + ROS_INFO("Creating service server for topic %s",topic_info.topic_name.c_str()); ServiceServerPtr srv(new ServiceServer( nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); service_servers_[topic_info.topic_name] = srv; @@ -517,7 +500,7 @@ class Session : boost::noncopyable ros::serialization::Serializer::read(stream, topic_info); if (!service_servers_.count(topic_info.topic_name)) { - ROS_DEBUG("Creating service server for topic %s",topic_info.topic_name.c_str()); + ROS_INFO("Creating service server for topic %s",topic_info.topic_name.c_str()); ServiceServerPtr srv(new ServiceServer( nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); service_servers_[topic_info.topic_name] = srv; @@ -568,15 +551,13 @@ class Session : boost::noncopyable bool active_; ros::NodeHandle nh_; - ros::CallbackQueue ros_callback_queue_; + ros::AsyncSpinner spinner_; // Use 2 threads boost::posix_time::time_duration timeout_interval_; boost::posix_time::time_duration attempt_interval_; boost::posix_time::time_duration require_check_interval_; - boost::posix_time::time_duration ros_spin_interval_; boost::asio::deadline_timer sync_timer_; boost::asio::deadline_timer require_check_timer_; - boost::asio::deadline_timer ros_spin_timer_; std::string require_param_name_; int unrecognised_topic_retry_threshold_{ 0 }; int unrecognised_topics_{ 0 }; From e27c731975bfd00f6380ea65524202d7ae25eab2 Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Thu, 21 Nov 2019 11:12:40 +0900 Subject: [PATCH 03/15] Add gtest code for service server in rosserial client --- .../include/rosserial_server/topic_handlers.h | 4 +- rosserial_test/CMakeLists.txt | 5 ++ .../include/rosserial_test/fixture.h | 6 +- rosserial_test/package.xml | 1 + rosserial_test/src/publish_subscribe.cpp | 2 - rosserial_test/src/service_server.cpp | 60 +++++++++++++++++++ .../test/rosserial_python_serial.test | 16 ++++- .../test/rosserial_python_socket.test | 16 ++++- .../test/rosserial_server_serial.test | 16 ++++- .../test/rosserial_server_socket.test | 16 ++++- 10 files changed, 124 insertions(+), 18 deletions(-) create mode 100644 rosserial_test/src/service_server.cpp diff --git a/rosserial_server/include/rosserial_server/topic_handlers.h b/rosserial_server/include/rosserial_server/topic_handlers.h index 480af6ea7..97fd010fd 100644 --- a/rosserial_server/include/rosserial_server/topic_handlers.h +++ b/rosserial_server/include/rosserial_server/topic_handlers.h @@ -243,7 +243,7 @@ class ServiceServer { bool request_handle(topic_tools::ShapeShifter& request_message, topic_tools::ShapeShifter& response_message) { get_response_ = false; - ROS_DEBUG_STREAM("service receive " << service_server_.getService() << " request"); + ROS_ERROR_STREAM("service receive " << service_server_.getService() << " request"); size_t length = ros::serialization::serializationLength(request_message); std::vector buffer(length); @@ -273,7 +273,7 @@ class ServiceServer { void response_handle(ros::serialization::IStream stream) { ros::serialization::Serializer::read(stream, response_message_); - ROS_DEBUG_STREAM("service receive " << service_server_.getService() << " response"); + std::cout << "service receive " << service_server_.getService() << " response" << std::endl; get_response_ = true; } diff --git a/rosserial_test/CMakeLists.txt b/rosserial_test/CMakeLists.txt index 2024c0991..8b3612695 100644 --- a/rosserial_test/CMakeLists.txt +++ b/rosserial_test/CMakeLists.txt @@ -3,14 +3,18 @@ project(rosserial_test) find_package(catkin REQUIRED COMPONENTS roscpp + rosserial_arduino rosserial_client rosserial_msgs rosserial_python rosserial_server rostest std_msgs + std_srvs ) +SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + catkin_package( CATKIN_DEPENDS rosserial_msgs @@ -41,6 +45,7 @@ if(CATKIN_ENABLE_TESTING) endfunction() add_rosserial_test_executable(publish_subscribe) + add_rosserial_test_executable(service_server) add_rostest(test/rosserial_server_socket.test) add_rostest(test/rosserial_server_serial.test) diff --git a/rosserial_test/include/rosserial_test/fixture.h b/rosserial_test/include/rosserial_test/fixture.h index fe7fa1800..bae8615f6 100644 --- a/rosserial_test/include/rosserial_test/fixture.h +++ b/rosserial_test/include/rosserial_test/fixture.h @@ -74,6 +74,7 @@ class SocketSetup : public AbstractSetup { class SingleClientFixture : public ::testing::Test { protected: + SingleClientFixture(): as(2) {} static void SetModeFromParam() { std::string mode; ros::param::get("~mode", mode); @@ -90,15 +91,16 @@ class SingleClientFixture : public ::testing::Test { if (setup == NULL) SetModeFromParam(); setup->SetUp(); rosserial::ClientComms::fd = setup->fd; + as.start(); } virtual void TearDown() { setup->TearDown(); + as.stop(); } rosserial::ros::NodeHandle client_nh; ros::NodeHandle nh; + ros::AsyncSpinner as; static AbstractSetup* setup; }; AbstractSetup* SingleClientFixture::setup = NULL; - - diff --git a/rosserial_test/package.xml b/rosserial_test/package.xml index 2f9d6b698..8673888e2 100644 --- a/rosserial_test/package.xml +++ b/rosserial_test/package.xml @@ -21,4 +21,5 @@ rosserial_server rostest std_msgs + std_srvs diff --git a/rosserial_test/src/publish_subscribe.cpp b/rosserial_test/src/publish_subscribe.cpp index c2ad02d49..13f295166 100644 --- a/rosserial_test/src/publish_subscribe.cpp +++ b/rosserial_test/src/publish_subscribe.cpp @@ -30,7 +30,6 @@ TEST_F(SingleClientFixture, single_publish) { for(int attempt = 0; attempt < 50; attempt++) { client_pub.publish(&string_msg); client_nh.spinOnce(); - ros::spinOnce(); if (str_callback.times_called > 0) break; ros::Duration(0.1).sleep(); } @@ -62,7 +61,6 @@ TEST_F(SingleClientFixture, single_subscribe) { string_msg.data = "to-rosserial-client"; for(int attempt = 0; attempt < 50; attempt++) { pub.publish(string_msg); - ros::spinOnce(); client_nh.spinOnce(); if (rosserial_string_cb_count > 0) break; ros::Duration(0.1).sleep(); diff --git a/rosserial_test/src/service_server.cpp b/rosserial_test/src/service_server.cpp new file mode 100644 index 000000000..dbb9533be --- /dev/null +++ b/rosserial_test/src/service_server.cpp @@ -0,0 +1,60 @@ +#include "ros/ros.h" +#include "std_srvs/Empty.h" + +namespace rosserial { +#include "rosserial_test/ros.h" +#include "rosserial/std_srvs/Empty.h" +} + +#include +#include "rosserial_test/fixture.h" +#include + +/* TODO */ +/* this test is imcomplete, since the response is not checked. */ + +bool receive = false; +static void callback(const rosserial::std_srvs::Empty::Request & req, rosserial::std_srvs::Empty::Response & res){ + receive = true; +} + +/** + * Single service call from a rosserial-connected client, + * verified from a roscpp rosservice server. + */ +TEST_F(SingleClientFixture, single_service_server) { + + rosserial::ros::ServiceServer client_server("test_srv",&callback); + client_nh.advertiseService(client_server); + client_nh.initNode(); + + // Roscpp rosservice client to call for the client. + ros::ServiceClient test_client = nh.serviceClient("test_srv"); + std_srvs::Empty srv; + + for(int attempt = 0; attempt < 50; attempt++) { + if (attempt % 10 == 0 && attempt > 0) { + + if (test_client.call(srv)) { + ROS_ERROR("Receive response"); + break; + } + else { + ROS_ERROR("Failed to call service test_srv"); + } + } + + if(receive) break; + client_nh.spinOnce(); + ros::Duration(0.1).sleep(); + } + + EXPECT_TRUE(receive); +} + +int main(int argc, char **argv){ + ros::init(argc, argv, "test_service_server"); + ros::start(); + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/rosserial_test/test/rosserial_python_serial.test b/rosserial_test/test/rosserial_python_serial.test index b28c6b4e1..bbaeb2d95 100644 --- a/rosserial_test/test/rosserial_python_serial.test +++ b/rosserial_test/test/rosserial_python_serial.test @@ -1,11 +1,21 @@ - + + + + - - + + + + + + + + diff --git a/rosserial_test/test/rosserial_python_socket.test b/rosserial_test/test/rosserial_python_socket.test index 538554aff..063ba607f 100644 --- a/rosserial_test/test/rosserial_python_socket.test +++ b/rosserial_test/test/rosserial_python_socket.test @@ -1,9 +1,19 @@ - + + + + - - + + + + + + + + diff --git a/rosserial_test/test/rosserial_server_serial.test b/rosserial_test/test/rosserial_server_serial.test index 72feaf30c..402679fef 100644 --- a/rosserial_test/test/rosserial_server_serial.test +++ b/rosserial_test/test/rosserial_server_serial.test @@ -1,11 +1,21 @@ + + + - + - - + + + + + + + + diff --git a/rosserial_test/test/rosserial_server_socket.test b/rosserial_test/test/rosserial_server_socket.test index ee597fbe4..ba588532d 100644 --- a/rosserial_test/test/rosserial_server_socket.test +++ b/rosserial_test/test/rosserial_server_socket.test @@ -1,11 +1,21 @@ + + + - + - - + + + + + + + + From 3dd5ff3baeb622878dc121b9e4c7b740c1907654 Mon Sep 17 00:00:00 2001 From: moju zhao Date: Mon, 28 Sep 2020 07:21:04 +0900 Subject: [PATCH 04/15] Resolve the double free or corruption problem in the rosservice callback function. --- rosserial_server/include/rosserial_server/session.h | 6 ++---- .../include/rosserial_server/topic_handlers.h | 13 ++++++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index cf2bfba61..d6331c44e 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -481,8 +481,7 @@ class Session : boost::noncopyable if (!service_servers_.count(topic_info.topic_name)) { ROS_INFO("Creating service server for topic %s",topic_info.topic_name.c_str()); - ServiceServerPtr srv(new ServiceServer( - nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); + ServiceServerPtr srv(new ServiceServer(nh_, topic_info, buffer_max, boost::bind(&Session::write_message, this, _1, _2))); service_servers_[topic_info.topic_name] = srv; callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::response_handle, srv, _1); } @@ -501,8 +500,7 @@ class Session : boost::noncopyable if (!service_servers_.count(topic_info.topic_name)) { ROS_INFO("Creating service server for topic %s",topic_info.topic_name.c_str()); - ServiceServerPtr srv(new ServiceServer( - nh_,topic_info,boost::bind(&Session::write_message, this, _1, _2))); + ServiceServerPtr srv(new ServiceServer(nh_, topic_info, buffer_max, boost::bind(&Session::write_message, this, _1, _2))); service_servers_[topic_info.topic_name] = srv; callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::response_handle, srv, _1); } diff --git a/rosserial_server/include/rosserial_server/topic_handlers.h b/rosserial_server/include/rosserial_server/topic_handlers.h index 97fd010fd..8012612ea 100644 --- a/rosserial_server/include/rosserial_server/topic_handlers.h +++ b/rosserial_server/include/rosserial_server/topic_handlers.h @@ -193,9 +193,10 @@ typedef boost::shared_ptr ServiceClientPtr; class ServiceServer { public: - ServiceServer(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info, + ServiceServer(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info, size_t capacity, boost::function& buffer, const uint16_t topic_id)> write_fn) : write_fn_(write_fn) { + response_buffer_.resize(capacity); topic_id_ = -1; get_response_ = false; @@ -265,21 +266,23 @@ class ServiceServer { } } - response_message = response_message_; + ros::serialization::IStream response_stream(&response_buffer_[0], buffer_len_); + ros::serialization::Serializer::read(response_stream, response_message); ROS_ERROR_STREAM("OK!!"); return true; } void response_handle(ros::serialization::IStream stream) { - ros::serialization::Serializer::read(stream, response_message_); + buffer_len_ = stream.getLength(); + std::memcpy(&response_buffer_[0], stream.getData(),stream.getLength()); std::cout << "service receive " << service_server_.getService() << " response" << std::endl; get_response_ = true; } private: - //topic_tools::ShapeShifter request_message_; - topic_tools::ShapeShifter response_message_; + std::vector response_buffer_; + size_t buffer_len_; ros::ServiceServer service_server_; boost::function& buffer, const uint16_t topic_id)> write_fn_; std::string service_md5_; From 8215b1ae858797a80fd391856a6bb01d8bd95613 Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Fri, 3 Jun 2022 23:18:19 +0900 Subject: [PATCH 05/15] Refactor the wating timeout for service server --- .../include/rosserial_server/topic_handlers.h | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rosserial_server/include/rosserial_server/topic_handlers.h b/rosserial_server/include/rosserial_server/topic_handlers.h index 8012612ea..2fa08e0a9 100644 --- a/rosserial_server/include/rosserial_server/topic_handlers.h +++ b/rosserial_server/include/rosserial_server/topic_handlers.h @@ -194,8 +194,8 @@ typedef boost::shared_ptr ServiceClientPtr; class ServiceServer { public: ServiceServer(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info, size_t capacity, - boost::function& buffer, const uint16_t topic_id)> write_fn) - : write_fn_(write_fn) { + boost::function& buffer, const uint16_t topic_id)> write_fn, double timeout = 10) + : write_fn_(write_fn), timeout_(timeout) { response_buffer_.resize(capacity); topic_id_ = -1; get_response_ = false; @@ -244,7 +244,7 @@ class ServiceServer { bool request_handle(topic_tools::ShapeShifter& request_message, topic_tools::ShapeShifter& response_message) { get_response_ = false; - ROS_ERROR_STREAM("service receive " << service_server_.getService() << " request"); + ROS_DEBUG_STREAM("service receive " << service_server_.getService() << " request"); size_t length = ros::serialization::serializationLength(request_message); std::vector buffer(length); @@ -254,14 +254,14 @@ class ServiceServer { write_fn_(buffer,topic_id_); //wait for the response - int cnt = 0; + ros::Time start_t = ros::Time::now(); while(!get_response_) { ros::Duration(0.01).sleep(); - if (cnt++ > 500) // hard-coding + if (ros::Time::now().toSec() - start_t.toSec() > timeout_) { - ROS_WARN_STREAM(service_server_.getService() << ": no response!!"); + ROS_WARN_STREAM(service_server_.getService() << ": no response for " << timeout_ << ", give up."); return false; } } @@ -269,14 +269,13 @@ class ServiceServer { ros::serialization::IStream response_stream(&response_buffer_[0], buffer_len_); ros::serialization::Serializer::read(response_stream, response_message); - ROS_ERROR_STREAM("OK!!"); return true; } void response_handle(ros::serialization::IStream stream) { buffer_len_ = stream.getLength(); std::memcpy(&response_buffer_[0], stream.getData(),stream.getLength()); - std::cout << "service receive " << service_server_.getService() << " response" << std::endl; + ROS_DEBUG_STREAM("service receive " << service_server_.getService() << " response"); get_response_ = true; } @@ -289,6 +288,7 @@ class ServiceServer { std::string request_message_md5_; std::string response_message_md5_; uint16_t topic_id_; + double timeout_; bool get_response_; }; From 20461b00679b418eeab0ce04fc2defda4b1424d8 Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Fri, 3 Jun 2022 23:18:56 +0900 Subject: [PATCH 06/15] Change the level of verbose for sync lost --- rosserial_server/include/rosserial_server/session.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index d6331c44e..cb610a419 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -329,7 +329,7 @@ class Session : boost::noncopyable void sync_timeout(const boost::system::error_code& error) { if (error != boost::asio::error::operation_aborted) { - ROS_DEBUG("Sync with device lost."); + ROS_WARN("Sync with device lost."); stop(); } } From d02e2658dc5da1a0575ea3e6dfd93e01f6f644cd Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Fri, 3 Jun 2022 23:19:12 +0900 Subject: [PATCH 07/15] Add arg for serial.launch --- rosserial_server/launch/serial.launch | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rosserial_server/launch/serial.launch b/rosserial_server/launch/serial.launch index 8928429b7..43768659a 100644 --- a/rosserial_server/launch/serial.launch +++ b/rosserial_server/launch/serial.launch @@ -1,6 +1,8 @@ - + + + From dd5f2e7f2b00647b6f210c0826d82dfed31e4a37 Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Sat, 4 Jun 2022 00:01:30 +0900 Subject: [PATCH 08/15] Switch the service reponse and reuqest md5sum check --- rosserial_server/include/rosserial_server/session.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index cb610a419..1a3a389f0 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -485,7 +485,7 @@ class Session : boost::noncopyable service_servers_[topic_info.topic_name] = srv; callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::response_handle, srv, _1); } - if (service_servers_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) { + if (service_servers_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) { ROS_WARN("Service server setup: Request message MD5 mismatch between rosserial server and ROS"); } else { ROS_DEBUG("Service server %s: request message MD5 successfully validated as %s", @@ -506,7 +506,7 @@ class Session : boost::noncopyable } // see above comment regarding the service server callback for why we set topic_id here service_servers_[topic_info.topic_name]->setTopicId(topic_info.topic_id); - if (service_servers_[topic_info.topic_name]->getResponseMessageMD5() != topic_info.md5sum) { + if (service_servers_[topic_info.topic_name]->getRequestMessageMD5() != topic_info.md5sum) { ROS_WARN("Service server setup: Response message MD5 mismatch between rosserial server and ROS"); } else { ROS_DEBUG("Service server %s: response message MD5 successfully validated as %s", From c1ca58f4d92106d5dac03656f64cfc9480cd4299 Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Sat, 4 Jun 2022 00:16:51 +0900 Subject: [PATCH 09/15] Separate subscriber callbacks and service server callbacks into two different AsyncSpinner to run them in different threads. --- .../include/rosserial_server/session.h | 16 ++++++++++------ .../include/rosserial_server/topic_handlers.h | 3 ++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index 1a3a389f0..1fa4c02e8 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -65,7 +65,7 @@ class Session : boost::noncopyable socket_(io_service), sync_timer_(io_service), require_check_timer_(io_service), - spinner_(2), + sub_spinner_(1), srv_spinner_(1, &ros_srv_callback_queue_), async_read_buffer_(socket_, buffer_max, boost::bind(&Session::read_failed, this, boost::asio::placeholders::error)) @@ -110,13 +110,15 @@ class Session : boost::noncopyable attempt_sync(); read_sync_header(); - spinner_.start(); + sub_spinner_.start(); + srv_spinner_.start(); } void stop() { // Stop the ros::AsyncSpinner - spinner_.stop(); + sub_spinner_.stop(); + srv_spinner_.stop(); // Abort active session timer callbacks, if present. sync_timer_.cancel(); @@ -481,7 +483,7 @@ class Session : boost::noncopyable if (!service_servers_.count(topic_info.topic_name)) { ROS_INFO("Creating service server for topic %s",topic_info.topic_name.c_str()); - ServiceServerPtr srv(new ServiceServer(nh_, topic_info, buffer_max, boost::bind(&Session::write_message, this, _1, _2))); + ServiceServerPtr srv(new ServiceServer(nh_, ros_srv_callback_queue_, topic_info, buffer_max, boost::bind(&Session::write_message, this, _1, _2))); service_servers_[topic_info.topic_name] = srv; callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::response_handle, srv, _1); } @@ -500,7 +502,7 @@ class Session : boost::noncopyable if (!service_servers_.count(topic_info.topic_name)) { ROS_INFO("Creating service server for topic %s",topic_info.topic_name.c_str()); - ServiceServerPtr srv(new ServiceServer(nh_, topic_info, buffer_max, boost::bind(&Session::write_message, this, _1, _2))); + ServiceServerPtr srv(new ServiceServer(nh_, ros_srv_callback_queue_, topic_info, buffer_max, boost::bind(&Session::write_message, this, _1, _2))); service_servers_[topic_info.topic_name] = srv; callbacks_[topic_info.topic_id] = boost::bind(&ServiceServer::response_handle, srv, _1); } @@ -549,7 +551,9 @@ class Session : boost::noncopyable bool active_; ros::NodeHandle nh_; - ros::AsyncSpinner spinner_; // Use 2 threads + ros::AsyncSpinner sub_spinner_; // For sub topics + ros::AsyncSpinner srv_spinner_; // For service server + ros::CallbackQueue ros_srv_callback_queue_; boost::posix_time::time_duration timeout_interval_; boost::posix_time::time_duration attempt_interval_; diff --git a/rosserial_server/include/rosserial_server/topic_handlers.h b/rosserial_server/include/rosserial_server/topic_handlers.h index 2fa08e0a9..7ddcb0565 100644 --- a/rosserial_server/include/rosserial_server/topic_handlers.h +++ b/rosserial_server/include/rosserial_server/topic_handlers.h @@ -193,7 +193,7 @@ typedef boost::shared_ptr ServiceClientPtr; class ServiceServer { public: - ServiceServer(ros::NodeHandle& nh, rosserial_msgs::TopicInfo& topic_info, size_t capacity, + ServiceServer(ros::NodeHandle& nh, ros::CallbackQueue& service_queue, rosserial_msgs::TopicInfo& topic_info, size_t capacity, boost::function& buffer, const uint16_t topic_id)> write_fn, double timeout = 10) : write_fn_(write_fn), timeout_(timeout) { response_buffer_.resize(capacity); @@ -219,6 +219,7 @@ class ServiceServer { response_message_md5_ = respinfo.md5sum; ros::AdvertiseServiceOptions opts; + opts.callback_queue = &service_queue; opts.service = topic_info.topic_name; opts.md5sum = service_md5_; opts.datatype = topic_info.message_type; From 90f454392bc1040fe5200cfc6afda89a143541c8 Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Sat, 4 Jun 2022 00:38:57 +0900 Subject: [PATCH 10/15] Send ID_TX_STOP on shutdown. --- rosserial_server/include/rosserial_server/session.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index 1fa4c02e8..514196108 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -132,6 +132,10 @@ class Session : boost::noncopyable service_clients_.clear(); service_servers_.clear(); + // Send disconnection message + std::vector message(1); + write_message(message, rosserial_msgs::TopicInfo::ID_TX_STOP); + // Close the socket. socket_.close(); active_ = false; From 88cdb93bc09c85f6b89074c502720f7a5616ceb8 Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Sat, 4 Jun 2022 01:37:57 +0900 Subject: [PATCH 11/15] Compatible usage of boost asio dispath for different boost version --- rosserial_server/include/rosserial_server/session.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index 514196108..5fc3e9f2f 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -290,7 +291,11 @@ class Session : boost::noncopyable // Will call immediately if we are already on the io_service thread. Otherwise, // the request is queued up and executed on that thread. +#if BOOST_VERSION >= 107000 + boost::asio::dispatch(socket_.get_executor(), boost::bind(&Session::write_buffer, this, buffer_ptr)); +#else socket_.get_io_service().dispatch(boost::bind(&Session::write_buffer, this, buffer_ptr)); +#endif } void write_buffer(BufferPtr buffer_ptr) { From 169e031a17f2f0153574406b2501e718401e263b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikko=20Sepp=C3=A4l=C3=A4?= Date: Sun, 7 Aug 2022 18:53:21 +0300 Subject: [PATCH 12/15] Replace busy wait loop with condition variables --- .../include/rosserial_server/topic_handlers.h | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/rosserial_server/include/rosserial_server/topic_handlers.h b/rosserial_server/include/rosserial_server/topic_handlers.h index 7ddcb0565..05c558211 100644 --- a/rosserial_server/include/rosserial_server/topic_handlers.h +++ b/rosserial_server/include/rosserial_server/topic_handlers.h @@ -40,6 +40,10 @@ #include #include +#include +#include +#include + namespace rosserial_server { @@ -255,22 +259,20 @@ class ServiceServer { write_fn_(buffer,topic_id_); //wait for the response - ros::Time start_t = ros::Time::now(); - while(!get_response_) - { - ros::Duration(0.01).sleep(); - - if (ros::Time::now().toSec() - start_t.toSec() > timeout_) - { - ROS_WARN_STREAM(service_server_.getService() << ": no response for " << timeout_ << ", give up."); - return false; - } - } - - ros::serialization::IStream response_stream(&response_buffer_[0], buffer_len_); - ros::serialization::Serializer::read(response_stream, response_message); - - return true; + std::unique_lock lk(service_mutex_); + if(cv_.wait_for(lk, std::chrono::milliseconds((int)(timeout_*1000)), [this]{ return get_response_; })) + { + ros::serialization::IStream response_stream(&response_buffer_[0], buffer_len_); + ros::serialization::Serializer::read(response_stream, response_message); + service_mutex_.unlock(); + return true; + } + else + { + ROS_WARN_STREAM(service_server_.getService() << ": no response for " << timeout_ << ", give up."); + service_mutex_.unlock(); + return false; + } } void response_handle(ros::serialization::IStream stream) { @@ -278,6 +280,7 @@ class ServiceServer { std::memcpy(&response_buffer_[0], stream.getData(),stream.getLength()); ROS_DEBUG_STREAM("service receive " << service_server_.getService() << " response"); get_response_ = true; + cv_.notify_one(); } private: @@ -290,7 +293,8 @@ class ServiceServer { std::string response_message_md5_; uint16_t topic_id_; double timeout_; - + std::condition_variable cv_; + std::mutex service_mutex_; bool get_response_; }; From 5d9b20e2fddc90f26d32f397e8b76818911cd113 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikko=20Sepp=C3=A4l=C3=A4?= Date: Sun, 7 Aug 2022 18:54:27 +0300 Subject: [PATCH 13/15] Match sync timeout with rosserial_python --- rosserial_server/include/rosserial_server/session.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index 5fc3e9f2f..c66da61b2 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -73,7 +73,7 @@ class Session : boost::noncopyable { active_ = false; - timeout_interval_ = boost::posix_time::milliseconds(5000); + timeout_interval_ = boost::posix_time::milliseconds(15000); // rosserial_python has 5s timeout, but sync timeout check multiplies it by three attempt_interval_ = boost::posix_time::milliseconds(1000); require_check_interval_ = boost::posix_time::milliseconds(1000); require_param_name_ = "~require"; From 2a8ddee6568128d62334e27c6f2a6ce461675367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikko=20Sepp=C3=A4l=C3=A4?= Date: Sun, 7 Aug 2022 19:31:41 +0300 Subject: [PATCH 14/15] Fix subscriber typo --- rosserial_server/include/rosserial_server/session.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosserial_server/include/rosserial_server/session.h b/rosserial_server/include/rosserial_server/session.h index c66da61b2..df0e50da5 100644 --- a/rosserial_server/include/rosserial_server/session.h +++ b/rosserial_server/include/rosserial_server/session.h @@ -433,7 +433,7 @@ class Session : boost::noncopyable set_sync_timeout(timeout_interval_); - ROS_INFO("subscirber name: %s, type: %s, id: %d", topic_info.topic_name.c_str(), topic_info.message_type.c_str(), topic_info.topic_id); + ROS_INFO("subscriber name: %s, type: %s, id: %d", topic_info.topic_name.c_str(), topic_info.message_type.c_str(), topic_info.topic_id); } // When the rosserial client creates a ServiceClient object (and/or when it registers that object with the NodeHandle) From 01b93de1e89b766ce97f65f9db961496080be99a Mon Sep 17 00:00:00 2001 From: Moju Zhao Date: Wed, 19 Oct 2022 01:08:24 +0900 Subject: [PATCH 15/15] [rosserial_test] remove unecessary dependency in CMake --- rosserial_test/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/rosserial_test/CMakeLists.txt b/rosserial_test/CMakeLists.txt index 8b3612695..1eab338f1 100644 --- a/rosserial_test/CMakeLists.txt +++ b/rosserial_test/CMakeLists.txt @@ -3,7 +3,6 @@ project(rosserial_test) find_package(catkin REQUIRED COMPONENTS roscpp - rosserial_arduino rosserial_client rosserial_msgs rosserial_python