内容简介:分布式锁是控制分布式系统之间同时操作一个数据的一种方式,通过互斥来保证数据的一致性。在分布式版本的算法里我们假设我们有
分布式锁是控制分布式系统之间同时操作一个数据的一种方式,通过互斥来保证数据的一致性。
安全和可靠性保证:
- 一致性: 互斥,不管任何时候,只有一个客户端能持有同一个锁。
- 分区可容忍性: 不会死锁,最终一定会得到锁,就算一个持有锁的客户端宕掉或者发生网络分区。
- 可用性: 只要大多数 Redis 节点正常工作,客户端应该都能获取和释放锁。
Redlock算法
在分布式版本的算法里我们假设我们有 N
个 Redis master
节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调算法。我们已经描述了如何在单节点环境下安全地获取和释放锁。因此我们理所当然地应当用这个方法在每个单节点里来获取和释放锁。在我们的例子里面我们把 N
设成 5
,这个数字是一个相对比较合理的数值,因此我们需要在不同的计算机或者虚拟机上运行 5
个 master
节点来保证他们大多数情况下都不会同时宕机。一个客户端需要做如下操作来获取锁:
-
获取当前时间(单位是毫秒)。
-
轮流用相同的
key
和随机值在N
个节点上请求锁,在这一步里,客户端在每个master
上请求锁时,会有一个和总的锁释放时间相比小的多的超时时间。比如如果锁自动释放时间是10
秒钟,那每个节点锁请求的超时时间可能是5-50
毫秒的范围,这个可以防止一个客户端在某个宕掉的master
节点上阻塞过长时间,如果一个master
节点不可用了,我们应该尽快尝试下一个master
节点。 -
客户端计算第二步中获取锁所花的时间,只有当客户端在大多数
master
节点上成功获取了锁(在这里是3个),而且总共消耗的时间不超过锁释放时间,这个锁就认为是获取成功了。 -
如果锁获取成功了,那现在锁自动释放时间就是最初的锁释放时间减去之前获取锁所消耗的时间。
-
如果锁获取失败了,不管是因为获取成功的锁不超过一半
(N/2+1)
还是因为总消耗时间超过了锁释放时间,客户端都会到每个master
节点上释放锁,即便是那些他认为没有获取成功的锁。
redlock-py
redlock-py
是一个 python
基于 redis
实现的 分布式锁
我们先来看看 redlock-py
的使用, 这里使用多进程来操作获取锁和释放锁。
from multiprocessing import Pool from redlock import Redlock dlm = Redlock([{"host": "127.0.0.1", "port": 6379, "db": 0}], retry_count=3, retry_delay=0.2) def test_pool(): my_lock = dlm.lock("my_resource_name", 1000) //获取锁 print(my_lock) a = dlm.unlock(my_lock) //释放锁 print(a) if __name__ == '__main__': pool = Pool(2) for i in range(0, 5): pool.apply_async(test_pool) pool.close() pool.join()
redlock-py源码分析
类变量
default_retry_count = 3 default_retry_delay = 0.2 clock_drift_factor = 0.01 unlock_script = """ if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end"""
类中定义的变量:
-
default_retry_count
: 默认重试次数, (3次) -
default_retry_delay
: 默认重试延时, (0.2毫秒) -
clock_drift_factor
: 过期时间精度, (0.01秒) -
unlock_script
: 释放锁的lua
脚本,获取锁的value
(value为随机数)是否和传入参数相同,相同既释放锁(删除key)
lock_instance
函数
获取锁, 通过 redis
的 set
命令实现
def lock_instance(self, server, resource, val, ttl): try: assert isinstance(ttl, int), 'ttl {} is not an integer'.format(ttl) except AssertionError as e: raise ValueError(str(e)) return server.set(resource, val, nx=True, px=ttl)
从 Redis 2.6.12
版本开始, SET
命令的行为可以通过一系列参数来修改:
-
EX second
: 设置键的过期时间为second
秒。SET key value EX second
效果等同于SETEX key second value
。 -
PX millisecond
: 设置键的过期时间为millisecond
毫秒。SET key value PX millisecond
效果等同于PSETEX key millisecond value
。 -
NX
: 只在键不存在时,才对键进行设置操作。SET key value NX
效果等同于SETNX key value
。 -
XX
:只在键已经存在时,才对键进行设置操作
unlock_instance
函数
释放锁, 通过 redis
的 lua
脚本实现
def unlock_instance(self, server, resource, val): try: server.eval(self.unlock_script, 1, resource, val) except Exception as e: logging.exception("Error unlocking resource %s in server %s", resource, str(server))
从 Redis 2.6.0
版本开始,通过内置的 Lua
解释器,可以使用 EVAL
命令对 Lua
脚本进行求值。
-
script
参数是一段Lua 5.1
脚本程序,它会被运行在Redis
服务器上下文中,这段脚本不必(也不应该)定义为一个Lua
函数。 -
numkeys
参数用于指定键名参数的个数。
get_unique_id
函数
生成随机数, 随机值是为了以安全的方式释放锁
例如:如果不使用随机数, A
客户端获取了锁,在业务操作中的时间超过了锁的有效时间(过期时间), B
客户端就可以获取锁, 此时 A
客户端业务操作完成想要去释放锁,就会导致将 B
客户端的释放了。
def get_unique_id(self): CHARACTERS = string.ascii_letters + string.digits return ''.join(random.choice(CHARACTERS) for _ in range(22)).encode()
lock
函数
def lock(self, resource, ttl): retry = 0 val = self.get_unique_id() // 生产随机数 # Add 2 milliseconds to the drift to account for Redis expires # precision, which is 1 millisecond, plus 1 millisecond min # drift for small TTLs. drift = int(ttl * self.clock_drift_factor) + 2 redis_errors = list() while retry < self.retry_count: //重试机制 n = 0 start_time = int(time.time() * 1000) del redis_errors[:] for server in self.servers: try: if self.lock_instance(server, resource, val, ttl): //获取锁 n += 1 except RedisError as e: redis_errors.append(e) elapsed_time = int(time.time() * 1000) - start_time validity = int(ttl - elapsed_time - drift) //通过获取开始时间和结束时间计算剩余ttl if validity > 0 and n >= self.quorum: if redis_errors: raise MultipleRedlockException(redis_errors) return Lock(validity, resource, val) //返回锁信息 else: for server in self.servers: try: self.unlock_instance(server, resource, val) //获取锁失败,尝试释放锁 except: pass retry += 1 time.sleep(self.retry_delay) //重试延时 return False
unlock
函数
def unlock(self, lock): redis_errors = [] for server in self.servers: try: self.unlock_instance(server, lock.resource, lock.key) // 释放锁 except RedisError as e: redis_errors.append(e) if redis_errors: raise MultipleRedlockException(redis_errors)
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Spark 源码系列(五)分布式缓存
- 每日一博 | Redission 分布式锁源码解析
- 分布式消息队列 RocketMQ源码解析:Message存储
- Seata 分布式事务框架 TCC 模式源码分析
- 想通关分布式系统「限流问题」?来一篇源码实战
- 从源码角度看 Redis 分布式锁的 3 种实现方案!
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
《10%创业家》
[美] 帕特里克•J.麦金尼斯 / 李文远 / 广东人民出版社 / 2017-4 / 45.00
还在打工和创业之间苦苦挣扎吗?麦金尼斯用亲身经历告诉你,不用辞职,只需投入10%的时间和资源,就能获得100%的财务自由。你不需要雄厚的资本,也不必占用工作时间,只要准确掌握本书所授的方法,就能立即开始创业。 麦金尼斯是世界银行风投顾问,同时也是一名10%创业家。在本书中,他结合自身的创业咨询经历,为读者讲解了移动互联时代的5种创业模式,还提供了创业基因测试、10%创业计划、自传模板等个性化......一起来看看 《《10%创业家》》 这本书的介绍吧!
RGB HSV 转换
RGB HSV 互转工具
HEX HSV 转换工具
HEX HSV 互换工具