经常需要和其他的一些程序和进程一起工作,以便使用已经存在的代码而不用重写,或者访问 Python 中已经不可用的库或特性。和网络IO一样,asyncio包含两个抽象概念来启动其他程序然后进行交互。
对于Subprocesses使用Protocol Abstraction
# asyncio_subprocess_protocol.py import asyncio import functools async def run_df(loop): print('in run_df') cmd_done = asyncio.Future(loop=loop) factory = functools.partial(DFProtocol, cmd_done) proc = loop.subprocess_exec( factory, 'df', '-hl', stdin=None, stderr=None, ) try: print('launching process') transport, protocol = await proc print('waiting for process to complete') await cmd_done finally: transport.close() return cmd_done.result()
class DFProtocol(asyncio.SubprocessProtocol): FD_NAMES = ['stdin', 'stdout', 'stderr'] def __init__(self, done_future): self.done = done_future self.buffer = bytearray() super().__init__()
def connection_made(self, transport): print('process started {}'.format(transport.get_pid())) self.transport = transport
def pipe_data_received(self, fd, data): print('read {} bytes from {}'.format(len(data), self.FD_NAMES[fd])) if fd == 1: self.buffer.extend(data)
def process_exited(self): print('process exited') return_code = self.transport.get_returncode() print('return code {}'.format(return_code)) if not return_code: cmd_output = bytes(self.buffer).decode() results = self._parse_results(cmd_output) else: results = [] self.done.set_result((return_code, results))
def _parse_results(self, output): print('parsing results') # Output has one row of headers, all single words. # remaining rows are one per filesystem, with columns # matching the headers (assuming that none of the # mount points have whitespace in the names) if not output: return [] lines = output.splitlines() headers = lines[0].split() devices = lines[1:] results = [ dict(zip(headers, line.split())) for line in devices ] return results
event_loop = asyncio.get_event_loop() try: return_code, results = event_loop.run_until_complete( run_df(event_loop) ) finally: event_loop.close() if return_code: print('error exit {}'.format(return_code)) else: print('\nFree space:') for r in results: print('{Mounted:25}: {Avail}'.format(**r))
$ python3 asyncio_subprocess_protocol.py in run_df launching process process started 49675 waiting for process to complete read 332 bytes from stdout process exited return code 0 parsing results Free space: / : 233Gi /Volumes/hubertinternal : 157Gi /Volumes/hubert-tm : 2.3Ti
使用协程直接运行进程,而不是通过Protocol子类访问,请调用create_subprocess_exec()并且指定要连接到的管道stdout, stderr和stdin。协程生成子进程的结果是一个Process实例,可用于管理子进程或与之通信。
# asyncio_subprocess_coroutine.py import asyncio import asyncio.subprocess async def run_df(): print('in run_df') buffer = bytearray() create = asyncio.create_subprocess_exec( 'df', '-hl', stdout=asyncio.subprocess.PIPE, ) print('launching process') proc = await create print('process started {}'.format(proc.pid))
在这个例子中, 除了命令行参数外,df不需要任何输入,所以下一步就是读取所有的输出。Protocol没有控制同一时间读取多少数据。这个例子使用readline(),但是也可以直接调用read()方法读取数据而不是面向行级的。和protocol例子一样,命令的输出进行了缓存,稍后打印出来。
while True: line = await proc.stdout.readline() print('read {!r}'.format(line)) if not line: print('no more output from command') break buffer.extend(line)
print('waiting for process to complete') await proc.wait()
return_code = proc.returncode print('return code {}'.format(return_code)) if not return_code: cmd_output = bytes(buffer).decode() results = _parse_results(cmd_output) else: results = [] return (return_code, results)
event_loop = asyncio.get_event_loop() try: return_code, results = event_loop.run_until_complete( run_df() ) finally: event_loop.close() if return_code: print('error exit {}'.format(return_code)) else: print('\nFree space:') for r in results: print('{Mounted:25}: {Avail}'.format(**r))
$ python3 asyncio_subprocess_coroutine.py in run_df launching process process started 49678 read b'Filesystem Size Used Avail Capacity iusedifree %iused Mounted on\n' read b'/dev/disk2s2 446Gi 213Gi 233Gi 48% 5595508261015132 48% /\n' read b'/dev/disk1 465Gi 307Gi 157Gi 67% 8051492241281172 66% /Volumes/hubertinternal\n' read b'/dev/disk3s2 3.6Ti 1.4Ti 2.3Ti 38% 181837749306480579 37% /Volumes/hubert-tm\n' read b'' no more output from command waiting for process to complete return code 0 parsing results Free space: / : 233Gi /Volumes/hubertinternal : 157Gi /Volumes/hubert-tm : 2.3Ti
to_upper()协程方法采用事件循环和字符串作为参数。它产生第二个进程运行”tr [:lower:] [:upper:]”
# asyncio_subprocess_coroutine_write.py import asyncio import asyncio.subprocess async def to_upper(input): print('in to_upper') create = asyncio.create_subprocess_exec( 'tr', '[:lower:]', '[:upper:]', stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, ) print('launching process') proc = await create print('pid {}'.format(proc.pid))
print('communicating with process') stdout, stderr = await proc.communicate(input.encode())
print('waiting for process to complete') await proc.wait()
return_code = proc.returncode print('return code {}'.format(return_code)) if not return_code: results = bytes(stdout).decode() else: results = '' return (return_code, results)
MESSAGE = """ This message will be converted to all caps. """ event_loop = asyncio.get_event_loop() try: return_code, results = event_loop.run_until_complete( to_upper(MESSAGE) ) finally: event_loop.close() if return_code: print('error exit {}'.format(return_code)) else: print('Original: {!r}'.format(MESSAGE)) print('Changed : {!r}'.format(results))
$ python3 asyncio_subprocess_coroutine_write.py in to_upper launching process pid 49684 communicating with process waiting for process to complete return code 0 Original: '\nThis message will be converted\nto all caps.\n' Changed : '\nTHIS MESSAGE WILL BE CONVERTED\nTO ALL CAPS.\n'
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 进程:进程生命周期
- Python 知识巩固:通过主进程带起多个子进程实现多进程执行逻辑
- Python 中子进程与父进程
- 什么是僵尸进程,如何找到并杀掉僵尸进程?
- iOS 12 人机交互指南:交互(User Interaction)
- 生活NLP云服务“玩秘”站稳人机交互2.0语音交互场景
Donald E.Knuth / 人民邮电出版社 / 2010-10 / 119.00元
《计算机程序设计艺术》系列著作对计算机领域产生了深远的影响。这一系列堪称一项浩大的工程,自1962年开始编写,计划出版7卷,目前已经出版了4卷。《美国科学家》杂志曾将这套书与爱因斯坦的《相对论》等书并列称为20世纪最重要的12本物理学著作。目前Knuth正将毕生精力投入到这部史诗性著作的撰写中。想了解本书最新信息,请访http://www-cs-faculty.stanford.edu/~knut......一起来看看 《计算机程序设计艺术卷1:基本算法(英文版.第3版)》 这本书的介绍吧!