内容简介:在Netty中,一个非常重要的组件eventLoop就是基于Reactor模型的思想来实现的,所以有必要对reactor做一下了解。有关reactor模型的的结构我们看下维基百科是怎么说的。可以看到重要分为三个部分Reactor共有三种模型实现,具体资料参考
在Netty中,一个非常重要的组件eventLoop就是基于Reactor模型的思想来实现的,所以有必要对reactor做一下了解。有关reactor模型的的结构我们看下维基百科是怎么说的。
- Resources
Any resource that can provide input to or consume output from the system. - Synchronous Event Demultiplexer
Uses an event loop to block on all resources. The demultiplexer sends the resource to the dispatcher when it is possible to start a synchronous operation on a resource without blocking (Example: a synchronous call to read() will block if there is no data to read. The demultiplexer uses select() on the resource, which blocks until the resource is available for reading. In this case, a synchronous call to read() won't block, and the demultiplexer can send the resource to the dispatcher.) - Dispatcher
Handles registering and unregistering of request handlers. Dispatches resources from the demultiplexer to the associated request handler. - Request Handler
An application defined request handler and its associated resource.
可以看到重要分为三个部分
- Synchronous Event Demultiplexer 将IO事件分发给Dispatcher
- Dispatcher 处理客户端新连接,并分发请求到Request Handler
- Request Handler 执行非阻塞读/写任务
Reactor共有三种模型实现,具体资料参考 【NIO系列】——之Reactor模型 ,另外可参考Doug Lea的Scalable IO in Java
- 单Reactor单线程模型
- 单Reactor多线程模型
- 多Reactor多线程模型
Netty的实现
Netty采用的多Reactor多线程模型。在多Reactor多线程模型中,mainReactor负责监听ServerSocketChannnel,用来处理客户端新连接的建立,并将建立的客户端的socketChannel指定注册给subReactor;subReactor负责处理客户端的socketChannel的IO读写事件,另外将业务处理事件丢给worker线程池。
ServerBootstrap配置和启动
先看下EchoServer里怎么写的,
// Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } 复制代码
其中bossGroup是mainReactor,workerGroup是subReactor。细心的会发现bossGroup的构造参数是nThreads=1,表示使用一个EventLoop。那是因为一个Netty NIO服务端同一时间只能绑定一个端口,那就只能用一个Selector处理客户端连接事件,故而一个线程便足够了。 注意下group方法
/** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */ public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; } 复制代码
所以bossGroup是parentGroup,workerGroup是childGroup。
绑定端口
EchoServer
// Start the server. ChannelFuture f = b.bind(PORT).sync(); 复制代码
AbstractBootstrap
/** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } 复制代码
创建和初始化channel
在initAndRegister中创建和初始化channel,并将chanel注册到EventLoopGroup。还记得之前设置channel为NioServerSocketChannel.class,所以这里channelFactory.newChannel()返回的是NioServerSocketChannel。
final ChannelFuture initAndRegister() { Channel channel = null; try { // 创建Channel对象 channel = channelFactory.newChannel(); // 初始化Channel配置 init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // 注册Channel到EventLoopGroup ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; } 复制代码
看下init方法
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } // 添加ChannelInitializer对象到pipeline中,用于后续初始化channelHandler到pipeline中 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); // 添加配置的ChannelHandler到pipeline中 if (handler != null) { pipeline.addLast(handler); } // 添加ServerBootstrapAcceptor ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } 复制代码
注意到pipeline在最后添加了ServerBootstrapAcceptor。
注册channel
// 注册Channel到EventLoopGroup ChannelFuture regFuture = config().group().register(channel); 复制代码
config().group()返回的是bossGroup。
// MultithreadEventLoopGroup @Override public ChannelFuture register(Channel channel) { return next().register(channel); } 复制代码
// SingleThreadEventLoop @Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; } 复制代码
最后在unsafe中执行register方法
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } // 校验channel和eventLoop匹配 if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; // 在EventLoop中执行 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } 复制代码
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // 记录是否为首次注册 boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } 复制代码
最后在doRegister方法中将channel注册到selector中。
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } 复制代码
端口绑定
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } 复制代码
NioEventLoop
nioEventLoop的继承层次。
开启线程
接上面,看下eventLoop的execute方法,实际执行的是SingleThreadEventExecutor的方法。
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { // 创建线程 startThread(); // 若已经关闭,移除任务,并进行拒绝 if (isShutdown() && removeTask(task)) { reject(); } } // 唤醒线程 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } 复制代码
private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } } } 复制代码
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { if (logger.isErrorEnabled()) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates."); } } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { if (logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } } terminationFuture.setSuccess(null); } } } } }); } 复制代码
NioEventLoop的run方法
接上面,接下来就会执行run方法。
@Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } // fall through default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } } 复制代码
我们看下SelectStrategy的枚举。
/** * Indicates a blocking select should follow. */ int SELECT = -1; /** * Indicates the IO loop should be retried, no blocking select to follow directly. */ int CONTINUE = -2; /** * Indicates the IO loop to poll for new events without blocking. */ int BUSY_WAIT = -3; 复制代码
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())这个方法会返回0。具体看下下面:
@Override public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; } 复制代码
注意此时还有绑定任务,所以hasTasks为true,执行selectSupplier.get()。
private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } }; 复制代码
注意此时NioServerSocketChannel未绑定本机端口,selectNow返回0。 回到run方法:
final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } 复制代码
runAllTasks会执行上文中的NioServerSocketChannel与本机端口的绑定任务。
接收新连接
processSelectedKeys
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } } 复制代码
遍历SelectionKey迭代器
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; } // 遍历SelectionKey迭代器 Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } } 复制代码
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { // 如果SelectionKey是不合法的,则关闭Channel final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { // 获得就绪的IO事件的ops int readyOps = k.readyOps(); // OP_CONNECT事件就绪 // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 移除对OP_CONNECT感兴趣 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); // 完成连接 unsafe.finishConnect(); } // OP_WRITE事件就绪 // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write // 向Channel写入数据 ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } 复制代码
accept过程
有关Netty的accept过程可参考 Netty源码分析之accept过程 。 接上文,当就绪的IO事件为SelectionKey.OP_ACCEPT时执行unsafe.read()方法。
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } 复制代码
在doReadMessages方法中,将新建的NioSocketChannel添加到上文的readBuf中。
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; } 复制代码
fireChannelRead过程
上文中执行pipeline.fireChannelRead(readBuf.get(i)),触发channelRead事件,从pipeline的head开始遍历,最终执行上文中init方法添加的ServerBootstrapAcceptor的channelRead方法。
@Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } 复制代码
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } 复制代码
最终将NioSocketChannel注册到childGroup中,即之前的workerGroup。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 能量视角下的GAN模型(三):生成模型=能量模型
- 提高模型准确率:组合模型
- JVM内存模型 与 JMM内存模型
- Golang并发模型:轻松入门流水线模型
- C++11 中的内存模型(上):内存模型基础
- 机器学习中的判别式模型和生成式模型
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
算法交易:制胜策略与原理
[美]欧内斯特·陈(Ernest P. Chan) / 高闻酉、黄蕊 / 机械工业出版社 / 49.00
本书是一本引人入胜、信息量大、覆盖各类交易策略的图书。无论个人投资者,还是机构投资者,都可以借鉴和使用其中的策略。本书中的策略大致可分为均值回归系统和动量系统两大类。书中不仅介绍了如何使用每种类别的交易策略,更解释了各种策略之所以有效的原因。本书始终以简单、线性的交易策略为重心,因为复杂的交易策略容易受到过度拟合及数据窥探的侵害。数学和软件是算法交易的两条腿。本书用到了一定程度的数学知识,使其对各......一起来看看 《算法交易:制胜策略与原理》 这本书的介绍吧!