内容简介:在当我们在手机或者熟悉
在 前面的文章中
对 Python
协程的概念和实现做了简单地介绍。为了对 Python
并发编程有更加全面地认识,我也对 Python
线程和进程的概念和相关技术的使用进行了学习,于是有了这篇文字。
0x01 线程与进程
当我们在手机或者 PC
上打开一个应用时,操作系统就会创建一个进程实例,并开始执行进程里的主线程,它有独立的内存空间和数据结构。线程是轻量级的进程。在同一个进程里,多个线程共享内存和数据结构,线程间的通信也更加容易。
0x02 使用线程实现并发
熟悉 Java
编程的同学就会发现 Python
中的线程模型与 Java
非常类似。下文我们将主要使用 Python
中的线程模块 threading
包。(对于低级别的 API
模块 thread
不推荐初学者使用。本文所有代码将使用 Python 3.7
的环境)
threading
要使用线程我们要导入 threading
包,这个包是在 _thread
包(即上文提到的低级别 thread
模块)的基础上封装了许多高级的 API
,在开发中应当首选 threading
包。
常见地,有两种方式构建一个线程:通过 Thread
的构造函数传递一个 callable
对象,或继承 Thread
类并重写 run
方法。
import threading import time def do_in_thread(arg): print('do in thread {}'.format(arg)) time.sleep(2) if __name__ == '__main__': start_time = time.time() t1 = threading.Thread(target=do_in_thread, args=(1,), name='t1') t2 = threading.Thread(target=do_in_thread, args=(2,), name='t2') t1.start() t2.start() # join方法让主线程等待子线程执行完毕 t1.join() t2.join() print("\nduration {} ".format(time.time() - start_time)) # do in thread 1 # do in thread 2 # duration 2.001628875732422 复制代码
还可以通过继承 threading.Thread
类定义线程
import threading import time def do_in_thread(arg): print('do in thread {}'.format(arg)) time.sleep(2) class MyThread(threading.Thread): def __init__(self, arg): super().__init__() self.arg = arg def run(self): start_time = time.time() do_in_thread(self.arg) print("duration {} ".format(time.time() - start_time)) def start_thread_2(): start_time = time.time() print("duration {} ".format(time.time() - start_time)) if __name__ == '__main__': mt1 = MyThread(3) mt2 = MyThread(4) mt1.start() mt2.start() # join方法让主线程等待子线程执行完毕 mt1.join() mt2.join() # do in thread 3 # do in thread 4 # duration 2.004937171936035 复制代码
join
方法的作用是 让调用它的线程等待其执行完毕。
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 复制代码
定义线程时可以通过指定构造方法的 name
参数设置线程名称。
target
用于指定 callable
对象,将在 run
方法中被调用。
args
设置 target
对象被调用时的参数,类型是元组 ()
,例如上文中的 do_in_thread(arg)
方法的参数。
kwargs
是一个字典类型的参数,也用于 target
对象的参数。
daemon
设置守护线程的标识,如果设置为 True
那么这个线程就是守护线程,此时如果主线程结束了,那么守护线程也会立即被杀死。所以当有在守护线程中打开文件、数据库等资源操作时,释放资源就有可能出错。
线程池
程序中若有大量的线程创建和销毁,则对性能影响较大。我们可以使用 线程池
。同样地,它的 API
与 Java
极为相似。
Executor
concurrent.futures.Executor 复制代码
这是一个抽象类,定义了线程池的接口。
-
submit(fn, *args, **kwargs)
执行fn(args,kwargs) 并会返回一个future
对象,通过future
可获取到执行结果 -
map(func, *iterables, timeout=None, chunksize=1)
这个方法与map(func,*iterables)
类似 -
shutdown(wait=True)
关闭线程池
from concurrent.futures import ThreadPoolExecutor # 使用max_workers参数指定线程池中线程的最大数量为2 with ThreadPoolExecutor(max_workers=2) as executor: # 提交任务到线程池 future = executor.submit(pow, 2, 31) # 计算2^31 future2 = executor.submit(pow, 1024, 2) # 使用future 获取执行结果 print(future.result()) print(future2.result()) # 执行结果 # 2147483648 # 1048576 复制代码
同步
若有多个线程对同一个资源或内存进行访问或操作就有会产生竞争条件。
Python
提供了 锁、信号量、条件和事件等
同步原语可帮忙我们实现线程的同步机制。
Lock
Lock
有两种状态: locked
和 unlocked
。它有两个基本方法: acquire()
和 release()
,且都是原子操作的。
一个线程通过 acquire()
获取到锁, Lock
的状态变成 locked
,而其它线程调用 acquire()
时只能等待锁被释放。 当线程调用了 release()
时 Lock
的状态就变成了 unlocked
,此时其它等待线程中只有一个线程将获得锁。
import threading share_mem_lock = 0 share_mem = 0 count = 1000000 locker = threading.Lock() def add_in_thread_with_lock(): global share_mem_lock for i in range(count): locker.acquire() share_mem_lock += 1 locker.release() def minus_in_thread_with_lock(): global share_mem_lock for i in range(count): locker.acquire() share_mem_lock -= 1 locker.release() def add_in_thread(): global share_mem for i in range(count): share_mem += 1 def minus_in_thread(): global share_mem for i in range(count): share_mem -= 1 if __name__ == '__main__': t1 = threading.Thread(target=add_in_thread_with_lock) t2 = threading.Thread(target=minus_in_thread_with_lock) t3 = threading.Thread(target=add_in_thread) t4 = threading.Thread(target=minus_in_thread) t1.start() t2.start() t3.start() t4.start() t1.join() t2.join() t3.join() t4.join() print("share_mem_lock : ", share_mem_lock) print("share_mem : ", share_mem) # 执行结果 # share_mem_lock : 0 # share_mem : 51306 复制代码
没有使用锁机制的代码执行后最后的值很有可能就不为0。而有锁的代码则可以保证同步。
RLock
RLock
即 Reentrant Lock
,就是可以重复进入的锁,也叫 递归锁
。它有3个特点:
- 谁获取锁谁释放。如A线程获取了锁,那么只有A线程才能释放该锁
-
同一线程可重复多次获取该锁。即可以调用
acquire
多次 -
acquire
多少次,对应release
就多少次,且最后一次release
才会释放锁。
Condition
条件是另一种同步原语机制。其实它的内部是封装了 RLock
,它的 acquire()
和 release()
方法就是 RLock
的方法。
Condition
常用的 API
还有 wait()
、 notify()
和 notify_all()
方法。 wait()
方法会释放锁,然后进入阻塞状态直到其它线程通过 notify()
或 notify_all()
唤醒自己。 wait()
方法重新获取到锁就会返回。
notify()
会唤醒其中一个等待的线程,而 notify_all()
会唤醒所有等待的线程。
需要注意的是 notify()
或 notify_all()
执行后并不会释放锁,只有调用了 release()
方法后锁才会释放。
让我们看一个来自于《Python并行编程手册》中的一个 生产者与消费者 例子
from threading import Thread, Condition import time items = [] condition = Condition() class consumer(Thread): def __init__(self): Thread.__init__(self) def consume(self): global condition global items # 获取锁 condition.acquire() if len(items) == 0: # 当items为空时,释放了锁,并等待生产者notify condition.wait() print("Consumer notify : no item to consume") # 开始消费 items.pop() print("Consumer notify : consumed 1 item") print("Consumer notify : items to consume are " + str(len(items))) # 消费之后notify唤醒生产者,因为notify不会释放锁,所以还要调用release释放锁 condition.notify() condition.release() def run(self): for i in range(0, 10): time.sleep(2) self.consume() class producer(Thread): def __init__(self): Thread.__init__(self) def produce(self): global condition global items condition.acquire() if len(items) == 5: # 若items时满的,则执行wait,释放锁,并等待消费者notify condition.wait() print("Producer notify : items producted are " + str(len(items))) print("Producer notify : stop the production!!") # 开始生产 items.append(1) print("Producer notify : total items producted " + str(len(items))) # 生产后notify消费者,因为notify不会释放锁,所以还执行了release释放锁。 condition.notify() condition.release() def run(self): for i in range(0, 10): time.sleep(1) self.produce() if __name__ == "__main__": producer = producer() consumer = consumer() producer.start() consumer.start() producer.join() consumer.join() 复制代码
Semaphore
信号量内部维护一个计数器。 acquire()
会减少这个计数, release()
会增加这个计数,这个计数器永远不会小于0。当计数器等于0时, acquire()
方法就会等待其它线程调用 release()
。
还是借助一个 生产者与消费者 的例子来理解
# -*- coding: utf-8 -*- """Using a Semaphore to synchronize threads""" import threading import time import random # 默认情况内部计数为1,这里设置为0。 # 若设置为负数则会抛出ValueError semaphore = threading.Semaphore(0) def consumer(): print("consumer is waiting.") # 获取一个信号量,因为初始化时内部计数设置为0,所以这里一开始时是处于等待状态 semaphore.acquire() # 开始消费 print("Consumer notify : consumed item number %s " % item) def producer(): global item time.sleep(2) # create a random item item = random.randint(0, 1000) # 开始生产 print("producer notify : produced item number %s" % item) # 释放信号量, 内部计数器+1。当有等待的线程发现计数器大于0时,就会唤醒并从acquire方法中返回 semaphore.release() if __name__ == '__main__': for i in range(0, 5): t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t1.start() t2.start() t1.join() t2.join() print("program terminated") 复制代码
信号量经常会用于资源容量确定的场景,比如数据库连接池等。
Event
事件在线程间的通信方式非常简单。一个线程发送事件另一个线程等待接收。
Event
对象内部维护了一个 bool
变量 flag
。通过 set()
方法设置该变量为 True
, clear()
方法设置 flag
为 False
。 wait()
方法会一直等待直到 flag
变成 True
结合例子
# -*- coding: utf-8 -*- import time from threading import Thread, Event import random items = [] event = Event() class consumer(Thread): def __init__(self, items, event): Thread.__init__(self) self.items = items self.event = event def run(self): while True: time.sleep(2) # 等待事件 self.event.wait() # 开始消费 item = self.items.pop() print('Consumer notify : %d popped from list by %s' % (item, self.name)) class producer(Thread): def __init__(self, integers, event): Thread.__init__(self) self.items = items self.event = event def run(self): global item while True: time.sleep(2) # 开始生产 item = random.randint(0, 256) self.items.append(item) print('Producer notify : item N° %d appended to list by %s' % (item, self.name)) print('Producer notify : event set by %s' % self.name) # 发送事件通知消费者消费 self.event.set() print('Produce notify : event cleared by %s ' % self.name) # 设置事件内部变量为False,随后消费者线程调用wait()方法时,进入阻塞状态 self.event.clear() if __name__ == '__main__': t1 = producer(items, event) t2 = consumer(items, event) t1.start() t2.start() t1.join() t2.join() 复制代码
Timer
定时器 Timer
是 Thread
的子类。用于处理定时执行的任务。启动定时器使用 start()
,取消定时器使用 cancel()
。
from threading import Timer def hello(): print("hello, world") t = Timer(3.0, hello) t.start() # 3秒后 打印 "hello, world" 复制代码
with语法
Lock
、 RLock
、 Condition
和 Semaphore
可以使用 with
语法。
这几个对象都实现拥有 acquire()
和 release()
方法,且都实现了上下文管理协议。
with some_lock: # do something... 复制代码
等价于
some_lock.acquire() try: # do something... finally: some_lock.release() 复制代码
0x03 小结
本文主要介绍 Python
中线程的使用,主要是对 threading
模块中 Thread
对象、线程池 Executor
常见用法的展示。还了解了线程的同步原语 Lock
、 RLock
、 Condition
、 Semaphore
、 Event
以及 Timer
等 API
的使用。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- ConcurrentHashMap 并发扩容骚操作,你了解吗?
- 快速了解Python并发编程的工程实现(下)
- 如果你到现在还不了解 Go 并发模型
- Java并发基础:了解无锁CAS就从源码分析
- 融云开发漫谈:你是否了解Go语言并发编程的第一要义?标题文章
- Java并发系列—并发编程基础
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。