public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
- 基于链表实现的阻塞队列,线程安全;
- 队头元素是存活时间最长的元素,队尾元素是存活时间最短的元素;
- 元素从队列尾进队,从队列头出队,符合FIFO;
- 链表实现的队列一般比基于数组实现有更高吞吐量,但比大多数并发应用的理想性能低;
- 默认队列最大长度为Integer.MAX_VALUE,新节点动态创建,已保存节点不会超过此值;
这是 two lock queue
算法的变体。putLock守卫元素的put、offer操作,且与等待存入的条件关联。takeLock原理类似。putLock和takeLock都依赖的 count
当元素已进入队列,putLock已被获取,且count变量也更新。随后,读取者通过获取putLock或获取takeLock,得到入队元素的可见性,然后读取 n = count.get();
- 允许一个异常的迭代器触发无限制的内存保留;
- 如果节点在老年代存活,会引起老节点和新节点间的跨代连接,令分代垃圾回收难以进行,导致重复老年代回收(major collections);
static class Node<E> { E item; // 节点数据 Node<E> next; // 下一节点 Node(E x) { item = x; } // 构造方法,存入节点的内容 }
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // 帮助GC head = first; E x = first.item; first.item = null; return x; }
void fullyLock() { putLock.lock(); takeLock.lock(); }
void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); // 自定义队列capacity this.capacity = capacity; // 头指针和尾指针指向同一个空节点 last = head = new Node<E>(null); }
public LinkedBlockingQueue(Collection<? extends E> c) { // capacity设置为Integer.MAX_VALUE this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; // 没有竞争,但对可见性来说有必要 putLock.lock(); try { int n = 0; // 依次遍历集合c for (E e : c) { if (e == null) // 集合元素为空抛出NullPointerException throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); // 在集合c中取元素,用元素创建节点存入队列 enqueue(new Node<E>(e)); // 添加元素递增 ++n; } // 最后把总添加元素更新至原子值count count.set(n); } finally { putLock.unlock(); } }
public int size() { return count.get(); }
public int remainingCapacity() { return capacity - count.get(); }
public void put(E e) throws InterruptedException { // 存入元素不能为空 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 此锁可以被中断 putLock.lockInterruptibly(); try { // 队列已满则原地等待直到队列出现空余 while (count.get() == capacity) { notFull.await(); } // 元素插入到队列尾部 enqueue(node); // 已保存元素数量递增 c = count.getAndIncrement(); // 检查队列是否已满 if (c + 1 < capacity) // 通知其他线程继续插入元素 notFull.signal(); } finally { // 解除锁 putLock.unlock(); } if (c == 0) signalNotEmpty(); }
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // e为空抛出NullPointerException if (e == null) throw new NullPointerException(); // 根据超时值和时间单位转换为纳秒时长 long nanos = unit.toNanos(timeout); int c = -1; // 获取putLock final ReentrantLock putLock = this.putLock; // 获取已有元素总数 final AtomicInteger count = this.count; // 锁putLock设置为可中断 putLock.lockInterruptibly(); try { // 队列已满则原地等待直到队列出现空余 while (count.get() == capacity) { if (nanos <= 0L) // 到达超时时间,退出方法并返回false return false; // 时间倒计时 nanos = notFull.awaitNanos(nanos); } // 元素进入队列 enqueue(new Node<E>(e)); // 队列元素总数递增 c = count.getAndIncrement(); if (c + 1 < capacity) // 唤醒其他写入线程 notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
public boolean offer(E e) { // 存入元素不能为null if (e == null) throw new NullPointerException(); // 获取队列元素总数 final AtomicInteger count = this.count; // 队列已满,退出 if (count.get() == capacity) return false; int c = -1; // 创建新节点 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; // 获取不可中断锁 putLock.lock(); try { // 还有剩余空间 if (count.get() < capacity) { // 向队列存入元素 enqueue(node); // 队列保存元素总数递增 c = count.getAndIncrement(); // 队列没满,通知其他线程写入 if (c + 1 < capacity) notFull.signal(); } } finally { // 解锁 putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
public E take() throws InterruptedException { E x; int c = -1; // 获取队列元素总数 final AtomicInteger count = this.count; // 获取takeLock final ReentrantLock takeLock = this.takeLock; // 上锁,设置为可中断 takeLock.lockInterruptibly(); try { // 如果队列为空,则等待其他线程通知 while (count.get() == 0) { notEmpty.await(); } // 队列有可用元素,队头元素出列 x = dequeue(); // 队列元素总数递减 c = count.getAndDecrement(); // 如果队列还有元素,通知其他线程取数据 if (c > 1) notEmpty.signal(); } finally { // 解锁 takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; // 转换超时时间为纳秒 long nanos = unit.toNanos(timeout); // 获取队列元素总数 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 上锁takeLock,设置为可中断 takeLock.lockInterruptibly(); try { // 当队列为空,在超时时间内等待 while (count.get() == 0) { if (nanos <= 0L) // 到达超时时间,返回null return null; // 超时时间倒数 nanos = notEmpty.awaitNanos(nanos); } // 队头元素出列 x = dequeue(); // 队列元素总数递减 c = count.getAndDecrement(); // 队列还有可用元素,通知其他线程获取数据 if (c > 1) notEmpty.signal(); } finally { // 解锁 takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
public E poll() { // 获取元素数量 final AtomicInteger count = this.count; // 队列中没有元素则返回null if (count.get() == 0) return null; // 队列中有元素,开始以下逻辑 E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { // 元素从对头出队 x = dequeue(); // 队列元素数量递减 c = count.getAndDecrement(); // 队列还有可用元素,通知其他线程获取数据 if (c > 1) notEmpty.signal(); } } finally { // 解锁 takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
public E peek() { // 队列没有节点直接返回null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 返回对头元素的内容,否则返回null return (count.get() > 0) ? head.next.item : null; } finally { // 解锁 takeLock.unlock(); } }
void unlink(Node<E> p, Node<E> pred) { // assert putLock.isHeldByCurrentThread(); // assert takeLock.isHeldByCurrentThread(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; // 节点p的内容置空 pred.next = p.next; // 操作p前后节点,令p解除链接 if (last == p) // 如果p是尾节点,则调整尾指针的指向 last = pred; if (count.getAndDecrement() == capacity) notFull.signal(); }
public boolean remove(Object o) { // 元素为null直接返回false if (o == null) return false; fullyLock(); try { for (Node<E> pred = head, p = pred.next; p != null; pred = p, p = p.next) { // 在链表上逐个查找元素是否匹配 if (o.equals(p.item)) { // 找到匹配元素则把该元素解除链接 unlink(p, pred); // 元素返回true return true; } } // 没有移除元素 return false; } finally { fullyUnlock(); } }
public boolean contains(Object o) { // 元素为null直接返回false if (o == null) return false; fullyLock(); try { // 遍历队列逐个查找元素 for (Node<E> p = head.next; p != null; p = p.next) // 找到匹配元素 if (o.equals(p.item)) return true; // 找不到匹配元素 return false; } finally { fullyUnlock(); } }
public Object[] toArray() { fullyLock(); try { // 获取队列中元素个数 int size = count.get(); // 通过元素数量构造数组 Object[] a = new Object[size]; int k = 0; // 遍历队列,一次拷贝元素引用到数组对应索引 for (Node<E> p = head.next; p != null; p = p.next) a[k++] = p.item; // 返回数组 return a; } finally { fullyUnlock(); } }
@SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { fullyLock(); try { // 获取队列元素总数 int size = count.get(); // 队列元素总数超过传入数组的长度 if (a.length < size) // 创建新的数组,长度为队列元素总数 a = (T[])java.lang.reflect.Array.newInstance (a.getClass().getComponentType(), size); int k = 0; // 依次把队列的元素存入数组中 for (Node<E> p = head.next; p != null; p = p.next) a[k++] = (T)p.item; // 如果数组还有空余空间,则该位置设为null if (a.length > k) a[k] = null; // 返回数组 return a; } finally { fullyUnlock(); } }
public void clear() { // 上锁 fullyLock(); try { for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; // 置空节点的数据 } head = last; // 由于队列元素全清空了,所以头指针和为指针引用相同 // assert head.item == null && head.next == null; if (count.getAndSet(0) == capacity) notFull.signal(); } finally { fullyUnlock(); } }
public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); }
public int drainTo(Collection<? super E> c, int maxElements) { Objects.requireNonNull(c); // 不能把本队列的元素添加到自己队列上 if (c == this) throw new IllegalArgumentException(); // maxElements须为正数 if (maxElements <= 0) return 0; boolean signalNotFull = false; final ReentrantLock takeLock = this.takeLock; // takeLock上锁 takeLock.lock(); try { // 计算需要转移多少个队列元素 int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { while (i < n) { // 从列表取节点 Node<E> p = h.next; // 节点数据添加到数组中 c.add(p.item); // 置空节点的数据 p.item = null; // 引用移动到下一个节点 h.next = h; h = p; // 转移节点数递增 ++i; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { // assert h.item == null; // 已经转移了i个元素,队列的头引用需要向后移动i个位置 head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }
- 出队节点(p.next == p)
- (可能多个)内部已移除节点(p.item == null)
Node<E> succ(Node<E> p) { if (p == (p = p.next)) p = head.next; return p; }
public Iterator<E> iterator() { return new Itr(); }
private class Itr implements Iterator<E> { private Node<E> next; // Node holding nextItem private E nextItem; // next item to hand out private Node<E> lastRet; private Node<E> ancestor; // Helps unlink lastRet on remove() Itr() { fullyLock(); try { if ((next = head.next) != null) nextItem = next.item; } finally { fullyUnlock(); } } public boolean hasNext() { return next != null; } public E next() { Node<E> p; if ((p = next) == null) throw new NoSuchElementException(); lastRet = p; E x = nextItem; fullyLock(); try { E e = null; for (p = p.next; p != null && (e = p.item) == null; ) p = succ(p); next = p; nextItem = e; } finally { fullyUnlock(); } return x; } public void forEachRemaining(Consumer<? super E> action) { // A variant of forEachFrom Objects.requireNonNull(action); Node<E> p; if ((p = next) == null) return; lastRet = p; next = null; final int batchSize = 64; Object[] es = null; int n, len = 1; do { fullyLock(); try { if (es == null) { p = p.next; for (Node<E> q = p; q != null; q = succ(q)) if (q.item != null && ++len == batchSize) break; es = new Object[len]; es[0] = nextItem; nextItem = null; n = 1; } else n = 0; for (; p != null && n < len; p = succ(p)) if ((es[n] = p.item) != null) { lastRet = p; n++; } } finally { fullyUnlock(); } for (int i = 0; i < n; i++) { @SuppressWarnings("unchecked") E e = (E) es[i]; action.accept(e); } } while (n > 0 && p != null); } public void remove() { Node<E> p = lastRet; if (p == null) throw new IllegalStateException(); lastRet = null; fullyLock(); try { if (p.item != null) { if (ancestor == null) ancestor = head; ancestor = findPred(p, ancestor); unlink(p, ancestor); } } finally { fullyUnlock(); } } }
private final class LBQSpliterator implements Spliterator<E> { static final int MAX_BATCH = 1 << 25; // max batch array size; Node<E> current; // current node; null until initialized int batch; // batch size for splits boolean exhausted; // true when no more nodes long est = size(); // size estimate LBQSpliterator() {} public long estimateSize() { return est; } public Spliterator<E> trySplit() { Node<E> h; if (!exhausted && ((h = current) != null || (h = head.next) != null) && h.next != null) { int n = batch = Math.min(batch + 1, MAX_BATCH); Object[] a = new Object[n]; int i = 0; Node<E> p = current; fullyLock(); try { if (p != null || (p = head.next) != null) for (; p != null && i < n; p = succ(p)) if ((a[i] = p.item) != null) i++; } finally { fullyUnlock(); } if ((current = p) == null) { est = 0L; exhausted = true; } else if ((est -= i) < 0L) est = 0L; if (i > 0) return Spliterators.spliterator (a, 0, i, (Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT)); } return null; } public boolean tryAdvance(Consumer<? super E> action) { Objects.requireNonNull(action); if (!exhausted) { E e = null; fullyLock(); try { Node<E> p; if ((p = current) != null || (p = head.next) != null) do { e = p.item; p = succ(p); } while (e == null && p != null); if ((current = p) == null) exhausted = true; } finally { fullyUnlock(); } if (e != null) { action.accept(e); return true; } } return false; } public void forEachRemaining(Consumer<? super E> action) { Objects.requireNonNull(action); if (!exhausted) { exhausted = true; Node<E> p = current; current = null; forEachFrom(action, p); } } public int characteristics() { return (Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT); } }
public Spliterator<E> spliterator() { return new LBQSpliterator(); }
Node<E> findPred(Node<E> p, Node<E> ancestor) { // assert p.item != null; if (ancestor.item == null) ancestor = head; // Fails with NPE if precondition not satisfied for (Node<E> q; (q = ancestor.next) != p; ) ancestor = q; return ancestor; }
public boolean removeAll(Collection<?> c) { Objects.requireNonNull(c); return bulkRemove(e -> c.contains(e)); }
public boolean retainAll(Collection<?> c) { Objects.requireNonNull(c); return bulkRemove(e -> !c.contains(e)); }
@SuppressWarnings("unchecked") private boolean bulkRemove(Predicate<? super E> filter) { boolean removed = false; Node<E> p = null, ancestor = head; Node<E>[] nodes = null; int n, len = 0; do { // 1. Extract batch of up to 64 elements while holding the lock. long deathRow = 0; // "bitset" of size 64 fullyLock(); try { if (nodes == null) { if (p == null) p = head.next; for (Node<E> q = p; q != null; q = succ(q)) if (q.item != null && ++len == 64) break; nodes = (Node<E>[]) new Node<?>[len]; } for (n = 0; p != null && n < len; p = succ(p)) nodes[n++] = p; } finally { fullyUnlock(); } // 2. Run the filter on the elements while lock is free. for (int i = 0; i < n; i++) { final E e; if ((e = nodes[i].item) != null && filter.test(e)) deathRow |= 1L << i; } // 3. Remove any filtered elements while holding the lock. if (deathRow != 0) { fullyLock(); try { for (int i = 0; i < n; i++) { final Node<E> q; if ((deathRow & (1L << i)) != 0L && (q = nodes[i]).item != null) { ancestor = findPred(q, ancestor); unlink(q, ancestor); removed = true; } } } finally { fullyUnlock(); } } } while (n > 0 && p != null); return removed; }
