示例代码
#include "Util.h"
#include "MyAsio.h"
#include "TcpConnectionManager.h"
#include "SocketMessageRecvDispatchManager.h"
/*
作 者: itdef
技术博客 http://www.cnblogs.com/itdef/
技术交流群 群号码:324164944
欢迎c c++ windows驱动爱好者 服务器程序员沟通交流
部分老代码存放地点 http://www.oschina.net/code/list_by_user?id=614253
*/ unsigned int DEF::tcp_connection::i = ;
extern DEF::TcpConnectManager g_tcp_connect_manager;
extern DEF::SocketMessageRecvDispatchManager g_socket_message_recv_dispatch_manager; void DEF::AsioLoopThread() {
try
{
boost::asio::io_service io_service;
DEF::tcp_server server(io_service);
io_service.run();
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
} std::thread DEF::StartAsioLoopThread()
{
std::thread t = std::thread(DEF::AsioLoopThread);
return t;
} void DEF::tcp_connection::handle_read_head(const boost::system::error_code& error,
size_t bytes_transferred)
{
//收到错误格式信息
if (error || bufHead_.flag != '|')
{
std::weak_ptr<DEF::tcp_connection> wp(shared_from_this());
g_tcp_connect_manager.LeaveWithLock(wp);
std::cerr << __FUNCTION__ << "(). wrong flag or " << error.message()<< std::endl;
return;
} boost::asio::async_read(socket_, boost::asio::buffer(recvBuffBody, bufHead_.bufferLenth),
boost::bind(&tcp_connection::handle_read_body, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
} void DEF::tcp_connection::start()
{
std::weak_ptr<DEF::tcp_connection> wp(shared_from_this());
g_tcp_connect_manager.JoinWithLock(wp); boost::asio::async_read(socket_, boost::asio::buffer(&bufHead_, sizeof(bufHead_)),
boost::bind(&tcp_connection::handle_read_head, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
} std::string DEF::SharedPtr2StringId(const std::shared_ptr<DEF::tcp_connection>& ptr) {
std::stringstream ss;
ss << ptr;
std::string str_ptr_id;
ss >> str_ptr_id;
return str_ptr_id;
} void DEF::tcp_connection::handle_read_body(const boost::system::error_code& error,
size_t bytes_transferred)
{
if (error) {
std::weak_ptr<DEF::tcp_connection> wp(shared_from_this());
g_tcp_connect_manager.LeaveWithLock(wp);
std::cerr << __FUNCTION__ <<" "<< error.message()<< std::endl;
return;
}
recvBuffBody[bytes_transferred] = '\0';
//std::cout << "recv " << recvBuffBody << std::endl;
g_socket_message_recv_dispatch_manager.PostSockRecvMessageAndNotify(recvBuffBody);
boost::asio::async_read(socket_, boost::asio::buffer(&bufHead_, sizeof(bufHead_)),
boost::bind(&tcp_connection::handle_read_head, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
消息队列表
#pragma once
#include <mutex>
#include <deque>
#include <string> #include "Util.h"
/*
作 者: itdef
技术博客 http://www.cnblogs.com/itdef/
技术交流群 群号码:324164944
欢迎c c++ windows驱动爱好者 服务器程序员沟通交流
部分老代码存放地点 http://www.oschina.net/code/list_by_user?id=614253
*/
NAMESPACEBEGIN(DEF) struct MessageQueueWithLock {
std::mutex messageQueuemtx;
std::condition_variable messageQueuecv;
volatile bool ready = false;
std::deque<std::string> messageDeq; void PostMessageAndNotify(const std::string& json) {
std::lock_guard<std::mutex> lck(messageQueuemtx);
messageDeq.push_back(json);
messageQueuecv.notify_one();
} std::string WaitForMessageDequeOnce() {
std::string s;
{
std::unique_lock<std::mutex> lck(messageQueuemtx);
while (messageDeq.empty())
messageQueuecv.wait(lck);
}
{
std::lock_guard<std::mutex> lck(messageQueuemtx);
if (!messageDeq.empty()) {
s = messageDeq.front();
messageDeq.pop_front();
}
}
return s;
}
}; NAMESPACEEND
一个全局管理连接的容器
#pragma once
#pragma once
#include "Util.h"
#include <map>
#include <mutex>
#include <memory>
#include <sstream> #include "MyAsio.h"
/*
作 者: itdef
技术博客 http://www.cnblogs.com/itdef/
技术交流群 群号码:324164944
欢迎c c++ windows驱动爱好者 服务器程序员沟通交流
部分老代码存放地点 http://www.oschina.net/code/list_by_user?id=614253
*/
NAMESPACEBEGIN(DEF)
//class tcp_connection;
class TcpConnectManager {
public:
typedef std::string SOCKWEAKPTRID;
TcpConnectManager() {}
~TcpConnectManager() {} void JoinWithLock(const std::weak_ptr<tcp_connection>& ptr); void LeaveWithLock(const std::weak_ptr<tcp_connection>& ptr); std::weak_ptr<tcp_connection> FindWeakPtrBySockPtrIdWithLock(const std::string& sock_ptr_id);
TcpConnectManager& operator=(const TcpConnectManager&) = delete;
TcpConnectManager(const TcpConnectManager&) = delete;
private:
std::mutex mtx;
std::map<SOCKWEAKPTRID, std::weak_ptr<DEF::tcp_connection>> connection_infos;
}; NAMESPACEEND