Introduction
Following the MapReduce paper, we build a MapReduce system. The system consists mainly of a master and workers: the master dispatches tasks and handles worker failures, while the workers read and write files according to the map and reduce functions.
Approach
- Task dispatch: the master puts pending tasks into a channel; workers take tasks from the channel and perform the corresponding operation according to the task type.
- Fault tolerance: the master tracks the progress of every task; if a task has not finished within a fixed timeout, it is re-issued.
- Completion detection: the master decides whether a task is finished simply by checking whether its target file exists in the current directory, e.g. the intermediate file mr-X or the reduce output file mr-out-X. Before starting, the master must check for and delete any existing files with the same names as the intermediate files, otherwise a task could wrongly be considered finished at runtime.
- Program exit: once the master sees that all tasks are finished, it sets done = true under mutual exclusion; main/mrmaster.go then learns from Done() that the job is complete and can exit cleanly (see the sketch right after this list). After the master has exited, a worker that fails to reach the master over RPC can conclude that all tasks are done.
- Avoiding races: use ioutil.TempFile to create a uniquely named temporary file, and os.Rename to atomically rename it into place.
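To make the exit protocol concrete, here is a rough sketch of the driver in main/mrmaster.go from the lab skeleton: it builds the Master and then polls Done() once a second until the whole job has finished. The import path and the exact polling interval are assumptions and may differ from the real skeleton.
package main

// Sketch of the master driver: build the Master, then poll Done()
// until the dispatcher has marked the whole job as finished.
import (
	"fmt"
	"os"
	"time"

	"6.824/src/mr" // import path is an assumption; adjust to the actual module layout
)

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "Usage: mrmaster inputfiles...\n")
		os.Exit(1)
	}
	// 10 reduce tasks, matching the lab's default
	m := mr.MakeMaster(os.Args[1:], 10)
	// Done() returns true once the dispatcher has set done = true
	for m.Done() == false {
		time.Sleep(time.Second)
	}
	// give workers a moment to notice that the master is gone
	time.Sleep(time.Second)
}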
Implementation
The three files rpc.go, master.go, and worker.go are given below.
rpc.go
rpc.go defines the data structures used for communication between the master and the workers:
package mr
//
// RPC definitions.
//
// remember to capitalize all names.
//
import "os"
import "strconv"
type TaskRequest struct {
}
type TaskType int
const (
MapTask TaskType = 1
ReduceTask TaskType = 2
)
type TaskResponse struct {
// for a map task, Filename is the file to be mapped; for a reduce task it is the empty string
Filename string
// the task is either a map task or a reduce task
TypeOfTask TaskType
// serial number of the task
Serial int
// NReduce is used to divide intermediate results into buckets
NReduce int
}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the master.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func masterSock() string {
s := "/var/tmp/824-mr-"
s += strconv.Itoa(os.Getuid())
return s
}
master.go
The master is implemented as follows:
package mr
import (
"log"
"net"
"net/http"
"net/rpc"
"strconv"
"time"
)
import "os"
type Master struct {
// use TaskChannel to deliver tasks to workers
TaskChannel chan TaskResponse
// done is true once all tasks are finished
done bool
// sem is to protect done from concurrent read/write
sem chan struct{}
}
// keep track of task
type TaskTrack struct {
taskResp TaskResponse
startTime time.Time
}
func (m *Master) DispatchTask(request *TaskRequest, response *TaskResponse) error {
// extract a task from the channel
// if no task is available, the goroutine handling this RPC blocks here
temp := <-m.TaskChannel
response.Filename = temp.Filename
response.TypeOfTask = temp.TypeOfTask
response.Serial = temp.Serial
response.NReduce = temp.NReduce
return nil
}
//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {
ret := false
// read m.done exclusively
<- m.sem
ret = m.done
m.sem <- struct{}{}
return ret
}
// task expires after ten seconds
func isExpired(task TaskTrack) bool {
return time.Since(task.startTime) > 10*time.Second
}
func dispatcher(files []string, nReduce int, m *Master) {
// remove intermediate files in case there is any collision
for i := 0; i < len(files); i++ {
filename := "mr-" + strconv.Itoa(i)
err := os.Remove(filename)
if err != nil && !os.IsNotExist(err) {
log.Fatalf("error occurs while removing file %v", filename)
}
}
var unfinishedTasks []TaskTrack
//-------------------------------------------- dispatch map task --------------------------------------------
for i, file := range files {
resp := TaskResponse{Filename: file, TypeOfTask: MapTask, Serial: i}
m.TaskChannel <- resp
unfinishedTasks = append(unfinishedTasks, TaskTrack{taskResp: resp, startTime: time.Now()})
}
// check if all map tasks are complete
for len(unfinishedTasks) > 0 {
for i := 0; i < len(unfinishedTasks); i++ {
track := unfinishedTasks[i]
filename := "mr-" + strconv.Itoa(track.taskResp.Serial)
// check if intermediate file exists
if _, err := os.Stat(filename); err == nil {
// filename exists, which indicates that this track is completed
unfinishedTasks = append(unfinishedTasks[:i], unfinishedTasks[i + 1:]...)
i--
} else if len(m.TaskChannel) == 0 && isExpired(track) {
// the dispatch channel is empty and this task has expired, so re-issue it
m.TaskChannel <- track.taskResp
// reset startTime of this task
unfinishedTasks[i].startTime = time.Now()
}
}
time.Sleep(time.Second)
}
//-------------------------------------------- dispatch reduce task --------------------------------------------
// all map tasks are completed, now start to emit reduce task
// there are nReduce reduce tasks in total
for i := 0; i < nReduce; i++ {
resp := TaskResponse{TypeOfTask: ReduceTask, Serial: i, NReduce: nReduce}
m.TaskChannel <- resp
unfinishedTasks = append(unfinishedTasks, TaskTrack{taskResp: resp, startTime: time.Now()})
}
// check if all reduce tasks are complete
for len(unfinishedTasks) > 0 {
for i := 0; i < len(unfinishedTasks); i++ {
track := unfinishedTasks[i]
filename := "mr-out-" + strconv.Itoa(track.taskResp.Serial)
if _, err := os.Stat(filename); err == nil {
unfinishedTasks = append(unfinishedTasks[:i], unfinishedTasks[i + 1:]...)
i--
} else if len(m.TaskChannel) == 0 && isExpired(track) {
m.TaskChannel <- track.taskResp
// reset startTime
unfinishedTasks[i].startTime = time.Now()
}
}
time.Sleep(time.Second)
}
// exclusively set status to done
<- m.sem
m.done = true
m.sem <- struct{}{}
}
//
// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, nReduce int) *Master {
// initialize master
m := Master{TaskChannel: make(chan TaskResponse, 100), sem: make(chan struct{}, 1)}
m.sem <- struct{}{}
// dispatch tasks in another goroutine
go dispatcher(files, nReduce, &m)
// start a thread that listens for RPCs from worker.go
m.server()
return &m
}
//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
rpc.Register(m)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := masterSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
worker.go
The worker is implemented as follows:
package mr
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sort"
"strconv"
)
import "log"
import "net/rpc"
import "hash/fnv"
//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
Key string
Value string
}
// for sorting by key.
type ByKey []KeyValue
// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
h := fnv.New32a()
_, err := h.Write([]byte(key))
if err != nil {
log.Fatalf("error occurs while hashing key %v", key)
}
return int(h.Sum32() & 0x7fffffff)
}
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
// the worker repeatedly asks the master for work over RPC
for {
task, ok := askForTask()
// fail to contact master, which indicates that all tasks are done
if !ok {
break
}
if task.TypeOfTask == MapTask {
doMapTask(task, mapf)
} else {
doReduceTask(task, reducef)
}
}
}
func askForTask() (TaskResponse, bool) {
request := TaskRequest{}
response := TaskResponse{}
ok := call("Master.DispatchTask", &request, &response)
return response, ok
}
func doMapTask(task TaskResponse, mapf func(string, string) []KeyValue) {
filename := task.Filename
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
err = file.Close()
if err != nil {
log.Fatalf("cannot close file %v", file.Name())
}
kva := mapf(filename, string(content))
// create the temporary file in the current directory so that the os.Rename below
// stays on the same filesystem (renaming across filesystems would fail)
tempFile, err := ioutil.TempFile(".", "mr-map-tmp-")
if err != nil {
log.Fatalf("cannot create tempFile: %v", err)
}
enc := json.NewEncoder(tempFile)
for _, kv := range kva {
err := enc.Encode(&kv)
if err != nil {
log.Fatalf("cannot encode kv %v into file %v", kv, tempFile)
}
}
err = tempFile.Close()
if err != nil {
log.Fatalf("error occurs while closing file %v", tempFile)
}
// intermediate kv pairs are saved in mr-X
err = os.Rename(tempFile.Name(), "mr-"+strconv.Itoa(task.Serial))
if err != nil {
log.Fatalf("err occurs while renaming file %v to %s", tempFile, "mr-"+strconv.Itoa(task.Serial))
}
}
func doReduceTask(task TaskResponse, reducef func(string, []string) string) {
var kva []KeyValue
// again create the temporary file in the current directory so the final os.Rename stays on one filesystem
tempFile, err := ioutil.TempFile(".", "mr-reduce-tmp-")
if err != nil {
log.Fatalf("cannot create tempFile: %v", err)
}
// go to every intermediate file to collect corresponding keys
i := 0
for {
file, err := os.Open("mr-" + strconv.Itoa(i))
if err != nil {
if os.IsNotExist(err) {
// all intermediate files are read
break
} else {
log.Fatalf("error occurs while openning a file %v", "mr-" + strconv.Itoa(i))
}
}
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
// select keys that this worker need to reduce
if ihash(kv.Key) % task.NReduce == task.Serial {
kva = append(kva, kv)
}
}
i++
}
sort.Sort(ByKey(kva))
j := 0
for j < len(kva) {
k := j + 1
for k < len(kva) && kva[j].Key == kva[k].Key {
k++
}
var values []string
for u := j; u < k; u++ {
values = append(values, kva[u].Value)
}
output := reducef(kva[j].Key, values)
_, err := fmt.Fprintf(tempFile, "%v %v\n", kva[j].Key, output)
if err != nil {
log.Fatalf("error occurs while wrting into tempFile %v", tempFile)
}
j = k
}
err = tempFile.Close()
if err != nil {
log.Fatalf("error occurs while closing file %v", tempFile)
}
err = os.Rename(tempFile.Name(), "mr-out-"+strconv.Itoa(task.Serial))
if err != nil {
log.Fatalf("error occurs while renaming file %v to %s", tempFile, "mr-out-"+strconv.Itoa(task.Serial))
}
}
//
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := masterSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
// cannot reach the master: it has most likely exited because the whole job is done
log.Println("dialing:", err)
return false
}
defer c.Close()
err = c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
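For completeness, the Worker loop above is started by main/mrworker.go in the lab skeleton, which loads the application's Map and Reduce functions from a compiled plugin (e.g. wc.so) and hands them to Worker. A rough sketch follows; the import path and error messages are assumptions.
package main

// Sketch of the worker driver: load mapf/reducef from a Go plugin
// and run the Worker loop until the master becomes unreachable.
import (
	"fmt"
	"log"
	"os"
	"plugin"

	"6.824/src/mr" // import path is an assumption; adjust to the actual module layout
)

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
		os.Exit(1)
	}
	mapf, reducef := loadPlugin(os.Args[1])
	// Worker keeps asking the master for tasks; when an RPC fails,
	// it concludes the whole job is finished and returns.
	mr.Worker(mapf, reducef)
}

// loadPlugin looks up the exported Map and Reduce symbols in the plugin file.
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
	p, err := plugin.Open(filename)
	if err != nil {
		log.Fatalf("cannot load plugin %v", filename)
	}
	xmapf, err := p.Lookup("Map")
	if err != nil {
		log.Fatalf("cannot find Map in %v", filename)
	}
	mapf := xmapf.(func(string, string) []mr.KeyValue)
	xreducef, err := p.Lookup("Reduce")
	if err != nil {
		log.Fatalf("cannot find Reduce in %v", filename)
	}
	reducef := xreducef.(func(string, []string) string)
	return mapf, reducef
}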