Background
A company operates many stores. Every month, each store reports its inventory and sales data to headquarters in several batches. The reported data can reach hundreds of millions of rows, and each batch may contain corrections to data reported earlier, so the records must first be deduplicated by primary key, keeping the most recent value for each key.
As shown in the figure above, the columns highlighted in red, SKUID + CustomerID + StoreID + SalesDate, form the composite primary key; duplicates are then resolved using the column highlighted in green, DataArrivalTime, keeping the latest record.
The final output is a sales report covering all stores, sorted by total sales in descending order:
| StoreName | StoreId | TotalSales |
| Store 1   | 24      | 984756     |
| Store 2   | 56      | 982352     |
Implementation
Our environment is an HDInsight Spark cluster on Azure, configured as follows:
Via Jupyter
Create a new PySpark3 notebook.
Import the required modules and functions:
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
from pyspark.sql import functions as func
from pyspark.sql.functions import month, year, concat, format_string, desc
Read the CSV files from Azure Storage
In Jupyter we read only one small file, for testing:
inputCsvDir="wasb:///directory/data/001.csv" df = spark.read.csv(inputCsvDir, header=True, inferSchema=True) df.show()
+-----+----------+-------+-------------------+----------+--------------+----------------+
|SKUID|CustomerID|StoreID|          SalesDate|SalesValue|InventoryValue| DataArrivalTime|
+-----+----------+-------+-------------------+----------+--------------+----------------+
|   32|        41|     55|2018-07-14 00:00:00|       924|           452|2018-07-19 09:42|
|   31|        14|     38|2018-08-10 00:00:00|      4113|          4022|2018-08-12 15:55|
|   27|        30|     12|2018-07-14 00:00:00|      1616|          8654|2018-07-23 00:04|
|   26|        47|     24|2018-07-28 00:00:00|       957|           148|2018-08-11 16:32|
|   48|         4|     81|2018-07-25 00:00:00|      3715|          2230|2018-07-30 22:01|
|    9|        11|     40|2018-07-13 00:00:00|      2623|          6709|2018-07-25 07:01|
|   45|        24|     28|2018-08-03 00:00:00|      3230|          7976|2018-08-06 05:50|
|   23|        41|     83|2018-07-14 00:00:00|       807|          1629|2018-07-15 06:12|
|    4|        22|     90|2018-07-21 00:00:00|        87|          9741|2018-08-12 12:59|
|   31|         2|     18|2018-07-30 00:00:00|      4710|          3177|2018-08-02 04:40|
|   10|        16|      9|2018-07-17 00:00:00|      2798|           799|2018-07-30 00:29|
|    6|         4|     21|2018-07-19 00:00:00|      1666|          7970|2018-07-31 12:01|
|   11|        38|     95|2018-07-20 00:00:00|      2960|          3131|2018-08-05 17:55|
|   45|        15|     34|2018-07-21 00:00:00|      3637|          2487|2018-07-24 17:46|
|   41|        28|      4|2018-07-24 00:00:00|      1702|          1725|2018-07-25 21:27|
|   37|         3|     17|2018-07-18 00:00:00|      1884|          3904|2018-07-20 03:10|
|    4|        25|     79|2018-07-23 00:00:00|        82|          7270|2018-07-25 10:42|
|   26|        45|     48|2018-07-25 00:00:00|       165|          7630|2018-08-01 23:35|
|   11|         7|     92|2018-07-18 00:00:00|       210|          9605|2018-07-23 05:17|
|   17|         4|     34|2018-07-18 00:00:00|      3321|          6317|2018-07-28 07:13|
+-----+----------+-------+-------------------+----------+--------------+----------------+
only showing top 20 rows
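Note that inferSchema makes Spark scan the data an extra time to guess column types, which is cheap on this sample but costly on the full, hundred-million-row dataset. A minimal sketch of passing an explicit schema instead; the types below are assumptions read off the sample output above, and DataArrivalTime is kept as a string because it has no seconds component:

# Explicit schema (types assumed from the sample above). StructType/StructField
# and the *Type classes come from pyspark.sql.types, already imported earlier.
salesSchema = StructType([
    StructField("SKUID", IntegerType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("StoreID", IntegerType(), True),
    StructField("SalesDate", TimestampType(), True),
    StructField("SalesValue", IntegerType(), True),
    StructField("InventoryValue", IntegerType(), True),
    StructField("DataArrivalTime", StringType(), True),
])
# Skip the inference pass by supplying the schema explicitly.
df = spark.read.csv(inputCsvDir, header=True, schema=salesSchema)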
Filter for the August data
df = df.filter(df['SalesDate'] >= '2018-08-01')
df.show()
+-----+----------+-------+-------------------+----------+--------------+----------------+
|SKUID|CustomerID|StoreID|          SalesDate|SalesValue|InventoryValue| DataArrivalTime|
+-----+----------+-------+-------------------+----------+--------------+----------------+
|   31|        14|     38|2018-08-10 00:00:00|      4113|          4022|2018-08-12 15:55|
|   45|        24|     28|2018-08-03 00:00:00|      3230|          7976|2018-08-06 05:50|
|   26|        22|     24|2018-08-09 00:00:00|      4186|          3988|2018-08-09 21:04|
|   12|        16|     87|2018-08-06 00:00:00|        67|          3990|2018-08-10 10:00|
|   47|        15|     49|2018-08-06 00:00:00|      2464|          6898|2018-08-07 04:44|
|   12|        13|     26|2018-08-01 00:00:00|       254|          9929|2018-08-04 06:51|
|   29|         7|     44|2018-08-03 00:00:00|      3133|           471|2018-08-07 04:42|
|   28|        29|     27|2018-08-02 00:00:00|      4307|          6875|2018-08-04 05:17|
|   39|        11|     71|2018-08-08 00:00:00|      2200|          5354|2018-08-12 19:53|
|   39|        10|     14|2018-08-02 00:00:00|      4972|          5929|2018-08-03 11:22|
|   21|        31|     99|2018-08-01 00:00:00|      3632|           795|2018-08-03 14:22|
|   42|         8|     29|2018-08-04 00:00:00|      4389|          5532|2018-08-06 00:00|
|   43|        18|     88|2018-08-04 00:00:00|       184|            26|2018-08-10 23:55|
|   16|        27|     51|2018-08-03 00:00:00|      4754|          9556|2018-08-08 15:58|
|   24|         2|      9|2018-08-04 00:00:00|      2456|          2521|2018-08-13 08:14|
|    2|         3|     56|2018-08-06 00:00:00|      4868|          7167|2018-08-06 14:59|
|   25|        32|     58|2018-08-01 00:00:00|       478|          4014|2018-08-02 14:19|
|   15|         3|     91|2018-08-05 00:00:00|       676|          2703|2018-08-08 13:09|
|   42|         7|     30|2018-08-02 00:00:00|      4742|          3251|2018-08-02 11:02|
|   18|         1|     10|2018-08-01 00:00:00|      4388|          4730|2018-08-01 05:17|
+-----+----------+-------+-------------------+----------+--------------+----------------+
only showing top 20 rows
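The string comparison above works because inferSchema parses SalesDate as a timestamp, but it keeps everything from 2018-08-01 onward rather than August specifically. If the intent is strictly "all data for August 2018", a sketch using the month/year functions imported earlier:

# Keep only rows whose SalesDate falls in August 2018.
df = df.filter((year(df['SalesDate']) == 2018) & (month(df['SalesDate']) == 8))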
Deduplicate within groups using a window function
window = Window.partitionBy(df['SKUID'],
df['CustomerID'],
df['StoreID'],
df['SalesDate']).orderBy(df['DataArrivalTime'].desc())
df = df.select('*', rank().over(window).alias('rank')).filter(col('rank') == 1)
df.show()
+-----+----------+-------+-------------------+----------+--------------+----------------+----+
|SKUID|CustomerID|StoreID|          SalesDate|SalesValue|InventoryValue| DataArrivalTime|rank|
+-----+----------+-------+-------------------+----------+--------------+----------------+----+
|   12|        12|     63|2018-08-05 00:00:00|      2650|          4982|2018-08-06 03:43|   1|
|   15|        43|     76|2018-08-01 00:00:00|      4738|          3235|2018-08-10 20:08|   1|
|   19|        47|     88|2018-08-08 00:00:00|      3832|          3907|2018-08-12 04:04|   1|
|   22|        33|     62|2018-08-01 00:00:00|      1189|          5813|2018-08-06 09:41|   1|
|   29|        12|     53|2018-08-04 00:00:00|      1898|          9908|2018-08-10 06:22|   1|
|   37|        18|     47|2018-08-05 00:00:00|      3780|          2871|2018-08-05 17:04|   1|
|   39|        16|     42|2018-08-08 00:00:00|      4292|          2447|2018-08-09 06:25|   1|
|   45|        24|     28|2018-08-03 00:00:00|      3230|          7976|2018-08-06 05:50|   1|
|    3|        15|     52|2018-08-07 00:00:00|      4591|          4182|2018-08-10 17:55|   1|
|   10|        13|     83|2018-08-04 00:00:00|      2391|          3459|2018-08-04 16:50|   1|
|   10|        13|     95|2018-08-03 00:00:00|       823|          4463|2018-08-12 19:43|   1|
|   21|         8|     25|2018-08-07 00:00:00|      4242|          4829|2018-08-12 22:30|   1|
|   44|        42|     83|2018-08-01 00:00:00|      2878|          8283|2018-08-11 05:30|   1|
|    1|        17|     34|2018-08-08 00:00:00|      3511|          5744|2018-08-11 05:35|   1|
|    9|        11|      8|2018-08-04 00:00:00|       577|          1151|2018-08-11 16:13|   1|
|   14|         2|     24|2018-08-04 00:00:00|      3917|          8689|2018-08-11 12:57|   1|
|    6|        32|     71|2018-08-02 00:00:00|      3871|          4069|2018-08-06 22:59|   1|
|   39|        32|     23|2018-08-02 00:00:00|      1026|          7535|2018-08-05 01:20|   1|
|    2|        49|     92|2018-08-01 00:00:00|      1588|          8685|2018-08-06 00:00|   1|
|   11|        30|     69|2018-08-08 00:00:00|       455|          9936|2018-08-11 01:33|   1|
+-----+----------+-------+-------------------+----------+--------------+----------------+----+
only showing top 20 rows
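One caveat: rank() assigns the same rank to ties, so if two submissions for the same composite key arrive with an identical DataArrivalTime, both rows survive the filter. If exactly one row per key is required, row_number() can replace rank() in the step above; a sketch:

from pyspark.sql.functions import row_number

# row_number() breaks ties arbitrarily, guaranteeing one row per composite key.
df = df.select('*', row_number().over(window).alias('rn')) \
       .filter(col('rn') == 1) \
       .drop('rn')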
Group by store and sum the sales
monthlyByStore = df.groupby('StoreID') \
.agg(func.sum("SalesValue").alias('TotalSales'))
monthlyByStore = monthlyByStore.orderBy(monthlyByStore['TotalSales'].desc())
monthlyByStore.show()
+-------+----------+
|StoreID|TotalSales|
+-------+----------+
|     24|     38385|
|     52|     34762|
|     93|     33797|
|     76|     32639|
|     74|     31924|
|      9|     31417|
|     87|     31315|
|     45|     31020|
|     44|     31019|
|     35|     30973|
|      1|     30017|
|     20|     29777|
|     83|     28589|
|     95|     28324|
|     22|     27589|
|     38|     27538|
|      2|     27318|
|     21|     27191|
|     56|     26657|
|     62|     26612|
+-------+----------+
only showing top 20 rows
Read the store dimension table and join in the store names
dfStoreData = spark.read.csv("wasb:///cat-big-data-0.2billion/storageData.csv", header=True, inferSchema=True)
monthlyByStoreWithName = monthlyByStore.join(dfStoreData,dfStoreData['StoreID']==monthlyByStore['StoreId'],'left')
monthlyByStoreWithName = monthlyByStoreWithName.select('StoreName',dfStoreData['StoreId'],'TotalSales')
monthlyByStoreWithName.show()
+---------+-------+----------+
|StoreName|StoreId|TotalSales|
+---------+-------+----------+
|    24号门店|     24|     38385|
|    52号门店|     52|     34762|
|    93号门店|     93|     33797|
|    76号门店|     76|     32639|
|    74号门店|     74|     31924|
|     9号门店|      9|     31417|
|    87号门店|     87|     31315|
|    45号门店|     45|     31020|
|    44号门店|     44|     31019|
|    35号门店|     35|     30973|
|     1号门店|      1|     30017|
|    20号门店|     20|     29777|
|    83号门店|     83|     28589|
|    95号门店|     95|     28324|
|    22号门店|     22|     27589|
|    38号门店|     38|     27538|
|     2号门店|      2|     27318|
|    21号门店|     21|     27191|
|    56号门店|     56|     26657|
|    62号门店|     62|     26612|
+---------+-------+----------+
only showing top 20 rows
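The store dimension table is tiny compared with the fact data, so Spark will normally broadcast it on its own; if it does not, an explicit broadcast hint avoids shuffling the aggregated result. A sketch of the same join as above with the hint added (an optional optimization, not part of the original steps):

from pyspark.sql.functions import broadcast

# Broadcast the small dimension table to every executor instead of shuffling it.
monthlyByStoreWithName = monthlyByStore.join(broadcast(dfStoreData),
                                             dfStoreData['StoreID'] == monthlyByStore['StoreID'],
                                             'left')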
Save the result to a Hive table
monthlyByStoreWithName.write.mode("overwrite").saveAsTable("hv_monthly_report")
Query via Hive View
(Screenshot: Hive View query result)
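In Hive View this is just a SELECT against hv_monthly_report ordered by TotalSales. The same check can also be run from a notebook cell through spark.sql; a minimal sketch:

# Query the Hive table written above, ordered by total sales, descending.
spark.sql("SELECT StoreName, StoreId, TotalSales FROM hv_monthly_report "
          "ORDER BY TotalSales DESC").show()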
Via a Livy Job
Save the script fragments above as a Python file, sales-report.py, upload it to the Azure storage account, and then submit a Spark job through the Livy REST API.
Write the script file
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
from pyspark.sql import functions as func
from pyspark.sql.functions import month,year,concat,format_string,desc
#
# init Spark session
#
spark = SparkSession \
.builder \
.appName("Build Monthly Sales Data Report") \
.enableHiveSupport() \
.getOrCreate()
# read csv input
inputCsvDir="wasb:///directory/data/*.csv"
df = spark.read.csv(inputCsvDir, header=True, inferSchema=True)
# filter latest month
df = df.filter(df['SalesDate'] >= '2018-08-01')
# remove duplicate rows
window = Window.partitionBy(df['SKUID'],
df['CustomerID'],
df['StoreID'],
df['SalesDate']).orderBy(df['DataArrivalTime'].desc())
df = df.select('*', rank().over(window).alias('rank')).filter(col('rank') == 1)
# group by store
monthlyByStore = df.groupby('StoreID') \
.agg(func.sum("SalesValue").alias('TotalSales'))
monthlyByStore = monthlyByStore.orderBy(monthlyByStore['TotalSales'].desc())
# join with store name
dfStoreData = spark.read.csv("wasb:///directory/storeData.csv", header=True, inferSchema=True)
monthlyByStoreWithName = monthlyByStore.join(dfStoreData,dfStoreData['StoreID']==monthlyByStore['StoreId'],'left')
monthlyByStoreWithName = monthlyByStoreWithName.select('StoreName',dfStoreData['StoreId'],'TotalSales')
# save result to hive table
monthlyByStoreWithName.write.mode("overwrite").saveAsTable("hv_monthly_report")
Submit the Livy job
curl -k --user 'admin:password' -v -X POST \
  -H 'Content-Type: application/json' \
  -H 'X-Requested-By: admin' \
  -d '{ "file": "wasb:///directory/sales-report.py", "args": [""] }' \
  "https://xxx.azurehdinsight.net/livy/batches"
Connecting to HDInsight from Power BI
Note: how to build the report in Power BI is not the focus of this article; the goal here is simply to demonstrate that it works.