Flink Source Code Analysis, Starting from an Example: Understanding the Local Job Execution Flow

Category: Servers · Published: 6 years ago


WeChat official account: 深广大数据Club


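Following the previous article, "Flink Source Code Analysis | Understanding Flink from an Example: Flink Startup", this article walks through the implementation of the SocketWindowWordCount example that ships with Apache Flink.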

SocketWindowWordCount

Let's start with the key part of the SocketWindowWordCount code:

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })

                .keyBy("word")
                .timeWindow(Time.seconds(5))

                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");

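The code above breaks down into the following parts:

  • Obtain the execution environment
  • Create the DataStream object
  • Process the data to produce windowCounts
    • flatMap: split each line into (word, 1) records
    • keyBy: partition by the word field, then apply a 5-second timeWindow
    • reduce: sum the counts for each word
  • Print the results
  • Call env.execute to run the job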

The FlatMapFunction and ReduceFunction used here can be replaced with your own implementations to fit your business scenario.
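To make the flatMap, keyBy and reduce steps concrete, here is a minimal plain-Java sketch with no Flink dependency. It only models what those three operators compute for the lines that fall into a single window. The class name WindowWordCountSketch and the method countWindow are made up for illustration, and the WordWithCount class is a re-declaration of the example's POJO, not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of flatMap -> keyBy -> reduce for the lines of ONE window (not Flink code). */
final class WindowWordCountSketch {

    /** Re-declaration of the example's POJO, for illustration only. */
    static final class WordWithCount {
        final String word;
        final long count;

        WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
    }

    /** flatMap step: one input line becomes one (word, 1) record per token. */
    static List<WordWithCount> flatMap(String line) {
        List<WordWithCount> out = new ArrayList<>();
        for (String word : line.split("\\s")) {
            out.add(new WordWithCount(word, 1L));
        }
        return out;
    }

    /** reduce step: same logic as the ReduceFunction in the example. */
    static WordWithCount reduce(WordWithCount a, WordWithCount b) {
        return new WordWithCount(a.word, a.count + b.count);
    }

    /** keyBy + reduce: group records by word and fold each group pairwise. */
    static Map<String, Long> countWindow(List<String> linesInWindow) {
        Map<String, WordWithCount> byWord = new LinkedHashMap<>();
        for (String line : linesInWindow) {
            for (WordWithCount record : flatMap(line)) {
                byWord.merge(record.word, record, WindowWordCountSketch::reduce);
            }
        }
        Map<String, Long> counts = new LinkedHashMap<>();
        byWord.forEach((word, wc) -> counts.put(word, wc.count));
        return counts;
    }

    public static void main(String[] args) {
        // Prints {to=2, be=2, or=1, not=1}
        System.out.println(countWindow(Arrays.asList("to be or", "not to be")));
    }
}
```

In the real job Flink does the grouping and the windowing for you; the only pieces you write are the bodies of flatMap and reduce.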

Instantiating StreamExecutionEnvironment

    public static StreamExecutionEnvironment getExecutionEnvironment() {
        if (contextEnvironmentFactory != null) {
            return contextEnvironmentFactory.createExecutionEnvironment();
        }

        // because the streaming project depends on "flink-clients" (and not the other way around)
        // we currently need to intercept the data set environment and create a dependent stream env.
        // this should be fixed once we rework the project dependencies

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        if (env instanceof ContextEnvironment) {
            return new StreamContextEnvironment((ContextEnvironment) env);
        } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
            return new StreamPlanEnvironment(env);
        } else {
            return createLocalEnvironment();
        }
    }

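When creating a StreamExecutionEnvironment, getExecutionEnvironment() first checks whether a contextEnvironmentFactory has been set. If so, it creates the environment from that factory and returns it directly. Otherwise it obtains an ExecutionEnvironment and wraps it according to its type: a ContextEnvironment becomes a StreamContextEnvironment, an OptimizerPlanEnvironment or PreviewPlanEnvironment becomes a StreamPlanEnvironment, and anything else falls through to createLocalEnvironment().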
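The decision order can be modelled in a few lines of plain Java. This is only a sketch of the branching, not Flink code: the BatchEnvKind enum, the Supplier-based factory and the returned strings are hypothetical stand-ins for the real classes.

```java
import java.util.function.Supplier;

/** Plain-Java model of "factory first, then inspect the batch environment, else local". */
final class EnvironmentSelectionSketch {

    /** Hypothetical stand-in for the runtime type of the batch ExecutionEnvironment. */
    enum BatchEnvKind { CONTEXT, OPTIMIZER_PLAN, PREVIEW_PLAN, OTHER }

    /**
     * Returns the name of the stream environment that would be chosen.
     * contextFactory may be null, which models "no factory registered".
     */
    static String select(Supplier<String> contextFactory, BatchEnvKind batchEnv) {
        if (contextFactory != null) {
            // A registered factory always wins.
            return contextFactory.get();
        }
        switch (batchEnv) {
            case CONTEXT:
                return "StreamContextEnvironment";
            case OPTIMIZER_PLAN:
            case PREVIEW_PLAN:
                return "StreamPlanEnvironment";
            default:
                // Nothing special detected: run locally.
                return "LocalStreamEnvironment";
        }
    }

    public static void main(String[] args) {
        // Running from an IDE with no factory and a plain batch environment.
        System.out.println(select(null, BatchEnvKind.OTHER)); // prints LocalStreamEnvironment
    }
}
```

Running the example from an IDE corresponds to the last branch, which is why the rest of this article follows the local path.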

In local mode, createLocalEnvironment() is called to create the LocalStreamEnvironment:

    public static LocalStreamEnvironment createLocalEnvironment() {
        return createLocalEnvironment(defaultLocalParallelism);
    }

    public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
        return createLocalEnvironment(parallelism, new Configuration());
    }

    public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
        final LocalStreamEnvironment currentEnvironment;

        currentEnvironment = new LocalStreamEnvironment(configuration);
        currentEnvironment.setParallelism(parallelism);

        return currentEnvironment;
    }

This chain of overloads ends by instantiating a LocalStreamEnvironment, setting its parallelism, and returning it.

LocalStreamEnvironment

    public JobExecutionResult execute(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.setAllowQueuedScheduling(true);

        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

        // add (and override) the settings with what the user defined
        configuration.addAll(this.configuration);

        if (!configuration.contains(RestOptions.PORT)) {
            configuration.setInteger(RestOptions.PORT, 0);
        }

        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
            .build();

        if (LOG.isInfoEnabled()) {
            LOG.info("Running job on local embedded Flink mini cluster");
        }

        MiniCluster miniCluster = new MiniCluster(cfg);

        try {
            miniCluster.start();
            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

            return miniCluster.executeJobBlocking(jobGraph);
        }
        finally {
            transformations.clear();
            miniCluster.close();
        }
    }

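The code breaks down into the following steps:

  • Build the streamGraph
  • Derive the jobGraph from the streamGraph
  • Create the Configuration
  • Create the MiniClusterConfiguration and set the number of slots per TaskManager via setNumSlotsPerTaskManager
  • Create the miniCluster
  • Run the jobGraph through miniCluster.executeJobBlocking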
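The order in which the Configuration is assembled matters: job settings go in first, the local default comes next, and the user's settings are added last so that they win. The slot count works the same way, falling back to the job's maximum parallelism only when nothing is configured. The sketch below models this with plain maps. The key strings are illustrative placeholders rather than Flink's real option keys, and LocalConfigSketch is a made-up name.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of how execute() layers its configuration (keys are placeholders). */
final class LocalConfigSketch {

    // Illustrative keys only; the real code uses TaskManagerOptions and RestOptions constants.
    static final String MANAGED_MEMORY = "managed.memory.size";
    static final String REST_PORT = "rest.port";
    static final String NUM_SLOTS = "slots.per.taskmanager";

    /** Job settings first, then the local default, then user settings on top. */
    static Map<String, String> effectiveConfig(Map<String, String> jobConfig,
                                               Map<String, String> userConfig) {
        Map<String, String> cfg = new HashMap<>(jobConfig);
        cfg.put(MANAGED_MEMORY, "0");     // local default, set before user overrides
        cfg.putAll(userConfig);           // user-defined values override everything above
        cfg.putIfAbsent(REST_PORT, "0");  // only filled in when the user did not choose a port
        return cfg;
    }

    /** Uses the configured slot count if present, otherwise the job's maximum parallelism. */
    static int slotsPerTaskManager(Map<String, String> cfg, int maxParallelism) {
        String configured = cfg.get(NUM_SLOTS);
        return configured != null ? Integer.parseInt(configured) : maxParallelism;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put(REST_PORT, "8081");
        Map<String, String> cfg = effectiveConfig(new HashMap<>(), user);
        System.out.println(cfg.get(REST_PORT));           // prints 8081
        System.out.println(slotsPerTaskManager(cfg, 4));  // prints 4
    }
}
```

Defaulting the slot count to the maximum parallelism means a single local TaskManager has enough slots to run every parallel subtask of the job.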

Note: the jobGraph is the directed acyclic graph (DAG) that the miniCluster runs to produce the result.

MiniCluster

    public void start() throws Exception {
        synchronized (lock) {
            checkState(!running, "FlinkMiniCluster is already running");

            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", miniClusterConfiguration);
            // ---- read the configuration ----
            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
            final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
            final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

            try {
                // ---- initialize the IO format classes ----
                initializeIOFormatClasses(configuration);
                // ---- create the MetricRegistry and instantiate jobManagerMetricGroup ----
                LOG.info("Starting Metrics Registry");
                metricRegistry = createMetricRegistry(configuration);
                this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                    metricRegistry,
                    "localhost",
                    ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

                final RpcService jobManagerRpcService;
                final RpcService resourceManagerRpcService;
                final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];

                // bring up all the RPC services
                LOG.info("Starting RPC Service(s)");

                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
                // ---- start the ActorSystem for the metric query service ----
                // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
                metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
                    configuration,
                    commonRpcService.getAddress(),
                    LOG);
                metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
                // ---- assign jobManagerRpcService, resourceManagerRpcService and taskManagerRpcServices ----
                if (useSingleRpcService) {
                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcServices[i] = commonRpcService;
                    }

                    jobManagerRpcService = commonRpcService;
                    resourceManagerRpcService = commonRpcService;

                    this.resourceManagerRpcService = null;
                    this.jobManagerRpcService = null;
                    this.taskManagerRpcServices = null;
                }
                else {
                    // start a new service per component, possibly with custom bind addresses
                    final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                    final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
                    final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();

                    jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                    resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);

                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcServices[i] = createRpcService(
                                configuration, rpcTimeout, true, taskManagerBindAddress);
                    }

                    this.jobManagerRpcService = jobManagerRpcService;
                    this.taskManagerRpcServices = taskManagerRpcServices;
                    this.resourceManagerRpcService = resourceManagerRpcService;
                }
                // ---- create the HA services ----
                // create the high-availability services
                LOG.info("Starting high-availability services");
                haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                    configuration,
                    commonRpcService.getExecutor());
                // ---- create and start the BlobServer ----
                blobServer = new BlobServer(configuration, haServices.createBlobStore());
                blobServer.start();
                // ---- create the heartbeat services ----
                heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
                // ---- start the ResourceManager ----
                // bring up the ResourceManager(s)
                LOG.info("Starting ResourceManger");
                resourceManagerRunner = startResourceManager(
                    configuration,
                    haServices,
                    heartbeatServices,
                    metricRegistry,
                    resourceManagerRpcService,
                    new ClusterInformation("localhost", blobServer.getPort()),
                    jobManagerMetricGroup);
                // ---- create the BlobCacheService ----
                blobCacheService = new BlobCacheService(
                    configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
                );
                // ---- start the TaskManager(s) ----
                // bring up the TaskManager(s) for the mini cluster
                LOG.info("Starting {} TaskManger(s)", numTaskManagers);
                taskManagers = startTaskManagers(
                    configuration,
                    haServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    numTaskManagers,
                    taskManagerRpcServices);

                // ---- start the dispatcher REST endpoint ----
                // starting the dispatcher rest endpoint
                LOG.info("Starting dispatcher rest endpoint.");

                dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                    jobManagerRpcService,
                    DispatcherGateway.class,
                    DispatcherId::fromUuid,
                    20,
                    Time.milliseconds(20L));
                final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                    jobManagerRpcService,
                    ResourceManagerGateway.class,
                    ResourceManagerId::fromUuid,
                    20,
                    Time.milliseconds(20L));

                this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
                    RestServerEndpointConfiguration.fromConfiguration(configuration),
                    dispatcherGatewayRetriever,
                    configuration,
                    RestHandlerConfiguration.fromConfiguration(configuration),
                    resourceManagerGatewayRetriever,
                    blobServer.getTransientBlobService(),
                    WebMonitorEndpoint.createExecutorService(
                        configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                        configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                        "DispatcherRestEndpoint"),
                    new AkkaQueryServiceRetriever(
                        metricQueryServiceActorSystem,
                        Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                    haServices.getWebMonitorLeaderElectionService(),
                    new ShutDownFatalErrorHandler());

                dispatcherRestEndpoint.start();

                restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());

                // ---- create the HistoryServerArchivist ----
                // bring up the dispatcher that launches JobManagers when jobs submitted
                LOG.info("Starting job dispatcher(s) for JobManger");

                final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
                // ---- create and start the Dispatcher ----
                dispatcher = new StandaloneDispatcher(
                    jobManagerRpcService,
                    Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
                    configuration,
                    haServices,
                    resourceManagerRunner.getResourceManageGateway(),
                    blobServer,
                    heartbeatServices,
                    jobManagerMetricGroup,
                    metricRegistry.getMetricQueryServicePath(),
                    new MemoryArchivedExecutionGraphStore(),
                    Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                    new ShutDownFatalErrorHandler(),
                    dispatcherRestEndpoint.getRestBaseUrl(),
                    historyServerArchivist);

                dispatcher.start();
                // ---- obtain and start the ResourceManager and Dispatcher leader retrievers ----
                resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
                dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();

                resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
                dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            }
            catch (Exception e) {
                // cleanup everything
                try {
                    close();
                } catch (Exception ee) {
                    e.addSuppressed(ee);
                }
                throw e;
            }

            // create a new termination future
            terminationFuture = new CompletableFuture<>();

            // now officially mark this as running
            running = true;

            LOG.info("Flink Mini Cluster started successfully");
        }
    }
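The useSingleRpcService branch is easy to skim past, so here is a small plain-Java model of it. When sharing is enabled, every component receives the common RPC service and the cluster's own fields stay null. When it is disabled, each component gets a dedicated service that the cluster tracks. The Service and Assignment classes are hypothetical, and reading the null fields as "nothing extra to shut down" is my interpretation of the code rather than something the source states.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of shared vs. dedicated RPC services (hypothetical types, not Flink code). */
final class RpcSharingSketch {

    /** Hypothetical stand-in for an RPC service. */
    static final class Service {
        final String name;

        Service(String name) {
            this.name = name;
        }
    }

    /** Which service each component uses, plus the dedicated ones the cluster tracks itself. */
    static final class Assignment {
        final Service common;
        final Service jobManager;
        final Service resourceManager;
        final List<Service> taskManagers;
        final List<Service> dedicated;

        Assignment(Service common, Service jobManager, Service resourceManager,
                   List<Service> taskManagers, List<Service> dedicated) {
            this.common = common;
            this.jobManager = jobManager;
            this.resourceManager = resourceManager;
            this.taskManagers = taskManagers;
            this.dedicated = dedicated;
        }
    }

    static Assignment assign(boolean shared, int numTaskManagers) {
        // The common service is always created, as in start().
        Service common = new Service("common");
        List<Service> taskManagers = new ArrayList<>();
        List<Service> dedicated = new ArrayList<>();
        Service jobManager;
        Service resourceManager;

        if (shared) {
            // Everyone reuses the common service; nothing extra is tracked.
            jobManager = common;
            resourceManager = common;
            for (int i = 0; i < numTaskManagers; i++) {
                taskManagers.add(common);
            }
        } else {
            // One service per component, all of them tracked separately.
            jobManager = new Service("jobmanager");
            resourceManager = new Service("resourcemanager");
            dedicated.add(jobManager);
            dedicated.add(resourceManager);
            for (int i = 0; i < numTaskManagers; i++) {
                Service tm = new Service("taskmanager-" + i);
                taskManagers.add(tm);
                dedicated.add(tm);
            }
        }
        return new Assignment(common, jobManager, resourceManager, taskManagers, dedicated);
    }

    public static void main(String[] args) {
        System.out.println(assign(true, 2).dedicated.size());   // prints 0
        System.out.println(assign(false, 2).dedicated.size());  // prints 4
    }
}
```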

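MiniCluster.start() does quite a lot. The steps are:

  • Read the configuration
  • Initialize the IO format classes
  • Create the MetricRegistry and instantiate jobManagerMetricGroup
  • Start the RPC services
  • Start the HA services, the BlobServer and the heartbeat services
  • Start the ResourceManager
  • Start the TaskManagers
  • Start the dispatcher REST endpoint
  • Start the dispatcher that launches JobManagers when jobs are submitted
  • Obtain the ResourceManager and Dispatcher leader retrievers and start them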
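One detail worth noticing in start() is its error handling: if any step throws, it calls close() to clean up, attaches any cleanup failure to the original exception with addSuppressed, and rethrows. The cluster is marked as running only after every step succeeds. Below is a minimal plain-Java sketch of that pattern. The Component class and the reverse-order shutdown are assumptions made for illustration; the real close() is more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Plain-Java model of "start in order, clean up on failure, rethrow" (not Flink code). */
final class StartupCleanupSketch {

    /** Hypothetical component that records what happens to it in a shared log. */
    static final class Component {
        final String name;
        final boolean failOnStart;
        final List<String> log;

        Component(String name, boolean failOnStart, List<String> log) {
            this.name = name;
            this.failOnStart = failOnStart;
            this.log = log;
        }

        void start() throws Exception {
            if (failOnStart) {
                throw new Exception(name + " failed to start");
            }
            log.add("start " + name);
        }

        void stop() {
            log.add("stop " + name);
        }
    }

    private final Deque<Component> started = new ArrayDeque<>();
    private boolean running;

    boolean isRunning() {
        return running;
    }

    void start(List<Component> componentsInOrder) throws Exception {
        if (running) {
            throw new IllegalStateException("already running");
        }
        try {
            for (Component c : componentsInOrder) {
                c.start();
                started.push(c);
            }
        } catch (Exception e) {
            // Clean up everything started so far, keep the original failure as the main error.
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }
        // Only now is the cluster officially running.
        running = true;
    }

    /** Stops whatever was started, most recent first (an assumption of this sketch). */
    void close() {
        while (!started.isEmpty()) {
            started.pop().stop();
        }
        running = false;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        StartupCleanupSketch cluster = new StartupCleanupSketch();
        try {
            cluster.start(Arrays.asList(
                new Component("rpc", false, log),
                new Component("ha", false, log),
                new Component("resourceManager", true, log)));
        } catch (Exception e) {
            System.out.println(e.getMessage()); // prints resourceManager failed to start
        }
        System.out.println(log); // prints [start rpc, start ha, stop ha, stop rpc]
    }
}
```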

Summary

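In simplified form, the whole flow looks like this:

  • Create the matching StreamExecutionEnvironment object: LocalStreamEnvironment
  • Call the execute method of the StreamExecutionEnvironment object
    • Build the streamGraph
    • Derive the jobGraph
    • Instantiate the miniCluster
    • Hand the jobGraph to miniCluster.executeJobBlocking
  • Start the miniCluster and run the job
    This starts all the required services (RPC, HA, ResourceManager, TaskManagers and so on).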
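The shape of execute() itself fits in a few lines: build the graphs, start the cluster, run the job while blocking, and always close the cluster in a finally block. The plain-Java sketch below models only that control flow. FakeCluster and the string "graphs" are hypothetical stand-ins, and the cluster is passed in as a parameter (the real method constructs it internally) so that the call order can be observed.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the control flow of execute() (hypothetical types, not Flink code). */
final class LocalExecuteSketch {

    /** Hypothetical stand-in for the mini cluster; it just records the calls it receives. */
    static final class FakeCluster {
        final List<String> calls = new ArrayList<>();
        final boolean failJob;

        FakeCluster(boolean failJob) {
            this.failJob = failJob;
        }

        void start() {
            calls.add("start");
        }

        String executeJobBlocking(String jobGraph) {
            calls.add("execute " + jobGraph);
            if (failJob) {
                throw new IllegalStateException("job failed");
            }
            return "result of " + jobGraph;
        }

        void close() {
            calls.add("close");
        }
    }

    static String execute(String jobName, FakeCluster cluster) {
        // Stage 1: the program becomes a stream graph, then a job graph.
        String streamGraph = "streamGraph(" + jobName + ")";
        String jobGraph = "jobGraph(" + streamGraph + ")";

        try {
            // Stage 2: bring the cluster up and block until the job finishes.
            cluster.start();
            return cluster.executeJobBlocking(jobGraph);
        } finally {
            // Stage 3: the cluster is closed whether the job succeeded or failed.
            cluster.close();
        }
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster(false);
        System.out.println(execute("wc", cluster));
        // prints result of jobGraph(streamGraph(wc))
        System.out.println(cluster.calls);
        // prints [start, execute jobGraph(streamGraph(wc)), close]
    }
}
```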

