并发编程技术十之j.u.c中的BlockingQueue

栏目: Java · 发布时间: 6年前

内容简介:针对JDK中的j.u.c我们已经讲了三篇文章了,计划我是分了五篇文章来细讲j.u.c中的api。今天 我们讲的BlockingQueue是第四篇,也是在使用中最频发,面试必提的一个话题。我在网上搜了个用列表来讲解队列, 通过这个列表一眼就能了解jdk 在设计 API 时每个队列的特性,也能看出他们的相同点和不同点阻塞队列的概念

针对JDK中的j.u.c我们已经讲了三篇文章了,计划我是分了五篇文章来细讲j.u.c中的api。今天 我们讲的BlockingQueue是第四篇,也是在使用中最频发,面试必提的一个话题。

我在网上搜了个用列表来讲解队列, 通过这个列表一眼就能了解jdk 在设计 API 时每个队列的特性,也能看出他们的相同点和不同点

队列

有界性

数据结构

ArrayBlockingQueue

bounded( 有界 )

加锁

arrayList

LinkedBlockingQueue

optionally-bounded

加锁 linkedList

PriorityBlockingQueue

unbounded 加锁 heap

DelayQueue

unbounded 加锁 heap

SynchronousQueue

bounded 加锁

LinkedTransferQueue

unbounded 加锁 heap

LinkedBlockingDeque

unbounded

无锁

heap

阻塞队列的概念

ArrayBlockingQueue :是一个用数组实现的有界阻塞队列,此队列按照先进先出( FIFO )的原则对元素进行排序。支持公平锁和非公平锁

LinkedBlockingQueue :一个由链表结构组成的有界队列,此队列的长度为 Integer.MAX_VALUE 。此队列按照先进先出的顺序进行排序。

PriorityBlockingQueue :一个支持线程优先级 排序 的无界队列,默认自然序进行排序,也可以自定义实现 compareTo() 方法来指定元素排序规则,不能保证同优先级元素的顺序

DelayQueue :一个实现 PriorityBlockingQueue 实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。

SynchronousQueue :一个不存储元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能添加元素。支持公平锁和非公平锁。 SynchronousQueue 的一个使用场景是在线程池里 LinkedTransferQueue :一个由链表结构组成的无界阻塞队列,相当于其它队列, LinkedTransferQueue 队列多了 transfer tryTransfer 方法

LinkedBlockingDeque :一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半

下面我们对每个类中的API 进行了解和分析

ArrayBlockingQueue

ArrayBlockingQueue继承  AbstractQueue 实现 BlockingQueue


 

public class ArrayBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

/** The queued items */

final Object[] items;

/** items index for next take, poll, peek or remove */

int takeIndex;

/** items index for next put, offer, or add */

int putIndex;

/** Number of elements in the queue */

int count;

/** Main lock guarding all access */

final ReentrantLock lock;

/** Condition for waiting takes */

private final Condition notEmpty;

/** Condition for waiting puts */

private final Condition notFull;

transient Itrs itrs = null;

...

}

主要实现通过 items 数组来实现数据结构。通过 takeIndexputIndex 组合来标识下标, 通过 ReentrantLock 来进行加锁,通过 Condition 来对队列为元素的为空或不满 队列的阻塞

主要的方法

  • add //添加元素,满时会 抛异常

  • offer //添加元素,满时直接返回false

  • put //添加队列满时阻塞

  • poll //移除  队列第 一个元素,为空时直接返回null

  • take // 移除队列第一个元素,为空时等待

  • remove  // 移除队列第一个元素,为空时异常

  • remove(Object) //移除指定元素,若是第一个直接移除, 否则下面的元素上移,致空最后一个。

  • peek // 获取队列中第一个元素

  • size //当前队列数

通过以上方法可以分为以下几类

  • 添加元素

  • 移除元素

  • 元素个数

LinkedBlockingQueue

是一个链表队列那么实现必须使用Node来实现,看下类的定义


 

public class LinkedBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {


/**

* Linked list node class

*/

static class Node<E> {

E item;

Node<E> next;

Node(E x) { item = x; }

}

/** The capacity bound, or Integer.MAX_VALUE if none */

private final int capacity;

/** Current number of elements */

private final AtomicInteger count = new AtomicInteger();

/**

* Head of linked list.

* Invariant: head.item == null

*/

transient Node<E> head;

/**

* Tail of linked list.

* Invariant: last.next == null

*/

private transient Node<E> last;

/** Lock held by take, poll, etc */

private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */

private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */

private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */

private final Condition notFull = putLock.newCondition();

...

}

这里的阻塞也是通过Condition来实现

主要方法

  • add //添加 元素,满时抛异常

  • put //添加元素,满时阻塞

  • offer// 添加元素,满时返回false

  • poll //移除元素,队列为空时返回null

  • remove //移除元素,队列为空 抛异常

  • remove(Object) //移除指定元素,移除时需要在链表中查找元素,然后在移除

  • take //移除 元素为空阻塞

  • size //获取  元素个数

  • drainTo //一次性从队列中获取所有( 可指定个数)可用元素,获取后在队列移除

  • iterator //转化成迭代器

  • remainingCapacity 获取 剩余队列大小

  • sliterator //转化迭代器( JDK1.8)

PriorityBlockingQueue

PriorityBlockingQueue是通过Comparator接口实现对象比较


 

public class PriorityBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

private static final long serialVersionUID = 5595510919245408276L;

}

主要方法

  • add //添加元素,  满时跑异常

  • offer //添加元素,满时返回false

  • put //添加元素,满时阻塞

  • poll //移除 元素,队列为空时返回null

  • take // 移除元素, 队列为空阻塞

  • remove //移除元素,队列为空时抛异常

  • remove(Object)

  • comparator //返回此队列元素进行排序的比较器

  • drainTo // 一次性从队列中 获取所有(可指定范围)元素

  • iterator //转化换代器

  • peek //获取队列中第一个元素

  • remainingCapacity //获取剩余 队列大小

  • spliterator //转化换代器(jdk1.8)

  • size 返回队列元素个数

DelayQueue

内部使用是非线程安全的优先队列 PriorityQueue


 

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>

implements BlockingQueue<E> {


private final transient ReentrantLock lock = new ReentrantLock();

private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition available = lock.newCondition();

}

主要方法

  • add  //添加元素

  • offer //添加元素

  • put //添加元素,满时阻塞

  • take  //移除元素,为空阻塞;延时时阻塞;有leader线程时阻塞;自己为leader时阻塞

  • poll   //移除元素,为空返回null

  • remove //移除元素

  • size   //返回队列元素个数

  • remainingCapacity

  • iterator  //转化成迭代器

  • drainTo  //获取所有(可指定个数)可用元素

  • clear  //清除队列

  • toArray //转化为数组

SynchronousQueue

内部定义抽象静态类Transferer,通过两个内部类来实现公平(TransferQueue)和非公平(TransferStack)模式下的队列操作。


 

public class SynchronousQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

/**

* Shared internal API for dual stacks and queues.

*/

abstract static class Transferer<E> {

abstract E transfer(E e, boolean timed, long nanos);

}


/** The number of CPUs, for spin control */

static final int NCPUS = Runtime.getRuntime().availableProcessors();

static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

static final int maxUntimedSpins = maxTimedSpins * 16;

static final long spinForTimeoutThreshold = 1000L;

...

}

TransferStack 内部通过单向链表SNode实现双重栈,数据是 LIFO的 顺序。TransferQueue是通过内部定义QNode 实现,数据是FIFO的顺序。

主要方法

  • put添加元素后阻塞

  • offer  添加元素,直接返回false,若其它线程正好取出返回true

  • take 移除元素,若未有元素时阻塞

  • poll 移除元素,直接返回null,若其它线程正好添加返回元素

  • 其它方法都是默认值。没有实际意义!

LinkedTransferQueue

LInkedTransferQueue是TransferQueue 的实现类,是一个无界队列, 具有先进先出FIFO特性


 

public class LinkedTransferQueue<E> extends AbstractQueue<E>

implements TransferQueue<E>, java.io.Serializable {

}

主要方法

  • transfer  当存在一个正在等待获取的消费线程,则立即传输数据。否则会插入到队列尾部并阻塞等待消费线程

  • tryTransfer 肖存在一个正在等待获取的消费线程,则立即传输,否则返回false

  • hasWaitingConsumer 判断是否存在消费者线程

  • getWaitingConsumerCount 获取所有等待获取元素的消费线程数量

LinkedBlockingDeque

在分析LinkedBlockingDeque一般与LinkedBlockingQueue一起分析。这两个比较相似。在上面刚刚分析了LinkedBlockingQueue它是链表通过Node来实现,但入队和出队只能一端。而LInkedBlockingDeque也是链表,它可以通过两端同时操作(入队和出队)。

原因是 LinkedBlockingDeque 中的 Node是 双向链表, LinkedBlockingQueue 是单向链表


 

public class LinkedBlockingDeque<E>

extends AbstractQueue<E>

implements BlockingDeque<E>, java.io.Serializable {


/** Doubly-linked list node class */

static final class Node<E> {

E item;

Node<E> prev;

Node<E> next;


Node(E x) {

item = x;

}

}

}

主要方法

  • add /addFirst/addLast

  • offer/offerFirst/offerLast

  • poll/pollFirst/pollLast

  • put/putFirst/putLast

  • remove/removeFirst/removeLast/pop

  • getFirst/getLast

以上是根据个人理解做了分析,如有不正确请留言讨论。

----------------------------------------------------------

再次感谢,欢迎关注微信公众号“零售云技术”,文章持续更新,或留言讨论

并发编程技术十之j.u.c中的BlockingQueue


以上所述就是小编给大家介绍的《并发编程技术十之j.u.c中的BlockingQueue》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Intersectional Internet

The Intersectional Internet

Safiya Umoja Noble、Brendesha M. Tynes / Peter Lang Publishing / 2016

From race, sex, class, and culture, the multidisciplinary field of Internet studies needs theoretical and methodological approaches that allow us to question the organization of social relations that ......一起来看看 《The Intersectional Internet》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

URL 编码/解码
URL 编码/解码

URL 编码/解码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具