Preface
This article takes a look at Elasticsearch's SizeBlockingQueue and its subclass ResizableBlockingQueue.
SizeBlockingQueue
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java

    public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

        private final BlockingQueue<E> queue;
        private final int capacity;

        private final AtomicInteger size = new AtomicInteger();

        public SizeBlockingQueue(BlockingQueue<E> queue, int capacity) {
            assert capacity >= 0;
            this.queue = queue;
            this.capacity = capacity;
        }

        @Override
        public int size() {
            return size.get();
        }

        public int capacity() {
            return this.capacity;
        }

        @Override
        public Iterator<E> iterator() {
            final Iterator<E> it = queue.iterator();
            return new Iterator<E>() {
                E current;

                @Override
                public boolean hasNext() {
                    return it.hasNext();
                }

                @Override
                public E next() {
                    current = it.next();
                    return current;
                }

                @Override
                public void remove() {
                    // note, we can't call #remove on the iterator because we need to know
                    // if it was removed or not
                    if (queue.remove(current)) {
                        size.decrementAndGet();
                    }
                }
            };
        }

        @Override
        public E peek() {
            return queue.peek();
        }

        @Override
        public E poll() {
            E e = queue.poll();
            if (e != null) {
                size.decrementAndGet();
            }
            return e;
        }

        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E e = queue.poll(timeout, unit);
            if (e != null) {
                size.decrementAndGet();
            }
            return e;
        }

        @Override
        public boolean remove(Object o) {
            boolean v = queue.remove(o);
            if (v) {
                size.decrementAndGet();
            }
            return v;
        }

        /**
         * Forces adding an element to the queue, without doing size checks.
         */
        public void forcePut(E e) throws InterruptedException {
            size.incrementAndGet();
            try {
                queue.put(e);
            } catch (InterruptedException ie) {
                size.decrementAndGet();
                throw ie;
            }
        }

        @Override
        public boolean offer(E e) {
            while (true) {
                final int current = size.get();
                if (current >= capacity()) {
                    return false;
                }
                if (size.compareAndSet(current, 1 + current)) {
                    break;
                }
            }
            boolean offered = queue.offer(e);
            if (!offered) {
                size.decrementAndGet();
            }
            return offered;
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            // note, not used in ThreadPoolExecutor
            throw new IllegalStateException("offer with timeout not allowed on size queue");
        }

        @Override
        public void put(E e) throws InterruptedException {
            // note, not used in ThreadPoolExecutor
            throw new IllegalStateException("put not allowed on size queue");
        }

        @Override
        public E take() throws InterruptedException {
            E e;
            try {
                e = queue.take();
                size.decrementAndGet();
            } catch (InterruptedException ie) {
                throw ie;
            }
            return e;
        }

        @Override
        public int remainingCapacity() {
            return capacity() - size.get();
        }

        @Override
        public int drainTo(Collection<? super E> c) {
            int v = queue.drainTo(c);
            size.addAndGet(-v);
            return v;
        }

        @Override
        public int drainTo(Collection<? super E> c, int maxElements) {
            int v = queue.drainTo(c, maxElements);
            size.addAndGet(-v);
            return v;
        }

        @Override
        public Object[] toArray() {
            return queue.toArray();
        }

        @Override
        public <T> T[] toArray(T[] a) {
            return (T[]) queue.toArray(a);
        }

        @Override
        public boolean contains(Object o) {
            return queue.contains(o);
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return queue.containsAll(c);
        }
    }

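- SizeBlockingQueue extends AbstractQueue and implements the BlockingQueue interface; its constructor takes a delegate BlockingQueue and a capacity
- SizeBlockingQueue keeps an AtomicInteger field, size, that tracks the number of queued elements; poll, remove, offer, take, drainTo, and forcePut all maintain it
- offer checks whether the current size is greater than or equal to capacity and, if so, returns false immediately; otherwise it reserves a slot with compareAndSet before delegating to the wrapped queue. put and the timed offer simply throw IllegalStateException, while forcePut adds an element without any size check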
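
The slot-reservation idea behind offer can be sketched in isolation. The class below is a hypothetical, stripped-down illustration (the name BoundedOfferSketch and the choice of LinkedBlockingQueue as the delegate are assumptions made for this example, not Elasticsearch code); it keeps only offer, poll, and size to show how the counter is reserved first and given back if the delegate refuses the element.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified illustration of the counter pattern; not the Elasticsearch class.
class BoundedOfferSketch<E> {
    private final BlockingQueue<E> queue = new LinkedBlockingQueue<>(); // unbounded delegate (assumption)
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    BoundedOfferSketch(int capacity) {
        this.capacity = capacity;
    }

    // Reserve a slot with compare-and-set before touching the delegate queue.
    boolean offer(E e) {
        while (true) {
            int current = size.get();
            if (current >= capacity) {
                return false; // full: the caller decides what to do, e.g. reject the task
            }
            if (size.compareAndSet(current, current + 1)) {
                break; // slot reserved
            }
            // another thread changed the counter in between, so retry
        }
        boolean offered = queue.offer(e);
        if (!offered) {
            size.decrementAndGet(); // give the reserved slot back
        }
        return offered;
    }

    // Decrement the counter only when an element was actually removed.
    E poll() {
        E e = queue.poll();
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    int size() {
        return size.get();
    }

    public static void main(String[] args) {
        BoundedOfferSketch<String> q = new BoundedOfferSketch<>(2);
        System.out.println(q.offer("a")); // true
        System.out.println(q.offer("b")); // true
        System.out.println(q.offer("c")); // false, capacity reached
        q.poll();
        System.out.println(q.offer("d")); // true, one slot was freed
    }
}
```

Because the counter is incremented before the element is enqueued, size() can briefly run ahead of the delegate's real content, but it never lets more than capacity elements in through offer.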
ResizableBlockingQueue
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java

    final class ResizableBlockingQueue<E> extends SizeBlockingQueue<E> {

        private volatile int capacity;

        ResizableBlockingQueue(BlockingQueue<E> queue, int initialCapacity) {
            super(queue, initialCapacity);
            this.capacity = initialCapacity;
        }

        @Override
        public int capacity() {
            return this.capacity;
        }

        @Override
        public int remainingCapacity() {
            return Math.max(0, this.capacity());
        }

        /** Resize the limit for the queue, returning the new size limit */
        public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) {
            assert adjustmentAmount > 0 : "adjustment amount should be a positive value";
            assert optimalCapacity >= 0 : "desired capacity cannot be negative";
            assert minCapacity >= 0 : "cannot have min capacity smaller than 0";
            assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity";

            if (optimalCapacity == capacity) {
                // Yahtzee!
                return this.capacity;
            }

            if (optimalCapacity > capacity + adjustmentAmount) {
                // adjust up
                final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
                this.capacity = newCapacity;
                return newCapacity;
            } else if (optimalCapacity < capacity - adjustmentAmount) {
                // adjust down
                final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
                this.capacity = newCapacity;
                return newCapacity;
            } else {
                return this.capacity;
            }
        }
    }

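- ResizableBlockingQueue extends SizeBlockingQueue and adds a synchronized adjustCapacity method for resizing the queue's capacity: each call moves the volatile capacity field toward optimalCapacity by at most adjustmentAmount, clamped to the range [minCapacity, maxCapacity], and leaves it unchanged when the target is within one adjustmentAmount of the current value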
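
To make the stepping behaviour concrete, the branch structure of adjustCapacity can be restated as a pure function and tried with a few numbers. The class and method names below (CapacityStepSketch, nextCapacity) and all input values are hypothetical, chosen only for illustration; the logic mirrors the quoted method but leaves out the synchronization and the volatile field.

```java
// Hypothetical restatement of the adjustCapacity branches as a pure function.
class CapacityStepSketch {

    // Returns the capacity after one adjustment step toward `optimal`.
    static int nextCapacity(int capacity, int optimal, int step, int min, int max) {
        if (optimal > capacity + step) {
            return Math.min(max, capacity + step); // adjust up, but never above max
        } else if (optimal < capacity - step) {
            return Math.max(min, capacity - step); // adjust down, but never below min
        }
        return capacity; // target is within one step: leave the capacity alone
    }

    public static void main(String[] args) {
        System.out.println(nextCapacity(1000, 1200, 50, 100, 2000)); // 1050
        System.out.println(nextCapacity(1000, 1030, 50, 100, 2000)); // 1000
        System.out.println(nextCapacity(1000, 500, 50, 100, 2000));  // 950
        System.out.println(nextCapacity(120, 10, 50, 100, 2000));    // 100
        System.out.println(nextCapacity(1980, 5000, 50, 100, 2000)); // 2000
    }
}
```

The comparisons are strict, so a target exactly one step away (for example 1050 when the capacity is 1000 and the step is 50) does not trigger a change.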
EsExecutors
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

    public class EsExecutors {

        //......

        public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
                                                    ThreadFactory threadFactory, ThreadContext contextHolder) {
            BlockingQueue<Runnable> queue;
            if (queueCapacity < 0) {
                queue = ConcurrentCollections.newBlockingQueue();
            } else {
                queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
            }
            return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
                queue, threadFactory, new EsAbortPolicy(), contextHolder);
        }

        public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
                                                             int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
                                                             ThreadFactory threadFactory, ThreadContext contextHolder) {
            if (initialQueueCapacity <= 0) {
                throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
                    initialQueueCapacity);
            }
            ResizableBlockingQueue<Runnable> queue =
                new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
            return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
                queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
                new EsAbortPolicy(), contextHolder);
        }

        //......
    }

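- EsExecutors.newFixed creates an EsThreadPoolExecutor backed by a SizeBlockingQueue (or by a plain unbounded queue when queueCapacity is negative), while newAutoQueueFixed creates a QueueResizingEsThreadPoolExecutor backed by a ResizableBlockingQueue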
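
The "not used in ThreadPoolExecutor" comments on put and the timed offer make sense once you recall that java.util.concurrent.ThreadPoolExecutor hands tasks to its work queue through offer, and rejects a task when offer returns false and no further thread may be started. The sketch below shows that interaction using only JDK classes: a LinkedBlockingQueue with capacity 1 and ThreadPoolExecutor.AbortPolicy stand in for SizeBlockingQueue and EsAbortPolicy, and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only stand-in for a fixed pool with a bounded queue.
class FixedPoolRejectionSketch {

    // Returns true if the third task is rejected by a pool with 1 thread and queue capacity 1.
    static boolean thirdTaskRejected() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1),          // bounded queue: offer returns false when full
            new ThreadPoolExecutor.AbortPolicy()); // throws RejectedExecutionException on rejection

        // A task that keeps the single worker busy until the latch is released.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        try {
            pool.execute(blocker); // handed directly to the single worker thread
            pool.execute(blocker); // worker is busy, so the task is queued via offer
            try {
                pool.execute(blocker); // queue is full and no extra thread is allowed
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskRejected()); // third task rejected: true
    }
}
```

With corePoolSize equal to maximumPoolSize, as in newFixed, a full queue therefore translates directly into a rejected task rather than into additional threads.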
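Summary
- SizeBlockingQueue extends AbstractQueue and implements the BlockingQueue interface; its constructor takes a delegate BlockingQueue and a capacity; it keeps an AtomicInteger field, size, that tracks the number of queued elements and is maintained in poll, remove, offer, take, and related methods; offer returns false immediately when the current size is greater than or equal to capacity, and put throws IllegalStateException
- ResizableBlockingQueue extends SizeBlockingQueue and adds a synchronized adjustCapacity method for resizing the queue's capacity
- EsExecutors.newFixed creates an EsThreadPoolExecutor backed by a SizeBlockingQueue, while newAutoQueueFixed creates a QueueResizingEsThreadPoolExecutor backed by a ResizableBlockingQueue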