Python 的协程

栏目: Python · 发布时间: 5年前

内容简介:协程拥有极高的执行效率,因为子程序切换不是线程切换,而是由程序自身控制,因此没有线程切换的开销。和多线程比,线程数量越多,协程的性能优势就越明显。不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多协程也就是微线程,python 的 generator(生成器) 中的 yield 可以一定程度上实现协程

0x00 协程的优势

协程拥有极高的执行效率,因为子程序切换不是线程切换,而是由程序自身控制,因此没有线程切换的开销。和多线程比,线程数量越多,协程的性能优势就越明显。

不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多

0x01 Python 中的协程

协程也就是微线程,python 的 generator(生成器) 中的 yield 可以一定程度上实现协程

在 generator 中,我们不但可以通过 for 循环来迭代,还可以不断调用 next() 函数获取由 yield 语句返回的下一个值。

但是 Python 的 yield 不但可以返回一个值,它还可以接收调用者发出的参数。 Python 的协程

0x02 使用 gevent

python 中可以通过 generator 实现协程,但是不完全,第三方的 gevent 为 Python 提供了比较完善的协程支持,gevent 可以通过 monkey patch 动态的修改 Python 自带的一些标准库

由于 IO 操作(比如访问网络)非常耗时,经常使程序处于等待状态,而 gevent 可以为我们自动切换协程,再在适当的时候切换回来继续执行,这就保证总有 greenlet 在运行,而不是等待 IO

使用 gevent 可以获得极高的并发性能,但 gevent 只能在 Unix/Linux 下运行,在 Windows 下不保证正常安装和运行 下面 3 个网络操作是并发执行的,且结束顺序不同,但只有一个线程

from gevent import monkey; monkey.patch_all()
import requests
import gevent

def get_resp_size(url):
    print('GET: %s' % url)
    html = requests.get(url).content
    print('%d bytes received from %s.' % (len(html), url))

def gevent_test(urls):
    job_list = [gevent.spawn(get_resp_size, url) for url in urls]
    gevent.joinall(job_list)

urls = [
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'https://github.com/',
]
gevent_test(urls)

0x03 asyncio

在 python 3.4 时引入了 asyncio 这个模块,asyncio 专门被用来实现异步IO操作。

通过使用 yield from 和 asyncio 模块中的 @asyncio.coroutine 可以实现协程

对于简单的迭代器,yield from iterable 本质上等于 for item in iterable: yield item 的缩写版

  • hello world 示例
@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")
  • 请求web网页
import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

0x04 async/await

在 python 3.5 时引入了 async/await

  • 关于asyncio的一些关键字的说明

    • event_loop
      事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数
    • coroutine
      协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
    • task
      一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态
    • future
      代表将来执行或没有执行的任务的结果。它和task没有本质上的区别
    • async/await
      async定义一个协程,await就像生成器里的yield一样用于挂起阻塞的异步调用接口。
  • async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

    • 把@asyncio.coroutine替换为async;
    • 把yield from替换为await。
  • hello world 示例

@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 改为如下代码:

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")
  • 使用asyncio创建任务运行,并给task指定callback得到执行结果
import asyncio

async def do_some_work(x):
    print("waiting:", x)
    return "Done after {}s".format(x)

def callback(future):
    result = future.result()
    print('callback:',result)

def run1():
    loop = asyncio.get_event_loop() # 定义一个事件loop
    coroutine = do_some_work(2) # 定义协程对象,它不能直接运行
    # 将协程加入到事件循环 loop:其实 run_until_complete 内部将协程包装成了一个任务(task)对象了
    # task 对象是 Future 类的子类,保存了协程运行后的状态,用于未来获取协程的结果
    result = loop.run_until_complete(coroutine)
    print(result)
    loop.close()

def run2():
    loop = asyncio.get_event_loop() # 定义一个事件loop
    coroutine = do_some_work(1) # 定义协程对象,它不能直接运行
    task = loop.create_task(coroutine) # 创建 task
    # task = asyncio.ensure_future(coroutine) # 也可以使用asyncio.ensure_future创建 task
    task.add_done_callback(callback) # 回调函数,获取task的返回值
    loop.run_until_complete(task) # 将task加入到事件循环 loop
    loop.close()
  • 使用 asyncio 并发执行协程

异步和并发与并行并没有关系,异步用于表示并发或并行任务的印象

import asyncio

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

def callback(future):
    print('callback',future.result())

def run1():
    # 4s 后结果同时返回
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in [2, 1, 4]]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        result = task.result()
        print(result)
    loop.close()

def run2():
    # 1s、2s、4s 分别返回结果
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in [2,1,4]]
    for task in tasks:
        task.add_done_callback(callback)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
  • 停止协程
import asyncio

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

def callback(future):
    print('callback',future.result())

def run():
    # 1s、2s、4s 分别返回结果
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in [2,1,4]]
    for task in tasks:
        task.add_done_callback(callback)

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        for task in asyncio.Task.all_tasks():
            print(task.cancel())
        loop.stop()
        # loop stop 之后还需要再次开启事件循环,最后再 close,不然还会抛出异常
        loop.run_forever()
    finally:
        loop.close()

0x05 协程与线程配合使用

import asyncio
import threading

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

def callback(future):
    print('callback',future.result())

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def run():
    # 在子线程中运行协程loop
    sub_loop = asyncio.new_event_loop()
    thread = threading.Thread(target=start_loop, args=(sub_loop,))
    thread.start()

    # 在主线程给子线程的 loop 添加协程任务
    futures = [asyncio.run_coroutine_threadsafe(do_some_work(x), sub_loop) for x in [2,1,4]]
    for future in futures:
        future.add_done_callback(callback)

    print('test...')
    
    try:
        while True:pass
    except KeyboardInterrupt as e:
        import sys;sys.exit('user aborted!')
    finally:
        sub_loop.call_soon_threadsafe(sub_loop.stop)

run()
➜  python3 tmp.py
waiting: 2
test...
waiting: 1
waiting: 4
callback Done after 1s
callback Done after 2s
callback Done after 4s
^Cuser aborted!

Reference(侵删):

若未作声明则文章版权归本人所有,转载请注明原文链接:


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Data Structures and Algorithms in Java

Data Structures and Algorithms in Java

Michael T. Goodrich、Roberto Tamassia / Wiley / 2010-01-26 / USD 177.41

* This newest edition examines fundamental data structures by following a consistent object-oriented framework that builds intuition and analysis skills of data structures and algorithms * Presents ne......一起来看看 《Data Structures and Algorithms in Java》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具