Following on from the previous article, 《Flink源码解析 | 从Example出发理解Flink-Flink启动》, this article explains the implementation of the SocketWindowWordCount example that ships with Apache Flink.
SocketWindowWordCount
First, let's look at the key parts of SocketWindowWordCount:
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    .keyBy("word")
    .timeWindow(Time.seconds(5))
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });

// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);

env.execute("Socket Window WordCount");
The code above breaks down into the following parts:

- Get the execution environment
- Create the DataStream object
- Process the data to obtain windowCounts
  - map: flatMap
  - transformation: keyBy
  - reduce
- Print the results
- Call env.execute to run the job
The FlatMapFunction and ReduceFunction here can be implemented however your business scenario requires.
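The WordWithCount type referenced above is omitted from the snippet. In the example that ships with Flink it is a plain POJO; the sketch below follows that shape, though treat the exact toString format as illustrative:

/**
 * Data type for words with count. A plain POJO (public no-args constructor,
 * public fields) so that Flink's type system treats it as a POJO type and
 * keyBy("word") can address the field by name.
 */
public static class WordWithCount {

    public String word;
    public long count;

    public WordWithCount() {}

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}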
Instantiating StreamExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }

    // because the streaming project depends on "flink-clients" (and not the other way around)
    // we currently need to intercept the data set environment and create a dependent stream env.
    // this should be fixed once we rework the project dependencies
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}
When creating the StreamExecutionEnvironment, the method first checks whether a contextEnvironmentFactory exists; if so, it creates and returns the environment directly from the factory. Otherwise it obtains an ExecutionEnvironment and derives a stream environment from it: a ContextEnvironment yields a StreamContextEnvironment, an OptimizerPlanEnvironment or PreviewPlanEnvironment yields a StreamPlanEnvironment, and anything else falls through to createLocalEnvironment().

In local mode, the createLocalEnvironment() method creates the stream environment.
public static LocalStreamEnvironment createLocalEnvironment() {
    return createLocalEnvironment(defaultLocalParallelism);
}

public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
    return createLocalEnvironment(parallelism, new Configuration());
}

public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
    final LocalStreamEnvironment currentEnvironment;

    currentEnvironment = new LocalStreamEnvironment(configuration);
    currentEnvironment.setParallelism(parallelism);

    return currentEnvironment;
}
These overloads delegate to one another and finally instantiate and return a LocalStreamEnvironment.
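If you want to pin the parallelism or the configuration yourself instead of relying on the defaults, you can call these overloads directly. A minimal sketch, assuming the same 1.x-era API quoted above (the parallelism of 4 and the job name are arbitrary examples):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // explicitly create a local environment with parallelism 4
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4, conf);
        env.fromElements("to", "be", "or", "not", "to", "be").print();
        env.execute("local demo");
    }
}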
LocalStreamEnvironment
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);

    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);

    if (!configuration.contains(RestOptions.PORT)) {
        configuration.setInteger(RestOptions.PORT, 0);
    }

    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();

    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }

    MiniCluster miniCluster = new MiniCluster(cfg);

    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
The method breaks down into the following steps:

- Build the streamGraph
- Build the jobGraph from the streamGraph
- Create the Configuration
- Create the MiniClusterConfiguration, setting the number of slots per TaskManager via setNumSlotsPerTaskManager
- Create the miniCluster
- Execute the jobGraph via miniCluster.executeJobBlocking
Note: the jobGraph is the directed acyclic graph (DAG) that we run on the miniCluster to obtain the result.
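As an aside, if you want to inspect the streamGraph of your own program, StreamExecutionEnvironment can render the plan as JSON via getExecutionPlan(). A small sketch (note that in some Flink versions building the plan consumes the recorded transformations, so treat this as a one-off debugging aid, called before execute):

// Print the StreamGraph as JSON; the output can be pasted into
// Flink's plan visualizer to see the DAG that becomes the jobGraph.
System.out.println(env.getExecutionPlan());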
MiniCluster
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "FlinkMiniCluster is already running");

        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);

        // ---------------- get the configuration information ----------------
        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
        final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            // ---------------- initialize the IO Format classes ----------------
            initializeIOFormatClasses(configuration);

            // ------- register the MetricRegistry and instantiate the jobManagerMetricGroup -------
            LOG.info("Starting Metrics Registry");
            metricRegistry = createMetricRegistry(configuration);
            this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                metricRegistry,
                "localhost",
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

            final RpcService jobManagerRpcService;
            final RpcService resourceManagerRpcService;
            final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            // we always need the 'commonRpcService' for auxiliary calls
            commonRpcService = createRpcService(configuration, rpcTimeout, false, null);

            // ---------------- create the ActorSystem ----------------
            // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
            metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
                configuration,
                commonRpcService.getAddress(),
                LOG);
            metricRegistry.startQueryService(metricQueryServiceActorSystem, null);

            // ------- instantiate jobManagerRpcService, resourceManagerRpcService and taskManagerRpcServices -------
            if (useSingleRpcService) {
                for (int i = 0; i < numTaskManagers; i++) {
                    taskManagerRpcServices[i] = commonRpcService;
                }

                jobManagerRpcService = commonRpcService;
                resourceManagerRpcService = commonRpcService;

                this.resourceManagerRpcService = null;
                this.jobManagerRpcService = null;
                this.taskManagerRpcServices = null;
            }
            else {
                // start a new service per component, possibly with custom bind addresses
                final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
                final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();

                jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);

                for (int i = 0; i < numTaskManagers; i++) {
                    taskManagerRpcServices[i] = createRpcService(
                        configuration, rpcTimeout, true, taskManagerBindAddress);
                }

                this.jobManagerRpcService = jobManagerRpcService;
                this.taskManagerRpcServices = taskManagerRpcServices;
                this.resourceManagerRpcService = resourceManagerRpcService;
            }

            // ---------------- create the high-availability services ----------------
            LOG.info("Starting high-availability services");
            haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                configuration,
                commonRpcService.getExecutor());

            // ---------------- create the blob server ----------------
            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();

            // ---------------- create the heartbeat services ----------------
            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            // ---------------- bring up the ResourceManager(s) ----------------
            LOG.info("Starting ResourceManger");
            resourceManagerRunner = startResourceManager(
                configuration,
                haServices,
                heartbeatServices,
                metricRegistry,
                resourceManagerRpcService,
                new ClusterInformation("localhost", blobServer.getPort()),
                jobManagerMetricGroup);

            // ---------------- create the BlobCacheService ----------------
            blobCacheService = new BlobCacheService(
                configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
            );

            // ---------------- bring up the TaskManager(s) for the mini cluster ----------------
            LOG.info("Starting {} TaskManger(s)", numTaskManagers);
            taskManagers = startTaskManagers(
                configuration,
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                numTaskManagers,
                taskManagerRpcServices);

            // ---------------- start the dispatcher REST endpoint ----------------
            LOG.info("Starting dispatcher rest endpoint.");

            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                jobManagerRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                jobManagerRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));

            this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
                RestServerEndpointConfiguration.fromConfiguration(configuration),
                dispatcherGatewayRetriever,
                configuration,
                RestHandlerConfiguration.fromConfiguration(configuration),
                resourceManagerGatewayRetriever,
                blobServer.getTransientBlobService(),
                WebMonitorEndpoint.createExecutorService(
                    configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                    configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                    "DispatcherRestEndpoint"),
                new AkkaQueryServiceRetriever(
                    metricQueryServiceActorSystem,
                    Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                haServices.getWebMonitorLeaderElectionService(),
                new ShutDownFatalErrorHandler());

            dispatcherRestEndpoint.start();

            restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());

            // ---------------- create the HistoryServerArchivist ----------------
            // bring up the dispatcher that launches JobManagers when jobs submitted
            LOG.info("Starting job dispatcher(s) for JobManger");

            final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);

            // ---------------- instantiate the dispatcher and start it ----------------
            dispatcher = new StandaloneDispatcher(
                jobManagerRpcService,
                Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
                configuration,
                haServices,
                resourceManagerRunner.getResourceManageGateway(),
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricRegistry.getMetricQueryServicePath(),
                new MemoryArchivedExecutionGraphStore(),
                Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                new ShutDownFatalErrorHandler(),
                dispatcherRestEndpoint.getRestBaseUrl(),
                historyServerArchivist);

            dispatcher.start();

            // ------- get the resourceManagerLeaderRetriever and dispatcherLeaderRetriever and start them -------
            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();

            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
        }
        catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }

        // create a new termination future
        terminationFuture = new CompletableFuture<>();

        // now officially mark this as running
        running = true;

        LOG.info("Flink Mini Cluster started successfully");
    }
}
MiniCluster.start() does quite a lot; the concrete steps are as follows (a sketch of driving the MiniCluster directly follows the list):

- Get the configuration information
- Initialize the IO Format classes
- Register the MetricRegistry and instantiate the jobManagerMetricGroup
- Start the RPC services
- Start the high-availability services
- Start the ResourceManager
- Start the TaskManagers
- Start the dispatcher REST endpoint
- Bring up the dispatcher that launches JobManagers when jobs are submitted
- Get the resourceManagerLeaderRetriever and dispatcherLeaderRetriever and start them
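For reference, driving the MiniCluster by hand looks roughly like the sketch below, mirroring what LocalStreamEnvironment.execute does. This is a sketch against the 1.6-era API quoted above (setNumTaskManagers is assumed from that API; newer releases replaced executeJobBlocking with an asynchronous submission API):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class MiniClusterDemo {

    // Runs the given jobGraph on an embedded mini cluster and blocks until it finishes.
    public static void runOnMiniCluster(JobGraph jobGraph) throws Exception {
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(new Configuration())
            .setNumTaskManagers(1)
            .setNumSlotsPerTaskManager(2)
            .build();

        MiniCluster miniCluster = new MiniCluster(cfg);
        try {
            // start() brings up RPC, HA services, ResourceManager, TaskManagers, dispatcher, ...
            miniCluster.start();
            // submit the DAG and wait for the result
            miniCluster.executeJobBlocking(jobGraph);
        } finally {
            miniCluster.close();
        }
    }
}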
Summary

A simplified description of the whole flow:

- Create and obtain the matching StreamExecutionEnvironment object: LocalStreamEnvironment
- Call the execute method of that StreamExecutionEnvironment object
- Build the streamGraph
- Build the jobGraph from it
- Instantiate the miniCluster
- Hand the jobGraph to miniCluster.executeJobBlocking to run
- Start the miniCluster to execute the job, bringing up all the required services (RPC, HA, ResourceManager, TaskManagers, and so on)