在以前的项目中,使用过boost::asio来搭建Tcp服务器,最近有个新的项目也需要搭建服务器,因此想要将以前代码移植过来。
原先的项目使用的是异步回调函数的方式进行搭建的,回想起来,当时写回调函数,和处理读写buffer时艰难的岁月,因此想要重新构建原先的架构。经过查找资料,发现可以使用boost的协程来实现,可以把异步处理成跟同步的效果。
先看一段代码:
void do_read(void)
{
auto self(shared_from_this());
boost::asio::spawn(m_strand,boost::bind(&TcpSession::handle_read, self,std::placeholders::_1));
}
这里使用到了boost::asio::spawn,正是通过boost::asio::spawn来创建协程,协程所处理的函数为TcpSession::handle_read。
void handle_read(boost::asio::yield_context yield)
{
while(1)
{
std::vector<char> vectorRecvBuffer(1024);
boost::system::error_code ec;
int32_t i32ReadLen = m_socket.async_read_some(boost::asio::buffer(vectorRecvBuffer,vectorRecvBuffer.size()),yield[ec]);
if (!ec)
{
printf("read len %d\n",i32ReadLen);
std::string msg(vectorRecvBuffer.data(),i32ReadLen);
printf("get msg %s\n",msg.c_str());
std::string strResponMsg("hello aaa");
handle_write(yield,strResponMsg);
}
else//客户端关闭连接
{
try
{
m_socket.shutdown(BoostTcp::socket::shutdown_both, const_cast<boost::system::error_code&>(ec));
m_socket.close(const_cast<boost::system::error_code&>(ec));
}
catch (std::exception& e)
{
printf("colse %s\n",e.what());
}
return;
}
}
}
可以看到async_read_some,这里是异步的读操作,如果是以前的回调函数的方式,那么处理方式为当read完成之后,调用你注册的回调。但是使用了boost::asio::spawn之后呢,虽然还是异步处理方式,但是代码上的处理看起来就是同步的方式。
这里来简单说下流程:do_read->boost::asio::spawn->handle_read->async_read_some,当遇到async_read_some异步读操作之后,将会返回do_read结束。直到io收到数据,async_read_some读完数据之后返回继续往下走,来到if (!ec)判断( 这里需要注意的是,如果没有使用yield[ec]接收错误码直接yield的话,发生错误的时候将会抛出异常 )。
这样是不是看起来就是同步的读操作呢,而且这里的vectorRecvBuffer的生命周期也是在此代码块中的,不像以前使用回调的方式,需要保证读的buffer不能在异步期间结束生命周期,boost给我们保证了vectorRecvBuffer的有效性,减少了我们的代码开发处理。
boost的异步处理加上协程简直不要太好用,它将异步操作处理成跟同步一样,大大减少的开发的时间,而且更加好的理解代码,毕竟人的思维就是同步的方式。
下面附上完整的代码:
#include <iostream>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
typedef boost::asio::ip::tcp BoostTcp;
typedef boost::asio::io_service BoostIoServer;
class TcpSession : public std::enable_shared_from_this<TcpSession>
{
public:
TcpSession(BoostIoServer &ioServer,BoostTcp::socket socket)
: m_socket(std::move(socket)),
m_strand(ioServer.get_executor())
{
}
~TcpSession()
{
}
void start(void)
{
do_read();
}
void send(const std::string &msg)
{
do_write(msg);
}
private:
void do_read(void)
{
auto self(shared_from_this());
boost::asio::spawn(m_strand,boost::bind(&TcpSession::handle_read, self,std::placeholders::_1));
}
void do_write(const std::string &msg)
{
auto self(shared_from_this());
boost::asio::spawn(m_strand,boost::bind(&TcpSession::handle_write, self,std::placeholders::_1,msg));
}
void handle_read(boost::asio::yield_context yield)
{
while(1)
{
std::vector<char> vectorRecvBuffer(1024);
boost::system::error_code ec;
int32_t i32ReadLen = m_socket.async_read_some(boost::asio::buffer(vectorRecvBuffer,vectorRecvBuffer.size()),yield[ec]);
if (!ec)
{
printf("read len %d\n",i32ReadLen);
std::string msg(vectorRecvBuffer.data(),i32ReadLen);
printf("get msg %s\n",msg.c_str());
std::string strResponMsg("hello aaa");
handle_write(yield,strResponMsg);
}
else//客户端关闭连接
{
try
{
m_socket.shutdown(BoostTcp::socket::shutdown_both, const_cast<boost::system::error_code&>(ec));
m_socket.close(const_cast<boost::system::error_code&>(ec));
}
catch (std::exception& e)
{
printf("colse %s\n",e.what());
}
return;
}
}
}
void handle_write(boost::asio::yield_context yield,const std::string &msg)
{
int32_t i32BufLen = msg.length();
printf("Buf Len %d\n",i32BufLen);
std::vector<char> vectorSendBuffer(i32BufLen);
boost::system::error_code ec;
memcpy((void *)(vectorSendBuffer.data()),(const void *)msg.data(),i32BufLen);
int32_t i32WriteLen = m_socket.async_write_some(boost::asio::buffer(vectorSendBuffer,i32BufLen),yield[ec]);
if (!ec)
{
printf("send len %d\n",i32WriteLen);
}
else
{
printf("send error %s\n",ec.what().c_str());
}
}
private:
BoostTcp::socket m_socket;
boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
};
class TcpServer
{
public:
TcpServer(BoostIoServer &io_service, int32_t i32Port)
: m_ioServer(io_service),
m_ioAcceptor(io_service, BoostTcp::endpoint(BoostTcp::v4(), i32Port)),
m_i32Port(i32Port)
{
boost::asio::spawn(m_ioServer,boost::bind(&TcpServer::do_accept,this,std::placeholders::_1));
}
~TcpServer()
{
}
private:
void do_accept(boost::asio::yield_context yield)
{
while(1)
{
boost::system::error_code ec;
BoostTcp::socket socket(m_ioServer);
m_ioAcceptor.async_accept(socket, yield[ec]);
if(!ec)
{
std::shared_ptr<TcpSession> tcpSession = std::make_shared<TcpSession>(m_ioServer,std::move(socket));
tcpSession->start();
}
else
{
printf("%s\n",ec.what().c_str());
return;
}
}
}
private:
BoostIoServer &m_ioServer;
BoostTcp::acceptor m_ioAcceptor;
int32_t m_i32Port;
};
int main(void)
{
BoostIoServer ioServer;
TcpServer tcpServer(ioServer,12345);
ioServer.run();
return 0;
}
所有的处理,看起来都是跟同步的方式一样,十分的舒服。
编译:
g++ tcp_server.cpp -std=c++11 -lpthread -lboost_coroutine -I ./lib/boost/include/ -o tcp_server