6.824 lab1 MapReduce

6.824 Lab-1 MapReduce

1.实验内容

1.1内容概述

将经典的Word Counter任务使用MapReduce编程范式去实现,任务整体流程如下(假设两个Map节点和两个Reduce节点):
6.824 lab1 MapReduce

每个Map Worker负责一个输入文件的Map处理,每个Map任务输出N份文件(N是Reduce Worker数目),这N份文件会送到N个Reduce Worker处理。等待所有Map任务完成后,Reduce工作才能开始(此时所有输入文件才准备好),每个Reduce Worker输出一份reduce结果。

1.2lab相关代码概述

与本实验相关的主要有三个包,分别是main、mrapps、mr包。main包调用mrapps包和mr包运行整个流程,mrapps包是运行和测试时使用的工具函数包,这两个包在实验过程中都不需要改动,自己写的代码都在mr包中,其中包含三个文件coordinator.go、rpc.go和worker.go,作用如下

mr
├── coordinator.go \\Master
├── rpc.go		   \\处理通信
└── worker.go	   \\worker,包含map和reduce

2.实验步骤

2.1定义通信内容(rpc.go)

2.1.1分配任务时的请求与响应

worker请求任务时,不区分map和reduce,让coordinator根据任务完成情况来决定分配任务类型,这里有个边界情况就是所有的任务都在运行中,这个状态既得不到任务又不能直接退出,只能进行下一次任务。

// 节点请求任务
type ReqArgs struct {
	ReqNumber int8 //占用一个字节表示请求 为1表示申请任务
}
// Master回应任务内容
type ReqReply struct {
	TypeName string   // map or reduce or allinprogress
	Idx      int      //worker idx
	Content  []string //file names for work content
	NReduce  int      //reduce worker number
}

2.1.2完成任务时worker汇报任务结果给Master

汇报完成情况时,需要说明任务类型(TypeName), 任务结果(Ret), 完成的任务编号(idx)

// 报告任务完成情况
type FinishReq struct {
	TypeName string //map or reduce
	Ret      []string //output files(map or reduce)
	Idx      int //worker idx (map or reduce)
}

//Master 回应worker
type FinishReply struct {
	Done bool //for reply
}

2.2实现Worker(Worker.go)

2.2.1Map worker

Map worker任务包括三步,第一步读取输入文件调用mapf生成key value对,第二步处理kv对,排序之后合并相同条目到同一行,第三步将结果写入输出文件。

1.读取文件内容生成Key Value对
intermediate := []KeyValue{}
filename := reply1.Content[0]
NReduce := reply1.NReduce

//打开文件
file, err := os.Open(filename)
if err != nil {
	log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
	log.Fatalf("cannot read %v", filename)
}

file.Close()

//生成KV对
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
2.处理kv对生成中间结果
sort.Sort(ByKey(intermediate))
i := 0
reduceInput := make([][]ReduceKv, NReduce)
for i < len(intermediate) {
	j := i + 1
    for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
        j++
    }
    values := []string{}
    for k := i; k < j; k++ {
        values = append(values, intermediate[k].Value)
    }

    key := intermediate[i].Key

    //根据key分配到不同的reduce任务中
    idx := ihash(key) % NReduce
    reduceInput[idx] = append(reduceInput[idx], ReduceKv{Key: key, Value: values})

    i = j
}
3.写入结果

这个过程分为两步,首先写入临时文件,然后检查文件是否已经存在,如果不存在,将临时文件改名(原子操作),这样做的好处是防止一个任务被分配给不同worker时,多个worker同时写一个文件。

  1. 写入临时文件

    tempFiles := make([]*os.File, NReduce)
    //临时文件命名
    for i := range tempFiles {
    	tempFiles[i], err = ioutil.TempFile(".", "out*")
    	if err != nil {
    		log.Fatal("creat tempfile fail")
    	}
    }
    
    //对象以json格式写入临时文件
    for i := range reduceInput {
    	enc := json.NewEncoder(tempFiles[i])
    	for _, kv := range reduceInput[i] {
    		err := enc.Encode(&kv)
    		if err != nil {
    			log.Fatalf("cannot write json %v", i)
    		}
    	}
    }
    
  2. 重命名(原子操作)

//输入文件命名
for i := range outNames {
	outNames[i] = "mr-map-out-" + strconv.Itoa(reply1.Idx) + "-" + strconv.Itoa(i)
}
//rename
for i := range tempFiles {
	_, err := os.Stat(outNames[i])
	if os.IsNotExist(err) {
		os.Rename(tempFiles[i].Name(), outNames[i])
		if i == len(tempFiles)-1 {//complete work, call finish info
			args := FinishReq{TypeName: "map", Ret: outNames, Idx: reply1.Idx}
			reply := FinishReply{}
			call("Coordinator.HandFinishInfo", &args, &reply)
		}
	} else {
		os.Remove(tempFiles[i].Name()) //remove tempfile
		break
	}
}

2.2.2Reduce Worker

reduce 操作主要是读取map处理得到的文件,然后处理写入输出文件,写文件和map worker类似,先写入临时文件,完成之后再重命名

使用一个map来记录读取到的kv对,kvaMap := make(map[string]*ReduceKv)

1.读取中间文件(json格式)
for _, filename := range inputFileNames {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}

	dec := json.NewDecoder(file)

	for {
		var kv ReduceKv
		if err := dec.Decode(&kv); err != nil {
			break
		}
		if kvaMap[kv.Key] == nil {//map中没有记录,新建
			kvaMap[kv.Key] = &kv
		} else {//map中已经记录,追加
			kvaMap[kv.Key].Value = append(kvaMap[kv.Key].Value, kv.Value...)
		}
	}
}
2.写入临时文件
// 写入临时文件
for _, kv := range kvaMap {
   output := reducef(kv.Key, kv.Value)
   fmt.Fprintf(oTmpFile, "%v %v\n", kv.Key, output)
}
3.重命名,完成后汇报给Master
// 重命名临时文件
outName := "mr-out-" + strconv.Itoa(idx)
retName := []string{}
_, err := os.Stat(outName)
if os.IsNotExist(err) {
	os.Rename(oTmpFile.Name(), outName)
    //通知Master
	args := FinishReq{TypeName: "reduce", Ret: append(retName, outName), Idx: idx}
	reply := FinishReply{}
	call("Coordinator.HandFinishInfo", &args, &reply)
} else {
	os.Remove(oTmpFile.Name())
}

2.3实现Master (Coordinate.go)

Master负责记录任务的状态和分配任务,定义了两个数据结构如下:

type Task struct {
	inputFileName []string   //输入文件
	status        TaskStatus //任务状态
}

type Coordinator struct {
	MapTask    []Task
	ReduceTask []Task
	NReduce    int//记录reduce worker数目 关系到map输出的文件数
}

定义两个锁和两个全局变量

var coordinateLock sync.RWMutex //用于控制Coordinate结构内变量的访问
var lockBool sync.RWMutex       //控制mapDone 和 reduceDone

//标识任务完成状态
var mapDone bool = false
var reduceDone bool = false

定义两个函数 IsWorkDone AssignTask

//用于判断某一类任务是否完成
func IsWorkDone(tasks []Task) bool {
	coordinateLock.RLock()
	defer coordinateLock.RUnlock()
	for i := range tasks {
		if tasks[i].status != Completed {
			return false
		}
	}
	return true
}

//读取任务状态并分配任务,返回值中int如果为正数则是worker id, 为-1表示该类任务完成,为-2表示所有任务都在执行
func AssignTask(tasks []Task) (*Task, int) {
	coordinateLock.RLock()
	defer coordinateLock.RUnlock()
	for i := range tasks {
		if tasks[i].status == Idle {
			return &tasks[i], i
		}
	}
	if IsWorkDone(tasks) {
		return nil, -1 //work done
	} else {
		return nil, -2 //all in progress wait
	}
}

定义Coordinate中处理两类请求的函数,需要在任务分配后的10秒,检查任务是否完成,如果没有完成,需要重置任务状态到空闲,便于分配给另外一个节点,这里开了一个协程来完成这个事情,只要主函数没有结束,协程不会提前结束,也就是说调用协程的函数(不是main)结束了,协程还可以运行

处理分配任务的请求:

func (c *Coordinator) HandWorkerReq(args *ReqArgs, reply *ReqReply) error {
	if args.ReqNumber == 1 {
		lockBool.RLock()
		if !mapDone {
			if mapTask, mapStatus := AssignTask(c.MapTask); mapStatus >= 0 {
				// lock.Lock()

				reply.TypeName = "map"
				reply.Content = mapTask.inputFileName
				reply.Idx = mapStatus
				reply.NReduce = c.NReduce

				coordinateLock.Lock()
				mapTask.status = InProgress
				coordinateLock.Unlock()
				// lock.Unlock()

				go CheckStatus(mapTask)

			} else if mapStatus == -1 {
				lockBool.RUnlock()
				lockBool.Lock()
				mapDone = true
				lockBool.Unlock()
				lockBool.RLock()
				// lock.Unlock()
			} else {
				// lock.Lock()
				reply.TypeName = "allinprogress"
				// lock.Unlock()
			}
		} else if !reduceDone {
			if redTask, redStatus:=AssignTask(c.ReduceTask); redStatus >= 0 {
				reply.TypeName = "reduce"
				reply.Idx = redStatus
				reply.Content = redTask.inputFileName
				reply.NReduce = c.NReduce

				coordinateLock.Lock()
				redTask.status = InProgress
				coordinateLock.Unlock()

				go CheckStatus(redTask)
			} else if redStatus == -1 {
				lockBool.RUnlock()
				lockBool.Lock()
				reduceDone = true
				lockBool.Unlock()
				lockBool.RLock()
			} else {
				// lock.Lock()
				reply.TypeName = "allinprogress"
				// lock.Unlock()
			}
		} else {
			// lock.Lock()
			reply.TypeName = "finish"
			// lock.Unlock()
		}
		lockBool.RUnlock()
	}
	return nil
}

//检查任务状态
func CheckStatus(t *Task) {
	time.Sleep(10 * time.Second)

	coordinateLock.Lock()
	defer coordinateLock.Unlock()

	if t.status != Completed {
		t.status = Idle
	}
}

处理任务完成消息:

func (c *Coordinator) HandFinishInfo(args *FinishReq, reply *FinishReply) error {
	idx := args.Idx
	if args.TypeName == "map" {
		coordinateLock.Lock()

		if c.MapTask[idx].status != Completed {
			for i := range c.ReduceTask {
				c.ReduceTask[i].inputFileName = append(c.ReduceTask[i].inputFileName, args.Ret[i])
			}
			c.MapTask[idx].status = Completed
		}

		coordinateLock.Unlock()
	}

	if args.TypeName == "reduce" {

		coordinateLock.Lock()

		if c.ReduceTask[idx].status != Completed {
			c.ReduceTask[idx].status = Completed
		}

		coordinateLock.Unlock()
	}
	return nil
}

3实验总结

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JTrtDnoe-1645363330895)(C:\Users\sonwi\AppData\Roaming\Typora\typora-user-images\image-20220220211427160.png)]

踩坑! 代码可以通过所有的测试,在debug的过程中,有一个版本一直过不了reduce的并行测试,后面看了测试代码才知道,判断并行的方式是在同1秒内有没有多个节点在新建文件,由于之前设置的worker会休息1秒再发送请求(指导书建议这样做),正是请求的间隙,造成了reduce在测试中无法并行,后面不等待直接发送下一个请求就可以通过测试了。

心得! lab1的指导书真的超级详细,基本上有问题了指导书上都可以找到建议,只要认真做,根本不用参考别人的代码,就是debug的时候有点困难,基本靠打印来判断问题。

上一篇:ansible playbook使用总结


下一篇:Ingress nginx Controller源码分析