内容简介:源代码在:阅读本部分之前,需要对条件变量有了解,否则,请先阅读:Barrier对象内部会维护一个状态:
目录
屏障(Barrier)会阻塞所有在该屏障上 等待(wait) 的线程,一直到 指定数量 的线程进入了该屏障,屏障才会被 解除 ,这些线程才能继续运行。也就是说,屏障用于 在同步点上,同步确定数量的线程 。
本文是在 Python 3.7测试通过的。Barrier是在Python3.2引入到threading模块的。
Python中的Barrier其实是一个“循环屏障”,可以查看这个例子。
from threading import Thread, Barrier
from time import sleep
import logging
logging.basicConfig(level=logging.INFO,
format="[thread:%(threadName)s] [%(asctime)s] %(message)s",
datefmt="%F %T")
logger = logging.getLogger(__name__)
b = Barrier(2)
def func(t, b):
sleep(t)
logger.info("enter into barrier")
b.wait()
logger.info("exit barrier")
t1 = Thread(target=func, name="t1", args=(3, b))
t2 = Thread(target=func, name="t2", args=(5, b))
t1.start()
t2.start()
logger.info("start")
t1.join()
t2.join()
logger.info("end")
源代码在: https://github.com/python/cpython/blob/3.7/Lib/threading.py
阅读本部分之前,需要对条件变量有了解,否则,请先阅读: Python Condition源码解析 。
Barrier对象内部会维护一个状态: self._state 。其取值为:
- 0
表示屏障正在填充状态,也就是,正在等待足够数量的线程进入到屏障 - 1
表示屏障正在解除状态,也就是,已经有指定数量的线程进入到了屏障。正在屏障上等待的线程,会依次退出屏障, 在最后一个线程退出屏障时,self._state会被重置为0 。在 这期间 ,尝试进入到屏障的线程,会一直阻塞,直到在屏障上等待的最后一个线程退出屏障,这些线程会再次尝试进入屏障。
也就是说,在屏障中,有两类线程: 等待进入到屏障的线程 和 已经进入到屏障的线程 。 - -1
表示屏障正在重置状态。调用Barrier对象的reset()方法,会将屏障重置到初始状态。重置屏障时, 当前正在屏障上等待的所有线程 都会得到BrokenBarrierError错误,并退出屏障,在最后一个线程退出屏障的时候,self._state会被重置为0。在 重置期间 ,尝试进入到屏障的线程,会一直阻塞,直到在屏障上等待的最后一个线程退出屏障,这些线程会再次尝试进入屏障。 - -2
表示屏障正在打破状态。既可以通过调用Barrier对象的abort()方法,来打破屏障;同时,正在屏障上等待的线程在达到超时之后,也会从内部打破屏障。一个屏障一旦被打破,那么所有 正在屏障上等待的线程 和 正在尝试进入到屏障的线程 ,都会得到BrokenBarrierError错误,并退出屏障。需要注意的是:当因为重置屏障,或屏障解除,而导致 正在屏障上等待的线程 退出屏障时,最后一个退出的线程,会将屏障的状态 重置为0 ;但是如果因为屏障被打破而退出屏障时,最后一个退出的线程,并不会重置屏障的状态。也就是说,如果不主动reset屏障,那么以后屏障会一直处于打破状态。尝试进入到处于打破状态的屏障的线程,会直接得到BrokenBarrierError错误。
接下来,用一张状态迁移图,来描述Barrier对象的内部状态的 部分 相互转化关系:
Python3的Condition增加了 wait_for 方法,并且在Barrier类中用到了该方法,因此先看一下,它的作用:
def wait_for(self, predicate, timeout=None):
# predicate是一个可调用对象,调用该可调用对象,需要返回一个布尔值
# <strong>当timeout是None的时候,wait_for方法,首先会调用</strong>
# <strong> + predicate,如果返回True,则wait_for返回True;</strong>
# <strong> + 否则,该方法会阻塞,直到被唤醒。</strong>
# <strong> + 被唤醒之后,会继续调用predicate,...,如此循环,直到调用predicate返回True</strong>
# <strong>当timeout不是None的时候,wait_for方法,首先会调用predicate,</strong>
# <strong> + 如果返回True,则wait_for返回True;</strong>
# <strong> + 否则,该方法会阻塞,直到超时或被唤醒。</strong>
# <strong> + 在超时或被唤醒之后,会继续调用predicate,...,如此循环,直到:</strong>
# <strong> + + 1,在timeout期间内,调用predicate返回了True,那么wait_for返回True</strong>
# <strong> + + 2,在timeout期间内,调用predicate一直返回False,</strong>
# <strong> + + + 那么,wait_for会等待到超时,然后返回False</strong>
# 也就是说,如果该方法返回了True,那么调用predicate就会返回True;否则,返回False
endtime = None
waittime = timeout
result = predicate()
while not result:
if waittime is not None:
if endtime is None:
endtime = _time() + waittime
else:
waittime = endtime - _time()
if waittime <= 0:
break
self.wait(waittime)
result = predicate()
return result
下面是Barrier类的主要源码:
class Barrier:
def __init__(self, parties, action=None, timeout=None):
# parties: 线程数量
# action: 是一个可调用对象,当屏障解除时,会调用该可调用对象,
# + 如果,在调用该可调用对象时,出现了异常,该屏障会进入到broken状态(可以查看:<a href="http://download.timd.cn/blog/2018/test_barrier_1.py.txt">例子</a>)
# timeout:是一个超时时间,它是wait()方法的默认超时时间
self._cond = Condition(Lock())
self._action = action
self._timeout = timeout
self._parties = parties
# <strong>关于屏障对象的状态,上面已经详细说明过</strong>
self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
# <strong>已经进入到屏障的线程的数量</strong>
self._count = 0
<strong>def wait(self, timeout=None):</strong>
"""<strong>等候屏障被解除或被打破。
只有当指定数量的线程进入屏障时,屏障才会被移除,这些线程才能继续运行。
如果指定了action,那么在屏障被移除之前,会先调用它。
该方法返回,线程是第几个进入到屏障的(从0开始计数)
</strong>"""
# 如果没有指定timeout参数,那么使用构造方法中指定的timeout
if timeout is None:
timeout = self._timeout
with self._cond:
# 当屏障正在解除,或者正在重置时,会阻塞尝试进入到屏障的线程
# + 在屏障完成解除或重置时,会唤醒这些线程,它们会再次尝试进入屏障
# + 在屏障被打破时,也会唤醒这些线程,它们会获得BrokenBarrierError,
# + + 并退出屏障
self._enter()
index = self._count
self._count += 1
try:
# 当已经有指定数量的线程,进入到屏障时,那么解除屏障:
# + 1,运行action指定的可调用对象
# + 2,释放所有在屏障上等待的线程
if index + 1 == self._parties:
self._release()
# 否则,等待:
# + 1,直到屏障被解除
# + 2,直到屏障被重置
# + 3,直到屏障被打破
# + 4,达到超时时间,
# + + 在达到超时时间之后,线程会在内部打破屏障
else:
self._wait(timeout)
# 在退出屏障时,
# + 1,返回线程是第几个进入到屏障的(从0开始计数)
# + 2,递减进入到屏障的线程数
# + 3,最后一个退出屏障的线程会重置屏障的状态
# + + 4,并唤醒等待进入屏障的线程
return index
finally:
self._count -= 1
self._exit()
def _enter(self):
while self._state in (-1, 1):
self._cond.wait()
if self._state < 0:
raise BrokenBarrierError
assert self._state == 0
def _release(self):
try:
if self._action:
self._action()
self._state = 1
self._cond.notify_all()
except:
self._break()
raise
def _wait(self, timeout):
if not self._cond.wait_for(lambda : self._state != 0, timeout):
self._break()
raise BrokenBarrierError
if self._state < 0:
raise BrokenBarrierError
assert self._state == 1
def _exit(self):
if self._count == 0:
if self._state in (-1, 1):
self._state = 0
self._cond.notify_all()
<strong>def reset(self):</strong>
"""<strong>把屏障重置到初始状态
当前正在屏障上等待的线程会得到BrokenBarrierError,并退出屏障
</strong>"""
with self._cond:
# 1,如果有线程正在屏障上等待,那么,
<strong>if self._count > 0:</strong>
# + 1.1,如果屏障处于填充状态或打破状态,那么将状态置为正在重置;
# + + 并唤醒所有正在等待的线程;
# + + 最后一个离开屏障的线程,会重置屏障的状态为填充状态;
# + + 并唤醒尝试进入到屏障的线程
if self._state == 0:
self._state = -1
elif self._state == -2:
#was broken, set it to reset state
#which clears when the last thread exits
self._state = -1
# + 1.2,如果屏障正在解除或重置,那么无需任何操作,
# + + 屏障在完成解除或重置之后,会自动回到初始状态
# 2,如果屏障上没有任何线程在等待,那么:
# + 2.1,强制将状态置为填充状态(也就是初始状态)
# + 2.2,唤醒尝试进入到屏障的线程
else:
self._state = 0
<strong>self._cond.notify_all()</strong>
<strong>def abort(self):</strong>
"""<strong>从外部打破屏障</strong>"""
with self._cond:
self._break()
def _break(self):
self._state = -2
self._cond.notify_all()
@property
def parties(self):
# 返回需要的线程数量
return self._parties
@property
def n_waiting(self):
# 返回当前正在屏障上等待的线程的数量
if self._state == 0:
return self._count
return 0
@property
def broken(self):
# 返回屏障是否处于broken状态
return self._state == -2
class BrokenBarrierError(RuntimeError):
pass
以上所述就是小编给大家介绍的《Python Barrier源码解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。