mit6.824lab1

环境

指导书,认真看 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
要求golang1.15 那就用那个版本
我开始尝试使用1.17发现gobuild不通过
在windows的goland上编代码, 在远程linux服务器(或者虚拟机)上执行。没有mac的可以尝试远程目录挂载。
windows挂载远程目录
实验是做完了,这里写一写过程,整理一下思路
mit6.824lab1

过程

  1. 首先认真读了mapreduce的论文,因为以前本科也上过课,研一也上过,所以对于mr的过程还是算熟悉。

  2. 认真读指导书

  3. 按指导书跑一遍 sequencial-mr的例子

  4. 根据指导书和论文,需要做的事情是实现coordinator和worker的通信,(blacklive matters!)这里没用master了。coordinator里面用一些数据结构mutex或者channel控制不同worker的互斥访问,控制map任务和reduce任务的状态,map任务产生的中间文件的位置等。根据论文里面说的3种状态,Idle,inprogress,complete.这里我用了比较简单的数据结构。因为这个实验的输入文件都不是很大,不像gfs实验中需要split为16-64MB,所以直接一个map任务对应一个输入文件,因此将文件名作为了任务名,map的key。reduce任务名则根据mrcoordinator.go指定的nreduce数量决定,用数字代替。TaskStatus是枚举,三种任务状态。结构比较简单,刚开始想着要是数据结构不够用后面再加,但是好像做下来都没改过。

修改这三个文件
mit6.824lab1

type Coordinator struct {
	mapTasks       map[string]TaskStatus
	reduceTasks    map[string]TaskStatus
	reduceFiles    map[string][]string //中间文件位置
	mapCompleted   bool 
	MapCompleteNum int //完成了多少个map
	nReduce        int 
	done bool
	mutex sync.Mutex
}

master初始化

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		map[string]TaskStatus{},
		map[string]TaskStatus{},
		map[string][]string{},
		false,
		0,
		nReduce,
		false,
		sync.Mutex{},
	}

	// Your code here.
	for _, fileName := range files {
		c.mapTasks[fileName] = Idle
	}
	for i := 0; i < nReduce; i++ {
		c.reduceTasks[strconv.Itoa(i)] = Idle
		c.reduceFiles[strconv.Itoa(i)] = []string{}
	}

	c.server()
	return &c
}

worker申请任务

他给的demo里面有example rpc调用的例子,在rpc里定义自己的请求体和响应体的结构,这也是rpc通信的风格。grpc开发app就是先定义protobuf消息格式。
我做的过程中定义的函数没有返回error,好像不行,需要保持和example一样。
coordinator(master)分配任务的同时,给对应的任务加一个过期时间,实验要求的是10s。
超时处理的时候,刚开始没看指导书的hints,后来发现里面建议使用ioutils.TempFile,因为测试过程中会测试当某个worker crash掉的时候,程序会怎么办,A worker超时了,分给另一个worker B,A写了一半的文件就失效了,因此应该用临时文件,最后返回的时候写入成功了才改成本应的文件名。
这里map的中间产物我命名很随意,直接文件名*n,n代表作为哪一个reduce的输入,n是由key 哈希后 取模得到的。


func (c *Coordinator) GetTask(request *ExampleArgs, reply *GetTaskReply) error {

	c.mutex.Lock()
	defer c.mutex.Unlock()

	if !c.mapCompleted { //分配map任务
		for fileName, status := range c.mapTasks {
			if status == Idle {
				c.mapTasks[fileName] = InProgress
				reply.MapName = fileName
				reply.NReduce = c.nReduce
				reply.TaskType = MapType
				go c.HandleMapTimeout(fileName)
				return nil
			}
		}
		reply.TaskType = Sleep

	} else {

		for reduceName, status := range c.reduceTasks {
			if status == Idle{
				c.reduceTasks[reduceName] = InProgress
				reply.ReduceFiles = c.reduceFiles[reduceName]
				reply.TaskType = ReduceType
				reply.ReduceName = reduceName
				go c.HandleReduceTimeout(reduceName)
				return nil
			}
		}
		reply.TaskType = Sleep
	}
	return nil

}

worker上报任务完成

map和reduce完成后同样rpc调用,报告master完成了任务,每次上报master都判断map阶段和reduce阶段是否完成,主程序中有一个判断整个程序是否完成的逻辑,done()

func (c *Coordinator) ReduceReport(req *ReduceCompleteReq,reply *ExampleReply) error{
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.reduceTasks[req.ReduceName] = Completed
	//fmt.Fprintf(os.Stderr, "####reduce完成 "+req.ReduceName+"\n")
	fmt.Fprintf(os.Stderr, "####reduce完成, "+req.ReduceName+"############### \n")

	done := true
	for _,status := range c.reduceTasks{
		if status != Completed{
			done = false
			break
		}
	}
	c.done = done
	return nil
}

差不多就这些,实现进程同步还可以用channel

遇到的两个坑就是,一个rpc调用函数需要返回error,另一个就是crash测试,如果不做临时文件处理,就可能通不过。

上一篇:复习day2


下一篇:记录一次线上故障新生代,老年代,内存溢出,IO线程,CPU全部满负载异常情况排查