思考问题:
1. 如何实现传感器数据的融合,或者说时间同步? 比如里程计读数和雷达数据融合?
void SlamGMapping::startLiveSlam()
{
entropy_publisher_ = private_nh_.advertise<std_msgs::Float64>("entropy", 1, true);
sst_ = node_.advertise<nav_msgs::OccupancyGrid>("map", 1, true);
sstm_ = node_.advertise<nav_msgs::MapMetaData>("map_metadata", 1, true);
ss_ = node_.advertiseService("dynamic_map", &SlamGMapping::mapCallback, this);
scan_filter_sub_ = new message_filters::Subscriber<sensor_msgs::LaserScan>(node_, "scan", 5);
scan_filter_ = new tf::MessageFilter<sensor_msgs::LaserScan>(*scan_filter_sub_, tf_, odom_frame_, 5);
scan_filter_->registerCallback(boost::bind(&SlamGMapping::laserCallback, this, _1)); transform_thread_ = new boost::thread(boost::bind(&SlamGMapping::publishLoop, this, transform_publish_period_));
}
ROS中message_filters 和 tf::MessageFilter 需要关注一下。
message_filters is a utility library for use with roscpp and rospy. It collects commonly used message "filtering" algorithms into a common space. A message filter is defined as something which a message arrives into and may or may not be spit back out of at a later point in time.
An example is the time synchronizer, which takes in messages of different types from multiple sources, and outputs them only if it has received a message on each of those sources with the same timestamp.
以下为MessageFilter泛型类执行的基本流程。
/**
* \brief Connect this filter's input to another filter's output. If this filter is already connected, disconnects first.
*/
template<class F> void connectInput(F& f)
{
message_connection_.disconnect();
message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
}
void incomingMessage(const ros::MessageEvent<M const>& evt)
{
add(evt);
}
/**
* \brief Manually add a message into this filter.
* \note If the message (or any other messages in the queue) are immediately transformable this will immediately call through to the output callback, possibly
* multiple times
*/
void add(const MConstPtr& message)
{
boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
(*header)["callerid"] = "unknown";
add(MEvent(message, header, ros::Time::now()));
}
void add(const MEvent& evt)
{
if (target_frames_.empty())
{
return;
} namespace mt = ros::message_traits;
const MConstPtr& message = evt.getMessage();
std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
ros::Time stamp = mt::TimeStamp<M>::value(*message); if (frame_id.empty())
{
messageDropped(evt, filter_failure_reasons::EmptyFrameID);
return;
} // iterate through the target frames and add requests for each of them
MessageInfo info;
info.handles.reserve(expected_success_count_);
{
V_string target_frames_copy;
// Copy target_frames_ to avoid deadlock from #79
{
boost::mutex::scoped_lock frames_lock(target_frames_mutex_);
target_frames_copy = target_frames_;
} V_string::iterator it = target_frames_copy.begin();
V_string::iterator end = target_frames_copy.end();
for (; it != end; ++it)
{
const std::string& target_frame = *it;
tf2::TransformableRequestHandle handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp);
if (handle == 0xffffffffffffffffULL) // never transformable
{
messageDropped(evt, filter_failure_reasons::OutTheBack);
return;
}
else if (handle == )
{
++info.success_count;
}
else
{
info.handles.push_back(handle);
} if (!time_tolerance_.isZero())
{
handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_);
if (handle == 0xffffffffffffffffULL) // never transformable
{
messageDropped(evt, filter_failure_reasons::OutTheBack);
return;
}
else if (handle == )
{
++info.success_count;
}
else
{
info.handles.push_back(handle);
}
}
}
} // We can transform already
if (info.success_count == expected_success_count_)
{
messageReady(evt);
}
else
{
// If this message is about to push us past our queue size, erase the oldest message
if (queue_size_ != && message_count_ + > queue_size_)
{
// While we're using the reference keep a shared lock on the messages.
boost::shared_lock< boost::shared_mutex > shared_lock(messages_mutex_); ++dropped_message_count_;
const MessageInfo& front = messages_.front();
TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_,
(mt::FrameId<M>::value(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec()); V_TransformableRequestHandle::const_iterator it = front.handles.begin();
V_TransformableRequestHandle::const_iterator end = front.handles.end();
for (; it != end; ++it)
{
bc_.cancelTransformableRequest(*it);
} messageDropped(front.event, filter_failure_reasons::Unknown);
// Unlock the shared lock and get a unique lock. Upgradeable lock is used in transformable.
// There can only be one upgrade lock. It's important the cancelTransformableRequest not deadlock with transformable.
// They both require the transformable_requests_mutex_ in BufferCore.
shared_lock.unlock();
// There is a very slight race condition if an older message arrives in this gap.
boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
messages_.pop_front();
--message_count_;
} // Add the message to our list
info.event = evt;
// Lock access to the messages_ before modifying them.
boost::unique_lock< boost::shared_mutex > unique_lock(messages_mutex_);
messages_.push_back(info);
++message_count_;
} TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_); ++incoming_message_count_;
}
完整的类代码:http://docs.ros.org/api/tf/html/c++/message__filter_8h_source.html
/*
2 * Copyright (c) 2008, Willow Garage, Inc.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of the Willow Garage, Inc. nor the names of its
14 * contributors may be used to endorse or promote products derived from
15 * this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */ #ifndef TF_MESSAGE_FILTER_H
#define TF_MESSAGE_FILTER_H #include <ros/ros.h>
#include <tf/transform_listener.h>
#include <tf/tfMessage.h> #include <string>
#include <list>
#include <vector>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/signals2.hpp> #include <ros/callback_queue.h> #include <message_filters/connection.h>
#include <message_filters/simple_filter.h> #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__) #define TF_MESSAGEFILTER_WARN(fmt, ...) \
ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: " fmt, getTargetFramesString().c_str(), __VA_ARGS__) namespace tf
{ namespace filter_failure_reasons
{
enum FilterFailureReason
{
Unknown,
OutTheBack,
EmptyFrameID,
};
}
typedef filter_failure_reasons::FilterFailureReason FilterFailureReason; class MessageFilterBase
{
public:
virtual ~MessageFilterBase(){}
virtual void clear() = ;
virtual void setTargetFrame(const std::string& target_frame) = ;
virtual void setTargetFrames(const std::vector<std::string>& target_frames) = ;
virtual void setTolerance(const ros::Duration& tolerance) = ;
virtual void setQueueSize( uint32_t new_queue_size ) = ;
virtual uint32_t getQueueSize() = ;
}; template<class M>
class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M>
{
public:
typedef boost::shared_ptr<M const> MConstPtr;
typedef ros::MessageEvent<M const> MEvent;
typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback;
typedef boost::signals2::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal; MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
: tf_(tf)
, nh_(nh)
, max_rate_(max_rate)
, queue_size_(queue_size)
{
init(); setTargetFrame(target_frame);
} template<class F>
MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))
: tf_(tf)
, nh_(nh)
, max_rate_(max_rate)
, queue_size_(queue_size)
{
init(); setTargetFrame(target_frame); connectInput(f);
} template<class F>
void connectInput(F& f)
{
message_connection_.disconnect();
message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);
} ~MessageFilter()
{
// Explicitly stop callbacks; they could execute after we're destroyed
max_rate_timer_.stop();
message_connection_.disconnect();
tf_.removeTransformsChangedListener(tf_connection_); clear(); TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
(long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_,
(long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
(long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_); } void setTargetFrame(const std::string& target_frame)
{
std::vector<std::string> frames;
frames.push_back(target_frame);
setTargetFrames(frames);
} void setTargetFrames(const std::vector<std::string>& target_frames)
{
boost::mutex::scoped_lock list_lock(messages_mutex_);
boost::mutex::scoped_lock string_lock(target_frames_string_mutex_); target_frames_ = target_frames; std::stringstream ss;
for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
{
ss << *it << " ";
}
target_frames_string_ = ss.str();
}
std::string getTargetFramesString()
{
boost::mutex::scoped_lock lock(target_frames_string_mutex_);
return target_frames_string_;
}; void setTolerance(const ros::Duration& tolerance)
{
time_tolerance_ = tolerance;
} void clear()
{
boost::mutex::scoped_lock lock(messages_mutex_); TF_MESSAGEFILTER_DEBUG("%s", "Cleared"); messages_.clear();
message_count_ = ; warned_about_unresolved_name_ = false;
warned_about_empty_frame_id_ = false;
} void add(const MEvent& evt)
{
boost::mutex::scoped_lock lock(messages_mutex_); testMessages(); if (!testMessage(evt))
{
// If this message is about to push us past our queue size, erase the oldest message
if (queue_size_ != && message_count_ + > queue_size_)
{
++dropped_message_count_;
const MEvent& front = messages_.front();
TF_MESSAGEFILTER_DEBUG(
"Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",
message_count_,
ros::message_traits::FrameId<M>::value(*front.getMessage()).c_str(),
ros::message_traits::TimeStamp<M>::value(*front.getMessage()).toSec());
signalFailure(messages_.front(), filter_failure_reasons::Unknown); messages_.pop_front();
--message_count_;
} // Add the message to our list
messages_.push_back(evt);
++message_count_;
} TF_MESSAGEFILTER_DEBUG(
"Added message in frame %s at time %.3f, count now %d",
ros::message_traits::FrameId<M>::value(*evt.getMessage()).c_str(),
ros::message_traits::TimeStamp<M>::value(*evt.getMessage()).toSec(),
message_count_); ++incoming_message_count_;
} void add(const MConstPtr& message)
{ boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>);
(*header)["callerid"] = "unknown";
add(MEvent(message, header, ros::Time::now()));
} message_filters::Connection registerFailureCallback(const FailureCallback& callback)
{
boost::mutex::scoped_lock lock(failure_signal_mutex_);
return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));
} virtual void setQueueSize( uint32_t new_queue_size )
{
queue_size_ = new_queue_size;
} virtual uint32_t getQueueSize()
{
return queue_size_;
} private: void init()
{
message_count_ = ;
new_transforms_ = false;
successful_transform_count_ = ;
failed_transform_count_ = ;
failed_out_the_back_count_ = ;
transform_message_count_ = ;
incoming_message_count_ = ;
dropped_message_count_ = ;
time_tolerance_ = ros::Duration(0.0);
warned_about_unresolved_name_ = false;
warned_about_empty_frame_id_ = false; tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this)); max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);
} typedef std::list<MEvent> L_Event; bool testMessage(const MEvent& evt)
{
const MConstPtr& message = evt.getMessage();
std::string callerid = evt.getPublisherName();
std::string frame_id = ros::message_traits::FrameId<M>::value(*message);
ros::Time stamp = ros::message_traits::TimeStamp<M>::value(*message); //Throw out messages which have an empty frame_id
if (frame_id.empty())
{
if (!warned_about_empty_frame_id_)
{
warned_about_empty_frame_id_ = true;
TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id. This message will only print once.", callerid.c_str());
}
signalFailure(evt, filter_failure_reasons::EmptyFrameID);
return true;
} //Throw out messages which are too old
for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
{
const std::string& target_frame = *target_it; if (target_frame != frame_id && stamp != ros::Time())
{
ros::Time latest_transform_time ; tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, ) ; if (stamp + tf_.getCacheLength() < latest_transform_time)
{
++failed_out_the_back_count_;
++dropped_message_count_;
TF_MESSAGEFILTER_DEBUG(
"Discarding Message, in frame %s, Out of the back of Cache Time "
"(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. "
"Message Count now: %d",
ros::message_traits::FrameId<M>::value(*message).c_str(),
ros::message_traits::TimeStamp<M>::value(*message).toSec(),
tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_); last_out_the_back_stamp_ = stamp;
last_out_the_back_frame_ = frame_id; signalFailure(evt, filter_failure_reasons::OutTheBack);
return true;
}
} } bool ready = !target_frames_.empty();
for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
{
std::string& target_frame = *target_it;
if (time_tolerance_ != ros::Duration(0.0))
{
ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) );
}
else
{
ready = ready && tf_.canTransform(target_frame, frame_id, stamp);
}
} if (ready)
{
TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_); ++successful_transform_count_; this->signalMessage(evt);
}
else
{
++failed_transform_count_;
} return ready;
} void testMessages()
{
if (!messages_.empty() && getTargetFramesString() == " ")
{
ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());
} int i = ; typename L_Event::iterator it = messages_.begin();
for (; it != messages_.end(); ++i)
{
MEvent& evt = *it; if (testMessage(evt))
{
--message_count_;
it = messages_.erase(it);
}
else
{
++it;
}
}
} void maxRateTimerCallback(const ros::TimerEvent&)
{
boost::mutex::scoped_lock list_lock(messages_mutex_);
if (new_transforms_)
{
testMessages();
new_transforms_ = false;
} checkFailures();
} void incomingMessage(const ros::MessageEvent<M const>& evt)
{
add(evt);
} void transformsChanged()
{
new_transforms_ = true; ++transform_message_count_;
} void checkFailures()
{
if (next_failure_warning_.isZero())
{
next_failure_warning_ = ros::Time::now() + ros::Duration();
} if (ros::Time::now() >= next_failure_warning_)
{
if (incoming_message_count_ - message_count_ == )
{
return;
} double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
if (dropped_pct > 0.95)
{
TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*, ROSCONSOLE_DEFAULT_NAME);
next_failure_warning_ = ros::Time::now() + ros::Duration(); if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
{
TF_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
}
}
}
} void disconnectFailure(const message_filters::Connection& c)
{
boost::mutex::scoped_lock lock(failure_signal_mutex_);
c.getBoostConnection().disconnect();
} void signalFailure(const MEvent& evt, FilterFailureReason reason)
{
boost::mutex::scoped_lock lock(failure_signal_mutex_);
failure_signal_(evt.getMessage(), reason);
} Transformer& tf_;
ros::NodeHandle nh_;
ros::Duration max_rate_;
ros::Timer max_rate_timer_;
std::vector<std::string> target_frames_;
std::string target_frames_string_;
boost::mutex target_frames_string_mutex_;
uint32_t queue_size_; L_Event messages_;
uint32_t message_count_;
boost::mutex messages_mutex_; bool new_messages_;
volatile bool new_transforms_; bool warned_about_unresolved_name_;
bool warned_about_empty_frame_id_; uint64_t successful_transform_count_;
uint64_t failed_transform_count_;
uint64_t failed_out_the_back_count_;
uint64_t transform_message_count_;
uint64_t incoming_message_count_;
uint64_t dropped_message_count_; ros::Time last_out_the_back_stamp_;
std::string last_out_the_back_frame_; ros::Time next_failure_warning_; ros::Duration time_tolerance_; boost::signals2::connection tf_connection_;
message_filters::Connection message_connection_; FailureSignal failure_signal_;
boost::mutex failure_signal_mutex_;
}; } // namespace tf #endif
TF_MESSAGE_FILTER
bool
Buffer::canTransform(const std::string& target_frame, const std::string& source_frame,
const ros::Time& time, const ros::Duration timeout, std::string* errstr) const
{
if (!checkAndErrorDedicatedThreadPresent(errstr))
return false; // poll for transform if timeout is set
ros::Time start_time = now_fallback_to_wall();
while (now_fallback_to_wall() < start_time + timeout &&
!canTransform(target_frame, source_frame, time) &&
(now_fallback_to_wall()+ros::Duration(3.0) >= start_time) && //don't wait when we detect a bag loop
(ros::ok() || !ros::isInitialized())) // Make sure we haven't been stopped (won't work for pytf)
{
sleep_fallback_to_wall(ros::Duration(0.01));
}
bool retval = canTransform(target_frame, source_frame, time, errstr);
conditionally_append_timeout_info(errstr, start_time, timeout);
return retval;
} ros::Time now_fallback_to_wall()
{
try
{
return ros::Time::now();
}
catch (ros::TimeNotInitializedException ex)
{
ros::WallTime wt = ros::WallTime::now();
return ros::Time(wt.sec, wt.nsec);
}
}
#include "tf2_ros/buffer.h"
2.ROS中的消息发布与订阅机制为啥不需要线程锁?