基于POISX线程的线程池实现

一、线程池的功能

  减少线程的创建和销毁的内存资源消耗。

二、线程池基础结构

  • 消息队列  
  • 任务执行队列
  • 管理组件

2.1 简易数据结构

2.1.1 消息队列

  使用双向链表实现队列结构,节点消息包括执行的消息函数以及函数参数数据。

1 struct nTask {
2     void (*task_func)(struct nTask *task);
3     void *user_data;
4 
5     struct nTask *prev;
6     struct nTask *next;
7 };
2.1.2 任务执行队列

  使用双向链表实现队列结构,节点信息包括线程id、线程存活状态、所属管理组件。

struct nWorker {
    pthread_t threadid;
    int terminate;
    struct nManager *manager;

    struct nWorker *prev;
    struct nWorker *next;
};
2.1.3 管理组件

  管理组件管理消息队列、任务执行队列。同时,使用锁和条件变量完成对共享数据的保护操作。

typedef struct nManager {
    struct nTask *tasks;
    struct nWorker *workers;

    pthread_mutex_t mutex;
    pthread_cond_t cond;
} ThreadPool;

2.2 底层数据结构接口实现

2.2.1 向队列头插入节点
#define LIST_INSERT(item, list) do {    \
    item->prev = NULL;                    \
    item->next = list;                    \
    if ((list) != NULL) (list)->prev = item; \
    (list) = item;                        \
} while(0)
2.2.2 删除队列中指定节点
#define LIST_REMOVE(item, list) do {    \
    if (item->prev != NULL) item->prev->next = item->next; \
    if (item->next != NULL) item->next->prev = item->prev; \
    if (list == item) list = item->next;                     \
    item->prev = item->next = NULL;    \
} while(0)

2.3 线程任务定义

  线程池线程执行相同的线程回调函数,在回调函数内,获取消息队列中待处理的消息并执行。

// callback != task
static void *nThreadPoolCallback(void *arg) {

    struct nWorker *worker = (struct nWorker*)arg;

    while (1) {

        pthread_mutex_lock(&worker->manager->mutex);
        while (worker->manager->tasks == NULL) {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);
        }
        if (worker->terminate) {
            pthread_mutex_unlock(&worker->manager->mutex);
            break;
        }

        struct nTask *task = worker->manager->tasks;
        LIST_REMOVE(task, worker->manager->tasks);

        pthread_mutex_unlock(&worker->manager->mutex);

        task->task_func(task);
    }

    free(worker);
    
}

  对共享数据消息队列的操作要在锁中进行。执行任务在锁外进行使锁尽可能小。

三、业务接口实现

3.1 线程池创建函数

  此函数参数为线程池对象指针和最大线程数。

  互斥锁和条件变量的初始化有静态初始化和动态初始化两种。此处用静态初始化的互斥锁和条件变量和memset来初始化线程池的互斥锁和条件变量。

  在进行内存申请时要判断内存申请是否成功。

// API
int nThreadPoolCreate(ThreadPool *pool, int numWorkers) {

    if (pool == NULL) return -1;
    if (numWorkers < 1) numWorkers = 1;
    memset(pool, 0, sizeof(ThreadPool));

    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));

    //pthread_mutex_init(&pool->mutex, NULL);
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
    

    int i = 0;
    for (i = 0;i < numWorkers;i ++) {
        struct nWorker *worker = (struct nWorker*)malloc(sizeof(struct nWorker));
        if (worker == NULL) {
            perror("malloc");
            return -2;
        }
        memset(worker, 0, sizeof(struct nWorker));
        worker->manager = pool; //

        int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
        if (ret) {
            perror("pthread_create");
            free(worker);
            return -3;
        }
        
        LIST_INSERT(worker, pool->workers);
    }

    // success
    return 0; 

}

3.2 线程池销毁函数

  线程池销毁主要工作是将线程的存活状态设置为销毁,并广播条件变量通知线程回调函数销毁线程。条件变量要配合锁使用。

// API
int nThreadPoolDestory(ThreadPool *pool, int nWorker) {

    struct nWorker *worker = NULL;

    for (worker = pool->workers;worker != NULL;worker = worker->next) {
        worker->terminate;
    }

    pthread_mutex_lock(&pool->mutex);

    pthread_cond_broadcast(&pool->cond);

    pthread_mutex_unlock(&pool->mutex);

    pool->workers = NULL;
    pool->tasks = NULL;

    return 0;
    
}

3.3 消息添加函数

  此函数操作共享数据消息队列,所以需要加锁操作。pthread_cond_signal()函数作用是随机唤醒一个等待条件变量的线程。

// API
int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task) {

    pthread_mutex_lock(&pool->mutex);

    LIST_INSERT(task, pool->tasks);

    pthread_cond_signal(&pool->cond);

    pthread_mutex_unlock(&pool->mutex);

}

四、测试样例

 

#if 1

#define THREADPOOL_INIT_COUNT    20
#define TASK_INIT_SIZE            1000


void task_entry(struct nTask *task) { //type 

    //struct nTask *task = (struct nTask*)task;
    int idx = *(int *)task->user_data;

    printf("idx: %d\n", idx);

    free(task->user_data);
    free(task);
}


int main(void) {

    ThreadPool pool = {0};
    
    nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
    // pool --> memset();
    
    int i = 0;
    for (i = 0;i < TASK_INIT_SIZE;i ++) {
        struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
        if (task == NULL) {
            perror("malloc");
            exit(1);
        }
        memset(task, 0, sizeof(struct nTask));

        task->task_func = task_entry;
        task->user_data = malloc(sizeof(int));
        *(int*)task->user_data  = i;

        
        nThreadPoolPushTask(&pool, task);
    }

    getchar();
    
}


#endif

 

上一篇:C++ 线程池


下一篇:进程原理及系统调用