Spark ML 数值类型与数据汇总基础统计算法详解-Spark商业ML实战

栏目: Scala · 发布时间: 7年前

内容简介:Spark ML由分类,回归,聚类,协同过滤,降维等。其中基于机器学习算法可以构建流水线PipeLine。spark基于DataFrames高层次API,通过机器学习管道构建整套机器学习算法库。只有把原始特征转化为特征向量,才能用于机器学习模型的训练。常用特征主要有以下几种:

Spark ML由分类,回归,聚类,协同过滤,降维等。其中基于机器学习算法可以构建流水线PipeLine。

Spark ML 数值类型与数据汇总基础统计算法详解-Spark商业ML实战

spark基于DataFrames高层次API,通过机器学习管道构建整套机器学习算法库。

2 特征向量化

只有把原始特征转化为特征向量,才能用于机器学习模型的训练。常用特征主要有以下几种:

  • 数值特征:主要针对数值类型。
  • 类别特征:类别特征是可穷举的值。类别特征不能直接使用,一般需要对特征进行编号,将其转化为数值特征。
  • 文本特征:从文本中提取出来的特征,如:电影评论。注意文本特征也不能直接使用,需要进行分词,编码等处理。
  • 统计特征: 从原始数据中使用统计方法得到的高级特征,常用的统计特征包括:平均值,中位数,求和,最大值,最小值。

3 Spark ML的数据类型

3.1 本地向量 (Local Vector)

首先建立一个向量(1.0,0.0,3.0),采用3种方式:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
    ## 创建稠密矩阵
    val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
    
    ## 基于索引(0,2)和值(1,3)创建稀疏向量
    val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    
    ## 基于序列方式(0,1)(2,3.0)
    val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
复制代码

3.2 标记点 (LabeledPoint)

import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    
     ##通过稠密向量创建标记点 
    val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
    println(pos.features)
    println(pos.label)
    
    ##通过稀疏向量创建
    val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

    ##libSVM的数据格式(稀疏)
    
    Label    1:value    2:value
    
    -15 1:0.708 3:-0.3333  (标签为-15 -> 向量为(1,0,3) )
    表明第2个特征值为0,从编程的角度来说,这样做可以减少内存的使用,并提高做矩阵内积时的运算速度。
   
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.rdd.RDD
   
    val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
    examples.foreach(println)
复制代码

3.3 本地矩阵

import org.apache.spark.mllib.linalg.{Matrix, Matrices}
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    
    注意:稀疏本地矩阵暗藏玄机:
    val sm: Matrix =Matrices.sparse(4,4,Array(0,1,5,6,7),Array(0,0,1,2,3,3,3),Array(9,6,8,5,1,7,10))
复制代码
Spark ML 数值类型与数据汇总基础统计算法详解-Spark商业ML实战
Spark ML 数值类型与数据汇总基础统计算法详解-Spark商业ML实战

4 分布式矩阵

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
复制代码

4.1 面向行的分布式矩阵

val v0=Vectors.dense(1.0,0.0,3.0)
val v1=Vectors.dense(3.0,2.0,4.0)

val rows =sc.parallelize(Seq(v0,v1))
val rows: RDD[Vector] = ... // an RDD of local vectors
 
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
 
 
向量运算:
val m = mat.numRows()
val n = mat.numCols()
mat.rows.foreach(println)

通过computeColumnSummaryStatistics()方法获取统计摘要
scala> val summary = mat.computeColumnSummaryStatistics()

summary: org.apache.spark.mllib.stat.MultivariateStatisticalSummary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@4ecc4b43

最大向量
summary.max
res5: org.apache.spark.mllib.linalg.Vector = [3.0,2.0,4.0]

以通过summary实例来获取矩阵的相关统计信息,例如行数
scala> summary.count
res1: Long = 2

方差向量
scala> summary.variance
res2: org.apache.spark.mllib.linalg.Vector = [2.0,2.0,0.5]

平均向量
scala> summary.mean
res3: org.apache.spark.mllib.linalg.Vector = [2.0,1.0,3.5]

L1范数向量
scala> summary.normL1
res4: org.apache.spark.mllib.linalg.Vector = [4.0,2.0,7.0]
复制代码

4.2 行索引矩阵(IndexedRowMatrix)

索引行矩阵IndexedRowMatrix与RowMatrix相似,但它的每一行都带有一个有意义的行索引值,这个索引值可以被用来识别不同行,或是进行诸如join之类的操作。其数据存储在一个由IndexedRow组成的RDD里,即每一行都是一个带长整型索引的本地向量。

与RowMatrix类似,IndexedRowMatrix的实例可以通过RDD[IndexedRow]实例来创建

val dv1=Vectors.dense(1.0,0.0,3.0)
val dv2=Vectors.dense(3.0,2.0,4.0)

通过本地向量dv1 dv2来创建对应的IndexedRow
scala> val idxr1 = IndexedRow(1,dv1)
idxr1: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(1,[1.0,2.0,3.0])

通过IndexedRow创建RDD[IndexedRow]
scala> val idxr2 = IndexedRow(2,dv2)
idxr2: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(2,[2.0,3.0,4.0])


scala> val idxrows = sc.parallelize(Array(idxr1,idxr2))
idxrows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.IndexedRow] = ParallelCollectionRDD[3] at parallelize at <console>:35

通过RDD[IndexedRow]创建一个索引行矩阵
scala> val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows)
idxmat: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@7951a08c

scala> idxmat.rows.foreach(println)
IndexedRow(1,[1.0,2.0,3.0])
IndexedRow(2,[2.0,3.0,4.0])

idxmat.computeColumnSummaryStatistics()
复制代码

4.3 坐标矩阵(Coordinate Matrix)

坐标矩阵CoordinateMatrix是一个基于矩阵项构成的RDD的分布式矩阵。每一个矩阵项MatrixEntry都是一个三元组:(i: Long, j: Long, value: Double),其中i是行索引,j是列索引,value是该位置的值。坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候使用。 CoordinateMatrix实例可通过RDD[MatrixEntry]实例来创建,其中每一个矩阵项都是一个(rowIndex, colIndex, elem)的三元组:

scala>  import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    
    创建两个矩阵项ent1和ent2,每一个矩阵项都是由索引和值构成的三元组
    scala> val ent1 = new MatrixEntry(0,1,0.5)
    ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,1,0.5)
    
    创建两个矩阵项ent1和ent2,每一个矩阵项都是由索引和值构成的三元组
    scala> val ent2 = new MatrixEntry(2,2,1.8)
    ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.8)
    
    创建RDD[MatrixEntry]
    scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2))
    entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[4] at parallelize at <console>:36
    
    scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
    coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@8862881
    
    scala> coordMat.entries.foreach(println)
    MatrixEntry(0,1,0.5)
    MatrixEntry(2,2,1.8)
    
    将coordMat进行转置
    scala> val transMat: CoordinateMatrix = coordMat.transpose()
    transMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@6b945576
    
    将coordMat进行转置
    scala>  transMat.entries.foreach(println)
    MatrixEntry(1,0,0.5)
    MatrixEntry(2,2,1.8)

    将坐标矩阵转换成一个索引行矩阵
    val indexedRowMatrix = transMat.toIndexedRowMatrix()
    indexedRowMatrix.rows.foreach(println)
    
    IndexedRow(1,(3,[0],[0.5]))
    IndexedRow(2,(3,[2],[1.8]))
复制代码

4.4 分块矩阵(Block Matrix)

分块矩阵是基于矩阵块MatrixBlock构成的RDD的分布式矩阵,其中每一个矩阵块MatrixBlock都是一个元组((Int, Int), Matrix),其中(Int, Int)是块的索引,而Matrix则是在对应位置的子矩阵(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock决定,默认值均为1024。分块矩阵支持和另一个分块矩阵进行加法操作和乘法操作,并提供了一个支持方法validate()来确认分块矩阵是否创建成功。

分块矩阵可由索引行矩阵IndexedRowMatrix或坐标矩阵CoordinateMatrix调用toBlockMatrix()方法来进行转换,该方法将矩阵划分成尺寸默认为1024×1024的分块,可以在调用toBlockMatrix(rowsPerBlock, colsPerBlock)方法时传入参数来调整分块的尺寸。

Spark ML 数值类型与数据汇总基础统计算法详解-Spark商业ML实战
scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    
    scala>  import org.apache.spark.mllib.linalg.distributed.BlockMatrix
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix
    
    scala> val ent1 = new MatrixEntry(0,0,1)
    ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,0,1.0)
    
    scala> val ent2 = new MatrixEntry(1,1,1)
    ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(1,1,1.0)
    
    scala> val ent3 = new MatrixEntry(2,0,-1)
    ent3: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,0,-1.0)
    
    scala> val ent4 = new MatrixEntry(2,1,2)
    ent4: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,1,2.0)
    
    scala> val ent5 = new MatrixEntry(2,2,1)
    ent5: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.0)
    
    scala> val ent6 = new MatrixEntry(3,0,1)
    ent6: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,0,1.0)
    
    scala> val ent7 = new MatrixEntry(3,1,1)
    ent7: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,1,1.0)
    
    scala>  val ent8 = new MatrixEntry(3,3,1)
    ent8: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(3,3,1.0)
    
    
    创建RDD[MatrixEntry]
    scala>  val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2,ent3,ent4,ent5,ent6,ent7,ent8))
    entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[10] at parallelize at <console>:51
    
    通过RDD[MatrixEntry]创建一个坐标矩阵
    scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
    coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@7bb93e18
    
    
    将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入
    scala> val matA: BlockMatrix = coordMat.toBlockMatrix(2,2).cache()
    matA: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@5dbbfa05
    
    可以用validate()方法判断是否分块成功
    scala> matA.validate()
    
    scala> matA.toLocalMatrix
    res12: org.apache.spark.mllib.linalg.Matrix =
    1.0   0.0  0.0  0.0
    0.0   1.0  0.0  0.0
    -1.0  2.0  1.0  0.0
    1.0   1.0  0.0  1.0
    
    
    // 查看其分块情况
    scala>  matA.numColBlocks
    res13: Int = 2
    
    scala> matA.numRowBlocks
    res14: Int = 2
    
    scala> val ata = matA.multiply(matA)
    ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@5722c149
    
    scala> ata.toLocalMatrix
    2018-11-16 23:30:59 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
    2018-11-16 23:30:59 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
    res15: org.apache.spark.mllib.linalg.Matrix =
    1.0   0.0  0.0  0.0
    0.0   1.0  0.0  0.0
    -2.0  4.0  1.0  0.0
    2.0   2.0  0.0  1.0 
    
    
    // 计算矩阵A和其转置矩阵的积矩阵
    scala> val ata = matA.transpose.multiply(matA)
    ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@3644e451
    scala> ata.toLocalMatrix
    res1: org.apache.spark.mllib.linalg.Matrix =
    3.0   -1.0  -1.0  1.0
    -1.0  6.0   2.0   1.0
    -1.0  2.0   1.0   0.0
    1.0   1.0   0.0   1.0

 有关分块矩阵的另类操作及解释请参考:https://blog.csdn.net/legotime/article/details/51089644
复制代码

5 汇总基础统计算法

scala> import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vector

scala> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

scala> val data =sc.parallelize(1 to 100,2)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:37

scala> val data1 =data.map(x=>Vectors.dense(x))
data1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[29] at map at <console>:38

scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(data1)
summary: org.apache.spark.mllib.stat.MultivariateStatisticalSummary = 	   
org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@35a2c933

scala> summary.count
res24: Long = 100

scala> summary.max
res25: org.apache.spark.mllib.linalg.Vector = [100.0]

scala> summary.mean
res26: org.apache.spark.mllib.linalg.Vector = [50.5]

scala> summary.normL1
res27: org.apache.spark.mllib.linalg.Vector = [5050.0]

scala> summary.normL2
res28: org.apache.spark.mllib.linalg.Vector = [581.6786054171153]

scala> summary.numNonzeros
res29: org.apache.spark.mllib.linalg.Vector = [100.0]

scala> summary.variance
res30: org.apache.spark.mllib.linalg.Vector = [841.6666666666666]
复制代码

6 相关性统计

举例如下:比如分析身高和体重,我们会问个问题:.身高越高,体重是不是越重?问题细分为两个方向:1,身高越高,体重越重还是越轻。2,身高每增加 1 ,体重又是增加多少或减少多少。這就是相关性的两个重要要素:相关的方向和相关的强度。对于相关的方向很好理解,就是正相关、负相关还是无关。对于问题2,有不同的人产生了不同的 定义相关性强度的思想

val x =sc.parallelize(Array(1,2,3,4,5))
val y =sc.parallelize(Array(1,2,3,4,5))

scala> val correlation: Double = Statistics.corr(x,y, "pearson")
correlation: Double = 0.9999999999999998

scala> val correlation: Double = Statistics.corr(x,y, "spearman")
correlation: Double = 0.9999999999999998
复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Node.js in Action

Node.js in Action

Mike Cantelon、Marc Harter、TJ Holowaychuk、Nathan Rajlich / Manning Publications / 2013-11-25 / USD 44.99

* Simplifies web application development * Outlines valuable online resources * Teaches Node.js from the ground up Node.js is an elegant server-side JavaScript development environment perfect for scal......一起来看看 《Node.js in Action》 这本书的介绍吧!

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具