回调关系建立
创建handle时,会listen一个tcp端口
bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
bool TransportTCP::initializeSocket()
设置poll的回调
poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
socketUpdate会调特定socket发生事件时指定的函数,此时只有accept_cb被赋值
accept_cb其实就是void ConnectionManager::tcprosAcceptConnection(const TransportTCPPtr& transport)
accept系统调用在poll线程中被调用,具体函数为
TransportTCPPtr TransportTCP::accept(),在它里面的
accept系统调用返回时,先构造TCPTransport
bool TransportTCP::setSocket(int sock)
中,将新建的socket塞进poll池子里
接下来调accept_cb,里面走到了
void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
给已经塞进poll池子里的transport,设置回调
transport_->setReadCallback(boost::bind(&Connection::onReadable, this, _1));
transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, _1));
transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, _1));
若塞进池子里的接受fd可写,那么最终会调
void Connection::writeTransport()会以while循环,把buffer里面的数据发完
poll线程
NodeHandle::NodeHandle(const NodeHandle& rhs)
void NodeHandle::construct(const std::string& ns, bool validate_name)
void start()
PollManager::instance()->start();
thread_ = boost::thread(&PollManager::threadFunc, this);
while (!shutting_down_)
{
{
boost::recursive_mutex::scoped_lock lock(signal_mutex_);
//调processPublishQueues,把publisher的message
//塞到subscriber link的queue里面
// 之后给该fd造一个poll的可写event,在后面的update中,epoll_wait就会返回并处理。
poll_signal_();
}
if (shutting_down_)
{
return;
}
// update,监听epoll set。
// 若sender socket有数据待发送,在上面的poll_signal_中产生了事件,此update必然返回
// 若reader socket有数据能接收,也能返回
// 否则就会进入等待,一旦进入等待,那么send只有在100ms后才能send出去?!
// asw:在用户线程中,如果enqueue了一个需要通过socket写的message,会唤醒poll。
poll_set_.update(100);
}
boost::shared_ptr<std::vector<socket_pollfd> > ofds = poll_sockets(epfd_, &ufds_.front(), ufds_.size(), poll_timeout); // 有事件发生,则返回,否则等待100ms
之后调用info.func
publish-push queue —— by 用户线程
void publish(const boost::shared_ptr
void publish(const M& message) const ——不设置type_info,导致下面的serialize必为true。
TopicManager::instance()->publish(impl_->topic_, serfunc, m);
序列化
publish
void Publication::publish(SerializedMessage& m)
↓ 检查impl对象可用性
void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
注意两个变量
nocopy——若为true,则m的message不会被reset,而直接enqueue到subscriber里。
只有存在以const boost::shared_ptr方式调用publish,并且有进程内部的subscriber时会被置为true。
serialize——若为true,则需要进行序列化,并且publish时,会唤醒poll线程的100ms wait。
存在进程间subscriber,则此serialize必为true。
若没有进程间subscriber,但是message以引用而非指针publish时,会置为true。
- nocopy的进程内部subscriber
for (; it != end; ++it)
{
const SubscriberLinkPtr& sub = *it;
if (sub->isIntraprocess())
{
sub->enqueueMessage(m, false, true);
}
}
- need copy的进程内部subscriber,或者进程间subscriber。
publish_queue_.push_back(m);
publish-pop queue —— by poll 线程
topic manager 来pop publish调用enqueue的message
TopicManager::instance()->start();,于node handle构造函数调用
void PollManager::threadFunc() 中,见poll线程,会通过回调用processPublishQueues
processPublishQueues 是一个回调,在void PollManager::threadFunc() 中被调用,也就是说还是发送线程管发
把publish_queue_里面的东西搬到局部变量,
queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end());
publish_queue_.clear();
调enqueue
bool Publication::enqueueMessage(const SerializedMessage& m)
对每个subscriber,enqueue 也就是说,每个subscriber都有个queue
for(V_SubscriberLink::iterator i = subscriber_links_.begin();
i != subscriber_links_.end(); ++i)
{
const SubscriberLinkPtr& sub_link = (*i);
sub_link->enqueueMessage(m, true, false);
}
outbox_.push(m);
startMessageWrite(false);
调用connection,write
connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write);
// 把message设置到buffer里面,在发送的线程会把这个buffer发出去
{
boost::mutex::scoped_lock lock(write_callback_mutex_);
ROS_ASSERT(!write_callback_);
write_callback_ = callback;
write_buffer_ = buffer;
write_size_ = size;
write_sent_ = 0;
has_write_callback_ = 1;
}
transport_->enableWrite();
poll_set_->addEvents(sock_, POLLOUT);
给发送的线程说可以写了,发送线程会锁住write_callback_mutex_,写poll的fd
同时,onMessageWritten是一个message写完后的回调,写完后。
会通过回调和immediate_write变量配合,在pollwait线程里写完这个connection的所有message。