6.824 Lab-1 MapReduce
1.实验内容
1.1内容概述
将经典的Word Counter任务使用MapReduce编程范式去实现,任务整体流程如下(假设两个Map节点和两个Reduce节点):
每个Map Worker负责一个输入文件的Map处理,每个Map任务输出N份文件(N是Reduce Worker数目),这N份文件会送到N个Reduce Worker处理。等待所有Map任务完成后,Reduce工作才能开始(此时所有输入文件才准备好),每个Reduce Worker输出一份reduce结果。
1.2lab相关代码概述
与本实验相关的主要有三个包,分别是main、mrapps、mr包。main包调用mrapps包和mr包运行整个流程,mrapps包是运行和测试时使用的工具函数包,这两个包在实验过程中都不需要改动,自己写的代码都在mr包中,其中包含三个文件coordinator.go、rpc.go和worker.go,作用如下
mr
├── coordinator.go \\Master
├── rpc.go \\处理通信
└── worker.go \\worker,包含map和reduce
2.实验步骤
2.1定义通信内容(rpc.go)
2.1.1分配任务时的请求与响应
worker请求任务时,不区分map和reduce,让coordinator根据任务完成情况来决定分配任务类型,这里有个边界情况就是所有的任务都在运行中,这个状态既得不到任务又不能直接退出,只能进行下一次任务。
// 节点请求任务
type ReqArgs struct {
ReqNumber int8 //占用一个字节表示请求 为1表示申请任务
}
// Master回应任务内容
type ReqReply struct {
TypeName string // map or reduce or allinprogress
Idx int //worker idx
Content []string //file names for work content
NReduce int //reduce worker number
}
2.1.2完成任务时worker汇报任务结果给Master
汇报完成情况时,需要说明任务类型(TypeName), 任务结果(Ret), 完成的任务编号(idx)
// 报告任务完成情况
type FinishReq struct {
TypeName string //map or reduce
Ret []string //output files(map or reduce)
Idx int //worker idx (map or reduce)
}
//Master 回应worker
type FinishReply struct {
Done bool //for reply
}
2.2实现Worker(Worker.go)
2.2.1Map worker
Map worker任务包括三步,第一步读取输入文件调用mapf生成key value对,第二步处理kv对,排序之后合并相同条目到同一行,第三步将结果写入输出文件。
1.读取文件内容生成Key Value对
intermediate := []KeyValue{}
filename := reply1.Content[0]
NReduce := reply1.NReduce
//打开文件
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
//生成KV对
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
2.处理kv对生成中间结果
sort.Sort(ByKey(intermediate))
i := 0
reduceInput := make([][]ReduceKv, NReduce)
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
key := intermediate[i].Key
//根据key分配到不同的reduce任务中
idx := ihash(key) % NReduce
reduceInput[idx] = append(reduceInput[idx], ReduceKv{Key: key, Value: values})
i = j
}
3.写入结果
这个过程分为两步,首先写入临时文件,然后检查文件是否已经存在,如果不存在,将临时文件改名(原子操作),这样做的好处是防止一个任务被分配给不同worker时,多个worker同时写一个文件。
-
写入临时文件
tempFiles := make([]*os.File, NReduce) //临时文件命名 for i := range tempFiles { tempFiles[i], err = ioutil.TempFile(".", "out*") if err != nil { log.Fatal("creat tempfile fail") } } //对象以json格式写入临时文件 for i := range reduceInput { enc := json.NewEncoder(tempFiles[i]) for _, kv := range reduceInput[i] { err := enc.Encode(&kv) if err != nil { log.Fatalf("cannot write json %v", i) } } }
-
重命名(原子操作)
//输入文件命名
for i := range outNames {
outNames[i] = "mr-map-out-" + strconv.Itoa(reply1.Idx) + "-" + strconv.Itoa(i)
}
//rename
for i := range tempFiles {
_, err := os.Stat(outNames[i])
if os.IsNotExist(err) {
os.Rename(tempFiles[i].Name(), outNames[i])
if i == len(tempFiles)-1 {//complete work, call finish info
args := FinishReq{TypeName: "map", Ret: outNames, Idx: reply1.Idx}
reply := FinishReply{}
call("Coordinator.HandFinishInfo", &args, &reply)
}
} else {
os.Remove(tempFiles[i].Name()) //remove tempfile
break
}
}
2.2.2Reduce Worker
reduce 操作主要是读取map处理得到的文件,然后处理写入输出文件,写文件和map worker类似,先写入临时文件,完成之后再重命名
使用一个map来记录读取到的kv对,kvaMap := make(map[string]*ReduceKv)
1.读取中间文件(json格式)
for _, filename := range inputFileNames {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
dec := json.NewDecoder(file)
for {
var kv ReduceKv
if err := dec.Decode(&kv); err != nil {
break
}
if kvaMap[kv.Key] == nil {//map中没有记录,新建
kvaMap[kv.Key] = &kv
} else {//map中已经记录,追加
kvaMap[kv.Key].Value = append(kvaMap[kv.Key].Value, kv.Value...)
}
}
}
2.写入临时文件
// 写入临时文件
for _, kv := range kvaMap {
output := reducef(kv.Key, kv.Value)
fmt.Fprintf(oTmpFile, "%v %v\n", kv.Key, output)
}
3.重命名,完成后汇报给Master
// 重命名临时文件
outName := "mr-out-" + strconv.Itoa(idx)
retName := []string{}
_, err := os.Stat(outName)
if os.IsNotExist(err) {
os.Rename(oTmpFile.Name(), outName)
//通知Master
args := FinishReq{TypeName: "reduce", Ret: append(retName, outName), Idx: idx}
reply := FinishReply{}
call("Coordinator.HandFinishInfo", &args, &reply)
} else {
os.Remove(oTmpFile.Name())
}
2.3实现Master (Coordinate.go)
Master负责记录任务的状态和分配任务,定义了两个数据结构如下:
type Task struct {
inputFileName []string //输入文件
status TaskStatus //任务状态
}
type Coordinator struct {
MapTask []Task
ReduceTask []Task
NReduce int//记录reduce worker数目 关系到map输出的文件数
}
定义两个锁和两个全局变量
var coordinateLock sync.RWMutex //用于控制Coordinate结构内变量的访问
var lockBool sync.RWMutex //控制mapDone 和 reduceDone
//标识任务完成状态
var mapDone bool = false
var reduceDone bool = false
定义两个函数 IsWorkDone AssignTask
//用于判断某一类任务是否完成
func IsWorkDone(tasks []Task) bool {
coordinateLock.RLock()
defer coordinateLock.RUnlock()
for i := range tasks {
if tasks[i].status != Completed {
return false
}
}
return true
}
//读取任务状态并分配任务,返回值中int如果为正数则是worker id, 为-1表示该类任务完成,为-2表示所有任务都在执行
func AssignTask(tasks []Task) (*Task, int) {
coordinateLock.RLock()
defer coordinateLock.RUnlock()
for i := range tasks {
if tasks[i].status == Idle {
return &tasks[i], i
}
}
if IsWorkDone(tasks) {
return nil, -1 //work done
} else {
return nil, -2 //all in progress wait
}
}
定义Coordinate中处理两类请求的函数,需要在任务分配后的10秒,检查任务是否完成,如果没有完成,需要重置任务状态到空闲,便于分配给另外一个节点,这里开了一个协程来完成这个事情,只要主函数没有结束,协程不会提前结束,也就是说调用协程的函数(不是main)结束了,协程还可以运行
处理分配任务的请求:
func (c *Coordinator) HandWorkerReq(args *ReqArgs, reply *ReqReply) error {
if args.ReqNumber == 1 {
lockBool.RLock()
if !mapDone {
if mapTask, mapStatus := AssignTask(c.MapTask); mapStatus >= 0 {
// lock.Lock()
reply.TypeName = "map"
reply.Content = mapTask.inputFileName
reply.Idx = mapStatus
reply.NReduce = c.NReduce
coordinateLock.Lock()
mapTask.status = InProgress
coordinateLock.Unlock()
// lock.Unlock()
go CheckStatus(mapTask)
} else if mapStatus == -1 {
lockBool.RUnlock()
lockBool.Lock()
mapDone = true
lockBool.Unlock()
lockBool.RLock()
// lock.Unlock()
} else {
// lock.Lock()
reply.TypeName = "allinprogress"
// lock.Unlock()
}
} else if !reduceDone {
if redTask, redStatus:=AssignTask(c.ReduceTask); redStatus >= 0 {
reply.TypeName = "reduce"
reply.Idx = redStatus
reply.Content = redTask.inputFileName
reply.NReduce = c.NReduce
coordinateLock.Lock()
redTask.status = InProgress
coordinateLock.Unlock()
go CheckStatus(redTask)
} else if redStatus == -1 {
lockBool.RUnlock()
lockBool.Lock()
reduceDone = true
lockBool.Unlock()
lockBool.RLock()
} else {
// lock.Lock()
reply.TypeName = "allinprogress"
// lock.Unlock()
}
} else {
// lock.Lock()
reply.TypeName = "finish"
// lock.Unlock()
}
lockBool.RUnlock()
}
return nil
}
//检查任务状态
func CheckStatus(t *Task) {
time.Sleep(10 * time.Second)
coordinateLock.Lock()
defer coordinateLock.Unlock()
if t.status != Completed {
t.status = Idle
}
}
处理任务完成消息:
func (c *Coordinator) HandFinishInfo(args *FinishReq, reply *FinishReply) error {
idx := args.Idx
if args.TypeName == "map" {
coordinateLock.Lock()
if c.MapTask[idx].status != Completed {
for i := range c.ReduceTask {
c.ReduceTask[i].inputFileName = append(c.ReduceTask[i].inputFileName, args.Ret[i])
}
c.MapTask[idx].status = Completed
}
coordinateLock.Unlock()
}
if args.TypeName == "reduce" {
coordinateLock.Lock()
if c.ReduceTask[idx].status != Completed {
c.ReduceTask[idx].status = Completed
}
coordinateLock.Unlock()
}
return nil
}
3实验总结
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JTrtDnoe-1645363330895)(C:\Users\sonwi\AppData\Roaming\Typora\typora-user-images\image-20220220211427160.png)]
踩坑! 代码可以通过所有的测试,在debug的过程中,有一个版本一直过不了reduce的并行测试,后面看了测试代码才知道,判断并行的方式是在同1秒内有没有多个节点在新建文件,由于之前设置的worker会休息1秒再发送请求(指导书建议这样做),正是请求的间隙,造成了reduce在测试中无法并行,后面不等待直接发送下一个请求就可以通过测试了。
心得! lab1的指导书真的超级详细,基本上有问题了指导书上都可以找到建议,只要认真做,根本不用参考别人的代码,就是debug的时候有点困难,基本靠打印来判断问题。