muduo 库解析之十一:Thread

线程

创建线程

#include <pthread.h>

int pthread_create(pthread_t *restrict thread,const pthread_attr_t *restrict attr,void *(*start_routine)(void*), void *restrict arg);
  • 成功返回0,失败返回错误号。pthread 库的函数都是通过返回值返回错误号。
  • 在一个线程中调用 pthread_create() 创建新的线程后,当前线程从 pthread_create() 返回继续往下执行,而新的线程所执行的代码由我们传给pthread_create的函数指针start_routine决定。
  • start_routine函数接收一个参数,是通过pthread_createarg参数传递给它的,该参数的类型为void *,这个指针按什么类型解释由调用者自己定义。
  • start_routine的返回值类型也是void *,这个指针的含义同样由调用者自己定义。start_routine返回时,这个线程就退出了,其它线程可以调用pthread_join得到start_routine的返回值。
  • attr参数表示线程属性,传NULLattr参数,表示线程属性取缺省值

终止线程

如果需要只终止某个线程而不终止整个进程,可以有三种方法:

  • 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit
  • 一个线程可以调用pthread_cancel终止同一进程中的另一个线程。用pthread_cancel终止一个线程分同步和异步两种情况,比较复杂。
  • 线程可以调用pthread_exit终止自己。
#include <pthread.h>

void pthread_exit(void *value_ptr);
  • value_ptrvoid *类型,和线程函数返回值的用法一样,其它线程可以调用pthread_join获得这个指针。需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。

join 和 detach

#include <pthread.h>

int pthread_join(pthread_t thread, void **value_ptr);
  • 成功返回0,失败返回错误号。
  • 调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:
    • 如果thread线程通过return返回,value_ptr所指向的单元里存放的是thread线程函数的返回值。
    • 如果thread线程被别的线程调用pthread_cancel异常终止掉,value_ptr所指向的单元里存放的是常数PTHREAD_CANCELED
    • 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
    • 如果对thread线程的终止状态不感兴趣,可以传NULLvalue_ptr参数。
#include <pthread.h>

int pthread_detach(pthread_t tid);
  • 线程也可以被置为detach状态,这样的线程一旦终止就立刻回收它占用的所有资源,而不保留终止状态。
  • 不能对一个已经处于 detach 状态的线程调用pthread_join,这样的调用将返回EINVAL。对一个尚未detach的线程调用pthread_joinpthread_detach都可以把该线程置为detach状态,也就是说,不能对同一线程调用两次pthread_join,或者如果已经对一个线程调用了pthread_detach就不能再调用pthread_join了。
  • 成功返回0,失败返回错误号。

pthread_atfork

#include <pthread.h>

int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));

pthread_atfork()fork() 之前调用,当调用 fork 时,内部创建子进程前在父进程中会调用 prepare,内部创建子进程成功后,父进程会调用 parent,子进程会调用 child

源码

Thread.h

#pragma once
#include <pthread.h>
#include <functional>
#include <memory>
#include <string>

#include "NonCopyable.h"
#include "CountDownLatch.h"
#include "Atomic.h"
#include "CurrentThread.h"

namespace muduo
{
    class Thread : public NonCopyable
    {
    public:
        typedef std::function<void()> ThreadFunc;
        explicit Thread(ThreadFunc func, const std::string &name = std::string());
        ~Thread();

        void start();
        int join();

        bool started() const { return started_; }
        pid_t tid() const { return pid_; }
        const std::string &name() { return name_; }

        static int num_created() { return num_creadted_.get(); }

    private:
        void set_default_name();

    private:
        bool started_;
        bool joined_;
        pthread_t pthread_id_;
        pid_t pid_;
        ThreadFunc func_;
        std::string name_;
        CountDownLatch latch_;

        static AtomicInt32 num_creadted_;
    };
}

Thread.cc

#include "Thread.h"

#include <sys/prctl.h>

#include "Exception.h"

namespace muduo
{
    namespace detail
    {
        void after_fork()
        {
            CurrentThread::t_cached_tid = 0;
            CurrentThread::t_thread_name = "main";
            CurrentThread::tid();
        }

        struct ThreadNameInitializer
        {
            ThreadNameInitializer()
            {
                CurrentThread::t_thread_name = "main";
                CurrentThread::tid();
                pthread_atfork(NULL, NULL, &after_fork);
            }
        };

        ThreadNameInitializer init;

        struct ThreadData
        {
            typedef Thread::ThreadFunc ThrFunc;
            ThrFunc func;
            std::string name;
            pid_t *pid;
            CountDownLatch *latch;

            ThreadData(ThrFunc function, const std::string &thr_name, pid_t *process_id, CountDownLatch *count_down_latch) : func(function), name(thr_name), pid(process_id), latch(count_down_latch)
            {
            }

            void run_thread()
            {
                *pid = muduo::CurrentThread::tid();
                pid = NULL;
                latch->count_down();
                latch = NULL;

                CurrentThread::t_thread_name = name.empty() ? "unknown" : name.c_str();
                ::prctl(PR_SET_NAME, CurrentThread::t_thread_name);
                try
                {
                    func();
                    CurrentThread::t_thread_name = "finished";
                }
                catch (const Exception &ex)
                {
                    CurrentThread::t_thread_name = "crashed";
                    fprintf(stderr, "exception caught in Thread: %s\n", name.c_str());
                    fprintf(stderr, "reason: %s\n", ex.what());
                    fprintf(stderr, "stack trace: %s\n", ex.stack());
                    abort();
                }
                catch (const std::exception &ex)
                {
                    CurrentThread::t_thread_name = "crashed";
                    fprintf(stderr, "exception caught in Thread: %s\n", name.c_str());
                    fprintf(stderr, "reason: %s\n", ex.what());
                    abort();
                }
                catch (...)
                {
                    CurrentThread::t_thread_name = "crashed";
                    fprintf(stderr, "exception caught in Thread: %s\n", name.c_str());
                    throw; //@ rethrow
                }
            }
        };

        //@ global function
        void *start_thread(void *obj)
        {
            ThreadData *data = static_cast<ThreadData *>(obj);
            data->run_thread();
            delete data;
            return NULL;
        }
    } //@ namespace detail

    AtomicInt32 Thread::num_creadted_; //@ defination

    Thread::Thread(ThreadFunc func, const std::string &name) : started_(false), joined_(false), pthread_id_(0), pid_(0), func_(std::move(func)), name_(name), latch_(1)
    {
        set_default_name();
    }

    Thread::~Thread()
    {
        if (started_ && !joined_)
            pthread_detach(pthread_id_);
    }

    void Thread::set_default_name()
    {
        int num = num_creadted_.increment_and_get();
        if (name_.empty())
        {
            char buf[32];
            snprintf(buf, sizeof(buf), "Thread%d", num);
            name_ = buf;
        }
    }

    void Thread::start()
    {
        assert(!started_);
        started_ = true;
        detail::ThreadData *data = new detail::ThreadData(func_, name_, &pid_, &latch_);
        if (pthread_create(&pthread_id_, NULL, &detail::start_thread, data))
        {
            started_ = false;
            delete data;
        }
        else
        {
            latch_.wait();
            assert(pid_ > 0);
        }
    }

    int Thread::join()
    {
        assert(started_);
        assert(!joined_);
        joined_ = true;
        return pthread_join(pthread_id_, NULL);
    }
}
上一篇:Spring MVC入门实例


下一篇:Exchange2010 控制台提示您的权限不足,无法查看此数据