AbstractQueuedSynchronizer源码分析

栏目: 编程工具 · 发布时间: 5年前

一、简介

AQS是JUC框架中重要的类,通过它来实现独占锁和共享锁的,内部很多类都是通过AQS来实现的,比如CountDownLatch、ReentrantLock、ReentrantReadWriteLock、Semaphore。本章是对AbstractQueuedSynchronizer源码进行分析。从独占锁的获取、释放,共享锁的获取、释放,以及Condition的await、signal三方面对源码进行分析。先知道AQS的这几点注意事项1、Condition条件变量只支持在独占锁状态下2、AQS支持同步队列和条件队列(条件队列的数目根据Condition实例的数目),只有同步队列中的节点线程才能获取锁3、AQS是个抽象类,提供独占锁和共享锁的公有方法,子类需要实现的几个模板方法,①tryAcquire②tryRelease③tryAcquireShared④tryReleaseShared⑤isHeldExclusively。这几点在下面代码解析会分析到。 还有一个AbstractQueuedLongSynchronizer类,它与AQS功能和实现几乎一样,唯一不同的是AQLS中代表锁被获取次数的属性state类型是long类型,而AQS中该成员变量是int类型。AQS 源码对应的jdk版本是1.8。

二、类关系

AbstractQueuedSynchronizer源码分析

//此类只提供保存和获取独占锁线程,子类可以使用适当的保留值来帮助控制和监控访问并提供诊断
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    private static final long serialVersionUID = 3737899427754241961L;
    
    protected AbstractOwnableSynchronizer() { }
    
    //独占锁拥有者线程
    private transient Thread exclusiveOwnerThread;
     
    //设置独占锁的拥有者线程,访问权限protected,同包或者子类使用
    protected final void setExclusiveOwnerThread(Thread thread) {
        //将传入进来的线程赋值给独占锁拥有者线程
        exclusiveOwnerThread = thread;
    }
    
    //获取独自锁的拥有者线程,访问权限protected,只能在同包或者子类使用
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}复制代码

三、属性

//同步队列的头节点
private transient volatile Node head;
//同步队列的尾节点
private transient volatile Node tail;
//同步状态
private volatile int state;
//如果超时时间小于此阈值,不阻塞线程,让其自旋,在doAcquireNanos、doAcquireSharedNanos、awaitNanos、await(long time, TimeUnit unit)方法使用到
static final long spinForTimeoutThreshold = 1000L;
//获取UnSafe使用,如果对UnSafe使用不清楚的,可以看下我分享的UnSafe的使用
private static final Unsafe unsafe = Unsafe.getUnsafe();
//属性state的相对偏移量,相对AbstractQueuedSynchronizer实例的起始内存位置的相对偏移量,定义成静态的原因是,属性的相对实例的偏移量都是相等的
private static final long stateOffset;
//属性head的相对偏移量
private static final long headOffset;
//属性tail的相对偏移量
private static final long tailOffset;
//内部类Node实例的属性waitStatus的相对偏移量
private static final long waitStatusOffset;
//内部类Node实例的属性next的相对偏移量
private static final long nextOffset;
static {
    try {
        //使用UnSafe实例获取AbstractQueuedSynchronizer类的属性state的相对偏移量
        stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        //使用UnSafe实例获取AbstractQueuedSynchronizer类的属性head的相对偏移量
        headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        //使用UnSafe实例获取AbstractQueuedSynchronizer类的属性tail的相对偏移量
        tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        //使用UnSafe实例获取AbstractQueuedSynchronizer内部类Node的属性waitStatus的相对偏移量
        waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
        //使用UnSafe实例获取AbstractQueuedSynchronizer内部类Node的属性next的相对偏移量
        nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
    } catch (Exception ex) { throw new Error(ex); }    
}复制代码

四、内部类

  1. Node类
    //通过Node我们可以实现两个队列,一是通过prev和next属性实现CLH队列(线程同步队列,双向队列),二是nextWaiter属性实现Condition条件上的等待条件队列(单向队列),在Condition中会详细介绍。
    static final class Node {
            //当前节点是获取共享锁的标记
            static final Node SHARED = new Node();
            //当前节点是获取独占锁的标记
            static final Node EXCLUSIVE = null;
            
            //属性waitStatus的值,标志节点对应的线程被取消
            static final int CANCELLED =  1;
            //属性waitStatus的值,标志当前节点的next节点的线程(即队列中当前节点的下一个节点)需要被阻塞
            static final int SIGNAL    = -1;
            //属性waitStatus的值,标志当前节点在Condition条件下等待阻塞,在Condition实例的await系列方法中使用,新建一个waitStatus的值为CONDITION的节点Node,将其加入到Condition中的条件队列中,在Condition实现类详细介绍
            static final int CONDITION = -2;
            //属性waitStatus的值,标志着下一个acquireShared方法线程应该被允许,在获取共享锁
            static final int PROPAGATE = -3;
            
            //标记着当前节点的状态,默认状态是0,小于0的状态值都是有特殊作用,大于0的状态值表示已取消
            volatile int waitStatus;
            
            //使用prev和next实现同步队列,即双向链表,当前节点的前驱节点 
            volatile Node prev;
            
            //当前节点的下一节点 
            volatile Node next;
            
            //当前节点对应的线程
            volatile Thread thread;
            
            //有两种作用:1、表示下一个在Condition条件上等待的节点,调用Condition中await和signal方法,当前节点的线程是拥有独占锁的线程2、表示同步队列中的节点是共享模式还是独占模式
            Node nextWaiter;
            
            //判断当前节点是不是共享模式
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
            
            //获取当前节点的前驱节点,如果为null,则抛出空指针异常
            final Node predecessor() throws NullPointerException {
                //当前节点的前驱节点
                Node p = prev;
                //如果前驱节点为空
                if (p == null)
                    //抛出空指针异常
                    throw new NullPointerException();
                else
                    //返回当前节点的前驱节点
                    return p;
            }
            
            //在创建链表头head,或者创建节点共享锁标记属性SHARED值
            Node() {    
            }
            
            //在addWaiter方法中使用
            Node(Thread thread, Node mode) {
                //当前节点的模式,是属于共享模式,还是独占模式     
                this.nextWaiter = mode;
                //将传入进来的线程赋值给节点属性thread
                this.thread = thread;
            }
            
            //在Condition条件中使用
            Node(Thread thread, int waitStatus) {
                //将传入节点的状态值赋值给节点属性waitStatus 
                this.waitStatus = waitStatus;
                //将传入进来的线程赋值给节点属性thread
                this.thread = thread;
            }
    }
    
    复制代码
  2. ConditionObject类
    public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            private transient Node firstWaiter;
            private transient Node lastWaiter;
    
            public ConditionObject() { }
    
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    
            // public methods
            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
    
            public final void awaitUninterruptibly() {
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean interrupted = false;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if (Thread.interrupted())
                        interrupted = true;
                }
                if (acquireQueued(node, savedState) || interrupted)
                    selfInterrupt();
            }
    
    
            private static final int REINTERRUPT =  1;
            private static final int THROW_IE    = -1;
    
            
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
    
            
            private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }
    
            
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
            
            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                final long deadline = System.nanoTime() + nanosTimeout;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return deadline - System.nanoTime();
            }
    
            public final boolean awaitUntil(Date deadline)
                    throws InterruptedException {
                long abstime = deadline.getTime();
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (System.currentTimeMillis() > abstime) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    LockSupport.parkUntil(this, abstime);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }
    
            public final boolean await(long time, TimeUnit unit)
                    throws InterruptedException {
                long nanosTimeout = unit.toNanos(time);
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                final long deadline = System.nanoTime() + nanosTimeout;
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }
    
    
            final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
                return sync == AbstractQueuedSynchronizer.this;
            }
    
            protected final boolean hasWaiters() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                    if (w.waitStatus == Node.CONDITION)
                        return true;
                }
                return false;
            }
    
            protected final int getWaitQueueLength() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int n = 0;
                for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                    if (w.waitStatus == Node.CONDITION)
                        ++n;
                }
                return n;
            }
    
            protected final Collection<Thread> getWaitingThreads() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                ArrayList<Thread> list = new ArrayList<Thread>();
                for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                    if (w.waitStatus == Node.CONDITION) {
                        Thread t = w.thread;
                        if (t != null)
                            list.add(t);
                    }
                }
                return list;
            }
    }复制代码

五、独占锁

六、共享锁

七、总结


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

互联网思维的企业

互联网思维的企业

[美] Dave Gray Thomas Vander Wal / 张 玳 / 人民邮电出版社 / 2014-4-25 / 59.00元

本书指导企业跳出仅更新自家产品和服务的怪圈,在管理方式、组织结构和公司文化方面进行变革,建立具有互联网思维的企业。书中通过大量图示和示例阐述了互联式公司必需的基础元素(透明的互动和交流平台,推崇自治和应变的组织结构,实验和学习的企业文化),以及一套鼓励员工创新的新式管理和奖励体系。最后,讨论板可方便你在工作时间和同事探讨如何增加公司的互联程度。一起来看看 《互联网思维的企业》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器