<Linux> 线程池

一、线程池

1. 池化技术

池化技术是一种在计算机科学中广泛应用的优化技术,它的核心思想是:预先创建并维护一组资源(例如线程、连接、对象),供多个任务共享使用,以减少创建和销毁资源的开销,提高效率和性能。

  • 减少资源创建和销毁的开销: 创建和销毁资源(例如线程、数据库连接等)通常需要较高的开销,池化技术可以预先创建一组资源,避免频繁创建和销毁,从而提高性能
  • 降低内存占用: 池化技术可以减少创建和销毁资源所带来的内存开销,从而降低内存占用。
  • 是以空间换时间的策略

2. 线程池概念

线程池是一种线程使用模式。

线程过多会带来调度开销,进而影响缓存局部和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。

3. 线程池的优点

  • 线程池避免了在处理短时间任务时创建与销毁线程的代价。
  • 线程池不仅能够保证内核充分利用,还能防止过分调度。

注意: 线程池中可用线程的数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

4. 线程池的应用场景

线程池常见的应用场景如下:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。 

相关解释:

  • 像Web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。
  • 对于长时间的任务,比如Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  • 突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存到达极限,出现错误。

5. 线程池实现 

下面我们实现一个简单的线程池,线程池中提供了一个任务队列,以及若干个线程(多线程),主线程向任务队列push任务,线程池中的线程pop获取任务

  • 线程池中的多个线程负责从任务队列当中拿任务,并将拿到的任务进行处理。
  • 线程池对外提供一个Push接口,用于让外部线程能够将任务Push到任务队列当中。

线程池代码:

#pragma once
#include <vector>
#include <queue>
#include <iostream>
#include <pthread.h>

// 线程信息
struct ThreadInfo
{
    pthread_t tid;    // 线程tid
    std::string name; // 线程名字
};

// 线程池
template <class T>
class ThreadPool
{
    static const int defaultnum = 5;

public:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }
    void Unlock()   
    {
        pthread_mutex_unlock(&_mutex);
    }
    void Wakeup()
    {
        // 唤醒线程执行任务
        pthread_cond_signal(&_cond);
    }
    void ThreadSleep()
    {
        // 在条件变量下等待
        pthread_cond_wait(&_cond, &_mutex);
    }
    bool IsQueueEmpty()
    {
        return _tasks.empty();
    }
    std::string GetThreadName(pthread_t tid)
    {
        // 获取线程name
        for (const auto &e : _threads)
        {
            if (e.tid == tid)
                return e.name;
        }
        return "None";
    }

public:
    ThreadPool(int num = defaultnum)
        : _threads(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    static void *HandlerTask(void *args)
    {
        // 获取this指针
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        // 获取线程名字,方便后续打印观察
        std::string name = tp->GetThreadName(pthread_self());
        while (true)
        {
            // 1. 加锁
            tp->Lock();

            // 2. 判断是否等待排队(while 防止伪唤醒)
            while (tp->IsQueueEmpty())
            {
                tp->ThreadSleep();
            }
            // 3. 获取任务
            T t = tp->pop();
            // 4. 解锁
            tp->Unlock();

            // 5. 运行任务
            t();
            std::cout << name << " run, result: " << t.GetResult() << std::endl;
        }
        sleep(1);
    }

    // 线程池启动,创建线程,执行任务
    void start()
    {
        int num = _threads.size();
        for (int i = 0; i < num; i++)
        {
            _threads[i].name = "thread-" + std::to_string(i + 1);
            pthread_create(&(_threads[i].tid), nullptr, HandlerTask, this);
            // 传递this指针
            // 1. 如果HandlerTask是普通成员函数,形参抹油有this指针,那么线程创建时还需要额外传递this指针
            // 2. 将HandlerTask改为静态成员函数,可是函数内部需要使用成员函数和成员变量,所以crete时的参数应传递this指针
            // 3. 所以将Lock等函数访问权限设为public
        }
    }

    // 外部push任务
    void push(const T &t)
    {
        // 1. 上锁
        Lock();
        // 2. push
        _tasks.push(t);
        // 3. 唤醒线程执行任务
        Wakeup();
        // 4. 解锁
        Unlock();
    }

    // 在HandlerTask内部调用它时已经加锁了
    T pop()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }

    ~ThreadPool()
    {
        // 销毁锁、条件变量
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    std::vector<ThreadInfo> _threads; // 多线程容器,记录各个线程信息
    std::queue<T> _tasks;             // 线程从任务队列拿任务

    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _cond;   // 条件变量
};

相关细节:

成员变量的设计

  • 任务队列由STL的queue实现,没有限定大小,但是线程池中的线程数量有限制,构造函数处默认为defaultnum
  • 线程池的构造和析构就是对互斥锁和条件变量的初始化和销毁工作,其他的自定义类型成员变量例如_threads、_tasks会调用各自默认的析构函数
  • 还需要有一个互斥锁和条件变量

为什么线程池中需要有互斥锁和条件变量?

  • 线程池需要有互斥锁和条件变量
  • 互斥锁因为线程池中的任务队列是会被多个执行流并发访问的共享资源,因此我们需要引入互斥锁对任务队列进行保护,同一时刻只能由一个执行流对任务队列进行push或pop操作
  • 条件变量是因为线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此线程池当中的线程在拿任务之前,需要先判断任务队列当中是否有任务,若此时任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此我们需要引入条件变量。

为什么线程池中的线程执行例程 HandlerTask 需要设置为静态方法?

  • 线程池需要由start函数(线程池启动,创建线程,执行任务),该函数用来创建多个线程,并为创建的线程传入HandlerTask 执行例程,但是需要注意,成员函数默认第一个参数为类类型的this指针,所以实际上HandlerTask 有两个参数:类类型指针、void*参数,而pthread_create只能向HandlerTask 传递一个void*参数,所以我们需要将 HandlerTask 函数改为static静态成员函数,因为静态成员函数属于类,而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针,此时Routine函数才真正只有一个参数类型为void*的参数。
  • 将 HandlerTask 改为静态成员函数又会引来一个问题,我们需要在 HandlerTask 使用互斥锁、条件变量、pop函数等成员,但是 HandlerTask 静态成员函数没有this指针,无法访问成员变量和成员函数,所以我们需要在start函数的pthread_create处为HandlerTask传递当前对象的 this 指针!此时我们就能够通过该this指针在HandlerTask 函数内部调用非静态成员函数了。
  • 此外 HandlerTask 内部的逻辑是先对任务队列加锁、判断是否等待排队(while 防止伪唤醒)、获取任务、解锁、运行任务

任务类型的设计

  • 我们将线程池进行了模板化,因此线程池当中存储的任务类型可以是任意的,但无论该任务是什么类型的,在该任务类当中都必须包含一个Run方法,当我们处理该类型的任务时只需调用该Run方法即可。

 

#include <string>

std::string opers = "+-*/%";

enum
{
    DivZero = 1,
    ModZero,
    Unknown
};

template <class T>
class Task
{
public:
    Task()
    {}
    
    Task(int x, int y, char op)
        : _data1(x), _data2(y), _op(op), _result(0), _exitcode(0)
    {}

    // run起来
    void run()
    {
        switch (_op)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
            {
                if (_data2 == 0) _exitcode = DivZero;
                else _result = _data1 / _data2;
            } 
            break;
        case '%':
            {
                if (_data2 == 0)
                    _exitcode = ModZero;
                else
                    _result = _data1 % _data2;
            }
            break;
        default:
            _exitcode = Unknown;
            break;
        }
    }

    void operator()()
    {
        run();
    }

    std::string GetResult()
    {
        std::string result = std::to_string(_data1) + _op + std::to_string(_data2) + '=' + std::to_string(_result);
        result += "[code: " + std::to_string(_exitcode) + ']';
        return result;
    }

    std::string GetTask()
    {
        std::string result = std::to_string(_data1) + _op + std::to_string(_data2) + "= ?";
        return result;
    }
    
private:
    T _data1;
    T _data2;
    char _op;

    T _result;
    int _exitcode;
};

主线程逻辑 

主线程就负责不断向任务队列当中Push任务就行了,此后线程池当中的线程会从任务队列当中获取到这些任务并进行处理。

#include <iostream>
#include <ctime>
#include <unistd.h>
#include "ThreadPool.hpp"
#include "Task.hpp"

int main()
{
    srand(time(nullptr));

    ThreadPool<Task<int>>* tp = new ThreadPool<Task<int>>(5);
    tp->start();
    // 因为此时已经创建线程,开始执行HandlerTask,所以五个线程申请锁,然后在条件变量下等待(释放锁),排队顺序为1 2 3 4 5
    // 当push任务后,1被唤醒,执行任务,再次申请锁,因为push很慢,所以又在条件变量下释放锁等待,排队顺序为2 3 4 5 1
    while (true)
    {
        // 1. 构建任务
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 5;
        char op = opers[rand() % opers.size()];
        Task<int> t(data1, data2, op);
        
        // 2. 交给线程池处理
        tp->push(t);
        std::cout << "main thread make task: " << t.GetTask() << std::endl;
        sleep(1);
    }
    return 0;
}

 

我们会发现这五个线程在处理时会呈现出一定的顺序性,因为主线程在 tp->start() 调用后此时已经创建线程,各线程开始执行 HandlerTask,所以五个线程申请锁,因为创建时是有顺序的,所以最初线程对锁的申请也具有顺序性,然后因为任务队列没有任务,而都在条件变量下等待(释放锁),排队顺序为1 2 3 4 5。

当push任务后,1被唤醒,执行任务,再次申请锁,因为push很慢,所以又在条件变量下释放锁等待,排队顺序为2 3 4 5 1 ,因此这五个线程在处理任务时会呈现出一定的顺序性。

注意: 此后我们如果想让线程池处理其他不同的任务请求时,我们只需要提供一个任务类,在该任务类当中提供对应的任务处理方法就行了。

完整代码

Makefile

ThreadPool:Main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -rf mycond

Task.hpp

#include <string>

std::string opers = "+-*/%";

enum
{
    DivZero = 1,
    ModZero,
    Unknown
};

template <class T>
class Task
{
public:
    Task()
    {}
    
    Task(int x, int y, char op)
        : _data1(x), _data2(y), _op(op), _result(0), _exitcode(0)
    {}

    // run起来
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
            {
                if (_data2 == 0) _exitcode = DivZero;
                else _result = _data1 / _data2;
            } 
            break;
        case '%':
            {
                if (_data2 == 0)
                    _exitcode = ModZero;
                else
                    _result = _data1 % _data2;
            }
            break;
        default:
            _exitcode = Unknown;
            break;
        }
    }

    std::string GetResult()
    {
        std::string result = std::to_string(_data1) + _op + std::to_string(_data2) + '=' + std::to_string(_result);
        result += "[code: " + std::to_string(_exitcode) + ']';
        return result;
    }

    std::string GetTask()
    {
        std::string result = std::to_string(_data1) + _op + std::to_string(_data2) + "= ?";
        return result;
    }
    
private:
    T _data1;
    T _data2;
    char _op;

    T _result;
    int _exitcode;
};

ThreadPool.hpp

#pragma once
#include <vector>
#include <queue>
#include <iostream>
#include <pthread.h>

// 线程信息
struct ThreadInfo
{
    pthread_t tid;    // 线程tid
    std::string name; // 线程名字
};

// 线程池
template <class T>
class ThreadPool
{
    static const int defaultnum = 5;

public:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }
    void Unlock()   
    {
        pthread_mutex_unlock(&_mutex);
    }
    void Wakeup()
    {
        // 唤醒线程执行任务
        pthread_cond_signal(&_cond);
    }
    void ThreadSleep()
    {
        // 在条件变量下等待
        pthread_cond_wait(&_cond, &_mutex);
    }
    bool IsQueueEmpty()
    {
        return _tasks.empty();
    }
    std::string GetThreadName(pthread_t tid)
    {
        // 获取线程name
        for (const auto &e : _threads)
        {
            if (e.tid == tid)
                return e.name;
        }
        return "None";
    }

public:
    ThreadPool(int num = defaultnum)
        : _threads(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    static void *HandlerTask(void *args)
    {
        // 获取this指针
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        // 获取线程名字,方便后续打印观察
        std::string name = tp->GetThreadName(pthread_self());
        while (true)
        {
            // 1. 加锁
            tp->Lock();

            // 2. 判断是否等待排队(while 防止伪唤醒)
            while (tp->IsQueueEmpty())
            {
                tp->ThreadSleep();
            }
            // 3. 获取任务
            T t = tp->pop();
            // 4. 解锁
            tp->Unlock();

            // 5. 运行任务
            t();
            std::cout << name << " run, result: " << t.GetResult() << std::endl;
        }
        sleep(1);
    }

    // 线程池启动,创建线程,执行任务
    void start()
    {
        int num = _threads.size();
        for (int i = 0; i < num; i++)
        {
            _threads[i].name = "thread-" + std::to_string(i + 1);
            pthread_create(&(_threads[i].tid), nullptr, HandlerTask, this);
            // 传递this指针
            // 1. 如果HandlerTask是普通成员函数,形参抹油有this指针,那么线程创建时还需要额外传递this指针
            // 2. 将HandlerTask改为静态成员函数,可是函数内部需要使用成员函数和成员变量,所以crete时的参数应传递this指针
            // 3. 所以将Lock等函数访问权限设为public
        }
    }

    // 外部push任务
    void push(const T &t)
    {
        // 1. 上锁
        Lock();
        // 2. push
        _tasks.push(t);
        // 3. 唤醒线程执行任务
        Wakeup();
        // 4. 解锁
        Unlock();
    }

    // 在HandlerTask内部调用它时已经加锁了
    T pop()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }

    ~ThreadPool()
    {
        // 销毁锁、条件变量
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    std::vector<ThreadInfo> _threads; // 多线程容器,记录各个线程信息
    std::queue<T> _tasks;             // 线程从任务队列拿任务

    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _cond;   // 条件变量
};

Main.cc 

#include <iostream>
#include <ctime>
#include <unistd.h>
#include "ThreadPool.hpp"
#include "Task.hpp"

int main()
{
    srand(time(nullptr));

    ThreadPool<Task<int>>* tp = new ThreadPool<Task<int>>(5);
    tp->start();
    // 因为此时已经创建线程,开始执行HandlerTask,所以五个线程申请锁,然后在条件变量下等待(释放锁),排队顺序为1 2 3 4 5
    // 当push任务后,1被唤醒,执行任务,再次申请锁,因为push很慢,所以又在条件变量下释放锁等待,排队顺序为2 3 4 5 1
    while (true)
    {
        // 1. 构建任务
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 5;
        char op = opers[rand() % opers.size()];
        Task<int> t(data1, data2, op);
        
        // 2. 交给线程池处理
        tp->push(t);
        std::cout << "main thread make task: " << t.GetTask() << std::endl;
        sleep(1);
    }
    return 0;
}

二、简单封装原生线程库

类似C++的线程库,实际上C++的线程库就是对原生线程库的封装,我们也来简单封装一下原生线程库

类似C++的线程库

#pragma once

#include <iostream>
#include <pthread.h>
#include <ctime>
#include <string>
// #include <functional>
// using fun = std::function<void()>;

typedef void (*callback_t)(int);

static int num = 1;

template<class T>
class Thread
{
public:
    static void* Routine(void* args)
    {
        Thread* td = static_cast<Thread*>(args);
        td->Entery();
        return nullptr;
    } 
public:
    Thread(callback_t cb, int data)
        :_tid(0), _name(""), _start_timestamp(0), _isrunning(false), _cb(cb), _data(data)
    {}

    void Run()
    {
        _name = "thread-" + std::to_string(num++);
        _start_timestamp = time(nullptr);
        _isrunning = true;
        pthread_create(&_tid, nullptr, Routine, this);
    }

    void join()
    {
        pthread_join(_tid, nullptr);
    }

    std::string Name()
    {
        return _name;
    }

    uint64_t StartTimestamp()
    {
        return _start_timestamp;
    }

    bool IsRunning()
    {
        return _isrunning;
    }

    void Entery()
    {
        _cb(_data);
    }

    ~Thread()
    {}
private:
    pthread_t _tid;              // 线程tid
    std::string _name;           // 线程名字
    uint64_t _start_timestamp;   // 时间戳
    bool _isrunning;             // 是否运行
    callback_t _cb;              // 用户传递执行任务
    T _data;                      // 用户传递函数的参数
};

相关细节:

  • 将线程封装,我们首先要对线程进行描述,线程要有tid、线程名、是否在运行、运行多长时间等信息,可以自行添加
  • 类的构造函数只是要初始化列表初始化成员变量,线程的创建在run成员函数内部
  • 类内封装线程的create函数时,与线程池一样,我们需要将Routine线程执行函数改为静态成员函数,并在create函数向Routine传递本对象的this指针
  • 使用过C++的线程库的同学都知道,在使用时外部可以传递函数,那么我们实现时就根据 typedef 的函数指针类型,指定用户传递的函数的类型,如果函数带参,那么 Thread 类要加上模板、成员变量(用来接收用户传递的参数),然后在Entery成员函数内调用回调函数,将成员变量传进去 ,所以在Routine函数内部执行的是用户传递的函数

 在Main.cc中,如果传递的函数带参,那么需要加上模板

Main.cc

一次创建单个线程 

#include <iostream>
#include "Thread.hpp"

void Print(int cnt)
{
    for (int i = 0; i < cnt; i++)
        std::cout << "hello world!" << std::endl;
}

int main()
{
    Thread<int> t(Print, 5);
    t.Run();

    t.join();
    return 0;
}
  • 调用我们封装的库,需要手动调用Run函数、join等待线程 

 一次创建多个线程

#include <iostream>
#include <unistd.h>
#include <vector>
#include "Thread.hpp"

void Print(int cnt)
{
    for (int i = 0; i < cnt; i++)
        std::cout << "hello world!" << std::endl;
    std::cout << "一次结束" << std::endl;
}

int main()
{
    std::vector<Thread<int>> threads;
    for (int i = 0; i < 10; i++)
    {
        threads.push_back(Thread<int>(Print, 5));
    }

    for (auto& e : threads)
    {
        e.Run();
    }
    for (auto& e : threads)
    {
        e.join();
    }

    return 0;
}

上一篇:网络通信与并发编程(二)基于tcp的套接字、基于udp的套接字、粘包现象-一、套接字的工作流程


下一篇:数据结构 -- 排序算法