MIT6.824-Lab1-Part I: Map/Reduce input and output

栏目: 编程工具 · 发布时间: 5年前

内容简介:在PartI主要是实现 common_map.go的 doMap()方法(分割map任务输出的函数)以及 common_reduce.go的 doReduce()方法(收集reduce任务的所有输入的函数),此时map和reduce阶段的task还是串行运行的。首先了解一下整个程序的运行流程,执行下列命令l即可运行试验第一部分的代码。执行命令后会运行test_test.go中的TestSequentialSingle函数

在PartI主要是实现 common_map.go的 doMap()方法(分割map任务输出的函数)以及 common_reduce.go的 doReduce()方法(收集reduce任务的所有输入的函数),此时map和reduce阶段的task还是串行运行的。

2.实现思路

首先了解一下整个程序的运行流程,执行下列命令l即可运行试验第一部分的代码。

go test -run Sequential

执行命令后会运行test_test.go中的TestSequentialSingle函数

func TestSequentialSingle(t *testing.T) {
    mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}
复制代码

Sequential()函数中会串行地执行map和reduce tasks。其中makeInputs会生成一个输入文件824-mrinput-0.txt,即本次要处理的文件,里面是递增的数字(0~99999),一个数字为一行。

func Sequential(jobName string, files []string, nreduce int,
    mapF func(string, string) []KeyValue,
    reduceF func(string, []string) string,
) (mr *Master) {
    mr = newMaster("master")
    go mr.run(jobName, files, nreduce, func(phase jobPhase) {
        switch phase {
        case mapPhase:
            for i, f := range mr.files {
                doMap(mr.jobName, i, f, mr.nReduce, mapF)
            }
        case reducePhase:
            for i := 0; i < mr.nReduce; i++ {
                doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
            }
        }
    }, func() {
        mr.stats = []int{len(files) + nreduce}
    })
    return
}
复制代码

根据参数,本次只会生成一个文件824-mrinput-0.txt,以及一个reduce task处理。在Sequential函数中首先调用 doMap()方法实现Map功能,生成中间键值对,然后调用doReduce()方法实现Reduce功能。由于只有一个文件和一个reduce task,所以doMap()和doReduce()会依次串行地各执行一次。

3.doMap()

doMap()函数主要是实现这样的功能:读取一个输入文件(inFile),调用我们实现的Map功能的函数mapF,并将mapF的输出内容分配到给nReduce的中间文件中。

  1. 每个reduce task有一个中间文件,首先需要生成中间文件的名称,调用common.go中的reduceName方法即可 reduceName(jobName, mapTask, r)生成中间文件名称
func reduceName(jobName string, mapTask int, reduceTask int) string {
    return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
复制代码
  1. 对每个键值key调用ihash()方法,然后mod nReduce,来选择该键值对放在哪个中间文件中
  2. 在此部分实验中,mapF()其实已经在原代码中帮我们实现了,他就是test_test.go中的MapFunc()函数。他在TestSequentialSingle中调用Sequential时已经传入。其中第一个参数是要处理的文件名,第二个参数是文件的全部内容,最后函数会返回一个[]KeyValue。具体实现如下
// 将读取文件中的所有单词分割,返回[]KeyValue,形如[(“0”,””),(“1”,””)...]    ps.可能存在重复的KeyValue
func MapFunc(file string, value string) (res []KeyValue) {
    debug("Map %v\n", value)

    // Fields 以连续的空白字符为分隔符,将 s 切分成多个子串,结果中不包含空白字符本身
    // 空白字符有:\t, \n, \v, \f, \r, ' ', U+0085 (NEL), U+00A0 (NBSP)
    words := strings.Fields(value)
    for _, w := range words {
        kv := KeyValue{w, ""}
        res = append(res, kv)
    }
    return
}
复制代码

至于写入中间文件,建议使用json的方式,可以使用以下方法

enc := json.NewEncoder(file)
for _, kv := ... {
    err := enc.Encode(&kv)
}
复制代码

综上分析,得到实现doMap()函数的大致思路(本部分 只有一个输入文件和一个reduce task,nReduce=1):

首先根据n个输入文件和设定的m个reduce tasks,生成n*m个中间文件,调用reduceName方法进行命名。由于这部分实验只有一个输入文件和一个reduce task,因此只会生成一个中间文件。 对于每一个输入文件fileX,读取文件内容,调用mapF()方法进行处理,最终返回键值对[]KeyValue。此处的mapF()是指上文的MapFunc方法 对上一步生成的[]KeyValue,每一个Key调用ihash()方法然后mod nReduce,选择该 KeyValue写入哪个中间文件中。

处理完[]KeyValue全部内容,关闭文件。

我的实现代码如下:

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    //创建中间文件
    var interFiles []*os.File
    for r:=0;r<nReduce;r++ {
        interName := reduceName(jobName, mapTask, r)
        interFile,err := os.Create(interName)
        if err != nil {
            fmt.Println(err)
        }
        interFiles = append(interFiles,interFile)
        defer interFile.Close()
    }

    //调用mapF,得到map程序处理后的键值对
    inBody,err:=ioutil.ReadFile(inFile)//读取文件内容
    if err!=nil {
        fmt.Println(err)
    }
    keyValue := mapF(inFile,string(inBody))

    //写入中间文件
    for _,v := range keyValue {
        r:= ihash(v.Key)%nReduce
        enc := json.NewEncoder(interFiles[r])
        err := enc.Encode(&v)
        if err!=nil {
            fmt.Println(err)
        }
    }
}
复制代码

4.doReduce()

doReduce()主要是实现这样的功能:读取这个reduce task对应的中间文件,按key对中间键/值对进行 排序 及合并,为每个key调用用户定义的reduce函数(reduceF),最后将reduceF的输出写入磁盘。

若在doMap()中使用了enc.Encode(&kv)将中间键值对写入中间文件,在doReduce()中可以使用Decode(&kv)来读取。最后的输出文件也推荐使用json的方式写入。

同理,reduceF对应的是ReduceFunc,也已经在原代码中实现了。第一个参数是键key,第二个参数是值value的数组,若各个中间文件中某个key有多个value。在ReduceFunc中只是打印了key值,没做什么处理。

// Just return key
func ReduceFunc(key string, values []string) string {
    for _, e := range values {
        debug("Reduce %s %v\n", key, e)
    }
    return ""
}
复制代码

我的实现如下

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //读取中间文件
    var keyValue []KeyValue
    for i:=0;i<nMap;i++ {
        interName := reduceName(jobName, i, reduceTask)
        interBody,err:=ioutil.ReadFile(interName)
        if err != nil {
                fmt.Println(err)
        }
        dec := json.NewDecoder(strings. NewReader(string(interBody)))
        for {
        var m KeyValue
        if err := dec.Decode(&m) ; err == io. EOF {
            break
        } else if err != nil {
            fmt.Println(err)
        }
                keyValue = append(keyValue,m)
    }
    }

    //排序及合并,处理后应该类似["0":[""],"1":[""]...]
    var keyValuesMap map[string][]string
    keyValuesMap = make(map[string][]string)
    for _,v := range keyValue {
        if _,ok:= keyValuesMap[v.Key];ok {//若key值已存在,将value添加到[]string中
             keyValuesMap[v.Key] = append(keyValuesMap[v.Key],v.Value)
        }else{//若key值不存在,在map中新建key
            var values []string
            values = append(values,v.Value)
            keyValuesMap[v.Key] = values
        }
    }

    //对每个key调用reduceF,并写入最后的文件
    outputFile,err := os.Create(outFile)
    if err!=nil {
        fmt.Println(err)
    }
    enc := json.NewEncoder(outputFile)
    for k,v := range keyValuesMap {
        err := enc.Encode(KeyValue{k, reduceF(k,v)})
        if err!=nil {
            fmt.Println(err)
        }
    }
}
复制代码

5.测试运行

在6.824\src\mapreduce目录下执行下列命令进行PartI实验的测试

go test -run Sequential

若运行通过会在结果中输出ok,类似

MIT6.824-Lab1-Part I: Map/Reduce input and output

ps.运行 go 命令需要设置好GOPATH

在common.go中设置 debugEnabled = true,go test时增加-v参数可以获得更多调试信息

env "GOPATH=$PWD/../../" go test -v -run Sequential
=== RUN TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok mapreduce 2.672s
复制代码

6.问题记录

  1. 运行报错:./master_rpc.go:48: debug call has arguments but no formatting directives

    解决方法master_rpc.go的48行debug("RegistrationServer: accept error ", err) 改为debug("RegistrationServer: accept error %v", err)

MIT6.824-Lab1-Part I: Map/Reduce input and output

以上所述就是小编给大家介绍的《MIT6.824-Lab1-Part I: Map/Reduce input and output》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

CSS揭秘

CSS揭秘

[希] Lea Verou / CSS魔法 / 人民邮电出版社 / 2016-4 / 99.00元

本书是一本注重实践的教程,作者为我们揭示了 47 个鲜为人知的 CSS 技巧,主要内容包括背景与边框、形状、 视觉效果、字体排印、用户体验、结构与布局、过渡与动画等。本书将带领读者循序渐进地探寻更优雅的解决方案,攻克每天都会遇到的各种网页样式难题。 本书的读者对象为前端工程师、网页开发人员。一起来看看 《CSS揭秘》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

SHA 加密
SHA 加密

SHA 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器