Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(3)

栏目: Java · 发布时间: 5年前

内容简介:本系列是基于经验设计原型,然后不断优化最终达到和AQS(AbstractQueuedSynchronizer)类似的设计。最终结果不一定和AQS完全一致,基于个人理解会有修改,可以作为理解AQS的不完全参考。接上篇。本篇主要介绍Condition即条件变量的实现,ReentrantLock中最后一块需要实现的内容。在实现条件变量之前,考虑一下条件变量的一些特性。

本系列是基于经验设计原型,然后不断优化最终达到和AQS(AbstractQueuedSynchronizer)类似的设计。最终结果不一定和AQS完全一致,基于个人理解会有修改,可以作为理解AQS的不完全参考。

接上篇。本篇主要介绍Condition即条件变量的实现,ReentrantLock中最后一块需要实现的内容。

在实现条件变量之前,考虑一下条件变量的一些特性。

  1. 条件变量依赖锁,而且是独占锁
  2. 执行await方法后释放锁,当前线程进入睡眠状态,等待满足条件后被其他线程signal/signalAll唤醒,被唤醒后会尝试重新获取锁
  3. 唤醒可以选择一个也可以全部

注意第一条特性,条件变量依赖的是独占锁,所以类似读锁这种共享锁是不支持条件变量的,ReentrantReadWriteLock中ReadLock#newCondition的实现是直接抛错。

按照第二条特性,可以得到如下的执行过程

Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(3)

注意ThreadA中被加粗的unlock和lock,调用条件变量的await方法时,虽然没有显式调用unlock和lock,但是内部其实是做了类似的事情的。

从第三条来看,条件变量应该是对应了一个专门的队列的,那些调用了await方法的线程会进入条件变量的队列中等待被signal。这个专门的队列和锁自身的lock/unlock等待队列不同,语义上唤醒是某个线程的signal/signalAll,而不是lock/unlock方法。如果有点难理解的话,可以考虑在上图中增加一个不调用条件变量任何方法的ThreadC,C的lock/unlock不会唤醒A,因为A等待的是B的唤醒,所以条件变量的队列和锁的lock/unlock队列是分离的,类似下图。

Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(3)

那么条件变量的等待队列需要设计得和锁的等待队列一样么?答案是不用。原因是假如你在await的unlock语义执行之前加入队列的话,因为条件队列依赖的是独占锁,只有一个线程可以操作condition queue。调用signal方法时,锁还没有被释放,同样只有一个线程可以操作condition queue。所以condition queue可以使用普通的同步队列设计。

基于以上初步分析之后得到的条件变量原型

import javax.annotation.Nonnull;
import java.util.Date;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
 
public class LockWithCondition1 implements Lock {
 
    @Override
    public void lock() {
 
    }
 
    @Override
    public void lockInterruptibly() throws InterruptedException {
 
    }
 
    @Override
    public boolean tryLock() {
        return false;
    }
 
    @Override
    public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
        return false;
    }
 
    @Override
    public void unlock() {
 
    }
 
    @Override
    @Nonnull
    public Condition newCondition() {
        return new ConditionImpl();
    }
 
    @SuppressWarnings("Duplicates")
    private class ConditionImpl implements Condition {
        final LinkedList<WaitingThread> waitingThreads = new LinkedList<>();
 
        @Override
        public void await() throws InterruptedException {
            waitingThreads.addLast(new WaitingThread(Thread.currentThread()));
            unlock();
            LockSupport.park(this);
            lockInterruptibly();
        }
 
        @Override
        public void awaitUninterruptibly() {
            waitingThreads.addLast(new WaitingThread(Thread.currentThread()));
            unlock();
            LockSupport.park(this);
            lock();
        }
 
        @Override
        public long awaitNanos(long nanosTimeout) throws InterruptedException {
            if (nanosTimeout <= 0L) {
                return 0L;
            }
            long startAt = System.nanoTime();
            waitingThreads.addLast(new WaitingThread(Thread.currentThread()));
            unlock();
            LockSupport.parkNanos(this, nanosTimeout);
            lock();
            return System.nanoTime() - startAt;
        }
 
        @Override
        public boolean await(long time, @Nonnull TimeUnit unit) throws InterruptedException {
            WaitingThread t = new WaitingThread(Thread.currentThread());
            waitingThreads.addLast(t);
            unlock();
            LockSupport.parkNanos(this, unit.toNanos(time));
            boolean signaled = (t.status.get() == WaitingThread.STATUS_SIGNALED || !t.abort());
            lock();
            return signaled;
        }
 
        @Override
        public boolean awaitUntil(@Nonnull Date deadline) throws InterruptedException {
            return await(deadline.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
 
        @Override
        public void signal() {
            WaitingThread t = waitingThreads.removeFirst();
            if (t != null) {
                t.wakeUp();
            }
        }
 
        @Override
        public void signalAll() {
            for (WaitingThread t : waitingThreads) {
                t.wakeUp();
            }
            waitingThreads.clear();
        }
    }
 
    private static class WaitingThread {
        static final int STATUS_NORMAL = 0;
        static final int STATUS_SIGNALED = 1;
        static final int STATUS_ABORTED = -1;
 
        final AtomicReference<Thread> thread;
        final AtomicInteger status = new AtomicInteger(STATUS_NORMAL);
 
        WaitingThread(@Nonnull Thread thread) {
            this.thread = new AtomicReference<>(thread);
        }
 
        void wakeUp() {
            if (status.compareAndSet(STATUS_NORMAL, STATUS_SIGNALED)) {
                LockSupport.unpark(thread.get());
            }
        }
 
        boolean abort() {
            thread.set(null);
            return status.compareAndSet(STATUS_NORMAL, STATUS_ABORTED);
        }
    }
}

这里为了只关注条件变量省去了lock的所有代码。可以看到await方法中,执行了如下操作

  1. enqueue(condition queue)
  2. unlock
  3. park
  4. lock

对应地,signal执行了如下操作

  1. dequeue(condition queue)
  2. unpark

提问,这里是否会发生signal无法唤醒await线程的情况?

考虑到enqueue和dequeue都是在持有独占锁的前提下操作的,实际可能有两种情况:

  1. 先dequeue
  2. 先enqueue

先dequeue意味着signal先执行(因为此时持有独占锁),以及队列为空,unpark不会唤醒任何线程。signal先执行等价于之后await调用不会发生,因为进入await之前的条件是满足的。执行结果正确。

先enqueue意味signal还无法执行(因为此时await的线程持有锁)。unlock之后,signal线程获取锁,发现await线程的节点,尝试unpark。这里有可能是unpark/park,也有可能是park/unpark执行序列。不管哪种,await线程都会被唤醒,尝试lock,进入lock的等待队列。signal线程释放锁之后,await线程获取锁,执行await之后的代码。执行结果正确。

可以看到,条件变量的分析比独占锁本身要容易很多,一方面这是条件变量本身的特性所致,另一方面合理利用条件变量的特性(特性一)简化了设计。

顺便说一句,设计上多个条件变量的队列互不影响,如果你看到某个锁的条件变量要求只能有一个,或者互相有影响的话,多少说明设计是有问题的。

刚才关于“先enqueue”的分析中,你是否注意到await线程被唤醒之后,由于signal线程还没有释放锁,所以await线程暂时无法获取锁,导致了一次多余的unpark/park。是否可以直接把await线程的节点从conditon queue转移到lock的等待队列?当然这样做的前提是条件变量了解lock的内部实现,由于条件变量依赖锁,所以这不是问题。

其次LockWithCondition1严格上来说在一些边界条件上不满足条件变量的语义。比如说await方法在抛出InterruptedException时必须持有锁(await返回时必须持有锁),现在的实现中lockInterruptibly在抛出InterruptedException时不持有锁。这是条件变量实现中最容易忽视的问题。

还有一些条件变量实现的细节问题,比如await的什么时候可以抛出InterruptedException。这块可以参考Condition接口本身的注释。

Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(3)

以下是修改版的条件变量实现。由于代码比较多,这里只展示了ConditionImpl的实现,其他代码可以参考

https://github.com/xnnyygn/concurrent-learning/tree/master/src/main/java/in/xnnyygn/concurrent/mylock

import javax.annotation.Nonnull;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
 
@SuppressWarnings("Duplicates")
class ConditionImpl implements Condition {
    private final MyLock lock;
    private final ConditionQueue conditionQueue = new ConditionQueue();
 
    ConditionImpl(MyLock lock) {
        this.lock = lock;
    }
 
    @Override
    public void await() throws InterruptedException {
        Node node = Node.createConditionForCurrent();
        conditionQueue.enqueue(node);
        lock.unlock();
        boolean interrupted = false;
        while (nodeNotLockEnqueued(node)) {
            LockSupport.park(this);
            /*
             * 1. interrupted, unknown
             * 2. predecessor is aborted, lock enqueued
             * 3. signaled by predecessor, lock enqueued
             */
            if (Thread.interrupted()) {
                interrupted = lockEnqueueByAwaitingThread(node);
                break;
            }
        }
        if (lock.acquireUninterruptibly(node) && !interrupted) {
            Thread.currentThread().interrupt();
        }
        if (interrupted) {
            conditionQueue.removeNonConditionNodes();
            throw new InterruptedException();
        }
    }
 
    @Override
    public void awaitUninterruptibly() {
        Node node = Node.createConditionForCurrent();
        conditionQueue.enqueue(node);
        lock.unlock();
        boolean interrupted = false;
        while (nodeNotLockEnqueued(node)) {
            LockSupport.park(this);
            if (Thread.interrupted()) {
                interrupted = true;
            }
        }
        if (lock.acquireUninterruptibly(node) || interrupted) {
            Thread.currentThread().interrupt();
        }
    }
 
    @Override
    public long awaitNanos(long nanosTimeout) throws InterruptedException {
        if (nanosTimeout <= 0L) {
            return 0L;
        }
        Node node = Node.createConditionForCurrent();
        conditionQueue.enqueue(node);
        lock.unlock();
        long deadline = System.nanoTime() + nanosTimeout;
        long nanos = nanosTimeout;
        boolean interrupted = false;
        while (nodeNotLockEnqueued(node)) {
            LockSupport.parkNanos(this, nanos);
            nanos = deadline - System.nanoTime();
            if (Thread.interrupted()) {
                interrupted = lockEnqueueByAwaitingThread(node);
                break;
            }
            if (nanos <= 0L) {
                lockEnqueueByAwaitingThread(node);
                break;
            }
        }
        if (lock.acquireUninterruptibly(node) && !interrupted) {
            Thread.currentThread().interrupt();
        }
        if (interrupted || nanos <= 0L) {
            conditionQueue.removeNonConditionNodes();
        }
        if (interrupted) {
            throw new InterruptedException();
        }
        return nanos;
    }
 
    @Override
    public boolean await(long time, TimeUnit unit) throws InterruptedException {
        return awaitNanos(unit.toNanos(time)) > 0;
    }
 
    @Override
    public boolean awaitUntil(@Nonnull Date deadline) throws InterruptedException {
        return awaitNanos(TimeUnit.MILLISECONDS.toNanos(deadline.getTime() - System.currentTimeMillis())) > 0;
    }
 
    @Override
    public void signal() {
        Node node;
        do {
            node = conditionQueue.dequeue();
            if (node == null) {
                return;
            }
        } while (!lockEnqueueBySignalingThread(node));
    }
 
    @Override
    public void signalAll() {
        Node node;
        while ((node = conditionQueue.dequeue()) != null) {
            lockEnqueueBySignalingThread(node);
        }
    }
 
    private boolean nodeNotLockEnqueued(@Nonnull Node node) {
        if (node.status.get() == Node.STATUS_CONDITION || node.predecessor.get() == null) {
            return true;
        }
        if (node.successor.get() != null) {
            return false;
        }
        // status == NORMAL
        return !lock.queue.contains(node);
    }
 
    private boolean lockEnqueueByAwaitingThread(@Nonnull Node node) {
        if (node.status.compareAndSet(Node.STATUS_CONDITION, Node.STATUS_NORMAL)) {
            lock.queue.enqueue(node);
            return true;
        }
        // enqueuing by signal thread
        while (!lock.queue.contains(node)) {
            Thread.yield();
        }
        return false;
    }
 
    private boolean lockEnqueueBySignalingThread(@Nonnull Node node) {
        if (!node.status.compareAndSet(Node.STATUS_CONDITION, Node.STATUS_NORMAL)) {
            return false;
        }
        Node predecessor = lock.queue.enqueue(node);
        if (predecessor.isAborted() || !predecessor.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL)) {
            // predecessor is aborted
            LockSupport.unpark(node.thread.get());
        }
        return true;
    }
}

在分析修改版的await之前,解释一下这里的Node。MyLock使用的Node和AQS一样,可同时用于condition queue和lock queue。这样做只是为了复用Node,你也可以选择和LockWithCondition1一样,分开使用condition queu的节点和lock queue的节点。condition queue中主要使用Node的thread,status和next,lock queue使用除next以外所有字段。为了区分condition queue和lock queue的节点,condition queue的节点一开始的状态是STATUS_CONDITION,相对的,lock queue中节点一开始为STATUS_NORMAL。前篇提到过Node中status的迁移图,不包括STATUS_CONDITION。事实上加入lock queue时节点的status不可能为STATUS_CONDITION。STATUS_CONDITION只是condition queue中的状态,这点请注意。

condition queue中节点的status只有一种迁移

STATUS_CONDITION -> STATUS_NORMAL

可能的情况有

  1. signal
  2. signalAll
  3. await超时
  4. 中断

为什么不区分异常情况?原因是异常状态是也需要加入lock queue,此时status必须是STATUS_NORMAL。你不能把一个STATUS_ABORTED的节点加入lock queue,这样你就没办法获取锁了。重申一遍,条件变量的语义中不管是正常返回,还是await超时,中断,返回时必须持有锁。

由于以上原因,status迁移只有一种。考虑到signal/signalAll时signal线程执行状态迁移,await超时和中断时await线程执行状态迁移,status迁移还必须是CAS。

condition queue的具体实现在类ConditionQueue中,这个类现阶段有3个方法

  1. enqueue
  2. removeNonConditionNodes
  3. dequeue

enqueue很好理解,removeNonConditionNodes主要用于在await超时和中断移除await线程的节点的。注意,只是从condition queue中移除,之后会放到lock queue中去。dequeue就是正常情况从condition queue中移除下一个节点,之后放到lock queue中去。

在上述前提下理解await和signal方法。

首先看最简单的signal方法。从condition queue中取出一个节点,如果没有节点就直接返回。有的话尝试状态迁移,失败的话,说明await线程自己修改了状态(状态迁移的情况3和4),signal会尝试下一个节点。如果状态迁移成功,此时按照前篇的要求,设置前置节点status为STATUS_SIGNAL。如果前置节点放弃了,或者CAS过程中失败了(意味着前置节点放弃了),此时signal线程能做的事情就是唤醒await线程,让await线程自己跳过放弃的前置节点。

signal线程是否可以帮await线程的节点跳过放弃的节点而不是unpark await线程?这样可以帮忙省去一次unpark/park。

回答是请分析哪些是signal线程可以做的,哪些是await线程可以做的。

从MyLock是基于CLHLock实现这点来说,设置前置节点的status是后继节点对应的线程,即await线程该做的事情。所以signal线程这里设置前置节点的status其实不是它的职责,这里个人觉得是为了减少一般情况下的unpark/park。如果碰到前置节点是放弃了的节点还考虑帮忙跳过的话,有点舍本逐末了。

signalAll和signal类似,这里不再赘述。

接下来是相对简单的awaitUninterruptibly。为什么不是await?因为await对于中断的处理有点微妙。awaitUninterruptibly不处理中断,准确来说是,碰到中断,只会重新设置中断状态。这样关注点就可以集中到中断处理以外的部分。

awaitUninterruptibly的基本流程和LockWithCondition1有所不同。考虑到park会响应中断,需要把park放在一个循环中,记录中断状态。跳过循环的唯一条件是节点已经被加入lock queue。

判断是否已加入lock queue的最直接的方法(nodeNotLockEnqueued最后)是从tail检查节点是否存在。其次是检查进入lock queue之后才会被使用的字段,比如successor。假如successor被设置,那么节点肯定被加入了lock queue。还有一种提前判断的方法,节点自身的状态。如果节点status是STATUS_CONDITION或者进入lock queue后才会被使用的predecessor字段没有被设置的话,可以断言节点还没有加入lock queue。

awaitUninterruptibly中跳出循环之后,执行acquireUninterruptibly。在AQS中这个方法被叫做acquireQueued。从行为上这个acquireXXX方法在节点被加入lock queue后执行。其次,会记录acquire过程中的中断,但是不会因为中断而停止尝试获取锁。await系列方法其实需要的就是这种获取模式,不会因为中断而放弃获取的方式。

不管是acquireUninterruptibly还是循环过程中被中断,awaitUninterruptibly最终只会重新设置中断标志,除此以外什么都不做。

在理解了acquireUninterruptibly之后,看一下await方法。

相比于await方法,循环有两个出口。一个是由signal线程放到lock queue的条件,另外一个是中断。中断时,await线程会自己尝试把节点加入lock queue。从两个lockEnqueueByXXXThread方法可以看出,存在两个线程同时把节点放入lock queue的可能性。假如signal线程先放进lock queue,那么await在CAS这一步(lockEnqueueByAwaitingThread第一行)会失败,这时必须等待signal线程enqueue操作的完成(通过yield)。假如await线程先放进lock queue,那么signal线程会失败,现有逻辑中signal线程会尝试唤醒下一个节点。

注意这里的await和AQS的await方法有些许不同。AWS的await在await线程先放入lock queue时最后会抛出InterruptedException,但是在signal线程先放入的话,会设置中断状态。这里的await,在await线程先放入时同样会抛出异常,但是在signal线程先放入的话,不会设置中断状态。这里其实有一个边界问题,在中断时恰好被signal时算不算中断?个人觉得可以不算,要算也是以留下证据为目的。

跳出循环之后,同样调用acquireUninterruptibly,按情况设置中断标志。最后在确认是中断了的情况下,从condition queue中移除自己的节点,并抛出异常。注意这里,移除节点时持有独占锁,其次,中断不代表节点仍旧在condition queue里面。一种情况是await线程比同时执行的signal先CAS成功。

考虑到这里的await和AWS的await相比还是有点差异的,如果你对于改造不太放心的话,在理解的前面内容的基础上,相信你能修改成AQS的语义。

最后是几个限时await方法,这里全部引导到同一个限时方法awaitNanos上。主体流程和await很像,有一点不同是剩余时间的处理。个人理解awaitNanos的返回值代表是是否超时,所以不能简单把timeout减去执行时间返回去调用方。具体代码这里不再分析。

小结

总体来说,条件变量比起独占锁本身要简单。因为可以借助条件变量的特性使用同步队列。另一方面,条件变量实现中也有比较细节的问题需要考虑。

至此,ReentrantLock的实现算是真正完成了。接下来是ReentrantReadWriteLock的实现。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Java Servlet & JSP Cookbook

Java Servlet & JSP Cookbook

Bruce W. Perry / O'Reilly Media / 2003-12-1 / USD 49.99

With literally hundreds of examples and thousands of lines of code, the Java Servlet and JSP Cookbook yields tips and techniques that any Java web developer who uses JavaServer Pages or servlets will ......一起来看看 《Java Servlet & JSP Cookbook》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

html转js在线工具
html转js在线工具

html转js在线工具