kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

栏目: 后端 · 发布时间: 6年前

内容简介:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。}

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

1 Reactor单线程案例代码热热身

  • 如下是单线程的JAVA NIO编程模型。

  • 首先服务端创建ServerSocketChannel对象,并注册到Select上OP_ACCEPT事件,然后ServerSocketChannel负责监听指定端口上的连接请求。

  • 客户端一旦连接上ServerSocketChannel,就会触发Acceptor来处理OP_ACCEPT事件,并为来自客户端的连接创建Socket Channel,并设置为非阻塞模式,并在其Selector上注册OP_READ或者OP_WRITE,最终实现客户端与服务端的连接建立和数据通道打通。

  • 这里有一个明显的问题,就是所有时间的处理逻辑都是在Acceptor单线程完成的,在并发连接数较小,数据量较小的场景下,是没有问题的,但是......

  • Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低.

  • 优化点在于:通道连接|读取或写入|业务处理均采用多线程来处理。通过线程池或者MessageQueue共享队列,进一步优化了高并发的处理要求,这样就解决了同一时间出现大量I/O事件时,单独的Select就可能在分发事件时阻塞(或延时),而成为瓶颈的问题。

    kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

    public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000;

    public static void main(String args[]) throws Exception {
          // 打开服务端 Socket
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
          // 打开 Selector
          Selector selector = Selector.open();
    
          // 服务端 Socket 监听8080端口, 并配置为非阻塞模式
          serverSocketChannel.socket().bind(new InetSocketAddress(8080));
          serverSocketChannel.configureBlocking(false);
    
          // 将 channel 注册到 selector 中.
          // 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
          // 注册到 Selector 中.
          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
          while (true) {
              // 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
              if (selector.select(TIMEOUT) == 0) {
                  System.out.print(".");
                  continue;
              }
    
              // 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
              Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
    
              while (keyIterator.hasNext()) {
    
                  SelectionKey key = keyIterator.next();
    
                  // 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
                  keyIterator.remove();
    
                  if (key.isAcceptable()) {
                      // 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
                      // 代表客户端的连接
                      // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
                      // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
                      SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                      clientChannel.configureBlocking(false);
                      //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
                      // 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
                      clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                  }
    
                  if (key.isReadable()) {
                      SocketChannel clientChannel = (SocketChannel) key.channel();
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      long bytesRead = clientChannel.read(buf);
                      if (bytesRead == -1) {
                          clientChannel.close();
                      } else if (bytesRead > 0) {
                          key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                          System.out.println("Get data length: " + bytesRead);
                      }
                  }
    
                  if (key.isValid() && key.isWritable()) {
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      buf.flip();
                      SocketChannel clientChannel = (SocketChannel) key.channel();
    
                      clientChannel.write(buf);
    
                      if (!buf.hasRemaining()) {
                          key.interestOps(OP_READ);
                      }
                      buf.compact();
                  }
              }
          }
      }
    复制代码

}


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

算法艺术与信息学竞赛

算法艺术与信息学竞赛

刘汝佳 / 清华大学出版社 / 2004-1 / 45.00元

《算法艺术与信息学竞赛》较为系统和全面地介绍了算法学最基本的知识。这些知识和技巧既是高等院校“算法与数据结构”课程的主要内容,也是国际青少年信息学奥林匹克(IOI)竞赛和ACM/ICPC国际大学生程序设计竞赛中所需要的。书中分析了相当数量的问题。 本书共3章。第1章介绍算法与数据结构;第2章介绍数学知识和方法;第3章介绍计算机几何。全书内容丰富,分析透彻,启发性强,既适合读者自学,也适合于课......一起来看看 《算法艺术与信息学竞赛》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

html转js在线工具
html转js在线工具

html转js在线工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具