内容简介:AQS(AbstractQueuedSynchronizer)队列同步器,是JUC中非常重要的一个组件,基于它可以简单高效地构建一些通用的锁和同步器,如ReentrantLock、Semaphore等(本文学习内容基于JDK1.8),本文主要关注AQS的源码实现及基于AQS实现的一些常用的同步组件通过使用JUC中的同步组件,可以比较简洁地进行并发编程,而在很多同步组件的实现中都出现了AQS采用了模板方法设计模式,支持通过子类重写相应的方法实现不同的同步器。在AQS中,有一个state变量,表示同步状态(这里
AQS(AbstractQueuedSynchronizer)队列同步器,是JUC中非常重要的一个组件,基于它可以简单高效地构建一些通用的锁和同步器,如ReentrantLock、Semaphore等(本文学习内容基于JDK1.8),本文主要关注AQS的源码实现及基于AQS实现的一些常用的同步组件
基本内容
通过使用JUC中的同步组件,可以比较简洁地进行并发编程,而在很多同步组件的实现中都出现了 Sync extends AbstractQueuedSynchronizer
的身影,通过对AQS的一些方法的重写,实现了相应的组件的功能。AQS是实现锁的关键,其中锁是面向锁的使用者的,定义了锁的使用方式,而AQS是面向锁的实现者的,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
AQS采用了模板方法设计模式,支持通过子类重写相应的方法实现不同的同步器。在AQS中,有一个state变量,表示同步状态(这里的同步状态就可以看作是一种资源,对同步状态的获取可以看作是对同步资源的竞争),AQS提供了多种获取同步状态的方式,包括独占式获取、共享式获取以及超时获取等,下面会进行具体的介绍。
原理分析
下面将结合源码从模板方法、同步状态管理、CLH锁队列、独占式获取方式、共享式获取方式、超时获取方式等方面分析AQS的原理及实现
模板方法
可以通过子类重写的方法列表如下
方法名称 | 用途 |
---|---|
tryAcquire(int arg) | 主要用于实现独占式获取同步状态,实现该方法需要查询当前状态是否符合预期,然后进行相应的状态更新实现控制(获取成功返回true,否则返回false,成功通常是可以更新同步状态,失败则是不符合更新同步状态的条件),其中arg表示需要获取的同步状态数 |
tryRelease(int arg) | 主要用于实现独占式释放同步状态,同时更新同步状态(通常在同步状态state更新为0才会返回true,表示已经彻底释放同步资源),其中arg表示需要释放的同步状态数 |
tryAcquireShared(int arg) | 主要用于实现共享式获取同步状态,同时更新同步状态 |
tryReleaseShared(int arg) | 主要用于实现共享式释放同步状态,同时更新同步状态 |
isHeldExclusively() | 一般用于判断同步器是否被当前线程独占 |
同步状态管理
对线程进行加锁在AQS中体现为对同步状态的操作,通过的同步状态地管理,可以实现不同的同步任务,同步状态 state
是AQS很关键的一个域
// 因为state是volatile的,所以get、set方法均为原子操作,而compareAndSetState方法 // 使用了Unsafe类的CAS操作,所以也是原子的 // 同步状态 private volatile int state; // 同步状态的操作包括 // 获取同步状态 protected final int getState() { return state;} // 设置同步状态 protected final void setState(int newState) { state = newState;} // CAS操作更新同步状态 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } 复制代码
CLH锁队列
CLH(Craig, Landin, and Hagersten)锁,是自旋锁的一种。AQS中使用了CLH锁的一个变种,实现了一个双向队列,并使用其实现阻塞的功能,通过将请求共享资源的线程封装为队列中的一个结点实现锁的分配。
双向队列的头结点记录工作状态下的线程,后继结点若获取不了同步状态则会进入阻塞状态,新的结点会从队尾加入队列,竞争同步状态
// 队列的数据结构如下 // 结点的数据结构 static final class Node { // 表示该节点等待模式为共享式,通常记录于nextWaiter, // 通过判断nextWaiter的值可以判断当前结点是否处于共享模式 static final Node SHARED = new Node(); // 表示节点处于独占式模式,与SHARED相对 static final Node EXCLUSIVE = null; // waitStatus的不同状态,具体内容见下文的表格 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; // 记录前置结点 volatile Node prev; // 记录后置结点 volatile Node next; // 记录当前的线程 volatile Thread thread; // 用于记录共享模式(SHARED), 也可以用来记录CONDITION队列(见扩展分析) Node nextWaiter; // 通过nextWaiter的记录值判断当前结点的模式是否为共享模式 final boolean isShared() { return nextWaiter == SHARED;} // 获取当前结点的前置结点 final Node predecessor() throws NullPointerException { ... } // 用于初始化时创建head结点或者创建SHARED结点 Node() {} // 在addWaiter方法中使用,用于创建一个新的结点 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // 在CONDITION队列中使用该构造函数新建结点 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } } // 记录头结点 private transient volatile Node head; // 记录尾结点 private transient volatile Node tail; 复制代码
Node状态表(waitStatus,初始化时默认为0)
状态名称 | 状态值 | 状态描述 |
---|---|---|
CANCELLED | 1 | 说明当前结点(即相应的线程)是因为超时或者中断取消的,进入该状态后将无法恢复 |
SIGNAL | -1 | 说明当前结点的后继结点是(或者将要)由park导致阻塞的,当结点被释放或者取消时,需要通过unpark唤醒后继结点(表现为unparkSuccessor()方法) |
CONDITION | -2 | 该状态是用于condition队列结点的,表明结点在等待队列中,结点线程等待在Condition上,当其他线程对Condition调用了signal()方法时,会将其加入到同步队列中去,关于这一部分的内容会在扩展中提及。 |
PROPAGATE | -3 | 说明下一次共享式同步状态的获取将会无条件地向后继结点传播 |
下图展示该队列的基本结构
独占式获取方式
独占式(EXCLUSIVE)获取需重写 tryAcquire
、 tryRelease
方法,并访问 acquire
、 release
方法实现相应的功能。
acquire的流程图如下:
上述流程图比较复杂,这里简单概述一下其中的过程
- 线程尝试获取同步状态,如果成功获取则继续执行,如果获取失败则加入到同步队列自旋
- 自旋过程中若立即获取到同步状态(前置结点为head并且尝试获取同步状态成功)则可以直接执行
- 若无法立即获取到同步状态则会将前置结点置为SIGNAL状态同时自身通过park()方法进入阻塞状态,等待unpark()方法唤醒
- 若线程被unpark()方法(此时说明前置结点在执行release操作)唤醒后,前置结点是头结点并且被唤醒的线程获取到了同步状态,则恢复工作
主要代码如下:
// 这里不去看tryAcquire、tryRelease方法的具体实现,只知道它们的作用分别为尝试获取同步状态、 // 尝试释放同步状态 public final void acquire(int arg) { // 如果线程直接获取成功,或者再尝试获取成功后都是直接工作, // 如果是从阻塞状态中唤醒开始工作的线程,将当前的线程中断 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 包装线程,新建结点并加入到同步队列中 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 尝试入队, 成功返回 if (pred != null) { node.prev = pred; // CAS操作设置队尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 通过CAS操作自旋完成node入队操作 enq(node); return node; } // 在同步队列中等待获取同步状态 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋 for (;;) { final Node p = node.predecessor(); // 检查是否符合开始工作的条件 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } // 获取不到同步状态,将前置结点标为SIGNAL状态并且通过park操作将node包装的线程阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 如果获取失败,将node标记为CANCELLED if (failed) cancelAcquire(node); } } 复制代码
release流程图如下
release的过程比较简单,主要就是通过tryRelease更新同步状态,然后如果需要,唤醒后置结点中被阻塞的线程
主要代码如下
// release public final boolean release(int arg) { // 首先尝试释放并更新同步状态 if (tryRelease(arg)) { Node h = head; // 检查是否需要唤醒后置结点 if (h != null && h.waitStatus != 0) // 唤醒后置结点 unparkSuccessor(h); return true; } return false; } // 唤醒后置结点 private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 通过CAS操作将waitStatus更新为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 检查后置结点,若为空或者状态为CANCELLED,找到后置非CANCELLED结点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 唤醒后置结点 if (s != null) LockSupport.unpark(s.thread); } 复制代码
共享式获取方式
共享式(SHARED)获取需重写 tryAcquireShared
、 tryReleaseShared
方法,并访问 acquireShared
、 releaseShared
方法实现相应的功能。与独占式相对,共享式支持多个线程同时获取到同步状态并进行工作
acquireShared
acquireShared过程和acquire非常相似,流程大致相同,下面简单概括一下
- 线程获取同步状态,若能获取到,则直接执行,如获取不到,新建共享式结点进入同步队列
- 由于获取不到同步状态,线程将被park方法阻塞,等待被唤醒
- 被唤醒后,满足获取同步状态的条件,会向后传播,唤醒后继结点
// public final void acquireShared(int arg) { // 尝试共享式获取同步状态,如果成功获取则可以继续执行,否则执行doAcquireShared if (tryAcquireShared(arg) < 0) // 以共享式不停得尝试获取同步状态 doAcquireShared(arg); } // Acquires in shared uninterruptible mode. private void doAcquireShared(int arg) { // 向同步队列中新增一个共享式的结点 final Node node = addWaiter(Node.SHARED); // 标记获取失败状态 boolean failed = true; try { // 标记中断状态(若在该过程中被中断是不会响应的,需要手动中断) boolean interrupted = false; // 自旋 for (;;) { // 获取前置结点 final Node p = node.predecessor(); // 若前置结点为头结点 if (p == head) { // 尝试获取同步状态 int r = tryAcquireShared(arg); // 若获取到同步状态。 if (r >= 0) { // 此时,当前结点存储的线程恢复执行,需要将当前结点设置为头结点并且向后传播, // 通知符合唤醒条件的结点一起恢复执行 setHeadAndPropagate(node, r); p.next = null; // help GC // 需要中断,中断当前线程 if (interrupted) selfInterrupt(); // 获取成功 failed = false; return; } } // 获取同步状态失败,需要进入阻塞状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 获取失败,CANCELL node if (failed) cancelAcquire(node); } } // 将node设置为同步队列的头结点,并且向后通知当前结点的后置结点,完成传播 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); // 向后传播 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if(s == null || s.isShared()) doReleaseShared(); } } 复制代码
releasShared
releaseShared在尝试释放同步状态成功后,会唤醒后置结点,并且保证传播性
public final boolean releaseShared(int arg) { // 尝试释放同步状态 if (tryReleaseShared(arg)) { // 成功后唤醒后置结点 doReleaseShared(); return true; } return false; } // 唤醒后置结点 private void doReleaseShared() { // 循环的目的是为了防止新结点在该过程中进入同步队列产生的影响,同时要保证CAS操作的完成 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } } 复制代码
超时获取方式
超时获取使通过AQS实现的锁支持超时获取锁,这是synchronized关键字所不具备的,关于其具体的实现,和上述实现方式相似,只是在独占式、共享式获取的基础上增加了时间的约束,同时通过parkNanos()方法为阻塞定时,这里不再过多展开。
实例分析
下面列举几个常用的并发组件
ReetrantLock
ReentrantLock,重入锁。通过AQS独占式实现加锁、解锁操作,支持同一线程重复获取锁。主要操作为lock,unlock,其实现分别依赖acquire和release
private final Sync sync; // 继承AQS,重写相应方法 abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); final boolean nonfairTryAcquire(int acquires) { ... } protected final boolean tryRelease(int releases) { ... } // ...略 } static final class NonfairSync extends Sync { final void lock() { ... } protected final boolean tryAcquire(int acquires) { ... } } static final class FairSync extends Sync { final void lock() { ... } protected final boolean tryAcquire(int acquires) { ... } } 复制代码
相关总结
- 重入锁支持公平获取、非公平(默认)获取两种方式,通过构造函数fair来决定用NonfairSync还是用FairSync完成sync的实例化,两种方式的区别在于公平式要求锁的获取顺序应该符合申请的时间顺序,即严格按照同步队列FIFO,而非公平式则不考虑(公平式通过对当前结点的前置结点进行判断来保证公平性)
- 重入锁的加锁逻辑是,若锁尚未被获取(state = 0),说明可以直接获取到锁并且更新同步状态(此时需要CAS更新保证原子性),若锁已经被获取,判断获取锁的线程是否为当前线程,若是,则更新同步状态(state + acquires,此时直接更新即可,因为只有该线程可以访问该段代码),说明同样可以获取到锁。否则,当前获取不到锁,线程会被阻塞
- 重入锁的解锁逻辑是,更新同步状态(state - releases),若state为0,说明该线程完全释放锁,返回true,否则返回false
CountDownLatch
CountDownLatch,一种同步工具,可以使一个或多个线程一直等待,直到指定数量线程全部执行完毕后才再执行。通过AQS共享式实现。主要操作为await(让当前线程等待,调用了AQS的acquireSharedInterruptibly方法,可以简单将其当作acquireShared方法,实现基本相同),countDown(执行同步状态释放的操作),其实大体的思路就是,初始化CountDownLatch一定的同步状态数,执行await操作的线程需等待同步状态数完全释放(为0)时才可以执行,而需要先完成的任务在完成后都通过countDown释放一定的同步状态数
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // 检查同步状态数是否已经为0,不为0则同步状态获取失败 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 释放一定的同步状态数 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; 复制代码
ReentrantReadWriteLock
ReentrantReadWriteLock,可重入的读写锁,同时使用了AQS的独占式和共享式,当进行写操作时,锁由写线程独占,其他写线程和读线程阻塞。当进行读操作时,写线程阻塞,所有读线程可以共享锁。读写锁的实现相对复杂,这里不再贴过多的代码,简单概括一下其实现的方式:
- 读写锁内部是通过一个读锁、一个写锁实现的。而读锁和写锁共享一个同步状态state,那么读写状态的就由同步状态决定。读写锁采取按位分割的方法实现一个同步状态表示两种不同类型(读和写)的状态的。读写锁将变量分为两部分,高16位表示读,低16位表示写。那么写状态的值就为state&0x0000ffff,对其修改操作可以直接对state进行。读状态的值为state>>16,对其修改操作(如加操作)为(state+0x00010000)
- 写锁的获取逻辑,如果当前线程已经获取了写锁,则增加写状态;如果读锁已经被获取或者获取写锁的线程不为当前线程,则当前线程进入同步队列中等待。如果还没有锁获取线程,则直接获取。
- 写锁的释放逻辑,减少写状态,直至写状态为0表示写锁完全被释放
- 读锁的获取逻辑,写锁未被获取时,读锁总可以被获取。若当前线程已经获取了读锁,则增加读状态(为各读线程的读状态之和,各线程的读状态记录在ThreadLocal中)。若写锁已经被获取,则无法获取读锁。
- 读锁的释放逻辑,每次释放都会减少读状态
- ReentrantReadWriteLock支持锁降级,指的是获取写锁后,先获取读锁然后再释放写锁,完成写锁到读锁的降级。这样可以保证数据可见性,防止写锁直接释放后,其他线程获取了写锁,则当前线程可能无法获取写锁线程的修改。但是读写锁不支持锁升级,原因相同。
扩展
Condition
在synchronized加锁的时候,可以通过Object类的方法的wait()、notify()方法实现等待通知,那么在Lock锁的过程中,也存在类似的操作,即Condition接口,该接口提供了await()、signal()方法,具有相同的功能。
在AQS中,有一个类ConditionObject,实现了Condition接口。它同样使用了Node的数据结构,构成了一个队列(FIFO),与同步队列区别,可以叫它等待队列。获取Condition需要通过Lock接口的newCondition方法,这意味着一个Lock可以有多个等待队列,而Object监视器模型提供的一个对象仅有一个等待队列
// Condition的数据结构 static final class Node { // next 指针 Node nextWaiter; // ... } public class ConditionObject implements Condition, java.io.Serializable { // head private transient Node firstWaiter; // tail private transient Node lastWaiter; // ... } 复制代码
下面具体来看await和signal操作
await()
// 涉及中断的操作,暂时忽略 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 向等待队列的队尾新建一个CONDITION结点 Node node = addConditionWaiter(); // 因为要进入等待状态,所以需要释放同步状态(即释放锁),如果失败,该结点会被CANCELLED int savedState = fullyRelease(node); int interruptMode = 0; // 判读该结点是否在同步队列上,如果不在就通过park操作将其阻塞,进入等待状态 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 从等待状态恢复,进入同步队列竞争同步状态 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // 向等待队列的队尾新建一个CONDITION结点 private Node addConditionWaiter() { Node t = lastWaiter; // 如果最后一个结点的waitStatus并非CONDITION,说明该结点被CANCELLED了,需要 // 从队列中清除掉 if (t != null && t.waitStatus != Node.CONDITION) { // 将CANCELLED结点从等待队列中清除出去 unlinkCancelledWaiters(); t = lastWaiter; } // 新建CONDITION结点并且将其加入队尾 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } 复制代码
关于await的操作
- 执行await操作后,线程会被包装成CONDITION结点进入等待队列
- 通过park使线程阻塞
- 被唤醒后,线程从等待队列进入同步队列竞争同步状态
signal()
// public final void signal() { // 校验 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 唤醒等待队列的头结点 if (first != null) doSignal(first); } // 执行唤醒操作 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 唤醒结点并且将其加入同步队列 } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 将唤醒的结点加入到同步队列中竞争同步状态,恢复执行 final boolean transferForSignal(Node node) { // 将node的状态从CONDITION恢复到默认状态,该CAS操作由外层doSignal的循环保证成功操作 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 将node加入到同步队列中 Node p = enq(node); int ws = p.waitStatus; // 如果前置结点已经被取消或者将前置结点设置为SIGNAL失败,就通过unpark唤醒node包装的线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } 复制代码
关于signal的操作
将等待队列的头结点唤醒,从等待队列中移除,并将其加入到同步队列中竞争同步状态,恢复执行
还有一些操作,如signalAll()则是将等待队列中的全部结点从等待队列中移除并加入到同步队列中竞争同步状态
StampedLock
StampedLock是 Java 8中新增的一个锁,是对读写锁的改进。读写锁虽然分离了读与写的功能,但是它在处理读与写的并发上,采取的是一种悲观的策略,这就导致了,当读取的情况很多而写入的情况很少时,写入线程可能迟迟无法竞争到锁并被阻塞,遭遇饥饿问题。
StampedLock提供了3种控制锁的模式,写、读、乐观读。在加锁时可以获取一个stamp作为校验的凭证,在释放锁的时候需要校验这个凭证,如果凭证失效的话(比如在读的过程中,写线程产生了修改),就需要重新获取凭证,并且重新获取数据。这很适合在写入操作较少,读取操作较多的情景,可以乐观地认为写入操作不会发生在读取数据的过程中,而是在读取线程解锁前进行凭证的校验,在必要的情况下,切换成悲观读锁,完成数据的获取。这样可以大幅度提高程序的吞吐量。
StampedLock在实现上没有借助AQS,但是其很多设计的思想、方法都是参照AQS并进行了一些修改完成的。在StampedLock内部同样维护了一个CLH队列完成相关的功能。
与ReentrantReadWriteLock相比,StamptedLock的API调用相对复杂一些,所以在很多时候还是会用ReentrantReadWriteLock。
更多的关于StampedLock的内容后续再补充。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
构建可扩展的Web站点
Cal Henderson / 徐宁 / 电子工业出版社 / 2008 / 58.00元
随着Web 2.0网站的蓬勃发展,如何成功地构建可扩展的Web站点成为网站开发人员必备的技能。本书是Flickr.com的主力开发人员讲解构建可扩展的Web站点的经典之作。本书主要介绍了Web应用程序的概念、体系结构、硬件需求、开发环境的原则及国际化、本地化和Unicode等基本内容,并为解决Web应用程序的数据安全、电子邮件整合、远程服务交互、应用程序优化、扩展、监测和预警、开放API等问题提供......一起来看看 《构建可扩展的Web站点》 这本书的介绍吧!