十四、day14
今天学习如何通过单例模板实现逻辑层
1. 利用C++11特性封装单例模板
和上一节设计的单例模板有些不同,本节设计的单例模板利用了以下四个C++11新特性,优化了代码
-
unique_lock和lock_guard
-
once_flag和call_once
-
std::function
-
condition_variable
#pragma once
#include <memory>
#include <mutex>
#include <iostream>
using std::cout;
using std::cin;
using std::endl;
template <typename T>
class Singleton;
template<typename T>
class SafeDeletor
{
public:
void operator()(Singleton<T>* st)
{
cout << "this is safe deleter operator()" << endl;
delete st;
}
};
template <typename T>
class Singleton {
template<typename T>
friend class SafeDeletor;
protected:
~Singleton() {
cout << "this is singleton destruct" << endl;
}
Singleton() = default;
Singleton(const Singleton<T>&) = delete;
Singleton& operator=(const Singleton<T>&st) = delete;
private:
static std::shared_ptr<T> _instance;
public:
static std::shared_ptr<T> GetInstance() {
static std::once_flag s_flag; // C++11新特性,懒汉模式在多线程下也不会生成多个实例,不用进行加锁操作
std::call_once(s_flag, [&]() { // call_once内部有锁
_instance = std::shared_ptr<T>(new T, SafeDeletor<T>());
});
return _instance;
}
void PrintAddress() {
cout << _instance->get() << endl;
}
};
// 初始化
template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;
-
私有成员仅保留静态成员变量_instance,用于保存唯一实例,无需手动进行加锁(C++11新特性会自动实现)
-
s_flag是函数GetInstance内的局部静态变量,该变量在函数GetInstance第一次调用时被初始化。以后无论调用多少次GetInstances_flag都不会被重复初始化,而且s_flag存在静态区,会随着进程结束而自动释放。
-
call_once只会调用一次,而且是线程安全的, 其内部的原理就是调用该函数时加锁,然后设置s_flag内部的标记,设置为已经初始化,执行lambda表达式逻辑初始化智能指针,然后解锁。第二次调用GetInstance内部还会调用call_once, 只是call_once判断s_flag已经被初始化了就不执行初始化智能指针的操作了。
-
PrintAddress()函数获取指向托管对象的原始指针
1)本节GetInstance()函数实现
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<T>(new T, SafeDeletor<T>());
});
-
使用 std::call_once:这个方法保证了指定的 lambda 函数只会被调用一次。无论有多少线程尝试访问 GetInstance,只有第一个线程会执行初始化代码,其他线程会等待。这种方法内部处理了线程同步,不需要手动加锁。
-
懒汉式加载:确保在第一次调用时实例化,避免了不必要的开销。
2)上一节GetInstance()函数实现
if (single != nullptr)
{
return single;
}
s_mutex.lock();
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
//额外指定删除器
single = std::shared_ptr<T>(new T, SafeDeletor_T<T>());
s_mutex.unlock();
-
手动加锁:使用 s_mutex.lock() 和 s_mutex.unlock(),需要手动管理锁。
以上四个C++11新特性的介绍请参考
C++11新特性 - 知乎 (zhihu.com)
2. 逻辑类LogicSystem
1)逻辑类的声明
首先定义一个逻辑类LogicSystem,并继承单例模板Singleton<LogicSystem>
#pragma once
#include "Singleton.h"
#include <queue>
#include <thread>
#include "CSession.h"
#include <map>
#include <functional>
#include "Const.h"
#include <json/json.h>
#include <json/reader.h>
typedef std::function<void(std::shared_ptr<CSession>, const short& msg_id, const std::string& msg_data)> FunCallBack;
class LogicSystem:public Singleton<LogicSystem>
{
// 因为Singleton<LogicSystem>中实例化make_ptr时,使用了new,需要用到LogicSystem的构造函数,但已被私有化,所以需要友元
friend class Singleton<LogicSystem>;
private:
LogicSystem();
void RegisterCallBacks();
void HelloWordCallBack(std::shared_ptr<CSession>, const short& msg_id, const std::string& msg_data);
void DealMsg();
std::queue<std::shared_ptr<LogicNode>> _msg_que;
std::mutex _mutex;
std::condition_variable _consume;
std::thread _worker_thread;
bool _b_stop;
std::map<short, FunCallBack> _fun_callback;
public:
~LogicSystem();
void PostMsgToQue(std::shared_ptr<LogicNode> msg);
};
-
FunCallBack为要注册的回调函数类型,其参数为绘画类智能指针,消息id,以及消息内容。
-
_msg_que为逻辑队列,其中的元素相当于RecvNode,只不过为了实现伪闭包,所以创建一个LogicNode类,包含Session的智能指针防止被提前释放
-
_mutex 为保证逻辑队列安全的互斥量
-
_consume表示消费者条件变量,用来控制当逻辑队列为空时保证线程暂时挂起等待,不要干扰其他线程。
-
_fun_callbacks表示回调函数的map,根据id查找对应的逻辑处理函数。
-
_worker_thread表示工作线程,用来从逻辑队列中取数据并执行回调函数。
-
_b_stop表示收到外部的停止信号,逻辑类要中止工作线程并优雅退出。
2)LogicNode类
LogicNode类定义在CSession.h中,详细代码可以查看之前的文章
class LogicNode {
friend class LogicSystem;
public:
LogicNode(std::shared_ptr<CSession>, std::shared_ptr<RecvNode>);
private:
std::shared_ptr<CSession> _session;
std::shared_ptr<RecvNode> _recvnode;
};
-
_session:声明Session的智能指针,实现伪闭包,防止Session被提前释放
-
_recvnode:接收消息节点的智能指针
构造函数如下:
LogicNode::LogicNode(std::shared_ptr<CSession> session, std::shared_ptr<RecvNode> recvnode) :
_session(session), _recvnode(recvnode){}
3)逻辑类的实现
a. 逻辑类的构造函数
LogicSystem::LogicSystem() :_b_stop(false) {
RegisterCallBacks();
_worker_thread = std::thread(&LogicSystem::DealMsg, this);
}
-
_b_stop初始化为false,当前工作线程不会停止
-
RegisterCallBacks():调用回调注册函数:这个函数用于注册消息处理的回调函数。将不同的消息 ID 和对应的处理函数关联起来,以便在处理消息时能够找到正确的函数。
-
_worker_thread = std::thread(&LogicSystem::DealMsg, this);
-
创建工作线程:使用 std::thread 创建一个新的线程。
-
&LogicSystem::DealMsg:指定要在新线程中执行的成员函数,即 DealMsg,该函数将负责从消息队列中提取消息并处理它。
-
this:传递当前对象的指针,以便 DealMsg 可以访问 LogicSystem 的成员变量和函数。
-
b. RegisterCallBacks(), 注册消息处理的回调函数
void LogicSystem::RegisterCallBacks() {
_fun_callback[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack,
this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}
将消息ID:MSG_HELLO_WORD映射至回调函数HelloWordCallBack,并存储至_fun_callback。当接收到消息MSG_HELLO_WORD时,系统会调用 HelloWordCallBack 方法来处理消息。
_fun_callback的定义如下:
typedef std::function<void(std::shared_ptr<CSession>, const short& msg_id, const std::string& msg_data)> FunCallBack;
std::map<short, FunCallBack> _fun_callback;
_fun_callback是一个map,值和键的类型分别为short和FunCallBack,前者是消息id的类型,后者是回调函数。
FunCallBack使用了C++11新特性std::function来声明了一个函数签名,表示接受一个 shared_ptr 指向 CSession、一个 short 类型的消息 ID 和一个 std::string 类型的消息数据的可调用对象
c. HelloWordCallBack, 处理对应消息类型的回调函数
void LogicSystem::HelloWordCallBack(std::shared_ptr<CSession> session, const short& msg_id, const std::string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
std::cout << "receive msg id is " << root["id"].asInt() << "msg data is "
<< root["data"].asString() << endl;
root["data"] = "server has receive msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString();
session->Send(return_str, root["id"].asInt());
}
-
使用jsoncpp库包装或者解析数据
-
调用 parse 方法将 msg_data 字符串解析为 JSON 格式,结果存储在 root 中
-
回传消息
d. DealMsg, 处理逻辑队列中的消息流程
void LogicSystem::DealMsg() {
for (;;) {
// 和lock_guard的区别,这里是对_mutex进行加锁
std::unique_lock<std::mutex> unique_lk(_mutex);
// 若队列为空,用条件变量等待
while (_msg_que.empty() and !_b_stop) {
_consume.wait(unique_lk);
}
// 判断如果为关闭状态,取出逻辑队列所有数据处理并退出循环
if (_b_stop) {
while (!_msg_que.empty()) {
auto msg_node = _msg_que.front();
cout << "recv msg id is " << msg_node->_recvnode->GetMsgID() << endl;
auto call_back_iter = _fun_callback.find(msg_node->_recvnode->GetMsgID());
if (call_back_iter == _fun_callback.end()) {
_msg_que.pop();
continue;
}
call_back_iter->second(msg_node->_session, msg_node->_recvnode->GetMsgID(),
std::string(msg_node->_recvnode->_msg, msg_node->_recvnode->_cur_len));
_msg_que.pop();
}
break;
}
// 如果没有停服,且队列不为空
auto msg_node = _msg_que.front();
cout << "recv msg id is " << msg_node->_recvnode->GetMsgID() << endl;
auto call_back_iter = _fun_callback.find(msg_node->_recvnode->GetMsgID());
// 注册_fun_callback时,将消息id以及匹配的回调函数绑定在一起
// 若队列中没有该消息对应的回调函数,则释放该消息并处理下一条
if (call_back_iter == _fun_callback.end()) {
_msg_que.pop();
continue;
}
// 执行消息类型对应的回调函数
call_back_iter->second(msg_node->_session, msg_node->_recvnode->GetMsgID(),
std::string(msg_node->_recvnode->_msg, msg_node->_recvnode->_cur_len));
_msg_que.pop();
}
}
-
DealMsg 逻辑中初始化了一个unique_lock,主要用于控制队列安全,并且配合条件变量可以随时解锁。lock_guard不具备解锁功能,所以此处用unique_lock。
-
若队列为空,并且不是停止状态,就挂起线程。否则继续执行之后的逻辑,如果_b_stop为true,说明处于停服状态,则将队列中未处理的消息全部处理完然后退出循环。如果_b_stop未false,则说明没有停服,是consumer发送的激活信号激活了线程,则继续取队列中的数据处理。
e. 析构函数
LogicSystem::~LogicSystem() {
_b_stop = true;
_consume.notify_one();
_worker_thread.join();
}
逻辑类的析构函数在所有工作线程运行结束后被执行,但工作线程可能会处于挂起状态,此时需要一个激活信号打断_consume的wait状态(在该命令前一步将_b_stop置为true)。
f. PostMsgToQue, 添加消息至逻辑队列
void LogicSystem::PostMsgToQue(std::shared_ptr<LogicNode> msg) {
std::unique_lock<std::mutex> unique_lk(_mutex); // 锁定互斥量以保护共享数据
_msg_que.push(msg); // 将消息推入队列
if (_msg_que.size() == 1) { // 如果这是队列中的第一个消息
_consume.notify_one(); // 通知消费者线程,结束wait状态
}
}
3. 修改CSession类的headle_read函数
将函数中原本对消息的处理过程(cout读到的消息并回传)删去,改为将消息投至逻辑队列
LogicSystem::GetInstance()->PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
完整函数代码如下:
void CSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred,
std::shared_ptr<CSession> _self_shared) {
if (!error) {
// 打印缓存区的数据并将该线程暂停2s
//PrintRecvData(_data, bytes_transferred);
//std::chrono::milliseconds dura(2000);
//std::this_thread::sleep_for(dura);
// 每触发一次handale_read,它会返回实际读取的字节数bytes_transferred,copy_len表示已处理的长度,每处理一字节,copy_len便加一
int copy_len = 0; // 已经处理的字符数
while (bytes_transferred > 0) { // 只要读取到数据就对其处理
if (!_b_head_parse) { // 判断消息头部是否已处理,_b_head_parse默认为false
// 异步读取到的字节数 + 已接收到的头部长度 < 头部总长度
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) { // 收到的数据长度小于头部长度,说明头部还未全部读取
// 如果未完全接收消息头,则将接收到的数据复制到头部缓冲区
// _recv_head_node->_msg,更新当前头部的接收长度,并继续异步读取剩余数据。
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;
// 缓冲区清零,无需更新copy_len追踪已处理的字符数,因为之前读取的数据已经全部写入头部节点,下一个
// 读入的消息从头开始(copy_len=0)往头节点写
::memset(_data, 0, MAX_LENGTH);
// 继续读消息
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::headle_read, this,
std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
// 如果接收到的数据量足够处理消息头部,则计算头部剩余的未接收字节,
// 并将其从 _data 缓冲区复制到头部消息缓冲区 _recv_head_node->_msg
int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len; // 头部剩余未复制的长度
// 填充头部节点
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);
copy_len += head_remain; // 更新已处理的data长度
bytes_transferred -= head_remain; // 更新剩余未处理的长度
short msg_id = 0; // 获取消息id
memcpy(&msg_id, _recv_head_node->_msg, HEAD_ID_LEN);
//网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
cout << "msg_id is " << msg_id << endl;
// 判断id是否合法
if (msg_id > MAX_LENGTH) {
std::cout << "invaild msg_id is " << msg_id << endl;
_server->ClearSession(_uuid);
return;
}
short msg_len = 0; // 获取头部数据(消息长度)
memcpy(&msg_len, _recv_head_node->_msg + HEAD_ID_LEN, HEAD_DATA_LEN);
//网络字节序转化为本地字节序
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
cout << "msg_len is " << msg_len << endl;
if (msg_len > MAX_LENGTH) { // 判断头部长度是否非法
std::cout << "invalid data length is " << msg_len << endl;
_server->ClearSession(_uuid);
return;
}
_recv_msg_node = std::make_shared<RecvNode>(msg_len, msg_id); // 已知数据长度msg_len,构建消息内容载体
//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < msg_len) {
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
// copy_len不用更新,缓冲区会清零,下一个读入data的数据从头开始写入,copy_len也会被初始化为0
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
_b_head_parse = true; //头部处理完成
return;
}
// 接收的长度多于消息内容长度
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
_recv_msg_node->_cur_len += msg_len;
copy_len += msg_len;
bytes_transferred -= msg_len;
_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
// cout << "receive data is " << _recv_msg_node->_msg << endl;
// protobuf序列化
//MsgData msgdata;
//std::string receive_data;
//msgdata.ParseFromString(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len));
//std::cout << "receive msg id is " << msgdata.id () << " msg data is " << msgdata.data() << endl;
//std::string return_str = "Server has received msg, msg data is " + msgdata.data();
//MsgData msgreturn;
//msgreturn.set_id(msgdata.id());
//msgreturn.set_data(return_str);
//msgreturn.SerializeToString(&return_str);
//Send(return_str);
// jsoncpp序列化
//Json::Reader reader;
//Json::Value root;
//reader.parse(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len), root);
//std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
// << root["data"].asString() << endl;
//root["data"] = "Server has received msg, msg data is " + root["data"].asString();
//std::string return_str = root.toStyledString();
//Send(return_str, root["id"].asInt());
LogicSystem::GetInstance()->PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
//Send(_recv_msg_node->_msg, _recv_msg_node->_total_len); // 回传
// 清理已处理的头部消息并重置,准备解析下一条消息
_b_head_parse = false;
_recv_head_node->Clear();
// 如果当前数据已经全部处理完,重置缓冲区 _data,并继续异步读取新的数据
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
continue; // 异步读取的消息未处理完,继续填充头节点乃至新的消息节点
}
//已经处理完头部,处理上次未接受完的消息数据
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) { //接收的数据仍不足剩余未处理的
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
// 接收的数据多于剩余未处理的长度
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
//cout << "receive data is " << _recv_msg_node->_msg << endl;
// protobuf序列化
//MsgData msgdata;
//std::string receive_data;
//msgdata.ParseFromString(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len));
//std::cout << "receive msg id is " << msgdata.id() << " msg data is " << msgdata.data() << endl;
//std::string return_str = "Server has received msg, msg data is " + msgdata.data();
//MsgData msgreturn;
//msgreturn.set_id(msgdata.id());
//msgreturn.set_data(return_str);
//msgreturn.SerializeToString(&return_str);
//Send(return_str);
//jsoncpp序列化
//Json::Reader reader;
//Json::Value root;
//reader.parse(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len), root);
//std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
// << root["data"].asString() << endl;
//root["data"] = "Server has received msg, msg data is " + root["data"].asString();
//std::string return_str = root.toStyledString();
//Send(return_str, root["id"].asInt());
LogicSystem::GetInstance()->PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
//此处可以调用Send发送测试
//Send(_recv_msg_node->_msg, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
continue;
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}
4. 测试
运行服务器和客户端,测试结果如下