内容简介:Delta Lake 是一个存储层,为 Apache Spark 和大数据 workloads 提供 ACID 事务能力。Delta Lake 0.3.0 发布,支持多个语句,以便于更新和删除DeltaLake 表中的数据,具体如下: 从表中删除数据 可以从 DeltaLak...
Delta Lake 是一个存储层,为 Apache Spark 和大数据 workloads 提供 ACID 事务能力。Delta Lake 0.3.0 发布,支持多个语句,以便于更新和删除DeltaLake 表中的数据,具体如下:
从表中删除数据
可以从 DeltaLake 表中删除相匹配的数据,例如,要删除 2017 年之前所有事件,可以运行以下命令:
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete($"date" < "2017-01-01")
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToEventsTable);
deltaTable.delete("date < '2017-01-01'"); // predicate using SQL formatted string
deltaTable.delete(functions.col("date").lt(functions.lit("2017-01-01")));
DELETE 从 Delta Lake 表的最新版本中删除数据,但在显式清除旧版本之前不会将其从物理存储中删除。
更新一张表
可以更新与 Delta Lake 表中相匹配的数据。例如,要修复事件类型中的拼写错误,可以运行以下命令:
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
"eventType = 'clck'",
Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.update( // predicate using Spark SQL functions and implicits
$"eventType" = "clck"),
Map("eventType" -> lit("click"));
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToEventsTable);
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
"eventType = 'clck'",
new HashMap<String, String>() {{
put("eventType", "'click'");
}}
);
deltaTable.update( // predicate using Spark SQL functions
functions.col(eventType).eq("clck"),
new HashMap<String, Column>() {{
put("eventType", functions.lit("click"));
}}
);
使用 Merge 向上插入到表中
可以使用 merge 操作将数据从 SPark DataFrame 插入到 Delta Lake 表中。此操作类似于 SQL Merge 命令,但对更新、插入和删除中的删除和附加条件有其他的支持。
假设有一个 Spark DataFrame,它包含 eventId 事件的新数据。其中一些事件可能已经出现在 Events 表中。因此,当想要将新数据合并到 Events 表中时,你需要更新匹配的行(即已存在的 eventId)并插入新的行(即 eventId 不存在)。运行以下操作:
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, pathToEventsTable)
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
Dataset<Row> updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, pathToEventsTable)
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched()
.updateExpr(
new HashMap<String, String>() {{
put("data" -> "events.data");
}})
.whenNotMatched()
.insertExpr(
new HashMap<String, String>() {{
put("date", "updates.date");
put("eventId", "updates.eventId");
put("data", "updates.data");
}})
.execute();
你应该向合并条件中添加尽可能多的信息,以减少工作量和减少事务冲突的可能性。关于如何在不同场景中使用合并,请看发布说明。
发布说明:
https://docs.delta.io/0.3.0/delta-update.html
以上所述就是小编给大家介绍的《Delta Lake 0.3.0 发布,大型数据集上的 DML》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 数据齿轮(DataGear)数据库管理系统 1.1 版本发布
- DataGear 1.2.0 发布,数据齿轮数据库管理系统
- 数据齿轮(DataGear)数据库管理系统 v1.0 版本发布
- 数据齿轮(DataGear)数据库管理系统 v1.1.1 发布
- 数据齿轮(DataGear)数据库管理系统 v1.3.0 发布
- 数据齿轮(DataGear)数据库管理系统 v1.4.0 发布
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Learn Python 3 the Hard Way
Zed A. Shaw / Addison / 2017-7-7 / USD 30.74
You Will Learn Python 3! Zed Shaw has perfected the world’s best system for learning Python 3. Follow it and you will succeed—just like the millions of beginners Zed has taught to date! You bring t......一起来看看 《Learn Python 3 the Hard Way》 这本书的介绍吧!