每日一博 | Redission 分布式锁源码解析

栏目: 服务器 · 发布时间: 6年前

每日一博 | Redission 分布式锁源码解析

Redission分布式锁源码解析

  JackY-Ji 发布于 前天 17:38

字数 1267

阅读 185

收藏 10

点赞 0

评论 0

Redis Lua Java

【活动】决战应用运维 性能之巅 赢华为荣耀V10 >>> 每日一博 | Redission 分布式锁源码解析

Redission锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等。加上Redission中 Redis 具备分布式的特性,所以非常适合用来做 Java 中的分布式锁。 下面我们对其加锁、解锁过程中的源码细节进行一一分析。

锁的接口定义了一下方法:

每日一博 | Redission 分布式锁源码解析

分布式锁当中加锁,我们常用的加锁接口:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

下面我们来看一下方法的具体实现:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

首先我们看到调用tryAcquire尝试获取锁,在这里是否能获取到锁,是根据锁名称的过期时间TTL来判定的(TTL<=0:则说明该锁不存在或者已经超时,此时获取锁成功。TTL>0:则说明该锁被其他现成持有,此时获取锁失败);

下面我们接着看一下tryAcquire的实现:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

可以看到真正获取锁的操作经过一层get操作里面执行的,这里为何要这么操作,本人也不是太理解,如有理解错误,欢迎指正。

get 是由CommandAsyncExecutor(一个线程Executor)封装的一个Executor

设置一个单线程的同步控制器CountDownLatch,用于控制单个线程的中断信息。个人理解经过中间的这么一步:主要是为了支持线程可中断操作。

public <V> V get(RFuture<V> future) {
    if (!future.isDone()) {
        final CountDownLatch l = new CountDownLatch(1);
        future.addListener(new FutureListener<V>() {
            @Override
            public void operationComplete(Future<V> future) throws Exception {
                l.countDown();
            }
        });
        
        boolean interrupted = false;
        while (!future.isDone()) {
            try {
                l.await();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    // commented out due to blocking issues up to 200 ms per minute for each thread:由于每个线程的阻塞问题,每分钟高达200毫秒
    // future.awaitUninterruptibly();
    if (future.isSuccess()) {
        return future.getNow();
    }

    throw convertException(future);
}

我们进一步往下看:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

首先判断锁是否有超时时间,有过期时间的话,会在后面获取锁的时候设置进去。没有过期时间的话,则会用默认的

private long lockWatchdogTimeout = 30 * 1000;

下面我们在进一步往下分析真正获取锁的操作:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

我把里面的重点信息做了以下三点总结:

1:真正执行的是一段具有原子性的 Lua 脚本,并且最终也是由CommandAsynExecutor去执行。

2:锁真正持久化到Redis时,用的hash类型key field value

3:获取锁的三个参数:getName()是逻辑锁名称,例如:分布式锁要锁住的methodName+params;internalLockLeaseTime是毫秒单位的锁过期时间;getLockName则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:UUID+":"threadId。有的同学可能会问,这样不是很缜密:不同的JVM可能会生成相同的threadId,所以Redission这里加了一个区分度很高的UUID;

Lua脚本中的执行分为以下三步:

1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过期时间ARGV[2],返回;

2:如果检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间

3:key不存,直接返回key的剩余过期时间(-2)

锁获取失败、解锁过程后在后面的文章继续补充

© 著作权归作者所有

共有人打赏支持

每日一博 | Redission 分布式锁源码解析

JackY-Ji

粉丝 5

博文 34

码字总数 16788

作品 0

杭州

相关文章 最新文章

高性能分布式锁 - lock4j-spring-boot-starter

一种简单的,支持不同方案的高性能分布式锁 简介 lock4j-spring-boot-starter是一个分布式锁组件,其提供了多种不同的支持以满足不同性能和环境的需求。 立志打造一个简单但富有内涵的分布式...

小锅盖

昨天

0

0

用Redis实现分布式锁以及redission使用

前言:分布式环境有很多问题,比如你前一个请求访问的是服务器A,第二个请求访问到了服务器B,就会发生并发重复插入问题,这个时候需要依赖单点来实现分布锁,而redis就是。 先导入maven依赖...

王念博客

2016/05/02

2.9K

4

史上最简单的 SpringCloud 教程 | 终章

转载请标明出处: http://blog.csdn.net/forezp/article/details/70148833 本文出自方志朋的博客 错过了这一篇,你可能再也学不会 Spring Cloud 了!Spring Boot做为下一代 web 框架,Sprin...

forezp

2017/04/12

0

0

每日一博 | Redission 分布式锁源码解析
Gecco 1.1.3 发布,易用的轻量化爬虫

Gecco 1.1.3 发布了,该版本改进内容包括: 处理jsonp的时候,分号问题修复 2.支持自定义下载重试次数定义,GeccoEngine.retry(count) 3.HttpClientDownloader支持response的Set Cookie自动存...

xtuhcy

2016/05/31

2.3K

9

分布式锁实现汇总

[TOC] 分布式锁实现汇总 很多时候我们需要保证同一时间一个方法只能被同一个线程调用,在单机环境中,Java中其实提供了很多并发处理相关的API,但是这些API在分布式场景中就无能为力了。也就...

Wang_Coder

2017/12/01

0

0

易用的轻量化的网络爬虫--GECCO

Gecco是什么 Gecco是一款用java语言开发的轻量化的易用的网络爬虫。Gecco整合了jsoup、httpclient、fastjson、spring、htmlunit、redission等优秀框架,让您只需要配置一些jquery风格的选择器...

xtuhcy

2016/02/19

12.2K

21

database

存储过程高级篇 讲解了一些存储过程的高级特性,包括 cursor、schema、控制语句、事务等。 数据库索引与事务管理 本篇文章为对数据库知识的查缺补漏,从索引,事务管理,存储过程,触发器,一...

掘金官方

01/04

0

0

有高人对我做个估值吗,我这样能在项目中担当什么职位,报酬能有多少

快工作快三年了,本科学历。做个两个java项目,第一个是做产品,第二个为平台框架的。 我在项目中使用的一些技术: 前端jsp,freemarker,sitemesh都懂一些,也能做出东西。但是兴趣不是很大。...

只是会java

2013/08/03

695

10

并发编程-锁的发展和主流分布式锁比较总结

一、锁的发展 系统结构由传统的“单应用服务--》SOA --》微服务 --》无服务器” 的演进过程中,场景越来越复杂,由单体应用的但进程中多线程并发的内存锁,随着互联网场景越来越复杂,在复杂...

贾浩v

2017/10/24

0

0

基于spring+quartz的分布式任务调度

学习地址:http://www.roncoo.com/course/view/e2b459016e2e477dbd5d67c8b23fe86d 课程介绍 Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,它可以与J2EE与J2SE应用程序相...

小红牛

04/19

0

0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

centos 利用yum更新git

#安装源yum install http://opensource.wandisco.com/centos/7/git/x86_64/wandisco-git-release-7-2.noarch.rpm#安装gityum install git#更新gityum update git...

TonyStarkSir

59分钟前

0

0

002、图解算法之大O符号

大O符号 运行时间是评判一个算法的指标,运行时间由算法的执行次数决定。大O符号就是反映出算法的执行次数,即,反应算法的运行时间。如下图所示: 常见的大O运行时间

刀锋

今天

0

0

每日一博 | Redission 分布式锁源码解析
OSChina 周三乱弹 —— 本人拼多多买了一盒杜雪斯

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子:分享Nana的单曲《Lonely》 《Lonely》- Nana 手机党少年们想听歌,请使劲儿戳(这里) @Re-Inspired :每天早上起床的我,一眼万年...

小小编辑

今天

49

7

每日一博 | Redission 分布式锁源码解析
PostgreSQL DBA快速入门(三) - 逻辑和物理备份

执行定期备份并有一整套恢复计划是一个DBA的重要工作之一,这也是数据库可用性和完整性的保障。我们可以在多个机房之间部署流复制集群,以保证但节点故障,但是集群无法保证认为的意外DELET...

闻术苑

今天

0

0

每日一博 | Redission 分布式锁源码解析
分享一个自己写的py扫描路径工具

mac下懒得找扫描工具,自己写了一个简单的文件路径如下: #文件结构-file文件夹-result文件夹-scan.py# -*- coding: utf-8 -*-import requestsimport timeimport osfrom threading i...

hirainn

今天

0

0

39.安装 PHP 5 安装PHP7

11.10/11.11/11.12 安装PHP5 11.13 安装PHP7 php中mysql,mysqli,mysqlnd,pdo到底是什么 http://blog.csdn.net/u013785951/article/details/60876816 查看编译参数 http://ask.apelearn.com/......

王鑫linux

今天

0

0

从MP3中读取专辑封面图片

首先判断MP3文件中是否含有ID3V2的标签,关于ID3V2的格式有一堆的说法 我嘛,不怎么关心,因此只攻专辑图片,也就是判断是否包含APIC这个标识 找到这个标识其实也就是和解析普通文件一样,每...

会哭的鳄鱼

今天

0

0

大型项目 linux 自动化版本发布脚本(shell)之tomcat、nginx服务脚本

最近,又临近博主所负责的一个大型项目的发版了。之前有提到过,该项目涉及到30-40台服务器的发版。且项目客户规定发版需在晚上10-11点左右开始进行,这里博主不得不说每次发布最后都是眼花缭...

em_aaron

今天

0

0

每日一博 | Redission 分布式锁源码解析
数据结构与算法15-链表-单链表

单链表的查找和删除以及显示 正确使用指针 class Link{public int iData;public double dData;public Link next;public Link(int id,double dd){iData = id;dData =...

沉迷于编程的小菜菜

今天

1

0

设计模式 - 行为型 - 责任链模式

介绍 意图:避免请求发送者与接收者耦合在一起,让多个对象都有可能接收请求,将这些对象连接成一条链,并且沿着这条链传递请求,直到有对象处理它为止。 主要解决:职责链上的处理者负责处理...

巨輪

昨天

1

0

每日一博 | Redission 分布式锁源码解析

没有更多内容

加载失败,请刷新页面

加载更多

下一页


以上所述就是小编给大家介绍的《每日一博 | Redission 分布式锁源码解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Algorithms Unlocked

Algorithms Unlocked

Thomas H. Cormen / The MIT Press / 2013-3-1 / USD 25.00

Have you ever wondered how your GPS can find the fastest way to your destination, selecting one route from seemingly countless possibilities in mere seconds? How your credit card account number is pro......一起来看看 《Algorithms Unlocked》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

SHA 加密
SHA 加密

SHA 加密工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具