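Bootstrap is the builder-style helper class that Netty provides for constructing server and client code. It holds the configuration needed to start a server or a client. The code below gives a first look.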
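    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    // Create the ServerBootstrap
    ServerBootstrap bootstrap = new ServerBootstrap();
    // Create the I/O thread pools: one accepts connections, the other dispatches I/O events
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        // Use NioServerSocketChannel as the server channel type
        bootstrap.channel(NioServerSocketChannel.class)
                // Set the I/O thread pools
                .group(bossGroup, workerGroup)
                // Set the handler for newly accepted connections
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // For each new connection, add an EchoHandler to its ChannelPipeline
                        ch.pipeline().addLast(new EchoHandler());
                    }
                });
        // Bind to port 8090
        ChannelFuture bind = bootstrap.bind(8090);
        // Most Netty operations are asynchronous, so bind returns a Future immediately;
        // sync() blocks until the bind has completed
        bind.sync();
        // Block the current thread until the server channel is closed
        bind.channel().closeFuture().sync();
    } finally {
        // Shut down the thread pools
        bossGroup.shutdownGracefully().sync();
        workerGroup.shutdownGracefully().sync();
    }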
Bootstrap source code analysis
The main classes are:
io.netty.bootstrap.Bootstrap
io.netty.bootstrap.ServerBootstrap
io.netty.bootstrap.AbstractBootstrap
io.netty.bootstrap.AbstractBootstrapConfig
io.netty.bootstrap.ServerBootstrapConfig
io.netty.bootstrap.BootstrapConfig
AbstractBootstrap
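AbstractBootstrap offers a convenient way to configure a Channel through chained method calls.
It helps to understand the Hierarchical Builder pattern here. It is used in a lot of code and can look hard to follow at first.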
    public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
        ...
        /**
         * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
         * {@link Channel}
         */
        public B group(EventLoopGroup group) {
            if (group == null) {
                throw new NullPointerException("group");
            }
            if (this.group != null) {
                throw new IllegalStateException("group set already");
            }
            this.group = group;
            return self();
        }

        @SuppressWarnings("unchecked")
        private B self() {
            return (B) this;
        }
        ...
    }
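group returns an object of type B, so it can be chained with other methods that also return B. All of these methods end by calling B self(), which returns this typed as the subclass, so callers can keep using subclass methods without casting from the parent type down to the subclass.
When ServerBootstrap extends AbstractBootstrap and sets B to itself, it can easily add its own configuration methods on top of AbstractBootstrap.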
    public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
        public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            super.group(parentGroup);
            if (childGroup == null) {
                throw new NullPointerException("childGroup");
            }
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = childGroup;
            return this;
        }
        ...
    }
Here ServerBootstrap adds a group(parentGroup, childGroup) method on top of what AbstractBootstrap already provides, which is why this style suits multi-level builders well.
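The self-typed generic parameter is easier to see in isolation. The following is a minimal sketch of the same idea outside Netty; BaseBuilder, ServerBuilder, name, childName, and describe are hypothetical names chosen for illustration and are not part of Netty.

```java
// Hypothetical names: a sketch of the self-typed builder idea, not Netty code.
abstract class BaseBuilder<B extends BaseBuilder<B>> {
    protected String name;

    // Declared in the parent, but returns the subclass type B,
    // so the chain can continue with subclass-only methods.
    public B name(String name) {
        this.name = name;
        return self();
    }

    @SuppressWarnings("unchecked")
    protected B self() {
        return (B) this;
    }
}

final class ServerBuilder extends BaseBuilder<ServerBuilder> {
    private String childName;

    // An extra option that only the subclass knows about.
    public ServerBuilder childName(String childName) {
        this.childName = childName;
        return this;
    }

    public String describe() {
        return name + "/" + childName;
    }
}
```

With this shape, new ServerBuilder().name("boss").childName("worker") compiles without a cast, because name(...) already returns ServerBuilder rather than BaseBuilder.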
AbstractBootstrapConfig
AbstractBootstrapConfig exposes the configuration of an AbstractBootstrap to the outside.
    public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {
        protected final B bootstrap;

        protected AbstractBootstrapConfig(B bootstrap) {
            this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
        }

        /**
         * Returns the configured local address or {@code null} if non is configured yet.
         */
        public final SocketAddress localAddress() {
            return bootstrap.localAddress();
        }
        ...
    }
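AbstractBootstrapConfig exposes the configuration that every AbstractBootstrap has, and each concrete subclass can additionally expose the configuration of its own bootstrap type.
For example, in ServerBootstrapConfig B is ServerBootstrap, so it can also return ServerBootstrap-specific values such as childGroup.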
    public final class ServerBootstrapConfig extends AbstractBootstrapConfig<ServerBootstrap, ServerChannel> {
        ServerBootstrapConfig(ServerBootstrap bootstrap) {
            super(bootstrap);
        }

        /**
         * Returns the configured {@link EventLoopGroup} which will be used for the child channels or {@code null}
         * if non is configured yet.
         */
        @SuppressWarnings("deprecation")
        public EventLoopGroup childGroup() {
            return bootstrap.childGroup();
        }
        ...
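The ServerBootstrap bind process
After a ServerBootstrap has been configured, the last step is usually bind.
On the server side, bind creates a local ServerSocket bound to the given port and sets up how newly accepted sockets are handled.
Let's walk through the bind process in detail.
In AbstractBootstrap: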
    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
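The shape of doBind, "bind now if registration is already done, otherwise bind from a listener", can be sketched with the JDK's CompletableFuture standing in for Netty's ChannelFuture and ChannelPromise. This is a swapped-in illustration, not Netty's implementation; BindAfterRegister and the "bound:" result string are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: CompletableFuture stands in for ChannelFuture/ChannelPromise.
final class BindAfterRegister {
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        // whenComplete runs right away if regFuture is already done,
        // otherwise it runs once registration completes.
        regFuture.whenComplete((ignored, cause) -> {
            if (cause != null) {
                // Registration failed: fail the bind promise with the same cause.
                promise.completeExceptionally(cause);
            } else {
                // Registration succeeded: the real code would call doBind0 here.
                promise.complete("bound:" + port);
            }
        });
        return promise;
    }
}
```

The caller receives a promise in both cases, so it does not need to know whether registration had already finished when bind was invoked.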
initAndRegister
- Creates the Channel through the ChannelFactory
- Initializes the Channel (init)
- Registers the Channel with the EventLoopGroup
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
        return regFuture;
    }
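The closing comment in initAndRegister relies on one property: tasks submitted to the same single-threaded loop run in submission order. A small sketch of that property using a plain JDK single-thread executor in place of a Netty EventLoop follows; OrderedTasks and the "register"/"bind" labels are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a single-thread executor stands in for an EventLoop.
final class OrderedTasks {
    static List<String> registerThenBind() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            // Both tasks go to the same thread's queue, so "bind" cannot
            // run before "register" even though neither has started yet.
            loop.execute(() -> log.add("register"));
            loop.execute(() -> log.add("bind"));
        } finally {
            loop.shutdown();
            try {
                loop.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log;
    }
}
```

This is why it is safe for the caller to schedule bind() before the registration task has actually executed.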
A ServerBootstrap configured with ServerBootstrap.channel(NioServerSocketChannel.class) uses ReflectiveChannelFactory, which creates the Channel object reflectively through the constructor of the given class.
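As an illustration of constructor-based reflective creation, here is a minimal sketch using only the JDK. ReflectiveFactory is a hypothetical class, not Netty's ReflectiveChannelFactory, and it assumes the target class has a public no-argument constructor.

```java
import java.lang.reflect.Constructor;

// Hypothetical sketch of a reflective factory; not Netty's actual class.
final class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Look up the public no-arg constructor once, up front.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + clazz.getName() + " has no public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            // Each call creates a fresh instance through the cached constructor.
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }
}
```

Passing only a Class object lets the bootstrap stay independent of concrete channel types; the cost is that a missing constructor surfaces at runtime rather than at compile time.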
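ServerBootstrap's init method applies the attrs, options, and other settings configured on the bootstrap to the Channel.
It then adds a ChannelInitializer to the current ServerChannel; its initChannel is called once the Channel has been registered.
There it adds the handler configured on the bootstrap to the Channel and then adds a ServerBootstrapAcceptor to the pipeline.
ServerBootstrapAcceptor is what handles the new socket connections accepted by the ServerSocket.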
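Note that adding the ServerBootstrapAcceptor to the pipeline is submitted as a task to the channel's event loop. The reason is that the configured handler may itself be a ChannelInitializer whose initChannel only runs later, so deferring the addition keeps all user handlers ahead of the ServerBootstrapAcceptor in the pipeline.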
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
ServerBootstrapAcceptor
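It first adds the configured childHandler to the accepted Channel (child) and applies the configured ChannelOptions and channel attributes.
It then registers the child with childGroup, which corresponds to workerGroup in the example code above.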
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
Registering with the EventLoopGroup
ChannelFuture regFuture = config().group().register(channel);
NioEventLoopGroup
Extends MultithreadEventLoopGroup. The next() method picks a NioEventLoop in round-robin fashion; register then sets that NioEventLoop as the Channel's eventLoop and fires events such as channelRegistered.
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
MultithreadEventLoopGroup
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    // The chooser-based next() lives in the parent class MultithreadEventExecutorGroup
    public EventExecutor next() {
        return chooser.next();
    }
The chooser picks one of the EventExecutors to be the return value of next().
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
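The two chooser variants differ only in how they turn a growing counter into an array index. The sketch below illustrates that idea with plain integers instead of EventExecutors; RoundRobin and its Chooser interface are hypothetical names, and the bit-mask versus modulo split is an assumption about why a power-of-two length gets its own implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin index selection; not Netty's classes.
final class RoundRobin {
    interface Chooser {
        int next();
    }

    // True when exactly one bit is set, e.g. 1, 2, 4, 8.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    static Chooser newChooser(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("length must be positive");
        }
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(length)) {
            // For a power-of-two length, masking gives the same result as
            // modulo and stays non-negative even after the counter overflows.
            return () -> idx.getAndIncrement() & (length - 1);
        }
        // General case: modulo, with abs() to guard against a negative counter.
        return () -> Math.abs(idx.getAndIncrement() % length);
    }
}
```

Either way, successive calls to next() cycle through 0, 1, ..., length - 1 and then wrap around.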
NioEventLoop
Extends SingleThreadEventLoop.
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
AbstractChannel
The register logic here sets the Channel's eventLoop and fires the channelRegistered event. If the Channel is already active and this is its first registration, it also fires the channelActive event.
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
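The inEventLoop() check in register is a common pattern: run the work inline when the caller is already on the loop thread, otherwise hand it to the loop's queue. The following is a minimal sketch of that pattern built on a JDK single-thread executor; MiniLoop and its thread name are hypothetical and much simpler than a real Netty EventLoop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the "inline if on the loop thread, else enqueue" pattern.
final class MiniLoop {
    // The single thread that runs all tasks; set when the executor creates it.
    private volatile Thread loopThread;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void execute(Runnable task) {
        if (inEventLoop()) {
            // Already on the loop thread: run directly, no queueing needed.
            task.run();
        } else {
            // Called from another thread: enqueue for the loop thread.
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Because every state change ends up on the same thread, the channel's registration state needs no extra locking in this scheme.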
Once initAndRegister has completed, doBind0 is executed.
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
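doBind0 calls AbstractChannel's bind method, which in turn calls pipeline.bind.
ChannelPipeline is covered later. It contains a special ChannelHandler, HeadContext, which serves as the head node of the pipeline, and the bind request eventually reaches it: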
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }
unsafe is an interface, and each kind of I/O channel has its own implementation.
NioServerSocketChannel
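As shown below, on Java 7 and above the JDK's ServerSocketChannel.bind method is used; on older versions the underlying socket is obtained first and bind is called on it.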
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
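At the bottom of the call chain sits the plain JDK API. The sketch below shows the Java 7+ branch on its own, outside Netty: open a ServerSocketChannel, bind it with a backlog, and read back the port. NioBindDemo is a hypothetical name, and binding to port 0 on the loopback address (so the OS picks a free port) is a choice made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch of the JDK call that NioServerSocketChannel.doBind delegates to.
final class NioBindDemo {
    static int bindAndGetPort(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port; backlog caps pending connections.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            // After bind, the local address carries the port actually assigned.
            return ((InetSocketAddress) ch.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In Netty the backlog comes from config.getBacklog(), which is typically set through the SO_BACKLOG channel option on the ServerBootstrap.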