java 并发编程-AQS源码分析

栏目: 编程工具 · 发布时间: 7年前

  • AQS全称是 AbstractQueuedSynchronizer (抽象队列同步器),是通过一个先进先出的队列(存储等待的线程)来实现同步器的一个框架是一个抽象类,是java.util.concurrent包下很多多线程 工具 类的实现基础。Lock、CountDownLatch、Semaphore等都是基于AQS实现的。所以如果想研究Lock、CountDownLatch、Semaphore等基于AQS实现的类的源码,明白AQS原理是很重要的一步。

AQS实现

  • AQS支持两种锁一种是独占锁(独占模式),一种是共享锁(共享模式)

    • 独占锁:比如像ReentrantLock就是一种独占锁模式,多个线程去同时抢一个锁,只有一个线程能抢到这个锁,其他线程就只能阻塞等待锁被释放后重新竞争锁。
    • 共享锁:比如像读写锁里面的读锁,一个锁可以同时被多个线程拥有(多个线程可以同时拥有读锁),再比如Semaphore 设置一个资源数目(可以理解为一个锁能同时被多少个线程拥有)。
  • ps:共享锁跟独占锁可以同时存在,比如比如读写锁,读锁写锁分别对应共享锁和独占锁

  • 先来介绍一下AQS的几个主要成员变量

//AQS等待队列的头结点,AQS的等待队列是基于一个双向链表来实现的,这个头结点并不包含具体的线程是一个空结点(注意不是null)
private transient volatile Node head;
//AQS等待队列的尾部结点
private transient volatile Node tail;
//AQS同步器状态,也可以说是锁的状态,注意volatile修饰证明这个变量状态要对多线程可见
private volatile int state;
复制代码
  • AQS的内部类Node
    • Node顾名思义就是接点的意思,前面说过AQS等待队列是一个双链表,每个线程进入AQS的等待队列的时候都会被包装成一个Node节点
    static final class Node {
          //下面两个属性都是说明这个结点是共享模式还是独占模式,具体怎么表示下面会分析
          static final Node SHARED = new Node();
          static final Node EXCLUSIVE = null;
    
          //下面这四个属性就是说明结点的状态
        static final int CANCELLED =  1;//由于超时或中断,节点已被取消
        static final int SIGNAL    = -1;//表示下一个节点是通过park阻塞的,需要通过unpark唤醒
        static final int CONDITION = -2;//表示线程在等待条件变量(先获取锁,加入 
        到条件等待队列,然后释放锁,等待条件变量满足条件;只有重新获取锁之 
        后才能返回)
      static final int PROPAGATE = -3;//表示后续结点会传播唤醒的操作,共享模式下起作用
          
          volatile int waitStatus;
          //前驱结点(双链表)
          volatile Node prev;
          //后继结点(双链表)
          volatile Node next;
         //  结点所包装的线程
          volatile Thread thread;
        //对于Condtion表示下一个等待条件变量的节点;其它情况下用于区分共享模式和独占模式
          Node nextWaiter;
    
          final boolean isShared() {
              return nextWaiter == SHARED;
          }
    
          //取得前驱结点
          final Node predecessor() throws NullPointerException {
              Node p = prev;
              if (p == null)
                  //null的时候抛出异常
                  throw new NullPointerException();
              else
                  return p;
          }
    
          Node() {  
          }
    
          Node(Thread thread, Node mode) {  
              this.nextWaiter = mode;
              this.thread = thread;
          }
    
          Node(Thread thread, int waitStatus) {
              this.waitStatus = waitStatus;
              this.thread = thread;
          }
      }
    复制代码

独占模式具体分析

acquire方法分析

  • 首先是acquire方法
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码
  • acquire方法简要执行说明
    • 首先是tryAcquire方法会尝试的获取一下锁,成功的话就直接证明获取锁成功,acquire方法执行完毕(注意tryAcquire方法需要实现的子类根据自己的需要实现怎么抢锁,AQS不实现可以看到,只是抛出了异常,后面我们分析一个ReentrantLock的tryAcquire给大家感受下,暂时你只需要知道这个方法只是尝试获取锁)
    protected boolean tryAcquire(int arg) {
          throw new UnsupportedOperationException();
      }
    复制代码
    • tryAcquire方法抢锁失败就执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    • 先是addWaiter(Node.EXCLUSIVE)方法,这个方法表示将当前抢锁线程包装成结点并加入等待队列,返回包装后的结点
    • addWaiter方法返回的结点,作为acquireQueued方法的参数,该方法主要是等待队列顺序获取资源
    • 注意acquireQueued返回true表示线程发生中断,这时就会执行selfInterrupt方法响应中断。
    • 由于tryAcquireAQS没有具体实现,下面我们就接着看下addWaiter这个方法

addWaiter方法分析

private Node addWaiter(Node mode) {
        //首先把当前竞争锁的线程包装成一个节点
        Node node = new Node(Thread.currentThread(), mode);
        //如果以前的尾结点不为null(为null表示当前结点就是等待队列的第一个结点),
        就将当前结点设置为尾结点,并通过cas操作更新tail尾新入队的结点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //cas更新tail失败就以自旋的方式继续尝试入队列
        enq(node);
        return node;
    }
复制代码
  • 继续往下看下enq方法
private Node enq(final Node node) {
      //死循环进行自旋操作
        for (;;) {
            Node t = tail;
            //这里利用了延迟加载,尾结点为空的时候生成tail结点,初始化
            if (t == null) { 
              //  队列为空的时候自然尾结点就等于头结点,所以通过cas操作设置tail = head
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
               //尾结点初始化成功后就一直自旋的更新尾结点为当前结点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
复制代码
  • 总结一下addWaiter方法还是比较简单的,主要是将当前竞争锁的线程包装成为Node结点然后通过先尝试失败在自旋的方式加入到等待队列的尾部,同时更新尾结点tail
  • addWaiter执行完毕之后就证明结点已经成功入队,所以就要开始执行acquireQueued方法进行资源的获取。

acquireQueued方法分析

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取结点的前驱结点
                final Node p = node.predecessor();
                /**
                如果结点的前驱结点是head表示当前结点就是等待队列的第一个,
                因为head结点并不指向一个实际的线程,所以这个时候就会执行下
                tryAcquire函数尝试性的获取下锁。因为这个时候很有可能竞争成功
                **/
                if (p == head && tryAcquire(arg)) {
                    /**
                     拿到锁之后就更新头结点为当前结点(这个结点的线程已经拿到锁了,
                     所以更新为头结点也不会继续参与锁竞争,再次提示头结点不会参加竞争)
                    **/
                    setHead(node);
                    //  设置以前的头结点不指向其他结点,帮助gc
                    p.next = null; 
                    failed = false;
                    return interrupted;
                }
               /**
                上面前驱不是头结点或者获取锁失败就会执行shouldParkAfterFailedAcquire
                函数判断是否应该挂起线程,注意只是判断并不会执行挂起线程的操作,挂起线程的
                操作由后面的parkAndCheckInterrupt函数执行
               **/
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //  当出现异常的时候会执行cancelAcquire方法,来取消当前结点并从等待队列中清除出去
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码
  • 来一起看下shouldParkAfterFailedAcquire函数
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取下前驱结点的waitStatus,这个决定着是否要对后续结点进行挂起操作
        int ws = pred.waitStatus;
        /**
        如果ws的waitStatus=-1时,证明他的后继结点已经被park阻塞了后面到了竞争的时候会unpark唤醒后继结        
        点,所以如果结点的前驱结点waitStatus是-1,shouldParkAfterFailedAcquire就会判断需要park当前线程 
        所以返回true。
        **/
        if (ws == Node.SIGNAL)
            return true;
      //ws>0证明前驱结点已经被取消所以需要往前找到没有被取消的结点ws>0
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /**
            前驱结点生成时,ws=0,所以如果是第一次执行shouldParkAfterFailedAcquire函数就会发现前驱结点
            的ws = 0就会因为需要阻塞后面的结点设置为-1。
            **/
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        /**
        注意把前驱结点ws设置为-1之后会虽然返回false,不挂起当前线程,注意只是这一次循环不挂起,因为
        acquireQueued函数是一个死循环,所以到下一个循环如果前驱结点不是head并且tryAcquire竞争锁失败还 
        是会执行shouldParkAfterFailedAcquire方法,这个时候前驱结点已经为-1,所以就会直接返回true
       **/
        return false;
    }
复制代码
  • parkAndCheckInterrupt方法主要是park阻塞线程并在unpark的时候返回中断状态
private final boolean parkAndCheckInterrupt() {
        //park挂起线程
        LockSupport.park(this);
        /**
        线程被unpark唤醒的时候会检查终端状态并返回,这个终端状态会在acquireQueued方法中最后返回,
        所以acquireQueued函数并不响应中断而是返回中断状态由外层函数处理。
        **/
        return Thread.interrupted();
    }
复制代码
  • cancelAcquire函数
private void cancelAcquire(Node node) {
        if (node == null)
            return;
        node.thread = null;
        Node pred = node.prev;
      //循环往前找到没有被取消的结点,直到找到正常状态的结点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;
        //因为要取消当前结点所以修改当前结点得ws为CANCELLED
        node.waitStatus = Node.CANCELLED;
        //如果node为尾结点就修改尾结点并将尾结点得next设为null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            //如果不是尾结点
            /**
              满足下面三个条件,将pred的next指向node的下一节点
              1.pred不是head节点:如果pred为头节点,
              而node又被cancel,则node.next为等待队列中的第  一个节点,需要unpark唤醒
              2.pred节点状态为SIGNAL或能更新为SIGNAL
              3.pred的thread变量不能为null
            **/
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                // 
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                //如果pred为头节点,则唤醒node的后节点,注意unparkSuccessor方法为唤醒当前结点得下一个结点
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
复制代码
  • cancleAcquire函数主要是取消当前结点,将当前结点从等待队列中移出

  • 同时遍历前面的结点将被取消的结点从队列中清除出去

  • unparkSuccessor 方法

private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
      //如果下一个结点为null或者ws为取消状态就未开始遍历找到正常状态的结点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //  通过LockSupport.unpark()方法唤醒阻塞的线程,注意被阻塞的线程从哪开始继续执行
            LockSupport.unpark(s.thread);
    }
复制代码
  • 简单回顾下整个过程
    • 首先会通过addWaiter方法来将第一次竞争失败的线程包装成Node结点自旋的方式加入到等待队列
    • 加入到等待队列之后就会该结点就会运行acquireQueued方法开始同等待队列的其他结点一起获取锁
    • 先是判断该结点是不是第一个实际的等待的结点(不是head结点,head结点是空节点),如果是就用tryAcquire方法尝试获取锁,成功就更新head结点。
    • 如果上面的操作失败,就会判断该线程是否需要被挂起(当前驱结点的ws为signal就会被挂起),然后就挂起该线程,当被唤醒之后就继续重复上面的步骤获取锁
    • 当获取到锁以后就会有一个释放锁,释放锁的方法主要是release方法

release方法

public final boolean release(int arg) {
        //首先是执行tryRelease()方法,主要如何去释放获取到的锁,这个方法需要子类自己去实现
        if (tryRelease(arg)) {
            //释放成功以后如果发现等待队列还有在等待获取锁的Node就用unparkSuccessor唤醒头结点的下一个结点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
    //tryRelease方法调用失败会返回false
        return false;
    }
复制代码
  • release方法逻辑还是比较简单的,释放掉获取的锁之后唤醒等待队列的后续Node。
  • 到这里我们独占模式下的AQS获取锁的过程就分析完了。基本上了解了独占模式得锁竞争过程,在看共享模式下的锁竞争过程就比较简单了,看的时候注意对比着看。

共享模式具体分析

  • 这里我建议大家在看下文章前面我讲的共享模式与独占模式得区别

首先是acquireShared方法(对应独占模式得acquire方法)

//注意方法会忽略中断,没有selfInterrupt这个方法来响应中断
 public final void acquireShared(int arg) {
    /**
      这个tryAcquireShared方法对应独占模式的tryAcquire方法,也是需要子类自己去实现的。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。<0就表示获取失败就进doAcquireShared方法来开始进入等待队列等待获取资源
   **/
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
复制代码

doAcquireShared方法

private void doAcquireShared(int arg) {
        //  构造一个共享模式得结点加入到等待队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //如果当前结点是除了head得队列中的第一个结点那么就尝试获取资源
                    int r = tryAcquireShared(arg);
                    //tryAcquireShared返回值>=0资源获取成功,就开始进行更新结点操作
                    if (r >= 0) {
                        //这里注意下独占模式调用的是setHead方法,但是共享模式调用的是setHeadAndPropagate方法
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        //这里注意下,这里响应中断,跟独占模式不同
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                } 
        //  获取资源失败就开始继续判断是否需要挂起线程,与独占模式得相同
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //  出现异常就取消结点
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码
  • 可以看到共享模式下的锁竞争同非共享模式下的步骤大体上相同
  • tryAcquireShared不同的是他会返回三种状态,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。因为前面说过共享模式下资源是有很多的(或者说是有多个锁),允许最多由对应数量的线程持有相应的资源,一个线程持有一个资源量就-1直到0。
  • 还有就是共享模式下的setHeadAndPropagate方法,下面一起来看下

setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        //首先更新head结点
        setHead(node);
        /** 
        注意propagate表示的上次执行tryAcquireShared    方法后的返回值。>0表示还有剩余资源,既然有剩余资  源就继续唤醒后面等待获取资源的并且是共享模式得  Node
        或者h == null
        或者当前获取到资源的得结点<0,signal需要唤醒后续结点
        **/
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
               //唤醒后续共享模式得Node结点
                doReleaseShared();
        }
    }
复制代码
  • setHeadAndPropagate方法主要干了两件事

      1. 更新头结点
    • 2.检查是否需要唤醒后续结点,满足上面说的三个条件
  • 看下doReleaseShared方法

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //如果头结点不为空,并且不是tail,队列还有结点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //如果head结点ws为signal就更新为0并唤醒后续结点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                
            }
            if (h == head)                   
                break;
        }
    }
复制代码
  • 可以看到doReleaseShared方法主要就是唤醒阻塞队列中等待的结点
  • 这里引入另外一篇博文的分析(地址: www.jianshu.com/p/c244abd58…
    • 从上面的分析可以知道,独占模式和共享模式的最大区别在于独占模式只允许一个线程持有资源,而共享模式下,当调用doAcquireShared时,会看后续的节点是否是共享模式,如果是,会通过unpark唤醒后续节点; 从前面的分析可以知道,被唤醒的节点是被堵塞在doAcquireShared的parkAndCheckInterrupt方法,因此唤醒之后,会再次调用setHeadAndPropagate,从而将等待共享锁的线程都唤醒,也就是说会将唤醒传播下去;

    • 加入同步队列并阻塞的节点,它的前驱节点只会是SIGNAL,表示前驱节点释放锁时,后继节点会被唤醒。shouldParkAfterFailedAcquire()方法保证了这点,如果前驱节点不是SIGNAL,它会把它修改成SIGNAL。 造成前驱节点是PROPAGATE的情况是前驱节点获得锁时,会唤醒一次后继节点,但这时候后继节点还没有加入到同步队列,所以暂时把节点状态设置为PROPAGATE,当后继节点加入同步队列后,会把PROPAGATE设置为SIGNAL,这样前驱节点释放锁时会再次doReleaseShared,这时候它的状态已经是SIGNAL了,就可以唤醒后续节点了。(补充下,想一下如果不考虑,没有后继结点的时候直接讲ws置为signal,那么每次doReleaseShared执行的之后就直接unparkSuccessor唤醒后继结点那么就没意义,因为没有后继结点。所以在没有后继节点的时候ws = 0,那么就先ws置为PROPAGATE,反正后继结点加入的时候shouldParkAfterFailedAcquire会将前面的结点的ws置为signal)

    • 举例说明:例如读写锁,写读操作和写写操作互斥,读读之间不互斥;当调用acquireShared获取读锁时,会检查后续节点是否是获取读锁,如果是,则同样释放;

总结

  • 看完整篇博文之后各位应该就对aqs实现有一定了解了,第一次看的时候获取会很懵逼,但其实多看几遍慢慢就懂了,看的时候注意线程在什么时候阻塞,以及被唤醒后从哪里开始执行
  • 第二个就是注意共享模式和独占模式的区别。另外建议各位看下基于aqs框架实现的锁的tryAcquire方法以及tryRelease方法,比如reentrantLock。源码比较简单就是抢到资源该怎么处理,释放资源该怎么处理。

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Compilers

Compilers

Alfred V. Aho、Monica S. Lam、Ravi Sethi、Jeffrey D. Ullman / Addison Wesley / 2006-9-10 / USD 186.80

This book provides the foundation for understanding the theory and pracitce of compilers. Revised and updated, it reflects the current state of compilation. Every chapter has been completely revised ......一起来看看 《Compilers》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

html转js在线工具
html转js在线工具

html转js在线工具