#ifndef __APP_WEBSOCKET_CONNECTION_H__
#define __APP_WEBSOCKET_CONNECTION_H__
#include <asio.hpp>
#include "tcp_connection.hpp"
class websocket_connection : public tcp_connection
{
public:
websocket_connection( const std::tr1::shared_ptr<asio::ip::tcp::socket> & s );
std::tr1::shared_ptr<asio::ip::tcp::socket> get_socket() ;
//握手应答后调用该方法
void start_read();
~websocket_connection();
//buf为payload数据,write方法内部自动加上web socket frame的头部.
void write( const std::tr1::shared_ptr<buffer> & buf);
//服务器接受到连接时,调用该方法执行握手过程
void start_handshake();
//设置接收到消息的回调方法
void set_message_handler(const OnMessage & on_message);
protected:
//读取web socket frame 的最小长度值
void read_min_head();
//读取剩余的数据、在一个比较大的包中,可能会出现多次调用该方法才读到一个完整的包
void read_leave_data( int leave_len );
private:
shared_ptr<buffer> read_buf;
OnMessage on_message_;
};
#endif
websocket_connection::websocket_connection( const std::tr1::shared_ptr<asio::ip::tcp::socket> & s ):tcp_connection(s){
read_buf = get_buffer(8192);
read_buf->size(8192);
}
std::tr1::shared_ptr<asio::ip::tcp::socket> websocket_connection:: get_socket() { return socket_;}
void websocket_connection:: start_read(){
//从包头读取
read_min_head();
}
websocket_connection::~websocket_connection(){
LOG(ERROR)<<"~websocket_connection";
}
void websocket_connection::write( const std::tr1::shared_ptr<buffer> & buf){
websocket_frame frame;
frame.fin = true;
frame.mask = true;//使用mask、某些浏览器版本较低不能使用
frame.opcode = 0x02;//发送的是二进制数据
buf->offset(4);
frame.payload = buf;
frame.package_size();
if( frame.payload_len == 127 ){
LOG(ERROR)<<"ignore big message.."<<buf->capacity();
return;
}
shared_ptr<buffer> frame_buf = frame.package();
tcp_connection::write(frame_buf);
}
void websocket_connection::read_min_head(){
auto buf = asio::buffer( read_buf->data(),websocket_frame::fix_min_len);
auto self(shared_from_this());
auto read_handler = [self,this](std::error_code ec, std::size_t length){
if( ec ){
LOG(ERROR)<<"read_min_head error:"<<ec.message();
if( on_error_ ){
on_error_(ec.message());
}
}else{
read_buf->size(length);
websocket_frame frame;
int leave_len = frame.unpakcage(read_buf);
//读取需要解析一个frame所需要的剩余数据
read_leave_data(leave_len);
}
};
//asio 异步读取指定长度的数据,读取成功后回调read_handler方法.
asio::async_read(*socket_, buf,read_handler);
}
void websocket_connection::read_leave_data( int leave_len ){
if( read_buf->length() + leave_len > read_buf->capacity() ){
if( on_error_ ){
on_error_("data to big...");
}
return;
}
auto buf = asio::buffer( read_buf->data()+read_buf->length(),leave_len);
auto self( shared_from_this());
auto read_handler =[self,this](std::error_code ec, std::size_t length){
if( ec ){
if( on_error_ ){
on_error_(ec.message());
}
}else{
read_buf->size(read_buf->length() + length);
websocket_frame frame;
int leave_len = frame.unpakcage(read_buf);
if( leave_len == 0 ){
//读取到一个完整的freame了...
if( on_message_ ){
on_message_(frame.payload);
}
read_buf->size(0);
start_read();
}else{
//如果不够、则继续读取剩余的字节.
read_leave_data(leave_len);
}
}
};
asio::async_read(*socket_, buf,read_handler);
}
void websocket_connection::start_handshake(){
shared_ptr<buffer> buf = get_buffer(512);
auto asio_buf = asio::buffer( buf->data(), 512);
auto self(shared_from_this());
socket_->async_read_some( asio_buf,
[self,this,buf]( const asio::error_code& error, std::size_t bytes_transferred ){
if( error ){
if( on_error_ ){
on_error_(error.message());
}
recycle_buffer(buf);
return;
}
std::string data((const char*)buf->data(),bytes_transferred);
recycle_buffer(buf);
std::string key="Sec-WebSocket-Key:";
auto pos = data.find(key);
auto posEnd = data.find("\r\n",pos);
auto value = data.substr(pos + key.length(),posEnd - (pos + key.length()));
std::string sha1Src = trim(value);
sha1Src += std::string("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
unsigned char sha1out[20];
sha1((const unsigned char *)sha1Src.c_str(),sha1Src.length(),sha1out);
std::vector<unsigned char> data64;
for( auto c: sha1out) data64.push_back(c);
std::ostringstream os_rsp;
os_rsp<<"HTTP/1.1 101 Switching Protocols\r\n"
<<"Upgrade: websocket\r\n"
<<"Connection: Upgrade\r\n"
<<"Sec-WebSocket-Accept: "<<base64Encode(data64)<<"=\r\n"
<<"\r\n";
std::string rsp = os_rsp.str();
socket_->async_send(asio::buffer(rsp),[self,this](const asio::error_code& ec, std::size_t bytes_transferred){
});
start_read();//握手应答后,启动对web socke frame的读
});
}
void websocket_connection::set_message_handler(const OnMessage & on_message){
on_message_ = on_message;
}
使用;;
void init_service_admin_websocket( asio::io_service & io){
static std::once_flag init_flag;
std::call_once( init_flag,[&io]{
auto new_connected_handler = []( const std::tr1::shared_ptr<asio::ip::tcp::socket> & s)
{//新连接到来时的回调方法
shared_ptr<websocket_connection> websocket( new websocket_connection(s));
//设置错误发生时的回调方法
auto on_error =[websocket](const std::string & error ){//主要清理资源
LOG(ERROR)<<error<<" "<<websocket->name();
sessions::clear_session(websocket);
websocket.reset();
};
websocket->set_error_handler(on_error);
std::tr1::weak_ptr<websocket_connection> wpsocket(websocket);
//设置接收到消息时的回调方法,websocket frame的payload
auto on_message = [wpsocket]( const std::tr1::shared_ptr<buffer> & buf)
{
//消息分发
};
websocket->set_message_handler(on_message);
websocket->start_handshake();//启动握手
};
unsigned int port = 80;
tcp_server * srv (new tcp_server(io, port , new_connected_handler));
srv->start_accept();
});
}
web socket RFC6455 connection --asio C++11,布布扣,bubuko.com
web socket RFC6455 connection --asio C++11