MIT 6.824 Lab 1: MapReduce

笑死,这个实验像是在做需求,不过没钱拿QAQ

文章目录

需要注意的

  1. 中间文件名为 mr-X-Y:每次 worker 发送 map RPC 请求时,coordinator 会下发一个输入文件名和一个递增的 map 任务编号(MapTaskNum),这个编号就是 X;Y 是 reduce 分区号(ihash(key) % nReduce)
  2. Done() 的退出条件:我的实现是当 mapTask(待分配的 map 任务数组)、mapTaskMap(已分配但尚未完成的 map 任务,用于 crash-recovery)、reduceTask(待分配的 reduce 任务数组)、reduceTaskMap(已分配但尚未完成的 reduce 任务,用于 crash-recovery)长度全为 0 时退出;调试时多打日志,检查中间有没有 data race
  3. 排序,我的实现是reduce读取根据nReduce 分组的文件,这个文件用map保存在Coordinator struct
  4. RPC-Server 没有在Client注册回调接口,所以crash的时候直接把任务放回任务数组就好
  5. 因为 Golang 不太熟,所以踩了几个坑:RPC 方法名以及 RPC 参数/返回结构体的字段名都需要首字母大写,否则无法正常注册或序列化/反序列化;GoLand 传参有时候会出问题,直接命令行启动最好;
  6. data race我通过多加了几个mutex解决,可以优化一下减小粒度
  7. bash 脚本中设置 LC_COLLATE=C,保证 sort 命令按字节序排序(大写字母排在小写字母之前),与 Go 中字符串比较的顺序一致

代码

coordinator

package mr

import (
	"log"
	"sync"
	"time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"

// Coordinator hands out map and reduce tasks to workers over RPC and keeps
// track of which tasks are queued and which are in flight.
//
// Every task queue has a companion "in flight" map keyed by task id: an id is
// added when the task is handed out and removed when the worker reports it
// finished or when a 10-second monitor goroutine re-queues it.
//
// WorkerStatus and Done take mapTaskLock, mapTaskMapLock, reduceTaskLock and
// reduceTaskMapLock in that order; any code holding more than one of them at
// once should follow the same order, or it can deadlock against those two.
type Coordinator struct {
	// Intermediate files, grouped by reduce partition (0..nReduce-1).
	nReduce     int
	midFileLock sync.Mutex // guards midFile
	midFile     map[int][]string

	// Map phase: input files still to hand out, and map task ids in flight.
	mapTaskLock    sync.Mutex // guards mapTask
	mapTask        []string
	mapTaskMapLock sync.Mutex // guards mapTaskNum and mapTaskMap
	mapTaskNum     int        // last map task id issued
	mapTaskMap     map[int]bool

	// Reduce phase: partitions still to hand out, and partitions in flight.
	reduceTaskLock    sync.Mutex // guards reduceTask
	reduceTask        []int
	reduceTaskMapLock sync.Mutex // guards reduceTaskNum and reduceTaskMap
	reduceTaskNum     int        // number of reduce tasks handed out
	reduceTaskMap     map[int]bool
}

// Example is the sample RPC handler from the lab skeleton: it replies with
// args.X + 1.
//
// The RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	y := args.X
	y++
	reply.Y = y
	return nil
}
// WorkerStatus tells a polling worker what to do next by setting reply.Status:
//
//   - 0: map tasks are queued; the worker should call MapRequest.
//   - 1: every map task has finished and reduce tasks are queued; the worker
//     should call ReduceRequest.
//   - 2: nothing can be handed out right now (tasks are still in flight); the
//     worker should wait and ask again.
//   - 3: every map and reduce task has finished; the worker may exit.
//
// Status 3 used to be overwritten by 2 unconditionally, so workers were never
// told the job had finished.
func (c *Coordinator) WorkerStatus(args *WorkerStatusArgs, reply *WorkerStatusReply) error {
	// Same acquisition order as Done.
	c.mapTaskLock.Lock()
	defer c.mapTaskLock.Unlock()
	c.mapTaskMapLock.Lock()
	defer c.mapTaskMapLock.Unlock()
	c.reduceTaskLock.Lock()
	defer c.reduceTaskLock.Unlock()
	c.reduceTaskMapLock.Lock()
	defer c.reduceTaskMapLock.Unlock()

	switch {
	case len(c.mapTask) > 0:
		reply.Status = 0
	case len(c.mapTaskMap) > 0:
		// Reduce must not start until every map task has reported back.
		reply.Status = 2
	case len(c.reduceTask) > 0:
		reply.Status = 1
	case len(c.reduceTaskMap) > 0:
		reply.Status = 2
	default:
		reply.Status = 3
	}
	log.Printf("get worker status,status is %d,len maptask %d,len maptaskmap %d,len reducetask %d len reducetask map %d",
		reply.Status, len(c.mapTask), len(c.mapTaskMap), len(c.reduceTask), len(c.reduceTaskMap))
	return nil
}
// MapRequest hands the next queued input file to a worker as a new map task.
//
// On success reply.Value is the input file name, reply.MapTaskNum is a fresh
// task id (used as X in the mr-X-Y intermediate file names) and reply.NReduce
// is the number of reduce partitions. If the queue is empty, reply.Value is
// "" and reply.Err is "Empty queue".
//
// A monitor goroutine puts the file back on the queue if the task has not
// been reported finished (via MapOverRequest) within 10 seconds.
func (c *Coordinator) MapRequest(args *MapCallArgs, reply *MapCallReply) error {
	c.mapTaskLock.Lock()
	var file string
	if len(c.mapTask) > 0 {
		file = c.mapTask[0]
		c.mapTask = c.mapTask[1:]
	}
	c.mapTaskLock.Unlock()

	if file == "" {
		reply.Value = ""
		reply.Err = "Empty queue"
		return nil
	}

	c.mapTaskMapLock.Lock()
	c.mapTaskNum++
	taskNum := c.mapTaskNum
	c.mapTaskMap[taskNum] = true
	c.mapTaskMapLock.Unlock()

	reply.MapTaskNum = taskNum
	reply.Value = file
	reply.NReduce = c.nReduce
	reply.Err = ""
	log.Printf("%d task be sent,filename is %s", reply.MapTaskNum, reply.Value)

	// The monitor uses the locals taskNum/file rather than reply, which the
	// RPC layer owns once this handler has returned.
	go func() {
		time.Sleep(10 * time.Second)
		// mapTaskLock before mapTaskMapLock, as in WorkerStatus and Done; the
		// reverse order can deadlock against them.
		c.mapTaskLock.Lock()
		defer c.mapTaskLock.Unlock()
		c.mapTaskMapLock.Lock()
		defer c.mapTaskMapLock.Unlock()
		if _, ok := c.mapTaskMap[taskNum]; ok {
			log.Printf("task %d fail,file %s come back", taskNum, file)
			delete(c.mapTaskMap, taskNum)
			c.mapTask = append(c.mapTask, file)
		}
	}()
	return nil
}
// MapOverRequest records that map task args.TaskId has finished and registers
// its intermediate files: args.RegisterFile[i] lists the files for reduce
// partition i.
//
// A report for a task that is no longer in flight is ignored. That happens
// when the task's timeout already fired and its input file was re-queued.
// Accepting it would register the same input twice (once from the late run,
// once from the re-executed one), and the reduce phase would double-count it.
func (c *Coordinator) MapOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {
	log.Printf("map over request be called, TaskId %d ", args.TaskId)
	c.mapTaskMapLock.Lock()
	defer c.mapTaskMapLock.Unlock()
	// Read mapTaskMap only while holding its lock.
	inFlight, ok := c.mapTaskMap[args.TaskId]
	log.Printf("TaskId value: %v", inFlight)
	if !ok {
		return nil
	}

	c.midFileLock.Lock()
	defer c.midFileLock.Unlock()
	for i := 0; i < c.nReduce; i++ {
		log.Printf("will register %d files", len(args.RegisterFile[i]))
		c.midFile[i] = append(c.midFile[i], args.RegisterFile[i]...)
	}
	delete(c.mapTaskMap, args.TaskId)
	return nil
}

// ReduceRequest hands the next queued reduce partition to a worker.
//
// On success reply.TaskId is the partition number and reply.TaskName lists
// the intermediate files registered for it. If the queue is empty,
// reply.Err is "Empty queue".
//
// A monitor goroutine puts the partition back on the queue if it has not
// been reported finished (via ReduceOverRequest) within 10 seconds.
func (c *Coordinator) ReduceRequest(args *ReduceCallArgs, reply *ReduceCallReply) error {
	c.reduceTaskLock.Lock()
	if len(c.reduceTask) == 0 {
		// Unlock on this path too: returning with reduceTaskLock held blocked
		// every later WorkerStatus, Done and ReduceRequest call forever.
		c.reduceTaskLock.Unlock()
		log.Printf("reduceTask is empty")
		reply.Err = "Empty queue"
		return nil
	}
	taskId := c.reduceTask[0]
	log.Printf("will send reduce task,task id is %d", taskId)
	c.reduceTask = c.reduceTask[1:]
	c.reduceTaskLock.Unlock()

	c.reduceTaskMapLock.Lock()
	c.reduceTaskNum++
	c.reduceTaskMap[taskId] = true
	c.reduceTaskMapLock.Unlock()

	c.midFileLock.Lock()
	// Copy so the reply does not share a backing array with midFile.
	reply.TaskName = append([]string(nil), c.midFile[taskId]...)
	log.Printf("will dispose %d file,task id is %d", len(c.midFile[taskId]), taskId)
	c.midFileLock.Unlock()
	reply.TaskId = taskId
	reply.Err = ""

	// The monitor uses the local taskId rather than reply, which the RPC
	// layer owns once this handler has returned.
	go func() {
		time.Sleep(10 * time.Second)
		c.reduceTaskLock.Lock()
		defer c.reduceTaskLock.Unlock()
		c.reduceTaskMapLock.Lock()
		defer c.reduceTaskMapLock.Unlock()
		if _, ok := c.reduceTaskMap[taskId]; ok {
			log.Printf("task %d fail", taskId)
			delete(c.reduceTaskMap, taskId)
			c.reduceTask = append(c.reduceTask, taskId)
		}
	}()
	return nil
}
// ReduceOverRequest marks reduce partition args.TaskId as finished by removing
// it from the in-flight set, so its timeout monitor will not re-queue it.
func (c *Coordinator) ReduceOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {
	id := args.TaskId
	c.reduceTaskMapLock.Lock()
	delete(c.reduceTaskMap, id)
	log.Printf("task %d delete", id)
	c.reduceTaskMapLock.Unlock()
	return nil
}

// server registers c's exported methods with net/rpc and serves them over
// HTTP in a background goroutine. It listens on the UNIX-domain socket named
// by coordinatorSock after removing any stale socket file there. A listen
// failure is fatal.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sock := coordinatorSock()
	os.Remove(sock)
	listener, err := net.Listen("unix", sock)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}

// Done reports whether the whole job has finished: no map task and no reduce
// task is either queued or in flight. main/mrcoordinator.go calls Done()
// periodically to find out.
func (c *Coordinator) Done() bool {
	c.mapTaskLock.Lock()
	defer c.mapTaskLock.Unlock()
	c.mapTaskMapLock.Lock()
	defer c.mapTaskMapLock.Unlock()
	c.reduceTaskLock.Lock()
	defer c.reduceTaskLock.Unlock()
	c.reduceTaskMapLock.Lock()
	defer c.reduceTaskMapLock.Unlock()

	mapsDone := len(c.mapTask) == 0 && len(c.mapTaskMap) == 0
	reducesDone := len(c.reduceTask) == 0 && len(c.reduceTaskMap) == 0
	return mapsDone && reducesDone
}

// MakeCoordinator creates a Coordinator that will hand out one map task per
// entry of files and nReduce reduce tasks (partitions 0..nReduce-1), then
// starts its RPC server. main/mrcoordinator.go calls this function.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Copy files: mapTask is later re-sliced from the front and appended to,
	// which must never write into the caller's backing array.
	c.mapTask = append([]string(nil), files...)
	c.nReduce = nReduce
	c.reduceTaskNum = 0
	c.mapTaskNum = 0
	for i := 0; i < nReduce; i++ {
		c.reduceTask = append(c.reduceTask, i)
	}
	c.midFile = make(map[int][]string, nReduce)
	c.mapTaskMap = make(map[int]bool, 1000)
	c.reduceTaskMap = make(map[int]bool, 1000)
	c.server()

	return &c
}

worker

package mr

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"strconv"
	"time"
)
import "log"
import "net/rpc"
import "hash/fnv"

// ByKey orders a slice of KeyValue pairs by Key, ascending, using Go's
// byte-wise string comparison. It implements sort.Interface.
type ByKey []KeyValue

// Len returns the number of pairs.
func (b ByKey) Len() int {
	return len(b)
}

// Swap exchanges the pairs at positions i and j.
func (b ByKey) Swap(i, j int) {
	b[i], b[j] = b[j], b[i]
}

// Less reports whether pair i's key sorts before pair j's key.
func (b ByKey) Less(i, j int) bool {
	return b[i].Key < b[j].Key
}

// KeyValue is a single key/value pair; Map functions return a slice of them.
type KeyValue struct {
	Key   string
	Value string
}

// ihash hashes key with 32-bit FNV-1a and clears the top bit so the result is
// a non-negative int. Use ihash(key) % NReduce to choose the reduce task
// number for each KeyValue emitted by Map.
func ihash(key string) int {
	hasher := fnv.New32a()
	if _, err := hasher.Write([]byte(key)); err != nil {
		return 0
	}
	const signMask = 0x7fffffff
	return int(hasher.Sum32() & signMask)
}
// GetMidFileName returns the intermediate file name "mr-X-Y" for map task x
// and reduce partition y.
func GetMidFileName(x int, y int) string {
	return "mr-" + strconv.Itoa(x) + "-" + strconv.Itoa(y)
}

// Worker is the worker main loop; main/mrworker.go calls this function.
//
// It repeatedly asks the coordinator for its status via the
// Coordinator.WorkerStatus RPC. Depending on the status it runs a map task
// (0), runs a reduce task (1), sleeps for 2 seconds (2), or returns (3). It
// also returns if the status RPC fails.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	for {
		args := WorkerStatusArgs{}
		reply := WorkerStatusReply{}
		if !call("Coordinator.WorkerStatus", &args, &reply) {
			return
		}
		switch reply.Status {
		case 0:
			ExecuteMapTask(mapf)
		case 1:
			ExecuteReduceTask(reducef)
		case 2:
			time.Sleep(2 * time.Second)
		case 3:
			log.Printf("all task over ,worker will quit")
			// return, not break: a bare break only leaves the switch, and the
			// worker would keep polling forever.
			return
		}
		log.Printf("task status: %d", reply.Status)
	}

	// uncomment to send the Example RPC to the coordinator.
	// CallExample()
}

// ExecuteMapTask asks the coordinator for one map task and runs mapf over the
// task's input file (reply.Value). It partitions the output by
// ihash(key) % reply.NReduce and writes each partition y as a stream of
// JSON-encoded KeyValue objects to GetMidFileName(reply.MapTaskNum, y). It
// then reports the files back via Coordinator.MapOverRequest.
//
// If the coordinator has no task to give, it returns without doing anything.
// That can happen when another worker took the last task after the status
// check. If an intermediate file cannot be written, it returns without
// reporting, and the coordinator's timeout re-queues the task.
func ExecuteMapTask(mapf func(string, string) []KeyValue) {
	args := MapCallArgs{}
	reply := MapCallReply{}
	if !call("Coordinator.MapRequest", &args, &reply) || reply.Err != "" {
		log.Printf("no map task assigned: %s", reply.Err)
		return
	}

	file, err := os.Open(reply.Value)
	if err != nil {
		log.Fatalf("cannot open %v", reply.Value)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", reply.Value)
	}
	if err := file.Close(); err != nil {
		return
	}

	kva := mapf(reply.Value, string(content))

	// Partition the pairs by reduce task.
	midResult := make(map[int][]KeyValue, reply.NReduce)
	for _, kv := range kva {
		y := ihash(kv.Key) % reply.NReduce
		midResult[y] = append(midResult[y], kv)
	}

	registerFile := make(map[int][]string, reply.NReduce)
	for y := 0; y < reply.NReduce; y++ {
		filename := GetMidFileName(reply.MapTaskNum, y)
		if err := writeMidFile(filename, midResult[y]); err != nil {
			log.Printf("write intermediate file %s: %v", filename, err)
			return
		}
		log.Printf("file write ok,will append registerfile subscript is %d,filename %s", y, filename)
		registerFile[y] = append(registerFile[y], filename)
	}

	mapOverRequestArgs := OverRequestArgs{TaskId: reply.MapTaskNum, RegisterFile: registerFile}
	mapOverRequestReply := OverRequestReply{}
	log.Printf("task ok,begin return %s, register file size %d", reply.Value, len(registerFile))
	call("Coordinator.MapOverRequest", &mapOverRequestArgs, &mapOverRequestReply)
}

// writeMidFile writes kvs to name as a stream of JSON objects. It encodes
// into a temporary file in the current directory, closes it, and renames it
// over name, so readers never see a half-written file under name. The
// temporary file is removed if any step fails.
func writeMidFile(name string, kvs []KeyValue) error {
	tmp, err := ioutil.TempFile(".", "tmp")
	if err != nil {
		fmt.Println("文件创建失败")
		return err
	}
	tmpName := tmp.Name()
	// Best-effort cleanup; after a successful rename tmpName no longer exists.
	defer os.Remove(tmpName)

	enc := json.NewEncoder(tmp)
	for i := range kvs {
		if err := enc.Encode(&kvs[i]); err != nil {
			tmp.Close()
			return err
		}
	}
	// Close before renaming so the data is flushed and the descriptor is freed.
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmpName, name)
}

// ExecuteReduceTask asks the coordinator for one reduce partition and reads
// every intermediate file listed in reply.TaskName. It sorts the pairs by key
// and calls reducef once per distinct key with all of that key's values.
// Each "key output" line goes to mr-out-<TaskId>, written through a temporary
// file that is renamed into place. It then reports completion via
// Coordinator.ReduceOverRequest.
//
// If the coordinator has no task to give, it returns without touching any
// output file. Previously an empty reply (TaskId 0) overwrote mr-out-0.
func ExecuteReduceTask(reducef func(string, []string) string) {
	args := ReduceCallArgs{}
	reply := ReduceCallReply{}
	if !call("Coordinator.ReduceRequest", &args, &reply) || reply.Err != "" {
		log.Printf("no reduce task assigned: %s", reply.Err)
		return
	}
	log.Printf("will dispose task %d", reply.TaskId)
	log.Printf("Reduce task file size: %d", len(reply.TaskName))

	// Gather every pair of this partition from all intermediate files.
	var kva []KeyValue
	for _, fileName := range reply.TaskName {
		log.Printf("range taskname is %s", fileName)
		file, err := os.Open(fileName)
		if err != nil {
			log.Fatalf("cannot open %v", fileName)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
	}
	sort.Sort(ByKey(kva))

	tmpFile, err := ioutil.TempFile(".", "tmp")
	if err != nil {
		log.Println("文件创建失败")
		return
	}
	tmpName := tmpFile.Name()
	// Best-effort cleanup; after a successful rename tmpName no longer exists.
	defer os.Remove(tmpName)

	// kva is sorted, so equal keys are adjacent: kva[i:j] is one key's group.
	for i := 0; i < len(kva); {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)
		if _, err := fmt.Fprintf(tmpFile, "%v %v\n", kva[i].Key, output); err != nil {
			log.Printf("output to tmp file fail")
			tmpFile.Close()
			return
		}
		i = j
	}
	// Close before renaming so the data is flushed and the descriptor is freed.
	if err := tmpFile.Close(); err != nil {
		log.Printf("output to tmp file fail")
		return
	}

	fileName := "mr-out-" + strconv.Itoa(reply.TaskId)
	if err := os.Rename(tmpName, fileName); err != nil {
		log.Printf("rename file fail")
		return
	}

	overArgs := OverRequestArgs{TaskId: reply.TaskId}
	overReply := OverRequestReply{}
	log.Printf("reduce task ok,begin return %d", reply.TaskId)
	call("Coordinator.ReduceOverRequest", &overArgs, &overReply)
}

// CallExample sends the sample Coordinator.Example RPC with X = 99 and prints
// the reply; reply.Y should be 100.
func CallExample() {
	args := ExampleArgs{X: 99}
	var reply ExampleReply

	call("Coordinator.Example", &args, &reply)

	fmt.Printf("reply.Y %v\n", reply.Y)
}

// call sends one RPC named rpcname to the coordinator over its UNIX-domain
// socket and waits for the response. It returns true if the call succeeded.
// On an RPC error it prints the error and returns false. Failing to connect
// at all is fatal (log.Fatal).
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, err := rpc.DialHTTP("unix", coordinatorSock())
	if err != nil {
		log.Fatal("dialing:", err)
	}
	// The Close error is deliberately ignored; only the call result matters.
	defer func() { _ = client.Close() }()

	if err := client.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}

rpc

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

// RPC argument and reply types shared by the coordinator and workers.
//
// Field names must be exported (capitalized) so net/rpc's gob encoding can
// transmit them.
type (
	// ExampleArgs / ExampleReply belong to the sample Coordinator.Example RPC.
	ExampleArgs struct {
		X int
	}
	ExampleReply struct {
		Y int
	}

	// MapCallArgs is the (empty) request of Coordinator.MapRequest.
	MapCallArgs struct{}
	// MapCallReply answers Coordinator.MapRequest: Value is the input file
	// name, Err is "" on success or "Empty queue".
	MapCallReply struct {
		Err        string
		Value      string
		NReduce    int
		MapTaskNum int
	}

	// ReduceCallArgs is the (empty) request of Coordinator.ReduceRequest.
	ReduceCallArgs struct{}
	// ReduceCallReply answers Coordinator.ReduceRequest: TaskName lists the
	// intermediate files of partition TaskId.
	ReduceCallReply struct {
		TaskName []string
		TaskId   int
		Err      string
	}

	// WorkerStatusArgs / WorkerStatusReply belong to Coordinator.WorkerStatus.
	WorkerStatusArgs  struct{}
	WorkerStatusReply struct {
		Status int
	}

	// OverRequestArgs reports a finished task. For map tasks RegisterFile
	// maps each reduce partition to the intermediate files produced.
	OverRequestArgs struct {
		TaskId       int
		RegisterFile map[int][]string
	}
	OverRequestReply struct{}

	// NOTE(review): ReduceOverRequestArgs/Reply are not used by the
	// coordinator or worker code shown alongside this file.
	ReduceOverRequestArgs struct {
		PId int
	}
	ReduceOverRequestReply struct{}
)
// coordinatorSock returns the coordinator's UNIX-domain socket path,
// "/var/tmp/824-mr-" followed by the current user id, so each user gets a
// unique-ish socket. It lives in /var/tmp rather than the current directory
// because Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
	uid := strconv.Itoa(os.Getuid())
	return "/var/tmp/824-mr-" + uid
}

上一篇:事件总线的原理


下一篇:[MIT 6.S081] Lab 7: Multithreading