聊聊redisson的分布式锁

栏目: Lua · 发布时间: 6年前

内容简介:redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.javaredisson-3.8.1-sources.jar!/org/redisson/RedissonLock.javaredisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.java
<dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.8.1</version>
		</dependency>
复制代码

实例

@Test
    public void testDistributedLock(){
        Config config = new Config();
//        config.setTransportMode(TransportMode.EPOLL);
        config.useSingleServer()
                .setAddress("redis://192.168.99.100:6379");
        RedissonClient redisson = Redisson.create(config);


        IntStream.rangeClosed(1,5)
                .parallel()
                .forEach(i -> {
                    executeLock(redisson);
                });

        executeLock(redisson);
    }

    public void executeLock(RedissonClient redisson){
        RLock lock = redisson.getLock("myLock");
        boolean locked = false;
        try{
            LOGGER.info("try lock");
            locked = lock.tryLock();
//            locked = lock.tryLock(1,2,TimeUnit.MINUTES);
            LOGGER.info("get lock result:{}",locked);
            if(locked){
                TimeUnit.HOURS.sleep(1);
                LOGGER.info("get lock and finish");
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            LOGGER.info("enter unlock");
            if(locked){
                lock.unlock();
            }
        }
    }
复制代码

源码解析

RedissonLock.tryLock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

@Override
    public boolean tryLock() {
        return get(tryLockAsync());
    }

    @Override
    public RFuture<Boolean> tryLockAsync() {
        return tryLockAsync(Thread.currentThread().getId());
    }

    @Override
    public RFuture<Boolean> tryLockAsync(long threadId) {
        return tryAcquireOnceAsync(-1, null, threadId);
    }

    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Boolean ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }

    private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }

        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
            task.cancel();
        }
    }

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
复制代码
  • 这里leaseTime没有设置的话,默认是-1,使用的是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),默认为30秒
  • tryLockInnerAsync使用的是一段 lua 脚本,该脚本有3个参数,第一个参数为KEYS数组,后面几个参数为ARGV数组的元素
  • 这里key的值为调用方指定的这个redissonLock的名称,两个变量,第一个为leaseTime,第二个为锁的名称,使用redissonLock的id+线程id
  • lua脚本第一个方法判断redissonLock的hashmap是否存在,如果不存在则创建,该hashmap有一个entry的key为锁名称,valude为1,之后设置该hashmap失效时间为leaseTime
  • lua脚本第二个方法是在redissonLock的hashmap存在的情况下,将该锁名的value增1,同时设置失效时间为leaseTime
  • 最后返回该redissonLock名称的key的ttl
  • 执行成功之后判断ttl是否还有值,有的话则调用scheduleExpirationRenewal,防止lock未执行完就失效
  • scheduleExpirationRenewal是注册一个延时任务,在internalLockLeaseTime / 3的时候触发,执行的方法是renewExpirationAsync,将该锁失效时间重置回internalLockLeaseTime
  • scheduleExpirationRenewal里头给scheduleExpirationRenewal任务增加listener,如果设置成功之后还会再次递归调用scheduleExpirationRenewal重新注册延时任务
  • tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)方法是指定自动解锁时间时调用的方法,它与tryAcquireOnceAsync的区别在于,它对ttl的方回值采用long值来判断,如果是null,才执行延长失效时间的定时任务,而tryAcquireOnceAsync方法采用的是BooleanNullReplayConvertor,只要返回不是null,则返回true

RedissonLock.unlock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

@Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException)e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

    @Override
    public RFuture<Void> unlockAsync(final long threadId) {
        final RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    cancelExpirationRenewal(threadId);
                    result.tryFailure(future.cause());
                    return;
                }

                Boolean opStatus = future.getNow();
                if (opStatus == null) {
                    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                    return;
                }
                if (opStatus) {
                    cancelExpirationRenewal(null);
                }
                result.trySuccess(null);
            }
        });

        return result;
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }

    String getChannelName() {
        return prefixName("redisson_lock__channel", getName());
    }

    void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = expirationRenewalMap.get(getEntryName());
        if (task != null && (threadId == null || task.getThreadId() == threadId)) {
            expirationRenewalMap.remove(getEntryName());
            task.getTimeout().cancel();
        }
    }
复制代码
  • unlockInnerAsync通过lua脚本来释放锁,该lua使用两个key,一个是redissonLock名称,一个是channelName
  • 该lua使用的变量有三个,一个是pubSub的unlockMessage,默认为0,一个是internalLockLeaseTime,默认为commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),一个是锁名称
  • 如果该redissonLock不存在,则直接发布unlock消息返回1;如果该锁不存在则返回nil;
  • 如果该锁存在则将其计数-1,如果counter大于0,则重置下失效时间,返回0;如果counter不大于0,则删除该redissonLock锁,发布unlockMessage,返回1;如果上面条件都没有命中返回nil
  • unlockAsync里头对unlockInnerAsync注册了FutureListener,主要是调用cancelExpirationRenewal,取消掉scheduleExpirationRenewal任务

LockPubSub

redisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.java

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            value.getLatch().release();
        }
    }

}
复制代码
  • 接收到unlockMessage的时候,会调用RedissonLockEntry的listener,然后触发latch的release
  • tryAcquireOnceAsync这个方法默认没有创建LockPubSub,而且没有指定自动解锁时间,则定时任务会一直延长失效时间,这个可能存在锁一直没释放的风险

小结

加锁有如下注意事项:

  • 加锁需要设置超时时间,防止出现死锁
  • 加锁以及设置超时时间的时候,需要保证两个操作的原子性,因而最好使用lua脚本或者使用支持NX以及EX的set方法
  • 加锁的时候需要把加锁的调用方信息,比如线程id给记录下来,这个在解锁的时候需要使用
  • 对于加锁时长不确定的任务,为防止任务未执行完导致超时被释放,需要对尚未运行完的任务延长失效时间

解锁有如下注意事项:

判断key是否存在,存在的话删除key等

doc


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Release It!

Release It!

Michael T. Nygard / Pragmatic Bookshelf / 2007-03-30 / USD 34.95

“Feature complete” is not the same as “production ready.” Whether it’s in Java, .NET, or Ruby on Rails, getting your application ready to ship is only half the battle. Did you design your system to......一起来看看 《Release It!》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具