内容简介:$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标
$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。
编写MapReduce函数
以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标准输入($sys.stdin$)、标准输出($sys.stdout$)在 $Map$ 函数和 $Reduce$ 函数之间传递数据,其余的事情 $Hadoop$ 流将会帮助我们完成。
Mapper部分
#!/usr/bin/env python import sys for line in sys.stdin: line = line.strip() words = line.split() for word in words: print "%s\t%s" % (word, 1)
文件从标准输入 sys.stdin
读取文件后把单词切开,并把单词和词频输出标准输出。$mapper$ 不计算单词的总数,而是输出 (word, 1)
,方便让随后的 $reducer$ 做统计工作。
对文件赋权以使其可以运行
chmod +x mapper.py
Reducer部分
!/usr/bin/env python import sys current_word = None current_count = 0 word = None for line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try: count = int(count) except ValueError: continue if current_word == word: current_count += count else: if current_word: print "%s\t%s" % (current_word, current_count) current_count = count current_word = word if word == current_word: print "%s\t%s" % (current_word, current_count)
按照 $Hadoop$ 的设计理念,$mapper$ 输出的相同的 $key/value$ 数据会存储在同一个 $part$ 上,并且 $mapper$ 的输出会自动作为 $reducer$ 的输入。在这个需求中,$reducer$ 要做的事情只是依次接受集群的标准输入,并统计连续单词的个数,因为在这个例子中,$key$ 代表的就是单词。
同样对文件赋权以使其可以运行
chmod +x reducer.py
本地测试
通常在把任务提交给 $Hadoop$ 集群执行的之前,都进行一次本地测试。本地与 $Hadoop$ 环境不同之处在于我们需要手动对 $mapper$ 的输出按照 $key$ 来进行排序,模拟 $Hadoop$ 集群场景。
cat data | mapper | sort | reducer > output
实例:
cat news.merge.0.json | python mapper.py | sort -k1,1 | python reducer.py > ret cat file_input | python mapper.py > ret
第一行是对当前统计单词个数的测试命令,可以拓展到含有 $mapper$ 和 $reducer$ 的任务中,第二行是对 $mapper$ 的单独测试,同样也可是应用到只有 $mapper$ 没有 $reducer$ 的任务中。
使用 Hadoop 集群执行任务
集群运行
在把任务提交给集群执行时,我们常常将诸多命令合并到一个脚本文件中,一次性执行所有内容
nohup sh -x hadoop.sh &
$hadoop.sh$ 的书写方法如下
对于含有 $mapper$ 和 $reducer$ 的任务
#!/bin/bash HADOOP='/home/work/infra/infra-client/bin/hadoop' ${HADOOP} fs -rmr /user/.../output ${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \ -D mapreduce.job.queuename=root.production.cloud_group.qabot \ -input /user/..../input-data \ -output /user/.../output/\ -mapper 'python mapper.py' \ -reducer 'python reducer.py' \ -file './mapper.py' \ -file './reducer.py'
只有 $mapper$ 没有 $reducer$ 的任务
#!/bin/bash HADOOP=/home/work/infra/infra-client/bin/hadoop ${HADOOP} fs -rmr /user/.../output ${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \ -D mapreduce.job.queuename=root.production.cloud_group.qabot \ -input /user/..../input-data \ -output /user/.../output/\ -mapper 'python mapper.py' \ -reducer 'cat' \ -file './mapper.py' \
$nohup$ 中会实时更新集群对任务的处理进度,任务执行结束后会在指定的 $output$ 目录下输出一个 $part-00000$ 文件。
其中, -file
命令用于上传文件到 $Hadoop$ 集群,若 $mapper$ 或 $reducer$ 中读入了额外的文件,同样需要将该程序上传到 $Hadoop$ 集群上。
常用的 Hadoop Shell 命令
$Hadoop$ 所有的对文件的操作都需要调用文件系统,所以其命令的格式为
hadoop fs -shell
详细的命令可以查看这里: Hadoop Shell
get
hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>
复制文件到本地文件系统。可用 -ignorecrc
选项复制 CRC
校验失败的文件。使用 -crc
选项复制文件以及 CRC
信息。
hadoop fs -get /user/hadoop/file localfile hadoop fs -get hdfs://host:port/user/hadoop/file localfile
put
hadoop fs -put <localsrc> ... <dst>
从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。
hadoop fs -put localfile /user/hadoop/hadoopfile hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile hadoop fs -put - hdfs://host:port/hadoop/hadoopfile
参考文献:
[1] MapReduce: Simplified Data Processing on Large Clusters-Google
以上所述就是小编给大家介绍的《Python使用Hadoop集群》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Druid集群安装使用
- BookKeeper 集群搭建及使用
- Eureka使用及集群部署
- 使用Docker部署RabbitMQ集群
- Kafka集群的安装和使用
- 使用 docker 搭建 clickhouse 集群
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Music Recommendation and Discovery
Òscar Celma / Springer / 2010-9-7 / USD 49.95
With so much more music available these days, traditional ways of finding music have diminished. Today radio shows are often programmed by large corporations that create playlists drawn from a limited......一起来看看 《Music Recommendation and Discovery》 这本书的介绍吧!