内容简介:Spark ML由分类,回归,聚类,协同过滤,降维等。其中基于机器学习算法可以构建流水线PipeLine。spark基于DataFrames高层次API,通过机器学习管道构建整套机器学习算法库。只有把原始特征转化为特征向量,才能用于机器学习模型的训练。常用特征主要有以下几种:
Spark ML由分类,回归,聚类,协同过滤,降维等。其中基于机器学习算法可以构建流水线PipeLine。
spark基于DataFrames高层次API,通过机器学习管道构建整套机器学习算法库。
2 特征向量化
只有把原始特征转化为特征向量,才能用于机器学习模型的训练。常用特征主要有以下几种:
- 数值特征:主要针对数值类型。
- 类别特征:类别特征是可穷举的值。类别特征不能直接使用,一般需要对特征进行编号,将其转化为数值特征。
- 文本特征:从文本中提取出来的特征,如:电影评论。注意文本特征也不能直接使用,需要进行分词,编码等处理。
- 统计特征: 从原始数据中使用统计方法得到的高级特征,常用的统计特征包括:平均值,中位数,求和,最大值,最小值。
3 Spark ML的数据类型
3.1 本地向量 (Local Vector)
首先建立一个向量(1.0,0.0,3.0),采用3种方式:
import org.apache.spark.mllib.linalg.{Vector, Vectors} ## 创建稠密矩阵 val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) ## 基于索引(0,2)和值(1,3)创建稀疏向量 val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) ## 基于序列方式(0,1)(2,3.0) val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))) 复制代码
3.2 标记点 (LabeledPoint)
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint ##通过稠密向量创建标记点 val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) println(pos.features) println(pos.label) ##通过稀疏向量创建 val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))) ##libSVM的数据格式(稀疏) Label 1:value 2:value -15 1:0.708 3:-0.3333 (标签为-15 -> 向量为(1,0,3) ) 表明第2个特征值为0,从编程的角度来说,这样做可以减少内存的使用,并提高做矩阵内积时的运算速度。 import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") examples.foreach(println) 复制代码
3.3 本地矩阵
import org.apache.spark.mllib.linalg.{Matrix, Matrices} val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) 注意:稀疏本地矩阵暗藏玄机: val sm: Matrix =Matrices.sparse(4,4,Array(0,1,5,6,7),Array(0,0,1,2,3,3,3),Array(9,6,8,5,1,7,10)) 复制代码
4 分布式矩阵
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix} import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} 复制代码
4.1 面向行的分布式矩阵
val v0=Vectors.dense(1.0,0.0,3.0) val v1=Vectors.dense(3.0,2.0,4.0) val rows =sc.parallelize(Seq(v0,v1)) val rows: RDD[Vector] = ... // an RDD of local vectors // Create a RowMatrix from an RDD[Vector]. val mat: RowMatrix = new RowMatrix(rows) 向量运算: val m = mat.numRows() val n = mat.numCols() mat.rows.foreach(println) 通过computeColumnSummaryStatistics()方法获取统计摘要 scala> val summary = mat.computeColumnSummaryStatistics() summary: org.apache.spark.mllib.stat.MultivariateStatisticalSummary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@4ecc4b43 最大向量 summary.max res5: org.apache.spark.mllib.linalg.Vector = [3.0,2.0,4.0] 以通过summary实例来获取矩阵的相关统计信息,例如行数 scala> summary.count res1: Long = 2 方差向量 scala> summary.variance res2: org.apache.spark.mllib.linalg.Vector = [2.0,2.0,0.5] 平均向量 scala> summary.mean res3: org.apache.spark.mllib.linalg.Vector = [2.0,1.0,3.5] L1范数向量 scala> summary.normL1 res4: org.apache.spark.mllib.linalg.Vector = [4.0,2.0,7.0] 复制代码
4.2 行索引矩阵(IndexedRowMatrix)
索引行矩阵IndexedRowMatrix与RowMatrix相似,但它的每一行都带有一个有意义的行索引值,这个索引值可以被用来识别不同行,或是进行诸如join之类的操作。其数据存储在一个由IndexedRow组成的RDD里,即每一行都是一个带长整型索引的本地向量。
与RowMatrix类似,IndexedRowMatrix的实例可以通过RDD[IndexedRow]实例来创建
val dv1=Vectors.dense(1.0,0.0,3.0) val dv2=Vectors.dense(3.0,2.0,4.0) 通过本地向量dv1 dv2来创建对应的IndexedRow scala> val idxr1 = IndexedRow(1,dv1) idxr1: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(1,[1.0,2.0,3.0]) 通过IndexedRow创建RDD[IndexedRow] scala> val idxr2 = IndexedRow(2,dv2) idxr2: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(2,[2.0,3.0,4.0]) scala> val idxrows = sc.parallelize(Array(idxr1,idxr2)) idxrows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.IndexedRow] = ParallelCollectionRDD[3] at parallelize at <console>:35 通过RDD[IndexedRow]创建一个索引行矩阵 scala> val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows) idxmat: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@7951a08c scala> idxmat.rows.foreach(println) IndexedRow(1,[1.0,2.0,3.0]) IndexedRow(2,[2.0,3.0,4.0]) idxmat.computeColumnSummaryStatistics() 复制代码
4.3 坐标矩阵(Coordinate Matrix)
坐标矩阵CoordinateMatrix是一个基于矩阵项构成的RDD的分布式矩阵。每一个矩阵项MatrixEntry都是一个三元组:(i: Long, j: Long, value: Double),其中i是行索引,j是列索引,value是该位置的值。坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候使用。 CoordinateMatrix实例可通过RDD[MatrixEntry]实例来创建,其中每一个矩阵项都是一个(rowIndex, colIndex, elem)的三元组:
scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} 创建两个矩阵项ent1和ent2,每一个矩阵项都是由索引和值构成的三元组 scala> val ent1 = new MatrixEntry(0,1,0.5) ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,1,0.5) 创建两个矩阵项ent1和ent2,每一个矩阵项都是由索引和值构成的三元组 scala> val ent2 = new MatrixEntry(2,2,1.8) ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.8) 创建RDD[MatrixEntry] scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2)) entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[4] at parallelize at <console>:36 scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@8862881 scala> coordMat.entries.foreach(println) MatrixEntry(0,1,0.5) MatrixEntry(2,2,1.8) 将coordMat进行转置 scala> val transMat: CoordinateMatrix = coordMat.transpose() transMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@6b945576 将coordMat进行转置 scala> transMat.entries.foreach(println) MatrixEntry(1,0,0.5) MatrixEntry(2,2,1.8) 将坐标矩阵转换成一个索引行矩阵 val indexedRowMatrix = transMat.toIndexedRowMatrix() indexedRowMatrix.rows.foreach(println) IndexedRow(1,(3,[0],[0.5])) IndexedRow(2,(3,[2],[1.8])) 复制代码
4.4 分块矩阵(Block Matrix)
分块矩阵是基于矩阵块MatrixBlock构成的RDD的分布式矩阵,其中每一个矩阵块MatrixBlock都是一个元组((Int, Int), Matrix),其中(Int, Int)是块的索引,而Matrix则是在对应位置的子矩阵(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock决定,默认值均为1024。分块矩阵支持和另一个分块矩阵进行加法操作和乘法操作,并提供了一个支持方法validate()来确认分块矩阵是否创建成功。
分块矩阵可由索引行矩阵IndexedRowMatrix或坐标矩阵CoordinateMatrix调用toBlockMatrix()方法来进行转换,该方法将矩阵划分成尺寸默认为1024×1024的分块,可以在调用toBlockMatrix(rowsPerBlock, colsPerBlock)方法时传入参数来调整分块的尺寸。
scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} scala> import org.apache.spark.mllib.linalg.distributed.BlockMatrix import org.apache.spark.mllib.linalg.distributed.BlockMatrix scala> val ent1 = new MatrixEntry(0,0,1) ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,0,1.0) scala> val ent2 = new MatrixEntry(1,1,1) ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(1,1,1.0) scala> val ent3 = new MatrixEntry(2,0,-1) ent3: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,0,-1.0) scala> val ent4 = new MatrixEntry(2,1,2) ent4: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,1,2.0) scala> val ent5 = new MatrixEntry(2,2,1) ent5: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.0) scala> val ent6 = new MatrixEntry(3,0,1) ent6: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,0,1.0) scala> val ent7 = new MatrixEntry(3,1,1) ent7: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,1,1.0) scala> val ent8 = new MatrixEntry(3,3,1) ent8: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,3,1.0) 创建RDD[MatrixEntry] scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2,ent3,ent4,ent5,ent6,ent7,ent8)) entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[10] at parallelize at <console>:51 通过RDD[MatrixEntry]创建一个坐标矩阵 scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@7bb93e18 将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入 scala> val matA: BlockMatrix = coordMat.toBlockMatrix(2,2).cache() matA: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@5dbbfa05 可以用validate()方法判断是否分块成功 scala> matA.validate() scala> matA.toLocalMatrix res12: org.apache.spark.mllib.linalg.Matrix = 1.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 -1.0 2.0 1.0 0.0 1.0 1.0 0.0 1.0 // 查看其分块情况 scala> matA.numColBlocks res13: Int = 2 scala> matA.numRowBlocks res14: Int = 2 scala> val ata = matA.multiply(matA) ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@5722c149 scala> ata.toLocalMatrix 2018-11-16 23:30:59 WARN BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 2018-11-16 23:30:59 WARN BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS res15: org.apache.spark.mllib.linalg.Matrix = 1.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 -2.0 4.0 1.0 0.0 2.0 2.0 0.0 1.0 // 计算矩阵A和其转置矩阵的积矩阵 scala> val ata = matA.transpose.multiply(matA) ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@3644e451 scala> ata.toLocalMatrix res1: org.apache.spark.mllib.linalg.Matrix = 3.0 -1.0 -1.0 1.0 -1.0 6.0 2.0 1.0 -1.0 2.0 1.0 0.0 1.0 1.0 0.0 1.0 有关分块矩阵的另类操作及解释请参考:https://blog.csdn.net/legotime/article/details/51089644 复制代码
5 汇总基础统计算法
scala> import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.Vector scala> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics} import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics} scala> val data =sc.parallelize(1 to 100,2) data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:37 scala> val data1 =data.map(x=>Vectors.dense(x)) data1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[29] at map at <console>:38 scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(data1) summary: org.apache.spark.mllib.stat.MultivariateStatisticalSummary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@35a2c933 scala> summary.count res24: Long = 100 scala> summary.max res25: org.apache.spark.mllib.linalg.Vector = [100.0] scala> summary.mean res26: org.apache.spark.mllib.linalg.Vector = [50.5] scala> summary.normL1 res27: org.apache.spark.mllib.linalg.Vector = [5050.0] scala> summary.normL2 res28: org.apache.spark.mllib.linalg.Vector = [581.6786054171153] scala> summary.numNonzeros res29: org.apache.spark.mllib.linalg.Vector = [100.0] scala> summary.variance res30: org.apache.spark.mllib.linalg.Vector = [841.6666666666666] 复制代码
6 相关性统计
举例如下:比如分析身高和体重,我们会问个问题:.身高越高,体重是不是越重?问题细分为两个方向:1,身高越高,体重越重还是越轻。2,身高每增加 1 ,体重又是增加多少或减少多少。這就是相关性的两个重要要素:相关的方向和相关的强度。对于相关的方向很好理解,就是正相关、负相关还是无关。对于问题2,有不同的人产生了不同的 定义相关性强度的思想
val x =sc.parallelize(Array(1,2,3,4,5)) val y =sc.parallelize(Array(1,2,3,4,5)) scala> val correlation: Double = Statistics.corr(x,y, "pearson") correlation: Double = 0.9999999999999998 scala> val correlation: Double = Statistics.corr(x,y, "spearman") correlation: Double = 0.9999999999999998 复制代码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。