深入理解AbstractQueuedSynchronizer

栏目: 后端 · 发布时间: 5年前

内容简介:AQS的等待队列(阻塞队列)如下。如图所示,等待队列中的每一个线程都被封装为一个Node节点,Node是AQS的静态内部类,我们来看一下Node节点的源码:从源码可知,Node的数据结构主要就是:
/*
	头节点,可以理解为当前持有锁的线程。
	*/
    private transient volatile Node head;

    /*
    尾节点
     */
    private transient volatile Node tail;

    /**
     表示当前锁的状态,state == 0,说明当前锁可用,state > 0,说明当前锁被占用。
     */
    private volatile int state;
    
    /*
	 继承自AbstractOwnableSynchronizer,表示当前占有锁的线程。
	*/
    private transient Thread exclusiveOwnerThread; 
复制代码

AQS的等待队列(阻塞队列)如下。 注意,这个阻塞队列不包含head!!!,我们可以理解head为当前持有锁的线程!它不包含在阻塞队列中。

深入理解AbstractQueuedSynchronizer

如图所示,等待队列中的每一个线程都被封装为一个Node节点,Node是AQS的静态内部类,我们来看一下Node节点的源码:

static final class Node {
        /** Marker to indicate a node is waiting in shared mode 
			当前节点为共享模式的标识
		*/
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode 
        	当前节点为独占模式的标识
        */
        static final Node EXCLUSIVE = null;

 		//下面这四个变量都是赋值给waitState的
        /** waitStatus value to indicate thread has cancelled 
        	表示线程已经取消
        */
        static final int CANCELLED =  1;
        
        /** waitStatus value to indicate successor's thread needs unparking
        	表示当前节点的后继节点对应的线程需要被唤醒。
         */
        static final int SIGNAL    = -1;
        
        /** waitStatus value to indicate thread is waiting on condition
        不懂,暂时不看
        */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         * 不懂,暂时不看
         */
        static final int PROPAGATE = -3;

      	/**
      	  表示当前节点所表示的线程的状态,取值为上面的1, -1, -2, -3。
      	  注意与AQS的state区分,state表示锁的状态,waitStatus表示当前节点所表示的线程的状态。
		*/
        volatile int waitStatus;

        /**
         前置节点的引用
         */
        volatile Node prev;

        /**
         后置节点的引用.
         */
        volatile Node next;

        /**
         线程本体
         */
        volatile Thread thread;

        /**
         不懂,暂时不看
         */
        Node nextWaiter;
复制代码

从源码可知,Node的数据结构主要就是: thread + waitStatus + prev + next

2、AQS扮演的角色

2.1 java.util.concurrent

在进一步了解AQS之前,我们先看一下concurrent包下的结构(图片未包括全部):

深入理解AbstractQueuedSynchronizer

从整体上看,concurrent包的实现如下:

深入理解AbstractQueuedSynchronizer

2.2 Lock接口和ReentrantLock

lock接口定义的方法:

void lock(); //获取锁
void lockInterruptibly() throws InterruptedException;//获取锁的过程能够响应中断
boolean tryLock();//非阻塞式响应中断能立即返回,获取锁放回true反之返回fasle
/*
超时获取锁,在超时内或者未中断的情况下能够获取锁
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/*获取与lock绑定的等待通知组件,当前线程必须获得了锁才能进行等待,进行等待时会先释放锁,
当再次获取锁时才能从等待中返回*/
Condition newCondition();
复制代码

ReentrantLock实现了Lock接口,但是,我们看ReentrantLock时可以发现,ReentrantLock基本上所有的方法的实现实际上都是调用了其静态内存类Sync中的方法,而Sync类继承了AbstractQueuedSynchronizer(AQS)

2.3 AQS的模板设计方法

AQS的设计是使用模板方法设计模式,可以这么理解:AQS提供了一些方法让子类进行重写,但是在使用子类时,并不会直接调用子类实现的方法,而是会调用AQS的目标方法,AQS的目标方法再调用子类实现的方法。举个例子: AQS中需要子类重写的方法tryAcquire:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}
复制代码

ReentrantLock中NonfairSync(继承AQS)会重写该方法为:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
复制代码

而AQS中的模板方法acquire():

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }
复制代码

会调用tryAcquire方法,而此时当继承AQS的NonfairSync调用模板方法acquire时就会调用已经被NonfairSync重写的tryAcquire方法。这就是使用AQS的方式,在弄懂这点后会lock的实现理解有很大的提升。可以归纳总结为这么几点:

  1. 同步组件(这里不仅仅指锁,还包括CountDownLatch等)的实现依赖于同步器AQS,在同步组件实现中,使用AQS的方式被推荐定义继承AQS的静态内存类;
  2. AQS采用模板方法进行设计,AQS的protected修饰的方法需要由继承AQS的子类进行重写实现,当调用AQS的子类的方法时就会调用被重写的方法;
  3. AQS负责同步状态的管理,线程的排队,等待和唤醒这些底层操作,而Lock等同步组件主要专注于实现同步语义;
  4. 在重写AQS的方式时,使用AQS提供的getState(),setState(),compareAndSetState()方法进行修改同步状态

3、通过ReentrantLock来了解AQS

ReentrantLock内部通过Sync来实现锁:

public ReentrantLock(boolean fair) {
 		//FairSync为公平锁,NonFairSync为非公平锁
        sync = fair ? new FairSync() : new NonfairSync();
    }
复制代码

Sync是ReentrantLock的内部类,它继承了AQS。FairSync和NonFairSync也是ReentrantLock的内部类,它们都继承了Sync:

abstract static class Sync extends AbstractQueuedSynchronizer {}

static final class FairSync extends Sync{}

static final class NonFairSync extends Sync{}
复制代码

下面我们来通过ReentrantLock的内部类FairSync的源码来了解AQS: 使用ReentrantLock

public class OrderService {
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        reentrantLock.lock();
        // 通常,lock 之后紧跟着 try 语句
        try {
            //do something
        } finally {
            // 释放锁
            reentrantLock.unlock();
        }
    }
}
复制代码
static final class FairSync extends Sync {
		/**
		上面代码的reentrantLock.lock()方法,实际上调用的就是FairSync#lock(),
		而FairSync#lock()中,acquire(1)为父类AQS的方法。
		*/
        final void lock() {
        	//执行这条语句后,进入到AQS的acquire()方法。
            acquire(1);
        }

/////////////////////////////////////////AQS#acquire()/////////////////////////////////////////////////////////
		/*
		这个方法其实是AQS中的方法,写在这里是为了方便查看。
		AQS#acquire()提供了一个模板,但是它里面的具体实现还是调用子类的方法,
		接下来我们一步一步进行分析。
        */
		public final void acquire(int arg) {
			/*
			上面执行acquire(1)后进入到这里。
			这里调用的tryAcquire()方法,其实是AQS子类实现的方法。
			执行tryAcquire(),也就是执行FairSync#tryAcquire()。
			我们先往下走。
			*/
	        if (!tryAcquire(arg) &&
	            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	            selfInterrupt();
   	    }
///////////////////////////////////////////////////////////////////////////////////////////////////////////////

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         * AQS#acquire()中调用了这个方法。
         * tryAcquire()方法会尝试获取锁。
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //获取锁的状态
            int c = getState();
            /*state == 0,说明没有线程持有该锁,可以尝试拿一下。
            但是不一定能成功,因为可能有其它线程也在抢这个锁。*/
            if (c == 0) {
            	/*
            	因为这是公平锁,所有要先调用hasQueuedPredecessors()方法来看一下等待队列中是否有线程再等待。
            	如果没有线程在等待,!hasQueuedPredecessors() == true,执行compareAndSetState(0, acquires)。
            	compareAndSetState(arg1, arg2)是AQS中的方法,通过原子操作,将锁的状态state从0(arg1)
            	改为acquire(arg2)。如果设置成功(期间没有其它线程抢占),
            	则调用setExclusiveOwnerThread(current)将锁的占用线程设置为当前线程,返回true。
            	*/
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            /*
            state != 0,说明当有线程持有锁,ReentrantLock为可重入锁,下面是可重入逻辑,
            判断当前线程与持有锁的线程是否相等。是的话就把state + 1。
            */
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            /*
            有几种情况会到达这里:
            1. 锁已经被其它线程持有,且不是当前线程,不可重入;
            2. 等待队列中有其它线程在等待;(公平锁)
            3. compareAndSetState(0, acquires)失败,即尝试抢占锁失败,锁被其它同时进入的线程抢占。
            */
            return false;
        }

		//在FairSync#tryAcquire()执行后,应该回到AQS#acquire()
		/*
		if (!tryAcquire(arg) &&
	            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	            selfInterrupt();
	       tryAcquire()返回true,即线程抢占锁成功,!tryAcquire() == false,if语句不会执行后面的方法
	       tryAcquire()返回false,即线程抢占锁失败,线程执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
		*/
		/*
		我们先来看看addWaiter(Node.EXCLUSIVE),这个方法将当前线程包装成Node,然后添加到等待队列的队尾。
		*/
		private Node addWaiter(Node mode) {
		//封装当前线程为Node
        Node node = new Node(Thread.currentThread(), mode);
        
        Node pred = tail;
        //尝试快速入队
        if (pred != null) {
            node.prev = pred;
            //使用原子操作,将Node添加到队尾
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        /*
        快速入队失败的几种情况:
        1. tail == null,即队列为空队列
        2. compareAndSetTail(pred, node)失败,即在入队时,被其它线程抢先。
        快速入队失败后调用enq()
        */
        enq(node);
        return node;
    }
    
    /*
    下面我们来看enq()
    */
    private Node enq(final Node node) {
        //不断进行CAS操作尝试入队
        for (;;) {
            Node t = tail;
            //队列为空,设置Head,初始化队列
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { //队列不为空,使用CAS操作尝试将node设置为队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    /*
    接下来又回到了这段代码:
    	if (!tryAcquire(arg) &&
	            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	            selfInterrupt();
	            
     addWaiter()返回node,执行acquireQueued()方法。
    */
    // acquireQueued()方法的参数node,经过addWaiter(Node.EXCLUSIVE),已经进入阻塞队列
    // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
    // 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
    // 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                /*
                如果node的前驱是head,那么我们可以尝试抢一下锁。
                因为node为等待队列的第一个节点,Head刚刚被初始化,还未被其它线程占有。
                注意,Head表示持有锁的线程。
                */
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //方法执行到这里有两种情况:
                1. node不是队头
                2. 抢占线程失败
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /*
    执行这个方法,说明线程没有抢占到锁。这个方法决定是否该将线程挂起
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             * 前置节点状态正常。可以挂起
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             * 前置节点取消了等待,更换前置节点。返回false
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             * 将前置节点状态设置为SIGNAL,返回false
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        /*
        返回false后会执行acquireQueued(),如果经过上面的操作以后当前节点不是head的后继节点,
        那么再次执行这个方法,然后返回true。
        注意,这个方法第一次进来的时候不会返回true,前驱节点
        waitStatus = SIGNAL是依赖后继节点设置的,第一次进来不可能返回true。
        */
        return false;
    }
    
    /*
    将线程挂起
    */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

}
复制代码

看到这里,ReentrantLock#lock()的流程大概走了一遍,接下来看一下ReentrantLock#unlock()方法:

public void unlock() {
        sync.release(1);
}

public final boolean release(int arg) {
        //tryRelease()返回true时,即锁完全释放,才会执行unparkSuccessor(),唤醒下一个线程
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

protected final boolean tryRelease(int releases) {
            //可重入锁,可能调用一次unlock()不够
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //c == 0,说明锁完全释放,返回true
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从后往前找,找出第一个状态正常的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
}
复制代码

以上所述就是小编给大家介绍的《深入理解AbstractQueuedSynchronizer》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

商战

商战

杰克•特劳特、阿尔•里斯 / 李正栓、李腾 / 机械工业出版社 / 2011-3 / 42.00元

本书重点阐述了商战中的四种常用战略形式,如防御战、进攻战、侧翼战和游击战,针对每一种形式又提出了三条应遵循的原则,以及如何在具体的商战中应用这些原则。本书分析了商战中的实际案例:可口可乐与百事可乐的战役,汉堡王与温迪斯对麦当劳的挑战以及DEC对阵IBM等。这些人们熟知品牌的案例在作者精心的组织下,使读者不仅加深了对本书中心思想的理解,而且学习了如何在实战中具体应用各种营销战略和策略的技巧。 ......一起来看看 《商战》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具