Python使用Hadoop集群

栏目: Python · 发布时间: 5年前

内容简介:$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标

Python使用Hadoop集群

$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。

编写MapReduce函数

以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标准输入($sys.stdin$)、标准输出($sys.stdout$)在 $Map$ 函数和 $Reduce$ 函数之间传递数据,其余的事情 $Hadoop$ 流将会帮助我们完成。

Mapper部分

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)

文件从标准输入 sys.stdin 读取文件后把单词切开,并把单词和词频输出标准输出。$mapper$ 不计算单词的总数,而是输出 (word, 1) ,方便让随后的 $reducer$ 做统计工作。

对文件赋权以使其可以运行

chmod +x mapper.py

Reducer部分

!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:
    print "%s\t%s" % (current_word, current_count)

按照 $Hadoop$ 的设计理念,$mapper$ 输出的相同的 $key/value$ 数据会存储在同一个 $part$ 上,并且 $mapper$ 的输出会自动作为 $reducer$ 的输入。在这个需求中,$reducer$ 要做的事情只是依次接受集群的标准输入,并统计连续单词的个数,因为在这个例子中,$key$ 代表的就是单词。

同样对文件赋权以使其可以运行

chmod +x reducer.py

本地测试

通常在把任务提交给 $Hadoop$ 集群执行的之前,都进行一次本地测试。本地与 $Hadoop$ 环境不同之处在于我们需要手动对 $mapper$ 的输出按照 $key$ 来进行排序,模拟 $Hadoop$ 集群场景。

cat data | mapper | sort | reducer > output

实例:

cat news.merge.0.json | python mapper.py | sort -k1,1 | python reducer.py > ret
cat file_input | python mapper.py > ret

第一行是对当前统计单词个数的测试命令,可以拓展到含有 $mapper$ 和 $reducer$ 的任务中,第二行是对 $mapper$ 的单独测试,同样也可是应用到只有 $mapper$ 没有 $reducer$ 的任务中。

使用 Hadoop 集群执行任务

集群运行

在把任务提交给集群执行时,我们常常将诸多命令合并到一个脚本文件中,一次性执行所有内容

nohup sh -x hadoop.sh &

$hadoop.sh$ 的书写方法如下

对于含有 $mapper$ 和 $reducer$ 的任务

#!/bin/bash
HADOOP='/home/work/infra/infra-client/bin/hadoop'

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
	-D mapreduce.job.queuename=root.production.cloud_group.qabot  \
	-input /user/..../input-data \
	-output /user/.../output/\
	-mapper 'python mapper.py' \
	-reducer 'python reducer.py' \
	-file './mapper.py' \
        -file './reducer.py'

只有 $mapper$ 没有 $reducer$ 的任务

#!/bin/bash
HADOOP=/home/work/infra/infra-client/bin/hadoop

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
	-D mapreduce.job.queuename=root.production.cloud_group.qabot  \
	-input /user/..../input-data \
	-output /user/.../output/\
	-mapper 'python mapper.py' \
	-reducer 'cat' \
	-file './mapper.py' \

$nohup$ 中会实时更新集群对任务的处理进度,任务执行结束后会在指定的 $output$ 目录下输出一个 $part-00000$ 文件。

其中, -file 命令用于上传文件到 $Hadoop$ 集群,若 $mapper$ 或 $reducer$ 中读入了额外的文件,同样需要将该程序上传到 $Hadoop$ 集群上。

常用的 Hadoop Shell 命令

$Hadoop$ 所有的对文件的操作都需要调用文件系统,所以其命令的格式为

hadoop fs -shell

详细的命令可以查看这里: Hadoop Shell

get

hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>

复制文件到本地文件系统。可用 -ignorecrc 选项复制 CRC 校验失败的文件。使用 -crc 选项复制文件以及 CRC 信息。

hadoop fs -get /user/hadoop/file localfile
hadoop fs -get hdfs://host:port/user/hadoop/file localfile

put

hadoop fs -put <localsrc> ... <dst>

从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。

hadoop fs -put localfile /user/hadoop/hadoopfile
hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
hadoop fs -put - hdfs://host:port/hadoop/hadoopfile

参考文献:

[1] MapReduce: Simplified Data Processing on Large Clusters-Google

[2] python 写MapReduce函数 以WordCount为例-CNBolgs

[3] Hadoop Shell命令-Apache Hadoop


以上所述就是小编给大家介绍的《Python使用Hadoop集群》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web程序设计

Web程序设计

塞巴斯塔 / 2008-6 / 68.00元

《Web程序设计(第4版)》是最新版,介绍了Internet和万维网的起源及演变过程,全面系统地讨论了Web开发相关的主要编程语言和工具,以及这些语言和工具之间的相互影响及优劣势。该书对全书内容进行了很多修订,并新增加了关于Ruby、Rails和Ajax的3个章节。一起来看看 《Web程序设计》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换