Python多线程批量插入数据小结

栏目: Python · 发布时间: 5年前

内容简介:在测试的过程中,无法避免的需要做一些性能压测,造数据的时长在此时就会备受关注。比如,造数据的时候用多线程还是多进程,用直接插入DB方式还是用先写文件后导入mysql的方式,写文件是写批量sql后面source导入,还是写文本格式后面load ito file导入,使用不同的方法耗时结果肯定是不一样的。除此之外,还有mysql的版本,DB的引擎,表的结构设计这些都会影响大量数据插入的时间。这次导入数据做了一个小试验:导入2000w笔数据到DB内。使用多线程的线程池技术,首先写2000w笔数据分成M个文件,然

在测试的过程中,无法避免的需要做一些性能压测,造数据的时长在此时就会备受关注。比如,造数据的时候用多线程还是多进程,用直接插入DB方式还是用先写文件后导入 mysql 的方式,写文件是写批量 sql 后面source导入,还是写文本格式后面load ito file导入,使用不同的方法耗时结果肯定是不一样的。除此之外,还有mysql的版本,DB的引擎,表的结构设计这些都会影响大量数据插入的时间。

这次导入数据做了一个小试验:导入2000w笔数据到DB内。使用多线程的线程池技术,首先写2000w笔数据分成M个文件,然后使用N个线程去并发处理写好M个文件,并把M个文件导入到MYSQL中。之前同事写的文件后面都是用load data from file命令导入的,官网也有说明这个命令字跟insert比,性能会高出20倍,于是把这个命令字跟我常用的source命令字做了下对比,结论如下:

Python多线程批量插入数据小结

一、Insert命令字详解

基于这些试验和现象,确定load ito file加载数据的性能确实要比source导入大批量数据的性能更稳定,更快。但这又是为什么呢?查阅了下MYSQL的官网,insert命令字的插入过程如下,括号内的数字几乎表示可能占用的耗时比:

A: 打开连接:(3)

B: 向服务器发送查询:(2)

C:解析查询:(2)

D:插入行:(1×行大小)

E:插入索引:(1×数索引)

F:关闭连接(1)

以上步骤还不包含连接时,打开表的开销。 当大量的insert批量文件被多线程执行插入时,每一个线程都需要经过6步才能完成数据的插入,表的索引结构,表当前数据的行数对insert的每次插入都会影响。如果想提升写入大数据的性能,可以尝试批量insert(即insert后的值有多个values),这在一般情况下会单个insert要快,但是要注意设置mysql的bulk_insert_buffer_size参数的大小,之前开发有一些经验值,一般情况下是设置300-500一批插入性能最佳。但是想要性能更快更稳定,可以使用“LOAD DATA INFILE”,这个命令比单insert要快近20倍。

二、提升导入数据性能----mysql服务器端

1、当将数据导入到INNODB中时,关闭自动提交模式,因为在自动提交模式下,每一次插入都会刷新一次日志到磁盘。可以使用如下语句:

SET autocommit=0;

... SQL import statements ...

COMMIT;

2、关闭唯一索引。减少索引的插入和唯一性的校验。

SET unique_checks=0;

... SQL import statements ...

SET unique_checks=1;

3、关闭外键检查来加速表导入。

SET foreign_key_checks=0;

... SQL import statements ...

SET foreign_key_checks=1;

4、将innodb_autoinc_lock_mode设置为2,而不是默认值1。

5、在执行批量插入时,以主键顺序插入行更快。

6、字符串的拼接用 .join > a += b ,因为 +=方式每次要重新计算内存/分配。

7、 Python 的多线程在IO密集的应用场景下,可以写多个文件,让多线程的优势得到更充分的发挥。

8、批量插入时,多利用字段的默认值,字段值如果使用默认值,会缩端插入过程中对数据解析的时间。

三、过程遇到的问题以及解决办法

1、怎么快速删除2000w笔数据?

使用truncate 命令字,几秒内就能删除数据。如 :truncate table t_dc;

2、测试机器上网速很慢,无法安装python的第三方库怎么办?

在已经安装好的python的site-packages下复制使用的py脚本,然后放到对应机器上。如threadpool.py无法安装时,最快速的方法就是手工复制文件Python27\Lib\site-packages\threadpool.py使用。

3、mysqldb执行报错:Lock wait timeout exceeded; try restarting transaction

设置全局等待事务锁超时时间 :SET GLOBAL innodb_lock_wait_timeout=100;

查询全局等待事务锁超时时间 :SHOW GLOBAL VARIABLES LIKE 'innodb_lock_wait_timeout';

4、mysql事务锁如何查看:

在information_schema下面有三张表:INNODB_TRX、INNODB_LOCKS、INNODB_LOCK_WAITS(解决问题方法),通过这三张表,可以更简单地监控当前的事务并分析可能存在的问题。

当前运行的所有事务 :select * from information_schema.innodb_trx;

当前出现的锁 :select * from information_schema.innodb_locks;

锁等待的对应关系:select * from information_schema.innodb_lock_waits;

四:批量脚本(线程池+lLOAD DATA LOCAL INFILE)

#!/usr/bin/env python

#coding=GBK

import threadpool

import time,sys

from subprocess import call,Popen,PIPE

COUNT = 1000

process_num = 5

seperate = 100

dbHost='100.92.174.16'

dbUser='root'

dbPasswd='root1234'

dbOperater=None

baseData = {}

thread_arr=[]

detail_data={}

listid = 16080802011100100001

Ftde_id = 1

Fbank_list = 20171217761623447701

tablename = "epcc_check.t_dc_list"

resultDir = "/data/home/loleinaliao/loleinatext/"

#columns

order_columns="Fcheck_bank, Facc_day, Fbankaccno, Fbankusername, Famount, Fdc_type_id, Fori_accno, Fbank_status,Fid,Fbank_listid,Fbatchno"

#data

order_base="'4251','2018022721','6225425177777777800004','wltest','1','16','6225425177777777800004','00'"

tablename ="epcc_check_201810.t_dc_list_06"

def writeDownSqlData(fileName,content):

fo = open(fileName,"w")

fo.write(content)

fo.close()

def make_t_tcpay_list(deal_num,Flistid,Ftde_id,Fbank_list):

filename ="data_order_"+Fbank_list+".text"

Fbatchno = 'B201810070011'

orderDataList = []

for i in range(int(deal_num)):

orderData=""

orderData = "%s,'%s','%s','%s'" % (order_base, Ftde_id, Fbank_list, Fbatchno)

orderDataList.append(orderData)

Ftde_id = int(Ftde_id) + 1

Flistid = int(Flistid) + 1

Fbank_list = int(Fbank_list) + 1

writeDownSqlData(resultDir+filename, "\n".join(orderDataList) + "\n")

loadDataIntoDB(resultDir+filename,tablename,order_columns)

def loadDataIntoDB(filename,tableName,order_columns):

mysqlCmd = r"LOAD DATA LOCAL INFILE '%s' into table %s FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\''  LINES TERMINATED BY '\n' (%s"%(filename,tableName,order_columns)+")"

mysqlConCmd = r"mysql -u%s -p%s -h%s"%(dbUser,dbPasswd,dbHost)

runCmd = mysqlConCmd+' -e "'+mysqlCmd+'"'

result = call(runCmd,shell=True)

if result != 0:

print "load local data into database failed,exit "

sys.exit()

if __name__ == "__main__":

start = time.time()

begin_Flistid ='110180809100012153304210311120'

begin_Ftde_id ="1"

begin_Fbank_seq ="2018100800000110734321790770100"

total_num  =20000000

threadpool_num = 20

func_var=[]

seperate =2000

pool = threadpool.ThreadPool(threadpool_num)

for i  in range(seperate):

list_temp =[]

list_temp =[str(total_num/seperate),begin_Flistid,begin_Ftde_id,begin_Fbank_seq]

func_var.append((list_temp,None))

begin_Flistid = str(int(begin_Flistid)+ total_num/threadpool_num)

begin_Ftde_id = str(int(begin_Ftde_id)+ total_num/threadpool_num)

begin_Fbank_seq = str(int(begin_Fbank_seq)+ total_num/threadpool_num)

pool = threadpool.ThreadPool(threadpool_num)

requests = threadpool.makeRequests(make_t_tcpay_list, func_var)

for req in requests:

pool.putRequest(req)

pool.wait()

end = time.time()

print end - start

Linux公社的RSS地址https://www.linuxidc.com/rssFeed.aspx

本文永久更新链接地址: https://www.linuxidc.com/Linux/2018-11/155255.htm


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

超预期

超预期

[美] 莱昂纳多·因基莱里、迈卡·所罗门 / 杨波 / 后浪丨江西人民出版社 / 2017-11 / 45.00元

用户体验决定产品成败,只有超预期才能赢得好口碑! 互联网大佬一致推崇的打造爆款产品及服务的核心理念 ................... ※编辑推荐※ ☆ 超预期,才有用户体验,互联网大佬一致推崇的打造爆款产品及服务的核心理念 - 周鸿祎:“什么叫用户体验?超过用户预期才叫 体验!” - 雷军:“口碑的真谛是超越用户的期望值。” - 马化腾:“用户体验,......一起来看看 《超预期》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

MD5 加密
MD5 加密

MD5 加密工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具