A look at how Flink runs a StormTopology

Category: Programming tools · Published: 6 years ago

    @Test
    public void testStormWordCount() throws Exception {
        //NOTE 1 build Topology the Storm way
        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomWordSpout(), 1);
        builder.setBolt("count", new WordCountBolt(), 5)
                .fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1)
                .shuffleGrouping("count");

        //NOTE 2 convert StormTopology to FlinkTopology
        FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);

        //NOTE 3 execute program locally using FlinkLocalCluster
        Config conf = new Config();
        // only required to stabilize integration test
        conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);

        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("stormWordCount", conf, flinkTopology);
        cluster.shutdown();
    }
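Because Flink relies on its own checkpoint mechanism and does not translate Storm's ack operations, it makes no particular difference here whether the bolts extend BaseBasicBolt or BaseRichBolt.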

LocalClusterFactory

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java

	// ------------------------------------------------------------------------
	//  Access to default local cluster
	// ------------------------------------------------------------------------

	// A different {@link FlinkLocalCluster} to be used for execution of ITCases
	private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

	/**
	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
	 * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
	 *
	 * @return a {@link FlinkLocalCluster} to be used for execution
	 */
	public static FlinkLocalCluster getLocalCluster() {
		return currentFactory.createLocalCluster();
	}

	/**
	 * Sets a different factory for FlinkLocalClusters to be used for execution.
	 *
	 * @param clusterFactory
	 * 		The LocalClusterFactory to create the local clusters for execution.
	 */
	public static void initialize(LocalClusterFactory clusterFactory) {
		currentFactory = Objects.requireNonNull(clusterFactory);
	}

	// ------------------------------------------------------------------------
	//  Cluster factory
	// ------------------------------------------------------------------------

	/**
	 * A factory that creates local clusters.
	 */
	public interface LocalClusterFactory {

		/**
		 * Creates a local Flink cluster.
		 * @return A local Flink cluster.
		 */
		FlinkLocalCluster createLocalCluster();
	}

	/**
	 * A factory that instantiates a FlinkLocalCluster.
	 */
	public static class DefaultLocalClusterFactory implements LocalClusterFactory {

		@Override
		public FlinkLocalCluster createLocalCluster() {
			return new FlinkLocalCluster();
		}
	}
  • FlinkLocalCluster exposes a static method, getLocalCluster, which obtains a FlinkLocalCluster by delegating to a LocalClusterFactory.
  • The LocalClusterFactory used by default is DefaultLocalClusterFactory, whose createLocalCluster method simply returns new FlinkLocalCluster().
  • With the current implementation, every call to FlinkLocalCluster.getLocalCluster creates a new FlinkLocalCluster, which callers need to keep in mind.
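
The factory behavior described in the bullets above can be reproduced in a few lines of plain Java. This is a minimal sketch, not the Flink code: Cluster and ClusterFactory are hypothetical stand-ins for FlinkLocalCluster and LocalClusterFactory, and only the shape of getLocalCluster and initialize is taken from the excerpt.

```java
import java.util.Objects;

// Simplified stand-in for the factory logic in FlinkLocalCluster; names are illustrative only.
class LocalClusterFactorySketch {

    /** Hypothetical stand-in for the cluster object; carries no Flink behavior. */
    static class Cluster { }

    /** Mirrors the shape of LocalClusterFactory. */
    interface ClusterFactory {
        Cluster createLocalCluster();
    }

    // Default factory: a brand-new instance on every call, like DefaultLocalClusterFactory.
    private static ClusterFactory currentFactory = Cluster::new;

    static Cluster getLocalCluster() {
        return currentFactory.createLocalCluster();
    }

    static void initialize(ClusterFactory factory) {
        currentFactory = Objects.requireNonNull(factory);
    }

    public static void main(String[] args) {
        // Two calls with the default factory yield two distinct instances.
        System.out.println(getLocalCluster() == getLocalCluster()); // false

        // A test can install a factory that hands out one shared instance instead.
        Cluster shared = new Cluster();
        initialize(() -> shared);
        System.out.println(getLocalCluster() == getLocalCluster()); // true
    }
}
```

The practical consequence is that code which wants to submit and later shut down the same cluster should hold on to the reference returned by the first call, as the test at the top of the article does.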

FlinkTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

	/**
	 * Creates a Flink program that uses the specified spouts and bolts.
	 * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
	 * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
	 */
	public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
		return new FlinkTopology(stormBuilder);
	}

	private FlinkTopology(TopologyBuilder builder) {
		this.builder = builder;
		this.stormTopology = builder.createTopology();
		// extract the spouts and bolts
		this.spouts = getPrivateField("_spouts");
		this.bolts = getPrivateField("_bolts");

		this.env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Kick off the translation immediately
		translateTopology();
	}
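  • FlinkTopology provides a static factory method, createTopology, for creating a FlinkTopology.
  • The constructor first stores the TopologyBuilder, then uses getPrivateField, which goes through reflection (getDeclaredField), to read the builder's private _spouts and _bolts fields and keeps them for the later translation of the topology.
  • It then obtains the StreamExecutionEnvironment and finally calls translateTopology to translate the whole StormTopology.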
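
The excerpt does not show the body of getPrivateField, so the following is only a sketch of the general technique the bullets describe: reading a private field by name with getDeclaredField. The Builder class is a hypothetical stand-in for TopologyBuilder, and the helper's signature is an assumption, not Flink's.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

class PrivateFieldSketch {

    /** Hypothetical builder that hides its components, as TopologyBuilder hides _spouts. */
    static class Builder {
        private final Map<String, String> _spouts = new HashMap<>();

        void setSpout(String id, String spout) {
            _spouts.put(id, spout);
        }
    }

    /** Reads a private field by name via getDeclaredField (assumed helper shape). */
    @SuppressWarnings("unchecked")
    static <T> T getPrivateField(Object target, String name) {
        try {
            Field field = target.getClass().getDeclaredField(name);
            field.setAccessible(true); // bypass the private modifier
            return (T) field.get(target);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Could not read field " + name, e);
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder();
        builder.setSpout("spout", "RandomWordSpout");

        Map<String, String> spouts = getPrivateField(builder, "_spouts");
        System.out.println(spouts); // {spout=RandomWordSpout}
    }
}
```

Reading fields by name like this ties the caller to the internal field names of the target class, so a rename in the target breaks it at runtime instead of at compile time.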

translateTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

	/**
	 * Creates a Flink program that uses the specified spouts and bolts.
	 */
	private void translateTopology() {

		unprocessdInputsPerBolt.clear();
		outputStreams.clear();
		declarers.clear();
		availableInputs.clear();

		// Storm defaults to parallelism 1
		env.setParallelism(1);

		/* Translation of topology */

		for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
			final String spoutId = spout.getKey();
			final IRichSpout userSpout = spout.getValue();

			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
			userSpout.declareOutputFields(declarer);
			final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
			this.outputStreams.put(spoutId, sourceStreams);
			declarers.put(spoutId, declarer);

			final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
			final DataStreamSource<?> source;

			if (sourceStreams.size() == 1) {
				final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
				spoutWrapperSingleOutput.setStormTopology(stormTopology);

				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];

				DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
						declarer.getOutputType(outputStreamId));

				outputStreams.put(outputStreamId, src);
				source = src;
			} else {
				final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
						userSpout, spoutId, null, null);
				spoutWrapperMultipleOutputs.setStormTopology(stormTopology);

				@SuppressWarnings({ "unchecked", "rawtypes" })
				DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
						spoutWrapperMultipleOutputs, spoutId,
						(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));

				SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
						.split(new StormStreamSelector<Tuple>());
				for (String streamId : sourceStreams.keySet()) {
					SingleOutputStreamOperator<Tuple> outStream = splitSource.select(streamId)
							.map(new SplitStreamMapper<Tuple>());
					outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
					outputStreams.put(streamId, outStream);
				}
				source = multiSource;
			}
			availableInputs.put(spoutId, outputStreams);

			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
			if (common.is_set_parallelism_hint()) {
				int dop = common.get_parallelism_hint();
				source.setParallelism(dop);
			} else {
				common.set_parallelism_hint(1);
			}
		}

		/**
		 * 1. Connect all spout streams with bolts streams
		 * 2. Then proceed with the bolts stream already connected
		 *
		 * <p>Because we do not know the order in which an iterator steps over a set, we might process a consumer before
		 * its producer
		 * ->thus, we might need to repeat multiple times
		 */
		boolean makeProgress = true;
		while (bolts.size() > 0) {
			if (!makeProgress) {
				StringBuilder strBld = new StringBuilder();
				strBld.append("Unable to build Topology. Could not connect the following bolts:");
				for (String boltId : bolts.keySet()) {
					strBld.append("\n  ");
					strBld.append(boltId);
					strBld.append(": missing input streams [");
					for (Entry<GlobalStreamId, Grouping> streams : unprocessdInputsPerBolt
							.get(boltId)) {
						strBld.append("'");
						strBld.append(streams.getKey().get_streamId());
						strBld.append("' from '");
						strBld.append(streams.getKey().get_componentId());
						strBld.append("'; ");
					}
					strBld.append("]");
				}

				throw new RuntimeException(strBld.toString());
			}
			makeProgress = false;

			final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
			while (boltsIterator.hasNext()) {

				final Entry<String, IRichBolt> bolt = boltsIterator.next();
				final String boltId = bolt.getKey();
				final IRichBolt userBolt = copyObject(bolt.getValue());

				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();

				Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
				if (unprocessedBoltInputs == null) {
					unprocessedBoltInputs = new HashSet<>();
					unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
					unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
				}

				// check if all inputs are available
				final int numberOfInputs = unprocessedBoltInputs.size();
				int inputsAvailable = 0;
				for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
					final String producerId = entry.getKey().get_componentId();
					final String streamId = entry.getKey().get_streamId();
					final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
					if (streams != null && streams.get(streamId) != null) {
						inputsAvailable++;
					}
				}

				if (inputsAvailable != numberOfInputs) {
					// traverse other bolts first until inputs are available
					continue;
				} else {
					makeProgress = true;
					boltsIterator.remove();
				}

				final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);

				for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
					final GlobalStreamId streamId = input.getKey();
					final Grouping grouping = input.getValue();

					final String producerId = streamId.get_componentId();

					final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);

					inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
				}

				final SingleOutputStreamOperator<?> outputStream = createOutput(boltId,
						userBolt, inputStreams);

				if (common.is_set_parallelism_hint()) {
					int dop = common.get_parallelism_hint();
					outputStream.setParallelism(dop);
				} else {
					common.set_parallelism_hint(1);
				}

			}
		}
	}
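FlinkOutputFieldsDeclarer, which translateTopology passes to each spout's declareOutputFields, implements Storm's OutputFieldsDeclarer interface.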
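
To illustrate how a declarer feeds the size() == 1 check in translateTopology, here is a minimal sketch in plain Java. Declarer is a cut-down stand-in for Storm's OutputFieldsDeclarer (the real interface has more overloads and uses Fields, not List<String>), and RecordingDeclarer and translationPath are hypothetical names, not Flink classes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DeclarerSketch {

    /** Cut-down stand-in for Storm's OutputFieldsDeclarer. */
    interface Declarer {
        void declare(List<String> fields);

        void declareStream(String streamId, List<String> fields);
    }

    /** Records every declared stream so the translation can inspect them later. */
    static class RecordingDeclarer implements Declarer {
        final Map<String, List<String>> outputStreams = new HashMap<>();

        @Override
        public void declare(List<String> fields) {
            declareStream("default", fields); // "default" is Storm's default stream id
        }

        @Override
        public void declareStream(String streamId, List<String> fields) {
            outputStreams.put(streamId, fields);
        }
    }

    /** Mirrors the branch in translateTopology: one stream vs. anything else. */
    static String translationPath(RecordingDeclarer declarer) {
        return declarer.outputStreams.size() == 1 ? "single-output" : "split-by-stream-id";
    }

    public static void main(String[] args) {
        RecordingDeclarer one = new RecordingDeclarer();
        one.declare(Arrays.asList("word"));
        System.out.println(translationPath(one)); // single-output

        RecordingDeclarer two = new RecordingDeclarer();
        two.declareStream("even", Arrays.asList("number"));
        two.declareStream("odd", Arrays.asList("number"));
        System.out.println(translationPath(two)); // split-by-stream-id
    }
}
```
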
Storm groupings are translated into DataStream operations: for example, shuffleGrouping becomes a rebalance on the DataStream, fieldsGrouping becomes a keyBy, globalGrouping becomes a global, and allGrouping becomes a broadcast.
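
The bolt-wiring loop in translateTopology and the grouping mapping just described can be sketched together in plain Java. This is a simplified model under stated assumptions: inputs are identified by producer id only (the real code uses GlobalStreamId, which also carries a stream id), the Grouping enum and the wire method are hypothetical, and the result is a textual plan instead of real DataStream calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class WiringSketch {

    /** The four groupings named above, each paired with the DataStream operation it maps to. */
    enum Grouping {
        SHUFFLE("rebalance"), FIELDS("keyBy"), GLOBAL("global"), ALL("broadcast");

        final String dataStreamOp;

        Grouping(String dataStreamOp) {
            this.dataStreamOp = dataStreamOp;
        }
    }

    /**
     * Wires bolts so that every producer is handled before its consumers.
     * bolts maps boltId -> (producerId -> grouping); sources are the spout ids.
     */
    static List<String> wire(Set<String> sources, Map<String, Map<String, Grouping>> bolts) {
        Set<String> available = new HashSet<>(sources);
        Map<String, Map<String, Grouping>> pending = new LinkedHashMap<>(bolts);
        List<String> plan = new ArrayList<>();

        boolean madeProgress = true;
        while (!pending.isEmpty()) {
            if (!madeProgress) {
                // A full pass connected nothing: some input can never be satisfied.
                throw new IllegalStateException("Could not connect bolts: " + pending.keySet());
            }
            madeProgress = false;
            Iterator<Map.Entry<String, Map<String, Grouping>>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Map<String, Grouping>> bolt = it.next();
                if (!available.containsAll(bolt.getValue().keySet())) {
                    continue; // a producer is not translated yet; retry on a later pass
                }
                for (Map.Entry<String, Grouping> input : bolt.getValue().entrySet()) {
                    plan.add(input.getKey() + " -" + input.getValue().dataStreamOp + "-> " + bolt.getKey());
                }
                available.add(bolt.getKey());
                it.remove();
                madeProgress = true;
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Grouping>> bolts = new LinkedHashMap<>();
        // Deliberately list the consumer before its producer.
        bolts.put("print", Collections.singletonMap("count", Grouping.SHUFFLE));
        bolts.put("count", Collections.singletonMap("spout", Grouping.FIELDS));

        wire(Collections.singleton("spout"), bolts).forEach(System.out::println);
        // spout -keyBy-> count
        // count -rebalance-> print
    }
}
```

The repeated passes exist because the iteration order over the bolts is not tied to the data flow: a consumer may be visited before its producer, so it is skipped and picked up on a later pass, and a pass without any progress signals an unconnectable topology.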

FlinkLocalCluster

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/api/FlinkLocalCluster.java

/**
 * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
 */
public class FlinkLocalCluster {

	/** The log used by this mini cluster. */
	private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);

	/** The Flink mini cluster on which to execute the programs. */
	private FlinkMiniCluster flink;

	/** Configuration key to submit topology in blocking mode if flag is set to {@code true}. */
	public static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING";

	public FlinkLocalCluster() {
	}

	public FlinkLocalCluster(FlinkMiniCluster flink) {
		this.flink = Objects.requireNonNull(flink);
	}

	@SuppressWarnings("rawtypes")
	public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
			throws Exception {
		this.submitTopologyWithOpts(topologyName, conf, topology, null);
	}

	@SuppressWarnings("rawtypes")
	public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
		LOG.info("Running Storm topology on FlinkLocalCluster");

		boolean submitBlocking = false;
		if (conf != null) {
			Object blockingFlag = conf.get(SUBMIT_BLOCKING);
			if (blockingFlag instanceof Boolean) {
				submitBlocking = ((Boolean) blockingFlag).booleanValue();
			}
		}

		FlinkClient.addStormConfigToTopology(topology, conf);

		StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
		streamGraph.setJobName(topologyName);

		JobGraph jobGraph = streamGraph.getJobGraph();

		if (this.flink == null) {
			Configuration configuration = new Configuration();
			configuration.addAll(jobGraph.getJobConfiguration());

			configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

			this.flink = new LocalFlinkMiniCluster(configuration, true);
			this.flink.start();
		}

		if (submitBlocking) {
			this.flink.submitJobAndWait(jobGraph, false);
		} else {
			this.flink.submitJobDetached(jobGraph);
		}
	}

	public void killTopology(final String topologyName) {
		this.killTopologyWithOpts(topologyName, null);
	}

	public void killTopologyWithOpts(final String name, final KillOptions options) {
	}

	public void activate(final String topologyName) {
	}

	public void deactivate(final String topologyName) {
	}

	public void rebalance(final String name, final RebalanceOptions options) {
	}

	public void shutdown() {
		if (this.flink != null) {
			this.flink.stop();
			this.flink = null;
		}
	}

	//......
}
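  • FlinkLocalCluster.submitTopology calls submitTopologyWithOpts, which mainly sets a few parameters, calls topology.getExecutionEnvironment().getStreamGraph() to build the StreamGraph from the transformations, and then derives the JobGraph from it. If no mini cluster has been set yet, it creates a LocalFlinkMiniCluster and starts it. Finally it submits the JobGraph with submitJobAndWait or submitJobDetached, depending on the SUBMIT_BLOCKING flag.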
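
One detail of submitTopologyWithOpts is easy to miss: the blocking flag only takes effect when the configuration value is an actual Boolean. The sketch below isolates that check in plain Java; isBlocking is a hypothetical helper name, while the key string and the instanceof test are taken from the excerpt.

```java
import java.util.HashMap;
import java.util.Map;

class SubmitModeSketch {

    /** Same key string as FlinkLocalCluster.SUBMIT_BLOCKING in the excerpt. */
    static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING";

    /** Hypothetical helper isolating the flag check from submitTopologyWithOpts. */
    static boolean isBlocking(Map<?, ?> conf) {
        if (conf == null) {
            return false;
        }
        Object flag = conf.get(SUBMIT_BLOCKING);
        // Only a real Boolean counts; any other type falls back to detached submission.
        return flag instanceof Boolean && (Boolean) flag;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();

        conf.put(SUBMIT_BLOCKING, true);
        System.out.println(isBlocking(conf)); // true

        conf.put(SUBMIT_BLOCKING, "true");
        System.out.println(isBlocking(conf)); // false: a String is ignored

        System.out.println(isBlocking(null)); // false
    }
}
```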
