深入理解AbstractQueuedSynchronizer

栏目: 后端 · 发布时间: 6年前

内容简介:AQS的等待队列(阻塞队列)如下。如图所示,等待队列中的每一个线程都被封装为一个Node节点,Node是AQS的静态内部类,我们来看一下Node节点的源码:从源码可知,Node的数据结构主要就是:
/*
	头节点,可以理解为当前持有锁的线程。
	*/
    private transient volatile Node head;

    /*
    尾节点
     */
    private transient volatile Node tail;

    /**
     表示当前锁的状态,state == 0,说明当前锁可用,state > 0,说明当前锁被占用。
     */
    private volatile int state;
    
    /*
	 继承自AbstractOwnableSynchronizer,表示当前占有锁的线程。
	*/
    private transient Thread exclusiveOwnerThread; 
复制代码

AQS的等待队列(阻塞队列)如下。 注意,这个阻塞队列不包含head!!!,我们可以理解head为当前持有锁的线程!它不包含在阻塞队列中。

深入理解AbstractQueuedSynchronizer

如图所示,等待队列中的每一个线程都被封装为一个Node节点,Node是AQS的静态内部类,我们来看一下Node节点的源码:

static final class Node {
        /** Marker to indicate a node is waiting in shared mode 
			当前节点为共享模式的标识
		*/
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode 
        	当前节点为独占模式的标识
        */
        static final Node EXCLUSIVE = null;

 		//下面这四个变量都是赋值给waitState的
        /** waitStatus value to indicate thread has cancelled 
        	表示线程已经取消
        */
        static final int CANCELLED =  1;
        
        /** waitStatus value to indicate successor's thread needs unparking
        	表示当前节点的后继节点对应的线程需要被唤醒。
         */
        static final int SIGNAL    = -1;
        
        /** waitStatus value to indicate thread is waiting on condition
        不懂,暂时不看
        */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         * 不懂,暂时不看
         */
        static final int PROPAGATE = -3;

      	/**
      	  表示当前节点所表示的线程的状态,取值为上面的1, -1, -2, -3。
      	  注意与AQS的state区分,state表示锁的状态,waitStatus表示当前节点所表示的线程的状态。
		*/
        volatile int waitStatus;

        /**
         前置节点的引用
         */
        volatile Node prev;

        /**
         后置节点的引用.
         */
        volatile Node next;

        /**
         线程本体
         */
        volatile Thread thread;

        /**
         不懂,暂时不看
         */
        Node nextWaiter;
复制代码

从源码可知,Node的数据结构主要就是: thread + waitStatus + prev + next

2、AQS扮演的角色

2.1 java.util.concurrent

在进一步了解AQS之前,我们先看一下concurrent包下的结构(图片未包括全部):

深入理解AbstractQueuedSynchronizer

从整体上看,concurrent包的实现如下:

深入理解AbstractQueuedSynchronizer

2.2 Lock接口和ReentrantLock

lock接口定义的方法:

void lock(); //获取锁
void lockInterruptibly() throws InterruptedException;//获取锁的过程能够响应中断
boolean tryLock();//非阻塞式响应中断能立即返回,获取锁放回true反之返回fasle
/*
超时获取锁,在超时内或者未中断的情况下能够获取锁
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/*获取与lock绑定的等待通知组件,当前线程必须获得了锁才能进行等待,进行等待时会先释放锁,
当再次获取锁时才能从等待中返回*/
Condition newCondition();
复制代码

ReentrantLock实现了Lock接口,但是,我们看ReentrantLock时可以发现,ReentrantLock基本上所有的方法的实现实际上都是调用了其静态内存类Sync中的方法,而Sync类继承了AbstractQueuedSynchronizer(AQS)

2.3 AQS的模板设计方法

AQS的设计是使用模板方法设计模式,可以这么理解:AQS提供了一些方法让子类进行重写,但是在使用子类时,并不会直接调用子类实现的方法,而是会调用AQS的目标方法,AQS的目标方法再调用子类实现的方法。举个例子: AQS中需要子类重写的方法tryAcquire:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}
复制代码

ReentrantLock中NonfairSync(继承AQS)会重写该方法为:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
复制代码

而AQS中的模板方法acquire():

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }
复制代码

会调用tryAcquire方法,而此时当继承AQS的NonfairSync调用模板方法acquire时就会调用已经被NonfairSync重写的tryAcquire方法。这就是使用AQS的方式,在弄懂这点后会lock的实现理解有很大的提升。可以归纳总结为这么几点:

  1. 同步组件(这里不仅仅指锁,还包括CountDownLatch等)的实现依赖于同步器AQS,在同步组件实现中,使用AQS的方式被推荐定义继承AQS的静态内存类;
  2. AQS采用模板方法进行设计,AQS的protected修饰的方法需要由继承AQS的子类进行重写实现,当调用AQS的子类的方法时就会调用被重写的方法;
  3. AQS负责同步状态的管理,线程的排队,等待和唤醒这些底层操作,而Lock等同步组件主要专注于实现同步语义;
  4. 在重写AQS的方式时,使用AQS提供的getState(),setState(),compareAndSetState()方法进行修改同步状态

3、通过ReentrantLock来了解AQS

ReentrantLock内部通过Sync来实现锁:

public ReentrantLock(boolean fair) {
 		//FairSync为公平锁,NonFairSync为非公平锁
        sync = fair ? new FairSync() : new NonfairSync();
    }
复制代码

Sync是ReentrantLock的内部类,它继承了AQS。FairSync和NonFairSync也是ReentrantLock的内部类,它们都继承了Sync:

abstract static class Sync extends AbstractQueuedSynchronizer {}

static final class FairSync extends Sync{}

static final class NonFairSync extends Sync{}
复制代码

下面我们来通过ReentrantLock的内部类FairSync的源码来了解AQS: 使用ReentrantLock

public class OrderService {
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        reentrantLock.lock();
        // 通常,lock 之后紧跟着 try 语句
        try {
            //do something
        } finally {
            // 释放锁
            reentrantLock.unlock();
        }
    }
}
复制代码
static final class FairSync extends Sync {
		/**
		上面代码的reentrantLock.lock()方法,实际上调用的就是FairSync#lock(),
		而FairSync#lock()中,acquire(1)为父类AQS的方法。
		*/
        final void lock() {
        	//执行这条语句后,进入到AQS的acquire()方法。
            acquire(1);
        }

/////////////////////////////////////////AQS#acquire()/////////////////////////////////////////////////////////
		/*
		这个方法其实是AQS中的方法,写在这里是为了方便查看。
		AQS#acquire()提供了一个模板,但是它里面的具体实现还是调用子类的方法,
		接下来我们一步一步进行分析。
        */
		public final void acquire(int arg) {
			/*
			上面执行acquire(1)后进入到这里。
			这里调用的tryAcquire()方法,其实是AQS子类实现的方法。
			执行tryAcquire(),也就是执行FairSync#tryAcquire()。
			我们先往下走。
			*/
	        if (!tryAcquire(arg) &&
	            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	            selfInterrupt();
   	    }
///////////////////////////////////////////////////////////////////////////////////////////////////////////////

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         * AQS#acquire()中调用了这个方法。
         * tryAcquire()方法会尝试获取锁。
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //获取锁的状态
            int c = getState();
            /*state == 0,说明没有线程持有该锁,可以尝试拿一下。
            但是不一定能成功,因为可能有其它线程也在抢这个锁。*/
            if (c == 0) {
            	/*
            	因为这是公平锁,所有要先调用hasQueuedPredecessors()方法来看一下等待队列中是否有线程再等待。
            	如果没有线程在等待,!hasQueuedPredecessors() == true,执行compareAndSetState(0, acquires)。
            	compareAndSetState(arg1, arg2)是AQS中的方法,通过原子操作,将锁的状态state从0(arg1)
            	改为acquire(arg2)。如果设置成功(期间没有其它线程抢占),
            	则调用setExclusiveOwnerThread(current)将锁的占用线程设置为当前线程,返回true。
            	*/
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            /*
            state != 0,说明当有线程持有锁,ReentrantLock为可重入锁,下面是可重入逻辑,
            判断当前线程与持有锁的线程是否相等。是的话就把state + 1。
            */
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            /*
            有几种情况会到达这里:
            1. 锁已经被其它线程持有,且不是当前线程,不可重入;
            2. 等待队列中有其它线程在等待;(公平锁)
            3. compareAndSetState(0, acquires)失败,即尝试抢占锁失败,锁被其它同时进入的线程抢占。
            */
            return false;
        }

		//在FairSync#tryAcquire()执行后,应该回到AQS#acquire()
		/*
		if (!tryAcquire(arg) &&
	            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	            selfInterrupt();
	       tryAcquire()返回true,即线程抢占锁成功,!tryAcquire() == false,if语句不会执行后面的方法
	       tryAcquire()返回false,即线程抢占锁失败,线程执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
		*/
		/*
		我们先来看看addWaiter(Node.EXCLUSIVE),这个方法将当前线程包装成Node,然后添加到等待队列的队尾。
		*/
		private Node addWaiter(Node mode) {
		//封装当前线程为Node
        Node node = new Node(Thread.currentThread(), mode);
        
        Node pred = tail;
        //尝试快速入队
        if (pred != null) {
            node.prev = pred;
            //使用原子操作,将Node添加到队尾
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        /*
        快速入队失败的几种情况:
        1. tail == null,即队列为空队列
        2. compareAndSetTail(pred, node)失败,即在入队时,被其它线程抢先。
        快速入队失败后调用enq()
        */
        enq(node);
        return node;
    }
    
    /*
    下面我们来看enq()
    */
    private Node enq(final Node node) {
        //不断进行CAS操作尝试入队
        for (;;) {
            Node t = tail;
            //队列为空,设置Head,初始化队列
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { //队列不为空,使用CAS操作尝试将node设置为队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    /*
    接下来又回到了这段代码:
    	if (!tryAcquire(arg) &&
	            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	            selfInterrupt();
	            
     addWaiter()返回node,执行acquireQueued()方法。
    */
    // acquireQueued()方法的参数node,经过addWaiter(Node.EXCLUSIVE),已经进入阻塞队列
    // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
    // 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
    // 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                /*
                如果node的前驱是head,那么我们可以尝试抢一下锁。
                因为node为等待队列的第一个节点,Head刚刚被初始化,还未被其它线程占有。
                注意,Head表示持有锁的线程。
                */
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //方法执行到这里有两种情况:
                1. node不是队头
                2. 抢占线程失败
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /*
    执行这个方法,说明线程没有抢占到锁。这个方法决定是否该将线程挂起
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             * 前置节点状态正常。可以挂起
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             * 前置节点取消了等待,更换前置节点。返回false
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             * 将前置节点状态设置为SIGNAL,返回false
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        /*
        返回false后会执行acquireQueued(),如果经过上面的操作以后当前节点不是head的后继节点,
        那么再次执行这个方法,然后返回true。
        注意,这个方法第一次进来的时候不会返回true,前驱节点
        waitStatus = SIGNAL是依赖后继节点设置的,第一次进来不可能返回true。
        */
        return false;
    }
    
    /*
    将线程挂起
    */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

}
复制代码

看到这里,ReentrantLock#lock()的流程大概走了一遍,接下来看一下ReentrantLock#unlock()方法:

public void unlock() {
        sync.release(1);
}

public final boolean release(int arg) {
        //tryRelease()返回true时,即锁完全释放,才会执行unparkSuccessor(),唤醒下一个线程
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

protected final boolean tryRelease(int releases) {
            //可重入锁,可能调用一次unlock()不够
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //c == 0,说明锁完全释放,返回true
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从后往前找,找出第一个状态正常的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
}
复制代码

以上所述就是小编给大家介绍的《深入理解AbstractQueuedSynchronizer》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ajax for Web Application Developers

Ajax for Web Application Developers

Kris Hadlock / Sams / 2006-10-30 / GBP 32.99

Book Description Reusable components and patterns for Ajax-driven applications Ajax is one of the latest and greatest ways to improve users’ online experience and create new and innovative web f......一起来看看 《Ajax for Web Application Developers》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具