Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

栏目: 编程工具 · 发布时间: 5年前

内容简介:在这浮躁的社会沉静,用心记录,用心学习!

Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

【Spark排序算法系列】主要介绍的是目前推荐系统或者广告点击方面用的比较广的几种算法,和他们在Spark中的应用实现,本篇文章主要介绍LR算法。

【Spark排序算法系列】主要介绍的是目前推荐系统或者广告点击方面用的比较广的几种算法,和他们在Spark中的应用实现,本篇文章主要介绍LR算法。

本系列还包括(持续更新):

  • Spark排序算法系列之GBDT(梯度提升决策树)

  • Spark排序算法系列之模型融合(GBDT+LR)

  • Spark排序算法系列之XGBoost

  • Spark排序算法系列之FTRL(Follow-the-regularized-Leader)

  • Spark排序算法系列之FM与FFM

背景

逻辑回归(Logistic Regression,LR)是较早应用在推荐 排序 上的,其属于线性模型,模型简单,可以引入海量离散特征,这样的好处就是模型可以考虑更加细节或者说针对具体个体的因素。如果想要引入非线性因素需要做特征交叉,这样很容易产生百亿特征,在很早之前ctr就主要靠堆人力搞特征工程工作来持续优化效果。

虽然目前在工业界LR应用的并不多,但是对于初学者,一些中小企业或者应用场景不需要负责排序模型的时候,LR扔不失为一个不错的选择。

关于LR的算法原理,这里不做过多说明,可参考:

  • 回归分析之逻辑回归-Logistic Regression

  • 线性模型篇之Logistic Regression数学公式推导

LR介绍

LR的数学表达式可以简写为:

Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

对于二分类模型,LR是一个分类算法,模型计算得到预测值后会通过以下函数进转化。

Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

如果L(w,x,y) > 0.5 则是1 否则为0。当然在实际应用过程中,并不是一定取0.5作为界限值,而是根据实际情况进行调整。

二进制回归可以转化为多分类回归问题。关于多分类介绍和基于Spark实现多分类可参考多分类实现方式介绍和在Spark上实现多分类逻辑回归(Multinomial Logistic Regression)

在Spark.mllib包中提供了两种LR分类模型,分别是:

  • mini-batch gradient descent(LogisticRegressionWithLBFGS)

  • L-BFGS(LogisticRegressionWithSGD)

但官方给出的建议是:推荐使用LBFGS,因为基于LBFGS的LR比基于SGD的能更快的收敛。其原话如下:

We implemented two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. We recommend L-BFGS over mini-batch gradient descent for faster convergence.

而且LRWithLBFGS不仅支持二分类还支持多分类,但LRWithSGD只支持二分类。所以后续只介绍下Spark mllib中的LogisticRegressionWithLBFGS相关操作。

mllib中的LRWithLBFGS

设置变量和创建spark对象

val file = "data/sample_libsvm_data.txt"
val model_path = "model/lr/"
val model_param = "numInterations:5,regParam:0.1,updater:SquaredL2Updater,gradient:LogisticGradient"

val spark = SparkSession.builder()
    .master("local[5]")
    .appName("LogisticRegression_Model_Train")
    .getOrCreate()
Logger.getRootLogger.setLevel(Level.WARN)

拆分数据集

// 记载数据集 并拆分成训练集和测试集
val data = MLUtils.loadLibSVMFile(spark.sparkContext,file).randomSplit(Array(0.7,0.3))
val (train, test) = (data(0), data(1))

LRWithLBFGS模型设置参数

// 定义分类的数目,默认为2,是logisticregression的参数
private var numClass: Int = 2
// 定义是否添加截距,默认值为false,是logisticregression的参数
private var isAddIntercept: Option[Boolean] = None
// 定义是否在训练模型前进行验证,是logisticregression的参数
private var isValidateData: Option[Boolean] = None

// 定义迭代的次数,默认值是100,LBFGS的参数
private var numInterations: Option[Int] = None
// 定义正则化系数值,默认值是0.0,LBFGS的参数
private var regParam: Option[Double] = None
// 定义正则化参数,支持:L1Updater[L1]、SquaredL2Updater[L2]、SimpleUpdater[没有正则项],LBFGS的参数
private var updater: Option[String] = None
// 定义计算梯度的方式,支持:LogisticGradient、LeastSquaresGradient、HingeGradient ,LBFGS的参数
private var gradient: Option[String] = None
// 人工定义的收敛阈值
private var threshold:Option[Double]=None
// 定义模型收敛阈值,默认为 10^-6
private var convergenceTol: Double= 1.0e-6

创建模型

def createLRModel(model_param: String): LogisticRegressionWithLBFGS={
    // 设置模型参数
    val optimizer = new LROptimizer()
    optimizer.parseString(model_param)
    println(s"模型训练参数为:${optimizer.toString}")

    // 创建模型并指定相关参数
    val LRModel = new LogisticRegressionWithLBFGS()
    // 设置分类数目
    LRModel.setNumClasses(optimizer.getNumClass)
    // 设置是否添加截距
    if(optimizer.getIsAddIntercept.nonEmpty) {LRModel.setIntercept(optimizer.getIsAddIntercept.get)}
    // 设置是否进行验证模型
    if(optimizer.getIsValidateData.nonEmpty){LRModel.setValidateData(optimizer.getIsValidateData.get)}
    // 设置迭代次数
    if(optimizer.getNumInterations.nonEmpty){LRModel.optimizer.setNumIterations((optimizer.getNumInterations.get))}
    // 设置正则项参数
    if(optimizer.getRegParam.nonEmpty) { LRModel.optimizer.setRegParam(optimizer.getRegParam.get) }
    // 设置正则化参数
    if(optimizer.getUpdater.nonEmpty){
        optimizer.getUpdater match {
            case Some("L1Updater") => LRModel.optimizer.setUpdater( new L1Updater())
            case Some("SquaredL2Updater") => LRModel.optimizer.setUpdater(new SquaredL2Updater())
            case Some("SimpleUpdater") => LRModel.optimizer.setUpdater(new SimpleUpdater())
            case _ => LRModel.optimizer.setUpdater(new SquaredL2Updater())
        }
    }
    // 设置梯度计算方式
    if(optimizer.getGradient.nonEmpty){
        optimizer.getGradient match {
            case Some("LogisticGradient") => LRModel.optimizer.setGradient(new LogisticGradient())
            case Some("LeastSquaresGradient") => LRModel.optimizer.setGradient(new LeastSquaresGradient())
            case Some("HingeGradient") => LRModel.optimizer.setGradient(new HingeGradient())
            case _ => LRModel.optimizer.setGradient(new LogisticGradient())
        }
    }
    // 设置收敛阈值
    if(optimizer.getThreshold.nonEmpty){ LRModel.optimizer.setConvergenceTol(optimizer.getThreshold.get)}
    else {LRModel.optimizer.setConvergenceTol(optimizer.getConvergenceTol)}

    LRModel
}

模型效果评估

    def evaluteResult(result: RDD[(Double,Double,Double)]) :Unit = {
        // MSE
        val testMSE = result.map{ case(real, pre, _) => math.pow((real - pre), 2)}.mean()
        println(s"Test Mean Squared Error = $testMSE")
        // AUC
        val metrics = new BinaryClassificationMetrics(result.map(x => (x._2,x._1)).sortByKey(ascending = true),numBins = 2)
        println(s"0-1 label AUC is = ${metrics.areaUnderROC}")
        val metrics1 = new BinaryClassificationMetrics(result.map(x => (x._3,x._1)).sortByKey(ascending = true),numBins = 2)
        println(s"score-label AUC is = ${metrics1.areaUnderROC}")
        // 错误率
        val error = result.filter(x => x._1!=x._2).count().toDouble / result.count()
        println(s"error is = $error")
        // 准确率
        val accuracy = result.filter(x => x._1==x._2).count().toDouble / result.count()
        println(s"accuracy is = $accuracy")
    }

保存模型

    def saveModel(model: LogisticRegressionModel, model_path: String): Unit = {
        // 保存模型文件 obj
        val out_obj = new ObjectOutputStream(new FileOutputStream(model_path+"model.obj"))
        out_obj.writeObject(model)

        // 保存模型信息
        val model_info=new BufferedWriter(new FileWriter(model_path+"model_info.txt"))
        model_info.write(model.toString())
        model_info.flush()
        model_info.close()

        // 保存模型权重
        val model_weights=new BufferedWriter(new FileWriter(model_path+"model_weights.txt"))
        model_weights.write(model.weights.toString)
        model_weights.flush()
        model_weights.close()

        println(s"模型信息写入文件完成,路径为:$model_path")
    }

加载模型

    def loadModel(model_path: String): Option[LogisticRegressionModel] = {
        try{
            val in = new ObjectInputStream( new FileInputStream(model_path) )
            val model = Option( in.readObject().asInstanceOf[LogisticRegressionModel] )
            in.close()
            println("Model Load Success")
            model
        }
        catch {
            case ex: ClassNotFoundException => {
                println(ex.printStackTrace())
                None
            }
            case ex: IOException => {
                println(ex.printStackTrace())
                println(ex)
                None
            }
            case _: Throwable => throw new Exception
        }
    }

使用加载的模型进行分值计算

    // 加载obj文件进行预测
    val model_new = loadModel(s"$model_path/model.obj")
    // 使用加载的模型进行样例预测
    val result_new = test.map(line =>{
        val pre_label = model_new.get.predict(line.features)
        // blas.ddot(x.length, x,1,y,1) (向量x的长度,向量x,向量x的索引递增间隔,向量y,向量y的索引递增间隔)
        val pre_score = blas.ddot(model.numFeatures, line.features.toArray, 1, model.weights.toArray, 1)
        val score = Math.pow(1+Math.pow(Math.E, -2 * pre_score), -1)
        (line.label, pre_label,score)
    } )
    result_new.take(2).foreach(println)

ml中的二分类LR

ml包中的LR既可以用来做二分类,也可以用来做多分类。

  • 二分类对应:Binomial logistic regression

  • 多分类对应:multinomial logistic regression

其中二分类可以通过Binomial logistic regression 和 multinomial logistic regression实现。

基于Binomial logistic regression的LR实现:

def BinaryModel(train: Dataset[Row], model_path: String, spark: SparkSession) = {
    // 创建模型
    val LRModel = new LogisticRegression()
        .setMaxIter(20)
        .setRegParam(0.3)
        .setElasticNetParam(0.8)
    // 训练评估模型
    val model = LRModel.fit(train)
    evalute(model, train, spark)
}

def evalute(model: LogisticRegressionModel, train: Dataset[Row], spark: SparkSession):Unit = {
        // 打印模型参数
        println(s"模型参数信息如下:\n ${model.parent.explainParams()} \n")
        println(s"Coefficients(系数): ${model.coefficients}")
        println(s"Intercept(截距): ${model.intercept}")
        // 查看训练集的预测结果 rawPrediction:row 计算的分值,probability:经过sigmoid转换后的概率
        val result = model.evaluate(train)
        result.predictions.show(10)
        // 将 label,0 值概率,predict label提取出来
        result.predictions.select("label","probability","prediction").rdd
            .map(row => (row.getDouble(0),row.get(1).asInstanceOf[DenseVector].toArray(0),row.getDouble(2)))
            .take(10).foreach(println)
        // 模型评估
        val trainSummary = model.summary
        val objectiveHistory = trainSummary.objectiveHistory
        println("objectiveHistoryLoss:")
        objectiveHistory.foreach(loss => println(loss))

        val binarySummary = trainSummary.asInstanceOf[BinaryLogisticRegressionSummary]

        val roc = binarySummary.roc
        roc.show()
        println(s"areaUnderROC: ${binarySummary.areaUnderROC}")

        // Set the model threshold to maximize F-Measure
        val fMeasure = binarySummary.fMeasureByThreshold
        fMeasure.show(10)
        val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
        import spark.implicits ._
        val bestThreshold = fMeasure.where($"F-Measure"===maxFMeasure).select("threshold").head().getDouble(0)
        model.setThreshold(bestThreshold)
    }

基于Multimial logistic regression的LR实现:

def BinaryModelWithMulti(train: Dataset[Row], model_path: String, spark: SparkSession) = {
    // 创建模型
    val LRModel = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8)
        .setFamily("multinomial")
    // 训练模型
    val model = LRModel.fit(train)
    // 打印模型参数
    println(s"模型参数信息如下:\n ${model.parent.explainParams()} \n")
    println(s"Coefficients(系数): ${model.coefficientMatrix}")
    println(s"Intercept(截距): ${model.interceptVector}")
}

ml中的多分类LR

某条样本属于类别k的概率计算为:

Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

其中K表示类别,J表示特征个数

权重最小化使用的是最大似然函数,其更新公式如下:

Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

使用的数据集形式为:

1 1:-0.222222 2:0.5 3:-0.762712 4:-0.833333
1 1:-0.555556 2:0.25 3:-0.864407 4:-0.916667
1 1:-0.722222 2:-0.166667 3:-0.864407 4:-0.833333
1 1:-0.722222 2:0.166667 3:-0.694915 4:-0.916667
0 1:0.166667 2:-0.416667 3:0.457627 4:0.5
1 1:-0.833333 3:-0.864407 4:-0.916667
2 1:-1.32455e-07 2:-0.166667 3:0.220339 4:0.0833333
2 1:-1.32455e-07 2:-0.333333 3:0.0169491 4:-4.03573e-08

多分类LR模型实现为:

def MultiModel(file_multi: String, spark: SparkSession, model_path: String): Unit = {
    val training = spark.read.format("libsvm").load(file_multi)
    val lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8)

    // Fit the model
    val lrModel = lr.fit(training)

    // Print the coefficients and intercept for multinomial logistic regression
    println(s"Coefficients: \n${lrModel.coefficientMatrix}")
    println(s"Intercepts: ${lrModel.interceptVector}")
}

参考资料

  • https://spark.apache.org/docs/2.1.0/mllib-linear-methods.html#classification

  • https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#logistic-regression

  • https://blog.csdn.net/pupilxmk/article/details/80735599

在这浮躁的社会沉静,用心记录,用心学习!

Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

关于【数据与算法联盟】

专注于推荐系统,深度学习,机器学习,数据挖掘,云计算,人工智能,架构和编程等技术干货的分享和探讨,偶尔会推送一些福利,文字,摄影和游记,扫码关注,不再孤单。

Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍

更多干货,扫码关注

相关文章

Spark排序算法系列之GBTs使用方式介绍

Spark排序算法系列之GBTs基础——三种梯度下降算法

Spark排序算法系列之GBTs基础——梯度上升和梯度下降

多分类实现方式介绍和在Spark上实现多分类逻辑回归

多分类逻辑回归(Multinomial Logistic Regression)

集成学习(Ensemble Learning)

欢迎投稿,凡是投稿一经录用者,赠送技术图书和相关学习资料

国内各大互联网公司,可内推

关注公众号,加小编微信,拉你进

数据与算法交流群

你点的每个 “在 看” ,我都认真当成了喜欢


以上所述就是小编给大家介绍的《Spark 排序算法系列之(MLLib、ML)LR 使用方式介绍》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Linux设备驱动程序

Linux设备驱动程序

科波特 / 魏永明、耿岳、钟书毅 / 中国电力出版社 / 2006-1-1 / 69.00元

本书是经典著作《Linux设备驱动程序》的第三版。如果您希望在Linux操作系统上支持计算机外部设备,或者在Linux上运行新的硬件,或者只是希望一般性地了解Linux内核的编程,就一定要阅读本书。本书描述了如何针对各种设备编写驱动程序,而在过去,这些内容仅仅以口头形式交流,或者零星出现在神秘的代码注释中。 本书的作者均是Linux社区的领导者。Jonathan Corbet虽不是专职的内核......一起来看看 《Linux设备驱动程序》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具