ReentrantReadWriteLock源码分析

栏目: 编程工具 · 发布时间: 5年前

内容简介:ReentrantLock是排他锁,下一篇会进行介绍,排他锁在同一时刻仅有一个线程可以进行访问, 实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何读-读、读-写、写-写操作都不能同时进行, 在一定程度上降低了吞吐量。然而并发读之间不存在数据竞争问题,如果在读多写少场景下,允许并发读同时进行,会进一步提升性能。因此引入了ReentrantReadWriteLock, 顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下

一、简介

ReentrantLock是排他锁,下一篇会进行介绍,排他锁在同一时刻仅有一个线程可以进行访问, 实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何读-读、读-写、写-写操作都不能同时进行, 在一定程度上降低了吞吐量。然而并发读之间不存在数据竞争问题,如果在读多写少场景下,允许并发读同时进行,会进一步提升性能。因此引入了ReentrantReadWriteLock, 顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下面称它为读写锁。 读写锁内部又分为读锁和写锁,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。读锁和写锁分离从而提升程序性能, 读写锁主要应用于读多写少的场景。先知道ReentrantReadWriteLock的这几点注意事项①支持公平和非公平(如果使用无参的构造函数)的获取锁②读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占 ③条件变量(Condition)只能在写锁使用,读锁不允许获取条件变量,否则将得到④如果一个线程UnsupportedOperationException异常⑤支持可重入,如已获取读锁的线程,能够再次获取读锁⑥如已获取写锁的线程,之后能够再次获取写锁,同时也可以再获取读锁。⑦ReentrantReadWriteLock可能会导致获取写锁的线程饥饿,可以看下我的另一篇StampedLock的分析。StampedLock可以解决获取写锁的线程饥饿。ReentrantReadWriteLock内部类有读锁(ReadLock)和写锁(WriteLock),内部都是调用Sync类进行读写锁的获取和释放,Sync有两个子类FairSync和NonFairSync,下面会进行详细的介绍。

二、类关系

//读写锁的抽象类
public interface ReadWriteLock {
    //获取读锁抽象方法
    Lock readLock();
    //获取写锁抽象方法
    Lock writeLock();
}复制代码

三、属性

//读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
//写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
//AQS的实现类,在下面内部类中会进行介绍,AQS不清楚的,可以看我的AQS源代码分析https://juejin.im/post/5d0b3b55f265da1bc23f7dca
final Sync sync;
//UnSafe不清楚的可以看下我的分享https://juejin.im/user/5bd8718051882528382d8728/shares
private static final sun.misc.Unsafe UNSAFE;
//线程Thread的属性tid的相对偏移量,定义成静态是因为tid相对每个Thread实例的相对偏移量都是一样
private static final long TID_OFFSET;
static {
    try {
        //获取UnSafe实例,如果也想在自己的代码中使用UnSafe,也可以看下我的分享,里面有介绍到如何获取到UnSafe实例
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        //获取Thread类的Class
        Class<?> tk = Thread.class;
        //使用UnSafe获取Thread类的属性tid相对偏移量 
        TID_OFFSET = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("tid"));
    } catch (Exception e) {
        //抛出错误
        throw new Error(e);
    }
}复制代码

四、构造函数

//无参构造函数,创建非公平锁
public ReentrantReadWriteLock() {
    //调用有参的构造函数 
    this(false);
}

/**
 * 传入fair参数创建ReentrantReadWriteLock实例
 *
 * @param fair true创建公平锁,false创建非公平锁
 */
public ReentrantReadWriteLock(boolean fair) {
    //true创建公平锁,false创建非公平锁
    sync = fair ? new FairSync() : new NonfairSync();
    //创建读锁,ReadLock会在下面内部类中进行介绍 
    readerLock = new ReadLock(this);
    //创建写锁,WriteLock会在下面内部类中进行介绍
    writerLock = new WriteLock(this);
}复制代码

五、内部类

  1. sync内部类
    //ReadLock和WriteLock的获取和释放都是使用Sync进行获取和释放,AQS的属性state即表示锁的状态 
    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 6317671515068378041L;
            //分别存放读锁和写锁被获取的位数,即AQS的属性state(类型int)高16位存放读锁被获取个数,低16位存放写锁被获取个数
            static final int SHARED_SHIFT   = 16;
            //由于AQS属性state的高16位存放读锁被获取的个数,为此每次加读锁,增加的锁个数最小单元
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            //读锁和写锁的锁被获取个数都不能大于的最大值
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            //低16位存放写锁个数的标记位,在下面exclusiveCount方法使用到,和锁状态,即AQS中的属性state进行&操作,获取写锁的个数
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
            
            //获取读锁被获取的个数,传入AQS的属性state值,即ReentrantReadWriteLock的锁状态,获取读锁被获取的个数,由于AQS属性state,即锁的状态,高16位才是存放读锁被获取的个数,为此要获取到读锁被获取的个数需将其state做右移16位,才获取到读锁被获取的个数  
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            
            //获取写锁被获取的个数,传入AQS的属性state值,即ReentrantReadWriteLock的锁状态,由于AQS属性state,即锁的状态,低16位才是存放写锁被获取的个数,为此要获取到写锁被获取的个数需将其&上EXCLUSIVE_MASK即低16位的标识值,才获取到写锁被获取的个数  
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
            
            //记录线程获取读锁的次数count,和获取读锁的当前线程tid
            static final class HoldCounter {
                //获取读锁的次数 
                int count = 0;
                //存放获取读锁的当前线程tid
                final long tid = getThreadId(Thread.currentThread());
            }
             
            //ThreadLocal实现的类,只是重写了ThreadLocal的initialValue方法,在调用get方法时,返回默认值,
            static final class ThreadLocalHoldCounter
                extends ThreadLocal<HoldCounter> {
                //重写ThreadLocal的initialValue方法
                public HoldCounter initialValue() {
                    //返回当前线程对应的HoldCounter实例
                    return new HoldCounter();
                }
            }
            
             
            private transient ThreadLocalHoldCounter readHolds;
            
            //用来缓存最近一次获取读锁的线程的HoldCounter对象
            private transient HoldCounter cachedHoldCounter;
            
            //表示第一个获取读锁的线程
            private transient Thread firstReader = null;
            //表示第一个获取读锁的线程获取
            private transient int firstReaderHoldCount;
            
            //Sync构造函数,在子类FairSync或者NonFairSync被初始化时,会调用父类Sync的构造函数
            Sync() {
                readHolds = new ThreadLocalHoldCounter();
                setState(getState()); 
            }
             
            //在获取读锁时,当前线程是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到
            abstract boolean readerShouldBlock();
            
            //在获取写锁时,当前线程是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到
            abstract boolean writerShouldBlock();
            
            //根据传入进来的要释放写锁被获取的个数尝试释放写锁
            protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }
    
            protected final boolean tryAcquire(int acquires) {
                
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    
            protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    
            private IllegalMonitorStateException unmatchedUnlockException() {
                return new IllegalMonitorStateException(
                    "attempt to unlock read lock, not locked by current thread");
            }
    
            protected final int tryAcquireShared(int unused) {
                
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    
            
            final int fullTryAcquireShared(Thread current) {
               
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0) {
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                        
                    } else if (readerShouldBlock()) {
                        if (firstReader == current) {
                            
                        } else {
                            if (rh == null) {
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                                return -1;
                        }
                    }
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }
    
            final boolean tryWriteLock() {
                Thread current = Thread.currentThread();
                int c = getState();
                if (c != 0) {
                    int w = exclusiveCount(c);
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                }
                if (!compareAndSetState(c, c + 1))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    
            final boolean tryReadLock() {
                Thread current = Thread.currentThread();
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0 &&
                        getExclusiveOwnerThread() != current)
                        return false;
                    int r = sharedCount(c);
                    if (r == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (r == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            HoldCounter rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                cachedHoldCounter = rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                        }
                        return true;
                    }
                }
            }
    
            protected final boolean isHeldExclusively() {
               
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
            
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            final Thread getOwner() {
               
                return ((exclusiveCount(getState()) == 0) ?
                        null :
                        getExclusiveOwnerThread());
            }
    
            final int getReadLockCount() {
                return sharedCount(getState());
            }
    
            final boolean isWriteLocked() {
                return exclusiveCount(getState()) != 0;
            }
    
            final int getWriteHoldCount() {
                return isHeldExclusively() ? exclusiveCount(getState()) : 0;
            }
    
            final int getReadHoldCount() {
                if (getReadLockCount() == 0)
                    return 0;
    
                Thread current = Thread.currentThread();
                if (firstReader == current)
                    return firstReaderHoldCount;
    
                HoldCounter rh = cachedHoldCounter;
                if (rh != null && rh.tid == getThreadId(current))
                    return rh.count;
    
                int count = readHolds.get().count;
                if (count == 0) readHolds.remove();
                return count;
            }
    
            private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                readHolds = new ThreadLocalHoldCounter();
                setState(0); // reset to unlocked state
            }
    
            final int getCount() { return getState(); }
        }复制代码
  2. NonFairSync内部类
    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -8159625535654395037L;
            final boolean writerShouldBlock() {
                return false; 
            }
            final boolean readerShouldBlock() {
                
                return apparentlyFirstQueuedIsExclusive();
            }
    }复制代码
  3. FairSync内部类
    static final class FairSync extends Sync {
            private static final long serialVersionUID = -2274990926593161451L;
            final boolean writerShouldBlock() {
                return hasQueuedPredecessors();
            }
            final boolean readerShouldBlock() {
                return hasQueuedPredecessors();
            }
    }复制代码
  4. ReadLock内部类
    public static class ReadLock implements Lock, java.io.Serializable {
            private static final long serialVersionUID = -5992448646407690164L;
            private final Sync sync;
    
            protected ReadLock(ReentrantReadWriteLock lock) {
                sync = lock.sync;
            }
    
            public void lock() {
                sync.acquireShared(1);
            }
    
            public void lockInterruptibly() throws InterruptedException {
                sync.acquireSharedInterruptibly(1);
            }
    
            public boolean tryLock() {
                return sync.tryReadLock();
            }
    
            public boolean tryLock(long timeout, TimeUnit unit)
                    throws InterruptedException {
                return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
            }
    
            public void unlock() {
                sync.releaseShared(1);
            }
    
            public Condition newCondition() {
                throw new UnsupportedOperationException();
            }
    
            public String toString() {
                int r = sync.getReadLockCount();
                return super.toString() +
                    "[Read locks = " + r + "]";
            }
    }复制代码
  5. WriteLock内部类
    public static class WriteLock implements Lock, java.io.Serializable {
            private static final long serialVersionUID = -4992448646407690164L;
            private final Sync sync;
    
            protected WriteLock(ReentrantReadWriteLock lock) {
                sync = lock.sync;
            }
    
            public void lock() {
                sync.acquire(1);
            }
    
            public void lockInterruptibly() throws InterruptedException {
                sync.acquireInterruptibly(1);
            }
    
            public boolean tryLock( ) {
                return sync.tryWriteLock();
            }
    
            public boolean tryLock(long timeout, TimeUnit unit)
                    throws InterruptedException {
                return sync.tryAcquireNanos(1, unit.toNanos(timeout));
            }
    
            public void unlock() {
                sync.release(1);
            }
    
            public Condition newCondition() {
                return sync.newCondition();
            }
    
            public String toString() {
                Thread o = sync.getOwner();
                return super.toString() + ((o == null) ?
                                           "[Unlocked]" :
                                           "[Locked by thread " + o.getName() + "]");
            }
    
            public boolean isHeldByCurrentThread() {
                return sync.isHeldExclusively();
            }
    
            public int getHoldCount() {
                return sync.getWriteHoldCount();
            }
    }复制代码

六、独占锁

  1. 独占锁的获取
    复制代码
  2. 独占锁的释放
    复制代码

七、共享锁

  1. 共享锁的获取
    复制代码
  2. 共享锁的释放
    复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

支付战争

支付战争

埃里克•杰克逊 / 徐彬、王晓、清华大学五道口金融学院未央研究 审译 / 中信出版社 / 2015-5-19 / 49.00

这是一个野心勃勃的创业计划,在线支付鼻祖PayPal试图创造一个“统治世界”的金融操作系统,并在全球成功推广一款颠覆式的互联网产品。 《支付战争》的作者是“PayPal黑帮”成员之一,他真实还原了这个伟大产品是如何诞生的,以及在后来的发展壮大之路上,如何应对融资紧张、突破增长瓶颈,在竞争者凶猛围剿与平台商霸王条款的夹击下,逆境求生,改变业务模式,最终完成IPO,并成功出售给竞争对手eBay的......一起来看看 《支付战争》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具