Summary: This article takes a look at Elasticsearch's RoundRobinSupplier, reading through elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java and elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java.
Preface
This article takes a look at Elasticsearch's RoundRobinSupplier.
RoundRobinSupplier
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java
    final class RoundRobinSupplier<S> implements Supplier<S> {

        private final AtomicBoolean selectorsSet = new AtomicBoolean(false);
        private volatile S[] selectors;
        private AtomicInteger counter = new AtomicInteger(0);

        RoundRobinSupplier() {
            this.selectors = null;
        }

        RoundRobinSupplier(S[] selectors) {
            this.selectors = selectors;
            this.selectorsSet.set(true);
        }

        @Override
        public S get() {
            S[] selectors = this.selectors;
            return selectors[counter.getAndIncrement() % selectors.length];
        }

        void setSelectors(S[] selectors) {
            if (selectorsSet.compareAndSet(false, true)) {
                this.selectors = selectors;
            } else {
                throw new AssertionError("Selectors already set. Should only be set once.");
            }
        }

        int count() {
            return selectors.length;
        }
    }
- RoundRobinSupplier implements the Supplier interface. Its get method computes counter.getAndIncrement() % selectors.length to pick an index into the selectors array, then returns the element at that index.
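To make the index arithmetic concrete, here is a minimal sketch of the same round-robin pattern. RoundRobinSketch, plainIndex and safeIndex are hypothetical names introduced for illustration and are not part of Elasticsearch. The sketch also shows one thing the expression leaves open: Java's % keeps the sign of the dividend, so once an int counter wraps past Integer.MAX_VALUE the plain expression yields a negative index. The Math.floorMod variant is an assumed alternative, not something the quoted source uses.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the pattern described above; not the Elasticsearch class.
final class RoundRobinSketch<S> implements Supplier<S> {

    private final S[] items;
    private final AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSketch(S[] items) {
        this.items = items;
    }

    // Same arithmetic as the quoted get(): counter % length.
    @Override
    public S get() {
        return items[counter.getAndIncrement() % items.length];
    }

    // Index as computed by the plain remainder operator.
    static int plainIndex(int counterValue, int length) {
        return counterValue % length;
    }

    // Assumed alternative: floorMod always returns a value in [0, length).
    static int safeIndex(int counterValue, int length) {
        return Math.floorMod(counterValue, length);
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> supplier =
            new RoundRobinSketch<>(new String[] {"a", "b", "c"});
        StringBuilder picked = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            picked.append(supplier.get());
        }
        System.out.println(picked); // prints "abcabca"

        // What the two expressions return for a wrapped (negative) counter value.
        System.out.println(plainIndex(Integer.MIN_VALUE, 3)); // prints -2
        System.out.println(safeIndex(Integer.MIN_VALUE, 3));  // prints 1
    }
}
```

Because getAndIncrement is atomic, concurrent callers each receive a distinct counter value, so the rotation stays even without any locking.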
NioSelectorGroup
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java
    public class NioSelectorGroup implements NioGroup {

        private final List<NioSelector> dedicatedAcceptors;
        private final RoundRobinSupplier<NioSelector> acceptorSupplier;

        private final List<NioSelector> selectors;
        private final RoundRobinSupplier<NioSelector> selectorSupplier;

        private final AtomicBoolean isOpen = new AtomicBoolean(true);

        //......

        public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount,
                                ThreadFactory selectorThreadFactory, int selectorCount,
                                Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
            dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
            selectors = new ArrayList<>(selectorCount);

            try {
                List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount);
                for (int i = 0; i < selectorCount; ++i) {
                    RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>();
                    suppliersToSet.add(supplier);
                    NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
                    selectors.add(selector);
                }
                for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) {
                    supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
                    assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
                }

                for (int i = 0; i < dedicatedAcceptorCount; ++i) {
                    RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                    NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
                    dedicatedAcceptors.add(acceptor);
                }

                if (dedicatedAcceptorCount != 0) {
                    acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
                } else {
                    acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                }
                selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
                assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";

                startSelectors(selectors, selectorThreadFactory);
                startSelectors(dedicatedAcceptors, acceptorThreadFactory);
            } catch (Exception e) {
                try {
                    close();
                } catch (Exception e1) {
                    e.addSuppressed(e1);
                }
                throw e;
            }
        }

        public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
            throws IOException {
            ensureOpen();
            return factory.openNioServerSocketChannel(address, acceptorSupplier);
        }

        @Override
        public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
            ensureOpen();
            return factory.openNioChannel(address, selectorSupplier);
        }

        //......
    }
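- The NioSelectorGroup constructor initializes two RoundRobinSupplier fields, acceptorSupplier and selectorSupplier (it also creates one supplier per selector and per dedicated acceptor, which is handed to that selector's EventHandler). When dedicatedAcceptorCount is 0, acceptorSupplier falls back to the regular selectors. The bindServerChannel method calls factory.openNioServerSocketChannel(address, acceptorSupplier), and the openChannel method calls factory.openNioChannel(address, selectorSupplier).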
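The constructor wires things in two phases: each selector receives a supplier before the full selector list exists, and the list is filled in afterwards through the set-once setSelectors call. The following sketch reproduces that shape under stated assumptions. LateBoundSupplier, Worker and buildWorkers are hypothetical names, Worker stands in for NioSelector, and the sketch throws IllegalStateException where the quoted source throws AssertionError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LateBoundWiringSketch {

    // Hypothetical set-once supplier: created empty, filled exactly once later.
    static final class LateBoundSupplier<S> implements Supplier<S> {
        private final AtomicBoolean targetsSet = new AtomicBoolean(false);
        private final AtomicInteger counter = new AtomicInteger(0);
        private volatile List<S> targets;

        void setTargets(List<S> targets) {
            // compareAndSet guarantees only the first caller wins.
            if (!targetsSet.compareAndSet(false, true)) {
                throw new IllegalStateException("Targets already set");
            }
            this.targets = targets;
        }

        @Override
        public S get() {
            // Like the original, calling get() before the targets are set fails.
            List<S> current = targets;
            return current.get(Math.floorMod(counter.getAndIncrement(), current.size()));
        }
    }

    // Hypothetical worker standing in for NioSelector.
    static final class Worker {
        final String name;
        final Supplier<Worker> peers;

        Worker(String name, Supplier<Worker> peers) {
            this.name = name;
            this.peers = peers;
        }
    }

    static List<Worker> buildWorkers(int count) {
        List<Worker> workers = new ArrayList<>(count);
        List<LateBoundSupplier<Worker>> pending = new ArrayList<>(count);
        // Phase 1: every worker gets its own, still empty, supplier.
        for (int i = 0; i < count; i++) {
            LateBoundSupplier<Worker> supplier = new LateBoundSupplier<>();
            pending.add(supplier);
            workers.add(new Worker("worker-" + i, supplier));
        }
        // Phase 2: all workers exist now, so each supplier can see the full list.
        List<Worker> snapshot = new ArrayList<>(workers);
        for (LateBoundSupplier<Worker> supplier : pending) {
            supplier.setTargets(snapshot);
        }
        return workers;
    }

    public static void main(String[] args) {
        List<Worker> workers = buildWorkers(3);
        Worker first = workers.get(0);
        System.out.println(first.peers.get().name); // prints "worker-0"
        System.out.println(first.peers.get().name); // prints "worker-1"
    }
}
```

The two phases are needed because of a circular dependency: a worker's constructor wants a supplier of workers, but the complete list of workers is only known once every constructor has run.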
ChannelFactory
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java
    public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> {

        //......

        public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<NioSelector> supplier) throws IOException {
            ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
            NioSelector selector = supplier.get();
            ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
            scheduleServerChannel(serverChannel, selector);
            return serverChannel;
        }

        public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<NioSelector> supplier) throws IOException {
            SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
            NioSelector selector = supplier.get();
            Socket channel = internalCreateChannel(selector, rawChannel);
            scheduleChannel(channel, selector);
            return channel;
        }

        //......
    }
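- Both openNioServerSocketChannel and openNioChannel in ChannelFactory take a Supplier<NioSelector> parameter and call supplier.get() to choose the NioSelector that the new channel is created with and scheduled on.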
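Since the factory methods only see a Supplier, the selection policy stays outside the factory. The sketch below imitates that arrangement with hypothetical types: FakeSelector merely records which channel names it was given, openChannel mirrors the "ask the supplier, then schedule on the result" shape of openNioChannel, and roundRobin is an assumed helper rather than an Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class SupplierDrivenFactorySketch {

    // Hypothetical selector that only records the channels assigned to it.
    static final class FakeSelector {
        final List<String> channels = new ArrayList<>();
    }

    // Same shape as openNioChannel: pick a selector via the supplier, then register on it.
    static FakeSelector openChannel(String remote, Supplier<FakeSelector> supplier) {
        FakeSelector selector = supplier.get();
        selector.channels.add(remote);
        return selector;
    }

    // Assumed helper: a round-robin policy expressed as a plain Supplier.
    static Supplier<FakeSelector> roundRobin(List<FakeSelector> selectors) {
        AtomicInteger counter = new AtomicInteger(0);
        return () -> selectors.get(Math.floorMod(counter.getAndIncrement(), selectors.size()));
    }

    public static void main(String[] args) {
        List<FakeSelector> selectors =
            Arrays.asList(new FakeSelector(), new FakeSelector(), new FakeSelector());
        Supplier<FakeSelector> supplier = roundRobin(selectors);

        for (int i = 0; i < 6; i++) {
            openChannel("channel-" + i, supplier);
        }
        for (FakeSelector selector : selectors) {
            System.out.println(selector.channels);
        }
        // prints:
        // [channel-0, channel-3]
        // [channel-1, channel-4]
        // [channel-2, channel-5]
    }
}
```

Passing a different Supplier, for example `() -> selectors.get(0)`, would pin every channel to one selector without touching openChannel, which is the flexibility the Supplier parameter buys.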
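Summary
RoundRobinSupplier's get method picks the next element with counter.getAndIncrement() % selectors.length. NioSelectorGroup builds its acceptorSupplier and selectorSupplier from it, and ChannelFactory calls supplier.get() to assign each new channel to a NioSelector.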