Python中的多进程(multiprocessing)

栏目: Python · 发布时间: 5年前

内容简介:Python中的多线程、包括协程,由于CPython的概念,就是与普通的多进程类似,Python多进程的核心概念是

Python中的多线程、包括协程,由于CPython的 GIL (Global interpreter Lock ,全局解释锁)问题,只能实现并发(Concurrency),不能实现并行(Parallelism)。 因此,在并行计算场景,多进程是 Python 最简单的选择。

Python多进程概念

概念,就是 class 。 了解概念,就会了解 class 之间的关系。

Process

与普通的多进程类似,Python多进程的核心概念是 Process (进程)。 一个简单的进程使用示例如下:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p0 = Process(target=f, args=('alice',))
    p1 = Process(target=f, args=('bob',))
    p0.start()
    p1.start()
    p0.join()
    p1.join()

进程使用三部曲:

  1. 创建 Process
  2. start ,开始执行。
  3. join ,等待执行完毕。

Pipe

Pipe 即管道,是Bash中最常见的跨进程通信手段。 echo hello | tee stdout.log ,中间的 | 就是管道,把前一个进程的stdout传递给下一个进程。

Pipe 创建时,返回两个 Connection ,前者负责 send 而后者负责 recv 。 两个进程各执一端,就可以实现单向通信。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    receiver, sender = Pipe()
    p = Process(target=f, args=(sender,))
    p.start()
    print(receiver.recv())   # prints "[42, None, 'hello']"
    p.join()

如果在创建 Pipe 时,指定 duplex=True ,比如 Pipe(True) ,两个 Connection 即可实现双向通信。 默认 duplex=False

Queue

Queue 是一个基于标准模块 queue 、包装了 Pipe 的类。 它不仅具有先进先出(FIFO)的特性,还能实现跨进程通信。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

在实际使用中,除了少数简单场景外,都不会直接使用 ProcessPipeQueue 来实现多进程。 这种低层级(low level,无贬义)的API,可读性差,容易出错。 常用的是高层级API——进程池。

Pool

由于 Process 创建、销毁有较大开销,并且并行数受机器CPU数量的限制,过多无益。 一个 Pool (进程池)会统一创建并维持一定数量的 Process ,并行地执行Task。 在所有Task执行完毕后,再统一地关闭 Process

这里Task(任务)的概念,并未被实现为一个 class ,而是一个 callable ,比如下面的 fg

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from multiprocessing.pool import Pool


def f(x):
    return x * x


def g(x, y):
    return x**y


def main():
    with Pool(4) as pool:
        result = pool.map(f, [1, 2, 3, 4, 5])
        print(type(result))
        print(result)

    with Pool(4) as pool:
        result = pool.starmap(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)])
        print(type(result))
        print(result)


if __name__ == '__main__':
    main()

以上代码保存为 multi.py 文件,执行结果如下:

$ python3 multi.py
<class 'list'>
[1, 4, 9, 16, 25]
<class 'list'>
[1, 16, 243, 4096, 78125]

map 的用法类似内置函数 map ,专门处理单个参数的 callable ; 而 starmap 则是用来处理多个参数的 callable

此外,还有利用 Pool 执行单个Task的 apply 。 除非Task本身就是一个个来的,否则使用 apply 的效率不高。

比起 apply ,更值得关注的是 imapimap_unorderedimapmap 非常类似,而这个多出来的 i ,则是 Iterablemap 使用的是 listimap 则是 Iterable ,前者效率略高,而后者内存消耗显著的小。 在处理结果上, imap 可以尽快返回一个 Iterable 的结果,而 map 则需要等待全部Task执行完毕,返回 list 。 无论 map 还是 imap ,都需要按顺序等待Task执行完成,而 imap_unordered 则不必。 imap_unordered 返回的 Iterable ,会优先迭代到先执行完成的Task。 三者各有特点,要按需使用。

AsyncResult

以上为进程池的同步使用方案。 同步方案会卡在 mapstarmap 这一行,直到所有任务都执行完毕。 有时,我们会需要一个异步方案,这时就需要用到 map_asyncstarmap_async 。 它们返回的结果,就是 AsyncResult

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from multiprocessing.pool import Pool


def f(x):
    return x * x


def g(x, y):
    return x**y


def main():
    with Pool(4) as pool:
        result = pool.map_async(f, [1, 2, 3, 4, 5])
        print(type(result))
        print(result.get())

    with Pool(4) as pool:
        result = pool.starmap_async(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)])
        print(type(result))
        print(result.get())


if __name__ == '__main__':
    main()

以上代码保存为 multi_async.py 文件,执行结果如下:

$ python3 multi_async.py
<class 'multiprocessing.pool.MapResult'>
[1, 4, 9, 16, 25]
<class 'multiprocessing.pool.MapResult'>
[1, 16, 243, 4096, 78125]

以上代码中,实际等待位置是 result.get() 那一行。

Timeout

以上多进程代码,其实是不完善的。 除非Task非常简单,并无IO、网络等资源依赖,否则多进程也好、多线程也好,都有可能执行不完。 为了避免未知原因的挂起,及时止损,通常需要设置 timeoutAsyncResult 在阻塞时,可以用 waitget ,设置 timeout 参数。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import time
from multiprocessing.pool import Pool, TimeoutError


def sleep(duration):
    time.sleep(duration)
    with open('result.log', 'a') as file:
        file.write(str(duration))
        file.write('\n')
    return duration


def main():
    with Pool(4) as pool:
        result = pool.map_async(sleep, range(8))
        try:
            print(result.get(timeout=5))
        except TimeoutError:
            print(TimeoutError.__name__)


if __name__ == '__main__':
    main()

以上代码保存为 timeout.py 文件,执行结果如下:

$ python3 timeout.py
TimeoutError
$ cat result.log
0
1
2
3
4

可以看到,由于 timeout=5 ,4秒以前的Task都成功了,而大于(等于)5秒的Task都失败了。

get 需要等待所有进程结束时,需要在 Pool 关闭以前。 因此,需要在 with 作用域中执行,否则将超时或(没设 timeout )挂死。 如果使用 wait ,则 get 可以在 with 以外获取结果。 因此,更 推荐使用 wait 配合 get

def main():
    with Pool(4) as pool:
        result = pool.map_async(sleep, range(8))
        result.wait()
    try:
        print(result.get(9))
    except TimeoutError:
        print(TimeoutError.__name__)

替换 main() ,执行结果如下:

$ python3 timeout.py
[0, 1, 2, 3, 4, 5, 6, 7]

总结

前面提到Python做并行计算的选择,多进程 multiprocessing 只是最简单的一个选择。 另外还有两个常见选择: 一是使用其它解释器实现的Python,比如PyPy、Jython等; 二是使用 C语言 优化需要并行的代码,在Native层绕过GIL的限制; 三是使用协程(或线程)加 subprocess ,这也算是多进程的一个方案。 此外,确认代码是否真的会被GIL所影响,是首要工作。 如果代码中真正耗时的计算是在Native层执行——这在Python中非常常见,比如OpenCV——那么用多线程也没问题。

另外,要注意多进程的测试覆盖问题。 在另一个进程执行的代码,是无法被 coverage 确认为已覆盖的。 需要对执行内容进行单独测试,或者在程序中预留未用多进程优化的原始方案。

其实,多进程带来的额外通信、切换开销,有时候也是很明显的。 还有个问题是,主进程被杀掉后,子进程会仍然存活,这在某些场景下会产生未知问题。 所以,在机器不是很强大的场景下,用原始的单线程串行方案,是最经济实用的选择。

参考


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pro Django

Pro Django

Marty Alchin / Apress / 2008-11-24 / USD 49.99

Django is the leading Python web application development framework. Learn how to leverage the Django web framework to its full potential in this advanced tutorial and reference. Endorsed by Django, Pr......一起来看看 《Pro Django》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换