内容简介:源代码在:阅读本部分之前,需要对条件变量有了解,否则,请先阅读:Barrier对象内部会维护一个状态:
目录
屏障(Barrier)会阻塞所有在该屏障上 等待(wait) 的线程,一直到 指定数量 的线程进入了该屏障,屏障才会被 解除 ,这些线程才能继续运行。也就是说,屏障用于 在同步点上,同步确定数量的线程 。
本文是在 Python 3.7测试通过的。Barrier是在Python3.2引入到threading模块的。
Python中的Barrier其实是一个“循环屏障”,可以查看这个例子。
from threading import Thread, Barrier from time import sleep import logging logging.basicConfig(level=logging.INFO, format="[thread:%(threadName)s] [%(asctime)s] %(message)s", datefmt="%F %T") logger = logging.getLogger(__name__) b = Barrier(2) def func(t, b): sleep(t) logger.info("enter into barrier") b.wait() logger.info("exit barrier") t1 = Thread(target=func, name="t1", args=(3, b)) t2 = Thread(target=func, name="t2", args=(5, b)) t1.start() t2.start() logger.info("start") t1.join() t2.join() logger.info("end")
源代码在: https://github.com/python/cpython/blob/3.7/Lib/threading.py
阅读本部分之前,需要对条件变量有了解,否则,请先阅读: Python Condition源码解析 。
Barrier对象内部会维护一个状态: self._state
。其取值为:
- 0
表示屏障正在填充状态,也就是,正在等待足够数量的线程进入到屏障 - 1
表示屏障正在解除状态,也就是,已经有指定数量的线程进入到了屏障。正在屏障上等待的线程,会依次退出屏障, 在最后一个线程退出屏障时,self._state
会被重置为0 。在 这期间 ,尝试进入到屏障的线程,会一直阻塞,直到在屏障上等待的最后一个线程退出屏障,这些线程会再次尝试进入屏障。
也就是说,在屏障中,有两类线程: 等待进入到屏障的线程 和 已经进入到屏障的线程 。 - -1
表示屏障正在重置状态。调用Barrier
对象的reset()
方法,会将屏障重置到初始状态。重置屏障时, 当前正在屏障上等待的所有线程 都会得到BrokenBarrierError
错误,并退出屏障,在最后一个线程退出屏障的时候,self._state
会被重置为0。在 重置期间 ,尝试进入到屏障的线程,会一直阻塞,直到在屏障上等待的最后一个线程退出屏障,这些线程会再次尝试进入屏障。 - -2
表示屏障正在打破状态。既可以通过调用Barrier
对象的abort()
方法,来打破屏障;同时,正在屏障上等待的线程在达到超时之后,也会从内部打破屏障。一个屏障一旦被打破,那么所有 正在屏障上等待的线程 和 正在尝试进入到屏障的线程 ,都会得到BrokenBarrierError
错误,并退出屏障。需要注意的是:当因为重置屏障,或屏障解除,而导致 正在屏障上等待的线程 退出屏障时,最后一个退出的线程,会将屏障的状态 重置为0 ;但是如果因为屏障被打破而退出屏障时,最后一个退出的线程,并不会重置屏障的状态。也就是说,如果不主动reset屏障,那么以后屏障会一直处于打破状态。尝试进入到处于打破状态的屏障的线程,会直接得到BrokenBarrierError
错误。
接下来,用一张状态迁移图,来描述Barrier对象的内部状态的 部分 相互转化关系:
Python3的Condition增加了 wait_for
方法,并且在Barrier类中用到了该方法,因此先看一下,它的作用:
def wait_for(self, predicate, timeout=None): # predicate是一个可调用对象,调用该可调用对象,需要返回一个布尔值 # <strong>当timeout是None的时候,wait_for方法,首先会调用</strong> # <strong> + predicate,如果返回True,则wait_for返回True;</strong> # <strong> + 否则,该方法会阻塞,直到被唤醒。</strong> # <strong> + 被唤醒之后,会继续调用predicate,...,如此循环,直到调用predicate返回True</strong> # <strong>当timeout不是None的时候,wait_for方法,首先会调用predicate,</strong> # <strong> + 如果返回True,则wait_for返回True;</strong> # <strong> + 否则,该方法会阻塞,直到超时或被唤醒。</strong> # <strong> + 在超时或被唤醒之后,会继续调用predicate,...,如此循环,直到:</strong> # <strong> + + 1,在timeout期间内,调用predicate返回了True,那么wait_for返回True</strong> # <strong> + + 2,在timeout期间内,调用predicate一直返回False,</strong> # <strong> + + + 那么,wait_for会等待到超时,然后返回False</strong> # 也就是说,如果该方法返回了True,那么调用predicate就会返回True;否则,返回False endtime = None waittime = timeout result = predicate() while not result: if waittime is not None: if endtime is None: endtime = _time() + waittime else: waittime = endtime - _time() if waittime <= 0: break self.wait(waittime) result = predicate() return result
下面是Barrier类的主要源码:
class Barrier: def __init__(self, parties, action=None, timeout=None): # parties: 线程数量 # action: 是一个可调用对象,当屏障解除时,会调用该可调用对象, # + 如果,在调用该可调用对象时,出现了异常,该屏障会进入到broken状态(可以查看:<a href="http://download.timd.cn/blog/2018/test_barrier_1.py.txt">例子</a>) # timeout:是一个超时时间,它是wait()方法的默认超时时间 self._cond = Condition(Lock()) self._action = action self._timeout = timeout self._parties = parties # <strong>关于屏障对象的状态,上面已经详细说明过</strong> self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken # <strong>已经进入到屏障的线程的数量</strong> self._count = 0 <strong>def wait(self, timeout=None):</strong> """<strong>等候屏障被解除或被打破。 只有当指定数量的线程进入屏障时,屏障才会被移除,这些线程才能继续运行。 如果指定了action,那么在屏障被移除之前,会先调用它。 该方法返回,线程是第几个进入到屏障的(从0开始计数) </strong>""" # 如果没有指定timeout参数,那么使用构造方法中指定的timeout if timeout is None: timeout = self._timeout with self._cond: # 当屏障正在解除,或者正在重置时,会阻塞尝试进入到屏障的线程 # + 在屏障完成解除或重置时,会唤醒这些线程,它们会再次尝试进入屏障 # + 在屏障被打破时,也会唤醒这些线程,它们会获得BrokenBarrierError, # + + 并退出屏障 self._enter() index = self._count self._count += 1 try: # 当已经有指定数量的线程,进入到屏障时,那么解除屏障: # + 1,运行action指定的可调用对象 # + 2,释放所有在屏障上等待的线程 if index + 1 == self._parties: self._release() # 否则,等待: # + 1,直到屏障被解除 # + 2,直到屏障被重置 # + 3,直到屏障被打破 # + 4,达到超时时间, # + + 在达到超时时间之后,线程会在内部打破屏障 else: self._wait(timeout) # 在退出屏障时, # + 1,返回线程是第几个进入到屏障的(从0开始计数) # + 2,递减进入到屏障的线程数 # + 3,最后一个退出屏障的线程会重置屏障的状态 # + + 4,并唤醒等待进入屏障的线程 return index finally: self._count -= 1 self._exit() def _enter(self): while self._state in (-1, 1): self._cond.wait() if self._state < 0: raise BrokenBarrierError assert self._state == 0 def _release(self): try: if self._action: self._action() self._state = 1 self._cond.notify_all() except: self._break() raise def _wait(self, timeout): if not self._cond.wait_for(lambda : self._state != 0, timeout): self._break() raise BrokenBarrierError if self._state < 0: raise BrokenBarrierError assert self._state == 1 def _exit(self): if self._count == 0: if self._state in (-1, 1): self._state = 0 self._cond.notify_all() <strong>def reset(self):</strong> """<strong>把屏障重置到初始状态 当前正在屏障上等待的线程会得到BrokenBarrierError,并退出屏障 </strong>""" with self._cond: # 1,如果有线程正在屏障上等待,那么, <strong>if self._count > 0:</strong> # + 1.1,如果屏障处于填充状态或打破状态,那么将状态置为正在重置; # + + 并唤醒所有正在等待的线程; # + + 最后一个离开屏障的线程,会重置屏障的状态为填充状态; # + + 并唤醒尝试进入到屏障的线程 if self._state == 0: self._state = -1 elif self._state == -2: #was broken, set it to reset state #which clears when the last thread exits self._state = -1 # + 1.2,如果屏障正在解除或重置,那么无需任何操作, # + + 屏障在完成解除或重置之后,会自动回到初始状态 # 2,如果屏障上没有任何线程在等待,那么: # + 2.1,强制将状态置为填充状态(也就是初始状态) # + 2.2,唤醒尝试进入到屏障的线程 else: self._state = 0 <strong>self._cond.notify_all()</strong> <strong>def abort(self):</strong> """<strong>从外部打破屏障</strong>""" with self._cond: self._break() def _break(self): self._state = -2 self._cond.notify_all() @property def parties(self): # 返回需要的线程数量 return self._parties @property def n_waiting(self): # 返回当前正在屏障上等待的线程的数量 if self._state == 0: return self._count return 0 @property def broken(self): # 返回屏障是否处于broken状态 return self._state == -2 class BrokenBarrierError(RuntimeError): pass
以上所述就是小编给大家介绍的《Python Barrier源码解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。