内容简介:这两天实践了一下sklearn、pyspark ml、pmml,下面记录一下。项目地址:train_df.show()打印出了经过RFormula特征提取后的DataFrame:
这两天实践了一下sklearn、pyspark ml、pmml,下面记录一下。
项目地址: https://github.com/owenliang/machine-learning
sklearn LR分类
# -*- coding: utf-8 -*- from sklearn.datasets import load_iris from sklearn.linear_model import LogisticRegression from sklearn.model_selection import train_test_split # GBDT算法 # from sklearn.ensemble import GradientBoostingClassifier # 从官网下载数据 iris = load_iris() # 随机拆分训练集与测试集 train_x, test_x, train_y, test_y = train_test_split(iris.data, iris.target, test_size = 0.2) # 逻辑回归分类算法 lr = LogisticRegression() # 训练模型 lr.fit(train_x, train_y) # 预测 predict_y = lr.predict(test_x) print(predict_y) # 模型得分 score = lr.score(test_x, test_y) print(score)
- 利用train_test_split切分了训练集合和验证集合
- fit训练了LR回归分类模型
- predict预测了验证集合
- score计算了预测模型的得分
spark LR分类
# -*- coding: utf-8 -*- from pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession from sklearn.datasets import load_iris import pandas from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import VectorAssembler from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.feature import RFormula # 配置spark客户端 conf = SparkConf().setAppName("lr_spark") conf = conf.setMaster("local") sc = SparkContext(conf = conf) # 加载sklearn的训练数据 iris = load_iris() # 特征矩阵 features = pandas.DataFrame(iris.data, columns = iris.feature_names) # 目标矩阵 targets = pandas.DataFrame(iris.target, columns = ['Species']) # 合并矩阵 merged = pandas.concat([features, targets], axis = 1) # 创建SparkSession sess = SparkSession(sc) # 创建spark DataFrame raw_df = sess.createDataFrame(merged) # 提取特征与目标 fomula = RFormula(formula = 'Species ~ .') raw_df = fomula.fit(raw_df).transform(raw_df) # 拆分训练集和测试集 train_df, test_df = raw_df.randomSplit([0.8, 0.2]) # 创建LR分类器 lr = LogisticRegression() # 训练 train_df.show() model = lr.fit(train_df) # 预测test集合 predict_df = model.transform(test_df) # 对测试集做predict, 生成(预测分类, 正确分类) def build_predict_target(row): return (float(row.prediction), float(row.Species)) predict_and_target_rdd = predict_df.rdd.map(build_predict_target) # 统计模型效果 metrics = BinaryClassificationMetrics(predict_and_target_rdd) print(metrics.areaUnderPR) # 保存模型到磁盘 model.write().overwrite().save('./lr.model')
- 利用pandas将原始数组转换成DataFrame矩阵
- 利用concat将特征与目标矩阵按行连接
- 利用SparkSession将本地的pandas.DataFrame上传到spark集群中,变成了spark中的分布式DataFrame
- 利用RFormula将DataFrame中属于特征的列提取到features字段中,将目标提取到labal字段
- 将数据随机切分成8:2的训练集合与测试集合
- 利用spark的逻辑回归分类器进行分布式训练fit
- 然后调用transform完成对测试集的预测,得到一个预测结果的DataFrame
- 将预测结果DataFrame转换成rdd并运行map输出(预测分类,正确分类)
- 利用spark的一个Metrics库,统计模型得分
- 最后将训练好的LR模型保存到hdfs上的lr.model文件
train_df.show()打印出了经过RFormula特征提取后的DataFrame:
+-----------------+----------------+-----------------+----------------+-------+-----------------+-----+ |sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|Species| features|label| +-----------------+----------------+-----------------+----------------+-------+-----------------+-----+ | 4.4| 3.2| 1.3| 0.2| 0|[4.4,3.2,1.3,0.2]| 0.0| | 4.5| 2.3| 1.3| 0.3| 0|[4.5,2.3,1.3,0.3]| 0.0| | 4.6| 3.1| 1.5| 0.2| 0|[4.6,3.1,1.5,0.2]| 0.0| | 4.6| 3.2| 1.4| 0.2| 0|[4.6,3.2,1.4,0.2]| 0.0| | 4.6| 3.6| 1.0| 0.2| 0|[4.6,3.6,1.0,0.2]| 0.0| | 4.7| 3.2| 1.3| 0.2| 0|[4.7,3.2,1.3,0.2]| 0.0| | 4.7| 3.2| 1.6| 0.2| 0|[4.7,3.2,1.6,0.2]| 0.0| | 4.8| 3.0| 1.4| 0.1| 0|[4.8,3.0,1.4,0.1]| 0.0| | 4.8| 3.0| 1.4| 0.3| 0|[4.8,3.0,1.4,0.3]| 0.0| | 4.8| 3.1| 1.6| 0.2| 0|[4.8,3.1,1.6,0.2]| 0.0| | 4.8| 3.4| 1.6| 0.2| 0|[4.8,3.4,1.6,0.2]| 0.0| | 4.9| 2.4| 3.3| 1.0| 1|[4.9,2.4,3.3,1.0]| 1.0| | 4.9| 2.5| 4.5| 1.7| 2|[4.9,2.5,4.5,1.7]| 2.0| | 4.9| 3.0| 1.4| 0.2| 0|[4.9,3.0,1.4,0.2]| 0.0| | 5.0| 2.0| 3.5| 1.0| 1|[5.0,2.0,3.5,1.0]| 1.0| | 5.0| 2.3| 3.3| 1.0| 1|[5.0,2.3,3.3,1.0]| 1.0| | 5.0| 3.0| 1.6| 0.2| 0|[5.0,3.0,1.6,0.2]| 0.0| | 5.0| 3.2| 1.2| 0.2| 0|[5.0,3.2,1.2,0.2]| 0.0| | 5.0| 3.3| 1.4| 0.2| 0|[5.0,3.3,1.4,0.2]| 0.0| | 5.0| 3.4| 1.5| 0.2| 0|[5.0,3.4,1.5,0.2]| 0.0| +-----------------+----------------+-----------------+----------------+-------+-----------------+-----+ only showing top 20 rows
可见4个特征被提取到了features字段,目标被提取到了labal字段。
其实无论对pandas还是spark来说,DataFrame都类似于一张 MYSQL 中的数据表,理解这一点非常重要。
spark ml导出PMML
PMML是预测模型的通用描述语言,简单的说就是用XML语法保存模型的一种标准规范,按照这个格式保存模型,其他编程语言也可以加载模型完成预测。
# -*- coding: utf-8 -*- from pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession from sklearn.datasets import load_iris import pandas from pyspark.ml.classification import LogisticRegression from pyspark.ml import Pipeline from pyspark2pmml import PMMLBuilder from pyspark.ml.feature import RFormula # 配置spark客户端 conf = SparkConf().setAppName("lr_spark").set("spark.jars", "./jpmml-sparkml-executable-1.4.5.jar") # 注意: 这里需要加载jpmml jar conf = conf.setMaster("local") sc = SparkContext(conf = conf) # 加载sklearn的训练数据 iris = load_iris() # 特征矩阵 features = pandas.DataFrame(iris.data, columns = iris.feature_names) # 目标矩阵 targets = pandas.DataFrame(iris.target, columns = ['Species']) # 合并矩阵 merged = pandas.concat([features, targets], axis = 1) # 创建SparkSession sess = SparkSession(sc) # 创建spark DataFrame raw_df = sess.createDataFrame(merged) # 特征提取 fomula = RFormula(formula = 'Species ~ .') # 创建LR分类器 lr = LogisticRegression() # 流水线: 先提取特征, 再训练模型 pipeline = Pipeline(stages = [fomula, lr]) pipeline_model = pipeline.fit(raw_df) # 导出PMML pmmlBuilder = PMMLBuilder(sc, raw_df, pipeline_model) pmmlBuilder.buildFile("lr.pmml")
PMML支持导出extract、transform、estimator的完整过程,即特征的提取、转换、模型都可以导出。
我们通过pipeline配置整个训练流程,这里我配置了先经过RFormula提取特征与目标,然后再交给LR模型训练。大家也可以再增加其他的流程,比如对特征做标准化,归一化等操作都可以。
经过pipeline.fit训练后得到了模型,这个模型里包含了提取、转换、训练的完整信息,可以交给PMML来导出到文件。
我们看一下PMML文件就明白了:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <PMML xmlns="http://www.dmg.org/PMML-4_3" xmlns:data="http://jpmml.org/jpmml-model/InlineTable" version="4.3"> <Header> <Application name="JPMML-SparkML" version="1.4.5"/> <Timestamp>2018-07-17T03:25:17Z</Timestamp> </Header> <DataDictionary> <DataField name="Species" optype="categorical" dataType="integer"> <Value value="0"/> <Value value="1"/> <Value value="2"/> </DataField> <DataField name="sepal length (cm)" optype="continuous" dataType="double"/> <DataField name="sepal width (cm)" optype="continuous" dataType="double"/> <DataField name="petal length (cm)" optype="continuous" dataType="double"/> <DataField name="petal width (cm)" optype="continuous" dataType="double"/> </DataDictionary> <RegressionModel functionName="classification" normalizationMethod="softmax"> <MiningSchema> <MiningField name="Species" usageType="target"/> <MiningField name="sepal length (cm)"/> <MiningField name="sepal width (cm)"/> <MiningField name="petal length (cm)"/> <MiningField name="petal width (cm)"/> </MiningSchema> <Output> <OutputField name="pmml(prediction)" optype="categorical" dataType="integer" feature="predictedValue"/> <OutputField name="prediction" optype="categorical" dataType="double" feature="transformedValue"> <MapValues outputColumn="data:output"> <FieldColumnPair field="pmml(prediction)" column="data:input"/> <InlineTable> <row> <data:input>0</data:input> <data:output>0</data:output> </row> <row> <data:input>1</data:input> <data:output>1</data:output> </row> <row> <data:input>2</data:input> <data:output>2</data:output> </row> </InlineTable> </MapValues> </OutputField> <OutputField name="probability(0)" optype="continuous" dataType="double" feature="probability" value="0"/> <OutputField name="probability(1)" optype="continuous" dataType="double" feature="probability" value="1"/> <OutputField name="probability(2)" optype="continuous" dataType="double" feature="probability" value="2"/> </Output> <RegressionTable intercept="2.0589260323395826" targetCategory="0"> <NumericPredictor name="sepal length (cm)" coefficient="-9.3215388958028"/> <NumericPredictor name="sepal width (cm)" coefficient="40.62625783146442"/> <NumericPredictor name="petal length (cm)" coefficient="-11.78994187010167"/> <NumericPredictor name="petal width (cm)" coefficient="-26.527030223116284"/> </RegressionTable> <RegressionTable intercept="20.289599797446442" targetCategory="1"> <NumericPredictor name="sepal length (cm)" coefficient="5.8933440030500135"/> <NumericPredictor name="sepal width (cm)" coefficient="-16.972682698035506"/> <NumericPredictor name="petal length (cm)" coefficient="1.1802982181289101"/> <NumericPredictor name="petal width (cm)" coefficient="4.120419403699187"/> </RegressionTable> <RegressionTable intercept="-22.348525829786027" targetCategory="2"> <NumericPredictor name="sepal length (cm)" coefficient="3.428194892752785"/> <NumericPredictor name="sepal width (cm)" coefficient="-23.653575133428916"/> <NumericPredictor name="petal length (cm)" coefficient="10.60964365197276"/> <NumericPredictor name="petal width (cm)" coefficient="22.406610819417097"/> </RegressionTable> </RegressionModel> </PMML>
- DataDictionary描述了有哪些特征,特征排列的顺序是什么,目标是什么。
- RegressionModel描述了回归分类算法的模型信息,可以看到每个分类的模型参数。
总结
关键记住2点:
- DataFrame等价于mysql table,每一列有名字关联,需要参与训练的特征被提取到features字段,即特征向量。
- PMML可以导出完整的提取、转换、预测模型,供其他语言加载使用(主要是JAVA)。
以上所述就是小编给大家介绍的《记录pyspark ml与pmml的用法》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。