内容简介:本文主要研究一下Elasticsearch的ReleasableLockelasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.javaelasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java
序
本文主要研究一下Elasticsearch的ReleasableLock
ReleasableLock
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java
public class ReleasableLock implements Releasable { private final Lock lock; // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled private final ThreadLocal<Integer> holdingThreads; public ReleasableLock(Lock lock) { this.lock = lock; if (Assertions.ENABLED) { holdingThreads = new ThreadLocal<>(); } else { holdingThreads = null; } } @Override public void close() { lock.unlock(); assert removeCurrentThread(); } public ReleasableLock acquire() throws EngineException { lock.lock(); assert addCurrentThread(); return this; } private boolean addCurrentThread() { final Integer current = holdingThreads.get(); holdingThreads.set(current == null ? 1 : current + 1); return true; } private boolean removeCurrentThread() { final Integer count = holdingThreads.get(); assert count != null && count > 0; if (count == 1) { holdingThreads.remove(); } else { holdingThreads.set(count - 1); } return true; } public boolean isHeldByCurrentThread() { if (holdingThreads == null) { throw new UnsupportedOperationException("asserts must be enabled"); } final Integer count = holdingThreads.get(); return count != null && count > 0; } }
close方法
ReleasableLockTests
elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java
public class ReleasableLockTests extends ESTestCase { /** * Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant * lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but * not its second entrance. * * @throws BrokenBarrierException if awaiting on the synchronization barrier breaks * @throws InterruptedException if awaiting on the synchronization barrier is interrupted */ public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException { final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock()); final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock()); final int numberOfThreads = scaledRandomIntBetween(1, 32); final int iterations = scaledRandomIntBetween(1, 32); final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); final List<Thread> threads = new ArrayList<>(); for (int i = 0; i < numberOfThreads; i++) { final Thread thread = new Thread(() -> { try { barrier.await(); } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } for (int j = 0; j < iterations; j++) { if (randomBoolean()) { acquire(readLock, writeLock); } else { acquire(writeLock, readLock); } } try { barrier.await(); } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } }); threads.add(thread); thread.start(); } barrier.await(); barrier.await(); for (final Thread thread : threads) { thread.join(); } } private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) { try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) { assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) { assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); } // previously there was a bug here and this would return false assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); } assertFalse(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); } }
- ReleasableLockTests使用多线程随机执行acquire,该方法断言lockToAcquire被当前线程持有,而otherLock不被当前线程持有
Cache.CacheSegment
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/cache/Cache.java
private static class CacheSegment<K, V> { // read/write lock protecting mutations to the segment ReadWriteLock segmentLock = new ReentrantReadWriteLock(); ReleasableLock readLock = new ReleasableLock(segmentLock.readLock()); ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock()); Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>(); SegmentStats segmentStats = new SegmentStats(); /** * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback * * @param key the key of the entry to get from the cache * @param now the access time of this entry * @param isExpired test if the entry is expired * @param onExpiration a callback if the entry associated to the key is expired * @return the entry if there was one, otherwise null */ Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) { CompletableFuture<Entry<K, V>> future; try (ReleasableLock ignored = readLock.acquire()) { future = map.get(key); } if (future != null) { Entry<K, V> entry; try { entry = future.get(); } catch (ExecutionException e) { assert future.isCompletedExceptionally(); segmentStats.miss(); return null; } catch (InterruptedException e) { throw new IllegalStateException(e); } if (isExpired.test(entry)) { segmentStats.miss(); onExpiration.accept(entry); return null; } else { segmentStats.hit(); entry.accessTime = now; return entry; } } else { segmentStats.miss(); return null; } } /** * put an entry into the segment * * @param key the key of the entry to add to the cache * @param value the value of the entry to add to the cache * @param now the access time of this entry * @return a tuple of the new entry and the existing entry, if there was one otherwise null */ Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) { Entry<K, V> entry = new Entry<>(key, value, now); Entry<K, V> existing = null; try (ReleasableLock ignored = writeLock.acquire()) { try { CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry)); if (future != null) { existing = future.handle((ok, ex) -> { if (ok != null) { return ok; } else { return null; } }).get(); } } catch (ExecutionException | InterruptedException e) { throw new IllegalStateException(e); } } return Tuple.tuple(entry, existing); } /** * remove an entry from the segment * * @param key the key of the entry to remove from the cache * @param onRemoval a callback for the removed entry */ void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) { CompletableFuture<Entry<K, V>> future; try (ReleasableLock ignored = writeLock.acquire()) { future = map.remove(key); } if (future != null) { segmentStats.eviction(); onRemoval.accept(future); } } /** * remove an entry from the segment iff the future is done and the value is equal to the * expected value * * @param key the key of the entry to remove from the cache * @param value the value expected to be associated with the key * @param onRemoval a callback for the removed entry */ void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) { CompletableFuture<Entry<K, V>> future; boolean removed = false; try (ReleasableLock ignored = writeLock.acquire()) { future = map.get(key); try { if (future != null) { if (future.isDone()) { Entry<K, V> entry = future.get(); if (Objects.equals(value, entry.value)) { removed = map.remove(key, future); } } } } catch (ExecutionException | InterruptedException e) { throw new IllegalStateException(e); } } if (future != null && removed) { segmentStats.eviction(); onRemoval.accept(future); } } private static class SegmentStats { private final LongAdder hits = new LongAdder(); private final LongAdder misses = new LongAdder(); private final LongAdder evictions = new LongAdder(); void hit() { hits.increment(); } void miss() { misses.increment(); } void eviction() { evictions.increment(); } } }
-
CacheSegment使用ReentrantReadWriteLock的readLock及writeLock创建了两个ReleasableLock,一个为readLock,一个为writeLock;由于ReleasableLock实现了Releasable接口(
close方法
),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁
小结
close方法
ReleasableLock实现了Releasable接口( close方法
),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁
doc
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。