Python使用Hadoop集群

栏目: Python · 发布时间: 5年前

内容简介:$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标

Python使用Hadoop集群

$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。

编写MapReduce函数

以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标准输入($sys.stdin$)、标准输出($sys.stdout$)在 $Map$ 函数和 $Reduce$ 函数之间传递数据,其余的事情 $Hadoop$ 流将会帮助我们完成。

Mapper部分

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)

文件从标准输入 sys.stdin 读取文件后把单词切开,并把单词和词频输出标准输出。$mapper$ 不计算单词的总数,而是输出 (word, 1) ,方便让随后的 $reducer$ 做统计工作。

对文件赋权以使其可以运行

chmod +x mapper.py

Reducer部分

!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:
    print "%s\t%s" % (current_word, current_count)

按照 $Hadoop$ 的设计理念,$mapper$ 输出的相同的 $key/value$ 数据会存储在同一个 $part$ 上,并且 $mapper$ 的输出会自动作为 $reducer$ 的输入。在这个需求中,$reducer$ 要做的事情只是依次接受集群的标准输入,并统计连续单词的个数,因为在这个例子中,$key$ 代表的就是单词。

同样对文件赋权以使其可以运行

chmod +x reducer.py

本地测试

通常在把任务提交给 $Hadoop$ 集群执行的之前,都进行一次本地测试。本地与 $Hadoop$ 环境不同之处在于我们需要手动对 $mapper$ 的输出按照 $key$ 来进行排序,模拟 $Hadoop$ 集群场景。

cat data | mapper | sort | reducer > output

实例:

cat news.merge.0.json | python mapper.py | sort -k1,1 | python reducer.py > ret
cat file_input | python mapper.py > ret

第一行是对当前统计单词个数的测试命令,可以拓展到含有 $mapper$ 和 $reducer$ 的任务中,第二行是对 $mapper$ 的单独测试,同样也可是应用到只有 $mapper$ 没有 $reducer$ 的任务中。

使用 Hadoop 集群执行任务

集群运行

在把任务提交给集群执行时,我们常常将诸多命令合并到一个脚本文件中,一次性执行所有内容

nohup sh -x hadoop.sh &

$hadoop.sh$ 的书写方法如下

对于含有 $mapper$ 和 $reducer$ 的任务

#!/bin/bash
HADOOP='/home/work/infra/infra-client/bin/hadoop'

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
	-D mapreduce.job.queuename=root.production.cloud_group.qabot  \
	-input /user/..../input-data \
	-output /user/.../output/\
	-mapper 'python mapper.py' \
	-reducer 'python reducer.py' \
	-file './mapper.py' \
        -file './reducer.py'

只有 $mapper$ 没有 $reducer$ 的任务

#!/bin/bash
HADOOP=/home/work/infra/infra-client/bin/hadoop

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
	-D mapreduce.job.queuename=root.production.cloud_group.qabot  \
	-input /user/..../input-data \
	-output /user/.../output/\
	-mapper 'python mapper.py' \
	-reducer 'cat' \
	-file './mapper.py' \

$nohup$ 中会实时更新集群对任务的处理进度,任务执行结束后会在指定的 $output$ 目录下输出一个 $part-00000$ 文件。

其中, -file 命令用于上传文件到 $Hadoop$ 集群,若 $mapper$ 或 $reducer$ 中读入了额外的文件,同样需要将该程序上传到 $Hadoop$ 集群上。

常用的 Hadoop Shell 命令

$Hadoop$ 所有的对文件的操作都需要调用文件系统,所以其命令的格式为

hadoop fs -shell

详细的命令可以查看这里: Hadoop Shell

get

hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>

复制文件到本地文件系统。可用 -ignorecrc 选项复制 CRC 校验失败的文件。使用 -crc 选项复制文件以及 CRC 信息。

hadoop fs -get /user/hadoop/file localfile
hadoop fs -get hdfs://host:port/user/hadoop/file localfile

put

hadoop fs -put <localsrc> ... <dst>

从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。

hadoop fs -put localfile /user/hadoop/hadoopfile
hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
hadoop fs -put - hdfs://host:port/hadoop/hadoopfile

参考文献:

[1] MapReduce: Simplified Data Processing on Large Clusters-Google

[2] python 写MapReduce函数 以WordCount为例-CNBolgs

[3] Hadoop Shell命令-Apache Hadoop


以上所述就是小编给大家介绍的《Python使用Hadoop集群》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Algorithms on Strings, Trees and Sequences

Algorithms on Strings, Trees and Sequences

Dan Gusfield / Cambridge University Press / 1997-5-28 / USD 99.99

String algorithms are a traditional area of study in computer science. In recent years their importance has grown dramatically with the huge increase of electronically stored text and of molecular seq......一起来看看 《Algorithms on Strings, Trees and Sequences》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具