Netty系列文章之服务端启动分析

栏目: Java · 发布时间: 6年前

内容简介:本文主要分析 Netty服务端的启动,以便对Netty框架有一个基本的认识,我用的Netty版本是该源码出自 netty官方提供的 服务端demo,详细地址:我做了一点小改动,代码如下:

本文主要分析 Netty服务端的启动,以便对Netty框架有一个基本的认识,我用的Netty版本是 netty-4.1.29 ,之前的文章 Netty 系列文章之基本组件概览 对Netty的基本组件做了一个简单的介绍,算是对本文分析Netty服务端的启动做一个基础铺垫

服务端代码

该源码出自 netty官方提供的 服务端demo,详细地址: github.com/netty/netty…

我做了一点小改动,代码如下:

public final class EchoServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
复制代码

服务端启动流程概览:

ServerBootStrap
NioEventLoopGroup
Channel
ChannelHandler

在之前的文章我就提到过, ServerBootstrap 是Netty服务端的启动辅助类,帮助Netty服务端完成初始化,下面我将深入代码,仔细分析Netty服务端启动过程中各组件的创建与初始化

Channel的创建和初始化过程

Channel是Netty的网络操作抽象类,对应于JDK底层的 Socket,Netty服务端的Channel类型是 NioServerSocketChannel 。下面来分析 NioServerSocketChannel 的创建和初始化

NioServerSocketChannel的创建

NioServerSocketChannel 的创建实际上是从 ServerBootStrapbind() 方法开始的,进入 bind() 源码分析( AbstractBootstrap 的bind()方法):

......
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
......
/**
 * Create a new {@link Channel} and bind it.
 */
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    }
    ......
}
复制代码

在源码里注意到一个 initAndRegister() 方法,这个方法就负责 NioServerSocketChannel 的初始化和注册操作,走进 initAndRegister() 方法,如下图所示:

Netty系列文章之服务端启动分析

从上图可以看出,源码里是调用 channelFactory.newChannel() 来创建 channel , 走进 ChannelFactory 发现该接口被 @Deprecated 注解标注了,说明是一个过时的接口:

@Deprecated
public interface ChannelFactory<T extends Channel> {
    /**
     * Creates a new channel.
     */
    T newChannel();
}
复制代码

我用的Netty版本是 Netty-4.1.29 ,其Netty API 文档 中介绍 io.netty.bootstrap.ChannelFactory 提到用 io.netty.channel.ChannelFactory 代替。

这里 ChannelFactory 只是一个工厂接口,真正创建 Channel 的是 ReflectiveChannelFactory 类,它是 ChannelFactory 的一个重要实现类,该类通过反射方式创建 Channel ,源代码如下:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}
复制代码

其中 newChannel() 方法通过 clazz.getConstructor().newInstance() 来创建 Channel,即通过反射方式来创建 Channel,而这个 clazz 就是 通过 ServerBootStrapchannel 方法传入的,最开始的服务端代码传入的 NioServerSocketChannel ,所以对应通过反射创建了 NioServerSocketChannel ,并且 ChannelFactory 的初始化也是在该方法中进行的,代码如下:

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
复制代码

到此, NioServerSocketChannel 的创建过程大体结束,再次总结一下:

  • ServerBootstrap 中的 ChannelFactory 的实现是 ReflectiveChannelFactory
  • 生成的 Channel的具体类型是 NioServerSocketChannel
  • Channel 的实例化过程其实就是调用 ChannelFactory.newChannel 方法,实际上是通过反射方式进行创建的

NioServerSocketChanel的实例化过程

在前面的分析中,NioServerSocketChannel是通过反射创建的,它的构造方法如下:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}
/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
复制代码

方法newSocket利用 provider.openServerSocketChannel() 生成Nio中的 ServerSocketChannel 对象,然后调用重载的构造器:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码

该构造器中,调用了父类的构造器,传入的参数是 SelectionKey.OP_ACCEPT ,这个参数对于有Java NIO编程经验的人来说应该非常熟悉,在Java NIO中服务端要监听客户端的连接请求,就向多路复用器 Selector 注册 SelectionKey.OP_ACCEPT 客户端连接事件,而Netty又是基于 Java NIO开发的,这里可见一斑。接着进入父类构造器:

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}
复制代码

然后:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
复制代码

设置当前 ServerSocketChannel为非阻塞通道,然后再次进入父类构造器 AbstractChannel(Channel parent) :

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
复制代码
  • parent属性设置为null
  • 初始化unsafe,用来负责底层的connect,register,read和write操作
  • 初始化pipeline,在实例化一个Channel的同时,当有事件发生的时候,pipeline负责调用相应的Handler进行处理

关于 unsafe

Netty中的 unsafe 不是JDK中的 sun.misc.Unsafe ,该 unsafe 实际是封装了 Java 底层 Socket的操作,因此是沟通 Netty上层和Java 底层重要的桥梁。

ChannelPipeline的初始化

每个Channel都有对应的 ChannelPipeline ,当一个Channel被创建时,对应的ChannelPipeline也会自动创建,在上面分析 NioServerSocketChannel 实例化过程就看到,在其父类构造器中,有初始化一个 pipeline ,对应源代码如下:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    .....
    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;
    ......
    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    ......
        /**
     * Returns a new {@link DefaultChannelPipeline} instance.
     */
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
复制代码

从上面代码看到,pipeline最终被初始化为一个 DefaultChannelPipelineDefaultChannelPipelineChannelPipeline 的实现类,进入它的构造方法,如下:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
复制代码

该构造方法有一个参数 channel ,这个 channel 就是我们之前传入的 NioServerSocketChannel 。关于该构造器其他方法以及 ChannelPipeline 更详细的介绍将在后续文章分析。

NioEventLoopGroup

在我们最开始的Netty服务端代码中初始化了两个 NioEventLoopGroup ,即一个处理客户端连接请求的线程池—— bossGroup ,一个处理客户端读写操作的线程池—— workerGroup

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
复制代码

NioEventLoopGroup 的类继承结构图如下所示:

Netty系列文章之服务端启动分析

从图中可以看到, NioEventLoopGroup 实现了 Executor 接口, Executor 框架可以用来创建线程池的,也是一个线程执行器。关于 Executor 框架更加详细的介绍请参阅《Java并发编程的艺术》

NioEventLoopGroup

看了 NioEventLoopGroup 的类继承结构,下面来分析一下它的初始化过程,构造器源代码如下:

......
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
......
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
......
public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
复制代码

上面几个重载构造器其实没做啥,最终调用父类 MultithreadEventLoopGroup 的构造器,

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
复制代码

这里需要注意的是如果我们传入的线程数 nThreads 是 0 的话,那么Netty将会为我们设置默认的线程数 DEFAULT_EVENT_LOOP_THREADS ,这个默认值是 处理器核心数 * 2 ,如下:

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
复制代码

bossGroup 这个线程池我们传入的 nThread 是1,实际上在 bossGroup 中只会有一个线程用于处理客户端连接请求,所以这里设置为1,而不使用默认的线程数,至于为什么只用一个线程处理连接请求还需用线程池,在Stack Overflow有相关问题的讨论。

然后回来再次进入父类 MultithreadEventExecutorGroup 的构造器,

......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
复制代码

MultithreadEventExecutorGroup 管理着 eventLoop 的生命周期,它有几个变量:

  • children : EventExecutor数组,保存eventloop
  • chooser : 线程选择器,从children中选取一个 eventloop的策略

MultithreadEventExecutorGroup 的构造器主要分为以下几个步骤:

  • 创建线程执行器—— ThreadPerTaskExecutor
  • 调用 newChild 方法初始化 children 数组
  • 创建线程选择器—— chooser

创建ThreadPerTaskExecutor

我们一开始初始化 NioEventLoopGroup ,并没有传入 Executor 参数:

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
复制代码

所以到父类 MultithreadEventExecutorGroup 构造器时,executor 为null, 然后执行:

if (executor == null) {
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
复制代码

ThreadPerTaskExecutor是一个线程执行器,它实现了 Executor 接口,

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
复制代码

ThreadPerTaskExecutor 实现了 execute 方法,每次通过调用 execute 方法执行线程任务

调用 newChild 方法初始化 children 数组

children[i] = newChild(executor, args);
复制代码

在一个for循环里,nThread线程数是总的循环次数,通过 newChild方法初始化 EventExecutor数组的每个元素,而 newChild 方法如下:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
复制代码

每次循环通过newChild实例化一个 NioEventLoop 对象。

创建线程选择器——chooser

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}
......
private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}
复制代码

根据 EventExecutor[] 数组的大小,采用不同策略初始化一个 Chooser,如果大小为 2的幂次方则采用 PowerOfTwoEventExecutorChooser ,否则使用 GenericEventExecutorChooser 。 无论使用哪个 chooser,它们的功能都是一样的,即从 EventExecutor[] 数组中,这里也就是 NioEventLoop 数组中,选择一个合适的 NioEventLoop

NioEventLoop的初始化

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
复制代码

从前面的分析就可以看到,通过 newChild方法初始化 NioEventLoopGroup 中的 NioEventLoop ,下面来看下 NioEventLoop 的构造方法是怎样的:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
复制代码

之前在 NioEventLoopGroup 的构造器中通过 SelectorProvider.provider() 创建了一个 SelectorProvider ,这里传递给了NioEventLoop中的provider,而 NioEventLoop 又通过 openSelector() 方法获取一个 selector对象,实际上是通过 provideropenSelector 方法。这不就是 对应Java NIO中的创建多路复用器 selector。(这里只是简单阐述NioEventLoop的构造方法,后续文章会对NioEventLoop做更加详细的分析)

Channel的注册过程

前面已经介绍了 Channel的创建和初始化过程,是在 initAndRegister 方法中进行的,这个方法里还会将初始化好的 channel注册到 EventLoop 线程中去

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
       .....
    }

    ChannelFuture regFuture = config().group().register(channel);
    ......
}
复制代码

调用 config().group().register 方法将 channel注册到 EventLoopGroup 中去,其目的就是 为了实现NIO中把ServerSocketChannel注册到 Selector中去,这样就是可以实现client请求的监听 ,代码如下:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
......
public EventLoop next() {
    return (EventLoop) super.next();
}
复制代码

父类MultithreadEventExecutorGroup的next()方法,next方法使用 chooser策略从 EventExecutor[] 数组中选择一个 SingleThreadEventLoop

public EventExecutor next() {
    return chooser.next();
}
.....
public EventExecutor next() {
    return executors[idx.getAndIncrement() & executors.length - 1];
}
复制代码

然后再执行 SingleThreadEventLoopregister() 注册方法:

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
...
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
复制代码

上面代码调用了 unsafe的register方法,具体是 AbstractUnsafe.register ,而unsafe主要用于实现底层的 rergister,read,write等操作。该 register 方法是:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ......
        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            ......
        }
    }
复制代码

将 eventLoop 赋值给 Channel 的 eventLoop 属性,然后又调用了 register0()方法:

private void register0(ChannelPromise promise) {
        try {
            ......
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            ......
        }
    }

复制代码

上面有个关键方法就是 doRegister() , doRegister 才是最终Nio的注册方法:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
复制代码

通过 javaChannel().register(eventLoop().unwrappedSelector(), 0, this) 将 Channel对应的Java NIO ServerSocketChannel注册到 EventLoop 中的Selector上,最终完成了channel向eventLoop的注册过程。

这里总结下 Channel注册过程中函数调用链: AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister()

添加 ChannelHandler

在之前的 initAndRegister() 方法里,里面有个 init() 方法,如下:

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

复制代码

init() 方法中设置服务端自定义的 ChannelOptions,ChannelAttrs 属性和为服务端Channel创建出来的新连接的Channel设置的自定义属性 ChildOptions,ChildAttrs ,这里就不多叙述设置参数的问题了,重点关注 pipelineaddLast 方法,该方法就是添加用于处理出站和入站数据流的 ChannelHandler,而 pipeline 是从 channel 中获取的,之前分析过当创建 channel 时会自动创建一个对应的 channelPipeline

至于 ChannelInitializer 又是什么,来看下它的类继承结构图就知道了:

Netty系列文章之服务端启动分析

ChannelInitializer 是一个抽象类,实现了 ChannelHandler 接口,它有一个抽象方法 initChannel ,上面代码实现了该方法并且添加了 bootstrap 的handler,逻辑如下:

.....
//1
ChannelHandler handler = config.handler();
if (handler != null) {
    pipeline.addLast(handler);
}
......
//2
public final ChannelHandler handler() {
    return bootstrap.handler();
}
......
//3
final ChannelHandler handler() {
    return handler;
}

复制代码

initChannel 添加的 Handler 就是我们服务端代码中 serverbootstrap 设置的 handler ,如下:

b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) {
                 ChannelPipeline p = ch.pipeline();
                 //p.addLast(new LoggingHandler(LogLevel.INFO));
                 p.addLast(serverHandler);
             }
         });
复制代码

示例代码设置的handler为 LoggingHandler ,用于处理日志,这里不细说。上面的 initChannel 方法可以添加 Handler ,这里的 serverbootstrap 启动类还增加了 childHandler 方法,也是用来添加 handler,只不过是向已经连接的 channel客户端的 channnelpipeline 添加 handler

serverbootstrap.handler() 设置的 handler 在初始化就会执行,而 serverbootstrap.childHandler() 设置的 childHandler 在客户端连接成功才会执行

小结

由于自身知识与经验有限,对Netty的服务端启动源码分析得不是很全面,在此过程中也参考了一些大佬的Netty源码分析文章,本文如有错误之处,欢迎指出。

参考资料 & 鸣谢


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

500 Lines or Less

500 Lines or Less

Amy Brown、Michael DiBernardo / 2016-6-28 / USD 35.00

This book provides you with the chance to study how 26 experienced programmers think when they are building something new. The programs you will read about in this book were all written from scratch t......一起来看看 《500 Lines or Less》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试