数据库事务的部分(前言)

数据库事务的部分实际上并没有占我校这门课程的很大份额,while~,但是这并不意味着校招会略过MVCC,thread,process,各种lock的区别和实现这些在PPT里一笔带过的东西,个人认为,只是在理论上的理解这些概念甚至不能配叫皮毛,所以这篇博客会从C++的多线程和lock开始介绍,以做到万无一失。

进程和线程

首先先回顾一下大三时期的美好时光(假定我们使用C++或C面向服务器编程),首先我们了解了并行(Concurrent)和并发(parallel)的区别,单核CPU内的Multi Thread叫做Concurrent,而多核时代的Multi Thread叫做Parallel,即:真正存在在一个时刻多个Thread在执行而不是一个时间时刻多个Thread在执行。同时应运而生的有超线程技术,例如:四核八线程CPU代表了单核会有2个Thread在运行。面试考的有多线程并发和多进程并发,多进程并发涉及进程之间的七种沟通方式:pipe,semafore,fifo,message queue,shared memory,socket,signal。不论哪一种都涉及大量底层API,每一个功能包括的头文件全都不一样,在这里把这七种全介绍完,相当于用中文复述一遍Advanced Unix Programming Environment半本书的内容,而且目测这些内容的考题已经逼近社招的难度,所以在这里没必要赘述。我们已经粗略了解多进程编程相比于多线程编程的优势和劣势,那么接下来简单介绍一下多线程的背景。经过查阅各种博客,发现C++11标准库包括了多线程的部分、包括了锁的部分,也就是说C++11之前的程序全部都是用Linux下的pthread、Linux内置的lock,那么对于C++的多进程支持呢?很巧,只有Boost库支持,那么我们如果不用Boost库,就只能用纯C混搭C++来支持多进程。以上,就是对于这个话题的粗略回顾。以下C++的例子来自于cpp reference

std::threading

Class to represent individual threads of execution.

std::thread类表示了执行中的单独线程

A thread of execution is a sequence of instructions that can be executed concurrently with other such sequences in multithreading environments, while sharing a same address space.

执行中的线程是一系列指令的组合,这些指令可以在多线程环境下与其他指令并发执行,并共享同一寻址空间

An initialized thread object represents an active thread of execution; Such a thread object is joinable, and has a unique thread id.

一个初始化的std::thread对象代表了一个激活了的对象,此对象被声明为joinable的,并且获得一个线程ID

A default-constructed (non-initialized) thread object is not joinable, and its thread id is common for all non-joinable threads.

默认构造的线程对象并不是initialized的,所以不是joinable的,它的线程ID和所有non-joinable的thread一样。

A joinable thread becomes not joinable if moved from, or if either join or detach are called on them.

一个joinable的thread如果使用了moved from或者join和detach,就会变成non-joinable。

// thread example
#include <iostream>       // std::cout
#include <thread>         // std::thread
 
void foo() 
{
  // do stuff...
}

void bar(int x)
{
  // do stuff...
}

int main() 
{
  std::thread first (foo);     // spawn new thread that calls foo()


  std::cout << "main, foo and bar now execute concurrently...\n";

  // synchronize threads:
  first.join();                // pauses until first finishes
  second.join();               // pauses until second finishes

  std::cout << "foo and bar completed.\n";

  return 0;
}

get_id()

假设当前thread是joinable的,get_id()函数返回当前线程id,否则返回默认thread::id

// thread::get_id / this_thread::get_id
#include <iostream>       // std::cout
#include <thread>         // std::thread, std::thread::id, std::this_thread::get_id
#include <chrono>         // std::chrono::seconds
 
std::thread::id main_thread_id = std::this_thread::get_id();

void is_main_thread() {
  if ( main_thread_id == std::this_thread::get_id() )
    std::cout << "This is the main thread.\n";
  else
    std::cout << "This is not the main thread.\n";
}

int main() 
{
  is_main_thread();
  std::thread th (is_main_thread);
  th.join();
}

join()

join()的返回类型是void,当线程结束时返回,否则block waiting,在执行join()之后,该thread变为none joinable thread

// example for thread::join
#include <iostream>       // std::cout
#include <thread>         // std::thread, std::this_thread::sleep_for
#include <chrono>         // std::chrono::seconds
 
void pause_thread(int n) 
{
  std::this_thread::sleep_for (std::chrono::seconds(n));
  std::cout << "pause of " << n << " seconds ended\n";
}
 
int main() 
{
  std::cout << "Spawning 3 threads...\n";
  std::thread t1 (pause_thread,1);
  std::thread t2 (pause_thread,2);
  std::thread t3 (pause_thread,3);
  std::cout << "Done spawning threads. Now waiting for them to join:\n";
  t1.join();
  t2.join();
  t3.join();
  std::cout << "All threads joined!\n";

  return 0;
}

joinable

一个线程有三种情况是none joinable的,第一种是当它被default construct的时候,第二种情况是它被detach或者join了之后,第三种情况是用了move语法给了其他线程。

// example for thread::joinable
#include <iostream>       // std::cout
#include <thread>         // std::thread
 
void mythread() 
{
  // do stuff...
}
 
int main() 
{
  std::thread foo;
  std::thread bar(mythread);

  std::cout << "Joinable after construction:\n" << std::boolalpha;
  std::cout << "foo: " << foo.joinable() << '\n';
  std::cout << "bar: " << bar.joinable() << '\n';

  if (foo.joinable()) foo.join();
  if (bar.joinable()) bar.join();

  std::cout << "Joinable after joining:\n" << std::boolalpha;
  std::cout << "foo: " << foo.joinable() << '\n';
  std::cout << "bar: " << bar.joinable() << '\n';

  return 0;
}

operator=

thread对象不能被copy,假设左值是none joinable的,那么右值就会给左值,然后右值变成"as if defaul contructed"的样子。假设左值是joinable的,就会terminate。

// example for thread::operator=
#include <iostream>       // std::cout
#include <thread>         // std::thread, std::this_thread::sleep_for
#include <chrono>         // std::chrono::seconds
 
void pause_thread(int n) 
{
  std::this_thread::sleep_for (std::chrono::seconds(n));
  std::cout << "pause of " << n << " seconds ended\n";
}

int main() 
{
  std::thread threads[5];                         // default-constructed threads

  std::cout << "Spawning 5 threads...\n";
  for (int i=0; i<5; ++i)
    threads[i] = std::thread(pause_thread,i+1);   // move-assign threads

  std::cout << "Done spawning threads. Now waiting for them to join:\n";
  for (int i=0; i<5; ++i)
    threads[i].join();

  std::cout << "All threads joined!\n";

  return 0;
}

swap

#include <iostream>
#include <thread>
#include <chrono>
 
void foo()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
 
void bar()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
 
int main()
{
    std::thread t1(foo);
    std::thread t2(bar);
 
    std::cout << "thread 1 id: " << t1.get_id() << '\n'
              << "thread 2 id: " << t2.get_id() << '\n';
 
    std::swap(t1, t2);
 
    std::cout << "after std::swap(t1, t2):" << '\n'
              << "thread 1 id: " << t1.get_id() << '\n'
              << "thread 2 id: " << t2.get_id() << '\n';
 
    t1.swap(t2);
 
    std::cout << "after t1.swap(t2):" << '\n'
              << "thread 1 id: " << t1.get_id() << '\n'
              << "thread 2 id: " << t2.get_id() << '\n';
 
    t1.join();
    t2.join();
}

交换线程id,stack overflow上有讨论这个到底是不是只交换id,有一个回答是说不要把它当作交换id,而是交换线程本身。(待考究)

detach

#include <iostream>       // std::cout
#include <thread>         // std::thread, std::this_thread::sleep_for
#include <chrono>         // std::chrono::seconds
 
void pause_thread(int n) 
{
  std::this_thread::sleep_for (std::chrono::seconds(n));
  std::cout << "pause of " << n << " seconds ended\n";
}
 
int main() 
{
  std::cout << "Spawning and detaching 3 threads...\n";
  std::thread (pause_thread,1).detach();
  std::thread (pause_thread,2).detach();
  std::thread (pause_thread,3).detach();
  std::cout << "Done spawning threads.\n";

  std::cout << "(the main thread will now pause for 5 seconds)\n";
  // give the detached threads time to finish (but not guaranteed!):
  pause_thread(5);
  return 0;
}

线程在detach之后就会none joinable,独立运行,不block waiting,一直到结束。

std::mutex

mutex是一个lockable的对象,提供critical setion的exclusive access。

// mutex example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex

std::mutex mtx;           // mutex for critical section

void print_block (int n, char c) {
  // critical section (exclusive access to std::cout signaled by locking mtx):
  mtx.lock();
  for (int i=0; i<n; ++i) { std::cout << c; }
  std::cout << '\n';
  mtx.unlock();
}

int main ()
{
  std::thread th1 (print_block,50,'*');
  std::thread th2 (print_block,50,'$');

  th1.join();
  th2.join();

  return 0;
}

lock

调用的线程lock了mutex, 假设该mutex并没有被其他线程占用,那么这个调用线程就将其lock, 此线程拥有这个mutex,直到其调用unlock函数。否则block waiting或者deadlock,block waiting是该mutex已经被其他线程占用,死锁是因为该mutex被该线程的递归(重复)调用抢走了"key",导致第一个线程无法unlock。(这个地方很难理解,需要用recursive lock来解死锁),返回类型void。

// mutex::lock/unlock
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex

std::mutex mtx;           // mutex for critical section

void print_thread_id (int id) {
  // critical section (exclusive access to std::cout signaled by locking mtx):
  mtx.lock();
  std::cout << "thread #" << id << '\n';
  mtx.unlock();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_thread_id,i+1);

  for (auto& th : threads) th.join();

  return 0;
}

unlock

只需要注意一点,unlock一个没lock的mutex是undefined behavior

try lock

try lock也是会递归死锁的。假设被mutex其他thread占用,那么return false,否则return true

// mutex::try_lock example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex

volatile int counter (0); // non-atomic counter
std::mutex mtx;           // locks access to counter

void attempt_10k_increases () {
  for (int i=0; i<10000; ++i) {
    if (mtx.try_lock()) {   // only increase if currently not locked:
      ++counter;
      mtx.unlock();
    }
  }
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(attempt_10k_increases);

  for (auto& th : threads) th.join();
  std::cout << counter << " successful increases of the counter.\n";

  return 0;
}

首先强调一下这并不是dead lock代码,因为并不存在一个线程内拿两个lock的情况,正确的输出结果大概是八万多,volatile的目的是声明一个被多线程共享的变量,被volatile声明的变量不会被从cache中读值(但这并不意味着它atomic了,它这里只起声明作用)

lock guard

// lock_guard example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock_guard
#include <stdexcept>      // std::logic_error

std::mutex mtx;

void print_even (int x) {
  if (x%2==0) std::cout << x << " is even\n";
  else throw (std::logic_error("not even"));
}

void print_thread_id (int id) {
  try {
    // using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
    std::lock_guard<std::mutex> lck (mtx);
    print_even(id);
  }
  catch (std::logic_error&) {
    std::cout << "[exception caught]\n";
  }
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_thread_id,i+1);

  for (auto& th : threads) th.join();

  return 0;
}

官网说了一句很矛盾的话:“Note though that the lock_guard object does not manage the lifetime of the mutex object in any way: the duration of the mutex object shall extend at least until the destruction of the lock_guard that locks it.” 但是我们很容易看出来它的unlock是和智能指针一样,由函数的生命周期决定的,结束调用析构函数的unlock。官网的意思很有可能是lock guard并不确保它一直托管lock,开发者需要自己确定在析构函数调用之前lock是有效的。

adopt_lock and defer_lock

std::lock(m1, m2); // calling thread locks the mutex
std::lock_guard<std::mutex> lock1(m1, std::adopt_lock);    std::lock_guard<std::mutex> lock2(m2, std::adopt_lock);
// access shared data protected by the m1 and m2
std::unique_lock<std::mutex> lock1(m1, std::defer_lock);    std::unique_lock<std::mutex> lock2(m2, std::defer_lock);    
std::lock(lock1, lock2);
// access shared data protected by the m1 and m2

defer lock假定mutex无锁,adopt lock假定有锁
unique lock对于无defer lock和adopt lock或者try to lock的会报错,lock guard不会,所以lock guard有很大的机率block一个线程两次导致死锁。

unique lock

unique lock和lock guard差不多。

// unique_lock constructor example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock, std::unique_lock
                          // std::adopt_lock, std::defer_lock
std::mutex foo,bar;

void task_a () {
  std::lock (foo,bar);         // simultaneous lock (prevents deadlock)
  std::unique_lock<std::mutex> lck1 (foo,std::adopt_lock);
  std::unique_lock<std::mutex> lck2 (bar,std::adopt_lock);
  std::cout << "task a\n";
  // (unlocked automatically on destruction of lck1 and lck2)
}

void task_b () {
  // foo.lock(); bar.lock(); // replaced by:
  std::unique_lock<std::mutex> lck1, lck2;
  lck1 = std::unique_lock<std::mutex>(bar,std::defer_lock);
  lck2 = std::unique_lock<std::mutex>(foo,std::defer_lock);
  std::lock (lck1,lck2);       // simultaneous lock (prevents deadlock)
  std::cout << "task b\n";
  // (unlocked automatically on destruction of lck1 and lck2)
}
int main ()
{
  std::thread th1 (task_a);
  std::thread th2 (task_b);

  th1.join();
  th2.join();

  return 0;
}

shared lock

shared lock是cpp14的内容,如果你去cppreference上查是可以查到的,但是你并不能在cplusplus com上查到,因为这个网站只包括c++11的简单教程。

#include <iostream>
#include <mutex>
#include <string>
#include <shared_mutex>
#include <thread>
 
std::string file = "Original content."; // Simulates a file
std::mutex output_mutex; // mutex that protects output operations.
std::shared_mutex file_mutex; // reader/writer mutex
 
void read(int id)
{
    std::string content;
    {
        std::shared_lock lock(file_mutex, std::defer_lock); // Do not lock it first.
        lock.lock(); // Lock it here.
        content = file;
    }
    std::lock_guard lock(output_mutex);
    std::cout << "Contents read by reader #" << id << ": " << content << '\n';
}
 
void write()
{
    {
        std::lock_guard file_lock(file_mutex);
        file = "New content";
    }
    std::lock_guard output_lock(output_mutex);
    std::cout << "New content saved.\n";
}
 
int main()
{
    std::cout << "Two readers reading from file.\n"
              << "A writer competes with them.\n";
    std::thread reader1(read, 1);
    std::thread reader2(read, 2);
    std::thread writer(write);
    reader1.join();
    reader2.join();
    writer.join();
    std::cout << "The first few operations to file are done.\n";
    reader1 = std::thread(read, 3);
    reader1.join();
}

我们可以看到,这里的share lock是共享模式的,和操作系统教材里说的读写锁意义一致,此外cout需要用互斥锁保证输出正常。

差不多先到这,C++的多线程标准库有atomic future conditional variable thread mutex等一坨库,这里只挑最简单的介绍,而且深坑浅坑也并没有涉及,不过我们的目标是database多线程的具体概念,无需纠结太多syntax 剩下漏掉的功能会在日后记录

由上我们可以窥见,数据库教材中所谓S锁与X锁在具体实操中的原型。

上一篇:iOS中使用ZipArchive压缩和解压缩文件-备


下一篇:mutex 锁