内容简介:AQS的等待队列(阻塞队列)如下。如图所示,等待队列中的每一个线程都被封装为一个Node节点,Node是AQS的静态内部类,我们来看一下Node节点的源码:从源码可知,Node的数据结构主要就是:
/* 头节点,可以理解为当前持有锁的线程。 */ private transient volatile Node head; /* 尾节点 */ private transient volatile Node tail; /** 表示当前锁的状态,state == 0,说明当前锁可用,state > 0,说明当前锁被占用。 */ private volatile int state; /* 继承自AbstractOwnableSynchronizer,表示当前占有锁的线程。 */ private transient Thread exclusiveOwnerThread; 复制代码
AQS的等待队列(阻塞队列)如下。 注意,这个阻塞队列不包含head!!!,我们可以理解head为当前持有锁的线程!它不包含在阻塞队列中。
如图所示,等待队列中的每一个线程都被封装为一个Node节点,Node是AQS的静态内部类,我们来看一下Node节点的源码:
static final class Node { /** Marker to indicate a node is waiting in shared mode 当前节点为共享模式的标识 */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode 当前节点为独占模式的标识 */ static final Node EXCLUSIVE = null; //下面这四个变量都是赋值给waitState的 /** waitStatus value to indicate thread has cancelled 表示线程已经取消 */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking 表示当前节点的后继节点对应的线程需要被唤醒。 */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition 不懂,暂时不看 */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate * 不懂,暂时不看 */ static final int PROPAGATE = -3; /** 表示当前节点所表示的线程的状态,取值为上面的1, -1, -2, -3。 注意与AQS的state区分,state表示锁的状态,waitStatus表示当前节点所表示的线程的状态。 */ volatile int waitStatus; /** 前置节点的引用 */ volatile Node prev; /** 后置节点的引用. */ volatile Node next; /** 线程本体 */ volatile Thread thread; /** 不懂,暂时不看 */ Node nextWaiter; 复制代码
从源码可知,Node的数据结构主要就是: thread + waitStatus + prev + next
2、AQS扮演的角色
2.1 java.util.concurrent
在进一步了解AQS之前,我们先看一下concurrent包下的结构(图片未包括全部):
从整体上看,concurrent包的实现如下:
2.2 Lock接口和ReentrantLock
lock接口定义的方法:
void lock(); //获取锁 void lockInterruptibly() throws InterruptedException;//获取锁的过程能够响应中断 boolean tryLock();//非阻塞式响应中断能立即返回,获取锁放回true反之返回fasle /* 超时获取锁,在超时内或者未中断的情况下能够获取锁 */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /*获取与lock绑定的等待通知组件,当前线程必须获得了锁才能进行等待,进行等待时会先释放锁, 当再次获取锁时才能从等待中返回*/ Condition newCondition(); 复制代码
ReentrantLock实现了Lock接口,但是,我们看ReentrantLock时可以发现,ReentrantLock基本上所有的方法的实现实际上都是调用了其静态内存类Sync中的方法,而Sync类继承了AbstractQueuedSynchronizer(AQS)
2.3 AQS的模板设计方法
AQS的设计是使用模板方法设计模式,可以这么理解:AQS提供了一些方法让子类进行重写,但是在使用子类时,并不会直接调用子类实现的方法,而是会调用AQS的目标方法,AQS的目标方法再调用子类实现的方法。举个例子: AQS中需要子类重写的方法tryAcquire:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 复制代码
ReentrantLock中NonfairSync(继承AQS)会重写该方法为:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } 复制代码
而AQS中的模板方法acquire():
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
会调用tryAcquire方法,而此时当继承AQS的NonfairSync调用模板方法acquire时就会调用已经被NonfairSync重写的tryAcquire方法。这就是使用AQS的方式,在弄懂这点后会lock的实现理解有很大的提升。可以归纳总结为这么几点:
- 同步组件(这里不仅仅指锁,还包括CountDownLatch等)的实现依赖于同步器AQS,在同步组件实现中,使用AQS的方式被推荐定义继承AQS的静态内存类;
- AQS采用模板方法进行设计,AQS的protected修饰的方法需要由继承AQS的子类进行重写实现,当调用AQS的子类的方法时就会调用被重写的方法;
- AQS负责同步状态的管理,线程的排队,等待和唤醒这些底层操作,而Lock等同步组件主要专注于实现同步语义;
- 在重写AQS的方式时,使用AQS提供的getState(),setState(),compareAndSetState()方法进行修改同步状态
3、通过ReentrantLock来了解AQS
ReentrantLock内部通过Sync来实现锁:
public ReentrantLock(boolean fair) { //FairSync为公平锁,NonFairSync为非公平锁 sync = fair ? new FairSync() : new NonfairSync(); } 复制代码
Sync是ReentrantLock的内部类,它继承了AQS。FairSync和NonFairSync也是ReentrantLock的内部类,它们都继承了Sync:
abstract static class Sync extends AbstractQueuedSynchronizer {} static final class FairSync extends Sync{} static final class NonFairSync extends Sync{} 复制代码
下面我们来通过ReentrantLock的内部类FairSync的源码来了解AQS: 使用ReentrantLock
public class OrderService { private static ReentrantLock reentrantLock = new ReentrantLock(true); public void createOrder() { reentrantLock.lock(); // 通常,lock 之后紧跟着 try 语句 try { //do something } finally { // 释放锁 reentrantLock.unlock(); } } } 复制代码
static final class FairSync extends Sync { /** 上面代码的reentrantLock.lock()方法,实际上调用的就是FairSync#lock(), 而FairSync#lock()中,acquire(1)为父类AQS的方法。 */ final void lock() { //执行这条语句后,进入到AQS的acquire()方法。 acquire(1); } /////////////////////////////////////////AQS#acquire()///////////////////////////////////////////////////////// /* 这个方法其实是AQS中的方法,写在这里是为了方便查看。 AQS#acquire()提供了一个模板,但是它里面的具体实现还是调用子类的方法, 接下来我们一步一步进行分析。 */ public final void acquire(int arg) { /* 上面执行acquire(1)后进入到这里。 这里调用的tryAcquire()方法,其实是AQS子类实现的方法。 执行tryAcquire(),也就是执行FairSync#tryAcquire()。 我们先往下走。 */ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /////////////////////////////////////////////////////////////////////////////////////////////////////////////// /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. * AQS#acquire()中调用了这个方法。 * tryAcquire()方法会尝试获取锁。 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取锁的状态 int c = getState(); /*state == 0,说明没有线程持有该锁,可以尝试拿一下。 但是不一定能成功,因为可能有其它线程也在抢这个锁。*/ if (c == 0) { /* 因为这是公平锁,所有要先调用hasQueuedPredecessors()方法来看一下等待队列中是否有线程再等待。 如果没有线程在等待,!hasQueuedPredecessors() == true,执行compareAndSetState(0, acquires)。 compareAndSetState(arg1, arg2)是AQS中的方法,通过原子操作,将锁的状态state从0(arg1) 改为acquire(arg2)。如果设置成功(期间没有其它线程抢占), 则调用setExclusiveOwnerThread(current)将锁的占用线程设置为当前线程,返回true。 */ if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } /* state != 0,说明当有线程持有锁,ReentrantLock为可重入锁,下面是可重入逻辑, 判断当前线程与持有锁的线程是否相等。是的话就把state + 1。 */ else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } /* 有几种情况会到达这里: 1. 锁已经被其它线程持有,且不是当前线程,不可重入; 2. 等待队列中有其它线程在等待;(公平锁) 3. compareAndSetState(0, acquires)失败,即尝试抢占锁失败,锁被其它同时进入的线程抢占。 */ return false; } //在FairSync#tryAcquire()执行后,应该回到AQS#acquire() /* if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); tryAcquire()返回true,即线程抢占锁成功,!tryAcquire() == false,if语句不会执行后面的方法 tryAcquire()返回false,即线程抢占锁失败,线程执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) */ /* 我们先来看看addWaiter(Node.EXCLUSIVE),这个方法将当前线程包装成Node,然后添加到等待队列的队尾。 */ private Node addWaiter(Node mode) { //封装当前线程为Node Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //尝试快速入队 if (pred != null) { node.prev = pred; //使用原子操作,将Node添加到队尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } /* 快速入队失败的几种情况: 1. tail == null,即队列为空队列 2. compareAndSetTail(pred, node)失败,即在入队时,被其它线程抢先。 快速入队失败后调用enq() */ enq(node); return node; } /* 下面我们来看enq() */ private Node enq(final Node node) { //不断进行CAS操作尝试入队 for (;;) { Node t = tail; //队列为空,设置Head,初始化队列 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //队列不为空,使用CAS操作尝试将node设置为队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /* 接下来又回到了这段代码: if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); addWaiter()返回node,执行acquireQueued()方法。 */ // acquireQueued()方法的参数node,经过addWaiter(Node.EXCLUSIVE),已经进入阻塞队列 // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话, // 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false // 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); /* 如果node的前驱是head,那么我们可以尝试抢一下锁。 因为node为等待队列的第一个节点,Head刚刚被初始化,还未被其它线程占有。 注意,Head表示持有锁的线程。 */ if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //方法执行到这里有两种情况: 1. node不是队头 2. 抢占线程失败 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /* 执行这个方法,说明线程没有抢占到锁。这个方法决定是否该将线程挂起 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. * 前置节点状态正常。可以挂起 */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. * 前置节点取消了等待,更换前置节点。返回false */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. * 将前置节点状态设置为SIGNAL,返回false */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } /* 返回false后会执行acquireQueued(),如果经过上面的操作以后当前节点不是head的后继节点, 那么再次执行这个方法,然后返回true。 注意,这个方法第一次进来的时候不会返回true,前驱节点 waitStatus = SIGNAL是依赖后继节点设置的,第一次进来不可能返回true。 */ return false; } /* 将线程挂起 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } } 复制代码
看到这里,ReentrantLock#lock()的流程大概走了一遍,接下来看一下ReentrantLock#unlock()方法:
public void unlock() { sync.release(1); } public final boolean release(int arg) { //tryRelease()返回true时,即锁完全释放,才会执行unparkSuccessor(),唤醒下一个线程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { //可重入锁,可能调用一次unlock()不够 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //c == 0,说明锁完全释放,返回true if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; //从后往前找,找出第一个状态正常的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } 复制代码
以上所述就是小编给大家介绍的《深入理解AbstractQueuedSynchronizer》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【1】JavaScript 基础深入——数据类型深入理解与总结
- 深入理解java虚拟机(1) -- 理解HotSpot内存区域
- 深入理解 HTTPS
- 深入理解 HTTPS
- 深入理解 SecurityConfigurer
- 深入理解 HTTP 协议
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。