Apache Spark编程教程

栏目: 编程工具 · 发布时间: 5年前

内容简介:Apache Spark是一个分布式计算平台,在当今非常流行,特别是因为与Hadoop mapreduce相比性能要好得多,Spark比基于磁盘的hadoop mapreduce 快了近100倍。让我们测试它并从头开始创建maven Apache Spark应用程序......Apache Spark可以以三种模式运行:我们将在这里测试第一个变体,其中包含经典的单词计数示例,用于保存在Hadoop HDFS文件系统中的文件

Apache Spark是一个分布式计算平台,在当今非常流行,特别是因为与Hadoop mapreduce相比性能要好得多,Spark比基于磁盘的hadoop mapreduce 快了近100倍。让我们测试它并从头开始创建maven Apache Spark应用程序......

Apache Spark可以以三种模式运行:

  • Apache Spark应用程序访问文件系统(本地文件系统, HDFS )。
  • Apache Spark应用程序访问 HBase 等分布式系统。
  • Yarn上的  Apache Spark应用程序。

我们将在这里测试第一个变体,其中包含经典的单词计数示例,用于保存在Hadoop HDFS文件系统中的文件

先决条件:

  • 安装了Hadoop,我的Mac上有Hadoop 2.6.0。要进行安装,请按照此处的说明进行操作...  在Mac上安装Hadoop
  • 将文本文件上传到HDFS。有关如何使用 HDFS的命令 ,请参阅 HDFS命令指南
  • 安装了Apache Spark。Spark可以在预构建版本中下载,也可以手动构建。在Mac OS X中,安装Apache Spark的最简单方法是使用自制程序。我有Apache Spark 1.3.0,可以很好地与Hadoop 2.6.0配合使用。请参阅 Spark独立模式
  • git clone https://bitbucket.org/tomask79/apache-spark-maven.git并运行mvn clean install

准备和启动Apache Spark和Hadoop集群

在进行任何测试之前,我们需要以下内容:

  • 我们需要运行Apache Spark master ...从文件夹/usr/local/Cellar/apache-spark/1.3.0/libexec/sbin运行以下命令:./start-master.sh

这将启动Apache Spark集群的主节点。Spark提供了出色的Web控制台来查看主节点属性...在浏览器中,点击以下URL:

http://localhost:8080/

为了能够在Apache Spark集群中运行任何我们的东西,我们需要在apache spark run以下命令的bin文件夹中添加至少一个worker来执行此操作:

spark-class org.apache.spark.deploy.worker.Worker spark://tomask79.local:7077

spark://tomask79.local:7077是我Mac上的主节点,你要根据localhost:8080中列出的主节点URL将其更改为你的主节点。

再次访问localhost:8080,您应该看到您的worker在employees表中列出...完美,Apache Spark集群已准备就绪!

我们要在HDFS文件系统上加载文件, 所以我们需要启动Hadoop集群,从文件夹/usr/local/Cellar/hadoop/2.6.0/libexec/sbin 启动hadoop集群:./start-all.sh

将应用程序提交到Apache Cluster并检查代码

在将每个Apache Spark应用程序提交到Apache集群之前,需要对其进行正确配置,让我们检查一下配置代码:

SparkConf sparkConf = <b>new</b> SparkConf();
    sparkConf.set(<font>"spark.cores.max"</font><font>, </font><font>"1"</font><font>);
    sparkConf.set(</font><font>"spark.executor.memory"</font><font>, </font><font>"2048M"</font><font>);
    sparkConf.set(</font><font>"spark.driver.memory"</font><font>, </font><font>"1024M"</font><font>);
    sparkConf.setAppName(</font><font>"JavaWordCount"</font><font>);
    sparkConf.setMaster(</font><font>"spark://tomask79.local:7077"</font><font>);
</font>

要查看Apache Spark应用程序属性的完整列表,请参阅以下链接: Spark配置

我们只是说使用setMaster方法设置主节点URL是必需的。其他选项是可选的。但是控制Apache Spark节点应用程序的资源量是很好的。否则你可能会遇到新奇的Apache Spark问题:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

第二个非常重要的配置是以下代码:

JavaSparkContext ctx = <b>new</b> JavaSparkContext(sparkConf);
ctx.addJar(<font>"/Users/tomask79/Documents/workspace/apache-spark/spark/target/spark-0.0.1-SNAPSHOT.jar"</font><font>);
</font>

使用此代码,您将告诉Apache Spark驱动程序,他应该将jar带到哪个节点以运行代码计算(banq注:计算代码送到数据附近,云计算是数据送到计算代码附近)。否则你最终得到:

15/11/22 12:03:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.112): java.lang.ClassNotFoundException: com.myspark.test.spark.JavaWordCount$1
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

其余的代码是执行mapreduce字数统计逻辑的简单 RDD 命令,没什么大不了的,要理解它,请访问以下页面: Hadoop wordcount

现在您应该能够将JavaWordCount应用程序提交到Apache Spark集群,如果一切顺利,从IDE启动JavaWordCount类后,您应该看到输出。

关于maven依赖关系的建议

首先,请确保使用与安装的Apache Spark相同版本的spark_core依赖项。例如,我安装了Apache Spark 1.3.0,因此我的示例在pom.xml中使用

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>

我有与hadoop 2.6.0客户端的servlet API兼容性问题,所以我使用2.2.0 hadoop客户端,它可以与Apache Spark 1.3.0核心一起使用:

<dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.2.0</version>
    </dependency>

用Apache Spark编程​​​​​​​

我们将尝试使用JavaRDD API解决简单任务。

Apache Spark JavaRDD和任务解决:如果你不知道map-reduce概念那么你就无法理解Apache Spark。为了证明这一点,让我们解决简单的任务。假设我们有以下带有温度的城市文本文件:

Prague 35
Madrid 40
Berlin 20
Paris 15
Rome 25

位于Apache Hadoop HDFS文件系统,我们需要编写简单的JavaRDD Apache Spark程序来打印具有温度低于整个平均温度的城市。

JavaRDD API和MapReduce有区别吗?

要解决前面提到的任务,我们需要将问题分成以下几部分:

  • 首先,我们需要编写JavaRDD程序来计算温度和平均值的总和。
  • 然后我们要打印温度低于计算平均值的行。

MapReduce解决方案概念:

如果我们使用Spring Data for Hadoop或为map-reduce程序指定的简单Apache Hadoop API,那么我们的解决方案将是:

  • Map函数将创建键[K,V] ='reducer',town.temperature
  • Reduce功能将接收先前的键并将整个组的温度相加并计算平均温度。
  • 链式map-reduce减少任务将打印温度低于平均值的城镇的结果。

Apache Spark JavaRDD解决方案:

  • 首先,我们需要通过将map函数应用于输入RDD集来获取所有行的JavaRDD温度集:
JavaRDD<String> parsedTemperatures = lines.map(<b>new</b> Function<String, String>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> String call(String v1) throws Exception {
            <b>final</b> String arr[] = SPACE.split(v1);
            System.out.println(<font>"Reading temperature ["</font><font>+arr[1]+</font><font>"] from "</font><font>+v1);
            <b>return</b> arr[1];
        }
    });
</font>
  • 然后我们需要将此RDD集转换为CONSTANT.row.temperature表单以将数据准备到reducer中:
JavaPairRDD<String, Integer> forGroup = parsedTemperatures.mapToPair(
            <b>new</b> PairFunction<String, 
            String, Integer>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> Tuple2<String, Integer> call(String t) throws Exception {
            <b>return</b> <b>new</b> Tuple2<String, Integer>(<font>"reducer"</font><font>, Integer.parseInt(t));
        }
    });
</font>
  • 有了这个数据集,我们就为reducer准备了数据,它将聚合所有温度
JavaPairRDD<String, Integer> counts = forGroup.reduceByKey(
            <b>new</b> Function2<Integer, Integer, Integer>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> Integer call(Integer v1, Integer v2) throws Exception {
              System.out.println(<font>"Agregatting "</font><font>+v1+</font><font>" plus "</font><font>+v2);
              <b>return</b> v1 + v2;
        }
    });
</font>

(很像map-reduce概念)

要了解Spark减速器的工作原理,请查看日志:

Reading temperature [35] from Prague 35
Reading temperature [40] from Madrid 40
Agregatting 35 plus 40
Reading temperature [20] from Berlin 20
Agregatting 75 plus 20
Reading temperature [15] from Paris 15
Agregatting 95 plus 15
Reading temperature [25] from Rome 25
Agregatting 110 plus 25

Spark实际上并行运行前三个函数map,mapToPair和reduceByKey!DAG图形分析器组合Spark任务的好处之一!

解决方案的第二部分是打印温度低于平均温度的所有城镇:

Tuple2<String, Integer> sumTemperatures = counts.first();    
    <b>final</b> Integer sum = sumTemperatures._2;
    <b>final</b> <b>long</b> count = parsedTemperatures.count();
    <b>final</b> <b>double</b> avg = (<b>double</b>) sum / count;
    System.out.println(<font>"Average temperature "</font><font>+avg);

    JavaRDD<String> result = lines.filter(<b>new</b> Function<String, Boolean>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> Boolean call(String v1) throws Exception {
            <b>final</b> String arr[] = SPACE.split(v1);
            <b>long</b> temperature = Long.parseLong(arr[1]);
            <b>return</b> temperature <= avg;
        }
    });

    List<String> resultList = result.collect();
    <b>for</b> (String item: resultList) {
        System.out.println(</font><font>"Result item: "</font><font>+item);
    }
</font>

让我们解释一下这段代码:

  • 通过counts.first()我们从reducer中读取所有温度的总和
  • 我们使用count函数来获取JavaRDD输入集中所有行的计数。
  • 我们使用JavaRDD过滤功能来过滤掉温度高于平均值的城镇。
  • 我们使用JavaRDD collect函数来打印结果。

如果你运行这个程序,你应该得到如下结果:

16/03/03 21:02:26 INFO DAGScheduler: Job 1 finished: count at AvgTemperatureAnalyzer.java:85, took 0,094561 s

Average temperature 27.0
.
.
Result item: Berlin 20
Result item: Paris 15
Result item: Rome 25

16/03/03 21:02:26 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,<b>null</b>}

结论

从我的观点来看,Apache Spark比map-reduce编程更加友好,即使概念是相同的。我打赌你明白我们需要通过JavaRDD输入进行多次迭代,但是使用map-reduce你需要弄清楚如何将前一个map reduce任务的结果传递给下一个,Apache Spark一个输入迭代以新的RDD设置,您可以在其中应用其他功能,从主节点驱动的所有内容......这不是很酷吗?

代码

让我们看看如何使用Apache Spark Shared Variables使代码看起来更干净。

Apache Spark中的共享变量

通常在编写像map这样的Spark动作时,驱动程序中的任何传递的变量都会转换为远程工作人员的本地副本。并且远程工作人员对它们的任何更改都不会传播回驱动程序。为了克服这个事实,Apache Spark提供了共享变量的概念。我们有两种类型:

1. 广播变量

当您需要跨多个Spark操作的相同数据时,广播变量非常有用。通过调用SparkContext.broadcast(v),“v”数据然后仅以序列化形式发送给远程工作者,这加快了整个过程。

2.Acumulators累加器

变量只能是我们“添加”的东西,因此它们完全可用于并行处理。重要的是要记住,远程工作人员无法读取其值,只能添加它们。最终,只有驱动程序可以使用其值进行操作。

使用共享变量的演示

Acumulators累加器是解决我们所有行的所有温度总和问题的完美之选。没有必要使用map-reduce概念,就像我在之前的Spark编程部分中所做的那样,我们将简单地使用RDD forEach函数将解析后的温度发送到累加器。

<b>final</b> Accumulator<Integer> tempSum = ctx.accumulator(0);

    lines.foreach(<b>new</b> VoidFunction<String>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> <b>void</b> call(String t) throws Exception {
            tempSum.add(Integer.valueOf(SPACE.split(t)[1]));
        }
    });

然后计算平均温度如下:

<b>final</b> Integer sum = tempSum.value();
    <b>final</b> <b>long</b> count = lines.count();
    <b>final</b> <b>double</b> avg = (<b>double</b>) sum / count;

如果您以通常的方式编译并运行此 repo ,您​​应该看到以下结果,就像之前的情况一样:

Average temperature 27.0
.
.
Result item: Berlin 20
Result item: Paris 15
Result item: Rome 25

点击标题见原文


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

精通Python设计模式

精通Python设计模式

[荷] Sakis Kasampalis / 夏永锋 / 人民邮电出版社 / 2016-7 / 45.00元

本书分三部分、共16章介绍一些常用的设计模式。第一部分介绍处理对象创建的设计模式,包括工厂模式、建造者模式、原型模式;第二部分介绍处理一个系统中不同实体(类、对象等)之间关系的设计模式,包括外观模式、享元模式等;第三部分介绍处理系统实体之间通信的设计模式,包括责任链模式、观察者模式等。一起来看看 《精通Python设计模式》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具