内容简介:在PartI主要是实现 common_map.go的 doMap()方法(分割map任务输出的函数)以及 common_reduce.go的 doReduce()方法(收集reduce任务的所有输入的函数),此时map和reduce阶段的task还是串行运行的。首先了解一下整个程序的运行流程,执行下列命令l即可运行试验第一部分的代码。执行命令后会运行test_test.go中的TestSequentialSingle函数
在PartI主要是实现 common_map.go的 doMap()方法(分割map任务输出的函数)以及 common_reduce.go的 doReduce()方法(收集reduce任务的所有输入的函数),此时map和reduce阶段的task还是串行运行的。
2.实现思路
首先了解一下整个程序的运行流程,执行下列命令l即可运行试验第一部分的代码。
go test -run Sequential
执行命令后会运行test_test.go中的TestSequentialSingle函数
func TestSequentialSingle(t *testing.T) { mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc) mr.Wait() check(t, mr.files) checkWorker(t, mr.stats) cleanup(mr) } 复制代码
Sequential()函数中会串行地执行map和reduce tasks。其中makeInputs会生成一个输入文件824-mrinput-0.txt,即本次要处理的文件,里面是递增的数字(0~99999),一个数字为一行。
func Sequential(jobName string, files []string, nreduce int, mapF func(string, string) []KeyValue, reduceF func(string, []string) string, ) (mr *Master) { mr = newMaster("master") go mr.run(jobName, files, nreduce, func(phase jobPhase) { switch phase { case mapPhase: for i, f := range mr.files { doMap(mr.jobName, i, f, mr.nReduce, mapF) } case reducePhase: for i := 0; i < mr.nReduce; i++ { doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF) } } }, func() { mr.stats = []int{len(files) + nreduce} }) return } 复制代码
根据参数,本次只会生成一个文件824-mrinput-0.txt,以及一个reduce task处理。在Sequential函数中首先调用 doMap()方法实现Map功能,生成中间键值对,然后调用doReduce()方法实现Reduce功能。由于只有一个文件和一个reduce task,所以doMap()和doReduce()会依次串行地各执行一次。
3.doMap()
doMap()函数主要是实现这样的功能:读取一个输入文件(inFile),调用我们实现的Map功能的函数mapF,并将mapF的输出内容分配到给nReduce的中间文件中。
- 每个reduce task有一个中间文件,首先需要生成中间文件的名称,调用common.go中的reduceName方法即可 reduceName(jobName, mapTask, r)生成中间文件名称
func reduceName(jobName string, mapTask int, reduceTask int) string { return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask) } 复制代码
- 对每个键值key调用ihash()方法,然后mod nReduce,来选择该键值对放在哪个中间文件中
- 在此部分实验中,mapF()其实已经在原代码中帮我们实现了,他就是test_test.go中的MapFunc()函数。他在TestSequentialSingle中调用Sequential时已经传入。其中第一个参数是要处理的文件名,第二个参数是文件的全部内容,最后函数会返回一个[]KeyValue。具体实现如下
// 将读取文件中的所有单词分割,返回[]KeyValue,形如[(“0”,””),(“1”,””)...] ps.可能存在重复的KeyValue func MapFunc(file string, value string) (res []KeyValue) { debug("Map %v\n", value) // Fields 以连续的空白字符为分隔符,将 s 切分成多个子串,结果中不包含空白字符本身 // 空白字符有:\t, \n, \v, \f, \r, ' ', U+0085 (NEL), U+00A0 (NBSP) words := strings.Fields(value) for _, w := range words { kv := KeyValue{w, ""} res = append(res, kv) } return } 复制代码
至于写入中间文件,建议使用json的方式,可以使用以下方法
enc := json.NewEncoder(file) for _, kv := ... { err := enc.Encode(&kv) } 复制代码
综上分析,得到实现doMap()函数的大致思路(本部分 只有一个输入文件和一个reduce task,nReduce=1):
首先根据n个输入文件和设定的m个reduce tasks,生成n*m个中间文件,调用reduceName方法进行命名。由于这部分实验只有一个输入文件和一个reduce task,因此只会生成一个中间文件。 对于每一个输入文件fileX,读取文件内容,调用mapF()方法进行处理,最终返回键值对[]KeyValue。此处的mapF()是指上文的MapFunc方法 对上一步生成的[]KeyValue,每一个Key调用ihash()方法然后mod nReduce,选择该 KeyValue写入哪个中间文件中。
处理完[]KeyValue全部内容,关闭文件。
我的实现代码如下:
func doMap( jobName string, // the name of the MapReduce job mapTask int, // which map task this is inFile string, nReduce int, // the number of reduce task that will be run ("R" in the paper) mapF func(filename string, contents string) []KeyValue, ) { //创建中间文件 var interFiles []*os.File for r:=0;r<nReduce;r++ { interName := reduceName(jobName, mapTask, r) interFile,err := os.Create(interName) if err != nil { fmt.Println(err) } interFiles = append(interFiles,interFile) defer interFile.Close() } //调用mapF,得到map程序处理后的键值对 inBody,err:=ioutil.ReadFile(inFile)//读取文件内容 if err!=nil { fmt.Println(err) } keyValue := mapF(inFile,string(inBody)) //写入中间文件 for _,v := range keyValue { r:= ihash(v.Key)%nReduce enc := json.NewEncoder(interFiles[r]) err := enc.Encode(&v) if err!=nil { fmt.Println(err) } } } 复制代码
4.doReduce()
doReduce()主要是实现这样的功能:读取这个reduce task对应的中间文件,按key对中间键/值对进行 排序 及合并,为每个key调用用户定义的reduce函数(reduceF),最后将reduceF的输出写入磁盘。
若在doMap()中使用了enc.Encode(&kv)将中间键值对写入中间文件,在doReduce()中可以使用Decode(&kv)来读取。最后的输出文件也推荐使用json的方式写入。
同理,reduceF对应的是ReduceFunc,也已经在原代码中实现了。第一个参数是键key,第二个参数是值value的数组,若各个中间文件中某个key有多个value。在ReduceFunc中只是打印了key值,没做什么处理。
// Just return key func ReduceFunc(key string, values []string) string { for _, e := range values { debug("Reduce %s %v\n", key, e) } return "" } 复制代码
我的实现如下
func doReduce( jobName string, // the name of the whole MapReduce job reduceTask int, // which reduce task this is outFile string, // write the output here nMap int, // the number of map tasks that were run ("M" in the paper) reduceF func(key string, values []string) string, ) { //读取中间文件 var keyValue []KeyValue for i:=0;i<nMap;i++ { interName := reduceName(jobName, i, reduceTask) interBody,err:=ioutil.ReadFile(interName) if err != nil { fmt.Println(err) } dec := json.NewDecoder(strings. NewReader(string(interBody))) for { var m KeyValue if err := dec.Decode(&m) ; err == io. EOF { break } else if err != nil { fmt.Println(err) } keyValue = append(keyValue,m) } } //排序及合并,处理后应该类似["0":[""],"1":[""]...] var keyValuesMap map[string][]string keyValuesMap = make(map[string][]string) for _,v := range keyValue { if _,ok:= keyValuesMap[v.Key];ok {//若key值已存在,将value添加到[]string中 keyValuesMap[v.Key] = append(keyValuesMap[v.Key],v.Value) }else{//若key值不存在,在map中新建key var values []string values = append(values,v.Value) keyValuesMap[v.Key] = values } } //对每个key调用reduceF,并写入最后的文件 outputFile,err := os.Create(outFile) if err!=nil { fmt.Println(err) } enc := json.NewEncoder(outputFile) for k,v := range keyValuesMap { err := enc.Encode(KeyValue{k, reduceF(k,v)}) if err!=nil { fmt.Println(err) } } } 复制代码
5.测试运行
在6.824\src\mapreduce目录下执行下列命令进行PartI实验的测试
go test -run Sequential
若运行通过会在结果中输出ok,类似
ps.运行 go 命令需要设置好GOPATH
在common.go中设置 debugEnabled = true,go test时增加-v参数可以获得更多调试信息
env "GOPATH=$PWD/../../" go test -v -run Sequential === RUN TestSequentialSingle master: Starting Map/Reduce task test Merge: read mrtmp.test-res-0 master: Map/Reduce task completed --- PASS: TestSequentialSingle (1.34s) === RUN TestSequentialMany master: Starting Map/Reduce task test Merge: read mrtmp.test-res-0 Merge: read mrtmp.test-res-1 Merge: read mrtmp.test-res-2 master: Map/Reduce task completed --- PASS: TestSequentialMany (1.33s) PASS ok mapreduce 2.672s 复制代码
6.问题记录
-
运行报错:./master_rpc.go:48: debug call has arguments but no formatting directives
解决方法master_rpc.go的48行debug("RegistrationServer: accept error ", err) 改为debug("RegistrationServer: accept error %v", err)
以上所述就是小编给大家介绍的《MIT6.824-Lab1-Part I: Map/Reduce input and output》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。