Python技术栈与Spark交叉数据分析双向整合技术实战--大数据ML样本集案例实战

栏目: Python · 发布时间: 6年前

内容简介:版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

  • Python Spark DataFrame 基础

    df = spark.read.parquet('/sql/users.parquet')
     df.show()
     
     +------+--------------+----------------+
     |  name|favorite_color|favorite_numbers|
     +------+--------------+----------------+
     |Alyssa|          null|  [3, 9, 15, 20]|
     |   Ben|           red|              []|
     +------+--------------+----------------+
    复制代码
  • Python Spark DataFrame 聚合统计

    CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)
      0001,Male,19,15,39
      0002,Male,21,15,81
      0003,Female,20,16,6
      0004,Female,23,16,77
      0005,Female,31,17,40
      0006,Female,22,17,76
      
      df = spark.read.csv('/sql/customers.csv',header=True)
      df.printSchema()
      df.show()
      
      root
       |-- CustomerID: string (nullable = true)
       |-- Genre: string (nullable = true)
       |-- Age: string (nullable = true)
       |-- Annual Income (k$): string (nullable = true)
       |-- Spending Score (1-100): string (nullable = true)
      
      +----------+------+---+------------------+----------------------+
      |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
      +----------+------+---+------------------+----------------------+
      |      0001|  Male| 19|                15|                    39|
      |      0002|  Male| 21|                15|                    81|
      |      0003|Female| 20|                16|                     6|
      |      0004|Female| 23|                16|                    77|
      |      0005|Female| 31|                17|                    40|
      |      0006|Female| 22|                17|                    76|
      |      0007|Female| 35|                18|                     6|
      |      0008|Female| 23|                18|                    94|
      |      0009|  Male| 64|                19|                     3|
      |      0010|Female| 30|                19|                    72|
      |      0011|  Male| 67|                19|                    14|
      |      0012|Female| 35|                19|                    99|
      |      0013|Female| 58|                20|                    15|
      |      0014|Female| 24|                20|                    77|
      |      0015|  Male| 37|                20|                    13|
      |      0016|  Male| 22|                20|                    79|
      |      0017|Female| 35|                21|                    35|
      |      0018|  Male| 20|                21|                    66|
      |      0019|  Male| 52|                23|                    29|
      |      0020|Female| 35|                23|                    98|
      +----------+------+---+------------------+----------------------+
      
      df.agg({"Age": "max","Annual Income (k$)":"mean","Spending Score (1-100)":"mean"}).show()
      
      +---------------------------+-----------------------+--------+
      |avg(Spending Score (1-100))|avg(Annual Income (k$))|max(Age)|
      +---------------------------+-----------------------+--------+
      |                       50.2|                  60.56|      70|
      +---------------------------+-----------------------+--------+
    复制代码
  • alias(alias)为DataFrame定义一个别名,稍后再函数中就可以利用这个别名来做相关的运 算,例如说自关联Join:

    df1 = df.alias('cus1')
      type(df1)
      df2 = df.alias('cus2')
      df3 = df1.join(df2,col('cus1.CustomerId')==col('cus2.CustomerId'),'inner')
      df3.count()
      
      200
      
      +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
      |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
      +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
      |      0001|  Male| 19|                15|                    39|      0001|  Male| 19|                15|                    39|
      |      0002|  Male| 21|                15|                    81|      0002|  Male| 21|                15|                    81|
      |      0003|Female| 20|                16|                     6|      0003|Female| 20|                16|                     6|
      |      0004|Female| 23|                16|                    77|      0004|Female| 23|                16|                    77|
      |      0005|Female| 31|                17|                    40|      0005|Female| 31|                17|                    40|
      |      0006|Female| 22|                17|                    76|      0006|Female| 22|                17|                    76|
      |      0007|Female| 35|                18|                     6|      0007|Female| 35|                18|                     6|
      |      0008|Female| 23|                18|                    94|      0008|Female| 23|                18|                    94|
      |      0009|  Male| 64|                19|                     3|      0009|  Male| 64|                19|                     3|
      |      0010|Female| 30|                19|                    72|      0010|Female| 30|                19|                    72|
      +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
      only showing top 10 rows
    复制代码
  • cache(),将DataFrame缓存到StorageLevel对应的缓存级别中,默认是 MEMORY_AND_DISK

    df = spark.read.csv('/sql/customers.csv',header=True)
      a = df.cache()
      a.show()
      
      +----------+------+---+------------------+----------------------+
      |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
      +----------+------+---+------------------+----------------------+
      |      0001|  Male| 19|                15|                    39|
      |      0002|  Male| 21|                15|                    81|
      |      0003|Female| 20|                16|                     6|
      |      0004|Female| 23|                16|                    77|
      |      0005|Female| 31|                17|                    40|
      |      0006|Female| 22|                17|                    76|
      |      0007|Female| 35|                18|                     6|
      |      0008|Female| 23|                18|                    94|
      |      0009|  Male| 64|                19|                     3|
      |      0010|Female| 30|                19|                    72|
      |      0011|  Male| 67|                19|                    14|
      |      0012|Female| 35|                19|                    99|
    复制代码
  • checkpoint(eager=True) 对DataFrame设置断点,这个方法是Spark2.1引入的方法,这个方法的调用会斩断在这个 DataFrame上的逻辑执行计划,将前后的依赖关系持久化到checkpoint文件中去。

    sc
      sc.setCheckpointDir('/datas/checkpoint')
      a.checkpoint()
      a.show()
    复制代码
  • coalesce(numPartitions) 重分区算法,传入的参数是DataFrame的分区数量。

    注意通过read方法读取文件,创建的DataFrame默认的分区数为文件的个数,即一个文件对
      应一个分区,在分区数少于coalesce指定的分区数的时候,调用coalesce是不起作用的
      
      df = spark.read.csv('/sql/customers.csv',header=True)
      df.rdd.getNumPartitions()
      1
      
      spark.read.csv('/sql/customers.csv',header=True).coalesce(3).rdd.getNumPartitions()
      1
      
      df = spark.range(0,20,2,3)
      df.rdd.getNumPartitions()
      df.coalesce(2).rdd.getNumPartitions()
      2
    复制代码
  • repartition(numPartitions, *cols)这个方法和coalesce(numPartitions) 方法一样,都是 对DataFrame进行重新的分区,但是repartition这个方法会使用hash算法,在整个集群中进 行shuffle,效率较低。repartition方法不仅可以指定分区数,还可以指定按照哪些列来做分 区。

    df = spark.read.csv('/sql/customers.csv',header=True)
      df.rdd.getNumPartitions()
      1
      
      df2 = df.repartition(3)
      df2.rdd.getNumPartitions()
      3
      
      df2.columns
      df3 = df2.repartition(6,'Genre')
      df3.show(20)
      
      +----------+------+---+------------------+----------------------+
      |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
      +----------+------+---+------------------+----------------------+
      |      0003|Female| 20|                16|                     6|
      |      0004|Female| 23|                16|                    77|
      |      0005|Female| 31|                17|                    40|
      |      0006|Female| 22|                17|                    76|
      |      0007|Female| 35|                18|                     6|
      |      0008|Female| 23|                18|                    94|
      |      0010|Female| 30|                19|                    72|
      |      0012|Female| 35|                19|                    99|
      |      0013|Female| 58|                20|                    15|
      
      df3.rdd.getNumPartitions()
      6
    复制代码
  • colRegex(colName)用正则表达式的方式返回我们想要的列。

    df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
      df.select(df.colRegex("`(Col1)?+.+`")).show()
      +---+
      |  a|
      +---+
      |  1|
      |  2|
      |  3|
      +---+
    复制代码
  • collect(),返回DataFrame中的所有数据,注意数据量大了容易造成Driver节点内存溢 出!

    df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
      df.collect()
      [Row(Col1='a', a=1), Row(Col1='b', a=2), Row(Col1='c', a=3)]
    复制代码
  • columns,以列表的形式返回DataFrame的所有列名

    df = spark.read.csv('/sql/customers.csv',header=True)
      df.columns
      
      df = spark.read.csv('/sql/customers.csv',header=True)
      df.columns
      ['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
    复制代码
  • SparkSQL DataFrame 转换为 PandasDataFrame

    df = spark.read.csv('/sql/customers.csv',header=True)
      pdf = df.toPandas()
      pdf.info()
      
      <class 'pandas.core.frame.DataFrame'>
      RangeIndex: 200 entries, 0 to 199
      Data columns (total 5 columns):
      CustomerID                200 non-null object
      Genre                     200 non-null object
      Age                       200 non-null object
      Annual Income (k$)        200 non-null object
      Spending Score (1-100)    200 non-null object
      dtypes: object(5)
      memory usage: 7.9+ KB
      
      pdf['Age'] = pdf['Age'].astype('int')
      pdf["Annual Income (k$)"]=pdf["Annual Income (k$)"].astype('int')
      pdf["Spending Score (1-100)"]=pdf["Spending Score (1-100)"].astype('int')
      pdf.info()
      
      <class 'pandas.core.frame.DataFrame'>
      RangeIndex: 200 entries, 0 to 199
      Data columns (total 5 columns):
      CustomerID                200 non-null object
      Genre                     200 non-null object
      Age                       200 non-null int64
      Annual Income (k$)        200 non-null int64
      Spending Score (1-100)    200 non-null int64
      dtypes: int64(3), object(2)
      memory usage: 7.9+ KB
    复制代码
  • PandasDataFrame 转换为 SparkSQL DataFrame

    df1 = spark.createDataFrame(pdf)
      df1.corr("Age","Annual Income (k$)")
      df1.corr("Spending Score (1-100)","Annual Income (k$)")
      
      0.009902848094037492
    复制代码
  • count()返回DataFrame中Row的数量

    df = spark.read.csv('/sql/customers.csv',header=True)
      df.count()
      
      200
    复制代码
  • createGlobalTempView(name)使用DataFrame创建一个全局的临时表,其生命周期 和启动的app的周期一致,即启动的spark应用存在则这个临时的表就一直能访问。直到 sparkcontext的stop方法的调用退出应用为止。创建的临时表保存在global_temp这个库 中


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

破茧成蝶:用户体验设计师的成长之路

破茧成蝶:用户体验设计师的成长之路

刘津、李月 / 人民邮电出版社 / 2014-7 / 69.00

市面上已经有很多专业的用户体验书籍,但解决用户体验设计师在职场中遇到的众多现实问题的图书并不多见。本书从用户体验设计师的角度出发,系统地介绍了其职业生涯中的学习方法、思维方式、工作流程等,覆盖了用户体验设计基础知识、设计师的角色和职业困惑、工作流程、需求分析、设计规划和设计标准、项目跟进和成果检验、设计师职业修养以及需要具备的意识等,力图帮助设计师解决在项目中遇到的一些常见问题,找到自己的职业成长......一起来看看 《破茧成蝶:用户体验设计师的成长之路》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

URL 编码/解码
URL 编码/解码

URL 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具