使用boost协程搭建tcp服务器

         在以前的项目中,使用过boost::asio来搭建Tcp服务器,最近有个新的项目也需要搭建服务器,因此想要将以前代码移植过来。

        原先的项目使用的是异步回调函数的方式进行搭建的,回想起来,当时写回调函数,和处理读写buffer时艰难的岁月,因此想要重新构建原先的架构。经过查找资料,发现可以使用boost的协程来实现,可以把异步处理成跟同步的效果。

        先看一段代码:

void do_read(void)
{    
    auto self(shared_from_this());
    boost::asio::spawn(m_strand,boost::bind(&TcpSession::handle_read, self,std::placeholders::_1));
}

        这里使用到了boost::asio::spawn,正是通过boost::asio::spawn来创建协程,协程所处理的函数为TcpSession::handle_read。

void handle_read(boost::asio::yield_context yield)
{ 
    while(1)
    {
        std::vector<char> vectorRecvBuffer(1024);

        boost::system::error_code ec;
        int32_t i32ReadLen = m_socket.async_read_some(boost::asio::buffer(vectorRecvBuffer,vectorRecvBuffer.size()),yield[ec]);
        
        if (!ec)
        {
            printf("read len %d\n",i32ReadLen);
            std::string msg(vectorRecvBuffer.data(),i32ReadLen);
            printf("get msg %s\n",msg.c_str());

            std::string strResponMsg("hello aaa");
            handle_write(yield,strResponMsg);
        }
        else//客户端关闭连接
        {
            try
            {
                m_socket.shutdown(BoostTcp::socket::shutdown_both, const_cast<boost::system::error_code&>(ec));
                m_socket.close(const_cast<boost::system::error_code&>(ec));    
            }
            catch (std::exception& e)
            {
                printf("colse %s\n",e.what());   
            }

            return;
        }
    }
}

        可以看到async_read_some,这里是异步的读操作,如果是以前的回调函数的方式,那么处理方式为当read完成之后,调用你注册的回调。但是使用了boost::asio::spawn之后呢,虽然还是异步处理方式,但是代码上的处理看起来就是同步的方式。

        这里来简单说下流程:do_read->boost::asio::spawn->handle_read->async_read_some,当遇到async_read_some异步读操作之后,将会返回do_read结束。直到io收到数据,async_read_some读完数据之后返回继续往下走,来到if (!ec)判断( 这里需要注意的是,如果没有使用yield[ec]接收错误码直接yield的话,发生错误的时候将会抛出异常 )。

        这样是不是看起来就是同步的读操作呢,而且这里的vectorRecvBuffer的生命周期也是在此代码块中的,不像以前使用回调的方式,需要保证读的buffer不能在异步期间结束生命周期,boost给我们保证了vectorRecvBuffer的有效性,减少了我们的代码开发处理。
        

        boost的异步处理加上协程简直不要太好用,它将异步操作处理成跟同步一样,大大减少的开发的时间,而且更加好的理解代码,毕竟人的思维就是同步的方式。

        下面附上完整的代码:

#include <iostream>

#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

typedef boost::asio::ip::tcp        BoostTcp;
typedef boost::asio::io_service     BoostIoServer;

class TcpSession : public std::enable_shared_from_this<TcpSession> 
{
public:

    TcpSession(BoostIoServer &ioServer,BoostTcp::socket socket)
              : m_socket(std::move(socket)),
                m_strand(ioServer.get_executor())
    {    
    }

    ~TcpSession()
    {
    }

    void start(void)
    {
        do_read();
    }

    void send(const std::string &msg)
    {
        do_write(msg);
    }

private:
    void do_read(void)
    {    
        auto self(shared_from_this());
        boost::asio::spawn(m_strand,boost::bind(&TcpSession::handle_read, self,std::placeholders::_1));
    }

    void do_write(const std::string &msg)
    { 
        auto self(shared_from_this());
        boost::asio::spawn(m_strand,boost::bind(&TcpSession::handle_write, self,std::placeholders::_1,msg));
    }

    void handle_read(boost::asio::yield_context yield)
    { 
        while(1)
        {
            std::vector<char> vectorRecvBuffer(1024);

            boost::system::error_code ec;
            int32_t i32ReadLen = m_socket.async_read_some(boost::asio::buffer(vectorRecvBuffer,vectorRecvBuffer.size()),yield[ec]);
            
            if (!ec)
            {
                printf("read len %d\n",i32ReadLen);
                std::string msg(vectorRecvBuffer.data(),i32ReadLen);
                printf("get msg %s\n",msg.c_str());

                std::string strResponMsg("hello aaa");
                handle_write(yield,strResponMsg);
            }
            else//客户端关闭连接
            {
                try
                {
                    m_socket.shutdown(BoostTcp::socket::shutdown_both, const_cast<boost::system::error_code&>(ec));
                    m_socket.close(const_cast<boost::system::error_code&>(ec));    
                }
                catch (std::exception& e)
                {
                    printf("colse %s\n",e.what());   
                }

                return;
            }
        }
    }

    void handle_write(boost::asio::yield_context yield,const std::string &msg)
    {
        int32_t i32BufLen = msg.length();

        printf("Buf Len %d\n",i32BufLen);

        std::vector<char> vectorSendBuffer(i32BufLen);
        boost::system::error_code ec;

        memcpy((void *)(vectorSendBuffer.data()),(const void *)msg.data(),i32BufLen);

        int32_t i32WriteLen = m_socket.async_write_some(boost::asio::buffer(vectorSendBuffer,i32BufLen),yield[ec]);

        if (!ec)
        {    
            printf("send len %d\n",i32WriteLen);         
        }
        else
        {
            printf("send error %s\n",ec.what().c_str());
        }  
    }

private:
    BoostTcp::socket m_socket;
    boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
};

class TcpServer
{
public:
    TcpServer(BoostIoServer &io_service, int32_t i32Port)
                        : m_ioServer(io_service),
                          m_ioAcceptor(io_service, BoostTcp::endpoint(BoostTcp::v4(), i32Port)),
                          m_i32Port(i32Port)
    {
        boost::asio::spawn(m_ioServer,boost::bind(&TcpServer::do_accept,this,std::placeholders::_1));
    }

    ~TcpServer()
    {
    }

private:
    void do_accept(boost::asio::yield_context yield)
    {
        while(1)
        {
            boost::system::error_code ec;
            BoostTcp::socket socket(m_ioServer);
            
            m_ioAcceptor.async_accept(socket, yield[ec]);
            if(!ec)
            {
                std::shared_ptr<TcpSession> tcpSession = std::make_shared<TcpSession>(m_ioServer,std::move(socket));                                                                             
                tcpSession->start();
            }
            else
            {
                printf("%s\n",ec.what().c_str());
                return;
            }                                   
        }
    }
private:
    BoostIoServer &m_ioServer;
    BoostTcp::acceptor m_ioAcceptor;
    int32_t m_i32Port;
};

int main(void)
{
    BoostIoServer ioServer;

    TcpServer tcpServer(ioServer,12345);
    ioServer.run();

    return 0;
}

所有的处理,看起来都是跟同步的方式一样,十分的舒服。

编译:

g++ tcp_server.cpp -std=c++11 -lpthread -lboost_coroutine -I ./lib/boost/include/ -o tcp_server

上一篇:腾讯人力资源与组织管理体系(54页)


下一篇:spring参数校验之validation的使用