c++之初级的消息队列及线程池模型

1.最近项目不是很忙,结合之前看的一些开源代码(skynet及其他github代码)及项目代码,抽空写了一个简单的任务队列当做练习。

2.介绍:

  1)全局队列中锁的使用:多线程下,全局队列需要加锁,本例中封装了MutexGuard。操作全局队列之前,先在栈上创建一个临时锁对象,调用构造函数时加锁,对象销毁时调用析构函数从而解锁,减少了我们手动加锁,解锁的过程。

  2)信号的使用:本例可以说是为了使用信号而使用信号,仅仅是为了熟悉信号机一些特性。 当程序以后台模式 跑起来以后,输入kill -USR1 %1 向程序发送SIGUSR1信号,从而使生产者生产一定数量的job,供消费者使用;消费者线程,在处理完全局队列以后sleep,等待生产者产生新任务; 输入 kill -USR2 %1, 改变变量状态,向信号监听线程发送结束通知,结束线程。

  3)简单的线程池模型。

  4)简单的线程间通信和同步方式示例。

  5)简单的类模板的使用。

3.编译: 文件不多,偷懒没有写makefile文件,可自行加上。编译指令 : g++ -g -Wall -o test main.cpp mutex.cpp List.h mutex.h -lpthread

4:执行流程:

  1)编译成功后,输入 ./mytest &。 以后台模式运行程序

  2)此时所有consumer线程阻塞,等待生产者生产job; 一个producer线程阻塞在select处,等待读管道内的消息;一个signal_handler线程调用 pthread_sigwait( ... ) 等待 SIGUSR1 和 SIGUSR2 信号的到来。

可通过在控制台输入: kill -USR1 %1(ps: kill 指令用来产生信号 当以后台模式运行该进程时, %1用来获得该进程 id,因此该命令表示向 该进程发送 SIGUSR1 信号)进程发送SIGUSR1信号,被signal_handler捕捉到以后,生产job,唤醒consumer线程处理job,此流程可重复执行;当在控制台输入 kill -USR2 %1 时, 改变quit变量值,从而使得各个线程退出,进程结束。还有一个 spoliling 轮询线程,在全局队列不为空的情况下,及时唤醒consumer线程处理任务。可通过调整wakeup中的参数,调整唤醒consumer的频率。

5.参考:

  1)UNIX环境高级编程。

  2)https://github.com/idispatch/signaltest

  3)https:github.com/cloudwu/skynet/skynet-src/skynet_start.c

水平有限,仅供参考,希望能对读者有所帮助。以上描述及以下源码有任何漏洞与不足,欢迎及时指正与交流。

6:源码:

main.cpp:

#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/types.h>
#include <string.h>
#include <errno.h>
#include <signal.h> #include "List.h"
#include "mutex.h" #define THREAD_NUM 4
#define JOB_NUM 100 #define handle_error_en(en, msg) \
do{ errno = en; perror(msg); exit(EXIT_FAILURE); } while() using std::cout;
using std::endl;
using std::string;
using std::cin; struct monitor
{
int count;
pthread_cond_t cond;
pthread_mutex_t mutex;
int sleep;
int quit;
int pfds[];
}; struct sig
{
sigset_t set;
struct monitor *m;
}; typedef void (*thread_func)(void *arg, int value); //job call back
struct job
{
void *arg;
thread_func cb;
}; List<job *> g_list;
const int allowed_signals[] = {SIGUSR1, SIGUSR2, SIGQUIT}; static void
print_v(void *value, int pid)
{
printf("pid: %d, value: %d\n", pid, *(int*)value);
} static void
free_job(struct job *j)
{
if(j == NULL)
{
return;
} free(j->arg);
j->arg = NULL;
free(j);
j = NULL;
} static int
dispatch(int pid)
{
struct job *j = g_list.Pop();
if (j != NULL)
{
j->cb(j->arg, pid);
free_job(j);
return ;
} return -;
} static void *
consumer(void *arg)
{
struct monitor *m = (struct monitor *)arg;
int r = ; usleep();
int pid = pthread_self();
while(!m->quit)
{
r = dispatch(pid);
if (r < )
{
if(pthread_mutex_lock(&m->mutex) == )
{
++m->sleep;
cout << "thread : " << pid << " sleep" << endl;
if(!m->quit)
{
pthread_cond_wait(&m->cond, &m->mutex);
} -- m->sleep;
cout << "thread : " << pid << " wakeup" << endl;
if(pthread_mutex_unlock(&m->mutex))
{
fprintf(stderr, "unlock mutex error");
exit();
}
}
}
}
cout << "thread consumer quit " << endl;
return NULL;
} static void
free_monitor(struct monitor *m)
{
if(m == NULL)
{
return;
}
cout << "free monitor called" << endl;
close(m->pfds[]);
close(m->pfds[]); free(m);
cout << "free monitor over" << endl;
} static void
wakeup(struct monitor *m, int busy)
{
if (m->sleep >= m->count - busy)
{
// signal sleep worker, "spurious wakeup" is harmless
pthread_cond_signal(&m->cond);
}
} static struct job*
create_job()
{
struct job * j = (struct job *)calloc(, sizeof(*j));
if (j == NULL)
{
fprintf(stderr, "create_job failed");
return NULL;
} int v = rand();
j->arg = malloc(sizeof (int));
if (j->arg == NULL)
{
fprintf(stderr, "get arg failed");
return NULL;
}
memcpy(j->arg, &v, sizeof (int) );
j->cb = print_v; return j;
} static void *
producer(void *arg)
{ struct monitor *m = (struct monitor *)arg;
cout << "producer called" << endl;
int pid = pthread_self();
int state;
while(!m->quit)
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(m->pfds[], &fds); state = select(m->pfds[] + , &fds, NULL, NULL, NULL);
if(state < )
{
if(errno == EINTR)
{
cout << "errno == EINTR" << endl;
continue;
}
break;
}
else if (state == )
{ }
else
{
char msg[];
memset(msg, , sizeof(msg));
read(m->pfds[], msg, sizeof(msg)); //only to clear up pipe.
msg[strlen(msg)] = '\0';
fprintf(stdout, "msgis: %s\n", msg);
fflush(stdout); if (FD_ISSET(m->pfds[], &fds))
{
if(strncmp(msg, "quit", strlen("quit")) == )
{
break;
} int i;
for (i = ; i < JOB_NUM; i++)
{
struct job *j = create_job();
if (j == NULL)
{
fprintf(stderr, "prodecer failed");
exit();
}
g_list.Push(j);
}
cout << "Thread " << "[" << pid << "]" << ": create " << JOB_NUM << " jobs" << endl;
wakeup(m, );
}
}
} cout << "thread producer quit" << endl;
return NULL;
} static int
check_g_list()
{
int len = g_list.get_job_num();
if(len == )
{
return -;
} return ;
} static void *
spoiling(void *arg)
{
struct monitor *m = (struct monitor *)arg;
cout << "spoiling called" << endl;
while(!m->quit)
{
int n = check_g_list();
if(n == )
{
break;
}
if(n < )
{
continue;
}
wakeup(m, );
} cout << "thread spoiling quit" << endl;
return NULL;
} static void
thread_create(pthread_t *pid, void *arg , void * (*pthread_func) (void *))
{
if(pthread_create(pid, NULL, pthread_func, arg) != )
{
fprintf(stderr, "create_thread failed");
exit();
}
} static void*
signal_handler(void *arg)
{
struct monitor *m = (struct monitor *)arg;
int isig, state; sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGUSR1);
sigaddset(&set, SIGUSR2);
sigaddset(&set, SIGTERM); cout << "signal_handler called" << endl;
for(;;)
{
state = sigwait(&set, &isig);
cout << "sigwait : " << isig << endl;
if(state != )
{
fprintf(stderr, "wrong state %d\n", state);
continue;
}
if(isig == SIGUSR1)
{
cout << "SIGUSR1 " << endl;
char msg[];
memset(msg, , sizeof(msg));
snprintf(msg, sizeof(msg), "signal_handler: received signal=%d(thread=%d)\n", isig, (int)pthread_self());
write(m->pfds[], msg, strlen(msg));
}
else if(isig == SIGUSR2)
{
cout << "SIGUSR2 " << endl;
pthread_mutex_lock(&m->mutex);
m->quit = ;
write(m->pfds[], "quit", strlen("quit"));
pthread_cond_broadcast(&m->cond);
pthread_mutex_unlock(&m->mutex); //when quit, send "quit" to producer or it will block on select break;
}
else
{
cout << "SIG OTHER quit" << endl;
break;
}
}
cout << "signal_handler quit" << endl;
return NULL;
} static void
start_thread()
{
pthread_t pids[THREAD_NUM + ];
struct monitor *m = (struct monitor *)malloc(sizeof(*m));//(struct monitor *)malloc(sizeof(*m));
if (m == NULL)
{
fprintf(stderr, "create monitor failed");
exit();
}
if(pipe(m->pfds))
{
fprintf(stderr, "%s: pipe failed\n", __FUNCTION__);
exit();
} m->count = THREAD_NUM;
m->sleep = ;
m->quit = ;
if(pthread_mutex_init(&(m->mutex), NULL) != || pthread_cond_init(&(m->cond), NULL) != )
{
fprintf(stderr, "mutex or cond init failed");
exit();
} int rc;
sigset_t set; sigemptyset(&set);
sigaddset(&set, SIGUSR1);
sigaddset(&set, SIGUSR2);
sigaddset(&set, SIGQUIT);
rc = pthread_sigmask(SIG_BLOCK, &set, NULL);
if(rc != )
{
fprintf(stderr, "%s pthread_sigmask failed\n", __FUNCTION__);
exit();
} thread_create(&pids[], m, signal_handler);
thread_create(&pids[], m, spoiling); //spoiling thread , check if the g_list is empty
thread_create(&pids[], m, producer); //producer thread int i;
for (i = ; i < THREAD_NUM + ; i++)
{
thread_create(&pids[i], m, consumer); //consumer thread
} for (i = ; i < THREAD_NUM + ; i++)
{
pthread_join(pids[i], NULL);
} free_monitor(m);
} int
main(int argc, char *argv[])
{
cout << "-----------------start---------------------" << endl;
start_thread();
cout << "------------------end----------------------" << endl;
}

mutex.h

#ifndef __MUTEX__H__
#define __MUTEX__H__ #include <list> #include <pthread.h> class MyMutex
{
public:
MyMutex(pthread_mutex_t& m);
~MyMutex(); void Lock();
void UnLock(); private:
pthread_mutex_t& m_m; }; class MyMutexGuard
{
public:
MyMutexGuard(pthread_mutex_t& m);
~MyMutexGuard(); private:
MyMutex mm; }; #endif

mutex.cpp

#include "mutex.h"

MyMutex::MyMutex(pthread_mutex_t& m) : m_m(m)
{
} MyMutex::~MyMutex()
{
} void MyMutex::Lock()
{
pthread_mutex_lock(&m_m);
} void MyMutex::UnLock()
{
pthread_mutex_unlock(&m_m);
} MyMutexGuard::MyMutexGuard(pthread_mutex_t& m):mm(m)
{
mm.Lock();
} MyMutexGuard::~MyMutexGuard()
{
mm.UnLock();
}

List.h

#ifndef __LIST_HEAD__
#define __LIST_HEAD__ #include "mutex.h" #include <list>
using std::list;
#ifndef _WIN32
#include <pthread.h>
#endif template<typename T>
class List
{
public:
List();
List(const list<T> &l);
virtual ~List(); T Pop();
void Push(const T t);
bool Empty();
int get_job_num();
private:
void init();
void destroy(); private:
bool m_init;
list<T> my_list;
pthread_mutex_t mm; }; #include "List.cpp" #endif

List.cpp

#include "List.h"
#include "mutex.h" template<typename T>
List<T>::List()
:m_init(false)
{ } template<typename T>
List<T>::List(const list<T> &l)
:m_init(false)
{ } template<typename T>
List<T>::~List()
{
destroy();
} template<typename T>
void List<T>::Push(const T t)
{
MyMutexGuard g(mm);
my_list.push_back(t);
} template<typename T>
T List<T>::Pop()
{
MyMutexGuard g(mm); if(my_list.empty())
{
return NULL;
}
else
{
T tt = my_list.front();
my_list.pop_front(); return tt;
}
} template<typename T>
bool List<T>::Empty()
{
MyMutexGuard g(mm);
return my_list.empty();
} template<typename T>
void List<T>::init()
{
if(!m_init)
{
m_init = (pthread_mutex_init(&mm, NULL) == );
} return m_init;
} template<typename T>
void List<T>::destroy()
{
pthread_mutex_destroy(&mm);
} template<typename T>
int List<T>::get_job_num()
{
MyMutexGuard g(mm);
return my_list.size();
}
上一篇:weblogic之CVE-2018-3191漏洞分析


下一篇:GitBash上传代码不计入贡献的问题处理