使用 python 实现简单的共享锁和排他锁

栏目: 数据库 · 发布时间: 5年前

内容简介:本文通过代码实操讲解了如何使用 python 实现简单的共享锁和排他锁。上篇文章回顾:以 S 锁为例,获取锁的步骤如下:

本文通过代码实操讲解了如何使用 python 实现简单的共享锁和排他锁。

上篇文章回顾: 记一次容量提升5倍的HttpDns业务Cache调优

共享锁和排它锁

1、什么是共享锁

共享锁又称为读锁。

从多线程的角度来讲,共享锁允许多个线程同时访问资源,但是对写资源只能又一个线程进行。

从事务的角度来讲,若事务 T 对数据 A 加上共享锁,则事务 T 只能读 A; 其他事务也只能对数据 A 加共享锁,而不能加排他锁,直到事务 T 释放 A 上的 S 锁。这就保证了其他事务可以读 A,但是在事务 T 释放 A 上的共享锁之前,不能对 A 做任何修改。

2、什么是排它锁

排他锁又成为写锁。

从多线程的角度来讲,在访问共享资源之前对进行加锁操作,在访问完成之后进行解锁操作。 加锁后,任何其他试图再次加锁的线程会被阻塞,直到当前进程解锁。如果解锁时有一个以上的线程阻塞,那么所有该锁上的线程都被编程就绪状态, 第一个变为就绪状态的线程又执行加锁操作,那么其他的线程又会进入等待。 在这种方式下,只有一个线程能够访问被互斥锁保护的资源。

从事务的角度来讲,若事务T对数据对象A加上排它锁,则只允许T读取和修改数据A,其他任何事务都不能再对A加任何类型的锁,直到事务T释放X锁。它可以防止其他事务获取资源上的锁,直到事务末尾释放锁。

InnoDB 中的行锁

InnoDB实现了以下两种类型的行锁:

共享锁(S): 允许一个事务去读一行,阻止其他事务获得相同数据集的排他锁。

排他锁(X): 允许获得排他锁的事务更新数据,阻止其他事务取得相同数据集的共享读锁和排他写锁。

另外,为了允许行锁和表锁共存,实现多粒度锁机制,InnoDB 还有两种内部使用的意向锁(Intention Locks),这两种意向锁都是表锁。

意向共享锁(IS): 事务打算给数据行加行共享锁,事务在给一个数据行加共享锁前必须先取得该表的 IS 锁。

意向排他锁(IX) 事务打算给数据行加行排他锁,事务在给一个数据行加排他锁前必须先取得该表的 IX 锁。

使用 python 实现简单的共享锁和排他锁

如果一个事务请求的锁模式与当前的锁兼容,InnoDB 就将请求的锁授予该事务;反之,如果两者不兼容,该事务就要等待锁释放。

意向锁是 InnoDB 自动加的,不需用户干预。对于 UPDATE、DELETE 和 INSERT 语句,InnoDB 会自动给涉及数据集加排他锁(X);对于普通SELECT语句,InnoDB 不会加任何锁;事务可以通过以下语句显示给记录集加共享锁或排他锁。

共享锁(S):

SELECT * FROM table_name WHERE ... LOCK IN SHARE MODE复制代码

排他锁(X):

SELECT * FROM table_name WHERE ... FOR UPDATE复制代码

用 SELECT ... IN SHARE MODE获得共享锁,主要用在需要数据依存关系时来确认某行记录是否存在,并确保没有人对这个记录进行UPDATE或者DELETE操作。但是如果当前事务也需要对该记录进行更新操作,则很有可能造成死锁,对于锁定行记录后需要进行更新操作的应用,应该使用 SELECT... FOR UPDATE 方式获得排他锁。

使用Python实现

1、 代码实现

不多说,直接上代码:

# -*- coding: utf-8 -*-import threadingclass Source:    # 队列成员标识    __N = None    # 排他锁    __X = 0    # 意向排他锁    __IX = 1    # 共享锁标识    __S = 2    # 意向共享标识    __IS = 3    # 同步排他锁    __lockX = threading.Lock()       # 事件通知    __events = [        threading.Event(),        threading.Event(),        threading.Event(),        threading.Event()   ]        # 事件通知队列    __eventsQueue = [       [],       [],       [],       []   ]        # 事件变更锁    __eventsLock = [        threading.Lock(),        threading.Lock(),        threading.Lock(),        threading.Lock()   ]         # 相互互斥的锁    __mutexFlag = {}         # 锁类    class __ChildLock:        # 锁标识        __flag = 0        # 锁定的资源        __source = None        def __init__(self, source, flag):            self.__flag = flag            self.__source = source                # 加锁        def lock(self):            self.__source.lock(self.__flag)                        # 解锁        def unlock(self):            self.__source.unlock(self.__flag)     def __init__(self):        self.__initMutexFlag()        self.__initEvents()        # 不建议直接在外面使用,以免死锁    def lock(self, flag):        # 如果是排他锁,先进进行枷锁        if flag == self.__X: self.__lockX.acquire()        self.__events[flag].wait()        self.__lockEvents(flag)            # 不建议直接在外面使用,以免死锁    def unlock(self, flag):        self.__unlockEvents(flag)                if flag == self.__X: self.__lockX.release()            # 获取相互互斥    def __getMutexFlag(self, flag):        return self.__mutexFlag[flag]        def __initMutexFlag(self):        self.__mutexFlag[self.__X] = [self.__X, self.__IX, self.__S, self.__IS]        self.__mutexFlag[self.__IX] = [self.__X, self.__S]        self.__mutexFlag[self.__S] = [self.__X, self.__IX]        self.__mutexFlag[self.__IS] = [self.__X]            def __initEvents(self):        for event in self.__events:            event.set()            # 给事件加锁, 调用 wait 时阻塞    def __lockEvents(self, flag):        mutexFlags = self.__getMutexFlag(flag)                for i in mutexFlags:                        # 为了保证原子操作,加锁            self.__eventsLock[i].acquire()            self.__eventsQueue[i].append(self.__N)            self.__events[i].clear()            self.__eventsLock[i].release()        # 给事件解锁, 调用 wait 不阻塞    def __unlockEvents(self, flag):        mutexFlags = self.__getMutexFlag(flag)                for i in mutexFlags:                        # 为了保证原子操作,加锁            self.__eventsLock[i].acquire()            self.__eventsQueue[i].pop(0)                        if len(self.__eventsQueue[i]) == 0: self.__events[i].set()            self.__eventsLock[i].release()        # 获取锁    def __getLock(self, flag):        lock = self.__ChildLock(self, flag)        lock.lock()                return lock       # 获取 X 锁    def lockX(self):        return self.__getLock(self.__X)        # 获取 IX 锁    def lockIX(self):        return self.__getLock(self.__IX)            # 获取 S 锁    def lockS(self):        return self.__getLock(self.__S)            # 获取 IS 锁    def lockIS(self):        return self.__getLock(self.__IS)复制代码

使用方式:

from lock import Source# 初始化一个锁资源source = Source()# 获取资源的X锁,获取不到则线程被阻塞,获取到了继续往下执行lock = source.lockX() lock.unlock()lock = source.lockIX() lock.unlock()lock = source.lockS() lock.unlock()lock = source.lockIS() lock.unlock()复制代码

2、实现思路

以 S 锁为例,获取锁的步骤如下:

  • 检测 S 锁是否可以取到,取到了话继续执行,没有取到则阻塞,等待其他线程解锁唤醒。

  • 获取与 S 锁相互冲突的锁(IX,X),并将 IX 锁和 X 锁 锁住,后续想获得 IX 锁或者 X 锁的线程就会被阻塞。

  • 向 IX 锁和 X 锁的标识队列插入标识,如果此时另外一个线程拿到了 IS 锁,则会继续想 IX 锁队列标识插入标识。

  • 完成加锁,返回 S 锁。

以 S 锁为例,解锁的步骤如下:

  • 获取与 S 锁相互冲突的锁(IX,X),向 IX 锁和 X 锁的标识队列移除一个标识。

  • 判断 IX 锁和 X 锁队列标识是否为空,如果不为空,则继续锁定,为空则解锁并唤醒被 IX 锁和 X 锁阻塞的线程。

  • 完成 S 锁解锁。

3、锁兼容测试

测试代码

# -*- coding: utf-8 -*-import threadingimport timefrom lock import Source# 初始化资源source= Source()maplockname=['X','IX','S','IS']class MyThread(threading,Thread):    flag = None    def __init__(self, flag):        super().__init__()        self.flag = flag    def run(self):        lock = self.lock()        time1 = time.time()        strtime1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time1))        print('我拿到 %s 锁,开始执行了喔,现在时间是 %s' % (maplockname[self.flag], strtime1))        time.sleep(1)        time2 = time.time()        strtime2 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time2))        print('我释放 %s 锁,结束执行了,现在时间是 %s' % (maplockname[self.flag], strtime2))        lock.unlock()       def lock(self):        if self.flag == 0:                  return source.lockX()                elif self.flag == 1:               return source.lockIX()               elif self.flag == 2:                         return source.lockS()                else:                         return source.lockIS()             def unlock(self, lock):        lock.unlock()def test_lock():    for x in range(0, 4):                  for y in range(0, 4):                time1 = time.time()                thread1 = MyThread(x)                thread2 = MyThread(y)                thread1.start()                thread2.start()                thread1.join()                thread2.join()                time2 = time.time()                difftime = time2 - time1                            if difftime > 2:                     print('%s 锁和 %s 锁 冲突了!' % (maplockname[x], maplockname[y]))                            elif difftime > 1:                     print('%s 锁和 %s 锁 没有冲突!' % (maplockname[x], maplockname[y]))            print('')if __name__ == '__main__':    test_lock()复制代码

运行结果:

我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:09我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:10我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:10我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:11X 锁和 X 锁 冲突了!我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:11我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:12我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:12我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:13X 锁和 IX 锁 冲突了!我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:13我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:14我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:14我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:15X 锁和 S 锁 冲突了!我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:15我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:16我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:16我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:17X 锁和 IS 锁 冲突了!我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:17我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:18我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:18我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:19IX 锁和 X 锁 冲突了!我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:19我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:19我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:20我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:20IX 锁和 IX 锁 没有冲突!我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:20我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:21我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:21我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:22IX 锁和 S 锁 冲突了!我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:22我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:22我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:23我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:23IX 锁和 IS 锁 没有冲突!我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:23我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:24我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:24我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:25S 锁和 X 锁 冲突了!我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:25我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:26我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:26我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:27S 锁和 IX 锁 冲突了!我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:27我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:27我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:28我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:28S 锁和 S 锁 没有冲突!我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:28我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:28我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:29我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:29S 锁和 IS 锁 没有冲突!我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:29我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:30我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:30我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:31IS 锁和 X 锁 冲突了!我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:31我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:31我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:32我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:32IS 锁和 IX 锁 没有冲突!我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:32我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:32我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:33我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:33IS 锁和 S 锁 没有冲突!我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:33我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:33我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:34我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:34IS 锁和 IS 锁 没有冲突!复制代码

4、 公平锁与非公平锁

(1)问题分析

仔细想了想,如果有一种场景,就是用户一直再读,写获取不到锁,那么不就造成脏读吗?这不就是由于资源的抢占不就是非公平锁造成的。如何避免这个问题呢?这就涉及到了公平锁与非公平锁。

对产生的结果来说,如果一个线程组里,能保证每个线程都能拿到锁,那么这个锁就是公平锁。相反,如果保证不了每个线程都能拿到锁,也就是存在有线程饿死,那么这个锁就是非公平锁。

(2)非公平锁测试

上述代码锁实现的是非公平锁,测试代码如下:

def test_fair_lock():    threads = []        for i in range(0, 10):                  if i == 2:                       # 0 代表排他锁(X)            threads.append(MyThread(0))                  else:                        # 2 代表共享锁(S)            threads.append(MyThread(2))    for thread in threads: thread.start()复制代码

运行结果:

我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:34我释放 X 锁了,结束执行了,现在时间是 2019-02-17 19:06:35复制代码

可以看到由于资源抢占问题,排他锁被最后才被获取到了。

(3)公平锁的实现

实现公平锁,只需要在原有的代码进行小小得修改就行了。

class Source:    # ...... 省略    def __init__(self, isFair=False):          self.__isFair = isFair          self.__initMutexFlag()          self.__initEvents()        # ...... 省略        def lock(self, flag):        # 如果是排他锁,先进进行枷锁        if flag == self.__X: self.__lockX.acquire()                if self.__isFair:                   # 如果是公平锁则,先将互斥的锁给阻塞,防止其他线程进入            self.__lockEventsWait(flag)            self.__events[flag].wait()            self.__lockEventsQueue(flag)                else:                    # 如果是非公平锁,如果锁拿不到,则先等待            self.__events[flag].wait()            self.__lockEvents(flag)       def __lockEventsWait(self, flag):        mutexFlags = self.__getMutexFlag(flag)                for i in mutexFlags:                        # 为了保证原子操作,加锁            self.__eventsLock[i].acquire()            self.__events[i].clear()            self.__eventsLock[i].release()        def __lockEventsQueue(self, flag):          mutexFlags = self.__getMutexFlag(flag)               for i in mutexFlags:                            # 为了保证原子操作,加锁                self.__eventsLock[i].acquire()                self.__eventsQueue[i].append(self.__N)                self.__eventsLock[i].release()复制代码

测试代码:

source = Source(True)def test_fair_lock():    threads = []        for i in range(0, 10):                  if i == 2:                            # 0 代表排他锁(X)                threads.append(MyThread(0))                  else:                           # 2 代表共享锁(S)                threads.append(MyThread(2))            for thread in threads: thread.start()复制代码

运行结果:

我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:16我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:16我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:17我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:17我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:17我释放 X 锁了,结束执行了,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19复制代码

可以看到排他锁在第二次的时候就被获取到了。

(4)优缺点

非公平锁性能高于公平锁性能。首先,在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。而且,非公平锁能更充分的利用cpu的时间片,尽量的减少cpu空闲的状态时间。

参考文献:

共享锁(S锁)和排它锁(X锁):https://www.jianshu.com/p/bd3b3ccedda9

Java多线程--互斥锁/共享锁/读写锁 快速入门:https://www.jianshu.com/p/87ac733fda80

Java多线程 -- 公平锁和非公平锁的一些思考:https://www.jianshu.com/p/eaea337c5e5b

MySQL- InnoDB锁机制:https://www.cnblogs.com/aipiaoborensheng/p/5767459.html

文章首发于共公众号“小米运维”,点击查看原文。


以上所述就是小编给大家介绍的《使用 python 实现简单的共享锁和排他锁》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Understanding Machine Learning

Understanding Machine Learning

Shai Shalev-Shwartz、Shai Ben-David / Cambridge University Press / 2014 / USD 48.51

Machine learning is one of the fastest growing areas of computer science, with far-reaching applications. The aim of this textbook is to introduce machine learning, and the algorithmic paradigms it of......一起来看看 《Understanding Machine Learning》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具