线程池原理及设计与实现

线程池原理及实现

1.什么是线程池

线程池是一种多线程处理形式,提前将线程创建好,放入队列中进行管理。有任务需要处理时,将任务分配到具体的线程进行执行。减少线程的创建、销毁和切换,来提高性能。

2.线程池原理

大量的任务需要处理,肯定会考虑使用多线程。通常是一个任务就直接调用一个线程去执行,效率不高。所以就有了线程池,提前创建好定量线程,在队列中等待任务,有任务就分配具体线程去执行任务,回调的函数和参数都是通过任务的形式提交给线程,避免了线程的创建和销毁,减少了线程的切换。既然是定量这个量到底是多少

3.我设计的线程池

3.1 数据结构

我采用了一个任务队列(单向链表)和两个线程队列(双向链表):空闲队列和忙碌队列。

  • 任务结点 : 里面存放每个具体的任务信息
    typedef struct _TaskNode {
    struct _ThreadPool *thread_pool;
    void *args; // fun arg
    void *(working)(void); // the real work of the task
    int task_id; // task id
    struct _TaskNode *next;
    }TaskNode;
  1. 指向ThreadPool的指针,为了从任务结点中指向全局变量。
  2. 需要完成的任务(函数)的参数
  3. 需要完成的任务(函数的地址)
  4. 任务id号
  5. 指向下一个任务
  • 任务队列 : 里面存放整个任务队列的信息
    typedef struct _TaskQueue {
    pthread_mutex_t mutex;
    // when no task, the manager thread wait for; when a new task come, signal
    pthread_cond_t cond;
    struct _TaskNode *head; // point to the task_link head
    struct _TaskNode *rear; // point to the task_link rear
    // current number of task, include unassigned and assigned but no finished
    int number;
    // task id initialize 0, add 1 when add task to task link
    int task_id;
    }TaskQueue;
  1. 任务队列互斥锁,添加任务和取出任务的时候需要使用互斥锁
  2. 任务队列条件锁,管理线程会等待任务,添加任务时需要通知管理线程
  3. 任务队列的头结点
  4. 任务队列的尾结点
  5. 任务队列总任务数
  6. 当前任务编号,用于下次任务分配id
  • 线程结点 : 保存每个线程工作需要的信息
    typedef struct _PthreadNode {
    // the pid of this thread in kernel, the value is syscall return
    pthread_t tid;
    struct _ThreadPool *thread_pool;
    struct _TaskNode *work; // if exec a work, which work
    struct _PthreadNode *next;
    struct _PthreadNode *prev;
    pthread_cond_t cond; // when assigned a task, signal this child thread by manager.
    pthread_mutex_t mutex;
    }PthreadNode;
  1. 线程id号,删除线程、等待线程和方便调试
  2. 指向ThreadPool的指针,为了从任务结点中指向全局变量。
  3. 指向任务结点的指针,为空则线程空闲,否则执行当前任务
  4. 指向下一个线程结点
  5. 指向上一个线程结点
  6. 线程的条件锁,用于管理线程分配任务后唤醒当前线程
  7. 线程的互斥锁,用于条件锁
  • 线程队列 : 存放线程队列的所有信息
    typedef struct _PthreadQueue {
    int number; // the number of thread in this queue.
    struct _PthreadNode *head;
    struct _PthreadNode *rear;
    // when no idle thread, the manager wait for, or when a thread return with idle, signal
    pthread_cond_t cond;
    pthread_mutex_t mutex;
    }PthreadQueue;
  1. 线程队列中线程的数量
  2. 线程队列的头结点
  3. 线程队列的尾结点
  4. 线程队列的条件锁,用于管理线程等待空闲线程。
  5. 线程队列的互斥锁,用于删除线程和添加线程。
  • 信息 :存放利于调试的信息,整个程序的动向。
    typedef struct _Info {
    FILE *info_fd;
    struct timeval time_begin;
    pthread_mutex_t mutex_info;
    }Info;
  1. 信息存放文件句柄
  2. 程序开始时间
  3. 信息的互斥锁,避免多个线程同时写文件
  • 全局变量 : 将所有全局变量封装到结构体中,可以同时创建多个线程池
    typedef struct _ThreadPool {
    PthreadQueue *pthread_queue_idle; // the idle thread double link queue.
    PthreadQueue *pthread_queue_busy; // the work thread double link queue.
    TaskQueue *task_queue_head; // the task queue single link list
    int pthread_current_count; // the count of current pthread
    Info info;
    } ThreadPool;
  1. 空闲队列指针
  2. 忙碌队列指针
  3. 任务队列指针
  4. 线程总数
  5. 信息

3.2 动态线程数的设计

我把线程池的数量设计成了动态的形式,默认数量是8个线程。

  • 达到增长的点(线程占用率达到1,所有线程都处于忙碌忙碌状态)并且小于线程最大数量(我设置为1024)则将线程增长25%(新增数量 = 当前线程数 * 25%, 如果大于最大线程数,则 = 最大线程数 - 当前线程数),将新建线程依次插入到空闲队列的尾部。
  • 达到删除的点(线程占用率达到0.6,工作线程 / 总线程数)并且大于线程最小数量(我设置为8)则将线程删除30%(删除数量 = 当前线程数 * 30%,如果小于最小线程数,则 = 当前线程数 - 最小线程数),将空闲队列尾部的对应个数删除。
    删除线程使用的pthread_cancel() 调用这个函数之后我又调用了pthread_join()来等待待删除线程的结束,然后再释放内存(吐过核,找了好久才发现原因,在动态删除的过程中,恰好有线程刚执行完任务并发现没有任务了,将自己插入到了空闲队列的尾部,并释放空闲队列锁。然后监视线程拿到了锁执行删除线程,pthread_cancel()要到取消点或者阻塞点才会结束进程,删除线程并释放空间后,待删除线程还在执行然后调用了释放的空间就吐核了)

3.3 具体操作流程

  • 对外接口 : InitSystem(初始化线程池)和ThreadPoolRequireTask(请求任务)。
  • 信息 : debug的情况下编译时添加-define INFO_FLAG则可以打开信息,记录每个线程的调用情况,任务的处理情况,管理线程的增加和删除情况,管理线程没0.5秒查看一系列局部变量写入信息中。
  • InitSystem(初始化线程),初始化所有全局变量并调用CreatePthreadPool创建默认数量的空闲线程,创建PthreadManager(管理线程)、Monitor(监视线程)。
  • ThreadPoolRequireTask(请求任务),传入全局变量,回调的函数的参数以及回调函数,创建新的任务并加入任务队列,释放任务队列锁并重新锁住任务队列(如果有忙碌线程刚好执行完任务再次查看是否有任务时,可以拿到该任务,减少线程的切换),如果有任务则通知管理线程。
  • CreatePthreadPool,用于创建线程,传入全局变量和创建线程数,将创建好的线程添加到空闲线程,线程执行函数为ChildWork。
  • ChildWork(线程执行函数),内部死循环:没有工作则等待自身的条件锁信号(管理线程唤醒),有工作则执行工作,然后释放回调函数的参数和执行完的任务。查看任务队列中是否有任务,有任务则拿一个任务重新循环,没有任务则将线程从忙碌队列中加入到空闲队列尾继续循环。
  • PthreadManager(管理线程),等待空闲队列有空闲线程,取出第一个空闲线程,不断开空闲队列的连接。循环等待任务队列有任务(可能会被其它线程抢),取出第一个任务分配给取出的线程。断开取出线程的连接,通知该线程。
  • Monitor(监视线程),如果定义INFO_FLAG则每0.5秒查看空闲线程数量、忙碌线程数量、线程总数、线程占用率、待处理任务数量和当前任务id并写入信息中。满足线程增长条件则创建线程,否则判断是否满足删除条件,满足则删除线程。

4.代码实现

pthread_pool.h

/******************************************************************************
*
*  Copyright (C), 2001-2005, Huawei Tech. Co., Ltd.
*
*******************************************************************************
*  File Name     : pthread_pool.h
*  Version       : Initial Draft
*  Author        : sst
*  Created       : 2021/3/28
*  Last Modified :
*  Description   : pthread_pool.c header file
*  Function List :
*
*
*  History:
* 
*       1.  Date         : 2021/3/28
*           Author       : sst
*           Modification : Created file
*
******************************************************************************/
#ifndef __PTHREAD_POOL_H__
#define __PTHREAD_POOL_H__


#ifdef __cplusplus
#if __cplusplus
extern "C"{
#endif
#endif /* __cplusplus */


/*==============================================*
 *      include header files                    *
 *----------------------------------------------*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/sem.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <unistd.h>

/*==============================================*
 *      constants or macros define              *
 *----------------------------------------------*/
#define THREAD_MAX_NUM 1024     // max number of thread pool
#define THREAD_DEF_NUM 8        // by default, number of thread
#define THREAD_MIN_NUM 8        // min number of thread pool
#define THREAD_ADD_POINT 1.0    // %, malloc new thread when usage rate greater then it
#define THREAD_DEL_POINT 0.6    // %, free idle thread when usage rate less then it
#define THREAD_ADD_RATE 0.25    // %, malloc new thread = sum thread * it
#define THREAD_DEL_RATE 0.3     // %, free idle thread = sum thread * it
#define INFO_MAX 1024
/*==============================================*
 *      project-wide global variables           *
 *----------------------------------------------*/
/*==============================================*
 *      routines' or functions' implementations *
 *----------------------------------------------*/ 
struct _ThreadPool;
// ds of the every task. make all task in a single link
typedef struct _TaskNode {
    struct _ThreadPool *thread_pool;
    void *args;                 // fun arg
    void *(*working)(void*);    // the real work of the task
    int task_id; // task id
    struct _TaskNode *next;
}TaskNode;

// the ds of the task_queue
typedef struct _TaskQueue {
    pthread_mutex_t mutex;
    // when no task, the manager thread wait for; when a new task come, signal
    pthread_cond_t cond;
    struct _TaskNode *head;     // point to the task_link head
    struct _TaskNode *rear;     // point to the task_link rear
    // current number of task, include unassigned and assigned but no finished
    int number;
    // task id initialize 0, add 1 when add task to task link
    int task_id;
}TaskQueue;

// the ds of every thread, mask all thread in a double link queue.
typedef struct _PthreadNode {
    // the pid of this thread in kernel, the value is syscall return
    pthread_t tid;          
    struct _ThreadPool *thread_pool;
    struct _TaskNode *work; // if exec a work, which work
    struct _PthreadNode *next;
    struct _PthreadNode *prev;
    pthread_cond_t cond;    // when assigned a task, signal this child thread by manager.
    pthread_mutex_t mutex;
}PthreadNode;

// the ds of the thread queue
typedef struct _PthreadQueue {
    int number;             // the number of thread in this queue.
    struct _PthreadNode *head;
    struct _PthreadNode *rear;
    // when no idle thread, the manager wait for, or when a thread return with idle, signal
    pthread_cond_t cond;
    pthread_mutex_t mutex;
}PthreadQueue;

typedef struct _Info {
    FILE *info_fd;
    struct timeval time_begin;
    pthread_mutex_t mutex_info;
}Info;

typedef struct _ThreadPool {

   PthreadQueue *pthread_queue_idle;     // the idle thread double link queue.
    PthreadQueue *pthread_queue_busy;     // the work thread double link queue.
    TaskQueue *task_queue_head;           // the task queue single link list
    int pthread_current_count;            // the count of current pthread
    Info info;
} ThreadPool;

extern void* ChildWork(void *args);
extern void CloseInfo(Info *info);
extern void CreatePthreadPool
(ThreadPool *thread_pool, const int count);
extern void InitSystem(ThreadPool *thread_pool);
extern void* Monitor(void *args);
extern void OpenInfo(Info *info, const char *info_path);
extern void* PthreadManager(void *args);
extern void PthreadQueueAdd(PthreadQueue *pthread_queue,
                              PthreadNode* pthread_node);
extern void PthreadQueueUnlink(PthreadQueue* pthread_queue, 
                                 PthreadNode* pthread_node);
extern void TaskAddNew(TaskNode *new_task);
extern void TaskGetNew(PthreadNode *pthread_node);
extern void ThreadIdleDel(ThreadPool *thread_pool, int count);
extern int ThreadPoolRequireTask(ThreadPool *thread_pool, void *args, 
                          void *(*working)(void *args));
extern void TimeCurrentSpend(struct timeval *time_old, char *info);
extern void WriteInfo(Info *info, char *buffer);


#ifdef __cplusplus
#if __cplusplus
}
#endif
#endif /* __cplusplus */


#endif /* __PTHREAD_POOL_H__ */

pthread_pool.c

#include "pthread_pool.h"

void OpenInfo(Info *info, const char *info_path) {
    info->info_fd = fopen(info_path, "w");
}

// lock log and write log
void WriteInfo(Info *info, char *buffer) {
    TimeCurrentSpend(&info->time_begin, buffer);
    pthread_mutex_lock(&info->mutex_info);
    fwrite(buffer, strlen(buffer), 1, info->info_fd);
    fflush(info->info_fd);
    pthread_mutex_unlock(&info->mutex_info);
}

void CloseInfo(Info *info) {
    fclose(info->info_fd);
}

// spend time = current time - old time
// day + hours + minute + second + millisecond + mincrosecond + info
void TimeCurrentSpend(struct timeval *time_old, char *info)
{
    struct timeval now;
    gettimeofday(&now, NULL);
    if(now.tv_usec < time_old->tv_usec)
    {
        now.tv_sec--;
        now.tv_usec += 1000000;
    }
    time_t sec = now.tv_sec - time_old->tv_sec;
    char info_old[strlen(info) + 1];
    strcpy(info_old, info);
    sprintf(info, "%2ldd,%2ldh,%2ldm,%2lds,%4ldms,%4ldus :  %s",
        sec /8640,                                  // day
        (sec / 360) % 24,                           // hours
        (sec / 60) % 60,                            // minute
        sec % 60,                                   // second
        (now.tv_usec - time_old->tv_usec) / 1000,    // millisecond
        (now.tv_usec - time_old->tv_usec) % 1000,    // mincrosecond
        info_old);                                  // info
}

// must malloc args memory and not must free
int ThreadPoolRequireTask(ThreadPool *thread_pool, void *args, 
                          void *(*working)(void *args)) {
    TaskNode *new_task = (TaskNode*)malloc(sizeof(TaskNode));
    if (new_task == NULL) {
        perror("ThreadPoolRequireTask malloc failure ");
        return -1;
    }
    new_task->thread_pool = thread_pool;
    new_task->args = args;
    new_task->working = working;
    new_task->next = NULL;

    TaskQueue *task_queue_head = new_task->thread_pool->task_queue_head;
    TaskAddNew(new_task);

#ifdef INFO_FLAG
    pthread_t tid = syscall(SYS_gettid);
    char info_buf[INFO_MAX];
    sprintf(info_buf, "task_manager : %ld, add task id : %d\n", 
                      tid, new_task->task_id);
    WriteInfo(&new_task->thread_pool->info, info_buf);
#endif

    // again require mutex lock for reduce thread status change
    pthread_mutex_lock(&task_queue_head->mutex);
    if (task_queue_head->number) {
        pthread_cond_signal(&task_queue_head->cond);
    }
    pthread_mutex_unlock(&task_queue_head->mutex);
    
    return 0;
}

// need lock PthreadNode and TaskQueue at call
void TaskGetNew(PthreadNode *pthread_node) {
    TaskQueue *task_queue_head = pthread_node->thread_pool->task_queue_head;
    pthread_node->work = task_queue_head->head;
    // get the first task and modify self thread attribute
    if (task_queue_head->number == 1) {
        task_queue_head->head = task_queue_head->rear = NULL;
    } else {
        task_queue_head->head = task_queue_head->head->next;
    }
    task_queue_head->number--;
    pthread_node->work->next = NULL;
}

void TaskAddNew(TaskNode *new_task) {
    /*
        initial the attribute of the task.
        because this task havn't add to system, so no need lock the mutex.
    */
    TaskQueue *task_queue_head = new_task->thread_pool->task_queue_head;
    // add the set task node to task link
    pthread_mutex_lock(&task_queue_head->mutex);
    new_task->task_id = ++task_queue_head->task_id;
    // find the tail of the task link and add the new one to tail
    if(!task_queue_head->number) {
        task_queue_head->head = task_queue_head->rear = new_task;
    } else {
        task_queue_head->rear->next = new_task;
        task_queue_head->rear = new_task;
    }
    task_queue_head->number++;
    pthread_mutex_unlock(&task_queue_head->mutex);
}

// delete pthread node from pthread queue but don't destory
void PthreadQueueUnlink(PthreadQueue* pthread_queue, 
                        PthreadNode* pthread_node) {
    // no task need to exec, add self to idle queue and del from busy queue
    // self is the last execute thread
    pthread_mutex_lock(&pthread_queue->mutex);
    if(pthread_queue->head == pthread_node
    && pthread_queue->rear == pthread_node) {
        pthread_queue->head = pthread_queue->rear = NULL;
    } else if(pthread_queue->head == pthread_node
           && pthread_queue->rear != pthread_node) {
        // the first one thread in pthread queue
        pthread_queue->head = pthread_queue->head->next;
        pthread_queue->head->prev = NULL;
    } else if(pthread_queue->head != pthread_node
           && pthread_queue->rear == pthread_node) {
        // the last one thread in pthread queue
        pthread_queue->rear = pthread_queue->rear->prev;
        pthread_queue->rear->next = NULL;
    } else {
        // middle one
        pthread_node->next->prev = pthread_node->prev;
        pthread_node->prev->next = pthread_node->next;
    }
    pthread_queue->number--;
    pthread_node->next = pthread_node->prev = NULL;
    pthread_mutex_unlock(&pthread_queue->mutex);
}

// add PthreadNode to PthreadQueue head
void PthreadQueueAdd(PthreadQueue *pthread_queue,
                     PthreadNode* pthread_node) {
    // now the idle queue is empty
    pthread_mutex_lock(&pthread_queue->mutex);
    if(pthread_queue->head == NULL &&
       pthread_queue->rear == NULL) {
        pthread_queue->head = pthread_queue->rear = pthread_node;
        pthread_node->next = pthread_node->prev = NULL;
    } else {
        pthread_queue->rear->next = pthread_node;
        pthread_node->next = NULL;
        pthread_node->prev = pthread_queue->rear;
        pthread_queue->rear = pthread_node;
    }
    pthread_queue->number++;
    pthread_mutex_unlock(&pthread_queue->mutex);
}
/*
    child_work : the code exec in child thread
    ptr : the ds of thread_node of current thread.
    return : nothing. void* just avoid warning.
*/
void* ChildWork(void *args) {
    // restore the parameters
    PthreadNode *self = (PthreadNode*)args;
    ThreadPool *thread_pool = self->thread_pool;
    // write log
#ifdef INFO_FLAG
    char info_buf[INFO_MAX];
    sprintf(info_buf, "%ld thread prepare\n", self->tid);
    WriteInfo(&thread_pool->info, info_buf);
#endif
    
    while(1) {
        // if no task exec, blocked
        pthread_mutex_lock(&self->mutex);
        if(self->work == NULL) {
            pthread_cond_wait(&self->cond, &self->mutex);
        }
        pthread_mutex_unlock(&self->mutex);
        
#ifdef INFO_FLAG
        sprintf(info_buf, "%ld thread working, task id is %d\n", 
                                self->tid, self->work->task_id);
        WriteInfo(&thread_pool->info, info_buf);
#endif
        // execute the real work and destroy
        self->work->working(self->work->args);
        // destroy the task space
        free(self->work->args);
        args = NULL;
        self->work->working = NULL;
        self->work->next = NULL;
        free(self->work);
        self->work = NULL;
        /*
            get new task from the task_link if not NULL.
            there are no idle thread if there are task to do.
            if there are not task, mask self idle and add to the idle queue.
        */
        pthread_mutex_lock(&thread_pool->task_queue_head->mutex);
        if(thread_pool->task_queue_head->number > 0) {
            TaskGetNew(self);
            pthread_mutex_unlock(&thread_pool->task_queue_head->mutex);
            continue;
        } else {

            pthread_mutex_unlock(&thread_pool->task_queue_head->mutex);
            
            PthreadQueueUnlink(thread_pool->pthread_queue_busy, self);
            PthreadQueueAdd(thread_pool->pthread_queue_idle, self);

            pthread_mutex_lock(&thread_pool->pthread_queue_idle->mutex);
            pthread_cond_signal(&thread_pool->pthread_queue_idle->cond);
            pthread_mutex_unlock(&thread_pool->pthread_queue_idle->mutex);            
        }
    }
    return NULL;
}

/*
    create thread pool when the system on, and thread number is THREAD_DEF_NUM.
    when niit, initial all the thread into a double link queue and all wait for self->cond.
*/
void CreatePthreadPool
(ThreadPool *thread_pool, const int count) {
    // init as a double link queue
    int i;
    PthreadNode *temp, *head = NULL, *prev = NULL;
    for(i = 0; i < count; ++i) {
        temp = (PthreadNode*)malloc(sizeof(PthreadNode));
        if (head == NULL) head = temp;
        if(temp == NULL) {
            perror("malloc failure ");
            exit(1);
        }
    
        temp->thread_pool = thread_pool;
        temp->work = NULL;
        pthread_cond_init(&temp->cond, NULL);
        pthread_mutex_init(&temp->mutex, NULL);

        temp->prev = prev;
        if(prev != NULL) {
            prev->next = temp;
        }
        
        // create this thread
        pthread_create(&temp->tid, NULL, ChildWork, (void*)temp);
        prev = temp;
    }
    temp->next = NULL;
    
    // modify the idle thread queue attribute
    pthread_mutex_lock(&thread_pool->pthread_queue_idle->mutex);
    thread_pool->pthread_current_count += count;
    thread_pool->pthread_queue_idle->number += count;
    if (thread_pool->pthread_queue_idle->head == NULL) {
        thread_pool->pthread_queue_idle->head = head;
        thread_pool->pthread_queue_idle->rear = temp;
    } else {
        thread_pool->pthread_queue_idle->rear->next = head;
    }
    pthread_mutex_unlock(&thread_pool->pthread_queue_idle->mutex);
}

// init_system : init the system glob pointor.
void InitSystem(ThreadPool *thread_pool) {
#ifdef INFO_FLAG
    pthread_mutex_init(&thread_pool->info.mutex_info, NULL);

    gettimeofday(&thread_pool->info.time_begin, NULL);
    OpenInfo(&thread_pool->info, "infomation.txt");
#endif
    thread_pool->pthread_current_count = 0;

    // init the pthread_queue_idle
    thread_pool->pthread_queue_idle = (PthreadQueue*)malloc(sizeof(PthreadQueue));
    thread_pool->pthread_queue_idle->number = 0;
    thread_pool->pthread_queue_idle->head = NULL;
    thread_pool->pthread_queue_idle->rear = NULL;
    pthread_mutex_init(&thread_pool->pthread_queue_idle->mutex, NULL);
    pthread_cond_init(&thread_pool->pthread_queue_idle->cond, NULL);

    // init the pthread_queue_busy
    thread_pool->pthread_queue_busy = (PthreadQueue*)malloc(sizeof(PthreadQueue));
    thread_pool->pthread_queue_busy->number = 0;
    thread_pool->pthread_queue_busy->head = NULL;
    thread_pool->pthread_queue_busy->rear = NULL;
    pthread_mutex_init(&thread_pool->pthread_queue_busy->mutex, NULL);
    pthread_cond_init(&thread_pool->pthread_queue_busy->cond, NULL);

    // init the task_queue_head
    thread_pool->task_queue_head = (TaskQueue*)malloc(sizeof(TaskQueue));
    thread_pool->task_queue_head->head = NULL;
    thread_pool->task_queue_head->number = 0;
    thread_pool->task_queue_head->task_id = 0;
    pthread_mutex_init(&thread_pool->task_queue_head->mutex, NULL);

    pthread_cond_init(&thread_pool->task_queue_head->cond, NULL);

    // create thread pool
    CreatePthreadPool(thread_pool, THREAD_DEF_NUM);

    // create thread of manage
    pthread_t thread_manage_id;
    pthread_create(&thread_manage_id, NULL, PthreadManager, (void*)thread_pool);

    // create thread of manage
    pthread_t thread_monitor_id;
    pthread_create(&thread_monitor_id, NULL, Monitor, (void*)thread_pool);
}

/*
    PthreadManage : allocate tasks to idle threads.
            block on pthread_queue_idle->cond when no idle thread
            block on task_queue_head->cond when no task come.
    ptr : no used, in order to avoid warning.
    return : nothing.
*/
void* PthreadManager(void *args)
{
    ThreadPool *thread_pool = (ThreadPool*)args;
#ifdef INFO_FLAG
    pthread_t tid = syscall(SYS_gettid);
    char info_buf[INFO_MAX];
    sprintf(info_buf, "PthreadManager : %ld\n", tid);
    WriteInfo(&thread_pool->info, info_buf);
#endif

    while(1)
    {
        PthreadNode *temp_thread = NULL;
        
        // get a new idle thread, and modify the idle_queue.
        // if no idle thread, block on pthread_queue_idle->cond.
        pthread_mutex_lock(&thread_pool->pthread_queue_idle->mutex);
        if(thread_pool->pthread_queue_idle->number == 0) {
            pthread_cond_wait(&thread_pool->pthread_queue_idle->cond, 
                              &thread_pool->pthread_queue_idle->mutex);
        }
        temp_thread = thread_pool->pthread_queue_idle->head;
        pthread_mutex_unlock(&thread_pool->pthread_queue_idle->mutex);

        // get a new task, and modify the task_queue.
        // if no task block on task_queue_head->cond.
        pthread_mutex_lock(&thread_pool->task_queue_head->mutex);
        while(thread_pool->task_queue_head->number == 0) {
            pthread_cond_wait(&thread_pool->task_queue_head->cond, 
                              &thread_pool->task_queue_head->mutex);
        }
        TaskGetNew(temp_thread);
        pthread_mutex_unlock(&thread_pool->task_queue_head->mutex);

        // modify the idle thread attribute.
        // PthreadNode of pthread_queue_idle only allow access to PthreadManage
        // so not have to lock temp_thread
        PthreadQueueUnlink(thread_pool->pthread_queue_idle, temp_thread);
        PthreadQueueAdd(thread_pool->pthread_queue_busy, temp_thread);
#ifdef INFO_FLAG
        sprintf(info_buf, "%ld manager thread : %ld tid, %d work_id\n", 
        tid, temp_thread->tid, temp_thread->work->task_id);
        WriteInfo(&thread_pool->info, info_buf);
#endif
        // signal the child thread to exec the work
        pthread_mutex_lock(&temp_thread->mutex);
        pthread_cond_signal(&temp_thread->cond);
        pthread_mutex_unlock(&temp_thread->mutex);
    }
}

/*
    monitor : get the system info
    ptr : not used, only to avoid warning for pthread_create
    return : nothing
*/
void* Monitor(void *args)
{
    ThreadPool *thread_pool = (ThreadPool*)args;
#ifdef INFO_FLAG
    char info_buf[INFO_MAX];
    sprintf(info_buf, "monitor : %ld\n", syscall(SYS_gettid));
    WriteInfo(&thread_pool->info, info_buf);
#endif

    int idle_num, busy_num, task_num, task_id;
    while(1) {
        idle_num = thread_pool->pthread_queue_idle->number;
        busy_num = thread_pool->pthread_queue_busy->number;
        task_num = thread_pool->task_queue_head->number;
        task_id = thread_pool->task_queue_head->task_id;       
        double usage_rate = (1.0 * busy_num) / (thread_pool->pthread_current_count);
#ifdef INFO_FLAG
        sprintf(info_buf, "monitor : %ld\n", syscall(SYS_gettid));
        sprintf(info_buf, "%s********************************\n", info_buf);
        sprintf(info_buf, "%sidle number : %d\n", info_buf, idle_num);
        sprintf(info_buf, "%sbusy number : %d\n", info_buf, busy_num);
        sprintf(info_buf, "%sthread sum : %d\n", info_buf, thread_pool->pthread_current_count);
        sprintf(info_buf, "%susage rate : %lf\n", info_buf, usage_rate);
        sprintf(info_buf, "%stask number : %d\n", info_buf, task_num);
        sprintf(info_buf, "%stask id : %d\n", info_buf, task_id);
        sprintf(info_buf, "%s ******************************\n\n", info_buf);
        WriteInfo(&thread_pool->info, info_buf);
#endif

        // dynamic manage thread count
        if (usage_rate >= THREAD_ADD_POINT && 
            thread_pool->pthread_current_count < THREAD_MAX_NUM) {
            int count = thread_pool->pthread_current_count * THREAD_ADD_RATE;
            if(thread_pool->pthread_current_count + count > THREAD_MAX_NUM) 
                count = THREAD_MAX_NUM - thread_pool->pthread_current_count;
#ifdef INFO_FLAG
            sprintf(info_buf, "monitor : %ld, create pthread pool %d\n", 
                              syscall(SYS_gettid), count);
            WriteInfo(&thread_pool->info, info_buf);
#endif
            CreatePthreadPool(thread_pool, count);

            pthread_mutex_lock(&thread_pool->pthread_queue_idle->mutex);
            pthread_cond_signal(&thread_pool->pthread_queue_idle->cond);
            pthread_mutex_unlock(&thread_pool->pthread_queue_idle->mutex);
        } else if (usage_rate <= THREAD_DEL_POINT && 
                   thread_pool->pthread_current_count > THREAD_MIN_NUM) {
            int count = thread_pool->pthread_current_count * THREAD_DEL_RATE;
            if(thread_pool->pthread_current_count - count < THREAD_MIN_NUM) 
                count = thread_pool->pthread_current_count - THREAD_MIN_NUM;
#ifdef INFO_FLAG
            sprintf(info_buf, "monitor : %ld, thread idle delete %d\n", 
                               syscall(SYS_gettid), count);
            WriteInfo(&thread_pool->info, info_buf);
#endif            
            ThreadIdleDel(thread_pool, count);
        }

        usleep(500000);
    }
    return NULL;
}

void ThreadIdleDel(ThreadPool *thread_pool, const int count) {
    PthreadNode *thread_del, *thread_tmp = NULL;
    int i, end = thread_pool->pthread_queue_idle->number - count;
    pthread_mutex_lock(&thread_pool->pthread_queue_idle->mutex);
    thread_del = thread_pool->pthread_queue_idle->head;
    for (i = 0; i < end; i++) {
        thread_del = thread_del->next;
    }
    thread_pool->pthread_queue_idle->rear = thread_del->prev;
    thread_del->prev->next = NULL;
    thread_del->prev = NULL;
    thread_pool->pthread_current_count -= count;
    thread_pool->pthread_queue_idle->number -= count;
    pthread_mutex_unlock(&thread_pool->pthread_queue_idle->mutex); 
    
    for (i = 0; i < count; i++) {
        thread_tmp = thread_del;
        thread_del = thread_del->next;

        pthread_cancel(thread_tmp->tid);
        pthread_join(thread_tmp->tid, NULL);
        pthread_mutex_destroy(&thread_tmp->mutex);
        pthread_cond_destroy(&thread_tmp->cond);
        free(thread_tmp);
        thread_tmp = NULL;
    }
    
}

test_project.c

#include "lib/pthread_pool.h"

#include <stdio.h>
#include <string.h>

#include <arpa/inet.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <unistd.h>

int SockCreate()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd == -1) {
        perror("socket error ");
    }
    return sockfd;
}

int SockConnect(int fd, struct sockaddr_in *addr) {
    int res = connect(fd, (struct sockaddr*)addr, 
                      sizeof(struct sockaddr_in));
    if(res < 0) {
        perror("connect failure ");
    }
    return res;
}

void *SendInfo(void *args) {
    struct sockaddr_in *addr = (struct sockaddr_in*)args;

    int sockfd = SockCreate();
    if (sockfd == -1) return NULL;
    if (SockConnect(sockfd, addr) < 0) return NULL;

    char buffer[1024] = "hello world!\n";
    send(sockfd, buffer, strlen(buffer), 0);
    close(sockfd);
    return NULL;
}

int main(int argc, char **argv)
{
    if (argc < 3) {
        printf("./sortware ip port\n");
        exit(errno);
    }

    const char *ip = argv[1];
    int port = atoi(argv[2]);

    ThreadPool thread_pool;
    InitSystem(&thread_pool);
    
    int i, size = sizeof(struct sockaddr_in);
    while(1) {
        for(i = 0; i < 100; i++) {
            struct sockaddr_in *addr = (struct sockaddr_in*)malloc(size);
            addr->sin_family = AF_INET;
            addr->sin_addr.s_addr = inet_addr(ip);
            addr->sin_port = htons(port);
            ThreadPoolRequireTask(&thread_pool, (void*)addr, SendInfo);
        }
        char in = getchar();
        if(in == 'q') break;
    }
    return 0;
}
上一篇:C++服务器开发精髓笔记


下一篇:django过滤器大全