内容简介:spark从1.6开始引入,到现在2.4版本,pivot算子有了进一步增强,这使得后续无论是交给pandas继续做处理,还是交给R继续分析,都简化了不少。大家无论在使用pandas、numpy或是R的时候,首先会做的就是处理数据,尤其是将列表,转成成合适的形状。在说透视表之前,我们先看看,什么是列表,在传统观念上,列表的每一行代表一条记录,而每一列代表一个属性。+-------+-------+-----+
spark从1.6开始引入,到现在2.4版本,pivot算子有了进一步增强,这使得后续无论是交给pandas继续做处理,还是交给R继续分析,都简化了不少。大家无论在使用pandas、numpy或是R的时候,首先会做的就是处理数据,尤其是将列表,转成成合适的形状。
列表
在说透视表之前,我们先看看,什么是列表,在传统观念上,列表的每一行代表一条记录,而每一列代表一个属性。
+-------+-------+-----+
| date|project|value|
+-------+-------+-----+
|2018-01| p1| 100|
|2018-01| p2| 200|
|2018-01| p3| 300|
|2018-02| p1| 1000|
|2018-02| p2| 2000|
|2018-03| px| 999|
+-------+-------+-----+
举个简单的例子,如上表,一条记录可能代表某个项目,在某个年月创造的价值。而在这个表里面,某一列,就代表一个属性,比如date代表日期,project代表项目名称。而这里每一行,代表一条独立,完整的记录,一条与另外一条记录,没有直接的关系。
这种结构,也是一般关系型数据库的数据结构。
透视表
透视表没有一个明确的定义,一般是观念上是指,为了方便进行数据分析,而对数据进行一定的重排,方便后续分析,计算等操作。透视表每一个元素及其对应的“坐标”一起形成一条完整的记录。
+-------+------+------+-----+-----+
| date| p1| p2| p3| px|
+-------+------+------+-----+-----+
|2018-01| 100.0| 200.0|300.0| 0.0|
|2018-02|1000.0|2000.0| 0.0| 0.0|
|2018-03| 0.0| 0.0| 0.0|999.0|
+-------+------+------+-----+-----+
上面的表,是将列表进行重排后的透视表,其第一行和第一列可以理解成索引,而在表中根据索引可以确定一条唯一的值,他们一起组成一条相当于列表里的数据。
通过一般的定义,我们能看出,透视表主要用于分析,所以,一般的场景我们都会先对数据进行聚合,以后再对数据分析,这样也更有意义。就好像,将话费清单,做成透视表,尽管逻辑上没有任何问题,但是结果是可能比现在的清单列表更难查阅。
PS:一些可以借鉴的名词,目前维基百科并没有收录,也只能权且理解一下吧
建模拟数据
先来模拟个数据吧,按照前面的例子,建个csv,这里多加了一列s2,是为了做多透视列的,
date,project,value,s2 2018-01,p1,100,12 2018-01,p2,200,33 2018-01,p3,300,44 2018-02,p1,1000,22 2018-02,p2,2000,41 2018-03,px,999,22
spark API
我们先来看下DEMO程序
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); SparkContext sc = SparkContext.getOrCreate(sparkConf); SparkSession ss = new SparkSession(sc); Dataset<Row> ds = ss.read() //csv分隔符 .option("sep", ",") //是否包含header .option("header", "true") //加载csv路径 .csv("E:\\devlop\\workspace\\sparkdemo\\src\\main\\java\\com\\dafei1288\\spark\\data1.csv"); Dataset<Row> r = //设置分组 ds.groupBy(col("date")) //设置pivot .pivot("project") //设置聚合 .agg(sum("value")); r.show();
在加载csv的时候,我们设置了分隔符,以及读取表头。
对加载后的dataset只需要进行3步设置
-
groupBy 设置分组列
-
pivot 设置pivot列
-
agg 设置聚合方式,可以是求和、平均等聚合函数
我们得到的输出结果如下:
+-------+------+------+-----+-----+
| date| p1| p2| p3| px|
+-------+------+------+-----+-----+
|2018-03| null| null| null|999.0|
|2018-02|1000.0|2000.0| null| null|
|2018-01| 100.0| 200.0|300.0| null|
+-------+------+------+-----+-----+
请注意,这里和 sql 有些区别,就是groupBy的时候,不需要将project列写入了,如果写入成了
groupBy(col("date"),col("project"))
那么结果就是这样了
+-------+-------+------+------+-----+-----+
| date|project| p1| p2| p3| px|
+-------+-------+------+------+-----+-----+
|2018-01| p3| null| null|300.0| null|
|2018-01| p2| null| 200.0| null| null|
|2018-01| p1| 100.0| null| null| null|
|2018-03| px| null| null| null|999.0|
|2018-02| p1|1000.0| null| null| null|
|2018-02| p2| null|2000.0| null| null|
+-------+-------+------+------+-----+-----+
sparkSQL
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); SparkContext sc = SparkContext.getOrCreate(sparkConf); SparkSession ss = new SparkSession(sc); Dataset<Row> ds = ss.read() .option("sep", ",") .option("header", "true").csv("E:\\devlop\\workspace\\sparkdemo\\src\\main\\java\\com\\dafei1288\\spark\\data1.csv"); ds.registerTempTable("f"); Dataset<Row> r = ds.sqlContext().sql( "select * from ( select date,project as p,sum(value) as ss from f group by date,project ) pivot ( sum(ss) for p in ( 'p1','p2','p3','px' ) ) order by date"); r.na().fill(0).show();
可以看到,这里我们将读取的csv注册成了表f,使用spark sql语句,这里和oracle的透视语句类似
pivot语法: pivot( 聚合列 for 待转换列 in (列值) )
其语法还是比较简单的。
为了展示数据好看一点,我特意使用语句
r.na().fill(0)
将空值`null`替换成了0。
+-------+------+------+-----+-----+
| date| p1| p2| p3| px|
+-------+------+------+-----+-----+
|2018-01| 100.0| 200.0|300.0| 0.0|
|2018-02|1000.0|2000.0| 0.0| 0.0|
|2018-03| 0.0| 0.0| 0.0|999.0|
+-------+------+------+-----+-----+
多聚合列
上文提到了,多做了一列,就是为了这个DEMO准备的,使用如下SparkSQL语句,设置多聚合列透视表
select * from ( select date,project as p,sum(value) as ss,sum(s2) as ss2 from f group by date,project ) pivot ( sum(ss),sum(ss2) for p in ( 'p1','p2','p3','px' ) ) order by date
这里为例方便看,我就截图了
为了防止OOM的情况,spark对pivot的数据量进行了限制,其可以通过 spark.sql.pivotMaxValues 来进行修改,默认值为10000,这里是指piovt后的列数。
好了,关于spark pivot就介绍到这了,其实这里与矩阵的行列转换类似,pivot对应的也有unpivot,下次我们再聊。
参考资料:
https://stackoverflow.com/questions/30244910/how-to-pivot-dataframe
https://databricks.com/session/pivoting-data-with-sparksql
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 如何使用Azure数据迁移将数据移动到云端
- iOS数据持久化:使用NSKeyedArchiver进行数据归档
- WordPress插件开发 -- 在插件使用数据库存储数据
- 使用Pig清洗数据
- 使用“数据驱动测试”之前
- 数据安全治理中的开发测试环境数据安全使用技术
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Spring Cloud微服务实战
翟永超 / 电子工业出版社 / 2017-5 / 89
《Spring Cloud微服务实战》从时下流行的微服务架构概念出发,详细介绍了Spring Cloud针对微服务架构中几大核心要素的解决方案和基础组件。对于各个组件的介绍,《Spring Cloud微服务实战》主要以示例与源码结合的方式来帮助读者更好地理解这些组件的使用方法以及运行原理。同时,在介绍的过程中,还包含了作者在实践中所遇到的一些问题和解决思路,可供读者在实践中作为参考。 《Sp......一起来看看 《Spring Cloud微服务实战》 这本书的介绍吧!