netty-bootstrap

栏目: 编程工具 · 发布时间: 6年前

内容简介:Bootstrap是netty中用于创建Server、Client端代码的构造工具类,里面包括了启动Server、Client需要的配置信息等。下面用一段代码来初步了解下。主要的代码类为io.netty.bootstrap.Bootstrap

Bootstrap是netty中用于创建Server、Client端代码的构造 工具 类,里面包括了启动Server、Client需要的配置信息等。下面用一段代码来初步了解下。

// 创建ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
// 创建用户acceptor和dispatcher io event的io线程池
EventLoopGroupbossGroup=newNioEventLoopGroup();
EventLoopGroupworkerGroup=newNioEventLoopGroup();
try{

// 设置server的channel类型为NioServerSocketChannel
bootstrap.channel(NioServerSocketChannel.class)
// 设置对应的io线程池
.group(bossGroup, workerGroup)
// 设置新建连接的处理器
.childHandler(newChannelInitializer<SocketChannel>() {
protectedvoidinitChannel(SocketChannelch)throwsException{
// 对于新建的连接,在它的ChannelPipeline上增加一个EchoHandler
ch.pipeline().addLast(newEchoHandler());
}
});

// 绑定到8090
ChannelFuturebind=bootstrap.bind(8090);
// netty中大多数操作都是异步的,所以上面的bind方法会立刻返回并返回一个Future, 需要sync阻塞等待bind成功
bind.sync();
// 一直阻塞当前线程知道server channel关闭
bind.channel().closeFuture().sync();
}finally{
// 关闭线程池
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}

Bootstrap相关源码分析

主要的代码类为

io.netty.bootstrap.Bootstrap

io.netty.bootstrap.ServerBootstrap

io.netty.bootstrap.AbstractBootstrap

io.netty.bootstrap.AbstractBootstrapConfig

io.netty.bootstrap.ServerBootstrapConfig

io.netty.bootstrap.BootstrapConfig

AbstractBootstrap

AbstractBootstrap通过提供chain方法链提供方面的Channel配置方式。

这里要了解一种很多代码中用到的看上去不太好懂的 Hierarchical Builder 模式。

public abstract class AbstractBootstrap<Bextends AbstractBootstrap<B,C>,Cextends Channel>implementsCloneable{

   ...
    /**
* The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
* {@link Channel}
*/
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    @SuppressWarnings("unchecked")
    private B self() {
        return (B) this;
    }
这里的AbstractBootstrap

返回了B类型的对象,这样又可以通过其他方法返回B的串联起来,这些方法最后都调用了B self()返回this, 这个this的类型是子类型,这样子类就可以直接使用而不必强行转换父类型到子类型了。

这时ServerBootstrap继承AbstractBootstrap并将B设置为自己时,就可以方便的在AbstractBootstrap基础上增加配置方法了。

public class ServerBootstrapextends AbstractBootstrap<ServerBootstrap,ServerChannel>{
	 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }
    ...
}

这里的ServerBootstrap在AbstractBootstrap的基础上又添加了group(parentGroup, childGroup)的方法。所以这种写法很适合多级Builder。

当子类继承

AbstractBootstrapConfig

AbstratBootstrapConfig向外暴露AbstractBootstrap的配置。

public abstract class AbstractBootstrapConfig<Bextends AbstractBootstrap<B,C>,Cextends Channel>{

    protected final B bootstrap;

    protected AbstractBootstrapConfig(B bootstrap) {
        this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
    }

    /**
* Returns the configured local address or {@code null} if non is configured yet.
*/
    public final SocketAddress localAddress() {
        return bootstrap.localAddress();
    }

AbstractBootstrapConfig提供基础的AbstractBootstrap拥有的配置,具体子类又可以提供其具体AbstractBootstrap的配置,

例如ServerBootstrapConfig中B为ServerBoostrap,就可以返回更多的ServerBootstrap中的方法例如childGroup等。

ublic final class ServerBootstrapConfigextends AbstractBootstrapConfig<ServerBootstrap,ServerChannel>{

    ServerBootstrapConfig(ServerBootstrap bootstrap) {
        super(bootstrap);
    }

    /**
* Returns the configured {@link EventLoopGroup} which will be used for the child channels or {@code null}
* if non is configured yet.
*/
    @SuppressWarnings("deprecation")
    public EventLoopGroup childGroup() {
        return bootstrap.childGroup();
    }
    ...

ServerBootstrap bind过程

通常在给ServerBootstrap配置好各种参数后的最后一步就是bind。

Server的bind将创建一个本地的ServerSocket绑定到对应端口上,并且配置接受到新的socket后的处理等。

下面看一下具体的bind过程。

AbstractBootstrap中

/**
* Create a new {@linkChannel} and bind it.
*/
    public ChannelFuturebind(SocketAddress localAddress){
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }
private ChannelFuturedoBind(final SocketAddress localAddress){
     final ChannelFuture regFuture = initAndRegister();
     final Channel channel = regFuture.channel();
     if (regFuture.cause() != null) {
         return regFuture;
     }

     if (regFuture.isDone()) {
         // At this point we know that the registration was complete and successful.
         ChannelPromise promise = channel.newPromise();
         doBind0(regFuture, channel, localAddress, promise);
         return promise;
     } else {
         // Registration future is almost always fulfilled already, but just in case it's not.
         final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
         regFuture.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future)throws Exception {
                 Throwable cause = future.cause();
                 if (cause != null) {
                     // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                     // IllegalStateException once we try to access the EventLoop of the Channel.
                     promise.setFailure(cause);
                 } else {
                     // Registration was successful, so set the correct executor to use.
                     // See https://github.com/netty/netty/issues/2586
                     promise.registered();

                     doBind0(regFuture, channel, localAddress, promise);
                 }
             }
         });
         return promise;
     }
 }

initAndRegister

  • 负责通过ChannelFactory创建Channel
  • 初始化 init Channel
  • 注册Channel到EventLoopGroup上
final ChannelFuture initAndRegister() {
     Channel channel = null;
     try {
         channel = channelFactory.newChannel();
         init(channel);
     } catch (Throwable t) {
         if (channel != null) {
             // channel can be null if newChannel crashed (eg SocketException("too many open files"))
             channel.unsafe().closeForcibly();
             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
         }
         // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
         return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
     }

     ChannelFuture regFuture = config().group().register(channel);
     if (regFuture.cause() != null) {
         if (channel.isRegistered()) {
             channel.close();
         } else {
             channel.unsafe().closeForcibly();
         }
     }

     // If we are here and the promise is not failed, it's one of the following cases:
     // 1) If we attempted registration from the event loop, the registration has been completed at this point.
     // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
     // 2) If we attempted registration from the other thread, the registration request has been successfully
     // added to the event loop's task queue for later execution.
     // i.e. It's safe to attempt bind() or connect() now:
     // because bind() or connect() will be executed *after* the scheduled registration task is executed
     // because register(), bind(), and connect() are all bound to the same thread.

     return regFuture;
 }

通过 ServerBootstrap.channel(NioServerSocketChannel.class) 这样配置的ServerBootstrap

使用的是 ReflectiveChannelFactory , 通过对应的类的构造器反射创建Channel对象。

ServerBootstrap的init方法中将前面配置的Bootstrap的attr、option等设置到Channel上

然后给当前ServerChannel增加一个ChannelInitializer, initChannel会在Channel注册完成后调用,

这里会给Channel添加Bootstrap中配置的Handler,然后给继续在pipeline上添加一个ServerBootstrapAcceptor。

ServerBootstrapAcceptor就是ServerSocket接受到新建的socket连接的处理。

注意到这里给pipeline添加ServerBootstrapAcceptor放到的channel的eventloop中去执行,这样做的原因是

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor。
											ServerBootstrapAcceptor就是ServerSocket接受到新建的socket连接的处理。(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

ServerBootstrapAcceptor

首先给接受到的Channel(child)添加配置的childHandler、设置ChannelOption、ChannelAttribute等。

然后注册到childGroup上,对应于上面示例代码中的workerGroup。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
  final Channel child = (Channel) msg;

  child.pipeline().addLast(childHandler);

  setChannelOptions(child, childOptions, logger);

  for (Entry<AttributeKey<?>, Object> e: childAttrs) {
      child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
  }

  try {
      childGroup.register(child).addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
              if (!future.isSuccess()) {
                  forceClose(child, future.cause());
              }
          }
      });
  } catch (Throwable t) {
      forceClose(child, t);
  }

注册到EventLoopGruop上

ChannelFuture regFuture = config().group().register(channel);

NioEventLoopGroup 继承于 MultithreadEventLoopGroup ,next()方法会round-robin的方式选出一个NioEventLoop,

然后设置Channel的eventLoop为这个NioEventLoop然后发出channelRegister事件等。@Override

public ChannelFuture register(Channel channel) {

return next().register(channel);

}

MultithreadEventLoopGroup

@Override
   public ChannelFutureregister(Channel channel){
       return next().register(channel);
   }

	public EventExecutornext(){
       return chooser.next();
   }

chooser的作用是从EventExecutor中选出一个作为next的返回结果

public EventExecutorChooser newChooser(EventExecutor[] executors) {
       if (isPowerOfTwo(executors.length)) {
           return new PowerOfTwoEventExecutorChooser(executors);
       } else {
           return new GenericEventExecutorChooser(executors);
       }
   }

NioEventLoop 继承于 SingleThreadEventLoop

public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

AbstractChannel

这里register中的处理是

设置当前Channel的eventLoop,fireChannelRegister事件,如果当前Channel是active状态并且是第一次注册则fireChannelActive事件

public final void register(EventLoop eventLoop,final ChannelPromise promise){
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run(){
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

initAndRegister完成后,会进行doBind0

private static void doBind0(
					 final ChannelFuture regFuture, final Channel channel,
					 final SocketAddress localAddress, final ChannelPromise promise) {

			 // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
			 // the pipeline in its channelRegistered() implementation.
			 channel.eventLoop().execute(new Runnable() {
					 @Override
					 public void run(){
							 if (regFuture.isSuccess()) {
									 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
							 } else {
									 promise.setFailure(regFuture.cause());
							 }
					 }
			 });
	 }

doBind0调用AbstractChannel的bind方法,然后调用pipeline.bind。

后面会讲到ChannelPipeline,ChannelPipeline中有一个特殊的ChannelHandler是HeadContext,作为pipelien的head节点。

public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

unsafe是一个接口,不同的io channel有不同的实现,

NioServerSocketChannel

可以看到 java 7以上使用jdk的ServerSocketChannel.bind方法,小于的版本得到对应的socket后进行bind

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

硅谷之火

硅谷之火

迈克尔·斯韦因 / 王建华 / 机械工业出版社 / 2001-1-1 / 34.00

我们今天正处于这样一个时代:充满幻想的人们发现他们获得了他们曾经梦寐以求的力量,并且可以利用这个力量来改造我们的世界。 这是个转折的时代,跨国公司迷失了发展方向,而小企业家却举起了计算机革命的大旗,成了开拓未来的先锋。在这个时代里,计算机奇才的脸上露出了胜利的微笑,胸怀 大志者成了富有理想的人,而富有理想的人则成了亿万富翁。这是一场真正的革命,它促使人们变得伟大,变得富有而充满理想,自豪而富......一起来看看 《硅谷之火》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具