快速了解Python并发编程的工程实现(上)

栏目: Python · 发布时间: 5年前

内容简介:在当我们在手机或者熟悉

前面的文章中Python 协程的概念和实现做了简单地介绍。为了对 Python 并发编程有更加全面地认识,我也对 Python 线程和进程的概念和相关技术的使用进行了学习,于是有了这篇文字。

0x01 线程与进程

当我们在手机或者 PC 上打开一个应用时,操作系统就会创建一个进程实例,并开始执行进程里的主线程,它有独立的内存空间和数据结构。线程是轻量级的进程。在同一个进程里,多个线程共享内存和数据结构,线程间的通信也更加容易。

0x02 使用线程实现并发

熟悉 Java 编程的同学就会发现 Python 中的线程模型与 Java 非常类似。下文我们将主要使用 Python 中的线程模块 threading 包。(对于低级别的 API 模块 thread 不推荐初学者使用。本文所有代码将使用 Python 3.7 的环境)

threading

要使用线程我们要导入 threading 包,这个包是在 _thread 包(即上文提到的低级别 thread 模块)的基础上封装了许多高级的 API ,在开发中应当首选 threading 包。

常见地,有两种方式构建一个线程:通过 Thread 的构造函数传递一个 callable 对象,或继承 Thread 类并重写 run 方法。

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    
    t1 = threading.Thread(target=do_in_thread, args=(1,), name='t1')
    t2 = threading.Thread(target=do_in_thread, args=(2,), name='t2')

    t1.start()
    t2.start()
    
    # join方法让主线程等待子线程执行完毕
    t1.join()
    t2.join()
    print("\nduration {} ".format(time.time() - start_time))
    
# do in thread 1
# do in thread 2
# duration 2.001628875732422 

复制代码

还可以通过继承 threading.Thread 类定义线程

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)

    
class MyThread(threading.Thread):

    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):
        start_time = time.time()
        do_in_thread(self.arg)
        print("duration {} ".format(time.time() - start_time))


def start_thread_2():
    start_time = time.time()

    print("duration {} ".format(time.time() - start_time))


if __name__ == '__main__':
    mt1 = MyThread(3)
    mt2 = MyThread(4)

    mt1.start()
    mt2.start()
    
    # join方法让主线程等待子线程执行完毕
    mt1.join()
    mt2.join() 
    
    
# do in thread 3
# do in thread 4
# duration 2.004937171936035 
复制代码

join 方法的作用是 让调用它的线程等待其执行完毕。

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
复制代码

定义线程时可以通过指定构造方法的 name 参数设置线程名称。

target 用于指定 callable 对象,将在 run 方法中被调用。

args 设置 target 对象被调用时的参数,类型是元组 () ,例如上文中的 do_in_thread(arg) 方法的参数。

kwargs 是一个字典类型的参数,也用于 target 对象的参数。

daemon 设置守护线程的标识,如果设置为 True 那么这个线程就是守护线程,此时如果主线程结束了,那么守护线程也会立即被杀死。所以当有在守护线程中打开文件、数据库等资源操作时,释放资源就有可能出错。

线程池

程序中若有大量的线程创建和销毁,则对性能影响较大。我们可以使用 线程池 。同样地,它的 APIJava 极为相似。

Executor

concurrent.futures.Executor
复制代码

这是一个抽象类,定义了线程池的接口。

  • submit(fn, *args, **kwargs)
    执行fn(args,kwargs) 并会返回一个 future 对象,通过 future 可获取到执行结果
  • map(func, *iterables, timeout=None, chunksize=1)
    这个方法与 map(func,*iterables) 类似
  • shutdown(wait=True)
    关闭线程池
from concurrent.futures import ThreadPoolExecutor
# 使用max_workers参数指定线程池中线程的最大数量为2
with ThreadPoolExecutor(max_workers=2) as executor:
    # 提交任务到线程池
    future = executor.submit(pow, 2, 31) # 计算2^31
    future2 = executor.submit(pow, 1024, 2)
    # 使用future 获取执行结果
    print(future.result())
    print(future2.result())

# 执行结果
# 2147483648
# 1048576
复制代码

同步

若有多个线程对同一个资源或内存进行访问或操作就有会产生竞争条件。

Python 提供了 锁、信号量、条件和事件等 同步原语可帮忙我们实现线程的同步机制。

Lock

Lock 有两种状态: lockedunlocked 。它有两个基本方法: acquire()release() ,且都是原子操作的。

一个线程通过 acquire() 获取到锁, Lock 的状态变成 locked ,而其它线程调用 acquire() 时只能等待锁被释放。 当线程调用了 release()Lock 的状态就变成了 unlocked ,此时其它等待线程中只有一个线程将获得锁。

import threading

share_mem_lock = 0
share_mem = 0
count = 1000000

locker = threading.Lock()


def add_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock += 1
        locker.release()


def minus_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock -= 1
        locker.release()


def add_in_thread():
    global share_mem

    for i in range(count):
        share_mem += 1


def minus_in_thread():
    global share_mem

    for i in range(count):
        share_mem -= 1


if __name__ == '__main__':
    t1 = threading.Thread(target=add_in_thread_with_lock)
    t2 = threading.Thread(target=minus_in_thread_with_lock)

    t3 = threading.Thread(target=add_in_thread)
    t4 = threading.Thread(target=minus_in_thread)

    t1.start()
    t2.start()

    t3.start()
    t4.start()

    t1.join()
    t2.join()

    t3.join()
    t4.join()

    print("share_mem_lock : ", share_mem_lock)
    print("share_mem : ", share_mem)

# 执行结果
# share_mem_lock :  0
# share_mem :  51306
复制代码

没有使用锁机制的代码执行后最后的值很有可能就不为0。而有锁的代码则可以保证同步。

RLock

RLockReentrant Lock ,就是可以重复进入的锁,也叫 递归锁 。它有3个特点:

  • 谁获取锁谁释放。如A线程获取了锁,那么只有A线程才能释放该锁
  • 同一线程可重复多次获取该锁。即可以调用 acquire 多次
  • acquire 多少次,对应 release 就多少次,且最后一次 release 才会释放锁。

Condition

条件是另一种同步原语机制。其实它的内部是封装了 RLock ,它的 acquire()release() 方法就是 RLock 的方法。

Condition 常用的 API 还有 wait()notify()notify_all() 方法。 wait() 方法会释放锁,然后进入阻塞状态直到其它线程通过 notify()notify_all() 唤醒自己。 wait() 方法重新获取到锁就会返回。

notify() 会唤醒其中一个等待的线程,而 notify_all() 会唤醒所有等待的线程。

需要注意的是 notify()notify_all() 执行后并不会释放锁,只有调用了 release() 方法后锁才会释放。

让我们看一个来自于《Python并行编程手册》中的一个 生产者与消费者 例子

from threading import Thread, Condition
import time

items = []
condition = Condition()


class consumer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        global condition
        global items
        # 获取锁
        condition.acquire()
        if len(items) == 0:
            # 当items为空时,释放了锁,并等待生产者notify
            condition.wait()
            print("Consumer notify : no item to consume")
        # 开始消费
        items.pop()
        print("Consumer notify : consumed 1 item")
        print("Consumer notify : items to consume are " + str(len(items)))
        # 消费之后notify唤醒生产者,因为notify不会释放锁,所以还要调用release释放锁
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(2)
            self.consume()


class producer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 5:
            # 若items时满的,则执行wait,释放锁,并等待消费者notify
            condition.wait()
            print("Producer notify : items producted are " + str(len(items)))
            print("Producer notify : stop the production!!")
        # 开始生产
        items.append(1)
        print("Producer notify : total items producted " + str(len(items)))
        # 生产后notify消费者,因为notify不会释放锁,所以还执行了release释放锁。
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer = producer()
    consumer = consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

复制代码

Semaphore

信号量内部维护一个计数器。 acquire() 会减少这个计数, release() 会增加这个计数,这个计数器永远不会小于0。当计数器等于0时, acquire() 方法就会等待其它线程调用 release()

还是借助一个 生产者与消费者 的例子来理解

# -*- coding: utf-8 -*-

"""Using a Semaphore to synchronize threads"""
import threading
import time
import random

# 默认情况内部计数为1,这里设置为0。
# 若设置为负数则会抛出ValueError
semaphore = threading.Semaphore(0)


def consumer():
    print("consumer is waiting.")
    # 获取一个信号量,因为初始化时内部计数设置为0,所以这里一开始时是处于等待状态
    semaphore.acquire()
    # 开始消费
    print("Consumer notify : consumed item number %s " % item)


def producer():
    global item
    time.sleep(2)
    # create a random item
    item = random.randint(0, 1000)
    # 开始生产
    print("producer notify : produced item number %s" % item)
    # 释放信号量, 内部计数器+1。当有等待的线程发现计数器大于0时,就会唤醒并从acquire方法中返回
    semaphore.release()


if __name__ == '__main__':
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("program terminated")

复制代码

信号量经常会用于资源容量确定的场景,比如数据库连接池等。

Event

事件在线程间的通信方式非常简单。一个线程发送事件另一个线程等待接收。

Event 对象内部维护了一个 bool 变量 flag 。通过 set() 方法设置该变量为 Trueclear() 方法设置 flagFalsewait() 方法会一直等待直到 flag 变成 True

结合例子

# -*- coding: utf-8 -*-

import time
from threading import Thread, Event
import random

items = []
event = Event()

class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            # 等待事件
            self.event.wait()
            # 开始消费
            item = self.items.pop()
            print('Consumer notify : %d popped from list by %s' % (item, self.name))


class producer(Thread):
    def __init__(self, integers, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        global item
        while True:
            time.sleep(2)
            # 开始生产
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
            print('Producer notify : event set by %s' % self.name)
            # 发送事件通知消费者消费
            self.event.set()
            print('Produce notify : event cleared by %s ' % self.name)
            # 设置事件内部变量为False,随后消费者线程调用wait()方法时,进入阻塞状态
            self.event.clear()


if __name__ == '__main__':
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

复制代码

Timer

定时器 TimerThread 的子类。用于处理定时执行的任务。启动定时器使用 start() ,取消定时器使用 cancel()

from threading import Timer

def hello():
    print("hello, world")

t = Timer(3.0, hello)
t.start()  # 3秒后 打印 "hello, world"
复制代码

with语法

LockRLockConditionSemaphore 可以使用 with 语法。 这几个对象都实现拥有 acquire()release() 方法,且都实现了上下文管理协议。

with some_lock:
    # do something...
复制代码

等价于

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()
复制代码

0x03 小结

本文主要介绍 Python 中线程的使用,主要是对 threading 模块中 Thread 对象、线程池 Executor 常见用法的展示。还了解了线程的同步原语 LockRLockConditionSemaphoreEvent 以及 TimerAPI 的使用。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

软件的奥秘

软件的奥秘

[美] V. Anton Spraul / 解福祥 / 人们邮电出版社 / 2017-9-1 / 49

软件已经成为人们日常生活与工作中常见的辅助工具,但是对于软件的工作原理,很多人却不是非常了解。 本书对软件的工作原理进行了解析,让读者对常用软件的工作原理有一个大致的了解。内容涉及数据如何加密、密码如何使用和保护、如何创建计算机图像、如何压缩和存储视频、如何搜索数据、程序如何解决同样的问题而不会引发冲突以及如何找出最佳路径等方面。 本书适合从事软件开发工作的专业技术人员,以及对软件工作......一起来看看 《软件的奥秘》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

MD5 加密
MD5 加密

MD5 加密工具