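Database transactions don't actually take up much of this course at my school, but that doesn't mean campus-recruiting interviews will skip MVCC, threads, processes, or the differences between the various locks and how they are implemented, all of which the slides only mention in passing. In my view, understanding these concepts purely in theory doesn't even count as scratching the surface, so this post starts from C++ multithreading and locks to make sure nothing is missed.
Processes and threads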
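Let's start by revisiting the good old days of junior year (assuming we do server-side programming in C or C++). The first thing we learned was the difference between concurrency and parallelism. Multiple threads on a single-core CPU are concurrent: they take turns, so several threads make progress over a period of time, but only one runs at any given instant. Multiple threads on a multi-core CPU can be parallel: several threads really are executing at the same instant. Hyper-threading arrived along the way; a "4-core, 8-thread" CPU means each core runs 2 hardware threads.
Interviews cover both multi-threaded and multi-process concurrency. Multi-process concurrency involves seven ways for processes to communicate: pipe, semaphore, FIFO, message queue, shared memory, socket, and signal. Every one of them involves a lot of low-level APIs, each with its own headers. Covering all seven here would amount to retelling half of Advanced Programming in the UNIX Environment, and I suspect questions at that level are closer to experienced-hire interviews, so I won't go into them.
We already have a rough idea of the pros and cons of multi-process versus multi-threaded programming, so here is some brief background on multithreading. From reading various blogs: the C++11 standard library includes threads and locks, which means that before C++11 programs had to rely on platform facilities such as pthreads and its locks on Linux. What about multi-process support in C++? As it happens, the standard library has none and, as far as I can tell, only Boost provides it, so without Boost the only option is to mix plain C system calls into C++. That is the rough recap. The C++ examples below come from online C++ reference sites.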
std::thread
Class to represent individual threads of execution.
A thread of execution is a sequence of instructions that can be executed concurrently with other such sequences in multithreading environments, while sharing a same address space.
An initialized thread object represents an active thread of execution; Such a thread object is joinable, and has a unique thread id.
A default-constructed (non-initialized) thread object is not joinable, and its thread id is common for all non-joinable threads.
A joinable thread becomes not joinable if moved from, or if either join or detach are called on them.
In short: a std::thread object that was constructed with a function to run represents an active thread, is joinable, and has its own thread id. A default-constructed one is not joinable and shares the same default id with every other non-joinable thread object. Moving from a thread object, or calling join or detach on it, makes it non-joinable.
// thread example
#include <iostream> // std::cout
#include <thread> // std::thread
void foo()
{
// do stuff...
}
void bar(int x)
{
// do stuff...
}
int main()
{
std::thread first (foo); // spawn new thread that calls foo()
std::thread second (bar,0); // spawn new thread that calls bar(0)
std::cout << "main, foo and bar now execute concurrently...\n";
// synchronize threads:
first.join(); // pauses until first finishes
second.join(); // pauses until second finishes
std::cout << "foo and bar completed.\n";
return 0;
}
get_id()
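If the thread object is joinable, get_id() returns the id of the thread it represents; otherwise it returns a default-constructed thread::id. (std::this_thread::get_id() returns the id of the calling thread instead.)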
// thread::get_id / this_thread::get_id
#include <iostream> // std::cout
#include <thread> // std::thread, std::thread::id, std::this_thread::get_id
#include <chrono> // std::chrono::seconds
std::thread::id main_thread_id = std::this_thread::get_id();
void is_main_thread() {
if ( main_thread_id == std::this_thread::get_id() )
std::cout << "This is the main thread.\n";
else
std::cout << "This is not the main thread.\n";
}
int main()
{
is_main_thread();
std::thread th (is_main_thread);
th.join();
}
join()
join() returns void. It returns once the thread has finished; until then it blocks the caller. After join() returns, the thread object is no longer joinable.
// example for thread::join
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::sleep_for
#include <chrono> // std::chrono::seconds
void pause_thread(int n)
{
std::this_thread::sleep_for (std::chrono::seconds(n));
std::cout << "pause of " << n << " seconds ended\n";
}
int main()
{
std::cout << "Spawning 3 threads...\n";
std::thread t1 (pause_thread,1);
std::thread t2 (pause_thread,2);
std::thread t3 (pause_thread,3);
std::cout << "Done spawning threads. Now waiting for them to join:\n";
t1.join();
t2.join();
t3.join();
std::cout << "All threads joined!\n";
return 0;
}
joinable
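A thread object is non-joinable in three cases: it was default-constructed; it has already been joined or detached; or it has been moved from, meaning its thread was handed over to another thread object.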
// example for thread::joinable
#include <iostream> // std::cout
#include <thread> // std::thread
void mythread()
{
// do stuff...
}
int main()
{
std::thread foo;
std::thread bar(mythread);
std::cout << "Joinable after construction:\n" << std::boolalpha;
std::cout << "foo: " << foo.joinable() << '\n';
std::cout << "bar: " << bar.joinable() << '\n';
if (foo.joinable()) foo.join();
if (bar.joinable()) bar.join();
std::cout << "Joinable after joining:\n" << std::boolalpha;
std::cout << "foo: " << foo.joinable() << '\n';
std::cout << "bar: " << bar.joinable() << '\n';
return 0;
}
operator=
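thread objects cannot be copied, only moved. If the left-hand side is non-joinable, the right-hand side's thread is moved into it and the right-hand side is left "as if default-constructed". If the left-hand side is still joinable, std::terminate() is called.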
// example for thread::operator=
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::sleep_for
#include <chrono> // std::chrono::seconds
void pause_thread(int n)
{
std::this_thread::sleep_for (std::chrono::seconds(n));
std::cout << "pause of " << n << " seconds ended\n";
}
int main()
{
std::thread threads[5]; // default-constructed threads
std::cout << "Spawning 5 threads...\n";
for (int i=0; i<5; ++i)
threads[i] = std::thread(pause_thread,i+1); // move-assign threads
std::cout << "Done spawning threads. Now waiting for them to join:\n";
for (int i=0; i<5; ++i)
threads[i].join();
std::cout << "All threads joined!\n";
return 0;
}
swap
#include <iostream>
#include <thread>
#include <chrono>
void foo()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
void bar()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
std::thread t1(foo);
std::thread t2(bar);
std::cout << "thread 1 id: " << t1.get_id() << '\n'
<< "thread 2 id: " << t2.get_id() << '\n';
std::swap(t1, t2);
std::cout << "after std::swap(t1, t2):" << '\n'
<< "thread 1 id: " << t1.get_id() << '\n'
<< "thread 2 id: " << t2.get_id() << '\n';
t1.swap(t2);
std::cout << "after t1.swap(t2):" << '\n'
<< "thread 1 id: " << t1.get_id() << '\n'
<< "thread 2 id: " << t2.get_id() << '\n';
t1.join();
t2.join();
}
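swap exchanges the threads that the two objects manage, so the ids trade places as well. There is a Stack Overflow discussion on whether only the ids are swapped; one answer says not to think of it as swapping ids but as swapping the threads themselves. That matches the reference wording: swap exchanges the underlying handles of the two thread objects.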
detach
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::sleep_for
#include <chrono> // std::chrono::seconds
void pause_thread(int n)
{
std::this_thread::sleep_for (std::chrono::seconds(n));
std::cout << "pause of " << n << " seconds ended\n";
}
int main()
{
std::cout << "Spawning and detaching 3 threads...\n";
std::thread (pause_thread,1).detach();
std::thread (pause_thread,2).detach();
std::thread (pause_thread,3).detach();
std::cout << "Done spawning threads.\n";
std::cout << "(the main thread will now pause for 5 seconds)\n";
// give the detached threads time to finish (but not guaranteed!):
pause_thread(5);
return 0;
}
After detach, the thread object is no longer joinable. The thread keeps running independently until it finishes, and nothing blocks waiting for it.
std::mutex
A mutex is a lockable object that provides exclusive access to a critical section.
// mutex example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
std::mutex mtx; // mutex for critical section
void print_block (int n, char c) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
for (int i=0; i<n; ++i) { std::cout << c; }
std::cout << '\n';
mtx.unlock();
}
int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');
th1.join();
th2.join();
return 0;
}
lock
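The calling thread locks the mutex. If no other thread currently holds it, the calling thread locks it and owns it until it calls unlock. If another thread holds it, the call blocks until that thread releases it. If the calling thread itself already holds it (for example through a recursive or repeated call), locking a std::mutex again is undefined behavior and in practice usually a deadlock: the thread waits for a "key" that only it can give back, so the first lock is never unlocked. This part is hard to get used to; std::recursive_mutex is the tool for that situation. The return type is void.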
// mutex::lock/unlock
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
std::mutex mtx; // mutex for critical section
void print_thread_id (int id) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
std::cout << "thread #" << id << '\n';
mtx.unlock();
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);
for (auto& th : threads) th.join();
return 0;
}
unlock
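One thing to watch: unlocking a mutex that the calling thread has not locked is undefined behavior.
try_lock
try_lock never blocks. If the mutex is held by another thread it returns false; otherwise it locks the mutex and returns true. As with lock, calling it on a std::mutex that the calling thread already owns is undefined behavior.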
// mutex::try_lock example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
volatile int counter (0); // non-atomic counter
std::mutex mtx; // locks access to counter
void attempt_10k_increases () {
for (int i=0; i<10000; ++i) {
if (mtx.try_lock()) { // only increase if currently not locked:
++counter;
mtx.unlock();
}
}
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(attempt_10k_increases);
for (auto& th : threads) th.join();
std::cout << counter << " successful increases of the counter.\n";
return 0;
}
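First, to be clear, this is not deadlock code, because no thread ever holds two locks at once. The output varies from run to run; a typical result is somewhere above 80,000, because every try_lock that fails skips an increment. volatile here is meant to mark a variable shared between threads: it tells the compiler not to optimize accesses away or keep the value in a register. It does not make the variable atomic and does not synchronize threads; the mutex is what protects counter here.
lock_guard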
// lock_guard example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock_guard
#include <stdexcept> // std::logic_error
std::mutex mtx;
void print_even (int x) {
if (x%2==0) std::cout << x << " is even\n";
else throw (std::logic_error("not even"));
}
void print_thread_id (int id) {
try {
// using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
std::lock_guard<std::mutex> lck (mtx);
print_even(id);
}
catch (std::logic_error&) {
std::cout << "[exception caught]\n";
}
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);
for (auto& th : threads) th.join();
return 0;
}
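The reference has a sentence that looks contradictory at first: "Note though that the lock_guard object does not manage the lifetime of the mutex object in any way: the duration of the mutex object shall extend at least until the destruction of the lock_guard that locks it." It is not, though. Like a smart pointer, lock_guard ties the unlock to scope: its constructor locks the mutex, and its destructor unlocks it when the guard goes out of scope, including when an exception propagates. What the note says is that lock_guard manages only the lock, not the mutex object itself, so the developer has to make sure the mutex is still alive when the guard's destructor runs.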
adopt_lock and defer_lock
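#include <mutex> // std::mutex, std::lock, std::lock_guard, std::unique_lock, std::adopt_lock, std::defer_lock
std::mutex m1, m2;
void use_adopt_lock () { // function names here are only for illustration
std::lock(m1, m2); // calling thread locks both mutexes
std::lock_guard<std::mutex> lock1(m1, std::adopt_lock); std::lock_guard<std::mutex> lock2(m2, std::adopt_lock);
// access shared data protected by m1 and m2
}
void use_defer_lock () {
std::unique_lock<std::mutex> lock1(m1, std::defer_lock); std::unique_lock<std::mutex> lock2(m2, std::defer_lock);
std::lock(lock1, lock2); // locks both mutexes through the unique_lock wrappers
// access shared data protected by m1 and m2
}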
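std::defer_lock assumes the mutex is not yet locked by the calling thread and leaves it unlocked at construction; std::adopt_lock assumes the calling thread already holds the lock and simply takes it over.
unique_lock tracks whether it owns the mutex, so calling lock() on a unique_lock that already owns it throws std::system_error. lock_guard does no such check: if you construct a lock_guard without std::adopt_lock on a mutex the current thread has already locked, the thread locks the same mutex twice, which for std::mutex is undefined behavior and usually a deadlock.
unique_lock
unique_lock is much like lock_guard, but more flexible: it can defer locking, be unlocked and relocked before destruction, and be moved.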
// unique_lock constructor example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock, std::unique_lock
// std::adopt_lock, std::defer_lock
std::mutex foo,bar;
void task_a () {
std::lock (foo,bar); // simultaneous lock (prevents deadlock)
std::unique_lock<std::mutex> lck1 (foo,std::adopt_lock);
std::unique_lock<std::mutex> lck2 (bar,std::adopt_lock);
std::cout << "task a\n";
// (unlocked automatically on destruction of lck1 and lck2)
}
void task_b () {
// foo.lock(); bar.lock(); // replaced by:
std::unique_lock<std::mutex> lck1, lck2;
lck1 = std::unique_lock<std::mutex>(bar,std::defer_lock);
lck2 = std::unique_lock<std::mutex>(foo,std::defer_lock);
std::lock (lck1,lck2); // simultaneous lock (prevents deadlock)
std::cout << "task b\n";
// (unlocked automatically on destruction of lck1 and lck2)
}
int main ()
{
std::thread th1 (task_a);
std::thread th2 (task_b);
th1.join();
th2.join();
return 0;
}
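shared_lock
shared_lock is a C++14 feature (the std::shared_mutex used in the example below was added in C++17). You can find it on cppreference, but not on cplusplus.com, because that site only goes up to C++11.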
#include <iostream>
#include <mutex>
#include <string>
#include <shared_mutex>
#include <thread>
std::string file = "Original content."; // Simulates a file
std::mutex output_mutex; // mutex that protects output operations.
std::shared_mutex file_mutex; // reader/writer mutex
void read(int id)
{
std::string content;
{
std::shared_lock lock(file_mutex, std::defer_lock); // Do not lock it first.
lock.lock(); // Lock it here.
content = file;
}
std::lock_guard lock(output_mutex);
std::cout << "Contents read by reader #" << id << ": " << content << '\n';
}
void write()
{
{
std::lock_guard file_lock(file_mutex);
file = "New content";
}
std::lock_guard output_lock(output_mutex);
std::cout << "New content saved.\n";
}
int main()
{
std::cout << "Two readers reading from file.\n"
<< "A writer competes with them.\n";
std::thread reader1(read, 1);
std::thread reader2(read, 2);
std::thread writer(write);
reader1.join();
reader2.join();
writer.join();
std::cout << "The first few operations to file are done.\n";
reader1 = std::thread(read, 3);
reader1.join();
}
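As we can see, shared_lock locks in shared mode, which matches the reader-writer lock described in operating systems textbooks. Separately, std::cout needs a mutex of its own to keep the output from interleaving.
That's about it for now. The C++ threading library has a whole pile of components, including atomic, future, condition_variable, thread, and mutex. I only covered the simplest ones here and skipped the pitfalls, deep and shallow. Our goal is the concrete concepts behind multithreading in databases, though, so there is no need to agonize over syntax. The features I left out will be written up later.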