文章目录
- MIT 6.824 分布式系统 lab1:MapReduce
- Notes
- What wordcount's MapReduce model looks like
- a simple sequential mapreduce implementation(mrsequential.go)
MIT 6.824 分布式系统 lab1:MapReduce
文档 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
MapReduce论文 https://pdos.csail.mit.edu/6.824/schedule.html
知乎笔记 zhuanlan.zhihu.com/p/54243727
博客 https://www.cnblogs.com/haoweizh/p/10395016.html
Notes
单词计数模块:We also provide you with a couple of MapReduce applications: word-count in mrapps/wc.go. The lab also supplies a simple sequential MapReduce implementation (mrsequential.go).
[kou@python mrapps]$ pwd
/home/kou/MIT/6.824-golabs-2020/src/mrapps
[kou@python mrapps]$ ls
wc.go
What wordcount's MapReduce model looks like
[kou@python mrapps]$ cat wc.go
package main
// a word-count application "plugin" for MapReduce.
//
// go build -buildmode=plugin wc.go
import "../mr"
import "unicode"
import "strings"
import "strconv"
// Map is called once for each file of input. The first argument is the
// name of the input file, and the second is the file's complete contents.
// The file name is ignored; only the contents are examined. The return
// value is a slice of key/value pairs, one pair per word occurrence, where
// the key is the word and the value is the string "1".
func Map(filename string, contents string) []mr.KeyValue {
	// A rune separates words when it is not a letter.
	isSeparator := func(r rune) bool { return !unicode.IsLetter(r) }

	// Split contents into a slice of words.
	words := strings.FieldsFunc(contents, isSeparator)

	// Exactly one pair is emitted per word, so the final length is known.
	kva := make([]mr.KeyValue, 0, len(words))
	for _, w := range words {
		// Keyed fields keep this literal valid even if the struct's field
		// order changes in the mr package.
		kva = append(kva, mr.KeyValue{Key: w, Value: "1"})
	}
	return kva
}
// Reduce is called once for each key generated by the map tasks, with a
// list of all the values created for that key by any map task. It returns
// the number of values, formatted as a decimal string; for word count that
// is the number of occurrences of the word.
func Reduce(key string, values []string) string {
	occurrences := len(values)
	return strconv.Itoa(occurrences)
}
a simple sequential mapreduce implementation(mrsequential.go)
mrsequential.go is a simple sequential MapReduce implementation that leaves its output in the file mr-out-0. The input comes from the text files named pg-xxx.txt.
[kou@python main]$ cat mrsequential.go
package main
// simple sequential MapReduce.
// go run mrsequential.go wc.so pg*.txt
import "fmt"
import "../mr"
import "plugin"
import "os"
import "log"
import "io/ioutil"
import "sort"
// ByKey is a slice of key/value pairs that provides the Len, Swap and Less
// methods needed to sort the pairs by their Key field.
type ByKey []mr.KeyValue

// Len returns the number of pairs in the slice.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[i], kvs[j] = kvs[j], kvs[i]
}

// Less reports whether the key at position i orders before the key at
// position j.
func (kvs ByKey) Less(i, j int) bool {
	return kvs[i].Key < kvs[j].Key
}
// main runs a complete MapReduce job sequentially inside this process.
//
// Usage: mrsequential xxx.so inputfiles...
//
// os.Args[1] names the plugin that supplies the Map and Reduce functions
// (e.g. wc.so) and os.Args[2:] are the input files (e.g. pg*.txt). Each
// input file is read in full and handed to Map, all intermediate pairs are
// collected in memory and sorted by key, Reduce is called once per distinct
// key, and one "key value" line per key is written to mr-out-0. Any failure
// to open, read, create or close a file terminates the program via
// log.Fatalf.
func main() {
	if len(os.Args) < 3 {
		fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n")
		os.Exit(1)
	}

	mapf, reducef := loadPlugin(os.Args[1]) // os.Args[1] is the plugin, e.g. wc.so

	//
	// read each input file,
	// pass it to Map,
	// accumulate the intermediate Map output.
	//
	intermediate := []mr.KeyValue{}
	for _, filename := range os.Args[2:] { // os.Args[2:] are the inputs, e.g. pg*.txt
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		content, err := ioutil.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", filename)
		}
		file.Close()
		kva := mapf(filename, string(content))
		intermediate = append(intermediate, kva...)
	}

	//
	// a big difference from real MapReduce is that all the
	// intermediate data is in one place, intermediate[],
	// rather than being partitioned into NxM buckets.
	//

	// Sorting groups equal keys into contiguous runs, which the loop below
	// relies on.
	sort.Sort(ByKey(intermediate))

	oname := "mr-out-0"
	ofile, err := os.Create(oname)
	if err != nil {
		// Without this check every write below would silently fail.
		log.Fatalf("cannot create %v", oname)
	}

	//
	// call Reduce on each distinct key in intermediate[],
	// and print the result to mr-out-0.
	//
	i := 0
	for i < len(intermediate) {
		// Advance j to one past the last pair sharing intermediate[i].Key.
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}

		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}

	if err := ofile.Close(); err != nil {
		log.Fatalf("cannot close %v", oname)
	}
}
// loadPlugin opens the Go plugin file named by filename (e.g.
// ../mrapps/wc.so) and returns the application's Map and Reduce functions
// found in it.
//
// The process is terminated with log.Fatalf if the plugin cannot be opened
// or if either the "Map" or the "Reduce" symbol is missing. A symbol whose
// type does not match the expected function signature causes a panic from
// the type assertion.
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
	plug, openErr := plugin.Open(filename)
	if openErr != nil {
		log.Fatalf("cannot load plugin %v", filename)
	}

	// Resolve and type-check Map before touching Reduce.
	mapSym, mapErr := plug.Lookup("Map")
	if mapErr != nil {
		log.Fatalf("cannot find Map in %v", filename)
	}
	mapFn := mapSym.(func(string, string) []mr.KeyValue)

	reduceSym, reduceErr := plug.Lookup("Reduce")
	if reduceErr != nil {
		log.Fatalf("cannot find Reduce in %v", filename)
	}
	reduceFn := reduceSym.(func(string, []string) string)

	return mapFn, reduceFn
}
寇浩哲
发布了323 篇原创文章 · 获赞 232 · 访问量 34万+
关注