内容简介:本文主要研究一下Elasticsearch的RoundRobinSupplierelasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.javaelasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java
序
本文主要研究一下Elasticsearch的RoundRobinSupplier
RoundRobinSupplier
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java
final class RoundRobinSupplier<S> implements Supplier<S> {
private final AtomicBoolean selectorsSet = new AtomicBoolean(false);
private volatile S[] selectors;
private AtomicInteger counter = new AtomicInteger(0);
RoundRobinSupplier() {
this.selectors = null;
}
RoundRobinSupplier(S[] selectors) {
this.selectors = selectors;
this.selectorsSet.set(true);
}
@Override
public S get() {
S[] selectors = this.selectors;
return selectors[counter.getAndIncrement() % selectors.length];
}
void setSelectors(S[] selectors) {
if (selectorsSet.compareAndSet(false, true)) {
this.selectors = selectors;
} else {
throw new AssertionError("Selectors already set. Should only be set once.");
}
}
int count() {
return selectors.length;
}
}
-
RoundRobinSupplier实现了Supplier接口,其get方法使用
counter.getAndIncrement() % selectors.length来选择selectors数组的下标,然后返回该下标的值
NioSelectorGroup
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java
public class NioSelectorGroup implements NioGroup {
private final List<NioSelector> dedicatedAcceptors;
private final RoundRobinSupplier<NioSelector> acceptorSupplier;
private final List<NioSelector> selectors;
private final RoundRobinSupplier<NioSelector> selectorSupplier;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
//......
public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory,
int selectorCount, Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
selectors = new ArrayList<>(selectorCount);
try {
List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount);
for (int i = 0; i < selectorCount; ++i) {
RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>();
suppliersToSet.add(supplier);
NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
selectors.add(selector);
}
for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) {
supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
}
for (int i = 0; i < dedicatedAcceptorCount; ++i) {
RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
dedicatedAcceptors.add(acceptor);
}
if (dedicatedAcceptorCount != 0) {
acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
} else {
acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
}
selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";
startSelectors(selectors, selectorThreadFactory);
startSelectors(dedicatedAcceptors, acceptorThreadFactory);
} catch (Exception e) {
try {
close();
} catch (Exception e1) {
e.addSuppressed(e1);
}
throw e;
}
}
public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
throws IOException {
ensureOpen();
return factory.openNioServerSocketChannel(address, acceptorSupplier);
}
@Override
public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
ensureOpen();
return factory.openNioChannel(address, selectorSupplier);
}
//......
}
- NioSelectorGroup的构造器创建了两个RoundRobinSupplier,分别是acceptorSupplier及selectorSupplier;bindServerChannel方法执行的是factory.openNioServerSocketChannel(address, acceptorSupplier);openChannel方法执行的是factory.openNioChannel(address, selectorSupplier)
ChannelFactory
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java
public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> {
//......
public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<NioSelector> supplier) throws IOException {
ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
NioSelector selector = supplier.get();
ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
scheduleServerChannel(serverChannel, selector);
return serverChannel;
}
public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<NioSelector> supplier) throws IOException {
SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
NioSelector selector = supplier.get();
Socket channel = internalCreateChannel(selector, rawChannel);
scheduleChannel(channel, selector);
return channel;
}
//......
}
- ChannelFactory的openNioServerSocketChannel及openNioChannel方法都接收Supplier<NioSelector>参数,通过该supplier来选取NioSelector
小结
counter.getAndIncrement() % selectors.length
doc
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
机器学习基础教程
(英)Simon Rogers,、Mark Girolami / 郭茂祖、王春宇 刘扬 刘晓燕、刘扬、刘晓燕 / 机械工业出版社 / 2014-1 / 45.00
本书是一本机器学习入门教程,包含了数学和统计学的核心技术,用于帮助理解一些常用的机器学习算法。书中展示的算法涵盖了机器学习的各个重要领域:分类、聚类和投影。本书对一小部分算法进行了详细描述和推导,而不是简单地将大量算法罗列出来。 本书通过大量的MATLAB/Octave脚本将算法和概念由抽象的等式转化为解决实际问题的工具,利用它们读者可以重新绘制书中的插图,并研究如何改变模型说明和参数取值。......一起来看看 《机器学习基础教程》 这本书的介绍吧!
HTML 编码/解码
HTML 编码/解码
HEX HSV 转换工具
HEX HSV 互换工具