【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 原 荐

栏目: Java · 发布时间: 5年前

内容简介:借用一下源码中的说法,

一、了解 AbstractQueuedSynchronizer(AQS)

1、AQS 简介

AbstractQueuedSynchronizer 是大师 Doug Lea 编写的一个并发编程类,位于 java.util.concurrent.locks,是 CountdownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor 中重要的组成部分,他们中关于 “锁” 的部分与 AQS 息息相关。

借用一下源码中的说法, AbstractQueuedSynchronizer 基于一个 FIFO 队列 提供了一套阻塞锁和同步相关的实现。该类被设计成为很多同步容器 synchronizers 的底层实现,它使用了一个原子int private volatile int state; 来表示当前状态。当在 AQSacquired (获取资源) 或被 release (释放资源)时,需要依据这个 state 来进行判断。所以子类需要定义方法来修改这个状态,该状态的含义由我们自由定制。(翻译的不好...)

【并发编程】 图文深入解析 <a href='https://www.codercto.com/topics/22013.html'>Java</a> 显式锁底层源码 —— 加解锁是如何实现的 原 荐

2、实现最简单的 AQS

我们来看一个最简单的例子,我们有一个类 Sync 继承了 AbstractQueuedSynchronizer ,并重写了其 tryAcquiretryRelease 方法。实现非常简单,我们通过调用父类的 compareAndSetState() 以及 setState() 来完成, 简单来说(不是特别准确) ,就是 tryAcquire 返回 true ,代表获取锁成功,否则就会阻塞。而 tryRelease 则负责锁的释放。

在例子中:将 state 设置为 100 代表当前状态为无锁, 1 则代表已经有某个线程获取了该锁。当然这个 state 表达的含义是怎么样的,完全是我们定义的,实际上锁定或者无锁是 100 还是 200 还是 -100 ,都没有什么关系。

/**
 * Created by Anur IjuoKaruKas on 2019/5/7
 */
public class Mutex extends AbstractQueuedSynchronizer {

    public static class Sync extends AbstractQueuedSynchronizer {

        public Sync() {
            setState(100); // set the initial state, being unlocked.
        }

        @Override
        protected boolean tryAcquire(int ignore) {
            boolean result = compareAndSetState(100, 1);
            print("尝试获取锁" + (result ? "成功" : "失败"));
            return result;
        }

        @Override
        protected boolean tryRelease(int ignore) {
            setState(100);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(0);
    }

    public void unLock() {
        sync.release(0);
    }

    public static void main(String[] args) throws InterruptedException {
        Mutex mutex = new Mutex();
        mutex.lock();

        Thread thread = new Thread(() -> {
            print("调用 mutex.lock() 之前");
            mutex.lock();
            print("调用 mutex.lock() 之后");
        });

        thread.start();

        print("main 线程 Sleep 之前");
        Thread.sleep(5000);
        print("main 线程 Sleep 之后");
        mutex.unLock();
    }

    public static void print(String print) {
        System.out.println(String.format("时间 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print));
    }
}

========================================= 输出
时间 - Fri May 24 15:44:19 CST 2019		Thread[main,5,main]		尝试获取锁成功
时间 - Fri May 24 15:44:19 CST 2019		Thread[main,5,main]		main 线程 Sleep 之前
时间 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		调用 mutex.lock() 之前
时间 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 15:44:24 CST 2019		Thread[main,5,main]		main 线程 Sleep 之后
时间 - Fri May 24 15:44:24 CST 2019		Thread[Thread-0,5,main]		尝试获取锁成功
时间 - Fri May 24 15:44:24 CST 2019		Thread[Thread-0,5,main]		调用 mutex.lock() 之后

我们可以看到,代码符合我们的预期:在 main 函数所在线程调用 mutex.unLock(); 释放锁之前,子线程是一直阻塞的, 调用 mutex.lock() 之后 的日志输出发生在 main 线程 Sleep 之后 之后。

通过重写 tryAcquiretryRelease 方法,以及调用 acquirerelease 方法,我们很容易就实现了一个锁,当然这个锁有一堆问题... 我们只是通过这个小例子,来建立对 AQS 一个简单的了解。

看到这里,有些细心的小伙伴可能会想了,既然锁是由 tryAcquire 控制的,那和 state 又有什么关系呢? 我们完全可以定义一个自定义变量,比如 signfalse 代表无锁, true 代表锁定,好像也可以实现这段逻辑啊?这个时候就需要引出我们神奇的 compareAndSetCAS 操作了。

3、AQS 绕不过的话题: CAS Compare And Swap

前面说到,我们暂时认为 : tryAcquire 返回 true ,代表获取到锁,反之只要 tryAcquire 返回 flase ,线程就会被阻塞 (不准确,后面会细说)。实际上这里有一个 隐含条件,我们必须做到:

  • ※ 无论何时,都只能有一个线程 tryAcquire 成功,且在某个线程 tryAcquire 成功之后,并在其 release 释放锁之前,任何线程进行 tryAcquire 都将返回 false

是的,就是并发问题!

下面这个例子我们简单使用一个自定义变量 sign 来实现 tryAcquire ,看看会发生什么:

private boolean sign;

        @Override
        protected boolean tryAcquire(int ignore) {
            boolean result = false;
            if (!sign) {
                sign = true;
                result =  true;
            }
            print("尝试获取锁" + (result ? "成功" : "失败"));
            return result;
        }

        @Override
        protected boolean tryRelease(int ignore) {
            sign = false;
            return true;
        }
========================================= 输出
时间 - Fri May 24 18:03:12 CST 2019		Thread[main,5,main]		尝试获取锁成功
时间 - Fri May 24 18:03:12 CST 2019		Thread[main,5,main]		main 线程 Sleep 之前
时间 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		调用 mutex.lock() 之前
时间 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		尝试获取锁失败
时间 - Fri May 24 18:03:17 CST 2019		Thread[main,5,main]		main 线程 Sleep 之后
时间 - Fri May 24 18:03:17 CST 2019		Thread[Thread-0,5,main]		尝试获取锁成功
时间 - Fri May 24 18:03:17 CST 2019		Thread[Thread-0,5,main]		调用 mutex.lock() 之后

看起来好像没问题,在这个 demo 中也得到了和第一个 DEMO 一样的预期的结果。然而事情并没有那么简单,新写的这个 tryAcquire 实现是一个 "CompareThenSet" 操作,在并发的情况下,会出现不可预期的情况

  • 线程A 进来,发现 signfalse
  • 线程B 同时进来,也发现 signfalse
  • 两者同时将 sign 修改为 true ,问题就来了。

到底是 线程A 获取到了锁,还是 线程B 呢?(实际上都获取到了)

我们改一下 Main 方法,我们使用 100 个线程并发执行 mutex.lock(); 获取锁成功则会输出语句 print("获取锁成功"); ,执行,发现,竟然有两个线程同时获取到了锁。 有两个线程同时将 sign 修改为了 true

public static void main(String[] args) throws InterruptedException {
        Mutex mutex = new Mutex();

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            threads.add(new Thread(() -> {
                mutex.lock();
                print("获取锁成功");
            }));
        }

        ExecutorService executorService = Executors.newFixedThreadPool(100);
        threads.forEach(executorService::submit);
        Thread.sleep(1000);
    }

【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 原 荐

如果我们使用 AQS 帮我们写好的 compareAndSetState 则没有这个问题。

Java9 之前,底层实现是调用 unsafe 包的 compareAndSwapInt 来实现的:

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

而在 Java9 之后,则是使用 VarHandle 来实现 VarHandleunSafe 的一个替代方案,本文不多赘述,后面会有文章讲到这个 ~ 。

// VarHandle mechanics
    private static final VarHandle STATE;

---------------------------------------------

    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

这里简单的说一下 CAS ,即 CompareAndSwapCAS 可原子性地比较并替换一个值,乐观锁中一个典型的实现便是使用 CAS 来完成的。对并发编程有所了解的小伙伴应该都知道 CAS ,一般情况下, Compare(比较)Swap(交换) 至少是两个原子操作(实际上是更多个原子操作,主要看编译成多少条机器码)。 CAS 则保证了 CompareSwap 为一个原子操作。

二、深入理解 AbstractQueuedSynchronizer(AQS) 资源锁定与解锁正向流程

上文说到, 我们暂时认为 : tryAcquire 返回 true ,代表获取到锁,反之只要 tryAcquire 返回 flase ,线程就会被阻塞。

AQS 当然没有这么简单,但我们可以先看看加锁时调用的 acquire 方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们发现, tryAcquire 只是第一重判断,如果 tryAcquire 失败,紧接着还有另一个 核心逻辑 acquireQueued 。在简介里,我们说, AQS 除了使用一个 原子state 来作为状态判断以外,还有一个 FIFO 队列 ,此队列就和 acquireQueued 方法息息相关。另外, AQS 所控制的资源访问,还可以是共享的,或者独占的( addWaiter 参数 Node.EXCLUSIVE )。

以下的分析我们以一个简单的 独占式非公平 AQS 实现: java.util.concurrent.locks.ReentrantLock.NonfairSync 来深入解析。独占式很好理解,大部分的锁实现都只允许一个线程在同一时间获取到锁定的资源。

1、TryAcquire 与 TryRelease 的标准写法及其优化

先看看 NonfairSynctryAcuire 是怎么实现及优化的。首先, NonfairSync 中将 state == 0 定义为无锁状态。

  1. 竞争优化: 如果当前无锁( state == 0 ),再调用 CAS 。这实际上对性能是一个很好的优化,假设当前取 state 不为 0 ,实际上 CompareAndSetState 成功的概率也很小,这也可以避免同一时间内,过多的线程去并发修改 state 这个状态。
  2. 重入设计: 试想如果我们不判断当前线程是否持有锁,就去进行 CAS 操作,会发生什么?毫无疑问是 CAS 失败,这会间接导致死锁。这里我们可以看到,重入以后,有一个 int nextc = c + acquires; 操作,这是方便我们记录到底套了几层锁用的,如果没有这个机制,我们将 无法精确的控制加锁和解锁的层级 ,难免会出现一些意料之外的情况。简单来说: lock 几次,就要 unLock 几次。当然我们也可以做到 aquire 多次,一次性 release 掉,或者反过来,取决于怎么我们实现 tryAquiretryRlease 方法。
  3. 偏向优化: 这个优化实际上很简单,如果说要获取锁的线程就是锁的持有线程,我们无需去进行任何 CAS 操作,返回 true 即可。
@ReservedStackAccess
        final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 避免过多的线程竞争 CAS 操作
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);// 如果 CAS 操作成功,则将当前线程保存起来,重入和解锁时用于判断。
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires; // 重入优化,每次加锁相当于 `state++`
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true; // 偏向优化
            }
            return false;
        }



        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { // 每次解锁相当于 `state--` 直到 state == 0 ,代表可释放锁了
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

2、AcquireQueued 解析

①、addWaiter阶段

如果 tryAquire 失败,就会进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))addWaiter 方法创建了一个新的 Node 实例, Node 实例中主要保存了当前线程信息,并将 nextWaiter 赋值为 Node.EXCLUSIVE , 这个 nextWaiter 后面再谈,它主要用于线程调度、以及独占模式、共享模式的区分,我们可以先不管它。

【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 原 荐

操作比较简单,原理是将 node 塞入双向链表尾端,也就是前面提到的 FIFO队列 。就是利用 CAS 操作将新创建的、带有本线程信息的 node 设置为双向链表新的 tail ,并且修改两者的 ‘指针’ prevnext

【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 原 荐

/** Constructor used by addWaiter. */
        Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());
        }
Node node = new Node(mode);

        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue(); // 初始化双向链表,就是创建一个新的空 node,并且头尾都是此 node。
                                       // 这个 node 除了拿来标记链表从哪里开始,没有什么别的意义。
            }
        }

②、acquireQueued阶段(自旋)

入队成功后,进入 acquireQueued 方法, 抛开线程被 interrupt 的情况 acquireQueued 的代码其实也很简单,我们不看 interrupt 相关逻辑,其实逻辑还是很简单的。这是一个 无限循环(或者说自旋) ,只要没有 tryAcquire 成功,就会一直循环下去,逻辑如下:

  1. 如果上一个节点是 FIFO 队列头,则进行一次 tryAquire ,如果成功,则跳出循环。
  2. 检测是否需要阻塞,如果需要阻塞,则阻塞等待唤醒, parkAndCheckInterrupt 便是阻塞直到被唤醒(或者被 interrupt ,暂时先不考虑这个情况)。
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquire()parkAndCheckInterrupt() 都很好理解,前者就是去尝试一下获取锁定资源,看能否成功。后者则是阻塞直到被唤醒。

③、阻塞阶段

我们先说说 shouldParkAfterFailedAcquire ,这个判断是一个挺有意思的设计,后续文章会细说,它和线程调度、取消获取锁等相关。因为在获取锁定资源和释放锁定资源的过程中,实际上我们只需要用到两个状态,一个是初始状态 pred.waitStatus == 0 ,另一个是 pred.waitStatus == SIGNAL == -1

代码中我们可以很容易看出,在 CASprev 节点的 waitStatus 设置为 SIGNAL : -1 之前,都将返回 false ,如果设置成功,下一次自旋进入该方法就是 true 了,也就是说,会进入 parkAndCheckInterrupt() 方法,阻塞直到被唤醒。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

自旋阶段图解:

【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 原 荐

阻塞阶段图解:

【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 原 荐

3、release 解析

①、唤醒 FIFO 的下一个节点

阻塞直到唤醒这个逻辑在锁定资源、释放资源 这两个阶段来看十分简单,最后我们来看看 release 做了什么, release 除了调用了我们自己实现的 tryRelease 之外,其实关键的就是这个 unparkSuccessor

tryRelease 上面也说过了,就是改改原子 state ,这里不多赘述。

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

代码中可以看出,当 FIFO 队列不为空且头结点的 waitStatu 被修改过,就会进入 unparkSuccessorunparkSuccessor 传入了当前 FIFO 的队列头,逻辑如下:

  1. 如果当前节点 waitStatus 为负(可能为 SIGNALCONDITION 或者 PROPAGATE ),我们这里简单先看成只有 SIGNAL 状态,则 CAS 将其设置为 0 。其他几个状态我们后面会说到。
  2. 如果 !(s == null || s.waitStatus > 0) ,也就是说 node.nextwaitStatus <= 0 ,则简单的直接将其唤醒: LockSupport.unpark(s.thread);
private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 原 荐

②、被唤醒后

被唤醒的线程当然不是直接获得了锁,它还是会继续 acquireQueue 进行自旋,逻辑还是和之前一样,避免小伙伴往上翻代码,这里贴了一份如果 prev 是头结点,如果 tryAcquire 成功,我们看到其实很简单,只是将自己设为头部即可。

if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 原 荐

这篇文章只是简单的说说 AQS 的正向获取资源,释放资源流程,后续会继续解析 waitnotifycondition 等基于 AQS 的线程调度解析 ~~ 以及各个锁是如何实现 AQS 的 ~~

文章皆是基于源码一步步分析,没有参考过多资料,如有错误, 请指出!!

另外欢迎来 Q 群讨论技术相关(目前基本没人) [左二维码] ~

如果觉得写得好还可以关注一波订阅号哟 ~ 博客和订阅号同步更新 [右二维码] ~

【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 原 荐

参考资料:

JDK12 源码

Brief introduction to AbstractQueuedSynchronizer by Using a Simple Mutex Example

另外小伙伴可以思考一下:

  1. 如果说阶段7: ThreadB 被唤醒后,继续自旋时,另一个线程 ThreadC tryAcquire 成功了会发生什么。
  2. 如果说第一个问题了解了,那应该就很清楚为什么说本文解析的这个锁叫做:非公平锁了
  3. 众所周知,只要是 CAS 操作,都有 ABA 问题,如果说修改 waitStatus 发生了 ABA 问题,会发生什么?

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

构建之法(第三版)

构建之法(第三版)

邹欣 / 人民邮电出版社 / 2017-6 / 69.00元

软件工程牵涉的范围很广, 同时也是一般院校的同学反映比较空洞乏味的课程。 但是,软件工程 的技术对于投身 IT 产业的学生来说是非常重要的。作者有在世界一流软件企业 20 年的一线软件开 发经验,他在数所高校进行了多年的软件工程教学实践,总结出了在 16 周的时间内让同学们通过 “做 中学 (Learning By Doing)” 掌握实用的软件工程技术的教学计划,并得到高校师生的积极反馈。在此 ......一起来看看 《构建之法(第三版)》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具