内容简介:今天花了一早上以及午休时间,终于把delta的Upsert功能做完了。加上上周周四做的Delta Compaction支持,我想要的功能基本就都有了。Delta的核心是DeltaLog,其实就是元数据管理。通过该套元数据管理,我们可以很容易的将Compaction,Update,Upsert,Delete等功能加上,因为本质上就是调用元数据管理API完成数据最后的提交。Upsert支持流式和批的方式进行更新。因为受限于Spark的SQL解析,大家可以使用Dataframe 或者 MLSQL的方式进行调用。
前言
今天花了一早上以及午休时间,终于把delta的Upsert功能做完了。加上上周周四做的Delta Compaction支持,我想要的功能基本就都有了。
Delta的核心是DeltaLog,其实就是元数据管理。通过该套元数据管理,我们可以很容易的将Compaction,Update,Upsert,Delete等功能加上,因为本质上就是调用元数据管理API完成数据最后的提交。
代码使用方式
Upsert支持流式和批的方式进行更新。因为受限于Spark的 SQL 解析,大家可以使用Dataframe 或者 MLSQL的方式进行调用。
批使用方式:
val log = DeltaLog.forTable(spark, outputDir.getCanonicalPath)
val upsertTableInDelta = UpsertTableInDelta(data, Option(SaveMode.Append), None, log,
new DeltaOptions(Map[String, String](), df.sparkSession.sessionState.conf),
Seq(),
Map("idCols" -> "key,value"))
val items = upsertTableInDelta.run(df.sparkSession)
唯一需要大家指定的就是 idCols, 也就是你的表的唯一主键组合是啥。比如我这里是key,value两个字段组成唯一主键。
流使用技巧是一模一样的,只需要做一点点修改:
UpsertTableInDelta(data, None, Option(OutputMode.Append())
UpsertTableInDelta 根据你设置的是SaveMode还是OutputMode来看是不是流写入。
MLSQL 使用方式
写入数据到Kafka:
set abc='''
{ "x": 100, "y": 201, "z": 204 ,"dataType":"A group"}
''';
load jsonStr.`abc` as table1;
select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where
kafka.bootstrap.servers="127.0.0.1:9092";
使用流程序消费Kafka:
-- the stream name, should be uniq. set streamName="kafkaStreamExample"; !kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow; -- convert table as stream source load kafka.`wow` options kafka.bootstrap.servers="127.0.0.1:9092" and failOnDataLoss="false" as newkafkatable1; -- aggregation select * from newkafkatable1 as table21; -- output the the result to console. save append table21 as rate.`/tmp/delta/wow-0` options mode="Append" and idCols="x,y" and duration="5" and checkpointLocation="/tmp/s-cpl6";
同样的,我们设置了idCols,指定x,y为唯一主键。
然后查看对应的记录变化:
load delta.`/tmp/delta/wow-0` as show_table1; select * from show_table1 where x=100 and z=204 as output;
你会惊喜的发现数据可以更新了。
实现剖析
一共涉及到三个新文件:
org.apache.spark.sql.delta.commands.UpsertTableInDelta org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource org.apache.spark.sql.delta.sources.MLSQLDeltaSink
对应源码参看我fork的delta项目: mlsql-delta
https://github.com/allwefantasy/delta
第一个文件是实现核心的更新逻辑。第二个第三个支持Spark的datasource API来进行批和流的写入。
这篇文章我们主要介绍UpsertTableInDelta。
case class UpsertTableInDelta(_data: Dataset[_],
saveMode: Option[SaveMode],
outputMode: Option[OutputMode],
deltaLog: DeltaLog,
options: DeltaOptions,
partitionColumns: Seq[String],
configuration: Map[String, String]
) extends RunnableCommand
with ImplicitMetadataOperation
with DeltaCommand with DeltaCommandsFun {
UpsertTableInDelta 集成了delta一些必要的基础类,ImplicitMetadataOperation,DeltaCommand,主要是为了方便得到一些操作日志写入的方法。
saveMode 和 outputMode 主要是为了方便区分现在是流在写,还是批在写,以及写的模式是什么。
assert(configuration.contains(UpsertTableInDelta.ID_COLS), "idCols is required ")
if (outputMode.isDefined) {
assert(outputMode.get == OutputMode.Append(), "append is required ")
}
if (saveMode.isDefined) {
assert(saveMode.get == SaveMode.Append, "append is required ")
}
限制条件是必须都是用Append模式,并且idCols是必须存在的。
saveMode match {
case Some(mode) =>
deltaLog.withNewTransaction { txn =>
actions = upsert(txn, sparkSession)
val operation = DeltaOperations.Write(SaveMode.Overwrite,
Option(partitionColumns),
options.replaceWhere)
txn.commit(actions, operation)
}
case None => outputMode match {
如果是批写入,那么直接调用deltaLog开启一个新的事物,然后进行upsert操作。同时进行commit,然后就搞定了。
如果是流写入则麻烦一点,
case None => outputMode match {
case Some(mode) =>
val queryId = sparkSession.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
assert(queryId != null)
if (SchemaUtils.typeExistsRecursively(_data.schema)(_.isInstanceOf[NullType])) {
throw DeltaErrors.streamWriteNullTypeException
}
val txn = deltaLog.startTransaction()
// Streaming sinks can't blindly overwrite schema.
// See Schema Management design doc for details
updateMetadata(
txn,
_data,
partitionColumns,
configuration = Map.empty,
false)
val currentVersion = txn.txnVersion(queryId)
val batchId = configuration(UpsertTableInDelta.BATCH_ID).toLong
if (currentVersion >= batchId) {
logInfo(s"Skipping already complete epoch $batchId, in query $queryId")
} else {
actions = upsert(txn, sparkSession)
val setTxn = SetTransaction(queryId,
batchId, Some(deltaLog.clock.getTimeMillis())) :: Nil
val info = DeltaOperations.StreamingUpdate(outputMode.get, queryId, batchId)
txn.commit(setTxn ++ actions, info)
}
}
}
首选我们获取queryId,因为在delta里需要使用queryId获取事务ID(batchId),并且最后写完成之后的会额外写入一些数据到元数据里,也需要queryId。
updateMetadata 主要是为了检测schema信息,譬如如果stream 是complte模式,那么是直接覆盖的,而如果是其他模式,则需要做schema合并。
如果我们发现当前事务ID>batchId,说明这个已经运行过了,跳过。如果没有,则使用upsert进行实际的操作。最后设置一些额外的信息提交。
upsert 方法
upsert的基本逻辑是:
-
获取idCols是不是有分区字段,如果有,先根据分区字段过滤出所有的文件。
-
如果没有分区字段,则得到所有的文件
-
将这些文件转化为dataframe
-
和新写入的dataframe进行join操作,得到受影响的行(需要更新的行),然后得到这些行所在的文件。
-
获取这些文件里没有无需变更的记录,写成新文件。
-
删除这些文件
-
将新数据写成新文件
4,5两个步骤需要对数据进行join,但是在Spark里静态表并不能直接join流表,所以我们需要将流表转化为静态表。
def upsert(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
// if _data is stream dataframe, we should convert it to normal
// dataframe and so we can join it later
val data = if (_data.isStreaming) {
class ConvertStreamDataFrame[T](encoder: ExpressionEncoder[T]) {
def toBatch(data: Dataset[_]): Dataset[_] = {
val resolvedEncoder = encoder.resolveAndBind(
data.logicalPlan.output,
data.sparkSession.sessionState.analyzer)
val rdd = data.queryExecution.toRdd.map(resolvedEncoder.fromRow)(encoder.clsTag)
val ds = data.sparkSession.createDataset(rdd)(encoder)
ds
}
}
new ConvertStreamDataFrame[Row](_data.asInstanceOf[Dataset[Row]].exprEnc).toBatch(_data)
} else _data
上述代码就是将流表转化为普通静态表。接着我们需要拿到主键字段里满足分区字段的字段,然后获取他们的min/max值
val minMaxColumns = partitionColumnsInIdCols.flatMap { column =>
Seq(F.lit(column), F.min(column).as(s"${column}_min"), F.max(F.max(s"${column}_max")))
}.toArray
val minxMaxKeyValues = data.select(minMaxColumns: _*).collect()
最后得到过滤条件:
// build our where statement
val whereStatement = minxMaxKeyValues.map { row =>
val column = row.getString(0)
val minValue = row.get(1).toString
val maxValue = row.get(2).toString
if (isNumber(column)) {
s"${column} >= ${minValue} and ${maxValue} >= ${column}"
} else {
s"""${column} >= "${minValue}" and "${maxValue}" >= ${column}"""
}
logInfo(s"whereStatement: ${whereStatement.mkString(" and ")}")
val predicates = parsePartitionPredicates(sparkSession, whereStatement.mkString(" and "))
Some(predicates)
现在可以得到所有相关的文件了:
val filterFilesDataSet = partitionFilters match {
case None =>
snapshot.allFiles
case Some(predicates) =>
DeltaLog.filterFileList(
metadata.partitionColumns, snapshot.allFiles.toDF(), predicates).as[AddFile]
}
将这些文件转化为dataframe,并且将里面的每条记录都带上所属文件的路径:
// Again, we collect all files to driver,
// this may impact performance and even make the driver OOM when
// the number of files are very huge.
// So please make sure you have configured the partition columns or make compaction frequently
val filterFiles = filterFilesDataSet.collect
val dataInTableWeShouldProcess = deltaLog.createDataFrame(snapshot, filterFiles, false)
val dataInTableWeShouldProcessWithFileName = dataInTableWeShouldProcess.
withColumn(UpsertTableInDelta.FILE_NAME, F.input_file_name())
通过Join获取哪些文件里面的记录需要被更新:
// get all files that are affected by the new data(update)
val filesAreAffected = dataInTableWeShouldProcessWithFileName.join(data,
usingColumns = idColsList,
joinType = "inner").select(UpsertTableInDelta.FILE_NAME).
distinct().collect().map(f => f.getString(0))
val tmpFilePathSet = filesAreAffected.map(f => f.split("/").last).toSet
val filesAreAffectedWithDeltaFormat = filterFiles.filter { file =>
tmpFilePathSet.contains(file.path.split("/").last)
}
val deletedFiles = filesAreAffectedWithDeltaFormat.map(_.remove)
将需要删除的文件里没有改变的记录单独拿出来写成新文件:
// we should get not changed records in affected files and write them back again val affectedRecords = deltaLog.createDataFrame(snapshot, filesAreAffectedWithDeltaFormat, false) val notChangedRecords = affectedRecords.join(data, usingColumns = idColsList, joinType = "leftanti"). drop(F.col(UpsertTableInDelta.FILE_NAME)) val notChangedRecordsNewFiles = txn.writeFiles(notChangedRecords, Some(options))
最后将我们新增数据写入:
val newFiles = txn.writeFiles(data, Some(options))
因为第一次写入的时候,schema还没有形成,所以不能走upsert逻辑,而是需要直接写入,这里我偷懒,没有把逻辑写在UpsertTableInDelta里,而是写在了MLSQLDeltaSink里:
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val metadata = deltaLog.snapshot.metadata
val readVersion = deltaLog.snapshot.version
val isInitial = readVersion < 0
if (!isInitial && parameters.contains(UpsertTableInDelta.ID_COLS)) {
UpsertTableInDelta(data, None, Option(outputMode), deltaLog,
new DeltaOptions(Map[String, String](), sqlContext.sparkSession.sessionState.conf),
Seq(),
Map(UpsertTableInDelta.ID_COLS -> parameters(UpsertTableInDelta.ID_COLS),
UpsertTableInDelta.BATCH_ID -> batchId.toString
)).run(sqlContext.sparkSession)
} else {
super.addBatch(batchId, data)
}
}
总结
Delta 具备了数据的增删改查能力,同时流批共享,并发修改控制,加上小文件compaction功能,基本解决了我们之前在使用流计算遇到的大部分问题。后续持续优化delta的查询功能,相信前景无限。
大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛http://hbase.group,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号:hbasegroup),非常欢迎大家积极投稿。
本群为HBase+Spark技术交流讨论,整合最优质的专家资源和技术资料会定期开展线下技术沙龙,专家技术直播,专家答疑活动
点击链接钉钉入群:https://dwz.cn/Fvqv066s或扫码进群
本群为Cassandra技术交流讨论,整合最优质的专家资源和技术资料会定期开展线下技术沙龙,专家技术直播,专家答疑活动
Cassandra 社区钉钉大群:https://c.tb.cn/F3.ZRTY0o
Cassandra 技术社区微信公众号:
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Programming Amazon Web Services
James Murty / O'Reilly Media / 2008-3-25 / USD 49.99
Building on the success of its storefront and fulfillment services, Amazon now allows businesses to "rent" computing power, data storage and bandwidth on its vast network platform. This book demonstra......一起来看看 《Programming Amazon Web Services》 这本书的介绍吧!