[python] 使用 Spark與 Hive進行 ETL

栏目: 服务器 · 发布时间: 7年前

内容简介:[python] 使用 Spark與 Hive進行 ETL

之前實作 ETL系統是透過 Python + MongoDB/MySQL完成,對於少量的資料綽綽有餘,但如果想處理大量資料,又想要借用 Spark MLlib機器學習套件的話,那麼就使用 PySpark + Hive來達成任務吧。能使用熟悉的 PythonSQL 語法,無痛轉移。

Spark與 Hive環境設定

這裡透過 Google Cloud Dataproc 架設,Cluster跑起來之後全部套件都有囉,包含 Hadoop 2.7.3,Spark 2.0.2與 Hive 2.1.1 ,而為了要使用 PySpark,記得加上環境變數。

$ exportSPARK_HOME=/usr/lib/spark
$ exportPYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/build:$PYTHONPATH

使用 PySpark來進行 ETL

[python] 使用 Spark與 Hive進行 ETL

ETL 的流程就如上圖三步驟。我們想要抓取的資料是行政院農委會提供的 農村地方美食小吃特色料理 ,內容是農村特色料理的所在位置與店家資訊,反正內容是什麼不重要,重要的是希望程式能直接透過 HTTP Request 抓下來,經過轉換後丟進 Hive資料庫。

由於 Spark 2.0版本加入了 Spark session,使用上變得簡潔,不過想要與 Hive溝通,記得要加上 enableHiveSupport() 才能存取資料喔。另外,Dataproc上的 Python 版本為 2.7,處理中文會比較麻煩,如果要換成 Python3,記得 Cluster環境下只改 Master是不行的,必須要在裝環境的時候,將 所有機器上的 PySpark預設改掉。詳細作法可以參考 這篇 。這裡仍然使用 Python2.7,詳細的程式碼與說明如下:

# -*- coding: utf-8 -*-
frompyspark.sqlimportSparkSession
frompyspark.sqlimportHiveContext
importrequests
importjson
 
class SparkHiveExample:
 
    def__init__(self):
        ## initialize spark session
        self.spark = SparkSession.builder.appName("Spark Hive example").enableHiveSupport().getOrCreate()
 
    defrun(self):
        ## download with opendata API
        url = "http://data.coa.gov.tw/Service/OpenData/ODwsv/ODwsvTravelFood.aspx?"
        data = requests.get(url)
 
        ## convert from JSON to dataframe
        df = self.spark.createDataFrame(data.json())
 
        ## display schema
        df.printSchema()
 
        ## creates a temporary view using the DataFrame
        df.createOrReplaceTempView("travelfood")
 
        ## save into Hive
        self.spark.sql("DROP TABLE IF EXISTS travelfood_hive")
        df.write.saveAsTable("travelfood_hive")
 
        ## use SQL
        sqlDF = self.spark.sql("SELECT * FROM travelfood_hive WHERE City == '屏東縣'")
        sqlDF.select("Name", "City", "Town", "Coordinate").show()
 
if __name__ == "__main__":
    EXAMPLE = SparkHiveExample()
    EXAMPLE.run()

從 Requests取得的 JSON格式資料,使用 createDataFrame 就會變成 Spark的 DataFrame格式,也因為這個資料集的 Key只有一個層級,轉換不用特別處理,你能透過 printSchema 看見所有的欄位名稱。

使用 createOrReplaceTempView 會產生一個暫時的 View,就能透過 SQL語法存取資料了,但注意資料還沒有送進 Hive喔,只是方便你處理而已,必須用 DataFrame的函式 saveAsTable才算完成。剩下的 SQL操作,看看就懂了吧!附上執行結果:

[python] 使用 Spark與 Hive進行 ETL

Youtube上的教學影片

如果使用的語言是 Scala,請參考這部影片的教學 (Spark 1.6)

參考資料


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Programming Collective Intelligence

Programming Collective Intelligence

Toby Segaran / O'Reilly Media / 2007-8-26 / USD 39.99

Want to tap the power behind search rankings, product recommendations, social bookmarking, and online matchmaking? This fascinating book demonstrates how you can build Web 2.0 applications to mine the......一起来看看 《Programming Collective Intelligence》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

随机密码生成器
随机密码生成器

多种字符组合密码