内容简介:ReentrantLock是排他锁,下一篇会进行介绍,排他锁在同一时刻仅有一个线程可以进行访问, 实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何读-读、读-写、写-写操作都不能同时进行, 在一定程度上降低了吞吐量。然而并发读之间不存在数据竞争问题,如果在读多写少场景下,允许并发读同时进行,会进一步提升性能。因此引入了ReentrantReadWriteLock, 顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下
一、简介
ReentrantLock是排他锁,下一篇会进行介绍,排他锁在同一时刻仅有一个线程可以进行访问, 实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何读-读、读-写、写-写操作都不能同时进行, 在一定程度上降低了吞吐量。然而并发读之间不存在数据竞争问题,如果在读多写少场景下,允许并发读同时进行,会进一步提升性能。因此引入了ReentrantReadWriteLock, 顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下面称它为读写锁。 读写锁内部又分为读锁和写锁,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。读锁和写锁分离从而提升程序性能, 读写锁主要应用于读多写少的场景。先知道ReentrantReadWriteLock的这几点注意事项①支持公平和非公平(如果使用无参的构造函数)的获取锁②读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占 ③条件变量(Condition)只能在写锁使用,读锁不允许获取条件变量,否则将得到④如果一个线程UnsupportedOperationException异常⑤支持可重入,如已获取读锁的线程,能够再次获取读锁⑥如已获取写锁的线程,之后能够再次获取写锁,同时也可以再获取读锁。⑦ReentrantReadWriteLock可能会导致获取写锁的线程饥饿,可以看下我的另一篇StampedLock的分析。StampedLock可以解决获取写锁的线程饥饿。ReentrantReadWriteLock内部类有读锁(ReadLock)和写锁(WriteLock),内部都是调用Sync类进行读写锁的获取和释放,Sync有两个子类FairSync和NonFairSync,下面会进行详细的介绍。
二、类关系
//读写锁的抽象类 public interface ReadWriteLock { //获取读锁抽象方法 Lock readLock(); //获取写锁抽象方法 Lock writeLock(); }复制代码
三、属性
//读锁 private final ReentrantReadWriteLock.ReadLock readerLock; //写锁 private final ReentrantReadWriteLock.WriteLock writerLock; //AQS的实现类,在下面内部类中会进行介绍,AQS不清楚的,可以看我的AQS源代码分析https://juejin.im/post/5d0b3b55f265da1bc23f7dca final Sync sync; //UnSafe不清楚的可以看下我的分享https://juejin.im/user/5bd8718051882528382d8728/shares private static final sun.misc.Unsafe UNSAFE; //线程Thread的属性tid的相对偏移量,定义成静态是因为tid相对每个Thread实例的相对偏移量都是一样 private static final long TID_OFFSET; static { try { //获取UnSafe实例,如果也想在自己的代码中使用UnSafe,也可以看下我的分享,里面有介绍到如何获取到UnSafe实例 UNSAFE = sun.misc.Unsafe.getUnsafe(); //获取Thread类的Class Class<?> tk = Thread.class; //使用UnSafe获取Thread类的属性tid相对偏移量 TID_OFFSET = UNSAFE.objectFieldOffset (tk.getDeclaredField("tid")); } catch (Exception e) { //抛出错误 throw new Error(e); } }复制代码
四、构造函数
//无参构造函数,创建非公平锁 public ReentrantReadWriteLock() { //调用有参的构造函数 this(false); } /** * 传入fair参数创建ReentrantReadWriteLock实例 * * @param fair true创建公平锁,false创建非公平锁 */ public ReentrantReadWriteLock(boolean fair) { //true创建公平锁,false创建非公平锁 sync = fair ? new FairSync() : new NonfairSync(); //创建读锁,ReadLock会在下面内部类中进行介绍 readerLock = new ReadLock(this); //创建写锁,WriteLock会在下面内部类中进行介绍 writerLock = new WriteLock(this); }复制代码
五、内部类
-
sync内部类
//ReadLock和WriteLock的获取和释放都是使用Sync进行获取和释放,AQS的属性state即表示锁的状态 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; //分别存放读锁和写锁被获取的位数,即AQS的属性state(类型int)高16位存放读锁被获取个数,低16位存放写锁被获取个数 static final int SHARED_SHIFT = 16; //由于AQS属性state的高16位存放读锁被获取的个数,为此每次加读锁,增加的锁个数最小单元 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //读锁和写锁的锁被获取个数都不能大于的最大值 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //低16位存放写锁个数的标记位,在下面exclusiveCount方法使用到,和锁状态,即AQS中的属性state进行&操作,获取写锁的个数 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //获取读锁被获取的个数,传入AQS的属性state值,即ReentrantReadWriteLock的锁状态,获取读锁被获取的个数,由于AQS属性state,即锁的状态,高16位才是存放读锁被获取的个数,为此要获取到读锁被获取的个数需将其state做右移16位,才获取到读锁被获取的个数 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //获取写锁被获取的个数,传入AQS的属性state值,即ReentrantReadWriteLock的锁状态,由于AQS属性state,即锁的状态,低16位才是存放写锁被获取的个数,为此要获取到写锁被获取的个数需将其&上EXCLUSIVE_MASK即低16位的标识值,才获取到写锁被获取的个数 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //记录线程获取读锁的次数count,和获取读锁的当前线程tid static final class HoldCounter { //获取读锁的次数 int count = 0; //存放获取读锁的当前线程tid final long tid = getThreadId(Thread.currentThread()); } //ThreadLocal实现的类,只是重写了ThreadLocal的initialValue方法,在调用get方法时,返回默认值, static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { //重写ThreadLocal的initialValue方法 public HoldCounter initialValue() { //返回当前线程对应的HoldCounter实例 return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; //用来缓存最近一次获取读锁的线程的HoldCounter对象 private transient HoldCounter cachedHoldCounter; //表示第一个获取读锁的线程 private transient Thread firstReader = null; //表示第一个获取读锁的线程获取 private transient int firstReaderHoldCount; //Sync构造函数,在子类FairSync或者NonFairSync被初始化时,会调用父类Sync的构造函数 Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); } //在获取读锁时,当前线程是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到 abstract boolean readerShouldBlock(); //在获取写锁时,当前线程是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到 abstract boolean writerShouldBlock(); //根据传入进来的要释放写锁被获取的个数尝试释放写锁 protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } private IllegalMonitorStateException unmatchedUnlockException() { return new IllegalMonitorStateException( "attempt to unlock read lock, not locked by current thread"); } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; } final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } } protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } final Thread getOwner() { return ((exclusiveCount(getState()) == 0) ? null : getExclusiveOwnerThread()); } final int getReadLockCount() { return sharedCount(getState()); } final boolean isWriteLocked() { return exclusiveCount(getState()) != 0; } final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0; } final int getReadHoldCount() { if (getReadLockCount() == 0) return 0; Thread current = Thread.currentThread(); if (firstReader == current) return firstReaderHoldCount; HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == getThreadId(current)) return rh.count; int count = readHolds.get().count; if (count == 0) readHolds.remove(); return count; } private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); readHolds = new ThreadLocalHoldCounter(); setState(0); // reset to unlocked state } final int getCount() { return getState(); } }复制代码
-
NonFairSync内部类
static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } }复制代码
-
FairSync内部类
static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }复制代码
-
ReadLock内部类
public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquireShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryLock() { return sync.tryReadLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.releaseShared(1); } public Condition newCondition() { throw new UnsupportedOperationException(); } public String toString() { int r = sync.getReadLockCount(); return super.toString() + "[Read locks = " + r + "]"; } }复制代码
-
WriteLock内部类
public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock( ) { return sync.tryWriteLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public int getHoldCount() { return sync.getWriteHoldCount(); } }复制代码
六、独占锁
-
独占锁的获取
复制代码
-
独占锁的释放
复制代码
七、共享锁
-
共享锁的获取
复制代码
-
共享锁的释放
复制代码
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。