内容简介:队列同步器 (AQS), 是用来构建锁或其他同步组件的基础框架,它通过使用 int 变量表示同步状态,通过内置的 FIFO 的队列完成资源获取的排队工作。(摘自《Java并发编程的艺术》)我们知道获取同步状态有独占和共享两种模式,本文先针对独占模式进行分析。head 同步队列头节点
队列同步器 (AQS), 是用来构建锁或其他同步组件的基础框架,它通过使用 int 变量表示同步状态,通过内置的 FIFO 的队列完成资源获取的排队工作。(摘自《Java并发编程的艺术》)
我们知道获取同步状态有独占和共享两种模式,本文先针对独占模式进行分析。
变量定义
private transient volatile Node head; 复制代码
head 同步队列头节点
private transient volatile Node tail; 复制代码
tail 同步队列尾节点
private volatile int state; 复制代码
state 同步状态值
Node - 同步队列节点定义
volatile int waitStatus; 复制代码
waitStatus 节点的等待状态,可取值如下 :
- 0 : 初始状态
- -1 : SIGNAL 处于该状态的节点,说明其后置节点处于等待状态; 若当前节点释放了锁可唤醒后置节点
- -2 : CONDITION 该状态与 Condition 操作有关后续在说明
- -3 : PROPAGATE 该状态与共享式获取同步状态操作有关后续在说明
- 1 : CANCELLED 处于该状态的节点会取消等待,从队列中移除
volatile Node prev; 复制代码
prev 指向当前节点的前置节点
volatile Node next; 复制代码
next 指向当前节点的后置节点
volatile Thread thread; 复制代码
thread 节点对应的线程也是指当前获取锁失败的线程
Node nextWaiter; 复制代码
acquire()
独占模式下获取同步状态, 既是当前只允许一个线程获取到同步状态
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
从 acquire 方法中我们可以大概猜测下,获取锁的过程如下:
- tryAcquire 尝试获取同步状态, 具体如何判定获取到同步状态由子类实现
- 当获取同步状态失败时,执行 addWaiter 创建独占模式下的 Node 并将其添加到同步队列尾部
- 加入同步队列之后,再次尝试获取同步状态,当达到某种条件的时候将当前线程挂起等待唤醒
下面具体看下各个阶段如何实现:
private Node addWaiter(Node mode) { // 绑定当前线程 创建 Node 节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 判断同步队列尾节点是否为空 if (pred != null) { // node 的前置节点指向队列尾部 node.prev = pred; // 将同步队列的 tail 移动指向 node if (compareAndSetTail(pred, node)) { // 将原同步队列的尾部后置节点指向 node pred.next = node; return node; } } // tail 为空说明同步队列还未初始化 // 此时调用 enq 完成队列的初始化及 node 入队 enq(node); return node; } 复制代码
private Node enq(final Node node) { // 轮询的方式执行 // 成功入队后退出 for (;;) { Node t = tail; if (t == null) { // Must initialize // 创建 Node, 并将 head 指向该节点 // 同时将 tail 指向该节点 // 完成队列的初始化 if (compareAndSetHead(new Node())) tail = head; } else { // node 的前置节点指向队列尾部 node.prev = t; // 将同步队列的 tail 移动指向 node if (compareAndSetTail(t, node)) { // 将原同步队列的尾部后置节点指向 node t.next = node; return t; } } } } 复制代码
从代码中可以看出通过 CAS 操作保证节点入队的有序安全,其入队过程中如下图所示:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // for (;;) { // 获取当前节点的前置节点 final Node p = node.predecessor(); // 判断前置节点是否为 head 头节点 // 若前置节点为 head 节点,则再次尝试获取同步状态 if (p == head && tryAcquire(arg)) { // 若获取同步状态成功 // 则将队列的 head 移动指向当前节点 setHead(node); // 将原头部节点的 next 指向为空,便于对象回收 p.next = null; // help GC failed = false; // 退出轮询过程 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 若前置节点状态为 -1 ,则说明后置节点 node 可以安全挂起了 return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { // ws > 0 说明前置节点状态为 CANCELLED , 也就是说前置节点为无效节点 // 此时从前置节点开始向队列头节点方向寻找有效的前置节点 // 此操作也即是将 CANCELLED 节点从队列中移除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 若前置节点状态为初始状态 则将其状态设为 -1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
private final boolean parkAndCheckInterrupt() { // 将当前线程挂起 LockSupport.park(this); // 被唤醒后检查当前线程是否被挂起 return Thread.interrupted(); } 复制代码
从 acquireQueued 的实现可以看出,节点在入队后会采用轮询的方式(自旋)重复执行以下过程:
- 判断前置节点是否为 head, 若为 head 节点则尝试获取同步状态; 若获取同步状态成功则移动 head 指向当前节点并退出循环
- 若前置节点非 head 节点或者获取同步状态失败,则将前置节点状态修改为 -1, 并挂起当前线程,等待被唤醒重复执行以上过程
如下图所示:
接下来我们看看同步状态释放的实现。
release
释放同步状态
public final boolean release(int arg) { // 尝试释放同步状态 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒后置节点 unparkSuccessor(h); return true; } return false; } 复制代码
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) // 将 head 节点状态改为 0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取后置节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒后置节点上所阻塞的线程 LockSupport.unpark(s.thread); } 复制代码
从上述代码,我们可以明白释放同步状态的过程如下:
- 调用 tryRelease 尝试释放同步状态,同样其具体的实现由子类控制
- 成功释放同步状态后,将 head 节点状态改为 0
- 唤醒后置节点上阻塞的线程
如下图所示(红色曲线表示节点自旋过程) :
acquireInterruptibly()
独占模式下获取同步状态,不同于 acquire 方法,该方法对中断操作敏感; 也就是说当前线程在获取同步状态的过程中,若被中断则会抛出中断异常
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 检查线程是否被中断 // 中断则抛出中断异常由调用方处理 throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } 复制代码
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 不同于 acquire 的操作,此处在唤醒后检查是否中断,若被中断直接抛出中断异常 throw new InterruptedException(); } } finally { if (failed) // 抛出中断异常后最终执行 cancelAcquire cancelAcquire(node); } } 复制代码
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. // 若当前节点为 tail 节点,则将 tail 移动指向 node 的前置节点 if (node == tail && compareAndSetTail(node, pred)) { // 同时将node 前置节点的 next 指向 null compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 当前节点位于队列中部 Node next = node.next; if (next != null && next.waitStatus <= 0) // 将前置节点的 next 指向 node 的后置节点 compareAndSetNext(pred, predNext, next); } else { // 若 node 的前置节点为 head 节点则唤醒 node 节点的后置节点 unparkSuccessor(node); } node.next = node; // help GC } } 复制代码
从 acquireInterruptibly 的实现可以看出,若线程在获取同步状态的过程中出现中断操作,则会将当前线程对应的同步队列等待节点从队列中移除并唤醒可获取同步状态的线程。
tryAcquireNanos()
独占模式超时获取同步状态,该操作与acquireInterruptibly一样对中断操作敏感,不同在于超过等待时间若未获取到同步状态将会返回
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } 复制代码
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 计算等待到期时间 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) // 超时时间到期直接返回 return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 按指定时间挂起s LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 复制代码
节点的状态
同步队列中的节点在自旋获取同步状态的过程中,会将前置节点的状态由 0 初始状态改为 -1 (SIGNAL), 若是中断敏感的操作则会将状态由 0 改为 1 (CANCELLED)
同步队列中的节点在释放同步状态的过程中会将同步队列的 head 节点的状态改为 0, 也即是由 -1(SIGNAL) 变为 0;
以上所述就是小编给大家介绍的《AbstractQueuedSynchronizer 队列同步器(AQS)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- AQS队列同步器
- 浅谈AQS(抽象队列同步器)
- 源码级深挖 AQS 队列同步器
- Go 语言中的同步队列
- 一篇文章理清Python多线程之同步条件,信号量和队列
- rabbitmq实现延时队列(死信队列)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。