内容简介:spark2.0开始引入dataframe作为RDD的上层封装,以屏蔽RDD层次的复杂操作,本文使用spark milib中ml机器学习库进行新闻文本多分类预测,包含数据预预处理,分词,标签和特征向量化转换、多分类模型训练(包含朴素贝叶斯、逻辑回归、决策树和随机森林),多分类模型预测和模型评估等完整的机器学习demo。本文分词方法选用HanLP分词工具(文档丰富、算法公开、代码开源,并且经测试分词效果比较好)。本文使用的数据为4类新闻,每条数据包含标签,标题,时间和新闻内容,以"\u00EF"符号作为分割符
文本分类
spark2.0开始引入dataframe作为RDD的上层封装,以屏蔽RDD层次的复杂操作,本文使用spark milib中ml机器学习库进行新闻文本多分类预测,包含数据预预处理,分词,标签和特征向量化转换、多分类模型训练(包含朴素贝叶斯、逻辑回归、决策树和随机森林),多分类模型预测和模型评估等完整的机器学习demo。本文分词方法选用HanLP分词工具(文档丰富、算法公开、代码开源,并且经测试分词效果比较好)。
1.数据预处理
1.1文本数据
本文使用的数据为4类新闻,每条数据包含标签,标题,时间和新闻内容,以"\u00EF"符号作为分割符,数据格式如下:
1.2预处理流程
文本清洗->标签索引化->内容文本分词->去除停用词->分词取前5000个词作为特征->特征向量化->保存预处理模型->调用预处理模型->输出预处理数据(indexedLabel,features)
1.3标签索引化
首先将文本读取成Dataframe格式,将标签列数据索引化,{文化,经济,军事和体育}向量化后为{0,1,2,3}
/** * 数据清洗 可根据具体数据结构和业务场景的不同进行重写. 注意: 输出必须要有标签字段"label" * @param filePath 数据路径 * @param spark SparkSession * @return 清洗后的数据, 包含字段: "label", "title", "time", "content" */ def clean(filePath: String, spark: SparkSession): DataFrame = { import spark.implicits._ val textDF = spark.sparkContext.textFile(filePath).flatMap { line => val fields = line.split("\u00EF") //分隔符:ï,分成标签,标题,时间,内容 //首页|文化新闻ï第十一届全国优秀舞蹈节目展演将在武汉举办ï2016-07-05 19:25:00ï新华社北京7月5日电(记者周玮)由文化部... //首页|财经中心|财经频道ï上半年浙江口岸原油进口量创同期历史新高ï2016-07-04 21:54:00ï杭州7月4日... if (fields.length > 3) { val categoryLine = fields(0) val categories = categoryLine.split("\\|") val category = categories.last //分成4个标签名和其他,最后去除标签为其他的数据 var label = "其他" if (category.contains("文化")) label = "文化" else if (category.contains("财经")) label = "财经" else if (category.contains("军事")) label = "军事" else if (category.contains("体育")) label = "体育" else {} //输出标签,标题,时间,内容 val title = fields(1) val time = fields(2) val content = fields(3) if (!label.equals("其他")) Some(label, title, time, content) else None } else None }.toDF("label", "title", "time", "content") //输出标签,标题,时间,内容DF textDF } /** * 处理label转换为索引形式 * @param data 输入label字段的数据 * @return 标签索引模型, 模型增加字段: "indexedLabel" */ def indexrize(data: DataFrame): StringIndexerModel = { val labelIndexer = new StringIndexer() .setInputCol("label") .setOutputCol("indexedLabel") .fit(data) labelIndexer } 复制代码
predictDF.select("label","indexedLabel").show(10, truncate = false) 复制代码
1.4内容字段分词
处理内容字段,首先要进行分词,然后去除停用词以及转换为特征向量,方便分类模型进行训练和预测。本文模仿spark的ml包下的StopWordsRemover类创建了Segmenter类,用于对数据进行分词,其内部调用了HanLP分词工具。
由于spark自带的StopWordsRemover等使用的闭包仅限于ml包,自定义的类无法调用,故只是采用了与StopWordsRemover类似的使用形式,内部结构并不相同,并且由于以上原因,Segmenter类没有继承Transformer类,故无法进行pipeline管道操作,故在分类模型超参数调优过程中,没有加入分词模型的参数调优。
/** * 分词过程,包括"分词", "去除停用词" * @param data 输入需要分词的字段的数据"content" * @param params 分词参数 * @return 分词处理后的DataFrame,增加字段: "tokens", "removed" */ def segment(data: DataFrame, params: PreprocessParam): DataFrame = { val spark = data.sparkSession //设置分词模型 val segmenter = new Segmenter() .setSegmentType(params.segmentType) //分词方式 .isDelEn(params.delEn) //是否去除英语单词 .isDelNum(params.delNum) //是否去除数字 .addNature(params.addNature) //是否添加词性 .setMinTermLen(params.minTermLen) //最小词长度 .setMinTermNum(params.minTermNum) //行最小词数 .setInputCol("content") //输入内容字段 .setOutputCol("tokens") //输出分词后的字段 //进行分词 val segDF = segmenter.transform(data) 复制代码
1.5去除停用词
分词之后,需要对一些常用的无意义词如:“的”、“我们”、“是”等(统称为“停用词”)进行去除。这些词没有多大的意义,但这些词不去掉会强烈的干扰我们对特征的抽取效果。(比如:在体育分类中,“的”出现500次,“足球”共出现300次,但显然足球更能表示体育分类,而“的”反而影响体育分类的结果。
去除停用词的操作我们直接调用ml包中的StopWordsRemover类:
//读取停用词数据 val stopWordArray = spark.sparkContext.textFile(params.stopwordFilePath).collect() //设置停用词模型 val remover = new StopWordsRemover() .setStopWords(stopWordArray) .setInputCol(segmenter.getOutputCol) //读取"tokens"字段 .setOutputCol("removed") //输出删除停用词后的字段"removed" //删除停用词 val removedDF = remover.transform(segDF) removedDF } 复制代码
1.6特征向量化
由于目前常用的分类、聚类等算法都是基于向量空间模型VSM(即将对象向量化为一个N维向量,映射成N维超空间中的一个点),VSM将数据转换为向量形式,便于对大规模数据进行矩阵操作等,也可以通过计算超空间中两个点之间的距离(一般是余弦距离)来计算两个向量之间的相似度。因此,我们需要将经过处理的语料转换为向量形式,这个过程叫做向量化。
这里我们也调用spark提供的向量化类CountVectorizer类进行向量化操作:
/** * 特征向量化处理,包括词汇表过滤 * @param data 输入向量化的字段"removed" * @param params 配置参数 * @return 向量模型 */ def vectorize(data: DataFrame, params: PreprocessParam): CountVectorizerModel = { //设置向量模型 val vectorizer = new CountVectorizer() .setVocabSize(params.vocabSize) .setInputCol("removed") .setOutputCol("features") val parentVecModel = vectorizer.fit(data) //过滤停用词中没有的数字features val numPattern = "[0-9]+".r val vocabulary = parentVecModel.vocabulary.flatMap { term => if (term.length == 1 || term.matches(numPattern.regex)) None else Some(term) } val vecModel = new CountVectorizerModel(Identifiable.randomUID("cntVec"), vocabulary) .setInputCol("removed") .setOutputCol("features") vecModel } 复制代码
将字段"content"先进行分词和去除停用词得到"removed",再将所有词作为特征,进行特征向量化得到"features"字段:
在模型中可以设置出现次数最多的前5000个词作为分类用的特征,下图5000后有两个数组,第一个数值表示对应前5000个词的第几个词,第二组表示对应第一组出现的词在本条数据中的出现的次数,取出一条完整的数据看看:
1.7数据处理模型训练、保存和调用
为了方便每个模型单独训练和预测,将预处理也作为数据处理的模型进行训练,保存和调用,方法如下:
/** * 训练预处理模型 * @param filePath 数据路径 * @param spark SparkSession * @return (预处理后的数据,索引模型,向量模型) * 数据包括字段: "label", "indexedLabel", "title", "time", "content", "tokens", "removed", "features" */ def train(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = { val params = new PreprocessParam //预处理参数 val cleanDF = this.clean(filePath, spark) //读取DF,清洗数据 val indexModel = this.indexrize(cleanDF) //调用索引模型 val indexDF = indexModel.transform(cleanDF) //标签索引化 val segDF = this.segment(indexDF, params) //将内容字段分词 val vecModel = this.vectorize(segDF, params) //调用向量模型 val trainDF = vecModel.transform(segDF) //内容分词特征向量化 this.saveModel(indexModel, vecModel, params) //保存模型 (trainDF, indexModel, vecModel) } /** * 拟合预处理模型 * @param filePath 数据路径 * @param spark SparkSession * @return (预处理后的数据,索引模型,向量模型) */ def predict(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = { val params = new PreprocessParam //预处理参数 val cleanDF = this.clean(filePath, spark) //读取DF,清洗数据 val (indexModel, vecModel) = this.loadModel(params) //加载索引和向量模型 val indexDF = indexModel.transform(cleanDF) //标签索引化 val segDF = this.segment(indexDF, params) //内容字段分词 val predictDF = vecModel.transform(segDF) //内容分词特征向量化 (predictDF, indexModel, vecModel) } 复制代码
2.多分类模型训练和超参数调优
本文选用了常用的4中多分类模型对文本数据进行训练,利用了管道Pipeline + 网格搜索Gridsearch + 交叉验证CrossValidator 进行参数调优,直接将参数调优放在了训练模型里,将得到的最优模型保存。
2.1朴素贝叶斯
朴素贝叶斯算法原理
朴素贝叶斯算法是基于贝叶斯定理与特征条件独立假设的分类方法。
条件概率
P(A|B)表示事件B已经发生的前提下,事件A发生的概率,叫做事件B发生下事件A的条件概率。其基本求解公式为:
贝叶斯定理便是基于条件概率,通过P(A|B)来求P(B|A):
特征条件独立假设 朴素贝叶斯模型常用的模型主要有3个,多项式、伯努利和高斯模型:
- 当特征是离散的时候,使用多项式模型。
- 伯努利模型也适用于离散特征的情况,所不同的是,伯努利模型中每个特征的取值只能是1和0,以文本分类为例,某个单词在文档中出现过,则其特征值为1,否则为0,而本文是把单词出现的次数作为特征,所以不适应于伯努利模型
- 当特征是连续变量的时候,多项式模型及时加入平滑系数也很难描述分类特征,因此需要使用高斯模型
平滑系数
超参数平滑系数α,作用是防止后验概率为0,当α = 1时,称作Laplace平滑,当0 < α < 1时,称作Lidstone平滑,α = 0时不做平滑。本文主要对平滑系数进行调参。
/** * NB模型训练处理过程 * @param data 训练数据集 * @return nbBestModel */ def train(data: DataFrame): NaiveBayesModel = { val params = new ClassParam //NB分类模型管道训练调参 data.persist() data.show(5) //NB模型 val nbModel = new NaiveBayes() .setModelType(params.nbModelType) //多项式模型或者伯努利模型 .setSmoothing(params.smoothing) //平滑系数 .setLabelCol("indexedLabel") .setFeaturesCol("features") //建立管道,模型只有一个 stages = 0 val pipeline = new Pipeline() .setStages(Array(nbModel)) //建立网格搜索 val paramGrid = new ParamGridBuilder() //.addGrid(nbModel.modelType, Array("multinomial", "bernoulli")) //伯努利模型需要特征为01的数据 .addGrid(nbModel.smoothing, Array(0.01, 0.1, 0.2, 0.5)) .build() //建立evaluator,必须要保证验证的标签列是向量化后的标签 val evaluator = new BinaryClassificationEvaluator() .setLabelCol("indexedLabel") //建立一个交叉验证的评估器,设置评估器的参数 val cv = new CrossValidator() .setEstimator(pipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) //运行交叉验证评估器,得到最佳参数集的模型 val cvModel = cv.fit(data) //获取最优逻辑回归模型 val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel] val bestNBModel = bestModel.stages(0).asInstanceOf[NaiveBayesModel] println("类的数量(标签可以使用的值): " + bestNBModel.numClasses) println("模型所接受的特征的数量: " + bestNBModel.numFeatures) println("最优的modelType的值为: "+ bestNBModel.explainParam(bestNBModel.modelType)) println("最优的smoothing的值为: "+ bestNBModel.explainParam(bestNBModel.smoothing)) //更新最优朴素贝叶斯模型,并训练数据 val nbBestModel = new NaiveBayes() .setModelType(bestNBModel.getModelType) //多项式模型或者伯努利模型 .setSmoothing(bestNBModel.getSmoothing) //平滑系数 .setLabelCol("indexedLabel") .setFeaturesCol("features") .fit(data) this.saveModel(nbBestModel, params) data.unpersist() nbBestModel } 复制代码
后续的三个算法原理网上都有很多,训练的代码也类似,本文只给出模型调参的部分代码。
2.2逻辑回归
//LR模型 val lrModel = new LogisticRegression() .setMaxIter(bestLRModel.getMaxIter) //模型最大迭代次数 .setRegParam(bestLRModel.getRegParam) //正则化参数 .setElasticNetParam(params.elasticNetParam) //L1范式比例, L1/(L1 + L2) .setTol(params.converTol) //模型收敛阈值 .setLabelCol("indexedLabel") //设置索引化标签字段 .setFeaturesCol("features") //设置向量化文本特征字段 //建立网格搜索 val paramGrid = new ParamGridBuilder() .addGrid(lrModel.maxIter, Array(5, 10)) .addGrid(lrModel.regParam, Array(0.1, 0.2)) .build() 复制代码
2.3决策树
//决策树模型 val dtModel = new DecisionTreeClassifier() .setMinInfoGain(params.minInfoGain) //最小信息增益阈值 .setMaxDepth(params.maxDepth) //决策树最大深度 .setImpurity(params.impurity) //节点不纯度和信息增益方法gini, entropy .setLabelCol("indexedLabel") //设置索引化标签字段 .setFeaturesCol("features") //设置向量化文本特征字段 //建立网格搜索 val paramGrid = new ParamGridBuilder() .addGrid(dtModel.minInfoGain, Array(0.0, 0.1)) .addGrid(dtModel.maxDepth, Array(10, 20)) .addGrid(dtModel.impurity, Array("gini", "entropy")) .build() 复制代码
2.4随机森林
随机森林模型常常需要调试以提高算法效果的两个参数:numTrees,maxDepth
- numTrees:增加决策树的个数会降低预测结果的方差,这样在测试时会有更高的accuracy。训练时间大致与numTrees呈线性增长关系
- maxDepth:限定决策树的最大可能深度。最终的决策树的深度可能要比maxDepth小
- minInfoGain:最小信息增益(设置阈值),但由于其它终止条件或者是被剪枝的缘故小于该值将不带继续分叉
- maxBins:连续特征离散化时选用的最大分桶个数,并且决定每个节点如何分裂。(25,28,31)
- impurity:计算信息增益的指标,熵和gini不纯度("entropy", "gini")
- minInstancesPerNode:如果某个节点的样本数量小于该值,则该节点将不再被分叉。(设置阈值)
- auto:在每个节点分裂时是否自动选择参与的特征个数
- seed:随机数生成种子
实际上要想获得一个适当的阈值是相当困难的。高阈值可能导致过分简化的树,而低阈值可能简化不够。
预剪枝方法 minInfoGain、minInstancesPerNode 实际上是通过不断修改停止条件来得到合理的结果,这并不是一个好办法,事实上 我们常常甚至不知道要寻找什么样的结果。这样就需要对树进行后剪枝了(后剪枝不需要用户指定参数,是更为理想化的剪枝方法)
//随机森林模型(不加fit) val rfModel = new RandomForestClassifier() .setMaxDepth(params.maxDepth) //决策树最大深度 .setNumTrees(params.numTrees) //设置决策树个数 .setMinInfoGain(params.minInfoGain) //最小信息增益阈值 .setImpurity(params.impurity) //信息增益的指标,选择熵或者gini不纯度 //.setMaxBins(params.maxBins) //最大分桶个数,用于连续特征离散化时决定每个节点如何分裂 .setLabelCol("indexedLabel") //设置索引化标签字段 .setFeaturesCol("features") //设置向量化文本特征字段 //建立网格搜索 val paramGrid = new ParamGridBuilder() .addGrid(rfModel.maxDepth, Array(5, 10, 20)) .addGrid(rfModel.numTrees, Array(5, 10, 20)) .addGrid(rfModel.minInfoGain, Array(0.0, 0.1, 0.5)) .build() 复制代码
3.多分类模型预测和模型评估
3.1模型评估类MulticlassClassificationEvaluator
机器学期一般都需要一个量化指标来衡量其效果:这个模型的准确率、召回率和F1值(这3个指标是评判模型预测能力常用的一组指标),spark提供了用于多分类模型评估的类MulticlassClassificationEvaluator,并将3个指标同时输出
object Evaluations extends Serializable { /** * 多分类结果评估 * @param data 分类结果 * @return (准确率, 召回率, F1) */ def multiClassEvaluate(data: RDD[(Double, Double)]): (Double, Double, Double) = { val metrics = new MulticlassMetrics(data) val weightedPrecision = metrics.weightedPrecision val weightedRecall = metrics.weightedRecall val f1 = metrics.weightedFMeasure (weightedPrecision, weightedRecall, f1) } } 复制代码
3.2四个多分类模型预测结果和模型评估
以逻辑回归为例,预测结果如下图,"probability"中4个值表示4个类别的预测概率:
4个分类模型的评估结果如下:
评估模型代码:
/** * Description: 多分类模型预测结果评估对比 * Created by wy in 2019/4/16 10:07 */ object MultiClassEvalution { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) Logger.getLogger("org").setLevel(Level.ERROR) val spark = SparkSession .builder .master("local") .appName("Multi_Class_Evaluation_Demo") .getOrCreate() val filePath = "data/dataTest/predict" //预处理(清洗、分词、向量化) val preprocessor = new Preprocessor val (predictDF, indexModel, _) = preprocessor.predict(filePath, spark) predictDF.select("content","removed", "features").show(1, truncate = false) //朴素贝叶斯模型预测 val nbClassifier = new NBClassifier val nbPredictions = nbClassifier.predict(predictDF, indexModel) //逻辑回归模型预测 val lrClassifier = new LRClassifier //import Classification.LogisticRegression.LRClassifier val lrPredictions = lrClassifier.predict(predictDF, indexModel) //决策树模型预测 val dtClassifier = new DTClassifier val dtPredictions = dtClassifier.predict(predictDF, indexModel) //随机森林模型预测 val rfClassifier = new RFClassifier val rfPredictions = rfClassifier.predict(predictDF, indexModel) //多个模型评估 val predictions = Seq(nbPredictions, lrPredictions, dtPredictions, rfPredictions) val classNames = Seq("朴素贝叶斯模型", "逻辑回归模型", "决策树模型", "随机森林模型") for (i <- 0 to 3) { val prediction = predictions(i) val className = classNames(i) val resultRDD = prediction.select("prediction", "indexedLabel").rdd.map { case Row(prediction: Double, label: Double) => (prediction, label) } val (precision, recall, f1) = Evaluations.multiClassEvaluate(resultRDD) println(s"\n========= $className 评估结果 ==========") println(s"加权准确率:$precision") println(s"加权召回率:$recall") println(s"F1值:$f1") } } } 复制代码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 自己动手实现神经网络分词模型
- 联合汉语分词和依存句法分析的统一模型:当前效果最佳
- 【NLP】用于语音识别、分词的隐马尔科夫模型HMM
- 基于海量公司分词ES中文分词插件
- 北大开源全新中文分词工具包:准确率远超THULAC、结巴分词
- 复旦大学提出中文分词新方法,Transformer连有歧义的分词也能学
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
算法:C语言实现
塞奇威克 / 霍红卫 / 机械工业出版社 / 2009-10 / 79.00元
《算法:C语言实现(第1-4部分)基础知识、数据结构、排序及搜索(原书第3版)》细腻讲解计算机算法的C语言实现。全书分为四部分,共16章。包括基本算法分析原理,基本数据结构、抽象数据结构、递归和树等数据结构知识,选择排序、插入排序、冒泡排序、希尔排序、快速排序方法、归并和归并排序方法、优先队列与堆排序方法、基数排序方法以及特殊用途的排序方法,并比较了各种排序方法的性能特征,在进一步讲解符号表、树等......一起来看看 《算法:C语言实现》 这本书的介绍吧!