内容简介:借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块
我是51CTO学院讲师张敏,在51CTO学院“4.20 IT充电节”(4月19~20日) 到来之际,和大家分享一下Spark Core之上扩建自己的模块的经验。正文来啦~~~
hbase-rdd是一个构建在SparkContext基础之上的用于对Hbase进行增删改查的第三方开源模块,目前最新版本为0.7.1。目前该rdd在操作hbase时,默认调用隐式方法。
implicitdef stringToBytes(s: String): Array[Byte] = { Bytes.toBytes(s) }
将RDD的key转换成字节b,然后调用Hbase的put(b)方法保存rowkey,之后将RDD的每一行存入hbase。
在轨迹图绘制项目数据计算中,我们考虑到hbase的rowkey的设计——尽量减少rowkey存储的开销。虽然hbase-rdd最终的rowkey默认都是采用字节数组,但这个地方我们希望按自己的方式组装rowkey。使用MD5(imei)+dateTime组成的字节数组作为rowkey。因此默认的hbase-rdd提供的方法是不满足我们存储需求的,需要对源代码进行修改。在toHbase方法中,有一个convert方法,该方法将对RDD中的每一行数据进行转化,使用RDD中的key生成Put(Bytes.toBytes(key))对象,该对象为之后存储Hbase提供rowkey。
在convert函数中,对其实现进行了改造,hbase-rdd默认使用stringToBytes隐式函数将RDD的String类型的key转换成字节数组,这里我们需要改造,不使stringToBytes隐式方法,而是直接生成字节数据。
protected def convert(id: String, values: Map[String, Map[String, A]], put: PutAdder[A]) = { val strs = id.split(",") val imei = strs {0} val dateTime = strs {1} val b1 = MD5Utils.computeMD5Hash(imei.getBytes()) val b2 = Bytes.toBytes(dateTime.toLong) val key = b1.++(b2) val p = new Put(key)//改造 var empty = true for { (family, content) <- values (key, value) <- content } { empty = false if (StrUtils.isNotEmpty(family) &&StrUtils.isNotEmpty(key)) { put(p, family, key, value) } } if (empty) None else Some(new ImmutableBytesWritable, p) }
这样就实现了使用自己的方式构建rowkey,当然基于此思想我们可以使用任意的方式构建rowkey。
在使用hbase-rdd插件的过程中,我在思考,默认的RDD上是没有toHbase方法的,那为什么引入hbase-rdd包之后,RDD之上就有toHbase方法了?经过查看源码,发现hbase-rdd包中提供了两个隐式方法:
implicitdef toHBaseRDDSimple[A](rdd: RDD[(String, Map[String, A])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[A] =new HBaseWriteRDDSimple(rdd, pa[A]) implicit def toHBaseRDDSimpleTS[A](rdd: RDD[(String, Map[String, (A, Long)])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[(A, Long)] =new HBaseWriteRDDSimple(rdd, pa[A])
这两个方法在发现RDD上没有toHbase方法时会自动尝试调用,从隐式定义中尝试找到解决方案,尝试之后发现有定义toHBaseRDDSimple隐式方法,于是调用该隐式方法新建HBaseWriteRDDSimple类,返回hBaseWriteRDDSimple,而在hBaseWriteRDDSimple对象中是有toHbase方法的,因此在引入hbase-rdd之后,可以发现原本没有toHbase方法的RDD上有toHbase方法了。这一切都要归功于Scala强大的隐式转换功能。
那明白了原理,是否我们可以基于RDD写自己的模块,说干就干!
第一步:新建Trait
traitHaha{ implicitdef gaga[A](rdd: RDD[String]): Hehe= newHehe(rdd) }
第二步:新建Hehe类
final class Hehe(rdd:RDD[String]) { def wow(tableName:String,family:String): Unit ={ println("---------------------------------------------") println("tableName:"+tableName+" - family:"+family) println("size:"+rdd.count()) rdd.collect().foreach(data=>println(data)) println("---------------------------------------------") } }
第三步:新建包对象
package object test extends Haha
第四步:新建test类
object Test{ def main(args: Array[String]) { valsparkConf = new SparkConf().setAppName("Test") valsc = new SparkContext(sparkConf) sc.makeRDD(Seq("one","two","three","four")).wow("taskDataPre","T") } }
项目结构图:
运行效果图:
希望对大家以后的开发有帮助,同时借鉴本案例,在Spark Core之上构建自己的小模块。
51CTO学院 4.20 IT充电节
(19-20号两天,100门视频课程免单抢,更有视频课程会员享6折,非会员享7折,套餐折上8折,微职位立减2000元钜惠)
活动链接: http://edu.51cto.com/activity/lists/id-47.html?wenzhang
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 国际资讯 斯柯达计划将HPC计算量提升至15千兆次 扩建其数据中心
- Node.js模块系统 (创建模块与加载模块)
- 黑客基础,Metasploit模块简介,渗透攻击模块、攻击载荷模块
- 022.Python模块序列化模块(json,pickle)和math模块
- 024.Python模块OS模块
- 023.Python的随机模块和时间模块
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Data Mining
Jiawei Han、Micheline Kamber、Jian Pei / Morgan Kaufmann / 2011-7-6 / USD 74.95
The increasing volume of data in modern business and science calls for more complex and sophisticated tools. Although advances in data mining technology have made extensive data collection much easier......一起来看看 《Data Mining》 这本书的介绍吧!