内容简介:$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标
$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。
编写MapReduce函数
以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标准输入($sys.stdin$)、标准输出($sys.stdout$)在 $Map$ 函数和 $Reduce$ 函数之间传递数据,其余的事情 $Hadoop$ 流将会帮助我们完成。
Mapper部分
#!/usr/bin/env python import sys for line in sys.stdin: line = line.strip() words = line.split() for word in words: print "%s\t%s" % (word, 1)
文件从标准输入 sys.stdin
读取文件后把单词切开,并把单词和词频输出标准输出。$mapper$ 不计算单词的总数,而是输出 (word, 1)
,方便让随后的 $reducer$ 做统计工作。
对文件赋权以使其可以运行
chmod +x mapper.py
Reducer部分
!/usr/bin/env python import sys current_word = None current_count = 0 word = None for line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try: count = int(count) except ValueError: continue if current_word == word: current_count += count else: if current_word: print "%s\t%s" % (current_word, current_count) current_count = count current_word = word if word == current_word: print "%s\t%s" % (current_word, current_count)
按照 $Hadoop$ 的设计理念,$mapper$ 输出的相同的 $key/value$ 数据会存储在同一个 $part$ 上,并且 $mapper$ 的输出会自动作为 $reducer$ 的输入。在这个需求中,$reducer$ 要做的事情只是依次接受集群的标准输入,并统计连续单词的个数,因为在这个例子中,$key$ 代表的就是单词。
同样对文件赋权以使其可以运行
chmod +x reducer.py
本地测试
通常在把任务提交给 $Hadoop$ 集群执行的之前,都进行一次本地测试。本地与 $Hadoop$ 环境不同之处在于我们需要手动对 $mapper$ 的输出按照 $key$ 来进行排序,模拟 $Hadoop$ 集群场景。
cat data | mapper | sort | reducer > output
实例:
cat news.merge.0.json | python mapper.py | sort -k1,1 | python reducer.py > ret cat file_input | python mapper.py > ret
第一行是对当前统计单词个数的测试命令,可以拓展到含有 $mapper$ 和 $reducer$ 的任务中,第二行是对 $mapper$ 的单独测试,同样也可是应用到只有 $mapper$ 没有 $reducer$ 的任务中。
使用 Hadoop 集群执行任务
集群运行
在把任务提交给集群执行时,我们常常将诸多命令合并到一个脚本文件中,一次性执行所有内容
nohup sh -x hadoop.sh &
$hadoop.sh$ 的书写方法如下
对于含有 $mapper$ 和 $reducer$ 的任务
#!/bin/bash HADOOP='/home/work/infra/infra-client/bin/hadoop' ${HADOOP} fs -rmr /user/.../output ${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \ -D mapreduce.job.queuename=root.production.cloud_group.qabot \ -input /user/..../input-data \ -output /user/.../output/\ -mapper 'python mapper.py' \ -reducer 'python reducer.py' \ -file './mapper.py' \ -file './reducer.py'
只有 $mapper$ 没有 $reducer$ 的任务
#!/bin/bash HADOOP=/home/work/infra/infra-client/bin/hadoop ${HADOOP} fs -rmr /user/.../output ${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \ -D mapreduce.job.queuename=root.production.cloud_group.qabot \ -input /user/..../input-data \ -output /user/.../output/\ -mapper 'python mapper.py' \ -reducer 'cat' \ -file './mapper.py' \
$nohup$ 中会实时更新集群对任务的处理进度,任务执行结束后会在指定的 $output$ 目录下输出一个 $part-00000$ 文件。
其中, -file
命令用于上传文件到 $Hadoop$ 集群,若 $mapper$ 或 $reducer$ 中读入了额外的文件,同样需要将该程序上传到 $Hadoop$ 集群上。
常用的 Hadoop Shell 命令
$Hadoop$ 所有的对文件的操作都需要调用文件系统,所以其命令的格式为
hadoop fs -shell
详细的命令可以查看这里: Hadoop Shell
get
hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>
复制文件到本地文件系统。可用 -ignorecrc
选项复制 CRC
校验失败的文件。使用 -crc
选项复制文件以及 CRC
信息。
hadoop fs -get /user/hadoop/file localfile hadoop fs -get hdfs://host:port/user/hadoop/file localfile
put
hadoop fs -put <localsrc> ... <dst>
从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。
hadoop fs -put localfile /user/hadoop/hadoopfile hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile hadoop fs -put - hdfs://host:port/hadoop/hadoopfile
参考文献:
[1] MapReduce: Simplified Data Processing on Large Clusters-Google
以上所述就是小编给大家介绍的《Python使用Hadoop集群》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Druid集群安装使用
- BookKeeper 集群搭建及使用
- Eureka使用及集群部署
- 使用Docker部署RabbitMQ集群
- Kafka集群的安装和使用
- 使用 docker 搭建 clickhouse 集群
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。