阿里开源分布式事务组件 seata :seata server 通信层解析

栏目: 后端 · 发布时间: 6年前

内容简介:seata client 和 seata server 间是需要通过网络通信来传递信息的,client 发送请求消息给 server,server 根据实际的处理逻辑,可能会给 client 发送相应的响应消息,或者不响应任何消息。在 seata 中,客户端和服务端的通信实现,被抽象成来公共的模块,它的 package 位于这个包名叫 rpc,这个包下的很多类名也有 rpc 相关的字眼,而实际上在我看来,这个通信框架并不是一个常规意义的 rpc 框架,如果硬要揪书本知识,那么 rpc 的解释如下:远程过程调

RPC ?

seata client 和 seata server 间是需要通过网络通信来传递信息的,client 发送请求消息给 server,server 根据实际的处理逻辑,可能会给 client 发送相应的响应消息,或者不响应任何消息。在 seata 中,客户端和服务端的通信实现,被抽象成来公共的模块,它的 package 位于 io.seata.core.rpc   中。

这个包名叫 rpc,这个包下的很多类名也有 rpc 相关的字眼,而实际上在我看来,这个通信框架并不是一个常规意义的 rpc 框架,如果硬要揪书本知识,那么 rpc 的解释如下:

远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而 程序员 无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。

在以 dubbo 为代表的微服务时代下,dubbo 常规意义上我们都称之为 rpc 框架,rpc 的理论原则是:程序员无需额外地为这个交互作用编程。那么对于像 dubbo 这样的 rpc 实现,它能让 client 像调用本地代码 api 一样,来调用远程 server 上的某个 method。  

在 client 这一层直接面向 interface 编程,通过动态代理的方式,对上层屏蔽掉通信细节,在底层,将方法调用,通过序列化方式,封装成一个二进制数据串发送给 server,server 层解析该消息,通过反射的方式,将 interface 对应的 implemention 执行起来,将执行结果,扁平化成一个二进制数据串,回送给 client,client 收到数据后,拼装成 interface api 所定义的返回值类型的一个实例,作为方法调用的返回值。整个底层的细节,应用层面并不需要了解,应用层只需要以 interface.method 的方式,就像代码在本地执行一样,就能把远端 interface_implemention.method 给调用起来。

阿里开源分布式事务组件 seata :seata server 通信层解析

而 seata 的 rpc 框架上,实际上仅仅是一个普通的基于 netty 的网络通信框架,client 与 server 之间通过发送 request 和 response 来达到相互通信的目的,在 seata 中的每个 request 和 response 类,都实现了如何把自己序列化的逻辑。  

各种消息类型,都实现了   io.seata.core.protocol.MessageCodec

接口


 

public interface MessageCodec {

/**

* Gets type code.

*

* @return the type code

*/

short getTypeCode();


/**

* Encode byte [ ].

*

* @return the byte [ ]

*/

byte[] encode();


/**

* Decode boolean.

*

* @param in the in

* @return the boolean

*/

boolean decode(ByteBuf in);

}

阿里开源分布式事务组件 seata :seata server 通信层解析

  io.seata.core.protocol.GlobalBeginRequest   为例,它都 decode 和 encode 实现如下所示:


 

@Override

public byte[] encode() {

ByteBuffer byteBuffer = ByteBuffer.allocate(256);

byteBuffer.putInt(timeout);


if (this.transactionName != null) {

byte[] bs = transactionName.getBytes(UTF8);

byteBuffer.putShort((short)bs.length);

if (bs.length > 0) {

byteBuffer.put(bs);

}

} else {

byteBuffer.putShort((short)0);

}


byteBuffer.flip();

byte[] content = new byte[byteBuffer.limit()];

byteBuffer.get(content);

return content;

}


@Override

public void decode(ByteBuffer byteBuffer) {

this.timeout = byteBuffer.getInt();


short len = byteBuffer.getShort();

if (len > 0) {

byte[] bs = new byte[len];

byteBuffer.get(bs);

this.setTransactionName(new String(bs, UTF8));

}

}

这意味着,发送方先对 message 做 encode 动作形成字节数组,将字节数组发往接收方,接收方收到字节数组后,对字节数组先判断 message type,再用对应的 message 类型对字节数组做 decode 动作。

类的组织形式

从 seata server 的入口类   io.seata.server.Server   分析,main 方法如下所示:


 

/**

* The entry point of application.

*

* @param args the input arguments

* @throws IOException the io exception

*/

public static void main(String[] args) throws IOException {

RpcServer rpcServer = new RpcServer(WORKING_THREADS);


int port = SERVER_DEFAULT_PORT;

//server port

if (args.length > 0) {

try {

port = Integer.parseInt(args[0]);

} catch (NumberFormatException e) {

System.err.println("Usage: sh services-server.sh $LISTEN_PORT $PATH_FOR_PERSISTENT_DATA");

System.exit(0);

}

}

rpcServer.setListenPort(port);


//log store mode : file、db

String storeMode = null;

if (args.length > 1) {

storeMode = args[1];

}


UUIDGenerator.init(1);

SessionHolder.init(storeMode);


DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);

coordinator.init();

rpcServer.setHandler(coordinator);

// register ShutdownHook

ShutdownHook.getInstance().addDisposable(coordinator);


if (args.length > 2) {

XID.setIpAddress(args[2]);

} else {

XID.setIpAddress(NetUtil.getLocalIp());

}

XID.setPort(rpcServer.getListenPort());


rpcServer.init();


System.exit(0);

}

可以看到 seata server 使用一个 RpcServer 类来启动它的服务监听端口,这个端口用来接收 seata client 的消息,RpcServer 这个类是通信层的实现分析的入口。  

在这里,SessionHolder 用来做全局事务树的管理,DefaultCoordinator 用来处理事务执行逻辑,而 RpcServer 是这两者可以正常运行的基础,这篇文章的重点在于剖析 RpcServer 的实现,进而延伸到 seata 整个通信框架的细节。  

如果先从 RpcServer 的类继承图看的话,那么我们能发现一些与常规思维不太一样的地方,类继承图如下

:

阿里开源分布式事务组件 seata :seata server 通信层解析

褐色部分是 netty 的类,灰色部分是 seata 的类。

在一般常规的思维中,依赖 netty 做一个 server,大致的思路是:

  1. 定义一个 xxx server 类

  2. 在这个类中设置初始化 netty bootstrap,eventloop,以及设置相应的 ChannelHandler

在这种思维下,很容易想到,server 与 ChannelHandler 之间的关系应该是一个“组合”的关系,即在我们构建 server 的过程中,应该把 ChannelHandler 当作参数传递给 server,成为 server 类的成员变量。

没错,这是我们一般情况下的思维。不过 seata 在这方面却不那么“常规”,从上面的类继承图中可以看到,从 RpcServer 这个类开始向上追溯,发现它其实是 ChannelDuplexHandler 的一个子类或者实例。这种逻辑让人一时很困惑,一个问题在我脑海里浮现:“当我启动一个 RpcServer 的时候,我是真的在启动一个 server 吗?看起来我好像在启动一个 ChannelHandler,可是 ChannelHandler 怎么谈得上‘启动’呢?”

异步转同步的 Future 机制

首先分析 AbstractRpcRemoting 这个类,它直接继承自 ChannelDuplexHandler 类,而 ChannelDuplexHandler 是 netty 中 inbound handler 和 outbound handler 的结合体。  

AbstractRpcRemoting 的 init 方法里,仅仅通过 Java 中的定时任务执行线程池启动了一个定时执行的任务:


 

/**

* Init.

*/

public void init() {

timerExecutor.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());


for (MessageFuture future : futures.values()) {

if (future.isTimeout()) {

timeoutMessageFutures.add(future);

}

}


for (MessageFuture messageFuture : timeoutMessageFutures) {

futures.remove(messageFuture.getRequestMessage().getId());

messageFuture.setResultMessage(null);

if (LOGGER.isDebugEnabled()) {

LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());

}

}

nowMills = System.currentTimeMillis();

}

}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);

}

这个定时任务的逻辑也比较简单:扫描   ConcurrentHashMap<Long, MessageFuture> futures   这个成员变量里的 MessageFuture,如果这个 Future 超时了,就将 Future 的结果设置为 null。逻辑虽然简单,但这个功能涉及到了异步通信里一个很常见的功能,即 异步转同步 的功能。  

在 netty 这种基于 NIO 的通信方式中,数据的发送,接收,全部是非阻塞的,因此判断一个动作完成与否,并不能像传统的 Java 同步代码一样,代码执行完了就认为相应的动作也真正完成了,例如,在 netty 中,如果通过 channel.write(); 方法往对端发送一个数据,这个方法执行完了,并不代表数据发送出去了,channel.write() 方法会返回一个 future,应用代码应该利用这个 future ,通过这个 future 可以知道数据到底发送出去了没有,也可以为这个 future 添加动作完成后的回调逻辑,也可以阻塞等待这个 future 所关联的动作执行完毕。  

在 seata 中,存在着 发送一个请求,并等待相应 这样的使用场景,上层的 api 可能是这么定义的:  

public Response request(Request request) {}  

而基于 nio 的底层数据发送逻辑却是这样的:

  1. send request message

  2. 为业务的请求构建一个业务层面的 future 实例

  3. 阻塞等待在这个 future 上

  4. 当收到对应的 response message 后,唤醒上面的 future,阻塞等待在这个 future 上的线程继续执行

  5. 拿到结果,request 方法结束

AbstractRpcRemoting 定义了几个数据发送相关的方法,分别是:


 

/**

* Send async request with response object.

*

* @param address the address

* @param channel the channel

* @param msg the msg

* @return the object

* @throws TimeoutException the timeout exception

*/

protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg) throws TimeoutException;


/**

* Send async request with response object.

*

* @param address the address

* @param channel the channel

* @param msg the msg

* @param timeout the timeout

* @return the object

* @throws TimeoutException the timeout exception

*/

protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws

TimeoutException;


/**

* Send async request without response object.

*

* @param address the address

* @param channel the channel

* @param msg the msg

* @return the object

* @throws TimeoutException the timeout exception

*/

protected Object sendAsyncRequestWithoutResponse(String address, Channel channel, Object msg) throws

TimeoutException;

这几个方法就符合上面说到的 发送一个请求,并等待相应 这样的使用场景,上面这三个方法,其实都委托给了   sendAsyncRequest   来实现,这个方法的代码是这样子的:


 

private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)

throws TimeoutException {

if (channel == null) {

LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");

return null;

}

final RpcMessage rpcMessage = new RpcMessage();

rpcMessage.setId(RpcMessage.getNextMessageId());

rpcMessage.setAsync(false);

rpcMessage.setHeartbeat(false);

rpcMessage.setRequest(true);

rpcMessage.setBody(msg);


final MessageFuture messageFuture = new MessageFuture();

messageFuture.setRequestMessage(rpcMessage);

messageFuture.setTimeout(timeout);

futures.put(rpcMessage.getId(), messageFuture);


if (address != null) {

ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;

BlockingQueue<RpcMessage> basket = map.get(address);

if (basket == null) {

map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());

basket = map.get(address);

}

basket.offer(rpcMessage);

if (LOGGER.isDebugEnabled()) {

LOGGER.debug("offer message: " + rpcMessage.getBody());

}

if (!isSending) {

synchronized (mergeLock) {

mergeLock.notifyAll();

}

}

} else {

ChannelFuture future;

channelWriteableCheck(channel, msg);

future = channel.writeAndFlush(rpcMessage);

future.addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) {

if (!future.isSuccess()) {

MessageFuture messageFuture = futures.remove(rpcMessage.getId());

if (messageFuture != null) {

messageFuture.setResultMessage(future.cause());

}

destroyChannel(future.channel());

}

}

});

}

if (timeout > 0) {

try {

return messageFuture.get(timeout, TimeUnit.MILLISECONDS);

} catch (Exception exx) {

LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);

if (exx instanceof TimeoutException) {

throw (TimeoutException)exx;

} else {

throw new RuntimeException(exx);

}

}

} else {

return null;

}

}

先抛开方法的其它细节,比如说同步写还是异步写,以及发送频率控制。我们可以发现,这个方法其实从大角度来划分,就是如下的步骤:

  1. 构造请求 message

  2. 为这个请求 message 构造一个 message future

  3. 发送数据

  4. 阻塞等待在 message future

不过 AbstractRpcRemoting 也定义了方法用于 仅发送消息,不接收响应 的使用场景,如下所示:


 

/**

* Send request.

*

* @param channel the channel

* @param msg the msg

*/

protected void sendRequest(Channel channel, Object msg) {

RpcMessage rpcMessage = new RpcMessage();

rpcMessage.setAsync(true);

rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);

rpcMessage.setRequest(true);

rpcMessage.setBody(msg);

rpcMessage.setId(RpcMessage.getNextMessageId());

if (msg instanceof MergeMessage) {

mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);

}

channelWriteableCheck(channel, msg);

if (LOGGER.isDebugEnabled()) {

LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"

+ channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());

}

channel.writeAndFlush(rpcMessage);

}


/**

* Send response.

*

* @param msgId the msg id

* @param channel the channel

* @param msg the msg

*/

protected void sendResponse(long msgId, Channel channel, Object msg) {

RpcMessage rpcMessage = new RpcMessage();

rpcMessage.setAsync(true);

rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);

rpcMessage.setRequest(false);

rpcMessage.setBody(msg);

rpcMessage.setId(msgId);

channelWriteableCheck(channel, msg);

if (LOGGER.isDebugEnabled()) {

LOGGER.debug("send response:" + rpcMessage.getBody() + ",channel:" + channel);

}

channel.writeAndFlush(rpcMessage);

}

这样的场景就不需要引入 future 机制,直接调用 netty 的 api 把数据发送出去就完事了。  

分析思路回到有 future 的场景,发送数据后,要在 future 上进行阻塞等待,即调用 get 方法,那 get 方法什么返回呢,我们上面说到 future 被唤醒的时候,我们先不讨论 future 的实现细节,一个 future 什么时候被唤醒呢,在这种 请求-响应 的模式下,显然是收到了响应的时候。所以我们需要查看一下 AbstractRpcRemoting 的 channelRead 方法


 

@Override

public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {

if (msg instanceof RpcMessage) {

final RpcMessage rpcMessage = (RpcMessage)msg;

if (rpcMessage.isRequest()) {

if (LOGGER.isDebugEnabled()) {

LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));

}

try {

AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {

@Override

public void run() {

try {

dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());

} catch (Throwable th) {

LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);

}

}

});

} catch (RejectedExecutionException e) {

LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),

"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());

if (allowDumpStack) {

String name = ManagementFactory.getRuntimeMXBean().getName();

String pid = name.split("@")[0];

int idx = new Random().nextInt(100);

try {

Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");

} catch (IOException exx) {

LOGGER.error(exx.getMessage());

}

allowDumpStack = false;

}

}

} else {

MessageFuture messageFuture = futures.remove(rpcMessage.getId());

if (LOGGER.isDebugEnabled()) {

LOGGER.debug(String

.format("%s msgId:%s, future :%s, body:%s", this, rpcMessage.getId(), messageFuture,

rpcMessage.getBody()));

}

if (messageFuture != null) {

messageFuture.setResultMessage(rpcMessage.getBody());

} else {

try {

AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {

@Override

public void run() {

try {

dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());

} catch (Throwable th) {

LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);

}

}

});

} catch (RejectedExecutionException e) {

LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),

"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());

}

}

}

}

}

可以看到调用了 messageFuture 当 setResultMessage() 方法,设置 future 的结果,也就是说,唤醒了 future,那么阻塞在 future 的 get 方法上的线程就被唤醒了,得到结果,继续往下执行。

接下来我们讨论 MessageFuture 的实现细节,其实 seata 里面有很多种 future 相关的类,实现方式也不太一样,不过都大同小异,有的是基于 CompletableFuture 实现,有的是基于 CountDownLatch 实现。比如说,MessageFuture 就是基于 CompletableFuture 实现的,先看看它的成员变量:


 

private RpcMessage requestMessage;

private long timeout;

private long start = System.currentTimeMillis();

private transient CompletableFuture origin = new CompletableFuture();

CompletableFuture 是它的一个成员变量,它被利用来阻塞当前线程。MessageFuture 的 get 方法,依赖于 CompletableFuture 的 get 方法,来实现有一定时间限制的等待,直到另一个线程唤醒 CompletableFuture。如下所示:


 

/**

* Get object.

*

* @param timeout the timeout

* @param unit the unit

* @return the object

* @throws TimeoutException the timeout exception

* @throws InterruptedException the interrupted exception

*/

public Object get(long timeout, TimeUnit unit) throws TimeoutException,

InterruptedException {

Object result = null;

try {

result = origin.get(timeout, unit);

} catch (ExecutionException e) {

throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);

} catch (TimeoutException e) {

throw new TimeoutException("cost " + (System.currentTimeMillis() - start) + " ms");

}


if (result instanceof RuntimeException) {

throw (RuntimeException)result;

} else if (result instanceof Throwable) {

throw new RuntimeException((Throwable)result);

}


return result;

}


/**

* Sets result message.

*

* @param obj the obj

*/

public void setResultMessage(Object obj) {

origin.complete(obj);

}

既然说到了 future 机制,这里也顺便把   io.seata.config.ConfigFuture   提一下,它就是上面提到的基于 CountDownLatch 实现的一种 future 机制,虽然实现方式两者不一样,但完成的功能和作用是一样的。


 

private final CountDownLatch latch = new CountDownLatch(1);


/**

* Get object.

*

* @param timeout the timeout

* @param unit the unit

* @return the object

* @throws InterruptedException the interrupted exception

*/

public Object get(long timeout, TimeUnit unit) {

this.timeoutMills = unit.toMillis(timeout);

try {

boolean success = latch.await(timeout, unit);

if (!success) {

LOGGER.error(

"config operation timeout,cost:" + (System.currentTimeMillis() - start) + " ms,op:" + operation

.name()

+ ",dataId:" + dataId);

return getFailResult();

}

} catch (InterruptedException exx) {

LOGGER.error("config operate interrupted,error:" + exx.getMessage());

return getFailResult();

}

if (operation == ConfigOperation.GET) {

return result == null ? content : result;

} else {

return result == null ? Boolean.FALSE : result;

}

}


/**

* Sets result.

*

* @param result the result

*/

public void setResult(Object result) {

this.result = result;

latch.countDown();

}

阻塞操作调用了 CountDownLatch 的 await 方法,而唤醒操作则调用 countDown 方法,核心在于需要把 CountDownLatch 的 latch 值设置为 1。

实际上,Java 语言本身已经提供了 java.util.concurrent.Future 这个类来提供 Future 机制,但 Java 原生的 Future 机制功能过于单一,比如说不能主动设置 future 的结果,也不能为它添加 listener,所有有许多像 seata 这样的软件,会选择去重新实现一种 future 机制来满足异步转同步的需求。也有像 netty 这样的软件,它不会借助类似于 countdownlatch 来实现,而是直接扩展 java.util.concurrent.Future,在它的基础上添加功能。

防洪机制

在 AbstractRpcRemoting 中,往外发数据的时候,它都会先进行一个检查,即检查当前的 channel 是否可写。


 

private void channelWriteableCheck(Channel channel, Object msg) {

int tryTimes = 0;

synchronized (lock) {

while (!channel.isWritable()) {

try {

tryTimes++;

if (tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {

destroyChannel(channel);

throw new FrameworkException("msg:" + ((msg == null) ? "null" : msg.toString()),

FrameworkErrorCode.ChannelIsNotWritable);

}

lock.wait(NOT_WRITEABLE_CHECK_MILLS);

} catch (InterruptedException exx) {

LOGGER.error(exx.getMessage());

}

}

}

}

这要从 netty 的内部机制说起,当调用 ChannelHandlerContext 或者 Channel 的 write 方法时,netty 只是把要写的数据放入了自身的一个环形队列里面,再由后台线程真正往链路上发。如果接受方的处理速度慢,也就是说,接收的速度慢,那么根据 tcpip 协议的滑动窗口机制,它也会导致发送方发送得慢。  

我们可以把 netty 的环形队列想像成一个水池,调用 write 方法往池子里加水,netty 通过后台线程,慢慢把池子的水流走。这就有可能出现一种情况,由于池子水流走的速度远远慢于往池子里加水的速度,这样会导致池子的总水量随着时间的推移越来越多。所以往池子里加水时应该考虑当前池子里的水量,否则最终会导致应用的内存溢出。  

netty 对于水池提供了两个设置,一个是 高水位 ,一个是 低水位

,当池子里的水高于高水位时,这个时候 channel.isWritable() 返回 false,并且直到水位慢慢降回到低水位时,这个方法才会返回 true。

阿里开源分布式事务组件 seata :seata server 通信层解析

上述的 channelWriteableCheck 方法,发现channel 不可写的时候,进入循环等待,等待的目的是让池子的水位下降到 low water mark,如果等待超过最大允许等待的时间,那么将会抛出异常并关闭连接。

消息队列

在 AbstractRpcRemoting 中,发送数据有两种方式,一种是直接调用 channel 往外写,另一种是先把数据放进“数据篮子”里,它实际上是一个 map, key 为远端地址,value为一个消息队列。数据放队列后,再由其它线程往外发。下面是 sendAsycRequest 方法的一部分代码,显示了这种机制:


 

ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;

BlockingQueue<RpcMessage> basket = map.get(address);

if (basket == null) {

map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());

basket = map.get(address);

}

basket.offer(rpcMessage);

if (LOGGER.isDebugEnabled()) {

LOGGER.debug("offer message: " + rpcMessage.getBody());

}

if (!isSending) {

synchronized (mergeLock) {

mergeLock.notifyAll();

}

}

但我们在 AbstractRpcRemoting 里面没有看有任何额外的线程在晴空这个 basketMap。回顾一下上面的 RpcServer 的类继承体系,接下来我们要分析一下,AbstractRpcRemotingServer 这个类。

阿里开源分布式事务组件 seata :seata server 通信层解析

AbstractRpcRemotingServer 这个类主要定义了于netty 启动一个 server bootstrap 相关的类,可见真正启动服务监听端口的是在这个类,先看一下它的start方法 :


 

@Override

public void start() {

this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)

.channel(nettyServerConfig.SERVER_CHANNEL_CLAZZ)

.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())

.option(ChannelOption.SO_REUSEADDR, true)

.childOption(ChannelOption.SO_KEEPALIVE, true)

.childOption(ChannelOption.TCP_NODELAY, true)

.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())

.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())

.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,

new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),

nettyServerConfig.getWriteBufferHighWaterMark()))

.localAddress(new InetSocketAddress(listenPort))

.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) {

ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))

.addLast(new MessageCodecHandler());

if (null != channelHandlers) {

addChannelPipelineLast(ch, channelHandlers);

}


}

});


if (nettyServerConfig.isEnableServerPooledByteBufAllocator()) {

this.serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyServerConfig.DIRECT_BYTE_BUF_ALLOCATOR);

}


try {

ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();

LOGGER.info("Server started ... ");

RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));

initialized.set(true);

future.channel().closeFuture().sync();

} catch (Exception exx) {

throw new RuntimeException(exx);

}

}

这个类很常规,就是遵循 netty 的使用规范,用合适的配置启动一个 server,并调用注册中心 api 把自己作为一个服务发布出去。  

我们可以看到,配置中确实也出现了我们上文中提到过的上下水位的配置。  

另外,channelpipeline 中,除了添加一个保持链路有效性探测的 IdleStateHandler,和一个 MessageCodec,处理事务逻辑相关的 Handler 还需要由参数传入。

接下来我们看 RpcServer 这个类,从它的 init 方法里,我们可以看到,它把自己做为一个 ChannelHandler,加入到了 channel pipeline 中


 

/**

* Init.

*/

@Override

public void init() {

super.init();

setChannelHandlers(RpcServer.this);

DefaultServerMessageListenerImpl defaultServerMessageListenerImpl = new DefaultServerMessageListenerImpl(

transactionMessageHandler);

defaultServerMessageListenerImpl.init();

defaultServerMessageListenerImpl.setServerMessageSender(this);

this.setServerMessageListener(defaultServerMessageListenerImpl);

super.start();


}

RpcServer 自身也实现了 channelRead 方法,但它只处理心跳相关的信息和注册相关的信息,其它的业务消息,它交给父类处理,而先前我们也已经看到,父类的channelRead

方法里,反过来会调用 dispatch 这个抽象方法去做消息的分发,而 RpcServer 类实现了这个抽象方法,在接收到不同的消息类型是,采取不同的处理流程。

关于事务的处理流程的细节,本篇文章暂不涉及,后续文章再慢慢分析。

行文至此,回想我们先前提到的一个疑惑:

“当我启动一个 RpcServer 的时候,我是真的在启动一个 server 吗?看起来我好像在启动一个 ChannelHandler,可是 ChannelHandler 怎么谈得上‘启动’呢?”

是的,我们既在启动一个 server,这个 server 也实现了事务处理逻辑,它同时也是个 ChannelHandler。

没有一定的事实标准去衡量这样写的代码是好是坏,我们也没必要去争论 Effective Java 提到的什么时候该用组合,什么时候该用继承。

本文到此结束。


以上所述就是小编给大家介绍的《阿里开源分布式事务组件 seata :seata server 通信层解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Data Structures and Algorithms in Java

Data Structures and Algorithms in Java

Robert Lafore / Sams / 2002-11-06 / USD 64.99

Data Structures and Algorithms in Java, Second Edition is designed to be easy to read and understand although the topic itself is complicated. Algorithms are the procedures that software programs use......一起来看看 《Data Structures and Algorithms in Java》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具