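This blog series distills and shares cases drawn from real commercial environments, together with Spark source-code walkthroughs and practical guidance for production use. Stay tuned for the rest of the series. Copyright notice: this series on Spark source-code analysis and commercial practice belongs to the author (秦凯新). Reproduction is prohibited; you are welcome to study it.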
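Spark in Commercial Environments: Practice and Advanced Tuning series
- Spark in Commercial Environments: Spark's built-in RPC communication mechanism and the RpcEnv infrastructure
- Spark in Commercial Environments: the Spark event listener bus workflow
- Spark in Commercial Environments: the underlying architecture of the Spark storage system
- Spark in Commercial Environments: execution flow of the multiple MessageLoop threads inside Spark
- Spark in Commercial Environments: Stage division algorithm and optimal task scheduling in Spark's two-level scheduling system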
1. Spark's built-in RPC communication mechanism
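TransportContext holds the method implementations that create TransportClientFactory (and through it, TransportClient) and TransportServer, yet it belongs to the lowest layer of the RPC communication infrastructure. Why?
Because its RpcHandler member is abstract and contains no concrete message handling, and because TransportContext's job is limited to creating the TransportClient side and the TransportServer side. Its Javadoc explains: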
Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to setup Netty Channel pipelines with a {@link org.apache.spark.network.server.TransportChannelHandler}.
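TransportContext can therefore only serve as the lowest-level communication foundation. Above it, NettyRpcEnv provides the high-level wrapper: it holds a reference to a TransportContext and passes a NettyRpcHandler instance into it, which supplies the callback handling for Netty communication. An excerpt of TransportContext: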
    /*
     * The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
     * channel. As each TransportChannelHandler contains a TransportClient, this enables server
     * processes to send messages back to the client on an existing channel.
     */
    public class TransportContext {
      private final Logger logger = LoggerFactory.getLogger(TransportContext.class);

      private final TransportConf conf;
      private final RpcHandler rpcHandler;
      private final boolean closeIdleConnections;

      private final MessageEncoder encoder;
      private final MessageDecoder decoder;

      public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
        this(conf, rpcHandler, false);
      }
      // ... (rest of the class omitted)
    }
1.1 TransportChannelHandler: the unified receive handler for client and server
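When TransportClient and TransportServer configure the handlers of the Netty pipeline, both use TransportChannelHandler for unified handling of received messages. Why? To give message handling a single entry point: TransportChannelHandler performs different processing depending on the message type, as this excerpt shows: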
    public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
      if (request instanceof RequestMessage) {
        requestHandler.handle((RequestMessage) request);
      } else if (request instanceof ResponseMessage) {
        responseHandler.handle((ResponseMessage) request);
      } else {
        ctx.fireChannelRead(request);
      }
    }
The code in TransportContext that initializes the pipeline:
    public TransportChannelHandler initializePipeline(
        SocketChannel channel,
        RpcHandler channelRpcHandler) {
      try {
        TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
        channel.pipeline()
          .addLast("encoder", ENCODER)
          .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
          .addLast("decoder", DECODER)
          .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
          .addLast("handler", channelHandler);
        return channelHandler;
      } catch (RuntimeException e) {
        logger.error("Error while initializing Netty pipeline", e);
        throw e;
      }
    }
The unified receive handler for client and server, TransportChannelHandler, is created by createChannelHandler(channel, channelRpcHandler), which is what unifies Netty message reception:
    /**
     * Creates the server- and client-side handler which is used to handle both RequestMessages and
     * ResponseMessages. The channel is expected to have been successfully created, though certain
     * properties (such as the remoteAddress()) may not be available yet.
     */
    private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
      TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
      TransportClient client = new TransportClient(channel, responseHandler);
      TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
        rpcHandler, conf.maxChunksBeingTransferred());
      return new TransportChannelHandler(client, responseHandler, requestHandler,
        conf.connectionTimeoutMs(), closeIdleConnections);
    }
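Note, however, that TransportClient corresponds to TransportResponseHandler, while TransportServer corresponds to TransportRequestHandler. When a message is processed, it first passes through TransportChannelHandler, which selects the handler by message type. TransportChannelHandler also manages the Netty channel lifecycle through these callbacks: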
- exceptionCaught
- channelActive
- channelInactive
- channelRead
- userEventTriggered
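The type-based dispatch performed by TransportChannelHandler can be illustrated without Netty. The following is a minimal sketch rather than Spark's code: `Msg`, `RequestMsg`, `ResponseMsg`, and `UnifiedHandlerSketch` are hypothetical names standing in for Spark's message types and handler, and the three lists merely record which path each message took.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Spark's Message / RequestMessage / ResponseMessage.
interface Msg {}

class RequestMsg implements Msg {
    final String body;
    RequestMsg(String body) { this.body = body; }
}

class ResponseMsg implements Msg {
    final String body;
    ResponseMsg(String body) { this.body = body; }
}

// A single entry point for both directions, mirroring the role of TransportChannelHandler.
class UnifiedHandlerSketch {
    final List<String> handledRequests = new ArrayList<>();   // server-side path
    final List<String> handledResponses = new ArrayList<>();  // client-side path
    final List<Object> passedThrough = new ArrayList<>();     // everything else

    void channelRead(Object msg) {
        if (msg instanceof RequestMsg) {
            handledRequests.add(((RequestMsg) msg).body);
        } else if (msg instanceof ResponseMsg) {
            handledResponses.add(((ResponseMsg) msg).body);
        } else {
            // In a Netty pipeline this is where ctx.fireChannelRead(msg) would be called.
            passedThrough.add(msg);
        }
    }

    public static void main(String[] args) {
        UnifiedHandlerSketch handler = new UnifiedHandlerSketch();
        handler.channelRead(new RequestMsg("fetch chunk 1"));
        handler.channelRead(new ResponseMsg("chunk 1 bytes"));
        handler.channelRead("not a transport message");
        System.out.println(handler.handledRequests + " " + handler.handledResponses
            + " " + handler.passedThrough);
    }
}
```

Because both sides of a connection install the same handler, a server can also receive responses and a client can also receive requests on an existing channel, which is what the class comment of TransportContext describes.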
1.2 TransportClient corresponds to ResponseMessage
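Whenever the client sends a message (always a Request message), the corresponding callback is cached in one of the following maps of TransportResponseHandler:
    private final Map<Long, RpcResponseCallback> outstandingRpcs;
    private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches;
so that it can be invoked when the matching response arrives.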
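The callback caching described above can be sketched in a few lines. This is an illustration under stated assumptions, not Spark's implementation: `OutstandingRpcSketch`, `sendRpc`, and `handleResponse` are hypothetical names, payloads are plain strings, and request ids come from a simple counter chosen only for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch of a client that remembers one callback per in-flight request.
class OutstandingRpcSketch {
    private final Map<Long, Consumer<String>> outstandingRpcs = new ConcurrentHashMap<>();
    private final AtomicLong nextRequestId = new AtomicLong(); // assumption: counter-based ids

    // Register the callback before the request would be written to the channel.
    long sendRpc(String payload, Consumer<String> callback) {
        long requestId = nextRequestId.getAndIncrement();
        outstandingRpcs.put(requestId, callback);
        // A real client would now write (requestId, payload) to the network channel.
        return requestId;
    }

    // Called when a response carrying the same id arrives; returns false if nobody is waiting.
    boolean handleResponse(long requestId, String response) {
        Consumer<String> callback = outstandingRpcs.remove(requestId);
        if (callback == null) {
            return false; // unknown or already completed request
        }
        callback.accept(response);
        return true;
    }

    int numOutstanding() {
        return outstandingRpcs.size();
    }

    public static void main(String[] args) {
        OutstandingRpcSketch client = new OutstandingRpcSketch();
        long id = client.sendRpc("ping", reply -> System.out.println("got " + reply));
        client.handleResponse(id, "pong");
        System.out.println("still outstanding: " + client.numOutstanding());
    }
}
```

Removing the entry when the response arrives keeps the map limited to requests that are still in flight, and a second response for the same id is simply ignored.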
1.3 TransportServer corresponds to RequestMessage
Message types the server receives (all Request messages):
- ChunkFetchRequest
- RpcRequest
- OneWayMessage
- StreamRequest
Message types the server sends in response (all Response messages):
- ChunkFetchSuccess
- ChunkFetchFailure
- RpcResponse
- RpcFailure
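2. Spark RpcEnv infrastructure
2.1 The upper layer: NettyRpcEnv
NettyRpcEnv, the upper layer, holds a reference to a TransportContext and passes a NettyRpcHandler instance into it, which supplies the callback handling for Netty communication. Its main building blocks are: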
- Dispatcher
- TransportContext
- TransportClientFactory
- TransportServer
- TransportConf
2.2 RpcEndpoint and RpcEndpointRef
- RpcEndpoint is the server side
- RpcEndpointRef is the client side
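2.2 Dispatcher, Inbox, and Outbox
- From an endpoint's point of view there is one Dispatcher, one Inbox, and multiple Outboxes. More precisely, as the source excerpts later in this section show, the Dispatcher belongs to the NettyRpcEnv and keeps one Inbox per registered endpoint, while the NettyRpcEnv keeps one Outbox per remote RpcAddress.
- RpcEndpoint: an RPC endpoint. Spark calls each node (Client/Master/Worker) an RPC endpoint, and each one implements the RpcEndpoint interface. Internally, each endpoint defines its own messages and business logic according to its needs; when it needs to send (or ask), it goes through the Dispatcher.
- RpcEnv: the RPC context. The environment that each RPC endpoint depends on at runtime is called its RpcEnv.
- Dispatcher: the message dispatcher. Messages that an RPC endpoint needs to send, and messages received from remote RPC endpoints, are dispatched to the matching inbox or outbox. If the receiver is the local endpoint itself, the message goes into the inbox; if the receiver is not a local endpoint, it goes into the outbox.
- Inbox: the inbox for incoming messages; each local endpoint has one. Every time the Dispatcher stores a message in an Inbox, it also adds the corresponding EndpointData to its internal receivers queue. In addition, when the Dispatcher is created it starts dedicated threads that poll the receivers queue and consume the inbox messages.
- Outbox: the outbox for outgoing messages; each remote endpoint has one. Once a message is placed in the Outbox, it is immediately sent out through a TransportClient. Enqueuing and sending happen on the same thread, mainly because remote messages come in two kinds, RpcOutboxMessage and OneWayOutboxMessage, and a message that needs a reply is sent directly so that its result can be received and processed.
- TransportClient: the Netty communication client. Based on the receiver information of the Outbox message, it sends requests to the corresponding remote TransportServer.
- TransportServer: the Netty communication server; there is one TransportServer per RPC endpoint. After receiving a remote message, it calls the Dispatcher to route the message to the corresponding inbox or outbox.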
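The inbox/outbox routing rule in the list above can be sketched as follows. This is a single-threaded illustration, not Spark's code: `RoutingSketch`, its string addresses, and its string messages are hypothetical simplifications, and the endpoint names and addresses in `main` are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: route a message to a local inbox or to a per-address outbox.
class RoutingSketch {
    private final String localAddress;
    // One inbox per locally registered endpoint name.
    private final Map<String, Queue<String>> inboxes = new HashMap<>();
    // One outbox per remote address, created lazily on first use.
    private final Map<String, Queue<String>> outboxes = new HashMap<>();

    RoutingSketch(String localAddress) {
        this.localAddress = localAddress;
    }

    void registerEndpoint(String name) {
        inboxes.putIfAbsent(name, new ArrayDeque<>());
    }

    void send(String receiverAddress, String endpointName, String message) {
        if (receiverAddress.equals(localAddress)) {
            // The receiver lives in this process: deliver straight to its inbox.
            Queue<String> inbox = inboxes.get(endpointName);
            if (inbox == null) {
                throw new IllegalStateException("Could not find " + endpointName);
            }
            inbox.add(message);
        } else {
            // The receiver is remote: queue the message in the outbox for that address.
            outboxes.computeIfAbsent(receiverAddress, addr -> new ArrayDeque<>()).add(message);
        }
    }

    int inboxSize(String endpointName) {
        Queue<String> inbox = inboxes.get(endpointName);
        return inbox == null ? 0 : inbox.size();
    }

    int outboxSize(String remoteAddress) {
        Queue<String> outbox = outboxes.get(remoteAddress);
        return outbox == null ? 0 : outbox.size();
    }

    public static void main(String[] args) {
        RoutingSketch env = new RoutingSketch("master:7077");
        env.registerEndpoint("Master");
        env.send("master:7077", "Master", "CheckForWorkerTimeOut");
        env.send("worker1:34567", "Worker", "RegisteredWorker");
        System.out.println(env.inboxSize("Master") + " local, "
            + env.outboxSize("worker1:34567") + " queued for worker1");
    }
}
```

Keying outboxes by remote address rather than by endpoint means that all messages bound for the same remote process share one queue, and in a real system one connection.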
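The core of Spark's endpoint design is the Inbox and the Outbox. The key points about the Inbox are:
- Internal processing is split into individual message commands (InboxMessage) that are stored in the Inbox
- At the end of Dispatcher startup, threads named "dispatcher-event-loop" are started; they scan the Inboxes for pending InboxMessages and call the endpoint to handle each one according to its type
- By default, an OnStart InboxMessage is the first message stored in an Inbox. The endpoint performs its additional startup work in response to OnStart, and everything the endpoint does afterwards derives from handling that OnStart command, so OnStart can be regarded as the origin of all communication between endpoints.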
Note: with one Dispatcher, one Inbox, and multiple Outboxes per endpoint, the Inbox lives in the Dispatcher, inside EndpointData:
    /**
     * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
     */
    private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

      private class EndpointData(
          val name: String,
          val endpoint: RpcEndpoint,
          val ref: NettyRpcEndpointRef) {
        val inbox = new Inbox(ref, endpoint)
      }

      private val endpoints = new ConcurrentHashMap[String, EndpointData]
      private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

      // Track the receivers whose inboxes may contain messages.
      private val receivers = new LinkedBlockingQueue[EndpointData]
      // ... (rest of the class omitted)
    }
Note: with one Dispatcher, one Inbox, and multiple Outboxes per endpoint, the Outboxes live inside NettyRpcEnv:
    private[netty] class NettyRpcEnv(
        val conf: SparkConf,
        javaSerializerInstance: JavaSerializerInstance,
        host: String,
        securityManager: SecurityManager) extends RpcEnv(conf) with Logging {

      private val dispatcher: Dispatcher = new Dispatcher(this)

      private val streamManager = new NettyStreamManager(this)

      private val transportContext = new TransportContext(transportConf,
        new NettyRpcHandler(dispatcher, this, streamManager))

      /**
       * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
       * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
       */
      private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
      // ... (rest of the class omitted)
    }
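2.3 Dispatcher, Inbox, and Outbox
The Dispatcher excerpt below contains the core message-posting logic. Sending a message to the server side means two things happen together: the message is placed in the messages queue of the target Inbox, and the endpoint's EndpointData is placed in the Dispatcher's receivers queue. High-level senders such as the Master and Worker endpoints all post their messages through the Dispatcher in NettyRpcEnv. The Dispatcher runs a thread called MessageLoop, which makes sure messages are processed promptly.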
    /**
     * Posts a message to a specific endpoint.
     *
     * @param endpointName name of the endpoint.
     * @param message the message to post
     * @param callbackIfStopped callback function if the endpoint is stopped.
     */
    private def postMessage(
        endpointName: String,
        message: InboxMessage,
        callbackIfStopped: (Exception) => Unit): Unit = {
      val error = synchronized {
        val data = endpoints.get(endpointName)
        if (stopped) {
          Some(new RpcEnvStoppedException())
        } else if (data == null) {
          Some(new SparkException(s"Could not find $endpointName."))
        } else {
          data.inbox.post(message)
          receivers.offer(data)
          None
        }
      }
      // ... (remainder of the method omitted in this excerpt)
    }
Note: the first message is OnStart by default. Why? Look at the following code, and in particular at new EndpointData(name, endpoint, endpointRef):
    def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
      val addr = RpcEndpointAddress(nettyEnv.address, name)
      val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
      synchronized {
        if (stopped) {
          throw new IllegalStateException("RpcEnv has been stopped")
        }
        if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
          throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
        }
        val data = endpoints.get(name)
        endpointRefs.put(data.endpoint, data.ref)
        receivers.offer(data)  // for the OnStart message
      }
      endpointRef
    }
Note that EndpointData contains the Inbox, so OnStart is placed in the Inbox when the Inbox is initialized:
    private class EndpointData(
        val name: String,
        val endpoint: RpcEndpoint,
        val ref: NettyRpcEndpointRef) {
      val inbox = new Inbox(ref, endpoint)
    }
OnStart appears when the Inbox is initialized. Note that each endpoint, for example the Master node, has exactly one Inbox.
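To make the "OnStart comes first" guarantee concrete, here is a minimal sketch of an inbox that enqueues OnStart in its constructor. It is an illustration only: `InboxSketch`, `post`, and `drain` are hypothetical names, and messages are plain strings rather than InboxMessage objects.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical sketch of an inbox whose first message is always OnStart.
class InboxSketch {
    static final String ON_START = "OnStart";

    private final LinkedList<String> messages = new LinkedList<>();

    InboxSketch() {
        // Enqueue OnStart at construction time, before any caller can post.
        messages.add(ON_START);
    }

    synchronized void post(String message) {
        messages.add(message);
    }

    // Remove and return all pending messages in arrival order.
    synchronized List<String> drain() {
        List<String> drained = new ArrayList<>(messages);
        messages.clear();
        return drained;
    }

    public static void main(String[] args) {
        InboxSketch inbox = new InboxSketch();
        inbox.post("RegisterWorker");
        System.out.println(inbox.drain());
    }
}
```

Because the queue is seeded before the object is visible to anyone else, no later `post` can ever get ahead of OnStart. This also explains why registering an endpoint only needs to offer its EndpointData to the receivers queue: the OnStart message is already waiting in the Inbox.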
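2.4 Messages reach an Inbox in two ways: an endpoint (Master) posts a message to its own local Inbox, or an endpoint (Master) receives a remote message, which TransportRequestHandler handles and then puts into the Inbox
2.4.1 An endpoint (Master) posts a message to its own local Inbox
- endpoint(Master) -> NettyRpcEnv -> Dispatcher -> postMessage -> MessageLoop(Dispatcher) -> inbox -> process -> endpoint.receiveAndReply
Explanation: through its own RpcEnv, the endpoint posts a message to its own Inbox; the Dispatcher then takes over processing and calls the endpoint's own receiveAndReply method.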
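The important question here is when MessageLoop starts. As the Dispatcher excerpt below shows, the loops start as soon as the Dispatcher is initialized, because the thread pool is a member variable: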
    private val threadpool: ThreadPoolExecutor = {
      val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
        math.max(2, Runtime.getRuntime.availableProcessors()))
      val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
      for (i <- 0 until numThreads) {
        pool.execute(new MessageLoop)
      }
      pool
    }
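The pattern in the excerpt above, a fixed pool in which every thread runs the same blocking loop over a shared queue, can be sketched in plain Java. This is not Spark's MessageLoop: `MessageLoopSketch`, the string work items, and the poison-pill shutdown shown here are assumptions made for the sake of a runnable example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: N identical loops draining one shared blocking queue.
class MessageLoopSketch {
    private static final String POISON_PILL = "__poison_pill__"; // assumed shutdown marker

    private final LinkedBlockingQueue<String> receivers = new LinkedBlockingQueue<>();
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private final ExecutorService pool;

    MessageLoopSketch(int numThreads) {
        pool = Executors.newFixedThreadPool(numThreads);
        // The loops start as soon as the object is constructed.
        for (int i = 0; i < numThreads; i++) {
            pool.execute(this::runLoop);
        }
    }

    private void runLoop() {
        try {
            while (true) {
                String item = receivers.take(); // blocks until work is available
                if (POISON_PILL.equals(item)) {
                    receivers.offer(POISON_PILL); // put it back so the other loops stop too
                    return;
                }
                processed.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void post(String item) {
        receivers.offer(item);
    }

    // Ask all loops to finish the queued work, wait briefly, and return what was processed.
    List<String> stop() {
        receivers.offer(POISON_PILL);
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        MessageLoopSketch loops = new MessageLoopSketch(2);
        loops.post("OnStart");
        loops.post("RegisterWorker");
        System.out.println(loops.stop().size() + " items processed");
    }
}
```

With more than one loop thread, the order in which items are processed is not guaranteed, which is why an inbox needs its own guard if an endpoint must see its messages one at a time.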
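Next, when are NettyRpcEnv and the Dispatcher initialized?
When the Master initializes its RpcEnv, it calls NettyRpcEnvFactory().create(config) to create the NettyRpcEnv. The NettyRpcEnv's Dispatcher member is initialized in turn, the Dispatcher's threadpool member then starts the MessageLoops, and message processing begins: each step triggers the next. This is how the Master endpoint initializes its RpcEnv.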
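NettyRpcEnvFactory's create method, which is defined alongside NettyRpcEnv, calls nettyRpcEnv.startServer. As the excerpt below shows, startServer then calls the underlying transportContext.createServer to create the server and initialize the Netty pipeline: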
    server = transportContext.createServer(host, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
From then on, the endpoint simply keeps posting messages to its own Inbox:
    private def postMessage(
        endpointName: String,
        message: InboxMessage,
        callbackIfStopped: (Exception) => Unit): Unit = {
      val error = synchronized {
        val data = endpoints.get(endpointName)
        if (stopped) {
          Some(new RpcEnvStoppedException())
        } else if (data == null) {
          Some(new SparkException(s"Could not find $endpointName."))
        } else {
          data.inbox.post(message)
          receivers.offer(data)
          None
        }
      }
      // ... (remainder of the method omitted in this excerpt)
    }
2.4.2 An endpoint (Master) receives a message, which TransportRequestHandler handles and puts into the Inbox
- endpointRef(Worker) -> TransportChannelHandler -> channelRead0 -> TransportRequestHandler -> handle -> processRpcRequest -> NettyRpcHandler (in NettyRpcEnv) -> receive -> internalReceive -> dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress)) (response) -> dispatcher.postRemoteMessage(messageToDispatch, callback) (puts the message from the remote side into the inbox) -> postMessage -> inbox -> process
[Figure: the complete flow of a message being received into the Inbox]
TransportChannelHandler receives the message as follows:
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
      if (request instanceof RequestMessage) {
        requestHandler.handle((RequestMessage) request);
      } else {
        responseHandler.handle((ResponseMessage) request);
      }
    }
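TransportRequestHandler then matches on the message type and handles it. The message is finally handed to the Inbox's process method, where the endpoint's endpoint.receiveAndReply(context) method does the actual work: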
    /**
     * Process stored messages.
     */
    def process(dispatcher: Dispatcher): Unit = {
      var message: InboxMessage = null
      inbox.synchronized {
        if (!enableConcurrent && numActiveThreads != 0) {
          return
        }
        message = messages.poll()
        if (message != null) {
          numActiveThreads += 1
        } else {
          return
        }
      }
      while (true) {
        safelyCall(endpoint) {
          message match {
            case RpcMessage(_sender, content, context) =>
              try {
                endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                  throw new SparkException(s"Unsupported message $message from ${_sender}")
                })
              } catch {
                case NonFatal(e) =>
                  context.sendFailure(e)
                  // Throw the exception -- this exception will be caught by the safelyCall function.
                  // The endpoint's onError function will be called.
                  throw e
              }

            case OneWayMessage(_sender, content) =>
              endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })

            case OnStart =>
              endpoint.onStart()
              if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
                inbox.synchronized {
                  if (!stopped) {
                    enableConcurrent = true
                  }
                }
              }

            case OnStop =>
              val activeThreads = inbox.synchronized { inbox.numActiveThreads }
              assert(activeThreads == 1,
                s"There should be only a single active thread but found $activeThreads threads.")
              dispatcher.removeRpcEndpointRef(endpoint)
              endpoint.onStop()
              assert(isEmpty, "OnStop should be the last message")

            case RemoteProcessConnected(remoteAddress) =>
              endpoint.onConnected(remoteAddress)

            case RemoteProcessDisconnected(remoteAddress) =>
              endpoint.onDisconnected(remoteAddress)

            case RemoteProcessConnectionError(cause, remoteAddress) =>
              endpoint.onNetworkError(cause, remoteAddress)
          }
        }

        inbox.synchronized {
          // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
          // every time.
          if (!enableConcurrent && numActiveThreads != 1) {
            // If we are not the only one worker, exit
            numActiveThreads -= 1
            return
          }
          message = messages.poll()
          if (message == null) {
            numActiveThreads -= 1
            return
          }
        }
      }
    }