内容简介:本文主要研究一下Elasticsearch的SizeBlockingQueueelasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.javaelasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java
序
本文主要研究一下Elasticsearch的SizeBlockingQueue
SizeBlockingQueue
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java
public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { private final BlockingQueue<E> queue; private final int capacity; private final AtomicInteger size = new AtomicInteger(); public SizeBlockingQueue(BlockingQueue<E> queue, int capacity) { assert capacity >= 0; this.queue = queue; this.capacity = capacity; } @Override public int size() { return size.get(); } public int capacity() { return this.capacity; } @Override public Iterator<E> iterator() { final Iterator<E> it = queue.iterator(); return new Iterator<E>() { E current; @Override public boolean hasNext() { return it.hasNext(); } @Override public E next() { current = it.next(); return current; } @Override public void remove() { // note, we can't call #remove on the iterator because we need to know // if it was removed or not if (queue.remove(current)) { size.decrementAndGet(); } } }; } @Override public E peek() { return queue.peek(); } @Override public E poll() { E e = queue.poll(); if (e != null) { size.decrementAndGet(); } return e; } @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = queue.poll(timeout, unit); if (e != null) { size.decrementAndGet(); } return e; } @Override public boolean remove(Object o) { boolean v = queue.remove(o); if (v) { size.decrementAndGet(); } return v; } /** * Forces adding an element to the queue, without doing size checks. */ public void forcePut(E e) throws InterruptedException { size.incrementAndGet(); try { queue.put(e); } catch (InterruptedException ie) { size.decrementAndGet(); throw ie; } } @Override public boolean offer(E e) { while (true) { final int current = size.get(); if (current >= capacity()) { return false; } if (size.compareAndSet(current, 1 + current)) { break; } } boolean offered = queue.offer(e); if (!offered) { size.decrementAndGet(); } return offered; } @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // note, not used in ThreadPoolExecutor throw new IllegalStateException("offer with timeout not allowed on size queue"); } @Override public void put(E e) throws InterruptedException { // note, not used in ThreadPoolExecutor throw new IllegalStateException("put not allowed on size queue"); } @Override public E take() throws InterruptedException { E e; try { e = queue.take(); size.decrementAndGet(); } catch (InterruptedException ie) { throw ie; } return e; } @Override public int remainingCapacity() { return capacity() - size.get(); } @Override public int drainTo(Collection<? super E> c) { int v = queue.drainTo(c); size.addAndGet(-v); return v; } @Override public int drainTo(Collection<? super E> c, int maxElements) { int v = queue.drainTo(c, maxElements); size.addAndGet(-v); return v; } @Override public Object[] toArray() { return queue.toArray(); } @Override public <T> T[] toArray(T[] a) { return (T[]) queue.toArray(a); } @Override public boolean contains(Object o) { return queue.contains(o); } @Override public boolean containsAll(Collection<?> c) { return queue.containsAll(c); } }
- SizeBlockingQueue继承了AbstractQueue,同时实现了BlockingQueue接口;它的构造器要求输入blockingQueue及capacity参数
- SizeBlockingQueue有个AtomicInteger类型的size参数用于记录queue的大小,它在poll、remove、offer、take等方法都会维护这个size参数
- 其中offer方法会判断当前size是否大于等于capacity,如果大于等于则直接返回false;而put方法则直接抛出IllegalStateException
ResizableBlockingQueue
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java
final class ResizableBlockingQueue<E> extends SizeBlockingQueue<E> { private volatile int capacity; ResizableBlockingQueue(BlockingQueue<E> queue, int initialCapacity) { super(queue, initialCapacity); this.capacity = initialCapacity; } @Override public int capacity() { return this.capacity; } @Override public int remainingCapacity() { return Math.max(0, this.capacity()); } /** Resize the limit for the queue, returning the new size limit */ public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) { assert adjustmentAmount > 0 : "adjustment amount should be a positive value"; assert optimalCapacity >= 0 : "desired capacity cannot be negative"; assert minCapacity >= 0 : "cannot have min capacity smaller than 0"; assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity"; if (optimalCapacity == capacity) { // Yahtzee! return this.capacity; } if (optimalCapacity > capacity + adjustmentAmount) { // adjust up final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount); this.capacity = newCapacity; return newCapacity; } else if (optimalCapacity < capacity - adjustmentAmount) { // adjust down final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount); this.capacity = newCapacity; return newCapacity; } else { return this.capacity; } } }
- ResizableBlockingQueue继承了SizeBlockingQueue,它提供了一个线程安全的adjustCapacity方法,用于resize队列的capacity
EsExecutors
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
public class EsExecutors { //...... public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) { BlockingQueue<Runnable> queue; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity); } return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder); } public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize, int maxQueueSize, int frameSize, TimeValue targetedResponseTime, ThreadFactory threadFactory, ThreadContext contextHolder) { if (initialQueueCapacity <= 0) { throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " + initialQueueCapacity); } ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity); return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory, new EsAbortPolicy(), contextHolder); } //...... }
- EsExecutors的newFixed创建的是使用SizeBlockingQueue的EsThreadPoolExecutor,而newAutoQueueFixed创建的是使用ResizableBlockingQueuede的QueueResizingEsThreadPoolExecutor
小结
- SizeBlockingQueue继承了AbstractQueue,同时实现了BlockingQueue接口;它的构造器要求输入blockingQueue及capacity参数;它有个AtomicInteger类型的size参数用于记录queue的大小,它在poll、remove、offer、take等方法都会维护这个size参数;其中offer方法会判断当前size是否大于等于capacity,如果大于等于则直接返回false;而put方法则直接抛出IllegalStateException
- ResizableBlockingQueue继承了SizeBlockingQueue,它提供了一个线程安全的adjustCapacity方法,用于resize队列的capacity
- EsExecutors的newFixed创建的是使用SizeBlockingQueue的EsThreadPoolExecutor,而newAutoQueueFixed创建的是使用ResizableBlockingQueuede的QueueResizingEsThreadPoolExecutor
doc
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。