该方法使用场景为:在hadoop集群进行接口调用,并且获取接口返回值进行解析,解析完成数据写入hive表
其中存在的问题:测试环境和线上环境的一致性,还有接口调用不能一次性并发太高,自己把握这个量
分模块说一下每个模块需要做的:
1、调用get接口:
请求本地接口进行测试,参数需要encode
# -*- coding: utf-8 -*- import urllib import urllib2 # get接口调用 get_url ="http://localhost:7000/api/test" get_params = {"startDate":20180729,"endDate":20180729} get_params_urlencode = urllib.urlencode(get_params) get_params_url = get_url+'?'+get_params_urlencode get_requst = urllib2.Request(url=get_params_url) get_res_data = urllib2.urlopen(get_requst) get_res = get_res_data.read() print get_res;
2、调用post接口:
这个操作复杂一点,加了从文件读取参数,参数是作为list入参
每次读取20个id,循环调用post接口,获取json结果进行解析
# -*- coding: utf-8 -*- import json import urllib2 import time file_dir = "/user/tmp/ids" post_url = "http://localhost:7000/api/users" # 读取文本文件获取id列表数据 file = open(file_dir) data_list = file.readlines() # 获取id循环次数,除数-1,结果加+1 n = (data_list.__len__() - 1) / 20 + 1 # 初始化list下标 j = 1 while j <= n: id_lists = data_list[(j - 1) * 20:j * 20] ids = [] for id in id_lists: ids.append(id.strip()) j += 1 print "Start : %s" % time.ctime() # 拼接参数值 params = {"staticDate": 20180926, "ids": ids} # 参数json格式化 json_params = json.dumps(params) # 组装request请求 post_requst = urllib2.Request(url=post_url, data=json_params) post_requst.add_header('Content-Type', 'application/json') post_requst.add_header('Accept', 'application/json') # 发起http请求,并获取返回结果 res = urllib2.urlopen(post_requst) # 把http返回的json数据转为 python 对象, result = json.loads(res.read()) result1 = result["data"] print result1 for id in ids: print 'id:' + id + '-> name:' + str(result1[id]['name']) # 休息20s再继续处理 time.sleep(20) print "End : %s" % time.ctime() file.close()
3、把获取到的解析结果写入文件:
一般来说写入文件之前需要校验一下文件是否存在,不存在的话新建一个空文件,写个方法校验一下:
# 判断输出文件存不存在,不存在新建,存在删除后新建 def get_file(filename): # print os.path.abspath('.') if not os.path.isfile(filename): print 'file not exists! create file :'+filename f = open(filename,'w') f.close() return 1 else: print 'file exists! starting delete history file !' os.remove(filename) f = open(filename,'w') f.close() print 'file created file !'+filename return 1
后面写入文件的时候有两种模式,w(w+) 写文件(读写)并且会每次写的时候覆盖 ,a(a+) 写文件(读写)每次写只会在文件后面追加。
下面这个就是往文件追加内容,最好追加之前调用一次get_file()进行文件清空,代码没有贴全,自己可以补充一下
with open(filename, 'a') as f: for id in ids: f.write(str(id).strip()+"\t"+str(result1[id]['name']).strip()+"\n") f.flush() f.close()
4、python连接hive-server
可以参考: https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-PythonClient
定一个连接hive的方法,用账号秘密进行连接,自己定义 sql 语句:
# 初始化hive联接 def get_hive_conn(): conn = pyhs2.connect(host='10.10.10.10', port=9090, authMechanism="PLAIN", user='user', password='password', database='db_name') return conn # 执行hive操作语句 def get_result(load_file, dt): conn = get_hive_conn() hive_sql = "select * from tablename" print hive_sql with conn.cursor() as cur: # Show databases # print cur.getDatabases() # Execute query cur.execute(hive_sql) # result = cur.fetchall() # print result # # Return column info from query print cur.getSchema() # # # Fetch table results for i in cur.fetch(): print i cur.close(); conn.close()
5、 shell 组装模块进行调度
因为还需要操作hdfs文件,shell脚本在我目前的环境是比较方便的,所以需要组装一下python各个模块
- a、调用接口模块 GetData.py
- b、写入文本文件的模块 WriteFile.py(调用接口的模块和写入文件可以写一起)
- c、连接hive的模块 ToHive.py
shell接受 python 参数:
result=`python test.py` 获取test.py的print语句返回的值
python接受shell参数:
python mta_hive.py ${a} ${b} 需要在python脚本内引用sys包,python脚本获取参数a = sys.argv[1] b= sys.argv[2]
shell脚本的大概代码:
#激活python环境 source ./bin/activate #参数 input="/user/tmp/inputfile.txt" output="/user/tmp/output.txt" filename=basename ${output} hdfs_path="hdfs:///user/temp" #执行python脚本调用接口,解析返回结果写入文件 result=`python WriteFile.py "${input}" "${output}"` #把文件put到hdfs上 $HADOOP_HOME/bin/hadoop fs -put -f ${result} ${hdfs_path} hdfsfile=${hdfs_path}${filename} #执行hive操作,把文件load到hive表 python ToHive.py ${hdfs_file}
一般来说写入load到hive表到语句是:
load data local inpath '/home/temp/test.txt' overwrite into table db_name.tablename PARTITION (dt=20180926);
但是如果是读取hdfs文件,需要把local去掉,再附上建表语句:
CREATE EXTERNAL TABLE IF NOT EXISTS dbname.tablename ( id bigint comment 'id', name string comment '名称' )COMMENT'测试' PARTITIONED BY(dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '/user/temp/test' ;
写的比较乱,有时间再梳理一下。
本文由brucelu 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 优化ElasticSearch写入效率
- golang 创建,读取,写入文件
- Kafka学习笔记 -- 写入数据
- Elasticsearch 写入原理深入详解
- 高频写入redis场景优化
- Laravel log 无法写入问题
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
JSON 在线解析
在线 JSON 格式化工具
HSV CMYK 转换工具
HSV CMYK互换工具