    stream.join(otherStream)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)
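- The chain starts with join, which pairs this stream with another stream and returns a JoinedStreams. Calling where on the JoinedStreams builds a Where object that holds the key selector of the first input; Where offers equalTo, which builds an EqualTo; EqualTo offers window, which takes the windowAssigner and builds a WithWindow. On WithWindow you can additionally set the trigger, evictor, and allowedLateness, and it provides the apply operation.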
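To make the staged shape of this chain concrete, here is a minimal in-memory sketch. It does not use Flink: `MiniJoin` and its nested classes are hypothetical names invented for illustration, plain lists stand in for streams, and the window stage is left out entirely. Each stage only exposes the next legal step, which is the same idea as JoinedStreams -> Where -> EqualTo -> WithWindow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical in-memory model of the staged join builder; not Flink code. */
final class MiniJoin<A, B> {
    private final List<A> left;
    private final List<B> right;

    private MiniJoin(List<A> left, List<B> right) {
        this.left = left;
        this.right = right;
    }

    /** Stage 1: pair two inputs, comparable to DataStream.join(otherStream). */
    static <A, B> MiniJoin<A, B> join(List<A> left, List<B> right) {
        return new MiniJoin<>(left, right);
    }

    /** Stage 2: fix the key extractor of the first input. */
    <K> Where<K> where(Function<A, K> leftKey) {
        return new Where<>(leftKey);
    }

    final class Where<K> {
        private final Function<A, K> leftKey;

        private Where(Function<A, K> leftKey) {
            this.leftKey = leftKey;
        }

        /** Stage 3: fix the key extractor of the second input. */
        EqualTo<K> equalTo(Function<B, K> rightKey) {
            return new EqualTo<>(leftKey, rightKey);
        }
    }

    final class EqualTo<K> {
        private final Function<A, K> leftKey;
        private final Function<B, K> rightKey;

        private EqualTo(Function<A, K> leftKey, Function<B, K> rightKey) {
            this.leftKey = leftKey;
            this.rightKey = rightKey;
        }

        /** Final stage: one result per pair with equal keys (inner join, no window). */
        <R> List<R> apply(BiFunction<A, B, R> joinFunction) {
            List<R> out = new ArrayList<>();
            for (A a : left) {
                for (B b : right) {
                    if (leftKey.apply(a).equals(rightKey.apply(b))) {
                        out.add(joinFunction.apply(a, b));
                    }
                }
            }
            return out;
        }
    }

    public static void main(String[] args) {
        List<String> orders = Arrays.asList("u1:book", "u2:pen", "u3:ink");
        List<String> users = Arrays.asList("u1:Ann", "u2:Bob");
        List<String> joined = MiniJoin.join(orders, users)
            .where(o -> o.split(":")[0])
            .equalTo(u -> u.split(":")[0])
            .apply((o, u) -> u.split(":")[1] + " bought " + o.split(":")[1]);
        System.out.println(joined); // [Ann bought book, Bob bought pen]
    }
}
```

The order "u3:ink" has no matching user and therefore produces no output, which is the inner-join behaviour discussed later in the article.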
DataStream.join
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
    @Public
    public class DataStream<T> {
        //......

        /**
         * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
         * and window can be specified.
         */
        public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
            return new JoinedStreams<>(this, otherStream);
        }

        //......
    }
- DataStream provides the join method as the entry point of a join; it returns a JoinedStreams that wraps both input streams.
JoinedStreams
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
    @Public
    public class JoinedStreams<T1, T2> {

        /** The first input stream. */
        private final DataStream<T1> input1;

        /** The second input stream. */
        private final DataStream<T2> input2;

        public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
            this.input1 = requireNonNull(input1);
            this.input2 = requireNonNull(input2);
        }

        public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
            requireNonNull(keySelector);
            final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
            return where(keySelector, keyType);
        }

        public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) {
            requireNonNull(keySelector);
            requireNonNull(keyType);
            return new Where<>(input1.clean(keySelector), keyType);
        }

        //......
    }
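- JoinedStreams mainly provides the where operation, which builds a Where object; if no key type is passed in, it is extracted from the key selector via TypeExtractor.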
Where
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
    @Public
    public class Where<KEY> {

        private final KeySelector<T1, KEY> keySelector1;
        private final TypeInformation<KEY> keyType;

        Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
            this.keySelector1 = keySelector1;
            this.keyType = keyType;
        }

        public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
            requireNonNull(keySelector);
            final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
            return equalTo(keySelector, otherKey);
        }

        public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) {
            requireNonNull(keySelector);
            requireNonNull(keyType);

            if (!keyType.equals(this.keyType)) {
                throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
                    "first key = " + this.keyType + " , second key = " + keyType);
            }

            return new EqualTo(input2.clean(keySelector));
        }

        //......
    }
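- Where mainly provides the equalTo operation, which builds an EqualTo object; it throws an IllegalArgumentException if the key type of the second input does not equal the key type of the first.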
EqualTo
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
    @Public
    public class EqualTo {

        private final KeySelector<T2, KEY> keySelector2;

        EqualTo(KeySelector<T2, KEY> keySelector2) {
            this.keySelector2 = requireNonNull(keySelector2);
        }

        /**
         * Specifies the window on which the join operation works.
         */
        @PublicEvolving
        public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
        }
    }
- EqualTo provides the window operation, which builds a WithWindow object; the trigger, evictor, and allowedLateness are initially null.
WithWindow
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {

        private final DataStream<T1> input1;
        private final DataStream<T2> input2;

        private final KeySelector<T1, KEY> keySelector1;
        private final KeySelector<T2, KEY> keySelector2;
        private final TypeInformation<KEY> keyType;

        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

        private final Time allowedLateness;

        private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;

        @PublicEvolving
        protected WithWindow(DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {

            this.input1 = requireNonNull(input1);
            this.input2 = requireNonNull(input2);

            this.keySelector1 = requireNonNull(keySelector1);
            this.keySelector2 = requireNonNull(keySelector2);
            this.keyType = requireNonNull(keyType);

            this.windowAssigner = requireNonNull(windowAssigner);

            this.trigger = trigger;
            this.evictor = evictor;

            this.allowedLateness = allowedLateness;
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                windowAssigner, newTrigger, evictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                windowAssigner, trigger, newEvictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                windowAssigner, trigger, evictor, newLateness);
        }

        public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
            TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
                function,
                JoinFunction.class,
                0,
                1,
                2,
                TypeExtractor.NO_INDEX,
                input1.getType(),
                input2.getType(),
                "Join",
                false);

            return apply(function, resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) {
            return (SingleOutputStreamOperator<T>) apply(function);
        }

        public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            coGroupedWindowedStream = input1.coGroup(input2)
                .where(keySelector1)
                .equalTo(keySelector2)
                .window(windowAssigner)
                .trigger(trigger)
                .evictor(evictor)
                .allowedLateness(allowedLateness);

            return coGroupedWindowedStream
                .apply(new FlatJoinCoGroupFunction<>(function), resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }

        public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
            TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
                function,
                FlatJoinFunction.class,
                0,
                1,
                2,
                new int[]{2, 0},
                input1.getType(),
                input2.getType(),
                "Join",
                false);

            return apply(function, resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) {
            return (SingleOutputStreamOperator<T>) apply(function);
        }

        public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            coGroupedWindowedStream = input1.coGroup(input2)
                .where(keySelector1)
                .equalTo(keySelector2)
                .window(windowAssigner)
                .trigger(trigger)
                .evictor(evictor)
                .allowedLateness(allowedLateness);

            return coGroupedWindowedStream
                .apply(new JoinCoGroupFunction<>(function), resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }

        @VisibleForTesting
        Time getAllowedLateness() {
            return allowedLateness;
        }

        @VisibleForTesting
        CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() {
            return coGroupedWindowedStream;
        }
    }
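- The apply methods of WithWindow delegate to input1.coGroup(input2), passing on the key selectors, windowAssigner, trigger, evictor, and allowedLateness; the with methods are marked @Deprecated and simply call the corresponding apply.
- A JoinFunction is wrapped in a JoinCoGroupFunction, and a FlatJoinFunction is wrapped in a FlatJoinCoGroupFunction.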
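One detail of the WithWindow listing is easy to miss: trigger, evictor, and allowedLateness never modify the current object; each returns a new WithWindow with one field replaced. The sketch below models only that copy-on-change style. `WindowOptions` is a hypothetical class, and plain strings and a `Long` stand in for Flink's Trigger, Evictor, and Time types.

```java
/** Hypothetical stand-in for WithWindow's optional settings; not Flink code. */
final class WindowOptions {
    final String trigger;              // null means "not configured"
    final String evictor;              // null means "not configured"
    final Long allowedLatenessMillis;  // null means "not configured"

    WindowOptions(String trigger, String evictor, Long allowedLatenessMillis) {
        this.trigger = trigger;
        this.evictor = evictor;
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    /** Returns a copy with the trigger replaced; this instance is left unchanged. */
    WindowOptions trigger(String newTrigger) {
        return new WindowOptions(newTrigger, evictor, allowedLatenessMillis);
    }

    /** Returns a copy with the evictor replaced. */
    WindowOptions evictor(String newEvictor) {
        return new WindowOptions(trigger, newEvictor, allowedLatenessMillis);
    }

    /** Returns a copy with the allowed lateness replaced. */
    WindowOptions allowedLateness(long millis) {
        return new WindowOptions(trigger, evictor, millis);
    }

    public static void main(String[] args) {
        WindowOptions base = new WindowOptions(null, null, null);
        WindowOptions tuned = base.trigger("countTrigger").allowedLateness(5_000L);

        System.out.println(base.trigger);  // null: the original object was not modified
        System.out.println(tuned.trigger + " / " + tuned.evictor + " / " + tuned.allowedLatenessMillis);
    }
}
```

A practical consequence of this style is that the return value has to be chained or assigned; calling a setter-like method and discarding its result leaves the original settings in place.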
JoinFunction
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/JoinFunction.java
    @Public
    @FunctionalInterface
    public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {

        /**
         * The join method, called once per joined pair of elements.
         *
         * @param first The element from first input.
         * @param second The element from second input.
         * @return The resulting element.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        OUT join(IN1 first, IN2 second) throws Exception;
    }
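- JoinFunction extends Function and Serializable and defines the join method, which is called once per joined pair of elements. It has inner-join semantics; if you need an outer join, use a CoGroupFunction instead.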
FlatJoinFunction
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/FlatJoinFunction.java
    @Public
    @FunctionalInterface
    public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {

        /**
         * The join method, called once per joined pair of elements.
         *
         * @param first The element from first input.
         * @param second The element from second input.
         * @param out The collector used to return zero, one, or more elements.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
    }
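- FlatJoinFunction extends Function and Serializable and also defines a join method with inner-join semantics; if you need an outer join, use a CoGroupFunction instead. Unlike the join method of JoinFunction, its join method takes an additional Collector parameter through which it can emit zero, one, or more records per pair, hence the "Flat" in the name.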
CoGroupedStreams
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
    @Public
    public class CoGroupedStreams<T1, T2> {

        //......

        @Public
        public static class WithWindow<T1, T2, KEY, W extends Window> {
            private final DataStream<T1> input1;
            private final DataStream<T2> input2;

            private final KeySelector<T1, KEY> keySelector1;
            private final KeySelector<T2, KEY> keySelector2;

            private final TypeInformation<KEY> keyType;

            private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

            private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

            private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

            private final Time allowedLateness;

            private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

            protected WithWindow(DataStream<T1> input1,
                    DataStream<T2> input2,
                    KeySelector<T1, KEY> keySelector1,
                    KeySelector<T2, KEY> keySelector2,
                    TypeInformation<KEY> keyType,
                    WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                    Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                    Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                    Time allowedLateness) {
                this.input1 = input1;
                this.input2 = input2;

                this.keySelector1 = keySelector1;
                this.keySelector2 = keySelector2;
                this.keyType = keyType;

                this.windowAssigner = windowAssigner;
                this.trigger = trigger;
                this.evictor = evictor;

                this.allowedLateness = allowedLateness;
            }

            @PublicEvolving
            public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, newTrigger, evictor, allowedLateness);
            }

            @PublicEvolving
            public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, newEvictor, allowedLateness);
            }

            @PublicEvolving
            public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, evictor, newLateness);
            }

            public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
                TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
                    function,
                    input1.getType(),
                    input2.getType(),
                    "CoGroup",
                    false);

                return apply(function, resultType);
            }

            public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                //clean the closure
                function = input1.getExecutionEnvironment().clean(function);

                UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
                UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

                DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                    .map(new Input1Tagger<T1, T2>())
                    .setParallelism(input1.getParallelism())
                    .returns(unionType);
                DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                    .map(new Input2Tagger<T1, T2>())
                    .setParallelism(input2.getParallelism())
                    .returns(unionType);

                DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

                // we explicitly create the keyed stream to manually pass the key type information in
                windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                        .window(windowAssigner);

                if (trigger != null) {
                    windowedStream.trigger(trigger);
                }
                if (evictor != null) {
                    windowedStream.evictor(evictor);
                }
                if (allowedLateness != null) {
                    windowedStream.allowedLateness(allowedLateness);
                }

                return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
            }

            //......
        }

        //......
    }
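- The overall class structure of CoGroupedStreams closely mirrors that of JoinedStreams: CoGroupedStreams provides where to build a Where object; Where provides equalTo to build an EqualTo; EqualTo provides window to build a WithWindow; WithWindow lets you set the trigger, evictor, and allowedLateness and provides the apply operation. One difference is that apply on CoGroupedStreams.WithWindow takes a CoGroupFunction, whereas apply on JoinedStreams.WithWindow takes a JoinFunction or a FlatJoinFunction.
- Internally, apply maps both inputs to TaggedUnion, unions them into one stream, keys that stream with a UnionKeySelector, applies the window (plus trigger, evictor, and allowedLateness if they are not null), and finally runs the CoGroupFunction wrapped in a CoGroupWindowFunction.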
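The tag-union-key steps inside CoGroupedStreams.WithWindow.apply can be sketched without Flink as follows. Everything here is hypothetical illustration code: `Tagged` plays the role of TaggedUnion, the combined key lookup plays the role of UnionKeySelector, and the final split into two lists is my assumption about what CoGroupWindowFunction does before calling the user's coGroup. Windowing is omitted, so the whole input is treated as a single window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical in-memory sketch of the tagged-union approach; not Flink code. */
final class TaggedUnionSketch {

    /** Holds a value from exactly one of the two inputs. */
    static final class Tagged<A, B> {
        final A one;
        final B two;

        private Tagged(A one, B two) {
            this.one = one;
            this.two = two;
        }

        static <A, B> Tagged<A, B> one(A value) {
            return new Tagged<>(value, null);
        }

        static <A, B> Tagged<A, B> two(B value) {
            return new Tagged<>(null, value);
        }

        boolean isOne() {
            return one != null;
        }
    }

    /** The two per-key groups that a coGroup function would receive. */
    static final class Groups<A, B> {
        final List<A> first = new ArrayList<>();
        final List<B> second = new ArrayList<>();
    }

    static <A, B, K> Map<K, Groups<A, B>> coGroup(
            List<A> in1, List<B> in2, Function<A, K> key1, Function<B, K> key2) {
        // Step 1: tag each element with its origin and union both inputs into one list.
        List<Tagged<A, B>> union = new ArrayList<>();
        for (A a : in1) {
            union.add(Tagged.one(a));
        }
        for (B b : in2) {
            union.add(Tagged.two(b));
        }

        // Step 2: key the union with the selector matching the tag, then split by tag again.
        Map<K, Groups<A, B>> byKey = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            K key = t.isOne() ? key1.apply(t.one) : key2.apply(t.two);
            Groups<A, B> groups = byKey.computeIfAbsent(key, k -> new Groups<>());
            if (t.isOne()) {
                groups.first.add(t.one);
            } else {
                groups.second.add(t.two);
            }
        }
        return byKey;
    }

    public static void main(String[] args) {
        Map<String, Groups<String, String>> result = coGroup(
            Arrays.asList("a1", "b1"), Arrays.asList("a2", "c2"),
            s -> s.substring(0, 1), s -> s.substring(0, 1));
        result.forEach((k, g) -> System.out.println(k + " -> " + g.first + " / " + g.second));
    }
}
```

Note that keys "b" and "c" each end up with one empty group. A coGroup-style function sees those groups and can react to them, whereas a pairwise join would produce nothing for them.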
CoGroupFunction
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java
    @Public
    @FunctionalInterface
    public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

        /**
         * This method must be implemented to provide a user implementation of a
         * coGroup. It is called for each pair of element groups where the elements share the
         * same key.
         *
         * @param first The records from the first input.
         * @param second The records from the second.
         * @param out A collector to return elements.
         *
         * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
         *                   and may trigger the recovery logic.
         */
        void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
    }
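- CoGroupFunction extends Function and Serializable and defines the coGroup method, which can be used to implement an outer join. Its parameters are Iterables holding all records of each input for one key, whereas the join methods of JoinFunction and FlatJoinFunction receive single elements.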
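Because coGroup receives both groups as Iterables, it can notice that one side is empty, which is what an outer join needs. The following is a minimal sketch of a left outer join written in that shape. It is plain Java rather than a Flink CoGroupFunction: `LeftOuterCoGroupSketch` is a hypothetical name, `Consumer` stands in for Flink's Collector, and the "null" padding format is just an illustrative choice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of left-outer-join logic in the shape of coGroup; not Flink code. */
final class LeftOuterCoGroupSketch {

    /** Called once per key with all elements of both sides for that key. */
    static void coGroup(Iterable<String> first, Iterable<Integer> second, Consumer<String> out) {
        for (String left : first) {
            boolean matched = false;
            for (Integer right : second) {
                out.accept(left + "=" + right);
                matched = true;
            }
            if (!matched) {
                // The right side is empty for this key: still emit the left element.
                out.accept(left + "=null");
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        coGroup(Arrays.asList("a", "b"), Arrays.asList(1, 2), out::add);
        coGroup(Arrays.asList("c"), Collections.<Integer>emptyList(), out::add);
        System.out.println(out); // [a=1, a=2, b=1, b=2, c=null]
    }
}
```

A pairwise join function cannot express the last case, since it is only ever invoked for pairs that exist.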
WrappingFunction
flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/operators/translation/WrappingFunction.java
    @Internal
    public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {

        private static final long serialVersionUID = 1L;

        protected T wrappedFunction;

        protected WrappingFunction(T wrappedFunction) {
            this.wrappedFunction = wrappedFunction;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            FunctionUtils.openFunction(this.wrappedFunction, parameters);
        }

        @Override
        public void close() throws Exception {
            FunctionUtils.closeFunction(this.wrappedFunction);
        }

        @Override
        public void setRuntimeContext(RuntimeContext t) {
            super.setRuntimeContext(t);

            FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
        }

        public T getWrappedFunction () {
            return this.wrappedFunction;
        }
    }
- WrappingFunction extends AbstractRichFunction and overrides the open, close, and setRuntimeContext methods of its parent so that these lifecycle calls are forwarded to the wrappedFunction.
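The forwarding idea behind WrappingFunction can be modelled in a few lines. This sketch uses hypothetical interfaces `Fn` and `RichFn` in place of Flink's Function and RichFunction. FunctionUtils is not shown in the article, so the `instanceof` check below is an assumption about its behaviour: lifecycle calls are only forwarded when the wrapped function actually has lifecycle methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of lifecycle forwarding in a wrapper; not Flink code. */
final class WrappingSketch {

    /** Marker for any user function (stand-in for Flink's Function). */
    interface Fn { }

    /** A function with lifecycle hooks (stand-in for Flink's RichFunction). */
    interface RichFn extends Fn {
        void open();
        void close();
    }

    /** Wrapper that forwards lifecycle calls to the wrapped function if it supports them. */
    static class Wrapper<T extends Fn> {
        protected final T wrapped;

        Wrapper(T wrapped) {
            this.wrapped = wrapped;
        }

        void open() {
            if (wrapped instanceof RichFn) {
                ((RichFn) wrapped).open();
            }
        }

        void close() {
            if (wrapped instanceof RichFn) {
                ((RichFn) wrapped).close();
            }
        }
    }

    /** Test double that records which lifecycle methods were invoked. */
    static final class RecordingFn implements RichFn {
        final List<String> calls = new ArrayList<>();

        @Override
        public void open() {
            calls.add("open");
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    public static void main(String[] args) {
        RecordingFn inner = new RecordingFn();
        Wrapper<RecordingFn> wrapper = new Wrapper<>(inner);
        wrapper.open();
        wrapper.close();
        System.out.println(inner.calls); // [open, close]
    }
}
```

The point of the pattern is that a user-supplied rich function keeps working when the framework hides it behind an adapter such as JoinCoGroupFunction.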
JoinCoGroupFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
    /**
     * CoGroup function that does a nested-loop join to get the join result.
     */
    private static class JoinCoGroupFunction<T1, T2, T>
            extends WrappingFunction<JoinFunction<T1, T2, T>>
            implements CoGroupFunction<T1, T2, T> {
        private static final long serialVersionUID = 1L;

        public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
            super(wrappedFunction);
        }

        @Override
        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
            for (T1 val1: first) {
                for (T2 val2: second) {
                    out.collect(wrappedFunction.join(val1, val2));
                }
            }
        }
    }
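- JoinCoGroupFunction extends WrappingFunction and implements the coGroup method of the CoGroupFunction interface as a nested-loop join: it iterates over the first collection and, for each of its elements, over the second collection, calls wrappedFunction.join on each pair, and emits the result.
- JoinedStreams defines JoinCoGroupFunction as a private static class; the apply method of JoinedStreams.WithWindow uses it to wrap the JoinFunction and then calls the apply method of CoGroupedStreams.WithWindow.
- The join method of JoinFunction takes two single elements, whereas the coGroup method of JoinCoGroupFunction takes two Iterables.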
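The adapter from a pairwise join to a group-wise coGroup can be tried out in isolation. The sketch below is a plain-Java approximation: `NestedLoopJoinSketch` is a hypothetical name, `BiFunction` stands in for JoinFunction, and a returned list stands in for the Collector. It shows two consequences of the nested loop: a key with m elements on one side and n on the other yields m × n results, and an empty side yields none, which is where the inner-join semantics come from.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical sketch of adapting a pairwise join to coGroup; not Flink code. */
final class NestedLoopJoinSketch {

    /** Applies the pairwise join to every combination of the two groups. */
    static <A, B, R> List<R> coGroup(Iterable<A> first, Iterable<B> second, BiFunction<A, B, R> join) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                out.add(join.apply(a, b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> full = coGroup(
            Arrays.asList(1, 2), Arrays.asList("x", "y", "z"), (n, s) -> n + s);
        System.out.println(full); // [1x, 1y, 1z, 2x, 2y, 2z]

        List<String> none = coGroup(
            Arrays.asList(1, 2), Collections.<String>emptyList(), (n, s) -> n + s);
        System.out.println(none); // []
    }
}
```

Since the cost per key and window grows with the product of the two group sizes, heavily skewed keys are worth watching when using this kind of join.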
FlatJoinCoGroupFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
    /**
     * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
     */
    private static class FlatJoinCoGroupFunction<T1, T2, T>
            extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
            implements CoGroupFunction<T1, T2, T> {
        private static final long serialVersionUID = 1L;

        public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
            super(wrappedFunction);
        }

        @Override
        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
            for (T1 val1: first) {
                for (T2 val2: second) {
                    wrappedFunction.join(val1, val2, out);
                }
            }
        }
    }
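- FlatJoinCoGroupFunction extends WrappingFunction and implements the coGroup method of the CoGroupFunction interface as a nested-loop join: it iterates over the first collection and, for each of its elements, over the second collection, and calls wrappedFunction.join on each pair. Unlike JoinCoGroupFunction, it does not collect a return value; it passes the Collector to the wrapped function, which emits the records itself.
- JoinedStreams defines FlatJoinCoGroupFunction as a private static class; the apply method of JoinedStreams.WithWindow uses it to wrap the FlatJoinFunction and then calls the apply method of CoGroupedStreams.WithWindow.
- The join method of FlatJoinFunction takes two single elements plus a Collector, whereas the coGroup method of FlatJoinCoGroupFunction takes two Iterables plus a Collector.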
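The flat variant differs only in who emits the result. In this sketch, the hypothetical `FlatJoin` interface mirrors the shape of FlatJoinFunction, with `Consumer` standing in for Flink's Collector; the adapter hands the collector to the user function, which may emit nothing for a pair. That makes it possible to filter pairs inside the join itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of the flat-join adapter; not Flink code. */
final class FlatJoinSketch {

    /** Mirrors the shape of FlatJoinFunction: the function emits via the collector. */
    interface FlatJoin<A, B, R> {
        void join(A first, B second, Consumer<R> out);
    }

    /** Visits every pair and lets the wrapped function decide what to emit. */
    static <A, B, R> List<R> coGroup(Iterable<A> first, Iterable<B> second, FlatJoin<A, B, R> fn) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                fn.join(a, b, out::add);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Emits a record only for pairs in ascending order, and nothing otherwise.
        FlatJoin<Integer, Integer, String> ascendingOnly = (a, b, out) -> {
            if (a < b) {
                out.accept(a + "<" + b);
            }
        };
        List<String> result = coGroup(Arrays.asList(1, 3), Arrays.asList(2, 4), ascendingOnly);
        System.out.println(result); // [1<2, 1<4, 3<4]
    }
}
```

With a plain JoinFunction the same filtering would need a separate filter step after the join, because join must return exactly one value per pair.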