内容简介:今天花了一早上以及午休时间,终于把delta的Upsert功能做完了。加上上周周四做的Delta Compaction支持,我想要的功能基本就都有了。Delta的核心是DeltaLog,其实就是元数据管理。通过该套元数据管理,我们可以很容易的将Compaction,Update,Upsert,Delete等功能加上,因为本质上就是调用元数据管理API完成数据最后的提交。Upsert支持流式和批的方式进行更新。因为受限于Spark的SQL解析,大家可以使用Dataframe 或者 MLSQL的方式进行调用。
前言
今天花了一早上以及午休时间,终于把delta的Upsert功能做完了。加上上周周四做的Delta Compaction支持,我想要的功能基本就都有了。
Delta的核心是DeltaLog,其实就是元数据管理。通过该套元数据管理,我们可以很容易的将Compaction,Update,Upsert,Delete等功能加上,因为本质上就是调用元数据管理API完成数据最后的提交。
代码使用方式
Upsert支持流式和批的方式进行更新。因为受限于Spark的 SQL 解析,大家可以使用Dataframe 或者 MLSQL的方式进行调用。
批使用方式:
val log = DeltaLog.forTable(spark, outputDir.getCanonicalPath) val upsertTableInDelta = UpsertTableInDelta(data, Option(SaveMode.Append), None, log, new DeltaOptions(Map[String, String](), df.sparkSession.sessionState.conf), Seq(), Map("idCols" -> "key,value")) val items = upsertTableInDelta.run(df.sparkSession)
唯一需要大家指定的就是 idCols, 也就是你的表的唯一主键组合是啥。比如我这里是key,value两个字段组成唯一主键。
流使用技巧是一模一样的,只需要做一点点修改:
UpsertTableInDelta(data, None, Option(OutputMode.Append())
UpsertTableInDelta 根据你设置的是SaveMode还是OutputMode来看是不是流写入。
MLSQL 使用方式
写入数据到Kafka:
set abc=''' { "x": 100, "y": 201, "z": 204 ,"dataType":"A group"} '''; load jsonStr.`abc` as table1; select to_json(struct(*)) as value from table1 as table2; save append table2 as kafka.`wow` where kafka.bootstrap.servers="127.0.0.1:9092";
使用流程序消费Kafka:
-- the stream name, should be uniq. set streamName="kafkaStreamExample"; !kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow; -- convert table as stream source load kafka.`wow` options kafka.bootstrap.servers="127.0.0.1:9092" and failOnDataLoss="false" as newkafkatable1; -- aggregation select * from newkafkatable1 as table21; -- output the the result to console. save append table21 as rate.`/tmp/delta/wow-0` options mode="Append" and idCols="x,y" and duration="5" and checkpointLocation="/tmp/s-cpl6";
同样的,我们设置了idCols,指定x,y为唯一主键。
然后查看对应的记录变化:
load delta.`/tmp/delta/wow-0` as show_table1; select * from show_table1 where x=100 and z=204 as output;
你会惊喜的发现数据可以更新了。
实现剖析
一共涉及到三个新文件:
org.apache.spark.sql.delta.commands.UpsertTableInDelta org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource org.apache.spark.sql.delta.sources.MLSQLDeltaSink
对应源码参看我fork的delta项目: mlsql-delta
https://github.com/allwefantasy/delta
第一个文件是实现核心的更新逻辑。第二个第三个支持Spark的datasource API来进行批和流的写入。
这篇文章我们主要介绍UpsertTableInDelta。
case class UpsertTableInDelta(_data: Dataset[_], saveMode: Option[SaveMode], outputMode: Option[OutputMode], deltaLog: DeltaLog, options: DeltaOptions, partitionColumns: Seq[String], configuration: Map[String, String] ) extends RunnableCommand with ImplicitMetadataOperation with DeltaCommand with DeltaCommandsFun {
UpsertTableInDelta 集成了delta一些必要的基础类,ImplicitMetadataOperation,DeltaCommand,主要是为了方便得到一些操作日志写入的方法。
saveMode 和 outputMode 主要是为了方便区分现在是流在写,还是批在写,以及写的模式是什么。
assert(configuration.contains(UpsertTableInDelta.ID_COLS), "idCols is required ") if (outputMode.isDefined) { assert(outputMode.get == OutputMode.Append(), "append is required ") } if (saveMode.isDefined) { assert(saveMode.get == SaveMode.Append, "append is required ") }
限制条件是必须都是用Append模式,并且idCols是必须存在的。
saveMode match { case Some(mode) => deltaLog.withNewTransaction { txn => actions = upsert(txn, sparkSession) val operation = DeltaOperations.Write(SaveMode.Overwrite, Option(partitionColumns), options.replaceWhere) txn.commit(actions, operation) } case None => outputMode match {
如果是批写入,那么直接调用deltaLog开启一个新的事物,然后进行upsert操作。同时进行commit,然后就搞定了。
如果是流写入则麻烦一点,
case None => outputMode match { case Some(mode) => val queryId = sparkSession.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY) assert(queryId != null) if (SchemaUtils.typeExistsRecursively(_data.schema)(_.isInstanceOf[NullType])) { throw DeltaErrors.streamWriteNullTypeException } val txn = deltaLog.startTransaction() // Streaming sinks can't blindly overwrite schema. // See Schema Management design doc for details updateMetadata( txn, _data, partitionColumns, configuration = Map.empty, false) val currentVersion = txn.txnVersion(queryId) val batchId = configuration(UpsertTableInDelta.BATCH_ID).toLong if (currentVersion >= batchId) { logInfo(s"Skipping already complete epoch $batchId, in query $queryId") } else { actions = upsert(txn, sparkSession) val setTxn = SetTransaction(queryId, batchId, Some(deltaLog.clock.getTimeMillis())) :: Nil val info = DeltaOperations.StreamingUpdate(outputMode.get, queryId, batchId) txn.commit(setTxn ++ actions, info) } } }
首选我们获取queryId,因为在delta里需要使用queryId获取事务ID(batchId),并且最后写完成之后的会额外写入一些数据到元数据里,也需要queryId。
updateMetadata 主要是为了检测schema信息,譬如如果stream 是complte模式,那么是直接覆盖的,而如果是其他模式,则需要做schema合并。
如果我们发现当前事务ID>batchId,说明这个已经运行过了,跳过。如果没有,则使用upsert进行实际的操作。最后设置一些额外的信息提交。
upsert 方法
upsert的基本逻辑是:
-
获取idCols是不是有分区字段,如果有,先根据分区字段过滤出所有的文件。
-
如果没有分区字段,则得到所有的文件
-
将这些文件转化为dataframe
-
和新写入的dataframe进行join操作,得到受影响的行(需要更新的行),然后得到这些行所在的文件。
-
获取这些文件里没有无需变更的记录,写成新文件。
-
删除这些文件
-
将新数据写成新文件
4,5两个步骤需要对数据进行join,但是在Spark里静态表并不能直接join流表,所以我们需要将流表转化为静态表。
def upsert(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = { // if _data is stream dataframe, we should convert it to normal // dataframe and so we can join it later val data = if (_data.isStreaming) { class ConvertStreamDataFrame[T](encoder: ExpressionEncoder[T]) { def toBatch(data: Dataset[_]): Dataset[_] = { val resolvedEncoder = encoder.resolveAndBind( data.logicalPlan.output, data.sparkSession.sessionState.analyzer) val rdd = data.queryExecution.toRdd.map(resolvedEncoder.fromRow)(encoder.clsTag) val ds = data.sparkSession.createDataset(rdd)(encoder) ds } } new ConvertStreamDataFrame[Row](_data.asInstanceOf[Dataset[Row]].exprEnc).toBatch(_data) } else _data
上述代码就是将流表转化为普通静态表。接着我们需要拿到主键字段里满足分区字段的字段,然后获取他们的min/max值
val minMaxColumns = partitionColumnsInIdCols.flatMap { column => Seq(F.lit(column), F.min(column).as(s"${column}_min"), F.max(F.max(s"${column}_max"))) }.toArray val minxMaxKeyValues = data.select(minMaxColumns: _*).collect()
最后得到过滤条件:
// build our where statement val whereStatement = minxMaxKeyValues.map { row => val column = row.getString(0) val minValue = row.get(1).toString val maxValue = row.get(2).toString if (isNumber(column)) { s"${column} >= ${minValue} and ${maxValue} >= ${column}" } else { s"""${column} >= "${minValue}" and "${maxValue}" >= ${column}""" } logInfo(s"whereStatement: ${whereStatement.mkString(" and ")}") val predicates = parsePartitionPredicates(sparkSession, whereStatement.mkString(" and ")) Some(predicates)
现在可以得到所有相关的文件了:
val filterFilesDataSet = partitionFilters match { case None => snapshot.allFiles case Some(predicates) => DeltaLog.filterFileList( metadata.partitionColumns, snapshot.allFiles.toDF(), predicates).as[AddFile] }
将这些文件转化为dataframe,并且将里面的每条记录都带上所属文件的路径:
// Again, we collect all files to driver, // this may impact performance and even make the driver OOM when // the number of files are very huge. // So please make sure you have configured the partition columns or make compaction frequently val filterFiles = filterFilesDataSet.collect val dataInTableWeShouldProcess = deltaLog.createDataFrame(snapshot, filterFiles, false) val dataInTableWeShouldProcessWithFileName = dataInTableWeShouldProcess. withColumn(UpsertTableInDelta.FILE_NAME, F.input_file_name())
通过Join获取哪些文件里面的记录需要被更新:
// get all files that are affected by the new data(update) val filesAreAffected = dataInTableWeShouldProcessWithFileName.join(data, usingColumns = idColsList, joinType = "inner").select(UpsertTableInDelta.FILE_NAME). distinct().collect().map(f => f.getString(0)) val tmpFilePathSet = filesAreAffected.map(f => f.split("/").last).toSet val filesAreAffectedWithDeltaFormat = filterFiles.filter { file => tmpFilePathSet.contains(file.path.split("/").last) } val deletedFiles = filesAreAffectedWithDeltaFormat.map(_.remove)
将需要删除的文件里没有改变的记录单独拿出来写成新文件:
// we should get not changed records in affected files and write them back again val affectedRecords = deltaLog.createDataFrame(snapshot, filesAreAffectedWithDeltaFormat, false) val notChangedRecords = affectedRecords.join(data, usingColumns = idColsList, joinType = "leftanti"). drop(F.col(UpsertTableInDelta.FILE_NAME)) val notChangedRecordsNewFiles = txn.writeFiles(notChangedRecords, Some(options))
最后将我们新增数据写入:
val newFiles = txn.writeFiles(data, Some(options))
因为第一次写入的时候,schema还没有形成,所以不能走upsert逻辑,而是需要直接写入,这里我偷懒,没有把逻辑写在UpsertTableInDelta里,而是写在了MLSQLDeltaSink里:
override def addBatch(batchId: Long, data: DataFrame): Unit = { val metadata = deltaLog.snapshot.metadata val readVersion = deltaLog.snapshot.version val isInitial = readVersion < 0 if (!isInitial && parameters.contains(UpsertTableInDelta.ID_COLS)) { UpsertTableInDelta(data, None, Option(outputMode), deltaLog, new DeltaOptions(Map[String, String](), sqlContext.sparkSession.sessionState.conf), Seq(), Map(UpsertTableInDelta.ID_COLS -> parameters(UpsertTableInDelta.ID_COLS), UpsertTableInDelta.BATCH_ID -> batchId.toString )).run(sqlContext.sparkSession) } else { super.addBatch(batchId, data) } }
总结
Delta 具备了数据的增删改查能力,同时流批共享,并发修改控制,加上小文件compaction功能,基本解决了我们之前在使用流计算遇到的大部分问题。后续持续优化delta的查询功能,相信前景无限。
大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛http://hbase.group,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号:hbasegroup),非常欢迎大家积极投稿。
本群为HBase+Spark技术交流讨论,整合最优质的专家资源和技术资料会定期开展线下技术沙龙,专家技术直播,专家答疑活动
点击链接钉钉入群:https://dwz.cn/Fvqv066s或扫码进群
本群为Cassandra技术交流讨论,整合最优质的专家资源和技术资料会定期开展线下技术沙龙,专家技术直播,专家答疑活动
Cassandra 社区钉钉大群:https://c.tb.cn/F3.ZRTY0o
Cassandra 技术社区微信公众号:
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
拆掉互联网那堵墙
庄良基 / 经济日报出版社 / 2014-6 / 25.80
都在说道互联网、说道电子商务、说道移动APP、说道微信、说道互联网金融......我们该如何认识互联网?中小微企业该如何借力互联网?互联网很神秘吗?很高深莫测吗? 其实互联网并没有什么神秘的,也没有什么高深莫测的!互联网无非是人类发明的工具而已,既然是工具,我们就一定可以驾驭和使用它。既然可以双重使用,就理当让所有有人都容易掌握并轻松驾驭。 互联网离我们很远吗?互联网界的成功故事都是那......一起来看看 《拆掉互联网那堵墙》 这本书的介绍吧!