CountDownLatch的await和countDown方法简单分析

栏目: Java · 发布时间: 5年前

内容简介:调用sync.acquireSharedInterruptiblysync.acquireSharedInterruptibly调用tryAcquireShared方法返回<0执行doAcquireSharedInterruptibly

await

调用sync.acquireSharedInterruptibly

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

sync.acquireSharedInterruptibly

调用tryAcquireShared方法返回<0执行doAcquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared

尝试获取共享锁,获取成功返回1,否则-1

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //如果前一个node为队头,则通过tryAcquireShared尝试获取共享锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                //获取到锁执行
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        //产生异常执行
        if (failed)
            cancelAcquire(node);
    }
}

addWaiter

调用addWaiter方法把队尾设置为当前node;如果队尾为空或者设置失败则调用enq方法

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

enq

调用enq方法队尾为空则创建空的队尾和队头,否则重新设置队尾为当前node,设置成功返回。enq和addWaiter方法不同在于enq循环执行一定会执行成功,不存在失败情况

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

predecessor

调用predecessor方法获取前一个node

final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}

static final int CANCELLED = 1; //取消 
static final int SIGNAL = -1; //下个节点需要被唤醒 
static final int CONDITION = -2; //线程在等待条件触发
static final int PROPAGATE = -3; //(共享锁)状态需要向后传播

shouldParkAfterFailedAcquire

获取当前node的前一个note的线程等待状态,如果为SIGNAL,那么返回true,大于0通过循环将当前节点之前所有取消状态的节点移出队列;其他状时,利用compareAndSetWaitStatus使前节点的状态为-1;如果是第一次await时ws状态是0,多次await时ws状态是0,最后肯定返回true

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt

调用park并返回线程是否已经中断

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

park

调用UNSAFE.park阻塞当前线程

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

setBlocker

在当前线程t的parkBlockerOffset位置设置blocker的引用

private static void setBlocker(Thread t, Object arg) {
    // Even though volatile, hotspot doesn't need a write barrier here.
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

UNSAFE.park

/**
 * 阻塞一个线程直到<a href="#unpark"><code>unpark</code></a>出现、线程
 * 被中断或者timeout时间到期。如果一个<code>unpark</code>调用已经出现了,
 * 这里只计数。timeout为0表示永不过期.当<code>isAbsolute</code>为true时,
 * timeout是相对于新纪元之后的毫秒。否则这个值就是超时前的纳秒数。这个方法执行时
 * 也可能不合理地返回(没有具体原因)
 * 
 * @param isAbsolute true if the timeout is specified in milliseconds from
 *                   the epoch.
 *                   如果为true timeout的值是一个相对于新纪元之后的毫秒数
 * @param time either the number of nanoseconds to wait, or a time in
 *             milliseconds from the epoch to wait for.
 *             可以是一个要等待的纳秒数,或者是一个相对于新纪元之后的毫秒数直到
 *             到达这个时间点
 */
UNSAFE.park(false, 0L);

countDown

调用sync.releaseShared

public void countDown() {
    sync.releaseShared(1);
}

releaseShared

执行tryReleaseShared成功后执行doReleaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared

更新state值为state-1,如果state新值为0返回true,否则false

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

doReleaseShared

只要等待队列有数据,获取队头等待状态,队头状态=-1其他node为等待时,则把队头等待状态置为初始,且调用unparkSuccessor方法;队头状态=0时,把队头状态置为-3传播到下一node

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

unparkSuccessor

上面调用unparkSuccessor时,node的状态已经更改为0,且node.next存在,执行unpark方法

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

unpark

unpark执行完之后是如何更改head的?

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

UNSAFE.unpark

/**
 * Releases the block on a thread created by 
 * <a href="#park"><code>park</code></a>.  This method can also be used
 * to terminate a blockage caused by a prior call to <code>park</code>.
 * This operation is unsafe, as the thread must be guaranteed to be
 * live.  This is true of Java, but not native code.
 * 释放被<a href="#park"><code>park</code></a>创建的在一个线程上的阻塞.这个
 * 方法也可以被使用来终止一个先前调用<code>park</code>导致的阻塞.
 * 这个操作操作时不安全的,因此线程必须保证是活的.这是 java 代码不是native代码。
 * @param thread the thread to unblock.
 *           要解除阻塞的线程
 */
UNSAFE.unpark(thread);

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Natural Language Processing with Python

Natural Language Processing with Python

Steven Bird、Ewan Klein、Edward Loper / O'Reilly Media / 2009-7-10 / USD 44.99

This book offers a highly accessible introduction to Natural Language Processing, the field that underpins a variety of language technologies, ranging from predictive text and email filtering to autom......一起来看看 《Natural Language Processing with Python》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具