使用protobuf设计消息协议(C++asio网络库相关)

protobuf是目前应用最广范的设计消息协议工具,具有以下一些特点:
1、proto2只支持python,c++,JAVA,proto3可以支持更多语言
2、定义协议以proto作为文件后缀名,在第一行用syntax指定proto版本
3、C++使用protobuf官方文档:http://developers.google.com/protocol-buffers/docs/cpptutorial当然需要*后才能访问
4、required表示必须字段,optional表示可选字段,在proto3中可以省略定义默认都是可选
5、用protoc生成工具可以直接将proto协议生成对应语言可使用的代码,生成命令比如:protoc --cpp_out=. Protocal.proto

优点:
1、效率和可读性的结合

以下是用protobuf作为通信协议的asio聊天室系统协议部分:

//Protocal.proto
syntax = 'proto3';
message PBindName {
	string name = 1;
}
message PChat {
	string information = 1;
}
message PRoomInformation {
	string name = 1;
	string information = 2;
}

生成好该文件后执行protoc --cpp_out=. Protocal.proto可执行.h头文件和.cc代码文件其中需要注意部分:
1、mutable_name()和release_name()分别是获取对string类型的name指针,和释放name指针
2、用的最多的是parsefromstring处理字符串数据
3、用parsefromstream可进行数据流的处理

以下是asio聊天室用protobuf的完整例子:

协议处理部分:

//structHeader.h
#ifndef FND_STRUCT_HEADER_H
#define FND_STRUCT_HEADER_H
#include <string>
struct Header {
	int bodySize;
	int type;
};

enum MessageType {
	MT_BIND_NAME = 1, // {"name" : "abc"}
	MT_CHAT_INFO = 2, // {"information" "what i say"}
	MT_ROOM_INFO = 3, // {"name" : "abc", "information" : "what i say"}
};

// client send
struct BindName {
	char name[32];
	int nameLen;
};

// client send
struct ChatInformation {
	char information[256];
	int infoLen;
};


// serversend
struct RoomInformation {
	BindName name;
	ChatInformation chat;
};

bool parseMessage(const std::string &input, int *type, std::string &outbuffer);
bool parseMessage2(const std::string &input, int *type, std::string &outbuffer);
bool parseMessage3(const std::string &input, int *type, std::string &outbuffer);
bool parseMessage4(const std::string &input, int *type, std::string &outbuffer);
#endif

//structHeader.cpp
#include "structHeader.h"
#include "SerilizationObject.h"
#include "JsonObject.h"
#include "Protocal.pb.h"
#include <cstdlib>
#include <cstring>
#include <iostream>
template <typename T> std::string seriliaze(const T &obj) {
  std::stringstream ss;
  boost::archive::text_oarchive oa(ss);
  oa & obj;
  return ss.str();
}

bool parseMessage4(const std::string &input, int *type,
                   std::string &outbuffer) {
  auto pos = input.find_first_of(" ");
  if (pos == std::string::npos)
    return false;
  if (pos == 0)
    return false;
	// "BindName ok" -> substr -> BindName
  auto command = input.substr(0, pos);
  if (command == "BindName") {
    // we try to bind name
    std::string name = input.substr(pos + 1);
    if (name.size() > 32)
      return false;
    if (type)
      *type = MT_BIND_NAME;
		PBindName bindName;
		bindName.set_name(name);
		//auto oldname = bindName.name();
		auto ok = bindName.SerializeToString(&outbuffer);
    return ok;
  } else if (command == "Chat") {
    // we try to chat
    std::string chat = input.substr(pos + 1);
    if (chat.size() > 256)
      return false;
		PChat pchat;
		pchat.set_information(chat);
		auto ok = pchat.SerializeToString(&outbuffer);
    if (type)
      *type = MT_CHAT_INFO;
    return ok;
  }
  return false;
}



bool parseMessage3(const std::string &input, int *type,
                   std::string &outbuffer) {
  auto pos = input.find_first_of(" ");
  if (pos == std::string::npos)
    return false;
  if (pos == 0)
    return false;
	// "BindName ok" -> substr -> BindName
  auto command = input.substr(0, pos);
  if (command == "BindName") {
    // we try to bind name
    std::string name = input.substr(pos + 1);
    if (name.size() > 32)
      return false;
    if (type)
      *type = MT_BIND_NAME;
		ptree tree;
		tree.put("name", name);
		outbuffer = ptreeToJsonString(tree);
    //outbuffer = seriliaze(SBindName(std::move(name)));
    return true;
  } else if (command == "Chat") {
    // we try to chat
    std::string chat = input.substr(pos + 1);
    if (chat.size() > 256)
      return false;
		ptree tree;
		tree.put("information", chat);
		outbuffer = ptreeToJsonString(tree);
		//outbuffer = seriliaze(SChatInfo(std::move(chat)));
    if (type)
      *type = MT_CHAT_INFO;
    return true;
  }
  return false;
}

bool parseMessage2(const std::string &input, int *type,
                   std::string &outbuffer) {
  auto pos = input.find_first_of(" ");
  if (pos == std::string::npos)
    return false;
  if (pos == 0)
    return false;
	// "BindName ok" -> substr -> BindName
  auto command = input.substr(0, pos);
  if (command == "BindName") {
    // we try to bind name
    std::string name = input.substr(pos + 1);
    if (name.size() > 32)
      return false;
    if (type)
      *type = MT_BIND_NAME;
    //SBindName bindInfo(std::move(name));
    outbuffer = seriliaze(SBindName(std::move(name)));
    return true;
  } else if (command == "Chat") {
    // we try to chat
    std::string chat = input.substr(pos + 1);
    if (chat.size() > 256)
      return false;
		outbuffer = seriliaze(SChatInfo(std::move(chat)));
//    ChatInformation info;
//    info.infoLen = chat.size();
//    std::memcpy(&(info.information), chat.data(), chat.size());
//    auto buffer = reinterpret_cast<const char *>(&info);
//    outbuffer.assign(buffer, buffer + sizeof(info));
    if (type)
      *type = MT_CHAT_INFO;
    return true;
  }
  return false;
}
// cmd messagebody
bool parseMessage(const std::string &input, int *type, std::string &outbuffer) {
  // input should be cmd body
  auto pos = input.find_first_of(" ");
  if (pos == std::string::npos)
    return false;
  if (pos == 0)
    return false;
	// "BindName ok" -> substr -> BindName
  auto command = input.substr(0, pos);
  if (command == "BindName") {
    // we try to bind name
    std::string name = input.substr(pos + 1);
    if (name.size() > 32)
      return false;
    if (type)
      *type = MT_BIND_NAME;
    BindName bindInfo;
    bindInfo.nameLen = name.size();
    std::memcpy(&(bindInfo.name), name.data(), name.size());
    auto buffer = reinterpret_cast<const char *>(&bindInfo);
    outbuffer.assign(buffer, buffer + sizeof(bindInfo));
    return true;
  } else if (command == "Chat") {
    // we try to chat
    std::string chat = input.substr(pos + 1);
    if (chat.size() > 256)
      return false;
    ChatInformation info;
    info.infoLen = chat.size();
    std::memcpy(&(info.information), chat.data(), chat.size());
    auto buffer = reinterpret_cast<const char *>(&info);
    outbuffer.assign(buffer, buffer + sizeof(info));
    if (type)
      *type = MT_CHAT_INFO;
    return true;
  }
  return false;
}

聊天消息处理:

//chat_message.h
#ifndef CHAT_MESSAGE_HPP
#define CHAT_MESSAGE_HPP

#include "structHeader.h"
#include "SerilizationObject.h"

#include <iostream>

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cassert>
// s -> c   c -> s message { header, body } // header length

class chat_message {
public:
  enum { header_length = sizeof(Header) };
  enum { max_body_length = 512 };

  chat_message(){}

  const char *data() const { return data_; }

  char *data() { return data_; }

  std::size_t length() const { return header_length + m_header.bodySize; }

  const char *body() const { return data_ + header_length; }

  char *body() { return data_ + header_length; }
  int type() const { return m_header.type; }

  std::size_t body_length() const { return m_header.bodySize; }
  void setMessage(int messageType, const void *buffer, size_t bufferSize) {
    assert(bufferSize <= max_body_length);
		m_header.bodySize = bufferSize;
		m_header.type = messageType;
		std::memcpy(body(), buffer, bufferSize);
		std::memcpy(data(), &m_header, sizeof(m_header));
  }
	void setMessage(int messageType, const std::string& buffer) {
		setMessage(messageType, buffer.data(), buffer.size());
	}

  bool decode_header() {
    std::memcpy(&m_header, data(), header_length);
    if (m_header.bodySize > max_body_length) {
      std::cout << "body size " << m_header.bodySize << " " << m_header.type
                << std::endl;
      return false;
		}
    return true;
  }

private:
  char data_[header_length + max_body_length];
	Header m_header;
};

#endif // CHAT_MESSAGE_HPP

服务器代码部分:

//main.cpp
#include "chat_message.h"
#include "JsonObject.h"
#include "Protocal.pb.h"

#include <boost/asio.hpp>

#include <deque>
#include <iostream>
#include <list>
#include <memory>
#include <set>
#include <utility>

#include <cstdlib>

using boost::asio::ip::tcp;

//----------------------------------------------------------------------

using chat_message_queue = std::deque<chat_message>;
using chat_message_queue2 = std::list<chat_message>;
//----------------------------------------------------------------------


//----------------------------------------------------------------------

class chat_session;
using chat_session_ptr = std::shared_ptr<chat_session>;
class chat_room {
public:
public:
	void join(chat_session_ptr);
	void leave(chat_session_ptr);
	void deliver(const chat_message&);
private:
  std::set<chat_session_ptr> participants_;
  enum { max_recent_msgs = 100 };
  chat_message_queue recent_msgs_;
};
//----------------------------------------------------------------------

class chat_session : public std::enable_shared_from_this<chat_session> {
public:
  chat_session(tcp::socket socket, chat_room &room)
      : socket_(std::move(socket)), room_(room) {}

  void start() {
    room_.join(shared_from_this());
    do_read_header();
  }

  void deliver(const chat_message &msg) {
		// first false
    bool write_in_progress = !write_msgs_.empty();
    write_msgs_.push_back(msg);
    if (!write_in_progress) {
			// first
      do_write();
    }
  }

private:
  void do_read_header() {
    auto self(shared_from_this());
    boost::asio::async_read(
        socket_,
        boost::asio::buffer(read_msg_.data(), chat_message::header_length),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec && read_msg_.decode_header()) {
            do_read_body();
          } else {
            std::cout << "Player leave the room\n";
            room_.leave(shared_from_this());
          }
        });
  }

  void do_read_body() {
    auto self(shared_from_this());
    boost::asio::async_read(
        socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            //room_.deliver(read_msg_);
						handleMessage();
            do_read_header();
          } else {
            room_.leave(shared_from_this());
          }
        });
  }

  template <typename T> T toObject() {
    T obj;
    std::stringstream ss(std::string(
        read_msg_.body(), read_msg_.body() + read_msg_.body_length()));
    boost::archive::text_iarchive oa(ss);
    oa &obj;
    return obj;
  }

  bool fillProtobuf(::google::protobuf::Message* msg) {
    std::string ss(
        read_msg_.body(), read_msg_.body() + read_msg_.body_length());
		auto ok = msg->ParseFromString(ss);
    return ok;
  }

	ptree toPtree() {
		ptree obj;
		std::stringstream ss(
				std::string(read_msg_.body(),
					read_msg_.body() + read_msg_.body_length()));
		boost::property_tree::read_json(ss, obj);
		return obj;
	}

  void handleMessage() {
    if (read_msg_.type() == MT_BIND_NAME) {
			PBindName bindName;
			if(fillProtobuf(&bindName))
				m_name = bindName.name();
    } else if (read_msg_.type() == MT_CHAT_INFO) {
			PChat chat;
			if(!fillProtobuf(&chat)) return;
			m_chatInformation = chat.information();

      auto rinfo = buildRoomInfo();
      chat_message msg;
      msg.setMessage(MT_ROOM_INFO, rinfo);
      room_.deliver(msg);

    } else {
      // not valid msg do nothing
    }
  }

  void do_write() {
    auto self(shared_from_this());
    boost::asio::async_write(
        socket_, boost::asio::buffer(write_msgs_.front().data(),
                                     write_msgs_.front().length()),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            write_msgs_.pop_front();
            if (!write_msgs_.empty()) {
              do_write();
            }
          } else {
            room_.leave(shared_from_this());
          }
        });
  }

  tcp::socket socket_;
  chat_room &room_;
  chat_message read_msg_;
  chat_message_queue write_msgs_;
	std::string m_name;
	std::string m_chatInformation;
	std::string buildRoomInfo() const {
		PRoomInformation roomInfo;
		roomInfo.set_name(m_name);
		roomInfo.set_information(m_chatInformation);
		std::string out;
		auto ok = roomInfo.SerializeToString(&out);
		assert(ok);
		return out;
	}
//	RoomInformation buildRoomInfo() const {
//		RoomInformation info;
//		info.name.nameLen = m_name.size();
//		std::memcpy(info.name.name, m_name.data(), m_name.size());
//		info.chat.infoLen = m_chatInformation.size();
//		std::memcpy(info.chat.information, m_chatInformation.data(),
//				m_chatInformation.size());
//		return info;
//	}
};


  void chat_room::join(chat_session_ptr participant) {
    participants_.insert(participant);
    for (const auto& msg : recent_msgs_)
      participant->deliver(msg);
  }

  void chat_room::leave(chat_session_ptr participant) {
    participants_.erase(participant);
  }

  void chat_room::deliver(const chat_message &msg) {
    recent_msgs_.push_back(msg);
    while (recent_msgs_.size() > max_recent_msgs)
      recent_msgs_.pop_front();

    for (auto& participant : participants_)
      participant->deliver(msg);
  }


//----------------------------------------------------------------------

class chat_server {
public:
  chat_server(boost::asio::io_service &io_service,
              const tcp::endpoint &endpoint)
      : acceptor_(io_service, endpoint), socket_(io_service) {
    do_accept();
  }

private:
  void do_accept() {
    acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
      if (!ec) {
        auto session =
            std::make_shared<chat_session>(std::move(socket_), room_);
        session->start();
      }

      do_accept();
    });
  }

  tcp::acceptor acceptor_;
  tcp::socket socket_;
  chat_room room_;
};

//----------------------------------------------------------------------

int main(int argc, char *argv[]) {
  try {
		GOOGLE_PROTOBUF_VERIFY_VERSION;
    if (argc < 2) {
      std::cerr << "Usage: chat_server <port> [<port> ...]\n";
      return 1;
    }

    boost::asio::io_service io_service;

    std::list<chat_server> servers;
    for (int i = 1; i < argc; ++i) {
      tcp::endpoint endpoint(tcp::v4(), std::atoi(argv[i]));
      servers.emplace_back(io_service, endpoint);
    }

    io_service.run();
  } catch (std::exception &e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }

	google::protobuf::ShutdownProtobufLibrary();
  return 0;
}

客户端代码部分:

//client.cpp
#include "chat_message.h"
#include "structHeader.h"
#include "JsonObject.h"
#include "SerilizationObject.h"
#include "Protocal.pb.h"

#include <boost/asio.hpp>

#include <deque>
#include <iostream>
#include <thread>

#include <cstdlib>
#include <cassert>

using boost::asio::ip::tcp;

using chat_message_queue = std::deque<chat_message>;


class chat_client {
public:
  chat_client(boost::asio::io_service &io_service,
              tcp::resolver::iterator endpoint_iterator)
      : io_service_(io_service), socket_(io_service) {
    do_connect(endpoint_iterator);
  }

  void write(const chat_message &msg) {
    io_service_.post([this, msg]() {
      bool write_in_progress = !write_msgs_.empty();
      write_msgs_.push_back(msg);
      if (!write_in_progress) {
        do_write();
      }
    });
  }

  void close() {
    io_service_.post([this]() { socket_.close(); });
  }

private:
  void do_connect(tcp::resolver::iterator endpoint_iterator) {
    boost::asio::async_connect(
        socket_, endpoint_iterator,
        [this](boost::system::error_code ec, tcp::resolver::iterator) {
          if (!ec) {
            do_read_header();
          }
        });
  }

  void do_read_header() {
    boost::asio::async_read(
        socket_,
        boost::asio::buffer(read_msg_.data(), chat_message::header_length),
        [this](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec && read_msg_.decode_header()) {
            do_read_body();
          } else {
            socket_.close();
          }
        });
  }

  void do_read_body() {
    boost::asio::async_read(
        socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
        [this](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            if (read_msg_.type() == MT_ROOM_INFO) {
              //SRoomInfo info;
						std::string buffer(read_msg_.body(),
                              read_msg_.body() + read_msg_.body_length());

						PRoomInformation roomInfo;
						auto ok = roomInfo.ParseFromString(buffer);
						//if(!ok) throw std::runtime_error("not valid message");
              //std::stringstream ss(buffer);
							//ptree tree;
							//boost::property_tree::read_json(ss, tree);
						if (ok) {
						std::cout << "client: '";
						std::cout << roomInfo.name();
						std::cout << "' says '";
						std::cout
						<< roomInfo.information();
						std::cout << "'\n";
						}
//              boost::archive::text_iarchive ia(ss);
//              ia & info;
//              std::cout << "client: '";
//              std::cout << info.name();
//              std::cout << "' says '";
//              std::cout << info.information();
//              std::cout << "'\n";
            }
            do_read_header();
          } else {
            socket_.close();
          }
        });
  }

  void do_write() {
    boost::asio::async_write(
        socket_, boost::asio::buffer(write_msgs_.front().data(),
                                     write_msgs_.front().length()),
        [this](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            write_msgs_.pop_front();
            if (!write_msgs_.empty()) {
              do_write();
            }
          } else {
            socket_.close();
          }
        });
  }

private:
  boost::asio::io_service &io_service_;
  tcp::socket socket_;
  chat_message read_msg_;
  chat_message_queue write_msgs_;
};

int main(int argc, char *argv[]) {
  try {
		GOOGLE_PROTOBUF_VERIFY_VERSION;
    if (argc != 3) {
      std::cerr << "Usage: chat_client <host> <port>\n";
      return 1;
    }

    boost::asio::io_service io_service;

    tcp::resolver resolver(io_service);
    auto endpoint_iterator = resolver.resolve({argv[1], argv[2]});
    chat_client c(io_service, endpoint_iterator);

    std::thread t([&io_service]() { io_service.run(); });

    char line[chat_message::max_body_length + 1];
		// ctrl-d
    while (std::cin.getline(line, chat_message::max_body_length + 1)) {
      chat_message msg;
			auto type = 0;
			std::string input(line, line + std::strlen(line));
			std::string output;
			if(parseMessage4(input, &type, output)) {
				msg.setMessage(type, output.data(), output.size());
				c.write(msg);
				std::cout << "write message for server " << output.size() << std::endl;
			}
    }

    c.close();
    t.join();
  } catch (std::exception &e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }

	google::protobuf::ShutdownProtobufLibrary();
  return 0;
}
使用protobuf设计消息协议(C++asio网络库相关)使用protobuf设计消息协议(C++asio网络库相关) 昔拉再世 发布了145 篇原创文章 · 获赞 34 · 访问量 4833 私信 关注
上一篇:Django使用Channels实现WebSocket


下一篇:我应该让长时间轮询的连接保持多长时间?