内容简介:AbstractQueuedSynchronizer(以下简称AQS)是Java中用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。很多文章已经基于论文和源码对实现进行了解读,本文试着从另外的角度入手:先不考虑AQS的实现,假设让我们自己实现锁,我们可以怎么做?最后再来看AQS的实现,才能更好地理解为什么要这么实现。我们可以形象地把锁理解成门票,只有当线程拿到了门票,才能进入临界区。因此我们可以用一个状态变量基于第一种思路实现的锁叫做自旋锁(SpinLock)。下面我们先看自选锁
AbstractQueuedSynchronizer(以下简称AQS)是 Java 中用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。很多文章已经基于论文和源码对实现进行了解读,本文试着从另外的角度入手:先不考虑AQS的实现,假设让我们自己实现锁,我们可以怎么做?最后再来看AQS的实现,才能更好地理解为什么要这么实现。
锁的实现思路
我们可以形象地把锁理解成门票,只有当线程拿到了门票,才能进入临界区。因此我们可以用一个状态变量 state
表示锁, state = true
表示可以获取到锁,反之就是表示锁已经被占用。那么当锁被占用时,应该怎么处理?这里有两种思路:
- 循环检测直到锁可用(也叫自旋)
- 让出处理器,等待通知
TAS锁
基于第一种思路实现的锁叫做自旋锁(SpinLock)。下面我们先看自选锁中最简单的实现,这个实现叫做 Test-And-Set-LOCK ,简称 TAS Lock 。
public class TASLock { AtomicBoolean state = new AtomicBoolean(false); public void lock() { while (!state.getAndSet(true)) {} // 循环检测直到状态可用 } public void unlock() { state.set(false); } } 复制代码
从实现上我们可以看出,获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,因此使用自旋锁会造成 busy-waiting
。
在对TAS锁提出优化思路前,先介绍一下 缓存一致性 。下面这张图描述的是每个处理器都有自己的缓存,但共享一个内存,缓存的内容来自内存。一旦处理器更新了自己的缓存,如果这个更新需要被其他处理器感知,就需要通过总线来通知。因此频繁更新会占用大量总线流量。
目前我们是用一个状态变量来标识锁的状态。TAS锁每次循环都会调用 getAndSet()
,这是一个更新指令,会导致其他线程的缓存都失效,从而都会去内存中获取值,因此占用总线流量资源。
TTAS锁
TAS锁
的问题在于每次循环都修改状态,实际上只有状态是可用的情况下,才有必要去修改。 TTAS
( Test-Test-And-Set
)改进就是在加锁前先检查状态变量是否为false,只有条件满足才去修改。
public class TTASLock { AtomicBoolean state = new AtomicBoolean(false); public void lock() { while (true) { while (state.get()) {} // 循环读取state状态 if (!state.getAndSet(true)) { // 只有当state为false,才会修改 return; } } } public void unlock() { state.set(false); } } 复制代码
但是当释放锁时,其他线程检测到 state
都是 false
,这时都会调用 state.getAndSet(true)
,又退化到 TAS
的情形。
指数退避
TTAS
的问题关键在于所有线程都同时去获取锁,因此引入延迟可以解决问题:当获取锁失败时,在重试前先睡眠一段时间,再次失败则延迟时间翻倍——指数退避。
public class BackoffLock { AtomicBoolean state = new AtomicBoolean(false); private int minDelay; private int maxDelay; public BackoffLock(int minDelay, int maxDelay) { this.minDelay = minDelay; this.maxDelay = maxDelay; } public void lock() throws InterruptedException { int delay = minDelay; while (true) { while (state.get()) {} if (!state.getAndSet(true)) { return; } Thread.sleep((int) (Math.random() * minDelay)); if (delay < maxDelay) { delay = 2 * delay; } } } public void unlock() { state.set(false); } } 复制代码
指数退避自旋的不足在于需要设置好延迟参数,很可能就在线程睡眠过程中,获取锁的线程刚好就释放了锁。
基于数组的队列锁
一开始因为我们都是基于一个状态变量来标识锁,才会导致频繁占用总线流量,那么如果每个线程都有一个状态,就可以大幅减少占用。
基于数组的队列锁 lock()
时从数组中按顺序找到一个可用的位置,用来代表当前线程。 unlock()
时通知下一个线程。
public class ArrayLock { private int n; private volatile boolean[] flags; private AtomicInteger next = new AtomicInteger(0); private ThreadLocal<Integer> slot = new ThreadLocal<>(); public ArrayLock(int n) { this.n = n; flags = new boolean[n]; flags[0] = true; } public void lock() { int index = next.getAndIncrement(); slot.set(index); while (!flags[index % n]) {} } public void unlock() { int index = slot.get(); flags[index % n] = false; // 为复用做好准备 flags[(index + 1) % n] = true; // 通知下一个线程 } } 复制代码
显然,基于数组的队列锁的不足之处就是锁的数量受限于数组长度。因此,可用考虑通过链表来改进。
CLH锁
CLH锁
内部就维护了一个隐式的链表。 CLH
是Craig, Landin, and Hagersten的缩写。
public class CLHSpinLock { private final ThreadLocal<QNode> node; private final ThreadLocal<QNode> prev; AtomicReference<QNode> tail = new AtomicReference<>(new QNode()); public CLHSpinLock() { node = new ThreadLocal<QNode>() { @Override protected QNode initialValue() { return new QNode(); } }; prev = new ThreadLocal<QNode>() { @Override protected QNode initialValue() { return null; } }; } public void lock() { QNode myNode = node.get(); myNode.locked = true; QNode pred = tail.getAndSet(myNode); prev.set(pred); // 在前继节点自旋 while (pred.locked) {}; } public void unlock() { QNode myNode = node.get(); myNode.locked = false; node.set(prev.get()); } class QNode { volatile boolean locked = false; } } 复制代码
由于CLH是在前继节点上自旋,在NUMA架构下,可能需要频繁访问远端内存,影响性能。那么能不能直接在本地节点自旋呢?
MCS锁
MCS锁
就是在本地节点自旋,把CLH的多次对远端内存的监听 + 一次对本地内存的更新,简化成了多次对本地内存的监听 + 一次对远端内存的更新。
public class MCSSpinLock { ThreadLocal<QNode> node = new ThreadLocal<QNode>() { @Override protected QNode initialValue() { return new QNode(); } }; AtomicReference<QNode> tail = new AtomicReference<>(null); public void lock() { QNode qNode = node.get(); QNode pred = tail.getAndSet(qNode); if (pred != null) { qNode.locked = true; pred.next = qNode; // QNode.next是volatile,保证了线程可见性 while (qNode.locked) {}; } } public void unlock() { QNode qNode = node.get(); if (qNode.next == null) { // 当前节点没有发现后继节点 if (tail.compareAndSet(qNode, null)) { // 确实没有后继节点 return; } while (qNode.next == null) {}; // 有后继节点,但是还没有关联上,需要等待 } qNode.next.locked = false; } class QNode { volatile boolean locked = false; volatile QNode next = null; } } 复制代码
参考文献
- Spin Locks and Contention
- Practice: Spin Locks and Contention
- 自旋锁学习系列(4):基于数组的队列锁
- building fifo and priority-queueing spin locks from atomic swap
- Ticket Lock, CLH Lock, MCS Lock
- NUMA架构的CPU -- 你真的用好了么?
以上所述就是小编给大家介绍的《循序渐进理解AQS(1):如何实现锁》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。