内容简介:作者:William责编:云湖湖1、概述
云湖湖导读:
在当今大数据时代,随着存储数据量的增长,对数据库插入与读取性能的要求越来越严苛。为了提升访问数据库的性能,已经有越来越多成熟的方案来并发访问数据库,Spark JDBC Datasource就是其中之一。而另一方面,并发访问数据库也会带来各种各样的问题,比如数据的保序、数据倾斜、并发一致性等问题。
本文将会从具体案例入手,通过以下3点来详细描述如何利用Spark来处理分布式事务,解决并发一致性问题。
1、概述事务
2、案例分析
3、解决方案
更多优质内容请关注微信公众号“智能数据湖”
作者:William
责编:云湖湖
1、概述
1.1
数据库事务
事务 可以简单理解为对数据库的一批操作,这批操作要么全部成功,要么全部失败回滚。 事务有4大特性ACID :
原子性(Atomicity)
事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。
一致性(Consistency)
事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。
隔离性(Isolation)
多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
持久性(Durability)
已被提交的事务对数据库的修改应该永久保存在数据库中。
以 MySQL 为例,一个简单的事务操作流程
1.2
分布式数据库事务
分布式事务是数据库事务的子类,在分布式系统与微服务架构盛行的今天,各个不同模块、服务之间进行数据库业务的操作,也是需要保证事务性质的。
现在,已经有很多分布式事务的解决方案,比如常见的2PC、3PC、MySQL的XA事务、SAGA等等。然而,各种方案都有各自的优缺点,面对分布式系统中的所有需求问题,至今是无法找到一个十全十美的解决方案。著名的CAP理论提到,在一个分布式系统中,最多只能满足C、A、P中的两个需求:
C:Consistency 一致性
同一数据的多个副本保持一致。
A:Availability 可用性
系统能够提供正常服务并能在需求的时间内返回结果。
P:Partition tolerance 分区容忍性
由于各种不可预知的问题,系统中必然不能一直使数据的各个副本保持同步,那么系统能够在可用的同时容忍数据不同步的极限称为分区容忍性。
由于至今没有有效的解法来否决CAP理论,因此在实际分布式的解决方案中,又引入了BASE理论来辅助解决分布式中的难题,该理论将会结合下面的实际案例分析来进行介绍。
1.3
Spark JDBC事务处理
Spark JDBC Datasource是Spark SQL用于访问关系型数据库的模块,其中自然运用到了事务逻辑。附录有详细源码分析。
2、案例分析
2.1
某企业需要定期将存放在HBase中的订单信息数据增量同步到PostGre数据库中,以便后续做BI报表展示。流程示意图如下:
该企业要求每隔1小时将HBase中新产生的数据导入到PostGre中,在下一个导入周期开始前完成,每个周期的数据量在10~100GB级别左右。其中每张表都有时间戳(Timestamp)字段来表示数据插入的时间。另外,该企业还要求导入过程中有失败重试机制,保证所有订单数据最终导入成功,并且不能在PostGre中产生重复的订单数据。
考虑到10~100GB级别数据需要在一个周期内导入完成,性能还是有一定的需求。选型中确定了使用SparkSQL的分布式并发处理能力来进行数据的导入。
2.2
Spark JDBC中的并发问题
Spark能够并发启动多个task来同时进行数据的导入,解决了性能问题。但是并发也会带来其他的问题。
Spark是具有事务处理的能力,但是每个task都是一个独立的事务。task执行成功之后,数据会永久写入 SQL 数据库。如果其他的task执行失败,不会影响到已经执行成功的task进行回滚,这时SQL数据库将会产生脏数据。对于用户来讲,实际上只是执行了一条“insert”语句,但是Spark内部启动了并发task,将用户的SQL语句拆分成了多个SQL语句进行处理,而用户是不会感知的。从用户的角度考虑,如果需要事务的能力,要么这一条“insert”语句执行成功,数据正确写入数据库,要么执行失败,SQL语句回滚,没有数据写入数据库中。因此,Spark的并发处理能力无法满足用户对于事务的需求。
图 用户的一条insert语句被拆分成多个insert语句并发执行,当task4写入失败后,其他task写入的数据变成脏数据
3、解决方案
3.1
单并发事务处理
思路
1.2中提到了CAP理论,当C、A、P三者选其二的时候,如果选择CA,那么需要放弃掉数据副本,也就是说系统中只有一份数据,那么就减少了因为同步而带来的一致性问题(实际上考虑到分布式系统的可靠性,数据是需要多副本存放的,一般的分布式系统都会选择P,并在CA中二选一)。
根据这个思想,当并发存在问题的时候,采用单并发从根源上直接抹去分布式事务能力,是可以解决问题的。在Spark中可以设置参数spark.default.parallelism和spark.sql.shuffle.partitions为1来控制task的个数。这样利用Spark JDBC的事务能力,当单task执行失败的时候进行回滚,数据库就不会产生脏数据了。
图 单task成功写入即完成任务
图 单task失败事务回滚,数据库不会产生脏数据
优点
完全解决了Spark因并发导致的分布式事务功能缺失问题。
缺点
无法使用Spark的并行计算能力,性能直线下降,无法满足客户需求。
3.2
二阶段式:协调者与执行者实现
1.2中介绍过,业界已经有很多分布式事务的解决方案,在这里我们介绍一下比较简单的2PC如何在Spark中实现,其他的方案也都可以根据其思想结合Spark现有的执行模式来实现。
思路
先来介绍一下2PC的实现,在2PC的系统中需要有2个角色——协调者和参与者,协调者负责任务调度,参与者负责执行任务。2PC活动分为2个阶段——准备阶段和提交阶段。
准备阶段,协调者会发送确认信息询问参与者是否已准备好,具备事务提交能力;当协调者接收到所有参与者回复之后,进入提交阶段,如果所有参与者都准备好事务提交,则提交事务,如果有一个参与者返回无法进行事务提交,则所有参与者进行回滚。
图 2PC事务成功提交流程
图 2PC事务失败回滚流程
Spark中的Driver与Executor运行模式,正好填充了协调者与参与者的角色。Driver可以作为协调者,Executor中的每一个task可以作为参与者,这里还需要在Driver与task之间增加一个通信通道。准备阶段,Driver启动监听端口等待接收task反馈,各个task完成insert数据处理之后发送完成准备信息给Driver,并等待Driver下一步指令。提交阶段,Driver接受到所有task的准备信息之后,发送提交或者回滚指令。
图 Spark 2PC改造后实现分布式事务处理流程
在这里还需要打通2个难点。第一个问题是准备阶段,Driver在等待task反馈的时候,当task异常失败后有可能会无法反馈。这就需要Driver的监听与task状态检测机制关联,当遇到task异常退出的时候,Driver需要下发回滚指令,不能无止尽等待,防止Spark作业卡死。第二个问题是task是在队列等待下发的,当executor的资源不足以启动所有task的时候,会有部分task存在队列中,这时候driver等待所有task的反馈,启动中的task在等待driver下发新的指令,队列中未启动的task等待启动中的task完成任务释放资源,三方互相牵制,造成死循环。
图 当集群资源无法满足同时启动所有task的时候,造成死循环
这时需要通过Spark配置项来控制task并发数,确保executor有足够的资源能够一次性将所有并发task都启动。相关的配置项如下表所示。
表 控制task并发数的相关配置项
优点
既能满足简单的分布式事务需求,还能充分利用Spark的并行计算能力提升性能。
缺点
1. 需要精确控制task并发数,确保所有task能同时执行,否则会造成死循环。
2. 需要在Spark中实现分布式事务的逻辑,具有一定的工作量。
除了以上这些,2PC协议本身的缺陷在结合了Spark中的一些可靠性机制之后会有所减少(比如协调者挂死、参与者挂死等),但并不代表2PC的所有缺陷Spark都能规避。因此,对于一些有复杂分布式事务处理能力的需求,该方案还是不太满足。
3.3
最佳方案:可幂等的insert ignore模式
思路
1.2中我们遗留了一个BASE理论,先来做一下介绍。BASE理论是eBay架构师提出的,它是CAP理论的一个延伸。BASE理论由BA(Basically Available, 基本可用),S(Soft State),E(Eventual Consistency, 最终一致性)三部分组成,核心思想是即使无法做到强一致性,但应用可以采用适合的方式达到最终一致性。
从案例背景中,我们可以看出来,实际上用户需要的只是最终能够完成所有数据的同步,即所谓的最终一致性,因此我们可以结合Spark的重试机制与类似MySQL的insert ignore功能来实现。
先来介绍一下MySQL的insert ignore。当表中存在主键(例如ID),那么在使用insert ignore into语法时,如果遇到插入主键相同的情况下,将会忽略此次插入。Oracle有类似的功能,叫做merge into,有一点区别是MySQL遇到冲突时,忽略插入,而Oracle遇到冲突时是进行更新操作。PostGre没有直接提供类似功能,但是网上有很多关于PostGre如何实现insert ignore的方法,在这里就不做介绍了。而在案例中涉及到的PostGre,在华为云解决方案中采用的是DWS数据仓库服务,除了兼容PostGre还提供了Oracle的merge into语法来实现该功能。
因此,针对本文的案例,具体的方案是先对表设置主键,如果只是静态的订单状态等信息,则可以简单将订单ID作为主键,而有些涉及到订单操作信息的数据(订单ID会多次出现,记录用户每次对该订单的操作,例如创建订单、对订单支付、撤销订单等),则需要新增一个唯一的事件ID作为主键。
表 订单操作信息表示例
表 增加事件ID作为主键后的订单操作信息表示例
然后在Spark中使用insert ignore的功能,这样如果遇到并发执行时某个task失败的情况,重新执行作业,会对已经插入的数据进行忽略,并不造成数据的重复。
图 使用insert ignore,当task执行失败时部分数据写入,REDO时忽略已写入数据
优点
既解决了一致性问题,也能利用Spark的并行计算能力提升性能。
缺点
该方案使用的是最终一致性的思想,因此如果遇到强一致性的需求是无法满足的。
注释
1、 参考文献: 王能斌. 《数据库系统教程(上册)》. 电子工业出版社
2、 通过分析向数据库插入数据的源码,来介绍一下 Spark 中如何进行事务处理的(代码表可左右滑动):
// @param insertStmt是插入数据库的sql语句模板, 例如"insert into tablexx values(?, ?, ?...)"
// 根据rdd中的数据调用java jdbc batch接口将问号替换掉
// @param batchSize指定了每批次插入sql数据库的记录(行)数
// @param dialect为各种不同SQL数据库提供类型转换器
// @param isolationLevel事务的隔离级别, 同时也是用于判断是否开启事务的标志
def savePartition(
getConnection: () => Connection,
table: String,
iterator: Iterator[Row],
rddSchema: StructType,
insertStmt: String,
batchSize: Int,
dialect: JdbcDialect,
isolationLevel: Int): Iterator[Byte] = {
val conn = getConnection()
var committed = false
// 判断对方数据库是否支持相应的隔离级别
// JDBC接口中隔离级别一共有5种:(网上有很多关于该级别的解释)
// "NONE"
// "READ_UNCOMMITTED"
// "READ_COMMITTED"
// "REPEATABLE_READ"
// "SERIALIZABLE"
//
// isolationLevel为用户指定的隔离级别, spark sql中可以由参数isolationLevel来指定, 默认为"READ_UNCOMMITTED"
// finalIsolationLevel为分析对方数据库的元数据信息后, 选择的最终隔离级别
//
var finalIsolationLevel = Connection.TRANSACTION_NONE
if (isolationLevel != Connection.TRANSACTION_NONE) {
try {
// 获取数据库的元数据信息
val metadata = conn.getMetaData
// 判断是否支持事务
if (metadata.supportsTransactions()) {
// 若数据库支持用户指定的隔离级别,则将最终隔离级别调整为用户指定
// 否则的话使用数据库的默认隔离级别
val defaultIsolation = metadata.getDefaultTransactionIsolation
finalIsolationLevel = defaultIsolation
if (metadata.supportsTransactionIsolationLevel(isolationLevel)) {
finalIsolationLevel = isolationLevel
} else {
logWarning(s"Requested isolation level $isolationLevel is not supported; " +
s"falling back to default isolation level $defaultIsolation")
}
} else {
// 若数据库不支持事务, finalIsolationLevel为'NONE', 即不开启事务
logWarning(s"Requested isolation level $isolationLevel, but transactions are unsupported")
}
} catch {
case NonFatal(e) => logWarning("Exception while detecting transaction support", e)
}
}
// 若最终隔离级别不为'NONE'则开启事务
val supportsTransactions = finalIsolationLevel != Connection.TRANSACTION_NONE
try {
// 若开启事务, 则关闭jdbc接口中的自动提交功能, 并设置隔离级别
if (supportsTransactions) {
conn.setAutoCommit(false) // Everything in the same db transaction.
conn.setTransactionIsolation(finalIsolationLevel)
}
// 根据不同的数据库(mysql, postgre等)构造不同的setters, 用于将insert模板语句中的"?"填充成数据
val stmt = conn.prepareStatement(insertStmt)
val setters = rddSchema.fields.map(f => makeSetter(conn, dialect, f.dataType))
val nullTypes = rddSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType)
val numFields = rddSchema.fields.length
try {
var rowCount = 0
while (iterator.hasNext) {
val row = iterator.next()
var i = 0
// 根据字段个数逐行对insert语句进行填充
while (i < numFields) {
if (row.isNullAt(i)) {
stmt.setNull(i + 1, nullTypes(i))
} else {
setters(i).apply(stmt, row, i)
}
i = i + 1
}
stmt.addBatch()
rowCount += 1
// 当插入数据的行数达到batchSize(默认1000)之后, 进行插入
if (rowCount % batchSize == 0) {
stmt.executeBatch()
rowCount = 0
}
}
// 最后全部处理完之后, 将剩余的数据插入数据库
if (rowCount > 0) {
stmt.executeBatch()
}
} finally {
stmt.close()
}
// 若开启了事务, 执行完stmt.executeBatch()之后数据库是不会生效的, 需要执行提交事务
if (supportsTransactions) {
conn.commit()
}
committed = true
Iterator.empty
} catch {
...
} finally {
if (!committed) {
// 提交失败, 如果开启了事务则进行回滚
if (supportsTransactions) {
conn.rollback()
}
conn.close()
} else {
...
}
}
}
▼ 每周二18点,不见不散 ▼
延伸阅读
咦,在看吗?点一下「在看」再走呗:point_down:
以上所述就是小编给大家介绍的《Spark SQL 分布式事务处理能力的探索与实践》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
数学建模算法与应用
司守奎、孙玺菁 / 国防工业出版社 / 2011-8 / 49.00元
《数学建模算法与应用》主要内容简介:作者司守奎、孙玺菁根据多年数学建模竞赛辅导工作的经验编写《数学建模算法与应用》,涵盖了很多同类型书籍较少涉及的新算法和热点技术,主要内容包括时间序列、支持向量机、偏最小二乘面归分析、现代优化算法、数字图像处理、综合评价与决策方法、预测方法以及数学建模经典算法等内容。《数学建模算法与应用》系统全面,各章节相对独立。《数学建模算法与应用》所选案例具有代表性,注重从不......一起来看看 《数学建模算法与应用》 这本书的介绍吧!