目录
2 void TaskPool::addTask(Task* task)
3 std::condition_variable:条件变量,用以控制调用线程
本文列举两个c++并发服务器架构,并尝试分析其中语法,若有错误,还望指正。
服务器处理任务的大体流程是: 服务器启动线程,开始等待和接收任务,(循环)处理任务,退出线程。接收任务时,任务被添加在全局的一个队列,被所有线程共享,这时就考虑一个同步。如果任务队列为空或者太满,还需要考虑线程池的容量大小。
这不禁让人想起了经典的生产者—消费着模式,生产者生产一个任务,消费者耗费一个任务,很类似,大概流程如下草图所示:
这里用到了互斥量锁,条件变量,
1 《C++服务器开发精髓》中的demo
这个demo只有三个文件,毕竟精简,源码在OSChina。我们先上代码:
/**
* 任务池模型,TaskPool.h
* zhangyl 2019.02.14
*/
#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>
#include <vector>
#include <memory>
#include <iostream>
class Task
{
public:
virtual void doIt()
{
std::cout << "handle a task..." << std::endl;
}
virtual ~Task()
{
//为了看到一个task的销毁,这里刻意补上其析构函数
std::cout << "a task destructed..." << std::endl;
}
};
class TaskPool final
{
public:
TaskPool();
~TaskPool();
TaskPool(const TaskPool& rhs) = delete;
TaskPool& operator=(const TaskPool& rhs) = delete;
public:
void init(int threadNum = 5);
void stop();
void addTask(Task* task);
void removeAllTasks();
private:
void threadFunc();
private:
std::list<std::shared_ptr<Task>> m_taskList;
std::mutex m_mutexList;
std::condition_variable m_cv;
bool m_bRunning;
std::vector<std::shared_ptr<std::thread>> m_threads;
};
测试demo如下
#include "TaskPool.h"
#include <chrono>
int main()
{
TaskPool threadPool;
threadPool.init();
Task* task = NULL;
for (int i = 0; i < 10; ++i)
{
task = new Task();
threadPool.addTask(task);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
threadPool.stop();
return 0;
}
--Task类:是真正处理数据的一个地方,你接收的数据如何处理,都在这里实现。
--TaskPool类:是处理任务的线程池,开启任务处理的地方,因为可能有多个,所以需要等待任务队列,实现方式是互斥锁和条件变量。我们看一些重点接口:
1 void TaskPool::init(int)
void TaskPool::init(int threadNum/* = 5*/)
{
if (threadNum <= 0)
threadNum = 5;
m_bRunning = true;
for (int i = 0; i < threadNum; ++i)
{
std::shared_ptr<std::thread> spThread;
spThread.reset(new std::thread(std::bind(&TaskPool::threadFunc, this)));
m_threads.push_back(spThread);
}
}
1 std::shared_ptr:共享指针类模板
定义在<memory>,常见使用方式如下:
std::shared_ptr<类型名> 对象名;
共享指针代替我们管理对象的创建和销毁,较为安全,常见绑定对象方式如下,
共享指针对象名.reset(对象地址);
若reset参数为空,则表示解绑对象,相当于不管了,该对象的引用计数就会减1.
共享指针使用demo:
// shared_ptr::reset example
#include <iostream>
#include <memory>
int main () {
std::shared_ptr<int> sp; // empty
sp.reset (new int); // takes ownership of pointer
*sp=10;
std::cout << *sp << '\n';
sp.reset (new int); // deletes managed object, acquires new pointer
*sp=20;
std::cout << *sp << '\n';
sp.reset(); // deletes managed object
return 0;
}
2 std::thread 线程类
创建一个空的, 不可join的std::thread
执行对象。如果你只是这样定义,
std::thread thd;
那么线程不可join,因为是个空架子嘛,得绑定函数后,线程才会活动起来,也就可以join了。
std::thread thd;
thd = std::thread(function,args);
std::thread的使用demo
// thread example
#include <iostream> // std::cout
#include <thread> // std::thread
void foo()
{
// do stuff...
}
void bar(int x)
{
// do stuff...
}
int main()
{
std::thread first (foo); // spawn new thread that calls foo()
std::thread second (bar,0); // spawn new thread that calls bar(0)
std::cout << "main, foo and bar now execute concurrently...\n";
// synchronize threads:
first.join(); // pauses until first finishes
second.join(); // pauses until second finishes
std::cout << "foo and bar completed.\n";
return 0;
}
3 std::bind()函数模板
用以生成新的可调用对象,参数可为函数对象,引用,指针等,定义在<functional>,常用使用方式如下:
auto new_fun = std::bind(function, args);
这里有个函数参数占位符的概念,在使用new_func(arg)时,你传入的参数,会按照绑定时的占位符传递给真正的函数,比如你在第一个参数的位置写了_2占位符,
auto new_fun = std::bind(fn,_2,_1);
那么你在这样调用new_fun(11,22)时,11会传给fn的第二个形参,22会传给第一个形参。
具体使用demo如下:
#include <iostream>
#include <functional>
void fn(int n1, int n2, int n3) {
std::cout << n1 << " " << n2 << " " << n3 << std::endl;
}
int fn2() {
std::cout << "fn2 has called.\n";
return -1;
}
int main()
{
using namespace std::placeholders;
auto bind_test1 = std::bind(fn, 1, 2, 3);
auto bind_test2 = std::bind(fn, _1, _2, _3);
auto bind_test3 = std::bind(fn, 0, _1, _2);
auto bind_test4 = std::bind(fn, _2, 0, _1);
bind_test1();//输出1 2 3
bind_test2(3, 8, 24);//输出3 8 24
bind_test2(1, 2, 3, 4, 5);//输出1 2 3,4和5会被丢弃
bind_test3(10, 24);//输出0 10 24
bind_test3(10, fn2());//输出0 10 -1
bind_test3(10, 24, fn2());//输出0 10 24,fn2会被调用,但其返回值会被丢弃
bind_test4(10, 24);//输出24 0 10
return 0;
}
言归正传,那么这两行的意思就是,
std::shared_ptr<std::thread> spThread;
spThread.reset(new std::thread(std::bind(&TaskPool::threadFunc, this)));
用线程指针来绑定初始化并执行的线程。
2 void TaskPool::addTask(Task* task)
任务添加函数用来向任务队列添加任务,每添加一个任务,则向条件变量发送信号,唤醒线程处理任务。这里需要注意的是std::lock_guard,m_cv.notify_one();
void TaskPool::addTask(Task* task)
{
std::shared_ptr<Task> spTask;
spTask.reset(task);
{
std::lock_guard<std::mutex> guard(m_mutexList);
//m_taskList.push_back(std::make_shared<Task>(task));
m_taskList.push_back(spTask);
std::cout << "add a Task." << std::endl;
}
m_cv.notify_one();
}
1 std::lock_guard:互斥量对象的一个包装类
为了防止任务队列出现“我添加任务还没通知你,你就来取了”或者,“我刚通知A,结果B把任务取走了”这样的混乱局面,这里需要同步,用到的是std::lock_guard,其特点是
- 创建即加锁,作用域结束自动析构并解锁,无需手工解锁
- 不能中途解锁,必须等作用域结束才解锁
- 不能复制
如下所示,当函数作用域结束,自动释放全局锁。
std::mutex g_i_mutex; // protects g_i
void safe_increment()
{
const std::lock_guard<std::mutex> lock(g_i_mutex);
++g_i;
std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
}
2 notify_one() /notify_all():条件变量的公有成员函数
当线程使用条件变量对象的wait()方法阻塞时,需要其他线程在同一个条件变量对象上调用上面两个负责通知的成员函数来唤醒,区别是notify_one唤醒一个阻塞线程,notify_all唤醒所有阻塞线程。这里得说一下条件变量std::condition_variable
3 std::condition_variable:条件变量,用以控制调用线程
当std::condition_variable::wait函数被线程调用,std::unique_lock 就会阻塞调用线程,收到上面的两个通知函数发出的信号后,解除阻塞。
Deomo如下
// condition_variable example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void print_id (int id) {
std::unique_lock<std::mutex> lck(mtx);
while (!ready) cv.wait(lck);
// ...
std::cout << "thread " << id << '\n';
}
void go() {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_all();
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_id,i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto& th : threads) th.join();
return 0;
}
Demo里以reday变量控制线程。初始状态是10个线程全部阻塞等待被唤醒,调用go()后,print_id()函数从wait()的阻塞状态,被激活,ready又设置为true,结束循环,线程往下运行。这就是条件变量搭配互斥锁来控制线程的常见用法。
当然,这里有一个说法叫做虚假唤醒,顾名思义,线程被条件变量通知该办事了,结果却没有办。那这是什么原因呢?*这样解释虚假唤醒:
On many systems, especially multiprocessor systems, the problem of spurious wakeups is exacerbated because if there are several threads waiting on the condition variable when it's signaled, the system may decide to wake them all up, treating every
signal( )
to wake one thread as abroadcast( )
to wake all of them, thus breaking any possibly expected 1:1 relationship between signals and wakeups.
在多核处理器的系统上,当多个线程在同一条件变量上阻塞等待唤醒时,如果有一个信号,那么系统会把它当作是广播信号,唤醒所有线程。避免这种问题的方式之一是,把条件变量加入到判断里, 类似这样:
if(work_list.empty()){
cv.wait(unique_lock_object);
}
当工作队列为空时,线程阻塞等待,当工作队列加入任务时,同时发送信号,解除阻塞,线程继续执行。这样做,有一种特别情况:假设在多核系统上有10个阻塞等待的线程,当工作队列添加一个任务并用notify()发送信号时,按照上面的论述,10个有竞争关系的线程都被唤醒,但是只有最先被唤醒的线程拿到任务,其他线程再拿时发现队列为空。这就是虚假唤醒。
为了让线程真正工作,需要再次检查条件是否合适。比如放进循环里:
while(li.empty()){
cout << "do nothing" << endl;
cv.wait(lck);
}
正常来说,当notify_one()后,只有一个线程被唤醒,其他线程仍然阻塞。是这样吗?前文说过,在多核系统里,一个pthread_cond_signal会唤醒至少一个以上的进程。为了弄清楚这点,在多核系统里,我们在while循环里加了一条输出语句,以检查wait函数是否返回过:
// condition_variable example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
#include <list>
#include <algorithm>
using namespace std;
std::mutex mtx;
std::condition_variable cv;
std::list<int> li;
void print_id (std::list<int>& li) {
std::unique_lock<std::mutex> lck(mtx);
//while (!ready) cv.wait(lck);
while(li.empty()){
cout << "do nothing" << endl;
cv.wait(lck);
}
std::cout << "thread " << *(li.begin()) << '\n';
li.pop_back();
}
void go() {
std::unique_lock<std::mutex> lck(mtx);
li.push_back(99); // 添加元素到列表,使其不为空
cv.notify_one(); // 发送信号,唤醒一个阻塞线程
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_id,std::ref(li));
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto& th : threads) th.join();
return 0;
}
输出结果如下:
root@localhost study]$ ./a.out
10 threads ready to race...
thread 99
do nothing
do nothing
do nothing
do nothing
do nothing
do nothing
do nothing
do nothing
do nothing
如果输出do nothing,说明其他9个线程被唤醒过,但是因为list里的元素,被第一个唤醒的线程取走,所以9个线程又进入了阻塞在了wait()那里。
2 linux epoll高并发服务器
1 介绍epoll API.
epoll的工作原理和poll类似,都是在一组fd中监听,监听是否有fd变化,是否有可读写事件发生。那么epoll的一组常用api如下:
int epoll_create(int size):生成一个epoll专用的文件描述符,其中的参数是指定生成描述符的最大范围
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):用于注册,修改,删除某个文件描述符上的事件.
int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout):轮询I/O事件,就是看哪个fd发生了变化。
//结构体epoll_event被用于注册所感兴趣的事件和回传所发生待处理的事件,定义如下:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;//保存触发事件的某个文件描述符相关的数据
struct epoll_event {
__uint32_t events; /* epoll event */
epoll_data_t data; /* User data variable */
};
struct epoll_event的events事件变量,可能的取值如下:
EPOLLIN:表示对应的文件描述符可以读;
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数可读;EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: ET的epoll工作模式;
2 epoll流程
在C++服务器开发精髓中,并没有将服务器功能添加进去。在这里,epoll的fd集搭配套接字,实现了服务器功能。服务器的大概搭建流程是:
1 创建套接字
2 绑定套接字
3 监听套接字
4 循环等待客户端连接
1 有连接时创建新的客户端fd
2 轮询fd集合
而epoll在上述示例服务器中的使用流程是,把监听的fd添加到一个fd集合里,一旦fd状态发生变化(比如有客户端发起连接或者发起关闭),就会为对应的客户端创建套接字,并加入到集合里,epoll负责轮询这个fd集,一旦某个fd状态发生变化(比如fd上发生读/写事件),epoll就会通过判断进入相应的模块进行处理。
网络上有很多epoll服务器的demo,比如下面这个:
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/resource.h>
#define MAXBUF 1024
#define MAXEPOLLSIZE 10000
/*
setnonblocking - 设置句柄为非阻塞方式
*/
int setnonblocking(int sockfd)
{
if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1)
{
return -1;
}
return 0;
}
/*
handle_message - 处理每个 socket 上的消息收发
*/
int handle_message(int new_fd)
{
char buf[MAXBUF + 1];
int len;
/* 开始处理每个新连接上的数据收发 */
bzero(buf, MAXBUF + 1);
/* 接收客户端的消息 */
len = recv(new_fd, buf, MAXBUF, 0);
if (len > 0)
{
printf("%d接收消息成功:'%s',共%d个字节的数据/n",
new_fd, buf, len);
}
else
{
if (len < 0)
printf
("消息接收失败!错误代码是%d,错误信息是'%s'/n",
errno, strerror(errno));
close(new_fd);
return -1;
}
/* 处理每个新连接上的数据收发结束 */
return len;
}
int main(int argc, char **argv)
{
int listener, new_fd, kdpfd, nfds, n, ret, curfds;
socklen_t len;
struct sockaddr_in my_addr, their_addr;
unsigned int myport, lisnum;
struct epoll_event ev;
struct epoll_event events[MAXEPOLLSIZE];
struct rlimit rt;
myport = 5000;
lisnum = 2;
/* 设置每个进程允许打开的最大文件数 */
rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
if (setrlimit(RLIMIT_NOFILE, &rt) == -1)
{
perror("setrlimit");
exit(1);
}
else
{
printf("设置系统资源参数成功!/n");
}
/* 开启 socket 监听 */
if ((listener = socket(PF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
exit(1);
}
else
{
printf("socket 创建成功!/n");
}
setnonblocking(listener);
bzero(&my_addr, sizeof(my_addr));
my_addr.sin_family = PF_INET;
my_addr.sin_port = htons(myport);
my_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1)
{
perror("bind");
exit(1);
}
else
{
printf("IP 地址和端口绑定成功/n");
}
if (listen(listener, lisnum) == -1)
{
perror("listen");
exit(1);
}
else
{
printf("开启服务成功!/n");
}
/* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */
kdpfd = epoll_create(MAXEPOLLSIZE);
len = sizeof(struct sockaddr_in);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = listener;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0)
{
fprintf(stderr, "epoll set insertion error: fd=%d/n", listener);
return -1;
}
else
{
printf("监听 socket 加入 epoll 成功!/n");
}
curfds = 1;
while (1)
{
/* 等待有事件发生 */
nfds = epoll_wait(kdpfd, events, curfds, -1);
if (nfds == -1)
{
perror("epoll_wait");
break;
}
/* 处理所有事件 */
for (n = 0; n < nfds; ++n)
{
if (events[n].data.fd == listener)
{
new_fd = accept(listener, (struct sockaddr *) &their_addr,&len);
if (new_fd < 0)
{
perror("accept");
continue;
}
else
{
printf("有连接来自于: %d:%d, 分配的 socket 为:%d/n",
inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), new_fd);
}
setnonblocking(new_fd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = new_fd;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_fd, &ev) < 0)
{
fprintf(stderr, "把 socket '%d' 加入 epoll 失败!%s/n",
new_fd, strerror(errno));
return -1;
}
curfds++;
}
else
{
ret = handle_message(events[n].data.fd);
if (ret < 1 && errno != 11)
{
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
curfds--;
}
}
}
}
close(listener);
return 0;
}
3 setrlimit()函数
获取或设置系统资源使用限制,linux下每种资源都有相关的软硬限制,软限制是内核强加给相应资源的限制值,硬限制是软限制的最大值。非授权调用的进程只能将其软限制指定为0~硬限制范围中的某个值,同时能不可逆转地降低其硬限制。授权进程可以任意改变其软硬限制。
/* 设置每个进程允许打开的最大文件数 */
rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
if (setrlimit(RLIMIT_NOFILE, &rt) == -1)
{
perror("setrlimit");
exit(1);
}
else
{
printf("设置系统资源参数成功!/n");
}
比如,你想发起10000连接,发现系统限制进程打开的最大文件数是1024,这时候你就要修改这个限制。如,查看进程限制的一些信息:
[user@localhost ~]$ ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 7081
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 100001
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 4096
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
这里open files我修改成了100001,原来是1024。当然还有其他一些限制,下文会提。
为了模拟高并发,我用c++简单封装了一个功能虽不完善但可以使用的tcp客户端,可以在linux直接搭配server使用,轻松实现几万甚至10万并发架构,在云盘里:
链接:https://pan.baidu.com/s/1-3mbwE_sBb34Ot2_3IVINQ
提取码:1231
[root@localhost client]# ls
app commdef.h main.cpp network.h
client.cpp epool.c Makefile
client.h network.cpp
解压后,在当前目录直接输入make,编译生成可执行文件app,运行前先启动服务器。为了同步端口等资源,我在commdef.h里配置了相应的端口,IP,连接数量上限等资源,可以自行修改
[root@localhost server]# ls
a.out epool.c
比如,开启服务器,客户端连接时,输出结果如下,笔者因为虚拟机性能限制(2核2G内存),并发2万的连接需要4秒,如果去掉打印输出或者性能更好的好,可能会更快点~~
nfds = 1
有连接来自于: 127.0.0.1:35357, 分配的 socket 为:19991
nfds = 1
有连接来自于: 127.0.0.1:35359, 分配的 socket 为:19992
nfds = 1
有连接来自于: 127.0.0.1:35361, 分配的 socket 为:19993
nfds = 1
有连接来自于: 127.0.0.1:35363, 分配的 socket 为:19994
nfds = 1
有连接来自于: 127.0.0.1:35365, 分配的 socket 为:19995
nfds = 1
有连接来自于: 127.0.0.1:35367, 分配的 socket 为:19996
nfds = 1
有连接来自于: 127.0.0.1:35369, 分配的 socket 为:19997
nfds = 1
有连接来自于: 127.0.0.1:35371, 分配的 socket 为:19998
nfds = 1
有连接来自于: 127.0.0.1:35373, 分配的 socket 为:19999
nfds = 1
有连接来自于: 127.0.0.1:35375, 分配的 socket 为:20000
nfds = 1
有连接来自于: 127.0.0.1:35377, 分配的 socket 为:20001
nfds = 1
有连接来自于: 127.0.0.1:35379, 分配的 socket 为:20002
nfds = 1
nfds = 467
nfds = 822
nfds = 1542
nfds = 1974
nfds = 2135
nfds = 3567
nfds = 4099
nfds = 5392
至于客户端,笔者采用多线程架构,创建5万个客户端,由20个线程向服务器发起连接,但是实际测试中,单个线程并不是5w/20个客户端,而是递减的,这个原因限制我还没有查明。而且发起的连接数最大值,和系统端口资源限制/和单进程打开最大文件数有关系。
控制socket的方法集中在Network类,客户端封装了fd和一些其他如首发数据等信息,下面解释一下这两个类:
Network类:负责控制socket创建,socket地址出事后,socket绑定,socket重用,设置超时时间,发起服务器连接等功能,里面用到了一些新特性的语法,如c++11的lambda表达式等,还有一些socket相关的知识,这里就不作展开了。
//network.h
#ifndef NETWORK_H_
#define NETWORK_H_
#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>
#include <vector>
#include <memory>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <algorithm>
#include "commdef.h"
#include "client.h"
typedef std::shared_ptr<Client> type_shared_ptr_client;
typedef type_shared_ptr_client sp_client;
typedef std::list<type_shared_ptr_client> type_list_client;
typedef std::vector<std::shared_ptr<std::thread>> type_vector_thread;
class Network{
/* static function */
public:
static char* GetSysTime();
/* public member function */
public:
Network();
~Network();
// Setter module
int SetNonBlocking(const int& socket);
int SetAddrReuse(sp_client& c, bool opt);
int SetRecvTimeout(sp_client& c, int ms);
int SetSendTimeout(sp_client& c, int ms);
bool SetFd(sp_client& c,int fd);
int CreateClient(int client_num = MAXCLIENT);
int ParseSockAddr(struct sockaddr* addr,ADDR_TYPE* type, char* str_addr, unsigned short* port);
int Start(int client_num = MAXCLIENT);
// Getter module
int GetClientListSize();
sp_client& GetClientByID(unsigned int id);
// thread module
int InitThread(int thread_num = 20);
void ThreadFunc();
void MonitorFunc();
void Stop();
/* private member function */
private:
int ConnectServer(sp_client& sp);
int CreateSocket(ADDR_TYPE addr_type = IPV4,PROTO_TYPE proto_type = TCP);
int InitSockAddr(const char* str_addr,unsigned short port,SOCK_ADDR* addr,ADDR_TYPE type);
bool InitClient();
int GetSockfd(sp_client& c);
/* member variable*/
private:
bool m_running;
bool m_ready;
std::mutex m_mtx;
type_list_client m_clist;
type_vector_thread m_threads;
std::condition_variable m_cv;
};
#endif //NETWORK_H_
// network.cpp
#include "network.h"
#include "client.h"
using namespace std;
int tnb = 0;
Network::Network(){
m_running = false;
}
Network::~Network(){}
int Network::SetNonBlocking(const int& socket){
return fcntl(socket,F_SETFL,fcntl(socket,F_GETFL,0) | O_NONBLOCK);
}
char* GetSysTime(){
time_t now;
struct tm* timenow;
time(&now);
timenow = localtime(&now);
return asctime(timenow);
}
int Network::SetSendTimeout(sp_client& c, int ms){
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 500;
return setsockopt(c->GetFd(), SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,sizeof(struct timeval));
}
int Network::SetRecvTimeout(sp_client& c, int ms){
struct timeval tv;
tv.tv_sec = ms / 1000;
tv.tv_usec = (ms % 1000) * 1000;
return setsockopt(c->GetFd(),SOL_SOCKET, SO_RCVTIMEO,(char *)&tv,sizeof(struct timeval));
}
int Network::SetAddrReuse(sp_client& c, bool opt){
return setsockopt(c->GetFd(), SOL_SOCKET, SO_REUSEADDR,(char *)&opt, sizeof(opt));
}
bool Network::SetFd(sp_client& c, int fd){
if(fd < 0)
return false;
c->SetFd(fd);
return true;
}
int Network::GetSockfd(sp_client& c){
return c->GetFd();
}
// connect server by a client
int Network::ConnectServer(sp_client& sp){
//std::unique_lock<std::mutex> lock(m_mtx);
// declare server socket addr
SOCK_ADDR addr;
bzero(&addr,sizeof(SOCK_ADDR));
// create a socket fd
sp->SetFd(CreateSocket(IPV4, TCP));
// ip port addr addr_type
InitSockAddr(sp->GetServerIP().c_str(), SERV_PORT, &addr, IPV4);
SetSendTimeout(sp, SENDTIMEOUT);
// set reuse socket
SetAddrReuse(sp,true);
// start to connect server
if(connect(sp->GetFd(),(struct sockaddr*)&addr, sizeof(addr)) < 0)
{
if(errno == EADDRINUSE){
cout << "Local address is already in use." << endl;
}
else if(errno == ETIMEDOUT){
cout << " Timeout while attempting connection. The server may be too busy to accept new connections." << endl;
return -1;
}
else if(errno == ECONNREFUSED){
cout << "ECONNREFUSED" << endl;
return -1;
}else{
return -1;
}
}
sp->SetStatus(true);
return 0;
}
int Network::Start(int client_num/* =MAXCLIENT*/){
InitThread();
CreateClient(client_num);
m_cv.notify_all();
return 0;
}
int Network::CreateClient(int client_num /*=MAXCLIENT*/){
cout << "enter createclient" << endl;
std::unique_lock<std::mutex> lock(m_mtx);
std::shared_ptr<Client> spClient;
if(client_num < 1)
return -1;
for(int i = 1; i < client_num + 1; i++){
spClient.reset(new Client(i));
{
m_clist.push_back(spClient);
//std::cout << "create "<< i << " client" << endl;
}
}
cout << "create " << GetClientListSize() << " clients successfully!" << endl;
m_cv.notify_all();
return 0;
}
std::shared_ptr<Client>& Network::GetClientByID(unsigned int id){
type_list_client::iterator it = m_clist.begin();
if(id == 1)
return *it;
else if(id > 1){
id--;
while(id--){
it++;
}
return *it;
}else{
cout << "GetClientByID(): Error client id!" << endl;
exit(1);
}
}
// turn net format ip into demical format ip
int Network::ParseSockAddr(struct sockaddr* addr, ADDR_TYPE* type,char* str_addr, unsigned short* port){
if(addr->sa_family == AF_INET){
struct sockaddr_in* addr_v4 = (struct sockaddr_in*)addr;
inet_ntop(AF_INET, &(addr_v4->sin_addr), str_addr, INET_ADDRSTRLEN);
*port = ntohs(addr_v4->sin_port);
*type = IPV4;
}
else if(addr->sa_family == AF_INET6){
struct sockaddr_in6* addr_v6 = (struct sockaddr_in6*)addr;
inet_ntop(AF_INET6,&(addr_v6->sin6_addr), str_addr,INET6_ADDRSTRLEN);
*port = ntohs(addr_v6->sin6_port);
*type = IPV6;
}
else{
return -1;
}
return 0;
}
int Network::InitSockAddr(const char* ip/*in*/, unsigned short port/*in*/, SOCK_ADDR* addr/*in*/, ADDR_TYPE type/*=IPV4*//*in*/){
if(ip == NULL || addr == NULL){
return -1;
}
if(type == IPV4){
struct in_addr addr_v4;
//turn demical format ip to inet formart ip
if( 1 != inet_pton(AF_INET, ip, &addr_v4)){
return -1;
}
addr->addr_v4.sin_family = AF_INET;
addr->addr_v4.sin_addr = addr_v4;
addr->addr_v4.sin_port = htons(port);
}
else if(type == IPV6){
struct in6_addr addr_v6;
if( 1 != inet_pton(AF_INET6, ip, &addr_v6)){
return -1;
}
addr->addr_v6.sin6_family = AF_INET6;
addr->addr_v6.sin6_addr = addr_v6;
addr->addr_v6.sin6_port = htons(port);
}
else{
return -1;
}
return 0;
}
int Network::CreateSocket(ADDR_TYPE addr_type, PROTO_TYPE proto_type){
int fd = -1;
if(addr_type == IPV4 && proto_type == TCP)
{
fd = socket(AF_INET,SOCK_STREAM, IPPROTO_TCP);
}
else if(addr_type == IPV4 && proto_type == UDP){
fd = socket(AF_INET,SOCK_DGRAM, IPPROTO_UDP);
}
else if(addr_type == IPV6 && proto_type == TCP)
{
fd = socket(AF_INET6,SOCK_STREAM, IPPROTO_TCP);
}
else if(addr_type == IPV6 && proto_type == UDP){
fd = socket(AF_INET6,SOCK_DGRAM, IPPROTO_UDP);
}
return fd;
}
int Network::GetClientListSize(){
return this->m_clist.size();
}
// init threads
int Network::InitThread(int thread_num){
if(thread_num <= 0){
thread_num = 20;
}
m_running = true;
for(int i = 0; i < thread_num; ++i){
std::shared_ptr<std::thread> spThread;
spThread.reset(new std::thread(std::bind(&Network::ThreadFunc,this)));
m_threads.push_back(spThread);
}
return 0;
}
// thread function
void Network::ThreadFunc(){
sp_client sp;
for(;;){
std::unique_lock<std::mutex> lock(m_mtx);//automatically lock
while(m_clist.empty()){
if(!m_running)
break;
m_cv.wait(lock);
}
//cout << std::this_thread::get_id() << " :" << GetClientListSize() << " running = " << m_running << endl;
/*
int connect_nb = GetClientListSize()/20;
int real_conn_nb = 0;
for(type_list_client::iterator it = m_clist.begin(); it != m_clist.end(); it=m_clist.begin())
{
if(real_conn_nb == connect_nb)
break;
if(GetClientListSize() <= 0)
break;
ConnectServer(*it);
m_clist.pop_front();
real_conn_nb++;
}
//cout << "thread "<< std::this_thread::get_id()<<" exit and takes " << real_conn_nb << " clients"<<endl;
*/
sp = m_clist.front();
m_clist.pop_front();
ConnectServer(sp);
sp.reset();
}
}
// stop all threads
void Network::Stop(){
m_running = false;
m_cv.notify_all();
for(auto& it : m_threads){
if(it->joinable())
it->join();
}
}
// output the activ client number per second
void Network::MonitorFunc(){
while(1){
auto acn = [this](){
int cnt=0;
type_list_client::iterator it = m_clist.begin();
while(it!=m_clist.end()){
if((*it)->GetStatus())
cnt++;
it++;
}
return cnt;
};
sleep(1000);
}
}
客户端头文件
//client.h
#ifndef CLIENT_H_
#define CLIENT_H_
#include <iostream>
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include "commdef.h"
class Client{
public:
Client();
Client(unsigned int id);
~Client();
// setter
void SetID(unsigned int id);
void SetAddrType(ADDR_TYPE type);
void SetPort(unsigned short port);
void SetServerIP(const std::string& ip);
bool SetFd(int fd);
void SetStatus(bool flag);
std::string& GetServerIP();
unsigned int GetID();
int GetFd();
unsigned short GetPort();
unsigned long long GetSendBytes();
unsigned long long GetRecvBytes();
pthread_t GetThreadID();
//const char* GetServerIP();
const char* GetClientIP();
bool GetStatus();
private:
bool client_ready;
unsigned int client_id;
int client_fd;
ADDR_TYPE client_addr_type;//IPV4,IPV6
unsigned short client_port;
std::string client_ip;
std::string server_ip;
pthread_t client_thread_id;
void* client_thread_arg;
unsigned long long client_send_bytes;
unsigned long long client_recv_bytes;
};
#endif // CLIENT_H_
好了,本文就到这里,如果有什么问题说的不对,欢迎指正~如果喜欢,就点个赞吧