使用Spark进行数据统计并将结果转存至MSSQL

栏目: 数据库 · SQL Server · 发布时间: 6年前

内容简介:在本文假设你已经安装、配置好了HDFS、Hive和Spark,在Hive中创建了数据仓库Eshop,在其下创建了OrderInfo表,基于Retailer和Year进行了分区,并填充了以下数据(注意Retailer和Year是虚拟列):实际上,这篇文章的orderinfo表是基于上一篇

使用Spark读取Hive中的数据 中,我们演示了如何使用 python 编写脚本,提交到spark,读取并输出了Hive中的数据。在实际应用中,在读取完数据后,通常需要使用pyspark中的API来对数据进行统计或运算,并将结果保存起来。本节将演示这一过程。

环境准备

Hive建表并填充测试数据

本文假设你已经安装、配置好了HDFS、Hive和Spark,在Hive中创建了数据仓库Eshop,在其下创建了OrderInfo表,基于Retailer和Year进行了分区,并填充了以下数据(注意Retailer和Year是虚拟列):

OrderId Customer OrderAmount OrderDate Retailer Year
1 Jimmy 5200 2017-10-01 00:00:00 Apple 2017
2 Jack 3180 2017-11-01 00:00:00 Apple 2017
3 Jimmy 2010 2017-12-01 00:00:00 XiaoMi 2017
4 Alice 980 2018-10-01 00:00:00 XiaoMi 2018
5 Eva 1080 2018-10-20 00:00:00 XiaoMi 2018
6 Alice 680 2018-11-01 00:00:00 XiaoMi 2018
7 Alice 920 2018-12-01 00:00:00 Apple 2018

实际上,这篇文章的orderinfo表是基于上一篇 Hive中分区和分桶的概念和操作 进行构建的,因此建议先阅读一下。

安装MSSQL的JDBC驱动程序

在本文中,需要将运算的结果转存至MS Sql Server数据库,而要通过 java 连接MSSQL,需要在服务器上安装jdbc驱动。首先下载驱动,地址是: 下载 Microsoft SQL Server JDBC 驱动程序

按下图选择sqljdbc_7.0.0.0_chs.tar.gz压缩包,然后点击“Next”下载:

使用Spark进行数据统计并将结果转存至MSSQL
下载MSSQL的JDBC驱动

解压缩之后,将根目录下的mssql-jdbc-7.0.0.jre8.jar文件,拷贝到Spark服务器上的$SPARK_HOME/jars文件夹下。

如果是搭建了一个Spark集群,那么务必将该文件拷贝至集群内所有节点的 $SPARK_HOME/jars 文件夹下。

从Windows拷贝文件到 Linux 有很多种方法,可以通过FTP上传,也可以通过pscp直接从Windows上拷贝至Linux,参见: 免密码从windows复制文件到linux

MSSql建表StatOrderInfo

假设要统计的是每年每个经销商的订单总数(OrderCount)、销售总额(TotalAmount)、用户数(CustomerCount),那么可以这样建表:

USE StatEShop
GO
CREATE TABLE [dbo].[Stat_OrderInfo](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [Year] [int] NOT NULL,
    [Retailer] [varchar](50) NOT NULL,
    [OrderCount] [int] NOT NULL,
    [CustomerCount] [int] NOT NULL,
    [TotalAmount] [money] NOT NULL
    CONSTRAINT [PK_stat_orderinfo] PRIMARY KEY CLUSTERED 
(
    [Id] ASC
)
) ON [PRIMARY]

需要注意订单总数和用户总数的区别:用户总数是去除重复后的下单数,即同一个用户下了10个订单,订单数为10,但是用户数为1。

编写python脚本

在向Spark提交任务作业时,可以采用三种语言的脚本,Scala、Java和Python,因为Python相对而言比较轻量(脚本语言),比较好学,因此我选择了使用Python。大多数情况下,使用哪种语言并没有区别,但在Spark SQL中,Python不支持DataSet,仅支持DataFrame,而Java和Scala则两种类型都支持。DataSet相对DataFrame的优势就是取行数据时是强类型的,而在其他方面DataSet和DataFrame的API都是相似的。

下面是本次任务的python脚本,位于D:\python\dataclean\eshop\stat_orderinfo.py:

from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

spark = SparkSession.builder.master("spark://node0:7077")\
    .appName("eshop.year.retailer")\
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")\
    .config("hive.metastore.uris", "thrift://192.168.1.56:9083")\
    .enableHiveSupport()\
    .getOrCreate()

hiveCtx = HiveContext(spark)
df = hiveCtx.sql("select * from eshop.orderinfo")

df2 = df.groupBy("year", "retailer").agg(
    F.count("*").alias("OrderCount"),
    F.sum("OrderAmount").alias("TotalAmount"),
    F.countDistinct("Customer").alias("CustomerCount")
)

options = {
    "url": "jdbc:sqlserver://192.168.1.103:1433;databaseName=StatEShop",
    "user":"sa",
    "password":"your db password",
    "driver":"com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

df2.write.format("jdbc").options(dbtable="Stat_OrderInfo", **options)\
    .mode("append")\
    .save()

本例中的数据统计逻辑很简单,如果要学习spark都可以执行哪些运算,请参考官方的文档: pyspark.sql module 。这个文档需要花大量时间认真学习一下,才能对Spark的操作有更深入的了解。

上面的代码有几下几点还需要注意一下:

  • 这里我是运行在Spark集群上,其中的master节点是node0,因此是这样创建spark对象的: spark = SparkSession.builder.master("spark://node0:7077") 。如果是本地运行,则将 spark://node0:7077 替换为 local
  • Hive的metasotre服务需要先运行,也就是要已经执行过: hive --service metastore 。具体参见: 使用Spark读取Hive中的数据
  • F.sum("OrderAmount").alias("TotalAmount") 语句用于改名,否则,聚合函数执行完毕后,列名为 sum(OrderAmount)。

先在Windows上执行下面的命令,将stat_orderinfo.py拷贝至Linux的/root/python/eshop目录:

# pscp -i D:\linux\keys\awwork.ppk D:\python\dataclean\eshop\stat_orderinfo.py root@192.168.1.56:/root/python/eshop

然后在配置好Spark的服务器上执行:

# $SPARK_HOME/bin/spark-submit /root/python/eshop/stat_orderinfo.py

执行过程中如果一切正常将不会有任何输出,此时,如果访问 http://node0:8080,可以看到spark作业正在执行:

node0是Spark集群的主节点,地址是一个局域网地址:192.168.1.56。

使用Spark进行数据统计并将结果转存至MSSQL
http://node0:8080 作业概览

点击 application ID,会进入到作业的执行明细中,注意此时浏览器地址变为了 http://node0:4040。

使用Spark进行数据统计并将结果转存至MSSQL
http://node0:4040 作业明细

4040端口号只有在作业执行阶段可以访问,而因为我们的数据量很少,运算逻辑也极为简单,因此这个作业通常10几秒就执行完成了。当作业执行完成后,这个页面也就无法访问了。

打开SQL Server管理器,可以看到下面的结果:

Select * from stat_orderinfo;
Id    Year     Retailer       OrderCount  CustomerCount TotalAmount
----- -------- -------------- ----------  ------------- ------------
1     2017     Apple          2           2             8380.00
2     2018     XiaoMi         3           2             2740.00
3     2017     XiaoMi         1           1             2010.00
4     2018     Apple          1           1             920.00

至此,已经成功完成了Spark数据统计并转存到MSSQL Server的作业任务。

感谢阅读,希望这篇文章能给你带来帮助!


以上所述就是小编给大家介绍的《使用Spark进行数据统计并将结果转存至MSSQL》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

算法设计、分析与实现

算法设计、分析与实现

徐子珊 / 2012-10 / 65.00元

《算法设计、分析与实现:c、c++和java》由徐子珊编著,第1章~第6章按算法设计技巧分成渐增型算法、分治算法、动态规划算法、贪婪算法、回溯算法和图的搜索算法。每章针对一些经典问题给出解决问题的算法,并分析算法的时间复杂度。这样对于初学者来说,按照算法的设计方法划分,算法思想的阐述比较集中,有利于快速入门理解算法的精髓所在。一旦具备了算法设计的基本方法,按应用领域划分专题深入学习,读者可以结合已......一起来看看 《算法设计、分析与实现》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器