线程池原理及实现
1.什么是线程池
线程池是一种多线程处理形式,提前将线程创建好,放入队列中进行管理。有任务需要处理时,将任务分配到具体的线程进行执行。减少线程的创建、销毁和切换,来提高性能。
2.线程池原理
大量的任务需要处理时,肯定会考虑使用多线程。通常的做法是来一个任务就直接创建一个线程去执行,效率不高。所以就有了线程池:提前创建好一定数量的线程,在队列中等待任务,有任务就分配给具体的线程去执行;回调函数和参数都以任务的形式提交给线程,避免了线程的频繁创建和销毁,减少了线程的切换。既然是定量,那么这个量到底应该是多少?(见 3.2 节动态线程数的设计。)
3.我设计的线程池
3.1 数据结构
我采用了一个任务队列(单向链表)和两个线程队列(双向链表):空闲队列和忙碌队列。
- 任务结点 : 里面存放每个具体的任务信息
/* One unit of work queued in the pool's singly linked task list. */
typedef struct _TaskNode {
    struct _ThreadPool *thread_pool;  // pool this task was submitted to
    void *args;                       // argument handed to working()
    void *(*working)(void *);         // callback that performs the task
    int task_id;                      // id assigned when the task is queued
    struct _TaskNode *next;           // next task in the list
} TaskNode;
- 指向ThreadPool的指针,为了从任务结点中指向全局变量。
- 需要完成的任务(函数)的参数
- 需要完成的任务(函数的地址)
- 任务id号
- 指向下一个任务
- 任务队列 : 里面存放整个任务队列的信息
// Bookkeeping for the singly linked list of pending tasks.
typedef struct _TaskQueue {
pthread_mutex_t mutex; // locked while tasks are added to or taken from the list
// the manager thread waits on this while the list is empty; signalled when a task is added
pthread_cond_t cond;
struct _TaskNode *head; // first task in the list
struct _TaskNode *rear; // last task in the list
// number of tasks currently linked in the list; TaskAddNew increments it and
// TaskGetNew decrements it as soon as a task is handed to a thread
int number;
// last task id handed out; starts at 0 and is incremented for every added task
int task_id;
}TaskQueue;
- 任务队列互斥锁,添加任务和取出任务的时候需要使用互斥锁
- 任务队列条件锁,管理线程会等待任务,添加任务时需要通知管理线程
- 任务队列的头结点
- 任务队列的尾结点
- 任务队列总任务数
- 当前任务编号,用于下次任务分配id
- 线程结点 : 保存每个线程工作需要的信息
// Per-thread record; records are chained into the idle or busy doubly linked queue.
typedef struct _PthreadNode {
// thread handle filled in by pthread_create; used for pthread_cancel/pthread_join and logging
pthread_t tid;
struct _ThreadPool *thread_pool; // pool this thread belongs to
struct _TaskNode *work; // task being executed, NULL while the thread has nothing to do
struct _PthreadNode *next; // next record in the queue
struct _PthreadNode *prev; // previous record in the queue
pthread_cond_t cond; // signalled by the manager after it assigns a task to this thread
pthread_mutex_t mutex; // mutex paired with cond
}PthreadNode;
- 线程id号,删除线程、等待线程和方便调试
- 指向ThreadPool的指针,为了从线程结点中访问线程池的全局数据。
- 指向任务结点的指针,为空则线程空闲,否则执行当前任务
- 指向下一个线程结点
- 指向上一个线程结点
- 线程的条件锁,用于管理线程分配任务后唤醒当前线程
- 线程的互斥锁,用于条件锁
- 线程队列 : 存放线程队列的所有信息
// A doubly linked queue of thread records (used for both the idle and the busy queue).
typedef struct _PthreadQueue {
int number; // the number of thread records in this queue
struct _PthreadNode *head; // first record, NULL when the queue is empty
struct _PthreadNode *rear; // last record, NULL when the queue is empty
// the manager waits on this while there is no idle thread; signalled when a thread becomes idle
pthread_cond_t cond;
pthread_mutex_t mutex; // locked while records are linked or unlinked
}PthreadQueue;
- 线程队列中线程的数量
- 线程队列的头结点
- 线程队列的尾结点
- 线程队列的条件锁,用于管理线程等待空闲线程。
- 线程队列的互斥锁,用于删除线程和添加线程。
- 信息 :存放利于调试的信息,整个程序的动向。
// State of the optional debug log (only written when INFO_FLAG is defined).
typedef struct _Info {
FILE *info_fd; // log stream opened by OpenInfo
struct timeval time_begin; // start time; log lines are stamped with the time elapsed since it
pthread_mutex_t mutex_info; // serialises writes to info_fd in WriteInfo
}Info;
- 信息存放文件句柄
- 程序开始时间
- 信息的互斥锁,避免多个线程同时写文件
- 全局变量 : 将所有全局变量封装到结构体中,可以同时创建多个线程池
// All state of one pool, bundled so that several pools can coexist.
typedef struct _ThreadPool {
PthreadQueue *pthread_queue_idle; // the idle thread double link queue.
PthreadQueue *pthread_queue_busy; // the work thread double link queue.
TaskQueue *task_queue_head; // the task queue single link list
int pthread_current_count; // total worker threads; updated by CreatePthreadPool and ThreadIdleDel
Info info; // debug log state
} ThreadPool;
- 空闲队列指针
- 忙碌队列指针
- 任务队列指针
- 线程总数
- 信息
3.2 动态线程数的设计
我把线程池的数量设计成了动态的形式,默认数量是8个线程。
- 达到增长的点(线程占用率达到1,即所有线程都处于忙碌状态)并且线程总数小于线程最大数量(我设置为1024),则将线程增长25%(新增数量 = 当前线程数 * 25%;如果增长后会超过最大线程数,则新增数量 = 最大线程数 - 当前线程数),将新建线程依次插入到空闲队列的尾部。
- 达到删除的点(线程占用率不高于0.6,占用率 = 工作线程数 / 总线程数)并且线程总数大于线程最小数量(我设置为8),则将线程删除30%(删除数量 = 当前线程数 * 30%;如果删除后会小于最小线程数,则删除数量 = 当前线程数 - 最小线程数),将空闲队列尾部对应个数的线程删除。
删除线程使用的是pthread_cancel()。调用这个函数之后我又调用了pthread_join()来等待待删除线程结束,然后再释放内存。(这里曾经出现过core dump,找了好久才发现原因:在动态删除的过程中,恰好有线程刚执行完任务并发现没有任务了,于是将自己插入到空闲队列的尾部,并释放空闲队列锁;然后监视线程拿到锁并执行删除线程。pthread_cancel()要等线程运行到取消点或阻塞点才会真正结束线程,如果不等待线程结束就释放空间,待删除线程仍在执行并访问已释放的空间,就会core dump。)
3.3 具体操作流程
- 对外接口 : InitSystem(初始化线程池)和ThreadPoolRequireTask(请求任务)。
- 信息 : 调试时在编译选项中添加 -DINFO_FLAG 即可打开信息记录,记录每个线程的调用情况、任务的处理情况以及线程的增加和删除情况;监视线程每0.5秒查看一系列状态变量并写入信息中。
- InitSystem(初始化线程池),初始化所有全局变量并调用CreatePthreadPool创建默认数量的空闲线程,然后创建PthreadManager(管理线程)和Monitor(监视线程)。
- ThreadPoolRequireTask(请求任务),传入全局变量,回调的函数的参数以及回调函数,创建新的任务并加入任务队列,释放任务队列锁并重新锁住任务队列(如果有忙碌线程刚好执行完任务再次查看是否有任务时,可以拿到该任务,减少线程的切换),如果有任务则通知管理线程。
- CreatePthreadPool,用于创建线程,传入全局变量和创建线程数,将创建好的线程添加到空闲线程,线程执行函数为ChildWork。
- ChildWork(线程执行函数),内部死循环:没有工作则等待自身的条件锁信号(管理线程唤醒),有工作则执行工作,然后释放回调函数的参数和执行完的任务。查看任务队列中是否有任务,有任务则拿一个任务重新循环,没有任务则将线程从忙碌队列中加入到空闲队列尾继续循环。
- PthreadManager(管理线程),等待空闲队列有空闲线程,取出第一个空闲线程,不断开空闲队列的连接。循环等待任务队列有任务(可能会被其它线程抢),取出第一个任务分配给取出的线程。断开取出线程的连接,通知该线程。
- Monitor(监视线程),如果定义INFO_FLAG则每0.5秒查看空闲线程数量、忙碌线程数量、线程总数、线程占用率、待处理任务数量和当前任务id并写入信息中。满足线程增长条件则创建线程,否则判断是否满足删除条件,满足则删除线程。
4.代码实现
pthread_pool.h
/******************************************************************************
*
* Copyright (C), 2001-2005, Huawei Tech. Co., Ltd.
*
*******************************************************************************
* File Name : pthread_pool.h
* Version : Initial Draft
* Author : sst
* Created : 2021/3/28
* Last Modified :
* Description : pthread_pool.c header file
* Function List :
*
*
* History:
*
* 1. Date : 2021/3/28
* Author : sst
* Modification : Created file
*
******************************************************************************/
#ifndef __PTHREAD_POOL_H__
#define __PTHREAD_POOL_H__
#ifdef __cplusplus
#if __cplusplus
extern "C"{
#endif
#endif /* __cplusplus */
/*==============================================*
* include header files *
*----------------------------------------------*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/sem.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <unistd.h>
/*==============================================*
* constants or macros define *
*----------------------------------------------*/
#define THREAD_MAX_NUM 1024 // upper bound on the number of worker threads
#define THREAD_DEF_NUM 8 // number of worker threads created by InitSystem
#define THREAD_MIN_NUM 8 // lower bound on the number of worker threads
#define THREAD_ADD_POINT 1.0 // fraction (busy / total): grow the pool when the usage rate is >= this
#define THREAD_DEL_POINT 0.6 // fraction (busy / total): shrink the pool when the usage rate is <= this
#define THREAD_ADD_RATE 0.25 // fraction: threads to add = total threads * this
#define THREAD_DEL_RATE 0.3 // fraction: idle threads to remove = total threads * this
#define INFO_MAX 1024 // size in bytes of the log line buffers
/*==============================================*
* project-wide global variables *
*----------------------------------------------*/
/*==============================================*
* routines' or functions' implementations *
*----------------------------------------------*/
struct _ThreadPool;
// One queued unit of work; tasks are chained into a singly linked list.
typedef struct _TaskNode {
struct _ThreadPool *thread_pool; // pool the task was submitted to
void *args; // argument passed to working(); freed by the worker after the call
void *(*working)(void*); // callback that performs the task
int task_id; // id assigned from the queue counter when the task is added
struct _TaskNode *next; // next task in the list, NULL for the last one
}TaskNode;
// Bookkeeping for the singly linked list of pending tasks.
typedef struct _TaskQueue {
pthread_mutex_t mutex; // locked while tasks are added to or taken from the list
// the manager thread waits on this while the list is empty; signalled when a task is added
pthread_cond_t cond;
struct _TaskNode *head; // first task in the list
struct _TaskNode *rear; // last task in the list
// number of tasks currently linked in the list; TaskAddNew increments it and
// TaskGetNew decrements it as soon as a task is handed to a thread
int number;
// last task id handed out; starts at 0 and is incremented for every added task
int task_id;
}TaskQueue;
// Per-thread record; records are chained into the idle or busy doubly linked queue.
typedef struct _PthreadNode {
// thread handle filled in by pthread_create; used for pthread_cancel/pthread_join and logging
pthread_t tid;
struct _ThreadPool *thread_pool; // pool this thread belongs to
struct _TaskNode *work; // task being executed, NULL while the thread has nothing to do
struct _PthreadNode *next; // next record in the queue
struct _PthreadNode *prev; // previous record in the queue
pthread_cond_t cond; // signalled by the manager after it assigns a task to this thread
pthread_mutex_t mutex; // mutex paired with cond
}PthreadNode;
// A doubly linked queue of thread records (used for both the idle and the busy queue).
typedef struct _PthreadQueue {
int number; // the number of thread records in this queue
struct _PthreadNode *head; // first record, NULL when the queue is empty
struct _PthreadNode *rear; // last record, NULL when the queue is empty
// the manager waits on this while there is no idle thread; signalled when a thread becomes idle
pthread_cond_t cond;
pthread_mutex_t mutex; // locked while records are linked or unlinked
}PthreadQueue;
// State of the optional debug log (only written when INFO_FLAG is defined).
typedef struct _Info {
FILE *info_fd; // log stream opened by OpenInfo
struct timeval time_begin; // start time; log lines are stamped with the time elapsed since it
pthread_mutex_t mutex_info; // serialises writes to info_fd in WriteInfo
}Info;
// All state of one pool, bundled so that several pools can coexist.
typedef struct _ThreadPool {
PthreadQueue *pthread_queue_idle; // the idle thread double link queue.
PthreadQueue *pthread_queue_busy; // the work thread double link queue.
TaskQueue *task_queue_head; // the task queue single link list
int pthread_current_count; // total worker threads; updated by CreatePthreadPool and ThreadIdleDel
Info info; // debug log state
} ThreadPool;
// Worker thread entry point; args is the thread's own PthreadNode.
extern void* ChildWork(void *args);
// Close the debug log stream.
extern void CloseInfo(Info *info);
// Create `count` worker threads and append them to the idle queue.
extern void CreatePthreadPool
(ThreadPool *thread_pool, const int count);
// Initialise the queues of a pool and start the workers, the manager and the monitor.
extern void InitSystem(ThreadPool *thread_pool);
// Monitor thread entry point; args is the ThreadPool. Grows/shrinks the pool every 0.5 s.
extern void* Monitor(void *args);
// Open the debug log file at info_path for writing.
extern void OpenInfo(Info *info, const char *info_path);
// Manager thread entry point; args is the ThreadPool. Hands queued tasks to idle threads.
extern void* PthreadManager(void *args);
// Append pthread_node to the rear of pthread_queue (locks the queue mutex itself).
extern void PthreadQueueAdd(PthreadQueue *pthread_queue,
PthreadNode* pthread_node);
// Unlink pthread_node from pthread_queue without destroying it (locks the queue mutex itself).
extern void PthreadQueueUnlink(PthreadQueue* pthread_queue,
PthreadNode* pthread_node);
// Append new_task to its pool's task list and assign its task_id (locks the task queue itself).
extern void TaskAddNew(TaskNode *new_task);
// Move the first queued task into pthread_node->work; the caller holds the task queue mutex.
extern void TaskGetNew(PthreadNode *pthread_node);
// Cancel, join and free `count` threads taken from the rear of the idle queue.
extern void ThreadIdleDel(ThreadPool *thread_pool, int count);
// Queue working(args) for execution; returns 0 on success, -1 on allocation failure.
extern int ThreadPoolRequireTask(ThreadPool *thread_pool, void *args,
void *(*working)(void *args));
// Prefix `info` in place with the time elapsed since *time_old.
extern void TimeCurrentSpend(struct timeval *time_old, char *info);
// Time-stamp `buffer` and write it to the debug log under the log mutex.
extern void WriteInfo(Info *info, char *buffer);
#ifdef __cplusplus
#if __cplusplus
}
#endif
#endif /* __cplusplus */
#endif /* __PTHREAD_POOL_H__ */
pthread_pool.c
#include "pthread_pool.h"
/*
 OpenInfo  : open (create or truncate) the debug log file.
 info      : log state; info->info_fd receives the stream, or NULL on failure.
 info_path : path of the log file.
*/
void OpenInfo(Info *info, const char *info_path) {
    info->info_fd = fopen(info_path, "w");
    if (info->info_fd == NULL) {
        // report the failure instead of silently storing a NULL stream
        perror("OpenInfo fopen failure ");
    }
}
/*
 WriteInfo : prefix `buffer` with the elapsed time and append it to the log.
 info      : log state; nothing is written when the log stream is not open.
 buffer    : NUL-terminated message; modified in place by TimeCurrentSpend,
             so it must have spare room for the time prefix.
*/
void WriteInfo(Info *info, char *buffer) {
    // the log file may have failed to open; logging is best-effort
    if (info->info_fd == NULL) {
        return;
    }
    TimeCurrentSpend(&info->time_begin, buffer);
    // several threads log concurrently; keep each line in one piece
    pthread_mutex_lock(&info->mutex_info);
    fwrite(buffer, 1, strlen(buffer), info->info_fd);
    fflush(info->info_fd);
    pthread_mutex_unlock(&info->mutex_info);
}
/*
 CloseInfo : close the debug log stream if it is open.
 info      : log state; info->info_fd is reset to NULL afterwards.
*/
void CloseInfo(Info *info) {
    if (info->info_fd == NULL) {
        return;     // fclose(NULL) is undefined behaviour
    }
    fclose(info->info_fd);
    info->info_fd = NULL;   // do not leave a dangling stream behind
}
/*
 TimeCurrentSpend : prefix `info` in place with the time elapsed since *time_old.
 Format           : "DDd,HHh,MMm,SSs,MMMMms,UUUUus : <original text>".
 time_old         : reference point in time.
 info             : NUL-terminated text; the buffer must have room for the
                    prefix (about 40 bytes for realistic durations) on top of
                    the existing text.
*/
void TimeCurrentSpend(struct timeval *time_old, char *info)
{
    struct timeval now;
    gettimeofday(&now, NULL);

    long sec = (long)(now.tv_sec - time_old->tv_sec);
    long usec = (long)(now.tv_usec - time_old->tv_usec);
    if (usec < 0) {
        // borrow one second so the microsecond part stays non-negative
        sec--;
        usec += 1000000L;
    }

    char prefix[160];
    int prefix_len = snprintf(prefix, sizeof prefix,
                              "%2ldd,%2ldh,%2ldm,%2lds,%4ldms,%4ldus : ",
                              sec / 86400,         // day
                              (sec / 3600) % 24,   // hours
                              (sec / 60) % 60,     // minute
                              sec % 60,            // second
                              usec / 1000,         // millisecond
                              usec % 1000);        // microsecond
    if (prefix_len < 0) {
        return;
    }
    if ((size_t)prefix_len >= sizeof prefix) {
        prefix_len = (int)sizeof prefix - 1;
    }

    // shift the existing text (including its terminator) and put the prefix in front
    memmove(info + prefix_len, info, strlen(info) + 1);
    memcpy(info, prefix, (size_t)prefix_len);
}
/*
 ThreadPoolRequireTask : queue working(args) for execution by the pool.
 thread_pool : pool initialised with InitSystem.
 args        : argument for `working`; it must be heap allocated (or NULL)
               because the worker thread calls free() on it after the task
               has run, so the caller must not free it after success.
 working     : callback executed by a worker thread.
 return      : 0 on success, -1 on invalid arguments or allocation failure
               (in which case `args` is still owned by the caller).
*/
int ThreadPoolRequireTask(ThreadPool *thread_pool, void *args,
                          void *(*working)(void *args)) {
    if (thread_pool == NULL || working == NULL) {
        return -1;
    }
    TaskNode *new_task = (TaskNode*)malloc(sizeof(TaskNode));
    if (new_task == NULL) {
        perror("ThreadPoolRequireTask malloc failure ");
        return -1;
    }
    new_task->thread_pool = thread_pool;
    new_task->args = args;
    new_task->working = working;
    new_task->next = NULL;

    TaskQueue *task_queue_head = thread_pool->task_queue_head;
    // Link the task and wake the manager in one critical section. Once the
    // mutex is released a worker may run and free the task at any moment, so
    // everything needed later (the id) is copied out while the lock is held.
    pthread_mutex_lock(&task_queue_head->mutex);
    int task_id = ++task_queue_head->task_id;
    new_task->task_id = task_id;
    if (task_queue_head->number == 0) {
        task_queue_head->head = task_queue_head->rear = new_task;
    } else {
        task_queue_head->rear->next = new_task;
        task_queue_head->rear = new_task;
    }
    task_queue_head->number++;
    pthread_cond_signal(&task_queue_head->cond);
    pthread_mutex_unlock(&task_queue_head->mutex);

#ifdef INFO_FLAG
    char info_buf[INFO_MAX];
    snprintf(info_buf, sizeof info_buf, "task_manager : %ld, add task id : %d\n",
             (long)syscall(SYS_gettid), task_id);
    WriteInfo(&thread_pool->info, info_buf);
#else
    (void)task_id;
#endif
    return 0;
}
/*
 TaskGetNew   : detach the first queued task and store it in pthread_node->work.
 pthread_node : thread record that receives the task.
 The caller must hold the task queue mutex and must have checked that the
 queue is not empty.
*/
void TaskGetNew(PthreadNode *pthread_node) {
    TaskQueue *queue = pthread_node->thread_pool->task_queue_head;
    TaskNode *first = queue->head;

    pthread_node->work = first;
    if (queue->number != 1) {
        // more tasks remain: advance the head
        queue->head = first->next;
    } else {
        // that was the only task: the list becomes empty
        queue->head = NULL;
        queue->rear = NULL;
    }
    queue->number -= 1;
    first->next = NULL;
}
/*
 TaskAddNew : append new_task to the task list of its pool.
 new_task   : fully initialised task (thread_pool set, next == NULL); its
              task_id is assigned here from the queue's running counter.
 The task queue mutex is taken and released inside this function.
*/
void TaskAddNew(TaskNode *new_task) {
    TaskQueue *queue = new_task->thread_pool->task_queue_head;

    pthread_mutex_lock(&queue->mutex);
    queue->task_id += 1;
    new_task->task_id = queue->task_id;
    if (queue->number != 0) {
        // non-empty list: hook the task behind the current rear
        queue->rear->next = new_task;
    } else {
        // empty list: the task is also the head
        queue->head = new_task;
    }
    queue->rear = new_task;
    queue->number += 1;
    pthread_mutex_unlock(&queue->mutex);
}
/*
 PthreadQueueUnlink : remove pthread_node from pthread_queue without freeing it.
 pthread_queue      : queue that currently contains the node.
 pthread_node       : node to unlink; its next/prev are reset to NULL.
 The queue mutex is taken and released inside this function.
*/
void PthreadQueueUnlink(PthreadQueue* pthread_queue,
                        PthreadNode* pthread_node) {
    pthread_mutex_lock(&pthread_queue->mutex);

    const int is_first = (pthread_queue->head == pthread_node);
    const int is_last = (pthread_queue->rear == pthread_node);

    if (is_first && is_last) {
        // the node is the only element
        pthread_queue->head = NULL;
        pthread_queue->rear = NULL;
    } else if (is_first) {
        // drop the front element
        pthread_queue->head = pthread_queue->head->next;
        pthread_queue->head->prev = NULL;
    } else if (is_last) {
        // drop the back element
        pthread_queue->rear = pthread_queue->rear->prev;
        pthread_queue->rear->next = NULL;
    } else {
        // somewhere in the middle: bridge the neighbours
        pthread_node->next->prev = pthread_node->prev;
        pthread_node->prev->next = pthread_node->next;
    }

    pthread_queue->number -= 1;
    pthread_node->prev = NULL;
    pthread_node->next = NULL;

    pthread_mutex_unlock(&pthread_queue->mutex);
}
/*
 PthreadQueueAdd : append pthread_node to the rear of pthread_queue.
 pthread_queue   : destination queue.
 pthread_node    : node to link; its next/prev are overwritten.
 The queue mutex is taken and released inside this function.
*/
void PthreadQueueAdd(PthreadQueue *pthread_queue,
                     PthreadNode* pthread_node) {
    pthread_mutex_lock(&pthread_queue->mutex);

    PthreadNode *old_rear = pthread_queue->rear;
    if (pthread_queue->head != NULL || old_rear != NULL) {
        // queue already has elements: hook the node behind the old rear
        old_rear->next = pthread_node;
        pthread_node->next = NULL;
        pthread_node->prev = old_rear;
    } else {
        // empty queue: the node is both head and rear
        pthread_queue->head = pthread_node;
        pthread_node->next = NULL;
        pthread_node->prev = NULL;
    }
    pthread_queue->rear = pthread_node;
    pthread_queue->number += 1;

    pthread_mutex_unlock(&pthread_queue->mutex);
}
/*
 Cancellation cleanup handler for ChildWork: releases the mutex that
 pthread_cond_wait re-acquires when the waiting thread is cancelled.
*/
static void ChildWorkUnlock(void *mutex) {
    pthread_mutex_unlock((pthread_mutex_t*)mutex);
}

/*
 ChildWork : body of every worker thread.
 args      : the PthreadNode describing this thread.
 return    : never returns normally (the thread is ended by pthread_cancel);
             the void* return type only satisfies pthread_create.

 Loop: wait until a task is stored in self->work, run it, free the task and
 its argument, then either take the next queued task directly or move from
 the busy queue to the idle queue and tell the manager.
*/
void* ChildWork(void *args) {
    // restore the parameters
    PthreadNode *self = (PthreadNode*)args;
    ThreadPool *thread_pool = self->thread_pool;
#ifdef INFO_FLAG
    // pthread_self() instead of self->tid: the new thread may start running
    // before pthread_create has stored the id in self->tid
    long self_id = (long)pthread_self();
    char info_buf[INFO_MAX];
    snprintf(info_buf, sizeof info_buf, "%ld thread prepare\n", self_id);
    WriteInfo(&thread_pool->info, info_buf);
#endif
    while(1) {
        // Block until a task has been assigned. The predicate is re-checked in
        // a loop because pthread_cond_wait may wake up spuriously. The wait is
        // a cancellation point, and a cancelled waiter owns the mutex again,
        // so a cleanup handler releases it before the thread terminates.
        pthread_mutex_lock(&self->mutex);
        pthread_cleanup_push(ChildWorkUnlock, &self->mutex);
        while(self->work == NULL) {
            pthread_cond_wait(&self->cond, &self->mutex);
        }
        pthread_cleanup_pop(1);     // runs the handler: unlocks self->mutex
#ifdef INFO_FLAG
        snprintf(info_buf, sizeof info_buf, "%ld thread working, task id is %d\n",
                 self_id, self->work->task_id);
        WriteInfo(&thread_pool->info, info_buf);
#endif
        // execute the real work
        self->work->working(self->work->args);
        // the pool owns both the argument and the task node: release them
        free(self->work->args);
        free(self->work);
        self->work = NULL;
        /*
         take the next task straight from the queue if there is one;
         otherwise move this thread from the busy queue to the idle queue
         and wake the manager, which may be waiting for an idle thread.
        */
        pthread_mutex_lock(&thread_pool->task_queue_head->mutex);
        if(thread_pool->task_queue_head->number > 0) {
            TaskGetNew(self);
            pthread_mutex_unlock(&thread_pool->task_queue_head->mutex);
            continue;
        }
        pthread_mutex_unlock(&thread_pool->task_queue_head->mutex);
        PthreadQueueUnlink(thread_pool->pthread_queue_busy, self);
        PthreadQueueAdd(thread_pool->pthread_queue_idle, self);
        pthread_mutex_lock(&thread_pool->pthread_queue_idle->mutex);
        pthread_cond_signal(&thread_pool->pthread_queue_idle->cond);
        pthread_mutex_unlock(&thread_pool->pthread_queue_idle->mutex);
    }
    return NULL;
}
/*
 CreatePthreadPool : create `count` worker threads and append them to the
                     rear of the idle queue.
 thread_pool : pool that receives the threads; its idle queue must be
               initialised.
 count       : number of threads to create; values <= 0 do nothing.

 Each thread runs ChildWork and blocks on its own condition variable until a
 task is assigned. Allocation failure terminates the process (exit(1)); if
 pthread_create fails, the threads created so far are still added.
*/
void CreatePthreadPool
(ThreadPool *thread_pool, const int count) {
    PthreadNode *head = NULL, *tail = NULL;
    int created = 0;

    for (int i = 0; i < count; ++i) {
        PthreadNode *node = (PthreadNode*)malloc(sizeof(PthreadNode));
        if (node == NULL) {
            perror("malloc failure ");
            exit(1);
        }
        node->thread_pool = thread_pool;
        node->work = NULL;
        node->next = NULL;
        node->prev = tail;
        pthread_cond_init(&node->cond, NULL);
        pthread_mutex_init(&node->mutex, NULL);

        int err = pthread_create(&node->tid, NULL, ChildWork, (void*)node);
        if (err != 0) {
            // pthread_create reports errors through its return value, not errno
            fprintf(stderr, "pthread_create failure : %s\n", strerror(err));
            pthread_cond_destroy(&node->cond);
            pthread_mutex_destroy(&node->mutex);
            free(node);
            break;
        }

        // chain the new node behind the ones created so far
        if (tail != NULL) {
            tail->next = node;
        } else {
            head = node;
        }
        tail = node;
        created++;
    }
    if (created == 0) {
        return;
    }

    // splice the whole chain onto the rear of the idle queue
    PthreadQueue *idle = thread_pool->pthread_queue_idle;
    pthread_mutex_lock(&idle->mutex);
    if (idle->rear == NULL) {
        idle->head = head;
    } else {
        idle->rear->next = head;
        head->prev = idle->rear;
    }
    idle->rear = tail;
    idle->number += created;
    thread_pool->pthread_current_count += created;
    pthread_mutex_unlock(&idle->mutex);
}
// init_system : init the system glob pointor.
void InitSystem(ThreadPool *thread_pool) {
#ifdef INFO_FLAG
pthread_mutex_init(&thread_pool->info.mutex_info, NULL);
gettimeofday(&thread_pool->info.time_begin, NULL);
OpenInfo(&thread_pool->info, "infomation.txt");
#endif
thread_pool->pthread_current_count = 0;
// init the pthread_queue_idle
thread_pool->pthread_queue_idle = (PthreadQueue*)malloc(sizeof(PthreadQueue));
thread_pool->pthread_queue_idle->number = 0;
thread_pool->pthread_queue_idle->head = NULL;
thread_pool->pthread_queue_idle->rear = NULL;
pthread_mutex_init(&thread_pool->pthread_queue_idle->mutex, NULL);
pthread_cond_init(&thread_pool->pthread_queue_idle->cond, NULL);
// init the pthread_queue_busy
thread_pool->pthread_queue_busy = (PthreadQueue*)malloc(sizeof(PthreadQueue));
thread_pool->pthread_queue_busy->number = 0;
thread_pool->pthread_queue_busy->head = NULL;
thread_pool->pthread_queue_busy->rear = NULL;
pthread_mutex_init(&thread_pool->pthread_queue_busy->mutex, NULL);
pthread_cond_init(&thread_pool->pthread_queue_busy->cond, NULL);
// init the task_queue_head
thread_pool->task_queue_head = (TaskQueue*)malloc(sizeof(TaskQueue));
thread_pool->task_queue_head->head = NULL;
thread_pool->task_queue_head->number = 0;
thread_pool->task_queue_head->task_id = 0;
pthread_mutex_init(&thread_pool->task_queue_head->mutex, NULL);
pthread_cond_init(&thread_pool->task_queue_head->cond, NULL);
// create thread pool
CreatePthreadPool(thread_pool, THREAD_DEF_NUM);
// create thread of manage
pthread_t thread_manage_id;
pthread_create(&thread_manage_id, NULL, PthreadManager, (void*)thread_pool);
// create thread of manage
pthread_t thread_monitor_id;
pthread_create(&thread_monitor_id, NULL, Monitor, (void*)thread_pool);
}
/*
PthreadManage : allocate tasks to idle threads.
block on pthread_queue_idle->cond when no idle thread
block on task_queue_head->cond when no task come.
ptr : no used, in order to avoid warning.
return : nothing.
*/
void* PthreadManager(void *args)
{
ThreadPool *thread_pool = (ThreadPool*)args;
#ifdef INFO_FLAG
pthread_t tid = syscall(SYS_gettid);
char info_buf[INFO_MAX];
sprintf(info_buf, "PthreadManager : %ld\n", tid);
WriteInfo(&thread_pool->info, info_buf);
#endif
while(1)
{
PthreadNode *temp_thread = NULL;
// get a new idle thread, and modify the idle_queue.
// if no idle thread, block on pthread_queue_idle->cond.
pthread_mutex_lock(&thread_pool->pthread_queue_idle->mutex);
if(thread_pool->pthread_queue_idle->number == 0) {
pthread_cond_wait(&thread_pool->pthread_queue_idle->cond,
&thread_pool->pthread_queue_idle->mutex);
}
temp_thread = thread_pool->pthread_queue_idle->head;
pthread_mutex_unlock(&thread_pool->pthread_queue_idle->mutex);
// get a new task, and modify the task_queue.
// if no task block on task_queue_head->cond.
pthread_mutex_lock(&thread_pool->task_queue_head->mutex);
while(thread_pool->task_queue_head->number == 0) {
pthread_cond_wait(&thread_pool->task_queue_head->cond,
&thread_pool->task_queue_head->mutex);
}
TaskGetNew(temp_thread);
pthread_mutex_unlock(&thread_pool->task_queue_head->mutex);
// modify the idle thread attribute.
// PthreadNode of pthread_queue_idle only allow access to PthreadManage
// so not have to lock temp_thread
PthreadQueueUnlink(thread_pool->pthread_queue_idle, temp_thread);
PthreadQueueAdd(thread_pool->pthread_queue_busy, temp_thread);
#ifdef INFO_FLAG
sprintf(info_buf, "%ld manager thread : %ld tid, %d work_id\n",
tid, temp_thread->tid, temp_thread->work->task_id);
WriteInfo(&thread_pool->info, info_buf);
#endif
// signal the child thread to exec the work
pthread_mutex_lock(&temp_thread->mutex);
pthread_cond_signal(&temp_thread->cond);
pthread_mutex_unlock(&temp_thread->mutex);
}
}
/*
 Monitor : every 0.5 s sample the pool and resize it.
 args    : the ThreadPool to watch.
 return  : never returns; the trailing return only satisfies the signature.

 usage rate = busy threads / total threads.
 - usage rate >= THREAD_ADD_POINT and total < THREAD_MAX_NUM:
   create total * THREAD_ADD_RATE threads (capped at THREAD_MAX_NUM).
 - usage rate <= THREAD_DEL_POINT and total > THREAD_MIN_NUM:
   delete total * THREAD_DEL_RATE idle threads (never below THREAD_MIN_NUM).
 With INFO_FLAG defined the sampled values are written to the debug log.
 The counters are sampled without taking the queue locks, so the values are
 a best-effort snapshot.
*/
void* Monitor(void *args)
{
    ThreadPool *thread_pool = (ThreadPool*)args;
#ifdef INFO_FLAG
    char info_buf[INFO_MAX];
    snprintf(info_buf, sizeof info_buf, "monitor : %ld\n", syscall(SYS_gettid));
    WriteInfo(&thread_pool->info, info_buf);
#endif
    while(1) {
        int busy_num = thread_pool->pthread_queue_busy->number;
        int total_num = thread_pool->pthread_current_count;
        // avoid dividing by zero before any thread exists
        double usage_rate = total_num > 0 ? (1.0 * busy_num) / total_num : 0.0;
#ifdef INFO_FLAG
        int idle_num = thread_pool->pthread_queue_idle->number;
        int task_num = thread_pool->task_queue_head->number;
        int task_id = thread_pool->task_queue_head->task_id;
        // one formatting call: passing info_buf as both destination and
        // "%s" source of sprintf is undefined behaviour
        snprintf(info_buf, sizeof info_buf,
                 "monitor : %ld\n"
                 "********************************\n"
                 "idle number : %d\n"
                 "busy number : %d\n"
                 "thread sum : %d\n"
                 "usage rate : %lf\n"
                 "task number : %d\n"
                 "task id : %d\n"
                 " ******************************\n\n",
                 syscall(SYS_gettid), idle_num, busy_num, total_num,
                 usage_rate, task_num, task_id);
        WriteInfo(&thread_pool->info, info_buf);
#endif
        // dynamic manage thread count
        if (usage_rate >= THREAD_ADD_POINT && total_num < THREAD_MAX_NUM) {
            int count = (int)(total_num * THREAD_ADD_RATE);
            if(total_num + count > THREAD_MAX_NUM) {
                count = THREAD_MAX_NUM - total_num;
            }
            if (count > 0) {
#ifdef INFO_FLAG
                snprintf(info_buf, sizeof info_buf,
                         "monitor : %ld, create pthread pool %d\n",
                         syscall(SYS_gettid), count);
                WriteInfo(&thread_pool->info, info_buf);
#endif
                CreatePthreadPool(thread_pool, count);
                // the manager may be waiting for an idle thread
                pthread_mutex_lock(&thread_pool->pthread_queue_idle->mutex);
                pthread_cond_signal(&thread_pool->pthread_queue_idle->cond);
                pthread_mutex_unlock(&thread_pool->pthread_queue_idle->mutex);
            }
        } else if (usage_rate <= THREAD_DEL_POINT && total_num > THREAD_MIN_NUM) {
            int count = (int)(total_num * THREAD_DEL_RATE);
            if(total_num - count < THREAD_MIN_NUM) {
                count = total_num - THREAD_MIN_NUM;
            }
            if (count > 0) {
#ifdef INFO_FLAG
                snprintf(info_buf, sizeof info_buf,
                         "monitor : %ld, thread idle delete %d\n",
                         syscall(SYS_gettid), count);
                WriteInfo(&thread_pool->info, info_buf);
#endif
                ThreadIdleDel(thread_pool, count);
            }
        }
        usleep(500000);
    }
    return NULL;
}
/*
 ThreadIdleDel : remove up to `count` threads from the rear of the idle
                 queue, then cancel, join and free them.
 thread_pool   : pool to shrink.
 count         : number of idle threads to delete; it is clamped to the
                 current length of the idle queue, values <= 0 do nothing.

 The threads are detached from the queue under the idle-queue mutex. Each
 one is joined after pthread_cancel so that its record is only freed once
 the thread has really stopped (cancellation only takes effect at a
 cancellation point).
*/
void ThreadIdleDel(ThreadPool *thread_pool, const int count) {
    PthreadQueue *idle = thread_pool->pthread_queue_idle;
    PthreadNode *thread_del = NULL, *last_kept = NULL;
    int remove_count = count;

    pthread_mutex_lock(&idle->mutex);
    // read the queue length only while holding the lock
    if (remove_count > idle->number) {
        remove_count = idle->number;
    }
    if (remove_count <= 0 || idle->head == NULL) {
        pthread_mutex_unlock(&idle->mutex);
        return;
    }

    // skip the threads that stay; thread_del ends on the first one to delete
    int keep = idle->number - remove_count;
    thread_del = idle->head;
    for (int i = 0; i < keep && thread_del != NULL; i++) {
        last_kept = thread_del;
        thread_del = thread_del->next;
    }
    if (thread_del == NULL) {
        // the list is shorter than `number` claims: nothing to detach
        pthread_mutex_unlock(&idle->mutex);
        return;
    }

    // cut the tail off the queue
    if (last_kept != NULL) {
        last_kept->next = NULL;
        idle->rear = last_kept;
    } else {
        // every idle thread is being removed
        idle->head = NULL;
        idle->rear = NULL;
    }
    thread_del->prev = NULL;
    thread_pool->pthread_current_count -= remove_count;
    idle->number -= remove_count;
    pthread_mutex_unlock(&idle->mutex);

    // stop and release the detached threads outside the lock
    for (int i = 0; i < remove_count && thread_del != NULL; i++) {
        PthreadNode *thread_tmp = thread_del;
        thread_del = thread_del->next;
        pthread_cancel(thread_tmp->tid);
        pthread_join(thread_tmp->tid, NULL);
        pthread_mutex_destroy(&thread_tmp->mutex);
        pthread_cond_destroy(&thread_tmp->cond);
        free(thread_tmp);
    }
}
test_project.c
#include "lib/pthread_pool.h"
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <unistd.h>
/*
 SockCreate : create an IPv4 TCP socket.
 return     : the socket descriptor, or -1 after printing the error.
*/
int SockCreate()
{
    const int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd != -1) {
        return sockfd;
    }
    perror("socket error ");
    return sockfd;
}
/*
 SockConnect : connect socket `fd` to the IPv4 address `addr`.
 return      : the result of connect(); a negative value after printing
               the error when the connection fails.
*/
int SockConnect(int fd, struct sockaddr_in *addr) {
    const struct sockaddr *target = (const struct sockaddr*)addr;
    const int res = connect(fd, target, sizeof(*addr));
    if (res >= 0) {
        return res;
    }
    perror("connect failure ");
    return res;
}
/*
 SendInfo : task callback that connects to a server and sends one greeting.
 args     : pointer to the struct sockaddr_in of the server.
 return   : always NULL; failures are reported on stderr.
 The socket is closed on every path, including a failed connect.
*/
void *SendInfo(void *args) {
    struct sockaddr_in *addr = (struct sockaddr_in*)args;
    int sockfd = SockCreate();
    if (sockfd == -1) {
        return NULL;
    }
    if (SockConnect(sockfd, addr) >= 0) {
        const char message[] = "hello world!\n";
        if (send(sockfd, message, sizeof message - 1, 0) < 0) {
            perror("send failure ");
        }
    }
    // reached after a failed connect too, so the descriptor never leaks
    close(sockfd);
    return NULL;
}
/*
 main : stress the thread pool with batches of 100 SendInfo tasks.
 usage: <program> ip port
 After each batch the program waits for a key; 'q' or end of input quits,
 anything else submits another batch.
*/
int main(int argc, char **argv)
{
    if (argc < 3) {
        fprintf(stderr, "usage: %s ip port\n",
                argc > 0 ? argv[0] : "test_project");
        return EXIT_FAILURE;
    }

    // validate the port instead of trusting atoi
    char *end = NULL;
    errno = 0;
    long port = strtol(argv[2], &end, 10);
    if (errno != 0 || end == argv[2] || *end != '\0' ||
        port <= 0 || port > 65535) {
        fprintf(stderr, "invalid port : %s\n", argv[2]);
        return EXIT_FAILURE;
    }

    // validate the address once, up front
    struct in_addr ip_addr;
    if (inet_pton(AF_INET, argv[1], &ip_addr) != 1) {
        fprintf(stderr, "invalid ip : %s\n", argv[1]);
        return EXIT_FAILURE;
    }

    ThreadPool thread_pool;
    InitSystem(&thread_pool);

    while(1) {
        for(int i = 0; i < 100; i++) {
            // heap allocated: the pool frees the argument after the task ran
            struct sockaddr_in *addr =
                (struct sockaddr_in*)calloc(1, sizeof(struct sockaddr_in));
            if (addr == NULL) {
                perror("malloc failure ");
                break;
            }
            addr->sin_family = AF_INET;
            addr->sin_addr = ip_addr;
            addr->sin_port = htons((unsigned short)port);
            if (ThreadPoolRequireTask(&thread_pool, (void*)addr, SendInfo) != 0) {
                // the task was not queued, so the argument is still ours
                free(addr);
            }
        }
        // int, not char: EOF must be distinguishable from every character
        int in = getchar();
        if(in == 'q' || in == EOF) break;
    }
    return 0;
}