内容简介:AQS,全称AbstractQueuedSynchronizer,即抽象队列同步器,和CAS共同撑起了整个java.util.concurrent包,同时也是Java并发编程上绕不开的一个概念抽象队列同步器,以下统称AQS,用于解决的就是多线程并发访问控制问题。在传统的多线程编程中,如果有多个线程需要访问同一个变量,就需要使用synchronized来为临界区加锁(临界区:访问共享资源的程序段),但是这种方式既不“优雅”,也不高效(即使Java为其已经做了很多优化),更重要的是,不能实现更细粒度的控制(虽然
AQS,全称AbstractQueuedSynchronizer,即抽象队列同步器,和CAS共同撑起了整个java.util.concurrent包,同时也是 Java 并发编程上绕不开的一个概念
抽象队列同步器,以下统称AQS,用于解决的就是多线程并发访问控制问题。在传统的多线程编程中,如果有多个线程需要访问同一个变量,就需要使用synchronized来为临界区加锁(临界区:访问共享资源的程序段),但是这种方式既不“优雅”,也不高效(即使Java为其已经做了很多优化),更重要的是,不能实现更细粒度的控制(虽然可以通过大量额外程序代码实现)。这时候,AQS出现了,它提供了一种简洁优雅的机制来实现线程安全
本质上说,AQS是构建(包括锁在内)大部分同步组件的基础框架,它通过管理 资源状态量 和 线程同步队列 来实现资源的分发(如共享或独占)。接下来,我们就要对其实现方式来做进一步的讨论
内部组件
AQS的实现是基于同步状态量和一个FIFO的双向队列来实现的,下面就来分别讲述其各自的特点
同步状态
在类内部有一个被volatile修饰的整形变量state,其定义如下:
private volatile int state; 复制代码
这个变量官方称为 同步状态量 ,实际可以理解为一些共享资源,每有一个线程获取到了一个共享资源,则这个同步状态量就要减一,反之就需要加一,如果这个状态量为0,就表示共享资源已经被其他所有线程分完了,当前的线程只能等待
但是同步状态量并不能直接与资源划等号,它只是提供一种类似门禁的操作(可以类比锁),任何线程想要获取共享资源都需要先来询问这个同步状态量是否允许这样的操作,这也就间接实现了对于共享资源的线程安全控制
在AQS中,对于该变量提供了以下三个操作接口:
protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } /** * 通过CAS操作来更新state变量 */ protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } 复制代码
这里暂且提一句,CAS指的是CompareAndSwap,即比较并替换,是一种乐观锁的实现,其实现原理简单的说就是在更新一个值之前,先比较 变量内存地址的当前值 ,是否与 这个地址上预期应存储的值 一致,如果一致再进行更新。为了不喧宾夺主,关于CAS的东西暂且就说这么多,对于本文来说了解这些就已经够用了
然后再回来看这三个接口方法,其均被protected修饰符所修饰,所以这些接口方法并不是提供给用户调用的,而是供同步框架的开发者使用。这三个方法为同步状态量的修改操作提供了极大的控制权限,因此也需要谨慎使用
双向队列
AQS为了管理所有获取或没获取到同步状态的线程,使用了双向队列来管理这些线程,这个队列的节点定义如下:
static final class Node { /**表示当前节点在共享模式下等待 */ static final Node SHARED = new Node(); /** 表示当前节点在独占模式下等待 */ static final Node EXCLUSIVE = null; /** 表示当前节点线程需要取消等待 */ static final int CANCELLED = 1; /** 表示后继节点线程需要被唤醒 */ static final int SIGNAL = -1; /** 线程正在等待Condition */ static final int CONDITION = -2; /** 传播状态,表示下一次获取共享同步状态的操作会无条件传播下去 */ static final int PROPAGATE = -3; /** 节点的等待状态 */ volatile int waitStatus; /** 前驱节点 */ volatile Node prev; /** 后继节点 */ volatile Node next; /** 队列节点所代表的线程 */ volatile Thread thread; /** 等待队列中的后继节点,如果在共享模式下等待,则该变量为SHARED */ Node nextWaiter; /** 返回节点是否以共享模式在等待资源 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 返回当前节点的前驱节点,如果不存在则抛出异常 * @return 前驱节点 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() {} Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } } 复制代码
在AQS中,使用head和tail来标识这个同步队列:
private transient volatile Node head; private transient volatile Node tail; 复制代码
在这个双向队列,更准确地来说是同步队列中,比较重要的点有两个,首先是可以很明显地看出同步节点有两种模式,分别为独占和共享。其次是每一个节点都有一个Thread类型的变量,也就是说每一个节点均代表着一个线程
同时,AQS也提供了一系列对同步队列的操作接口,其中的一些重要的方法我们放在下一节来详细讲解
同步原理
刚才我们简要了解了AQS中同步状态和同步队列的结构,接下来我们就要来分析为什么这两个东西就能实现并发安全控制
接口
AQS本身是不能直接使用的,因为其本质还是一个抽象类(尽管一个抽象方法都没有),如果想要使用AQS的话,我们仅需要继承AQS,并重写以下5个方法:
- boolean tryAcquire(int arg):尝试以独占模式获取同步状态
- boolean tryRelease(int arg):尝试以独占模式释放同步状态
- int tryAcquireShared(int arg):尝试以共享模式获取同步状态
- boolean tryReleaseShared(int arg):尝试以共享模式释放同步变量
- boolean isHeldExclusively():当同步器被当前线程以独占模式占用时返回true
当然,也可以选择不重写这些方法,但是不重写的情况下你是不能直接调用这些方法的,因为这些方法在AQS的实现中均会抛出一个UnsupportedOperationException异常
这些方法的含义很好理解,我们也很容易想到如何利用这5个接口完成线程同步的操作。比如,我们现在想要实现一个写锁,那么我们可以把同步状态的初始值设为1,然后实现tryAcquire和tryRelease方法,每当有线程来获取写锁时就尝试调用tryAcquire,写操作执行完之后就调用tryRelease方法
模板方法
刚才仅仅是做个实例,实际上刚才的这5个接口方法也仅仅是用于被模版方法调用,所以我们实际操作的还是模板方法而已,AQS中提供的模板方法有以下这些:
获取同步状态:
- void acquire(int arg):以独占模式获取同步状态
- void acquireShared(int arg):以共享模式获取同步状态
- void acquireInterruptibly(int arg):能够响应中断的acquire方法
- void acquireSharedInterruptibly(int arg):能够响应中断的acquireShared方法
- boolean tryAcquireNanos(int arg, long nanos):有超时限制的acquireInterruptibly方法
- boolean tryAcquireSharedNanos(int arg, long nanos):有超时限制的acquireSharedInterruptibly方法
释放同步状态:
- boolean release(int arg):独占式释放同步状态
- boolean releaseShared(int arg):共享式释放同步状态
获取队列上的所有线程:
- Colleaction<Thread> getQueuedThreads():获取同步队列上的线程集合
以上这些方法才是开发者直接调用的方法(而且因为这些方法被final修饰,所以也不可能被重写),我们这里用ReentranLock中的实现来举例,我们来看其lock和unlock方法的实现:
static final class NonfairSync extends Sync { // ... final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // ... } public void lock() { sync.lock(); } public void unlock() { sync.release(1); } 复制代码
这里有一个setExclusiveOwnerThread方法,该方法会将当前线程标识为获取了独占资源的线程。了解了这一点,我们再来看lock方法,首先会尝试更新同步状态量,如果更新失败,则将该线程添加到同步队列中。在acquire方法中,会首先调用tryAcquire方法,这也就印证了,之前我们提到的5个接口方法并不是直接调用,而是由模板方法来进行间接调用,关于这些模板方法的细节我们在下一个章节再来进一步讲解
具体实现
enq、addWaiter--入队
先来看enq方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 复制代码
这个方法实际上就是通过无限次尝试使用CAS操作把node节点添加到队尾,所以在理论上是有无限期阻塞线程的可能存在
再来看addWaiter方法
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 尝试快速地在队尾添加; 如果失败就使用enq添加 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } 复制代码
addWaiter的方法流程很简单,就是构造一个节点,然后尝试使用CAS直接将节点添加到队尾,如果失败再调用enq方法。重点不在这里,我们看addWaiter方法第一行构造的Node类型对象,这里我们要结合Node的构造方法来看:
Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } 复制代码
之前我们说过,nextWaiter标识着当前节点的模式,只有两个值可以选择, Node.EXCLUSIVE
或 Node.SHARED
,即独占模式或共享模式。也就是说,addWaiter方法是一个可以设置节点模式的enq方法
acquireQueued--按序获取同步状态
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 当前驱节点是头结点,且获取同步状态成功后,将当前节点设置为头结点 if (p == head && tryAcquire(arg)) { setHead(node); // 节点置空能尽快进行垃圾收集 p.next = null; failed = false; return interrupted; } // 不满足争获同步状态的条件,或争取失败, // 就判断并选择是否要进行进一步的操作(阻塞并中断线程) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
当调用该方法的时候,会先判断当前节点是否是队首节点的直接后继,如果是的话再尝试获取同步状态(也可以理解为获取锁)。因为保证了只有头结点的直接后继节点才能获取同步状态,所以也就保证了不会发生多个节点同时调用setHead来将自己设置为头结点这样的情况
更重要的一点是,这个方式是一个“死循环”,所以节点会不断尝试获取直到成功
acquire--独占式获取同步状态
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
这个方法本来想一开始就讲的,但是最后还是觉得先把其中的辅助方法讲了之后大家会更好理解。acquire方法仅仅是一个if条件语句,我们先看条件满足之后执行了什么方法:
static void selfInterrupt() { Thread.currentThread().interrupt(); } 复制代码
好家伙,直接调用interrupt把当前线程中断,难不成每次调用acquire方法都会中断一次当前线程?其实不是的,这也是大部分框架常用的一个技巧,就是把真正执行的操作放在if条件判断中,只有其当返回值不在预期之内的时候再执行if语句块中的内容
在本例中,acquire方法会先尝试获取同步状态,如果失败后再将当前线程构造为一个独占式同步节点并添加到队列中(addWaiter方法),然后会不断尝试获取同步状态并将自己设置为队首节点(acquireQueued方法)
这时候可能会有人好奇,acquireQueued不是一个死循环吗,那不是只有一种返回值?实际上,acquireQueued方法返回的并不是是否添加成功,而是interrupted这个局部变量,表示当前线程是否被中断,忘记的朋友可以翻上去再看一遍
在parkAndCheckInterrupt这个方法中,会先将线程阻塞,然后返回线程的中断标识(如果一直没有中断的话,线程就会一直阻塞直到unpark方法被调用),所以当线程被中断时(需要节点设置可以被中断),该方法会返回true,然后就会执行if语句块的内容,将线程中断
现在我们再来梳理下acquire方法的整个流程
- 尝试获取一次同步状态,如果失败则进入下一步
- 判断当前节点的前驱是否为头结点,如果是则尝试获取同步状态,否则会重复执行该操作,直到成功。如果节点设置允许中断,则会将线程阻塞,直到检测到中断信号
- 如果上一步由于检测到中断信号导致直接返回,则调用线程的interrupt方法中断当前线程,否则结束
acquireShared--共享式获取同步状态
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } 复制代码
方法简洁明了:如果尝试以共享式获取同步状态失败,就调用doAcquireShared方法来获取。我们来看看这个doAcquireShared:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
果然和acquireQueued方法差不多,finally和try最后几行的if判断都是完全一致的,我们把重点放在不同的地方,不过像第一行这种吧 EXCLUSIVE
换成 SHARED
这种我们就不提了,我们直接来看中间的核心代码部分:
if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } 复制代码
实际上原理和acquireShared是基本一致的,都是判断前驱节点是否是队列的头结点,如果是则调用tryAcquireShared来尝试获取。其中不一样的地方就是setHeadAndPropagate方法,我们来看这个方法:
private void setHeadAndPropagate(Node node, int propagate) { // 保存之前的头结点 Node h = head; setHead(node); // propagate为tryAcquireShared的返回值,表示剩余的状态量 // 如果大于0,则可以唤醒多个节点,所以这个变量名叫做“传播” // 其余的一些条件都是一些允许唤醒多个后继节点的判断 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } 复制代码
从上面的代码结合注释可以看出来,共享式获取同步状态与独占式相比,可以唤醒多个等待的线程
release--独占式释放同步状态
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
该方法流程也很简单,先尝试释放同步状态(这一步如果失败会直接返回),然后在unparkSuccessor中调用unpark方法将h所在节点的后继节点唤醒
其他
最后一些注入共享式释放、允许中断的获取、带有超时的获取等方法我就不一一列出了,一通百通,其本质都是类似的
总结
想要理解AQS,首先一定要理解 同步状态 和 同步队列 这两个概念,同步状态标识着共享资源的许可量,同步队列标识着被阻塞的线程
理解了这两个概念之后,就需要明白tryAcquire、tryRelease等AQS提供的5个接口方法,如果我们想要基于AQS自定义同步组件,就需要重写这5个方法
最后,就需要理解acquire、release等AQS提供的模板方法,理解这些模板方法虽然不能直接为你的业务代码提供帮助,但是可以提高你对于整个并发架构的理解
最后,如果有对AQS的实现有兴趣的,除了阅读AQS源码外,推荐阅读Semaphore(AQS的共享式实现)和ReentrantLock(AQS的独占式实现)的源码
以上所述就是小编给大家介绍的《浅谈AQS(抽象队列同步器)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- AQS队列同步器
- AbstractQueuedSynchronizer 队列同步器(AQS)
- 源码级深挖 AQS 队列同步器
- Go 语言中的同步队列
- 一篇文章理清Python多线程之同步条件,信号量和队列
- rabbitmq实现延时队列(死信队列)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。