如果构造方法的容量参数没有指定,则 Integer#MAX_VALUE 将作为默认容量使用。而队列元素插入时,对应链节点动态创建。
多数操作能在常量时间内完成执行。例外的是 remove(Object) 、 removeFirstOccurrence 、 removeLastOccurrence 、 contains 、 iterator.remove() 和批量操作等方法,消耗的时间是线性的。
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable
static final class Node<E> { // item,节点已被移除时为空 E item; // 其一: // - 真正的前导节点 // - 指向本节点,表明前导节点是尾节点 // - null,表明没有前导节点 Node<E> prev; // 其一: // - 真正的后继节点 // - 指向本节点,表明后继节点是头节点 // - null,表明没有后继节点 Node<E> next; // 构造方法 Node(E x) { item = x; } }
// 指向队列首节点 // Invariant: (first == null && last == null) || (first.prev == null && first.item != null) transient Node<E> first; // 指向队列最后节点 // Invariant: (first == null && last == null) || (last.next == null && last.item != null) transient Node<E> last; // 双端队列已保存元素数量 private transient int count; // 双端队列元素最大容量 private final int capacity; // 保护所有访问操作的主锁 final ReentrantLock lock = new ReentrantLock(); // 等待获取的Condition private final Condition notEmpty = lock.newCondition(); // 等待存入的Condition private final Condition notFull = lock.newCondition();
// 用Integer#MAX_VALUE构建实例 public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } // 用指定容量值构建实例 public LinkedBlockingDeque(int capacity) { // 值小于1时抛出IllegalArgumentException if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } // 用Integer#MAX_VALUE构建实例,且用指定集合的元素作为双端队列的初始内容 // c为空时抛出NullPointerException public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); addAll(c); }
5.1 link
// 把指定节点作为第一个元素存入,如果队列已满返回false private boolean linkFirst(Node<E> node) { // 检查容量是否已满 if (count >= capacity) return false; // 获取第一个节点 Node<E> f = first; // 新节点作为原首节点的前导节点 node.next = f; first = node; // 处理尾引用 if (last == null) last = node; else f.prev = node; // 元素数量统计递增 ++count; // 通知其他线程取数据 notEmpty.signal(); return true; } // 把指定节点作为最后一个元素存入,如果队列已满返回false private boolean linkLast(Node<E> node) { // 检查容量是否已满 if (count >= capacity) return false; // 获取最后一个节点 Node<E> l = last; // 新节点作为原尾节点的后继节点 node.prev = l; last = node; // 处理头引用 if (first == null) first = node; else l.next = node; // 元素数量统计递增 ++count; // 通知其他线程取数据 notEmpty.signal(); return true; }
5.2 unlink
// 移除并返回第一个元素,如果为空则返回null private E unlinkFirst() { // 获取首节点 Node<E> f = first; if (f == null) return null; // 首节点的下一个节点 Node<E> n = f.next; // 获取首节点的item E item = f.item; // 清空首节点 f.item = null; // 首节点的后继节点引用为自己 f.next = f; // 处理头引用 first = n; if (n == null) last = null; else n.prev = null; // 元素数量统计递减 --count; notFull.signal(); // 返回首节点的内容 return item; } // 移除并返回最后一个元素,如果为空则返回null private E unlinkLast() { // 获取尾节点 Node<E> l = last; if (l == null) return null; // 尾节点的上一个节点 Node<E> p = l.prev; // 获取尾节点的item E item = l.item; // 清空尾节点 l.item = null; l.prev = l; // 处理尾引用 last = p; if (p == null) first = null; else p.next = null; // 元素数量统计递减 --count; notFull.signal(); // 返回尾节点的内容 return item; } // 从队列中移除x节点 void unlink(Node<E> x) { Node<E> p = x.prev; Node<E> n = x.next; // p为空表示x是队列的首节点 if (p == null) { unlinkFirst(); } else if (n == null) { // n为空表示x是队列的尾节点 unlinkLast(); } else { // x是队列的中间节点 p.next = n; n.prev = p; x.item = null; // 不要置空x.next的节点,因为x节点在iterator里可能在使用 --count; notFull.signal(); } }
5.3 阻塞双端队列
两个 add 方法是 offer 方法的辩题,如果元素添加失败会直接抛出异常。若deque容量已满抛出 IllegalStateException 。存入元素为空则抛出 NullPointerException
public void addFirst(E e) { if (!offerFirst(e)) throw new IllegalStateException("Deque full"); } public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); } public boolean add(E e) { addLast(e); return true; } public boolean offer(E e) { return offerLast(e); } public void put(E e) throws InterruptedException { putLast(e); } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { return offerLast(e, timeout, unit); } // 默认移除并返回双端队列的头节点 public E remove() { return removeFirst(); } public E poll() { return pollFirst(); } public E take() throws InterruptedException { return takeFirst(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { return pollFirst(timeout, unit); } // 获取双端队列的头节点,但是获取之后不会移除该节点 public E element() { return getFirst(); } public E peek() { return peekFirst(); } // 返回双端队列可用容量 // 不要通过观察队列剩余容量来确定元素成功插入,因为其他线程也可能在增加或移除元素 public int remainingCapacity() { final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { // 计算剩余有效容量 return capacity - count; } finally { // 解锁 lock.unlock(); } } public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } public int drainTo(Collection<? super E> c, int maxElements) { Objects.requireNonNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { int n = Math.min(maxElements, count); for (int i = 0; i < n; i++) { c.add(first.item); // In this order, in case add() throws. unlinkFirst(); } return n; } finally { // 解锁 lock.unlock(); } }
5.4 offer
public boolean offerFirst(E e) { // 存入元素不能为空 if (e == null) throw new NullPointerException(); // 把e封装为新Node Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { return linkFirst(node); } finally { // 解锁 lock.unlock(); } } public boolean offerLast(E e) { // 存入元素不能为空 if (e == null) throw new NullPointerException(); // 把e封装为新Node Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { return linkLast(node); } finally { // 解锁 lock.unlock(); } }
等待存入元素到队列头时被中断抛出 InterruptedException
public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException { // 存入元素不能为空 if (e == null) throw new NullPointerException(); // 创建新节点 Node<E> node = new Node<E>(e); // 换算为纳秒 long nanos = unit.toNanos(timeout); // 获取锁,且锁可中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (!linkFirst(node)) { if (nanos <= 0L) return false; nanos = notFull.awaitNanos(nanos); } return true; } finally { // 解锁 lock.unlock(); } }
等待存入元素到队列尾时被中断抛出 InterruptedException
public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException { // 存入元素不能为空 if (e == null) throw new NullPointerException(); // 创建新节点 Node<E> node = new Node<E>(e); // 换算为纳秒 long nanos = unit.toNanos(timeout); // 获取锁,且锁可中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (!linkLast(node)) { if (nanos <= 0L) return false; nanos = notFull.awaitNanos(nanos); } return true; } finally { // 解锁 lock.unlock(); } }
5.5 put
指定元素e为空时抛出 NullPointerException ,等待存入元素过程中被中断抛出 InterruptedException
public void putFirst(E e) throws InterruptedException { // 存入元素不能为空 if (e == null) throw new NullPointerException(); // 把e封装为新Node Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { while (!linkFirst(node)) notFull.await(); } finally { // 解锁 lock.unlock(); } }
public void putLast(E e) throws InterruptedException { // 存入元素不能为空 if (e == null) throw new NullPointerException(); // 把e封装为新Node Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { while (!linkLast(node)) notFull.await(); } finally { // 解锁 lock.unlock(); } }
5.6 remove
public E removeFirst() { E x = pollFirst(); if (x == null) throw new NoSuchElementException(); return x; } public E removeLast() { E x = pollLast(); if (x == null) throw new NoSuchElementException(); return x; }
5.7 poll
public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } } public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); } finally { lock.unlock(); } } // 带超时的poll public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; while ( (x = unlinkFirst()) == null) { if (nanos <= 0L) return null; nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } } // 带超时的poll public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; while ( (x = unlinkLast()) == null) { if (nanos <= 0L) return null; nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } }
5.8 take
public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkLast()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } }
5.9 get
若队列为空则抛出 NoSuchElementException
public E getFirst() { E x = peekFirst(); if (x == null) throw new NoSuchElementException(); return x; }
若队列为空则抛出 NoSuchElementException
public E getLast() { E x = peekLast(); if (x == null) throw new NoSuchElementException(); return x; }
5.10 peek
public E peekFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return (first == null) ? null : first.item; } finally { lock.unlock(); } } public E peekLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return (last == null) ? null : last.item; } finally { lock.unlock(); } }
5.11 removeOccurrence
public boolean removeFirstOccurrence(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { // 顺序查找 for (Node<E> p = first; p != null; p = p.next) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } }
public boolean removeLastOccurrence(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { // 倒叙查找 for (Node<E> p = last; p != null; p = p.prev) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } }
5.12 栈方法
public void push(E e) { addFirst(e); }
public E pop() { return removeFirst(); }
5.13 Collection方法
// 移除双端队列第一次遇到的指定节点,若节点不存在返回false public boolean remove(Object o) { return removeFirstOccurrence(o); } // 返回双端队列以保存节点数量 public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } } // 若双端队列包含指定元素返回true,也可能是队列中保存了多个相同元素,查找时命中其中一个 public boolean contains(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { lock.unlock(); } } // 把指定集合的元素全部添加到本双端队列尾部中,元素插入的顺序由c的迭代器决定 // 当插入集合c和实例本身是同一双端队列,抛出 IllegalArgumentException // 双端队列为空抛出 IllegalStateException public boolean addAll(Collection<? extends E> c) { if (c == this) throw new IllegalArgumentException(); // Copy c into a private chain of Nodes // 把c所有元素迁移到私有节点链 Node<E> beg = null, end = null; int n = 0; // 把c所有元素封装为Node,并连接到链beg for (E e : c) { Objects.requireNonNull(e); n++; Node<E> newNode = new Node<E>(e); if (beg == null) beg = end = newNode; else { // end的后继节点是newNode end.next = newNode; // newNode的前导节点是end newNode.prev = end; // 尾引用从end移到newNode end = newNode; } } // 集合c没有元素,或里面的元素全为null if (beg == null) return false; // 把元素原子性地插入到队列尾部中 final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { if (count + n <= capacity) { beg.prev = last; if (first == null) // beg作为头结点 first = beg; else // 元素插入到队列尾部 last.next = beg; // 更新尾引用 last = end; // 更新队列元素总数量 count += n; // 通知其他线程获取元素 notEmpty.signalAll(); return true; } } finally { lock.unlock(); } // 当元素插入容量溢出导致IllegalStateException时,回到旧的非原子性的实现方法 return super.addAll(c); }
5.14 toArray
s@SuppressWarnings("unchecked") public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { // 创建新数组,大小为count Object[] a = new Object[count]; int k = 0; for (Node<E> p = first; p != null; p = p.next) // 把队列的元素逐个放入数组中 a[k++] = p.item; // 返回数组 return a; } finally { // 解锁 lock.unlock(); } }
如果传入数组空间足够存入所有元素,该数组的下一个空间会被置为 null 。
本方法可以实现队列转数组的功能: String[] y = x.toArray(new String[0]); 。且值得注意的是,传入 toArray(new Object[0]) 和 传入 toArray() 的效果完全相同。
指定数组元素的运行时类型不是双端队列元素的运行时类型的子类时抛出 ArrayStoreException ; 指定数组为空抛出 NullPointerException ;
@SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; // 上锁 lock.lock(); try { // 双端队列元素数量比指定数组空间大,则需要创建新数组并赋值给a if (a.length < count) // 创建类型为a.getClass().getComponentType(),大小为count的数组 a = (T[])java.lang.reflect.Array.newInstance (a.getClass().getComponentType(), count); // 初始数组索引k int k = 0; for (Node<E> p = first; p != null; p = p.next) // 把队列的元素逐个放入数组中 a[k++] = (T)p.item; if (a.length > k) // 置空索引k的值 a[k] = null; // 返回数组 return a; } finally { // 解锁 lock.unlock(); } }
5.15 clear
public void clear() { final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { for (Node<E> f = first; f != null; ) { // 解除节点链接,并清空前导引用,后继引用和节点的负载 f.item = null; Node<E> n = f.next; f.prev = null; f.next = null; f = n; } first = last = null; // 头尾指针置空 count = 0; // 总元素数置0 notFull.signalAll(); } finally { // 解锁 lock.unlock(); } }
5.16 succ
- 已出队节点 (p.next == p)
- 可能多个内部已移除节点 (p.item == null)
Node<E> succ(Node<E> p) { if (p == (p = p.next)) p = first; return p; }
5.17 checkInvariants
void checkInvariants() { // assert lock.isHeldByCurrentThread(); // Nodes may get self-linked or lose their item, but only // after being unlinked and becoming unreachable from first. for (Node<E> p = first; p != null; p = p.next) { // assert p.next != p; // assert p.item != null; } }
