内容简介:AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时运用了 CLH 同步队列作同步器,这也是 ReentrantLock、CountDownLatch 等同步工具实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。说是框架,其实就是一个普通的类,它是 Doug Lea 大神的杰作,下面让我们一起学习 Doug Lea 大神是如何用一个普通类就玩转 C
AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时运用了 CLH 同步队列作同步器,这也是 ReentrantLock、CountDownLatch 等同步 工具 实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。
说是框架,其实就是一个普通的类,它是 Doug Lea 大神的杰作,下面让我们一起学习 Doug Lea 大神是如何用一个普通类就玩转 CAS 解决并发安全问题。
AQS 简介
之前我有介绍过 CAS 原理,AQS 就是建立在它的基础之上,增加了大量的实现细节,例如获取同步状态、FIFO同步队列,独占式锁和共享式锁的获取和释放等等,这些都是 AQS 类提供出来的一些方法。
现在我们来直接定位到类 java.util.concurrent.locks.AbstractQueuedSynchronizer
,看到类里面有很多注释,我主要将 AQS 类的几个重要字段与方法列出来:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private transient volatile Node head; private transient volatile Node tail; private volatile int state; protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // ... }
- head 字段为等待队列的头节点;
- tail 字段为等待队列的尾节点;
- state 字段为同步状态,其中 state > 0 为有锁状态,每次加锁就在原有state基础上加1,即代表当前持有锁的线程加了 state 次锁,反之解锁时每次减一,当 statte = 0 为无锁状态;
- 通过 compareAndSetState 方法操作 CAS 更改 state 状态,保证 state 的原子性。
有没有发现,这几个字段都用 volatile 关键字进行修饰,以确保多线程间保证字段的可见性。
同步队列
在分析锁的源码之前,我们先来看看 FIFO 队列的结构:
用大神的注释来形象地描述一下队列的模型:
/** * <pre> * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * </pre> */
这是一个普通双向链表的节点结构,多了 thread 字段用于存储当前线程对象,同时每个节点都有一个 waitStatus 等待状态,一共有四种状态:
- CANCELLED(1):取消状态,如果当前线程的前置节点状态为 CANCELLED,则表明前置节点已经等待超时或者已经被中断了,这时需要将其从等待队列中删除。
- SIGNAL(-1):等待触发状态,如果当前线程的前置节点状态为 SIGNAL,则表明当前线程需要阻塞。
- CONDITION(-2):等待条件状态,表示当前节点在等待condition,即在condition队列中。
- PROPAGATE(-3):状态需要向后传播,表示 releaseShared 需要被传播给后续节点,仅在共享锁模式下使用。
以上解释说明参考官方文档。
可以这么理解:head 节点可以表示成当前持有锁的线程的节点,其余线程竞争锁失败后,会加入到队尾,tail始终指向队列的最后一个节点。
锁
AQS 提供独占锁和共享锁两种模式,独占锁指的是如果有线程获取到锁,那么其它线程只能是获取锁失败,然后进入等待队列中等待被唤醒,而共享锁当有一个线程获取到锁之后,那么它就会依次唤醒等待队列中可以跟它共享的节点,当然这些节点也是共享锁类型。
独占式锁
获取锁
获取独占锁方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
解读:
tryAcquire(arg) addWaiter(Node.EXCLUSIVE)
基于上面源码的步骤分析后,我们继续往下看源码具体实现:
private Node addWaiter(Node mode) { // 创建一个基于当前线程的节点,该节点是 Node.EXCLUSIVE 独占式类型 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 这里先判断队尾是否为空,如果不为空则直接将节点加入队尾 if (pred != null) { node.prev = pred; // 采取 CAS 操作,将当前节点设置为队尾节点,由于采用了 CAS 原子操作,无论并发怎么修改,都有且只有一条线程可以修改成功,其余都将执行后面的enq方法 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
简单来说 addWaiter(Node mode)
方法做了以下事情:
- 创建基于当前线程的独占式类型的节点;
- 将节点加入队尾。
我们继续看 enq(Node node)
方法:
private Node enq(final Node node) { // 自旋操作 for (;;) { Node t = tail; // 如果队尾节点为空,那么进行CAS操作初始化队列 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // 这一步也是采取CAS操作,将当前节点加入队尾,如果失败的话,自旋继续修改直到成功为止 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
从源码可看出, enq(final Node node)
方法主要做了以下事情:
- 采用自旋机制,这是 aqs 里面很重要的一个机制;
- 如果队尾节点为空,则初始化队列,将当前节点设置为头节点;
- 如果队尾节点不为空,则继续采取CAS操作,将当前节点加入队尾,不成功则继续自旋,知道成功为止;
对比了上面两段代码,不难看出,首先是判断队尾是否为空,先进行一次CAS入队操作,如果失败则进入enq(final Node node)方法执行完整的入队操作。
经过上面CAS不断尝试,这时当前节点已经成功加入到队尾了,接下来就到了挂起当前线程的操作了,我们继续往下撕扯源码:
在分析 acquireQueued(final Node node, int arg)
方法之前,我们需要明确一点,我们上面也说过,head 节点代表当前持有锁的线程,那么如果当前节点的 pred 节点是 head 节点,很可能此时 head 节点已经释放锁了,所以此时需要再次尝试获取锁。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 线程中断标记字段 boolean interrupted = false; for (;;) { // 获取当前节点的 pred 节点 final Node p = node.predecessor(); // 如果 pred 节点为 head 节点,那么再次尝试获取锁 if (p == head && tryAcquire(arg)) { // 获取锁之后,那么当前节点也就成为了 head 节点 setHead(node); p.next = null; // help GC failed = false; // 不需要挂起,返回 false return interrupted; } // 获取锁失败,则进入挂起逻辑 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
这一步 acquireQueued(final Node node, int arg)
方法主要做了以下事情:
- 判断当前节点的 pred 节点是否为 head 节点,如果是,则尝试获取锁;
- 获取锁失败后,进入挂起逻辑。
接下来继续看挂起逻辑源码:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 如果 pred 节点为 SIGNAL 状态,返回true,说明当前节点需要挂起 return true; // 如果ws > 0,说明节点状态为CANCELLED,需要从队列中删除 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果是其它状态,则操作CAS统一改成SIGNAL状态 // 由于这里waitStatus的值只能是0或者PROPAGATE,所以我们将节点设置为SIGNAL,从新循环一次判断 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
这一步 shouldParkAfterFailedAcquire(Node pred, Node node)
方法主要做了以下事情:
- 判断 pred 节点状态,如果为 SIGNAL 状态,则直接返回 true 执行挂起;
- 删除状态为 CANCELLED 的节点。
通俗来说就是:根据 pred 节点状态来判断当前节点是否可以挂起,如果该方法返回 false,那么挂起条件还没准备好,就会重新进入 acquireQueued(final Node node, int arg)
的自旋体,重新进行判断。如果返回 true,那就说明当前线程可以进行挂起操作了,那么就会继续执行挂起,所以我们继续往下看源码:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
这步骤就是执行中断最核心的操作了,这里调用了 LockSupport.park(this)
方法,查看官方解析该类的作用是:
LockSupport是用来创建锁和其他同步类的基本 线程阻塞 原语。LockSupport 提供park()和unpark()方法实现阻塞线程和解除线程阻塞,LockSupport和每个使用它的线程都与一个许可(permit)关联。permit相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就是将1变成0,同时park立即返回。再次调用park会变成block(因为permit为0了,会阻塞在这里,直到permit变为1), 这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。
(下面的会抽时间补上去)
释放锁
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
共享式锁
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 并发编程(十)—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析
- 并发编程(九)—— Java 并发队列 BlockingQueue 实现之 LinkedBlockingQueue 源码分析
- 并发编程(八)—— Java 并发队列 BlockingQueue 实现之 ArrayBlockingQueue 源码分析
- java 并发编程-AQS源码分析
- Java 并发编程 -- 线程池源码实战
- Java并发之AQS源码分析(一)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。