[MIT 6.S081] Lab 7: Multithreading

Lab 7: Multithreading

Uthread: switching between threads (moderate)

要点

  • 添加代码到 user/uthread.cthread_create(), thread_schedule() 函数以及 user/uthread_switch.Sthread_swtich 函数.
  • 确保当 thread_schedule() 第一个运行给定线程时, 该线程执行传递给 thread_create() 的函数在该线程的堆栈中.
  • 确保 thread_switch 保存被切换出去的线程的寄存器, 恢复被切换到的线程的寄存器, 并返回到切换到的线程指令中最后停止的位置.

思路

该部分实验是在用户模式模拟一个进程有多个用户线程. 通过 thread_create() 创建线程, thread_schedule() 进行线程调度. 这与 xv6 的线程调度是大同小异的.
对于此处用户多线程之间的切换, 也需要保存寄存器信息, 实际上就可以直接参考内核线程切换的 struct context 结构体, 需要保存的寄存器信息是一致的. 而每个线程会有独立的需要执行的函数和线程栈, 则需要在创建线程进行设置.

步骤

  1. 设置线程上下文结构体.
    由于此时用户多线程切换需要保存的寄存器信息和 xv6 内核线程切换需要保存的寄存器信息是一致的, 因此可以直接使用 kernel/proc.h 中定义的 struct context 结构体. 此处为了代码的清晰独立, 单独设置了 struct ctx 结构体, 其成员是和前者一致的.
// Saved registers for thread context switches. - lab7-1
struct ctx {
    uint64 ra;
    uint64 sp;

    // callee-saved
    uint64 s0;
    uint64 s1;
    uint64 s2;
    uint64 s3;
    uint64 s4;
    uint64 s5;
    uint64 s6;
    uint64 s7;
    uint64 s8;
    uint64 s9;
    uint64 s10;
    uint64 s11;
};
  1. 在线程结构体 struct thread 中添加线程上下文字段 context.
    很显然, 上文定义的线程上下文结构体 struct ctx 是和线程一一对应的, 应作为线程结构体的一个成员变量.
struct thread {
  char       stack[STACK_SIZE]; /* the thread's stack */
  int        state;             /* FREE, RUNNING, RUNNABLE */
  struct ctx context;       // thread's context - lab7-1
};
  1. 添加代码到 thread_create() 函数.
    thread_create() 函数主要进行线程的初始化操作: 其先在线程数组中找到一个状态为 FREE 即未初始化的线程, 然后设置其状态为 RUNNABLE 等进行初始化.
    这里要注意到, 传递的 thread_create() 参数 func 需要记录, 这样在线程运行时才能运行该函数, 此外线程的栈结构是独立的, 在运行函数时要在线程自己的栈上, 因此也要初始化线程的栈指针. 而在线程进行调度切换时, 同样需要保存和恢复寄存器状态, 而上述二者实际上分别对应着 rasp 寄存器, 在线程初始化进行设置, 这样在后续调度切换时便能保持其正确性.
void 
thread_create(void (*func)())
{
  struct thread *t;

  for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
    if (t->state == FREE) break;
  }
  t->state = RUNNABLE;
  // YOUR CODE HERE
  // set thread's function address and thread's stack pointer - lab7-1
  t->context.ra = (uint64) func;
  t->context.sp = (uint64) t->stack + STACK_SIZE;
}
  1. 添加代码到 thread_schedule() 函数.
    thread_schedule() 函数负责进行用户多线程间的调度. 此处是通过函数的主动调用进行的线程切换. 其主要工作就是从当前线程在线程数组的位置开始寻找一个 RUNNABLE 状态的线程进行运行. 实际上与 kernel/proc.c 中的 scheduler() 函数是很相似的. 而很明显在找到线程后就需要进行线程的切换, 调用函数 thread_switch().
    thread_switch() 根据其在 user/thread.c 中的外部声明以及指导书的要求可以推断出, 该函数应该是定义在 user/uthread_switch.S, 用汇编代码实现. 因此其功能应该与 kernel/swtch.S 中的 swtch() 函数一致, 进行线程切换时的寄存器代码的保存与恢复.
void 
thread_schedule(void)
{
  struct thread *t, *next_thread;

  /* Find another runnable thread. */
  next_thread = 0;
  t = current_thread + 1;
  for(int i = 0; i < MAX_THREAD; i++){
    if(t >= all_thread + MAX_THREAD)
      t = all_thread;
    if(t->state == RUNNABLE) {
      next_thread = t;
      break;
    }
    t = t + 1;
  }

  if (next_thread == 0) {
    printf("thread_schedule: no runnable threads\n");
    exit(-1);
  }

  if (current_thread != next_thread) {         /* switch threads?  */
    next_thread->state = RUNNING;
    t = current_thread;
    current_thread = next_thread;
    /* YOUR CODE HERE
     * Invoke thread_switch to switch from t to next_thread:
     * thread_switch(??, ??);
     */
    thread_switch(&t->context, &current_thread->context);   // switch thread - lab7-1
  } else
    next_thread = 0;
}
  1. 最后在 user/uthread_switch.S 中添加 thread_switch 的代码. 正如上文所述, 该函数实际上功能与 kernel/swtch.S 中的 swtch 函数一致, 而由于此处 struct ctx 与内核的 struct context 结构体的成员是相同的, 因此该函数可以直接复用 kernel/swtch.S 中的 swtch 代码.
	.text

	/*
     * save the old thread's registers,
     * restore the new thread's registers.
     */

	.globl thread_switch
thread_switch:
	/* YOUR CODE HERE */
	# same as swtch in swtch.S - lab7
    sd ra, 0(a0)
    sd sp, 8(a0)
    sd s0, 16(a0)
    sd s1, 24(a0)
    sd s2, 32(a0)
    sd s3, 40(a0)
    sd s4, 48(a0)
    sd s5, 56(a0)
    sd s6, 64(a0)
    sd s7, 72(a0)
    sd s8, 80(a0)
    sd s9, 88(a0)
    sd s10, 96(a0)
    sd s11, 104(a0)

    ld ra, 0(a1)
    ld sp, 8(a1)
    ld s0, 16(a1)
    ld s1, 24(a1)
    ld s2, 32(a1)
    ld s3, 40(a1)
    ld s4, 48(a1)
    ld s5, 56(a1)
    ld s6, 64(a1)
    ld s7, 72(a1)
    ld s8, 80(a1)
    ld s9, 88(a1)
    ld s10, 96(a1)
    ld s11, 104(a1)
	ret    /* return to ra */

测试

  • 在 xv6 中运行 uthread:
    [MIT 6.S081] Lab 7: Multithreading
  • ./grade-lab-thread uthread 单项测试:
    [MIT 6.S081] Lab 7: Multithreading

思考题

  • Q: thread_switch needs to save/restore only the callee-save registers. Why?
    A: 这里仅需保存被调用者保存(callee-save)寄存器的原因和 xv6 中内核线程切换时仅保留 callee-save 寄存器的原因是相同的. 由于 thread_switch() 一定由其所在的 C 语言函数调用, 因此函数的调用规则是满足 xv6 的函数调用规则的, 对于其它 caller-save 寄存器都会被保存在线程的堆栈上, 在切换后的线程上下文恢复时可以直接从切换后线程的堆栈上恢复 caller-save 寄存器的值. 由于 callee-save 寄存器是由被调用函数即 thread_switch() 进行保存的, 在函数返回时已经丢失, 因此需要额外保存这些寄存器的内容.

Using threads (moderate)

预处理

  1. 使用如下命令构建 ph 程序, 该程序包含一个线程不安全的哈希表.
$ make ph
  1. 运行 ./ph 1 即使用单线程运行该哈希表, 输出如下, 其 0 个键丢失:
    [MIT 6.S081] Lab 7: Multithreading
  2. 运行 ./ph 2 即使用两个线程运行该哈希表, 输出如下, 可以看到其 put 速度近乎先前 2 倍, 但是有 16423 个键丢失, 也说明了该哈希表非线程安全.
    [MIT 6.S081] Lab 7: Multithreading

思考题

  • Q: Why are there missing keys with 2 threads, but not with 1 thread? Identify a sequence of events with 2 threads that can lead to a key being missing. Submit your sequence with a short explanation in answers-thread.txt
    A: 这里的哈希表就是"数组(bucket)+链表"的经典实现方法. 通过取余确定 bucket, put() 是使用前插法插入键值对, get() 遍历 bucket 下的链表找到对应 key 的 entry. 而这个实现没有涉及任何锁机制或者 CAS 等线程安全机制, 因此线程不安全, 多线程插入时会出现数据丢失.
    该哈希表的线程安全问题是: 多个线程同时调用 put() 对同一个 bucket 进行数据插入时, 可能会使得先插入的 entry 丢失. 结合代码具体来讲, 假设有 A 和 B 两个线程同时 put(), 由于该哈希表的桶数 NBUCKET5, 哈希函数为 key%NBUCKET, 而插入的 key 为 keys[b*n+i], 而 b=NKEYS/nthread=100000/2=50000, 而 b%NBUCKET==0, 因此对于 A 和 B 两个线程, 在 i 相同时实际上会在同一个 bucket 插入数据. 假设 A 和 B 都运行到 put() 函数的 insert(), 还未进入该函数内部, 这就会导致两个线程 insert() 的后两个参数是相同的, 都是当前 bucket 的链表头, 如若线程 A 调用 insert() 插入完 entry 后, 切换到线程 B 再调用 insert() 插入 entry, 则会导致线程 A 刚刚插入的 entry 丢失.

要点

  • 使用互斥锁 pthread_mutex_t 来进行数据一致性的保护.
  • 通过避免并发 put() 在哈希表中读取或写入的内存重叠来提高并行速度, 可以考虑哈希表的每个 bucket 加锁.

步骤

  1. 定义互斥锁数组.
    根据指导书可知, 此处主要通过加互斥锁来解决线程不安全的问题. 此处没有选择使用一个互斥锁, 这样会导致访问整个哈希表都是串行的. 而考虑到对该哈希表, 实际上只有对同一 bucket 操作时才可能造成数据的丢失, 不同 bucket 之间是互不影响的, 因此此处是构建了一个互斥锁数组, 每个 bucket 对应一个互斥锁.
pthread_mutex_t locks[NBUCKET]; // lab7-2
  1. main() 函数中对所有互斥锁进行初始化.
int
main(int argc, char *argv[])
{
  pthread_t *tha;
  void *value;
  double t1, t0;

  if (argc < 2) {
    fprintf(stderr, "Usage: %s nthreads\n", argv[0]);
    exit(-1);
  }
  nthread = atoi(argv[1]);
  tha = malloc(sizeof(pthread_t) * nthread);
  srandom(0);
  assert(NKEYS % nthread == 0);
  for (int i = 0; i < NKEYS; i++) {
    keys[i] = random();
  }
  // initialize locks - lab7-2
  for(int i = 0; i < NBUCKET; ++i) {
      pthread_mutex_init(&locks[i], NULL);
  }

  //
  // first the puts
  // ...
}
  1. put() 中加锁.
    由于线程安全问题是由于对 bucket 中的链表操作时产生的, 因此要在对链表操作的前后加锁.
    但实际上, 对于加锁的临界区可以缩小至 insert() 函数. 原因是 insert() 函数采取头插法插入 entry, 在函数的最后才使用 *p=e 修改 bucket 链表头 table[i] 的值, 也就是说, 在前面操作的同时, 并不会对 bucket 链表进行修改, 因此可以缩小临界区的方法. 实际上加锁的范围可以缩小至 *p=e 前后, 但由于需要修改 insert() 函数, 此处便未这样修改.
static 
void put(int key, int value)
{
  int i = key % NBUCKET;

  // is the key already present?
  struct entry *e = 0;
  for (e = table[i]; e != 0; e = e->next) {
    if (e->key == key)
      break;
  }
  if(e){
    // update the existing key.
    e->value = value;
  } else {
    pthread_mutex_lock(&locks[i]);    // lock - lab7-2
    // the new is new.
    insert(key, value, &table[i], table[i]);
    pthread_mutex_unlock(&locks[i]);  // unlock - lab7-2
  }
}
  1. 不需要在 get() 中加锁.
    get() 函数主要是遍历 bucket 链表找寻对应的 entry, 并不会对 bucket 链表进行修改, 实际上只是读操作, 因此无需加锁.
  2. 修改 NBUCKET 避免并发写入内存重叠.
    之所以在修改前两个线程运行有并发问题, 是由于 b%NBUCKET==0, 因此两个同时运行的线程, put(keys[b*n+i],n)时, 实际上都是在操作 i%NBUCKET 这一 bucket, 也就很可能同时修改 bucket 的链表头, 即指导书中所说的写入同一物理内存, 自然容易发生丢失修改, 而且加锁后因为会同时争用锁, 也会影响并发性能.
    可以通过修改 NBUCKET 使得 b%NBUCKET!=0, 这样两个同时运行的线程进行 put() 时便大概率不会对同一个 bucket 进行操作, 自然也就减少了锁的争用, 能够一定程度上挺高并发性能.
    此处选择 NBUCKET=7, 在两个线程时, b%NBUCKET=50000%7!=0.
#define NBUCKET 7   // lab7

测试

  1. 修改 NBUCKET 前测试:
    • ./ph 2 测试: 可以加锁后两个线程运行不会有 key 丢失, 同时可以看到 put 的性能虽然相比加锁之前有所减小, 但实际影响不大.
      [MIT 6.S081] Lab 7: Multithreading
    • ./grade-lab-thread ph_safe单项测试:
      [MIT 6.S081] Lab 7: Multithreading
    • ./grade-lab-thread ph_fast 单项测试: 实际上此处在未修改 NBUCKET 前就可以通过 ph_fast 的测试.
      [MIT 6.S081] Lab 7: Multithreading
  2. 修改 NBUCKET=7 后:
    • ./ph 2 测试: 可以看到修改后的 put 和 get 性能达到了 35000 左右每秒, 达到甚至超过了最初不加锁的性能.
      [MIT 6.S081] Lab 7: Multithreading
    • ./grade-lab-thread ph_fast 单项测试: 修改后, 该测试的完成时间减少到 19.1s, 相比修改前的 22.3s 也有一定的提升.
      [MIT 6.S081] Lab 7: Multithreading

Barrier(moderate)

预处理

  1. 运行如下命令构建 barrier 程序, 该程序要求多线程同时执行到同一位置后再继续运行, 即多线程同步问题.
$ make barrier
  1. 运行 ./barrier 2 即使用两个线程运行该程序, 输出如下, 即最初版本不满足该性质, 会致使运行失败.
    [MIT 6.S081] Lab 7: Multithreading

要点

  • 实现满足线程同步的 barrier.
  • 使用条件变量 pthread_cond_t 配合互斥锁完成多线程同步.

思路

此处主要涉及互斥锁和条件变量配合达到线程同步.
首先条件变量的操作需要在互斥锁锁定的临界区内.
然后进行条件判断, 此处即判断是否所有的线程都进入了 barrier() 函数, 若不满足则使用 pthread_cond_wait() 将当前线程休眠, 等待唤醒; 若全部线程都已进入 barrier() 函数, 则最后进入的线程会调用 pthread_cond_broadcast() 唤醒其他由条件变量休眠的线程继续运行.
需要注意的是, 对于 pthread_cond_wait() 涉及三个操作: 原子的释放拥有的锁并阻塞当前线程, 这两个操作是原子的; 第三个操作是由条件变量唤醒后会再次获取锁.

步骤

根据上述思路, 在 barrier() 函数中添加如下代码.
注意对于变量 bstate.roundbstate.nthread 的设置需要在 pthread_cond_broadcast() 唤醒其它线程之前, 否则其他线程进入下一轮循环时可能这两个字段的值还未得到修改.

static void 
barrier()
{
  // YOUR CODE HERE
  //
  // Block until all threads have called barrier() and
  // then increment bstate.round.
  //
  // lab7-3
  pthread_mutex_lock(&bstate.barrier_mutex);
  // judge whether all threads reach the barrier
  if(++bstate.nthread != nthread)  {    // not all threads reach    
    pthread_cond_wait(&bstate.barrier_cond,&bstate.barrier_mutex);  // wait other threads
  } else {  // all threads reach
    bstate.nthread = 0; // reset nthread
    ++bstate.round; // increase round
    pthread_cond_broadcast(&bstate.barrier_cond);   // wake up all sleeping threads
  }
  pthread_mutex_unlock(&bstate.barrier_mutex);
}

测试

  • ./barrier <threadNum> 测试:
    [MIT 6.S081] Lab 7: Multithreading
  • ./grade-lab-thread barrier 单项测试:
    [MIT 6.S081] Lab 7: Multithreading
上一篇:MIT 6.824 Lab 1: MapReduce


下一篇:一款趣味少儿编程工具