Preface
This article looks at orderBy and limiting (offset/fetch) in Flink's Table API, based on the flink-table_2.11 1.7.0 sources.
Examples
// Order By
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");

// Offset & Fetch (a separate snippet from the one above)
Table in = tableEnv.fromDataSet(ds, "a, b, c");
// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5);
// skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);
// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
- orderBy is analogous to SQL's ORDER BY; limiting is expressed with two methods, offset and fetch, which correspond to SQL's OFFSET and FETCH clauses.
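To make the offset/fetch semantics concrete, here is a minimal sketch that applies them to an in-memory list of integers rather than to a Flink Table. The class and method names (OffsetFetchSketch, orderByOffsetFetch) are hypothetical. The sketch also assumes that a negative fetch means "no limit", which matches how the -1 produced by offset is handed to Calcite's RelBuilder.limit.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: ORDER BY + OFFSET/FETCH applied to an in-memory list, not to a Flink Table.
class OffsetFetchSketch {

    // Sorts ascending, skips `offset` rows, then keeps at most `fetch` rows.
    // A negative fetch (like the -1 that Table.offset uses) is treated as "no FETCH limit".
    static List<Integer> orderByOffsetFetch(List<Integer> rows, int offset, int fetch) {
        Stream<Integer> s = rows.stream().sorted().skip(offset);
        if (fetch >= 0) {
            s = s.limit(fetch);
        }
        return s.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> a = List.of(14, 3, 9, 1, 12, 7, 5, 11, 2, 8, 13, 6, 4, 10, 15);
        // like in.orderBy("a.asc").fetch(5)
        System.out.println(orderByOffsetFetch(a, 0, 5));   // prints [1, 2, 3, 4, 5]
        // like in.orderBy("a.asc").offset(3)
        System.out.println(orderByOffsetFetch(a, 3, -1));  // prints [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
        // like in.orderBy("a.asc").offset(10).fetch(5)
        System.out.println(orderByOffsetFetch(a, 10, 5));  // prints [11, 12, 13, 14, 15]
    }
}
```

The three calls in main mirror result1, result2 and result3 from the examples: offset only controls where the output starts, and fetch only controls how many rows follow.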
Table
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......

  // String variant: parse the field list into Expressions, then delegate
  def orderBy(fields: String): Table = {
    val parsedFields = ExpressionParser.parseExpressionList(fields)
    orderBy(parsedFields: _*)
  }

  // Expression variant: fields without an explicit direction default to Asc
  def orderBy(fields: Expression*): Table = {
    val order: Seq[Ordering] = fields.map {
      case o: Ordering => o
      case e => Asc(e)
    }
    new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
  }

  // OFFSET without FETCH: fetch = -1 means "no fetch limit"
  def offset(offset: Int): Table = {
    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
  }

  def fetch(fetch: Int): Table = {
    if (fetch < 0) {
      throw new ValidationException("FETCH count must be equal or larger than 0.")
    }
    this.logicalPlan match {
      case Limit(o, -1, c) =>
        // replace LIMIT without FETCH by LIMIT with FETCH
        new Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))
      case Limit(_, _, _) =>
        // a FETCH has already been applied to this Limit
        throw new ValidationException("FETCH is already defined.")
      case _ =>
        // no preceding offset: create a Limit starting at offset 0
        new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))
    }
  }

  //......
}
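- Table's orderBy method accepts either a String or a list of Expressions. The String is parsed with ExpressionParser.parseExpressionList and then passed to the Expression variant. Any field that is not already an Ordering is wrapped in Asc, so ascending is the default, and orderBy returns a new Table whose plan is a Sort node. The offset and fetch methods return a new Table whose plan is a Limit node: the Limit created by offset has fetch = -1, and if fetch is called without a preceding offset, the Limit it creates has offset 0. fetch also rejects a negative count and refuses to set FETCH a second time.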
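The way offset and fetch combine into a single Limit node can be modeled in plain Java. This is a simplified sketch, not Flink's classes: Node and Limit are hypothetical stand-ins for LogicalNode and the Limit case class, IllegalArgumentException and IllegalStateException stand in for ValidationException, and the validate calls are left out.

```java
// Hypothetical model of how Table.offset/fetch build a Limit node; simplified, not Flink's classes.
class LimitPlanSketch {

    // Stand-in for LogicalNode (e.g. the Sort produced by orderBy).
    static class Node {}

    // Stand-in for Limit(offset, fetch, child); fetch == -1 means "no FETCH".
    static final class Limit extends Node {
        final int offset;
        final int fetch;
        final Node child;

        Limit(int offset, int fetch, Node child) {
            this.offset = offset;
            this.fetch = fetch;
            this.child = child;
        }

        @Override
        public String toString() {
            return "Limit(" + offset + ", " + fetch + ")";
        }
    }

    // Like Table.offset: always wraps the plan in Limit(offset, -1, plan).
    static Node offset(Node plan, int offset) {
        return new Limit(offset, -1, plan);
    }

    // Like the pattern match in Table.fetch.
    static Node fetch(Node plan, int fetch) {
        if (fetch < 0) {
            throw new IllegalArgumentException("FETCH count must be equal or larger than 0.");
        }
        if (plan instanceof Limit) {
            Limit limit = (Limit) plan;
            if (limit.fetch == -1) {
                // replace LIMIT without FETCH by LIMIT with FETCH, keeping offset and child
                return new Limit(limit.offset, fetch, limit.child);
            }
            throw new IllegalStateException("FETCH is already defined.");
        }
        // no preceding offset: the new Limit starts at offset 0
        return new Limit(0, fetch, plan);
    }

    public static void main(String[] args) {
        Node sort = new Node();
        System.out.println(fetch(sort, 5));              // prints Limit(0, 5)
        System.out.println(offset(sort, 3));             // prints Limit(3, -1)
        System.out.println(fetch(offset(sort, 10), 5));  // prints Limit(10, 5)
    }
}
```

Because offset always produces Limit(offset, -1, ...), calling offset(...).fetch(...) collapses into one Limit node whose child is still the Sort, while calling fetch twice is rejected.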
Sort
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Sort on stream tables is currently not supported.")
    }
    super.validate(tableEnv)
  }
}
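- Sort extends UnaryNode and its constructor takes a Seq of Ordering. Its construct method first constructs the child and then calls relBuilder.sort with each Ordering converted to a RexNode. Its validate method fails on stream tables (StreamTableEnvironment), because sorting stream tables is not supported in this version.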
Ordering
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/ordering.scala
abstract class Ordering extends UnaryExpression {
  override private[flink] def validateInput(): ValidationResult = {
    if (!child.isInstanceOf[NamedExpression]) {
      ValidationFailure(s"Sort should only based on field reference")
    } else {
      ValidationSuccess
    }
  }
}

case class Asc(child: Expression) extends Ordering {
  override def toString: String = s"($child).asc"

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    child.toRexNode
  }

  override private[flink] def resultType: TypeInformation[_] = child.resultType
}

case class Desc(child: Expression) extends Ordering {
  override def toString: String = s"($child).desc"

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    relBuilder.desc(child.toRexNode)
  }

  override private[flink] def resultType: TypeInformation[_] = child.resultType
}
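- Ordering is an abstract class extending UnaryExpression, with two subclasses, Asc and Desc. Its validateInput requires the child to be a NamedExpression, i.e. sorting is only allowed on field references. Asc's toRexNode returns the child's RexNode unchanged (ascending is the default), while Desc wraps it with relBuilder.desc.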
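How a list of Asc/Desc orderings behaves can be sketched with java.util.Comparator: rows are compared by the first key, ties are broken by the next key, and a Desc key reverses its comparison, which is the usual SQL ORDER BY semantics. The Ordering class, the normalize and toComparator helpers, and the representation of rows as Map<String, Integer> are all hypothetical. Flink itself does not build comparators here; it translates the orderings into Calcite RexNodes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical model of Asc/Desc orderings as Java comparators; not Flink's classes.
class OrderingSketch {

    // Stand-in for Asc(child) / Desc(child): a field name plus a direction.
    static final class Ordering {
        final String field;
        final boolean descending;

        Ordering(String field, boolean descending) {
            this.field = field;
            this.descending = descending;
        }
    }

    static Ordering asc(String field) { return new Ordering(field, false); }
    static Ordering desc(String field) { return new Ordering(field, true); }

    // Like orderBy(fields: Expression*): a plain field is wrapped in Asc, an Ordering is kept as-is.
    static List<Ordering> normalize(Object... fields) {
        List<Ordering> order = new ArrayList<>();
        for (Object f : fields) {
            order.add(f instanceof Ordering ? (Ordering) f : asc((String) f));
        }
        return order;
    }

    // Lexicographic comparator: compare by the first key, break ties with the next, and so on.
    static Comparator<Map<String, Integer>> toComparator(List<Ordering> order) {
        Comparator<Map<String, Integer>> cmp = (x, y) -> 0;
        for (Ordering o : order) {
            Comparator<Map<String, Integer>> key = Comparator.comparing(row -> row.get(o.field));
            Comparator<Map<String, Integer>> directed = o.descending ? key.reversed() : key;
            cmp = cmp.thenComparing(directed);
        }
        return cmp;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> rows = new ArrayList<>(List.of(
                Map.of("a", 1, "b", 1),
                Map.of("a", 2, "b", 0),
                Map.of("a", 1, "b", 3)));
        // roughly what orderBy("a, b.desc") expresses: a ascending (the default), then b descending
        rows.sort(toComparator(normalize("a", desc("b"))));
        for (Map<String, Integer> row : rows) {
            System.out.println(row.get("a") + "," + row.get("b"));  // prints 1,3 then 1,1 then 2,0
        }
    }
}
```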
Limit
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.limit(offset, fetch)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Limit on stream tables is currently not supported.")
    }
    if (!child.isInstanceOf[Sort]) {
      failValidation(s"Limit operator must be preceded by an OrderBy operator.")
    }
    if (offset < 0) {
      failValidation(s"Offset should be greater than or equal to zero.")
    }
    super.validate(tableEnv)
  }
}
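- Limit extends UnaryNode and its constructor takes offset and fetch (fetch defaults to -1). Its construct method first constructs the child and then calls relBuilder.limit(offset, fetch). Its validate method fails on stream tables, requires the child node to be a Sort (so offset/fetch must directly follow orderBy), and requires offset to be non-negative.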
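The three explicit checks in Limit.validate can be mirrored in a small standalone sketch that runs them in the same order and stops at the first failure. The Env enum, the childKind string, and the "Scan" placeholder for a non-Sort child are hypothetical. IllegalArgumentException stands in for ValidationException, and any extra checks done by super.validate are not modeled.

```java
// Hypothetical model of the explicit checks in Limit.validate (Flink 1.7); not Flink code.
class LimitValidationSketch {

    enum Env { BATCH, STREAM }

    // Same checks, in the same order, as Limit.validate; the first failure throws.
    static void validateLimit(Env env, String childKind, int offset) {
        if (env == Env.STREAM) {
            throw new IllegalArgumentException("Limit on stream tables is currently not supported.");
        }
        if (!"Sort".equals(childKind)) {
            throw new IllegalArgumentException("Limit operator must be preceded by an OrderBy operator.");
        }
        if (offset < 0) {
            throw new IllegalArgumentException("Offset should be greater than or equal to zero.");
        }
    }

    // Returns "ok" or the failure message, for easy printing.
    static String check(Env env, String childKind, int offset) {
        try {
            validateLimit(env, childKind, offset);
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // in.orderBy("a.asc").offset(3): the child is the Sort node
        System.out.println(check(Env.BATCH, "Sort", 3));
        // in.fetch(5) without orderBy: the child is whatever node `in` holds ("Scan" is a placeholder)
        System.out.println(check(Env.BATCH, "Scan", 0));
        // in.orderBy("a.asc").offset(3).offset(2): the child is the first Limit, not a Sort
        System.out.println(check(Env.BATCH, "Limit", 2));
        // on a stream table the stream check fires first
        System.out.println(check(Env.STREAM, "Scan", 0));
    }
}
```

The third case shows a side effect of the "child must be a Sort" rule: offset cannot be chained twice, because the second Limit would sit on top of the first Limit instead of the Sort. offset(...).fetch(...) passes because fetch replaces the existing Limit rather than stacking a new one.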
Summary
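Table's orderBy is analogous to SQL ORDER BY, and offset/fetch together play the role of SQL OFFSET/FETCH. orderBy wraps the current logical plan in a Sort node, while offset and fetch wrap it in a Limit node. The Limit created by offset has fetch = -1, and if fetch is called without a preceding offset, the Limit it creates has offset 0. In Flink 1.7, both Sort and Limit fail validation on stream tables, and a Limit must sit directly on top of a Sort.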