Python Queue源码分析以及自己实现DelayQueue

栏目: 编程工具 · 发布时间: 7年前

内容简介:队列是用于本文将对Python2.7的标准库提供的Queue模块进行源码解析,并且实现一个Queue模块的源代码在:

目录

队列是用于 多个生产者线程多个消费者线程 之间 同步 数据的。

本文将对 Python 2.7的标准库提供的Queue模块进行源码解析,并且实现一个 延迟队列

Queue模块的源代码在: https://github.com/python/cpython/blob/2.7/Lib/Queue.py

如果对条件变量不了解,可以先看: Python Condition源码解析

接下来,我们看主要代码:

class Queue:
    """使用给定的最大大小创建一个队列对象。
    如果maxsize小于等于0,那么队列是无限大的
    """
    def __init__(self, maxsize=0):
        # <strong>队列的最大容量</strong>
        self.maxsize = maxsize
        # <strong>初始化队列对象的底层数据结构,该数据结构用来保存put到队列中的元素</strong>
        # <strong>在Queue中,底层数据结构是collections.dequeue(双端队列)</strong>
        self._init(maxsize)

        # <strong>在使用队列对象暴漏出的所有的方法时,都应该先获取到这个互斥变量</strong>
        # <strong>所有的获取了该互斥变量的方法,在它返回之前,都应该先释放该变量</strong>
        # <strong>这个互斥变量会在三个条件变量之间共享(也就是说,这三个条件变量</strong>
        # + <strong>的底层锁都是该互斥变量)。因此,获取这些条件变量时,会先获取到该互斥变量;</strong>
        # + <strong>释放这些条件变量时,会先释放该互斥变量。</strong>
        self.mutex = _threading.Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = _threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = _threading.Condition(self.mutex)
        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = _threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    # <strong>初始化底层数据结构</strong>
    def _init(self, maxsize):
        self.queue = deque()

    # <strong>把元素保存到底层的数据结构中</strong>
    def _put(self, item):
        self.queue.append(item)

    <strong>def put(self, item, block=True, timeout=None):</strong>
        """往队列中加入一个元素
        如果<em>block</em>参数是True,并且<em>timeout</em>参数是None,
        那么<strong>put</strong>操作会阻塞,直到队列中有空闲的空间。
        如果<em>block</em>参数是True,<em>timeout</em>参数是非负数,
        并且,在这个期间,队列中一直没有空间,那么<strong>put</strong>操作会阻塞
        到超时,然后抛出<strong>Full</strong>异常。
        如果<em>block</em>参数是False,并且队列中有空闲空间,
        那么会立即把元素保存到底层数据结构中;否则,会忽略掉<em>timeout</em>参数,
        抛出<strong>Full</strong>异常。
        """

        # <strong>1,获取条件变量not_full</strong>
        self.not_full.acquire()
        try:
            # <strong>2.1,如果maxsize大于0</strong>
            if self.maxsize > 0:
                if not block:
                    # <strong>2.1.1,如果block是False,并且队列已满,那么抛出Full异常</strong>
                    if self._qsize() == self.maxsize:
                        raise Full
                    # <strong>2.1.2,如果block是False,并且队列未满,那么,走步骤3</strong>
                elif timeout is None:
                    # <strong>2.2.1,如果block是True,并且队列已满,那么线程进入到not_full的waiting池,等待被唤醒;</strong>
                    # <strong> + 如果队列未满,那么,走步骤3</strong>
                    # <strong>2.2.2,线程被唤醒之后,回到2.2.1</strong>
                    # <strong> + (当有其他线程从队列中消费消息时,会通知not_full)</strong>
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    # 如果block为True,但是timeout小于0,那么抛出ValueError
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # <strong>2.3,计算等待的结束时间</strong>
                    endtime = _time() + timeout
                    # <strong>2.3.1,如果队列未满,则走步骤3</strong>
                    # <strong>2.3.2,如果队列已满,那么计算出需要等待的时间,如果已经超时,那么,抛出Full异常;</strong>
                    # <strong> + 否则,并进入到not_full的waiting池,等待到超时或被唤醒</strong>
                    # <strong>2.3.3,在线程被唤醒或等待超时之后,回到2.3.1</strong>

                    # 也就是说,如果在timeout指定的时间内,队列中,一直没有空闲空间,
                    # + 那么线程在等待到超时之后,会抛出Full异常
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            # <strong>3,将元素保存到底层数据结构中,</strong>
            # + <strong>并递增unfinished_tasks,同时通知not_empty,唤醒在其中等待数据的线程</strong>
            self._put(item)
            self.unfinished_tasks += 1
            # 值得再次提及是,not_full、not_empty、all_tasks_done底层的锁是同一把
            self.not_empty.notify()
        finally:
            # <strong>4,释放条件变量not_full</strong>
            self.not_full.release()

    # 从底层数据结构中,移除并返回一个元素
    def _get(self):
        return self.queue.popleft()

    <strong>def get(self, block=True, timeout=None):</strong>
        """从队列中移除,并返回一个元素。
        如果<em>block</em>参数是True,并且<em>timeout</em>参数是None,
        那么<strong>get</strong>操作会一直阻塞到队列中有元素可用。
        如果<em>timeout</em>参数是非负数,那么<strong>get</strong>操作最多等待<em>timeout</em>秒,
        如果在这个时间内,队列中一直没有元素可用,那么<strong>get</strong>操作会等待<em>timeout</em>秒,并抛出<strong>Empty</strong>异常。
        否则,如果<em>blocking</em>参数是False,那么会忽略<em>timeout</em>参数,
        如果队列中,有元素可用,那么理解返回一个;否则,抛出<strong>Empty</strong>异常
        """
        # <strong>1,获取not_empty条件变量</strong>
        self.not_empty.acquire()
        try:
            # <strong>2.1.1,如果block是False,并且队列中没有元素,那么抛出Empty异常</strong>
            # <strong>2.1.2,如果block是False,并且队列中有元素,那么走步骤3</strong>
            if not block:
                if not self._qsize():
                    raise Empty
            # <strong>2.2,如果block是True,并且timeout是None,那么:</strong>
            # <strong>2.2.1,如果队列中有元素,那么走步骤3;否则,线程进入到not_empty的waiting池,等待被唤醒</strong>
            # <strong>2.2.2,线程被唤醒之后,回到2.2.1</strong>
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            # 如果timeout小于0,那么抛出ValueError
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # <strong>2.3,计算等待的结束时间</strong>
                endtime = _time() + timeout
                # <strong>2.3.1,如果队列中有元素,那么走步骤3;否则,计算等待的超时时间,</strong>
                # <strong> + 如果达到了超时时间,那么抛出Empty错误;否则,进入到not_empty的waiting池,</strong>
                # <strong> + 等待到超时,或被唤醒 </strong>
                # <strong>2.3.2,当线程被唤醒之后,回到2.3.1</strong>

                # 也就是说,如果在指定的时间之内,队列中一直没有元素可用,
                # + 那么,会等待timeout秒,并抛出Empty异常
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)

            # <strong>3,从队列中移除并返回一个元素;通知not_full,也就是唤醒一个在not_full中等待的线程</strong>
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            # <strong>4,释放not_empty条件变量</strong>
            self.not_empty.release()

    <strong>def task_done(self):</strong>
        """调用该方法意味着,之前放到队列中的一个任务被完成了。
        该方法是被队列的消费者线程使用的。消费者线程在调用get()方法,从队列中获取到一个任务,
        并在处理之后,需要调用该方法,告诉队列该任务处理完成了。
        <em>每次,成功向队列中put一个元素的时候,都会将unfinished_tasks增加1;</em>
        <em>每次,调用task_done()方法,都会将unfinished_tasks减少1</em>
        该方法的作用是唤醒正在阻塞的join()操作。
        """
        # 获取条件变量
        self.all_tasks_done.acquire()
        try:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                # <strong>当unfinished小于0时,</strong>
                # <strong> + 也就是成功调用put()的次数小于调用task_done()的次数时,会抛出异常</strong>
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                # <strong>当unfinished为0时,会通知all_tasks_done</strong>
                <strong>self.all_tasks_done.notify_all()</strong>
            # <strong>递减unfinished_tasks</strong>
            self.unfinished_tasks = unfinished
        finally:
            # 释放条件变量
            self.all_tasks_done.release()

    <strong>def join(self):</strong>
        """该方法会一直阻塞,直到,队列中所有的元素都被取出,并被处理了。
        当成功向队列中put元素的时候,unfinished_tasks会增加。
        当消费者线程调用task_done()方法时,unfinished_tasks会减少。
        当unfinished_tasks降为0时,join()方法才会退出阻塞状态
        """
        self.all_tasks_done.acquire()
        try:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()
        finally:
            self.all_tasks_done.release()

    # 返回队列中的元素数
    def _qsize(self, len=len):
        return len(self.queue)

    <strong>def qsize(self):</strong>
        """返回队列中的元素数"""
        self.mutex.acquire()
        <strong>n = self._qsize()</strong>
        self.mutex.release()
        return n

    def empty(self):
        """如果队列为空,则返回True,否则返回False"""
        self.mutex.acquire()
        n = not self._qsize()
        self.mutex.release()
        return n

    def full(self):
        """如果队列满了,则返回True,否则返回False"""
        self.mutex.acquire()
        n = 0 < self.maxsize == self._qsize()
        self.mutex.release()
        return n


    # 非阻塞的put
    def put_nowait(self, item):
        return self.put(item, False)

    # 非阻塞的get
    def get_nowait(self):
        return self.get(False)
# coding: utf8

from Queue import Queue
from threading import Thread
import logging
import time

logging.basicConfig(level=logging.INFO,
    format="%(thread)d [%(asctime)s] %(msg)s",
    datefmt="%F %T")
LOGGER = logging.getLogger(__name__)

# 创建一个队列,并向其中添加100个元素
queue = Queue()
for i in range(100):
    queue.put("this is %2d" % i)

# 开始10个线程处理任务
def consume():
    global queue

    while not queue.empty():
        item = queue.get()
        LOGGER.info("get '%s' from queue" % item)
        time.sleep(1)
        # 处理完任务后,调用task_done()方法,告诉队列,任务处理完了
        queue.task_done()
for i in range(10):
    thread = Thread(target=consume)
    thread.start()

# 等待,直到队列中所有任务都被处理了
queue.join()

通常,创建自己的队列,只需要继承Queue类,并在子类中重写 _init()_put()_get()_qsize() 方法即可。比如Queue模块中的PriorityQueue和LifoQueue,就是这么实现的。

但是延迟队列比较特别,因为即使队列中有元素,但是这些元素可能因为还没有到达延迟的时间,而导致这些元素其实是不可用的,也就是不能被get出来。因此,下面的实现中对整个 get() 方法进行了重写。

注意:看下面的代码之前,需要对最小堆以及heapq模块有一定的了解。

# coding: utf8

from Queue import Queue, Empty
import heapq
from time import time as _time


class _Item(object):
    def __init__(self, value, delay):
        assert delay >= 0, "delay can not less than 0"
        self._value = value
        self._available_at = _time() + delay

    @property
    def value(self):
        return self._value

    @property
    def available_at(self):
        return self._available_at

    def __cmp__(self, another):
        if not isinstance(another, self.__class__):
            raise TypeError("expect %s, not %s" % 
                (self.__class__.__name__, type(another).__name__))
        if self.available_at < another.available_at:
            return -1
        elif self.available_at == another.available_at:
            return 0
        else:
            return 1


class DelayQueue(Queue):
    def _init(self, max_size):
        self._underlying = []

    def _put(self, item):
        assert isinstance(item, (list, tuple)) and \
            len(item) == 2 and \
            isinstance(item[1], (int, long, float)), \
            "item should be tuple, and the second " \
                    "element should be float"
        heapq.heappush(self._underlying, _Item(item[0], item[1]))

    def _get(self):
        item = heapq.heappop(self._underlying)
        return item.value

    def _qsize(self):
        return len(self._underlying)

    def _has_available_item(self):
        if len(self._underlying) == 0:
            return False
        item = self._underlying[0]
        if item.available_at <= _time():
            return True
        else:
            return False

    def _at_least_wait_for(self):
        if len(self._underlying) == 0:
            return None
        item = self._underlying[0]
        if item.available_at <= _time():
            return 0
        else:
            return max(0, item.available_at - _time())

    def get(self, block=True, timeout=None):
        # 1,获取not_empty条件变量
        self.not_empty.acquire()

        try:
            # 2.1.1,如果block是False,并且在当前,队列无可用的元素,
            # + 那么抛出Empty异常
            # 2.1.2,如果block是False,并且在当前,队列有可用的元素,
            # + 那么走步骤3
            if not block:
                if not self._has_available_item():
                    raise Empty
            # 2.2,如果block是True,并且timeout是None,那么:
            # 2.2.1,如果队列中有可用的元素,那么走步骤3;
            # + 否则,线程进入到not_empty的waiting池,等待被唤醒或者是
            # + + 堆中的第一个元素变得可用
            # 2.2.2,线程被唤醒之后,回到2.2.1
            elif timeout is None:
                while not self._has_available_item():
                    self.not_empty.wait(self._at_least_wait_for())
            # 如果timeout小于0,那么抛出ValueError
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # 2.3,计算等待的结束时间
                endtime = _time() + timeout
                # 2.3.1,如果队列中有可用的元素,那么走步骤3;
                # + 否则,计算仍需等待的时间,
                # + 如果达到了超时时间,那么抛出Empty错误;
                # + 否则,进入到not_empty的waiting池,
                # + 等待到超时或堆中的第一个元素变得可用,或者被唤醒
                # 2.3.2,当线程被唤醒之后,回到2.3.1

                # 也就是说,如果在指定的时间之内,队列中一直没有元素可用,
                # + 那么,会等待timeout秒,并抛出Empty异常
                while not self._has_available_item():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    wait_for = min(self._at_least_wait_for, remaining)
                    self.not_empty.wait(wait_for)

            # 3,从队列中移除并返回一个元素;通知not_full,
            # + 也就是唤醒一个在not_full中等待的线程
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            # 4,释放not_empty条件变量
            self.not_empty.release()


if __name__ == "__main__":
    from threading import Thread
    delay_queue = DelayQueue(1)

    def consume():
        t1 = _time()
        print(delay_queue.get())
        t2 = _time()
        print("time used: %f" % (t2 - t1))
        print(delay_queue.get())
        t3 = _time()
        print("time used: %f" % (t3 - t2))

    t = Thread(target=consume)
    t.setDaemon(False)
    t.start()
    delay_queue.put(("value1", 3))
    delay_queue.put(("value2", 6))

以上所述就是小编给大家介绍的《Python Queue源码分析以及自己实现DelayQueue》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Probabilistic Method Second Edition

The Probabilistic Method Second Edition

Noga Alon、Joel H. Spencer / Wiley-Blackwell / 2000 / $121.95

The leading reference on probabilistic methods in combinatorics-now expanded and updated When it was first published in 1991, The Probabilistic Method became instantly the standard reference on one......一起来看看 《The Probabilistic Method Second Edition》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

html转js在线工具
html转js在线工具

html转js在线工具