Skip to content

Commit

Permalink
Merge pull request #26 from HuaYuXiao/multithread-dev
Browse files Browse the repository at this point in the history
v2.1.5 multithread supported
  • Loading branch information
HuaYuXiao authored Dec 24, 2024
2 parents 6179008 + 60d4f13 commit 36df6f2
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 60 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v2.1.5 - 2024-12-24
- [upgrade] apply multithread to process several pointclouds parallely
- [upgrade] store pointclouds with `std::deque`
- support for one single pointcloud input

## v2.1.4 - 2024-12-24
- [upgrade] set `queue_size` of `Subscriber` to `boost::thread::hardware_concurrency()`
- [upgrade] check if both `frame_id`s are the same
Expand Down
31 changes: 19 additions & 12 deletions include/merge_pcl/merge_pcl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#ifndef MERGE_PCL_H
#define MERGE_PCL_H

#include <thread>
#include <deque>
#include <mutex>
#include <ros/ros.h>
#include <ros/rate.h>
#include <pcl_ros/point_cloud.h>
Expand All @@ -16,9 +19,6 @@
#include <pcl/point_types.h>
#include <pcl/kdtree/kdtree_flann.h>
#include <message_filters/subscriber.h>
#include <message_filters/sync_policies/approximate_time.h>
#include <message_filters/sync_policies/exact_time.h>
#include <message_filters/time_synchronizer.h>
#include <sensor_msgs/PointCloud2.h>

using namespace std;
Expand All @@ -28,23 +28,30 @@ class PointCloudMerger {
PointCloudMerger();
~PointCloudMerger() = default;

void mergeCallback(const sensor_msgs::PointCloud2ConstPtr& pcl2_0,
const sensor_msgs::PointCloud2ConstPtr& pcl2_1);
// Method to process buffers
void processBuffers();

private:
typedef message_filters::sync_policies::ApproximateTime<sensor_msgs::PointCloud2, sensor_msgs::PointCloud2>
SyncPolicy;
typedef shared_ptr<message_filters::Synchronizer<SyncPolicy>> Synchronizer;

ros::NodeHandle nh;

int pcl2_source_num;
size_t queue_size;
double timeout;

std::string pcl2_topic_0, pcl2_frame_0;
ros::Subscriber pcl2_sub_0;
void pcl2Callback0(const sensor_msgs::PointCloud2ConstPtr& pcl2);

std::string pcl2_topic_1, pcl2_frame_1;
shared_ptr<message_filters::Subscriber<sensor_msgs::PointCloud2>> pcl2_sub_0, pcl2_sub_1;
int queue_size;
Synchronizer synchronizer_;
ros::Subscriber pcl2_sub_1;
void pcl2Callback1(const sensor_msgs::PointCloud2ConstPtr& pcl2);

// Buffers to store point clouds
std::deque<pcl::PointCloud<pcl::PointXYZ>::Ptr> buffer_0;
std::deque<pcl::PointCloud<pcl::PointXYZ>::Ptr> buffer_1;
std::mutex buffer_mutex;

// Transform listeners
tf::TransformListener listener0, listener1;
double tf_duration;

Expand Down
2 changes: 1 addition & 1 deletion package.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<package format="2">
<name>easondrone_mapping</name>
<version>2.1.4</version>
<version>2.1.5</version>
<description>
A ROS package for mapping via octomap
</description>
Expand Down
131 changes: 84 additions & 47 deletions src/merge_pcl/merge_pcl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,82 +2,119 @@

using namespace std;

PointCloudMerger::PointCloudMerger() : nh("~"){
PointCloudMerger::PointCloudMerger() : nh("~") {
nh.param<int>("pcl2_source_num", pcl2_source_num, 2);
queue_size = boost::thread::hardware_concurrency();
nh.param<double>("timeout", timeout, 0.5);

nh.param<string>("pcl2_topic_0", pcl2_topic_0, "");
nh.param<string>("pcl2_frame_0", pcl2_frame_0, "");
pcl2_sub_0 = nh.subscribe<sensor_msgs::PointCloud2>
(pcl2_topic_0, queue_size, &PointCloudMerger::pcl2Callback0, this);

nh.param<string>("pcl2_topic_1", pcl2_topic_1, "");
nh.param<string>("pcl2_frame_1", pcl2_frame_1, "");
pcl2_sub_1 = nh.subscribe<sensor_msgs::PointCloud2>
(pcl2_topic_1, queue_size, &PointCloudMerger::pcl2Callback1, this);

nh.param<double>("tf_duration", tf_duration, 0.1);
listener0.waitForTransform(pcl2_frame_out, pcl2_frame_0, ros::Time(0), ros::Duration(tf_duration));
listener1.waitForTransform(pcl2_frame_out, pcl2_frame_1, ros::Time(0), ros::Duration(tf_duration));

nh.param<string>("pcl2_topic_out", pcl2_topic_out, "");
nh.param<string>("pcl2_frame_out", pcl2_frame_out, "");
pcl2_pub = nh.advertise<sensor_msgs::PointCloud2>
(pcl2_topic_out, queue_size);

queue_size = boost::thread::hardware_concurrency();

pcl2_sub_0.reset(new message_filters::Subscriber<sensor_msgs::PointCloud2>
(nh, pcl2_topic_0, queue_size));
pcl2_sub_1.reset(new message_filters::Subscriber<sensor_msgs::PointCloud2>
(nh, pcl2_topic_1, queue_size));

synchronizer_.reset(new message_filters::Synchronizer<SyncPolicy>(SyncPolicy(100), *pcl2_sub_0, *pcl2_sub_1));
synchronizer_->registerCallback(boost::bind(&PointCloudMerger::mergeCallback, this, _1, _2));

pcl2_pub = nh.advertise<sensor_msgs::PointCloud2>(pcl2_topic_out, queue_size);
ROS_INFO("[merge_pcl] Initialized!");
}

listener0.waitForTransform(pcl2_frame_out, pcl2_frame_0, ros::Time(0), ros::Duration(tf_duration));
listener1.waitForTransform(pcl2_frame_out, pcl2_frame_1, ros::Time(0), ros::Duration(tf_duration));
void PointCloudMerger::pcl2Callback0(const sensor_msgs::PointCloud2ConstPtr& pcl2) {
pcl::PointCloud<pcl::PointXYZ>::Ptr pclxyz_0(new pcl::PointCloud<pcl::PointXYZ>());
pcl::fromROSMsg(*pcl2, *pclxyz_0);

if (pcl2->header.frame_id != pcl2_frame_out) {
try {
pcl_ros::transformPointCloud(pcl2_frame_out, *pclxyz_0, *pclxyz_0, listener0);
}
catch (...) {
ROS_ERROR("Transform failed for pcl2_0!");
return;
}
}

ROS_INFO("[easondrone_mapping] merge_pcl initialized!");
std::lock_guard<std::mutex> lock(buffer_mutex);
buffer_0.push_back(pclxyz_0);
if (buffer_0.size() > queue_size) {
buffer_0.pop_front();
}
}

void PointCloudMerger::mergeCallback(const sensor_msgs::PointCloud2ConstPtr& pcl2_0, const sensor_msgs::PointCloud2ConstPtr& pcl2_1) {
pcl::PointCloud<pcl::PointXYZ>::Ptr pclxyz_0, pclxyz_1, pclxyz_output;

pclxyz_0.reset(new pcl::PointCloud<pcl::PointXYZ>());
pcl::fromROSMsg(*pcl2_0, *pclxyz_0);
// check if both frames are the same
if(pcl2_0->header.frame_id != pcl2_frame_out){
try{
pcl_ros::transformPointCloud(pcl2_frame_out, *pclxyz_0, *pclxyz_0, listener0);
}
catch(...){
ROS_ERROR("pcl_ros::transformPointCloud failed!");
void PointCloudMerger::pcl2Callback1(const sensor_msgs::PointCloud2ConstPtr& pcl2) {
pcl::PointCloud<pcl::PointXYZ>::Ptr pclxyz_1(new pcl::PointCloud<pcl::PointXYZ>());
pcl::fromROSMsg(*pcl2, *pclxyz_1);

if (pcl2->header.frame_id != pcl2_frame_out) {
try {
pcl_ros::transformPointCloud(pcl2_frame_out, *pclxyz_1, *pclxyz_1, listener1);
}
catch (...) {
ROS_ERROR("Transform failed for pcl2_1!");
return;
}
}

pclxyz_1.reset(new pcl::PointCloud<pcl::PointXYZ>());
pcl::fromROSMsg(*pcl2_1, *pclxyz_1);
// check if both frames are the same
if(pcl2_1->header.frame_id != pcl2_frame_out){
try{
pcl_ros::transformPointCloud(pcl2_frame_out, *pclxyz_1, *pclxyz_1, listener1);
}
catch(...){
ROS_ERROR("pcl_ros::transformPointCloud failed!");
}
std::lock_guard<std::mutex> lock(buffer_mutex);
buffer_1.push_back(pclxyz_1);
if (buffer_1.size() > queue_size) {
buffer_1.pop_front();
}
}

pclxyz_output.reset(new pcl::PointCloud<pcl::PointXYZ>());
void PointCloudMerger::processBuffers() {
while (ros::ok()) {
pcl::PointCloud<pcl::PointXYZ>::Ptr pclxyz_0, pclxyz_1, pclxyz_output;

{
std::lock_guard<std::mutex> lock(buffer_mutex);
if (!buffer_0.empty()) {
pclxyz_0 = buffer_0.front();
buffer_0.pop_front();
}
if (!buffer_1.empty()) {
pclxyz_1 = buffer_1.front();
buffer_1.pop_front();
}
}

pclxyz_output->header.seq = pclxyz_1->header.seq;
pclxyz_output->header.stamp = pclxyz_1->header.stamp;
pclxyz_output->header.frame_id = pcl2_frame_out;
if (pclxyz_0 || pclxyz_1) {
pclxyz_output.reset(new pcl::PointCloud<pcl::PointXYZ>());
if(pclxyz_0){
*pclxyz_output += *pclxyz_0;
}
if(pclxyz_1){
*pclxyz_output += *pclxyz_1;
}

*pclxyz_output += *pclxyz_1;
*pclxyz_output += *pclxyz_0;
sensor_msgs::PointCloud2 pcl2_output;
pcl2_output.header.frame_id = pcl2_frame_out;
pcl::toROSMsg(*pclxyz_output, pcl2_output);

sensor_msgs::PointCloud2 pcl2_output;
pcl::toROSMsg(*pclxyz_output, pcl2_output);
pcl2_pub.publish(pcl2_output);
}

if(pcl2_pub){
pcl2_pub.publish(pcl2_output);
ros::Duration(tf_duration).sleep(); // Small delay to avoid excessive CPU usage
}
}

int main(int argc, char** argv) {
ros::init(argc, argv, "merge_pcl");
PointCloudMerger pc_merger;

boost::thread processing_thread(&PointCloudMerger::processBuffers, &pc_merger);

ros::spin();
processing_thread.join();

return 0;
}

0 comments on commit 36df6f2

Please sign in to comment.