使用Spark进行数据统计并将结果转存至MSSQL

栏目: 数据库 · SQL Server · 发布时间: 6年前

内容简介:在本文假设你已经安装、配置好了HDFS、Hive和Spark,在Hive中创建了数据仓库Eshop,在其下创建了OrderInfo表,基于Retailer和Year进行了分区,并填充了以下数据(注意Retailer和Year是虚拟列):实际上,这篇文章的orderinfo表是基于上一篇

使用Spark读取Hive中的数据 中,我们演示了如何使用 python 编写脚本,提交到spark,读取并输出了Hive中的数据。在实际应用中,在读取完数据后,通常需要使用pyspark中的API来对数据进行统计或运算,并将结果保存起来。本节将演示这一过程。

环境准备

Hive建表并填充测试数据

本文假设你已经安装、配置好了HDFS、Hive和Spark,在Hive中创建了数据仓库Eshop,在其下创建了OrderInfo表,基于Retailer和Year进行了分区,并填充了以下数据(注意Retailer和Year是虚拟列):

OrderId Customer OrderAmount OrderDate Retailer Year
1 Jimmy 5200 2017-10-01 00:00:00 Apple 2017
2 Jack 3180 2017-11-01 00:00:00 Apple 2017
3 Jimmy 2010 2017-12-01 00:00:00 XiaoMi 2017
4 Alice 980 2018-10-01 00:00:00 XiaoMi 2018
5 Eva 1080 2018-10-20 00:00:00 XiaoMi 2018
6 Alice 680 2018-11-01 00:00:00 XiaoMi 2018
7 Alice 920 2018-12-01 00:00:00 Apple 2018

实际上,这篇文章的orderinfo表是基于上一篇 Hive中分区和分桶的概念和操作 进行构建的,因此建议先阅读一下。

安装MSSQL的JDBC驱动程序

在本文中,需要将运算的结果转存至MS Sql Server数据库,而要通过 java 连接MSSQL,需要在服务器上安装jdbc驱动。首先下载驱动,地址是: 下载 Microsoft SQL Server JDBC 驱动程序

按下图选择sqljdbc_7.0.0.0_chs.tar.gz压缩包,然后点击“Next”下载:

使用Spark进行数据统计并将结果转存至MSSQL
下载MSSQL的JDBC驱动

解压缩之后,将根目录下的mssql-jdbc-7.0.0.jre8.jar文件,拷贝到Spark服务器上的$SPARK_HOME/jars文件夹下。

如果是搭建了一个Spark集群,那么务必将该文件拷贝至集群内所有节点的 $SPARK_HOME/jars 文件夹下。

从Windows拷贝文件到 Linux 有很多种方法,可以通过FTP上传,也可以通过pscp直接从Windows上拷贝至Linux,参见: 免密码从windows复制文件到linux

MSSql建表StatOrderInfo

假设要统计的是每年每个经销商的订单总数(OrderCount)、销售总额(TotalAmount)、用户数(CustomerCount),那么可以这样建表:

USE StatEShop
GO
CREATE TABLE [dbo].[Stat_OrderInfo](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [Year] [int] NOT NULL,
    [Retailer] [varchar](50) NOT NULL,
    [OrderCount] [int] NOT NULL,
    [CustomerCount] [int] NOT NULL,
    [TotalAmount] [money] NOT NULL
    CONSTRAINT [PK_stat_orderinfo] PRIMARY KEY CLUSTERED 
(
    [Id] ASC
)
) ON [PRIMARY]

需要注意订单总数和用户总数的区别:用户总数是去除重复后的下单数,即同一个用户下了10个订单,订单数为10,但是用户数为1。

编写python脚本

在向Spark提交任务作业时,可以采用三种语言的脚本,Scala、Java和Python,因为Python相对而言比较轻量(脚本语言),比较好学,因此我选择了使用Python。大多数情况下,使用哪种语言并没有区别,但在Spark SQL中,Python不支持DataSet,仅支持DataFrame,而Java和Scala则两种类型都支持。DataSet相对DataFrame的优势就是取行数据时是强类型的,而在其他方面DataSet和DataFrame的API都是相似的。

下面是本次任务的python脚本,位于D:\python\dataclean\eshop\stat_orderinfo.py:

from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

spark = SparkSession.builder.master("spark://node0:7077")\
    .appName("eshop.year.retailer")\
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")\
    .config("hive.metastore.uris", "thrift://192.168.1.56:9083")\
    .enableHiveSupport()\
    .getOrCreate()

hiveCtx = HiveContext(spark)
df = hiveCtx.sql("select * from eshop.orderinfo")

df2 = df.groupBy("year", "retailer").agg(
    F.count("*").alias("OrderCount"),
    F.sum("OrderAmount").alias("TotalAmount"),
    F.countDistinct("Customer").alias("CustomerCount")
)

options = {
    "url": "jdbc:sqlserver://192.168.1.103:1433;databaseName=StatEShop",
    "user":"sa",
    "password":"your db password",
    "driver":"com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

df2.write.format("jdbc").options(dbtable="Stat_OrderInfo", **options)\
    .mode("append")\
    .save()

本例中的数据统计逻辑很简单,如果要学习spark都可以执行哪些运算,请参考官方的文档: pyspark.sql module 。这个文档需要花大量时间认真学习一下,才能对Spark的操作有更深入的了解。

上面的代码有几下几点还需要注意一下:

  • 这里我是运行在Spark集群上,其中的master节点是node0,因此是这样创建spark对象的: spark = SparkSession.builder.master("spark://node0:7077") 。如果是本地运行,则将 spark://node0:7077 替换为 local
  • Hive的metasotre服务需要先运行,也就是要已经执行过: hive --service metastore 。具体参见: 使用Spark读取Hive中的数据
  • F.sum("OrderAmount").alias("TotalAmount") 语句用于改名,否则,聚合函数执行完毕后,列名为 sum(OrderAmount)。

先在Windows上执行下面的命令,将stat_orderinfo.py拷贝至Linux的/root/python/eshop目录:

# pscp -i D:\linux\keys\awwork.ppk D:\python\dataclean\eshop\stat_orderinfo.py root@192.168.1.56:/root/python/eshop

然后在配置好Spark的服务器上执行:

# $SPARK_HOME/bin/spark-submit /root/python/eshop/stat_orderinfo.py

执行过程中如果一切正常将不会有任何输出,此时,如果访问 http://node0:8080,可以看到spark作业正在执行:

node0是Spark集群的主节点,地址是一个局域网地址:192.168.1.56。

使用Spark进行数据统计并将结果转存至MSSQL
http://node0:8080 作业概览

点击 application ID,会进入到作业的执行明细中,注意此时浏览器地址变为了 http://node0:4040。

使用Spark进行数据统计并将结果转存至MSSQL
http://node0:4040 作业明细

4040端口号只有在作业执行阶段可以访问,而因为我们的数据量很少,运算逻辑也极为简单,因此这个作业通常10几秒就执行完成了。当作业执行完成后,这个页面也就无法访问了。

打开SQL Server管理器,可以看到下面的结果:

Select * from stat_orderinfo;
Id    Year     Retailer       OrderCount  CustomerCount TotalAmount
----- -------- -------------- ----------  ------------- ------------
1     2017     Apple          2           2             8380.00
2     2018     XiaoMi         3           2             2740.00
3     2017     XiaoMi         1           1             2010.00
4     2018     Apple          1           1             920.00

至此,已经成功完成了Spark数据统计并转存到MSSQL Server的作业任务。

感谢阅读,希望这篇文章能给你带来帮助!


以上所述就是小编给大家介绍的《使用Spark进行数据统计并将结果转存至MSSQL》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Open Data Structures

Open Data Structures

Pat Morin / AU Press / 2013-6 / USD 29.66

Offered as an introduction to the field of data structures and algorithms, Open Data Structures covers the implementation and analysis of data structures for sequences (lists), queues, priority queues......一起来看看 《Open Data Structures》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具