内容简介:先从初始化的函数来看:从这初始化函数能得到哪些信息呢?首先,队列是可以设置其容量大小的,并且具体的底层存放元素的它使用了
起步
queue
模块提供适用于多线程编程的先进先出(FIFO)数据结构。因为它是线程安全的,所以多个线程很轻松地使用同一个实例。
源码分析
先从初始化的函数来看:
class Queue: def __init__(self, maxsize=0): # 设置队列的最大容量 self.maxsize = maxsize self._init(maxsize) # 线程锁,互斥变量 self.mutex = threading.Lock() # 由锁衍生出三个条件变量 self.not_empty = threading.Condition(self.mutex) self.not_full = threading.Condition(self.mutex) self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 def _init(self, maxsize): # 初始化底层数据结构 self.queue = deque()
从这初始化函数能得到哪些信息呢?首先,队列是可以设置其容量大小的,并且具体的底层存放元素的它使用了 collections.deque()
双端列表的数据结构,这使得能很方便的做先进先出操作。这里还特地抽象为 _init
函数是为了方便其子类进行覆盖,允许子类使用其他结构来存放元素(比如优先队列使用了 list
)。
然后就是线程锁 self.mutex
,对于底层数据结构 self.queue
的操作都要先获得这把锁;再往下是三个条件变量,这三个 Condition
都以 self.mutex
作为参数,也就是说它们共用一把锁;从这可以知道诸如 with self.mutex
与 with self.not_empty
等都是互斥的。
基于这些锁而做的一些简单的操作:
class Queue: ... def qsize(self): # 返回队列中的元素数 with self.mutex: return self._qsize() def empty(self): # 队列是否为空 with self.mutex: return not self._qsize() def full(self): # 队列是否已满 with self.mutex: return 0 < self.maxsize <= self._qsize() def _qsize(self): return len(self.queue)
这个代码片段挺好理解的,无需分析。
作为队列,主要得完成入队与出队的操作,首先是入队:
class Queue: ... def put(self, item, block=True, timeout=None): with self.not_full: # 获取条件变量not_full if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: raise Full # 如果 block 是 False,并且队列已满,那么抛出 Full 异常 elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() # 阻塞直到由剩余空间 elif timeout < 0: # 不合格的参数值,抛出ValueError raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout # 计算等待的结束时间 while self._qsize() >= self.maxsize: remaining = endtime - time() if remaining <= 0.0: raise Full # 等待期间一直没空间,抛出 Full 异常 self.not_full.wait(remaining) self._put(item) # 往底层数据结构中加入一个元素 self.unfinished_tasks += 1 self.not_empty.notify() def _put(self, item): self.queue.append(item)
尽管只有二十几行的代码,但这里的逻辑还是比较复杂的。它要处理超时与队列剩余空间不足的情况,具体几种情况如下:
-
如果
block
是 False,忽略timeout参数- 若此时队列已满,则抛出 Full 异常;
- 若此时队列未满,则立即把元素保存到底层数据结构中;
-
如果
block
是 True-
若
timeout
是None
时,那么put操作可能会阻塞,直到队列中有空闲的空间(默认); -
若
timeout
是非负数,则会阻塞相应时间直到队列中有剩余空间,在这个期间,如果队列中一直没有空间,抛出Full
异常;
-
若
处理好参数逻辑后,,将元素保存到底层数据结构中,并递增 unfinished_tasks
,同时通知 not_empty
,唤醒在其中等待数据的线程。
出队操作:
class Queue: ... def get(self, block=True, timeout=None): with self.not_empty: if not block: if not self._qsize(): raise Empty elif timeout is None: while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() self.not_full.notify() return item def _get(self): return self.queue.popleft()
get()
操作是 put()
相反的操作,代码块也及其相似, get()
是从队列中移除最先插入的元素并将其返回。
-
如果
block
是 False,忽略timeout参数- 若此时队列没有元素,则抛出 Empty 异常;
- 若此时队列由元素,则立即把元素保存到底层数据结构中;
-
如果
block
是 True-
若
timeout
是None
时,那么get操作可能会阻塞,直到队列中有元素(默认); -
若
timeout
是非负数,则会阻塞相应时间直到队列中有元素,在这个期间,如果队列中一直没有元素,则抛出Empty
异常;
-
若
最后,通过 self.queue.popleft()
将最早放入队列的元素移除,并通知 not_full
,唤醒在其中等待数据的线程。
这里有个值得注意的地方,在 put()
操作中递增了 self.unfinished_tasks
,而 get()
中却没有递减,这是为什么?
这其实是为了留给用户一个消费元素的时间, get()
仅仅是获取元素,并不代表消费者线程处理的该元素,用户需要调用 task_done()
来通知队列该任务处理完成了:
class Queue: ... def task_done(self): with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: # 也就是成功调用put()的次数小于调用task_done()的次数时,会抛出异常 raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() # 当unfinished为0时,会通知all_tasks_done self.unfinished_tasks = unfinished def join(self): with self.all_tasks_done: while self.unfinished_tasks: # 如果有未完成的任务,将调用wait()方法等待 self.all_tasks_done.wait()
由于 task_done()
使用方调用的,当 task_done()
次数大于 put()
次数时会抛出异常。
task_done()
操作的作用是唤醒正在阻塞的 join()
操作。 join()
方法会一直阻塞,直到队列中所有的元素都被取出,并被处理了(和线程的join方法类似)。也就是说 join()
方法必须配合 task_done()
来使用才行。
LIFO 后进先出队列
LifoQueue使用后进先出顺序,与栈结构相似:
class LifoQueue(Queue): '''Variant of Queue that retrieves most recently added entries first.''' def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
这就是 LifoQueue
全部代码了,这正是 Queue
设计很棒的一个原因,它将底层的数据操作抽象成四个操作函数,本身来处理线程安全的问题,使得其子类只需关注底层的操作。
LifoQueue 底层数据结构改用 list
来存放,通过 self.queue.pop()
就能将 list 中最后一个元素移除,无需重置索引。
PriorityQueue 优先队列
from heapq import heappush, heappop class PriorityQueue(Queue): '''Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). ''' def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): heappush(self.queue, item) def _get(self): return heappop(self.queue)
优先队列使用了 heapq
模块的结构,也就是最小堆的结构。优先队列更为常用,队列中项目的处理顺序需要基于这些项目的特征,一个简单的例子:
import queue class A: def __init__(self, priority, value): self.priority = priority self.value = value def __lt__(self, other): return self.priority < other.priority q = queue.PriorityQueue() q.put(A(1, 'a')) q.put(A(0, 'b')) q.put(A(1, 'c')) print(q.get().value) # 'b'
使用优先队列的时候,需要定义 __lt__
魔术方法,来定义它们之间如何比较大小。若元素的 priority
相同,依然使用先进先出的顺序。
以上所述就是小编给大家介绍的《Python 模块源码分析:queue 队列》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 等待队列源码分析
- 聊聊 JDK 阻塞队列源码分析
- 源码级深挖 AQS 队列同步器
- 任务队列怎么写?python rq源码分析
- JStorm 源码分析 - 高性能队列 DisruptorQueue
- Laravel Queue——消息队列任务与分发源码剖析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Intersectional Internet
Safiya Umoja Noble、Brendesha M. Tynes / Peter Lang Publishing / 2016
From race, sex, class, and culture, the multidisciplinary field of Internet studies needs theoretical and methodological approaches that allow us to question the organization of social relations that ......一起来看看 《The Intersectional Internet》 这本书的介绍吧!