Python技术栈与Spark交叉数据分析双向整合技术实战--大数据ML样本集案例实战

栏目: Python · 发布时间: 5年前

内容简介:版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

  • Python Spark DataFrame 基础

    df = spark.read.parquet('/sql/users.parquet')
     df.show()
     
     +------+--------------+----------------+
     |  name|favorite_color|favorite_numbers|
     +------+--------------+----------------+
     |Alyssa|          null|  [3, 9, 15, 20]|
     |   Ben|           red|              []|
     +------+--------------+----------------+
    复制代码
  • Python Spark DataFrame 聚合统计

    CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)
      0001,Male,19,15,39
      0002,Male,21,15,81
      0003,Female,20,16,6
      0004,Female,23,16,77
      0005,Female,31,17,40
      0006,Female,22,17,76
      
      df = spark.read.csv('/sql/customers.csv',header=True)
      df.printSchema()
      df.show()
      
      root
       |-- CustomerID: string (nullable = true)
       |-- Genre: string (nullable = true)
       |-- Age: string (nullable = true)
       |-- Annual Income (k$): string (nullable = true)
       |-- Spending Score (1-100): string (nullable = true)
      
      +----------+------+---+------------------+----------------------+
      |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
      +----------+------+---+------------------+----------------------+
      |      0001|  Male| 19|                15|                    39|
      |      0002|  Male| 21|                15|                    81|
      |      0003|Female| 20|                16|                     6|
      |      0004|Female| 23|                16|                    77|
      |      0005|Female| 31|                17|                    40|
      |      0006|Female| 22|                17|                    76|
      |      0007|Female| 35|                18|                     6|
      |      0008|Female| 23|                18|                    94|
      |      0009|  Male| 64|                19|                     3|
      |      0010|Female| 30|                19|                    72|
      |      0011|  Male| 67|                19|                    14|
      |      0012|Female| 35|                19|                    99|
      |      0013|Female| 58|                20|                    15|
      |      0014|Female| 24|                20|                    77|
      |      0015|  Male| 37|                20|                    13|
      |      0016|  Male| 22|                20|                    79|
      |      0017|Female| 35|                21|                    35|
      |      0018|  Male| 20|                21|                    66|
      |      0019|  Male| 52|                23|                    29|
      |      0020|Female| 35|                23|                    98|
      +----------+------+---+------------------+----------------------+
      
      df.agg({"Age": "max","Annual Income (k$)":"mean","Spending Score (1-100)":"mean"}).show()
      
      +---------------------------+-----------------------+--------+
      |avg(Spending Score (1-100))|avg(Annual Income (k$))|max(Age)|
      +---------------------------+-----------------------+--------+
      |                       50.2|                  60.56|      70|
      +---------------------------+-----------------------+--------+
    复制代码
  • alias(alias)为DataFrame定义一个别名,稍后再函数中就可以利用这个别名来做相关的运 算,例如说自关联Join:

    df1 = df.alias('cus1')
      type(df1)
      df2 = df.alias('cus2')
      df3 = df1.join(df2,col('cus1.CustomerId')==col('cus2.CustomerId'),'inner')
      df3.count()
      
      200
      
      +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
      |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
      +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
      |      0001|  Male| 19|                15|                    39|      0001|  Male| 19|                15|                    39|
      |      0002|  Male| 21|                15|                    81|      0002|  Male| 21|                15|                    81|
      |      0003|Female| 20|                16|                     6|      0003|Female| 20|                16|                     6|
      |      0004|Female| 23|                16|                    77|      0004|Female| 23|                16|                    77|
      |      0005|Female| 31|                17|                    40|      0005|Female| 31|                17|                    40|
      |      0006|Female| 22|                17|                    76|      0006|Female| 22|                17|                    76|
      |      0007|Female| 35|                18|                     6|      0007|Female| 35|                18|                     6|
      |      0008|Female| 23|                18|                    94|      0008|Female| 23|                18|                    94|
      |      0009|  Male| 64|                19|                     3|      0009|  Male| 64|                19|                     3|
      |      0010|Female| 30|                19|                    72|      0010|Female| 30|                19|                    72|
      +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
      only showing top 10 rows
    复制代码
  • cache(),将DataFrame缓存到StorageLevel对应的缓存级别中,默认是 MEMORY_AND_DISK

    df = spark.read.csv('/sql/customers.csv',header=True)
      a = df.cache()
      a.show()
      
      +----------+------+---+------------------+----------------------+
      |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
      +----------+------+---+------------------+----------------------+
      |      0001|  Male| 19|                15|                    39|
      |      0002|  Male| 21|                15|                    81|
      |      0003|Female| 20|                16|                     6|
      |      0004|Female| 23|                16|                    77|
      |      0005|Female| 31|                17|                    40|
      |      0006|Female| 22|                17|                    76|
      |      0007|Female| 35|                18|                     6|
      |      0008|Female| 23|                18|                    94|
      |      0009|  Male| 64|                19|                     3|
      |      0010|Female| 30|                19|                    72|
      |      0011|  Male| 67|                19|                    14|
      |      0012|Female| 35|                19|                    99|
    复制代码
  • checkpoint(eager=True) 对DataFrame设置断点,这个方法是Spark2.1引入的方法,这个方法的调用会斩断在这个 DataFrame上的逻辑执行计划,将前后的依赖关系持久化到checkpoint文件中去。

    sc
      sc.setCheckpointDir('/datas/checkpoint')
      a.checkpoint()
      a.show()
    复制代码
  • coalesce(numPartitions) 重分区算法,传入的参数是DataFrame的分区数量。

    注意通过read方法读取文件,创建的DataFrame默认的分区数为文件的个数,即一个文件对
      应一个分区,在分区数少于coalesce指定的分区数的时候,调用coalesce是不起作用的
      
      df = spark.read.csv('/sql/customers.csv',header=True)
      df.rdd.getNumPartitions()
      1
      
      spark.read.csv('/sql/customers.csv',header=True).coalesce(3).rdd.getNumPartitions()
      1
      
      df = spark.range(0,20,2,3)
      df.rdd.getNumPartitions()
      df.coalesce(2).rdd.getNumPartitions()
      2
    复制代码
  • repartition(numPartitions, *cols)这个方法和coalesce(numPartitions) 方法一样,都是 对DataFrame进行重新的分区,但是repartition这个方法会使用hash算法,在整个集群中进 行shuffle,效率较低。repartition方法不仅可以指定分区数,还可以指定按照哪些列来做分 区。

    df = spark.read.csv('/sql/customers.csv',header=True)
      df.rdd.getNumPartitions()
      1
      
      df2 = df.repartition(3)
      df2.rdd.getNumPartitions()
      3
      
      df2.columns
      df3 = df2.repartition(6,'Genre')
      df3.show(20)
      
      +----------+------+---+------------------+----------------------+
      |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
      +----------+------+---+------------------+----------------------+
      |      0003|Female| 20|                16|                     6|
      |      0004|Female| 23|                16|                    77|
      |      0005|Female| 31|                17|                    40|
      |      0006|Female| 22|                17|                    76|
      |      0007|Female| 35|                18|                     6|
      |      0008|Female| 23|                18|                    94|
      |      0010|Female| 30|                19|                    72|
      |      0012|Female| 35|                19|                    99|
      |      0013|Female| 58|                20|                    15|
      
      df3.rdd.getNumPartitions()
      6
    复制代码
  • colRegex(colName)用正则表达式的方式返回我们想要的列。

    df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
      df.select(df.colRegex("`(Col1)?+.+`")).show()
      +---+
      |  a|
      +---+
      |  1|
      |  2|
      |  3|
      +---+
    复制代码
  • collect(),返回DataFrame中的所有数据,注意数据量大了容易造成Driver节点内存溢 出!

    df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
      df.collect()
      [Row(Col1='a', a=1), Row(Col1='b', a=2), Row(Col1='c', a=3)]
    复制代码
  • columns,以列表的形式返回DataFrame的所有列名

    df = spark.read.csv('/sql/customers.csv',header=True)
      df.columns
      
      df = spark.read.csv('/sql/customers.csv',header=True)
      df.columns
      ['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
    复制代码
  • SparkSQL DataFrame 转换为 PandasDataFrame

    df = spark.read.csv('/sql/customers.csv',header=True)
      pdf = df.toPandas()
      pdf.info()
      
      <class 'pandas.core.frame.DataFrame'>
      RangeIndex: 200 entries, 0 to 199
      Data columns (total 5 columns):
      CustomerID                200 non-null object
      Genre                     200 non-null object
      Age                       200 non-null object
      Annual Income (k$)        200 non-null object
      Spending Score (1-100)    200 non-null object
      dtypes: object(5)
      memory usage: 7.9+ KB
      
      pdf['Age'] = pdf['Age'].astype('int')
      pdf["Annual Income (k$)"]=pdf["Annual Income (k$)"].astype('int')
      pdf["Spending Score (1-100)"]=pdf["Spending Score (1-100)"].astype('int')
      pdf.info()
      
      <class 'pandas.core.frame.DataFrame'>
      RangeIndex: 200 entries, 0 to 199
      Data columns (total 5 columns):
      CustomerID                200 non-null object
      Genre                     200 non-null object
      Age                       200 non-null int64
      Annual Income (k$)        200 non-null int64
      Spending Score (1-100)    200 non-null int64
      dtypes: int64(3), object(2)
      memory usage: 7.9+ KB
    复制代码
  • PandasDataFrame 转换为 SparkSQL DataFrame

    df1 = spark.createDataFrame(pdf)
      df1.corr("Age","Annual Income (k$)")
      df1.corr("Spending Score (1-100)","Annual Income (k$)")
      
      0.009902848094037492
    复制代码
  • count()返回DataFrame中Row的数量

    df = spark.read.csv('/sql/customers.csv',header=True)
      df.count()
      
      200
    复制代码
  • createGlobalTempView(name)使用DataFrame创建一个全局的临时表,其生命周期 和启动的app的周期一致,即启动的spark应用存在则这个临时的表就一直能访问。直到 sparkcontext的stop方法的调用退出应用为止。创建的临时表保存在global_temp这个库 中


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Python 3面向对象编程

Python 3面向对象编程

[加]Dusty Phillips(达斯帝•菲利普斯) / 肖鹏、常贺、石琳 / 电子工业出版社 / 2015-6 / 79.00元

Python 是一种面向对象的解释型语言,面向对象是其非常重要的特性。《Python 3面向对象编程》通过Python 的数据结构、语法、设计模式,从简单到复杂,从初级到高级,一步步通过例子来展示了Python 中面向对象的概念和原则。 《Python 3面向对象编程》不是Python 的入门书籍,适合具有Python 基础经验的开发人员阅读。如果你拥有其他面向对象语言的经验,你会更容易理解......一起来看看 《Python 3面向对象编程》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具