微信公众号: 深广大数据Club
关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;
[如果你觉得深广大数据Club对你有帮助,欢迎赞赏]
从上一片《Flink源码解析 | 从Example出发理解Flink-Flink启动》之后,本文讲解Apache Flink example中的SocketWindowWordCount实例代码的实现。
SocketWindowWordCount
首先我们先来看下SocketWindowWordCount的重要代码内容
// get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream(hostname, port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount");
上述代码分为以下几块内容:
-
获取执行环境
-
实例化DataStream对象
-
执行数据处理获取windowCounts
-
Map - flatMap
-
transaction - keyby
-
reduce
-
打印
-
调用env的execute运行任务
这里的MapFunction以及ReduceFunction可以根据你的业务场景自行实现。
StreamExecutionEnvironment实例化
public static StreamExecutionEnvironment getExecutionEnvironment() { if (contextEnvironmentFactory != null) { return contextEnvironmentFactory.createExecutionEnvironment(); } // because the streaming project depends on "flink-clients" (and not the other way around) // we currently need to intercept the data set environment and create a dependent stream env. // this should be fixed once we rework the project dependencies ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); if (env instanceof ContextEnvironment) { return new StreamContextEnvironment((ContextEnvironment) env); } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) { return new StreamPlanEnvironment(env); } else { return createLocalEnvironment(); } }
创建StreamExecutionEnvironment时线检查是否存在contextEnvironmentFactory,如果有直接从contextEnvironmentFactory创建返回。没有则通过ExecutionEnvironment创建。
本地模式调用createLocalEnvironment()方法创建StreamEnvironment。
public static LocalStreamEnvironment createLocalEnvironment() { return createLocalEnvironment(defaultLocalParallelism); } public static LocalStreamEnvironment createLocalEnvironment(int parallelism) { return createLocalEnvironment(parallelism, new Configuration()); } public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) { final LocalStreamEnvironment currentEnvironment; currentEnvironment = new LocalStreamEnvironment(configuration); currentEnvironment.setParallelism(parallelism); return currentEnvironment; }
方法一轮轮调用下来最终实例化一个LocalStreamEnvironment返回。
LocalStreamEnvironment
public JobExecutionResult execute(String jobName) throws Exception { // transform the streaming program into a JobGraph StreamGraph streamGraph = getStreamGraph(); streamGraph.setJobName(jobName); JobGraph jobGraph = streamGraph.getJobGraph(); jobGraph.setAllowQueuedScheduling(true); Configuration configuration = new Configuration(); configuration.addAll(jobGraph.getJobConfiguration()); configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0"); // add (and override) the settings with what the user defined configuration.addAll(this.configuration); if (!configuration.contains(RestOptions.PORT)) { configuration.setInteger(RestOptions.PORT, 0); } int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) .setNumSlotsPerTaskManager(numSlotsPerTaskManager) .build(); if (LOG.isInfoEnabled()) { LOG.info("Running job on local embedded Flink mini cluster"); } MiniCluster miniCluster = new MiniCluster(cfg); try { miniCluster.start(); configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); return miniCluster.executeJobBlocking(jobGraph); } finally { transformations.clear(); miniCluster.close(); } }
整体代码分为以下几步:
-
创建streamGraph
-
通过streamGraph创建jobGraph
-
创建Configuration
-
创建MiniClusterConfiguration,并设置每一个TaskManager使用的slot数量setNumSlotsPerTaskManager
-
创建miniCluster
-
通过miniCluster.executeJobBlocking执行jobGraph
注:jobGraph是我们要利用miniCluster运行获取结果的Graph有向无环图。
MiniCluster
public void start() throws Exception { synchronized (lock) { checkState(!running, "FlinkMiniCluster is already running"); LOG.info("Starting Flink Mini Cluster"); LOG.debug("Using configuration {}", miniClusterConfiguration); -----------------获取配置信息--------------------- final Configuration configuration = miniClusterConfiguration.getConfiguration(); final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout(); final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers(); final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED; try { ---------初始化 IO Format类 -------------- initializeIOFormatClasses(configuration); ---------注册MetricsRegistry并实例化jobManagerMetricGroup------- LOG.info("Starting Metrics Registry"); metricRegistry = createMetricRegistry(configuration); this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup( metricRegistry, "localhost", ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration)); final RpcService jobManagerRpcService; final RpcService resourceManagerRpcService; final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers]; // bring up all the RPC services LOG.info("Starting RPC Service(s)"); // we always need the 'commonRpcService' for auxiliary calls commonRpcService = createRpcService(configuration, rpcTimeout, false, null); ---------创建ActorSystem------------- // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem( configuration, commonRpcService.getAddress(), LOG); metricRegistry.startQueryService(metricQueryServiceActorSystem, null); ---------实例化jobManagerRpcService、resourceManagerRpcService、taskManagerRpcServices ---------------- if (useSingleRpcService) { for (int i = 0; i < numTaskManagers; i++) { taskManagerRpcServices[i] = commonRpcService; } jobManagerRpcService = commonRpcService; resourceManagerRpcService = commonRpcService; this.resourceManagerRpcService = null; this.jobManagerRpcService = null; this.taskManagerRpcServices = null; } else { // start a new service per component, possibly with custom bind addresses final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress(); final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress(); final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress(); jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress); resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress); for (int i = 0; i < numTaskManagers; i++) { taskManagerRpcServices[i] = createRpcService( configuration, rpcTimeout, true, taskManagerBindAddress); } this.jobManagerRpcService = jobManagerRpcService; this.taskManagerRpcServices = taskManagerRpcServices; this.resourceManagerRpcService = resourceManagerRpcService; } ----------创建ha services--------------- // create the high-availability services LOG.info("Starting high-availability services"); haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( configuration, commonRpcService.getExecutor()); ----------创建blob server--------------- blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); ----------创建心跳 server--------------- heartbeatServices = HeartbeatServices.fromConfiguration(configuration); ---------- 启动ResourceManager----------------- // bring up the ResourceManager(s) LOG.info("Starting ResourceManger"); resourceManagerRunner = startResourceManager( configuration, haServices, heartbeatServices, metricRegistry, resourceManagerRpcService, new ClusterInformation("localhost", blobServer.getPort()), jobManagerMetricGroup); ---------创建BlobCacheService-------------------- blobCacheService = new BlobCacheService( configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort()) ); ---------启动TaskManager---------------- // bring up the TaskManager(s) for the mini cluster LOG.info("Starting {} TaskManger(s)", numTaskManagers); taskManagers = startTaskManagers( configuration, haServices, heartbeatServices, metricRegistry, blobCacheService, numTaskManagers, taskManagerRpcServices); --------启动调度程序rest端口----------------- // starting the dispatcher rest endpoint LOG.info("Starting dispatcher rest endpoint."); - dispatcherGatewayRetriever = new RpcGatewayRetriever<>( jobManagerRpcService, DispatcherGateway.class, DispatcherId::fromUuid, 20, Time.milliseconds(20L)); final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>( jobManagerRpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 20, Time.milliseconds(20L)); this.dispatcherRestEndpoint = new DispatcherRestEndpoint( RestServerEndpointConfiguration.fromConfiguration(configuration), dispatcherGatewayRetriever, configuration, RestHandlerConfiguration.fromConfiguration(configuration), resourceManagerGatewayRetriever, blobServer.getTransientBlobService(), WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever( metricQueryServiceActorSystem, Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))), haServices.getWebMonitorLeaderElectionService(), new ShutDownFatalErrorHandler()); dispatcherRestEndpoint.start(); restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl()); --------启动HistoryServerArchivist----------------- // bring up the dispatcher that launches JobManagers when jobs submitted LOG.info("Starting job dispatcher(s) for JobManger"); final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint); -------实例化UI数据显示器并启动----------------- dispatcher = new StandaloneDispatcher( jobManagerRpcService, Dispatcher.DISPATCHER_NAME + UUID.randomUUID(), configuration, haServices, resourceManagerRunner.getResourceManageGateway(), blobServer, heartbeatServices, jobManagerMetricGroup, metricRegistry.getMetricQueryServicePath(), new MemoryArchivedExecutionGraphStore(), Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, new ShutDownFatalErrorHandler(), dispatcherRestEndpoint.getRestBaseUrl(), historyServerArchivist); dispatcher.start(); ------获取ResourceManagerLeader、dispatcherLeaderRetriever并启动------------ resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever(); dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever(); resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever); dispatcherLeaderRetriever.start(dispatcherGatewayRetriever); } catch (Exception e) { // cleanup everything try { close(); } catch (Exception ee) { e.addSuppressed(ee); } throw e; } // create a new termination future terminationFuture = new CompletableFuture<>(); // now officially mark this as running running = true; LOG.info("Flink Mini Cluster started successfully"); } }
MiniCluster所做的事情较多,具体步骤如下:
-
获取配置信息
-
初始化 IO Format类
-
注册MetricsRegistry并实例化jobManagerMetricGroup
-
启动rpc服务
-
启动HA服务
-
启动resourceManager
-
启动TaskManagers
-
启动调度程序rest端口
-
在提交工作时启动JobManagers的分配器
-
获取ResourceManagerLeader、dispatcherLeaderRetriever并启动
总结
简化的描述下整个流程的处理过程:
-
创建获取对应的StreamExecutionEnvironment对象:LocalStreamEnvironment
-
调用StreamExecutionEnvironment对象的execute方法
-
获取streamGraph
-
获取jobGraph
-
实例化miniCluster
-
miniCluster.executeJobBlocking指定要运行的jobGraph
-
启动minCluster执行任务
启动各类所需服务(rpc、ha、resourceManager、TaskManagers等等)
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。