内容简介:TopN就是找出数据中排在最前边或者最后的N个数。假设原始数据是K,V形式存储在文本文件中。要实现这个需求如下步骤1、首先把数据按行读入并分隔成KeyValue形式
TopN就是找出数据中排在最前边或者最后的N个数。假设原始数据是K,V形式存储在文本文件中。
要实现这个需求如下步骤
1、首先把数据按行读入并分隔成KeyValue形式
2、需要对key相同的数据进行聚合
3、按value排序,取topN,对value排序有3种方案
A) 对多个分区分别使用SortedMap,将value以作为SortedMap键,key作为SortedMap的value保存到SortedMap中,如果SortedMap 的size大于N,则删除首个值,这样每个分区最终得到的StoreMap就是N。最后对多个分区的SortedMap进行合并,取TopN
B) 使用RDD的takeOrdered,对value进行排序,需要自定义 排序 类,此类需要继承序列化类
C) 使用RDD的top对value进行排序,需要自定义排序类,此类需要继承序列化类
方案一:
点击( 此处 )折叠或打开
- import org . apache . spark . SparkConf ;
- import org . apache . spark . api . java . JavaPairRDD ;
- import org . apache . spark . api . java . JavaRDD ;
- import org . apache . spark . api . java . JavaSparkContext ;
- import org . apache . spark . api . java . function . FlatMapFunction ;
- import org . apache . spark . api . java . function . Function2 ;
- import org . apache . spark . api . java . function . PairFunction ;
- import org . apache . spark . broadcast . Broadcast ;
- import scala . Tuple2 ;
- import java . io . Serializable ;
- import java . util . * ;
- public class Top10NoUnique {
- public static class compare implements Comparator < Integer > , Serializable {
- public final static compare Instance = new compare ( ) ;
- @ Override
- public int compare ( Integer o1 , Integer o2 ) {
- return o1 - o2 ;
- }
- }
- public static void main ( final String args [ ] ) {
- SparkConf conf = new SparkConf ( ) ;
- conf . setMaster ( "local" ) . setAppName ( "Top10NoUnique" ) ;
- JavaSparkContext sc = new JavaSparkContext ( conf ) ;
- JavaRDD < String > lines = sc . textFile ( "e:\\tmp\\input\\top10nonunique.txt" ) ;
- //定义共享变量
- final Broadcast < Integer > topN = sc . broadcast ( 10 ) ;
- //对数据进行分区
- JavaRDD < String > parLines = lines . coalesce ( 2 ) ;
- JavaPairRDD < String , Integer > pairRDD = parLines . mapToPair ( new PairFunction < String , String , Integer > ( ) {
- @ Override
- public Tuple2 < String , Integer > call ( String s ) throws Exception {
- s = s . trim ( ) ;
- String [ ] arrs = s . split ( "," ) ;
- return new Tuple2 < String , Integer > ( arrs [ 0 ] , Integer . parseInt ( arrs [ 1 ] ) ) ;
- }
- } ) ;
- List < Tuple2 < String , Integer > > list = pairRDD . collect ( ) ;
- for ( Tuple2 < String , Integer > tuple2 : list ) {
- System . out . println ( tuple2 . _1 ( ) + " " + tuple2 . _2 ( ) ) ;
- }
- //对Key合并value值
- JavaPairRDD < String , Integer > uniquePairRDD = pairRDD . reduceByKey ( new Function2 < Integer , Integer , Integer > ( ) {
- @ Override
- public Integer call ( Integer v1 , Integer v2 ) throws Exception {
- return v1 + v2 ;
- }
- } ) ;
- list = uniquePairRDD . collect ( ) ;
- for ( Tuple2 < String , Integer > tuple2 : list ) {
- System . out . println ( tuple2 . _1 ( ) + " " + tuple2 . _2 ( ) ) ;
- }
- //计算topN
- JavaRDD < SortedMap < Integer , String > > sortedMapJavaRDD = uniquePairRDD . mapPartitions ( new FlatMapFunction < Iterator < Tuple2 < String , Integer > > , SortedMap < Integer , String > > ( ) {
- @ Override
- public Iterator < SortedMap < Integer , String > > call ( Iterator < Tuple2 < String , Integer > > tuple2Iterator ) throws Exception {
- SortedMap < Integer , String > sortedMap = new TreeMap < Integer , String > ( compare . Instance ) ;
- while ( tuple2Iterator . hasNext ( ) ) {
- Tuple2 < String , Integer > tuple2 = tuple2Iterator . next ( ) ;
- sortedMap . put ( tuple2 . _2 ( ) , tuple2 . _1 ( ) ) ;
- if ( sortedMap . size ( ) > topN . value ( ) ) {
- sortedMap . remove ( sortedMap . firstKey ( ) ) ;
- }
- }
- return Collections . singletonList ( sortedMap ) . iterator ( ) ;
- }
- } ) ;
- //对多个分区进行合并
- List < SortedMap < Integer , String > > sortedMapList = sortedMapJavaRDD . collect ( ) ;
- SortedMap < Integer , String > top10Map = new TreeMap < > ( ) ;
- for ( SortedMap < Integer , String > map : sortedMapList ) {
- for ( Map . Entry < Integer , String > entry : map . entrySet ( ) ) {
- top10Map . put ( entry . getKey ( ) , entry . getValue ( ) ) ;
- if ( top10Map . size ( ) > topN . value ( ) ) {
- top10Map . remove ( top10Map . firstKey ( ) ) ;
- }
- }
- }
- System . out . println ( "----------top 10------------------" ) ;
- for ( Map . Entry < Integer , String > entry : top10Map . entrySet ( ) ) {
- System . out . println ( entry . getKey ( ) + " " + entry . getValue ( ) ) ;
- }
- }
- }
方案二:
点击( 此处 )折叠或打开
- import org . apache . spark . SparkConf ;
- import org . apache . spark . api . java . JavaPairRDD ;
- import org . apache . spark . api . java . JavaRDD ;
- import org . apache . spark . api . java . JavaSparkContext ;
- import org . apache . spark . api . java . function . Function2 ;
- import org . apache . spark . api . java . function . PairFunction ;
- import scala . Tuple2 ;
- import java . io . Serializable ;
- import java . util . Comparator ;
- import java . util . List ;
- public class Top10TaskOrder {
- public static class Comp implements Comparator < Tuple2 < String , Integer > > , Serializable {
- public final static Comp INSTANCE = new Comp ( ) ;
- @ Override
- public int compare ( Tuple2 < String , Integer > o1 , Tuple2 < String , Integer > o2 ) {
- return o2 . _2 ( ) . compareTo ( o1 . _2 ( ) ) ;
- }
- }
- public static void main ( String [ ] args ) {
- SparkConf conf = new SparkConf ( ) ;
- conf . setMaster ( "local" ) . setAppName ( "Top10TaskOrder" ) ;
- JavaSparkContext sc = new JavaSparkContext ( conf ) ;
- JavaRDD < String > lines = sc . textFile ( "e:\\tmp\\input\\top10nonunique.txt" ) ;
- JavaRDD < String > partLines = lines . coalesce ( 2 ) ;
- JavaPairRDD < String , Integer > pairRDD = partLines . mapToPair ( new PairFunction < String , String , Integer > ( ) {
- @ Override
- public Tuple2 < String , Integer > call ( String s ) throws Exception {
- s = s . trim ( ) ;
- String [ ] splits = s . split ( "," ) ;
- return new Tuple2 < String , Integer > ( splits [ 0 ] , Integer . parseInt ( splits [ 1 ] ) ) ;
- }
- } ) ;
- JavaPairRDD < String , Integer > reducePairRDD = pairRDD . reduceByKey ( new Function2 < Integer , Integer , Integer > ( ) {
- @ Override
- public Integer call ( Integer v1 , Integer v2 ) throws Exception {
- return v1 + v2 ;
- }
- } ) ;
- List < Tuple2 < String , Integer > > takeOrderPairRDD = reducePairRDD . takeOrdered ( 10 , Comp . INSTANCE ) ;
- System . out . println ( "--------------------top list------------------" ) ;
- for ( Tuple2 < String , Integer > tuple2 : takeOrderPairRDD ) {
- System . out . println ( tuple2 . _2 ( ) + " " + tuple2 . _1 ( ) ) ;
- }
- }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- Golang学习笔记-调度器学习
- Vue学习笔记(二)------axios学习
- 算法/NLP/深度学习/机器学习面试笔记
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
平台革命:改变世界的商业模式
[美]杰奥夫雷G.帕克(Geoffrey G. Parker)、马歇尔W.范·埃尔斯泰恩(Marshall W. Van Alstyne)、桑基特·保罗·邱达利(Sangeet Paul Choudary) / 志鹏 / 机械工业出版社 / 2017-10 / 65.00
《平台革命》一书从网络效应、平台的体系结构、颠覆市场、平台上线、盈利模式、平台开放的标准、平台治理、平台的衡量指标、平台战略、平台监管的10个视角,清晰地为读者提供了平台模式最权威的指导。 硅谷著名投资人马克·安德森曾经说过:“软件正在吞食整个世界。”而《平台革命》进一步指出:“平台正在吞食整个世界”。以平台为导向的经济变革为社会和商业机构创造了巨大的价值,包括创造财富、增长、满足人类的需求......一起来看看 《平台革命:改变世界的商业模式》 这本书的介绍吧!