环境
指导书,认真看 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
要求golang1.15 那就用那个版本
我开始尝试使用1.17发现gobuild不通过
在windows的goland上编代码, 在远程linux服务器(或者虚拟机)上执行。没有mac的可以尝试远程目录挂载。
windows挂载远程目录
实验是做完了,这里写一写过程,整理一下思路
过程
-
首先认真读了mapreduce的论文,因为以前本科也上过课,研一也上过,所以对于mr的过程还是算熟悉。
-
认真读指导书
-
按指导书跑一遍 sequencial-mr的例子
-
根据指导书和论文,需要做的事情是实现coordinator和worker的通信,(blacklive matters!)这里没用master了。coordinator里面用一些数据结构mutex或者channel控制不同worker的互斥访问,控制map任务和reduce任务的状态,map任务产生的中间文件的位置等。根据论文里面说的3种状态,Idle,inprogress,complete.这里我用了比较简单的数据结构。因为这个实验的输入文件都不是很大,不像gfs实验中需要split为16-64MB,所以直接一个map任务对应一个输入文件,因此将文件名作为了任务名,map的key。reduce任务名则根据mrcoordinator.go指定的nreduce数量决定,用数字代替。TaskStatus是枚举,三种任务状态。结构比较简单,刚开始想着要是数据结构不够用后面再加,但是好像做下来都没改过。
修改这三个文件
type Coordinator struct {
mapTasks map[string]TaskStatus
reduceTasks map[string]TaskStatus
reduceFiles map[string][]string //中间文件位置
mapCompleted bool
MapCompleteNum int //完成了多少个map
nReduce int
done bool
mutex sync.Mutex
}
master初始化
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
map[string]TaskStatus{},
map[string]TaskStatus{},
map[string][]string{},
false,
0,
nReduce,
false,
sync.Mutex{},
}
// Your code here.
for _, fileName := range files {
c.mapTasks[fileName] = Idle
}
for i := 0; i < nReduce; i++ {
c.reduceTasks[strconv.Itoa(i)] = Idle
c.reduceFiles[strconv.Itoa(i)] = []string{}
}
c.server()
return &c
}
worker申请任务
他给的demo里面有example rpc调用的例子,在rpc里定义自己的请求体和响应体的结构,这也是rpc通信的风格。grpc开发app就是先定义protobuf消息格式。
我做的过程中定义的函数没有返回error,好像不行,需要保持和example一样。
coordinator(master)分配任务的同时,给对应的任务加一个过期时间,实验要求的是10s。
超时处理的时候,刚开始没看指导书的hints,后来发现里面建议使用ioutils.TempFile,因为测试过程中会测试当某个worker crash掉的时候,程序会怎么办,A worker超时了,分给另一个worker B,A写了一半的文件就失效了,因此应该用临时文件,最后返回的时候写入成功了才改成本应的文件名。
这里map的中间产物我命名很随意,直接文件名*n,n代表作为哪一个reduce的输入,n是由key 哈希后 取模得到的。
func (c *Coordinator) GetTask(request *ExampleArgs, reply *GetTaskReply) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if !c.mapCompleted { //分配map任务
for fileName, status := range c.mapTasks {
if status == Idle {
c.mapTasks[fileName] = InProgress
reply.MapName = fileName
reply.NReduce = c.nReduce
reply.TaskType = MapType
go c.HandleMapTimeout(fileName)
return nil
}
}
reply.TaskType = Sleep
} else {
for reduceName, status := range c.reduceTasks {
if status == Idle{
c.reduceTasks[reduceName] = InProgress
reply.ReduceFiles = c.reduceFiles[reduceName]
reply.TaskType = ReduceType
reply.ReduceName = reduceName
go c.HandleReduceTimeout(reduceName)
return nil
}
}
reply.TaskType = Sleep
}
return nil
}
worker上报任务完成
map和reduce完成后同样rpc调用,报告master完成了任务,每次上报master都判断map阶段和reduce阶段是否完成,主程序中有一个判断整个程序是否完成的逻辑,done()
func (c *Coordinator) ReduceReport(req *ReduceCompleteReq,reply *ExampleReply) error{
c.mutex.Lock()
defer c.mutex.Unlock()
c.reduceTasks[req.ReduceName] = Completed
//fmt.Fprintf(os.Stderr, "####reduce完成 "+req.ReduceName+"\n")
fmt.Fprintf(os.Stderr, "####reduce完成, "+req.ReduceName+"############### \n")
done := true
for _,status := range c.reduceTasks{
if status != Completed{
done = false
break
}
}
c.done = done
return nil
}
差不多就这些,实现进程同步还可以用channel
遇到的两个坑就是,一个rpc调用函数需要返回error,另一个就是crash测试,如果不做临时文件处理,就可能通不过。