Spark driver 端得到 Executor 返回值的方法

栏目: 编程工具 · 发布时间: 6年前

内容简介:有人说spark的代码不优雅,这个浪尖就忍不了了。实际上,说spark代码不优雅的主要是对scala不熟悉,spark代码我觉得还是很赞的,最值得阅读的大数据框架之一。今天这篇文章不是为了争辩Spark 代码优雅与否,主要是讲一下理解了spark源码之后我们能使用的一些小技巧吧。

Spark driver 端得到 Executor 返回值的方法

有人说spark的代码不优雅,这个浪尖就忍不了了。实际上,说spark代码不优雅的主要是对scala不熟悉,spark代码我觉得还是很赞的,最值得阅读的大数据框架之一。

今天这篇文章不是为了争辩Spark 代码优雅与否,主要是讲一下理解了spark源码之后我们能使用的一些小技巧吧。

spark 使用的时候,总有些需求比较另类吧,比如有球友问过这样一个需求:

浪尖,我想要在driver端获取executor执行task返回的结果,比如task是个规则引擎,我想知道每条规则命中了几条数据,请问这个怎么做呢?

这个是不是很骚气,也很常见,按理说你输出之后,在 mysql 里跑条 sql 就行了,但是这个往往显的比较麻烦。而且有时候,在 driver可能还要用到这些数据呢?具体该怎么做呢?

大部分的想法估计是collect方法,那么用collect如何实现呢?大家自己可以考虑一下,我只能告诉你不简单,不如输出到数据库里,然后driver端写sql分析一下。

还有一种考虑就是使用自定义累加器。这样就可以在executor端将结果累加然后在driver端使用,不过具体实现也是很麻烦。大家也可以自己琢磨一下下~

那么,浪尖就给大家介绍一个比较常用也比较骚的操作吧。

其实,这种操作我们最先想到的应该是count函数,因为他就是将task的返回值返回到driver端,然后进行聚合的。我们可以从idea count函数点击进去,可以看到

  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

也即是sparkcontext的runJob方法。

Utils.getIteratorSize _这个方法主要是计算每个iterator的元素个数,也即是每个分区的元素个数,返回值就是元素个数:

<span>/**</span>

<span> * Counts the number of elements of an iterator using a while loop rather than calling</span>

<span> * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower</span>

<span> * in the current version of Scala.</span>

<span> */</span>

<span> def getIteratorSize[<span>T</span>](<span>iterator: Iterator[T]</span>): Long = {</span>

<span><span> var count = 0L</span></span>

<span><span> while (iterator.hasNext) {</span></span>

<span><span> count += 1L</span></span>

<span><span> iterator.next()</span></span>

<span><span> }</span></span>

<span><span> count</span></span>

<span> }</span>

然后就是runJob返回的是一个数组,每个数组的元素就是我们task执行函数的返回值,然后调用sum就得到我们的统计值了。

那么我们完全可以借助这个思路实现我们开头的目标。浪尖在这里直接上案例了:

<span><span>import</span> org.apache.spark.{SparkConf, SparkContext, TaskContext}</span>

<span><span>import</span> org.elasticsearch.hadoop.cfg.ConfigurationOptions</span>

<span><br /></span>

<span>object es2sparkRunJob {</span>

<span><br /></span>

<span> def main(args: <span>Array</span>[<span>String</span>]): <span>Unit</span> = {</span>

<span> val conf = new <span>SparkConf</span>().setMaster(<span>&quot;local[*]&quot;</span>).setAppName(this.getClass.getCanonicalName)</span>

<span><br /></span>

<span> conf.<span>set</span>(<span>ConfigurationOptions</span>.<span>ES_NODES</span>, <span>&quot;127.0.0.1&quot;</span>)</span>

<span> conf.<span>set</span>(<span>ConfigurationOptions</span>.<span>ES_PORT</span>, <span>&quot;9200&quot;</span>)</span>

<span> conf.<span>set</span>(<span>ConfigurationOptions</span>.<span>ES_NODES_WAN_ONLY</span>, <span>&quot;true&quot;</span>)</span>

<span> conf.<span>set</span>(<span>ConfigurationOptions</span>.<span>ES_INDEX_AUTO_CREATE</span>, <span>&quot;true&quot;</span>)</span>

<span> conf.<span>set</span>(<span>ConfigurationOptions</span>.<span>ES_NODES_DISCOVERY</span>, <span>&quot;false&quot;</span>)</span>

<span> conf.<span>set</span>(<span>&quot;es.write.rest.error.handlers&quot;</span>, <span>&quot;ignoreConflict&quot;</span>)</span>

<span> conf.<span>set</span>(<span>&quot;es.write.rest.error.handler.ignoreConflict&quot;</span>, <span>&quot;com.jointsky.bigdata.handler.IgnoreConflictsHandler&quot;</span>)</span>

<span><br /></span>

<span> val sc = new <span>SparkContext</span>(conf)</span>

<span> <span>import</span> org.elasticsearch.spark._</span>

<span><br /></span>

<span> val rdd = sc.esJsonRDD(<span>&quot;posts&quot;</span>).repartition(<span>10</span>)</span>

<span><br /></span>

<span> rdd.<span>count</span>()</span>

<span> val <span><span>func</span> = <span>(itr : Iterator[<span>(String,String)</span></span></span>]) =&gt; {</span>

<span> <span>var</span> <span>count</span> = <span>0</span></span>

<span> itr.foreach(each=&gt;{</span>

<span> <span>count</span> += <span>1</span></span>

<span> })</span>

<span> (<span>TaskContext</span>.getPartitionId(),<span>count</span>)</span>

<span> }</span>

<span><br /></span>

<span> val res = sc.runJob(rdd,<span><span>func</span>)</span></span>

<span><br /></span>

<span> <span>res</span>.<span>foreach</span><span>(<span>println</span>)</span></span>

<span><br /></span>

<span> sc.stop()</span>

<span> }</span>

<span>}</span>

<span><br /></span>

例子中driver端获取的就是每个task处理的数据量。

效率高,而且操作灵活高效~

是不是很骚气~~

更多spark源码知识,flink知识,欢迎加入浪尖知识星球,一起学习~

Spark driver 端得到 Executor 返回值的方法


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

C语言的科学和艺术

C语言的科学和艺术

罗伯茨 / 翁惠玉 / 机械工业出版社 / 2005-3 / 55.00元

《C语言的科学和艺术》是计算机科学的经典教材,介绍了计算机科学的基础知识和程序设计的专门知识。《C语言的科学和艺术》以介绍ANSI C为主线,不仅涵盖C语言的基本知识,而且介绍了软件工程技术以及如何应用良好的程序设计风格进行开发等内容。《C语言的科学和艺术》采用了库函数的方法,强调抽象的原则,详细阐述了库和模块化开发。此外,《C语言的科学和艺术》还利用大量实例讲述解决问题的全过程,对开发过程中常见......一起来看看 《C语言的科学和艺术》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

html转js在线工具
html转js在线工具

html转js在线工具