Apache Spark编程教程

栏目: 编程工具 · 发布时间: 5年前

内容简介:Apache Spark是一个分布式计算平台,在当今非常流行,特别是因为与Hadoop mapreduce相比性能要好得多,Spark比基于磁盘的hadoop mapreduce 快了近100倍。让我们测试它并从头开始创建maven Apache Spark应用程序......Apache Spark可以以三种模式运行:我们将在这里测试第一个变体,其中包含经典的单词计数示例,用于保存在Hadoop HDFS文件系统中的文件

Apache Spark是一个分布式计算平台,在当今非常流行,特别是因为与Hadoop mapreduce相比性能要好得多,Spark比基于磁盘的hadoop mapreduce 快了近100倍。让我们测试它并从头开始创建maven Apache Spark应用程序......

Apache Spark可以以三种模式运行:

  • Apache Spark应用程序访问文件系统(本地文件系统, HDFS )。
  • Apache Spark应用程序访问 HBase 等分布式系统。
  • Yarn上的  Apache Spark应用程序。

我们将在这里测试第一个变体,其中包含经典的单词计数示例,用于保存在Hadoop HDFS文件系统中的文件

先决条件:

  • 安装了Hadoop,我的Mac上有Hadoop 2.6.0。要进行安装,请按照此处的说明进行操作...  在Mac上安装Hadoop
  • 将文本文件上传到HDFS。有关如何使用 HDFS的命令 ,请参阅 HDFS命令指南
  • 安装了Apache Spark。Spark可以在预构建版本中下载,也可以手动构建。在Mac OS X中,安装Apache Spark的最简单方法是使用自制程序。我有Apache Spark 1.3.0,可以很好地与Hadoop 2.6.0配合使用。请参阅 Spark独立模式
  • git clone https://bitbucket.org/tomask79/apache-spark-maven.git并运行mvn clean install

准备和启动Apache Spark和Hadoop集群

在进行任何测试之前,我们需要以下内容:

  • 我们需要运行Apache Spark master ...从文件夹/usr/local/Cellar/apache-spark/1.3.0/libexec/sbin运行以下命令:./start-master.sh

这将启动Apache Spark集群的主节点。Spark提供了出色的Web控制台来查看主节点属性...在浏览器中,点击以下URL:

http://localhost:8080/

为了能够在Apache Spark集群中运行任何我们的东西,我们需要在apache spark run以下命令的bin文件夹中添加至少一个worker来执行此操作:

spark-class org.apache.spark.deploy.worker.Worker spark://tomask79.local:7077

spark://tomask79.local:7077是我Mac上的主节点,你要根据localhost:8080中列出的主节点URL将其更改为你的主节点。

再次访问localhost:8080,您应该看到您的worker在employees表中列出...完美,Apache Spark集群已准备就绪!

我们要在HDFS文件系统上加载文件, 所以我们需要启动Hadoop集群,从文件夹/usr/local/Cellar/hadoop/2.6.0/libexec/sbin 启动hadoop集群:./start-all.sh

将应用程序提交到Apache Cluster并检查代码

在将每个Apache Spark应用程序提交到Apache集群之前,需要对其进行正确配置,让我们检查一下配置代码:

SparkConf sparkConf = <b>new</b> SparkConf();
    sparkConf.set(<font>"spark.cores.max"</font><font>, </font><font>"1"</font><font>);
    sparkConf.set(</font><font>"spark.executor.memory"</font><font>, </font><font>"2048M"</font><font>);
    sparkConf.set(</font><font>"spark.driver.memory"</font><font>, </font><font>"1024M"</font><font>);
    sparkConf.setAppName(</font><font>"JavaWordCount"</font><font>);
    sparkConf.setMaster(</font><font>"spark://tomask79.local:7077"</font><font>);
</font>

要查看Apache Spark应用程序属性的完整列表,请参阅以下链接: Spark配置

我们只是说使用setMaster方法设置主节点URL是必需的。其他选项是可选的。但是控制Apache Spark节点应用程序的资源量是很好的。否则你可能会遇到新奇的Apache Spark问题:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

第二个非常重要的配置是以下代码:

JavaSparkContext ctx = <b>new</b> JavaSparkContext(sparkConf);
ctx.addJar(<font>"/Users/tomask79/Documents/workspace/apache-spark/spark/target/spark-0.0.1-SNAPSHOT.jar"</font><font>);
</font>

使用此代码,您将告诉Apache Spark驱动程序,他应该将jar带到哪个节点以运行代码计算(banq注:计算代码送到数据附近,云计算是数据送到计算代码附近)。否则你最终得到:

15/11/22 12:03:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.112): java.lang.ClassNotFoundException: com.myspark.test.spark.JavaWordCount$1
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

其余的代码是执行mapreduce字数统计逻辑的简单 RDD 命令,没什么大不了的,要理解它,请访问以下页面: Hadoop wordcount

现在您应该能够将JavaWordCount应用程序提交到Apache Spark集群,如果一切顺利,从IDE启动JavaWordCount类后,您应该看到输出。

关于maven依赖关系的建议

首先,请确保使用与安装的Apache Spark相同版本的spark_core依赖项。例如,我安装了Apache Spark 1.3.0,因此我的示例在pom.xml中使用

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>

我有与hadoop 2.6.0客户端的servlet API兼容性问题,所以我使用2.2.0 hadoop客户端,它可以与Apache Spark 1.3.0核心一起使用:

<dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.2.0</version>
    </dependency>

用Apache Spark编程​​​​​​​

我们将尝试使用JavaRDD API解决简单任务。

Apache Spark JavaRDD和任务解决:如果你不知道map-reduce概念那么你就无法理解Apache Spark。为了证明这一点,让我们解决简单的任务。假设我们有以下带有温度的城市文本文件:

Prague 35
Madrid 40
Berlin 20
Paris 15
Rome 25

位于Apache Hadoop HDFS文件系统,我们需要编写简单的JavaRDD Apache Spark程序来打印具有温度低于整个平均温度的城市。

JavaRDD API和MapReduce有区别吗?

要解决前面提到的任务,我们需要将问题分成以下几部分:

  • 首先,我们需要编写JavaRDD程序来计算温度和平均值的总和。
  • 然后我们要打印温度低于计算平均值的行。

MapReduce解决方案概念:

如果我们使用Spring Data for Hadoop或为map-reduce程序指定的简单Apache Hadoop API,那么我们的解决方案将是:

  • Map函数将创建键[K,V] ='reducer',town.temperature
  • Reduce功能将接收先前的键并将整个组的温度相加并计算平均温度。
  • 链式map-reduce减少任务将打印温度低于平均值的城镇的结果。

Apache Spark JavaRDD解决方案:

  • 首先,我们需要通过将map函数应用于输入RDD集来获取所有行的JavaRDD温度集:
JavaRDD<String> parsedTemperatures = lines.map(<b>new</b> Function<String, String>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> String call(String v1) throws Exception {
            <b>final</b> String arr[] = SPACE.split(v1);
            System.out.println(<font>"Reading temperature ["</font><font>+arr[1]+</font><font>"] from "</font><font>+v1);
            <b>return</b> arr[1];
        }
    });
</font>
  • 然后我们需要将此RDD集转换为CONSTANT.row.temperature表单以将数据准备到reducer中:
JavaPairRDD<String, Integer> forGroup = parsedTemperatures.mapToPair(
            <b>new</b> PairFunction<String, 
            String, Integer>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> Tuple2<String, Integer> call(String t) throws Exception {
            <b>return</b> <b>new</b> Tuple2<String, Integer>(<font>"reducer"</font><font>, Integer.parseInt(t));
        }
    });
</font>
  • 有了这个数据集,我们就为reducer准备了数据,它将聚合所有温度
JavaPairRDD<String, Integer> counts = forGroup.reduceByKey(
            <b>new</b> Function2<Integer, Integer, Integer>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> Integer call(Integer v1, Integer v2) throws Exception {
              System.out.println(<font>"Agregatting "</font><font>+v1+</font><font>" plus "</font><font>+v2);
              <b>return</b> v1 + v2;
        }
    });
</font>

(很像map-reduce概念)

要了解Spark减速器的工作原理,请查看日志:

Reading temperature [35] from Prague 35
Reading temperature [40] from Madrid 40
Agregatting 35 plus 40
Reading temperature [20] from Berlin 20
Agregatting 75 plus 20
Reading temperature [15] from Paris 15
Agregatting 95 plus 15
Reading temperature [25] from Rome 25
Agregatting 110 plus 25

Spark实际上并行运行前三个函数map,mapToPair和reduceByKey!DAG图形分析器组合Spark任务的好处之一!

解决方案的第二部分是打印温度低于平均温度的所有城镇:

Tuple2<String, Integer> sumTemperatures = counts.first();    
    <b>final</b> Integer sum = sumTemperatures._2;
    <b>final</b> <b>long</b> count = parsedTemperatures.count();
    <b>final</b> <b>double</b> avg = (<b>double</b>) sum / count;
    System.out.println(<font>"Average temperature "</font><font>+avg);

    JavaRDD<String> result = lines.filter(<b>new</b> Function<String, Boolean>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> Boolean call(String v1) throws Exception {
            <b>final</b> String arr[] = SPACE.split(v1);
            <b>long</b> temperature = Long.parseLong(arr[1]);
            <b>return</b> temperature <= avg;
        }
    });

    List<String> resultList = result.collect();
    <b>for</b> (String item: resultList) {
        System.out.println(</font><font>"Result item: "</font><font>+item);
    }
</font>

让我们解释一下这段代码:

  • 通过counts.first()我们从reducer中读取所有温度的总和
  • 我们使用count函数来获取JavaRDD输入集中所有行的计数。
  • 我们使用JavaRDD过滤功能来过滤掉温度高于平均值的城镇。
  • 我们使用JavaRDD collect函数来打印结果。

如果你运行这个程序,你应该得到如下结果:

16/03/03 21:02:26 INFO DAGScheduler: Job 1 finished: count at AvgTemperatureAnalyzer.java:85, took 0,094561 s

Average temperature 27.0
.
.
Result item: Berlin 20
Result item: Paris 15
Result item: Rome 25

16/03/03 21:02:26 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,<b>null</b>}

结论

从我的观点来看,Apache Spark比map-reduce编程更加友好,即使概念是相同的。我打赌你明白我们需要通过JavaRDD输入进行多次迭代,但是使用map-reduce你需要弄清楚如何将前一个map reduce任务的结果传递给下一个,Apache Spark一个输入迭代以新的RDD设置,您可以在其中应用其他功能,从主节点驱动的所有内容......这不是很酷吗?

代码

让我们看看如何使用Apache Spark Shared Variables使代码看起来更干净。

Apache Spark中的共享变量

通常在编写像map这样的Spark动作时,驱动程序中的任何传递的变量都会转换为远程工作人员的本地副本。并且远程工作人员对它们的任何更改都不会传播回驱动程序。为了克服这个事实,Apache Spark提供了共享变量的概念。我们有两种类型:

1. 广播变量

当您需要跨多个Spark操作的相同数据时,广播变量非常有用。通过调用SparkContext.broadcast(v),“v”数据然后仅以序列化形式发送给远程工作者,这加快了整个过程。

2.Acumulators累加器

变量只能是我们“添加”的东西,因此它们完全可用于并行处理。重要的是要记住,远程工作人员无法读取其值,只能添加它们。最终,只有驱动程序可以使用其值进行操作。

使用共享变量的演示

Acumulators累加器是解决我们所有行的所有温度总和问题的完美之选。没有必要使用map-reduce概念,就像我在之前的Spark编程部分中所做的那样,我们将简单地使用RDD forEach函数将解析后的温度发送到累加器。

<b>final</b> Accumulator<Integer> tempSum = ctx.accumulator(0);

    lines.foreach(<b>new</b> VoidFunction<String>() {
        <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L;

        <b>public</b> <b>void</b> call(String t) throws Exception {
            tempSum.add(Integer.valueOf(SPACE.split(t)[1]));
        }
    });

然后计算平均温度如下:

<b>final</b> Integer sum = tempSum.value();
    <b>final</b> <b>long</b> count = lines.count();
    <b>final</b> <b>double</b> avg = (<b>double</b>) sum / count;

如果您以通常的方式编译并运行此 repo ,您​​应该看到以下结果,就像之前的情况一样:

Average temperature 27.0
.
.
Result item: Berlin 20
Result item: Paris 15
Result item: Rome 25

点击标题见原文


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Linux Command Line

The Linux Command Line

William E. Shotts Jr. / No Starch Press, Incorporated / 2012-1-17 / USD 39.95

You've experienced the shiny, point-and-click surface of your Linux computer-now dive below and explore its depths with the power of the command line. The Linux Command Line takes you from your very ......一起来看看 《The Linux Command Line》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具