From fc1387f7843aab6ee814bbea879a472d5f7c8e2c Mon Sep 17 00:00:00 2001 From: matlabbe Date: Tue, 3 Sep 2024 17:18:06 -0700 Subject: [PATCH] Odom: removed long processing from ros callbacks (decreasing delay, also making delay independent of the message filters topic_queue_size and sync_queue_size parameters) --- .../include/rtabmap_odom/OdometryROS.h | 14 ++- rtabmap_odom/src/OdometryROS.cpp | 108 ++++++++++++------ 2 files changed, 85 insertions(+), 37 deletions(-) diff --git a/rtabmap_odom/include/rtabmap_odom/OdometryROS.h b/rtabmap_odom/include/rtabmap_odom/OdometryROS.h index d36a18d9a..fe632fd4a 100644 --- a/rtabmap_odom/include/rtabmap_odom/OdometryROS.h +++ b/rtabmap_odom/include/rtabmap_odom/OdometryROS.h @@ -43,6 +43,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include @@ -55,7 +56,7 @@ class Odometry; namespace rtabmap_odom { -class OdometryROS : public nodelet::Nodelet +class OdometryROS : public nodelet::Nodelet, public UThread { public: @@ -94,6 +95,9 @@ class OdometryROS : public nodelet::Nodelet virtual void onOdomInit() = 0; virtual void updateParameters(rtabmap::ParametersMap & parameters) {} + virtual void mainLoop(); + virtual void mainLoopKill(); + void callbackIMU(const sensor_msgs::ImuConstPtr& msg); void reset(const rtabmap::Transform & pose = rtabmap::Transform::getIdentity()); @@ -138,6 +142,13 @@ class OdometryROS : public nodelet::Nodelet tf::TransformListener tfListener_; ros::Subscriber imuSub_; + // Safe-threading + UMutex imuMutex_; + UMutex dataMutex_; + USemaphore dataReady_; + rtabmap::SensorData dataToProcess_; + std_msgs::Header dataHeaderToProcess_; + bool paused_; int resetCountdown_; int resetCurrentCount_; @@ -156,7 +167,6 @@ class OdometryROS : public nodelet::Nodelet bool waitIMUToinit_; bool imuProcessed_; std::map imus_; - std::pair bufferedData_; rtabmap_util::ULogToRosout ulogToRosout_; diff --git a/rtabmap_odom/src/OdometryROS.cpp b/rtabmap_odom/src/OdometryROS.cpp index 1219a87a0..d9150749b 100644 --- a/rtabmap_odom/src/OdometryROS.cpp +++ b/rtabmap_odom/src/OdometryROS.cpp @@ -93,6 +93,7 @@ OdometryROS::OdometryROS(bool stereoParams, bool visParams, bool icpParams) : OdometryROS::~OdometryROS() { + this->join(true); delete odometry_; } @@ -379,6 +380,8 @@ void OdometryROS::onInit() NODELET_INFO("odometry: Subscribing to IMU topic %s", imuSub_.getTopic().c_str()); } + this->start(); + onOdomInit(); } @@ -433,17 +436,12 @@ void OdometryROS::callbackIMU(const sensor_msgs::ImuConstPtr& msg) cv::Mat(3,3,CV_64FC1,(void*)msg->linear_acceleration_covariance.data()).clone(), localTransform); - imus_.insert(std::make_pair(stamp, imu)); - - if(bufferedData_.first.isValid() && stamp > bufferedData_.first.stamp()) - { - SensorData data = bufferedData_.first; - bufferedData_.first = SensorData(); - processData(data, bufferedData_.second); - } + UScopeMutex m(imuMutex_); + imus_.insert(std::make_pair(stamp, imu)); if(imus_.size() > 1000) { + NODELET_WARN("Dropping imu data!"); imus_.erase(imus_.begin()); } } @@ -451,40 +449,75 @@ void OdometryROS::callbackIMU(const sensor_msgs::ImuConstPtr& msg) void OdometryROS::processData(SensorData & data, const std_msgs::Header & header) { - if((waitIMUToinit_ && !imuProcessed_) && odometry_->framesProcessed() == 0 && odometry_->getPose().isIdentity() && imus_.empty()) + //NODELET_WARN("Received image: %f delay=%f", data.stamp(), (ros::Time::now() - header.stamp).toSec()); + if(dataMutex_.lockTry() == 0) + { + dataToProcess_ = data; + dataHeaderToProcess_ = header; + dataReady_.release(); + dataMutex_.unlock(); + } + else { - NODELET_WARN("odometry: waiting imu (%s) to initialize orientation (wait_imu_to_init=true)", imuSub_.getTopic().c_str()); + NODELET_INFO("Dropping image/scan data"); + } +} + +void OdometryROS::mainLoopKill() +{ + // in case we were waiting, unblock thread + dataReady_.release(); +} + +void OdometryROS::mainLoop() +{ + dataReady_.acquire(); + + if(!this->isRunning()) + { + // thread killed return; } - if(waitIMUToinit_ && (imus_.empty() || imus_.rbegin()->first < header.stamp.toSec())) + UScopeMutex lock(dataMutex_); + + // aliases + SensorData & data = dataToProcess_; + std_msgs::Header & header = dataHeaderToProcess_; + + std::vector > imus; { - //NODELET_WARN("No imu received with higher stamp than last image (%f)! Buffering this image until we get more imu msgs...", stamp.toSec()); + UScopeMutex m(imuMutex_); - // keep in cache to process later when we will receive imu msgs - if(bufferedData_.first.isValid()) + if((waitIMUToinit_ && !imuProcessed_) && odometry_->framesProcessed() == 0 && odometry_->getPose().isIdentity() && imus_.empty()) { - NODELET_ERROR("Overwriting previous data! Make sure IMU is " - "published faster than data rate. (last image stamp " - "buffered=%f and new one is %f, last imu stamp received=%f)", - bufferedData_.first.stamp(), data.stamp(), imus_.empty()?0:imus_.rbegin()->first); + NODELET_WARN("odometry: waiting imu (%s) to initialize orientation (wait_imu_to_init=true)", imuSub_.getTopic().c_str()); + return; + } + + if(waitIMUToinit_ && (imus_.empty() || imus_.rbegin()->first < header.stamp.toSec())) + { + NODELET_ERROR("Make sure IMU is published faster than data rate! (last image stamp=%f and last imu stamp received=%f)", + data.stamp(), imus_.empty()?0:imus_.rbegin()->first); + return; + } + // process all imu data up to current image stamp (or just after so that underlying odom approach can do interpolation of imu at image stamp) + std::map::iterator iterEnd = imus_.lower_bound(header.stamp.toSec()); + if(iterEnd!= imus_.end()) + { + ++iterEnd; + } + for(std::map::iterator iter=imus_.begin(); iter!=iterEnd;) + { + imus.push_back(*iter); + imus_.erase(iter++); } - bufferedData_.first = data; - bufferedData_.second = header; - return; - } - // process all imu data up to current image stamp (or just after so that underlying odom approach can do interpolation of imu at image stamp) - std::map::iterator iterEnd = imus_.lower_bound(header.stamp.toSec()); - if(iterEnd!= imus_.end()) - { - ++iterEnd; } - for(std::map::iterator iter=imus_.begin(); iter!=iterEnd;) + + for(size_t i=0; ifirst); - SensorData dataIMU(iter->second, 0, iter->first); + SensorData dataIMU(imus[i].second, 0, imus[i].first); odometry_->process(dataIMU); - imus_.erase(iter++); imuProcessed_ = true; } @@ -1020,20 +1053,21 @@ void OdometryROS::processData(SensorData & data, const std_msgs::Header & header odomSensorDataCompressedPub_.publish(msg); } + double delay = (ros::Time::now() - header.stamp).toSec(); if(visParams_) { if(icpParams_) { - NODELET_INFO( "Odom: quality=%d, ratio=%f, std dev=%fm|%frad, update time=%fs", info.reg.inliers, info.reg.icpInliersRatio, pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(0,0)), pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(5,5)), (ros::WallTime::now()-time).toSec()); + NODELET_INFO( "Odom: quality=%d, ratio=%f, std dev=%fm|%frad, update time=%fs, delay=%fs", info.reg.inliers, info.reg.icpInliersRatio, pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(0,0)), pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(5,5)), (ros::WallTime::now()-time).toSec(), delay); } else { - NODELET_INFO( "Odom: quality=%d, std dev=%fm|%frad, update time=%fs", info.reg.inliers, pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(0,0)), pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(5,5)), (ros::WallTime::now()-time).toSec()); + NODELET_INFO( "Odom: quality=%d, std dev=%fm|%frad, update time=%fs, delay=%fs", info.reg.inliers, pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(0,0)), pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(5,5)), (ros::WallTime::now()-time).toSec(), delay); } } else // if(icpParams_) { - NODELET_INFO( "Odom: ratio=%f, std dev=%fm|%frad, update time=%fs", info.reg.icpInliersRatio, pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(0,0)), pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(5,5)), (ros::WallTime::now()-time).toSec()); + NODELET_INFO( "Odom: ratio=%f, std dev=%fm|%frad, update time=%fs, delay=%fs", info.reg.icpInliersRatio, pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(0,0)), pose.isNull()?0.0f:std::sqrt(info.reg.covariance.at(5,5)), (ros::WallTime::now()-time).toSec(), delay); } statusDiagnostic_.setStatus(pose.isNull()); @@ -1066,14 +1100,18 @@ bool OdometryROS::resetToPose(rtabmap_msgs::ResetPose::Request& req, rtabmap_msg void OdometryROS::reset(const Transform & pose) { + UScopeMutex lock(dataMutex_); odometry_->reset(pose); guess_.setNull(); guessPreviousPose_.setNull(); previousStamp_ = 0.0; resetCurrentCount_ = resetCountdown_; imuProcessed_ = false; - bufferedData_.first= SensorData(); + dataToProcess_ = SensorData(); + dataHeaderToProcess_ = std_msgs::Header(); + imuMutex_.lock(); imus_.clear(); + imuMutex_.unlock(); this->flushCallbacks(); }