内容简介:本文主要研究一下flink Table的Set Operationsflink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scalaflink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
序
本文主要研究一下flink Table的Set Operations
实例
Union
Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "a, b, c"); Table result = left.union(right);
- union方法类似 sql 的union
UnionAll
Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "a, b, c"); Table result = left.unionAll(right);
- unionAll方法类似sql的union all
Intersect
Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "d, e, f"); Table result = left.intersect(right);
- intersect方法类似sql的intersect
IntersectAll
Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "d, e, f"); Table result = left.intersectAll(right);
- intersectAll方法类似sql的intersect all
Minus
Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "a, b, c"); Table result = left.minus(right);
- minus方法类似sql的except
MinusAll
Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "a, b, c"); Table result = left.minusAll(right);
- minusAll方法类似sql的except all
In
Table left = ds1.toTable(tableEnv, "a, b, c"); Table right = ds2.toTable(tableEnv, "a"); // using implicit registration Table result = left.select("a, b, c").where("a.in(" + right + ")"); // using explicit registration tableEnv.registerTable("RightTable", right); Table result = left.select("a, b, c").where("a.in(RightTable)");
- in方法类似sql的in
Table
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class Table( private[flink] val tableEnv: TableEnvironment, private[flink] val logicalPlan: LogicalNode) { //...... def union(right: Table): Table = { // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") } new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv)) } def unionAll(right: Table): Table = { // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") } new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv)) } def intersect(right: Table): Table = { // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException( "Only tables from the same TableEnvironment can be intersected.") } new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv)) } def intersectAll(right: Table): Table = { // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException( "Only tables from the same TableEnvironment can be intersected.") } new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv)) } def minus(right: Table): Table = { // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException("Only tables from the same TableEnvironment can be " + "subtracted.") } new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false) .validate(tableEnv)) } def minusAll(right: Table): Table = { // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException("Only tables from the same TableEnvironment can be " + "subtracted.") } new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true) .validate(tableEnv)) } //...... }
- union及unionAll使用的是Union,intersect及intersectAll使用的是Intersect,minus及minusAll使用的是Minus
Union
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode { override def output: Seq[Attribute] = left.output override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { left.construct(relBuilder) right.construct(relBuilder) relBuilder.union(all) } override def validate(tableEnv: TableEnvironment): LogicalNode = { if (tableEnv.isInstanceOf[StreamTableEnvironment] && !all) { failValidation(s"Union on stream tables is currently not supported.") } val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union] if (left.output.length != right.output.length) { failValidation(s"Union two tables of different column sizes:" + s" ${left.output.size} and ${right.output.size}") } val sameSchema = left.output.zip(right.output).forall { case (l, r) => l.resultType == r.resultType } if (!sameSchema) { failValidation(s"Union two tables of different schema:" + s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" + s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]") } resolvedUnion } }
- Union继承了BinaryNode,其construct方法通过relBuilder.union来构建union操作
Intersect
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode { override def output: Seq[Attribute] = left.output override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { left.construct(relBuilder) right.construct(relBuilder) relBuilder.intersect(all) } override def validate(tableEnv: TableEnvironment): LogicalNode = { if (tableEnv.isInstanceOf[StreamTableEnvironment]) { failValidation(s"Intersect on stream tables is currently not supported.") } val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect] if (left.output.length != right.output.length) { failValidation(s"Intersect two tables of different column sizes:" + s" ${left.output.size} and ${right.output.size}") } // allow different column names between tables val sameSchema = left.output.zip(right.output).forall { case (l, r) => l.resultType == r.resultType } if (!sameSchema) { failValidation(s"Intersect two tables of different schema:" + s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" + s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]") } resolvedIntersect } }
- Intersect继承了BinaryNode,其construct方法通过relBuilder.intersect来构建intersect操作
Minus
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode { override def output: Seq[Attribute] = left.output override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { left.construct(relBuilder) right.construct(relBuilder) relBuilder.minus(all) } override def validate(tableEnv: TableEnvironment): LogicalNode = { if (tableEnv.isInstanceOf[StreamTableEnvironment]) { failValidation(s"Minus on stream tables is currently not supported.") } val resolvedMinus = super.validate(tableEnv).asInstanceOf[Minus] if (left.output.length != right.output.length) { failValidation(s"Minus two table of different column sizes:" + s" ${left.output.size} and ${right.output.size}") } val sameSchema = left.output.zip(right.output).forall { case (l, r) => l.resultType == r.resultType } if (!sameSchema) { failValidation(s"Minus two table of different schema:" + s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" + s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]") } resolvedMinus } }
- Minus继承了BinaryNode,其construct方法通过relBuilder.minus来构建minus操作
小结
in在where子句中
doc
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
人人都是架构师:分布式系统架构落地与瓶颈突破
高翔龙 / 电子工业出版社 / 2017-5 / 69
《人人都是架构师:分布式系统架构落地与瓶颈突破》并没有过多渲染系统架构的理论知识,而是切切实实站在开发一线角度,为各位读者诠释了大型网站在架构演变过程中出现一系列技术难题时的解决方案。《人人都是架构师:分布式系统架构落地与瓶颈突破》首先从分布式服务案例开始介绍,重点为大家讲解了大规模服务化场景下企业应该如何实施服务治理;然后在大流量限流/消峰案例中,笔者为大家讲解了应该如何有效地对流量实施管制,避......一起来看看 《人人都是架构师:分布式系统架构落地与瓶颈突破》 这本书的介绍吧!