How Flink Provides Compatibility with StormTopology


This post looks at how Flink's flink-storm module (flink-release-1.6.2/flink-contrib/flink-storm) runs a StormTopology on Flink. We start from a word-count test that submits a Storm topology to a FlinkLocalCluster, then walk through FlinkLocalCluster.java and FlinkTopology.java to see how the topology is translated and executed.
    @Test
    public void testStormWordCount() throws Exception {
        //NOTE 1 build Topology the Storm way
        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomWordSpout(), 1);
        builder.setBolt("count", new WordCountBolt(), 5)
                .fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1)
                .shuffleGrouping("count");

        //NOTE 2 convert StormTopology to FlinkTopology
        FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);

        //NOTE 3 execute program locally using FlinkLocalCluster
        Config conf = new Config();
        // only required to stabilize integration test
        conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);

        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("stormWordCount", conf, flinkTopology);
        cluster.shutdown();
    }
Since Flink relies on its own checkpoint mechanism and does not translate Storm's ack operations, it makes no particular difference here whether the bolts extend BaseBasicBolt or BaseRichBolt; a sketch of one such bolt follows below.
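The RandomWordSpout, WordCountBolt and PrintBolt used in the test are ordinary Storm components whose code is not shown above. As an illustration, here is a minimal sketch of what a WordCountBolt extending BaseRichBolt might look like (an assumed implementation for illustration, not the class used in the original test):

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical sketch of a word-count bolt; the real WordCountBolt from the test is not shown.
// Because Flink provides fault tolerance via checkpoints, no explicit ack handling is needed,
// so BaseRichBolt works just as well as BaseBasicBolt here.
public class WordCountBolt extends BaseRichBolt {

	private Map<String, Integer> counts;
	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.counts = new HashMap<>();
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		String word = input.getStringByField("word");
		int count = counts.merge(word, 1, Integer::sum);
		collector.emit(new Values(word, count));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count"));
	}
}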

LocalClusterFactory

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java

// ------------------------------------------------------------------------
	//  Access to default local cluster
	// ------------------------------------------------------------------------

	// A different {@link FlinkLocalCluster} to be used for execution of ITCases
	private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

	/**
	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
	 * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
	 *
	 * @return a {@link FlinkLocalCluster} to be used for execution
	 */
	public static FlinkLocalCluster getLocalCluster() {
		return currentFactory.createLocalCluster();
	}

	/**
	 * Sets a different factory for FlinkLocalClusters to be used for execution.
	 *
	 * @param clusterFactory
	 * 		The LocalClusterFactory to create the local clusters for execution.
	 */
	public static void initialize(LocalClusterFactory clusterFactory) {
		currentFactory = Objects.requireNonNull(clusterFactory);
	}

	// ------------------------------------------------------------------------
	//  Cluster factory
	// ------------------------------------------------------------------------

	/**
	 * A factory that creates local clusters.
	 */
	public interface LocalClusterFactory {

		/**
		 * Creates a local Flink cluster.
		 * @return A local Flink cluster.
		 */
		FlinkLocalCluster createLocalCluster();
	}

	/**
	 * A factory that instantiates a FlinkLocalCluster.
	 */
	public static class DefaultLocalClusterFactory implements LocalClusterFactory {

		@Override
		public FlinkLocalCluster createLocalCluster() {
			return new FlinkLocalCluster();
		}
	}
  • FlinkLocalCluster exposes a static method getLocalCluster for obtaining a FlinkLocalCluster; internally it delegates to a LocalClusterFactory to create the instance
  • The factory in use is the DefaultLocalClusterFactory implementation, whose createLocalCluster method simply news up a FlinkLocalCluster
  • With the current implementation, every call to FlinkLocalCluster.getLocalCluster therefore creates a fresh FlinkLocalCluster, which is worth keeping in mind when calling it; if one instance needs to be shared, a custom factory can be registered, as sketched below
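If a test needs to reuse one cluster across several getLocalCluster calls, a different factory can be registered through initialize. Below is a minimal sketch; the singleton behaviour is an assumption for illustration, FlinkLocalCluster itself does not provide it:

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkLocalCluster.LocalClusterFactory;

// Hypothetical factory that always returns the same FlinkLocalCluster instance,
// unlike DefaultLocalClusterFactory, which creates a new one on every call.
public class SharedLocalClusterFactory implements LocalClusterFactory {

	private final FlinkLocalCluster instance = new FlinkLocalCluster();

	@Override
	public FlinkLocalCluster createLocalCluster() {
		return instance;
	}
}

Registering it before the first call makes FlinkLocalCluster.getLocalCluster() hand back the shared instance:

FlinkLocalCluster.initialize(new SharedLocalClusterFactory());
FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();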

FlinkTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

/**
	 * Creates a Flink program that uses the specified spouts and bolts.
	 * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
	 * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
	 */
	public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
		return new FlinkTopology(stormBuilder);
	}

	private FlinkTopology(TopologyBuilder builder) {
		this.builder = builder;
		this.stormTopology = builder.createTopology();
		// extract the spouts and bolts
		this.spouts = getPrivateField("_spouts");
		this.bolts = getPrivateField("_bolts");

		this.env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Kick off the translation immediately
		translateTopology();
	}
  • FlinkTopology provides a static factory method createTopology for creating a FlinkTopology
  • The constructor first stores the TopologyBuilder, then uses getPrivateField (reflection via getDeclaredField) to read the private _spouts and _bolts fields and keeps them for the later topology translation; a sketch of such a reflective accessor follows below
  • It then obtains the StreamExecutionEnvironment and finally calls translateTopology to translate the whole StormTopology
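The getPrivateField helper itself is not shown in the excerpt. A simplified sketch of how such a reflective accessor can read TopologyBuilder's private maps is given below; the exact implementation in flink-storm may differ (for example, it may also copy the returned map):

import java.lang.reflect.Field;
import java.util.Map;

import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;

// Simplified, assumed sketch of reading TopologyBuilder's private "_spouts"/"_bolts" maps via reflection.
public class PrivateFieldAccess {

	@SuppressWarnings("unchecked")
	static <T> Map<String, T> getPrivateField(TopologyBuilder builder, String fieldName) {
		try {
			Field field = builder.getClass().getDeclaredField(fieldName);
			field.setAccessible(true); // bypass the private modifier
			return (Map<String, T>) field.get(builder);
		} catch (NoSuchFieldException | IllegalAccessException e) {
			throw new RuntimeException("Couldn't get field " + fieldName + " from TopologyBuilder", e);
		}
	}

	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		Map<String, IRichSpout> spouts = getPrivateField(builder, "_spouts");
		System.out.println("spouts: " + spouts.keySet());
	}
}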

translateTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

/**
	 * Creates a Flink program that uses the specified spouts and bolts.
	 */
	private void translateTopology() {

		unprocessdInputsPerBolt.clear();
		outputStreams.clear();
		declarers.clear();
		availableInputs.clear();

		// Storm defaults to parallelism 1
		env.setParallelism(1);

		/* Translation of topology */

		for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
			final String spoutId = spout.getKey();
			final IRichSpout userSpout = spout.getValue();

			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
			userSpout.declareOutputFields(declarer);
			final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
			this.outputStreams.put(spoutId, sourceStreams);
			declarers.put(spoutId, declarer);

			final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
			final DataStreamSource<?> source;

			if (sourceStreams.size() == 1) {
				final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
				spoutWrapperSingleOutput.setStormTopology(stormTopology);

				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];

				DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
						declarer.getOutputType(outputStreamId));

				outputStreams.put(outputStreamId, src);
				source = src;
			} else {
				final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
						userSpout, spoutId, null, null);
				spoutWrapperMultipleOutputs.setStormTopology(stormTopology);

				@SuppressWarnings({ "unchecked", "rawtypes" })
				DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
						spoutWrapperMultipleOutputs, spoutId,
						(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));

				SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
						.split(new StormStreamSelector<Tuple>());
				for (String streamId : sourceStreams.keySet()) {
					SingleOutputStreamOperator<Tuple> outStream = splitSource.select(streamId)
							.map(new SplitStreamMapper<Tuple>());
					outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
					outputStreams.put(streamId, outStream);
				}
				source = multiSource;
			}
			availableInputs.put(spoutId, outputStreams);

			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
			if (common.is_set_parallelism_hint()) {
				int dop = common.get_parallelism_hint();
				source.setParallelism(dop);
			} else {
				common.set_parallelism_hint(1);
			}
		}

		/**
		 * 1. Connect all spout streams with bolts streams
		 * 2. Then proceed with the bolts stream already connected
		 *
		 * <p>Because we do not know the order in which an iterator steps over a set, we might process a consumer before
		 * its producer
		 * ->thus, we might need to repeat multiple times
		 */
		boolean makeProgress = true;
		while (bolts.size() > 0) {
			if (!makeProgress) {
				StringBuilder strBld = new StringBuilder();
				strBld.append("Unable to build Topology. Could not connect the following bolts:");
				for (String boltId : bolts.keySet()) {
					strBld.append("\n  ");
					strBld.append(boltId);
					strBld.append(": missing input streams [");
					for (Entry<GlobalStreamId, Grouping> streams : unprocessdInputsPerBolt
							.get(boltId)) {
						strBld.append("'");
						strBld.append(streams.getKey().get_streamId());
						strBld.append("' from '");
						strBld.append(streams.getKey().get_componentId());
						strBld.append("'; ");
					}
					strBld.append("]");
				}

				throw new RuntimeException(strBld.toString());
			}
			makeProgress = false;

			final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
			while (boltsIterator.hasNext()) {

				final Entry<String, IRichBolt> bolt = boltsIterator.next();
				final String boltId = bolt.getKey();
				final IRichBolt userBolt = copyObject(bolt.getValue());

				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();

				Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
				if (unprocessedBoltInputs == null) {
					unprocessedBoltInputs = new HashSet<>();
					unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
					unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
				}

				// check if all inputs are available
				final int numberOfInputs = unprocessedBoltInputs.size();
				int inputsAvailable = 0;
				for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
					final String producerId = entry.getKey().get_componentId();
					final String streamId = entry.getKey().get_streamId();
					final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
					if (streams != null && streams.get(streamId) != null) {
						inputsAvailable++;
					}
				}

				if (inputsAvailable != numberOfInputs) {
					// traverse other bolts first until inputs are available
					continue;
				} else {
					makeProgress = true;
					boltsIterator.remove();
				}

				final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);

				for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
					final GlobalStreamId streamId = input.getKey();
					final Grouping grouping = input.getValue();

					final String producerId = streamId.get_componentId();

					final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);

					inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
				}

				final SingleOutputStreamOperator<?> outputStream = createOutput(boltId,
						userBolt, inputStreams);

				if (common.is_set_parallelism_hint()) {
					int dop = common.get_parallelism_hint();
					outputStream.setParallelism(dop);
				} else {
					common.set_parallelism_hint(1);
				}

			}
		}
	}
  • For every spout and bolt, translateTopology collects the declared output streams through a FlinkOutputFieldsDeclarer, which implements Storm's OutputFieldsDeclarer interface
  • Storm groupings are mapped onto DataStream operations: shuffleGrouping becomes rebalance, fieldsGrouping becomes keyBy, globalGrouping becomes global, and allGrouping becomes broadcast; a simplified sketch of this mapping follows below
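As a rough illustration, the grouping translation can be pictured as the following sketch over the DataStream API. This is a simplified assumption: the real logic in FlinkTopology#processInput additionally resolves the grouping fields into tuple positions via the producer's FlinkOutputFieldsDeclarer and rejects unsupported groupings.

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.storm.generated.Grouping;

// Simplified sketch of mapping Storm groupings onto DataStream operations.
public class GroupingTranslationSketch {

	static DataStream<Tuple> applyGrouping(DataStream<Tuple> producer, Grouping grouping, int[] keyPositions) {
		if (grouping.is_set_shuffle()) {
			// shuffleGrouping -> round-robin redistribution
			return producer.rebalance();
		} else if (grouping.is_set_fields()) {
			// globalGrouping is encoded by Storm as a fields grouping with an empty field list
			return keyPositions.length == 0 ? producer.global() : producer.keyBy(keyPositions);
		} else if (grouping.is_set_all()) {
			// allGrouping -> every parallel instance receives every record
			return producer.broadcast();
		}
		// direct, custom and local-or-shuffle groupings are omitted in this sketch
		return producer;
	}
}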

FlinkLocalCluster

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/api/FlinkLocalCluster.java

/**
 * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
 */
public class FlinkLocalCluster {

	/** The log used by this mini cluster. */
	private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);

	/** The Flink mini cluster on which to execute the programs. */
	private FlinkMiniCluster flink;

	/** Configuration key to submit topology in blocking mode if flag is set to {@code true}. */
	public static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING";

	public FlinkLocalCluster() {
	}

	public FlinkLocalCluster(FlinkMiniCluster flink) {
		this.flink = Objects.requireNonNull(flink);
	}

	@SuppressWarnings("rawtypes")
	public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
			throws Exception {
		this.submitTopologyWithOpts(topologyName, conf, topology, null);
	}

	@SuppressWarnings("rawtypes")
	public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
		LOG.info("Running Storm topology on FlinkLocalCluster");

		boolean submitBlocking = false;
		if (conf != null) {
			Object blockingFlag = conf.get(SUBMIT_BLOCKING);
			if (blockingFlag instanceof Boolean) {
				submitBlocking = ((Boolean) blockingFlag).booleanValue();
			}
		}

		FlinkClient.addStormConfigToTopology(topology, conf);

		StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
		streamGraph.setJobName(topologyName);

		JobGraph jobGraph = streamGraph.getJobGraph();

		if (this.flink == null) {
			Configuration configuration = new Configuration();
			configuration.addAll(jobGraph.getJobConfiguration());

			configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

			this.flink = new LocalFlinkMiniCluster(configuration, true);
			this.flink.start();
		}

		if (submitBlocking) {
			this.flink.submitJobAndWait(jobGraph, false);
		} else {
			this.flink.submitJobDetached(jobGraph);
		}
	}

	public void killTopology(final String topologyName) {
		this.killTopologyWithOpts(topologyName, null);
	}

	public void killTopologyWithOpts(final String name, final KillOptions options) {
	}

	public void activate(final String topologyName) {
	}

	public void deactivate(final String topologyName) {
	}

	public void rebalance(final String name, final RebalanceOptions options) {
	}

	public void shutdown() {
		if (this.flink != null) {
			this.flink.stop();
			this.flink = null;
		}
	}

	//......
}
  • FlinkLocalCluster's submitTopology method delegates to submitTopologyWithOpts, which mainly reads a few settings, calls topology.getExecutionEnvironment().getStreamGraph() to build the StreamGraph from the registered transformations, derives the JobGraph from it, creates and starts a LocalFlinkMiniCluster if none was supplied, and finally submits the JobGraph via LocalFlinkMiniCluster's submitJobAndWait or submitJobDetached, depending on the SUBMIT_BLOCKING flag (see the sketch below)
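From the caller's perspective, the only knob is the SUBMIT_BLOCKING entry in the Storm Config: with it set to true (as in the test at the top) submitJobAndWait blocks until the job finishes, otherwise the job is submitted detached. A minimal sketch of a detached submission, reusing the same assumed example spout/bolt classes as the test above:

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

// Sketch: without SUBMIT_BLOCKING the job runs detached, so the caller has to keep
// the local cluster alive for as long as the job should run.
public class DetachedSubmitExample {

	public static void main(String[] args) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("spout", new RandomWordSpout(), 1); // assumed example spout from the test above
		builder.setBolt("print", new PrintBolt(), 1)         // assumed example bolt from the test above
				.shuffleGrouping("spout");

		FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);

		Config conf = new Config(); // SUBMIT_BLOCKING not set -> detached submission

		FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
		cluster.submitTopology("stormWordCountDetached", conf, flinkTopology);

		Thread.sleep(10_000); // let the detached job run for a while
		cluster.shutdown();
	}
}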
