内容简介:Python中的多线程、包括协程,由于CPython的概念,就是与普通的多进程类似,Python多进程的核心概念是
Python中的多线程、包括协程,由于CPython的 GIL (Global interpreter Lock ,全局解释锁)问题,只能实现并发(Concurrency),不能实现并行(Parallelism)。 因此,在并行计算场景,多进程是 Python 最简单的选择。
Python多进程概念
概念,就是 class
。
了解概念,就会了解 class
之间的关系。
Process
与普通的多进程类似,Python多进程的核心概念是 Process (进程)。 一个简单的进程使用示例如下:
from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p0 = Process(target=f, args=('alice',)) p1 = Process(target=f, args=('bob',)) p0.start() p1.start() p0.join() p1.join()
进程使用三部曲:
- 创建 Process 。
-
start
,开始执行。 -
join
,等待执行完毕。
Pipe
Pipe
即管道,是Bash中最常见的跨进程通信手段。 echo hello | tee stdout.log
,中间的 |
就是管道,把前一个进程的stdout传递给下一个进程。
Pipe 创建时,返回两个 Connection ,前者负责 send 而后者负责 recv 。 两个进程各执一端,就可以实现单向通信。
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': receiver, sender = Pipe() p = Process(target=f, args=(sender,)) p.start() print(receiver.recv()) # prints "[42, None, 'hello']" p.join()
如果在创建 Pipe
时,指定 duplex=True
,比如 Pipe(True)
,两个 Connection
即可实现双向通信。
默认 duplex=False
。
Queue
Queue 是一个基于标准模块 queue 、包装了 Pipe 的类。 它不仅具有先进先出(FIFO)的特性,还能实现跨进程通信。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
在实际使用中,除了少数简单场景外,都不会直接使用 Process 、 Pipe 、 Queue 来实现多进程。 这种低层级(low level,无贬义)的API,可读性差,容易出错。 常用的是高层级API——进程池。
Pool
由于 Process 创建、销毁有较大开销,并且并行数受机器CPU数量的限制,过多无益。 一个 Pool (进程池)会统一创建并维持一定数量的 Process ,并行地执行Task。 在所有Task执行完毕后,再统一地关闭 Process 。
这里Task(任务)的概念,并未被实现为一个 class
,而是一个 callable
,比如下面的 f
、 g
。
#!/usr/bin/env python3 # -*- coding:utf-8 -*- from multiprocessing.pool import Pool def f(x): return x * x def g(x, y): return x**y def main(): with Pool(4) as pool: result = pool.map(f, [1, 2, 3, 4, 5]) print(type(result)) print(result) with Pool(4) as pool: result = pool.starmap(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)]) print(type(result)) print(result) if __name__ == '__main__': main()
以上代码保存为 multi.py
文件,执行结果如下:
$ python3 multi.py <class 'list'> [1, 4, 9, 16, 25] <class 'list'> [1, 16, 243, 4096, 78125]
map
的用法类似内置函数 map
,专门处理单个参数的 callable
;
而 starmap
则是用来处理多个参数的 callable
。
此外,还有利用 Pool 执行单个Task的 apply 。 除非Task本身就是一个个来的,否则使用 apply 的效率不高。
比起 apply
,更值得关注的是 imap
和 imap_unordered
。 imap
和 map
非常类似,而这个多出来的 i
,则是 Iterable
。 map
使用的是 list
而 imap
则是 Iterable
,前者效率略高,而后者内存消耗显著的小。
在处理结果上, imap
可以尽快返回一个 Iterable
的结果,而 map
则需要等待全部Task执行完毕,返回 list
。
无论 map
还是 imap
,都需要按顺序等待Task执行完成,而 imap_unordered
则不必。 imap_unordered
返回的 Iterable
,会优先迭代到先执行完成的Task。
三者各有特点,要按需使用。
AsyncResult
以上为进程池的同步使用方案。 同步方案会卡在 map 或 starmap 这一行,直到所有任务都执行完毕。 有时,我们会需要一个异步方案,这时就需要用到 map_async 或 starmap_async 。 它们返回的结果,就是 AsyncResult 。
#!/usr/bin/env python3 # -*- coding:utf-8 -*- from multiprocessing.pool import Pool def f(x): return x * x def g(x, y): return x**y def main(): with Pool(4) as pool: result = pool.map_async(f, [1, 2, 3, 4, 5]) print(type(result)) print(result.get()) with Pool(4) as pool: result = pool.starmap_async(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)]) print(type(result)) print(result.get()) if __name__ == '__main__': main()
以上代码保存为 multi_async.py
文件,执行结果如下:
$ python3 multi_async.py <class 'multiprocessing.pool.MapResult'> [1, 4, 9, 16, 25] <class 'multiprocessing.pool.MapResult'> [1, 16, 243, 4096, 78125]
以上代码中,实际等待位置是 result.get()
那一行。
Timeout
以上多进程代码,其实是不完善的。
除非Task非常简单,并无IO、网络等资源依赖,否则多进程也好、多线程也好,都有可能执行不完。
为了避免未知原因的挂起,及时止损,通常需要设置 timeout
。 AsyncResult
在阻塞时,可以用 wait
或 get
,设置 timeout
参数。
#!/usr/bin/env python3 # -*- coding:utf-8 -*- import time from multiprocessing.pool import Pool, TimeoutError def sleep(duration): time.sleep(duration) with open('result.log', 'a') as file: file.write(str(duration)) file.write('\n') return duration def main(): with Pool(4) as pool: result = pool.map_async(sleep, range(8)) try: print(result.get(timeout=5)) except TimeoutError: print(TimeoutError.__name__) if __name__ == '__main__': main()
以上代码保存为 timeout.py
文件,执行结果如下:
$ python3 timeout.py TimeoutError $ cat result.log 0 1 2 3 4
可以看到,由于 timeout=5
,4秒以前的Task都成功了,而大于(等于)5秒的Task都失败了。
当 get
需要等待所有进程结束时,需要在 Pool
关闭以前。
因此,需要在 with
作用域中执行,否则将超时或(没设 timeout
)挂死。
如果使用 wait
,则 get
可以在 with
以外获取结果。
因此,更 推荐使用
wait
配合 get
。
def main(): with Pool(4) as pool: result = pool.map_async(sleep, range(8)) result.wait() try: print(result.get(9)) except TimeoutError: print(TimeoutError.__name__)
替换 main()
,执行结果如下:
$ python3 timeout.py [0, 1, 2, 3, 4, 5, 6, 7]
总结
前面提到Python做并行计算的选择,多进程 multiprocessing 只是最简单的一个选择。 另外还有两个常见选择: 一是使用其它解释器实现的Python,比如PyPy、Jython等; 二是使用 C语言 优化需要并行的代码,在Native层绕过GIL的限制; 三是使用协程(或线程)加 subprocess ,这也算是多进程的一个方案。 此外,确认代码是否真的会被GIL所影响,是首要工作。 如果代码中真正耗时的计算是在Native层执行——这在Python中非常常见,比如OpenCV——那么用多线程也没问题。
另外,要注意多进程的测试覆盖问题。 在另一个进程执行的代码,是无法被 coverage 确认为已覆盖的。 需要对执行内容进行单独测试,或者在程序中预留未用多进程优化的原始方案。
其实,多进程带来的额外通信、切换开销,有时候也是很明显的。 还有个问题是,主进程被杀掉后,子进程会仍然存活,这在某些场景下会产生未知问题。 所以,在机器不是很强大的场景下,用原始的单线程串行方案,是最经济实用的选择。
参考
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 进程:进程生命周期
- Python 知识巩固:通过主进程带起多个子进程实现多进程执行逻辑
- Python 中子进程与父进程
- 什么是僵尸进程,如何找到并杀掉僵尸进程?
- Python第十二章-多进程和多线程01-多进程
- 深入Node.js的进程与子进程:从文档到实践
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Pro Django
Marty Alchin / Apress / 2008-11-24 / USD 49.99
Django is the leading Python web application development framework. Learn how to leverage the Django web framework to its full potential in this advanced tutorial and reference. Endorsed by Django, Pr......一起来看看 《Pro Django》 这本书的介绍吧!
Markdown 在线编辑器
Markdown 在线编辑器
UNIX 时间戳转换
UNIX 时间戳转换