内容简介:redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.javaredisson-3.8.1-sources.jar!/org/redisson/RedissonLock.javaredisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.java
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.8.1</version> </dependency> 复制代码
实例
@Test public void testDistributedLock(){ Config config = new Config(); // config.setTransportMode(TransportMode.EPOLL); config.useSingleServer() .setAddress("redis://192.168.99.100:6379"); RedissonClient redisson = Redisson.create(config); IntStream.rangeClosed(1,5) .parallel() .forEach(i -> { executeLock(redisson); }); executeLock(redisson); } public void executeLock(RedissonClient redisson){ RLock lock = redisson.getLock("myLock"); boolean locked = false; try{ LOGGER.info("try lock"); locked = lock.tryLock(); // locked = lock.tryLock(1,2,TimeUnit.MINUTES); LOGGER.info("get lock result:{}",locked); if(locked){ TimeUnit.HOURS.sleep(1); LOGGER.info("get lock and finish"); } }catch (Exception e){ e.printStackTrace(); }finally { LOGGER.info("enter unlock"); if(locked){ lock.unlock(); } } } 复制代码
源码解析
RedissonLock.tryLock
redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java
@Override public boolean tryLock() { return get(tryLockAsync()); } @Override public RFuture<Boolean> tryLockAsync() { return tryLockAsync(Thread.currentThread().getId()); } @Override public RFuture<Boolean> tryLockAsync(long threadId) { return tryAcquireOnceAsync(-1, null, threadId); } private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { if (!future.isSuccess()) { return; } Boolean ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } protected String getLockName(long threadId) { return id + ":" + threadId; } private void scheduleExpirationRenewal(final long threadId) { if (expirationRenewalMap.containsKey(getEntryName())) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { RFuture<Boolean> future = renewExpirationAsync(threadId); future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { expirationRenewalMap.remove(getEntryName()); if (!future.isSuccess()) { log.error("Can't update lock " + getName() + " expiration", future.cause()); return; } if (future.getNow()) { // reschedule itself scheduleExpirationRenewal(threadId); } } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) { task.cancel(); } } protected RFuture<Boolean> renewExpirationAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } 复制代码
- 这里leaseTime没有设置的话,默认是-1,使用的是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),默认为30秒
- tryLockInnerAsync使用的是一段 lua 脚本,该脚本有3个参数,第一个参数为KEYS数组,后面几个参数为ARGV数组的元素
- 这里key的值为调用方指定的这个redissonLock的名称,两个变量,第一个为leaseTime,第二个为锁的名称,使用redissonLock的id+线程id
- lua脚本第一个方法判断redissonLock的hashmap是否存在,如果不存在则创建,该hashmap有一个entry的key为锁名称,valude为1,之后设置该hashmap失效时间为leaseTime
- lua脚本第二个方法是在redissonLock的hashmap存在的情况下,将该锁名的value增1,同时设置失效时间为leaseTime
- 最后返回该redissonLock名称的key的ttl
- 执行成功之后判断ttl是否还有值,有的话则调用scheduleExpirationRenewal,防止lock未执行完就失效
- scheduleExpirationRenewal是注册一个延时任务,在internalLockLeaseTime / 3的时候触发,执行的方法是renewExpirationAsync,将该锁失效时间重置回internalLockLeaseTime
- scheduleExpirationRenewal里头给scheduleExpirationRenewal任务增加listener,如果设置成功之后还会再次递归调用scheduleExpirationRenewal重新注册延时任务
- tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)方法是指定自动解锁时间时调用的方法,它与tryAcquireOnceAsync的区别在于,它对ttl的方回值采用long值来判断,如果是null,才执行延长失效时间的定时任务,而tryAcquireOnceAsync方法采用的是BooleanNullReplayConvertor,只要返回不是null,则返回true
RedissonLock.unlock
redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java
@Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException)e.getCause(); } else { throw e; } } // Future<Void> future = unlockAsync(); // future.awaitUninterruptibly(); // if (future.isSuccess()) { // return; // } // if (future.cause() instanceof IllegalMonitorStateException) { // throw (IllegalMonitorStateException)future.cause(); // } // throw commandExecutor.convertException(future); } @Override public RFuture<Void> unlockAsync(final long threadId) { final RPromise<Void> result = new RedissonPromise<Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { if (!future.isSuccess()) { cancelExpirationRenewal(threadId); result.tryFailure(future.cause()); return; } Boolean opStatus = future.getNow(); if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } if (opStatus) { cancelExpirationRenewal(null); } result.trySuccess(null); } }); return result; } protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); } String getChannelName() { return prefixName("redisson_lock__channel", getName()); } void cancelExpirationRenewal(Long threadId) { ExpirationEntry task = expirationRenewalMap.get(getEntryName()); if (task != null && (threadId == null || task.getThreadId() == threadId)) { expirationRenewalMap.remove(getEntryName()); task.getTimeout().cancel(); } } 复制代码
- unlockInnerAsync通过lua脚本来释放锁,该lua使用两个key,一个是redissonLock名称,一个是channelName
- 该lua使用的变量有三个,一个是pubSub的unlockMessage,默认为0,一个是internalLockLeaseTime,默认为commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),一个是锁名称
- 如果该redissonLock不存在,则直接发布unlock消息返回1;如果该锁不存在则返回nil;
- 如果该锁存在则将其计数-1,如果counter大于0,则重置下失效时间,返回0;如果counter不大于0,则删除该redissonLock锁,发布unlockMessage,返回1;如果上面条件都没有命中返回nil
- unlockAsync里头对unlockInnerAsync注册了FutureListener,主要是调用cancelExpirationRenewal,取消掉scheduleExpirationRenewal任务
LockPubSub
redisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.java
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> { public static final Long unlockMessage = 0L; @Override protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) { return new RedissonLockEntry(newPromise); } @Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(unlockMessage)) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } value.getLatch().release(); } } } 复制代码
- 接收到unlockMessage的时候,会调用RedissonLockEntry的listener,然后触发latch的release
- tryAcquireOnceAsync这个方法默认没有创建LockPubSub,而且没有指定自动解锁时间,则定时任务会一直延长失效时间,这个可能存在锁一直没释放的风险
小结
加锁有如下注意事项:
- 加锁需要设置超时时间,防止出现死锁
- 加锁以及设置超时时间的时候,需要保证两个操作的原子性,因而最好使用lua脚本或者使用支持NX以及EX的set方法
- 加锁的时候需要把加锁的调用方信息,比如线程id给记录下来,这个在解锁的时候需要使用
- 对于加锁时长不确定的任务,为防止任务未执行完导致超时被释放,需要对尚未运行完的任务延长失效时间
解锁有如下注意事项:
判断key是否存在,存在的话删除key等
doc
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。