基于Azure HDInsight Spark 集群统计销售报表

栏目: 编程工具 · 发布时间: 7年前

内容简介:某公司旗下有多家门店,每个月会分批次把盘点数据向总公司汇报,汇报的数据量可能是亿级,并且每次汇报可能包含了上次汇报的修正数据,所以需要先根据主键去重,以最新的数据为准。以上图为准,

背景

某公司旗下有多家门店,每个月会分批次把盘点数据向总公司汇报,汇报的数据量可能是亿级,并且每次汇报可能包含了上次汇报的修正数据,所以需要先根据主键去重,以最新的数据为准。

基于Azure HDInsight Spark 集群统计销售报表

以上图为准, SKUID+CustomerId+StoreID+SalesDate 红色的标题列为联合主键,再以绿色的列 DataArrivalTime 去重,取最新值。

最终需要生成所有门店的销售额报表,按照总销售额降序排列:

StoreName StoreId TotalSales
门店1 24 984756
门店2 56 982352

实践

我们选择的环境是Azure上的HDInsight Spark 集群,配置如下:

基于Azure HDInsight Spark 集群统计销售报表

通过Jupyter

新建PySpark3的Notebooks

导入基本类型

from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
from pyspark.sql import functions as func
from pyspark.sql.functions import month,year,concat,format_string,desc

读取Azure Storage 中的csv文件

在jupyer中我们只读取一个小文件,用来测试

inputCsvDir="wasb:///directory/data/001.csv"
df = spark.read.csv(inputCsvDir, header=True, inferSchema=True)
df.show()
+-----+----------+-------+-------------------+----------+--------------+----------------+
|SKUID|CustomerID|StoreID|          SalesDate|SalesValue|InventoryValue| DataArrivalTime|
+-----+----------+-------+-------------------+----------+--------------+----------------+
|   32|        41|     55|2018-07-14 00:00:00|       924|           452|2018-07-19 09:42|
|   31|        14|     38|2018-08-10 00:00:00|      4113|          4022|2018-08-12 15:55|
|   27|        30|     12|2018-07-14 00:00:00|      1616|          8654|2018-07-23 00:04|
|   26|        47|     24|2018-07-28 00:00:00|       957|           148|2018-08-11 16:32|
|   48|         4|     81|2018-07-25 00:00:00|      3715|          2230|2018-07-30 22:01|
|    9|        11|     40|2018-07-13 00:00:00|      2623|          6709|2018-07-25 07:01|
|   45|        24|     28|2018-08-03 00:00:00|      3230|          7976|2018-08-06 05:50|
|   23|        41|     83|2018-07-14 00:00:00|       807|          1629|2018-07-15 06:12|
|    4|        22|     90|2018-07-21 00:00:00|        87|          9741|2018-08-12 12:59|
|   31|         2|     18|2018-07-30 00:00:00|      4710|          3177|2018-08-02 04:40|
|   10|        16|      9|2018-07-17 00:00:00|      2798|           799|2018-07-30 00:29|
|    6|         4|     21|2018-07-19 00:00:00|      1666|          7970|2018-07-31 12:01|
|   11|        38|     95|2018-07-20 00:00:00|      2960|          3131|2018-08-05 17:55|
|   45|        15|     34|2018-07-21 00:00:00|      3637|          2487|2018-07-24 17:46|
|   41|        28|      4|2018-07-24 00:00:00|      1702|          1725|2018-07-25 21:27|
|   37|         3|     17|2018-07-18 00:00:00|      1884|          3904|2018-07-20 03:10|
|    4|        25|     79|2018-07-23 00:00:00|        82|          7270|2018-07-25 10:42|
|   26|        45|     48|2018-07-25 00:00:00|       165|          7630|2018-08-01 23:35|
|   11|         7|     92|2018-07-18 00:00:00|       210|          9605|2018-07-23 05:17|
|   17|         4|     34|2018-07-18 00:00:00|      3321|          6317|2018-07-28 07:13|
+-----+----------+-------+-------------------+----------+--------------+----------------+
only showing top 20 rows

筛选所有8月份的数据

df = df.filter(df['SalesDate'] >= '2018-08-01')
df.show()
+-----+----------+-------+-------------------+----------+--------------+----------------+
|SKUID|CustomerID|StoreID|          SalesDate|SalesValue|InventoryValue| DataArrivalTime|
+-----+----------+-------+-------------------+----------+--------------+----------------+
|   31|        14|     38|2018-08-10 00:00:00|      4113|          4022|2018-08-12 15:55|
|   45|        24|     28|2018-08-03 00:00:00|      3230|          7976|2018-08-06 05:50|
|   26|        22|     24|2018-08-09 00:00:00|      4186|          3988|2018-08-09 21:04|
|   12|        16|     87|2018-08-06 00:00:00|        67|          3990|2018-08-10 10:00|
|   47|        15|     49|2018-08-06 00:00:00|      2464|          6898|2018-08-07 04:44|
|   12|        13|     26|2018-08-01 00:00:00|       254|          9929|2018-08-04 06:51|
|   29|         7|     44|2018-08-03 00:00:00|      3133|           471|2018-08-07 04:42|
|   28|        29|     27|2018-08-02 00:00:00|      4307|          6875|2018-08-04 05:17|
|   39|        11|     71|2018-08-08 00:00:00|      2200|          5354|2018-08-12 19:53|
|   39|        10|     14|2018-08-02 00:00:00|      4972|          5929|2018-08-03 11:22|
|   21|        31|     99|2018-08-01 00:00:00|      3632|           795|2018-08-03 14:22|
|   42|         8|     29|2018-08-04 00:00:00|      4389|          5532|2018-08-06 00:00|
|   43|        18|     88|2018-08-04 00:00:00|       184|            26|2018-08-10 23:55|
|   16|        27|     51|2018-08-03 00:00:00|      4754|          9556|2018-08-08 15:58|
|   24|         2|      9|2018-08-04 00:00:00|      2456|          2521|2018-08-13 08:14|
|    2|         3|     56|2018-08-06 00:00:00|      4868|          7167|2018-08-06 14:59|
|   25|        32|     58|2018-08-01 00:00:00|       478|          4014|2018-08-02 14:19|
|   15|         3|     91|2018-08-05 00:00:00|       676|          2703|2018-08-08 13:09|
|   42|         7|     30|2018-08-02 00:00:00|      4742|          3251|2018-08-02 11:02|
|   18|         1|     10|2018-08-01 00:00:00|      4388|          4730|2018-08-01 05:17|
+-----+----------+-------+-------------------+----------+--------------+----------------+
only showing top 20 rows

使用window模式进行分组去重

window = Window.partitionBy(df['SKUID'],
                            df['CustomerID'],
                            df['StoreID'],
                            df['SalesDate']).orderBy(df['DataArrivalTime'].desc())
df = df.select('*', rank().over(window).alias('rank')).filter(col('rank') == 1)
df.show()
+-----+----------+-------+-------------------+----------+--------------+----------------+----+
|SKUID|CustomerID|StoreID|          SalesDate|SalesValue|InventoryValue| DataArrivalTime|rank|
+-----+----------+-------+-------------------+----------+--------------+----------------+----+
|   12|        12|     63|2018-08-05 00:00:00|      2650|          4982|2018-08-06 03:43|   1|
|   15|        43|     76|2018-08-01 00:00:00|      4738|          3235|2018-08-10 20:08|   1|
|   19|        47|     88|2018-08-08 00:00:00|      3832|          3907|2018-08-12 04:04|   1|
|   22|        33|     62|2018-08-01 00:00:00|      1189|          5813|2018-08-06 09:41|   1|
|   29|        12|     53|2018-08-04 00:00:00|      1898|          9908|2018-08-10 06:22|   1|
|   37|        18|     47|2018-08-05 00:00:00|      3780|          2871|2018-08-05 17:04|   1|
|   39|        16|     42|2018-08-08 00:00:00|      4292|          2447|2018-08-09 06:25|   1|
|   45|        24|     28|2018-08-03 00:00:00|      3230|          7976|2018-08-06 05:50|   1|
|    3|        15|     52|2018-08-07 00:00:00|      4591|          4182|2018-08-10 17:55|   1|
|   10|        13|     83|2018-08-04 00:00:00|      2391|          3459|2018-08-04 16:50|   1|
|   10|        13|     95|2018-08-03 00:00:00|       823|          4463|2018-08-12 19:43|   1|
|   21|         8|     25|2018-08-07 00:00:00|      4242|          4829|2018-08-12 22:30|   1|
|   44|        42|     83|2018-08-01 00:00:00|      2878|          8283|2018-08-11 05:30|   1|
|    1|        17|     34|2018-08-08 00:00:00|      3511|          5744|2018-08-11 05:35|   1|
|    9|        11|      8|2018-08-04 00:00:00|       577|          1151|2018-08-11 16:13|   1|
|   14|         2|     24|2018-08-04 00:00:00|      3917|          8689|2018-08-11 12:57|   1|
|    6|        32|     71|2018-08-02 00:00:00|      3871|          4069|2018-08-06 22:59|   1|
|   39|        32|     23|2018-08-02 00:00:00|      1026|          7535|2018-08-05 01:20|   1|
|    2|        49|     92|2018-08-01 00:00:00|      1588|          8685|2018-08-06 00:00|   1|
|   11|        30|     69|2018-08-08 00:00:00|       455|          9936|2018-08-11 01:33|   1|
+-----+----------+-------+-------------------+----------+--------------+----------------+----+
only showing top 20 rows

根据门店分组,并汇总销售额

monthlyByStore = df.groupby('StoreID') \
.agg(func.sum("SalesValue").alias('TotalSales'))
monthlyByStore  = monthlyByStore.orderBy(monthlyByStore['TotalSales'].desc())
monthlyByStore.show()
+-------+----------+
|StoreID|TotalSales|
+-------+----------+
|     24|     38385|
|     52|     34762|
|     93|     33797|
|     76|     32639|
|     74|     31924|
|      9|     31417|
|     87|     31315|
|     45|     31020|
|     44|     31019|
|     35|     30973|
|      1|     30017|
|     20|     29777|
|     83|     28589|
|     95|     28324|
|     22|     27589|
|     38|     27538|
|      2|     27318|
|     21|     27191|
|     56|     26657|
|     62|     26612|
+-------+----------+
only showing top 20 rows

读取门店维度表,Join门店名称

dfStoreData = spark.read.csv("wasb:///cat-big-data-0.2billion/storageData.csv", header=True, inferSchema=True)
monthlyByStoreWithName = monthlyByStore.join(dfStoreData,dfStoreData['StoreID']==monthlyByStore['StoreId'],'left')
monthlyByStoreWithName = monthlyByStoreWithName.select('StoreName',dfStoreData['StoreId'],'TotalSales')
monthlyByStoreWithName.show()
+---------+-------+----------+
|StoreName|StoreId|TotalSales|
+---------+-------+----------+
|    24号门店|     24|     38385|
|    52号门店|     52|     34762|
|    93号门店|     93|     33797|
|    76号门店|     76|     32639|
|    74号门店|     74|     31924|
|     9号门店|      9|     31417|
|    87号门店|     87|     31315|
|    45号门店|     45|     31020|
|    44号门店|     44|     31019|
|    35号门店|     35|     30973|
|     1号门店|      1|     30017|
|    20号门店|     20|     29777|
|    83号门店|     83|     28589|
|    95号门店|     95|     28324|
|    22号门店|     22|     27589|
|    38号门店|     38|     27538|
|     2号门店|      2|     27318|
|    21号门店|     21|     27191|
|    56号门店|     56|     26657|
|    62号门店|     62|     26612|
+---------+-------+----------+
only showing top 20 rows

保存结果至Hive表

monthlyByStoreWithName.write.mode("overwrite").saveAsTable("hv_monthly_report")

通过Hive View查询

基于Azure HDInsight Spark 集群统计销售报表

hive viewer 查询结果

通过Livy Job

将上述的脚本片段保存成 Python 脚本 sales-report.py 文件,并上传至Azure存储账户,然后通过livy 的REST 提交 Spark submit Job

编写脚本文件

from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
from pyspark.sql import functions as func
from pyspark.sql.functions import month,year,concat,format_string,desc

#
# init Spark session
#
spark = SparkSession \
    .builder \
    .appName("Build Monthly Sales Data Report") \
    .enableHiveSupport() \
    .getOrCreate()

# read csv input
inputCsvDir="wasb:///directory/data/*.csv"
df = spark.read.csv(inputCsvDir, header=True, inferSchema=True)

# filter latest month
df = df.filter(df['SalesDate'] >= '2018-08-01')

# remove duplicate rows
window = Window.partitionBy(df['SKUID'],
                            df['CustomerID'],
                            df['StoreID'],
                            df['SalesDate']).orderBy(df['DataArrivalTime'].desc())
df = df.select('*', rank().over(window).alias('rank')).filter(col('rank') == 1)

# group by store
monthlyByStore = df.groupby('StoreID') \
.agg(func.sum("SalesValue").alias('TotalSales'))
monthlyByStore  = monthlyByStore.orderBy(monthlyByStore['TotalSales'].desc())

# join wth store name#
dfStoreData = spark.read.csv("wasb:///directory/storeData.csv", header=True, inferSchema=True)
monthlyByStoreWithName = monthlyByStore.join(dfStoreData,dfStoreData['StoreID']==monthlyByStore['StoreId'],'left')
monthlyByStoreWithName = monthlyByStoreWithName.select('StoreName',dfStoreData['StoreId'],'TotalSales')

# save result to hive table
monthlyByStoreWithName.write.mode("overwrite").saveAsTable("hv_monthly_report")

提交 Livy Job

curl -k --user 'admin:password' -v -H 'Content-Type: application/json' -X POST 
-d '{ "file": "wasb:///directory/sales-report.py", "args": [""]}' "https://xxx.azurehdinsight.net/livy/batches" 
-H "X-Requested-By: admin"

Power BI 中连接HD insight

备注:如何在Power BI中生成报表不是本篇重点,这里主要是演示其可行性

基于Azure HDInsight Spark 集群统计销售报表 基于Azure HDInsight Spark 集群统计销售报表 基于Azure HDInsight Spark 集群统计销售报表

× 用微信扫描并分享


以上所述就是小编给大家介绍的《基于Azure HDInsight Spark 集群统计销售报表》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Remote

Remote

Jason Fried、David Heinemeier Hansson / Crown Business / 2013-10-29 / CAD 26.95

The “work from home” phenomenon is thoroughly explored in this illuminating new book from bestselling 37signals founders Fried and Hansson, who point to the surging trend of employees working from hom......一起来看看 《Remote》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换