XNIO源码阅读

栏目: 服务器 · 发布时间: 6年前

内容简介:XNIO源码阅读

XNIO 是JBoss的一个IO框架。最开始我想找个lightweight servlet container库,于是看到了 undertow ,发现其网络部分使用的就是XNIO。所以干脆就先把XNIO的源码读下。

XNIO文档非常匮乏,能找到都是 3.0的版本 ,而且描述也不完全。Git上已经出到3.5.0。我读的是3.3.6.Final。

使用方式

可以参考 SimpleEchoServer.java ,不过这个例子使用的API已经被deprecated,仅供参考。使用方式大致为:

  • 创建服务,提供acceptListener
  • 在acceptListener中accept新的连接,并注册连接listener
  • 在连接listener回调中完成IO读写

主要概念

  • Channel,基本上同Java NIO中的Channel一致,一个server socket是一个channel,accept出来的连接也是channel
  • ChannelListener,监听Channel上的IO事件,应用代码与XNIO交互的地方
  • XnioWorker,维护IO线程池及应用任务线程池

项目结构

源码分为两个项目: xnio-api及nio-impl。xnio-api属于API层;nio-impl是基于NIO的实现。通过Java service provider动态地找到nio-impl这个实现。可见XNIO还可以用其他方式来实现。

org.xnio.channels 这个包里包含了大量的Channel接口定义,这个是非常恶心的一个地方,读代码的时候很容易被绕进去。这个包主要的实现后面提。 org.xnio.conduits ,我理解为比Channel更底层的传输通道,channel依赖于conduit实现,总之也是个恶心的概念。

线程模型

可以通过连接如何建立以及建立连接后如何管理连接来了解XNIO的线程模型。通过这个过程我简单画了下主要类关系以及连接建立过程:

XNIO源码阅读

用的Dia绘图,UML图支持得不够好

XNIO的线程模型是一个典型的one loop per thread的Reactor模型。 WorkerThread 类就是这个线程,其有一个主循环,不断地检测其关心的IO设备是否有IO事件发生。当有事件发生时,就将事件通知给关心的listener。站在上层模块的角度,这个线程就是一个Reactor,事件产生器。整个系统有固定数量的 WorkerThread ,也就是IO线程数。这个模型基本上凡是基于epoll/select模型实现的网络库都会用,例如muduo。可以回看下这个模型:

XNIO源码阅读

XNIO中接收到一个新连接时,会根据这个连接的地址(remote&local address)算出一个哈希值,然后根据哈希值分配到某一个IO线程,然后该连接以后的IO事件都由该线程处理。 WorkerThread 会始终回调 NioHandleQueuedNioTcpServerHandle 是一个accept socket,监听accept事件。而 NioSocketStreamConnection 则是一个建立好的连接,每次新连接进来就会创建,被哈希到某个 WorkerThread 处理。 NioSocketConduit 是一个连接具体关心IO事件的类,正是前面提到的,是一个Channel的底层实现。

NioXnioWorker 继承于 XnioWorkerXnioWorker 内部包含了一个应用任务的线程池。应用代码通过channel listener获取到IO事件通知,channel listener是在IO线程中回调的,所以不适合做耗时操作,否则会导致IO线程中其他IO设备饿死。所以对于这类任务就可以放到这个线程池中做。

Channel架构

前面提到的XNIO例子使用了一个deprecated的接口,那如何不使用这个接口呢?这就需要更具体地了解channel。XNIO中抽象的channel有很多类型,有些是只读的,有些是只写的,有些则是全双工的。channel还能被组合 ( AssembledChannel )。可以看下3.1里channel包的大图: channel package summary

这里我只关注基于TCP服务中的channel。如图:

XNIO源码阅读

重点关注 QueuedNioTcpServerNioSocketStreamConnectionQueuedNioTcpServer 实现 AcceptingChannel 没什么好说的,就是表示一个可以接收连接的channel。 NioSocketStreamConnection 表示一个网络连接。 StreamConnection 是一个可读可写的channel,但是其内部是通过另外两个channel来实现的,分别是 ConduitStreamSourceChannelConduitStreamSinkChannel ,分别用读和写。这两个channel内部其实是分别通过两个conduit 来实现,分别为 ConduitStreamSourceChannelConduitStreamSinkChannel

NioSocketStreamConnection 内部包含 NioSocketConduit ,这个类实现了 ConduitStreamSourceChannelConduitStreamSinkChannel 。在TCP场景下, StreamConnection 中的读写channel正是指向了 NioSocketConduit 。这个层次包装得有点绕,需要慢慢梳理。

在accept的时候,得到的可以是 StreamConnection ,其实也就是得到了一个可读可写的channel,设计得也没问题。可以基于这个channel设置读写listener。但是如果想在读listener里发起写操作,由于在读listener里看到的是一个只读的channel,所以就没办法写。所以才会有其他包装的channel。

理清了以上关系,就可以不用那个deprecated的API来实现一个echo server:

class ReadListener implements ChannelListener<StreamSourceChannel> {
  // 保存一个可写的channel,才能在读listener里做写操作
  private StreamSinkChannel sinkChannel;

  public ReadListener(StreamSinkChannel sinkChannel) {
    this.sinkChannel = sinkChannel;
  }

  public void handleEvent(StreamSourceChannel channel) {
    final ByteBuffer buffer = ByteBuffer.allocate(512);
    int res;
    try {
      while ((res = channel.read(buffer)) > 0) {
        buffer.flip();
        Channels.writeBlocking(sinkChannel, buffer);
      }
      Channels.flushBlocking(sinkChannel);
      if (res == -1) {
        channel.close();
      } else {
        channel.resumeReads();
      }
    } catch (IOException e) {
      e.printStackTrace();
      IoUtils.safeClose(channel);
    }
  }
}
final ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = new ChannelListener<AcceptingChannel<StreamConnection>>() {
  public void handleEvent(AcceptingChannel<StreamConnection> channel) {
    try {
      StreamConnection accepted;
      // channel is ready to accept zero or more connections
      while ((accepted = channel.accept()) != null) {
        System.out.println("accepted "
            + accepted.getPeerAddress());
        // stream channel has been accepted at this stage.
        // read listener is set; start it up
        accepted.getSourceChannel().setReadListener(new ReadListener(accepted.getSinkChannel()));
        accepted.getSourceChannel().resumeReads();
      }
    } catch (IOException ignored) {
    }
  }
};
final XnioWorker worker = Xnio.getInstance().createWorker(
    OptionMap.EMPTY);
// Create the server.
AcceptingChannel<? extends StreamConnection> server = worker
    .createStreamConnectionServer(new InetSocketAddress(12345),
        acceptListener, OptionMap.EMPTY);
// lets start accepting connections
server.resumeAccepts();

完。


以上所述就是小编给大家介绍的《XNIO源码阅读》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Head Rush Ajax

Head Rush Ajax

Brett McLaughlin、Eric Freeman、Elisabeth Freeman / O'Reilly Media, Inc. / 2006-03-01 / USD 34.99

Ajax, or Asynchronous JavaScript and XML, is a term describing the latest rage in web development. Ajax is used to create interactive web applications with XML-based web services, and using JavaScript......一起来看看 《Head Rush Ajax》 这本书的介绍吧!

html转js在线工具
html转js在线工具

html转js在线工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具