线程
创建线程
#include <pthread.h>
int pthread_create(pthread_t *restrict thread,const pthread_attr_t *restrict attr,void *(*start_routine)(void*), void *restrict arg);
- 成功返回0,失败返回错误号。pthread 库的函数都是通过返回值返回错误号。
- 在一个线程中调用
pthread_create()
创建新的线程后,当前线程从pthread_create()
返回继续往下执行,而新的线程所执行的代码由我们传给pthread_create
的函数指针start_routine
决定。 -
start_routine
函数接收一个参数,是通过pthread_create
的arg
参数传递给它的,该参数的类型为void *
,这个指针按什么类型解释由调用者自己定义。 -
start_routine
的返回值类型也是void *
,这个指针的含义同样由调用者自己定义。start_routine
返回时,这个线程就退出了,其它线程可以调用pthread_join
得到start_routine
的返回值。 -
attr
参数表示线程属性,传NULL
给attr
参数,表示线程属性取缺省值
终止线程
如果需要只终止某个线程而不终止整个进程,可以有三种方法:
- 从线程函数
return
。这种方法对主线程不适用,从main
函数return
相当于调用exit
。 - 一个线程可以调用
pthread_cancel
终止同一进程中的另一个线程。用pthread_cancel
终止一个线程分同步和异步两种情况,比较复杂。 - 线程可以调用
pthread_exit
终止自己。
#include <pthread.h>
void pthread_exit(void *value_ptr);
-
value_ptr
是void *
类型,和线程函数返回值的用法一样,其它线程可以调用pthread_join
获得这个指针。需要注意,pthread_exit
或者return
返回的指针所指向的内存单元必须是全局的或者是用malloc
分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。
join 和 detach
#include <pthread.h>
int pthread_join(pthread_t thread, void **value_ptr);
- 成功返回0,失败返回错误号。
- 调用该函数的线程将挂起等待,直到id为
thread
的线程终止。thread
线程以不同的方法终止,通过pthread_join
得到的终止状态是不同的,总结如下:- 如果
thread
线程通过return
返回,value_ptr
所指向的单元里存放的是thread
线程函数的返回值。 - 如果
thread
线程被别的线程调用pthread_cancel
异常终止掉,value_ptr
所指向的单元里存放的是常数PTHREAD_CANCELED
。 - 如果
thread
线程是自己调用pthread_exit
终止的,value_ptr
所指向的单元存放的是传给pthread_exit
的参数。 - 如果对
thread
线程的终止状态不感兴趣,可以传NULL
给value_ptr
参数。
- 如果
#include <pthread.h>
int pthread_detach(pthread_t tid);
- 线程也可以被置为detach状态,这样的线程一旦终止就立刻回收它占用的所有资源,而不保留终止状态。
- 不能对一个已经处于
detach
状态的线程调用pthread_join
,这样的调用将返回EINVAL
。对一个尚未detach的线程调用pthread_join
或pthread_detach
都可以把该线程置为detach状态,也就是说,不能对同一线程调用两次pthread_join
,或者如果已经对一个线程调用了pthread_detach
就不能再调用pthread_join
了。 - 成功返回0,失败返回错误号。
pthread_atfork
#include <pthread.h>
int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));
pthread_atfork()
在 fork()
之前调用,当调用 fork
时,内部创建子进程前在父进程中会调用 prepare
,内部创建子进程成功后,父进程会调用 parent
,子进程会调用 child
。
源码
Thread.h
#pragma once
#include <pthread.h>
#include <functional>
#include <memory>
#include <string>
#include "NonCopyable.h"
#include "CountDownLatch.h"
#include "Atomic.h"
#include "CurrentThread.h"
namespace muduo
{
class Thread : public NonCopyable
{
public:
typedef std::function<void()> ThreadFunc;
explicit Thread(ThreadFunc func, const std::string &name = std::string());
~Thread();
void start();
int join();
bool started() const { return started_; }
pid_t tid() const { return pid_; }
const std::string &name() { return name_; }
static int num_created() { return num_creadted_.get(); }
private:
void set_default_name();
private:
bool started_;
bool joined_;
pthread_t pthread_id_;
pid_t pid_;
ThreadFunc func_;
std::string name_;
CountDownLatch latch_;
static AtomicInt32 num_creadted_;
};
}
Thread.cc
#include "Thread.h"
#include <sys/prctl.h>
#include "Exception.h"
namespace muduo
{
namespace detail
{
void after_fork()
{
CurrentThread::t_cached_tid = 0;
CurrentThread::t_thread_name = "main";
CurrentThread::tid();
}
struct ThreadNameInitializer
{
ThreadNameInitializer()
{
CurrentThread::t_thread_name = "main";
CurrentThread::tid();
pthread_atfork(NULL, NULL, &after_fork);
}
};
ThreadNameInitializer init;
struct ThreadData
{
typedef Thread::ThreadFunc ThrFunc;
ThrFunc func;
std::string name;
pid_t *pid;
CountDownLatch *latch;
ThreadData(ThrFunc function, const std::string &thr_name, pid_t *process_id, CountDownLatch *count_down_latch) : func(function), name(thr_name), pid(process_id), latch(count_down_latch)
{
}
void run_thread()
{
*pid = muduo::CurrentThread::tid();
pid = NULL;
latch->count_down();
latch = NULL;
CurrentThread::t_thread_name = name.empty() ? "unknown" : name.c_str();
::prctl(PR_SET_NAME, CurrentThread::t_thread_name);
try
{
func();
CurrentThread::t_thread_name = "finished";
}
catch (const Exception &ex)
{
CurrentThread::t_thread_name = "crashed";
fprintf(stderr, "exception caught in Thread: %s\n", name.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stack());
abort();
}
catch (const std::exception &ex)
{
CurrentThread::t_thread_name = "crashed";
fprintf(stderr, "exception caught in Thread: %s\n", name.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
CurrentThread::t_thread_name = "crashed";
fprintf(stderr, "exception caught in Thread: %s\n", name.c_str());
throw; //@ rethrow
}
}
};
//@ global function
void *start_thread(void *obj)
{
ThreadData *data = static_cast<ThreadData *>(obj);
data->run_thread();
delete data;
return NULL;
}
} //@ namespace detail
AtomicInt32 Thread::num_creadted_; //@ defination
Thread::Thread(ThreadFunc func, const std::string &name) : started_(false), joined_(false), pthread_id_(0), pid_(0), func_(std::move(func)), name_(name), latch_(1)
{
set_default_name();
}
Thread::~Thread()
{
if (started_ && !joined_)
pthread_detach(pthread_id_);
}
void Thread::set_default_name()
{
int num = num_creadted_.increment_and_get();
if (name_.empty())
{
char buf[32];
snprintf(buf, sizeof(buf), "Thread%d", num);
name_ = buf;
}
}
void Thread::start()
{
assert(!started_);
started_ = true;
detail::ThreadData *data = new detail::ThreadData(func_, name_, &pid_, &latch_);
if (pthread_create(&pthread_id_, NULL, &detail::start_thread, data))
{
started_ = false;
delete data;
}
else
{
latch_.wait();
assert(pid_ > 0);
}
}
int Thread::join()
{
assert(started_);
assert(!joined_);
joined_ = true;
return pthread_join(pthread_id_, NULL);
}
}