Python使用Hadoop集群

栏目: Python · 发布时间: 5年前

内容简介:$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标

Python使用Hadoop集群

$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。

编写MapReduce函数

以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标准输入($sys.stdin$)、标准输出($sys.stdout$)在 $Map$ 函数和 $Reduce$ 函数之间传递数据,其余的事情 $Hadoop$ 流将会帮助我们完成。

Mapper部分

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)

文件从标准输入 sys.stdin 读取文件后把单词切开,并把单词和词频输出标准输出。$mapper$ 不计算单词的总数,而是输出 (word, 1) ,方便让随后的 $reducer$ 做统计工作。

对文件赋权以使其可以运行

chmod +x mapper.py

Reducer部分

!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:
    print "%s\t%s" % (current_word, current_count)

按照 $Hadoop$ 的设计理念,$mapper$ 输出的相同的 $key/value$ 数据会存储在同一个 $part$ 上,并且 $mapper$ 的输出会自动作为 $reducer$ 的输入。在这个需求中,$reducer$ 要做的事情只是依次接受集群的标准输入,并统计连续单词的个数,因为在这个例子中,$key$ 代表的就是单词。

同样对文件赋权以使其可以运行

chmod +x reducer.py

本地测试

通常在把任务提交给 $Hadoop$ 集群执行的之前,都进行一次本地测试。本地与 $Hadoop$ 环境不同之处在于我们需要手动对 $mapper$ 的输出按照 $key$ 来进行排序,模拟 $Hadoop$ 集群场景。

cat data | mapper | sort | reducer > output

实例:

cat news.merge.0.json | python mapper.py | sort -k1,1 | python reducer.py > ret
cat file_input | python mapper.py > ret

第一行是对当前统计单词个数的测试命令,可以拓展到含有 $mapper$ 和 $reducer$ 的任务中,第二行是对 $mapper$ 的单独测试,同样也可是应用到只有 $mapper$ 没有 $reducer$ 的任务中。

使用 Hadoop 集群执行任务

集群运行

在把任务提交给集群执行时,我们常常将诸多命令合并到一个脚本文件中,一次性执行所有内容

nohup sh -x hadoop.sh &

$hadoop.sh$ 的书写方法如下

对于含有 $mapper$ 和 $reducer$ 的任务

#!/bin/bash
HADOOP='/home/work/infra/infra-client/bin/hadoop'

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
	-D mapreduce.job.queuename=root.production.cloud_group.qabot  \
	-input /user/..../input-data \
	-output /user/.../output/\
	-mapper 'python mapper.py' \
	-reducer 'python reducer.py' \
	-file './mapper.py' \
        -file './reducer.py'

只有 $mapper$ 没有 $reducer$ 的任务

#!/bin/bash
HADOOP=/home/work/infra/infra-client/bin/hadoop

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
	-D mapreduce.job.queuename=root.production.cloud_group.qabot  \
	-input /user/..../input-data \
	-output /user/.../output/\
	-mapper 'python mapper.py' \
	-reducer 'cat' \
	-file './mapper.py' \

$nohup$ 中会实时更新集群对任务的处理进度,任务执行结束后会在指定的 $output$ 目录下输出一个 $part-00000$ 文件。

其中, -file 命令用于上传文件到 $Hadoop$ 集群,若 $mapper$ 或 $reducer$ 中读入了额外的文件,同样需要将该程序上传到 $Hadoop$ 集群上。

常用的 Hadoop Shell 命令

$Hadoop$ 所有的对文件的操作都需要调用文件系统,所以其命令的格式为

hadoop fs -shell

详细的命令可以查看这里: Hadoop Shell

get

hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>

复制文件到本地文件系统。可用 -ignorecrc 选项复制 CRC 校验失败的文件。使用 -crc 选项复制文件以及 CRC 信息。

hadoop fs -get /user/hadoop/file localfile
hadoop fs -get hdfs://host:port/user/hadoop/file localfile

put

hadoop fs -put <localsrc> ... <dst>

从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。

hadoop fs -put localfile /user/hadoop/hadoopfile
hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
hadoop fs -put - hdfs://host:port/hadoop/hadoopfile

参考文献:

[1] MapReduce: Simplified Data Processing on Large Clusters-Google

[2] python 写MapReduce函数 以WordCount为例-CNBolgs

[3] Hadoop Shell命令-Apache Hadoop


以上所述就是小编给大家介绍的《Python使用Hadoop集群》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

跨越

跨越

Lydia / 人民文学出版社 / 2018-4-1 / 39

三跨青年Lydia的一线奋斗笔记,抛却艰深理论,用亲身经验为你打通任督二脉。 揭开思维认知盲区,剖析成长潜在技巧,探知进阶背后逻辑,在拐点到来的时刻,推动人生加速上行。 10大职场潜在成长技巧,13种打破思维认知的的碎片重建,15种正确面对情感的能量释放, 38篇有世界 观,有方法论的故事,为你打开上升通道。 停留在思维层面的改变人生,其实已然陷入困境, 人生上行的实......一起来看看 《跨越》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

MD5 加密
MD5 加密

MD5 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器