Python Barrier源码解析

栏目: Python · 发布时间: 6年前

内容简介:源代码在:阅读本部分之前,需要对条件变量有了解,否则,请先阅读:Barrier对象内部会维护一个状态:

目录

屏障(Barrier)会阻塞所有在该屏障上 等待(wait) 的线程,一直到 指定数量 的线程进入了该屏障,屏障才会被 解除 ,这些线程才能继续运行。也就是说,屏障用于 在同步点上,同步确定数量的线程

本文是在 Python 3.7测试通过的。Barrier是在Python3.2引入到threading模块的。

Python中的Barrier其实是一个“循环屏障”,可以查看这个例子。

from threading import Thread, Barrier
from time import sleep
import logging

logging.basicConfig(level=logging.INFO,
    format="[thread:%(threadName)s] [%(asctime)s] %(message)s",
    datefmt="%F %T")

logger = logging.getLogger(__name__)

b = Barrier(2)

def func(t, b):
    sleep(t)
    logger.info("enter into barrier")
    b.wait()
    logger.info("exit barrier")

t1 = Thread(target=func, name="t1", args=(3, b))
t2 = Thread(target=func, name="t2", args=(5, b))
t1.start()
t2.start()

logger.info("start")
t1.join()
t2.join()
logger.info("end")

源代码在: https://github.com/python/cpython/blob/3.7/Lib/threading.py

阅读本部分之前,需要对条件变量有了解,否则,请先阅读: Python Condition源码解析

Barrier对象内部会维护一个状态: self._state 。其取值为:

  • 0
    表示屏障正在填充状态,也就是,正在等待足够数量的线程进入到屏障
  • 1
    表示屏障正在解除状态,也就是,已经有指定数量的线程进入到了屏障。正在屏障上等待的线程,会依次退出屏障, 在最后一个线程退出屏障时, self._state 会被重置为0 。在 这期间 ,尝试进入到屏障的线程,会一直阻塞,直到在屏障上等待的最后一个线程退出屏障,这些线程会再次尝试进入屏障。
    也就是说,在屏障中,有两类线程: 等待进入到屏障的线程已经进入到屏障的线程
  • -1
    表示屏障正在重置状态。调用 Barrier 对象的 reset() 方法,会将屏障重置到初始状态。重置屏障时, 当前正在屏障上等待的所有线程 都会得到 BrokenBarrierError 错误,并退出屏障,在最后一个线程退出屏障的时候, self._state 会被重置为0。在 重置期间 ,尝试进入到屏障的线程,会一直阻塞,直到在屏障上等待的最后一个线程退出屏障,这些线程会再次尝试进入屏障。
  • -2
    表示屏障正在打破状态。既可以通过调用 Barrier 对象的 abort() 方法,来打破屏障;同时,正在屏障上等待的线程在达到超时之后,也会从内部打破屏障。一个屏障一旦被打破,那么所有 正在屏障上等待的线程正在尝试进入到屏障的线程 ,都会得到 BrokenBarrierError 错误,并退出屏障。需要注意的是:当因为重置屏障,或屏障解除,而导致 正在屏障上等待的线程 退出屏障时,最后一个退出的线程,会将屏障的状态 重置为0 ;但是如果因为屏障被打破而退出屏障时,最后一个退出的线程,并不会重置屏障的状态。也就是说,如果不主动reset屏障,那么以后屏障会一直处于打破状态。尝试进入到处于打破状态的屏障的线程,会直接得到 BrokenBarrierError 错误。

接下来,用一张状态迁移图,来描述Barrier对象的内部状态的 部分 相互转化关系:

Python Barrier源码解析

Python3的Condition增加了 wait_for 方法,并且在Barrier类中用到了该方法,因此先看一下,它的作用:

def wait_for(self, predicate, timeout=None):
        # predicate是一个可调用对象,调用该可调用对象,需要返回一个布尔值

        # <strong>当timeout是None的时候,wait_for方法,首先会调用</strong>
        # <strong> + predicate,如果返回True,则wait_for返回True;</strong>
        # <strong> + 否则,该方法会阻塞,直到被唤醒。</strong>
        # <strong> + 被唤醒之后,会继续调用predicate,...,如此循环,直到调用predicate返回True</strong>

        # <strong>当timeout不是None的时候,wait_for方法,首先会调用predicate,</strong>
        # <strong> + 如果返回True,则wait_for返回True;</strong>
        # <strong> + 否则,该方法会阻塞,直到超时或被唤醒。</strong>
        # <strong> + 在超时或被唤醒之后,会继续调用predicate,...,如此循环,直到:</strong>
        # <strong> + + 1,在timeout期间内,调用predicate返回了True,那么wait_for返回True</strong>
        # <strong> + + 2,在timeout期间内,调用predicate一直返回False,</strong>
        # <strong> + + + 那么,wait_for会等待到超时,然后返回False</strong>

        # 也就是说,如果该方法返回了True,那么调用predicate就会返回True;否则,返回False

        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

下面是Barrier类的主要源码:

class Barrier:
    def __init__(self, parties, action=None, timeout=None):
        # parties: 线程数量
        # action: 是一个可调用对象,当屏障解除时,会调用该可调用对象,
        # + 如果,在调用该可调用对象时,出现了异常,该屏障会进入到broken状态(可以查看:<a href="http://download.timd.cn/blog/2018/test_barrier_1.py.txt">例子</a>)
        # timeout:是一个超时时间,它是wait()方法的默认超时时间

        self._cond = Condition(Lock())
        self._action = action
        self._timeout = timeout
        self._parties = parties
        # <strong>关于屏障对象的状态,上面已经详细说明过</strong>
        self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
        # <strong>已经进入到屏障的线程的数量</strong>
        self._count = 0

    <strong>def wait(self, timeout=None):</strong>
        """<strong>等候屏障被解除或被打破。
        只有当指定数量的线程进入屏障时,屏障才会被移除,这些线程才能继续运行。
        如果指定了action,那么在屏障被移除之前,会先调用它。
        该方法返回,线程是第几个进入到屏障的(从0开始计数)
        </strong>"""

        # 如果没有指定timeout参数,那么使用构造方法中指定的timeout
        if timeout is None:
            timeout = self._timeout

        with self._cond:
            # 当屏障正在解除,或者正在重置时,会阻塞尝试进入到屏障的线程
            # + 在屏障完成解除或重置时,会唤醒这些线程,它们会再次尝试进入屏障
            # + 在屏障被打破时,也会唤醒这些线程,它们会获得BrokenBarrierError,
            # + + 并退出屏障
            self._enter()

            index = self._count
            self._count += 1
            try:
                # 当已经有指定数量的线程,进入到屏障时,那么解除屏障:
                # + 1,运行action指定的可调用对象
                # + 2,释放所有在屏障上等待的线程
                if index + 1 == self._parties:
                    self._release()
                # 否则,等待:
                # + 1,直到屏障被解除
                # + 2,直到屏障被重置
                # + 3,直到屏障被打破
                # + 4,达到超时时间,
                # + + 在达到超时时间之后,线程会在内部打破屏障
                else:
                    self._wait(timeout)

            # 在退出屏障时,
            # + 1,返回线程是第几个进入到屏障的(从0开始计数)
            # + 2,递减进入到屏障的线程数
            # + 3,最后一个退出屏障的线程会重置屏障的状态
            # + + 4,并唤醒等待进入屏障的线程
                return index
            finally:
                self._count -= 1
                self._exit()

    def _enter(self):
        while self._state in (-1, 1):
            self._cond.wait()
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 0

    def _release(self):
        try:
            if self._action:
                self._action()
            self._state = 1
            self._cond.notify_all()
        except:
            self._break()
            raise

    def _wait(self, timeout):
        if not self._cond.wait_for(lambda : self._state != 0, timeout):
            self._break()
            raise BrokenBarrierError
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 1

    def _exit(self):
        if self._count == 0:
            if self._state in (-1, 1):
                self._state = 0
                self._cond.notify_all()

    <strong>def reset(self):</strong>
        """<strong>把屏障重置到初始状态
        当前正在屏障上等待的线程会得到BrokenBarrierError,并退出屏障
        </strong>"""
        with self._cond:
            # 1,如果有线程正在屏障上等待,那么,
            <strong>if self._count > 0:</strong>
                # + 1.1,如果屏障处于填充状态或打破状态,那么将状态置为正在重置;
                # + + 并唤醒所有正在等待的线程;
                # + + 最后一个离开屏障的线程,会重置屏障的状态为填充状态;
                # + + 并唤醒尝试进入到屏障的线程
                if self._state == 0:
                    self._state = -1
                elif self._state == -2:
                    #was broken, set it to reset state
                    #which clears when the last thread exits
                    self._state = -1
                # + 1.2,如果屏障正在解除或重置,那么无需任何操作,
                # + + 屏障在完成解除或重置之后,会自动回到初始状态
            # 2,如果屏障上没有任何线程在等待,那么:
            # + 2.1,强制将状态置为填充状态(也就是初始状态)
            # + 2.2,唤醒尝试进入到屏障的线程
            else:
                self._state = 0
            <strong>self._cond.notify_all()</strong>

    <strong>def abort(self):</strong>
        """<strong>从外部打破屏障</strong>"""
        with self._cond:
            self._break()

    def _break(self):
        self._state = -2
        self._cond.notify_all()

    @property
    def parties(self):
        # 返回需要的线程数量
        return self._parties

    @property
    def n_waiting(self):
       # 返回当前正在屏障上等待的线程的数量
        if self._state == 0:
            return self._count
        return 0

    @property
    def broken(self):
        # 返回屏障是否处于broken状态
        return self._state == -2


class BrokenBarrierError(RuntimeError):
    pass

以上所述就是小编给大家介绍的《Python Barrier源码解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

设计模式之禅(第2版)

设计模式之禅(第2版)

秦小波 / 机械工业出版社 / 2014-2-25 / 89.00元

本书是设计模式领域公认的3本经典著作之一,“极具趣味,容易理解,但讲解又极为严谨和透彻”是本书的写作风格和方法的最大特点。第1版2010年出版,畅销至今,广受好评,是该领域的里程碑著作。深刻解读6大设计原则和28种设计模式的准确定义、应用方法和最佳实践,全方位比较各种同类模式之间的异同,详细讲解将不同的模式组合使用的方法。第2版在第1版的基础上有两方面的改进,一方面结合读者的意见和建议对原有内容中......一起来看看 《设计模式之禅(第2版)》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

html转js在线工具
html转js在线工具

html转js在线工具