笑死,这个实验像是在做需求,不过没钱拿QAQ
文章目录
需要注意的
- 中间文件名
mr-X-Y
,我的实现是每次worker
发送map rpc
请求的时候发送一个文件名作为X -
Done()
退出,多打日志看看中间有没有data race
,我的实现逻辑是当mapArray(map任务数组)
mapMap(map任务crash-recovery标记)
reduceArray(reduce任务数组)
reduceMap(reduce任务crash-recovery标记)
长度全为0时退出 - 排序,我的实现是
reduce
读取根据nReduce
分组的文件,这个文件用map
保存在Coordinator struct
中 -
RPC-Server
没有在Client
注册回调接口,所以crash
的时候直接把任务放回任务数组就好 - 因为对
Golang
不太熟,所以踩了几个坑:RPC
的结构名需要首字母大写以便正常序列化/反序列化,GoLand
参数有时候会出问题,直接命令行启动最好; -
data race
我通过多加了几个mutex
解决,可以优化一下减小粒度 -
bash
脚本中设置LC_COLLATE=C
保证sort
命令按照大写优先排序
代码
coordinator
package mr
import (
"log"
"sync"
"time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"
// Coordinator hands out map and reduce tasks to workers over RPC and
// tracks in-flight tasks so that tasks held by crashed or slow workers
// can be re-queued after a timeout.
type Coordinator struct {
	// Your definitions here.

	midFileLock sync.Mutex       // guards midFile
	nReduce     int              // number of reduce partitions (set once in MakeCoordinator)
	midFile     map[int][]string // reduce partition -> intermediate file names registered by finished map tasks

	mapTaskLock    sync.Mutex   // guards mapTask
	mapTaskMapLock sync.Mutex   // guards mapTaskMap and mapTaskNum
	mapTaskNum     int          // monotonically increasing id assigned to dispatched map tasks
	mapTask        []string     // queue of input files still waiting for a map task
	mapTaskMap     map[int]bool // map task id -> in flight (removed on completion or timeout)

	reduceTaskLock    sync.Mutex   // guards reduceTask
	reduceTaskMapLock sync.Mutex   // guards reduceTaskMap and reduceTaskNum
	reduceTaskNum     int          // counter of dispatched reduce tasks
	reduceTask        []int        // queue of reduce partition numbers still waiting for a worker
	reduceTaskMap     map[int]bool // reduce partition -> in flight (removed on completion or timeout)
}
// Example is an example RPC handler for the worker to call; it replies
// with the argument incremented by one. The argument and reply types
// are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	x := args.X
	reply.Y = x + 1
	return nil
}
// WorkerStatus reports the current job phase to a polling worker.
//
// Status values:
//   0 - map tasks are available
//   1 - reduce tasks are available
//   2 - nothing to hand out right now; the worker should wait and retry
//   3 - the whole job is finished; the worker may exit
func (c *Coordinator) WorkerStatus(args *WorkerStatusArgs, reply *WorkerStatusReply) error {
	// All four locks are taken in the same order as Done to avoid deadlock.
	c.mapTaskLock.Lock()
	c.mapTaskMapLock.Lock()
	c.reduceTaskLock.Lock()
	c.reduceTaskMapLock.Lock()
	defer c.reduceTaskMapLock.Unlock()
	defer c.reduceTaskLock.Unlock()
	defer c.mapTaskMapLock.Unlock()
	defer c.mapTaskLock.Unlock()
	if len(c.mapTask) > 0 {
		reply.Status = 0
	} else if len(c.mapTaskMap) == 0 {
		// All map tasks done: begin the reduce phase.
		if len(c.reduceTask) == 0 {
			if len(c.reduceTaskMap) == 0 {
				// Nothing queued and nothing in flight: the job is complete.
				reply.Status = 3
			} else {
				// BUG FIX: the original unconditionally overwrote Status=3
				// with Status=2 here, so workers could never see completion.
				reply.Status = 2
			}
		} else {
			reply.Status = 1
		}
	} else {
		// Map queue is drained but some map tasks are still in flight.
		reply.Status = 2
	}
	log.Printf("get worker status,status is %d,len maptask %d,len maptaskmap %d,len reducetask %d len reducetask map %d",
		reply.Status, len(c.mapTask), len(c.mapTaskMap), len(c.reduceTask), len(c.reduceTaskMap))
	return nil
}
// MapRequest hands one input file to a worker as a map task. On success
// the reply carries the file name, a fresh task id, and nReduce; when the
// queue is empty, reply.Err is set to "Empty queue". A watchdog goroutine
// re-queues the file if completion is not reported within 10 seconds.
func (c *Coordinator) MapRequest(args *MapCallArgs, reply *MapCallReply) error {
	c.mapTaskLock.Lock()
	var file string
	if len(c.mapTask) > 0 {
		file = c.mapTask[0]
		c.mapTask = c.mapTask[1:]
	}
	c.mapTaskLock.Unlock()
	if file == "" {
		reply.Value = ""
		reply.Err = "Empty queue"
		return nil
	}
	c.mapTaskMapLock.Lock()
	c.mapTaskNum++
	taskNum := c.mapTaskNum
	c.mapTaskMap[taskNum] = true
	c.mapTaskMapLock.Unlock()
	reply.MapTaskNum = taskNum
	reply.Value = file
	reply.NReduce = c.nReduce // immutable after MakeCoordinator, safe without a lock
	reply.Err = ""
	log.Printf("%d task be sent,filename is %s", reply.MapTaskNum, reply.Value)
	// Crash recovery: if the worker doesn't report completion within 10s,
	// put the file back on the queue so another worker can take it.
	// BUG FIX: locals taskNum/file are captured instead of reading *reply
	// later (reply belongs to the RPC layer once the handler returns), and
	// the locks are taken in the same order as WorkerStatus/Done
	// (mapTaskLock first) — the original took mapTaskMapLock first,
	// a lock-order inversion that could deadlock.
	go func() {
		time.Sleep(10 * time.Second)
		c.mapTaskLock.Lock()
		defer c.mapTaskLock.Unlock()
		c.mapTaskMapLock.Lock()
		defer c.mapTaskMapLock.Unlock()
		if _, ok := c.mapTaskMap[taskNum]; ok {
			log.Printf("task %d fail,file %s come back", taskNum, file)
			delete(c.mapTaskMap, taskNum)
			c.mapTask = append(c.mapTask, file)
		}
	}()
	return nil
}
// MapOverRequest is called by a worker when a map task finishes. It
// registers the produced intermediate files per reduce partition and
// removes the task from the in-flight set (so the watchdog won't re-queue it).
func (c *Coordinator) MapOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {
	log.Printf("map over request be called, TaskId %d ", args.TaskId)
	c.mapTaskMapLock.Lock()
	c.midFileLock.Lock()
	defer c.mapTaskMapLock.Unlock()
	defer c.midFileLock.Unlock()
	// BUG FIX: the original read mapTaskMap before acquiring its lock (a
	// data race with the watchdog goroutines) and logged the bool with %d,
	// which prints %!d(bool=...).
	log.Printf("TaskId value: %v", c.mapTaskMap[args.TaskId])
	for i := 0; i < c.nReduce; i++ {
		log.Printf("will register %d files", len(args.RegisterFile[i]))
		c.midFile[i] = append(c.midFile[i], args.RegisterFile[i]...)
	}
	delete(c.mapTaskMap, args.TaskId)
	return nil
}
// ReduceRequest hands one reduce partition to a worker, together with the
// list of intermediate files registered for it. When the queue is empty,
// reply.Err is "Empty queue". A watchdog goroutine re-queues the partition
// if completion is not reported within 10 seconds.
func (c *Coordinator) ReduceRequest(args *ReduceCallArgs, reply *ReduceCallReply) error {
	c.reduceTaskLock.Lock()
	var taskId int
	if len(c.reduceTask) > 0 {
		taskId = c.reduceTask[0]
		log.Printf("will send reduce task,task id is %d", taskId)
		c.reduceTask = c.reduceTask[1:]
		c.reduceTaskLock.Unlock()
	} else {
		// BUG FIX: the original returned here while still holding
		// reduceTaskLock, permanently deadlocking every later caller.
		c.reduceTaskLock.Unlock()
		log.Printf("reduceTask is empty")
		reply.Err = "Empty queue"
		return nil
	}
	c.reduceTaskMapLock.Lock()
	defer c.reduceTaskMapLock.Unlock()
	c.midFileLock.Lock()
	defer c.midFileLock.Unlock()
	c.reduceTaskNum++ // dispatched-task counter (kept for parity with map side)
	c.reduceTaskMap[taskId] = true
	reply.TaskId = taskId
	reply.TaskName = c.midFile[taskId]
	log.Printf("will dispose %d file,task id is %d", len(c.midFile[taskId]), taskId)
	reply.Err = ""
	// Crash recovery: re-queue the partition if the worker doesn't report
	// completion in time. Lock order matches WorkerStatus/Done.
	go func() {
		time.Sleep(10 * time.Second)
		c.reduceTaskLock.Lock()
		defer c.reduceTaskLock.Unlock()
		c.reduceTaskMapLock.Lock()
		defer c.reduceTaskMapLock.Unlock()
		if _, ok := c.reduceTaskMap[taskId]; ok {
			log.Printf("task %d fail", taskId)
			delete(c.reduceTaskMap, taskId)
			c.reduceTask = append(c.reduceTask, taskId)
		}
	}()
	return nil
}
// ReduceOverRequest marks a reduce partition as finished by removing it
// from the in-flight set, so the watchdog will not re-queue it.
func (c *Coordinator) ReduceOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {
	id := args.TaskId
	c.reduceTaskMapLock.Lock()
	delete(c.reduceTaskMap, id)
	c.reduceTaskMapLock.Unlock()
	log.Printf("task %d delete", id)
	return nil
}
//
// server starts a thread that listens for RPCs from worker.go
// on a unix-domain socket.
//
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := coordinatorSock()
	os.Remove(sockname) // clear out any stale socket from a previous run
	if listener, err := net.Listen("unix", sockname); err != nil {
		log.Fatal("listen error:", err)
	} else {
		go http.Serve(listener, nil)
	}
}
// Done
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
	c.mapTaskLock.Lock()
	defer c.mapTaskLock.Unlock()
	c.mapTaskMapLock.Lock()
	defer c.mapTaskMapLock.Unlock()
	c.reduceTaskLock.Lock()
	defer c.reduceTaskLock.Unlock()
	c.reduceTaskMapLock.Lock()
	defer c.reduceTaskMapLock.Unlock()
	// The job is over when nothing is queued or in flight in either phase.
	return len(c.mapTask) == 0 &&
		len(c.mapTaskMap) == 0 &&
		len(c.reduceTask) == 0 &&
		len(c.reduceTaskMap) == 0
}
// MakeCoordinator
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	// Counters start at their zero values; the reduce queue is seeded with
	// one entry per partition number.
	c := Coordinator{
		mapTask:       files,
		nReduce:       nReduce,
		midFile:       make(map[int][]string, nReduce),
		mapTaskMap:    make(map[int]bool, 1000),
		reduceTaskMap: make(map[int]bool, 1000),
	}
	for partition := 0; partition < nReduce; partition++ {
		c.reduceTask = append(c.reduceTask, partition)
	}
	c.server()
	return &c
}
worker
package mr
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sort"
"strconv"
"time"
)
import "log"
import "net/rpc"
import "hash/fnv"
// ByKey sorts a []KeyValue by Key; it implements sort.Interface.
type ByKey []KeyValue

// sort.Interface methods.
func (k ByKey) Len() int           { return len(k) }
func (k ByKey) Swap(i, j int)      { k[i], k[j] = k[j], k[i] }
func (k ByKey) Less(i, j int) bool { return k[i].Key < k[j].Key }
// KeyValue
// Map functions return a slice of KeyValue.
// KeyValue is also the unit written to and read back from the
// intermediate mr-X-Y files as JSON.
//
type KeyValue struct {
	Key   string
	Value string
}
//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
// The result is masked to 31 bits so it is always non-negative.
func ihash(key string) int {
	h := fnv.New32a()
	// fnv's Write is documented never to return an error, so the previous
	// error branch (silently returning 0) was dead code.
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}
// GetMidFileName builds the intermediate file name "mr-X-Y" for map task x
// and reduce partition y.
func GetMidFileName(x int, y int) string {
	return "mr-" + strconv.Itoa(x) + "-" + strconv.Itoa(y)
}
// Worker
// main/mrworker.go calls this function. It polls the coordinator for the
// job phase and runs map or reduce tasks until the coordinator reports
// the job is finished (status 3) or stops answering.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Your worker implementation here.
	for {
		args := WorkerStatusArgs{}
		reply := WorkerStatusReply{}
		if !call("Coordinator.WorkerStatus", &args, &reply) {
			// Coordinator is gone; assume the job is over.
			break
		}
		finished := false
		switch reply.Status {
		case 0:
			ExecuteMapTask(mapf)
		case 1:
			ExecuteReduceTask(reducef)
		case 2:
			time.Sleep(2 * time.Second)
		case 3:
			log.Printf("all task over ,worker will quit")
			// BUG FIX: the original used a bare `break` here, which exits
			// only the switch — not the for loop — so the worker spun
			// forever after the job completed.
			finished = true
		}
		log.Printf("task status: %d", reply.Status)
		if finished {
			break
		}
	}
	// uncomment to send the Example RPC to the coordinator.
	// CallExample()
}
// ExecuteMapTask fetches one map task from the coordinator, runs mapf over
// the input file, partitions the output by ihash(key) % nReduce into
// intermediate files named mr-X-Y (written atomically via temp file +
// rename), and reports the produced file names with MapOverRequest.
//
// the RPC argument and reply types are defined in rpc.go.
func ExecuteMapTask(mapf func(string, string) []KeyValue) {
	args := MapCallArgs{}
	reply := MapCallReply{}
	if !call("Coordinator.MapRequest", &args, &reply) {
		return
	}
	// BUG FIX: the original ignored reply.Err, so an "Empty queue" answer
	// (reply.Value == "") made os.Open fail and log.Fatalf kill the worker.
	if reply.Err != "" {
		log.Printf("no map task available: %s", reply.Err)
		return
	}
	file, err := os.Open(reply.Value)
	if err != nil {
		log.Fatalf("cannot open %v", reply.Value)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", reply.Value)
	}
	if err := file.Close(); err != nil {
		return
	}
	kva := mapf(reply.Value, string(content))
	// Partition the key/value pairs by reduce task number.
	// (The original spawned one goroutine per pair but handshook each over
	// an unbuffered channel, making it sequential anyway; a plain loop is
	// equivalent and cannot race on the map.)
	midResult := make(map[int][]KeyValue, reply.NReduce)
	for _, kv := range kva {
		y := ihash(kv.Key) % reply.NReduce
		midResult[y] = append(midResult[y], kv)
	}
	// NOTE: sized after the RPC; the original sized it from reply.NReduce
	// before the call, when the field was still zero.
	registerFile := make(map[int][]string, reply.NReduce)
	for y := 0; y < reply.NReduce; y++ {
		filename := GetMidFileName(reply.MapTaskNum, y)
		tmp, err2 := ioutil.TempFile(".", "tmp")
		if err2 != nil {
			fmt.Println("文件创建失败")
			return
		}
		enc := json.NewEncoder(tmp)
		var encodeErr error
		for i := range midResult[y] {
			// BUG FIX: the original discarded Encode errors, which could
			// silently truncate intermediate files.
			if err := enc.Encode(&midResult[y][i]); err != nil {
				encodeErr = err
				break
			}
		}
		tmp.Close()
		if encodeErr != nil {
			os.Remove(tmp.Name())
			log.Printf("encode to %s fail", tmp.Name())
			return
		}
		// Atomic publish: rename replaces any stale mr-X-Y from a crashed
		// earlier attempt, so no separate os.Remove is needed.
		if err := os.Rename(tmp.Name(), filename); err != nil {
			os.Remove(tmp.Name())
			return
		}
		log.Printf("file write ok,will append registerfile subscript is %d,filename %s", y, filename)
		registerFile[y] = append(registerFile[y], filename)
	}
	mapOverRequestArgs := OverRequestArgs{TaskId: reply.MapTaskNum, RegisterFile: registerFile}
	mapOverRequestReply := OverRequestReply{}
	log.Printf("task ok,begin return %s, register file size %d", reply.Value, len(registerFile))
	call("Coordinator.MapOverRequest", &mapOverRequestArgs, &mapOverRequestReply)
}
// ExecuteReduceTask fetches one reduce task, decodes every registered
// intermediate file for that partition, sorts and groups the pairs by key,
// applies reducef once per key, writes "key value" lines atomically to
// mr-out-<task> (temp file + rename), and reports completion.
func ExecuteReduceTask(reducef func(string, []string) string) {
	args := ReduceCallArgs{}
	reply := ReduceCallReply{}
	if !call("Coordinator.ReduceRequest", &args, &reply) {
		return
	}
	// BUG FIX: the original ignored reply.Err, so an "Empty queue" answer
	// produced a bogus empty mr-out-0 and falsely reported task 0 finished.
	if reply.Err != "" {
		log.Printf("no reduce task available: %s", reply.Err)
		return
	}
	log.Printf("will dispose task %d", reply.TaskId)
	tmpFile, err2 := ioutil.TempFile(".", "tmp")
	if err2 != nil {
		log.Println("文件创建失败")
		return
	}
	// Harmless after a successful rename; cleans up on every error path.
	defer os.Remove(tmpFile.Name())
	log.Printf("Reduce task file size: %d", len(reply.TaskName))
	var kva []KeyValue
	// Decode every intermediate file for this partition into one slice.
	for _, fileName := range reply.TaskName {
		log.Printf("range taskname is %s", fileName)
		file, err := os.Open(fileName)
		if err != nil {
			// BUG FIX: the original logged the nil *os.File here instead
			// of the file name.
			log.Fatalf("cannot open %v", fileName)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close() // BUG FIX: the original leaked every file descriptor
	}
	sort.Sort(ByKey(kva))
	// Group runs of equal keys and emit one reducef result per key.
	// BUG FIX: the original state machine passed the previous key's
	// accumulated values to reducef under the *next* key and dropped
	// values at run boundaries; this two-pointer form is correct.
	for i := 0; i < len(kva); {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)
		if _, err := fmt.Fprintf(tmpFile, "%v %v\n", kva[i].Key, output); err != nil {
			log.Printf("output to tmp file fail")
			return
		}
		i = j
	}
	tmpFile.Close()
	fileName := "mr-out-" + strconv.Itoa(reply.TaskId)
	// Atomic publish: rename replaces any stale output from a crashed run.
	if err := os.Rename(tmpFile.Name(), fileName); err != nil {
		log.Printf("rename file fail")
		return
	}
	overRequestArgs := OverRequestArgs{TaskId: reply.TaskId}
	overRequestReply := OverRequestReply{}
	log.Printf("reduce task ok,begin return %d", reply.TaskId)
	call("Coordinator.ReduceOverRequest", &overRequestArgs, &overRequestReply)
}
// CallExample shows how to make an RPC call to the coordinator: it sends
// X=99 and prints the reply, which should be Y=100.
func CallExample() {
	// declare and fill in the argument structure.
	args := ExampleArgs{X: 99}
	// declare a reply structure.
	var reply ExampleReply
	// send the RPC request, wait for the reply.
	call("Coordinator.Example", &args, &reply)
	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}
//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, err := rpc.DialHTTP("unix", coordinatorSock())
	if err != nil {
		log.Fatal("dialing:", err)
	}
	// Close errors are deliberately ignored, as in the lab skeleton.
	defer client.Close()
	if callErr := client.Call(rpcname, args, reply); callErr != nil {
		fmt.Println(callErr)
		return false
	}
	return true
}
rpc
package mr
//
// RPC definitions.
//
// remember to capitalize all names.
//
import "os"
import "strconv"
//
// example to show how to declare the arguments
// and reply for an RPC.
//

// ExampleArgs is the request for the Example RPC.
type ExampleArgs struct {
	X int
}

// ExampleReply is the response to the Example RPC; the handler sets Y = X + 1.
type ExampleReply struct {
	Y int
}
// Add your RPC definitions here.

// MapCallArgs is the (empty) request for Coordinator.MapRequest.
type MapCallArgs struct {
}

// MapCallReply carries one dispatched map task.
type MapCallReply struct {
	Err        string // "" on success; "Empty queue" when no task is available
	Value      string // input file name for the map task
	NReduce    int    // number of reduce partitions
	MapTaskNum int    // unique id of this map task (the X in mr-X-Y)
}
// ReduceCallArgs is the (empty) request for Coordinator.ReduceRequest.
type ReduceCallArgs struct {
}

// ReduceCallReply carries one dispatched reduce task.
type ReduceCallReply struct {
	TaskName []string // intermediate file names registered for this partition
	TaskId   int      // reduce partition number (the Y in mr-X-Y)
	Err      string   // "" on success; "Empty queue" when no task is available
}
// WorkerStatusArgs is the (empty) request for Coordinator.WorkerStatus.
type WorkerStatusArgs struct{}

// WorkerStatusReply reports the job phase:
// 0 map task available, 1 reduce task available, 2 wait, 3 job finished.
type WorkerStatusReply struct {
	Status int
}
// OverRequestArgs reports a finished task (used by both MapOverRequest
// and ReduceOverRequest).
type OverRequestArgs struct {
	TaskId       int              // id of the finished task
	RegisterFile map[int][]string // map tasks only: partition -> produced intermediate file names
}

// OverRequestReply is empty; completion reports need no response data.
type OverRequestReply struct {
}
// ReduceOverRequestArgs appears unused: the ReduceOverRequest handler
// takes OverRequestArgs instead — NOTE(review): candidate for removal.
type ReduceOverRequestArgs struct {
	PId int
}

// ReduceOverRequestReply is likewise unused.
type ReduceOverRequestReply struct {
}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
	return "/var/tmp/824-mr-" + strconv.Itoa(os.Getuid())
}