网络编程(14)——基于单例模板实现的逻辑层

十四、day14

今天学习如何通过单例模板实现逻辑层

1. 利用C++11特性封装单例模板

和上一节设计的单例模板有些不同,本节设计的单例模板利用了以下四个C++11新特性,优化了代码

  • unique_lock和lock_guard

  • once_flag和call_once

  • std::function

  • condition_variable

#pragma once
#include <memory>
#include <mutex>
#include <iostream>
using std::cout;
using std::cin;
using std::endl;

template <typename T>
class Singleton;

template<typename T>
class SafeDeletor
{
public:
	void operator()(Singleton<T>* st)
	{
		cout << "this is safe deleter operator()" << endl;
		delete st;
	}
};

template <typename T>
class Singleton {
	template<typename T>
	friend class SafeDeletor;
protected:
	~Singleton() {
		cout << "this is singleton destruct" << endl;
	}
	Singleton() = default;
	Singleton(const Singleton<T>&) = delete;
	Singleton& operator=(const Singleton<T>&st) = delete;
private:
	static std::shared_ptr<T> _instance;
public:
	static std::shared_ptr<T> GetInstance() {
		static std::once_flag s_flag; // C++11新特性,懒汉模式在多线程下也不会生成多个实例,不用进行加锁操作
		std::call_once(s_flag, [&]() { // call_once内部有锁
			_instance = std::shared_ptr<T>(new T, SafeDeletor<T>());
			});
		return _instance;
	}

	void PrintAddress() {
		cout << _instance->get() << endl;
	}
};

// 初始化
template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;
  • 私有成员仅保留静态成员变量_instance,用于保存唯一实例,无需手动进行加锁(C++11新特性会自动实现)

  • s_flag是函数GetInstance内的局部静态变量,该变量在函数GetInstance第一次调用时被初始化。以后无论调用多少次GetInstances_flag都不会被重复初始化,而且s_flag存在静态区,会随着进程结束而自动释放。

  • call_once只会调用一次,而且是线程安全的, 其内部的原理就是调用该函数时加锁,然后设置s_flag内部的标记,设置为已经初始化,执行lambda表达式逻辑初始化智能指针,然后解锁。第二次调用GetInstance内部还会调用call_once, 只是call_once判断s_flag已经被初始化了就不执行初始化智能指针的操作了。

  • PrintAddress()函数获取指向托管对象的原始指针

1)本节GetInstance()函数实现

static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
    _instance = std::shared_ptr<T>(new T, SafeDeletor<T>());
});
  • 使用 std::call_once:这个方法保证了指定的 lambda 函数只会被调用一次。无论有多少线程尝试访问 GetInstance,只有第一个线程会执行初始化代码,其他线程会等待。这种方法内部处理了线程同步,不需要手动加锁。

  • 懒汉式加载:确保在第一次调用时实例化,避免了不必要的开销。

2)上一节GetInstance()函数实现

if (single != nullptr)
        {
            return single;
        }
        s_mutex.lock();
        if (single != nullptr)
        {
            s_mutex.unlock();
            return single;
        }
        //额外指定删除器
        single = std::shared_ptr<T>(new T, SafeDeletor_T<T>());
        s_mutex.unlock();
  • 手动加锁:使用 s_mutex.lock() 和 s_mutex.unlock(),需要手动管理锁。

以上四个C++11新特性的介绍请参考

C++11新特性 - 知乎 (zhihu.com)

2. 逻辑类LogicSystem

1)逻辑类的声明

首先定义一个逻辑类LogicSystem,并继承单例模板Singleton<LogicSystem>

#pragma once
#include "Singleton.h"
#include <queue>
#include <thread>
#include "CSession.h"
#include <map>
#include <functional>
#include "Const.h"
#include <json/json.h>
#include <json/reader.h>

typedef std::function<void(std::shared_ptr<CSession>, const short& msg_id, const std::string& msg_data)> FunCallBack;

class LogicSystem:public Singleton<LogicSystem>
{
	// 因为Singleton<LogicSystem>中实例化make_ptr时,使用了new,需要用到LogicSystem的构造函数,但已被私有化,所以需要友元
	friend class Singleton<LogicSystem>;
private:
	LogicSystem();
	void RegisterCallBacks();
	void HelloWordCallBack(std::shared_ptr<CSession>, const short& msg_id, const std::string& msg_data);
	void DealMsg();

	std::queue<std::shared_ptr<LogicNode>> _msg_que;
	std::mutex _mutex;
	std::condition_variable _consume;
	std::thread _worker_thread;
	bool _b_stop;
	std::map<short, FunCallBack> _fun_callback;
public:
	~LogicSystem();
	void PostMsgToQue(std::shared_ptr<LogicNode> msg);
};
  • FunCallBack为要注册的回调函数类型,其参数为绘画类智能指针,消息id,以及消息内容。

  • _msg_que为逻辑队列,其中的元素相当于RecvNode,只不过为了实现伪闭包,所以创建一个LogicNode类,包含Session的智能指针防止被提前释放

  • _mutex 为保证逻辑队列安全的互斥量

  • _consume表示消费者条件变量,用来控制当逻辑队列为空时保证线程暂时挂起等待,不要干扰其他线程。

  • _fun_callbacks表示回调函数的map,根据id查找对应的逻辑处理函数。

  • _worker_thread表示工作线程,用来从逻辑队列中取数据并执行回调函数。

  • _b_stop表示收到外部的停止信号,逻辑类要中止工作线程并优雅退出。

2)LogicNode类

LogicNode类定义在CSession.h中,详细代码可以查看之前的文章

class LogicNode {
	friend class LogicSystem; 
public:
	LogicNode(std::shared_ptr<CSession>, std::shared_ptr<RecvNode>);
private:
	std::shared_ptr<CSession> _session;
	std::shared_ptr<RecvNode> _recvnode;
};
  • _session:声明Session的智能指针,实现伪闭包,防止Session被提前释放

  • _recvnode:接收消息节点的智能指针

构造函数如下:

LogicNode::LogicNode(std::shared_ptr<CSession> session, std::shared_ptr<RecvNode> recvnode) :
_session(session), _recvnode(recvnode){}

3)逻辑类的实现

a. 逻辑类的构造函数

LogicSystem::LogicSystem() :_b_stop(false) {
	RegisterCallBacks();
	_worker_thread = std::thread(&LogicSystem::DealMsg, this);
}
  • _b_stop初始化为false,当前工作线程不会停止

  • RegisterCallBacks():调用回调注册函数:这个函数用于注册消息处理的回调函数。将不同的消息 ID 和对应的处理函数关联起来,以便在处理消息时能够找到正确的函数。

  • _worker_thread = std::thread(&LogicSystem::DealMsg, this);

    • 创建工作线程:使用 std::thread 创建一个新的线程。

    • &LogicSystem::DealMsg:指定要在新线程中执行的成员函数,即 DealMsg,该函数将负责从消息队列中提取消息并处理它。

    • this:传递当前对象的指针,以便 DealMsg 可以访问 LogicSystem 的成员变量和函数。

b. RegisterCallBacks(), 注册消息处理的回调函数

void LogicSystem::RegisterCallBacks() {
	_fun_callback[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack,
		this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}

消息ID:MSG_HELLO_WORD映射至回调函数HelloWordCallBack,并存储至_fun_callback。当接收到消息MSG_HELLO_WORD时,系统会调用 HelloWordCallBack 方法来处理消息。

_fun_callback的定义如下:

typedef std::function<void(std::shared_ptr<CSession>, const short& msg_id, const std::string& msg_data)> FunCallBack;

std::map<short, FunCallBack> _fun_callback;

_fun_callback是一个map,值和键的类型分别为short和FunCallBack,前者是消息id的类型,后者是回调函数。

FunCallBack使用了C++11新特性std::function来声明了一个函数签名,表示接受一个 shared_ptr 指向 CSession、一个 short 类型的消息 ID 和一个 std::string 类型的消息数据的可调用对象

c. HelloWordCallBack, 处理对应消息类型的回调函数

void LogicSystem::HelloWordCallBack(std::shared_ptr<CSession> session, const short& msg_id, const std::string& msg_data) {
	Json::Reader reader;
	Json::Value root;
	reader.parse(msg_data, root);
	std::cout << "receive msg id is " << root["id"].asInt() << "msg data is "
		<< root["data"].asString() << endl;
	root["data"] = "server has receive msg, msg data is " + root["data"].asString();

	std::string return_str = root.toStyledString();
	session->Send(return_str, root["id"].asInt());
}
  • 使用jsoncpp库包装或者解析数据

  • 调用 parse 方法将 msg_data 字符串解析为 JSON 格式,结果存储在 root 中

  • 回传消息

d. DealMsg, 处理逻辑队列中的消息流程

void LogicSystem::DealMsg() {
	for (;;) {
		// 和lock_guard的区别,这里是对_mutex进行加锁
		std::unique_lock<std::mutex> unique_lk(_mutex);

		// 若队列为空,用条件变量等待
		while (_msg_que.empty() and !_b_stop) {
			_consume.wait(unique_lk);
		}

		// 判断如果为关闭状态,取出逻辑队列所有数据处理并退出循环
		if (_b_stop) {
			while (!_msg_que.empty()) {
				auto msg_node = _msg_que.front();
				cout << "recv msg id is " << msg_node->_recvnode->GetMsgID() << endl;
				auto call_back_iter = _fun_callback.find(msg_node->_recvnode->GetMsgID());
				if (call_back_iter == _fun_callback.end()) {
					_msg_que.pop();
					continue;
				}
				call_back_iter->second(msg_node->_session, msg_node->_recvnode->GetMsgID(),
					std::string(msg_node->_recvnode->_msg, msg_node->_recvnode->_cur_len));
				_msg_que.pop();
			}
			break;
		}

		// 如果没有停服,且队列不为空
		auto msg_node = _msg_que.front();
		cout << "recv msg id is " << msg_node->_recvnode->GetMsgID() << endl;

		auto call_back_iter = _fun_callback.find(msg_node->_recvnode->GetMsgID()); 
		// 注册_fun_callback时,将消息id以及匹配的回调函数绑定在一起
		// 若队列中没有该消息对应的回调函数,则释放该消息并处理下一条
		if (call_back_iter == _fun_callback.end()) {
			_msg_que.pop();
			continue;
		}
		// 执行消息类型对应的回调函数
		call_back_iter->second(msg_node->_session, msg_node->_recvnode->GetMsgID(),
			std::string(msg_node->_recvnode->_msg, msg_node->_recvnode->_cur_len));
		_msg_que.pop();
	}
}
  • DealMsg 逻辑中初始化了一个unique_lock,主要用于控制队列安全,并且配合条件变量可以随时解锁。lock_guard不具备解锁功能,所以此处用unique_lock。

  • 若队列为空,并且不是停止状态,就挂起线程。否则继续执行之后的逻辑,如果_b_stop为true,说明处于停服状态,则将队列中未处理的消息全部处理完然后退出循环。如果_b_stop未false,则说明没有停服,是consumer发送的激活信号激活了线程,则继续取队列中的数据处理。

e. 析构函数

LogicSystem::~LogicSystem() {
	_b_stop = true;
	_consume.notify_one();
	_worker_thread.join();
}

逻辑类的析构函数在所有工作线程运行结束后被执行,但工作线程可能会处于挂起状态,此时需要一个激活信号打断_consumewait状态(在该命令前一步将_b_stop置为true)。

f. PostMsgToQue, 添加消息至逻辑队列

void LogicSystem::PostMsgToQue(std::shared_ptr<LogicNode> msg) {
    std::unique_lock<std::mutex> unique_lk(_mutex); // 锁定互斥量以保护共享数据
    _msg_que.push(msg); // 将消息推入队列

    if (_msg_que.size() == 1) { // 如果这是队列中的第一个消息
        _consume.notify_one(); // 通知消费者线程,结束wait状态
    }
}

3. 修改CSession类的headle_read函数

将函数中原本对消息的处理过程(cout读到的消息并回传)删去,改为将消息投至逻辑队列

LogicSystem::GetInstance()->PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));

完整函数代码如下:

void CSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred,
	std::shared_ptr<CSession> _self_shared) {
	if (!error) {

		// 打印缓存区的数据并将该线程暂停2s
		//PrintRecvData(_data, bytes_transferred);
		//std::chrono::milliseconds dura(2000);
		//std::this_thread::sleep_for(dura);

		// 每触发一次handale_read,它会返回实际读取的字节数bytes_transferred,copy_len表示已处理的长度,每处理一字节,copy_len便加一
		int copy_len = 0; // 已经处理的字符数
		while (bytes_transferred > 0) { // 只要读取到数据就对其处理
			if (!_b_head_parse) { // 判断消息头部是否已处理,_b_head_parse默认为false
				// 异步读取到的字节数 + 已接收到的头部长度 < 头部总长度
				if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) { // 收到的数据长度小于头部长度,说明头部还未全部读取
					// 如果未完全接收消息头,则将接收到的数据复制到头部缓冲区
					// _recv_head_node->_msg,更新当前头部的接收长度,并继续异步读取剩余数据。
					memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
					_recv_head_node->_cur_len += bytes_transferred;
					// 缓冲区清零,无需更新copy_len追踪已处理的字符数,因为之前读取的数据已经全部写入头部节点,下一个
					// 读入的消息从头开始(copy_len=0)往头节点写
					::memset(_data, 0, MAX_LENGTH);
					// 继续读消息
					_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::headle_read, this,
						std::placeholders::_1, std::placeholders::_2, _self_shared));
					return;
				}

				// 如果接收到的数据量足够处理消息头部,则计算头部剩余的未接收字节,
				// 并将其从 _data 缓冲区复制到头部消息缓冲区 _recv_head_node->_msg
				int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len; // 头部剩余未复制的长度
				// 填充头部节点
				memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);
				copy_len += head_remain; // 更新已处理的data长度
				bytes_transferred -= head_remain; // 更新剩余未处理的长度

				short msg_id = 0; // 获取消息id
				memcpy(&msg_id, _recv_head_node->_msg, HEAD_ID_LEN);
				//网络字节序转化为本地字节序
				msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
				cout << "msg_id is " << msg_id << endl;
				// 判断id是否合法
				if (msg_id > MAX_LENGTH) {
					std::cout << "invaild msg_id is " << msg_id << endl;
					_server->ClearSession(_uuid);
					return;
				}
				
				short msg_len = 0; // 获取头部数据(消息长度)
				memcpy(&msg_len, _recv_head_node->_msg + HEAD_ID_LEN, HEAD_DATA_LEN);  
				//网络字节序转化为本地字节序
				msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
				cout << "msg_len is " << msg_len << endl;
				
				if (msg_len > MAX_LENGTH) { // 判断头部长度是否非法
					std::cout << "invalid data length is " << msg_len << endl;
					_server->ClearSession(_uuid);
					return;
				}

				_recv_msg_node = std::make_shared<RecvNode>(msg_len, msg_id); // 已知数据长度msg_len,构建消息内容载体
				//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
				if (bytes_transferred < msg_len) {
					memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
					_recv_msg_node->_cur_len += bytes_transferred;
					// copy_len不用更新,缓冲区会清零,下一个读入data的数据从头开始写入,copy_len也会被初始化为0
					::memset(_data, 0, MAX_LENGTH);
					_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
						std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
					
					_b_head_parse = true; //头部处理完成
					return;
				}

				// 接收的长度多于消息内容长度
				memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
				_recv_msg_node->_cur_len += msg_len;
				copy_len += msg_len;
				bytes_transferred -= msg_len;
				_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
				// cout << "receive data is " << _recv_msg_node->_msg << endl;

				// protobuf序列化
				//MsgData msgdata;
				//std::string receive_data;
				//msgdata.ParseFromString(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len));
				//std::cout << "receive msg id is " << msgdata.id () << " msg data is  " << msgdata.data() << endl;
				//std::string return_str = "Server has received msg, msg data is " + msgdata.data();
				//MsgData msgreturn;
				//msgreturn.set_id(msgdata.id());
				//msgreturn.set_data(return_str);
				//msgreturn.SerializeToString(&return_str);
				//Send(return_str);

				// jsoncpp序列化
				//Json::Reader reader;
				//Json::Value root;
				//reader.parse(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len), root);
				//std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "
				//	<< root["data"].asString() << endl;
				//root["data"] = "Server has received msg, msg data is " + root["data"].asString();
				//std::string return_str = root.toStyledString();
				//Send(return_str, root["id"].asInt());

				LogicSystem::GetInstance()->PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));

				//Send(_recv_msg_node->_msg, _recv_msg_node->_total_len); // 回传
				// 清理已处理的头部消息并重置,准备解析下一条消息
				_b_head_parse = false;
				_recv_head_node->Clear();

				// 如果当前数据已经全部处理完,重置缓冲区 _data,并继续异步读取新的数据
				if (bytes_transferred <= 0) {
					::memset(_data, 0, MAX_LENGTH);
					_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
						std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
					return;
				}

				continue; // 异步读取的消息未处理完,继续填充头节点乃至新的消息节点
			}

			//已经处理完头部,处理上次未接受完的消息数据
			int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
			if (bytes_transferred < remain_msg) { //接收的数据仍不足剩余未处理的
				memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
				_recv_msg_node->_cur_len += bytes_transferred;
				::memset(_data, 0, MAX_LENGTH);
				_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
					std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
				return;
			}
			// 接收的数据多于剩余未处理的长度
			memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
			_recv_msg_node->_cur_len += remain_msg;
			bytes_transferred -= remain_msg;
			copy_len += remain_msg;
			_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
			//cout << "receive data is " << _recv_msg_node->_msg << endl;

			// protobuf序列化
			//MsgData msgdata;
			//std::string receive_data;
			//msgdata.ParseFromString(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len));
			//std::cout << "receive msg id is " << msgdata.id() << " msg data is  " << msgdata.data() << endl;
			//std::string return_str = "Server has received msg, msg data is " + msgdata.data();
			//MsgData msgreturn;
			//msgreturn.set_id(msgdata.id());
			//msgreturn.set_data(return_str);
			//msgreturn.SerializeToString(&return_str);
			//Send(return_str);

			//jsoncpp序列化
			//Json::Reader reader;
			//Json::Value root;
			//reader.parse(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len), root);
			//std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "
			//	<< root["data"].asString() << endl;
			//root["data"] = "Server has received msg, msg data is " + root["data"].asString();
			//std::string return_str = root.toStyledString();
			//Send(return_str, root["id"].asInt());

			LogicSystem::GetInstance()->PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));

			//此处可以调用Send发送测试
			//Send(_recv_msg_node->_msg, _recv_msg_node->_total_len);
			//继续轮询剩余未处理数据
			_b_head_parse = false;
			_recv_head_node->Clear();
			if (bytes_transferred <= 0) {
				::memset(_data, 0, MAX_LENGTH);
				_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
					std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
				return;
			}
			continue;
		}
	}
	else {
		std::cout << "handle read failed, error is " << error.what() << endl;
		Close();
		_server->ClearSession(_uuid);
	}
}

4. 测试

运行服务器和客户端,测试结果如下

上一篇:面试指南1009


下一篇:招联金融2025秋招内推