Spark学习笔记--TopN

栏目: 编程工具 · 发布时间: 6年前

内容简介:TopN就是找出数据中排在最前边或者最后的N个数。假设原始数据是K,V形式存储在文本文件中。要实现这个需求如下步骤1、首先把数据按行读入并分隔成KeyValue形式

TopN就是找出数据中排在最前边或者最后的N个数。假设原始数据是K,V形式存储在文本文件中。

要实现这个需求如下步骤

1、首先把数据按行读入并分隔成KeyValue形式

2、需要对key相同的数据进行聚合

3、按value排序,取topN,对value排序有3种方案

A) 对多个分区分别使用SortedMap,将value以作为SortedMap键,key作为SortedMap的value保存到SortedMap中,如果SortedMap 的size大于N,则删除首个值,这样每个分区最终得到的StoreMap就是N。最后对多个分区的SortedMap进行合并,取TopN

B) 使用RDD的takeOrdered,对value进行排序,需要自定义 排序 类,此类需要继承序列化类

C) 使用RDD的top对value进行排序,需要自定义排序类,此类需要继承序列化类

方案一:

点击( 此处 )折叠或打开

  1. import org . apache . spark . SparkConf ;
  2. import org . apache . spark . api . java . JavaPairRDD ;
  3. import org . apache . spark . api . java . JavaRDD ;
  4. import org . apache . spark . api . java . JavaSparkContext ;
  5. import org . apache . spark . api . java . function . FlatMapFunction ;
  6. import org . apache . spark . api . java . function . Function2 ;
  7. import org . apache . spark . api . java . function . PairFunction ;
  8. import org . apache . spark . broadcast . Broadcast ;
  9. import scala . Tuple2 ;
  10. import java . io . Serializable ;
  11. import java . util . * ;
  12. public class Top10NoUnique {
  13.      public static class compare implements Comparator < Integer > , Serializable {
  14.         public final static compare Instance = new compare ( ) ;
  15.         @ Override
  16.          public int compare ( Integer o1 , Integer o2 ) {
  17.              return o1 - o2 ;
  18.          }
  19.      }
  20.      public static void main ( final String args [ ] ) {
  21.         SparkConf conf = new SparkConf ( ) ;
  22.         conf . setMaster ( "local" ) . setAppName ( "Top10NoUnique" ) ;
  23.         JavaSparkContext sc = new JavaSparkContext ( conf ) ;
  24.         JavaRDD < String > lines = sc . textFile ( "e:\\tmp\\input\\top10nonunique.txt" ) ;
  25.          //定义共享变量
  26.          final Broadcast < Integer > topN = sc . broadcast ( 10 ) ;
  27.          //对数据进行分区
  28.         JavaRDD < String > parLines = lines . coalesce ( 2 ) ;
  29.         JavaPairRDD < String , Integer > pairRDD = parLines . mapToPair ( new PairFunction < String , String , Integer > ( ) {
  30.             @ Override
  31.              public Tuple2 < String , Integer > call ( String s ) throws Exception {
  32.                  s = s . trim ( ) ;
  33.                   String [ ] arrs = s . split ( "," ) ;
  34.                  return new Tuple2 < String , Integer > ( arrs [ 0 ] , Integer . parseInt ( arrs [ 1 ] ) ) ;
  35.              }
  36.          } ) ;
  37.          List < Tuple2 < String , Integer > > list = pairRDD . collect ( ) ;
  38.          for ( Tuple2 < String , Integer > tuple2 : list ) {
  39.              System . out . println ( tuple2 . _1 ( ) + " " + tuple2 . _2 ( ) ) ;
  40.          }
  41.          //对Key合并value值
  42.         JavaPairRDD < String , Integer > uniquePairRDD = pairRDD . reduceByKey ( new Function2 < Integer , Integer , Integer > ( ) {
  43.             @ Override
  44.              public Integer call ( Integer v1 , Integer v2 ) throws Exception {
  45.                  return v1 + v2 ;
  46.              }
  47.          } ) ;
  48.         list = uniquePairRDD . collect ( ) ;
  49.          for ( Tuple2 < String , Integer > tuple2 : list ) {
  50.              System . out . println ( tuple2 . _1 ( ) + " " + tuple2 . _2 ( ) ) ;
  51.          }
  52.          //计算topN
  53.         JavaRDD < SortedMap < Integer , String > > sortedMapJavaRDD = uniquePairRDD . mapPartitions ( new FlatMapFunction < Iterator < Tuple2 < String , Integer > > , SortedMap < Integer , String > > ( ) {
  54.             @ Override
  55.              public Iterator < SortedMap < Integer , String > > call ( Iterator < Tuple2 < String , Integer > > tuple2Iterator ) throws Exception {
  56.                  SortedMap < Integer , String > sortedMap = new TreeMap < Integer , String > ( compare . Instance ) ;
  57.                  while ( tuple2Iterator . hasNext ( ) ) {
  58.                     Tuple2 < String , Integer > tuple2 = tuple2Iterator . next ( ) ;
  59.                      sortedMap . put ( tuple2 . _2 ( ) , tuple2 . _1 ( ) ) ;
  60.                      if ( sortedMap . size ( ) > topN . value ( ) ) {
  61.                          sortedMap . remove ( sortedMap . firstKey ( ) ) ;
  62.                      }
  63.                  }
  64.                  return Collections . singletonList ( sortedMap ) . iterator ( ) ;
  65.              }
  66.          } ) ;
  67.          //对多个分区进行合并
  68.          List < SortedMap < Integer , String > > sortedMapList = sortedMapJavaRDD . collect ( ) ;
  69.          SortedMap < Integer , String > top10Map = new TreeMap < > ( ) ;
  70.          for ( SortedMap < Integer , String > map : sortedMapList ) {
  71.              for ( Map . Entry < Integer , String > entry : map . entrySet ( ) ) {
  72.                 top10Map . put ( entry . getKey ( ) , entry . getValue ( ) ) ;
  73.                  if ( top10Map . size ( ) > topN . value ( ) ) {
  74.                     top10Map . remove ( top10Map . firstKey ( ) ) ;
  75.                  }
  76.              }
  77.          }
  78.          System . out . println ( "----------top 10------------------" ) ;
  79.          for ( Map . Entry < Integer , String > entry : top10Map . entrySet ( ) ) {
  80.              System . out . println ( entry . getKey ( ) + " " + entry . getValue ( ) ) ;
  81.          }
  82.      }
  83. }

方案二:

点击( 此处 )折叠或打开

  1. import org . apache . spark . SparkConf ;
  2. import org . apache . spark . api . java . JavaPairRDD ;
  3. import org . apache . spark . api . java . JavaRDD ;
  4. import org . apache . spark . api . java . JavaSparkContext ;
  5. import org . apache . spark . api . java . function . Function2 ;
  6. import org . apache . spark . api . java . function . PairFunction ;
  7. import scala . Tuple2 ;
  8. import java . io . Serializable ;
  9. import java . util . Comparator ;
  10. import java . util . List ;
  11. public class Top10TaskOrder {
  12.      public static class Comp implements Comparator < Tuple2 < String , Integer > > , Serializable {
  13.          public final static Comp INSTANCE = new Comp ( ) ;
  14.         @ Override
  15.          public int compare ( Tuple2 < String , Integer > o1 , Tuple2 < String , Integer > o2 ) {
  16.              return o2 . _2 ( ) . compareTo ( o1 . _2 ( ) ) ;
  17.          }
  18.      }
  19.      public static void main ( String [ ] args ) {
  20.         SparkConf conf = new SparkConf ( ) ;
  21.         conf . setMaster ( "local" ) . setAppName ( "Top10TaskOrder" ) ;
  22.         JavaSparkContext sc = new JavaSparkContext ( conf ) ;
  23.         JavaRDD < String > lines = sc . textFile ( "e:\\tmp\\input\\top10nonunique.txt" ) ;
  24.         JavaRDD < String > partLines = lines . coalesce ( 2 ) ;
  25.         JavaPairRDD < String , Integer > pairRDD = partLines . mapToPair ( new PairFunction < String , String , Integer > ( ) {
  26.             @ Override
  27.              public Tuple2 < String , Integer > call ( String s ) throws Exception {
  28.                 s = s . trim ( ) ;
  29.                  String [ ] splits = s . split ( "," ) ;
  30.                  return new Tuple2 < String , Integer > ( splits [ 0 ] , Integer . parseInt ( splits [ 1 ] ) ) ;
  31.              }
  32.          } ) ;
  33.         JavaPairRDD < String , Integer > reducePairRDD = pairRDD . reduceByKey ( new Function2 < Integer , Integer , Integer > ( ) {
  34.             @ Override
  35.              public Integer call ( Integer v1 , Integer v2 ) throws Exception {
  36.                  return v1 + v2 ;
  37.              }
  38.          } ) ;
  39.          List < Tuple2 < String , Integer > > takeOrderPairRDD = reducePairRDD . takeOrdered ( 10 , Comp . INSTANCE ) ;
  40.          System . out . println ( "--------------------top list------------------" ) ;
  41.          for ( Tuple2 < String , Integer > tuple2 : takeOrderPairRDD ) {
  42.              System . out . println ( tuple2 . _2 ( ) + " " + tuple2 . _1 ( ) ) ;
  43.          }
  44.      }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

平台革命:改变世界的商业模式

平台革命:改变世界的商业模式

[美]杰奥夫雷G.帕克(Geoffrey G. Parker)、马歇尔W.范·埃尔斯泰恩(Marshall W. Van Alstyne)、桑基特·保罗·邱达利(Sangeet Paul Choudary) / 志鹏 / 机械工业出版社 / 2017-10 / 65.00

《平台革命》一书从网络效应、平台的体系结构、颠覆市场、平台上线、盈利模式、平台开放的标准、平台治理、平台的衡量指标、平台战略、平台监管的10个视角,清晰地为读者提供了平台模式最权威的指导。 硅谷著名投资人马克·安德森曾经说过:“软件正在吞食整个世界。”而《平台革命》进一步指出:“平台正在吞食整个世界”。以平台为导向的经济变革为社会和商业机构创造了巨大的价值,包括创造财富、增长、满足人类的需求......一起来看看 《平台革命:改变世界的商业模式》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

随机密码生成器
随机密码生成器

多种字符组合密码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器