AQS:JAVA经典之锁实现算法(一)

栏目: Java · 发布时间: 5年前

内容简介:AQS可以说是JAVA源码中必读源码之一。同时它也是JAVA大厂面试的高频知识点之一。认识并了解它,JAVA初中升高级工程师必备知识点之一。 AQS是AbstractQueuedSynchronizer的简称,它也是JUC包下众多非原生锁实现的核心。AQS是基于CLH队列算法改进实现的锁机制。大体逻辑是AQS内部有一个链型队列,队列结点类是AQS的一个内部类Node,形成一个类似如下Sync Queue(记住这个名词)可以看出,一个Node除了前后结点的索引外,还维护了一个Thread对象,一个int的wa

AQS可以说是 JAVA 源码中必读源码之一。同时它也是JAVA大厂面试的高频知识点之一。认识并了解它,JAVA初中升高级工程师必备知识点之一。 AQS是AbstractQueuedSynchronizer的简称,它也是JUC包下众多非原生锁实现的核心。

一:AQS基础概况

AQS是基于CLH队列算法改进实现的锁机制。大体逻辑是AQS内部有一个链型队列,队列结点类是AQS的一个内部类Node,形成一个类似如下Sync Queue(记住这个名词)

AQS:JAVA经典之锁实现算法(一)

可以看出,一个Node除了前后结点的索引外,还维护了一个Thread对象,一个int的waitStatus。 Thread对象就是处于竞争队列中的线程对象本身。 waitStatus表示当前竞争结点的状态,这里暂且忽略掉。

处于队首的,即Head所指向的结点,即为获取到锁的结点。释放锁即为出队,后续结点则成为队首,即获取到锁

Tips:这里帮大家理解一个事情,每一个ReentrantLock实例都有且只有一个AQS实例,一个AQS实例维护一个Sync Queue。所以说,当我们的业务代码中的多个线程对同一个ReentrantLock实例进行锁竞争操作时,其实际就是对同一个Sync Queue的队列进行入队、出队操作。

二:AQS调用入口----ReentrantLock

我们在用ReentrantLock时,代码通常如下:

ReentrantLock lock = new ReentrantLock();
Runnable runnable = new Runnable() {
  public void run() {
    lock.lock();
    Sys.out(Thread.currentThread().name() + "抢到锁");
    lock.unlock();
  }
};
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
t2.start();
t1.start();
复制代码

可见线程t1,t2竞争lock这个锁。 lock.lock() 抢锁 lock.unlock() 释放锁 来看入口函数 lock

public void lock() {
  sync.lock();
}
复制代码

syncReentrantLock 内的一个继承了 AQS 的抽象类

abstract static class Sync extends AbstractQueuedSynchronizer {
}
复制代码

抽象类的具体实现是另两个内部类 NonFairSync FairSync ,分别代表非公平锁、公平锁。

我们系统来看下继承图

AQS:JAVA经典之锁实现算法(一)

ReentrantLock中的sync实例,默认是在构造函数中初始化的,

public ReentrantLock() {
  sync = new NonfairSync();
}
复制代码

那我们就看默认的 NonFairSync 的实现逻辑。

NonFairSync

当我们调用 ReentrantLock.lock() 时,直接调用到的是 NonFairSync.lock()

/**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
复制代码

compareAndSetState(0, 1) 通过CAS(CAS是啥,这里不讲,参考乐观锁。具体Google吧)方式,设置AQS的 int state 字段为1。AQS内部就是通过这个 state 是否为0来判断当前锁是否已经被线程获取到。 返回 true ,则说明获取锁成功,设置当前锁的独占线程 setExclusiveOwnerThreaThread.currentThread()); 否则, acquire(1) 尝试获d(取锁。这个方法会自旋、阻塞,一直到获取锁成功为止。这里,传进去的参数1,就参考乐观锁的版本字段,同时,这里,它还记录了可重入锁重复获取到锁的次数。只有释放同样次数才能最终释放锁。

具体看AQS的 acquire() 的逻辑

AbstractQueuedSynchronizer.acquire()

public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}
复制代码

要注意,这个方法是忽略线程中断的。 先看 tryAcquire(arg) ,这个方法是AQS留给子实现类的口子,具体实现看 NonFairSync ,它的实现里直接调用了 Sync.nonfairTryAcquire()

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
复制代码

可以看到方法内,先是尝试获取 state = 0 时的初始锁,如果失败,判断当前锁是否是被当前线程获取,是的话,将 acquire 时传入的参数累加到 state 字段上。在这里,这个字段就是用来记录重复获取锁的次数。 获取失败则返回 false

回到 acquire 在获取失败,返回 false 后,才会继续调用 && 右边的 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法。 先看 addWaiter

private Node addWaiter(Node mode) {
1:        Node node = new Node(Thread.currentThread(), mode);
2:        // Try the fast path of enq; backup to full enq on failure
3:        Node pred = tail;
4:        if (pred != null) {
5:           node.prev = pred;
6:           if (compareAndSetTail(pred, node)) {
7:                pred.next = node;
8:                return node;
9:            }
10:        }
11:       enq(node);
12:        return node;
13:    }
复制代码

利用传进来的 Node.EXCLUSIVE 表示的排斥锁参数以及当前线程实例初始化新 Node ,第 4-10 行代码是在 Sync Queue 队列内有竞争线程时进入。为空时会走到 enq(node) ,这里是在竞争为空时将竞争线程入队的操作。 然后返回当前竞争的线程 node 此时 Sync Queue 如图

AQS:JAVA经典之锁实现算法(一)

我们假设node_1是已经获取到锁的结点,node_2即为我们当前操作的结点。

返回 acquire 接着看 addWaiter 返回的 node 直接作为参数给了 acquireQueued ,这个方法就是主要的 node 竞争锁方法。

final boolean acquireQueued(final Node node, int arg) {
        // 获取锁成功失败标记
        boolean failed = true;
        try {
            // 当前竞争线程的中断标记
            boolean interrupted = false;
            // 自旋竞争锁,竞争不到锁的话,线程又没有中断
            // 则一直在这儿循环
            for (;;) {
                // 获取当前线程的前驱结点
                final Node p = node.predecessor();
                // 如果前驱结点是头结点,则尝试去tryAcquire
                // 这个逻辑我们之前看过,当前线程未获取到锁的情况下
                // 在AQS的state字段不为0时,则返回false
                if (p == head && tryAcquire(arg)) {
                    // 进入到这里,说明要不就是当前线程在重复获取
                    // 要不就是前边的结点释放锁,state 归0,这里获取到
                    setHead(node);
                    p.next = null; // help GC
                    // 标识竞争锁成功
                    failed = false;
                    // 这个方法不响应线程中断,但是会返回线程在竞争锁过程中
                    // 中断标记返回
                    return interrupted;
                }
                // 若获取锁失败,则到这里。这里的逻辑主要在If判断
                // 的两个方法中,用来将当前线程挂起的,具体逻辑
                // 看下面
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

park就是停下的意思。所以这个方法从名字上也比较好理解,就是 挂起线程并且检查线程的中断状态。这里要注意, LockSupport.part(this) 方法是会在线程中断时自动唤醒的

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
复制代码

这个方法传入了当前竞争结点及其前驱结点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驱结点的等待状态。这里只需要记住,我们这里考虑的是
        // 非共享锁、非公平锁的AQS。所以,只需要确保当前竞争
        // 结点的前驱结点状态为SIGNAL就好。剩下的状态,
        // 与我们此时研究的情况而言没有用
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 如果前驱结点的status为0,则将其改为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
复制代码

shouldParkAfterFailedAcquire 可以看出来,在确保前驱结点status为 SIGNAL 时,就可以放心的去 unsafe.park() 了。之所以要为 SIGNAL ,是因为这个状态含义为:当前结点OVER时要唤醒后继结点。

所以不难推出,我们的结点现在就 park 在那了。等他前驱结点释放锁,或者自己interrupt来唤醒,但因为这个方法是无视中断的,所以即使interrupt了,只是设置了一个标记位,但仍然在循环中。

这里假设前驱结点获取锁后释放,则当前结点在 parkAndCheckInterrupt() 方法中被唤醒,而后再次循环 for(;;) ,这次会在第一个 if 中就进入,当前结点获取到锁,然后重置 Head 指向的结点等,返回当前线程的中断标记。

返回 acquire

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

if 中的 selfInterrupt() 方法只是去重新设置当前线程的中断标记位。这是因为获取线程中断状态的方法,在返回状态字段的同时,也会重置字段,所以需要标记后重新设置相应的值。

下面我们看下AQS释放锁的接口方法

ReentrantLock.unlock

public void unlock() {
        sync.release(1);
    }
复制代码

追进去

ReentrantLock.release

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
复制代码

tryRelease 是在 NonFairLock 中的实现的,如果是释放成功,则在 Head 存在并且状态不为0(其实可以理解为值为 SIGNAL 时)去唤醒 Head 的后继结点。

NonFailLock.tryRelease

下面看下 NonFairLocktryRelease 方法的实现

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
复制代码

可以看出,这个 tryRelease 其实就是去判断下是不是当前线程拥有锁,是的话,判断下当前的释放锁是否完全释放,因为锁可以重复获取,完全释放的话,就设置 state 为0,代表AQS的锁已经被释放了。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

创新者

创新者

[美] 沃尔特·艾萨克森 / 关嘉伟、牛小婧 / 中信出版社 / 2017-4 / 88.00元

《创新者》是沃尔特·艾萨克森继全球畅销书《史蒂夫·乔布斯传》之后的又一部力作,不仅讲述了计算机和互联网从无到有的发展历程,为我们 生动地刻画出数字时代的创新者群像,还深度挖掘互联网的精神内核,解读了“诗意科学”这个重大主题。 在近200年的数字化进程中群星闪耀,艾萨克森从第一个计算机程序的创造者、浪漫主义诗人拜伦之女埃达•洛夫莱斯伯爵夫人说起,细数了这一群将科学与人文融合的创新者,他们包括第......一起来看看 《创新者》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具