内容简介:Delta Lake 是一个存储层,为 Apache Spark 和大数据 workloads 提供 ACID 事务能力。Delta Lake 0.3.0 发布,支持多个语句,以便于更新和删除DeltaLake 表中的数据,具体如下: 从表中删除数据 可以从 DeltaLak...
Delta Lake 是一个存储层,为 Apache Spark 和大数据 workloads 提供 ACID 事务能力。Delta Lake 0.3.0 发布,支持多个语句,以便于更新和删除DeltaLake 表中的数据,具体如下:
从表中删除数据
可以从 DeltaLake 表中删除相匹配的数据,例如,要删除 2017 年之前所有事件,可以运行以下命令:
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete($"date" < "2017-01-01")
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToEventsTable);
deltaTable.delete("date < '2017-01-01'"); // predicate using SQL formatted string
deltaTable.delete(functions.col("date").lt(functions.lit("2017-01-01")));
DELETE 从 Delta Lake 表的最新版本中删除数据,但在显式清除旧版本之前不会将其从物理存储中删除。
更新一张表
可以更新与 Delta Lake 表中相匹配的数据。例如,要修复事件类型中的拼写错误,可以运行以下命令:
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
"eventType = 'clck'",
Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.update( // predicate using Spark SQL functions and implicits
$"eventType" = "clck"),
Map("eventType" -> lit("click"));
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToEventsTable);
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
"eventType = 'clck'",
new HashMap<String, String>() {{
put("eventType", "'click'");
}}
);
deltaTable.update( // predicate using Spark SQL functions
functions.col(eventType).eq("clck"),
new HashMap<String, Column>() {{
put("eventType", functions.lit("click"));
}}
);
使用 Merge 向上插入到表中
可以使用 merge 操作将数据从 SPark DataFrame 插入到 Delta Lake 表中。此操作类似于 SQL Merge 命令,但对更新、插入和删除中的删除和附加条件有其他的支持。
假设有一个 Spark DataFrame,它包含 eventId 事件的新数据。其中一些事件可能已经出现在 Events 表中。因此,当想要将新数据合并到 Events 表中时,你需要更新匹配的行(即已存在的 eventId)并插入新的行(即 eventId 不存在)。运行以下操作:
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, pathToEventsTable)
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
Dataset<Row> updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, pathToEventsTable)
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched()
.updateExpr(
new HashMap<String, String>() {{
put("data" -> "events.data");
}})
.whenNotMatched()
.insertExpr(
new HashMap<String, String>() {{
put("date", "updates.date");
put("eventId", "updates.eventId");
put("data", "updates.data");
}})
.execute();
你应该向合并条件中添加尽可能多的信息,以减少工作量和减少事务冲突的可能性。关于如何在不同场景中使用合并,请看发布说明。
发布说明:
https://docs.delta.io/0.3.0/delta-update.html
以上所述就是小编给大家介绍的《Delta Lake 0.3.0 发布,大型数据集上的 DML》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 数据齿轮(DataGear)数据库管理系统 1.1 版本发布
- DataGear 1.2.0 发布,数据齿轮数据库管理系统
- 数据齿轮(DataGear)数据库管理系统 v1.0 版本发布
- 数据齿轮(DataGear)数据库管理系统 v1.1.1 发布
- 数据齿轮(DataGear)数据库管理系统 v1.3.0 发布
- 数据齿轮(DataGear)数据库管理系统 v1.4.0 发布
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
世界是平的(3.0版)
[美] 托马斯·弗里德曼 / 何帆、肖莹莹、郝正非 / 湖南科学技术出版社 / 2008-9 / 58.00元
世界变得平坦,是不是迫使我们跑得更快才能拥有一席之地? 在《世界是平的》中,托马斯·弗里德曼描述了当代世界发生的重大变化。科技和通信领域如闪电般迅速的进步,使全世界的人们可以空前地彼此接近——在印度和中国创造爆炸式增长的财富;挑战我们中的一些人,比他们更快占领地盘。3.0版新增两章,更新了报告和注释方面的内容,这些内容均采自作者考察世界各地特别是整个美国中心地带的见闻,在美国本土,世界的平坦......一起来看看 《世界是平的(3.0版)》 这本书的介绍吧!