内容简介:Apache Spark是一个分布式计算平台,在当今非常流行,特别是因为与Hadoop mapreduce相比性能要好得多,Spark比基于磁盘的hadoop mapreduce 快了近100倍。让我们测试它并从头开始创建maven Apache Spark应用程序......Apache Spark可以以三种模式运行:我们将在这里测试第一个变体,其中包含经典的单词计数示例,用于保存在Hadoop HDFS文件系统中的文件
Apache Spark是一个分布式计算平台,在当今非常流行,特别是因为与Hadoop mapreduce相比性能要好得多,Spark比基于磁盘的hadoop mapreduce 快了近100倍。让我们测试它并从头开始创建maven Apache Spark应用程序......
Apache Spark可以以三种模式运行:
我们将在这里测试第一个变体,其中包含经典的单词计数示例,用于保存在Hadoop HDFS文件系统中的文件
先决条件:
- 安装了Hadoop,我的Mac上有Hadoop 2.6.0。要进行安装,请按照此处的说明进行操作... 在Mac上安装Hadoop
- 将文本文件上传到HDFS。有关如何使用 HDFS的命令 ,请参阅 HDFS命令指南
- 安装了Apache Spark。Spark可以在预构建版本中下载,也可以手动构建。在Mac OS X中,安装Apache Spark的最简单方法是使用自制程序。我有Apache Spark 1.3.0,可以很好地与Hadoop 2.6.0配合使用。请参阅 Spark独立模式
- git clone https://bitbucket.org/tomask79/apache-spark-maven.git并运行mvn clean install
准备和启动Apache Spark和Hadoop集群
在进行任何测试之前,我们需要以下内容:
- 我们需要运行Apache Spark master ...从文件夹/usr/local/Cellar/apache-spark/1.3.0/libexec/sbin运行以下命令:./start-master.sh
这将启动Apache Spark集群的主节点。Spark提供了出色的Web控制台来查看主节点属性...在浏览器中,点击以下URL:
http://localhost:8080/
为了能够在Apache Spark集群中运行任何我们的东西,我们需要在apache spark run以下命令的bin文件夹中添加至少一个worker来执行此操作:
spark-class org.apache.spark.deploy.worker.Worker spark://tomask79.local:7077
spark://tomask79.local:7077是我Mac上的主节点,你要根据localhost:8080中列出的主节点URL将其更改为你的主节点。
再次访问localhost:8080,您应该看到您的worker在employees表中列出...完美,Apache Spark集群已准备就绪!
我们要在HDFS文件系统上加载文件, 所以我们需要启动Hadoop集群,从文件夹/usr/local/Cellar/hadoop/2.6.0/libexec/sbin 启动hadoop集群:./start-all.sh
将应用程序提交到Apache Cluster并检查代码
在将每个Apache Spark应用程序提交到Apache集群之前,需要对其进行正确配置,让我们检查一下配置代码:
SparkConf sparkConf = <b>new</b> SparkConf(); sparkConf.set(<font>"spark.cores.max"</font><font>, </font><font>"1"</font><font>); sparkConf.set(</font><font>"spark.executor.memory"</font><font>, </font><font>"2048M"</font><font>); sparkConf.set(</font><font>"spark.driver.memory"</font><font>, </font><font>"1024M"</font><font>); sparkConf.setAppName(</font><font>"JavaWordCount"</font><font>); sparkConf.setMaster(</font><font>"spark://tomask79.local:7077"</font><font>); </font>
要查看Apache Spark应用程序属性的完整列表,请参阅以下链接: Spark配置
我们只是说使用setMaster方法设置主节点URL是必需的。其他选项是可选的。但是控制Apache Spark节点应用程序的资源量是很好的。否则你可能会遇到新奇的Apache Spark问题:
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
第二个非常重要的配置是以下代码:
JavaSparkContext ctx = <b>new</b> JavaSparkContext(sparkConf); ctx.addJar(<font>"/Users/tomask79/Documents/workspace/apache-spark/spark/target/spark-0.0.1-SNAPSHOT.jar"</font><font>); </font>
使用此代码,您将告诉Apache Spark驱动程序,他应该将jar带到哪个节点以运行代码计算(banq注:计算代码送到数据附近,云计算是数据送到计算代码附近)。否则你最终得到:
15/11/22 12:03:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.112): java.lang.ClassNotFoundException: com.myspark.test.spark.JavaWordCount$1 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
其余的代码是执行mapreduce字数统计逻辑的简单 RDD 命令,没什么大不了的,要理解它,请访问以下页面: Hadoop wordcount
现在您应该能够将JavaWordCount应用程序提交到Apache Spark集群,如果一切顺利,从IDE启动JavaWordCount类后,您应该看到输出。
关于maven依赖关系的建议
首先,请确保使用与安装的Apache Spark相同版本的spark_core依赖项。例如,我安装了Apache Spark 1.3.0,因此我的示例在pom.xml中使用
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.3.0</version> </dependency>
我有与hadoop 2.6.0客户端的servlet API兼容性问题,所以我使用2.2.0 hadoop客户端,它可以与Apache Spark 1.3.0核心一起使用:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.2.0</version> </dependency>
用Apache Spark编程
我们将尝试使用JavaRDD API解决简单任务。
Apache Spark JavaRDD和任务解决:如果你不知道map-reduce概念那么你就无法理解Apache Spark。为了证明这一点,让我们解决简单的任务。假设我们有以下带有温度的城市文本文件:
Prague 35 Madrid 40 Berlin 20 Paris 15 Rome 25
位于Apache Hadoop HDFS文件系统,我们需要编写简单的JavaRDD Apache Spark程序来打印具有温度低于整个平均温度的城市。
JavaRDD API和MapReduce有区别吗?
要解决前面提到的任务,我们需要将问题分成以下几部分:
- 首先,我们需要编写JavaRDD程序来计算温度和平均值的总和。
- 然后我们要打印温度低于计算平均值的行。
MapReduce解决方案概念:
如果我们使用Spring Data for Hadoop或为map-reduce程序指定的简单Apache Hadoop API,那么我们的解决方案将是:
- Map函数将创建键[K,V] ='reducer',town.temperature
- Reduce功能将接收先前的键并将整个组的温度相加并计算平均温度。
- 链式map-reduce减少任务将打印温度低于平均值的城镇的结果。
Apache Spark JavaRDD解决方案:
- 首先,我们需要通过将map函数应用于输入RDD集来获取所有行的JavaRDD温度集:
JavaRDD<String> parsedTemperatures = lines.map(<b>new</b> Function<String, String>() { <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L; <b>public</b> String call(String v1) throws Exception { <b>final</b> String arr[] = SPACE.split(v1); System.out.println(<font>"Reading temperature ["</font><font>+arr[1]+</font><font>"] from "</font><font>+v1); <b>return</b> arr[1]; } }); </font>
- 然后我们需要将此RDD集转换为CONSTANT.row.temperature表单以将数据准备到reducer中:
JavaPairRDD<String, Integer> forGroup = parsedTemperatures.mapToPair( <b>new</b> PairFunction<String, String, Integer>() { <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L; <b>public</b> Tuple2<String, Integer> call(String t) throws Exception { <b>return</b> <b>new</b> Tuple2<String, Integer>(<font>"reducer"</font><font>, Integer.parseInt(t)); } }); </font>
- 有了这个数据集,我们就为reducer准备了数据,它将聚合所有温度
JavaPairRDD<String, Integer> counts = forGroup.reduceByKey( <b>new</b> Function2<Integer, Integer, Integer>() { <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L; <b>public</b> Integer call(Integer v1, Integer v2) throws Exception { System.out.println(<font>"Agregatting "</font><font>+v1+</font><font>" plus "</font><font>+v2); <b>return</b> v1 + v2; } }); </font>
(很像map-reduce概念)
要了解Spark减速器的工作原理,请查看日志:
Reading temperature [35] from Prague 35 Reading temperature [40] from Madrid 40 Agregatting 35 plus 40 Reading temperature [20] from Berlin 20 Agregatting 75 plus 20 Reading temperature [15] from Paris 15 Agregatting 95 plus 15 Reading temperature [25] from Rome 25 Agregatting 110 plus 25
Spark实际上并行运行前三个函数map,mapToPair和reduceByKey!DAG图形分析器组合Spark任务的好处之一!
解决方案的第二部分是打印温度低于平均温度的所有城镇:
Tuple2<String, Integer> sumTemperatures = counts.first(); <b>final</b> Integer sum = sumTemperatures._2; <b>final</b> <b>long</b> count = parsedTemperatures.count(); <b>final</b> <b>double</b> avg = (<b>double</b>) sum / count; System.out.println(<font>"Average temperature "</font><font>+avg); JavaRDD<String> result = lines.filter(<b>new</b> Function<String, Boolean>() { <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L; <b>public</b> Boolean call(String v1) throws Exception { <b>final</b> String arr[] = SPACE.split(v1); <b>long</b> temperature = Long.parseLong(arr[1]); <b>return</b> temperature <= avg; } }); List<String> resultList = result.collect(); <b>for</b> (String item: resultList) { System.out.println(</font><font>"Result item: "</font><font>+item); } </font>
让我们解释一下这段代码:
- 通过counts.first()我们从reducer中读取所有温度的总和
- 我们使用count函数来获取JavaRDD输入集中所有行的计数。
- 我们使用JavaRDD过滤功能来过滤掉温度高于平均值的城镇。
- 我们使用JavaRDD collect函数来打印结果。
如果你运行这个程序,你应该得到如下结果:
16/03/03 21:02:26 INFO DAGScheduler: Job 1 finished: count at AvgTemperatureAnalyzer.java:85, took 0,094561 s Average temperature 27.0 . . Result item: Berlin 20 Result item: Paris 15 Result item: Rome 25 16/03/03 21:02:26 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,<b>null</b>}
结论
从我的观点来看,Apache Spark比map-reduce编程更加友好,即使概念是相同的。我打赌你明白我们需要通过JavaRDD输入进行多次迭代,但是使用map-reduce你需要弄清楚如何将前一个map reduce任务的结果传递给下一个,Apache Spark一个输入迭代以新的RDD设置,您可以在其中应用其他功能,从主节点驱动的所有内容......这不是很酷吗?
让我们看看如何使用Apache Spark Shared Variables使代码看起来更干净。
Apache Spark中的共享变量
通常在编写像map这样的Spark动作时,驱动程序中的任何传递的变量都会转换为远程工作人员的本地副本。并且远程工作人员对它们的任何更改都不会传播回驱动程序。为了克服这个事实,Apache Spark提供了共享变量的概念。我们有两种类型:
1. 广播变量
当您需要跨多个Spark操作的相同数据时,广播变量非常有用。通过调用SparkContext.broadcast(v),“v”数据然后仅以序列化形式发送给远程工作者,这加快了整个过程。
2.Acumulators累加器
变量只能是我们“添加”的东西,因此它们完全可用于并行处理。重要的是要记住,远程工作人员无法读取其值,只能添加它们。最终,只有驱动程序可以使用其值进行操作。
使用共享变量的演示
Acumulators累加器是解决我们所有行的所有温度总和问题的完美之选。没有必要使用map-reduce概念,就像我在之前的Spark编程部分中所做的那样,我们将简单地使用RDD forEach函数将解析后的温度发送到累加器。
<b>final</b> Accumulator<Integer> tempSum = ctx.accumulator(0); lines.foreach(<b>new</b> VoidFunction<String>() { <b>private</b> <b>static</b> <b>final</b> <b>long</b> serialVersionUID = 1L; <b>public</b> <b>void</b> call(String t) throws Exception { tempSum.add(Integer.valueOf(SPACE.split(t)[1])); } });
然后计算平均温度如下:
<b>final</b> Integer sum = tempSum.value(); <b>final</b> <b>long</b> count = lines.count(); <b>final</b> <b>double</b> avg = (<b>double</b>) sum / count;
如果您以通常的方式编译并运行此 repo ,您应该看到以下结果,就像之前的情况一样:
Average temperature 27.0 . . Result item: Berlin 20 Result item: Paris 15 Result item: Rome 25
点击标题见原文
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 函数式编程教程前奏
- 使用ZooKeeper编程 - 快速教程
- F#开发教程(13):函数化编程(十二)
- F#开发教程(12):函数化编程(十一)
- React教程 - 12. React的编程思想
- F#开发教程(13):函数化编程(十二)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Linux Command Line
William E. Shotts Jr. / No Starch Press, Incorporated / 2012-1-17 / USD 39.95
You've experienced the shiny, point-and-click surface of your Linux computer-now dive below and explore its depths with the power of the command line. The Linux Command Line takes you from your very ......一起来看看 《The Linux Command Line》 这本书的介绍吧!