- Business scenario: Spark bulk-writes to Elasticsearch, implemented with the es-hadoop connector
- The batch job runs on a schedule
- CDH 5.5.3 cluster, Spark 2.3, Elasticsearch 6.4.3
- The _id of the target index is generated by the application and guaranteed globally unique
- The problem occurs only in the test environment, and only intermittently
Problem Description
The complete error message is as follows:
19/05/20 11:08:54 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 24.0 failed 4 times, most recent failure: Lost task 2.3 in stage 24.0 (TID 849, p016d052n01, executor 6): org.elasticsearch.hadoop.EsHadoopException: Could not write all entries for bulk operation [24/1000]. Error sample (first [5] error messages): org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][OZVIK_2462056_2019-05-18]: version conflict, document already exists (current version [1]) {"update":{"_id":"OZVIK_2462056_2019-05-18"}} {"doc_as_upsert":true,"doc":{"id":"OZVIK_2462056_2019-05-18","product_no":"OZVIK","cust_id":"2462056","p106":32,"p107":61,"p108":55,"p109":"YGM6E","p110":1,"p111":46,"p112":11126,"p113":189,"p114":70,"p115":6,"p116":60,"p117":"male","p118":"gg","p119":19,"p120":2,"p121":1544025600000,"p122":69,"p123":"FL0SS","dt":"2019-05-18","absum01":71,"testday01":76,"testday02":11202,"testday03":"7611202","testday04":"70male","testday04_2":22404,"testday05":"761120270male761120222404","amount01":"YGM6E2462056","amount02":22252,"amount03":"OZVIK","aa":11197,"testb21":93,"fix_const_999_0222":999,"0304tf":"999 2462056 YGM6E","0305test_long":11173,"hello":87,"datetest":"2019-05-18","binarytest":32,"nestedtest":"YGM6E","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"OZVIK","floattest02":1,"__namelist_54":"0"}} org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][OZWTC_148752_2019-05-18]: version conflict, document already exists (current version [1]) {"update":{"_id":"OZWTC_148752_2019-05-18"}} 
{"doc_as_upsert":true,"doc":{"id":"OZWTC_148752_2019-05-18","product_no":"OZWTC","cust_id":"148752","p106":88,"p107":20,"p108":13,"p109":"3BIW6","p110":1,"p111":79,"p112":15107,"p113":183,"p114":62,"p115":85,"p116":68,"p117":"female","p118":"nn","p119":51,"p120":80,"p121":1534867200000,"p122":87,"p123":"VOG2J","dt":"2019-05-18","absum01":63,"testday01":147,"testday02":15254,"testday03":"14715254","testday04":"62female","testday04_2":30508,"testday05":"1471525462female1471525430508","amount01":"3BIW6148752","amount02":30214,"amount03":"OZWTC","aa":15170,"testb21":108,"fix_const_999_0222":999,"0304tf":"999 148752 3BIW6","0305test_long":15187,"hello":101,"datetest":"2019-05-18","binarytest":88,"nestedtest":"3BIW6","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"OZWTC","floattest02":1,"__namelist_54":"0"}} org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P08Y7_3310671_2019-05-18]: version conflict, document already exists (current version [1]) {"update":{"_id":"P08Y7_3310671_2019-05-18"}} {"doc_as_upsert":true,"doc":{"id":"P08Y7_3310671_2019-05-18","product_no":"P08Y7","cust_id":"3310671","p106":27,"p107":62,"p108":40,"p109":"5JPCP","p110":0,"p111":93,"p112":17036,"p113":185,"p114":68,"p115":54,"p116":24,"p117":"female","p118":"aa","p119":43,"p120":88,"p121":1536508800000,"p122":43,"p123":"HI31Q","dt":"2019-05-18","absum01":68,"testday01":122,"testday02":17158,"testday03":"12217158","testday04":"68female","testday04_2":34316,"testday05":"1221715868female1221715834316","amount01":"5JPCP3310671","amount02":34072,"amount03":"P08Y7","aa":17104,"testb21":89,"fix_const_999_0222":999,"0304tf":"999 3310671 5JPCP","0305test_long":17129,"hello":67,"datetest":"2019-05-18","binarytest":27,"nestedtest":"5JPCP","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P08Y7","floattest02":0,"__namelist_54":"0"}} org.elasticsearch.hadoop.rest.EsHadoopRemoteException: 
version_conflict_engine_exception: [offline_quota_library_s][P0TI9_8523_2019-05-18]: version conflict, document already exists (current version [1]) {"update":{"_id":"P0TI9_8523_2019-05-18"}} {"doc_as_upsert":true,"doc":{"id":"P0TI9_8523_2019-05-18","product_no":"P0TI9","cust_id":"8523","p106":20,"p107":68,"p108":36,"p109":"YIP72","p110":0,"p111":24,"p112":13632,"p113":197,"p114":73,"p115":70,"p116":90,"p117":"male","p118":"aa","p119":75,"p120":11,"p121":1532361600000,"p122":82,"p123":"8KUUS","dt":"2019-05-18","absum01":73,"testday01":143,"testday02":13775,"testday03":"14313775","testday04":"73male","testday04_2":27550,"testday05":"1431377573male1431377527550","amount01":"YIP728523","amount02":27264,"amount03":"P0TI9","aa":13705,"testb21":88,"fix_const_999_0222":999,"0304tf":"999 8523 YIP72","0305test_long":13656,"hello":56,"datetest":"2019-05-18","binarytest":20,"nestedtest":"YIP72","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P0TI9","floattest02":0,"__namelist_54":"0"}} org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P1J8O_2619118_2019-05-18]: version conflict, document already exists (current version [1]) {"update":{"_id":"P1J8O_2619118_2019-05-18"}} {"doc_as_upsert":true,"doc":{"id":"P1J8O_2619118_2019-05-18","product_no":"P1J8O","cust_id":"2619118","p106":99,"p107":57,"p108":53,"p109":"NR3QD","p110":1,"p111":83,"p112":17171,"p113":157,"p114":55,"p115":8,"p116":20,"p117":"male","p118":"oo","p119":42,"p120":4,"p121":1516636800000,"p122":62,"p123":"FO4IS","dt":"2019-05-18","absum01":56,"testday01":63,"testday02":17234,"testday03":"6317234","testday04":"55male","testday04_2":34468,"testday05":"631723455male631723434468","amount01":"NR3QD2619118","amount02":34342,"amount03":"P1J8O","aa":17227,"testb21":156,"fix_const_999_0222":999,"0304tf":"999 2619118 
NR3QD","0305test_long":17255,"hello":152,"datetest":"2019-05-18","binarytest":99,"nestedtest":"NR3QD","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P1J8O","floattest02":1,"__namelist_54":"0"}} Bailing out...
	at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:519)
	at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:127)
	at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
In short: when the ES client wrote to Elasticsearch, the document's current version number did not match the version the client held, i.e. another client had already modified the document.
Troubleshooting Process
1) First thought: the _id is guaranteed globally unique, so under normal circumstances each _id is written by exactly one Spark task, exactly once. Elasticsearch uses optimistic concurrency control: the current client gets a version-conflict error only if some other client modified the document between the current client's read and its write. So the question became: what could cause at least two clients to write the same document?
Possibility 1: Spark dynamic resource allocation
On CDH, dynamic resource allocation can indeed multiply the number of executors and schedule tasks onto the new ones, but it never causes one partition's data to be processed by more than one task, so this was ruled out.
Possibility 2: speculative task execution
To stop one slow task from dragging down the whole task set, speculative execution launches multiple tasks over the same data, takes the result of whichever finishes first, and kills the rest. That would indeed let multiple clients write the same document and trigger version conflicts, but Spark does not enable speculation by default and the job never turned it on, so this was ruled out as well.
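The optimistic concurrency control described in step 1 can be illustrated with a toy in-memory model. This is a sketch only, not the actual Elasticsearch engine; the class and method names are made up:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of Elasticsearch's per-document optimistic concurrency control.
// A write carries the version the client last observed; if the stored version
// has moved on in the meantime, the write is rejected as a conflict (HTTP 409).
public class OptimisticStore {
    private final Map<String, Long> versions = new HashMap<>();

    // Returns the new version on success, or -1 to signal a 409 version conflict.
    public synchronized long write(String id, long expectedVersion) {
        long current = versions.getOrDefault(id, 0L); // 0 = document absent
        if (current != expectedVersion) {
            return -1; // version_conflict_engine_exception
        }
        versions.put(id, current + 1);
        return current + 1;
    }

    public static void main(String[] args) {
        OptimisticStore store = new OptimisticStore();
        // Client A creates the document: expects version 0 (absent), bumps it to 1.
        long v1 = store.write("OZVIK_2462056_2019-05-18", 0);
        // Client B still believes the document is absent and also writes with
        // expected version 0 -> conflict, exactly as in the error message above.
        long v2 = store.write("OZVIK_2462056_2019-05-18", 0);
        System.out.println(v1 + " " + v2); // 1 -1
    }
}
```

This is why a second writer only appears to be the problem: any client replaying a write it believes is fresh will collide with the version that the first successful write left behind.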
2) Tried debugging the source code, but the problem was hard to reproduce and this yielded no useful information.
3) Then I noticed that besides the flood of version-conflict errors, a corner of the Spark UI also showed an EsHadoopNoNodesLeftException: Connection error. Combined with Spark's task retry mechanism, the picture came together: a network problem broke the connection to ES, but the documents already indexed could not be rolled back. Spark rescheduled the task, and the retried attempt wrote as if the documents did not yet exist (version 0), while the already-indexed documents had been bumped to version 1, producing the version conflicts.
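The failure mode in step 3 can be reproduced in miniature: a bulk write dies partway through, the documents already indexed cannot be rolled back, and the retried task replays the whole batch. This is a self-contained sketch with a simulated connection failure; all names are made up:

```java
import java.util.*;

// Simulates a Spark task retry after a connection error in the middle of a
// bulk write. Documents indexed before the failure survive (no rollback),
// so the retry's writes hit version conflicts on exactly those documents.
public class RetryReplay {
    static Map<String, Integer> index = new HashMap<>(); // _id -> version

    // Writes docs one by one; throws after 'failAfter' docs to mimic a
    // connection error. Returns the ids that conflicted (already present).
    static List<String> bulkWrite(List<String> ids, int failAfter) {
        List<String> conflicts = new ArrayList<>();
        int written = 0;
        for (String id : ids) {
            if (written == failAfter) throw new RuntimeException("Connection error");
            if (index.containsKey(id)) conflicts.add(id); // 409 on replay
            else index.put(id, 1);
            written++;
        }
        return conflicts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("doc1", "doc2", "doc3", "doc4");
        try {
            // First attempt: doc1 and doc2 are indexed, then the node dies.
            bulkWrite(batch, 2);
        } catch (RuntimeException e) {
            System.out.println("attempt 1: " + e.getMessage());
        }
        // Spark reschedules the task; the retry replays the full partition.
        List<String> conflicts = bulkWrite(batch, Integer.MAX_VALUE);
        System.out.println("conflicts on retry: " + conflicts); // [doc1, doc2]
    }
}
```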
4) First, deal with the version conflicts. Since the only requirement is that no data is lost, a document that reports a conflict can simply be ignored.
Following the official documentation, the following error handler was configured:
```java
public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    @Override
    public HandlerResult onError(BulkWriteFailure entry,
                                 DelayableErrorCollector<byte[]> collector) throws Exception {
        // HTTP 409 means a version conflict: the document already exists, so drop it.
        if (entry.getResponseCode() == 409) {
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}
```
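For a handler like this to take effect, es-hadoop expects it to be registered through its write-error-handler settings: a handler name is enlisted in es.write.rest.error.handlers and mapped to a fully qualified class. A sketch of the relevant settings follows; the name ignoreConflict and the package com.example are assumptions for illustration:

```java
import java.util.Properties;

// Sketch of the es-hadoop settings that wire in the custom bulk-write error
// handler. The handler name "ignoreConflict" and package "com.example" are
// placeholders; substitute the real class name used in the job.
public class EsWriteConf {
    public static Properties esWriteSettings() {
        Properties p = new Properties();
        // Enlist the named handler in the write-error handler chain.
        p.setProperty("es.write.rest.error.handlers", "ignoreConflict");
        // Map the name to the handler class shown above.
        p.setProperty("es.write.rest.error.handler.ignoreConflict",
                      "com.example.IgnoreConflictsHandler");
        return p;
    }

    public static void main(String[] args) {
        esWriteSettings().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a Spark job these would typically travel in the same options map that is passed to saveToEs along with the other es.* settings.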
Verified: the version-conflict errors no longer appear, and the UI shows only EsHadoopNoNodesLeftException: Connection error.
5) Fix EsHadoopNoNodesLeftException: Connection error
The cluster is built on Docker-based virtual machines, with Elasticsearch deployed alongside the CDH cluster, so overall performance is poor. On top of that, Spark dynamic resource allocation is enabled by default in the cluster, which multiplies the write parallelism. Together these caused the connection errors. Fix: disable dynamic allocation with --conf spark.dynamicAllocation.enabled=false and tune the parallelism down, i.e. cap the number of clients writing to ES at the same time.
Verified: the connection errors no longer occur.
Source Code Verification
Starting from dataframe.saveToEs(to, map), the call chain is:
SparkDataFrameFunctions#saveToEs
EsSparkSQL#saveToEs
SparkContext#runJob
Ignoring the details of DAG construction and task scheduling, the key call is runJob: sparkCtx.runJob(srdd.toDF().rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)
EsDataFrameWriter's write method is converted into a function, passed into runJob, and invoked later:
```scala
def write(taskContext: TaskContext, data: Iterator[T]) {
  val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

  taskContext.addTaskCompletionListener((TaskContext) => writer.close())

  if (runtimeMetadata) {
    writer.repository.addRuntimeFieldExtractor(metaExtractor)
  }

  while (data.hasNext) {
    writer.repository.writeToIndex(processData(data))
  }
}
```
From there, the call chain is:
RestRepository#writeToIndex
RestRepository#doWriteToIndex
BulkProcessor#add
BulkProcessor#flush
BulkProcessor#tryFlush
RestClient#bulk
NetworkClient#execute
The core method:
```java
public Response execute(Request request) {
    Response response = null;
    boolean newNode;
    do {
        SimpleRequest routedRequest = new SimpleRequest(request.method(), null,
                request.path(), request.params(), request.body());
        newNode = false;
        try {
            response = currentTransport.execute(routedRequest);
            ByteSequence body = routedRequest.body();
            if (body != null) {
                stats.bytesSent += body.length();
            }
        } catch (Exception ex) {
            if (ex instanceof EsHadoopIllegalStateException) {
                throw (EsHadoopException) ex;
            }
            // issues with the SSL handshake, bail out instead of retry, for security reasons
            if (ex instanceof javax.net.ssl.SSLException) {
                throw new EsHadoopTransportException(ex);
            }
            // check for fatal, non-recoverable network exceptions
            if (ex instanceof BindException) {
                throw new EsHadoopTransportException(ex);
            }
            if (log.isTraceEnabled()) {
                log.trace(String.format(
                        "Caught exception while performing request [%s][%s] - falling back to the next node in line...",
                        currentNode, request.path()), ex);
            }
            String failed = currentNode;
            failedNodes.put(failed, ex);
            newNode = selectNextNode();
            log.error(String.format("Node [%s] failed (%s); "
                    + (newNode ? "selected next node [" + currentNode + "]"
                               : "no other nodes left - aborting..."),
                    failed, ex.getMessage()));
            if (!newNode) {
                throw new EsHadoopNoNodesLeftException(failedNodes);
            }
        }
    } while (newNode);
    return response;
}
```
This is where EsHadoopNoNodesLeftException is thrown.
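The failover loop above boils down to: try the current node; on failure, blacklist it and select the next; once every node has failed, abort with a no-nodes-left error. A simplified, runnable model of that logic (not the es-hadoop code itself; names are made up):

```java
import java.util.*;
import java.util.function.Function;

// Minimal model of NetworkClient#execute's failover loop: each request is
// tried against the current node; on failure the node is recorded as failed
// and the next one is selected; when no nodes remain, the request aborts.
public class FailoverClient {
    private final Deque<String> nodes;
    private final Map<String, String> failedNodes = new LinkedHashMap<>();

    FailoverClient(List<String> nodes) { this.nodes = new ArrayDeque<>(nodes); }

    // 'transport' returns a response, or throws to simulate a connection error.
    String execute(Function<String, String> transport) {
        while (!nodes.isEmpty()) {
            String current = nodes.peekFirst();
            try {
                return transport.apply(current);
            } catch (RuntimeException ex) {
                failedNodes.put(nodes.pollFirst(), ex.getMessage());
            }
        }
        // Mirrors throw new EsHadoopNoNodesLeftException(failedNodes)
        throw new IllegalStateException("no nodes left: " + failedNodes.keySet());
    }

    public static void main(String[] args) {
        FailoverClient client = new FailoverClient(Arrays.asList("es1", "es2", "es3"));
        // es1 is down, es2 answers: the request silently fails over.
        String resp = client.execute(node ->
            node.equals("es1") ? sneakyThrow("connect timeout") : "200 OK from " + node);
        System.out.println(resp); // 200 OK from es2
    }

    static String sneakyThrow(String msg) { throw new RuntimeException(msg); }
}
```

This also explains why the error appears only under load: each failed bulk request burns through the remaining healthy nodes, and once the overloaded cluster rejects connections on all of them, the exception surfaces.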