内容简介:flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scalaflink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scalacalcite-core-1.18.0-sources.jar!/org/apache/calcite/rex/RexNode.java
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class Table(
private[flink] val tableEnv: TableEnvironment,
private[flink] val logicalPlan: LogicalNode) {
//......
def where(predicate: String): Table = {
filter(predicate)
}
def where(predicate: Expression): Table = {
filter(predicate)
}
def filter(predicate: String): Table = {
val predicateExpr = ExpressionParser.parseExpression(predicate)
filter(predicateExpr)
}
def filter(predicate: Expression): Table = {
new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
}
//......
}
复制代码
- Table的where及filter操作均有两中方法,一种是String参数,一种是Expression参数;而where方法内部是调用filter方法;filter方法使用Filter(predicate, logicalPlan).validate(tableEnv)创建了新的Table;String参数最后是通过ExpressionParser.parseExpression方法转换为Expression类型
Filter
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
child.construct(relBuilder)
relBuilder.filter(condition.toRexNode(relBuilder))
}
override def validate(tableEnv: TableEnvironment): LogicalNode = {
val resolvedFilter = super.validate(tableEnv).asInstanceOf[Filter]
if (resolvedFilter.condition.resultType != BOOLEAN_TYPE_INFO) {
failValidation(s"Filter operator requires a boolean expression as input," +
s" but ${resolvedFilter.condition} is of type ${resolvedFilter.condition.resultType}")
}
resolvedFilter
}
}
复制代码
- Filter对象继承了UnaryNode,它覆盖了output、construct、validate等方法;construct方法先通过Expression.toRexNode将flink的Expression转换为Apache Calcite的RexNode,然后再执行Apache Calcite的RelBuilder的filter方法
RexNode
calcite-core-1.18.0-sources.jar!/org/apache/calcite/rex/RexNode.java
public abstract class RexNode {
//~ Instance fields --------------------------------------------------------
// Effectively final. Set in each sub-class constructor, and never re-set.
protected String digest;
//~ Methods ----------------------------------------------------------------
public abstract RelDataType getType();
public boolean isAlwaysTrue() {
return false;
}
public boolean isAlwaysFalse() {
return false;
}
public boolean isA(SqlKind kind) {
return getKind() == kind;
}
public boolean isA(Collection<SqlKind> kinds) {
return getKind().belongsTo(kinds);
}
public SqlKind getKind() {
return SqlKind.OTHER;
}
public String toString() {
return digest;
}
public abstract <R> R accept(RexVisitor<R> visitor);
public abstract <R, P> R accept(RexBiVisitor<R, P> visitor, P arg);
@Override public abstract boolean equals(Object obj);
@Override public abstract int hashCode();
}
复制代码
- RexNode是Row expression,可以通过RexBuilder来创建;它有很多子类,比如RexCall、RexVariable、RexFieldAccess等
RelBuilder.filter
calcite-core-1.18.0-sources.jar!/org/apache/calcite/tools/RelBuilder.java
public class RelBuilder {
protected final RelOptCluster cluster;
protected final RelOptSchema relOptSchema;
private final RelFactories.FilterFactory filterFactory;
private final RelFactories.ProjectFactory projectFactory;
private final RelFactories.AggregateFactory aggregateFactory;
private final RelFactories.SortFactory sortFactory;
private final RelFactories.ExchangeFactory exchangeFactory;
private final RelFactories.SortExchangeFactory sortExchangeFactory;
private final RelFactories.SetOpFactory setOpFactory;
private final RelFactories.JoinFactory joinFactory;
private final RelFactories.SemiJoinFactory semiJoinFactory;
private final RelFactories.CorrelateFactory correlateFactory;
private final RelFactories.ValuesFactory valuesFactory;
private final RelFactories.TableScanFactory scanFactory;
private final RelFactories.MatchFactory matchFactory;
private final Deque<Frame> stack = new ArrayDeque<>();
private final boolean simplify;
private final RexSimplify simplifier;
protected RelBuilder(Context context, RelOptCluster cluster,
RelOptSchema relOptSchema) {
this.cluster = cluster;
this.relOptSchema = relOptSchema;
if (context == null) {
context = Contexts.EMPTY_CONTEXT;
}
this.simplify = Hook.REL_BUILDER_SIMPLIFY.get(true);
this.aggregateFactory =
Util.first(context.unwrap(RelFactories.AggregateFactory.class),
RelFactories.DEFAULT_AGGREGATE_FACTORY);
this.filterFactory =
Util.first(context.unwrap(RelFactories.FilterFactory.class),
RelFactories.DEFAULT_FILTER_FACTORY);
this.projectFactory =
Util.first(context.unwrap(RelFactories.ProjectFactory.class),
RelFactories.DEFAULT_PROJECT_FACTORY);
this.sortFactory =
Util.first(context.unwrap(RelFactories.SortFactory.class),
RelFactories.DEFAULT_SORT_FACTORY);
this.exchangeFactory =
Util.first(context.unwrap(RelFactories.ExchangeFactory.class),
RelFactories.DEFAULT_EXCHANGE_FACTORY);
this.sortExchangeFactory =
Util.first(context.unwrap(RelFactories.SortExchangeFactory.class),
RelFactories.DEFAULT_SORT_EXCHANGE_FACTORY);
this.setOpFactory =
Util.first(context.unwrap(RelFactories.SetOpFactory.class),
RelFactories.DEFAULT_SET_OP_FACTORY);
this.joinFactory =
Util.first(context.unwrap(RelFactories.JoinFactory.class),
RelFactories.DEFAULT_JOIN_FACTORY);
this.semiJoinFactory =
Util.first(context.unwrap(RelFactories.SemiJoinFactory.class),
RelFactories.DEFAULT_SEMI_JOIN_FACTORY);
this.correlateFactory =
Util.first(context.unwrap(RelFactories.CorrelateFactory.class),
RelFactories.DEFAULT_CORRELATE_FACTORY);
this.valuesFactory =
Util.first(context.unwrap(RelFactories.ValuesFactory.class),
RelFactories.DEFAULT_VALUES_FACTORY);
this.scanFactory =
Util.first(context.unwrap(RelFactories.TableScanFactory.class),
RelFactories.DEFAULT_TABLE_SCAN_FACTORY);
this.matchFactory =
Util.first(context.unwrap(RelFactories.MatchFactory.class),
RelFactories.DEFAULT_MATCH_FACTORY);
final RexExecutor executor =
Util.first(context.unwrap(RexExecutor.class),
Util.first(cluster.getPlanner().getExecutor(), RexUtil.EXECUTOR));
final RelOptPredicateList predicates = RelOptPredicateList.EMPTY;
this.simplifier =
new RexSimplify(cluster.getRexBuilder(), predicates, executor);
}
public RelBuilder filter(RexNode... predicates) {
return filter(ImmutableList.copyOf(predicates));
}
public RelBuilder filter(Iterable<? extends RexNode> predicates) {
final RexNode simplifiedPredicates =
simplifier.simplifyFilterPredicates(predicates);
if (simplifiedPredicates == null) {
return empty();
}
if (!simplifiedPredicates.isAlwaysTrue()) {
final Frame frame = stack.pop();
final RelNode filter = filterFactory.createFilter(frame.rel, simplifiedPredicates);
stack.push(new Frame(filter, frame.fields));
}
return this;
}
//......
}
复制代码
-
RelBuilder在构造器里头创建了RelFactories.FilterFactory,它提供了两个filter方法,一个是RexNode变长数组参数,一个是RexNode类型的Iterable参数;filter方法首先使用simplifier.simplifyFilterPredicates将RexNode类型的Iterable转为simplifiedPredicates(
RexNode),之后只要simplifiedPredicates.isAlwaysTrue()为false,则取出deque中队首的Frame(LIFO (Last-In-First-Out) stacks),调用filterFactory.createFilter创建RelNode构造新的Frame,然后重新放入deque的队首
Frame
calcite-core-1.18.0-sources.jar!/org/apache/calcite/tools/RelBuilder.java
private static class Frame {
final RelNode rel;
final ImmutableList<Field> fields;
private Frame(RelNode rel, ImmutableList<Field> fields) {
this.rel = rel;
this.fields = fields;
}
private Frame(RelNode rel) {
String tableAlias = deriveAlias(rel);
ImmutableList.Builder<Field> builder = ImmutableList.builder();
ImmutableSet<String> aliases = tableAlias == null
? ImmutableSet.of()
: ImmutableSet.of(tableAlias);
for (RelDataTypeField field : rel.getRowType().getFieldList()) {
builder.add(new Field(aliases, field));
}
this.rel = rel;
this.fields = builder.build();
}
private static String deriveAlias(RelNode rel) {
if (rel instanceof TableScan) {
final List<String> names = rel.getTable().getQualifiedName();
if (!names.isEmpty()) {
return Util.last(names);
}
}
return null;
}
List<RelDataTypeField> fields() {
return Pair.right(fields);
}
}
复制代码
- Frame被存放于ArrayDeque中,实际是用于描述上一个操作的关系表达式以及table的别名怎么映射到row type中
RelFactories.FilterFactory.createFilter
calcite-core-1.18.0-sources.jar!/org/apache/calcite/rel/core/RelFactories.java
public interface FilterFactory {
/** Creates a filter. */
RelNode createFilter(RelNode input, RexNode condition);
}
private static class FilterFactoryImpl implements FilterFactory {
public RelNode createFilter(RelNode input, RexNode condition) {
return LogicalFilter.create(input, condition);
}
}
复制代码
-
FilterFactoryImpl实现了FilterFactory接口,createFilter方法执行的是LogicalFilter.create(input, condition),这里input是RelNode类型(
RelNode取的是Frame的rel),condition是RexNode类型
LogicalFilter
calcite-core-1.18.0-sources.jar!/org/apache/calcite/rel/logical/LogicalFilter.java
public final class LogicalFilter extends Filter {
private final ImmutableSet<CorrelationId> variablesSet;
/** Creates a LogicalFilter. */
public static LogicalFilter create(final RelNode input, RexNode condition) {
return create(input, condition, ImmutableSet.of());
}
/** Creates a LogicalFilter. */
public static LogicalFilter create(final RelNode input, RexNode condition,
ImmutableSet<CorrelationId> variablesSet) {
final RelOptCluster cluster = input.getCluster();
final RelMetadataQuery mq = cluster.getMetadataQuery();
final RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
.replaceIfs(RelCollationTraitDef.INSTANCE,
() -> RelMdCollation.filter(mq, input))
.replaceIf(RelDistributionTraitDef.INSTANCE,
() -> RelMdDistribution.filter(mq, input));
return new LogicalFilter(cluster, traitSet, input, condition, variablesSet);
}
//......
}
复制代码
- LogicalFilter继承了抽象类Filter,Filter继承了SingleRel,SingleRel继承了AbstractRelNode,AbstractRelNode实现了RelNode接口
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 聊聊jdbc的batch操作
- 聊聊flink DataStream的join操作
- 聊聊flink DataStream的split操作
- 聊聊flink DataStream的iterate操作
- 聊聊从逻辑门到操作系统的计算机
- 聊聊动态规划(2) -- 特征
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Numerical Recipes 3rd Edition
William H. Press、Saul A. Teukolsky、William T. Vetterling、Brian P. Flannery / Cambridge University Press / 2007-9-6 / GBP 64.99
Do you want easy access to the latest methods in scientific computing? This greatly expanded third edition of Numerical Recipes has it, with wider coverage than ever before, many new, expanded and upd......一起来看看 《Numerical Recipes 3rd Edition》 这本书的介绍吧!