ros publish and subscribe

回调关系建立

创建handle时,会listen一个tcp端口
bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
bool TransportTCP::initializeSocket()
设置poll的回调
poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
socketUpdate会调特定socket发生事件时指定的函数,此时只有accept_cb被赋值
accept_cb其实就是void ConnectionManager::tcprosAcceptConnection(const TransportTCPPtr& transport)

accept系统调用在poll线程中被调用,具体函数为
TransportTCPPtr TransportTCP::accept(),在它里面的
accept系统调用返回时,先构造TCPTransport
bool TransportTCP::setSocket(int sock)
中,将新建的socket塞进poll池子里

接下来调accept_cb,里面走到了
void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
给已经塞进poll池子里的transport,设置回调

  transport_->setReadCallback(boost::bind(&Connection::onReadable, this, _1));
  transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, _1));
  transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, _1));

若塞进池子里的接受fd可写,那么最终会调
void Connection::writeTransport()会以while循环,把buffer里面的数据发完

poll线程

NodeHandle::NodeHandle(const NodeHandle& rhs)
void NodeHandle::construct(const std::string& ns, bool validate_name)
void start()
PollManager::instance()->start();
thread_ = boost::thread(&PollManager::threadFunc, this);

  while (!shutting_down_)
  {
    {
      boost::recursive_mutex::scoped_lock lock(signal_mutex_);
      //调processPublishQueues,把publisher的message
      //塞到subscriber link的queue里面
      // 之后给该fd造一个poll的可写event,在后面的update中,epoll_wait就会返回并处理。
      poll_signal_(); 
       
    }

    if (shutting_down_)
    {
      return;
    }

      // update,监听epoll set。
      // 若sender socket有数据待发送,在上面的poll_signal_中产生了事件,此update必然返回
      // 若reader socket有数据能接收,也能返回
      // 否则就会进入等待,一旦进入等待,那么send只有在100ms后才能send出去?!
      // asw:在用户线程中,如果enqueue了一个需要通过socket写的message,会唤醒poll。
    poll_set_.update(100);
  }

boost::shared_ptr<std::vector<socket_pollfd> > ofds = poll_sockets(epfd_, &ufds_.front(), ufds_.size(), poll_timeout); // 有事件发生,则返回,否则等待100ms
之后调用info.func

publish-push queue —— by 用户线程

void publish(const boost::shared_ptr& message) const ——设置了messag的type_info可以在进程内直接将mesage塞到subscriber的queue里面,不再经过fd。
void publish(const M& message) const ——不设置type_info,导致下面的serialize必为true。

TopicManager::instance()->publish(impl_->topic_, serfunc, m);
序列化
publish
void Publication::publish(SerializedMessage& m)
↓ 检查impl对象可用性
void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)

注意两个变量
nocopy——若为true,则m的message不会被reset,而直接enqueue到subscriber里。
只有存在以const boost::shared_ptr方式调用publish,并且有进程内部的subscriber时会被置为true。
serialize——若为true,则需要进行序列化,并且publish时,会唤醒poll线程的100ms wait。
存在进程间subscriber,则此serialize必为true。
若没有进程间subscriber,但是message以引用而非指针publish时,会置为true。

  • nocopy的进程内部subscriber
    for (; it != end; ++it)
    {
      const SubscriberLinkPtr& sub = *it;
      if (sub->isIntraprocess())
      {
        sub->enqueueMessage(m, false, true);
      }
    }

  • need copy的进程内部subscriber,或者进程间subscriber。

publish_queue_.push_back(m);

publish-pop queue —— by poll 线程

topic manager 来pop publish调用enqueue的message
TopicManager::instance()->start();,于node handle构造函数调用
void PollManager::threadFunc() 中,见poll线程,会通过回调用processPublishQueues
processPublishQueues 是一个回调,在void PollManager::threadFunc() 中被调用,也就是说还是发送线程管发
把publish_queue_里面的东西搬到局部变量,
queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end());
publish_queue_.clear();
调enqueue
bool Publication::enqueueMessage(const SerializedMessage& m)
对每个subscriber,enqueue 也就是说,每个subscriber都有个queue

  for(V_SubscriberLink::iterator i = subscriber_links_.begin();
      i != subscriber_links_.end(); ++i)
  {
    const SubscriberLinkPtr& sub_link = (*i);
    sub_link->enqueueMessage(m, true, false);
  }

outbox_.push(m);
startMessageWrite(false);
调用connection,write
connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, _1), immediate_write);

// 把message设置到buffer里面,在发送的线程会把这个buffer发出去
{
    boost::mutex::scoped_lock lock(write_callback_mutex_);

    ROS_ASSERT(!write_callback_);

    write_callback_ = callback;
    write_buffer_ = buffer;
    write_size_ = size;
    write_sent_ = 0;
    has_write_callback_ = 1;
  }

transport_->enableWrite();
poll_set_->addEvents(sock_, POLLOUT);
给发送的线程说可以写了,发送线程会锁住write_callback_mutex_,写poll的fd

同时,onMessageWritten是一个message写完后的回调,写完后。
会通过回调和immediate_write变量配合,在pollwait线程里写完这个connection的所有message。

上一篇:第二章 Publisher 和 Subscriber (Part. 1)


下一篇:rasa培训课程:3小时掌握rasa3.x项目实战之电商零售Customer Service智能业务对话