一、简介
AQS是JUC框架中重要的类,通过它来实现独占锁和共享锁的,内部很多类都是通过AQS来实现的,比如CountDownLatch、ReentrantLock、ReentrantReadWriteLock、Semaphore。本章是对AbstractQueuedSynchronizer源码进行分析。从独占锁的获取、释放,共享锁的获取、释放,以及Condition的await、signal三方面对源码进行分析。先知道AQS的这几点注意事项1、Condition条件变量只支持在独占锁状态下2、AQS支持同步队列和条件队列(条件队列的数目根据Condition实例的数目),只有同步队列中的节点线程才能获取锁3、AQS是个抽象类,提供独占锁和共享锁的公有方法,子类需要实现的几个模板方法,①tryAcquire②tryRelease③tryAcquireShared④tryReleaseShared⑤isHeldExclusively。这几点在下面代码解析会分析到。 还有一个AbstractQueuedLongSynchronizer类,它与AQS功能和实现几乎一样,唯一不同的是AQLS中代表锁被获取次数的属性state类型是long类型,而AQS中该成员变量是int类型。AQS 源码对应的jdk版本是1.8。
二、类关系
//此类只提供保存和获取独占锁线程,子类可以使用适当的保留值来帮助控制和监控访问并提供诊断 public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } //独占锁拥有者线程 private transient Thread exclusiveOwnerThread; //设置独占锁的拥有者线程,访问权限protected,同包或者子类使用 protected final void setExclusiveOwnerThread(Thread thread) { //将传入进来的线程赋值给独占锁拥有者线程 exclusiveOwnerThread = thread; } //获取独自锁的拥有者线程,访问权限protected,只能在同包或者子类使用 protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }复制代码
三、属性
//同步队列的头节点 private transient volatile Node head; //同步队列的尾节点 private transient volatile Node tail; //同步状态 private volatile int state; //如果超时时间小于此阈值,不阻塞线程,让其自旋,在doAcquireNanos、doAcquireSharedNanos、awaitNanos、await(long time, TimeUnit unit)方法使用到 static final long spinForTimeoutThreshold = 1000L; //获取UnSafe使用,如果对UnSafe使用不清楚的,可以看下我分享的UnSafe的使用 private static final Unsafe unsafe = Unsafe.getUnsafe(); //属性state的相对偏移量,相对AbstractQueuedSynchronizer实例的起始内存位置的相对偏移量,定义成静态的原因是,属性的相对实例的偏移量都是相等的 private static final long stateOffset; //属性head的相对偏移量 private static final long headOffset; //属性tail的相对偏移量 private static final long tailOffset; //内部类Node实例的属性waitStatus的相对偏移量 private static final long waitStatusOffset; //内部类Node实例的属性next的相对偏移量 private static final long nextOffset; static { try { //使用UnSafe实例获取AbstractQueuedSynchronizer类的属性state的相对偏移量 stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); //使用UnSafe实例获取AbstractQueuedSynchronizer类的属性head的相对偏移量 headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); //使用UnSafe实例获取AbstractQueuedSynchronizer类的属性tail的相对偏移量 tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); //使用UnSafe实例获取AbstractQueuedSynchronizer内部类Node的属性waitStatus的相对偏移量 waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); //使用UnSafe实例获取AbstractQueuedSynchronizer内部类Node的属性next的相对偏移量 nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } }复制代码
四、内部类
- Node类
//通过Node我们可以实现两个队列,一是通过prev和next属性实现CLH队列(线程同步队列,双向队列),二是nextWaiter属性实现Condition条件上的等待条件队列(单向队列),在Condition中会详细介绍。 static final class Node { //当前节点是获取共享锁的标记 static final Node SHARED = new Node(); //当前节点是获取独占锁的标记 static final Node EXCLUSIVE = null; //属性waitStatus的值,标志节点对应的线程被取消 static final int CANCELLED = 1; //属性waitStatus的值,标志当前节点的next节点的线程(即队列中当前节点的下一个节点)需要被阻塞 static final int SIGNAL = -1; //属性waitStatus的值,标志当前节点在Condition条件下等待阻塞,在Condition实例的await系列方法中使用,新建一个waitStatus的值为CONDITION的节点Node,将其加入到Condition中的条件队列中,在Condition实现类详细介绍 static final int CONDITION = -2; //属性waitStatus的值,标志着下一个acquireShared方法线程应该被允许,在获取共享锁 static final int PROPAGATE = -3; //标记着当前节点的状态,默认状态是0,小于0的状态值都是有特殊作用,大于0的状态值表示已取消 volatile int waitStatus; //使用prev和next实现同步队列,即双向链表,当前节点的前驱节点 volatile Node prev; //当前节点的下一节点 volatile Node next; //当前节点对应的线程 volatile Thread thread; //有两种作用:1、表示下一个在Condition条件上等待的节点,调用Condition中await和signal方法,当前节点的线程是拥有独占锁的线程2、表示同步队列中的节点是共享模式还是独占模式 Node nextWaiter; //判断当前节点是不是共享模式 final boolean isShared() { return nextWaiter == SHARED; } //获取当前节点的前驱节点,如果为null,则抛出空指针异常 final Node predecessor() throws NullPointerException { //当前节点的前驱节点 Node p = prev; //如果前驱节点为空 if (p == null) //抛出空指针异常 throw new NullPointerException(); else //返回当前节点的前驱节点 return p; } //在创建链表头head,或者创建节点共享锁标记属性SHARED值 Node() { } //在addWaiter方法中使用 Node(Thread thread, Node mode) { //当前节点的模式,是属于共享模式,还是独占模式 this.nextWaiter = mode; //将传入进来的线程赋值给节点属性thread this.thread = thread; } //在Condition条件中使用 Node(Thread thread, int waitStatus) { //将传入节点的状态值赋值给节点属性waitStatus this.waitStatus = waitStatus; //将传入进来的线程赋值给节点属性thread this.thread = thread; } } 复制代码
- ConditionObject类
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject() { } private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } // public methods public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } private static final int REINTERRUPT = 1; private static final int THROW_IE = -1; private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }复制代码
五、独占锁
六、共享锁
七、总结
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
互联网思维的企业
[美] Dave Gray Thomas Vander Wal / 张 玳 / 人民邮电出版社 / 2014-4-25 / 59.00元
本书指导企业跳出仅更新自家产品和服务的怪圈,在管理方式、组织结构和公司文化方面进行变革,建立具有互联网思维的企业。书中通过大量图示和示例阐述了互联式公司必需的基础元素(透明的互动和交流平台,推崇自治和应变的组织结构,实验和学习的企业文化),以及一套鼓励员工创新的新式管理和奖励体系。最后,讨论板可方便你在工作时间和同事探讨如何增加公司的互联程度。一起来看看 《互联网思维的企业》 这本书的介绍吧!