WebSocket (RFC 6455) connection -- asio, C++11

#ifndef __APP_WEBSOCKET_CONNECTION_H__
#define __APP_WEBSOCKET_CONNECTION_H__
#include <asio.hpp>
#include "tcp_connection.hpp"


// WebSocket (RFC 6455) connection built on top of tcp_connection.
// Expected call order on the accepting side: start_handshake() once the
// connection has been accepted; frame reading then starts via start_read().
class websocket_connection : public tcp_connection
{
public:
	websocket_connection( const std::tr1::shared_ptr<asio::ip::tcp::socket> & s );
	~websocket_connection();

	std::tr1::shared_ptr<asio::ip::tcp::socket> get_socket() ;

	// Call when the server has accepted the connection; runs the handshake.
	void start_handshake();

	// Call after the handshake response to begin reading frames.
	void start_read();

	// buf holds the payload only; write() prepends the WebSocket frame header
	// internally before sending.
	void write( const std::tr1::shared_ptr<buffer> & buf);

	// Registers the callback invoked when a message has been received.
	void set_message_handler(const OnMessage & on_message);

protected:
	// Reads the minimum-length part of a WebSocket frame.
	void read_min_head();

	// Reads the remaining data of the current frame. For a larger frame this
	// may be called several times before the whole frame has been read.
	void read_leave_data( int leave_len );

private:
	shared_ptr<buffer> read_buf;   // buffer the incoming frame is read into
	OnMessage on_message_;         // callback set through set_message_handler()

};


#endif


// Forwards the socket to tcp_connection, then obtains the read buffer with
// get_buffer(8192) and sets its size to 8192.
websocket_connection::websocket_connection( const std::tr1::shared_ptr<asio::ip::tcp::socket> & s )
	: tcp_connection(s)
{
	const int read_buffer_len = 8192;
	read_buf = get_buffer(read_buffer_len);
	read_buf->size(read_buffer_len);
}
	// Returns the socket_ member this connection operates on.
	std::tr1::shared_ptr<asio::ip::tcp::socket> websocket_connection::get_socket()
	{
		return socket_;
	}

	void websocket_connection::  start_read(){
		//从包头读取
		read_min_head();
	}
	// Logs a line when the connection object is destroyed.
	websocket_connection::~websocket_connection()
	{
		LOG(ERROR) << "~websocket_connection";
	}
	// Sends buf to the peer as one final, binary WebSocket frame.
	//
	// buf is the payload only; the frame header is produced here through
	// websocket_frame and the packaged frame is handed to tcp_connection::write.
	// A message whose packaged payload_len is 127 is logged and dropped.
	void websocket_connection::write( const std::tr1::shared_ptr<buffer> & buf){

		websocket_frame frame;
		frame.fin = true;
		// This class is the accepting (server) side of the connection: it answers
		// the opening handshake. RFC 6455 section 5.1 states that a server MUST
		// NOT mask frames it sends to a client, and a client must fail the
		// connection when it receives a masked frame, so masking stays off.
		frame.mask = false;
		frame.opcode = 0x02;// binary data frame
		// NOTE(review): offset(4) modifies the caller's buffer; presumably it
		// skips a 4-byte prefix in front of the payload - confirm against buffer.
		buf->offset(4);
		frame.payload = buf;
		frame.package_size();
		// NOTE(review): 127 is presumably the marker for the 64-bit extended
		// payload length, which this implementation does not send.
		if( frame.payload_len == 127 ){
			LOG(ERROR)<<"ignore big message.."<<buf->capacity();
			return;
		}
		shared_ptr<buffer> frame_buf = frame.package();

		tcp_connection::write(frame_buf);

	}
void websocket_connection::read_min_head(){
		auto buf = asio::buffer( read_buf->data(),websocket_frame::fix_min_len);

		auto self(shared_from_this());

		auto read_handler = [self,this](std::error_code ec, std::size_t length){
			if( ec ){
				LOG(ERROR)<<"read_min_head error:"<<ec.message();
				if( on_error_ ){
					on_error_(ec.message());
				}
			}else{
				read_buf->size(length);
				websocket_frame frame;
				int leave_len = frame.unpakcage(read_buf);
				//读取需要解析一个frame所需要的剩余数据
				read_leave_data(leave_len);
			}
		};
		//asio 异步读取指定长度的数据,读取成功后回调read_handler方法.
		asio::async_read(*socket_, buf,read_handler);
	}
	// Asynchronously reads leave_len more bytes, appending them to read_buf
	// after the bytes already stored, and re-parses the buffer as a frame.
	//
	// leave_len: number of bytes the frame parser still needs.
	//
	// When the parser reports 0 bytes left, the frame is complete: its payload
	// is passed to on_message_ (if set), read_buf is reset and the next frame is
	// started. Otherwise this method is called again with the new remainder.
	// Errors are reported through on_error_ when that handler is set.
	void websocket_connection::read_leave_data( int leave_len ){

		// A negative count would be converted to a huge size inside
		// asio::buffer and let the read run past the end of read_buf, so it is
		// rejected before any buffer arithmetic is done.
		if( leave_len < 0 ){
			if( on_error_ ){
				on_error_("invalid frame length");
			}
			return;
		}
		if( read_buf->length() + leave_len > read_buf->capacity() ){
			if( on_error_ ){
				on_error_("data to big...");
			}
			return;
		}
		auto buf = asio::buffer( read_buf->data()+read_buf->length(),leave_len);
		// Keeps this object alive until the completion handler has run.
		auto self( shared_from_this());

		auto read_handler =[self,this](std::error_code ec, std::size_t length){
			if( ec ){
				if( on_error_ ){
					on_error_(ec.message());
				}
			}else{
				read_buf->size(read_buf->length() + length);
				websocket_frame frame;
				int leave_len = frame.unpakcage(read_buf);
				if( leave_len == 0 ){
					// A complete frame has been read.
					if( on_message_ ){
						on_message_(frame.payload);
					}

					read_buf->size(0);
					start_read();
				}else{
					// Not enough data yet: keep reading the remaining bytes.
					read_leave_data(leave_len);
				}
			}
		};
		asio::async_read(*socket_, buf,read_handler);
	}

// Runs the accepting side of the WebSocket opening handshake.
//
// Reads up to 512 bytes of the client's request with one async_read_some call,
// extracts the Sec-WebSocket-Key header value, appends the RFC 6455 GUID,
// hashes the result with sha1 and answers with "101 Switching Protocols"
// carrying the encoded hash as Sec-WebSocket-Accept. Frame reading starts once
// the response has been written.
//
// Every failure (read error, missing or incomplete key header, write error)
// is reported through on_error_ when that handler is set.
//
// NOTE(review): only the first read is inspected and the header name is
// matched case-sensitively, so a request that is larger than 512 bytes,
// arrives in several pieces or spells the header differently is rejected.
void websocket_connection::start_handshake(){
	shared_ptr<buffer> buf = get_buffer(512);
	auto  asio_buf = asio::buffer( buf->data(), 512);
	// Keeps this object alive until the completion handlers have run.
	auto self(shared_from_this());

	socket_->async_read_some( asio_buf,
		[self,this,buf]( const asio::error_code& error,  std::size_t bytes_transferred ){
			if( error ){
				if( on_error_ ){
					on_error_(error.message());
				}
				recycle_buffer(buf);
				return;
			}
			// Copy the request out so the buffer can be recycled right away.
			std::string data((const char*)buf->data(),bytes_transferred);
			recycle_buffer(buf);

			const std::string key="Sec-WebSocket-Key:";
			const std::string::size_type pos = data.find(key);
			const std::string::size_type posEnd =
				( pos == std::string::npos ) ? std::string::npos : data.find("\r\n",pos);
			// Without this check a missing header makes pos + key.length() wrap
			// around and an arbitrary slice of the request is used as the key.
			if( pos == std::string::npos || posEnd == std::string::npos ){
				if( on_error_ ){
					on_error_("invalid websocket handshake: Sec-WebSocket-Key not found");
				}
				return;
			}
			const std::string::size_type valueBegin = pos + key.length();
			auto value = data.substr(valueBegin,posEnd - valueBegin);
			std::string sha1Src = trim(value);
			sha1Src += std::string("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");

			unsigned char sha1out[20];
			sha1((const unsigned char *)sha1Src.c_str(),sha1Src.length(),sha1out);
			std::vector<unsigned char> data64(sha1out, sha1out + sizeof(sha1out));

			std::ostringstream os_rsp;
			// NOTE(review): the literal "=" after base64Encode() presumably adds
			// the padding character the encoder leaves out - confirm.
			os_rsp<<"HTTP/1.1 101 Switching Protocols\r\n"
				  <<"Upgrade: websocket\r\n"
				  <<"Connection: Upgrade\r\n"
				  <<"Sec-WebSocket-Accept: "<<base64Encode(data64)<<"=\r\n"
				  <<"\r\n";

			// The response must outlive this handler: asio only stores a view of
			// the memory, so the string is owned by the completion handler.
			shared_ptr<std::string> rsp( new std::string(os_rsp.str()) );

			// async_write keeps sending until the whole response is written,
			// whereas a single async_send may transmit only part of it.
			asio::async_write(*socket_, asio::buffer(*rsp),
				[self,this,rsp](const asio::error_code& ec, std::size_t /*bytes_transferred*/){
					if( ec ){
						if( on_error_ ){
							on_error_(ec.message());
						}
						return;
					}
					// Handshake response sent: start reading WebSocket frames.
					start_read();
				});
		});
}




void websocket_connection::set_message_handler(const OnMessage & on_message){
	on_message_ = on_message;
}


使用示例:

// Creates the admin WebSocket server on port 80 and starts accepting.
//
// io: the asio io_service the tcp_server is created on.
//
// The body is guarded by a static std::once_flag, so the server is created
// only on the first call; later calls do nothing.
void init_service_admin_websocket( asio::io_service & io){

	static std::once_flag init_flag;
	std::call_once( init_flag,[&io]{
		// Callback run for every newly accepted connection.
		auto new_connected_handler = []( const std::tr1::shared_ptr<asio::ip::tcp::socket> & s)
		{
			shared_ptr<websocket_connection> websocket( new websocket_connection(s));

			// The handlers are stored inside the connection, so they hold only a
			// weak reference to it. A strong capture would make the connection
			// own itself, and resetting a by-copy capture inside a non-mutable
			// lambda does not compile. The connection's methods capture
			// shared_from_this() in their pending asynchronous handlers.
			std::tr1::weak_ptr<websocket_connection> wpsocket(websocket);

			// Error callback: mainly cleans up resources.
			auto on_error =[wpsocket](const std::string & error ){
				auto conn = wpsocket.lock();
				if( !conn ){
					return;
				}
				LOG(ERROR)<<error<<" "<<conn->name();
				sessions::clear_session(conn);
			};
			websocket->set_error_handler(on_error);

			// Message callback: receives the payload of a WebSocket frame.
			auto on_message = [wpsocket]( const std::tr1::shared_ptr<buffer> & buf)
			{
				// message dispatch goes here

			};

			websocket->set_message_handler(on_message);

			websocket->start_handshake();// start the opening handshake

		};
		unsigned int port = 80;
		// The server object is never deleted in this function.
		tcp_server * srv (new tcp_server(io, port , new_connected_handler));
		srv->start_accept();
		});
}



web socket RFC6455 connection --asio C++11,布布扣,bubuko.com

web socket RFC6455 connection --asio C++11

上一篇:C++ 优先队列


下一篇:C++ 静态存储周期(static storage duration)