内容简介:“通用”YARN应用涉及的角色及交互:RM:ResourceManager
点击关注上方“ 知了小巷 ”,
设为“置顶或星标”,第一时间送达干货。
ApplicationMaster<-->ResourceManager
“通用”YARN应用涉及的角色及交互:
RM:ResourceManager
AM:ApplicationMaster
NM:NodeManager
交互中用到的主要通信协议:
ApplicationClientProtocol
ApplicationMasterProtocol
ContainerManagementProtocol
Client<-->ResourceManager
客户端程序与RM进行交互,通过YarnClient对象来实现。
ApplicationMaster<-->ResourceManager
AM与RM进行交互,通过AMRMClientAsync对象来实现,
AMRMClientAsync.CallbackHandler异步处理事件信息。
ApplicationMaster<-->NodeManager
AM与NM进行交互,通过NMClientAsync对象来实现,主要是启动Container,
NMClientAsync.CallbackHandler异步处理Container事件。
接口请求和响应的proto message定义:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto。
Hadoop版本3.2.1
Flink版本1.10
1.以Flink中Yarn per-job模式下
JobManager------
进程YarnJobClusterEntrypoint为例
// 起点是 YarnJobClusterEntrypoint#main 方法
// 落点是 YarnResourceManager
/**
* The yarn implementation of the resource manager. Used when the system is started
* via the resource framework YARN.
*/
public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
// 传说中的ApplicationMaster
...
/** resourceManagerClient与ResourceManager进行交互 Client to communicate with the Resource Manager (YARN's master). */
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
/** nodeManagerClient与NodeManager进行交互 Client to communicate with the Node manager and launch TaskExecutor processes. */
private NMClientAsync nodeManagerClient;
...
}
AMRMClientAsync
abstract class(YARN应用需要自定义实现),用来处理与ResourceManager之间的通信和交互,它提供对事件的异步更新操作,比如Container的分配和资源使用结束。它包含一个线程,定期向ResourceManager发送心跳。
需要通过实现AMRMClientAsync.CallbackHandler回调接口来配合AMRMClientAsync。
2.简单实例MyCallbackHandler
AMRMClientAsync客户端生命周期
3.AMRMClientAsync部分源码
4.AMRMClientAsyncImpl部分源码
5.AMRMClient部分源码
package org.apache.hadoop.yarn.client.api;
import ...
// 抽象类AMRMClient
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
...
}
6.AMRMClientImpl部分源码
7.ApplicationMasterProtocol部分源码
ApplicationMasterProtocol接口比较简单,只有三个方法
package org.apache.hadoop.yarn.api;
import ...
// 接口ApplicationMasterProtocol
public interface ApplicationMasterProtocol {
// 向RM注册自己(AM)
public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request)
throws YarnException, IOException;
// 告诉RM,让RM注销自己(AM),有可能AM已经成功执行结束,也有可能应用失败了
public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request)
throws YarnException, IOException;
// AM与RM之间的主要接口(方法),处理AllocateRequest并返回AllocateResponse
// 就是传说中的请求Container,是成批申请和响应的(比如Flink JobManager一次申请3个TaskManager)
// 最多执行一次,不会重复和过度分配
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException;
}
8.ApplicationMasterProtocolPBClientImpl部分源码
package org.apache.hadoop.yarn.api.impl.pb.client;
import ...
// 客户端ApplicationMasterProtocol接口的实现
public class ApplicationMasterProtocolPBClientImpl implements ApplicationMasterProtocol, Closeable {
private ApplicationMasterProtocolPB proxy;
public ApplicationMasterProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
// 底层会调用java.lang.reflect.Proxy#newProxyInstance
proxy =
(ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class, clientVersion,
addr, conf);
}
...
}
9.ApplicationMasterProtocolPB
package org.apache.hadoop.yarn.api;
import ...
@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB",
protocolVersion = 1)
public interface ApplicationMasterProtocolPB extends ApplicationMasterProtocolService.BlockingInterface {
}
10.ApplicationMasterProtocolService的定义
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationmaster_protocol.proto
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "ApplicationMasterProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_service_protos.proto";
service ApplicationMasterProtocolService {
rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
}
11.ApplicationMasterProtocolPBServiceImpl部分源码
ApplicationMasterProtocolPB接口的服务端(RM)实现
package org.apache.hadoop.yarn.api.impl.pb.service;
import ...
@Private
public class ApplicationMasterProtocolPBServiceImpl implements ApplicationMasterProtocolPB {
private ApplicationMasterProtocol real;
// ResourceManager启动时会通过此构造方法初始化real对象
public ApplicationMasterProtocolPBServiceImpl(ApplicationMasterProtocol impl) {
this.real = impl;
}
...
}
12.ApplicationMasterService部分源码
package org.apache.hadoop.yarn.server.resourcemanager;
import ...
@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements
ApplicationMasterProtocol {
// 最终会调用到这里的方法并返回结果
...
}
【END】
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Kafka 源码解析:网络交互模型
- 用Asp与XML实现交互的一个实例源码
- iOS 12 人机交互指南:交互(User Interaction)
- 生活NLP云服务“玩秘”站稳人机交互2.0语音交互场景
- asyncio之子进程交互
- 以太坊交互工具
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。