A Look at the join Operation of Flink's DataStream

Category: Programming Tools · Published: 6 years ago

Summary: this article looks at how the join operation of Flink's DataStream is implemented, walking through the flink-streaming-java_2.11-1.7.0 sources (DataStream.java, JoinedStreams.java, CoGroupedStreams.java) and the related function interfaces. A windowed join is written like this:
stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
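  • join is called first to combine this stream with another one, and it returns a JoinedStreams. Calling where on the JoinedStreams builds a Where object that holds the first key selector; Where's equalTo builds an EqualTo; EqualTo's window builds a WithWindow. WithWindow holds the windowAssigner, lets you set trigger, evictor and allowedLateness, and provides the apply operation.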
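To make the chain concrete, here is a toy, in-memory model of it in plain Java, so it runs without Flink. Everything in it is hypothetical: MiniJoinedStreams is not a Flink class, the "streams" are plain lists, and window(...) is modeled as a tumbling window over a timestamp extracted by the caller (watermarks and lateness are ignored). Each stage only exposes the next step of the chain, and apply emits one result for every pair of elements that share both key and window, which is the inner-join behavior of a windowed join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Toy, in-memory model of the JoinedStreams builder chain (not Flink's API).
// "Streams" are plain lists; the window is a tumbling window computed from a
// caller-supplied timestamp. Each stage only exposes the next stage's method.
final class MiniJoinedStreams<T1, T2> {
    private final List<T1> input1;
    private final List<T2> input2;

    MiniJoinedStreams(List<T1> input1, List<T2> input2) {
        this.input1 = Objects.requireNonNull(input1);
        this.input2 = Objects.requireNonNull(input2);
    }

    // .where(...): remember the key selector of the first input
    <K> Where<K> where(Function<T1, K> keySelector1) {
        return new Where<>(Objects.requireNonNull(keySelector1));
    }

    final class Where<K> {
        private final Function<T1, K> keySelector1;

        Where(Function<T1, K> keySelector1) { this.keySelector1 = keySelector1; }

        // .equalTo(...): remember the key selector of the second input
        EqualTo equalTo(Function<T2, K> keySelector2) {
            return new EqualTo(Objects.requireNonNull(keySelector2));
        }

        final class EqualTo {
            private final Function<T2, K> keySelector2;

            EqualTo(Function<T2, K> keySelector2) { this.keySelector2 = keySelector2; }

            // .window(...): tumbling windows of `size` time units
            WithWindow window(Function<T1, Long> ts1, Function<T2, Long> ts2, long size) {
                return new WithWindow(ts1, ts2, size);
            }

            final class WithWindow {
                private final Function<T1, Long> ts1;
                private final Function<T2, Long> ts2;
                private final long size;

                WithWindow(Function<T1, Long> ts1, Function<T2, Long> ts2, long size) {
                    this.ts1 = ts1;
                    this.ts2 = ts2;
                    this.size = size;
                }

                // .apply(...): one output per pair with equal key and equal window
                <R> List<R> apply(BiFunction<T1, T2, R> joinFunction) {
                    List<R> out = new ArrayList<>();
                    for (T1 a : input1) {
                        for (T2 b : input2) {
                            boolean sameKey = Objects.equals(keySelector1.apply(a), keySelector2.apply(b));
                            boolean sameWindow =
                                Math.floorDiv(ts1.apply(a), size) == Math.floorDiv(ts2.apply(b), size);
                            if (sameKey && sameWindow) {
                                out.add(joinFunction.apply(a, b));
                            }
                        }
                    }
                    return out;
                }
            }
        }
    }
}
```

For example, joining orders and payments given as long[]{key, timestamp} with a window size of 5000 pairs the key-1 order at 1000 with the payment at 1200 and the order at 6000 with the payment at 7000, while keys that appear on only one side produce nothing.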

DataStream.join

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {
	//......

	/**
	 * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
	 * and window can be specified.
	 */
	public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
		return new JoinedStreams<>(this, otherStream);
	}

	//......
}
  • DataStream provides the join method for performing a join operation; it returns a JoinedStreams.

JoinedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

@Public
public class JoinedStreams<T1, T2> {

	/** The first input stream. */
	private final DataStream<T1> input1;

	/** The second input stream. */
	private final DataStream<T2> input2;

	public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
		this.input1 = requireNonNull(input1);
		this.input2 = requireNonNull(input2);
	}

	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
		requireNonNull(keySelector);
		final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
		return where(keySelector, keyType);
	}

	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType)  {
		requireNonNull(keySelector);
		requireNonNull(keyType);
		return new Where<>(input1.clean(keySelector), keyType);
	}

	//......
}
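  • JoinedStreams mainly provides the where operation, which builds a Where object. If no key TypeInformation is passed in, it is extracted from the KeySelector with TypeExtractor.getKeySelectorTypes; the KeySelector is run through input1.clean before being stored.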

Where

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	@Public
	public class Where<KEY> {

		private final KeySelector<T1, KEY> keySelector1;
		private final TypeInformation<KEY> keyType;

		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
			this.keySelector1 = keySelector1;
			this.keyType = keyType;
		}

		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
			requireNonNull(keySelector);
			final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
			return equalTo(keySelector, otherKey);
		}

		public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
			requireNonNull(keySelector);
			requireNonNull(keyType);

			if (!keyType.equals(this.keyType)) {
				throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
						"first key = " + this.keyType + " , second key = " + keyType);
			}

			return new EqualTo(input2.clean(keySelector));
		}

		//......

	}
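  • The Where object mainly provides the equalTo operation, which builds an EqualTo object. equalTo first checks that the key type of the second input's KeySelector equals the key type of the first one, and throws an IllegalArgumentException if they differ.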

EqualTo

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

		@Public
		public class EqualTo {

			private final KeySelector<T2, KEY> keySelector2;

			EqualTo(KeySelector<T2, KEY> keySelector2) {
				this.keySelector2 = requireNonNull(keySelector2);
			}

			/**
			 * Specifies the window on which the join operation works.
			 */
			@PublicEvolving
			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
			}
		}
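  • The EqualTo object provides the window operation, which builds a WithWindow object from both inputs, both key selectors, the key type and the WindowAssigner; trigger, evictor and allowedLateness start out as null.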

WithWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	@Public
	public static class WithWindow<T1, T2, KEY, W extends Window> {

		private final DataStream<T1> input1;
		private final DataStream<T2> input2;

		private final KeySelector<T1, KEY> keySelector1;
		private final KeySelector<T2, KEY> keySelector2;
		private final TypeInformation<KEY> keyType;

		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

		private final Time allowedLateness;

		private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;

		@PublicEvolving
		protected WithWindow(DataStream<T1> input1,
				DataStream<T2> input2,
				KeySelector<T1, KEY> keySelector1,
				KeySelector<T2, KEY> keySelector2,
				TypeInformation<KEY> keyType,
				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
				Time allowedLateness) {

			this.input1 = requireNonNull(input1);
			this.input2 = requireNonNull(input2);

			this.keySelector1 = requireNonNull(keySelector1);
			this.keySelector2 = requireNonNull(keySelector2);
			this.keyType = requireNonNull(keyType);

			this.windowAssigner = requireNonNull(windowAssigner);

			this.trigger = trigger;
			this.evictor = evictor;

			this.allowedLateness = allowedLateness;
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, newTrigger, evictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, trigger, newEvictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
				windowAssigner, trigger, evictor, newLateness);
		}

		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
				function,
				JoinFunction.class,
				0,
				1,
				2,
				TypeExtractor.NO_INDEX,
				input1.getType(),
				input2.getType(),
				"Join",
				false);

			return apply(function, resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) {
			return (SingleOutputStreamOperator<T>) apply(function);
		}

		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			coGroupedWindowedStream = input1.coGroup(input2)
				.where(keySelector1)
				.equalTo(keySelector2)
				.window(windowAssigner)
				.trigger(trigger)
				.evictor(evictor)
				.allowedLateness(allowedLateness);

			return coGroupedWindowedStream
					.apply(new FlatJoinCoGroupFunction<>(function), resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			return (SingleOutputStreamOperator<T>) apply(function, resultType);
		}

		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
				function,
				FlatJoinFunction.class,
				0,
				1,
				2,
				new int[]{2, 0},
				input1.getType(),
				input2.getType(),
				"Join",
				false);

			return apply(function, resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) {
			return (SingleOutputStreamOperator<T>) apply(function);
		}

		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			coGroupedWindowedStream = input1.coGroup(input2)
				.where(keySelector1)
				.equalTo(keySelector2)
				.window(windowAssigner)
				.trigger(trigger)
				.evictor(evictor)
				.allowedLateness(allowedLateness);

			return coGroupedWindowedStream
					.apply(new JoinCoGroupFunction<>(function), resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			return (SingleOutputStreamOperator<T>) apply(function, resultType);
		}

		@VisibleForTesting
		Time getAllowedLateness() {
			return allowedLateness;
		}

		@VisibleForTesting
		CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() {
			return coGroupedWindowedStream;
		}
	}
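  • The with methods are marked @Deprecated; they simply delegate to apply and cast the result to SingleOutputStreamOperator.
  • A JoinFunction is wrapped in a JoinCoGroupFunction and a FlatJoinFunction in a FlatJoinCoGroupFunction. apply then builds the equivalent coGroup (input1.coGroup(input2).where(...).equalTo(...).window(...)) with the same trigger, evictor and allowedLateness and applies the wrapped function to it; in other words, a windowed join is implemented on top of a windowed coGroup.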
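The WithWindow code above also follows a copy-on-write style: trigger, evictor and allowedLateness never modify the current object but return a new WithWindow with one field replaced, and a null trigger, evictor or allowedLateness means "not set" (CoGroupedStreams.WithWindow.apply further down only forwards non-null values). Below is a minimal sketch of that pattern; WindowSpec and its string-valued fields are hypothetical stand-ins, not Flink types.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for JoinedStreams.WithWindow showing its
// copy-on-write style: every "setter" returns a new instance, fields are final,
// and null means "not configured" for the optional settings.
final class WindowSpec {
    final String windowAssigner;   // required, like WithWindow's windowAssigner
    final String trigger;          // optional (null = not set)
    final String evictor;          // optional (null = not set)
    final Long allowedLatenessMs;  // optional (null = not set)

    WindowSpec(String windowAssigner, String trigger, String evictor, Long allowedLatenessMs) {
        this.windowAssigner = Objects.requireNonNull(windowAssigner);
        this.trigger = trigger;
        this.evictor = evictor;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    WindowSpec trigger(String newTrigger) {
        return new WindowSpec(windowAssigner, newTrigger, evictor, allowedLatenessMs);
    }

    WindowSpec evictor(String newEvictor) {
        return new WindowSpec(windowAssigner, trigger, newEvictor, allowedLatenessMs);
    }

    WindowSpec allowedLateness(long newLatenessMs) {
        return new WindowSpec(windowAssigner, trigger, evictor, newLatenessMs);
    }
}
```

In this sketch, because nothing is mutated, a partially configured WindowSpec can be reused as the base for several differently tuned variants.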

JoinFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/JoinFunction.java

@Public
@FunctionalInterface
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {

	/**
	 * The join method, called once per joined pair of elements.
	 *
	 * @param first The element from first input.
	 * @param second The element from second input.
	 * @return The resulting element.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	OUT join(IN1 first, IN2 second) throws Exception;
}
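  • JoinFunction extends Function and Serializable and defines the join method, which is called once per joined pair of elements, so it has inner-join semantics. If you need an outer join, use a CoGroupFunction instead.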

FlatJoinFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/FlatJoinFunction.java

@Public
@FunctionalInterface
public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {

	/**
	 * The join method, called once per joined pair of elements.
	 *
	 * @param first The element from first input.
	 * @param second The element from second input.
	 * @param out The collector used to return zero, one, or more elements.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}
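  • FlatJoinFunction also extends Function and Serializable and defines a join method with inner-join semantics; for an outer join, use a CoGroupFunction. Unlike JoinFunction's join, FlatJoinFunction's join takes an extra Collector parameter that can emit zero, one or more records, hence the "Flat" in its name.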
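A small sketch of what the Collector parameter enables, assuming local copies of the Collector and FlatJoinFunction shapes (SimpleCollector, SimpleFlatJoinFunction and FlatJoinDemo are hypothetical names, used so the example needs no Flink dependency). The function expands a joined (item, quantity) pair into one record per unit, so it emits nothing for a quantity of 0 and several records for larger quantities.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local copies of the Collector / FlatJoinFunction shapes,
// so the sketch runs without Flink on the classpath.
interface SimpleCollector<T> {
    void collect(T record);
}

@FunctionalInterface
interface SimpleFlatJoinFunction<IN1, IN2, OUT> {
    void join(IN1 first, IN2 second, SimpleCollector<OUT> out) throws Exception;
}

final class FlatJoinDemo {
    // For a joined (item, quantity) pair, emit one record per unit:
    // zero records when quantity is 0, several when it is larger.
    // A JoinFunction always returns exactly one value per pair instead.
    static final SimpleFlatJoinFunction<String, Integer, String> EXPAND =
        (item, quantity, out) -> {
            for (int i = 0; i < quantity; i++) {
                out.collect(item + "#" + i);
            }
        };

    // Runs the function for one pair, using a list-backed collector.
    static List<String> run(String item, int quantity) {
        List<String> buffer = new ArrayList<>();
        try {
            EXPAND.join(item, quantity, buffer::add);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return buffer;
    }
}
```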

CoGroupedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Public
public class CoGroupedStreams<T1, T2> {
	//......

	@Public
	public static class WithWindow<T1, T2, KEY, W extends Window> {
		private final DataStream<T1> input1;
		private final DataStream<T2> input2;

		private final KeySelector<T1, KEY> keySelector1;
		private final KeySelector<T2, KEY> keySelector2;

		private final TypeInformation<KEY> keyType;

		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

		private final Time allowedLateness;

		private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

		protected WithWindow(DataStream<T1> input1,
				DataStream<T2> input2,
				KeySelector<T1, KEY> keySelector1,
				KeySelector<T2, KEY> keySelector2,
				TypeInformation<KEY> keyType,
				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
				Time allowedLateness) {
			this.input1 = input1;
			this.input2 = input2;

			this.keySelector1 = keySelector1;
			this.keySelector2 = keySelector2;
			this.keyType = keyType;

			this.windowAssigner = windowAssigner;
			this.trigger = trigger;
			this.evictor = evictor;

			this.allowedLateness = allowedLateness;
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, newTrigger, evictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, trigger, newEvictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, trigger, evictor, newLateness);
		}

		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {

			TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
				function,
				input1.getType(),
				input2.getType(),
				"CoGroup",
				false);

			return apply(function, resultType);
		}

		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
			UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

			DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
					.map(new Input1Tagger<T1, T2>())
					.setParallelism(input1.getParallelism())
					.returns(unionType);
			DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
					.map(new Input2Tagger<T1, T2>())
					.setParallelism(input2.getParallelism())
					.returns(unionType);

			DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

			// we explicitly create the keyed stream to manually pass the key type information in
			windowedStream =
					new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
					.window(windowAssigner);

			if (trigger != null) {
				windowedStream.trigger(trigger);
			}
			if (evictor != null) {
				windowedStream.evictor(evictor);
			}
			if (allowedLateness != null) {
				windowedStream.allowedLateness(allowedLateness);
			}

			return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
		}

		//......

	}

	//......
}
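  • CoGroupedStreams is structured much like JoinedStreams: its where operation builds a Where object, Where's equalTo builds an EqualTo, EqualTo's window builds a WithWindow, and WithWindow holds the windowAssigner, lets you set trigger, evictor and allowedLateness, and provides apply. The difference is the function that apply accepts: CoGroupedStreams.WithWindow.apply takes a CoGroupFunction, whereas JoinedStreams.WithWindow.apply takes a JoinFunction or a FlatJoinFunction.
  • CoGroupedStreams.WithWindow.apply maps each input to a TaggedUnion (Input1Tagger / Input2Tagger), unions the two tagged streams, keys the union with a UnionKeySelector built from both key selectors, and windows it with the windowAssigner; trigger, evictor and allowedLateness are only applied when they are non-null. Finally the CoGroupFunction is wrapped in a CoGroupWindowFunction and applied to the windowed stream.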
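The tag, union, key and split steps of CoGroupedStreams.WithWindow.apply can be modeled in plain Java as below. This is a simplified sketch under stated assumptions: Tagged, GroupFn and MiniCoGroup are hypothetical names, all elements are treated as belonging to one global window (the real operator groups by key and window), and a List stands in for the Collector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified stand-in for CoGroupedStreams.TaggedUnion: exactly one of the two
// fields is set (null elements are therefore not supported in this sketch).
final class Tagged<T1, T2> {
    final T1 one;
    final T2 two;

    private Tagged(T1 one, T2 two) { this.one = one; this.two = two; }

    static <A, B> Tagged<A, B> one(A value) { return new Tagged<>(value, null); }
    static <A, B> Tagged<A, B> two(B value) { return new Tagged<>(null, value); }

    boolean isOne() { return one != null; }
}

// Hypothetical CoGroupFunction-like shape with a List as the collector.
interface GroupFn<T1, T2, O> {
    void coGroup(Iterable<T1> first, Iterable<T2> second, List<O> out);
}

final class MiniCoGroup {
    // Models tag -> union -> keyBy -> split per key, with a single global window.
    static <T1, T2, K, O> List<O> coGroup(List<T1> input1, List<T2> input2,
            Function<T1, K> key1, Function<T2, K> key2, GroupFn<T1, T2, O> fn) {
        // 1. tag both inputs and union them into a single list
        List<Tagged<T1, T2>> union = new ArrayList<>();
        for (T1 v : input1) union.add(Tagged.one(v));
        for (T2 v : input2) union.add(Tagged.two(v));

        // 2. key the union with a selector that dispatches on the tag
        Map<K, List<Tagged<T1, T2>>> byKey = new LinkedHashMap<>();
        for (Tagged<T1, T2> t : union) {
            K key = t.isOne() ? key1.apply(t.one) : key2.apply(t.two);
            byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(t);
        }

        // 3. per key, split the group back into the two inputs and call coGroup;
        //    one side may be empty for a given key
        List<O> out = new ArrayList<>();
        for (List<Tagged<T1, T2>> group : byKey.values()) {
            List<T1> first = new ArrayList<>();
            List<T2> second = new ArrayList<>();
            for (Tagged<T1, T2> t : group) {
                if (t.isOne()) first.add(t.one); else second.add(t.two);
            }
            fn.coGroup(first, second, out);
        }
        return out;
    }
}
```

In this model a key present in only one input still reaches coGroup, with an empty Iterable for the other side; that is what lets a CoGroupFunction express outer joins.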

CoGroupFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java

@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

	/**
	 * This method must be implemented to provide a user implementation of a
	 * coGroup. It is called for each pair of element groups where the elements share the
	 * same key.
	 *
	 * @param first The records from the first input.
	 * @param second The records from the second.
	 * @param out A collector to return elements.
	 *
	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
	 *                   and may trigger the recovery logic.
	 */
	void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
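  • CoGroupFunction extends Function and Serializable and defines the coGroup method, which can be used to implement outer joins. Its parameters are Iterables holding all elements of each input that share a key, and one of them may be empty, whereas the join methods of JoinFunction and FlatJoinFunction take single elements.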
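As an illustration, a left outer join can be written as a coGroup: every element of the first group is emitted, paired with each element of the second group, or with null when that group is empty. The sketch below assumes a hypothetical plain-Java copy of the CoGroupFunction shape (CoGroupFn, with a List instead of a Collector) and string records; it is not taken from Flink.

```java
import java.util.List;

// Hypothetical plain-Java copy of the CoGroupFunction shape, with a List as the collector.
interface CoGroupFn<IN1, IN2, O> {
    void coGroup(Iterable<IN1> first, Iterable<IN2> second, List<O> out);
}

// Left outer join written as a coGroup: every left element is emitted,
// paired with each right element, or with "null" when the right group is empty.
final class LeftOuterJoin implements CoGroupFn<String, String, String> {
    @Override
    public void coGroup(Iterable<String> first, Iterable<String> second, List<String> out) {
        for (String left : first) {
            boolean matched = false;
            for (String right : second) {
                out.add(left + "," + right);
                matched = true;
            }
            if (!matched) {
                out.add(left + ",null");
            }
        }
    }
}
```

In a real job such a function would be passed to stream1.coGroup(stream2).where(...).equalTo(...).window(...).apply(...); a key that appears only in the second input yields an empty first group, so nothing is emitted for it, as expected for a left outer join.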

WrappingFunction

flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/operators/translation/WrappingFunction.java

@Internal
public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {

	private static final long serialVersionUID = 1L;

	protected T wrappedFunction;

	protected WrappingFunction(T wrappedFunction) {
		this.wrappedFunction = wrappedFunction;
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		FunctionUtils.openFunction(this.wrappedFunction, parameters);
	}

	@Override
	public void close() throws Exception {
		FunctionUtils.closeFunction(this.wrappedFunction);
	}

	@Override
	public void setRuntimeContext(RuntimeContext t) {
		super.setRuntimeContext(t);

		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
	}

	public T getWrappedFunction () {
		return this.wrappedFunction;
	}
}
  • WrappingFunction extends AbstractRichFunction and overrides the parent's open, close and setRuntimeContext methods so that these calls are forwarded to wrappedFunction through FunctionUtils.
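The forwarding matters because the lifecycle methods are called on the wrapper (for example JoinCoGroupFunction) rather than on the user's function. The sketch below models the idea with a hypothetical Lifecycle interface standing in for RichFunction; like FunctionUtils.openFunction and closeFunction, the wrapper only forwards the calls when the wrapped object actually implements that interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal lifecycle interface standing in for Flink's RichFunction.
interface Lifecycle {
    void open();
    void close();
}

// A user function that records which lifecycle calls reached it.
final class RecordingFunction implements Lifecycle {
    final List<String> calls = new ArrayList<>();

    @Override public void open() { calls.add("open"); }
    @Override public void close() { calls.add("close"); }
}

// Sketch of the WrappingFunction idea: lifecycle calls arrive at the wrapper,
// which forwards them to the wrapped function, but only when the wrapped
// object actually has a lifecycle (the instanceof check).
abstract class Wrapper<T> implements Lifecycle {
    protected final T wrapped;

    protected Wrapper(T wrapped) { this.wrapped = wrapped; }

    @Override
    public void open() {
        if (wrapped instanceof Lifecycle) {
            ((Lifecycle) wrapped).open();
        }
    }

    @Override
    public void close() {
        if (wrapped instanceof Lifecycle) {
            ((Lifecycle) wrapped).close();
        }
    }
}
```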

JoinCoGroupFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	/**
	 * CoGroup function that does a nested-loop join to get the join result.
	 */
	private static class JoinCoGroupFunction<T1, T2, T>
			extends WrappingFunction<JoinFunction<T1, T2, T>>
			implements CoGroupFunction<T1, T2, T> {
		private static final long serialVersionUID = 1L;

		public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
			super(wrappedFunction);
		}

		@Override
		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
			for (T1 val1: first) {
				for (T2 val2: second) {
					out.collect(wrappedFunction.join(val1, val2));
				}
			}
		}
	}
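  • JoinCoGroupFunction extends WrappingFunction and implements the coGroup method of the CoGroupFunction interface. It iterates over the first collection and, for each element, iterates over the second collection, calling wrappedFunction.join on every pair and emitting the result (a nested-loop join).
  • JoinedStreams defines JoinCoGroupFunction as a private static class; the apply method of JoinedStreams.WithWindow uses it to wrap the JoinFunction and then calls apply on CoGroupedStreams.WithWindow.
  • The join method defined by JoinFunction takes two single elements, whereas the coGroup method of JoinCoGroupFunction takes two Iterables.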
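The nested loop is easy to check in isolation. The sketch below re-creates the adaptation in plain Java with hypothetical PairJoin and GroupJoin interfaces (a List stands in for the Collector): for one key and window with m elements from the first input and n from the second, join is called m × n times, and if either side is empty nothing is emitted, which is where the inner-join semantics come from.

```java
import java.util.List;

// Plain-Java stand-ins for the JoinFunction and CoGroupFunction shapes
// (hypothetical local copies; a List plays the role of the Collector).
@FunctionalInterface
interface PairJoin<T1, T2, T> {
    T join(T1 first, T2 second);
}

interface GroupJoin<T1, T2, T> {
    void coGroup(Iterable<T1> first, Iterable<T2> second, List<T> out);
}

// Same adaptation as JoinCoGroupFunction: a nested loop over both groups
// calls the pairwise join function once per (first, second) combination.
final class NestedLoopAdapter<T1, T2, T> implements GroupJoin<T1, T2, T> {
    private final PairJoin<T1, T2, T> wrapped;

    NestedLoopAdapter(PairJoin<T1, T2, T> wrapped) { this.wrapped = wrapped; }

    @Override
    public void coGroup(Iterable<T1> first, Iterable<T2> second, List<T> out) {
        for (T1 a : first) {
            for (T2 b : second) {
                out.add(wrapped.join(a, b));
            }
        }
    }
}
```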

FlatJoinCoGroupFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	/**
	 * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
	 */
	private static class FlatJoinCoGroupFunction<T1, T2, T>
			extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
			implements CoGroupFunction<T1, T2, T> {
		private static final long serialVersionUID = 1L;

		public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
			super(wrappedFunction);
		}

		@Override
		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
			for (T1 val1: first) {
				for (T2 val2: second) {
					wrappedFunction.join(val1, val2, out);
				}
			}
		}
	}
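  • FlatJoinCoGroupFunction likewise extends WrappingFunction and implements coGroup from the CoGroupFunction interface with the same nested loop, but instead of collecting a return value it passes the Collector to wrappedFunction.join for every pair, so the wrapped function decides how many records to emit.
  • JoinedStreams defines FlatJoinCoGroupFunction as a private static class; the apply method of JoinedStreams.WithWindow uses it to wrap the FlatJoinFunction and then calls apply on CoGroupedStreams.WithWindow.
  • The join method defined by FlatJoinFunction takes two single elements (plus a Collector), whereas the coGroup method of FlatJoinCoGroupFunction takes two Iterables.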
