聊聊Elasticsearch的AtomicArray

栏目: 后端 · 发布时间: 6年前

内容简介:本文主要研究一下Elasticsearch的AtomicArrayelasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.javaelasticsearch-7.0.1/server/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java

本文主要研究一下Elasticsearch的AtomicArray

AtomicArray

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java

public class AtomicArray<E> {
    private final AtomicReferenceArray<E> array;
    private volatile List<E> nonNullList;

    public AtomicArray(int size) {
        array = new AtomicReferenceArray<>(size);
    }

    /**
     * The size of the expected results, including potential null values.
     */
    public int length() {
        return array.length();
    }

    /**
     * Sets the element at position {@code i} to the given value.
     *
     * @param i     the index
     * @param value the new value
     */
    public void set(int i, E value) {
        array.set(i, value);
        if (nonNullList != null) { // read first, lighter, and most times it will be null...
            nonNullList = null;
        }
    }

    public final void setOnce(int i, E value) {
        if (array.compareAndSet(i, null, value) == false) {
            throw new IllegalStateException("index [" + i + "] has already been set");
        }
        if (nonNullList != null) { // read first, lighter, and most times it will be null...
            nonNullList = null;
        }
    }

    /**
     * Gets the current value at position {@code i}.
     *
     * @param i the index
     * @return the current value
     */
    public E get(int i) {
        return array.get(i);
    }

    /**
     * Returns the it as a non null list.
     */
    public List<E> asList() {
        if (nonNullList == null) {
            if (array == null || array.length() == 0) {
                nonNullList = Collections.emptyList();
            } else {
                List<E> list = new ArrayList<>(array.length());
                for (int i = 0; i < array.length(); i++) {
                    E e = array.get(i);
                    if (e != null) {
                        list.add(e);
                    }
                }
                nonNullList = list;
            }
        }
        return nonNullList;
    }

    /**
     * Copies the content of the underlying atomic array to a normal one.
     */
    public E[] toArray(E[] a) {
        if (a.length != array.length()) {
            throw new ElasticsearchGenerationException("AtomicArrays can only be copied to arrays of the same size");
        }
        for (int i = 0; i < array.length(); i++) {
            a[i] = array.get(i);
        }
        return a;
    }
}
  • AtomicArray封装了AtomicReferenceArray并定义了nonNullList,提供了asList方法转换为ArrayList;而setOnce方法则使用了AtomicReferenceArray的compareAndSet方法来实现;另外set及setOnce都会判断nonNullList是否为null,不为null则重新设置为null

GroupedActionListener

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java

public final class GroupedActionListener<T> implements ActionListener<T> {
    private final CountDown countDown;
    private final AtomicInteger pos = new AtomicInteger();
    private final AtomicArray<T> results;
    private final ActionListener<Collection<T>> delegate;
    private final Collection<T> defaults;
    private final AtomicReference<Exception> failure = new AtomicReference<>();

    /**
     * Creates a new listener
     * @param delegate the delegate listener
     * @param groupSize the group size
     */
    public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize,
                                 Collection<T> defaults) {
        results = new AtomicArray<>(groupSize);
        countDown = new CountDown(groupSize);
        this.delegate = delegate;
        this.defaults = defaults;
    }

    @Override
    public void onResponse(T element) {
        results.setOnce(pos.incrementAndGet() - 1, element);
        if (countDown.countDown()) {
            if (failure.get() != null) {
                delegate.onFailure(failure.get());
            } else {
                List<T> collect = this.results.asList();
                collect.addAll(defaults);
                delegate.onResponse(Collections.unmodifiableList(collect));
            }
        }
    }

    @Override
    public void onFailure(Exception e) {
        if (failure.compareAndSet(null, e) == false) {
            failure.accumulateAndGet(e, (previous, current) -> {
                previous.addSuppressed(current);
                return previous;
            });
        }
        if (countDown.countDown()) {
            delegate.onFailure(failure.get());
        }
    }
}
  • GroupedActionListener的构造器根据groupSize创建了AtomicArray及CountDown
  • onResponse方法会调用AtomicArray的setOnce方法来设置结果,之后判断countDown是否都完成了,完成的话判断是否有failure,有则回调delegate.onFailure,没有failure则调用AtomicArray的asList方法获取list形式的结果,最后回调delegate.onResponse
  • onFailure方法会更新failure,如果compareAndSet失败则使用accumulateAndGet来更新,之后判断countDown是否都完成了,完成的话则回调delegate.onFailure

CountDown

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/CountDown.java

public final class CountDown {

    private final AtomicInteger countDown;
    private final int originalCount;

    public CountDown(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be greater or equal to 0 but was: " + count);
        }
        this.originalCount = count;
        this.countDown = new AtomicInteger(count);
    }

    /**
     * Decrements the count-down and returns <code>true</code> iff this call
     * reached zero otherwise <code>false</code>
     */
    public boolean countDown() {
        assert originalCount > 0;
        for (;;) {
            final int current = countDown.get();
            assert current >= 0;
            if (current == 0) {
                return false;
            }
            if (countDown.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }

    /**
     * Fast forwards the count-down to zero and returns <code>true</code> iff
     * the count down reached zero with this fast forward call otherwise
     * <code>false</code>
     */
    public boolean fastForward() {
        assert originalCount > 0;
        assert countDown.get() >= 0;
        return countDown.getAndSet(0) > 0;
    }
    
    /**
     * Returns <code>true</code> iff the count-down has reached zero. Otherwise <code>false</code>
     */
    public boolean isCountedDown() {
        assert countDown.get() >= 0;
        return countDown.get() == 0;
    }
}
  • CountDown是一个简易线程安全非阻塞版的CountDownLatch,它提供了countDown方法使用compareAndSet来递减值,同时返回countDown是否完成( countDown.get() == 0 );另外还提供了isCountedDown来查询countDown是否完成;还有fastForward方法用于将countDown直接设置为0

小结

countDown.get() == 0

doc


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Music Recommendation and Discovery

Music Recommendation and Discovery

Òscar Celma / Springer / 2010-9-7 / USD 49.95

With so much more music available these days, traditional ways of finding music have diminished. Today radio shows are often programmed by large corporations that create playlists drawn from a limited......一起来看看 《Music Recommendation and Discovery》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具