从源码分析基于Redis的分布式锁

栏目: 数据库 · 发布时间: 6年前

内容简介:分布式锁是控制分布式系统之间同时操作一个数据的一种方式,通过互斥来保证数据的一致性。在分布式版本的算法里我们假设我们有

分布式锁是控制分布式系统之间同时操作一个数据的一种方式,通过互斥来保证数据的一致性。

安全和可靠性保证:

  • 一致性: 互斥,不管任何时候,只有一个客户端能持有同一个锁。
  • 分区可容忍性: 不会死锁,最终一定会得到锁,就算一个持有锁的客户端宕掉或者发生网络分区。
  • 可用性: 只要大多数 Redis 节点正常工作,客户端应该都能获取和释放锁。

Redlock算法

在分布式版本的算法里我们假设我们有 NRedis master 节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调算法。我们已经描述了如何在单节点环境下安全地获取和释放锁。因此我们理所当然地应当用这个方法在每个单节点里来获取和释放锁。在我们的例子里面我们把 N 设成 5 ,这个数字是一个相对比较合理的数值,因此我们需要在不同的计算机或者虚拟机上运行 5master 节点来保证他们大多数情况下都不会同时宕机。一个客户端需要做如下操作来获取锁:

  • 获取当前时间(单位是毫秒)。

  • 轮流用相同的 key 和随机值在 N 个节点上请求锁,在这一步里,客户端在每个 master 上请求锁时,会有一个和总的锁释放时间相比小的多的超时时间。比如如果锁自动释放时间是 10 秒钟,那每个节点锁请求的超时时间可能是 5-50 毫秒的范围,这个可以防止一个客户端在某个宕掉的 master 节点上阻塞过长时间,如果一个 master 节点不可用了,我们应该尽快尝试下一个 master 节点。

  • 客户端计算第二步中获取锁所花的时间,只有当客户端在大多数 master 节点上成功获取了锁(在这里是3个),而且总共消耗的时间不超过锁释放时间,这个锁就认为是获取成功了。

  • 如果锁获取成功了,那现在锁自动释放时间就是最初的锁释放时间减去之前获取锁所消耗的时间。

  • 如果锁获取失败了,不管是因为获取成功的锁不超过一半 (N/2+1) 还是因为总消耗时间超过了锁释放时间,客户端都会到每个 master 节点上释放锁,即便是那些他认为没有获取成功的锁。

redlock-py

redlock-py 是一个 python 基于 redis 实现的 分布式锁

我们先来看看 redlock-py 的使用, 这里使用多进程来操作获取锁和释放锁。

from multiprocessing import Pool
from redlock import Redlock

dlm = Redlock([{"host": "127.0.0.1", "port": 6379, "db": 0}], retry_count=3, retry_delay=0.2)


def test_pool():
    my_lock = dlm.lock("my_resource_name", 1000)  //获取锁
    print(my_lock)
    a = dlm.unlock(my_lock) //释放锁
    print(a)


if __name__ == '__main__':

    pool = Pool(2)
    for i in range(0, 5):
        pool.apply_async(test_pool)
    pool.close()
    pool.join()

redlock-py源码分析

类变量

default_retry_count = 3
default_retry_delay = 0.2
clock_drift_factor = 0.01
unlock_script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end"""

类中定义的变量:

  • default_retry_count : 默认重试次数, (3次)
  • default_retry_delay : 默认重试延时, (0.2毫秒)
  • clock_drift_factor : 过期时间精度, (0.01秒)
  • unlock_script : 释放锁的 lua 脚本,获取锁的 value (value为随机数)是否和传入参数相同,相同既释放锁(删除key)

lock_instance 函数

获取锁, 通过 redisset 命令实现

def lock_instance(self, server, resource, val, ttl):
    try:
        assert isinstance(ttl, int), 'ttl {} is not an integer'.format(ttl)
    except AssertionError as e:
        raise ValueError(str(e))
    return server.set(resource, val, nx=True, px=ttl)

Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:

  • EX second : 设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value

  • PX millisecond : 设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value

  • NX : 只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value

  • XX :只在键已经存在时,才对键进行设置操作

unlock_instance 函数

释放锁, 通过 redislua 脚本实现

def unlock_instance(self, server, resource, val):
    try:
        server.eval(self.unlock_script, 1, resource, val)
    except Exception as e:
        logging.exception("Error unlocking resource %s in server %s", resource, str(server))

Redis 2.6.0 版本开始,通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值。

  • script 参数是一段 Lua 5.1 脚本程序,它会被运行在 Redis 服务器上下文中,这段脚本不必(也不应该)定义为一个 Lua 函数。

  • numkeys 参数用于指定键名参数的个数。

get_unique_id 函数

生成随机数, 随机值是为了以安全的方式释放锁

例如:如果不使用随机数, A 客户端获取了锁,在业务操作中的时间超过了锁的有效时间(过期时间), B 客户端就可以获取锁, 此时 A 客户端业务操作完成想要去释放锁,就会导致将 B 客户端的释放了。

def get_unique_id(self):
    CHARACTERS = string.ascii_letters + string.digits
    return ''.join(random.choice(CHARACTERS) for _ in range(22)).encode()

lock 函数

def lock(self, resource, ttl):
    retry = 0
    val = self.get_unique_id()  // 生产随机数

    # Add 2 milliseconds to the drift to account for Redis expires
    # precision, which is 1 millisecond, plus 1 millisecond min
    # drift for small TTLs.
    drift = int(ttl * self.clock_drift_factor) + 2

    redis_errors = list()
    while retry < self.retry_count:  //重试机制
        n = 0
        start_time = int(time.time() * 1000)
        del redis_errors[:]
        for server in self.servers:
            try:
                if self.lock_instance(server, resource, val, ttl):  //获取锁
                    n += 1
            except RedisError as e:
                redis_errors.append(e)
        elapsed_time = int(time.time() * 1000) - start_time
        validity = int(ttl - elapsed_time - drift)  //通过获取开始时间和结束时间计算剩余ttl
        if validity > 0 and n >= self.quorum:
            if redis_errors:
                raise MultipleRedlockException(redis_errors)
            return Lock(validity, resource, val)  //返回锁信息
        else:
            for server in self.servers:
                try:
                    self.unlock_instance(server, resource, val)  //获取锁失败,尝试释放锁
                except:
                    pass
            retry += 1
            time.sleep(self.retry_delay)  //重试延时
    return False

unlock 函数

def unlock(self, lock):
    redis_errors = []
    for server in self.servers:
        try:
            self.unlock_instance(server, lock.resource, lock.key) // 释放锁
        except RedisError as e:
            redis_errors.append(e)
    if redis_errors:
        raise MultipleRedlockException(redis_errors)

参考文章: https://redis.io/topics/distlock


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

《10%创业家》

《10%创业家》

[美] 帕特里克•J.麦金尼斯 / 李文远 / 广东人民出版社 / 2017-4 / 45.00

还在打工和创业之间苦苦挣扎吗?麦金尼斯用亲身经历告诉你,不用辞职,只需投入10%的时间和资源,就能获得100%的财务自由。你不需要雄厚的资本,也不必占用工作时间,只要准确掌握本书所授的方法,就能立即开始创业。 麦金尼斯是世界银行风投顾问,同时也是一名10%创业家。在本书中,他结合自身的创业咨询经历,为读者讲解了移动互联时代的5种创业模式,还提供了创业基因测试、10%创业计划、自传模板等个性化......一起来看看 《《10%创业家》》 这本书的介绍吧!

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具