python调用接口数据写入hive

栏目: 服务器 · 发布时间: 6年前

该方法使用场景为:在hadoop集群进行接口调用,并且获取接口返回值进行解析,解析完成数据写入hive表

其中存在的问题:测试环境和线上环境的一致性,还有接口调用不能一次性并发太高,自己把握这个量

分模块说一下每个模块需要做的:

1、调用get接口:

请求本地接口进行测试,参数需要encode

# -*- coding: utf-8 -*-

import urllib

import urllib2



# get接口调用

get_url ="http://localhost:7000/api/test"



get_params = {"startDate":20180729,"endDate":20180729}



get_params_urlencode = urllib.urlencode(get_params)



get_params_url = get_url+'?'+get_params_urlencode



get_requst = urllib2.Request(url=get_params_url)



get_res_data = urllib2.urlopen(get_requst)



get_res = get_res_data.read()



print get_res;

2、调用post接口:

这个操作复杂一点,加了从文件读取参数,参数是作为list入参

每次读取20个id,循环调用post接口,获取json结果进行解析

# -*- coding: utf-8 -*-



import json

import urllib2

import time



file_dir = "/user/tmp/ids"

post_url = "http://localhost:7000/api/users"



# 读取文本文件获取id列表数据

file = open(file_dir)

data_list = file.readlines()

# 获取id循环次数,除数-1,结果加+1

n = (data_list.__len__() - 1) / 20 + 1

# 初始化list下标

j = 1

while j <= n:

    id_lists = data_list[(j - 1) * 20:j * 20]

    ids = []

    for id in id_lists:

        ids.append(id.strip())

    j += 1

    print "Start : %s" % time.ctime()

    # 拼接参数值

    params = {"staticDate": 20180926, "ids": ids}

    # 参数json格式化

    json_params = json.dumps(params)

    # 组装request请求

    post_requst = urllib2.Request(url=post_url, data=json_params)



    post_requst.add_header('Content-Type', 'application/json')



    post_requst.add_header('Accept', 'application/json')

    # 发起http请求,并获取返回结果

    res = urllib2.urlopen(post_requst)



    # 把http返回的json数据转为 python 对象,

    result = json.loads(res.read())



    result1 = result["data"]



    print result1

    for id in ids:

        print 'id:' + id + '-> name:' + str(result1[id]['name'])

    # 休息20s再继续处理

    time.sleep(20)

    print "End : %s" % time.ctime()



file.close()


3、把获取到的解析结果写入文件:

一般来说写入文件之前需要校验一下文件是否存在,不存在的话新建一个空文件,写个方法校验一下:

# 判断输出文件存不存在,不存在新建,存在删除后新建

def get_file(filename):

    # print os.path.abspath('.')

    if not os.path.isfile(filename):

        print 'file not exists! create file :'+filename

        f = open(filename,'w')

        f.close()

        return 1

    else:

        print 'file exists! starting delete history file !'

        os.remove(filename)

        f = open(filename,'w')

        f.close()

        print 'file created file !'+filename

        return 1

后面写入文件的时候有两种模式,w(w+) 写文件(读写)并且会每次写的时候覆盖 ,a(a+) 写文件(读写)每次写只会在文件后面追加。

下面这个就是往文件追加内容,最好追加之前调用一次get_file()进行文件清空,代码没有贴全,自己可以补充一下

with open(filename, 'a') as f:

    for id in ids: 

        f.write(str(id).strip()+"\t"+str(result1[id]['name']).strip()+"\n")

        f.flush()

f.close()

4、python连接hive-server

可以参考: https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-PythonClient

定一个连接hive的方法,用账号秘密进行连接,自己定义 sql 语句:

# 初始化hive联接

def get_hive_conn():

    conn = pyhs2.connect(host='10.10.10.10',

                         port=9090,

                         authMechanism="PLAIN",

                         user='user',

                         password='password',

                         database='db_name')

    return conn

# 执行hive操作语句

def get_result(load_file, dt):

    conn = get_hive_conn()

    hive_sql = "select * from tablename"

    print hive_sql

    with conn.cursor() as cur:

        # Show databases

        # print cur.getDatabases()

        # Execute query

        cur.execute(hive_sql)

        # result = cur.fetchall()

        # print result

        # # Return column info from query

        print cur.getSchema()

        #

        # # Fetch table results

        for i in cur.fetch():

            print i

        cur.close();

    conn.close()

5、 shell 组装模块进行调度

因为还需要操作hdfs文件,shell脚本在我目前的环境是比较方便的,所以需要组装一下python各个模块

  • a、调用接口模块 GetData.py
  • b、写入文本文件的模块 WriteFile.py(调用接口的模块和写入文件可以写一起)
  • c、连接hive的模块 ToHive.py

shell接受 python 参数:

result=`python test.py` 

获取test.py的print语句返回的值

python接受shell参数:

python mta_hive.py ${a}  ${b}

需要在python脚本内引用sys包,python脚本获取参数a = sys.argv[1]   b= sys.argv[2]

shell脚本的大概代码:

#激活python环境

source ./bin/activate

#参数

input="/user/tmp/inputfile.txt"

output="/user/tmp/output.txt"

filename=basename ${output}

hdfs_path="hdfs:///user/temp"

#执行python脚本调用接口,解析返回结果写入文件

result=`python WriteFile.py "${input}" "${output}"`

#把文件put到hdfs上

$HADOOP_HOME/bin/hadoop fs -put -f ${result} ${hdfs_path}

hdfsfile=${hdfs_path}${filename}

#执行hive操作,把文件load到hive表

python ToHive.py ${hdfs_file}

一般来说写入load到hive表到语句是:

load data local inpath '/home/temp/test.txt' overwrite into table db_name.tablename PARTITION (dt=20180926); 

但是如果是读取hdfs文件,需要把local去掉,再附上建表语句:

CREATE EXTERNAL TABLE IF NOT EXISTS dbname.tablename (

id bigint comment 'id',

name string comment '名称'

)COMMENT'测试'

PARTITIONED BY(dt string)

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

STORED AS TEXTFILE

LOCATION '/user/temp/test'

;

写的比较乱,有时间再梳理一下。

python调用接口数据写入hive

本文由brucelu 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。

转载、引用前需联系作者,并署名作者且注明文章出处。

本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

从规范出发的程序设计

从规范出发的程序设计

[美] Carroll Morgan / 裘宗燕 / 机械工业出版社 / 2002-8 / 45.00元

本书详细论述了有关规范程序设计的内容,包括:程序和精化、谓词演算、选择、迭代、构造类型、模块和封装等,最后几章还包含了大量的实例研究和一些更高级的程序设计技术。本书提倡一种严格的程序开发方法,分析问题要用严格方式写出程序的规范,而后通过一系列具有严格理论基础的推导,最终得到可以运行的程序。 本书是被世界上许多重要大学采用的教材,适于计算机及相关专业的本科生和研究生使用。一起来看看 《从规范出发的程序设计》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换