Python Barrier源码解析

栏目: Python · 发布时间: 6年前

内容简介:源代码在:阅读本部分之前,需要对条件变量有了解,否则,请先阅读:Barrier对象内部会维护一个状态:

目录

屏障(Barrier)会阻塞所有在该屏障上 等待(wait) 的线程,一直到 指定数量 的线程进入了该屏障,屏障才会被 解除 ,这些线程才能继续运行。也就是说,屏障用于 在同步点上,同步确定数量的线程

本文是在 Python 3.7测试通过的。Barrier是在Python3.2引入到threading模块的。

Python中的Barrier其实是一个“循环屏障”,可以查看这个例子。

from threading import Thread, Barrier
from time import sleep
import logging

logging.basicConfig(level=logging.INFO,
    format="[thread:%(threadName)s] [%(asctime)s] %(message)s",
    datefmt="%F %T")

logger = logging.getLogger(__name__)

b = Barrier(2)

def func(t, b):
    sleep(t)
    logger.info("enter into barrier")
    b.wait()
    logger.info("exit barrier")

t1 = Thread(target=func, name="t1", args=(3, b))
t2 = Thread(target=func, name="t2", args=(5, b))
t1.start()
t2.start()

logger.info("start")
t1.join()
t2.join()
logger.info("end")

源代码在: https://github.com/python/cpython/blob/3.7/Lib/threading.py

阅读本部分之前,需要对条件变量有了解,否则,请先阅读: Python Condition源码解析

Barrier对象内部会维护一个状态: self._state 。其取值为:

  • 0
    表示屏障正在填充状态,也就是,正在等待足够数量的线程进入到屏障
  • 1
    表示屏障正在解除状态,也就是,已经有指定数量的线程进入到了屏障。正在屏障上等待的线程,会依次退出屏障, 在最后一个线程退出屏障时, self._state 会被重置为0 。在 这期间 ,尝试进入到屏障的线程,会一直阻塞,直到在屏障上等待的最后一个线程退出屏障,这些线程会再次尝试进入屏障。
    也就是说,在屏障中,有两类线程: 等待进入到屏障的线程已经进入到屏障的线程
  • -1
    表示屏障正在重置状态。调用 Barrier 对象的 reset() 方法,会将屏障重置到初始状态。重置屏障时, 当前正在屏障上等待的所有线程 都会得到 BrokenBarrierError 错误,并退出屏障,在最后一个线程退出屏障的时候, self._state 会被重置为0。在 重置期间 ,尝试进入到屏障的线程,会一直阻塞,直到在屏障上等待的最后一个线程退出屏障,这些线程会再次尝试进入屏障。
  • -2
    表示屏障正在打破状态。既可以通过调用 Barrier 对象的 abort() 方法,来打破屏障;同时,正在屏障上等待的线程在达到超时之后,也会从内部打破屏障。一个屏障一旦被打破,那么所有 正在屏障上等待的线程正在尝试进入到屏障的线程 ,都会得到 BrokenBarrierError 错误,并退出屏障。需要注意的是:当因为重置屏障,或屏障解除,而导致 正在屏障上等待的线程 退出屏障时,最后一个退出的线程,会将屏障的状态 重置为0 ;但是如果因为屏障被打破而退出屏障时,最后一个退出的线程,并不会重置屏障的状态。也就是说,如果不主动reset屏障,那么以后屏障会一直处于打破状态。尝试进入到处于打破状态的屏障的线程,会直接得到 BrokenBarrierError 错误。

接下来,用一张状态迁移图,来描述Barrier对象的内部状态的 部分 相互转化关系:

Python Barrier源码解析

Python3的Condition增加了 wait_for 方法,并且在Barrier类中用到了该方法,因此先看一下,它的作用:

def wait_for(self, predicate, timeout=None):
        # predicate是一个可调用对象,调用该可调用对象,需要返回一个布尔值

        # <strong>当timeout是None的时候,wait_for方法,首先会调用</strong>
        # <strong> + predicate,如果返回True,则wait_for返回True;</strong>
        # <strong> + 否则,该方法会阻塞,直到被唤醒。</strong>
        # <strong> + 被唤醒之后,会继续调用predicate,...,如此循环,直到调用predicate返回True</strong>

        # <strong>当timeout不是None的时候,wait_for方法,首先会调用predicate,</strong>
        # <strong> + 如果返回True,则wait_for返回True;</strong>
        # <strong> + 否则,该方法会阻塞,直到超时或被唤醒。</strong>
        # <strong> + 在超时或被唤醒之后,会继续调用predicate,...,如此循环,直到:</strong>
        # <strong> + + 1,在timeout期间内,调用predicate返回了True,那么wait_for返回True</strong>
        # <strong> + + 2,在timeout期间内,调用predicate一直返回False,</strong>
        # <strong> + + + 那么,wait_for会等待到超时,然后返回False</strong>

        # 也就是说,如果该方法返回了True,那么调用predicate就会返回True;否则,返回False

        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

下面是Barrier类的主要源码:

class Barrier:
    def __init__(self, parties, action=None, timeout=None):
        # parties: 线程数量
        # action: 是一个可调用对象,当屏障解除时,会调用该可调用对象,
        # + 如果,在调用该可调用对象时,出现了异常,该屏障会进入到broken状态(可以查看:<a href="http://download.timd.cn/blog/2018/test_barrier_1.py.txt">例子</a>)
        # timeout:是一个超时时间,它是wait()方法的默认超时时间

        self._cond = Condition(Lock())
        self._action = action
        self._timeout = timeout
        self._parties = parties
        # <strong>关于屏障对象的状态,上面已经详细说明过</strong>
        self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
        # <strong>已经进入到屏障的线程的数量</strong>
        self._count = 0

    <strong>def wait(self, timeout=None):</strong>
        """<strong>等候屏障被解除或被打破。
        只有当指定数量的线程进入屏障时,屏障才会被移除,这些线程才能继续运行。
        如果指定了action,那么在屏障被移除之前,会先调用它。
        该方法返回,线程是第几个进入到屏障的(从0开始计数)
        </strong>"""

        # 如果没有指定timeout参数,那么使用构造方法中指定的timeout
        if timeout is None:
            timeout = self._timeout

        with self._cond:
            # 当屏障正在解除,或者正在重置时,会阻塞尝试进入到屏障的线程
            # + 在屏障完成解除或重置时,会唤醒这些线程,它们会再次尝试进入屏障
            # + 在屏障被打破时,也会唤醒这些线程,它们会获得BrokenBarrierError,
            # + + 并退出屏障
            self._enter()

            index = self._count
            self._count += 1
            try:
                # 当已经有指定数量的线程,进入到屏障时,那么解除屏障:
                # + 1,运行action指定的可调用对象
                # + 2,释放所有在屏障上等待的线程
                if index + 1 == self._parties:
                    self._release()
                # 否则,等待:
                # + 1,直到屏障被解除
                # + 2,直到屏障被重置
                # + 3,直到屏障被打破
                # + 4,达到超时时间,
                # + + 在达到超时时间之后,线程会在内部打破屏障
                else:
                    self._wait(timeout)

            # 在退出屏障时,
            # + 1,返回线程是第几个进入到屏障的(从0开始计数)
            # + 2,递减进入到屏障的线程数
            # + 3,最后一个退出屏障的线程会重置屏障的状态
            # + + 4,并唤醒等待进入屏障的线程
                return index
            finally:
                self._count -= 1
                self._exit()

    def _enter(self):
        while self._state in (-1, 1):
            self._cond.wait()
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 0

    def _release(self):
        try:
            if self._action:
                self._action()
            self._state = 1
            self._cond.notify_all()
        except:
            self._break()
            raise

    def _wait(self, timeout):
        if not self._cond.wait_for(lambda : self._state != 0, timeout):
            self._break()
            raise BrokenBarrierError
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 1

    def _exit(self):
        if self._count == 0:
            if self._state in (-1, 1):
                self._state = 0
                self._cond.notify_all()

    <strong>def reset(self):</strong>
        """<strong>把屏障重置到初始状态
        当前正在屏障上等待的线程会得到BrokenBarrierError,并退出屏障
        </strong>"""
        with self._cond:
            # 1,如果有线程正在屏障上等待,那么,
            <strong>if self._count > 0:</strong>
                # + 1.1,如果屏障处于填充状态或打破状态,那么将状态置为正在重置;
                # + + 并唤醒所有正在等待的线程;
                # + + 最后一个离开屏障的线程,会重置屏障的状态为填充状态;
                # + + 并唤醒尝试进入到屏障的线程
                if self._state == 0:
                    self._state = -1
                elif self._state == -2:
                    #was broken, set it to reset state
                    #which clears when the last thread exits
                    self._state = -1
                # + 1.2,如果屏障正在解除或重置,那么无需任何操作,
                # + + 屏障在完成解除或重置之后,会自动回到初始状态
            # 2,如果屏障上没有任何线程在等待,那么:
            # + 2.1,强制将状态置为填充状态(也就是初始状态)
            # + 2.2,唤醒尝试进入到屏障的线程
            else:
                self._state = 0
            <strong>self._cond.notify_all()</strong>

    <strong>def abort(self):</strong>
        """<strong>从外部打破屏障</strong>"""
        with self._cond:
            self._break()

    def _break(self):
        self._state = -2
        self._cond.notify_all()

    @property
    def parties(self):
        # 返回需要的线程数量
        return self._parties

    @property
    def n_waiting(self):
       # 返回当前正在屏障上等待的线程的数量
        if self._state == 0:
            return self._count
        return 0

    @property
    def broken(self):
        # 返回屏障是否处于broken状态
        return self._state == -2


class BrokenBarrierError(RuntimeError):
    pass

以上所述就是小编给大家介绍的《Python Barrier源码解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

We Are the Nerds

We Are the Nerds

Christine Lagorio-Chafkin / Hachette Books / 2018-10-2 / USD 18.30

Reddit hails itself as "the front page of the Internet." It's the third most-visited website in the United States--and yet, millions of Americans have no idea what it is. We Are the Nerds is an eng......一起来看看 《We Are the Nerds》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具