内容简介:在PartI主要是实现 common_map.go的 doMap()方法(分割map任务输出的函数)以及 common_reduce.go的 doReduce()方法(收集reduce任务的所有输入的函数),此时map和reduce阶段的task还是串行运行的。首先了解一下整个程序的运行流程,执行下列命令l即可运行试验第一部分的代码。执行命令后会运行test_test.go中的TestSequentialSingle函数
在PartI主要是实现 common_map.go的 doMap()方法(分割map任务输出的函数)以及 common_reduce.go的 doReduce()方法(收集reduce任务的所有输入的函数),此时map和reduce阶段的task还是串行运行的。
2.实现思路
首先了解一下整个程序的运行流程,执行下列命令l即可运行试验第一部分的代码。
go test -run Sequential
执行命令后会运行test_test.go中的TestSequentialSingle函数
func TestSequentialSingle(t *testing.T) {
mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
mr.Wait()
check(t, mr.files)
checkWorker(t, mr.stats)
cleanup(mr)
}
复制代码
Sequential()函数中会串行地执行map和reduce tasks。其中makeInputs会生成一个输入文件824-mrinput-0.txt,即本次要处理的文件,里面是递增的数字(0~99999),一个数字为一行。
func Sequential(jobName string, files []string, nreduce int,
mapF func(string, string) []KeyValue,
reduceF func(string, []string) string,
) (mr *Master) {
mr = newMaster("master")
go mr.run(jobName, files, nreduce, func(phase jobPhase) {
switch phase {
case mapPhase:
for i, f := range mr.files {
doMap(mr.jobName, i, f, mr.nReduce, mapF)
}
case reducePhase:
for i := 0; i < mr.nReduce; i++ {
doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
}
}
}, func() {
mr.stats = []int{len(files) + nreduce}
})
return
}
复制代码
根据参数,本次只会生成一个文件824-mrinput-0.txt,以及一个reduce task处理。在Sequential函数中首先调用 doMap()方法实现Map功能,生成中间键值对,然后调用doReduce()方法实现Reduce功能。由于只有一个文件和一个reduce task,所以doMap()和doReduce()会依次串行地各执行一次。
3.doMap()
doMap()函数主要是实现这样的功能:读取一个输入文件(inFile),调用我们实现的Map功能的函数mapF,并将mapF的输出内容分配到给nReduce的中间文件中。
- 每个reduce task有一个中间文件,首先需要生成中间文件的名称,调用common.go中的reduceName方法即可 reduceName(jobName, mapTask, r)生成中间文件名称
func reduceName(jobName string, mapTask int, reduceTask int) string {
return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
复制代码
- 对每个键值key调用ihash()方法,然后mod nReduce,来选择该键值对放在哪个中间文件中
- 在此部分实验中,mapF()其实已经在原代码中帮我们实现了,他就是test_test.go中的MapFunc()函数。他在TestSequentialSingle中调用Sequential时已经传入。其中第一个参数是要处理的文件名,第二个参数是文件的全部内容,最后函数会返回一个[]KeyValue。具体实现如下
// 将读取文件中的所有单词分割,返回[]KeyValue,形如[(“0”,””),(“1”,””)...] ps.可能存在重复的KeyValue
func MapFunc(file string, value string) (res []KeyValue) {
debug("Map %v\n", value)
// Fields 以连续的空白字符为分隔符,将 s 切分成多个子串,结果中不包含空白字符本身
// 空白字符有:\t, \n, \v, \f, \r, ' ', U+0085 (NEL), U+00A0 (NBSP)
words := strings.Fields(value)
for _, w := range words {
kv := KeyValue{w, ""}
res = append(res, kv)
}
return
}
复制代码
至于写入中间文件,建议使用json的方式,可以使用以下方法
enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)
}
复制代码
综上分析,得到实现doMap()函数的大致思路(本部分 只有一个输入文件和一个reduce task,nReduce=1):
首先根据n个输入文件和设定的m个reduce tasks,生成n*m个中间文件,调用reduceName方法进行命名。由于这部分实验只有一个输入文件和一个reduce task,因此只会生成一个中间文件。 对于每一个输入文件fileX,读取文件内容,调用mapF()方法进行处理,最终返回键值对[]KeyValue。此处的mapF()是指上文的MapFunc方法 对上一步生成的[]KeyValue,每一个Key调用ihash()方法然后mod nReduce,选择该 KeyValue写入哪个中间文件中。
处理完[]KeyValue全部内容,关闭文件。
我的实现代码如下:
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
//创建中间文件
var interFiles []*os.File
for r:=0;r<nReduce;r++ {
interName := reduceName(jobName, mapTask, r)
interFile,err := os.Create(interName)
if err != nil {
fmt.Println(err)
}
interFiles = append(interFiles,interFile)
defer interFile.Close()
}
//调用mapF,得到map程序处理后的键值对
inBody,err:=ioutil.ReadFile(inFile)//读取文件内容
if err!=nil {
fmt.Println(err)
}
keyValue := mapF(inFile,string(inBody))
//写入中间文件
for _,v := range keyValue {
r:= ihash(v.Key)%nReduce
enc := json.NewEncoder(interFiles[r])
err := enc.Encode(&v)
if err!=nil {
fmt.Println(err)
}
}
}
复制代码
4.doReduce()
doReduce()主要是实现这样的功能:读取这个reduce task对应的中间文件,按key对中间键/值对进行 排序 及合并,为每个key调用用户定义的reduce函数(reduceF),最后将reduceF的输出写入磁盘。
若在doMap()中使用了enc.Encode(&kv)将中间键值对写入中间文件,在doReduce()中可以使用Decode(&kv)来读取。最后的输出文件也推荐使用json的方式写入。
同理,reduceF对应的是ReduceFunc,也已经在原代码中实现了。第一个参数是键key,第二个参数是值value的数组,若各个中间文件中某个key有多个value。在ReduceFunc中只是打印了key值,没做什么处理。
// Just return key
func ReduceFunc(key string, values []string) string {
for _, e := range values {
debug("Reduce %s %v\n", key, e)
}
return ""
}
复制代码
我的实现如下
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
//读取中间文件
var keyValue []KeyValue
for i:=0;i<nMap;i++ {
interName := reduceName(jobName, i, reduceTask)
interBody,err:=ioutil.ReadFile(interName)
if err != nil {
fmt.Println(err)
}
dec := json.NewDecoder(strings. NewReader(string(interBody)))
for {
var m KeyValue
if err := dec.Decode(&m) ; err == io. EOF {
break
} else if err != nil {
fmt.Println(err)
}
keyValue = append(keyValue,m)
}
}
//排序及合并,处理后应该类似["0":[""],"1":[""]...]
var keyValuesMap map[string][]string
keyValuesMap = make(map[string][]string)
for _,v := range keyValue {
if _,ok:= keyValuesMap[v.Key];ok {//若key值已存在,将value添加到[]string中
keyValuesMap[v.Key] = append(keyValuesMap[v.Key],v.Value)
}else{//若key值不存在,在map中新建key
var values []string
values = append(values,v.Value)
keyValuesMap[v.Key] = values
}
}
//对每个key调用reduceF,并写入最后的文件
outputFile,err := os.Create(outFile)
if err!=nil {
fmt.Println(err)
}
enc := json.NewEncoder(outputFile)
for k,v := range keyValuesMap {
err := enc.Encode(KeyValue{k, reduceF(k,v)})
if err!=nil {
fmt.Println(err)
}
}
}
复制代码
5.测试运行
在6.824\src\mapreduce目录下执行下列命令进行PartI实验的测试
go test -run Sequential
若运行通过会在结果中输出ok,类似
ps.运行 go 命令需要设置好GOPATH
在common.go中设置 debugEnabled = true,go test时增加-v参数可以获得更多调试信息
env "GOPATH=$PWD/../../" go test -v -run Sequential === RUN TestSequentialSingle master: Starting Map/Reduce task test Merge: read mrtmp.test-res-0 master: Map/Reduce task completed --- PASS: TestSequentialSingle (1.34s) === RUN TestSequentialMany master: Starting Map/Reduce task test Merge: read mrtmp.test-res-0 Merge: read mrtmp.test-res-1 Merge: read mrtmp.test-res-2 master: Map/Reduce task completed --- PASS: TestSequentialMany (1.33s) PASS ok mapreduce 2.672s 复制代码
6.问题记录
-
运行报错:./master_rpc.go:48: debug call has arguments but no formatting directives
解决方法master_rpc.go的48行debug("RegistrationServer: accept error ", err) 改为debug("RegistrationServer: accept error %v", err)
以上所述就是小编给大家介绍的《MIT6.824-Lab1-Part I: Map/Reduce input and output》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Perl语言入门
[美] Randal L.Schwartz、Tom Phoenix / 李晓峰 / 中国电力出版社 / 2002-8 / 48.00元
本书第一版于1993年问世,并从此成为畅销书。本书由Perl社区最著名、最活跃的两位成员写成,是Perl程序设计语言的精髓指南。 Perl最初只是Unix系统管理员的一个工具,在工作日里被用在无数的小任务中。从那以后,它逐步发展成为一种全功能的程序设计语言,特别是在各种计算平台上,它被用作Web编程、数据库处理、XML处理以及系统管理——它能够完成所有这些工作,同时仍然是处理小的日常工作的完......一起来看看 《Perl语言入门》 这本书的介绍吧!