聊聊Elasticsearch的RoundRobinSupplier

栏目: 后端 · 发布时间: 6年前

内容简介:本文主要研究一下Elasticsearch的RoundRobinSupplierelasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.javaelasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java

本文主要研究一下Elasticsearch的RoundRobinSupplier

RoundRobinSupplier

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java

final class RoundRobinSupplier<S> implements Supplier<S> {

    private final AtomicBoolean selectorsSet = new AtomicBoolean(false);
    private volatile S[] selectors;
    private AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSupplier() {
        this.selectors = null;
    }

    RoundRobinSupplier(S[] selectors) {
        this.selectors = selectors;
        this.selectorsSet.set(true);
    }

    @Override
    public S get() {
        S[] selectors = this.selectors;
        return selectors[counter.getAndIncrement() % selectors.length];
    }

    void setSelectors(S[] selectors) {
        if (selectorsSet.compareAndSet(false, true)) {
            this.selectors = selectors;
        } else {
            throw new AssertionError("Selectors already set. Should only be set once.");
        }
    }

    int count() {
        return selectors.length;
    }
}
  • RoundRobinSupplier实现了Supplier接口,其get方法使用 counter.getAndIncrement() % selectors.length 来选择selectors数组的下标,然后返回该下标的值

NioSelectorGroup

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java

public class NioSelectorGroup implements NioGroup {

    private final List<NioSelector> dedicatedAcceptors;
    private final RoundRobinSupplier<NioSelector> acceptorSupplier;

    private final List<NioSelector> selectors;
    private final RoundRobinSupplier<NioSelector> selectorSupplier;

    private final AtomicBoolean isOpen = new AtomicBoolean(true);

    //......

    public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory,
                            int selectorCount, Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
        dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
        selectors = new ArrayList<>(selectorCount);

        try {
            List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount);
            for (int i = 0; i < selectorCount; ++i) {
                RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>();
                suppliersToSet.add(supplier);
                NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
                selectors.add(selector);
            }
            for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) {
                supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
                assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
            }

            for (int i = 0; i < dedicatedAcceptorCount; ++i) {
                RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
                dedicatedAcceptors.add(acceptor);
            }

            if (dedicatedAcceptorCount != 0) {
                acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
            } else {
                acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            }
            selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
            assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";

            startSelectors(selectors, selectorThreadFactory);
            startSelectors(dedicatedAcceptors, acceptorThreadFactory);
        } catch (Exception e) {
            try {
                close();
            } catch (Exception e1) {
                e.addSuppressed(e1);
            }
            throw e;
        }
    }

    public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
        throws IOException {
        ensureOpen();
        return factory.openNioServerSocketChannel(address, acceptorSupplier);
    }

    @Override
    public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
        ensureOpen();
        return factory.openNioChannel(address, selectorSupplier);
    }    

    //......
}
  • NioSelectorGroup的构造器创建了两个RoundRobinSupplier,分别是acceptorSupplier及selectorSupplier;bindServerChannel方法执行的是factory.openNioServerSocketChannel(address, acceptorSupplier);openChannel方法执行的是factory.openNioChannel(address, selectorSupplier)

ChannelFactory

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java

public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> {
    //......

    public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<NioSelector> supplier) throws IOException {
        ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
        NioSelector selector = supplier.get();
        ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
        scheduleServerChannel(serverChannel, selector);
        return serverChannel;
    }

    public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<NioSelector> supplier) throws IOException {
        SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
        NioSelector selector = supplier.get();
        Socket channel = internalCreateChannel(selector, rawChannel);
        scheduleChannel(channel, selector);
        return channel;
    }

    //......
}
  • ChannelFactory的openNioServerSocketChannel及openNioChannel方法都接收Supplier<NioSelector>参数,通过该supplier来选取NioSelector

小结

counter.getAndIncrement() % selectors.length

doc


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Smashing Book

The Smashing Book

Jacob Gube、Dmitry Fadeev、Chris Spooner、Darius A Monsef IV、Alessandro Cattaneo、Steven Snell、David Leggett、Andrew Maier、Kayla Knight、Yves Peters、René Schmidt、Smashing Magazine editorial team、Vitaly Friedman、Sven Lennartz / 2009 / $ 29.90 / € 23.90

The Smashing Book is a printed book about best practices in modern Web design. The book shares technical tips and best practices on coding, usability and optimization and explores how to create succes......一起来看看 《The Smashing Book》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具