【Boost】boost库asio详解3——io_service作为work pool

无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。
使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.dispatch的接口也和io_service.post一样,但不同的是它是直接调用而不是经过push到队列然后在io_services.run中执行,而在这个示例当中,显然我们需要把工作交到另一个线程去完成,这样才不会影响网络接收线程池的工作以达到高效率的接收数据,这种设计与前面的netsever其实相同,这就是典型的Half Sync/Half Async。二者的区别就是netsever自己实现了工作队列,而不是直接使用io_service,这种设计实际上在win下是使用了iocp作为工作队列。
不过我更倾向于前一种设计,因为那样做,代码一切都在自己的掌握中,而io_service则是经过许多封装代码,并且本身设计只是用于处理网络完成事件的。
无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。

    1. #include <stdio.h>
    2. #include <cstdlib>
    3. #include <iostream>
    4. #include <boost/thread.hpp>
    5. #include <boost/aligned_storage.hpp>
    6. #include <boost/array.hpp>
    7. #include <boost/bind.hpp>
    8. #include <boost/enable_shared_from_this.hpp>
    9. #include <boost/noncopyable.hpp>
    10. #include <boost/shared_ptr.hpp>
    11. #include <boost/asio.hpp>
    12. using boost::asio::ip::tcp;
    13. class handler_allocator
    14. : private boost::noncopyable
    15. {
    16. public:
    17. handler_allocator()
    18. : in_use_(false)
    19. {
    20. }
    21. void* allocate(std::size_t size)
    22. {
    23. if (!in_use_ && size < storage_.size)
    24. {
    25. in_use_ = true;
    26. return storage_.address();
    27. }
    28. else
    29. {
    30. return ::operator new(size);
    31. }
    32. }
    33. void deallocate(void* pointer)
    34. {
    35. if (pointer == storage_.address())
    36. {
    37. in_use_ = false;
    38. }
    39. else
    40. {
    41. ::operator delete(pointer);
    42. }
    43. }
    44. private:
    45. // Storage space used for handler-based custom memory allocation.
    46. boost::aligned_storage<1024> storage_;
    47. // Whether the handler-based custom allocation storage has been used.
    48. bool in_use_;
    49. };
    50. template <typename Handler>
    51. class custom_alloc_handler
    52. {
    53. public:
    54. custom_alloc_handler(handler_allocator& a, Handler h)
    55. : allocator_(a),
    56. handler_(h)
    57. {
    58. }
    59. template <typename Arg1>
    60. void operator()(Arg1 arg1)
    61. {
    62. handler_(arg1);
    63. }
    64. template <typename Arg1, typename Arg2>
    65. void operator()(Arg1 arg1, Arg2 arg2)
    66. {
    67. handler_(arg1, arg2);
    68. }
    69. friend void* asio_handler_allocate(std::size_t size,
    70. custom_alloc_handler<Handler>* this_handler)
    71. {
    72. return this_handler->allocator_.allocate(size);
    73. }
    74. friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/,
    75. custom_alloc_handler<Handler>* this_handler)
    76. {
    77. this_handler->allocator_.deallocate(pointer);
    78. }
    79. private:
    80. handler_allocator& allocator_;
    81. Handler handler_;
    82. };
    83. // Helper function to wrap a handler object to add custom allocation.
    84. template <typename Handler>
    85. inline custom_alloc_handler<Handler> make_custom_alloc_handler(
    86. handler_allocator& a, Handler h)
    87. {
    88. return custom_alloc_handler<Handler>(a, h);
    89. }
    90. /// A pool of io_service objects.
    91. class io_service_pool
    92. : private boost::noncopyable
    93. {
    94. public:
    95. /// Construct the io_service pool.
    96. explicit io_service_pool(std::size_t pool_size) : next_io_service_(0)
    97. {
    98. if (pool_size == 0)
    99. throw std::runtime_error("io_service_pool size is 0");
    100. // Give all the io_services work to do so that their run() functions will not
    101. // exit until they are explicitly stopped.
    102. for (std::size_t i = 0; i < pool_size; ++i)
    103. {
    104. io_service_ptr io_service(new boost::asio::io_service);
    105. work_ptr work(new boost::asio::io_service::work(*io_service));
    106. io_services_.push_back(io_service);
    107. work_.push_back(work);
    108. }
    109. }
    110. // Run all io_service objects in the pool.
    111. void run()
    112. {
    113. // Create a pool of threads to run all of the io_services.
    114. std::vector<boost::shared_ptr<boost::thread> > threads;
    115. for (std::size_t i = 0; i < io_services_.size(); ++i)
    116. {
    117. boost::shared_ptr<boost::thread> thread(new boost::thread(
    118. boost::bind(&boost::asio::io_service::run, io_services_[i])));
    119. threads.push_back(thread);
    120. }
    121. // Wait for all threads in the pool to exit.
    122. for (std::size_t i = 0; i < threads.size(); ++i)
    123. threads[i]->join();
    124. }
    125. // Stop all io_service objects in the pool.
    126. void stop()
    127. {
    128. // Explicitly stop all io_services.
    129. for (std::size_t i = 0; i < io_services_.size(); ++i)
    130. io_services_[i]->stop();
    131. }
    132. // Get an io_service to use.
    133. boost::asio::io_service& get_io_service()
    134. {
    135. // Use a round-robin scheme to choose the next io_service to use.
    136. boost::asio::io_service& io_service = *io_services_[next_io_service_];
    137. ++next_io_service_;
    138. if (next_io_service_ == io_services_.size())
    139. next_io_service_ = 0;
    140. return io_service;
    141. }
    142. private:
    143. typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
    144. typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
    145. /// The pool of io_services.
    146. std::vector<io_service_ptr> io_services_;
    147. /// The work that keeps the io_services running.
    148. std::vector<work_ptr> work_;
    149. /// The next io_service to use for a connection.
    150. std::size_t next_io_service_;
    151. };
    152. class session
    153. : public boost::enable_shared_from_this<session>
    154. {
    155. public:
    156. session(boost::asio::io_service& work_service
    157. , boost::asio::io_service& io_service)
    158. : socket_(io_service)
    159. , io_work_service(work_service)
    160. {
    161. }
    162. tcp::socket& socket()
    163. {
    164. return socket_;
    165. }
    166. void start()
    167. {
    168. socket_.async_read_some(boost::asio::buffer(data_),
    169. make_custom_alloc_handler(allocator_,
    170. boost::bind(&session::handle_read,
    171. shared_from_this(),
    172. boost::asio::placeholders::error,
    173. boost::asio::placeholders::bytes_transferred)));
    174. }
    175. void handle_read(const boost::system::error_code& error,
    176. size_t bytes_transferred)
    177. {
    178. if (!error)
    179. {
    180. boost::shared_ptr<std::vector<char> > buf(new std::vector<char>);
    181. buf->resize(bytes_transferred);
    182. std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin());
    183. io_work_service.post(boost::bind(&session::on_receive
    184. , shared_from_this(), buf, bytes_transferred));
    185. socket_.async_read_some(boost::asio::buffer(data_),
    186. make_custom_alloc_handler(allocator_,
    187. boost::bind(&session::handle_read,
    188. shared_from_this(),
    189. boost::asio::placeholders::error,
    190. boost::asio::placeholders::bytes_transferred)));
    191. }
    192. }
    193. void handle_write(const boost::system::error_code& error)
    194. {
    195. if (!error)
    196. {
    197. }
    198. }
    199. void on_receive(boost::shared_ptr<std::vector<char> > buffers
    200. , size_t bytes_transferred)
    201. {
    202. char* data_stream = &(*buffers->begin());
    203. // in here finish the work.
    204. std::cout << "receive :" << bytes_transferred << " bytes." <<
    205. "message :" << data_stream << std::endl;
    206. }
    207. private:
    208. // The io_service used to finish the work.
    209. boost::asio::io_service& io_work_service;
    210. // The socket used to communicate with the client.
    211. tcp::socket socket_;
    212. // Buffer used to store data received from the client.
    213. boost::array<char, 1024> data_;
    214. // The allocator to use for handler-based custom memory allocation.
    215. handler_allocator allocator_;
    216. };
    217. typedef boost::shared_ptr<session> session_ptr;
    218. class server
    219. {
    220. public:
    221. server(short port, std::size_t io_service_pool_size)
    222. : io_service_pool_(io_service_pool_size)
    223. , io_service_work_pool_(io_service_pool_size)
    224. , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))
    225. {
    226. session_ptr new_session(new session(io_service_work_pool_.get_io_service()
    227. , io_service_pool_.get_io_service()));
    228. acceptor_.async_accept(new_session->socket(),
    229. boost::bind(&server::handle_accept, this, new_session,
    230. boost::asio::placeholders::error));
    231. }
    232. void handle_accept(session_ptr new_session,
    233. const boost::system::error_code& error)
    234. {
    235. if (!error)
    236. {
    237. new_session->start();
    238. new_session.reset(new session(io_service_work_pool_.get_io_service()
    239. , io_service_pool_.get_io_service()));
    240. acceptor_.async_accept(new_session->socket(),
    241. boost::bind(&server::handle_accept, this, new_session,
    242. boost::asio::placeholders::error));
    243. }
    244. }
    245. void run()
    246. {
    247. io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
    248. , &io_service_pool_)));
    249. work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
    250. , &io_service_work_pool_)));
    251. }
    252. void stop()
    253. {
    254. io_service_pool_.stop();
    255. io_service_work_pool_.stop();
    256. io_thread_->join();
    257. work_thread_->join();
    258. }
    259. private:
    260. boost::shared_ptr<boost::thread> io_thread_;
    261. boost::shared_ptr<boost::thread> work_thread_;
    262. io_service_pool io_service_pool_;
    263. io_service_pool io_service_work_pool_;
    264. tcp::acceptor acceptor_;
    265. };
    266. int main(int argc, char* argv[])
    267. {
    268. try
    269. {
    270. if (argc != 2)
    271. {
    272. std::cerr << "Usage: server <port>/n";
    273. return 1;
    274. }
    275. using namespace std; // For atoi.
    276. server s(atoi(argv[1]), 10);
    277. s.run();
    278. getchar();
    279. s.stop();
    280. }
    281. catch (std::exception& e)
    282. {
    283. std::cerr << "Exception: " << e.what() << "/n";
    284. }
    285. return 0;
    286. }
上一篇:HTML基础


下一篇:spring集成rabbitmq