asio核心概念和功能

原因

大多数程序以某种方式与外界交互,无论是通过文件、网络、串行电缆还是控制台。 有时,就像网络一样,单个 I/O 操作可能需要很长时间才能完成。 这对应用程序开发提出了特殊的挑战。

Boost.Asio 提供了管理这些长时间运行的操作的工具,而无需程序使用基于线程和显式加锁的并发模型。

Boost.Asio 库适用于使用 C++ 进行系统编程的程序员,这些程序员通常需要访问操作系统功能,例如网络。 特别是,Boost.Asio 解决了以下目标:

  • 可移植性。该库支持一系列常用的操作系统,并为这些操作系统提供一致性的行为。
  • 伸缩性。该库应用来开发能扩展到数千个并发连接的网络应用程序。每个操作系统的库实现应该使用最能实现这种可伸缩性的机制。
  • 高效。该库应支持分散-聚集 I/O 等技术,并允许程序最大限度地减少数据复制。
  • 模型概念来自已有的API,如BSD套接字。BSD套接字API被广泛实现和理解,并在很多文献都有涉及。其他编程语言通常使用相似的网络接口API。在合理的情况下,Boost.Asio应该利用现有的做法。
  • 易用。该库应该采用提供工具包而不是框架的方法为新用户提供较低的入门门槛。也就是说,它应该尽量减少预先在时间上的投资,只学习一些基本的规则和指导方针。在此之后,库用户应该只需要了解正在使用的特定函数。
  • 进一步抽象的基础。该库应该允许能够开发提供更高级别抽象的库。例如,常用协议HTTP的实现。

尽管 Boost.Asio 一开始主要关注网络,但它的异步 I/O 概念已经扩展到包括其他操作系统资源,例如串行端口、文件描述符等。

核心概念和功能

Asio的基本架构

Asio可用于对IO对象(如套接字)执行同步和异步操作。在使用 Boost.Asio 之前,了解一下 Boost.Asio 的各个部分、您的程序以及它们如何协同工作的概念图可能会很有用。

作为入门级的示例,让我们考虑在套接字上执行连接操作时会发生什么。 我们将从检查同步操作开始。

asio核心概念和功能

你的程序中应该至少有一个IO执行上下文(I/O Execution Context),例如一个boost::asio::io_context对象,boost::asio::thread_pool对象或者boost::asio::system_context对象。I/O可执行上下文代表了你的程序与操作系统的I/O服务的链接。

boost::asio::io_context io_context;

为了执行I/O操作,你的程序需要一个I/O对象,例如TCP套接字:

boost::asio::ip::tcp::socket socket(io_context);

当执行一个同步的连接操作时,会发生下面的事件时序:

  1. 你的程度通过调用I/O对象来启动连接操作。
socket.connect(server_endpoint);
  1. IO对象将请求转发到IO execution context。
  2. IO execution context调用操作系统来执行连接操作。
  3. 操作系统返回执行结果给IO execution context。
  4. IO execution context将任何运行的错误结果转为boost::system::error_code类型。error_code可以与特定值进行比较,或作为布尔值进行测试(为false表示没有发生错误)。结果转发会IO对象。
  5. 如果操作失败,IO对象抛出一个boost::system::system_error类型的异常。如果初始化操作的代码被写为:
boost::system::error_code ec;
socket.connect(server_endpoint,ec);

​ 那么error_code类型变量ec将会被设置为运行结果,不会抛出异常。

当使用异步操作的时候,发生的事件时序有所不同。

asio核心概念和功能

  1. 你的程序通过调用IO对象启动连接操作:
socket.async_connect(server_endpoint,completion_handler);

​ completion_handler是一个函数并且函数签名如下:

void completion_handler(const boost::system::error_code ec);

​ 所需的确切签名取决于正在执行的异步操作。参考文档指出了每个操作的适当形式。

  1. IO对象转发请求到IO execution context。
  2. IO execution context向操作系统发送信号,表示它应该启动一个异步连接。不会阻塞在连接操作处。 (在同步情况下,此等待将完全包含在连接操作的持续时间内。)

asio核心概念和功能

  1. 操作系统通过将结果放入队列,准备被IO execution context取走来指示连接操作已经完成。
  2. 当使用io_context作为IO execution context,你的程序必须调用io_context::run()(或一个类似io_context的成员函数)来获取结果。调用io_context::run(),只要还有未完成的异步操作就会阻塞,所以只要你启动了你的第一个异步操作你就应该立即调用。
  3. 在调用io_context::run()时,IO execution context取出运行结果,将其转为error_code,然后传递给completion handler。

Proactor设计模式:无线程的并发

Asio为同步操作和异步操作提供的并行支持。异步操作是基于Proactor设计模式。与只有同步和Reactor方式相比,这种模式的优缺点如下所述:

Proactor与Asio

我们先了解一下Proactor在Asio中是如何实现的,但不设计具体的细节。

asio核心概念和功能

  • Asynchronous Operation(异步操作):定义一个异步执行的操作,例如异步读或写一个套接字。
  • Asynchronous Operation Processor(异步操作处理器):执行异步操作,并且当操作完成时在完成事件队列上对事件进行排队。
  • Completion Event Queue(完成事件队列):缓存完成事件直到他们被异步事件分发器取走。
  • Completion Handler(完成处理器):处理异步操作的结果。这些是函数对象,通常是使用boost::bind创建的。
  • Asynchronous Event Demultiplexer(异步事件分配器):阻塞等待完成事件队列上有事件出现,并将完成的时间返回给调用者。
  • Proactor:调用异步事件分配器来取出事件,然后调度与事件关联的完成处理器(即调用函数对象)。抽象为io_context类。
  • Initiator(启动器):启动异步操作的应用程序代码。Initiator通过高级接口(如basic_stream_socket)与异步操作处理器交互,该接口反过来委托给像reactive_socket_service的服务。

使用Reactor实现

在许多平台,Asio使用Reactor实现Proactor设计模式,例如select,epoll或者kqueue。这种实现方法对应的Proactor设计模式如下:

  • Asynchronous Operation Processor:Reactor实现使用select,epoll或kqueue。当Reactor指示执行操作的资源准备好时,处理器执行异步操作并将相关的完成处理器入队到完成事件队列中。
  • Completion Event Queue:一个完成处理器的链表。
  • Asynchronous Event Demultiplexer:这个是通过等待一个事件或条件变量直到完成事件队列中有一个完成处理器可用实现的。

使用Windows Overlapped I/O实现

在Windows NT,2000和XP。Asio利用Overlapped I/O的优点提供了高效的Proactor设计模式的实现。这种实现方式对应的Proactor设计模式如下:

  • Asynchronous Operation Processor:这是通过操作系统实现的。操作是通过调用诸如 AcceptEx 之类的重叠函数来启动的。
  • Completion Event Queue:这是由操作系统实现的,并与I/O完成端口相关联。每个io_context实例都有一个I/O完成端口。
  • Asynchronous Event Demultiplexer:由 Asio 调用以使事件及其关联的完成处理程序出列。

优点

  • 可移植性。许多操作系统提供本机异步 I/O API(例如Windows上的重叠 I/O )作为开发高性能网络应用程序的首选选项。该库可以根据本机异步 I/O 来实现。但是,如果本机支持不可用,也可以使用代表 Reactor 模式的同步事件多路分解器(例如POSIX select() 来实现该库。
  • 将线程和并发解耦。长时间操作是由代表应用程序的实现来异步执行的,因此,应用程序不需要产生许多线程来增加并发性。
  • 高效和可伸缩性。由于增加了 CPU 之间的上下文切换、同步和数据移动,诸如thread-per-connect(仅同步方法需要)之类的实现策略可能会降低系统性能。 通过异步操作,可以通过最小化操作系统线程的数量并且只激活有事件要处理的逻辑控制线程来避免上下文切换的成本。
  • 简化应用程序同步。可以编写异步操作完成处理程序,就好像它们存在于单线程环境中一样,因此可以在开发应用程序逻辑时几乎不关心同步问题。
  • 函数组合。函数组合是指实现以提供更高级操作的函数,例如以特定格式发送消息。 每个函数都是对较低级别的读取或写入操作的多次调用来实现的。例如,考虑这样一种协议,其中每个消息由固定长度的报头和可变长度的正文组成,其中正文的长度在报头中指定。假设的读取消息操作可以使用两个较低级别的读取来实现,第一个读取用于接收消息头,一旦长度已知,第二个读取用于接收消息体。要在异步模型中组合函数,可以将异步操作链接在一起。也就是说,一个操作的完成处理程序可以启动下一个操作。启动链中的第一个调用可以被封装,这样调用者就不需要知道高级操作是作为异步操作链实现的。以这种方式组合新操作的能力简化了在网络库之上的更高抽象级别的开发,例如支持特定协议的函数。

缺点

  • 程序复杂度。由于操作启动和完成之间的时间和空间分离,使用异步机制开发应用程序更加困难。 由于反向控制流,应用程序也可能更难调试。

  • 内存利用率。在读或写操作期间必须提交缓冲区空间,这可能会无限期地持续下去,并且每个并发操作都需要一个单独的缓冲区。 另一方面,Ractor模式在套接字准备好读取或写入之前不需要缓冲区空间。

线程和Asio

线程安全

一般来说,并发使用不同的对象是安全的,但是并发使用单个对象是不安全的。但是,诸如io_context之类的类型提供了强力的保证,即并发使用单个对象是安全的。

线程池

多个线程可以调用 io_context::run() 来设置一个线程池,可以从中调用完成处理器。这种方法也可以与post()一起使用,作为跨线程池执行任意计算任务的方法。

请注意,所有加入io_context的线程都被认为是等价的,io_context可以以任意的方式在它们之间分发工作。

内部线程

这个库对于特定平台的实现可以使用一个或多个内部线程来模拟异步性。这些线程对于库的使用者必须尽可能不可见。特别的,这些线程:

  • 一定不能直接调用用户代码
  • 必须屏蔽所有信号

此方法由以下保证补充:

  • 异步完成处理器只会被当前正在调用 io_context::run() 的线程调用。

因此,库用户有责任创建和管理通知将发送到的所有线程。

采用这种方法的原因包括:

  • 通过仅从单个线程调用io_context::run(),用户代码可以避免与同步相关的开发复杂度。例如,库用户可以实现单线程的可扩展服务器(从用户角度看)。
  • 库用户可能需要在线程启动后不久和任何其他应用程序代码执行之前在线程中执行初始化。例如,Microsoft 的 COM 用户必须先调用 CoInitializeEx,然后才能从该线程调用任何其他 COM 操作。
  • 库接口与线程创建和管理的接口解耦,并允许在线程不可用的平台上实现。

Strands:使用线程而不使用显式加锁

Strands被定义为事件处理程序的严格调用。使用Strands允许在多线程程序中执行代码而无需显式加锁。

Strands可以是显式的,也可以是隐式的。如下面的例子:

  • 只在一个线程调用io_context::run()意味着所有的事件处理器在一个隐式strand中,因为io_context保证处理器只能从run()中调用。
  • 如果存在与连接相关联的单个异步操作链(例如,在像 HTTP 这样的半双工协议实现中),则不可能同时执行处理器。 这是一个隐含的链。
  • 显式的strand是strand<>或者io_context::strand的实例。所有的事件处理器函数对象需要使用boost::asio::bind_executor绑定到strand上,或者使用strand对象进行发布/调度。

在异步操作组合的情况下,像async_read()或者async_read_until(),如果一个完成处理器经过了一个strand,那么所有的中间处理器也应该经过同样的strand。这是确保能够线程安全地访问调用者和组合操作之间共享对象所必要的(在async_read()的情况下,它是套接字,调用者可以close()取消操作)。

为了实现这一点,所有的异步操作通过使用get_associated_executor函数获取处理器相关的执行器。例如:

boost::asio::associated_executor_t<Handler> a = boost::asio::get_associated_executor(h);

相关的执行器必须满足Executor要求。异步操作使用它来提交中间和最终处理器以供执行。

可以通过指定嵌套类型executor_type和成员函数get_executor为特定处理器类型自定义执行器:

class my_handler {
public:
  //Executor 类型要求的自定义实现。
  typedef my_executor executor_type;
  //返回一个自定义执行器的实现
  executor_type get_executor() const noexcept {
    return my_executor();
  }
  void operator()() {...}
};

在更复杂的情况下,associated_executor模板可能会直接部分特化:

//处理器
struct my_handler {
  void operator()() {...}  
};

namespace boost {namespace asio {
    //特化associator_executor模板
    template<class Executor>
    struct associated_executor<my_handler,Executor> {
      //Executor 类型要求的自定义实现。
      typedef my_executor type;
      //返回一个自定义执行器的实现
      static type get(const my_handler&,const Executor&=Executor()) noexcept {
          return my_executor();
      }  
    };
}}

boost::asio::bind_executor()函数用来将特定的executor对象(像strand)绑定到一个完成处理器上。这个绑定会自动关联一个执行器。例如,为了将strand绑定到一个完成处理器上,我们可以简单地写为:

my_socket.async_read_some(my_buffer, 
    boost::asio::bind_executor(my_strand,[](error_code ec,size_t length) {
    	//....
}));

Buffers

从根本上说,I/O涉及在内存的连续区域(称为缓冲区)之间传输数据。

这些缓冲区可以简单地表示为由一个指针和一个字节大小组成的元组。但是,为了开发高效的网络应用程序,Asio包括对分散-聚集操作的支持。这些操作涉及一个或多个缓冲区。

  • 分散读将数据读取到多个缓冲区
  • 聚集写,传输多个缓冲区

因为,我们需要一个表示缓冲区集合的抽象。Asio使用的方法就是定义一个(实际上是两个)来表示单个缓冲区。这些可以存储在一个容器中,而该容器可以传递给分散-聚集操作。

除了将缓冲区指定为指针和字节大小之外,Boost.Asio 还区分了可修改内存(称为可变)和不可修改内存(后者是从 const 限定变量的存储中创建的)。 因此,这两种类型可以定义如下:

typedef std::pair<void*,std::size_t> mutable_buffer;
typedef std::pair<const void*,std::size_t> const_buffer;

mutable_buffer可以转为const_buffer,但是不能反过来转换。

但是,Asio并没有使用上面的定义,而是定义了两个类mutable_bufferconst_buffer。其目的是提供连续内存的不透明表示,其中:

  • 类型的转换行为与std::pair定义方式的表现一样。也就是说mutable_bufer可以转为const_buffer,但是不能反过来。
  • 有防止缓冲区溢出的保护。给定一个缓冲区实例,用户只能创建表示相同内存范围或其子范围的另一个缓冲区。为了提供进一步的安全性,该库还包括用于从数组中自动确定缓冲区大小的机制,POD元素的boost::arraystd::vector,或来自std::stirng
  • 使用data()显式访问底层的内存。通常来说,应用程序不需要这样做,但是库的实现需要传递原始内存给底层操作系统函数。

最后,多个buffer可以通过将buffer对象放入容器中传给分散-聚集操作(像read()write())。定义了MutableBufferSequenceConstBufferSequence概念,以便可以使用像std::vector,std::list,std::array,boost::array的容器。

与iostreams集成的streambuf

boost::asio::basic_streambuf派生自std::basic_streambuf以将输入序列和输出序列与某种字符数组类型的一个或多个对象相关联,这些对象的元素存储任意值。这些字符数组对象在 streambuf 对象内部,但提供了对数组元素的直接访问,以允许它们与 I/O 操作一起使用,例如套接字的发送或接收操作:

  • streambuf的输入序列可以通过data()成员函数访问。该函数的返回类型满足ConstBufferSequence的要求。
  • streambuf的输出序列可以通过prepare()成员函数访问。函数的返回类型满足MutableBufferSequence要求。
  • 通过调用commit()成员函数,数据从输出序列的前面传输到输入序列的后面。
  • 通过调用consume() 成员函数从输入序列的前面删除数据。

streambuf 构造函数接受一个size_t参数,指定输入序列和输出序列的大小之和的最大值。 如果成功,任何将内部数据增长超过此限制的操作都将抛出 std::length_error 异常。

按字节顺序遍历缓冲区序列

buffers_iterator<> 类模板允许遍历缓冲区序列(即满足 MutableBufferSequenceConstBufferSequence要求的类型),就好像它们是连续的字节序列一样。还提供了称为 buffers_begin()buffers_end() 的辅助函数,其中会自动推导出 buffers_iterator<> 模板参数。

举个例子,从套接字中读取一行放入std::string,可以写为:

boost::asio::streambuf sb;
...
std::size_t n = boost::asio::read_until(sock,sb,'\n');
boost::asio::streambuf::const_buffers_type bufs = sb.data();
std::string line(
	boost::asio::buffers_begin(bufs),
    boost::asio::buffers_begin(bufs)+n
);

Buffer debugging

一些标准库的实现,比如微软Visual c++ 8.0及更高版本附带的库,提供了一个称为迭代器调试的特性。这意味着在运行时检查迭代器的有效性。如果程序尝试使用已失效的迭代器,则会触发断言。 例如:

std::vector<int> v(1);
std::vector<int>::iterator i = v.begin();
v.clear(); //使迭代器无效
*i=0; //断言

Asio利用了这一特性,加入到了buffer的debugging。考虑下面的代码:

void dont_do_this() {
    std::string msg = "Hello,world!";
    boost::asio::async_write(sock,boost::asio::buffer(msg),my_handler);
}

当您调用异步读取或写入时,您需要确保操作的缓冲区在调用完成处理器之前有效。在上面的例子中,缓冲区是 std::string 变量 msg。这个变量在堆栈上,所以它在异步操作完成之前就超出了范围。如果你很幸运,那么应用程序会崩溃,但更有可能出现随机故障。

启用缓冲区调试时,Asio 将迭代器存储到string中,直到异步操作完成,然后解引用它以检查其有效性。在上面的示例中,您将在 Asio 尝试调用完成处理器之前观察到断言失败。

当定义_GLIBCXX_DEBUG时,此功能会自动适用于 Microsoft Visual Studio 8.0 或更高版本以及 GCC。此检查会产生性能成本,因此缓冲区调试仅在调试版本中启用。对于其他编译器,它可以通过定义 BOOST_ASIO_ENABLE_BUFFER_DEBUGGING 来启用。 它也可以通过定义 BOOST_ASIO_DISABLE_BUFFER_DEBUGGING 来显式禁用。

Streams,Short Read and Short Writes

Asio的许多I/O对象是面向流。这就意味着:

  • 没有消息边界。数据时作为连续的字节序列传输的。
  • 读或写操作可能传输的字节比要求的更少。这被称为短读(short read)或短写(short write)。

提供面向流的 I/O 模型的对象具有以下一种或多种类型要求:

  • SyncReadStream:其中使用名为read_some() 的成员函数执行同步读取操作。
  • AsyncReadStream,其中使用名为 async_read_some() 的成员函数执行异步读取操作。
  • SyncWriteStream,其中使用名为write_some()的成员函数执行同步写入操作。
  • AsyncWriteStream,其中使用名为 async_write_some() 的成员函数执行异步写入操作。

面向流的IO对象的例子包括ip::tcp::socket,ssl::stream<>,posix::stream_descriptor,windows::stream_handle等等。

程序通常希望传输确切数量的字节。 当发生短读或短写时,程序必须重新开始操作,并继续这样做,直到传输了所需的字节数。 Asio 提供了自动执行此操作的通用函数:read()async_read()write()async_write()

为什么EOF是错误

  • 流的结尾会导致read、async_read、read_untilasync_read_until 函数违反它们的约定。 例如。 由于 EOF,N 个字节的读取可能会提前完成。
  • EOF 错误可用于区分流的结束和成功读取了0字节大小的数据。

Reactor风格的操作

有时,程序必须与想要自己执行 I/O 操作的第三方库集成。为促进这一点,Asio 的同步和异步操作可用于等待套接字准备好读取、准备写入或具有挂起的错误条件。

举个例子,执行非阻塞读:

ip::tcp::socket socket(my_io_context);
...
socket.non_blocking(true);
...
socket.async_wait(ip::tcp::socket::wait_read,read_handler);
...
void read_handler(boost::system::error_code ec) {
    if(!ec) {
        std::vector<charA> buf(socket.available());
        socket.read_some(buffer(buf));
    }
}

所有平台上的套接字和 POSIX 面向流的描述符类都支持这些操作。

基于行的操作

许多常用的 Internet 协议都是基于行的,这意味着它们具有由字符序列“\r\n”分隔的协议元素。例如HTTP,SMTP,FTP。为了更容易地实现基于行的协议以及其他使用分隔符的协议,Asio 提供了包括read_until() async_read_until()的函数。

下面例子说明了async_read_until()在HTTP服务器中的使用,用来接收来自客户端的HTTP请求的第一行:

class http_connection {
  ...
  void start() {		boost::asio::async_read_until(socket_,data_,"\r\n",boost::bind(&http_connection::handle_request_line,this,_1));
  }
  void handle_request_line(boost::system::error_code ec) {
      if(!ec) {
          std::string method, uri, version;
          char sp1,sp2,cr,lf;
          std::istream is(&data_);
          is.unsetf(std::ios_base::skipws);
          is >> method >>sp1 >> uri >> sp2 >>version >> cr >> lf;
          ...
      }
  }
  ...
  boost::asio::ip::tcp::socket socket_;
  boost::asio::streambuf data_;
};

streambuf 数据成员用作存储在搜索分隔符之前从套接字读取的数据的地方。重要的是要记住,分隔符之后可能还有其他数据。 这个多余的数据应该留在流缓冲中,以便后续调用read_until()async_read_until() 可以检查它。

分隔符可以指定为单个charstd::stringboost::regexread_until()async_read_until() 函数还包括接受称为匹配条件的用户定义函数对象的重载。 例如,要将数据读入流缓冲直到遇到空格:

typedef boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> iterator;
std::pair<iterator,bool> match_whitespace(iterator begin,iterator end) {
    iterator i = begin;
    while(i!=end) {
        if(std::isspace(*i++))
            return std::make_pair(i,true);
     return std::make_pair(i,false);
    }
}
...
boost::asio::streambuf b;
boost::asio::read_until(s,b,match_whitespace);

将数据读入流缓冲区直到找到匹配的字符为止。

class match_char
{
public:
    explicit match_char(char c) : c_(c) {}
    
    template <typename Iterator>
    std::pair<Iterator,bool>operator()(Iterator begin,iterator end) const 
    {
        Iterator i = begin;
        while(i != end) 
            if(c_ == *i++)
                return std::make_pair(i,true);
        return std::make_pair(i,false);
    }
private:
    char c_;
};
namespace boost 
{
    namespace asio
    {
        template<> struct is_match_condition<match_char> : public boost::true_type {}
    }
}
....
boost::asio::streambuf b;
boost::asio::read_until(s,b,match_char('a'));

对于函数和具有嵌套 result_type 类型定义的函数对象, is_match_condition<> 类型特征自动计算为真。对于其他类型,特征必须明确特化,如上所示。

自定义内存分配器

许多同步操作需要申请一个对象来存储与操作相关的状态。例如,Win32 实现需要将 OVERLAPPED 派生对象传递给 Win32 API 函数。

并且,程序通常包含易于识别的同步操作链。半双工协议实现(例如 HTTP 服务器)每个客户端都有一个操作链(接收后发送)。一个全双工协议实现有两条并行执行的链。程序应该能够利用这些知识为链中的所有异步操作重用内存。

给定一个用户定义的Handler对象h的拷贝,如果实现需要分配与该Handler关联的内存,它将使用 get_related_allocator 函数获取分配器。例如:

boost::asio::associated_allocator_t<Handler> a = boost::asio::get_associated_allocator(h);

关联的分配器必须满足标准的分配器的标准。

默认情况下,处理器使用标准分配器(用::oprator new()::operator delete()实现)。可以通过指定嵌套类型 allocator_type 和成员函数 get_allocator() 为特定处理其类型定制分配器:

class my_handler
{
public:
  // 分配器类型要求的自定义实现.
  typedef my_allocator allocator_type;

  // 返回一个自定义的分配器实现
  allocator_type get_allocator() const noexcept
  {
    return my_allocator();
  }

  void operator()() { ... }
};

在大多数复杂长江下,会直接对associated_allocator模板进行特化:

namespace boost 
{ 
    namespace asio 
    {

      template <typename Allocator>
      struct associated_allocator<my_handler, Allocator>
      {
        // Custom implementation of Allocator type requirements.
        typedef my_allocator type;

        // Return a custom allocator implementation.
        static type get(const my_handler&,
            const Allocator& a = Allocator()) noexcept
        {
          return my_allocator();
        }
  };

	} 
} // namespace boost::asio

该实现保证释放将在调用关联的处理器之前发生,这意味着内存已准备好重新用于处理器启动的任何新异步操作。

可以从调用库函数的任何用户创建的线程调用自定义内存分配函数。 该实现保证,对于包含在库中的异步操作,该实现不会对该处理器的内存分配函数进行并发调用。 如果需要从不同线程调用分配函数,在实现时将插入适当的内存屏障以确保正确的内存可见性。

Handler追踪

为了帮助调试异步程序,Asio提供了对Handler追踪的支持。通过定义BOOST_ASIO_ENABLE_HANDLER_TRACKING开启,Asio将调试输出写入标准错误流。输出记录异步操作及其和Handler之间的关系。

此功能在调试时很有用,您需要知道异步操作是如何链接在一起的,或者挂起的异步操作是什么。下面是运行HTTP服务器示例时的输出,处理单个请求,然后通过Ctrl+C关闭:

@asio|1589424178.741850|0*1|signal_set@0x7ffee977d878.async_wait
@asio|1589424178.742593|0*2|socket@0x7ffee977d8a8.async_accept
@asio|1589424178.742619|.2|non_blocking_accept,ec=asio.system:11
@asio|1589424178.742625|0|resolver@0x7ffee977d760.cancel
@asio|1589424195.830382|.2|non_blocking_accept,ec=system:0
@asio|1589424195.830413|>2|ec=system:0
@asio|1589424195.830473|2*3|socket@0x7fa71d808230.async_receive
@asio|1589424195.830496|.3|non_blocking_recv,ec=system:0,bytes_transferred=151
@asio|1589424195.830503|2*4|socket@0x7ffee977d8a8.async_accept
@asio|1589424195.830507|.4|non_blocking_accept,ec=asio.system:11
@asio|1589424195.830510|<2|
@asio|1589424195.830529|>3|ec=system:0,bytes_transferred=151
@asio|1589424195.831143|3^5|in 'async_write' (./../../../boost/asio/impl/write.hpp:330)
@asio|1589424195.831143|3*5|socket@0x7fa71d808230.async_send
@asio|1589424195.831186|.5|non_blocking_send,ec=system:0,bytes_transferred=1090
@asio|1589424195.831194|<3|
@asio|1589424195.831218|>5|ec=system:0,bytes_transferred=1090
@asio|1589424195.831263|5|socket@0x7fa71d808230.close
@asio|1589424195.831298|<5|
@asio|1589424199.793770|>1|ec=system:0,signal_number=2
@asio|1589424199.793781|1|socket@0x7ffee977d8a8.close
@asio|1589424199.793809|<1|
@asio|1589424199.793840|>4|ec=asio.system:125
@asio|1589424199.793854|<4|
@asio|1589424199.793883|0|signal_set@0x7ffee977d878.cancel

每一行的格式如下:

<tag> | <timestamp> | <action> | <description> 

<tag>总是@asio,用于从程序输出中识别和提取Handler追踪消息。

<timestamp>是距离1970.1.1的秒和毫秒。

<action>采取下面的形式之一:

  • >n:程序进入了编号n的处理器。<description>显示Handler的参数。
  • <n:程序离开了编号为n的处理器。
  • !n:由于异常,程序离开的编号为n的处理器。
  • ~n:编号为n的处理程序没有被调用就被销毁了。当io_context被销毁时,任何未完成的异步操作通常都是这种情况。
  • n^m:编号为n的处理程序将要创建一个新的异步操作,其完成处理器编号为m。<description>包含源位置信息,以帮助确定异步操作在程序中的何处启动。
  • n*m:编号为n的处理程序创建了一个新的异步操作,其完成处理程序编号为m。<description>显示了启动了哪些异步操作。
  • n:编号为n的处理器执行了一些其他操作。<description>显示了调用了什么函数。目前只有close()cancel()操作会被记录,因为这些操作可能会影响挂起的异步操作的状态。
  • .n:该实现执行了一个系统调用,作为异步操作的一部分,完成处理器编号为n。 <description> 显示调用了什么函数及其结果。 这些跟踪事件仅在使用基于Reactor的实现时才会发出。

<description> 显示同步或异步操作,格式为 <object-type>@<pointer>.<operation>。 对于处理程序条目,它显示了一个逗号分隔的参数列表及其值。

如上所示,每个处理程序都分配了一个数字标识符。 如果处理器跟踪输出显示处理程序编号为 0,则表示该操作是在任何处理器之外执行的。

添加局部信息

程序可以通过在源代码中使用宏 BOOST_ASIO_HANDLER_LOCATION 来增加处理器跟踪输出的位置信息。 例如:

#define HANDLER_LOCATION \
  BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__))

// ...

void do_read()
{
  HANDLER_LOCATION;

  auto self(shared_from_this());
  socket_.async_read_some(boost::asio::buffer(data_, max_length),
      [this, self](boost::system::error_code ec, std::size_t length)
      {
        HANDLER_LOCATION;

        if (!ec)
        {
          do_write(length);
        }
      });
}

使用附加位置信息可用时,处理程序跟踪输出可能包括源位置的调用堆栈:

@asio|1589423304.861944|>7|ec=system:0,bytes_transferred=5
@asio|1589423304.861952|7^8|in 'async_write' (./../../../boost/asio/impl/write.hpp:330)
@asio|1589423304.861952|7^8|called from 'do_write' (handler_tracking/async_tcp_echo_server.cpp:62)
@asio|1589423304.861952|7^8|called from 'operator()' (handler_tracking/async_tcp_echo_server.cpp:51)
@asio|1589423304.861952|7*8|socket@0x7ff61c008230.async_send
@asio|1589423304.861975|.8|non_blocking_send,ec=system:0,bytes_transferred=5
@asio|1589423304.861980|<7|

此外,如果 std::source_location std::experimental::source_location 可用,则 use_awaitable_t 标记(当默认构造或用作默认完成标记时)还将导致处理器跟踪为每个新创建的异步操作输出源位置 。 use_awaitable_t 对象也可以用局部信息显式构造。

可视化展示

可以使用包含的 handlerviz.pl 工具对处理器跟踪输出进行后处理,以创建处理程序的可视化表示(需要 GraphViz 工具dot)。

自定义追踪

可以通过将 BOOST_ASIO_CUSTOM_HANDLER_TRACKING 宏定义为头文件的名称(用“”或 <> 括起来)来自定义处理器跟踪。 此头文件必须实现以下预处理器宏:

Macro Description
BOOST_ASIO_INHERIT_TRACKED_HANDLER 为实现异步操作的类指定基类。 使用时,宏紧跟在类名之后,因此它必须具有以下形式:public my_class。
BOOST_ASIO_ALSO_INHERIT_TRACKED_HANDLER 为实现异步操作的类指定基类。 使用时,宏跟随其他基类,因此它必须具有形式,public my_class。
BOOST_ASIO_HANDLER_TRACKING_INIT(args) 用于初始化跟踪机制的表达式。
BOOST_ASIO_HANDLER_LOCATION(args) 用于定义源代码位置的变量声明。 args 是一个带括号的函数参数列表,包含文件名、行号和函数名。
BOOST_ASIO_HANDLER_CREATION(args) 在创建异步操作时调用的N表达式。Args是一个带圆括号的函数参数列表,包含拥有的执行上下文、被跟踪的处理程序、对象类型的名称、对象的指针、对象的本机句柄和操作名称。
BOOST_ASIO_HANDLER_COMPLETION(args) 在异步操作完成时调用的表达式。 args 是包含跟踪处理程序的带括号的函数参数列表。
BOOST_ASIO_HANDLER_INVOCATION_BEGIN(args) 在调用完成处理程序之前立即调用的表达式。 args 是一个带括号的函数参数列表,包含完成处理程序的参数。
BOOST_ASIO_HANDLER_INVOCATION_END 在调用完成处理程序后立即调用的表达式。
BOOST_ASIO_HANDLER_OPERATION(args) 在调用某些同步对象操作(例如 close() 或 cancel())时调用的表达式。 args 是一个带括号的函数参数列表,包含拥有的执行上下文、对象类型的名称、指向对象的指针、对象的本机句柄和操作名称。
BOOST_ASIO_HANDLER_REACTOR_REGISTRATION(args) 当对象注册到反应器时调用的表达式。 args 是一个带括号的函数参数列表,包含拥有的执行上下文、对象的本机句柄和唯一的注册键。
BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION(args) 当对象从反应器中注销时调用的表达式。 args 是一个带括号的函数参数列表,包含拥有的执行上下文、对象的本机句柄和唯一的注册键。
BOOST_ASIO_HANDLER_REACTOR_READ_EVENT 用于识别反应器读取就绪事件的位掩码常量。
BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT 用于标识反应器写入准备事件的位掩码常量。
BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT 用于识别反应器错误准备事件的位掩码常量。
BOOST_ASIO_HANDLER_REACTOR_EVENTS(args) 当注册到反应器的对象准备就绪时调用的表达式。 args 是一个带括号的函数参数列表,包含拥有的执行上下文、唯一的注册键和就绪事件的位掩码。
BOOST_ASIO_HANDLER_REACTOR_OPERATION(args) 当实现作为基于反应器的异步操作的一部分执行系统调用时调用的表达式。 args 是一个带括号的函数参数列表,包含被跟踪的处理程序、操作名称、操作产生的错误代码和(可选)传输的字节数。

并发提示

io_context构造器允许程序指定一个并发提示。这是对io_context实现中应用于运行完成处理器的活动线程数的建议。

当后台使用 Windows I/O 完成端口时,此值将传递给 CreateIoCompletionPort

当使用基于Reactor的后端时,实现会识别以下特殊的并发提示值:

Value Description
1 该实现假设 io_context 将从单个线程运行,并基于此假设应用多项优化。例如,当一个处理程序从另一个处理程序中发布时,新的处理程序被添加到一个快速线程本地队列(结果是新的处理程序被阻止,直到当前正在执行的处理程序完成)。
BOOST_ASIO_CONCURRENCY_HINT_UNSAFE 这个特殊的并发提示禁用了调度程序和反应器 I/O 中的锁定。
BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO 这个特殊的并发提示禁用反应器 I/O 中的锁定。
BOOST_ASIO_CONCURRENCY_HINT_SAFE 默认值。io_context提供了完整的线程安全性,并且任何线程都可以使用不同的I/O对象。

通过定义BOOST_ASIO_CONCURRENCY_HINT_DEFAULT宏,可以在编译时覆盖默认构造的 io_context 对象使用的并发提示。 例如,在编译期命令行指定

-DBOOST_ASIO_CONCURRENCY_HINT_DEFAULT=1

意味着对程序中所有默认构造的 io_context 对象使用并发提示 1。类似地,可以通过定义 BOOST_ASIO_CONCURRENCY_HINT_1 来覆盖由 1 构造的 io_context 对象使用的并发提示。 例如,传递

-DBOOST_ASIO_CONCURRENCY_HINT_1=BOOST_ASIO_CONCURRENCY_HINT_UNSAFE

给编译期会禁用所有对象的线程安全。

无堆栈协程

coroutine类提供无堆栈协程的支持。无堆栈协程使程序能够以最小的开销以同步方式实现异步逻辑,如下例所示:

struct session : boost::asio::coroutine
{
  boost::shared_ptr<tcp::socket> socket_;
  boost::shared_ptr<std::vector<char> > buffer_;

  session(boost::shared_ptr<tcp::socket> socket)
    : socket_(socket),
      buffer_(new std::vector<char>(1024))
  {
  }

  void operator()(boost::system::error_code ec = boost::system::error_code(), std::size_t n = 0)
  {
    if (!ec) reenter (this)
    {
      for (;;)
      {
        yield socket_->async_read_some(boost::asio::buffer(*buffer_), *this);
        yield boost::asio::async_write(*socket_, boost::asio::buffer(*buffer_, n), *this);
      }
    }
  }
};

coroutine类与伪关键字reenter,yield,fork同时使用。它们是预处理器宏,并使用类似于 Duff's Device 的技术根据 switch 语句实现。 coroutine类的文档提供了这些伪关键字的完整描述。

堆栈式协程

spawn() 函数是用于运行堆栈协程的高级包装器。 它基于 Boost.Coroutine 库。 spawn() 函数使程序能够以同步方式实现异步逻辑,如下例所示:

boost::asio::spawn(my_strand, do_echo);

// ...

void do_echo(boost::asio::yield_context yield)
{
  try
  {
    char data[128];
    for (;;)
    {
      std::size_t length =
        my_socket.async_read_some(
          boost::asio::buffer(data), yield);

      boost::asio::async_write(my_socket,
          boost::asio::buffer(data, length), yield);
    }
  }
  catch (std::exception& e)
  {
    // ...
  }
}

spawn() 的第一个参数可能是一个strand、io_context 或完成处理器。 此参数确定允许协程执行的上下文。 例如,服务器的每个客户端对象可能由多个协程组成; 它们都应该在同一strand上运行,这样就不需要显式同步。

第二个参数是一个带有签名的函数对象,说明指定的代码作为协程的一部分运行:

void coroutine(boost::asio::yield_context yield);

参数 yield 可以传递给异步操作来代替完成处理器,如下所示:

std::size_t length = my_socket.async_read_some(boost::asio::buffer(data),yield);

这将启动异步操作并暂停协程。 异步操作完成后,协程将自动恢复。

其中异步操作的处理程序签名具有以下形式,启动函数返回 result_type。:

void handler(boost::system::error_code ec, result_type result);

在上面的 async_read_some 示例中,这是 size_t。 如果异步操作失败,则将error_code 转换为system_error 异常并抛出。

处理器签名如下形式,启动函数返回void:

void handler(boost::system::error_code ec);

如上所述,错误作为system_error异常传递回协程。

要从操作中收集 error_code,而不是让它抛出异常,请将输出变量与 yield_context 关联,如下所示:

boost::system::error_code ec;
std::size_t length =
  my_socket.async_read_some(
    boost::asio::buffer(data), yield[ec]);

注意:如果 spawn() 与 Handler 类型的自定义完成处理程序一起使用,则函数对象签名实际上是:

void coroutine(boost::asio::basic_yield_context<Handler> yield);

协程TS支持

通过 awaitable 类模板、use_awaitable 完成标记和co_spawn()函数提供对 Coroutines TS 的支持。 这些工具允许程序以同步方式实现异步逻辑,结合 co_await 关键字,如以下示例所示:

boost::asio::co_spawn(executor, echo(std::move(socket)), boost::asio::detached);

// ...

boost::asio::awaitable<void> echo(tcp::socket socket)
{
  try
  {
    char data[1024];
    for (;;)
    {
      std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), boost::asio::use_awaitable);
      co_await async_write(socket, boost::asio::buffer(data, n), boost::asio::use_awaitable);
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo Exception: %s\n", e.what());
  }
}

co_spawn() 的第一个参数是一个执行器,它确定允许协程执行的上下文。 例如,服务器的每个客户端对象可能由多个协程组成; 它们都应该在同一个strand上运行,这样就不需要显式同步。

第二个参数是一个awaitable<R>,它是协程入口点函数的返回结果,在上面的例子中是调用echo的结果。 (或者,此参数可以是返回 awaitable<R> 的函数对象。)模板参数 R 是协程生成的返回值的类型。 在上面的例子中,协程返回 void。

第三个参数是一个完成标记,co_spawn() 使用它来生成一个带有签名 void(std::exception_ptr, R) 的完成处理程序。 一旦完成,这个完成处理程序就会被协程的结果调用。 在上面的示例中,我们传递了一个完成标记类型 boost::asio::detached,它用于显式忽略异步操作的结果。

在这个例子中,协程的主体是在 echo 函数中实现的。 当 use_awaitable 完成令牌传递给异步操作时,此异步操作的启动函数返回一个可与co_await关键字一起使用的可等待对象:

std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), boost::asio::use_awaitable);

其中异步操作的处理程序函数签名具有以下形式:

void handler(boost::system::error_code ec, result_type result);

co_await表达式的结果类型是 result_type。在上面的 async_read_some 示例中,这是 size_t。 如果异步操作失败,则将error_code 转换为system_error 异常并抛出。

处理程序函数签名为如下形式的:

void handler(boost::system::error_code ec);

co_await 表达式产生一个 void 结果。 如上所述,错误作为 system_error 异常传递回协程。

上一篇:boost::geometry::append用法的测试程序


下一篇:boost::math::daubechies_wavelet用法的测试程序